001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP; 021import java.io.DataInputStream; 022import java.io.DataOutput; 023import java.io.DataOutputStream; 024import java.io.IOException; 025import java.nio.ByteBuffer; 026import java.util.ArrayList; 027import java.util.List; 028import java.util.concurrent.atomic.AtomicReference; 029import java.util.concurrent.locks.Lock; 030import java.util.concurrent.locks.ReentrantLock; 031import org.apache.hadoop.fs.FSDataInputStream; 032import org.apache.hadoop.fs.FSDataOutputStream; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.fs.HFileSystem; 036import org.apache.hadoop.hbase.io.ByteArrayOutputStream; 037import org.apache.hadoop.hbase.io.ByteBuffAllocator; 038import org.apache.hadoop.hbase.io.ByteBuffInputStream; 039import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream; 040import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 041import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 042import org.apache.hadoop.hbase.io.encoding.EncodingState; 043import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; 044import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; 045import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; 046import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; 047import org.apache.hadoop.hbase.io.util.BlockIOUtils; 048import org.apache.hadoop.hbase.nio.ByteBuff; 049import org.apache.hadoop.hbase.nio.MultiByteBuff; 050import org.apache.hadoop.hbase.nio.SingleByteBuff; 051import org.apache.hadoop.hbase.regionserver.ShipperListener; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.util.ChecksumType; 054import org.apache.hadoop.hbase.util.ClassSize; 055import org.apache.yetus.audience.InterfaceAudience; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 060 061/** 062 * Cacheable Blocks of an {@link HFile} version 2 file. 063 * Version 2 was introduced in hbase-0.92.0. 064 * 065 * <p>Version 1 was the original file block. Version 2 was introduced when we changed the hbase file 066 * format to support multi-level block indexes and compound bloom filters (HBASE-3857). Support 067 * for Version 1 was removed in hbase-1.3.0. 068 * 069 * <h3>HFileBlock: Version 2</h3> 070 * In version 2, a block is structured as follows: 071 * <ul> 072 * <li><b>Header:</b> See Writer#putHeader() for where header is written; header total size is 073 * HFILEBLOCK_HEADER_SIZE 074 * <ul> 075 * <li>0. blockType: Magic record identifying the {@link BlockType} (8 bytes): 076 * e.g. <code>DATABLK*</code> 077 * <li>1. onDiskSizeWithoutHeader: Compressed -- a.k.a 'on disk' -- block size, excluding header, 078 * but including tailing checksum bytes (4 bytes) 079 * <li>2. uncompressedSizeWithoutHeader: Uncompressed block size, excluding header, and excluding 080 * checksum bytes (4 bytes) 081 * <li>3. prevBlockOffset: The offset of the previous block of the same type (8 bytes). This is 082 * used to navigate to the previous block without having to go to the block index 083 * <li>4: For minorVersions >=1, the ordinal describing checksum type (1 byte) 084 * <li>5: For minorVersions >=1, the number of data bytes/checksum chunk (4 bytes) 085 * <li>6: onDiskDataSizeWithHeader: For minorVersions >=1, the size of data 'on disk', including 086 * header, excluding checksums (4 bytes) 087 * </ul> 088 * </li> 089 * <li><b>Raw/Compressed/Encrypted/Encoded data:</b> The compression 090 * algorithm is the same for all the blocks in an {@link HFile}. If compression is NONE, this is 091 * just raw, serialized Cells. 092 * <li><b>Tail:</b> For minorVersions >=1, a series of 4 byte checksums, one each for 093 * the number of bytes specified by bytesPerChecksum. 094 * </ul> 095 * 096 * <h3>Caching</h3> 097 * Caches cache whole blocks with trailing checksums if any. We then tag on some metadata, the 098 * content of BLOCK_METADATA_SPACE which will be flag on if we are doing 'hbase' 099 * checksums and then the offset into the file which is needed when we re-make a cache key 100 * when we return the block to the cache as 'done'. 101 * See {@link Cacheable#serialize(ByteBuffer, boolean)} and {@link Cacheable#getDeserializer()}. 102 * 103 * <p>TODO: Should we cache the checksums? Down in Writer#getBlockForCaching(CacheConfig) where 104 * we make a block to cache-on-write, there is an attempt at turning off checksums. This is not the 105 * only place we get blocks to cache. We also will cache the raw return from an hdfs read. In this 106 * case, the checksums may be present. If the cache is backed by something that doesn't do ECC, 107 * say an SSD, we might want to preserve checksums. For now this is open question. 108 * <p>TODO: Over in BucketCache, we save a block allocation by doing a custom serialization. 109 * Be sure to change it if serialization changes in here. Could we add a method here that takes an 110 * IOEngine and that then serializes to it rather than expose our internals over in BucketCache? 111 * IOEngine is in the bucket subpackage. Pull it up? Then this class knows about bucketcache. Ugh. 112 */ 113@InterfaceAudience.Private 114public class HFileBlock implements Cacheable { 115 private static final Logger LOG = LoggerFactory.getLogger(HFileBlock.class); 116 public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HFileBlock.class, false); 117 118 // Block Header fields. 119 120 // TODO: encapsulate Header related logic in this inner class. 121 static class Header { 122 // Format of header is: 123 // 8 bytes - block magic 124 // 4 bytes int - onDiskSizeWithoutHeader 125 // 4 bytes int - uncompressedSizeWithoutHeader 126 // 8 bytes long - prevBlockOffset 127 // The following 3 are only present if header contains checksum information 128 // 1 byte - checksum type 129 // 4 byte int - bytes per checksum 130 // 4 byte int - onDiskDataSizeWithHeader 131 static int BLOCK_MAGIC_INDEX = 0; 132 static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8; 133 static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12; 134 static int PREV_BLOCK_OFFSET_INDEX = 16; 135 static int CHECKSUM_TYPE_INDEX = 24; 136 static int BYTES_PER_CHECKSUM_INDEX = 25; 137 static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29; 138 } 139 140 /** Type of block. Header field 0. */ 141 private BlockType blockType; 142 143 /** 144 * Size on disk excluding header, including checksum. Header field 1. 145 * @see Writer#putHeader(byte[], int, int, int, int) 146 */ 147 private int onDiskSizeWithoutHeader; 148 149 /** 150 * Size of pure data. Does not include header or checksums. Header field 2. 151 * @see Writer#putHeader(byte[], int, int, int, int) 152 */ 153 private int uncompressedSizeWithoutHeader; 154 155 /** 156 * The offset of the previous block on disk. Header field 3. 157 * @see Writer#putHeader(byte[], int, int, int, int) 158 */ 159 private long prevBlockOffset; 160 161 /** 162 * Size on disk of header + data. Excludes checksum. Header field 6, 163 * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum. 164 * @see Writer#putHeader(byte[], int, int, int, int) 165 */ 166 private int onDiskDataSizeWithHeader; 167 // End of Block Header fields. 168 169 /** 170 * The in-memory representation of the hfile block. Can be on or offheap. Can be backed by 171 * a single ByteBuffer or by many. Make no assumptions. 172 * 173 * <p>Be careful reading from this <code>buf</code>. Duplicate and work on the duplicate or if 174 * not, be sure to reset position and limit else trouble down the road. 175 * 176 * <p>TODO: Make this read-only once made. 177 * 178 * <p>We are using the ByteBuff type. ByteBuffer is not extensible yet we need to be able to have 179 * a ByteBuffer-like API across multiple ByteBuffers reading from a cache such as BucketCache. 180 * So, we have this ByteBuff type. Unfortunately, it is spread all about HFileBlock. Would be 181 * good if could be confined to cache-use only but hard-to-do. 182 */ 183 private ByteBuff buf; 184 185 /** Meta data that holds meta information on the hfileblock. 186 */ 187 private HFileContext fileContext; 188 189 /** 190 * The offset of this block in the file. Populated by the reader for 191 * convenience of access. This offset is not part of the block header. 192 */ 193 private long offset = UNSET; 194 195 /** 196 * The on-disk size of the next block, including the header and checksums if present. 197 * UNSET if unknown. 198 * 199 * Blocks try to carry the size of the next block to read in this data member. Usually 200 * we get block sizes from the hfile index but sometimes the index is not available: 201 * e.g. when we read the indexes themselves (indexes are stored in blocks, we do not 202 * have an index for the indexes). Saves seeks especially around file open when 203 * there is a flurry of reading in hfile metadata. 204 */ 205 private int nextBlockOnDiskSize = UNSET; 206 207 private ByteBuffAllocator allocator; 208 209 /** 210 * On a checksum failure, do these many succeeding read requests using hdfs checksums before 211 * auto-reenabling hbase checksum verification. 212 */ 213 static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3; 214 215 private static int UNSET = -1; 216 public static final boolean FILL_HEADER = true; 217 public static final boolean DONT_FILL_HEADER = false; 218 219 // How to get the estimate correctly? if it is a singleBB? 220 public static final int MULTI_BYTE_BUFFER_HEAP_SIZE = 221 (int)ClassSize.estimateBase(MultiByteBuff.class, false); 222 223 /** 224 * Space for metadata on a block that gets stored along with the block when we cache it. 225 * There are a few bytes stuck on the end of the HFileBlock that we pull in from HDFS. 226 * 8 bytes are for the offset of this block (long) in the file. Offset is important because is is 227 * used when we remake the CacheKey when we return block to the cache when done. There is also 228 * a flag on whether checksumming is being done by hbase or not. See class comment for note on 229 * uncertain state of checksumming of blocks that come out of cache (should we or should we not?). 230 * Finally there are 4 bytes to hold the length of the next block which can save a seek on 231 * occasion if available. 232 * (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was 233 * formerly known as EXTRA_SERIALIZATION_SPACE). 234 */ 235 static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT; 236 237 /** 238 * Each checksum value is an integer that can be stored in 4 bytes. 239 */ 240 static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT; 241 242 static final byte[] DUMMY_HEADER_NO_CHECKSUM = 243 new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM]; 244 245 /** 246 * Used deserializing blocks from Cache. 247 * 248 * <code> 249 * ++++++++++++++ 250 * + HFileBlock + 251 * ++++++++++++++ 252 * + Checksums + <= Optional 253 * ++++++++++++++ 254 * + Metadata! + <= See note on BLOCK_METADATA_SPACE above. 255 * ++++++++++++++ 256 * </code> 257 * @see #serialize(ByteBuffer, boolean) 258 */ 259 public static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER = new BlockDeserializer(); 260 261 public static final class BlockDeserializer implements CacheableDeserializer<Cacheable> { 262 private BlockDeserializer() { 263 } 264 265 @Override 266 public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc) 267 throws IOException { 268 // The buf has the file block followed by block metadata. 269 // Set limit to just before the BLOCK_METADATA_SPACE then rewind. 270 buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind(); 271 // Get a new buffer to pass the HFileBlock for it to 'own'. 272 ByteBuff newByteBuff = buf.slice(); 273 // Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock. 274 buf.position(buf.limit()); 275 buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE); 276 boolean usesChecksum = buf.get() == (byte) 1; 277 long offset = buf.getLong(); 278 int nextBlockOnDiskSize = buf.getInt(); 279 return createFromBuff(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc); 280 } 281 282 @Override 283 public int getDeserializerIdentifier() { 284 return DESERIALIZER_IDENTIFIER; 285 } 286 } 287 288 private static final int DESERIALIZER_IDENTIFIER; 289 static { 290 DESERIALIZER_IDENTIFIER = 291 CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER); 292 } 293 294 /** 295 * Creates a new {@link HFile} block from the given fields. This constructor 296 * is used only while writing blocks and caching, 297 * and is sitting in a byte buffer and we want to stuff the block into cache. 298 * See {@link Writer#getBlockForCaching(CacheConfig)}. 299 * 300 * <p>TODO: The caller presumes no checksumming 301 * <p>TODO: HFile block writer can also off-heap ? </p> 302 * required of this block instance since going into cache; checksum already verified on 303 * underlying block data pulled in from filesystem. Is that correct? What if cache is SSD? 304 * 305 * @param blockType the type of this block, see {@link BlockType} 306 * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader} 307 * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader} 308 * @param prevBlockOffset see {@link #prevBlockOffset} 309 * @param buf block buffer with header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) 310 * @param fillHeader when true, write the first 4 header fields into passed buffer. 311 * @param offset the file offset the block was read from 312 * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader} 313 * @param fileContext HFile meta data 314 */ 315 public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, 316 int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader, 317 long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext, 318 ByteBuffAllocator allocator) { 319 this.blockType = blockType; 320 this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader; 321 this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader; 322 this.prevBlockOffset = prevBlockOffset; 323 this.offset = offset; 324 this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader; 325 this.nextBlockOnDiskSize = nextBlockOnDiskSize; 326 this.fileContext = fileContext; 327 this.allocator = allocator; 328 this.buf = buf; 329 if (fillHeader) { 330 overwriteHeader(); 331 } 332 this.buf.rewind(); 333 } 334 335 /** 336 * Creates a block from an existing buffer starting with a header. Rewinds 337 * and takes ownership of the buffer. By definition of rewind, ignores the 338 * buffer position, but if you slice the buffer beforehand, it will rewind 339 * to that point. 340 * @param buf Has header, content, and trailing checksums if present. 341 */ 342 static HFileBlock createFromBuff(ByteBuff buf, boolean usesHBaseChecksum, final long offset, 343 final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator) 344 throws IOException { 345 buf.rewind(); 346 final BlockType blockType = BlockType.read(buf); 347 final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX); 348 final int uncompressedSizeWithoutHeader = 349 buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX); 350 final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX); 351 // This constructor is called when we deserialize a block from cache and when we read a block in 352 // from the fs. fileCache is null when deserialized from cache so need to make up one. 353 HFileContextBuilder fileContextBuilder = fileContext != null ? 354 new HFileContextBuilder(fileContext) : new HFileContextBuilder(); 355 fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum); 356 int onDiskDataSizeWithHeader; 357 if (usesHBaseChecksum) { 358 byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX); 359 int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX); 360 onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX); 361 // Use the checksum type and bytes per checksum from header, not from fileContext. 362 fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType)); 363 fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum); 364 } else { 365 fileContextBuilder.withChecksumType(ChecksumType.NULL); 366 fileContextBuilder.withBytesPerCheckSum(0); 367 // Need to fix onDiskDataSizeWithHeader; there are not checksums after-block-data 368 onDiskDataSizeWithHeader = onDiskSizeWithoutHeader + headerSize(usesHBaseChecksum); 369 } 370 fileContext = fileContextBuilder.build(); 371 assert usesHBaseChecksum == fileContext.isUseHBaseChecksum(); 372 return new HFileBlockBuilder() 373 .withBlockType(blockType) 374 .withOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader) 375 .withUncompressedSizeWithoutHeader(uncompressedSizeWithoutHeader) 376 .withPrevBlockOffset(prevBlockOffset) 377 .withOffset(offset) 378 .withOnDiskDataSizeWithHeader(onDiskDataSizeWithHeader) 379 .withNextBlockOnDiskSize(nextBlockOnDiskSize) 380 .withHFileContext(fileContext) 381 .withByteBuffAllocator(allocator) 382 .withByteBuff(buf.rewind()) 383 .withShared(!buf.hasArray()) 384 .build(); 385 } 386 387 /** 388 * Parse total on disk size including header and checksum. 389 * @param headerBuf Header ByteBuffer. Presumed exact size of header. 390 * @param verifyChecksum true if checksum verification is in use. 391 * @return Size of the block with header included. 392 */ 393 private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf, 394 boolean verifyChecksum) { 395 return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(verifyChecksum); 396 } 397 398 /** 399 * @return the on-disk size of the next block (including the header size and any checksums if 400 * present) read by peeking into the next block's header; use as a hint when doing 401 * a read of the next block when scanning or running over a file. 402 */ 403 int getNextBlockOnDiskSize() { 404 return nextBlockOnDiskSize; 405 } 406 407 @Override 408 public BlockType getBlockType() { 409 return blockType; 410 } 411 412 @Override 413 public int refCnt() { 414 return buf.refCnt(); 415 } 416 417 @Override 418 public HFileBlock retain() { 419 buf.retain(); 420 return this; 421 } 422 423 /** 424 * Call {@link ByteBuff#release()} to decrease the reference count, if no other reference, it will 425 * return back the {@link ByteBuffer} to {@link org.apache.hadoop.hbase.io.ByteBuffAllocator} 426 */ 427 @Override 428 public boolean release() { 429 return buf.release(); 430 } 431 432 /** @return get data block encoding id that was used to encode this block */ 433 short getDataBlockEncodingId() { 434 if (blockType != BlockType.ENCODED_DATA) { 435 throw new IllegalArgumentException("Querying encoder ID of a block " + 436 "of type other than " + BlockType.ENCODED_DATA + ": " + blockType); 437 } 438 return buf.getShort(headerSize()); 439 } 440 441 /** 442 * @return the on-disk size of header + data part + checksum. 443 */ 444 public int getOnDiskSizeWithHeader() { 445 return onDiskSizeWithoutHeader + headerSize(); 446 } 447 448 /** 449 * @return the on-disk size of the data part + checksum (header excluded). 450 */ 451 int getOnDiskSizeWithoutHeader() { 452 return onDiskSizeWithoutHeader; 453 } 454 455 /** 456 * @return the uncompressed size of data part (header and checksum excluded). 457 */ 458 int getUncompressedSizeWithoutHeader() { 459 return uncompressedSizeWithoutHeader; 460 } 461 462 /** 463 * @return the offset of the previous block of the same type in the file, or 464 * -1 if unknown 465 */ 466 long getPrevBlockOffset() { 467 return prevBlockOffset; 468 } 469 470 /** 471 * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position 472 * is modified as side-effect. 473 */ 474 private void overwriteHeader() { 475 buf.rewind(); 476 blockType.write(buf); 477 buf.putInt(onDiskSizeWithoutHeader); 478 buf.putInt(uncompressedSizeWithoutHeader); 479 buf.putLong(prevBlockOffset); 480 if (this.fileContext.isUseHBaseChecksum()) { 481 buf.put(fileContext.getChecksumType().getCode()); 482 buf.putInt(fileContext.getBytesPerChecksum()); 483 buf.putInt(onDiskDataSizeWithHeader); 484 } 485 } 486 487 /** 488 * Returns a buffer that does not include the header and checksum. 489 * @return the buffer with header skipped and checksum omitted. 490 */ 491 public ByteBuff getBufferWithoutHeader() { 492 return this.getBufferWithoutHeader(false); 493 } 494 495 /** 496 * Returns a buffer that does not include the header or checksum. 497 * @param withChecksum to indicate whether include the checksum or not. 498 * @return the buffer with header skipped and checksum omitted. 499 */ 500 public ByteBuff getBufferWithoutHeader(boolean withChecksum) { 501 ByteBuff dup = getBufferReadOnly(); 502 int delta = withChecksum ? 0 : totalChecksumBytes(); 503 return dup.position(headerSize()).limit(buf.limit() - delta).slice(); 504 } 505 506 /** 507 * Returns a read-only duplicate of the buffer this block stores internally ready to be read. 508 * Clients must not modify the buffer object though they may set position and limit on the 509 * returned buffer since we pass back a duplicate. This method has to be public because it is used 510 * in {@link CompoundBloomFilter} to avoid object creation on every Bloom 511 * filter lookup, but has to be used with caution. Buffer holds header, block content, 512 * and any follow-on checksums if present. 513 * 514 * @return the buffer of this block for read-only operations 515 */ 516 public ByteBuff getBufferReadOnly() { 517 // TODO: ByteBuf does not support asReadOnlyBuffer(). Fix. 518 ByteBuff dup = this.buf.duplicate(); 519 assert dup.position() == 0; 520 return dup; 521 } 522 523 public ByteBuffAllocator getByteBuffAllocator() { 524 return this.allocator; 525 } 526 527 private void sanityCheckAssertion(long valueFromBuf, long valueFromField, 528 String fieldName) throws IOException { 529 if (valueFromBuf != valueFromField) { 530 throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf 531 + ") is different from that in the field (" + valueFromField + ")"); 532 } 533 } 534 535 private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField) 536 throws IOException { 537 if (valueFromBuf != valueFromField) { 538 throw new IOException("Block type stored in the buffer: " + 539 valueFromBuf + ", block type field: " + valueFromField); 540 } 541 } 542 543 /** 544 * Checks if the block is internally consistent, i.e. the first 545 * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a 546 * valid header consistent with the fields. Assumes a packed block structure. 547 * This function is primary for testing and debugging, and is not 548 * thread-safe, because it alters the internal buffer pointer. 549 * Used by tests only. 550 */ 551 void sanityCheck() throws IOException { 552 // Duplicate so no side-effects 553 ByteBuff dup = this.buf.duplicate().rewind(); 554 sanityCheckAssertion(BlockType.read(dup), blockType); 555 556 sanityCheckAssertion(dup.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader"); 557 558 sanityCheckAssertion(dup.getInt(), uncompressedSizeWithoutHeader, 559 "uncompressedSizeWithoutHeader"); 560 561 sanityCheckAssertion(dup.getLong(), prevBlockOffset, "prevBlockOffset"); 562 if (this.fileContext.isUseHBaseChecksum()) { 563 sanityCheckAssertion(dup.get(), this.fileContext.getChecksumType().getCode(), "checksumType"); 564 sanityCheckAssertion(dup.getInt(), this.fileContext.getBytesPerChecksum(), 565 "bytesPerChecksum"); 566 sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader"); 567 } 568 569 int cksumBytes = totalChecksumBytes(); 570 int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes; 571 if (dup.limit() != expectedBufLimit) { 572 throw new AssertionError("Expected limit " + expectedBufLimit + ", got " + dup.limit()); 573 } 574 575 // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next 576 // block's header, so there are two sensible values for buffer capacity. 577 int hdrSize = headerSize(); 578 dup.rewind(); 579 if (dup.remaining() != expectedBufLimit && dup.remaining() != expectedBufLimit + hdrSize) { 580 throw new AssertionError("Invalid buffer capacity: " + dup.remaining() + 581 ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize)); 582 } 583 } 584 585 @Override 586 public String toString() { 587 StringBuilder sb = new StringBuilder() 588 .append("[") 589 .append("blockType=").append(blockType) 590 .append(", fileOffset=").append(offset) 591 .append(", headerSize=").append(headerSize()) 592 .append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader) 593 .append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader) 594 .append(", prevBlockOffset=").append(prevBlockOffset) 595 .append(", isUseHBaseChecksum=").append(fileContext.isUseHBaseChecksum()); 596 if (fileContext.isUseHBaseChecksum()) { 597 sb.append(", checksumType=").append(ChecksumType.codeToType(this.buf.get(24))) 598 .append(", bytesPerChecksum=").append(this.buf.getInt(24 + 1)) 599 .append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader); 600 } else { 601 sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader) 602 .append("(").append(onDiskSizeWithoutHeader) 603 .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")"); 604 } 605 String dataBegin; 606 if (buf.hasArray()) { 607 dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(), 608 Math.min(32, buf.limit() - buf.arrayOffset() - headerSize())); 609 } else { 610 ByteBuff bufWithoutHeader = getBufferWithoutHeader(); 611 byte[] dataBeginBytes = new byte[Math.min(32, 612 bufWithoutHeader.limit() - bufWithoutHeader.position())]; 613 bufWithoutHeader.get(dataBeginBytes); 614 dataBegin = Bytes.toStringBinary(dataBeginBytes); 615 } 616 sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader()) 617 .append(", totalChecksumBytes=").append(totalChecksumBytes()) 618 .append(", isUnpacked=").append(isUnpacked()) 619 .append(", buf=[").append(buf).append("]") 620 .append(", dataBeginsWith=").append(dataBegin) 621 .append(", fileContext=").append(fileContext) 622 .append(", nextBlockOnDiskSize=").append(nextBlockOnDiskSize) 623 .append("]"); 624 return sb.toString(); 625 } 626 627 /** 628 * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its 629 * encoded structure. Internal structures are shared between instances where applicable. 630 */ 631 HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException { 632 if (!fileContext.isCompressedOrEncrypted()) { 633 // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean), 634 // which is used for block serialization to L2 cache, does not preserve encoding and 635 // encryption details. 636 return this; 637 } 638 639 HFileBlock unpacked = shallowClone(this); 640 unpacked.allocateBuffer(); // allocates space for the decompressed block 641 boolean succ = false; 642 try { 643 HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA 644 ? reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext(); 645 // Create a duplicated buffer without the header part. 646 ByteBuff dup = this.buf.duplicate(); 647 dup.position(this.headerSize()); 648 dup = dup.slice(); 649 // Decode the dup into unpacked#buf 650 ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(), 651 unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup); 652 succ = true; 653 return unpacked; 654 } finally { 655 if (!succ) { 656 unpacked.release(); 657 } 658 } 659 } 660 661 /** 662 * Always allocates a new buffer of the correct size. Copies header bytes 663 * from the existing buffer. Does not change header fields. 664 * Reserve room to keep checksum bytes too. 665 */ 666 private void allocateBuffer() { 667 int cksumBytes = totalChecksumBytes(); 668 int headerSize = headerSize(); 669 int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes; 670 671 ByteBuff newBuf = allocator.allocate(capacityNeeded); 672 673 // Copy header bytes into newBuf. 674 buf.position(0); 675 newBuf.put(0, buf, 0, headerSize); 676 677 buf = newBuf; 678 // set limit to exclude next block's header 679 buf.limit(capacityNeeded); 680 } 681 682 /** 683 * Return true when this block's buffer has been unpacked, false otherwise. Note this is a 684 * calculated heuristic, not tracked attribute of the block. 685 */ 686 public boolean isUnpacked() { 687 final int cksumBytes = totalChecksumBytes(); 688 final int headerSize = headerSize(); 689 final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes; 690 final int bufCapacity = buf.remaining(); 691 return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize; 692 } 693 694 /** 695 * Cannot be {@link #UNSET}. Must be a legitimate value. Used re-making the {@link BlockCacheKey} 696 * when block is returned to the cache. 697 * @return the offset of this block in the file it was read from 698 */ 699 long getOffset() { 700 if (offset < 0) { 701 throw new IllegalStateException("HFile block offset not initialized properly"); 702 } 703 return offset; 704 } 705 706 /** 707 * @return a byte stream reading the data + checksum of this block 708 */ 709 DataInputStream getByteStream() { 710 ByteBuff dup = this.buf.duplicate(); 711 dup.position(this.headerSize()); 712 return new DataInputStream(new ByteBuffInputStream(dup)); 713 } 714 715 @Override 716 public long heapSize() { 717 long size = FIXED_OVERHEAD; 718 size += fileContext.heapSize(); 719 if (buf != null) { 720 // Deep overhead of the byte buffer. Needs to be aligned separately. 721 size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE); 722 } 723 return ClassSize.align(size); 724 } 725 726 /** 727 * Will be override by {@link SharedMemHFileBlock} or {@link ExclusiveMemHFileBlock}. Return true 728 * by default. 729 */ 730 public boolean isSharedMem() { 731 if (this instanceof SharedMemHFileBlock) { 732 return true; 733 } else if (this instanceof ExclusiveMemHFileBlock) { 734 return false; 735 } 736 return true; 737 } 738 739 /** 740 * Unified version 2 {@link HFile} block writer. The intended usage pattern 741 * is as follows: 742 * <ol> 743 * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm. 744 * <li>Call {@link Writer#startWriting} and get a data stream to write to. 745 * <li>Write your data into the stream. 746 * <li>Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to. 747 * store the serialized block into an external stream. 748 * <li>Repeat to write more blocks. 749 * </ol> 750 * <p> 751 */ 752 static class Writer implements ShipperListener { 753 private enum State { 754 INIT, 755 WRITING, 756 BLOCK_READY 757 }; 758 759 /** Writer state. Used to ensure the correct usage protocol. */ 760 private State state = State.INIT; 761 762 /** Data block encoder used for data blocks */ 763 private final HFileDataBlockEncoder dataBlockEncoder; 764 765 private HFileBlockEncodingContext dataBlockEncodingCtx; 766 767 /** block encoding context for non-data blocks*/ 768 private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx; 769 770 /** 771 * The stream we use to accumulate data into a block in an uncompressed format. 772 * We reset this stream at the end of each block and reuse it. The 773 * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this 774 * stream. 775 */ 776 private ByteArrayOutputStream baosInMemory; 777 778 /** 779 * Current block type. Set in {@link #startWriting(BlockType)}. Could be 780 * changed in {@link #finishBlock()} from {@link BlockType#DATA} 781 * to {@link BlockType#ENCODED_DATA}. 782 */ 783 private BlockType blockType; 784 785 /** 786 * A stream that we write uncompressed bytes to, which compresses them and 787 * writes them to {@link #baosInMemory}. 788 */ 789 private DataOutputStream userDataStream; 790 791 /** 792 * Bytes to be written to the file system, including the header. Compressed 793 * if compression is turned on. It also includes the checksum data that 794 * immediately follows the block data. (header + data + checksums) 795 */ 796 private ByteArrayOutputStream onDiskBlockBytesWithHeader; 797 798 /** 799 * The size of the checksum data on disk. It is used only if data is 800 * not compressed. If data is compressed, then the checksums are already 801 * part of onDiskBytesWithHeader. If data is uncompressed, then this 802 * variable stores the checksum data for this block. 803 */ 804 private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY; 805 806 /** 807 * Current block's start offset in the {@link HFile}. Set in 808 * {@link #writeHeaderAndData(FSDataOutputStream)}. 809 */ 810 private long startOffset; 811 812 /** 813 * Offset of previous block by block type. Updated when the next block is 814 * started. 815 */ 816 private long[] prevOffsetByType; 817 818 /** The offset of the previous block of the same type */ 819 private long prevOffset; 820 /** Meta data that holds information about the hfileblock**/ 821 private HFileContext fileContext; 822 823 private final ByteBuffAllocator allocator; 824 825 @Override 826 public void beforeShipped() { 827 if (getEncodingState() != null) { 828 getEncodingState().beforeShipped(); 829 } 830 } 831 832 EncodingState getEncodingState() { 833 return dataBlockEncodingCtx.getEncodingState(); 834 } 835 836 /** 837 * @param dataBlockEncoder data block encoding algorithm to use 838 */ 839 public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) { 840 this(dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP); 841 } 842 843 public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext, 844 ByteBuffAllocator allocator) { 845 if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) { 846 throw new RuntimeException("Unsupported value of bytesPerChecksum. " + 847 " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " + 848 fileContext.getBytesPerChecksum()); 849 } 850 this.allocator = allocator; 851 this.dataBlockEncoder = dataBlockEncoder != null? 852 dataBlockEncoder: NoOpDataBlockEncoder.INSTANCE; 853 this.dataBlockEncodingCtx = this.dataBlockEncoder. 854 newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); 855 // TODO: This should be lazily instantiated since we usually do NOT need this default encoder 856 this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null, 857 HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); 858 // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum 859 baosInMemory = new ByteArrayOutputStream(); 860 prevOffsetByType = new long[BlockType.values().length]; 861 for (int i = 0; i < prevOffsetByType.length; ++i) { 862 prevOffsetByType[i] = UNSET; 863 } 864 // TODO: Why fileContext saved away when we have dataBlockEncoder and/or 865 // defaultDataBlockEncoder? 866 this.fileContext = fileContext; 867 } 868 869 /** 870 * Starts writing into the block. The previous block's data is discarded. 871 * 872 * @return the stream the user can write their data into 873 */ 874 DataOutputStream startWriting(BlockType newBlockType) 875 throws IOException { 876 if (state == State.BLOCK_READY && startOffset != -1) { 877 // We had a previous block that was written to a stream at a specific 878 // offset. Save that offset as the last offset of a block of that type. 879 prevOffsetByType[blockType.getId()] = startOffset; 880 } 881 882 startOffset = -1; 883 blockType = newBlockType; 884 885 baosInMemory.reset(); 886 baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER); 887 888 state = State.WRITING; 889 890 // We will compress it later in finishBlock() 891 userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory); 892 if (newBlockType == BlockType.DATA) { 893 this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream); 894 } 895 return userDataStream; 896 } 897 898 /** 899 * Writes the Cell to this block 900 */ 901 void write(Cell cell) throws IOException{ 902 expectState(State.WRITING); 903 this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream); 904 } 905 906 /** 907 * Transitions the block writer from the "writing" state to the "block 908 * ready" state. Does nothing if a block is already finished. 909 */ 910 void ensureBlockReady() throws IOException { 911 Preconditions.checkState(state != State.INIT, 912 "Unexpected state: " + state); 913 914 if (state == State.BLOCK_READY) { 915 return; 916 } 917 918 // This will set state to BLOCK_READY. 919 finishBlock(); 920 } 921 922 /** 923 * Finish up writing of the block. 924 * Flushes the compressing stream (if using compression), fills out the header, 925 * does any compression/encryption of bytes to flush out to disk, and manages 926 * the cache on write content, if applicable. Sets block write state to "block ready". 927 */ 928 private void finishBlock() throws IOException { 929 if (blockType == BlockType.DATA) { 930 this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream, 931 baosInMemory.getBuffer(), blockType); 932 blockType = dataBlockEncodingCtx.getBlockType(); 933 } 934 userDataStream.flush(); 935 prevOffset = prevOffsetByType[blockType.getId()]; 936 937 // We need to set state before we can package the block up for cache-on-write. In a way, the 938 // block is ready, but not yet encoded or compressed. 939 state = State.BLOCK_READY; 940 Bytes compressAndEncryptDat; 941 if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) { 942 compressAndEncryptDat = dataBlockEncodingCtx. 943 compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size()); 944 } else { 945 compressAndEncryptDat = defaultBlockEncodingCtx. 946 compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size()); 947 } 948 if (compressAndEncryptDat == null) { 949 compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size()); 950 } 951 if (onDiskBlockBytesWithHeader == null) { 952 onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength()); 953 } 954 onDiskBlockBytesWithHeader.reset(); 955 onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(), 956 compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength()); 957 // Calculate how many bytes we need for checksum on the tail of the block. 958 int numBytes = (int) ChecksumUtil.numBytes( 959 onDiskBlockBytesWithHeader.size(), 960 fileContext.getBytesPerChecksum()); 961 962 // Put the header for the on disk bytes; header currently is unfilled-out 963 putHeader(onDiskBlockBytesWithHeader, 964 onDiskBlockBytesWithHeader.size() + numBytes, 965 baosInMemory.size(), onDiskBlockBytesWithHeader.size()); 966 if (onDiskChecksum.length != numBytes) { 967 onDiskChecksum = new byte[numBytes]; 968 } 969 ChecksumUtil.generateChecksums( 970 onDiskBlockBytesWithHeader.getBuffer(), 0,onDiskBlockBytesWithHeader.size(), 971 onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum()); 972 } 973 974 /** 975 * Put the header into the given byte array at the given offset. 976 * @param onDiskSize size of the block on disk header + data + checksum 977 * @param uncompressedSize size of the block after decompression (but 978 * before optional data block decoding) including header 979 * @param onDiskDataSize size of the block on disk with header 980 * and data but not including the checksums 981 */ 982 private void putHeader(byte[] dest, int offset, int onDiskSize, 983 int uncompressedSize, int onDiskDataSize) { 984 offset = blockType.put(dest, offset); 985 offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE); 986 offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE); 987 offset = Bytes.putLong(dest, offset, prevOffset); 988 offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode()); 989 offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum()); 990 Bytes.putInt(dest, offset, onDiskDataSize); 991 } 992 993 private void putHeader(ByteBuff buff, int onDiskSize, 994 int uncompressedSize, int onDiskDataSize) { 995 buff.rewind(); 996 blockType.write(buff); 997 buff.putInt(onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE); 998 buff.putInt(uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE); 999 buff.putLong(prevOffset); 1000 buff.put(fileContext.getChecksumType().getCode()); 1001 buff.putInt(fileContext.getBytesPerChecksum()); 1002 buff.putInt(onDiskDataSize); 1003 } 1004 1005 private void putHeader(ByteArrayOutputStream dest, int onDiskSize, 1006 int uncompressedSize, int onDiskDataSize) { 1007 putHeader(dest.getBuffer(),0, onDiskSize, uncompressedSize, onDiskDataSize); 1008 } 1009 1010 /** 1011 * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records 1012 * the offset of this block so that it can be referenced in the next block 1013 * of the same type. 1014 */ 1015 void writeHeaderAndData(FSDataOutputStream out) throws IOException { 1016 long offset = out.getPos(); 1017 if (startOffset != UNSET && offset != startOffset) { 1018 throw new IOException("A " + blockType + " block written to a " 1019 + "stream twice, first at offset " + startOffset + ", then at " 1020 + offset); 1021 } 1022 startOffset = offset; 1023 finishBlockAndWriteHeaderAndData(out); 1024 } 1025 1026 /** 1027 * Writes the header and the compressed data of this block (or uncompressed 1028 * data when not using compression) into the given stream. Can be called in 1029 * the "writing" state or in the "block ready" state. If called in the 1030 * "writing" state, transitions the writer to the "block ready" state. 1031 * @param out the output stream to write the 1032 */ 1033 protected void finishBlockAndWriteHeaderAndData(DataOutputStream out) 1034 throws IOException { 1035 ensureBlockReady(); 1036 long startTime = System.currentTimeMillis(); 1037 out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size()); 1038 out.write(onDiskChecksum); 1039 HFile.updateWriteLatency(System.currentTimeMillis() - startTime); 1040 } 1041 1042 /** 1043 * Returns the header or the compressed data (or uncompressed data when not 1044 * using compression) as a byte array. Can be called in the "writing" state 1045 * or in the "block ready" state. If called in the "writing" state, 1046 * transitions the writer to the "block ready" state. This returns 1047 * the header + data + checksums stored on disk. 1048 * 1049 * @return header and data as they would be stored on disk in a byte array 1050 */ 1051 byte[] getHeaderAndDataForTest() throws IOException { 1052 ensureBlockReady(); 1053 // This is not very optimal, because we are doing an extra copy. 1054 // But this method is used only by unit tests. 1055 byte[] output = 1056 new byte[onDiskBlockBytesWithHeader.size() 1057 + onDiskChecksum.length]; 1058 System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0, 1059 onDiskBlockBytesWithHeader.size()); 1060 System.arraycopy(onDiskChecksum, 0, output, 1061 onDiskBlockBytesWithHeader.size(), onDiskChecksum.length); 1062 return output; 1063 } 1064 1065 /** 1066 * Releases resources used by this writer. 1067 */ 1068 void release() { 1069 if (dataBlockEncodingCtx != null) { 1070 dataBlockEncodingCtx.close(); 1071 dataBlockEncodingCtx = null; 1072 } 1073 if (defaultBlockEncodingCtx != null) { 1074 defaultBlockEncodingCtx.close(); 1075 defaultBlockEncodingCtx = null; 1076 } 1077 } 1078 1079 /** 1080 * Returns the on-disk size of the data portion of the block. This is the 1081 * compressed size if compression is enabled. Can only be called in the 1082 * "block ready" state. Header is not compressed, and its size is not 1083 * included in the return value. 1084 * 1085 * @return the on-disk size of the block, not including the header. 1086 */ 1087 int getOnDiskSizeWithoutHeader() { 1088 expectState(State.BLOCK_READY); 1089 return onDiskBlockBytesWithHeader.size() + 1090 onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE; 1091 } 1092 1093 /** 1094 * Returns the on-disk size of the block. Can only be called in the 1095 * "block ready" state. 1096 * 1097 * @return the on-disk size of the block ready to be written, including the 1098 * header size, the data and the checksum data. 1099 */ 1100 int getOnDiskSizeWithHeader() { 1101 expectState(State.BLOCK_READY); 1102 return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length; 1103 } 1104 1105 /** 1106 * The uncompressed size of the block data. Does not include header size. 1107 */ 1108 int getUncompressedSizeWithoutHeader() { 1109 expectState(State.BLOCK_READY); 1110 return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE; 1111 } 1112 1113 /** 1114 * The uncompressed size of the block data, including header size. 1115 */ 1116 int getUncompressedSizeWithHeader() { 1117 expectState(State.BLOCK_READY); 1118 return baosInMemory.size(); 1119 } 1120 1121 /** @return true if a block is being written */ 1122 boolean isWriting() { 1123 return state == State.WRITING; 1124 } 1125 1126 /** 1127 * Returns the number of bytes written into the current block so far, or 1128 * zero if not writing the block at the moment. Note that this will return 1129 * zero in the "block ready" state as well. 1130 * 1131 * @return the number of bytes written 1132 */ 1133 public int encodedBlockSizeWritten() { 1134 return state != State.WRITING ? 0 : this.getEncodingState().getEncodedDataSizeWritten(); 1135 } 1136 1137 /** 1138 * Returns the number of bytes written into the current block so far, or 1139 * zero if not writing the block at the moment. Note that this will return 1140 * zero in the "block ready" state as well. 1141 * 1142 * @return the number of bytes written 1143 */ 1144 int blockSizeWritten() { 1145 return state != State.WRITING ? 0 : this.getEncodingState().getUnencodedDataSizeWritten(); 1146 } 1147 1148 /** 1149 * Clones the header followed by the uncompressed data, even if using 1150 * compression. This is needed for storing uncompressed blocks in the block 1151 * cache. Can be called in the "writing" state or the "block ready" state. 1152 * Returns only the header and data, does not include checksum data. 1153 * 1154 * @return Returns an uncompressed block ByteBuff for caching on write 1155 */ 1156 ByteBuff cloneUncompressedBufferWithHeader() { 1157 expectState(State.BLOCK_READY); 1158 ByteBuff bytebuff = allocator.allocate(baosInMemory.size()); 1159 baosInMemory.toByteBuff(bytebuff); 1160 int numBytes = (int) ChecksumUtil.numBytes( 1161 onDiskBlockBytesWithHeader.size(), 1162 fileContext.getBytesPerChecksum()); 1163 putHeader(bytebuff, onDiskBlockBytesWithHeader.size() + numBytes, 1164 baosInMemory.size(), onDiskBlockBytesWithHeader.size()); 1165 bytebuff.rewind(); 1166 return bytebuff; 1167 } 1168 1169 /** 1170 * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is needed 1171 * for storing packed blocks in the block cache. Returns only the header and data, Does not 1172 * include checksum data. 1173 * @return Returns a copy of block bytes for caching on write 1174 */ 1175 private ByteBuff cloneOnDiskBufferWithHeader() { 1176 expectState(State.BLOCK_READY); 1177 ByteBuff bytebuff = allocator.allocate(onDiskBlockBytesWithHeader.size()); 1178 onDiskBlockBytesWithHeader.toByteBuff(bytebuff); 1179 bytebuff.rewind(); 1180 return bytebuff; 1181 } 1182 1183 private void expectState(State expectedState) { 1184 if (state != expectedState) { 1185 throw new IllegalStateException("Expected state: " + expectedState + 1186 ", actual state: " + state); 1187 } 1188 } 1189 1190 /** 1191 * Takes the given {@link BlockWritable} instance, creates a new block of 1192 * its appropriate type, writes the writable into this block, and flushes 1193 * the block into the output stream. The writer is instructed not to buffer 1194 * uncompressed bytes for cache-on-write. 1195 * 1196 * @param bw the block-writable object to write as a block 1197 * @param out the file system output stream 1198 */ 1199 void writeBlock(BlockWritable bw, FSDataOutputStream out) 1200 throws IOException { 1201 bw.writeToBlock(startWriting(bw.getBlockType())); 1202 writeHeaderAndData(out); 1203 } 1204 1205 /** 1206 * Creates a new HFileBlock. Checksums have already been validated, so 1207 * the byte buffer passed into the constructor of this newly created 1208 * block does not have checksum data even though the header minor 1209 * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a 1210 * 0 value in bytesPerChecksum. This method copies the on-disk or 1211 * uncompressed data to build the HFileBlock which is used only 1212 * while writing blocks and caching. 1213 * 1214 * <p>TODO: Should there be an option where a cache can ask that hbase preserve block 1215 * checksums for checking after a block comes out of the cache? Otehrwise, cache is responsible 1216 * for blocks being wholesome (ECC memory or if file-backed, it does checksumming). 1217 */ 1218 HFileBlock getBlockForCaching(CacheConfig cacheConf) { 1219 HFileContext newContext = new HFileContextBuilder() 1220 .withBlockSize(fileContext.getBlocksize()) 1221 .withBytesPerCheckSum(0) 1222 .withChecksumType(ChecksumType.NULL) // no checksums in cached data 1223 .withCompression(fileContext.getCompression()) 1224 .withDataBlockEncoding(fileContext.getDataBlockEncoding()) 1225 .withHBaseCheckSum(fileContext.isUseHBaseChecksum()) 1226 .withCompressTags(fileContext.isCompressTags()) 1227 .withIncludesMvcc(fileContext.isIncludesMvcc()) 1228 .withIncludesTags(fileContext.isIncludesTags()) 1229 .withColumnFamily(fileContext.getColumnFamily()) 1230 .withTableName(fileContext.getTableName()) 1231 .build(); 1232 // Build the HFileBlock. 1233 HFileBlockBuilder builder = new HFileBlockBuilder(); 1234 ByteBuff buff; 1235 if (cacheConf.shouldCacheCompressed(blockType.getCategory())) { 1236 buff = cloneOnDiskBufferWithHeader(); 1237 } else { 1238 buff = cloneUncompressedBufferWithHeader(); 1239 } 1240 return builder.withBlockType(blockType) 1241 .withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader()) 1242 .withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader()) 1243 .withPrevBlockOffset(prevOffset) 1244 .withByteBuff(buff) 1245 .withFillHeader(FILL_HEADER) 1246 .withOffset(startOffset) 1247 .withNextBlockOnDiskSize(UNSET) 1248 .withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length) 1249 .withHFileContext(newContext) 1250 .withByteBuffAllocator(cacheConf.getByteBuffAllocator()) 1251 .withShared(!buff.hasArray()) 1252 .build(); 1253 } 1254 } 1255 1256 /** Something that can be written into a block. */ 1257 interface BlockWritable { 1258 /** The type of block this data should use. */ 1259 BlockType getBlockType(); 1260 1261 /** 1262 * Writes the block to the provided stream. Must not write any magic 1263 * records. 1264 * 1265 * @param out a stream to write uncompressed data into 1266 */ 1267 void writeToBlock(DataOutput out) throws IOException; 1268 } 1269 1270 /** 1271 * Iterator for reading {@link HFileBlock}s in load-on-open-section, such as root data index 1272 * block, meta index block, file info block etc. 1273 */ 1274 interface BlockIterator { 1275 /** 1276 * Get the next block, or null if there are no more blocks to iterate. 1277 */ 1278 HFileBlock nextBlock() throws IOException; 1279 1280 /** 1281 * Similar to {@link #nextBlock()} but checks block type, throws an exception if incorrect, and 1282 * returns the HFile block 1283 */ 1284 HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException; 1285 1286 /** 1287 * Now we use the {@link ByteBuffAllocator} to manage the nio ByteBuffers for HFileBlocks, so we 1288 * must deallocate all of the ByteBuffers in the end life. the BlockIterator's life cycle is 1289 * starting from opening an HFileReader and stopped when the HFileReader#close, so we will keep 1290 * track all the read blocks until we call {@link BlockIterator#freeBlocks()} when closing the 1291 * HFileReader. Sum bytes of those blocks in load-on-open section should be quite small, so 1292 * tracking them should be OK. 1293 */ 1294 void freeBlocks(); 1295 } 1296 1297 /** An HFile block reader with iteration ability. */ 1298 interface FSReader { 1299 /** 1300 * Reads the block at the given offset in the file with the given on-disk size and uncompressed 1301 * size. 1302 * @param offset of the file to read 1303 * @param onDiskSize the on-disk size of the entire block, including all applicable headers, or 1304 * -1 if unknown 1305 * @param pread true to use pread, otherwise use the stream read. 1306 * @param updateMetrics update the metrics or not. 1307 * @param intoHeap allocate the block's ByteBuff by {@link ByteBuffAllocator} or JVM heap. For 1308 * LRUBlockCache, we must ensure that the block to cache is an heap one, because the 1309 * memory occupation is based on heap now, also for {@link CombinedBlockCache}, we use 1310 * the heap LRUBlockCache as L1 cache to cache small blocks such as IndexBlock or 1311 * MetaBlock for faster access. So introduce an flag here to decide whether allocate 1312 * from JVM heap or not so that we can avoid an extra off-heap to heap memory copy when 1313 * using LRUBlockCache. For most cases, we known what's the expected block type we'll 1314 * read, while for some special case (Example: HFileReaderImpl#readNextDataBlock()), we 1315 * cannot pre-decide what's the expected block type, then we can only allocate block's 1316 * ByteBuff from {@link ByteBuffAllocator} firstly, and then when caching it in 1317 * {@link LruBlockCache} we'll check whether the ByteBuff is from heap or not, if not 1318 * then we'll clone it to an heap one and cache it. 1319 * @return the newly read block 1320 */ 1321 HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics, 1322 boolean intoHeap) throws IOException; 1323 1324 /** 1325 * Creates a block iterator over the given portion of the {@link HFile}. 1326 * The iterator returns blocks starting with offset such that offset <= 1327 * startOffset < endOffset. Returned blocks are always unpacked. 1328 * Used when no hfile index available; e.g. reading in the hfile index 1329 * blocks themselves on file open. 1330 * 1331 * @param startOffset the offset of the block to start iteration with 1332 * @param endOffset the offset to end iteration at (exclusive) 1333 * @return an iterator of blocks between the two given offsets 1334 */ 1335 BlockIterator blockRange(long startOffset, long endOffset); 1336 1337 /** Closes the backing streams */ 1338 void closeStreams() throws IOException; 1339 1340 /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */ 1341 HFileBlockDecodingContext getBlockDecodingContext(); 1342 1343 /** Get the default decoder for blocks from this file. */ 1344 HFileBlockDecodingContext getDefaultBlockDecodingContext(); 1345 1346 void setIncludesMemStoreTS(boolean includesMemstoreTS); 1347 void setDataBlockEncoder(HFileDataBlockEncoder encoder); 1348 1349 /** 1350 * To close the stream's socket. Note: This can be concurrently called from multiple threads and 1351 * implementation should take care of thread safety. 1352 */ 1353 void unbufferStream(); 1354 } 1355 1356 /** 1357 * Data-structure to use caching the header of the NEXT block. Only works if next read 1358 * that comes in here is next in sequence in this block. 1359 * 1360 * When we read, we read current block and the next blocks' header. We do this so we have 1361 * the length of the next block to read if the hfile index is not available (rare, at 1362 * hfile open only). 1363 */ 1364 private static class PrefetchedHeader { 1365 long offset = -1; 1366 byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE]; 1367 final ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(header, 0, header.length)); 1368 1369 @Override 1370 public String toString() { 1371 return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header); 1372 } 1373 } 1374 1375 /** 1376 * Reads version 2 HFile blocks from the filesystem. 1377 */ 1378 static class FSReaderImpl implements FSReader { 1379 /** The file system stream of the underlying {@link HFile} that 1380 * does or doesn't do checksum validations in the filesystem */ 1381 private FSDataInputStreamWrapper streamWrapper; 1382 1383 private HFileBlockDecodingContext encodedBlockDecodingCtx; 1384 1385 /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */ 1386 private final HFileBlockDefaultDecodingContext defaultDecodingCtx; 1387 1388 /** 1389 * Cache of the NEXT header after this. Check it is indeed next blocks header 1390 * before using it. TODO: Review. This overread into next block to fetch 1391 * next blocks header seems unnecessary given we usually get the block size 1392 * from the hfile index. Review! 1393 */ 1394 private AtomicReference<PrefetchedHeader> prefetchedHeader = 1395 new AtomicReference<>(new PrefetchedHeader()); 1396 1397 /** The size of the file we are reading from, or -1 if unknown. */ 1398 private long fileSize; 1399 1400 /** The size of the header */ 1401 protected final int hdrSize; 1402 1403 /** The filesystem used to access data */ 1404 private HFileSystem hfs; 1405 1406 private HFileContext fileContext; 1407 // Cache the fileName 1408 private String pathName; 1409 1410 private final ByteBuffAllocator allocator; 1411 1412 private final Lock streamLock = new ReentrantLock(); 1413 1414 FSReaderImpl(ReaderContext readerContext, HFileContext fileContext, 1415 ByteBuffAllocator allocator) throws IOException { 1416 this.fileSize = readerContext.getFileSize(); 1417 this.hfs = readerContext.getFileSystem(); 1418 if (readerContext.getFilePath() != null) { 1419 this.pathName = readerContext.getFilePath().toString(); 1420 } 1421 this.fileContext = fileContext; 1422 this.hdrSize = headerSize(fileContext.isUseHBaseChecksum()); 1423 this.allocator = allocator; 1424 1425 this.streamWrapper = readerContext.getInputStreamWrapper(); 1426 // Older versions of HBase didn't support checksum. 1427 this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum()); 1428 defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext); 1429 encodedBlockDecodingCtx = defaultDecodingCtx; 1430 } 1431 1432 @Override 1433 public BlockIterator blockRange(final long startOffset, final long endOffset) { 1434 final FSReader owner = this; // handle for inner class 1435 return new BlockIterator() { 1436 private volatile boolean freed = false; 1437 // Tracking all read blocks until we call freeBlocks. 1438 private List<HFileBlock> blockTracker = new ArrayList<>(); 1439 private long offset = startOffset; 1440 // Cache length of next block. Current block has the length of next block in it. 1441 private long length = -1; 1442 1443 @Override 1444 public HFileBlock nextBlock() throws IOException { 1445 if (offset >= endOffset) { 1446 return null; 1447 } 1448 HFileBlock b = readBlockData(offset, length, false, false, true); 1449 offset += b.getOnDiskSizeWithHeader(); 1450 length = b.getNextBlockOnDiskSize(); 1451 HFileBlock uncompressed = b.unpack(fileContext, owner); 1452 if (uncompressed != b) { 1453 b.release(); // Need to release the compressed Block now. 1454 } 1455 blockTracker.add(uncompressed); 1456 return uncompressed; 1457 } 1458 1459 @Override 1460 public HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException { 1461 HFileBlock blk = nextBlock(); 1462 if (blk.getBlockType() != blockType) { 1463 throw new IOException( 1464 "Expected block of type " + blockType + " but found " + blk.getBlockType()); 1465 } 1466 return blk; 1467 } 1468 1469 @Override 1470 public void freeBlocks() { 1471 if (freed) { 1472 return; 1473 } 1474 blockTracker.forEach(HFileBlock::release); 1475 blockTracker = null; 1476 freed = true; 1477 } 1478 }; 1479 } 1480 1481 /** 1482 * Does a positional read or a seek and read into the given byte buffer. We need take care that 1483 * we will call the {@link ByteBuff#release()} for every exit to deallocate the ByteBuffers, 1484 * otherwise the memory leak may happen. 1485 * @param dest destination buffer 1486 * @param size size of read 1487 * @param peekIntoNextBlock whether to read the next block's on-disk size 1488 * @param fileOffset position in the stream to read at 1489 * @param pread whether we should do a positional read 1490 * @param istream The input source of data 1491 * @return true to indicate the destination buffer include the next block header, otherwise only 1492 * include the current block data without the next block header. 1493 * @throws IOException if any IO error happen. 1494 */ 1495 protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size, 1496 boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException { 1497 if (!pread) { 1498 // Seek + read. Better for scanning. 1499 HFileUtil.seekOnMultipleSources(istream, fileOffset); 1500 long realOffset = istream.getPos(); 1501 if (realOffset != fileOffset) { 1502 throw new IOException("Tried to seek to " + fileOffset + " to read " + size 1503 + " bytes, but pos=" + realOffset + " after seek"); 1504 } 1505 if (!peekIntoNextBlock) { 1506 BlockIOUtils.readFully(dest, istream, size); 1507 return false; 1508 } 1509 1510 // Try to read the next block header 1511 if (!BlockIOUtils.readWithExtra(dest, istream, size, hdrSize)) { 1512 // did not read the next block header. 1513 return false; 1514 } 1515 } else { 1516 // Positional read. Better for random reads; or when the streamLock is already locked. 1517 int extraSize = peekIntoNextBlock ? hdrSize : 0; 1518 if (!BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize)) { 1519 // did not read the next block header. 1520 return false; 1521 } 1522 } 1523 assert peekIntoNextBlock; 1524 return true; 1525 } 1526 1527 /** 1528 * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as 1529 * little memory allocation as possible, using the provided on-disk size. 1530 * @param offset the offset in the stream to read at 1531 * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header, or -1 if 1532 * unknown; i.e. when iterating over blocks reading in the file metadata info. 1533 * @param pread whether to use a positional read 1534 * @param updateMetrics whether to update the metrics 1535 * @param intoHeap allocate ByteBuff of block from heap or off-heap. 1536 * @see FSReader#readBlockData(long, long, boolean, boolean, boolean) for more details about the 1537 * useHeap. 1538 */ 1539 @Override 1540 public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread, 1541 boolean updateMetrics, boolean intoHeap) throws IOException { 1542 // Get a copy of the current state of whether to validate 1543 // hbase checksums or not for this read call. This is not 1544 // thread-safe but the one constaint is that if we decide 1545 // to skip hbase checksum verification then we are 1546 // guaranteed to use hdfs checksum verification. 1547 boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum(); 1548 FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum); 1549 1550 HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, 1551 doVerificationThruHBaseChecksum, updateMetrics, intoHeap); 1552 if (blk == null) { 1553 HFile.LOG.warn("HBase checksum verification failed for file " + 1554 pathName + " at offset " + 1555 offset + " filesize " + fileSize + 1556 ". Retrying read with HDFS checksums turned on..."); 1557 1558 if (!doVerificationThruHBaseChecksum) { 1559 String msg = "HBase checksum verification failed for file " + 1560 pathName + " at offset " + 1561 offset + " filesize " + fileSize + 1562 " but this cannot happen because doVerify is " + 1563 doVerificationThruHBaseChecksum; 1564 HFile.LOG.warn(msg); 1565 throw new IOException(msg); // cannot happen case here 1566 } 1567 HFile.CHECKSUM_FAILURES.increment(); // update metrics 1568 1569 // If we have a checksum failure, we fall back into a mode where 1570 // the next few reads use HDFS level checksums. We aim to make the 1571 // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid 1572 // hbase checksum verification, but since this value is set without 1573 // holding any locks, it can so happen that we might actually do 1574 // a few more than precisely this number. 1575 is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD); 1576 doVerificationThruHBaseChecksum = false; 1577 blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, 1578 doVerificationThruHBaseChecksum, updateMetrics, intoHeap); 1579 if (blk != null) { 1580 HFile.LOG.warn("HDFS checksum verification succeeded for file " + 1581 pathName + " at offset " + 1582 offset + " filesize " + fileSize); 1583 } 1584 } 1585 if (blk == null && !doVerificationThruHBaseChecksum) { 1586 String msg = "readBlockData failed, possibly due to " + 1587 "checksum verification failed for file " + pathName + 1588 " at offset " + offset + " filesize " + fileSize; 1589 HFile.LOG.warn(msg); 1590 throw new IOException(msg); 1591 } 1592 1593 // If there is a checksum mismatch earlier, then retry with 1594 // HBase checksums switched off and use HDFS checksum verification. 1595 // This triggers HDFS to detect and fix corrupt replicas. The 1596 // next checksumOffCount read requests will use HDFS checksums. 1597 // The decrementing of this.checksumOffCount is not thread-safe, 1598 // but it is harmless because eventually checksumOffCount will be 1599 // a negative number. 1600 streamWrapper.checksumOk(); 1601 return blk; 1602 } 1603 1604 /** 1605 * @return Check <code>onDiskSizeWithHeaderL</code> size is healthy and then return it as an int 1606 */ 1607 private static int checkAndGetSizeAsInt(final long onDiskSizeWithHeaderL, final int hdrSize) 1608 throws IOException { 1609 if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1) 1610 || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) { 1611 throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL 1612 + ": expected to be at least " + hdrSize 1613 + " and at most " + Integer.MAX_VALUE + ", or -1"); 1614 } 1615 return (int)onDiskSizeWithHeaderL; 1616 } 1617 1618 /** 1619 * Verify the passed in onDiskSizeWithHeader aligns with what is in the header else something 1620 * is not right. 1621 */ 1622 private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuff headerBuf, 1623 final long offset, boolean verifyChecksum) 1624 throws IOException { 1625 // Assert size provided aligns with what is in the header 1626 int fromHeader = getOnDiskSizeWithHeader(headerBuf, verifyChecksum); 1627 if (passedIn != fromHeader) { 1628 throw new IOException("Passed in onDiskSizeWithHeader=" + passedIn + " != " + fromHeader + 1629 ", offset=" + offset + ", fileContext=" + this.fileContext); 1630 } 1631 } 1632 1633 /** 1634 * Check atomic reference cache for this block's header. Cache only good if next 1635 * read coming through is next in sequence in the block. We read next block's 1636 * header on the tail of reading the previous block to save a seek. Otherwise, 1637 * we have to do a seek to read the header before we can pull in the block OR 1638 * we have to backup the stream because we over-read (the next block's header). 1639 * @see PrefetchedHeader 1640 * @return The cached block header or null if not found. 1641 * @see #cacheNextBlockHeader(long, ByteBuff, int, int) 1642 */ 1643 private ByteBuff getCachedHeader(final long offset) { 1644 PrefetchedHeader ph = this.prefetchedHeader.get(); 1645 return ph != null && ph.offset == offset ? ph.buf : null; 1646 } 1647 1648 /** 1649 * Save away the next blocks header in atomic reference. 1650 * @see #getCachedHeader(long) 1651 * @see PrefetchedHeader 1652 */ 1653 private void cacheNextBlockHeader(final long offset, 1654 ByteBuff onDiskBlock, int onDiskSizeWithHeader, int headerLength) { 1655 PrefetchedHeader ph = new PrefetchedHeader(); 1656 ph.offset = offset; 1657 onDiskBlock.get(onDiskSizeWithHeader, ph.header, 0, headerLength); 1658 this.prefetchedHeader.set(ph); 1659 } 1660 1661 private int getNextBlockOnDiskSize(boolean readNextHeader, ByteBuff onDiskBlock, 1662 int onDiskSizeWithHeader) { 1663 int nextBlockOnDiskSize = -1; 1664 if (readNextHeader) { 1665 nextBlockOnDiskSize = 1666 onDiskBlock.getIntAfterPosition(onDiskSizeWithHeader + BlockType.MAGIC_LENGTH) 1667 + hdrSize; 1668 } 1669 return nextBlockOnDiskSize; 1670 } 1671 1672 private ByteBuff allocate(int size, boolean intoHeap) { 1673 return intoHeap ? HEAP.allocate(size) : allocator.allocate(size); 1674 } 1675 1676 /** 1677 * Reads a version 2 block. 1678 * @param offset the offset in the stream to read at. 1679 * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header and 1680 * checksums if present or -1 if unknown (as a long). Can be -1 if we are doing raw 1681 * iteration of blocks as when loading up file metadata; i.e. the first read of a new 1682 * file. Usually non-null gotten from the file index. 1683 * @param pread whether to use a positional read 1684 * @param verifyChecksum Whether to use HBase checksums. If HBase checksum is switched off, then 1685 * use HDFS checksum. Can also flip on/off reading same file if we hit a troublesome 1686 * patch in an hfile. 1687 * @param updateMetrics whether need to update the metrics. 1688 * @param intoHeap allocate the ByteBuff of block from heap or off-heap. 1689 * @return the HFileBlock or null if there is a HBase checksum mismatch 1690 */ 1691 protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, 1692 long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics, 1693 boolean intoHeap) throws IOException { 1694 if (offset < 0) { 1695 throw new IOException("Invalid offset=" + offset + " trying to read " 1696 + "block (onDiskSize=" + onDiskSizeWithHeaderL + ")"); 1697 } 1698 int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize); 1699 // Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1 1700 // and will save us having to seek the stream backwards to reread the header we 1701 // read the last time through here. 1702 ByteBuff headerBuf = getCachedHeader(offset); 1703 LOG.trace("Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, " + 1704 "onDiskSizeWithHeader={}", this.fileContext.getHFileName(), offset, pread, 1705 verifyChecksum, headerBuf, onDiskSizeWithHeader); 1706 // This is NOT same as verifyChecksum. This latter is whether to do hbase 1707 // checksums. Can change with circumstances. The below flag is whether the 1708 // file has support for checksums (version 2+). 1709 boolean checksumSupport = this.fileContext.isUseHBaseChecksum(); 1710 long startTime = System.currentTimeMillis(); 1711 if (onDiskSizeWithHeader <= 0) { 1712 // We were not passed the block size. Need to get it from the header. If header was 1713 // not cached (see getCachedHeader above), need to seek to pull it in. This is costly 1714 // and should happen very rarely. Currently happens on open of a hfile reader where we 1715 // read the trailer blocks to pull in the indices. Otherwise, we are reading block sizes 1716 // out of the hfile index. To check, enable TRACE in this file and you'll get an exception 1717 // in a LOG every time we seek. See HBASE-17072 for more detail. 1718 if (headerBuf == null) { 1719 if (LOG.isTraceEnabled()) { 1720 LOG.trace("Extra see to get block size!", new RuntimeException()); 1721 } 1722 headerBuf = HEAP.allocate(hdrSize); 1723 readAtOffset(is, headerBuf, hdrSize, false, offset, pread); 1724 headerBuf.rewind(); 1725 } 1726 onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport); 1727 } 1728 int preReadHeaderSize = headerBuf == null? 0 : hdrSize; 1729 // Allocate enough space to fit the next block's header too; saves a seek next time through. 1730 // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header; 1731 // onDiskSizeWithHeader is header, body, and any checksums if present. preReadHeaderSize 1732 // says where to start reading. If we have the header cached, then we don't need to read 1733 // it again and we can likely read from last place we left off w/o need to backup and reread 1734 // the header we read last time through here. 1735 ByteBuff onDiskBlock = this.allocate(onDiskSizeWithHeader + hdrSize, intoHeap); 1736 boolean initHFileBlockSuccess = false; 1737 try { 1738 if (headerBuf != null) { 1739 onDiskBlock.put(0, headerBuf, 0, hdrSize).position(hdrSize); 1740 } 1741 boolean readNextHeader = readAtOffset(is, onDiskBlock, 1742 onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread); 1743 onDiskBlock.rewind(); // in case of moving position when copying a cached header 1744 int nextBlockOnDiskSize = 1745 getNextBlockOnDiskSize(readNextHeader, onDiskBlock, onDiskSizeWithHeader); 1746 if (headerBuf == null) { 1747 headerBuf = onDiskBlock.duplicate().position(0).limit(hdrSize); 1748 } 1749 // Do a few checks before we go instantiate HFileBlock. 1750 assert onDiskSizeWithHeader > this.hdrSize; 1751 verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport); 1752 ByteBuff curBlock = onDiskBlock.duplicate().position(0).limit(onDiskSizeWithHeader); 1753 // Verify checksum of the data before using it for building HFileBlock. 1754 if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) { 1755 return null; 1756 } 1757 long duration = System.currentTimeMillis() - startTime; 1758 if (updateMetrics) { 1759 HFile.updateReadLatency(duration, pread); 1760 } 1761 // The onDiskBlock will become the headerAndDataBuffer for this block. 1762 // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already 1763 // contains the header of next block, so no need to set next block's header in it. 1764 HFileBlock hFileBlock = createFromBuff(curBlock, checksumSupport, offset, 1765 nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator); 1766 // Run check on uncompressed sizings. 1767 if (!fileContext.isCompressedOrEncrypted()) { 1768 hFileBlock.sanityCheckUncompressed(); 1769 } 1770 LOG.trace("Read {} in {} ns", hFileBlock, duration); 1771 // Cache next block header if we read it for the next time through here. 1772 if (nextBlockOnDiskSize != -1) { 1773 cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), onDiskBlock, 1774 onDiskSizeWithHeader, hdrSize); 1775 } 1776 initHFileBlockSuccess = true; 1777 return hFileBlock; 1778 } finally { 1779 if (!initHFileBlockSuccess) { 1780 onDiskBlock.release(); 1781 } 1782 } 1783 } 1784 1785 @Override 1786 public void setIncludesMemStoreTS(boolean includesMemstoreTS) { 1787 this.fileContext = new HFileContextBuilder(this.fileContext) 1788 .withIncludesMvcc(includesMemstoreTS).build(); 1789 } 1790 1791 @Override 1792 public void setDataBlockEncoder(HFileDataBlockEncoder encoder) { 1793 encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext); 1794 } 1795 1796 @Override 1797 public HFileBlockDecodingContext getBlockDecodingContext() { 1798 return this.encodedBlockDecodingCtx; 1799 } 1800 1801 @Override 1802 public HFileBlockDecodingContext getDefaultBlockDecodingContext() { 1803 return this.defaultDecodingCtx; 1804 } 1805 1806 /** 1807 * Generates the checksum for the header as well as the data and then validates it. 1808 * If the block doesn't uses checksum, returns false. 1809 * @return True if checksum matches, else false. 1810 */ 1811 private boolean validateChecksum(long offset, ByteBuff data, int hdrSize) { 1812 // If this is an older version of the block that does not have checksums, then return false 1813 // indicating that checksum verification did not succeed. Actually, this method should never 1814 // be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen 1815 // case. Since this is a cannot-happen case, it is better to return false to indicate a 1816 // checksum validation failure. 1817 if (!fileContext.isUseHBaseChecksum()) { 1818 return false; 1819 } 1820 return ChecksumUtil.validateChecksum(data, pathName, offset, hdrSize); 1821 } 1822 1823 @Override 1824 public void closeStreams() throws IOException { 1825 streamWrapper.close(); 1826 } 1827 1828 @Override 1829 public void unbufferStream() { 1830 // To handle concurrent reads, ensure that no other client is accessing the streams while we 1831 // unbuffer it. 1832 if (streamLock.tryLock()) { 1833 try { 1834 this.streamWrapper.unbuffer(); 1835 } finally { 1836 streamLock.unlock(); 1837 } 1838 } 1839 } 1840 1841 @Override 1842 public String toString() { 1843 return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext; 1844 } 1845 } 1846 1847 /** An additional sanity-check in case no compression or encryption is being used. */ 1848 void sanityCheckUncompressed() throws IOException { 1849 if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + 1850 totalChecksumBytes()) { 1851 throw new IOException("Using no compression but " 1852 + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", " 1853 + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader 1854 + ", numChecksumbytes=" + totalChecksumBytes()); 1855 } 1856 } 1857 1858 // Cacheable implementation 1859 @Override 1860 public int getSerializedLength() { 1861 if (buf != null) { 1862 // Include extra bytes for block metadata. 1863 return this.buf.limit() + BLOCK_METADATA_SPACE; 1864 } 1865 return 0; 1866 } 1867 1868 // Cacheable implementation 1869 @Override 1870 public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) { 1871 this.buf.get(destination, 0, getSerializedLength() - BLOCK_METADATA_SPACE); 1872 destination = addMetaData(destination, includeNextBlockMetadata); 1873 1874 // Make it ready for reading. flip sets position to zero and limit to current position which 1875 // is what we want if we do not want to serialize the block plus checksums if present plus 1876 // metadata. 1877 destination.flip(); 1878 } 1879 1880 /** 1881 * For use by bucketcache. This exposes internals. 1882 */ 1883 public ByteBuffer getMetaData() { 1884 ByteBuffer bb = ByteBuffer.allocate(BLOCK_METADATA_SPACE); 1885 bb = addMetaData(bb, true); 1886 bb.flip(); 1887 return bb; 1888 } 1889 1890 /** 1891 * Adds metadata at current position (position is moved forward). Does not flip or reset. 1892 * @return The passed <code>destination</code> with metadata added. 1893 */ 1894 private ByteBuffer addMetaData(final ByteBuffer destination, boolean includeNextBlockMetadata) { 1895 destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0); 1896 destination.putLong(this.offset); 1897 if (includeNextBlockMetadata) { 1898 destination.putInt(this.nextBlockOnDiskSize); 1899 } 1900 return destination; 1901 } 1902 1903 // Cacheable implementation 1904 @Override 1905 public CacheableDeserializer<Cacheable> getDeserializer() { 1906 return HFileBlock.BLOCK_DESERIALIZER; 1907 } 1908 1909 @Override 1910 public int hashCode() { 1911 int result = 1; 1912 result = result * 31 + blockType.hashCode(); 1913 result = result * 31 + nextBlockOnDiskSize; 1914 result = result * 31 + (int) (offset ^ (offset >>> 32)); 1915 result = result * 31 + onDiskSizeWithoutHeader; 1916 result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32)); 1917 result = result * 31 + uncompressedSizeWithoutHeader; 1918 result = result * 31 + buf.hashCode(); 1919 return result; 1920 } 1921 1922 @Override 1923 public boolean equals(Object comparison) { 1924 if (this == comparison) { 1925 return true; 1926 } 1927 if (comparison == null) { 1928 return false; 1929 } 1930 if (!(comparison instanceof HFileBlock)) { 1931 return false; 1932 } 1933 1934 HFileBlock castedComparison = (HFileBlock) comparison; 1935 1936 if (castedComparison.blockType != this.blockType) { 1937 return false; 1938 } 1939 if (castedComparison.nextBlockOnDiskSize != this.nextBlockOnDiskSize) { 1940 return false; 1941 } 1942 // Offset is important. Needed when we have to remake cachekey when block is returned to cache. 1943 if (castedComparison.offset != this.offset) { 1944 return false; 1945 } 1946 if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) { 1947 return false; 1948 } 1949 if (castedComparison.prevBlockOffset != this.prevBlockOffset) { 1950 return false; 1951 } 1952 if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) { 1953 return false; 1954 } 1955 if (ByteBuff.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0, 1956 castedComparison.buf.limit()) != 0) { 1957 return false; 1958 } 1959 return true; 1960 } 1961 1962 DataBlockEncoding getDataBlockEncoding() { 1963 if (blockType == BlockType.ENCODED_DATA) { 1964 return DataBlockEncoding.getEncodingById(getDataBlockEncodingId()); 1965 } 1966 return DataBlockEncoding.NONE; 1967 } 1968 1969 byte getChecksumType() { 1970 return this.fileContext.getChecksumType().getCode(); 1971 } 1972 1973 int getBytesPerChecksum() { 1974 return this.fileContext.getBytesPerChecksum(); 1975 } 1976 1977 /** @return the size of data on disk + header. Excludes checksum. */ 1978 int getOnDiskDataSizeWithHeader() { 1979 return this.onDiskDataSizeWithHeader; 1980 } 1981 1982 /** 1983 * Calculate the number of bytes required to store all the checksums 1984 * for this block. Each checksum value is a 4 byte integer. 1985 */ 1986 int totalChecksumBytes() { 1987 // If the hfile block has minorVersion 0, then there are no checksum 1988 // data to validate. Similarly, a zero value in this.bytesPerChecksum 1989 // indicates that cached blocks do not have checksum data because 1990 // checksums were already validated when the block was read from disk. 1991 if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) { 1992 return 0; 1993 } 1994 return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader, 1995 this.fileContext.getBytesPerChecksum()); 1996 } 1997 1998 /** 1999 * Returns the size of this block header. 2000 */ 2001 public int headerSize() { 2002 return headerSize(this.fileContext.isUseHBaseChecksum()); 2003 } 2004 2005 /** 2006 * Maps a minor version to the size of the header. 2007 */ 2008 public static int headerSize(boolean usesHBaseChecksum) { 2009 return usesHBaseChecksum? 2010 HConstants.HFILEBLOCK_HEADER_SIZE: HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM; 2011 } 2012 2013 /** 2014 * Return the appropriate DUMMY_HEADER for the minor version 2015 */ 2016 // TODO: Why is this in here? 2017 byte[] getDummyHeaderForVersion() { 2018 return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum()); 2019 } 2020 2021 /** 2022 * Return the appropriate DUMMY_HEADER for the minor version 2023 */ 2024 static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) { 2025 return usesHBaseChecksum? HConstants.HFILEBLOCK_DUMMY_HEADER: DUMMY_HEADER_NO_CHECKSUM; 2026 } 2027 2028 /** 2029 * @return This HFileBlocks fileContext which will a derivative of the 2030 * fileContext for the file from which this block's data was originally read. 2031 */ 2032 HFileContext getHFileContext() { 2033 return this.fileContext; 2034 } 2035 2036 /** 2037 * Convert the contents of the block header into a human readable string. 2038 * This is mostly helpful for debugging. This assumes that the block 2039 * has minor version > 0. 2040 */ 2041 static String toStringHeader(ByteBuff buf) throws IOException { 2042 byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)]; 2043 buf.get(magicBuf); 2044 BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH); 2045 int compressedBlockSizeNoHeader = buf.getInt(); 2046 int uncompressedBlockSizeNoHeader = buf.getInt(); 2047 long prevBlockOffset = buf.getLong(); 2048 byte cksumtype = buf.get(); 2049 long bytesPerChecksum = buf.getInt(); 2050 long onDiskDataSizeWithHeader = buf.getInt(); 2051 return " Header dump: magic: " + Bytes.toString(magicBuf) + 2052 " blockType " + bt + 2053 " compressedBlockSizeNoHeader " + 2054 compressedBlockSizeNoHeader + 2055 " uncompressedBlockSizeNoHeader " + 2056 uncompressedBlockSizeNoHeader + 2057 " prevBlockOffset " + prevBlockOffset + 2058 " checksumType " + ChecksumType.codeToType(cksumtype) + 2059 " bytesPerChecksum " + bytesPerChecksum + 2060 " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader; 2061 } 2062 2063 private static HFileBlockBuilder createBuilder(HFileBlock blk){ 2064 return new HFileBlockBuilder() 2065 .withBlockType(blk.blockType) 2066 .withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader) 2067 .withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader) 2068 .withPrevBlockOffset(blk.prevBlockOffset) 2069 .withByteBuff(blk.buf.duplicate()) // Duplicate the buffer. 2070 .withOffset(blk.offset) 2071 .withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader) 2072 .withNextBlockOnDiskSize(blk.nextBlockOnDiskSize) 2073 .withHFileContext(blk.fileContext) 2074 .withByteBuffAllocator(blk.allocator) 2075 .withShared(blk.isSharedMem()); 2076 } 2077 2078 static HFileBlock shallowClone(HFileBlock blk) { 2079 return createBuilder(blk).build(); 2080 } 2081 2082 static HFileBlock deepCloneOnHeap(HFileBlock blk) { 2083 ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit()))); 2084 return createBuilder(blk).withByteBuff(deepCloned).withShared(false).build(); 2085 } 2086}