/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.*;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IOTests.class, LargeTests.class})
@RunWith(Parameterized.class)
public class TestHFileBlock {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHFileBlock.class);

  // change this value to activate more logs
  private static final boolean detailedLogging = false;
  private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };

  private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlock.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };

  private static final int NUM_TEST_BLOCKS = 1000;
  private static final int NUM_READER_THREADS = 26;
  private static final int MAX_BUFFER_COUNT = 2048;

  // Used to generate KeyValues
  private static int NUM_KEYVALUES = 50;
  private static int FIELD_LENGTH = 10;
  private static float CHANCE_TO_REPEAT = 0.6f;

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private FileSystem fs;

  private final boolean includesMemstoreTS;
  private final boolean includesTag;
  private final boolean useHeapAllocator;
  private final ByteBuffAllocator alloc;

  public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag, boolean useHeapAllocator) {
    this.includesMemstoreTS = includesMemstoreTS;
    this.includesTag = includesTag;
    this.useHeapAllocator = useHeapAllocator;
    this.alloc = useHeapAllocator ? HEAP : createOffHeapAlloc();
    assertAllocator();
  }

  @Parameters
  public static Collection<Object[]> parameters() {
    List<Object[]> params = new ArrayList<>();
    // Generate boolean triples from 000 to 111
    for (int i = 0; i < (1 << 3); i++) {
      Object[] flags = new Boolean[3];
      for (int k = 0; k < 3; k++) {
        flags[k] = (i & (1 << k)) != 0;
      }
      params.add(flags);
    }
    return params;
  }

  private ByteBuffAllocator createOffHeapAlloc() {
    Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    conf.setInt(ByteBuffAllocator.MAX_BUFFER_COUNT_KEY, MAX_BUFFER_COUNT);
    conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0);
    ByteBuffAllocator alloc = ByteBuffAllocator.create(conf, true);
    // Fill the allocator
    List<ByteBuff> bufs = new ArrayList<>();
    for (int i = 0; i < MAX_BUFFER_COUNT; i++) {
      ByteBuff bb = alloc.allocateOneBuffer();
      assertTrue(!bb.hasArray());
      bufs.add(bb);
    }
    bufs.forEach(ByteBuff::release);
    return alloc;
  }

  private void assertAllocator() {
    if (!useHeapAllocator) {
      assertEquals(MAX_BUFFER_COUNT, alloc.getFreeBufferCount());
    }
  }

  @Before
  public void setUp() throws IOException {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
  }

  @After
  public void tearDown() throws IOException {
    assertAllocator();
    alloc.clean();
  }

  static void writeTestBlockContents(DataOutputStream dos) throws IOException {
    // This compresses really well.
    for (int i = 0; i < 1000; ++i)
      dos.writeInt(i / 100);
  }

  static int writeTestKeyValues(HFileBlock.Writer hbw, int seed, boolean includesMemstoreTS,
      boolean useTag) throws IOException {
    List<KeyValue> keyValues = new ArrayList<>();
    Random randomizer = new Random(42L + seed); // just any fixed number

    // generate keyValues
    for (int i = 0; i < NUM_KEYVALUES; ++i) {
      byte[] row;
      long timestamp;
      byte[] family;
      byte[] qualifier;
      byte[] value;

      // generate it or repeat, it should compress well
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        row = CellUtil.cloneRow(keyValues.get(randomizer.nextInt(keyValues.size())));
      } else {
        row = new byte[FIELD_LENGTH];
        randomizer.nextBytes(row);
      }
      if (0 == i) {
        family = new byte[FIELD_LENGTH];
        randomizer.nextBytes(family);
      } else {
        family = CellUtil.cloneFamily(keyValues.get(0));
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        qualifier = CellUtil.cloneQualifier(keyValues.get(randomizer.nextInt(keyValues.size())));
      } else {
        qualifier = new byte[FIELD_LENGTH];
        randomizer.nextBytes(qualifier);
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        value = CellUtil.cloneValue(keyValues.get(randomizer.nextInt(keyValues.size())));
      } else {
        value = new byte[FIELD_LENGTH];
        randomizer.nextBytes(value);
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        timestamp = keyValues.get(randomizer.nextInt(keyValues.size())).getTimestamp();
      } else {
        timestamp = randomizer.nextLong();
      }
      if (!useTag) {
        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value));
      } else {
        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value,
            new Tag[] { new ArrayBackedTag((byte) 1, Bytes.toBytes("myTagVal")) }));
      }
    }

    // sort it and write to stream
    int totalSize = 0;
    Collections.sort(keyValues, CellComparatorImpl.COMPARATOR);

    for (KeyValue kv : keyValues) {
      totalSize += kv.getLength();
      if (includesMemstoreTS) {
        long memstoreTS = randomizer.nextLong();
        kv.setSequenceId(memstoreTS);
        totalSize += WritableUtils.getVIntSize(memstoreTS);
      }
      hbw.write(kv);
    }
    return totalSize;
  }

  public byte[] createTestV1Block(Compression.Algorithm algo)
      throws IOException {
    Compressor compressor = algo.getCompressor();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    OutputStream os = algo.createCompressionStream(baos, compressor, 0);
    DataOutputStream dos = new DataOutputStream(os);
    BlockType.META.write(dos); // Let's make this a meta block.
    writeTestBlockContents(dos);
    dos.flush();
    algo.returnCompressor(compressor);
    return baos.toByteArray();
  }

  static HFileBlock.Writer createTestV2Block(Compression.Algorithm algo,
      boolean includesMemstoreTS, boolean includesTag) throws IOException {
    final BlockType blockType = BlockType.DATA;
    HFileContext meta = new HFileContextBuilder()
        .withCompression(algo)
        .withIncludesMvcc(includesMemstoreTS)
        .withIncludesTags(includesTag)
        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
        .build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    DataOutputStream dos = hbw.startWriting(blockType);
    writeTestBlockContents(dos);
    dos.flush();
    hbw.ensureBlockReady();
    assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
    hbw.release();
    return hbw;
  }

  public String createTestBlockStr(Compression.Algorithm algo,
      int correctLength, boolean useTag) throws IOException {
    HFileBlock.Writer hbw = createTestV2Block(algo, includesMemstoreTS, useTag);
    byte[] testV2Block = hbw.getHeaderAndDataForTest();
    int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE + 9;
    if (testV2Block.length == correctLength) {
      // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
      // variations across operating systems.
      // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
      // We only make this change when the compressed block length matches.
      // Otherwise, there are obviously other inconsistencies.
      testV2Block[osOffset] = 3;
    }
    return Bytes.toStringBinary(testV2Block);
  }

  @Test
  public void testNoCompression() throws IOException {
    CacheConfig cacheConf = Mockito.mock(CacheConfig.class);
    Mockito.when(cacheConf.getBlockCache()).thenReturn(Optional.empty());

    HFileBlock block =
        createTestV2Block(NONE, includesMemstoreTS, false).getBlockForCaching(cacheConf);
    assertEquals(4000, block.getUncompressedSizeWithoutHeader());
    assertEquals(4004, block.getOnDiskSizeWithoutHeader());
    assertTrue(block.isUnpacked());
  }

  @Test
  public void testGzipCompression() throws IOException {
    final String correctTestBlockStr =
        "DATABLK*\\x00\\x00\\x00>\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
        + "\\xFF\\xFF\\xFF\\xFF"
        + "\\x0" + ChecksumType.getDefaultChecksumType().getCode()
        + "\\x00\\x00@\\x00\\x00\\x00\\x00["
        // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
        + "\\x1F\\x8B" // gzip magic signature
        + "\\x08" // Compression method: 8 = "deflate"
        + "\\x00" // Flags
        + "\\x00\\x00\\x00\\x00" // mtime
        + "\\x00" // XFL (extra flags)
        // OS (0 = FAT filesystems, 3 = Unix). However, this field
        // sometimes gets set to 0 on Linux and Mac, so we reset it to 3.
        // This appears to be a difference caused by the availability
        // (and use) of the native GZ codec.
        + "\\x03"
        + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
        + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
        + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00"
        + "\\x00\\x00\\x00\\x00"; // 4 byte checksum (ignored)
    final int correctGzipBlockLength = 95;
    final String testBlockStr = createTestBlockStr(GZ, correctGzipBlockLength, false);
    // We ignore the block checksum because createTestBlockStr can change the
    // gzip header after the block is produced
    assertEquals(correctTestBlockStr.substring(0, correctGzipBlockLength - 4),
        testBlockStr.substring(0, correctGzipBlockLength - 4));
  }

  @Test
  public void testReaderV2() throws IOException {
    testReaderV2Internals();
  }

  private void assertRelease(HFileBlock blk) {
    if (blk instanceof ExclusiveMemHFileBlock) {
      assertFalse(blk.release());
    } else {
      assertTrue(blk.release());
    }
  }

  protected void testReaderV2Internals() throws IOException {
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testReaderV2: Compression algorithm: " + algo + ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
            .withCompression(algo)
            .withIncludesMvcc(includesMemstoreTS)
            .withIncludesTags(includesTag)
            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i)
            dos.writeInt(i);
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        FSDataInputStream is = fs.open(path);
        meta = new HFileContextBuilder()
            .withHBaseCheckSum(true)
            .withIncludesMvcc(includesMemstoreTS)
            .withIncludesTags(includesTag)
            .withCompression(algo).build();
        ReaderContext context = new ReaderContextBuilder()
            .withInputStreamWrapper(new FSDataInputStreamWrapper(is))
            .withFileSize(totalSize)
            .withFilePath(path)
            .withFileSystem(fs)
            .build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, alloc);
        HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
        is.close();
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());

        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
            b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        HFileBlock expected = b;

        if (algo == GZ) {
          is = fs.open(path);
          ReaderContext readerContext = new ReaderContextBuilder()
              .withInputStreamWrapper(new FSDataInputStreamWrapper(is))
              .withFileSize(totalSize)
              .withFilePath(path)
              .withFileSystem(fs)
              .build();
          hbr = new HFileBlock.FSReaderImpl(readerContext, meta, alloc);
          b = hbr.readBlockData(0,
              2173 + HConstants.HFILEBLOCK_HEADER_SIZE + b.totalChecksumBytes(), pread, false, true);
          assertEquals(expected, b);
          int wrongCompressedSize = 2172;
          try {
            hbr.readBlockData(0, wrongCompressedSize + HConstants.HFILEBLOCK_HEADER_SIZE, pread,
                false, true);
            fail("Exception expected");
          } catch (IOException ex) {
            String expectedPrefix = "Passed in onDiskSizeWithHeader=";
            assertTrue("Invalid exception message: '" + ex.getMessage()
                + "'.\nMessage is expected to start with: '" + expectedPrefix + "'",
                ex.getMessage().startsWith(expectedPrefix));
          }
          assertRelease(b);
          is.close();
        }
        assertRelease(expected);
      }
    }
  }

  /**
   * Test encoding/decoding data blocks.
   * @throws IOException a bug or a problem with temporary files.
   */
  @Test
  public void testDataBlockEncoding() throws IOException {
    testInternals();
  }

  private void testInternals() throws IOException {
    final int numBlocks = 5;
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
          LOG.info("testDataBlockEncoding: Compression algorithm={}, pread={}, dataBlockEncoder={}",
              algo.toString(), pread, encoding);
          Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo + "_"
              + encoding.toString());
          FSDataOutputStream os = fs.create(path);
          HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE) ?
              new HFileDataBlockEncoderImpl(encoding) : NoOpDataBlockEncoder.INSTANCE;
          HFileContext meta = new HFileContextBuilder()
              .withCompression(algo)
              .withIncludesMvcc(includesMemstoreTS)
              .withIncludesTags(includesTag)
              .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
              .build();
          HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder, meta);
          long totalSize = 0;
          final List<Integer> encodedSizes = new ArrayList<>();
          final List<ByteBuff> encodedBlocks = new ArrayList<>();
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            hbw.startWriting(BlockType.DATA);
            writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag);
            hbw.writeHeaderAndData(os);
            int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE;
            ByteBuff encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader();
            final int encodedSize = encodedResultWithHeader.limit() - headerLen;
            if (encoding != DataBlockEncoding.NONE) {
              // We need to account for the two-byte encoding algorithm ID that
              // comes after the 24-byte block header but before encoded KVs.
              headerLen += DataBlockEncoding.ID_SIZE;
            }
            encodedSizes.add(encodedSize);
            ByteBuff encodedBuf = encodedResultWithHeader.position(headerLen).slice();
            encodedBlocks.add(encodedBuf);
            totalSize += hbw.getOnDiskSizeWithHeader();
          }
          os.close();

          FSDataInputStream is = fs.open(path);
          meta = new HFileContextBuilder()
              .withHBaseCheckSum(true)
              .withCompression(algo)
              .withIncludesMvcc(includesMemstoreTS)
              .withIncludesTags(includesTag)
              .build();
          ReaderContext context = new ReaderContextBuilder()
              .withInputStreamWrapper(new FSDataInputStreamWrapper(is))
              .withFileSize(totalSize)
              .withFilePath(path)
              .withFileSystem(fs)
              .build();
          HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(context, meta, alloc);
          hbr.setDataBlockEncoder(dataBlockEncoder);
          hbr.setIncludesMemStoreTS(includesMemstoreTS);
          HFileBlock blockFromHFile, blockUnpacked;
          int pos = 0;
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            blockFromHFile = hbr.readBlockData(pos, -1, pread, false, true);
            assertEquals(0, HFile.getAndResetChecksumFailuresCount());
            blockFromHFile.sanityCheck();
            pos += blockFromHFile.getOnDiskSizeWithHeader();
            assertEquals((int) encodedSizes.get(blockId),
                blockFromHFile.getUncompressedSizeWithoutHeader());
            assertEquals(meta.isCompressedOrEncrypted(), !blockFromHFile.isUnpacked());
            long packedHeapsize = blockFromHFile.heapSize();
            blockUnpacked = blockFromHFile.unpack(meta, hbr);
            assertTrue(blockUnpacked.isUnpacked());
            if (meta.isCompressedOrEncrypted()) {
              LOG.info("packedHeapsize=" + packedHeapsize + ", unpackedHeapsize="
                  + blockUnpacked.heapSize());
              assertFalse(packedHeapsize == blockUnpacked.heapSize());
              assertTrue("Packed heapSize should be < unpacked heapSize",
                  packedHeapsize < blockUnpacked.heapSize());
            }
            ByteBuff actualBuffer = blockUnpacked.getBufferWithoutHeader();
            if (encoding != DataBlockEncoding.NONE) {
              // We expect a two-byte big-endian encoding id.
              assertEquals(
                  "Unexpected first byte with " + buildMessageDetails(algo, encoding, pread),
                  Long.toHexString(0), Long.toHexString(actualBuffer.get(0)));
              assertEquals(
                  "Unexpected second byte with " + buildMessageDetails(algo, encoding, pread),
                  Long.toHexString(encoding.getId()), Long.toHexString(actualBuffer.get(1)));
              actualBuffer.position(2);
              actualBuffer = actualBuffer.slice();
            }

            ByteBuff expectedBuff = encodedBlocks.get(blockId);
            expectedBuff.rewind();

            // test if content matches, produce nice message
            assertBuffersEqual(expectedBuff, actualBuffer, algo, encoding, pread);

            // test serialized blocks
            for (boolean reuseBuffer : new boolean[] { false, true }) {
              ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength());
              blockFromHFile.serialize(serialized, true);
              HFileBlock deserialized = (HFileBlock) blockFromHFile.getDeserializer()
                  .deserialize(new SingleByteBuff(serialized), HEAP);
              assertEquals("Serialization did not preserve block state. reuseBuffer=" + reuseBuffer,
                  blockFromHFile, deserialized);
              // intentional reference comparison
              if (blockFromHFile != blockUnpacked) {
                assertEquals("Deserialized block cannot be unpacked correctly.", blockUnpacked,
                    deserialized.unpack(meta, hbr));
              }
            }
            assertRelease(blockUnpacked);
            if (blockFromHFile != blockUnpacked) {
              blockFromHFile.release();
            }
          }
          is.close();
        }
      }
    }
  }

  static String buildMessageDetails(Algorithm compression, DataBlockEncoding encoding,
      boolean pread) {
    return String.format("compression %s, encoding %s, pread %s", compression, encoding, pread);
  }

  static void assertBuffersEqual(ByteBuff expectedBuffer,
      ByteBuff actualBuffer, Compression.Algorithm compression,
      DataBlockEncoding encoding, boolean pread) {
    if (!actualBuffer.equals(expectedBuffer)) {
      int prefix = 0;
      int minLimit = Math.min(expectedBuffer.limit(), actualBuffer.limit());
      while (prefix < minLimit &&
          expectedBuffer.get(prefix) == actualBuffer.get(prefix)) {
        prefix++;
      }

      fail(String.format(
          "Content mismatch for %s, commonPrefix %d, expected %s, got %s",
          buildMessageDetails(compression, encoding, pread), prefix,
          nextBytesToStr(expectedBuffer, prefix),
          nextBytesToStr(actualBuffer, prefix)));
    }
  }

  /**
   * Convert a few next bytes in the given buffer at the given position to
   * string. Used for error messages.
   */
  private static String nextBytesToStr(ByteBuff buf, int pos) {
    int maxBytes = buf.limit() - pos;
    int numBytes = Math.min(16, maxBytes);
    return Bytes.toStringBinary(buf.array(), buf.arrayOffset() + pos, numBytes)
        + (numBytes < maxBytes ? "..." : "");
  }

  @Test
  public void testPreviousOffset() throws IOException {
    testPreviousOffsetInternals();
  }

  protected void testPreviousOffsetInternals() throws IOException {
    // TODO: parameterize these nested loops.
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : BOOLEAN_VALUES) {
        for (boolean cacheOnWrite : BOOLEAN_VALUES) {
          Random rand = defaultRandom();
          LOG.info("testPreviousOffset: Compression algorithm={}, pread={}, cacheOnWrite={}",
              algo.toString(), pread, cacheOnWrite);
          Path path = new Path(TEST_UTIL.getDataTestDir(), "prev_offset");
          List<Long> expectedOffsets = new ArrayList<>();
          List<Long> expectedPrevOffsets = new ArrayList<>();
          List<BlockType> expectedTypes = new ArrayList<>();
          List<ByteBuffer> expectedContents = cacheOnWrite ? new ArrayList<>() : null;
          long totalSize = writeBlocks(rand, algo, path, expectedOffsets,
              expectedPrevOffsets, expectedTypes, expectedContents);

          FSDataInputStream is = fs.open(path);
          HFileContext meta = new HFileContextBuilder()
              .withHBaseCheckSum(true)
              .withIncludesMvcc(includesMemstoreTS)
              .withIncludesTags(includesTag)
              .withCompression(algo).build();
          ReaderContext context = new ReaderContextBuilder()
              .withInputStreamWrapper(new FSDataInputStreamWrapper(is))
              .withFileSize(totalSize)
              .withFilePath(path)
              .withFileSystem(fs)
              .build();
          HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, alloc);
          long curOffset = 0;
          for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
            if (!pread) {
              assertEquals(is.getPos(), curOffset + (i == 0 ? 0 :
                  HConstants.HFILEBLOCK_HEADER_SIZE));
            }

            assertEquals(expectedOffsets.get(i).longValue(), curOffset);
            if (detailedLogging) {
              LOG.info("Reading block #" + i + " at offset " + curOffset);
            }
            HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false, false);
            if (detailedLogging) {
              LOG.info("Block #" + i + ": " + b);
            }
            assertEquals("Invalid block #" + i + "'s type:",
                expectedTypes.get(i), b.getBlockType());
            assertEquals("Invalid previous block offset for block " + i
                + " of type " + b.getBlockType() + ":",
                (long) expectedPrevOffsets.get(i), b.getPrevBlockOffset());
            b.sanityCheck();
            assertEquals(curOffset, b.getOffset());

            // Now re-load this block knowing the on-disk size. This tests a
            // different branch in the loader.
            HFileBlock b2 =
                hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false, false);
            b2.sanityCheck();

            assertEquals(b.getBlockType(), b2.getBlockType());
            assertEquals(b.getOnDiskSizeWithoutHeader(),
                b2.getOnDiskSizeWithoutHeader());
            assertEquals(b.getOnDiskSizeWithHeader(),
                b2.getOnDiskSizeWithHeader());
            assertEquals(b.getUncompressedSizeWithoutHeader(),
                b2.getUncompressedSizeWithoutHeader());
            assertEquals(b.getPrevBlockOffset(), b2.getPrevBlockOffset());
            assertEquals(curOffset, b2.getOffset());
            assertEquals(b.getBytesPerChecksum(), b2.getBytesPerChecksum());
            assertEquals(b.getOnDiskDataSizeWithHeader(),
                b2.getOnDiskDataSizeWithHeader());
            assertEquals(0, HFile.getAndResetChecksumFailuresCount());
            assertRelease(b2);

            curOffset += b.getOnDiskSizeWithHeader();

            if (cacheOnWrite) {
              // NOTE: cache-on-write testing doesn't actually involve a BlockCache. It simply
              // verifies that the unpacked value read back off disk matches the unpacked value
              // generated before writing to disk.
              HFileBlock newBlock = b.unpack(meta, hbr);
              // b's buffer has header + data + checksum while
              // expectedContents have header + data only
              ByteBuff bufRead = newBlock.getBufferReadOnly();
              ByteBuffer bufExpected = expectedContents.get(i);
              byte[] tmp = new byte[bufRead.limit() - newBlock.totalChecksumBytes()];
              bufRead.get(tmp, 0, tmp.length);
              boolean bytesAreCorrect = Bytes.compareTo(tmp, 0, tmp.length, bufExpected.array(),
                  bufExpected.arrayOffset(), bufExpected.limit()) == 0;
              String wrongBytesMsg = "";

              if (!bytesAreCorrect) {
                // Optimization: only construct an error message in case we
                // will need it.
                wrongBytesMsg = "Expected bytes in block #" + i + " (algo="
                    + algo + ", pread=" + pread
                    + ", cacheOnWrite=" + cacheOnWrite + "):\n";
                wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
                    bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit()))
                    + ", actual:\n"
                    + Bytes.toStringBinary(bufRead.array(),
                        bufRead.arrayOffset(), Math.min(32 + 10, bufRead.limit()));
                if (detailedLogging) {
                  LOG.warn("expected header" +
                      HFileBlock.toStringHeader(new SingleByteBuff(bufExpected)) +
                      "\nfound header" +
                      HFileBlock.toStringHeader(bufRead));
                  LOG.warn("bufread offset " + bufRead.arrayOffset() +
                      " limit " + bufRead.limit() +
                      " expected offset " + bufExpected.arrayOffset() +
                      " limit " + bufExpected.limit());
                  LOG.warn(wrongBytesMsg);
                }
              }
              assertTrue(wrongBytesMsg, bytesAreCorrect);
              assertRelease(newBlock);
              if (newBlock != b) {
                assertRelease(b);
              }
            } else {
              assertRelease(b);
            }
          }
          assertEquals(curOffset, fs.getFileStatus(path).getLen());
          is.close();
        }
      }
    }
  }

  private Random defaultRandom() {
    return new Random(189237);
  }

  /**
   * Reads random blocks of a previously written test file for about ten seconds, verifying each
   * block's type, on-disk size, and offset against the expected values recorded at write time.
   */
  private class BlockReaderThread implements Callable<Boolean> {
    private final String clientId;
    private final HFileBlock.FSReader hbr;
    private final List<Long> offsets;
    private final List<BlockType> types;
    private final long fileSize;

    public BlockReaderThread(String clientId,
        HFileBlock.FSReader hbr, List<Long> offsets, List<BlockType> types,
        long fileSize) {
      this.clientId = clientId;
      this.offsets = offsets;
      this.hbr = hbr;
      this.types = types;
      this.fileSize = fileSize;
    }

    @Override
    public Boolean call() throws Exception {
      Random rand = new Random(clientId.hashCode());
      long endTime = System.currentTimeMillis() + 10000;
      int numBlocksRead = 0;
      int numPositionalRead = 0;
      int numWithOnDiskSize = 0;
      while (System.currentTimeMillis() < endTime) {
        int blockId = rand.nextInt(NUM_TEST_BLOCKS);
        long offset = offsets.get(blockId);
        // now we only support concurrent read with pread = true
        boolean pread = true;
        boolean withOnDiskSize = rand.nextBoolean();
        long expectedSize =
            (blockId == NUM_TEST_BLOCKS - 1 ? fileSize : offsets.get(blockId + 1)) - offset;
        HFileBlock b = null;
        try {
          long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
          b = hbr.readBlockData(offset, onDiskSizeArg, pread, false, false);
          if (useHeapAllocator) {
            assertTrue(!b.isSharedMem());
          } else {
            assertTrue(!b.getBlockType().isData() || b.isSharedMem());
          }
          assertEquals(types.get(blockId), b.getBlockType());
          assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
          assertEquals(offset, b.getOffset());
        } catch (IOException ex) {
          LOG.error("Error in client " + clientId + " trying to read block at " + offset
              + ", pread=" + pread + ", withOnDiskSize=" + withOnDiskSize, ex);
          return false;
        } finally {
          if (b != null) {
            b.release();
          }
        }
        ++numBlocksRead;
        if (pread) {
          ++numPositionalRead;
        }

        if (withOnDiskSize) {
          ++numWithOnDiskSize;
        }
      }
      LOG.info("Client " + clientId + " successfully read " + numBlocksRead +
          " blocks (with pread: " + numPositionalRead + ", with onDiskSize " +
          "specified: " + numWithOnDiskSize + ")");

      return true;
    }
  }

  @Test
  public void testConcurrentReading() throws Exception {
    testConcurrentReadingInternals();
  }

  protected void testConcurrentReadingInternals() throws IOException,
      InterruptedException, ExecutionException {
    for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
      Path path = new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading");
      Random rand = defaultRandom();
      List<Long> offsets = new ArrayList<>();
      List<BlockType> types = new ArrayList<>();
      writeBlocks(rand, compressAlgo, path, offsets, null, types, null);
      FSDataInputStream is = fs.open(path);
      long fileSize = fs.getFileStatus(path).getLen();
      HFileContext meta = new HFileContextBuilder()
          .withHBaseCheckSum(true)
          .withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(includesTag)
          .withCompression(compressAlgo)
          .build();
      ReaderContext context = new ReaderContextBuilder()
          .withInputStreamWrapper(new FSDataInputStreamWrapper(is))
          .withFileSize(fileSize)
          .withFilePath(path)
          .withFileSystem(fs)
          .build();
      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, alloc);

      Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
      ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<>(exec);

      for (int i = 0; i < NUM_READER_THREADS; ++i) {
        ecs.submit(new BlockReaderThread("reader_" + (char) ('A' + i), hbr,
            offsets, types, fileSize));
      }

      for (int i = 0; i < NUM_READER_THREADS; ++i) {
        Future<Boolean> result = ecs.take();
        assertTrue(result.get());
        if (detailedLogging) {
          LOG.info(String.valueOf(i + 1)
              + " reader threads finished successfully (algo=" + compressAlgo + ")");
        }
      }
      is.close();
    }
  }

  /**
   * Writes NUM_TEST_BLOCKS blocks of random types and sizes to the given path, recording the
   * expected block offsets, per-type previous offsets, block types and, when cache-on-write is
   * simulated, the uncompressed block contents. Returns the total on-disk size written.
   */
  private long writeBlocks(Random rand, Compression.Algorithm compressAlgo,
      Path path, List<Long> expectedOffsets, List<Long> expectedPrevOffsets,
      List<BlockType> expectedTypes, List<ByteBuffer> expectedContents) throws IOException {
    boolean cacheOnWrite = expectedContents != null;
    FSDataOutputStream os = fs.create(path);
    HFileContext meta = new HFileContextBuilder()
        .withHBaseCheckSum(true)
        .withIncludesMvcc(includesMemstoreTS)
        .withIncludesTags(includesTag)
        .withCompression(compressAlgo)
        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
        .build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    Map<BlockType, Long> prevOffsetByType = new HashMap<>();
    long totalSize = 0;
    for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
      long pos = os.getPos();
      int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
      if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
        blockTypeOrdinal = BlockType.DATA.ordinal();
      }
      BlockType bt = BlockType.values()[blockTypeOrdinal];
      DataOutputStream dos = hbw.startWriting(bt);
      int size = rand.nextInt(500);
      for (int j = 0; j < size; ++j) {
        // This might compress well.
        dos.writeShort(i + 1);
        dos.writeInt(j + 1);
      }

      if (expectedOffsets != null)
        expectedOffsets.add(os.getPos());

      if (expectedPrevOffsets != null) {
        Long prevOffset = prevOffsetByType.get(bt);
        expectedPrevOffsets.add(prevOffset != null ? prevOffset : -1);
        prevOffsetByType.put(bt, os.getPos());
      }

      expectedTypes.add(bt);

      hbw.writeHeaderAndData(os);
      totalSize += hbw.getOnDiskSizeWithHeader();

      if (cacheOnWrite) {
        ByteBuff buff = hbw.cloneUncompressedBufferWithHeader();
        expectedContents.add(buff.asSubByteBuffer(buff.capacity()));
      }

      if (detailedLogging) {
        LOG.info("Written block #" + i + " of type " + bt
            + ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader()
            + ", packed size " + hbw.getOnDiskSizeWithoutHeader()
            + " at offset " + pos);
      }
    }
    os.close();
    LOG.info("Created a temporary file at " + path + ", "
        + fs.getFileStatus(path).getLen() + " bytes, compression=" + compressAlgo);
    return totalSize;
  }

  @Test
  public void testBlockHeapSize() {
    testBlockHeapSizeInternals();
  }

  protected void testBlockHeapSizeInternals() {
    if (ClassSize.is32BitJVM()) {
      assertEquals(64, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
    } else {
      assertEquals(80, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
    }

    for (int size : new int[] { 100, 256, 12345 }) {
      byte[] byteArr = new byte[HConstants.HFILEBLOCK_HEADER_SIZE + size];
      ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
      HFileContext meta = new HFileContextBuilder()
          .withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(includesTag)
          .withHBaseCheckSum(false)
          .withCompression(Algorithm.NONE)
          .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
          .withChecksumType(ChecksumType.NULL).build();
      HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
          HFileBlock.FILL_HEADER, -1, 0, -1, meta, HEAP);
      long byteBufferExpectedSize =
          ClassSize.align(ClassSize.estimateBase(new MultiByteBuff(buf).getClass(), true)
              + HConstants.HFILEBLOCK_HEADER_SIZE + size);
      long hfileMetaSize = ClassSize.align(ClassSize.estimateBase(HFileContext.class, true));
      long hfileBlockExpectedSize = ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true));
      long expected = hfileBlockExpectedSize + byteBufferExpectedSize + hfileMetaSize;
      assertEquals("Block data size: " + size + ", byte buffer expected " +
          "size: " + byteBufferExpectedSize + ", HFileBlock class expected " +
          "size: " + hfileBlockExpectedSize + " HFileContext class expected size: "
          + hfileMetaSize + "; ", expected, block.heapSize());
    }
  }

  @Test
  public void testSerializeWithoutNextBlockMetadata() {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    byte[] byteArr = new byte[length];
    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
        ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
        ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc);
    ByteBuffer buff1 = ByteBuffer.allocate(length);
    ByteBuffer buff2 = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(buff1, true);
    blockWithoutNextBlockMetadata.serialize(buff2, true);
    assertNotEquals(buff1, buff2);
    buff1.clear();
    buff2.clear();
    blockWithNextBlockMetadata.serialize(buff1, false);
    blockWithoutNextBlockMetadata.serialize(buff2, false);
    assertEquals(buff1, buff2);
  }
}