/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;
import
org.mockito.Mockito; 084import org.slf4j.Logger; 085import org.slf4j.LoggerFactory; 086 087@org.junit.jupiter.api.Tag(IOTests.TAG) 088@org.junit.jupiter.api.Tag(LargeTests.TAG) 089@HBaseParameterizedTestTemplate( 090 name = "{index}: includesMemstoreTS={0}, includesTag={1}, useHeapAllocator={2}") 091public class TestHFileBlock { 092 093 // change this value to activate more logs 094 private static final boolean detailedLogging = false; 095 private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true }; 096 097 private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlock.class); 098 099 static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ }; 100 101 private static final int NUM_TEST_BLOCKS = 1000; 102 private static final int NUM_READER_THREADS = 26; 103 private static final int MAX_BUFFER_COUNT = 2048; 104 105 // Used to generate KeyValues 106 private static int NUM_KEYVALUES = 50; 107 private static int FIELD_LENGTH = 10; 108 private static float CHANCE_TO_REPEAT = 0.6f; 109 110 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 111 private static final Random RNG = new Random(); // This test depends on Random#setSeed 112 private FileSystem fs; 113 114 private final boolean includesMemstoreTS; 115 private final boolean includesTag; 116 private final boolean useHeapAllocator; 117 private final ByteBuffAllocator alloc; 118 119 public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag, boolean useHeapAllocator) { 120 this.includesMemstoreTS = includesMemstoreTS; 121 this.includesTag = includesTag; 122 this.useHeapAllocator = useHeapAllocator; 123 this.alloc = useHeapAllocator ? 
HEAP : createOffHeapAlloc(); 124 assertAllocator(); 125 } 126 127 public static Stream<Arguments> parameters() { 128 List<Arguments> params = new ArrayList<>(); 129 for (int i = 0; i < (1 << 3); i++) { 130 boolean v0 = (i & (1 << 0)) != 0; 131 boolean v1 = (i & (1 << 1)) != 0; 132 boolean v2 = (i & (1 << 2)) != 0; 133 params.add(Arguments.of(v0, v1, v2)); 134 } 135 return params.stream(); 136 } 137 138 private ByteBuffAllocator createOffHeapAlloc() { 139 Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 140 conf.setInt(ByteBuffAllocator.MAX_BUFFER_COUNT_KEY, MAX_BUFFER_COUNT); 141 conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0); 142 ByteBuffAllocator alloc = ByteBuffAllocator.create(conf, true); 143 // Fill the allocator 144 List<ByteBuff> bufs = new ArrayList<>(); 145 for (int i = 0; i < MAX_BUFFER_COUNT; i++) { 146 ByteBuff bb = alloc.allocateOneBuffer(); 147 assertTrue(!bb.hasArray()); 148 bufs.add(bb); 149 } 150 bufs.forEach(ByteBuff::release); 151 return alloc; 152 } 153 154 private void assertAllocator() { 155 if (!useHeapAllocator) { 156 assertEquals(MAX_BUFFER_COUNT, alloc.getFreeBufferCount()); 157 } 158 } 159 160 @BeforeEach 161 public void setUp() throws IOException { 162 fs = HFileSystem.get(TEST_UTIL.getConfiguration()); 163 } 164 165 @AfterEach 166 public void tearDown() throws IOException { 167 assertAllocator(); 168 alloc.clean(); 169 } 170 171 static void writeTestBlockContents(DataOutputStream dos) throws IOException { 172 // This compresses really well. 
173 for (int i = 0; i < 1000; ++i) 174 dos.writeInt(i / 100); 175 } 176 177 static int writeTestKeyValues(HFileBlock.Writer hbw, int seed, boolean includesMemstoreTS, 178 boolean useTag) throws IOException { 179 List<KeyValue> keyValues = new ArrayList<>(); 180 181 // generate keyValues 182 RNG.setSeed(42); // just any fixed number 183 for (int i = 0; i < NUM_KEYVALUES; ++i) { 184 byte[] row; 185 long timestamp; 186 byte[] family; 187 byte[] qualifier; 188 byte[] value; 189 190 // generate it or repeat, it should compress well 191 if (0 < i && RNG.nextFloat() < CHANCE_TO_REPEAT) { 192 row = CellUtil.cloneRow(keyValues.get(RNG.nextInt(keyValues.size()))); 193 } else { 194 row = new byte[FIELD_LENGTH]; 195 RNG.nextBytes(row); 196 } 197 if (0 == i) { 198 family = new byte[FIELD_LENGTH]; 199 RNG.nextBytes(family); 200 } else { 201 family = CellUtil.cloneFamily(keyValues.get(0)); 202 } 203 if (0 < i && RNG.nextFloat() < CHANCE_TO_REPEAT) { 204 qualifier = CellUtil.cloneQualifier(keyValues.get(RNG.nextInt(keyValues.size()))); 205 } else { 206 qualifier = new byte[FIELD_LENGTH]; 207 RNG.nextBytes(qualifier); 208 } 209 if (0 < i && RNG.nextFloat() < CHANCE_TO_REPEAT) { 210 value = CellUtil.cloneValue(keyValues.get(RNG.nextInt(keyValues.size()))); 211 } else { 212 value = new byte[FIELD_LENGTH]; 213 RNG.nextBytes(value); 214 } 215 if (0 < i && RNG.nextFloat() < CHANCE_TO_REPEAT) { 216 timestamp = keyValues.get(RNG.nextInt(keyValues.size())).getTimestamp(); 217 } else { 218 timestamp = RNG.nextLong(); 219 } 220 if (!useTag) { 221 keyValues.add(new KeyValue(row, family, qualifier, timestamp, value)); 222 } else { 223 keyValues.add(new KeyValue(row, family, qualifier, timestamp, value, 224 new Tag[] { new ArrayBackedTag((byte) 1, Bytes.toBytes("myTagVal")) })); 225 } 226 } 227 228 // sort it and write to stream 229 int totalSize = 0; 230 Collections.sort(keyValues, CellComparatorImpl.COMPARATOR); 231 232 for (KeyValue kv : keyValues) { 233 totalSize += kv.getLength(); 234 if 
(includesMemstoreTS) { 235 long memstoreTS = RNG.nextLong(); 236 kv.setSequenceId(memstoreTS); 237 totalSize += WritableUtils.getVIntSize(memstoreTS); 238 } 239 hbw.write(kv); 240 } 241 return totalSize; 242 } 243 244 public byte[] createTestV1Block(Compression.Algorithm algo) throws IOException { 245 Compressor compressor = algo.getCompressor(); 246 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 247 OutputStream os = algo.createCompressionStream(baos, compressor, 0); 248 DataOutputStream dos = new DataOutputStream(os); 249 BlockType.META.write(dos); // Let's make this a meta block. 250 writeTestBlockContents(dos); 251 dos.flush(); 252 algo.returnCompressor(compressor); 253 return baos.toByteArray(); 254 } 255 256 static HFileBlock.Writer createTestV2Block(Compression.Algorithm algo, boolean includesMemstoreTS, 257 boolean includesTag) throws IOException { 258 final BlockType blockType = BlockType.DATA; 259 HFileContext meta = new HFileContextBuilder().withCompression(algo) 260 .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag) 261 .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build(); 262 HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, meta); 263 DataOutputStream dos = hbw.startWriting(blockType); 264 writeTestBlockContents(dos); 265 dos.flush(); 266 hbw.ensureBlockReady(); 267 assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader()); 268 hbw.release(); 269 return hbw; 270 } 271 272 public String createTestBlockStr(Compression.Algorithm algo, int correctLength, boolean useTag) 273 throws IOException { 274 HFileBlock.Writer hbw = createTestV2Block(algo, includesMemstoreTS, useTag); 275 byte[] testV2Block = hbw.getHeaderAndDataForTest(); 276 int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE + 9; 277 if (testV2Block.length == correctLength) { 278 // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid 279 // variations across operating systems. 
280 // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format. 281 // We only make this change when the compressed block length matches. 282 // Otherwise, there are obviously other inconsistencies. 283 testV2Block[osOffset] = 3; 284 } 285 return Bytes.toStringBinary(testV2Block); 286 } 287 288 @TestTemplate 289 public void testNoCompression() throws IOException { 290 CacheConfig cacheConf = Mockito.mock(CacheConfig.class); 291 Mockito.when(cacheConf.getBlockCache()).thenReturn(Optional.empty()); 292 293 HFileBlock block = 294 createTestV2Block(NONE, includesMemstoreTS, false).getBlockForCaching(cacheConf); 295 assertEquals(4000, block.getUncompressedSizeWithoutHeader()); 296 assertEquals(4004, block.getOnDiskSizeWithoutHeader()); 297 assertTrue(block.isUnpacked()); 298 } 299 300 @TestTemplate 301 public void testGzipCompression() throws IOException { 302 // @formatter:off 303 String correctTestBlockStr = "DATABLK*\\x00\\x00\\x00>\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF" 304 + "\\xFF\\xFF\\xFF\\xFF" 305 + "\\x0" + ChecksumType.getDefaultChecksumType().getCode() 306 + "\\x00\\x00@\\x00\\x00\\x00\\x00[" 307 // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html 308 + "\\x1F\\x8B" // gzip magic signature 309 + "\\x08" // Compression method: 8 = "deflate" 310 + "\\x00" // Flags 311 + "\\x00\\x00\\x00\\x00" // mtime 312 + "\\x00" // XFL (extra flags) 313 // OS (0 = FAT filesystems, 3 = Unix). However, this field 314 // sometimes gets set to 0 on Linux and Mac, so we reset it to 3. 315 // This appears to be a difference caused by the availability 316 // (and use) of the native GZ codec. 
317 + "\\x03" 318 + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa" 319 + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c" 320 + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00" 321 + "\\x00\\x00\\x00\\x00"; // 4 byte checksum (ignored) 322 // @formatter:on 323 int correctGzipBlockLength = 95; 324 String testBlockStr = createTestBlockStr(GZ, correctGzipBlockLength, false); 325 // We ignore the block checksum because createTestBlockStr can change the 326 // gzip header after the block is produced 327 assertEquals(correctTestBlockStr.substring(0, correctGzipBlockLength - 4), 328 testBlockStr.substring(0, correctGzipBlockLength - 4)); 329 } 330 331 @TestTemplate 332 public void testReaderV2() throws IOException { 333 testReaderV2Internals(); 334 } 335 336 private void assertRelease(HFileBlock blk) { 337 if (blk instanceof ExclusiveMemHFileBlock) { 338 assertFalse(blk.release()); 339 } else { 340 assertTrue(blk.release()); 341 } 342 } 343 344 protected void testReaderV2Internals() throws IOException { 345 final Configuration conf = TEST_UTIL.getConfiguration(); 346 if (includesTag) { 347 conf.setInt("hfile.format.version", 3); 348 } 349 for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) { 350 for (boolean pread : new boolean[] { false, true }) { 351 LOG.info("testReaderV2: Compression algorithm: " + algo + ", pread=" + pread); 352 Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo); 353 FSDataOutputStream os = fs.create(path); 354 HFileContext meta = new HFileContextBuilder().withCompression(algo) 355 .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag) 356 .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build(); 357 HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null, meta); 358 long totalSize = 0; 359 for (int blockId = 0; blockId < 2; ++blockId) { 360 DataOutputStream dos = hbw.startWriting(BlockType.DATA); 361 for (int i = 0; i < 1234; ++i) 362 dos.writeInt(i); 363 
hbw.writeHeaderAndData(os); 364 totalSize += hbw.getOnDiskSizeWithHeader(); 365 } 366 os.close(); 367 368 FSDataInputStream is = fs.open(path); 369 meta = 370 new HFileContextBuilder().withHBaseCheckSum(true).withIncludesMvcc(includesMemstoreTS) 371 .withIncludesTags(includesTag).withCompression(algo).build(); 372 ReaderContext context = 373 new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is)) 374 .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build(); 375 HFileBlock.FSReader hbr = 376 new HFileBlock.FSReaderImpl(context, meta, alloc, TEST_UTIL.getConfiguration()); 377 HFileBlock b = hbr.readBlockData(0, -1, pread, false, true); 378 is.close(); 379 assertEquals(0, HFile.getAndResetChecksumFailuresCount()); 380 381 b.sanityCheck(); 382 assertEquals(4936, b.getUncompressedSizeWithoutHeader()); 383 assertEquals(algo == GZ ? 2173 : 4936, 384 b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes()); 385 HFileBlock expected = b; 386 387 if (algo == GZ) { 388 is = fs.open(path); 389 ReaderContext readerContext = 390 new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is)) 391 .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build(); 392 hbr = 393 new HFileBlock.FSReaderImpl(readerContext, meta, alloc, TEST_UTIL.getConfiguration()); 394 b = hbr.readBlockData(0, 395 2173 + HConstants.HFILEBLOCK_HEADER_SIZE + b.totalChecksumBytes(), pread, false, true); 396 assertEquals(expected, b); 397 int wrongCompressedSize = 2172; 398 try { 399 hbr.readBlockData(0, wrongCompressedSize + HConstants.HFILEBLOCK_HEADER_SIZE, pread, 400 false, true); 401 fail("Exception expected"); 402 } catch (IOException ex) { 403 String expectedPrefix = "Passed in onDiskSizeWithHeader="; 404 assertTrue(ex.getMessage().startsWith(expectedPrefix), 405 "Invalid exception message: '" + ex.getMessage() 406 + "'.\nMessage is expected to start with: '" + expectedPrefix + "'"); 407 } 408 assertRelease(b); 409 is.close(); 410 } 
411 assertRelease(expected); 412 } 413 } 414 } 415 416 /** 417 * Test encoding/decoding data blocks. 418 * @throws IOException a bug or a problem with temporary files. 419 */ 420 @TestTemplate 421 public void testDataBlockEncoding() throws IOException { 422 testInternals(); 423 } 424 425 private void testInternals() throws IOException { 426 final int numBlocks = 5; 427 final Configuration conf = TEST_UTIL.getConfiguration(); 428 if (includesTag) { 429 conf.setInt("hfile.format.version", 3); 430 } 431 for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) { 432 for (boolean pread : new boolean[] { false, true }) { 433 for (DataBlockEncoding encoding : DataBlockEncoding.values()) { 434 LOG.info("testDataBlockEncoding: Compression algorithm={}, pread={}, dataBlockEncoder={}", 435 algo.toString(), pread, encoding); 436 Path path = 437 new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo + "_" + encoding.toString()); 438 FSDataOutputStream os = fs.create(path); 439 HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE) 440 ? 
new HFileDataBlockEncoderImpl(encoding) 441 : NoOpDataBlockEncoder.INSTANCE; 442 HFileContext meta = new HFileContextBuilder().withCompression(algo) 443 .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag) 444 .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build(); 445 HFileBlock.Writer hbw = new HFileBlock.Writer(conf, dataBlockEncoder, meta); 446 long totalSize = 0; 447 final List<Integer> encodedSizes = new ArrayList<>(); 448 final List<ByteBuff> encodedBlocks = new ArrayList<>(); 449 for (int blockId = 0; blockId < numBlocks; ++blockId) { 450 hbw.startWriting(BlockType.DATA); 451 writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag); 452 hbw.writeHeaderAndData(os); 453 int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE; 454 ByteBuff encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader(); 455 final int encodedSize = encodedResultWithHeader.limit() - headerLen; 456 if (encoding != DataBlockEncoding.NONE) { 457 // We need to account for the two-byte encoding algorithm ID that 458 // comes after the 24-byte block header but before encoded KVs. 
459 headerLen += DataBlockEncoding.ID_SIZE; 460 } 461 encodedSizes.add(encodedSize); 462 ByteBuff encodedBuf = encodedResultWithHeader.position(headerLen).slice(); 463 encodedBlocks.add(encodedBuf); 464 totalSize += hbw.getOnDiskSizeWithHeader(); 465 } 466 os.close(); 467 468 FSDataInputStream is = fs.open(path); 469 meta = new HFileContextBuilder().withHBaseCheckSum(true).withCompression(algo) 470 .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag).build(); 471 ReaderContext context = 472 new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is)) 473 .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build(); 474 HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(context, meta, alloc, conf); 475 hbr.setDataBlockEncoder(dataBlockEncoder, conf); 476 hbr.setIncludesMemStoreTS(includesMemstoreTS); 477 HFileBlock blockFromHFile, blockUnpacked; 478 int pos = 0; 479 for (int blockId = 0; blockId < numBlocks; ++blockId) { 480 blockFromHFile = hbr.readBlockData(pos, -1, pread, false, true); 481 assertEquals(0, HFile.getAndResetChecksumFailuresCount()); 482 blockFromHFile.sanityCheck(); 483 pos += blockFromHFile.getOnDiskSizeWithHeader(); 484 assertEquals((int) encodedSizes.get(blockId), 485 blockFromHFile.getUncompressedSizeWithoutHeader()); 486 assertEquals(meta.isCompressedOrEncrypted(), !blockFromHFile.isUnpacked()); 487 long packedHeapsize = blockFromHFile.heapSize(); 488 blockUnpacked = blockFromHFile.unpack(meta, hbr); 489 assertTrue(blockUnpacked.isUnpacked()); 490 if (meta.isCompressedOrEncrypted()) { 491 LOG.info("packedHeapsize=" + packedHeapsize + ", unpackedHeadsize=" 492 + blockUnpacked.heapSize()); 493 assertFalse(packedHeapsize == blockUnpacked.heapSize()); 494 assertTrue(packedHeapsize < blockUnpacked.heapSize(), 495 "Packed heapSize should be < unpacked heapSize"); 496 } 497 ByteBuff actualBuffer = blockUnpacked.getBufferWithoutHeader(); 498 if (encoding != DataBlockEncoding.NONE) { 499 // We 
expect a two-byte big-endian encoding id. 500 assertEquals(Long.toHexString(0), Long.toHexString(actualBuffer.get(0)), 501 "Unexpected first byte with " + buildMessageDetails(algo, encoding, pread)); 502 assertEquals(Long.toHexString(encoding.getId()), 503 Long.toHexString(actualBuffer.get(1)), 504 "Unexpected second byte with " + buildMessageDetails(algo, encoding, pread)); 505 actualBuffer.position(2); 506 actualBuffer = actualBuffer.slice(); 507 } 508 509 ByteBuff expectedBuff = encodedBlocks.get(blockId); 510 expectedBuff.rewind(); 511 512 // test if content matches, produce nice message 513 assertBuffersEqual(expectedBuff, actualBuffer, algo, encoding, pread); 514 515 // test serialized blocks 516 for (boolean reuseBuffer : new boolean[] { false, true }) { 517 ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength()); 518 blockFromHFile.serialize(serialized, true); 519 HFileBlock deserialized = (HFileBlock) blockFromHFile.getDeserializer() 520 .deserialize(new SingleByteBuff(serialized), HEAP); 521 assertEquals(blockFromHFile, deserialized, 522 "Serialization did not preserve block state. 
reuseBuffer=" + reuseBuffer); 523 // intentional reference comparison 524 if (blockFromHFile != blockUnpacked) { 525 assertEquals(blockUnpacked, deserialized.unpack(meta, hbr), 526 "Deserialized block cannot be unpacked correctly."); 527 } 528 } 529 assertRelease(blockUnpacked); 530 if (blockFromHFile != blockUnpacked) { 531 blockFromHFile.release(); 532 } 533 } 534 is.close(); 535 } 536 } 537 } 538 } 539 540 static String buildMessageDetails(Algorithm compression, DataBlockEncoding encoding, 541 boolean pread) { 542 return String.format("compression %s, encoding %s, pread %s", compression, encoding, pread); 543 } 544 545 static void assertBuffersEqual(ByteBuff expectedBuffer, ByteBuff actualBuffer, 546 Compression.Algorithm compression, DataBlockEncoding encoding, boolean pread) { 547 if (!actualBuffer.equals(expectedBuffer)) { 548 int prefix = 0; 549 int minLimit = Math.min(expectedBuffer.limit(), actualBuffer.limit()); 550 while (prefix < minLimit && expectedBuffer.get(prefix) == actualBuffer.get(prefix)) { 551 prefix++; 552 } 553 554 fail(String.format("Content mismatch for %s, commonPrefix %d, expected %s, got %s", 555 buildMessageDetails(compression, encoding, pread), prefix, 556 nextBytesToStr(expectedBuffer, prefix), nextBytesToStr(actualBuffer, prefix))); 557 } 558 } 559 560 /** 561 * Convert a few next bytes in the given buffer at the given position to string. Used for error 562 * messages. 563 */ 564 private static String nextBytesToStr(ByteBuff buf, int pos) { 565 int maxBytes = buf.limit() - pos; 566 int numBytes = Math.min(16, maxBytes); 567 return Bytes.toStringBinary(buf.array(), buf.arrayOffset() + pos, numBytes) 568 + (numBytes < maxBytes ? "..." : ""); 569 } 570 571 @TestTemplate 572 public void testPreviousOffset() throws IOException { 573 testPreviousOffsetInternals(); 574 } 575 576 protected void testPreviousOffsetInternals() throws IOException { 577 // TODO: parameterize these nested loops. 
578 Configuration conf = TEST_UTIL.getConfiguration(); 579 for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) { 580 for (boolean pread : BOOLEAN_VALUES) { 581 for (boolean cacheOnWrite : BOOLEAN_VALUES) { 582 Random rand = defaultRandom(); 583 LOG.info("testPreviousOffset: Compression algorithm={}, pread={}, cacheOnWrite={}", 584 algo.toString(), pread, cacheOnWrite); 585 Path path = new Path(TEST_UTIL.getDataTestDir(), "prev_offset"); 586 List<Long> expectedOffsets = new ArrayList<>(); 587 List<Long> expectedPrevOffsets = new ArrayList<>(); 588 List<BlockType> expectedTypes = new ArrayList<>(); 589 List<ByteBuffer> expectedContents = cacheOnWrite ? new ArrayList<>() : null; 590 long totalSize = writeBlocks(TEST_UTIL.getConfiguration(), rand, algo, path, 591 expectedOffsets, expectedPrevOffsets, expectedTypes, expectedContents); 592 593 FSDataInputStream is = fs.open(path); 594 HFileContext meta = 595 new HFileContextBuilder().withHBaseCheckSum(true).withIncludesMvcc(includesMemstoreTS) 596 .withIncludesTags(includesTag).withCompression(algo).build(); 597 ReaderContext context = 598 new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is)) 599 .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build(); 600 HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, alloc, conf); 601 long curOffset = 0; 602 for (int i = 0; i < NUM_TEST_BLOCKS; ++i) { 603 if (!pread) { 604 assertEquals(is.getPos(), 605 curOffset + (i == 0 ? 
0 : HConstants.HFILEBLOCK_HEADER_SIZE)); 606 } 607 608 assertEquals(expectedOffsets.get(i).longValue(), curOffset); 609 if (detailedLogging) { 610 LOG.info("Reading block #" + i + " at offset " + curOffset); 611 } 612 HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false, false); 613 if (detailedLogging) { 614 LOG.info("Block #" + i + ": " + b); 615 } 616 assertEquals(expectedTypes.get(i), b.getBlockType(), 617 "Invalid block #" + i + "'s type:"); 618 assertEquals((long) expectedPrevOffsets.get(i), b.getPrevBlockOffset(), 619 "Invalid previous block offset for block " + i + " of " + "type " + b.getBlockType() 620 + ":"); 621 b.sanityCheck(); 622 assertEquals(curOffset, b.getOffset()); 623 624 // Now re-load this block knowing the on-disk size. This tests a 625 // different branch in the loader. 626 HFileBlock b2 = 627 hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false, false); 628 b2.sanityCheck(); 629 630 assertEquals(b.getBlockType(), b2.getBlockType()); 631 assertEquals(b.getOnDiskSizeWithoutHeader(), b2.getOnDiskSizeWithoutHeader()); 632 assertEquals(b.getOnDiskSizeWithHeader(), b2.getOnDiskSizeWithHeader()); 633 assertEquals(b.getUncompressedSizeWithoutHeader(), 634 b2.getUncompressedSizeWithoutHeader()); 635 assertEquals(b.getPrevBlockOffset(), b2.getPrevBlockOffset()); 636 assertEquals(curOffset, b2.getOffset()); 637 assertEquals(b.getBytesPerChecksum(), b2.getBytesPerChecksum()); 638 assertEquals(b.getOnDiskDataSizeWithHeader(), b2.getOnDiskDataSizeWithHeader()); 639 assertEquals(0, HFile.getAndResetChecksumFailuresCount()); 640 assertRelease(b2); 641 642 curOffset += b.getOnDiskSizeWithHeader(); 643 644 if (cacheOnWrite) { 645 // NOTE: cache-on-write testing doesn't actually involve a BlockCache. It simply 646 // verifies that the unpacked value read back off disk matches the unpacked value 647 // generated before writing to disk. 
648 HFileBlock newBlock = b.unpack(meta, hbr); 649 // neither b's unpacked nor the expectedContents have checksum. 650 // they should be identical 651 ByteBuff bufRead = newBlock.getBufferReadOnly(); 652 ByteBuffer bufExpected = expectedContents.get(i); 653 byte[] bytesRead = bufRead.toBytes(); 654 boolean bytesAreCorrect = Bytes.compareTo(bytesRead, 0, bytesRead.length, 655 bufExpected.array(), bufExpected.arrayOffset(), bufExpected.limit()) == 0; 656 String wrongBytesMsg = ""; 657 658 if (!bytesAreCorrect) { 659 // Optimization: only construct an error message in case we 660 // will need it. 661 wrongBytesMsg = "Expected bytes in block #" + i + " (algo=" + algo + ", pread=" 662 + pread + ", cacheOnWrite=" + cacheOnWrite + "):\n"; 663 wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(), 664 bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit())) + ", actual:\n" 665 + Bytes.toStringBinary(bytesRead, 0, Math.min(32 + 10, bytesRead.length)); 666 if (detailedLogging) { 667 LOG.warn( 668 "expected header" + HFileBlock.toStringHeader(new SingleByteBuff(bufExpected)) 669 + "\nfound header" + HFileBlock.toStringHeader(bufRead)); 670 LOG.warn("bufread offset " + bufRead.arrayOffset() + " limit " + bufRead.limit() 671 + " expected offset " + bufExpected.arrayOffset() + " limit " 672 + bufExpected.limit()); 673 LOG.warn(wrongBytesMsg); 674 } 675 } 676 assertTrue(bytesAreCorrect, wrongBytesMsg); 677 assertRelease(newBlock); 678 if (newBlock != b) { 679 assertRelease(b); 680 } 681 } else { 682 assertRelease(b); 683 } 684 } 685 assertEquals(curOffset, fs.getFileStatus(path).getLen()); 686 is.close(); 687 } 688 } 689 } 690 } 691 692 private Random defaultRandom() { 693 return new Random(189237); 694 } 695 696 private class BlockReaderThread implements Callable<Boolean> { 697 private final String clientId; 698 private final HFileBlock.FSReader hbr; 699 private final List<Long> offsets; 700 private final List<BlockType> types; 701 private final long fileSize; 702 
703 public BlockReaderThread(String clientId, HFileBlock.FSReader hbr, List<Long> offsets, 704 List<BlockType> types, long fileSize) { 705 this.clientId = clientId; 706 this.offsets = offsets; 707 this.hbr = hbr; 708 this.types = types; 709 this.fileSize = fileSize; 710 } 711 712 @Override 713 public Boolean call() throws Exception { 714 Random rand = new Random(clientId.hashCode()); 715 long endTime = EnvironmentEdgeManager.currentTime() + 10000; 716 int numBlocksRead = 0; 717 int numPositionalRead = 0; 718 int numWithOnDiskSize = 0; 719 while (EnvironmentEdgeManager.currentTime() < endTime) { 720 int blockId = rand.nextInt(NUM_TEST_BLOCKS); 721 long offset = offsets.get(blockId); 722 // now we only support concurrent read with pread = true 723 boolean pread = true; 724 boolean withOnDiskSize = rand.nextBoolean(); 725 long expectedSize = 726 (blockId == NUM_TEST_BLOCKS - 1 ? fileSize : offsets.get(blockId + 1)) - offset; 727 HFileBlock b = null; 728 try { 729 long onDiskSizeArg = withOnDiskSize ? 
expectedSize : -1; 730 b = hbr.readBlockData(offset, onDiskSizeArg, pread, false, false); 731 if (useHeapAllocator) { 732 assertTrue(!b.isSharedMem()); 733 } else { 734 assertTrue(!b.getBlockType().isData() || b.isSharedMem()); 735 } 736 assertEquals(types.get(blockId), b.getBlockType()); 737 assertEquals(expectedSize, b.getOnDiskSizeWithHeader()); 738 assertEquals(offset, b.getOffset()); 739 } catch (IOException ex) { 740 LOG.error("Error in client " + clientId + " trying to read block at " + offset 741 + ", pread=" + pread + ", withOnDiskSize=" + withOnDiskSize, ex); 742 return false; 743 } finally { 744 if (b != null) { 745 b.release(); 746 } 747 } 748 ++numBlocksRead; 749 if (pread) { 750 ++numPositionalRead; 751 } 752 753 if (withOnDiskSize) { 754 ++numWithOnDiskSize; 755 } 756 } 757 LOG 758 .info("Client " + clientId + " successfully read " + numBlocksRead + " blocks (with pread: " 759 + numPositionalRead + ", with onDiskSize " + "specified: " + numWithOnDiskSize + ")"); 760 761 return true; 762 } 763 } 764 765 @TestTemplate 766 public void testConcurrentReading() throws Exception { 767 testConcurrentReadingInternals(); 768 } 769 770 protected void testConcurrentReadingInternals() 771 throws IOException, InterruptedException, ExecutionException { 772 Configuration conf = TEST_UTIL.getConfiguration(); 773 for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) { 774 Path path = new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading"); 775 Random rand = defaultRandom(); 776 List<Long> offsets = new ArrayList<>(); 777 List<BlockType> types = new ArrayList<>(); 778 writeBlocks(TEST_UTIL.getConfiguration(), rand, compressAlgo, path, offsets, null, types, 779 null); 780 FSDataInputStream is = fs.open(path); 781 long fileSize = fs.getFileStatus(path).getLen(); 782 HFileContext meta = 783 new HFileContextBuilder().withHBaseCheckSum(true).withIncludesMvcc(includesMemstoreTS) 784 .withIncludesTags(includesTag).withCompression(compressAlgo).build(); 785 
ReaderContext context = 786 new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is)) 787 .withFileSize(fileSize).withFilePath(path).withFileSystem(fs).build(); 788 HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, alloc, conf); 789 790 Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS); 791 ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<>(exec); 792 793 for (int i = 0; i < NUM_READER_THREADS; ++i) { 794 ecs.submit( 795 new BlockReaderThread("reader_" + (char) ('A' + i), hbr, offsets, types, fileSize)); 796 } 797 798 for (int i = 0; i < NUM_READER_THREADS; ++i) { 799 Future<Boolean> result = ecs.take(); 800 assertTrue(result.get()); 801 if (detailedLogging) { 802 LOG.info(String.valueOf(i + 1) + " reader threads finished successfully (algo=" 803 + compressAlgo + ")"); 804 } 805 } 806 is.close(); 807 } 808 } 809 810 private long writeBlocks(Configuration conf, Random rand, Compression.Algorithm compressAlgo, 811 Path path, List<Long> expectedOffsets, List<Long> expectedPrevOffsets, 812 List<BlockType> expectedTypes, List<ByteBuffer> expectedContents) throws IOException { 813 boolean cacheOnWrite = expectedContents != null; 814 FSDataOutputStream os = fs.create(path); 815 HFileContext meta = new HFileContextBuilder().withHBaseCheckSum(true) 816 .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag) 817 .withCompression(compressAlgo).withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build(); 818 HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null, meta); 819 Map<BlockType, Long> prevOffsetByType = new HashMap<>(); 820 long totalSize = 0; 821 for (int i = 0; i < NUM_TEST_BLOCKS; ++i) { 822 long pos = os.getPos(); 823 int blockTypeOrdinal = rand.nextInt(BlockType.values().length); 824 if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) { 825 blockTypeOrdinal = BlockType.DATA.ordinal(); 826 } 827 BlockType bt = BlockType.values()[blockTypeOrdinal]; 828 
DataOutputStream dos = hbw.startWriting(bt); 829 int size = rand.nextInt(500); 830 for (int j = 0; j < size; ++j) { 831 // This might compress well. 832 dos.writeShort(i + 1); 833 dos.writeInt(j + 1); 834 } 835 836 if (expectedOffsets != null) expectedOffsets.add(os.getPos()); 837 838 if (expectedPrevOffsets != null) { 839 Long prevOffset = prevOffsetByType.get(bt); 840 expectedPrevOffsets.add(prevOffset != null ? prevOffset : -1); 841 prevOffsetByType.put(bt, os.getPos()); 842 } 843 844 expectedTypes.add(bt); 845 846 hbw.writeHeaderAndData(os); 847 totalSize += hbw.getOnDiskSizeWithHeader(); 848 849 if (cacheOnWrite) { 850 ByteBuff buff = hbw.cloneUncompressedBufferWithHeader(); 851 expectedContents.add(buff.asSubByteBuffer(buff.capacity())); 852 } 853 854 if (detailedLogging) { 855 LOG.info("Written block #" + i + " of type " + bt + ", uncompressed size " 856 + hbw.getUncompressedSizeWithoutHeader() + ", packed size " 857 + hbw.getOnDiskSizeWithoutHeader() + " at offset " + pos); 858 } 859 } 860 os.close(); 861 LOG.info("Created a temporary file at " + path + ", " + fs.getFileStatus(path).getLen() 862 + " byte, compression=" + compressAlgo); 863 return totalSize; 864 } 865 866 @TestTemplate 867 public void testBlockHeapSize() { 868 testBlockHeapSizeInternals(); 869 } 870 871 protected void testBlockHeapSizeInternals() { 872 if (ClassSize.is32BitJVM()) { 873 assertEquals(64, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE); 874 } else { 875 assertEquals(80, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE); 876 } 877 878 for (int size : new int[] { 100, 256, 12345 }) { 879 byte[] byteArr = new byte[HConstants.HFILEBLOCK_HEADER_SIZE + size]; 880 ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size); 881 HFileContext meta = new HFileContextBuilder().withIncludesMvcc(includesMemstoreTS) 882 .withIncludesTags(includesTag).withHBaseCheckSum(false).withCompression(Algorithm.NONE) 883 .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).withChecksumType(ChecksumType.NULL) 884 .build(); 885 
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 886 HFileBlock.FILL_HEADER, -1, 0, -1, meta, HEAP); 887 long byteBufferExpectedSize = 888 ClassSize.align(ClassSize.estimateBase(new MultiByteBuff(buf).getClass(), true) 889 + HConstants.HFILEBLOCK_HEADER_SIZE + size); 890 long hfileMetaSize = ClassSize.align(ClassSize.estimateBase(HFileContext.class, true)); 891 long hfileBlockExpectedSize = ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true)); 892 long expected = hfileBlockExpectedSize + byteBufferExpectedSize + hfileMetaSize; 893 assertEquals(expected, block.heapSize(), 894 "Block data size: " + size + ", byte buffer expected " + "size: " + byteBufferExpectedSize 895 + ", HFileBlock class expected " + "size: " + hfileBlockExpectedSize 896 + " HFileContext class expected size: " + hfileMetaSize + "; "); 897 } 898 } 899 900 @TestTemplate 901 public void testSerializeWithoutNextBlockMetadata() { 902 int size = 100; 903 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 904 byte[] byteArr = new byte[length]; 905 ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size); 906 HFileContext meta = new HFileContextBuilder().build(); 907 HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 908 ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc); 909 HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 910 ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc); 911 ByteBuffer buff1 = ByteBuffer.allocate(length); 912 ByteBuffer buff2 = ByteBuffer.allocate(length); 913 blockWithNextBlockMetadata.serialize(buff1, true); 914 blockWithoutNextBlockMetadata.serialize(buff2, true); 915 assertNotEquals(buff1, buff2); 916 buff1.clear(); 917 buff2.clear(); 918 blockWithNextBlockMetadata.serialize(buff1, false); 919 blockWithoutNextBlockMetadata.serialize(buff2, false); 920 assertEquals(buff1, buff2); 921 } 922}