/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;

import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.EncodingState;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.util.BlockIOUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * Cacheable Blocks of an {@link HFile} version 2 file.
 * Version 2 was introduced in hbase-0.92.0.
 *
 * <p>Version 1 was the original file block. Version 2 was introduced when we changed the hbase
 * file format to support multi-level block indexes and compound bloom filters (HBASE-3857).
 * Support for Version 1 was removed in hbase-1.3.0.
 *
 * <h3>HFileBlock: Version 2</h3>
 * In version 2, a block is structured as follows:
 * <ul>
 * <li><b>Header:</b> See Writer#putHeader() for where the header is written; total header size is
 * HFILEBLOCK_HEADER_SIZE
 * <ul>
 * <li>0. blockType: Magic record identifying the {@link BlockType} (8 bytes):
 * e.g. <code>DATABLK*</code>
 * <li>1. onDiskSizeWithoutHeader: Compressed -- a.k.a 'on disk' -- block size, excluding header,
 * but including tailing checksum bytes (4 bytes)
 * <li>2. uncompressedSizeWithoutHeader: Uncompressed block size, excluding header, and excluding
 * checksum bytes (4 bytes)
 * <li>3. prevBlockOffset: The offset of the previous block of the same type (8 bytes). This is
 * used to navigate to the previous block without having to go to the block index
 * <li>4: For minorVersions >=1, the ordinal describing checksum type (1 byte)
 * <li>5: For minorVersions >=1, the number of data bytes/checksum chunk (4 bytes)
 * <li>6: onDiskDataSizeWithHeader: For minorVersions >=1, the size of data 'on disk', including
 * header, excluding checksums (4 bytes)
 * </ul>
 * </li>
 * <li><b>Raw/Compressed/Encrypted/Encoded data:</b> The compression
 * algorithm is the same for all the blocks in an {@link HFile}. If compression is NONE, this is
 * just raw, serialized Cells.
 * <li><b>Tail:</b> For minorVersions >=1, a series of 4 byte checksums, one each for
 * the number of bytes specified by bytesPerChecksum.
 * </ul>
 *
 * <h3>Caching</h3>
 * Caches cache whole blocks with trailing checksums if any. We then tag on some metadata, the
 * content of BLOCK_METADATA_SPACE: a flag that is on if we are doing 'hbase' checksums, and then
 * the offset into the file, which is needed when we re-make a cache key
 * when we return the block to the cache as 'done'.
 * See {@link Cacheable#serialize(ByteBuffer, boolean)} and {@link Cacheable#getDeserializer()}.
 *
 * <p>TODO: Should we cache the checksums? Down in Writer#getBlockForCaching(CacheConfig) where
 * we make a block to cache-on-write, there is an attempt at turning off checksums. This is not the
 * only place we get blocks to cache. We also will cache the raw return from an hdfs read. In this
 * case, the checksums may be present. If the cache is backed by something that doesn't do ECC,
 * say an SSD, we might want to preserve checksums. For now this is an open question.
 * <p>TODO: Over in BucketCache, we save a block allocation by doing a custom serialization.
 * Be sure to change it if serialization changes in here. Could we add a method here that takes an
 * IOEngine and that then serializes to it rather than expose our internals over in BucketCache?
 * IOEngine is in the bucket subpackage. Pull it up? Then this class knows about bucketcache. Ugh.
 */
@InterfaceAudience.Private
public class HFileBlock implements Cacheable {
  private static final Logger LOG = LoggerFactory.getLogger(HFileBlock.class);
  public static final int FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT +
      // BlockType, ByteBuff, MemoryType, HFileContext, ByteBuffAllocator
      5 * ClassSize.REFERENCE +
      // On-disk size, uncompressed size, and next block's on-disk size
      // bytePerChecksum and onDiskDataSize
      4 * Bytes.SIZEOF_INT +
      // This and previous block offset
      2 * Bytes.SIZEOF_LONG);

  // Block Header fields.

  // TODO: encapsulate Header related logic in this inner class.
  static class Header {
    // Format of header is:
    // 8 bytes - block magic
    // 4 bytes int - onDiskSizeWithoutHeader
    // 4 bytes int - uncompressedSizeWithoutHeader
    // 8 bytes long - prevBlockOffset
    // The following 3 are only present if header contains checksum information
    // 1 byte - checksum type
    // 4 byte int - bytes per checksum
    // 4 byte int - onDiskDataSizeWithHeader
    static int BLOCK_MAGIC_INDEX = 0;
    static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8;
    static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12;
    static int PREV_BLOCK_OFFSET_INDEX = 16;
    static int CHECKSUM_TYPE_INDEX = 24;
    static int BYTES_PER_CHECKSUM_INDEX = 25;
    static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29;
  }

  /** Type of block. Header field 0. */
  private BlockType blockType;

  /**
   * Size on disk excluding header, including checksum. Header field 1.
   * @see Writer#putHeader(byte[], int, int, int, int)
   */
  private int onDiskSizeWithoutHeader;

  /**
   * Size of pure data. Does not include header or checksums. Header field 2.
   * @see Writer#putHeader(byte[], int, int, int, int)
   */
  private int uncompressedSizeWithoutHeader;

  /**
   * The offset of the previous block on disk. Header field 3.
   * @see Writer#putHeader(byte[], int, int, int, int)
   */
  private long prevBlockOffset;

  /**
   * Size on disk of header + data. Excludes checksum. Header field 6,
   * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
   * @see Writer#putHeader(byte[], int, int, int, int)
   */
  private int onDiskDataSizeWithHeader;
  // End of Block Header fields.

  /**
   * The in-memory representation of the hfile block. Can be on or offheap. Can be backed by
   * a single ByteBuffer or by many. Make no assumptions.
   *
   * <p>Be careful reading from this <code>buf</code>. Duplicate and work on the duplicate or if
   * not, be sure to reset position and limit else trouble down the road.
   *
   * <p>TODO: Make this read-only once made.
   *
   * <p>We are using the ByteBuff type. ByteBuffer is not extensible yet we need to be able to have
   * a ByteBuffer-like API across multiple ByteBuffers reading from a cache such as BucketCache.
   * So, we have this ByteBuff type. Unfortunately, it is spread all about HFileBlock. Would be
   * good if could be confined to cache-use only but hard-to-do.
   */
  private ByteBuff buf;

  /** Metadata that holds meta information on the hfileblock. */
  private HFileContext fileContext;

  /**
   * The offset of this block in the file. Populated by the reader for
   * convenience of access. This offset is not part of the block header.
   */
  private long offset = UNSET;

  /**
   * The on-disk size of the next block, including the header and checksums if present.
   * UNSET if unknown.
   *
   * Blocks try to carry the size of the next block to read in this data member. Usually
   * we get block sizes from the hfile index but sometimes the index is not available:
   * e.g. when we read the indexes themselves (indexes are stored in blocks, we do not
   * have an index for the indexes). Saves seeks especially around file open when
   * there is a flurry of reading in hfile metadata.
   */
  private int nextBlockOnDiskSize = UNSET;

  private ByteBuffAllocator allocator;

  /**
   * On a checksum failure, do these many succeeding read requests using hdfs checksums before
   * auto-reenabling hbase checksum verification.
   */
  static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;

  private static int UNSET = -1;
  public static final boolean FILL_HEADER = true;
  public static final boolean DONT_FILL_HEADER = false;

  // How to get the estimate correctly? if it is a singleBB?
  public static final int MULTI_BYTE_BUFFER_HEAP_SIZE =
      (int) ClassSize.estimateBase(MultiByteBuff.class, false);

  /**
   * Space for metadata on a block that gets stored along with the block when we cache it.
   * There are a few bytes stuck on the end of the HFileBlock that we pull in from HDFS.
   * 8 bytes are for the offset of this block (long) in the file. Offset is important because it
   * is used when we remake the CacheKey when we return the block to the cache when done. There is
   * also a flag on whether checksumming is being done by hbase or not. See the class comment for
   * a note on the uncertain state of checksumming of blocks that come out of cache (should we or
   * should we not?). Finally there are 4 bytes to hold the length of the next block, which can
   * save a seek on occasion if available.
   * (This EXTRA info came in with the original commit of the bucketcache, HBASE-7404. It was
   * formerly known as EXTRA_SERIALIZATION_SPACE).
   */
  static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;

  /**
   * Each checksum value is an integer that can be stored in 4 bytes.
   */
  static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;

  static final byte[] DUMMY_HEADER_NO_CHECKSUM =
      new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];

  /**
   * Used deserializing blocks from Cache.
   *
   * <code>
   * ++++++++++++++
   * + HFileBlock +
   * ++++++++++++++
   * + Checksums  + <= Optional
   * ++++++++++++++
   * + Metadata!  + <= See note on BLOCK_METADATA_SPACE above.
   * ++++++++++++++
   * </code>
   * @see #serialize(ByteBuffer, boolean)
   */
  public static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER = new BlockDeserializer();

  public static final class BlockDeserializer implements CacheableDeserializer<Cacheable> {
    private BlockDeserializer() {
    }

    @Override
    public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc)
        throws IOException {
      // The buf has the file block followed by block metadata.
      // Set limit to just before the BLOCK_METADATA_SPACE then rewind.
      buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind();
      // Get a new buffer to pass the HFileBlock for it to 'own'.
      ByteBuff newByteBuff = buf.slice();
      // Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock.
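      // Metadata layout (BLOCK_METADATA_SPACE): 1 byte 'uses hbase checksum' flag,
      // 8 byte file offset of this block, 4 byte on-disk size of the next block.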
      buf.position(buf.limit());
      buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE);
      boolean usesChecksum = buf.get() == (byte) 1;
      long offset = buf.getLong();
      int nextBlockOnDiskSize = buf.getInt();
      return createFromBuff(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
    }

    @Override
    public int getDeserializerIdentifier() {
      return DESERIALIZER_IDENTIFIER;
    }
  }

  private static final int DESERIALIZER_IDENTIFIER;
  static {
    DESERIALIZER_IDENTIFIER =
        CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
  }

  /**
   * Creates a new {@link HFile} block from the given fields. This constructor
   * is used only while writing blocks and caching, when the block data is already
   * sitting in a byte buffer and we want to stuff the block into cache.
   * See {@link Writer#getBlockForCaching(CacheConfig)}.
   *
   * <p>TODO: The caller presumes no checksumming is
   * required of this block instance since it is going into cache; the checksum was already
   * verified on the underlying block data pulled in from the filesystem. Is that correct? What if
   * the cache is SSD-backed?
   * <p>TODO: Could the HFile block writer also go off-heap?
   *
   * @param blockType the type of this block, see {@link BlockType}
   * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
   * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
   * @param prevBlockOffset see {@link #prevBlockOffset}
   * @param buf block buffer with header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
   * @param fillHeader when true, write the first 4 header fields into passed buffer.
   * @param offset the file offset the block was read from
   * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
   * @param fileContext HFile meta data
   */
  @VisibleForTesting
  public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
      int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
      long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
      ByteBuffAllocator allocator) {
    this.blockType = blockType;
    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
    this.prevBlockOffset = prevBlockOffset;
    this.offset = offset;
    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
    this.nextBlockOnDiskSize = nextBlockOnDiskSize;
    this.fileContext = fileContext;
    this.allocator = allocator;
    this.buf = buf;
    if (fillHeader) {
      overwriteHeader();
    }
    this.buf.rewind();
  }

  /**
   * Creates a block from an existing buffer starting with a header. Rewinds
   * and takes ownership of the buffer. By definition of rewind, ignores the
   * buffer position, but if you slice the buffer beforehand, it will rewind
   * to that point.
   * @param buf Has header, content, and trailing checksums if present.
   */
  static HFileBlock createFromBuff(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
      final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator)
      throws IOException {
    buf.rewind();
    final BlockType blockType = BlockType.read(buf);
    final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
    final int uncompressedSizeWithoutHeader =
        buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
    final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
    // This constructor is called when we deserialize a block from cache and when we read a block
    // in from the fs. fileContext is null when deserialized from cache, so we need to make one up.
    HFileContextBuilder fileContextBuilder = fileContext != null ?
        new HFileContextBuilder(fileContext) : new HFileContextBuilder();
    fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum);
    int onDiskDataSizeWithHeader;
    if (usesHBaseChecksum) {
      byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX);
      int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
      onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
      // Use the checksum type and bytes per checksum from header, not from fileContext.
      fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType));
      fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum);
    } else {
      fileContextBuilder.withChecksumType(ChecksumType.NULL);
      fileContextBuilder.withBytesPerCheckSum(0);
      // Need to fix onDiskDataSizeWithHeader; there are no checksums after the block data.
      onDiskDataSizeWithHeader = onDiskSizeWithoutHeader + headerSize(usesHBaseChecksum);
    }
    fileContext = fileContextBuilder.build();
    assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
    return new HFileBlockBuilder()
        .withBlockType(blockType)
        .withOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader)
        .withUncompressedSizeWithoutHeader(uncompressedSizeWithoutHeader)
        .withPrevBlockOffset(prevBlockOffset)
        .withOffset(offset)
        .withOnDiskDataSizeWithHeader(onDiskDataSizeWithHeader)
        .withNextBlockOnDiskSize(nextBlockOnDiskSize)
        .withHFileContext(fileContext)
        .withByteBuffAllocator(allocator)
        .withByteBuff(buf.rewind())
        .withShared(!buf.hasArray())
        .build();
  }

  /**
   * Parse total on disk size including header and checksum.
   * @param headerBuf Header ByteBuffer. Presumed exact size of header.
   * @param verifyChecksum true if checksum verification is in use.
   * @return Size of the block with header included.
   */
  private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf,
      boolean verifyChecksum) {
    return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(verifyChecksum);
  }

  /**
   * @return the on-disk size of the next block (including the header size and any checksums if
   *   present) read by peeking into the next block's header; use as a hint when doing
   *   a read of the next block when scanning or running over a file.
   */
  int getNextBlockOnDiskSize() {
    return nextBlockOnDiskSize;
  }

  @Override
  public BlockType getBlockType() {
    return blockType;
  }

  @Override
  public int refCnt() {
    return buf.refCnt();
  }

  @Override
  public HFileBlock retain() {
    buf.retain();
    return this;
  }

  /**
   * Call {@link ByteBuff#release()} to decrease the reference count. If there is no other
   * reference, it will return the {@link ByteBuffer} back to the
   * {@link org.apache.hadoop.hbase.io.ByteBuffAllocator}.
   */
  @Override
  public boolean release() {
    return buf.release();
  }

  /** @return the data block encoding id that was used to encode this block */
  short getDataBlockEncodingId() {
    if (blockType != BlockType.ENCODED_DATA) {
      throw new IllegalArgumentException("Querying encoder ID of a block " +
          "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
    }
    return buf.getShort(headerSize());
  }

  /**
   * @return the on-disk size of header + data part + checksum.
   */
  public int getOnDiskSizeWithHeader() {
    return onDiskSizeWithoutHeader + headerSize();
  }

  /**
   * @return the on-disk size of the data part + checksum (header excluded).
   */
  int getOnDiskSizeWithoutHeader() {
    return onDiskSizeWithoutHeader;
  }

  /**
   * @return the uncompressed size of the data part (header and checksum excluded).
   */
  int getUncompressedSizeWithoutHeader() {
    return uncompressedSizeWithoutHeader;
  }

  /**
   * @return the offset of the previous block of the same type in the file, or
   *   -1 if unknown
   */
  long getPrevBlockOffset() {
    return prevBlockOffset;
  }

  /**
   * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position
   * is modified as side-effect.
   */
  private void overwriteHeader() {
    buf.rewind();
    blockType.write(buf);
    buf.putInt(onDiskSizeWithoutHeader);
    buf.putInt(uncompressedSizeWithoutHeader);
    buf.putLong(prevBlockOffset);
    if (this.fileContext.isUseHBaseChecksum()) {
      buf.put(fileContext.getChecksumType().getCode());
      buf.putInt(fileContext.getBytesPerChecksum());
      buf.putInt(onDiskDataSizeWithHeader);
    }
  }

  /**
   * Returns a buffer that does not include the header and checksum.
   * @return the buffer with header skipped and checksum omitted.
   */
  public ByteBuff getBufferWithoutHeader() {
    return this.getBufferWithoutHeader(false);
  }

  /**
   * Returns a buffer that does not include the header, optionally including the checksum.
   * @param withChecksum whether to include the checksum or not.
   * @return the buffer with header skipped and checksum included or omitted per
   *   {@code withChecksum}.
   */
  public ByteBuff getBufferWithoutHeader(boolean withChecksum) {
    ByteBuff dup = getBufferReadOnly();
    int delta = withChecksum ? 0 : totalChecksumBytes();
    return dup.position(headerSize()).limit(buf.limit() - delta).slice();
  }

  /**
   * Returns a read-only duplicate of the buffer this block stores internally ready to be read.
   * Clients must not modify the buffer object though they may set position and limit on the
   * returned buffer since we pass back a duplicate. This method has to be public because it is
   * used in {@link CompoundBloomFilter} to avoid object creation on every Bloom
   * filter lookup, but has to be used with caution. Buffer holds header, block content,
   * and any follow-on checksums if present.
   *
   * @return the buffer of this block for read-only operations
   */
  public ByteBuff getBufferReadOnly() {
    // TODO: ByteBuff does not support asReadOnlyBuffer(). Fix.
    ByteBuff dup = this.buf.duplicate();
    assert dup.position() == 0;
    return dup;
  }

  public ByteBuffAllocator getByteBuffAllocator() {
    return this.allocator;
  }

  @VisibleForTesting
  private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
      String fieldName) throws IOException {
    if (valueFromBuf != valueFromField) {
      throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
          + ") is different from that in the field (" + valueFromField + ")");
    }
  }

  @VisibleForTesting
  private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
      throws IOException {
    if (valueFromBuf != valueFromField) {
      throw new IOException("Block type stored in the buffer: " +
          valueFromBuf + ", block type field: " + valueFromField);
    }
  }

  /**
   * Checks if the block is internally consistent, i.e. the first
   * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a
   * valid header consistent with the fields. Assumes a packed block structure.
   * This function is primarily for testing and debugging, and is not
   * thread-safe, because it alters the internal buffer pointer.
   * Used by tests only.
   */
  @VisibleForTesting
  void sanityCheck() throws IOException {
    // Duplicate so no side-effects
    ByteBuff dup = this.buf.duplicate().rewind();
    sanityCheckAssertion(BlockType.read(dup), blockType);

    sanityCheckAssertion(dup.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader");

    sanityCheckAssertion(dup.getInt(), uncompressedSizeWithoutHeader,
        "uncompressedSizeWithoutHeader");

    sanityCheckAssertion(dup.getLong(), prevBlockOffset, "prevBlockOffset");
    if (this.fileContext.isUseHBaseChecksum()) {
      sanityCheckAssertion(dup.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
      sanityCheckAssertion(dup.getInt(), this.fileContext.getBytesPerChecksum(),
          "bytesPerChecksum");
      sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
    }

    int cksumBytes = totalChecksumBytes();
    int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
    if (dup.limit() != expectedBufLimit) {
      throw new AssertionError("Expected limit " + expectedBufLimit + ", got " + dup.limit());
    }

    // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
    // block's header, so there are two sensible values for buffer capacity.
    int hdrSize = headerSize();
    dup.rewind();
    if (dup.remaining() != expectedBufLimit && dup.remaining() != expectedBufLimit + hdrSize) {
      throw new AssertionError("Invalid buffer capacity: " + dup.remaining() +
          ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
    }
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder()
        .append("[")
        .append("blockType=").append(blockType)
        .append(", fileOffset=").append(offset)
        .append(", headerSize=").append(headerSize())
        .append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
        .append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
        .append(", prevBlockOffset=").append(prevBlockOffset)
        .append(", isUseHBaseChecksum=").append(fileContext.isUseHBaseChecksum());
    if (fileContext.isUseHBaseChecksum()) {
      sb.append(", checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
          .append(", bytesPerChecksum=").append(this.buf.getInt(24 + 1))
          .append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
    } else {
      sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
          .append("(").append(onDiskSizeWithoutHeader)
          .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
    }
    String dataBegin;
    if (buf.hasArray()) {
      dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
          Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
    } else {
      ByteBuff bufWithoutHeader = getBufferWithoutHeader();
      byte[] dataBeginBytes = new byte[Math.min(32,
          bufWithoutHeader.limit() - bufWithoutHeader.position())];
      bufWithoutHeader.get(dataBeginBytes);
      dataBegin = Bytes.toStringBinary(dataBeginBytes);
    }
    sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader())
        .append(", totalChecksumBytes=").append(totalChecksumBytes())
        .append(", isUnpacked=").append(isUnpacked())
        .append(", buf=[").append(buf).append("]")
        .append(", dataBeginsWith=").append(dataBegin)
        .append(", fileContext=").append(fileContext)
        .append(", nextBlockOnDiskSize=").append(nextBlockOnDiskSize)
        .append("]");
    return sb.toString();
  }

  /**
   * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
   * encoded structure. Internal structures are shared between instances where applicable.
   */
  HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
    if (!fileContext.isCompressedOrEncrypted()) {
      // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
      // which is used for block serialization to L2 cache, does not preserve encoding and
      // encryption details.
      return this;
    }

    HFileBlock unpacked = shallowClone(this);
    unpacked.allocateBuffer(); // allocates space for the decompressed block
    boolean succ = false;
    try {
      HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
          ? reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
      // Create a duplicated buffer without the header part.
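      // The duplicate is positioned past the header below, so it covers only the on-disk
      // (possibly compressed/encrypted) payload that prepareDecoding will consume.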
      ByteBuff dup = this.buf.duplicate();
      dup.position(this.headerSize());
      dup = dup.slice();
      // Decode the dup into unpacked#buf
      ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
          unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup);
      succ = true;
      return unpacked;
    } finally {
      if (!succ) {
        unpacked.release();
      }
    }
  }

  /**
   * Always allocates a new buffer of the correct size. Copies header bytes
   * from the existing buffer. Does not change header fields.
   * Reserve room to keep checksum bytes too.
   */
  private void allocateBuffer() {
    int cksumBytes = totalChecksumBytes();
    int headerSize = headerSize();
    int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;

    ByteBuff newBuf = allocator.allocate(capacityNeeded);

    // Copy header bytes into newBuf.
    buf.position(0);
    newBuf.put(0, buf, 0, headerSize);

    buf = newBuf;
    // set limit to exclude next block's header
    buf.limit(capacityNeeded);
  }

  /**
   * Return true when this block's buffer has been unpacked, false otherwise. Note this is a
   * calculated heuristic, not a tracked attribute of the block.
   */
  public boolean isUnpacked() {
    final int cksumBytes = totalChecksumBytes();
    final int headerSize = headerSize();
    final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
    final int bufCapacity = buf.remaining();
    return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
  }

  /**
   * Cannot be {@link #UNSET}. Must be a legitimate value. Used re-making the {@link BlockCacheKey}
   * when block is returned to the cache.
   * @return the offset of this block in the file it was read from
   */
  long getOffset() {
    if (offset < 0) {
      throw new IllegalStateException("HFile block offset not initialized properly");
    }
    return offset;
  }

  /**
   * @return a byte stream reading the data + checksum of this block
   */
  DataInputStream getByteStream() {
    ByteBuff dup = this.buf.duplicate();
    dup.position(this.headerSize());
    return new DataInputStream(new ByteBuffInputStream(dup));
  }

  @Override
  public long heapSize() {
    long size = FIXED_OVERHEAD;
    size += fileContext.heapSize();
    if (buf != null) {
      // Deep overhead of the byte buffer. Needs to be aligned separately.
      size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE);
    }
    return ClassSize.align(size);
  }

  /**
   * Will be overridden by {@link SharedMemHFileBlock} or {@link ExclusiveMemHFileBlock}. Returns
   * true by default.
   */
  public boolean isSharedMem() {
    if (this instanceof SharedMemHFileBlock) {
      return true;
    } else if (this instanceof ExclusiveMemHFileBlock) {
      return false;
    }
    return true;
  }

  /**
   * Unified version 2 {@link HFile} block writer. The intended usage pattern
   * is as follows:
   * <ol>
   * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
   * <li>Call {@link Writer#startWriting} and get a data stream to write to.
   * <li>Write your data into the stream.
   * <li>Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to
   * store the serialized block into an external stream.
   * <li>Repeat to write more blocks.
   * </ol>
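   * <p>
   * A minimal sketch of that pattern (assumes an existing {@code fileContext} and an open
   * {@code FSDataOutputStream out}; error handling omitted):
   * <pre>{@code
   * HFileBlock.Writer writer = new HFileBlock.Writer(NoOpDataBlockEncoder.INSTANCE, fileContext);
   * DataOutputStream dos = writer.startWriting(BlockType.DATA);
   * // ... append Cells via writer.write(cell), or write bytes to 'dos' ...
   * writer.writeHeaderAndData(out); // finishes the block and appends header + data + checksums
   * }</pre>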
   */
  static class Writer implements ShipperListener {
    private enum State {
      INIT,
      WRITING,
      BLOCK_READY
    };

    /** Writer state. Used to ensure the correct usage protocol. */
    private State state = State.INIT;

    /** Data block encoder used for data blocks */
    private final HFileDataBlockEncoder dataBlockEncoder;

    private HFileBlockEncodingContext dataBlockEncodingCtx;

    /** Block encoding context for non-data blocks */
    private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;

    /**
     * The stream we use to accumulate data into a block in an uncompressed format.
     * We reset this stream at the end of each block and reuse it. The
     * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this
     * stream.
     */
    private ByteArrayOutputStream baosInMemory;

    /**
     * Current block type. Set in {@link #startWriting(BlockType)}. Could be
     * changed in {@link #finishBlock()} from {@link BlockType#DATA}
     * to {@link BlockType#ENCODED_DATA}.
     */
    private BlockType blockType;

    /**
     * A stream that we write uncompressed bytes to, which compresses them and
     * writes them to {@link #baosInMemory}.
     */
    private DataOutputStream userDataStream;

    /**
     * Bytes to be written to the file system, including the header. Compressed
     * if compression is turned on. It also includes the checksum data that
     * immediately follows the block data. (header + data + checksums)
     */
    private ByteArrayOutputStream onDiskBlockBytesWithHeader;

    /**
     * The size of the checksum data on disk. It is used only if data is
     * not compressed. If data is compressed, then the checksums are already
     * part of onDiskBytesWithHeader. If data is uncompressed, then this
     * variable stores the checksum data for this block.
     */
    private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;

    /**
     * Current block's start offset in the {@link HFile}. Set in
     * {@link #writeHeaderAndData(FSDataOutputStream)}.
     */
    private long startOffset;

    /**
     * Offset of previous block by block type. Updated when the next block is
     * started.
     */
    private long[] prevOffsetByType;

    /** The offset of the previous block of the same type */
    private long prevOffset;

    /** Meta data that holds information about the hfileblock */
    private HFileContext fileContext;

    private final ByteBuffAllocator allocator;

    @Override
    public void beforeShipped() {
      if (getEncodingState() != null) {
        getEncodingState().beforeShipped();
      }
    }

    EncodingState getEncodingState() {
      return dataBlockEncodingCtx.getEncodingState();
    }

    /**
     * @param dataBlockEncoder data block encoding algorithm to use
     */
    @VisibleForTesting
    public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
      this(dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
    }

    public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext,
        ByteBuffAllocator allocator) {
      if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
        throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
" + 859 " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " + 860 fileContext.getBytesPerChecksum()); 861 } 862 this.allocator = allocator; 863 this.dataBlockEncoder = dataBlockEncoder != null? 864 dataBlockEncoder: NoOpDataBlockEncoder.INSTANCE; 865 this.dataBlockEncodingCtx = this.dataBlockEncoder. 866 newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); 867 // TODO: This should be lazily instantiated since we usually do NOT need this default encoder 868 this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null, 869 HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); 870 // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum 871 baosInMemory = new ByteArrayOutputStream(); 872 prevOffsetByType = new long[BlockType.values().length]; 873 for (int i = 0; i < prevOffsetByType.length; ++i) { 874 prevOffsetByType[i] = UNSET; 875 } 876 // TODO: Why fileContext saved away when we have dataBlockEncoder and/or 877 // defaultDataBlockEncoder? 878 this.fileContext = fileContext; 879 } 880 881 /** 882 * Starts writing into the block. The previous block's data is discarded. 883 * 884 * @return the stream the user can write their data into 885 */ 886 DataOutputStream startWriting(BlockType newBlockType) 887 throws IOException { 888 if (state == State.BLOCK_READY && startOffset != -1) { 889 // We had a previous block that was written to a stream at a specific 890 // offset. Save that offset as the last offset of a block of that type. 891 prevOffsetByType[blockType.getId()] = startOffset; 892 } 893 894 startOffset = -1; 895 blockType = newBlockType; 896 897 baosInMemory.reset(); 898 baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER); 899 900 state = State.WRITING; 901 902 // We will compress it later in finishBlock() 903 userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory); 904 if (newBlockType == BlockType.DATA) { 905 this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream); 906 } 907 return userDataStream; 908 } 909 910 /** 911 * Writes the Cell to this block 912 */ 913 void write(Cell cell) throws IOException{ 914 expectState(State.WRITING); 915 this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream); 916 } 917 918 /** 919 * Transitions the block writer from the "writing" state to the "block 920 * ready" state. Does nothing if a block is already finished. 921 */ 922 void ensureBlockReady() throws IOException { 923 Preconditions.checkState(state != State.INIT, 924 "Unexpected state: " + state); 925 926 if (state == State.BLOCK_READY) { 927 return; 928 } 929 930 // This will set state to BLOCK_READY. 931 finishBlock(); 932 } 933 934 /** 935 * Finish up writing of the block. 936 * Flushes the compressing stream (if using compression), fills out the header, 937 * does any compression/encryption of bytes to flush out to disk, and manages 938 * the cache on write content, if applicable. Sets block write state to "block ready". 939 */ 940 private void finishBlock() throws IOException { 941 if (blockType == BlockType.DATA) { 942 this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream, 943 baosInMemory.getBuffer(), blockType); 944 blockType = dataBlockEncodingCtx.getBlockType(); 945 } 946 userDataStream.flush(); 947 prevOffset = prevOffsetByType[blockType.getId()]; 948 949 // We need to set state before we can package the block up for cache-on-write. In a way, the 950 // block is ready, but not yet encoded or compressed. 
      state = State.BLOCK_READY;
      Bytes compressAndEncryptDat;
      if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
        compressAndEncryptDat = dataBlockEncodingCtx.
            compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size());
      } else {
        compressAndEncryptDat = defaultBlockEncodingCtx.
            compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size());
      }
      if (compressAndEncryptDat == null) {
        compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size());
      }
      if (onDiskBlockBytesWithHeader == null) {
        onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength());
      }
      onDiskBlockBytesWithHeader.reset();
      onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(),
          compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength());
      // Calculate how many bytes we need for checksum on the tail of the block.
      int numBytes = (int) ChecksumUtil.numBytes(
          onDiskBlockBytesWithHeader.size(),
          fileContext.getBytesPerChecksum());

      // Put the header for the on disk bytes; header currently is unfilled-out
      putHeader(onDiskBlockBytesWithHeader,
          onDiskBlockBytesWithHeader.size() + numBytes,
          baosInMemory.size(), onDiskBlockBytesWithHeader.size());
      if (onDiskChecksum.length != numBytes) {
        onDiskChecksum = new byte[numBytes];
      }
      ChecksumUtil.generateChecksums(
          onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size(),
          onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
    }

    /**
     * Put the header into the given byte array at the given offset.
     * @param onDiskSize size of the block on disk (header + data + checksum)
     * @param uncompressedSize size of the block after decompression (but
     *          before optional data block decoding) including header
     * @param onDiskDataSize size of the block on disk with header
     *          and data but not including the checksums
     */
    private void putHeader(byte[] dest, int offset, int onDiskSize,
        int uncompressedSize, int onDiskDataSize) {
      offset = blockType.put(dest, offset);
      offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
      offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
      offset = Bytes.putLong(dest, offset, prevOffset);
      offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
      offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
      Bytes.putInt(dest, offset, onDiskDataSize);
    }

    private void putHeader(ByteBuff buff, int onDiskSize,
        int uncompressedSize, int onDiskDataSize) {
      buff.rewind();
      blockType.write(buff);
      buff.putInt(onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
      buff.putInt(uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
      buff.putLong(prevOffset);
      buff.put(fileContext.getChecksumType().getCode());
      buff.putInt(fileContext.getBytesPerChecksum());
      buff.putInt(onDiskDataSize);
    }

    private void putHeader(ByteArrayOutputStream dest, int onDiskSize,
        int uncompressedSize, int onDiskDataSize) {
      putHeader(dest.getBuffer(), 0, onDiskSize, uncompressedSize, onDiskDataSize);
    }

    /**
     * Similar to {@link #finishBlockAndWriteHeaderAndData(DataOutputStream)}, but records
     * the offset of this block so that it can be referenced in the next block
     * of the same type.
     */
    void writeHeaderAndData(FSDataOutputStream out) throws IOException {
      long offset = out.getPos();
      if (startOffset != UNSET && offset != startOffset) {
        throw new IOException("A " + blockType + " block written to a "
            + "stream twice, first at offset " + startOffset + ", then at "
            + offset);
      }
      startOffset = offset;
      finishBlockAndWriteHeaderAndData(out);
    }

    /**
     * Writes the header and the compressed data of this block (or uncompressed
     * data when not using compression) into the given stream. Can be called in
     * the "writing" state or in the "block ready" state. If called in the
     * "writing" state, transitions the writer to the "block ready" state.
     * @param out the output stream to write the block to
     */
    protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
        throws IOException {
      ensureBlockReady();
      long startTime = System.currentTimeMillis();
      out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size());
      out.write(onDiskChecksum);
      HFile.updateWriteLatency(System.currentTimeMillis() - startTime);
    }

    /**
     * Returns the header and the compressed data (or uncompressed data when not
     * using compression) as a byte array. Can be called in the "writing" state
     * or in the "block ready" state. If called in the "writing" state,
     * transitions the writer to the "block ready" state. This returns
     * the header + data + checksums stored on disk.
     *
     * @return header and data as they would be stored on disk in a byte array
     */
    byte[] getHeaderAndDataForTest() throws IOException {
      ensureBlockReady();
      // This is not very optimal, because we are doing an extra copy.
      // But this method is used only by unit tests.
      byte[] output =
          new byte[onDiskBlockBytesWithHeader.size()
              + onDiskChecksum.length];
      System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0,
          onDiskBlockBytesWithHeader.size());
      System.arraycopy(onDiskChecksum, 0, output,
          onDiskBlockBytesWithHeader.size(), onDiskChecksum.length);
      return output;
    }

    /**
     * Releases resources used by this writer.
     */
    void release() {
      if (dataBlockEncodingCtx != null) {
        dataBlockEncodingCtx.close();
        dataBlockEncodingCtx = null;
      }
      if (defaultBlockEncodingCtx != null) {
        defaultBlockEncodingCtx.close();
        defaultBlockEncodingCtx = null;
      }
    }

    /**
     * Returns the on-disk size of the data portion of the block. This is the
     * compressed size if compression is enabled. Can only be called in the
     * "block ready" state. Header is not compressed, and its size is not
     * included in the return value.
     *
     * @return the on-disk size of the block, not including the header.
     */
    int getOnDiskSizeWithoutHeader() {
      expectState(State.BLOCK_READY);
      return onDiskBlockBytesWithHeader.size() +
          onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE;
    }

    /**
     * Returns the on-disk size of the block. Can only be called in the
     * "block ready" state.
     *
     * @return the on-disk size of the block ready to be written, including the
     *   header size, the data and the checksum data.
     */
    int getOnDiskSizeWithHeader() {
      expectState(State.BLOCK_READY);
      return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length;
    }

    /**
     * The uncompressed size of the block data. Does not include header size.
     */
    int getUncompressedSizeWithoutHeader() {
      expectState(State.BLOCK_READY);
      return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE;
    }

    /**
     * The uncompressed size of the block data, including header size.
     */
    int getUncompressedSizeWithHeader() {
      expectState(State.BLOCK_READY);
      return baosInMemory.size();
    }

    /** @return true if a block is being written */
    boolean isWriting() {
      return state == State.WRITING;
    }

    /**
     * Returns the number of encoded bytes written into the current block so far, or
     * zero if not writing the block at the moment. Note that this will return
     * zero in the "block ready" state as well.
     *
     * @return the number of bytes written
     */
    public int encodedBlockSizeWritten() {
      return state != State.WRITING ? 0 : this.getEncodingState().getEncodedDataSizeWritten();
    }

    /**
     * Returns the number of unencoded bytes written into the current block so far, or
     * zero if not writing the block at the moment. Note that this will return
     * zero in the "block ready" state as well.
     *
     * @return the number of bytes written
     */
    int blockSizeWritten() {
      return state != State.WRITING ? 0 : this.getEncodingState().getUnencodedDataSizeWritten();
    }

    /**
     * Clones the header followed by the uncompressed data, even if using
     * compression. This is needed for storing uncompressed blocks in the block
     * cache. Can only be called in the "block ready" state.
     * Returns only the header and data, does not include checksum data.
     *
     * @return Returns an uncompressed block ByteBuff for caching on write
     */
    ByteBuff cloneUncompressedBufferWithHeader() {
      expectState(State.BLOCK_READY);
      ByteBuff bytebuff = allocator.allocate(baosInMemory.size());
      baosInMemory.toByteBuff(bytebuff);
      int numBytes = (int) ChecksumUtil.numBytes(
          onDiskBlockBytesWithHeader.size(),
          fileContext.getBytesPerChecksum());
      putHeader(bytebuff, onDiskBlockBytesWithHeader.size() + numBytes,
          baosInMemory.size(), onDiskBlockBytesWithHeader.size());
      bytebuff.rewind();
      return bytebuff;
    }

    /**
     * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is
     * needed for storing packed blocks in the block cache. Returns only the header and data,
     * does not include checksum data.
     * @return Returns a copy of block bytes for caching on write
     */
    private ByteBuff cloneOnDiskBufferWithHeader() {
      expectState(State.BLOCK_READY);
      ByteBuff bytebuff = allocator.allocate(onDiskBlockBytesWithHeader.size());
      onDiskBlockBytesWithHeader.toByteBuff(bytebuff);
      bytebuff.rewind();
      return bytebuff;
    }

    private void expectState(State expectedState) {
      if (state != expectedState) {
        throw new IllegalStateException("Expected state: " + expectedState +
            ", actual state: " + state);
      }
    }

    /**
     * Takes the given {@link BlockWritable} instance, creates a new block of
     * its appropriate type, writes the writable into this block, and flushes
     * the block into the output stream.
     * The writer is instructed not to buffer uncompressed bytes for cache-on-write.
     *
     * @param bw the block-writable object to write as a block
     * @param out the file system output stream
     */
    void writeBlock(BlockWritable bw, FSDataOutputStream out)
        throws IOException {
      bw.writeToBlock(startWriting(bw.getBlockType()));
      writeHeaderAndData(out);
    }

    /**
     * Creates a new HFileBlock. Checksums have already been validated, so
     * the byte buffer passed into the constructor of this newly created
     * block does not have checksum data even though the header minor
     * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
     * 0 value in bytesPerChecksum. This method copies the on-disk or
     * uncompressed data to build the HFileBlock which is used only
     * while writing blocks and caching.
     *
     * <p>TODO: Should there be an option where a cache can ask that hbase preserve block
     * checksums for checking after a block comes out of the cache? Otherwise, the cache is
     * responsible for blocks being wholesome (ECC memory or if file-backed, it does checksumming).
     */
    HFileBlock getBlockForCaching(CacheConfig cacheConf) {
      HFileContext newContext = new HFileContextBuilder()
          .withBlockSize(fileContext.getBlocksize())
          .withBytesPerCheckSum(0)
          .withChecksumType(ChecksumType.NULL) // no checksums in cached data
          .withCompression(fileContext.getCompression())
          .withDataBlockEncoding(fileContext.getDataBlockEncoding())
          .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
          .withCompressTags(fileContext.isCompressTags())
          .withIncludesMvcc(fileContext.isIncludesMvcc())
          .withIncludesTags(fileContext.isIncludesTags())
          .withColumnFamily(fileContext.getColumnFamily())
          .withTableName(fileContext.getTableName())
          .build();
      // Build the HFileBlock.
      HFileBlockBuilder builder = new HFileBlockBuilder();
      ByteBuff buff;
      if (cacheConf.shouldCacheCompressed(blockType.getCategory())) {
        buff = cloneOnDiskBufferWithHeader();
      } else {
        buff = cloneUncompressedBufferWithHeader();
      }
      return builder.withBlockType(blockType)
          .withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader())
          .withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader())
          .withPrevBlockOffset(prevOffset)
          .withByteBuff(buff)
          .withFillHeader(FILL_HEADER)
          .withOffset(startOffset)
          .withNextBlockOnDiskSize(UNSET)
          .withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length)
          .withHFileContext(newContext)
          .withByteBuffAllocator(cacheConf.getByteBuffAllocator())
          .withShared(!buff.hasArray())
          .build();
    }
  }

  /** Something that can be written into a block. */
  interface BlockWritable {
    /** The type of block this data should use. */
    BlockType getBlockType();

    /**
     * Writes the block to the provided stream. Must not write any magic
     * records.
     *
     * @param out a stream to write uncompressed data into
     */
    void writeToBlock(DataOutput out) throws IOException;
  }

  /**
   * Iterator for reading {@link HFileBlock}s in the load-on-open section, such as the root data
   * index block, meta index block, file info block, etc.
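   *
   * <p>A typical use looks like the following sketch ({@code reader} and the offsets are assumed
   * to come from the caller; error handling omitted):
   * <pre>{@code
   * BlockIterator it = reader.blockRange(startOffset, endOffset);
   * for (HFileBlock b = it.nextBlock(); b != null; b = it.nextBlock()) {
   *   // ... use the unpacked block ...
   * }
   * it.freeBlocks(); // release every block handed out by this iterator
   * }</pre>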
   */
  interface BlockIterator {
    /**
     * Get the next block, or null if there are no more blocks to iterate.
     */
    HFileBlock nextBlock() throws IOException;

    /**
     * Similar to {@link #nextBlock()} but checks block type, throws an exception if incorrect,
     * and returns the HFile block
     */
    HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;

    /**
     * Now we use the {@link ByteBuffAllocator} to manage the nio ByteBuffers for HFileBlocks, so
     * we must deallocate all of the ByteBuffers at the end of their life. The BlockIterator's
     * life cycle starts when opening an HFileReader and stops at HFileReader#close, so we keep
     * track of all the read blocks until we call {@link BlockIterator#freeBlocks()} when closing
     * the HFileReader. The sum of bytes of those blocks in the load-on-open section should be
     * quite small, so tracking them should be OK.
     */
    void freeBlocks();
  }

  /** An HFile block reader with iteration ability. */
  interface FSReader {
    /**
     * Reads the block at the given offset in the file with the given on-disk size and
     * uncompressed size.
     * @param offset of the file to read
     * @param onDiskSize the on-disk size of the entire block, including all applicable headers,
     *          or -1 if unknown
     * @param pread true to use pread, otherwise use the stream read.
     * @param updateMetrics update the metrics or not.
     * @param intoHeap allocate the block's ByteBuff by {@link ByteBuffAllocator} or JVM heap. For
     *          LRUBlockCache, we must ensure that the block to cache is a heap one, because the
     *          memory occupation is based on heap now; also for {@link CombinedBlockCache}, we
     *          use the heap LRUBlockCache as L1 cache to cache small blocks such as IndexBlock
     *          or MetaBlock for faster access. So introduce a flag here to decide whether to
     *          allocate from the JVM heap or not so that we can avoid an extra off-heap to heap
     *          memory copy when using LRUBlockCache. For most cases, we know what the expected
     *          block type we'll read is, while for some special cases (Example:
     *          HFileReaderImpl#readNextDataBlock()), we cannot pre-decide the expected block
     *          type; then we can only allocate the block's ByteBuff from {@link ByteBuffAllocator}
     *          first, and then when caching it in {@link LruBlockCache} we'll check whether the
     *          ByteBuff is from heap or not; if not, then we'll clone it to a heap one and cache
     *          it.
     * @return the newly read block
     */
    HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics,
        boolean intoHeap) throws IOException;

    /**
     * Creates a block iterator over the given portion of the {@link HFile}.
     * The iterator returns blocks starting at offsets such that
     * startOffset <= offset < endOffset. Returned blocks are always unpacked.
     * Used when no hfile index is available; e.g. reading in the hfile index
     * blocks themselves on file open.
     *
     * @param startOffset the offset of the block to start iteration with
     * @param endOffset the offset to end iteration at (exclusive)
     * @return an iterator of blocks between the two given offsets
     */
    BlockIterator blockRange(long startOffset, long endOffset);

    /** Closes the backing streams */
    void closeStreams() throws IOException;

    /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
    HFileBlockDecodingContext getBlockDecodingContext();

    /**
     * Get the default decoder for blocks from this file.
     */
    HFileBlockDecodingContext getDefaultBlockDecodingContext();

    void setIncludesMemStoreTS(boolean includesMemstoreTS);

    void setDataBlockEncoder(HFileDataBlockEncoder encoder);

    /**
     * To close the stream's socket. Note: This can be concurrently called from multiple threads
     * and the implementation should take care of thread safety.
     */
    void unbufferStream();
  }

  /**
   * Data structure used to cache the header of the NEXT block. Only works if the next read that
   * comes in here is the next in sequence in this file.
   *
   * When we read, we read the current block and the next block's header. We do this so we have
   * the length of the next block to read if the hfile index is not available (rare, at
   * hfile open only).
   */
  private static class PrefetchedHeader {
    long offset = -1;
    byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
    final ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(header, 0, header.length));

    @Override
    public String toString() {
      return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header);
    }
  }

  /**
   * Reads version 2 HFile blocks from the filesystem.
   */
  static class FSReaderImpl implements FSReader {
    /** The file system stream of the underlying {@link HFile} that
     * does or doesn't do checksum validations in the filesystem */
    private FSDataInputStreamWrapper streamWrapper;

    private HFileBlockDecodingContext encodedBlockDecodingCtx;

    /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
    private final HFileBlockDefaultDecodingContext defaultDecodingCtx;

    /**
     * Cache of the NEXT header after this. Check it is indeed the next block's header
     * before using it. TODO: Review. This overread into the next block to fetch
     * the next block's header seems unnecessary given we usually get the block size
     * from the hfile index. Review!
     */
    private AtomicReference<PrefetchedHeader> prefetchedHeader =
        new AtomicReference<>(new PrefetchedHeader());

    /** The size of the file we are reading from, or -1 if unknown. */
    private long fileSize;

    /** The size of the header */
    @VisibleForTesting
    protected final int hdrSize;

    /** The filesystem used to access data */
    private HFileSystem hfs;

    private HFileContext fileContext;
    // Cache the fileName
    private String pathName;

    private final ByteBuffAllocator allocator;

    private final Lock streamLock = new ReentrantLock();

    FSReaderImpl(ReaderContext readerContext, HFileContext fileContext,
        ByteBuffAllocator allocator) throws IOException {
      this.fileSize = readerContext.getFileSize();
      this.hfs = readerContext.getFileSystem();
      if (readerContext.getFilePath() != null) {
        this.pathName = readerContext.getFilePath().toString();
      }
      this.fileContext = fileContext;
      this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
      this.allocator = allocator;

      this.streamWrapper = readerContext.getInputStreamWrapper();
      // Older versions of HBase didn't support checksum.
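      // If the file predates HBase checksums, tell the stream wrapper up front not to attempt
      // HBase checksum verification; such reads rely on HDFS-level checksums instead.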
1440 this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum()); 1441 defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext); 1442 encodedBlockDecodingCtx = defaultDecodingCtx; 1443 } 1444 1445 @Override 1446 public BlockIterator blockRange(final long startOffset, final long endOffset) { 1447 final FSReader owner = this; // handle for inner class 1448 return new BlockIterator() { 1449 private volatile boolean freed = false; 1450 // Tracking all read blocks until we call freeBlocks. 1451 private List<HFileBlock> blockTracker = new ArrayList<>(); 1452 private long offset = startOffset; 1453 // Cache the length of the next block. The current block carries the length of the next block in it. 1454 private long length = -1; 1455 1456 @Override 1457 public HFileBlock nextBlock() throws IOException { 1458 if (offset >= endOffset) { 1459 return null; 1460 } 1461 HFileBlock b = readBlockData(offset, length, false, false, true); 1462 offset += b.getOnDiskSizeWithHeader(); 1463 length = b.getNextBlockOnDiskSize(); 1464 HFileBlock uncompressed = b.unpack(fileContext, owner); 1465 if (uncompressed != b) { 1466 b.release(); // Need to release the compressed block now. 1467 } 1468 blockTracker.add(uncompressed); 1469 return uncompressed; 1470 } 1471 1472 @Override 1473 public HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException { 1474 HFileBlock blk = nextBlock(); 1475 if (blk.getBlockType() != blockType) { 1476 throw new IOException( 1477 "Expected block of type " + blockType + " but found " + blk.getBlockType()); 1478 } 1479 return blk; 1480 } 1481 1482 @Override 1483 public void freeBlocks() { 1484 if (freed) { 1485 return; 1486 } 1487 blockTracker.forEach(HFileBlock::release); 1488 blockTracker = null; 1489 freed = true; 1490 } 1491 }; 1492 } 1493 1494 /** 1495 * Does a positional read or a seek-and-read into the given byte buffer. We must take care to 1496 * call {@link ByteBuff#release()} on every exit path so the ByteBuffers are deallocated; 1497 * otherwise a memory leak may happen. 1498 * @param dest destination buffer 1499 * @param size size of read 1500 * @param peekIntoNextBlock whether to read the next block's on-disk size 1501 * @param fileOffset position in the stream to read at 1502 * @param pread whether we should do a positional read 1503 * @param istream The input source of data 1504 * @return true to indicate that the destination buffer includes the next block header; otherwise it only 1505 * includes the current block data, without the next block header. 1506 * @throws IOException if any IO error happens. 1507 */ 1508 protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size, 1509 boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException { 1510 if (!pread) { 1511 // Seek + read. Better for scanning. 1512 HFileUtil.seekOnMultipleSources(istream, fileOffset); 1513 long realOffset = istream.getPos(); 1514 if (realOffset != fileOffset) { 1515 throw new IOException("Tried to seek to " + fileOffset + " to read " + size 1516 + " bytes, but pos=" + realOffset + " after seek"); 1517 } 1518 if (!peekIntoNextBlock) { 1519 BlockIOUtils.readFully(dest, istream, size); 1520 return false; 1521 } 1522 1523 // Try to read the next block header 1524 if (!BlockIOUtils.readWithExtra(dest, istream, size, hdrSize)) { 1525 // did not read the next block header. 1526 return false; 1527 } 1528 } else { 1529 // Positional read. Better for random reads; or when the streamLock is already locked. 1530 int extraSize = peekIntoNextBlock ?
hdrSize : 0; 1531 if (!BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize)) { 1532 // did not read the next block header. 1533 return false; 1534 } 1535 } 1536 assert peekIntoNextBlock; 1537 return true; 1538 } 1539 1540 /** 1541 * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as 1542 * little memory allocation as possible, using the provided on-disk size. 1543 * @param offset the offset in the stream to read at 1544 * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header, or -1 if 1545 * unknown; i.e. when iterating over blocks reading in the file metadata info. 1546 * @param pread whether to use a positional read 1547 * @param updateMetrics whether to update the metrics 1548 * @param intoHeap allocate ByteBuff of block from heap or off-heap. 1549 * @see FSReader#readBlockData(long, long, boolean, boolean, boolean) for more details about the 1550 * intoHeap parameter. 1551 */ 1552 @Override 1553 public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread, 1554 boolean updateMetrics, boolean intoHeap) throws IOException { 1555 // Get a copy of the current state of whether to validate 1556 // hbase checksums or not for this read call. This is not 1557 // thread-safe but the one constraint is that if we decide 1558 // to skip hbase checksum verification then we are 1559 // guaranteed to use hdfs checksum verification. 1560 boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum(); 1561 FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum); 1562 1563 HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, 1564 doVerificationThruHBaseChecksum, updateMetrics, intoHeap); 1565 if (blk == null) { 1566 HFile.LOG.warn("HBase checksum verification failed for file " + 1567 pathName + " at offset " + 1568 offset + " filesize " + fileSize + 1569 ". Retrying read with HDFS checksums turned on..."); 1570 1571 if (!doVerificationThruHBaseChecksum) { 1572 String msg = "HBase checksum verification failed for file " + 1573 pathName + " at offset " + 1574 offset + " filesize " + fileSize + 1575 " but this cannot happen because doVerify is " + 1576 doVerificationThruHBaseChecksum; 1577 HFile.LOG.warn(msg); 1578 throw new IOException(msg); // cannot happen case here 1579 } 1580 HFile.CHECKSUM_FAILURES.increment(); // update metrics 1581 1582 // If we have a checksum failure, we fall back into a mode where 1583 // the next few reads use HDFS level checksums. We aim to make the 1584 // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid 1585 // hbase checksum verification, but since this value is set without 1586 // holding any locks, it can so happen that we might actually do 1587 // a few more than precisely this number.
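      // fallbackToFsChecksum(offCount) returns the filesystem-checksum-verifying stream and
      // arranges for roughly the next offCount reads to skip HBase checksum verification; the
      // checksumOk() call at the bottom of this method works that count back down so HBase
      // checksums are eventually re-enabled (see the comment there).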
1588 is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD); 1589 doVerificationThruHBaseChecksum = false; 1590 blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, 1591 doVerificationThruHBaseChecksum, updateMetrics, intoHeap); 1592 if (blk != null) { 1593 HFile.LOG.warn("HDFS checksum verification succeeded for file " + 1594 pathName + " at offset " + 1595 offset + " filesize " + fileSize); 1596 } 1597 } 1598 if (blk == null && !doVerificationThruHBaseChecksum) { 1599 String msg = "readBlockData failed, possibly due to " + 1600 "checksum verification failing for file " + pathName + 1601 " at offset " + offset + " filesize " + fileSize; 1602 HFile.LOG.warn(msg); 1603 throw new IOException(msg); 1604 } 1605 1606 // If there was a checksum mismatch earlier, then we retried with 1607 // HBase checksums switched off and used HDFS checksum verification. 1608 // This triggers HDFS to detect and fix corrupt replicas. The 1609 // next checksumOffCount read requests will use HDFS checksums. 1610 // The decrementing of this.checksumOffCount is not thread-safe, 1611 // but it is harmless because eventually checksumOffCount will be 1612 // a negative number. 1613 streamWrapper.checksumOk(); 1614 return blk; 1615 } 1616 1617 /** 1618 * @return the given <code>onDiskSizeWithHeaderL</code> cast to an int, after checking that it is sane 1619 */ 1620 private static int checkAndGetSizeAsInt(final long onDiskSizeWithHeaderL, final int hdrSize) 1621 throws IOException { 1622 if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1) 1623 || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) { 1624 throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL 1625 + ": expected to be at least " + hdrSize 1626 + " and at most " + Integer.MAX_VALUE + ", or -1"); 1627 } 1628 return (int)onDiskSizeWithHeaderL; 1629 } 1630 1631 /** 1632 * Verify that the passed-in onDiskSizeWithHeader aligns with what is in the header; else something 1633 * is not right. 1634 */ 1635 private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuff headerBuf, 1636 final long offset, boolean verifyChecksum) 1637 throws IOException { 1638 // Assert size provided aligns with what is in the header 1639 int fromHeader = getOnDiskSizeWithHeader(headerBuf, verifyChecksum); 1640 if (passedIn != fromHeader) { 1641 throw new IOException("Passed in onDiskSizeWithHeader=" + passedIn + " != " + fromHeader + 1642 ", offset=" + offset + ", fileContext=" + this.fileContext); 1643 } 1644 } 1645 1646 /** 1647 * Check the atomic reference cache for this block's header. The cache is only good if the next 1648 * read coming through is the next in sequence in the file. We read the next block's 1649 * header on the tail of reading the previous block to save a seek. Otherwise, 1650 * we have to do a seek to read the header before we can pull in the block OR 1651 * we have to back up the stream because we over-read (the next block's header). 1652 * @see PrefetchedHeader 1653 * @return The cached block header or null if not found. 1654 * @see #cacheNextBlockHeader(long, ByteBuff, int, int) 1655 */ 1656 private ByteBuff getCachedHeader(final long offset) { 1657 PrefetchedHeader ph = this.prefetchedHeader.get(); 1658 return ph != null && ph.offset == offset ? ph.buf : null; 1659 } 1660 1661 /** 1662 * Save away the next block's header in the atomic reference.
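     * @param offset the file offset of the block whose header we are caching, i.e. the current
     *          block's offset plus its on-disk size
     * @param onDiskBlock the buffer holding the current block plus the over-read next block's header
     * @param onDiskSizeWithHeader the position in <code>onDiskBlock</code> at which the next block's
     *          header starts
     * @param headerLength the number of header bytes to copy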
1663 * @see #getCachedHeader(long) 1664 * @see PrefetchedHeader 1665 */ 1666 private void cacheNextBlockHeader(final long offset, 1667 ByteBuff onDiskBlock, int onDiskSizeWithHeader, int headerLength) { 1668 PrefetchedHeader ph = new PrefetchedHeader(); 1669 ph.offset = offset; 1670 onDiskBlock.get(onDiskSizeWithHeader, ph.header, 0, headerLength); 1671 this.prefetchedHeader.set(ph); 1672 } 1673 1674 private int getNextBlockOnDiskSize(boolean readNextHeader, ByteBuff onDiskBlock, 1675 int onDiskSizeWithHeader) { 1676 int nextBlockOnDiskSize = -1; 1677 if (readNextHeader) { 1678 nextBlockOnDiskSize = 1679 onDiskBlock.getIntAfterPosition(onDiskSizeWithHeader + BlockType.MAGIC_LENGTH) 1680 + hdrSize; 1681 } 1682 return nextBlockOnDiskSize; 1683 } 1684 1685 private ByteBuff allocate(int size, boolean intoHeap) { 1686 return intoHeap ? HEAP.allocate(size) : allocator.allocate(size); 1687 } 1688 1689 /** 1690 * Reads a version 2 block. 1691 * @param offset the offset in the stream to read at. 1692 * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header and 1693 * checksums if present, or -1 if unknown (as a long). Can be -1 if we are doing raw 1694 * iteration of blocks as when loading up file metadata; i.e. the first read of a new 1695 * file. Usually known, obtained from the file index. 1696 * @param pread whether to use a positional read 1697 * @param verifyChecksum Whether to use HBase checksums. If HBase checksums are switched off, then 1698 * HDFS checksums are used. Can also flip on/off while reading the same file if we hit a troublesome 1699 * patch in an hfile. 1700 * @param updateMetrics whether to update the metrics. 1701 * @param intoHeap allocate the ByteBuff of the block from heap or off-heap. 1702 * @return the HFileBlock or null if there is an HBase checksum mismatch 1703 */ 1704 @VisibleForTesting 1705 protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, 1706 long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics, 1707 boolean intoHeap) throws IOException { 1708 if (offset < 0) { 1709 throw new IOException("Invalid offset=" + offset + " trying to read " 1710 + "block (onDiskSize=" + onDiskSizeWithHeaderL + ")"); 1711 } 1712 int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize); 1713 // Try and get cached header. Will serve us in the rare case where onDiskSizeWithHeaderL is -1 1714 // and will save us having to seek the stream backwards to reread the header we 1715 // read the last time through here. 1716 ByteBuff headerBuf = getCachedHeader(offset); 1717 LOG.trace("Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, " + 1718 "onDiskSizeWithHeader={}", this.fileContext.getHFileName(), offset, pread, 1719 verifyChecksum, headerBuf, onDiskSizeWithHeader); 1720 // This is NOT the same as verifyChecksum. The latter is whether to do hbase 1721 // checksums; it can change with circumstances. The flag below is whether the 1722 // file has support for checksums (version 2+). 1723 boolean checksumSupport = this.fileContext.isUseHBaseChecksum(); 1724 long startTime = System.currentTimeMillis(); 1725 if (onDiskSizeWithHeader <= 0) { 1726 // We were not passed the block size. Need to get it from the header. If the header was 1727 // not cached (see getCachedHeader above), we need to seek to pull it in. This is costly 1728 // and should happen very rarely. Currently happens on open of an hfile reader where we 1729 // read the trailer blocks to pull in the indices.
Otherwise, we are reading block sizes 1730 // out of the hfile index. To check, enable TRACE in this file and you'll see a stack trace 1731 // logged every time we seek. See HBASE-17072 for more detail. 1732 if (headerBuf == null) { 1733 if (LOG.isTraceEnabled()) { 1734 LOG.trace("Extra seek to get block size!", new RuntimeException()); 1735 } 1736 headerBuf = HEAP.allocate(hdrSize); 1737 readAtOffset(is, headerBuf, hdrSize, false, offset, pread); 1738 headerBuf.rewind(); 1739 } 1740 onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport); 1741 } 1742 int preReadHeaderSize = headerBuf == null? 0 : hdrSize; 1743 // Allocate enough space to fit the next block's header too; saves a seek next time through. 1744 // onDiskBlock is the whole block (header, body, and any checksums) plus an extra hdrSize to read the next header; 1745 // onDiskSizeWithHeader is header, body, and any checksums if present. preReadHeaderSize 1746 // says where to start reading. If we have the header cached, then we don't need to read 1747 // it again and we can likely read from the last place we left off without needing to back up and reread 1748 // the header we read last time through here. 1749 ByteBuff onDiskBlock = this.allocate(onDiskSizeWithHeader + hdrSize, intoHeap); 1750 boolean initHFileBlockSuccess = false; 1751 try { 1752 if (headerBuf != null) { 1753 onDiskBlock.put(0, headerBuf, 0, hdrSize).position(hdrSize); 1754 } 1755 boolean readNextHeader = readAtOffset(is, onDiskBlock, 1756 onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread); 1757 onDiskBlock.rewind(); // in case of moving position when copying a cached header 1758 int nextBlockOnDiskSize = 1759 getNextBlockOnDiskSize(readNextHeader, onDiskBlock, onDiskSizeWithHeader); 1760 if (headerBuf == null) { 1761 headerBuf = onDiskBlock.duplicate().position(0).limit(hdrSize); 1762 } 1763 // Do a few checks before we go instantiate HFileBlock. 1764 assert onDiskSizeWithHeader > this.hdrSize; 1765 verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport); 1766 ByteBuff curBlock = onDiskBlock.duplicate().position(0).limit(onDiskSizeWithHeader); 1767 // Verify checksum of the data before using it for building HFileBlock. 1768 if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) { 1769 return null; 1770 } 1771 long duration = System.currentTimeMillis() - startTime; 1772 if (updateMetrics) { 1773 HFile.updateReadLatency(duration, pread); 1774 } 1775 // The onDiskBlock will become the headerAndDataBuffer for this block. 1776 // If nextBlockOnDiskSize is not -1, the onDiskBlock already 1777 // contains the header of the next block, so there is no need to set the next block's header in it. 1778 HFileBlock hFileBlock = createFromBuff(curBlock, checksumSupport, offset, 1779 nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator); 1780 // Run check on uncompressed sizings. 1781 if (!fileContext.isCompressedOrEncrypted()) { 1782 hFileBlock.sanityCheckUncompressed(); 1783 } 1784 LOG.trace("Read {} in {} ms", hFileBlock, duration); 1785 // Cache the next block's header if we read it, for the next time through here.
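      // (The next block's header sits at position onDiskSizeWithHeader within onDiskBlock: we
      // allocated hdrSize extra bytes above and readAtOffset over-read them when peeking.)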
1786 if (nextBlockOnDiskSize != -1) { 1787 cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), onDiskBlock, 1788 onDiskSizeWithHeader, hdrSize); 1789 } 1790 initHFileBlockSuccess = true; 1791 return hFileBlock; 1792 } finally { 1793 if (!initHFileBlockSuccess) { 1794 onDiskBlock.release(); 1795 } 1796 } 1797 } 1798 1799 @Override 1800 public void setIncludesMemStoreTS(boolean includesMemstoreTS) { 1801 this.fileContext = new HFileContextBuilder(this.fileContext) 1802 .withIncludesMvcc(includesMemstoreTS).build(); 1803 } 1804 1805 @Override 1806 public void setDataBlockEncoder(HFileDataBlockEncoder encoder) { 1807 encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext); 1808 } 1809 1810 @Override 1811 public HFileBlockDecodingContext getBlockDecodingContext() { 1812 return this.encodedBlockDecodingCtx; 1813 } 1814 1815 @Override 1816 public HFileBlockDecodingContext getDefaultBlockDecodingContext() { 1817 return this.defaultDecodingCtx; 1818 } 1819 1820 /** 1821 * Generates the checksum for the header as well as the data and then validates it. 1822 * If the block does not use checksums, returns false. 1823 * @return True if the checksum matches, else false. 1824 */ 1825 private boolean validateChecksum(long offset, ByteBuff data, int hdrSize) { 1826 // If this is an older version of the block that does not have checksums, then return false 1827 // indicating that checksum verification did not succeed. Actually, this method should never 1828 // be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen 1829 // case. Since this is a cannot-happen case, it is better to return false to indicate a 1830 // checksum validation failure. 1831 if (!fileContext.isUseHBaseChecksum()) { 1832 return false; 1833 } 1834 return ChecksumUtil.validateChecksum(data, pathName, offset, hdrSize); 1835 } 1836 1837 @Override 1838 public void closeStreams() throws IOException { 1839 streamWrapper.close(); 1840 } 1841 1842 @Override 1843 public void unbufferStream() { 1844 // To handle concurrent reads, ensure that no other client is accessing the streams while we 1845 // unbuffer them. 1846 if (streamLock.tryLock()) { 1847 try { 1848 this.streamWrapper.unbuffer(); 1849 } finally { 1850 streamLock.unlock(); 1851 } 1852 } 1853 } 1854 1855 @Override 1856 public String toString() { 1857 return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext; 1858 } 1859 } 1860 1861 /** An additional sanity-check in case no compression or encryption is being used. */ 1862 @VisibleForTesting 1863 void sanityCheckUncompressed() throws IOException { 1864 if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + 1865 totalChecksumBytes()) { 1866 throw new IOException("Using no compression but " 1867 + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", " 1868 + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader 1869 + ", numChecksumBytes=" + totalChecksumBytes()); 1870 } 1871 } 1872 1873 // Cacheable implementation 1874 @Override 1875 public int getSerializedLength() { 1876 if (buf != null) { 1877 // Include extra bytes for block metadata.
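      // BLOCK_METADATA_SPACE covers the usesHBaseChecksum flag, the block's offset, and the next
      // block's on-disk size; see addMetaData() below for where these are written.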
1878 return this.buf.limit() + BLOCK_METADATA_SPACE; 1879 } 1880 return 0; 1881 } 1882 1883 // Cacheable implementation 1884 @Override 1885 public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) { 1886 this.buf.get(destination, 0, getSerializedLength() - BLOCK_METADATA_SPACE); 1887 destination = addMetaData(destination, includeNextBlockMetadata); 1888 1889 // Make it ready for reading. flip() sets position to zero and limit to the current position, 1890 // which is what we want: the consumer reads back exactly the block (plus checksums if present) 1891 // plus the metadata just written. 1892 destination.flip(); 1893 } 1894 1895 /** 1896 * For use by the BucketCache. This exposes internals. 1897 */ 1898 public ByteBuffer getMetaData() { 1899 ByteBuffer bb = ByteBuffer.allocate(BLOCK_METADATA_SPACE); 1900 bb = addMetaData(bb, true); 1901 bb.flip(); 1902 return bb; 1903 } 1904 1905 /** 1906 * Adds metadata at the current position (the position is moved forward). Does not flip or reset. 1907 * @return The passed <code>destination</code> with metadata added. 1908 */ 1909 private ByteBuffer addMetaData(final ByteBuffer destination, boolean includeNextBlockMetadata) { 1910 destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0); 1911 destination.putLong(this.offset); 1912 if (includeNextBlockMetadata) { 1913 destination.putInt(this.nextBlockOnDiskSize); 1914 } 1915 return destination; 1916 } 1917 1918 // Cacheable implementation 1919 @Override 1920 public CacheableDeserializer<Cacheable> getDeserializer() { 1921 return HFileBlock.BLOCK_DESERIALIZER; 1922 } 1923 1924 @Override 1925 public int hashCode() { 1926 int result = 1; 1927 result = result * 31 + blockType.hashCode(); 1928 result = result * 31 + nextBlockOnDiskSize; 1929 result = result * 31 + (int) (offset ^ (offset >>> 32)); 1930 result = result * 31 + onDiskSizeWithoutHeader; 1931 result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32)); 1932 result = result * 31 + uncompressedSizeWithoutHeader; 1933 result = result * 31 + buf.hashCode(); 1934 return result; 1935 } 1936 1937 @Override 1938 public boolean equals(Object comparison) { 1939 if (this == comparison) { 1940 return true; 1941 } 1942 if (comparison == null) { 1943 return false; 1944 } 1945 if (!(comparison instanceof HFileBlock)) { 1946 return false; 1947 } 1948 1949 HFileBlock castedComparison = (HFileBlock) comparison; 1950 1951 if (castedComparison.blockType != this.blockType) { 1952 return false; 1953 } 1954 if (castedComparison.nextBlockOnDiskSize != this.nextBlockOnDiskSize) { 1955 return false; 1956 } 1957 // Offset is important. Needed when we have to remake the cache key when the block is returned to the cache.
1958 if (castedComparison.offset != this.offset) { 1959 return false; 1960 } 1961 if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) { 1962 return false; 1963 } 1964 if (castedComparison.prevBlockOffset != this.prevBlockOffset) { 1965 return false; 1966 } 1967 if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) { 1968 return false; 1969 } 1970 if (ByteBuff.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0, 1971 castedComparison.buf.limit()) != 0) { 1972 return false; 1973 } 1974 return true; 1975 } 1976 1977 DataBlockEncoding getDataBlockEncoding() { 1978 if (blockType == BlockType.ENCODED_DATA) { 1979 return DataBlockEncoding.getEncodingById(getDataBlockEncodingId()); 1980 } 1981 return DataBlockEncoding.NONE; 1982 } 1983 1984 @VisibleForTesting 1985 byte getChecksumType() { 1986 return this.fileContext.getChecksumType().getCode(); 1987 } 1988 1989 int getBytesPerChecksum() { 1990 return this.fileContext.getBytesPerChecksum(); 1991 } 1992 1993 /** @return the size of data on disk + header. Excludes checksums. */ 1994 @VisibleForTesting 1995 int getOnDiskDataSizeWithHeader() { 1996 return this.onDiskDataSizeWithHeader; 1997 } 1998 1999 /** 2000 * Calculate the number of bytes required to store all the checksums 2001 * for this block. Each checksum value is a 4-byte integer. 2002 */ 2003 int totalChecksumBytes() { 2004 // If the hfile block has minorVersion 0, then there is no checksum 2005 // data to validate. Similarly, a zero value in this.bytesPerChecksum 2006 // indicates that cached blocks do not have checksum data because 2007 // checksums were already validated when the block was read from disk. 2008 if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) { 2009 return 0; 2010 } 2011 return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader, 2012 this.fileContext.getBytesPerChecksum()); 2013 } 2014 2015 /** 2016 * Returns the size of this block's header. 2017 */ 2018 public int headerSize() { 2019 return headerSize(this.fileContext.isUseHBaseChecksum()); 2020 } 2021 2022 /** 2023 * Maps a minor version to the size of the header. 2024 */ 2025 public static int headerSize(boolean usesHBaseChecksum) { 2026 return usesHBaseChecksum? 2027 HConstants.HFILEBLOCK_HEADER_SIZE: HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM; 2028 } 2029 2030 /** 2031 * Return the appropriate DUMMY_HEADER for the minor version 2032 */ 2033 @VisibleForTesting 2034 // TODO: Why is this in here? 2035 byte[] getDummyHeaderForVersion() { 2036 return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum()); 2037 } 2038 2039 /** 2040 * Return the appropriate DUMMY_HEADER for the minor version 2041 */ 2042 static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) { 2043 return usesHBaseChecksum? HConstants.HFILEBLOCK_DUMMY_HEADER: DUMMY_HEADER_NO_CHECKSUM; 2044 } 2045 2046 /** 2047 * @return This HFileBlock's fileContext, which will be a derivative of the 2048 * fileContext for the file from which this block's data was originally read. 2049 */ 2050 HFileContext getHFileContext() { 2051 return this.fileContext; 2052 } 2053 2054 /** 2055 * Convert the contents of the block header into a human-readable string. 2056 * This is mostly helpful for debugging. This assumes that the block 2057 * has minor version > 0.
2058 */ 2059 @VisibleForTesting 2060 static String toStringHeader(ByteBuff buf) throws IOException { 2061 byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)]; 2062 buf.get(magicBuf); 2063 BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH); 2064 int compressedBlockSizeNoHeader = buf.getInt(); 2065 int uncompressedBlockSizeNoHeader = buf.getInt(); 2066 long prevBlockOffset = buf.getLong(); 2067 byte cksumtype = buf.get(); 2068 long bytesPerChecksum = buf.getInt(); 2069 long onDiskDataSizeWithHeader = buf.getInt(); 2070 return " Header dump: magic: " + Bytes.toString(magicBuf) + 2071 " blockType " + bt + 2072 " compressedBlockSizeNoHeader " + 2073 compressedBlockSizeNoHeader + 2074 " uncompressedBlockSizeNoHeader " + 2075 uncompressedBlockSizeNoHeader + 2076 " prevBlockOffset " + prevBlockOffset + 2077 " checksumType " + ChecksumType.codeToType(cksumtype) + 2078 " bytesPerChecksum " + bytesPerChecksum + 2079 " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader; 2080 } 2081 2082 private static HFileBlockBuilder createBuilder(HFileBlock blk){ 2083 return new HFileBlockBuilder() 2084 .withBlockType(blk.blockType) 2085 .withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader) 2086 .withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader) 2087 .withPrevBlockOffset(blk.prevBlockOffset) 2088 .withByteBuff(blk.buf.duplicate()) // Duplicate the buffer. 2089 .withOffset(blk.offset) 2090 .withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader) 2091 .withNextBlockOnDiskSize(blk.nextBlockOnDiskSize) 2092 .withHFileContext(blk.fileContext) 2093 .withByteBuffAllocator(blk.allocator) 2094 .withShared(blk.isSharedMem()); 2095 } 2096 2097 static HFileBlock shallowClone(HFileBlock blk) { 2098 return createBuilder(blk).build(); 2099 } 2100 2101 static HFileBlock deepCloneOnHeap(HFileBlock blk) { 2102 ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit()))); 2103 return createBuilder(blk).withByteBuff(deepCloned).withShared(false).build(); 2104 } 2105}