001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP; 021import static org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer.CONTEXT_KEY; 022 023import io.opentelemetry.api.common.Attributes; 024import io.opentelemetry.api.common.AttributesBuilder; 025import io.opentelemetry.api.trace.Span; 026import io.opentelemetry.context.Context; 027import io.opentelemetry.context.Scope; 028import java.io.DataInputStream; 029import java.io.DataOutput; 030import java.io.DataOutputStream; 031import java.io.IOException; 032import java.nio.ByteBuffer; 033import java.util.ArrayList; 034import java.util.List; 035import java.util.Optional; 036import java.util.concurrent.atomic.AtomicReference; 037import java.util.concurrent.locks.Lock; 038import java.util.concurrent.locks.ReentrantLock; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.fs.FSDataInputStream; 041import org.apache.hadoop.fs.FSDataOutputStream; 042import org.apache.hadoop.hbase.Cell; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.fs.HFileSystem; 045import org.apache.hadoop.hbase.io.ByteArrayOutputStream; 046import org.apache.hadoop.hbase.io.ByteBuffAllocator; 047import org.apache.hadoop.hbase.io.ByteBuffInputStream; 048import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream; 049import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 050import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 051import org.apache.hadoop.hbase.io.encoding.EncodingState; 052import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; 053import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; 054import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; 055import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; 056import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer; 057import org.apache.hadoop.hbase.io.util.BlockIOUtils; 058import org.apache.hadoop.hbase.nio.ByteBuff; 059import org.apache.hadoop.hbase.nio.MultiByteBuff; 060import org.apache.hadoop.hbase.nio.SingleByteBuff; 061import org.apache.hadoop.hbase.regionserver.ShipperListener; 062import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ReadType; 063import org.apache.hadoop.hbase.util.Bytes; 064import org.apache.hadoop.hbase.util.ChecksumType; 065import org.apache.hadoop.hbase.util.ClassSize; 066import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 067import org.apache.yetus.audience.InterfaceAudience; 068import org.slf4j.Logger; 069import org.slf4j.LoggerFactory; 070 071import 
org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 072 073/** 074 * Cacheable Blocks of an {@link HFile} version 2 file. Version 2 was introduced in hbase-0.92.0. 075 * <p> 076 * Version 1 was the original file block. Version 2 was introduced when we changed the hbase file 077 * format to support multi-level block indexes and compound bloom filters (HBASE-3857). Support for 078 * Version 1 was removed in hbase-1.3.0. 079 * <h3>HFileBlock: Version 2</h3> In version 2, a block is structured as follows: 080 * <ul> 081 * <li><b>Header:</b> See Writer#putHeader() for where header is written; header total size is 082 * HFILEBLOCK_HEADER_SIZE 083 * <ul> 084 * <li>0. blockType: Magic record identifying the {@link BlockType} (8 bytes): e.g. 085 * <code>DATABLK*</code> 086 * <li>1. onDiskSizeWithoutHeader: Compressed -- a.k.a 'on disk' -- block size, excluding header, 087 * but including tailing checksum bytes (4 bytes) 088 * <li>2. uncompressedSizeWithoutHeader: Uncompressed block size, excluding header, and excluding 089 * checksum bytes (4 bytes) 090 * <li>3. prevBlockOffset: The offset of the previous block of the same type (8 bytes). This is used 091 * to navigate to the previous block without having to go to the block index 092 * <li>4: For minorVersions >=1, the ordinal describing checksum type (1 byte) 093 * <li>5: For minorVersions >=1, the number of data bytes/checksum chunk (4 bytes) 094 * <li>6: onDiskDataSizeWithHeader: For minorVersions >=1, the size of data 'on disk', including 095 * header, excluding checksums (4 bytes) 096 * </ul> 097 * </li> 098 * <li><b>Raw/Compressed/Encrypted/Encoded data:</b> The compression algorithm is the same for all 099 * the blocks in an {@link HFile}. If compression is NONE, this is just raw, serialized Cells. 100 * <li><b>Tail:</b> For minorVersions >=1, a series of 4 byte checksums, one each for the number 101 * of bytes specified by bytesPerChecksum. 102 * </ul> 103 * <h3>Caching</h3> Caches cache whole blocks with trailing checksums if any. We then tag on some 104 * metadata, the content of BLOCK_METADATA_SPACE which will be flag on if we are doing 'hbase' 105 * checksums and then the offset into the file which is needed when we re-make a cache key when we 106 * return the block to the cache as 'done'. See {@link Cacheable#serialize(ByteBuffer, boolean)} and 107 * {@link Cacheable#getDeserializer()}. 108 * <p> 109 * TODO: Should we cache the checksums? Down in Writer#getBlockForCaching(CacheConfig) where we make 110 * a block to cache-on-write, there is an attempt at turning off checksums. This is not the only 111 * place we get blocks to cache. We also will cache the raw return from an hdfs read. In this case, 112 * the checksums may be present. If the cache is backed by something that doesn't do ECC, say an 113 * SSD, we might want to preserve checksums. For now this is open question. 114 * <p> 115 * TODO: Over in BucketCache, we save a block allocation by doing a custom serialization. Be sure to 116 * change it if serialization changes in here. Could we add a method here that takes an IOEngine and 117 * that then serializes to it rather than expose our internals over in BucketCache? IOEngine is in 118 * the bucket subpackage. Pull it up? Then this class knows about bucketcache. Ugh. 
119 */ 120@InterfaceAudience.Private 121public class HFileBlock implements Cacheable { 122 private static final Logger LOG = LoggerFactory.getLogger(HFileBlock.class); 123 public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HFileBlock.class, false); 124 125 // Block Header fields. 126 127 // TODO: encapsulate Header related logic in this inner class. 128 static class Header { 129 // Format of header is: 130 // 8 bytes - block magic 131 // 4 bytes int - onDiskSizeWithoutHeader 132 // 4 bytes int - uncompressedSizeWithoutHeader 133 // 8 bytes long - prevBlockOffset 134 // The following 3 are only present if header contains checksum information 135 // 1 byte - checksum type 136 // 4 byte int - bytes per checksum 137 // 4 byte int - onDiskDataSizeWithHeader 138 static int BLOCK_MAGIC_INDEX = 0; 139 static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8; 140 static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12; 141 static int PREV_BLOCK_OFFSET_INDEX = 16; 142 static int CHECKSUM_TYPE_INDEX = 24; 143 static int BYTES_PER_CHECKSUM_INDEX = 25; 144 static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29; 145 } 146 147 /** Type of block. Header field 0. */ 148 private BlockType blockType; 149 150 /** 151 * Size on disk excluding header, including checksum. Header field 1. 152 * @see Writer#putHeader(byte[], int, int, int, int) 153 */ 154 private int onDiskSizeWithoutHeader; 155 156 /** 157 * Size of pure data. Does not include header or checksums. Header field 2. 158 * @see Writer#putHeader(byte[], int, int, int, int) 159 */ 160 private int uncompressedSizeWithoutHeader; 161 162 /** 163 * The offset of the previous block on disk. Header field 3. 164 * @see Writer#putHeader(byte[], int, int, int, int) 165 */ 166 private long prevBlockOffset; 167 168 /** 169 * Size on disk of header + data. Excludes checksum. Header field 6, OR calculated from 170 * {@link #onDiskSizeWithoutHeader} when using HDFS checksum. 171 * @see Writer#putHeader(byte[], int, int, int, int) 172 */ 173 private int onDiskDataSizeWithHeader; 174 // End of Block Header fields. 175 176 /** 177 * The in-memory representation of the hfile block. Can be on or offheap. Can be backed by a 178 * single ByteBuffer or by many. Make no assumptions. 179 * <p> 180 * Be careful reading from this <code>buf</code>. Duplicate and work on the duplicate or if not, 181 * be sure to reset position and limit else trouble down the road. 182 * <p> 183 * TODO: Make this read-only once made. 184 * <p> 185 * We are using the ByteBuff type. ByteBuffer is not extensible yet we need to be able to have a 186 * ByteBuffer-like API across multiple ByteBuffers reading from a cache such as BucketCache. So, 187 * we have this ByteBuff type. Unfortunately, it is spread all about HFileBlock. Would be good if 188 * could be confined to cache-use only but hard-to-do. 189 */ 190 private ByteBuff buf; 191 192 /** 193 * Meta data that holds meta information on the hfileblock. 194 */ 195 private HFileContext fileContext; 196 197 /** 198 * The offset of this block in the file. Populated by the reader for convenience of access. This 199 * offset is not part of the block header. 200 */ 201 private long offset = UNSET; 202 203 /** 204 * The on-disk size of the next block, including the header and checksums if present. UNSET if 205 * unknown. Blocks try to carry the size of the next block to read in this data member. Usually we 206 * get block sizes from the hfile index but sometimes the index is not available: e.g. 
when we 207 * read the indexes themselves (indexes are stored in blocks, we do not have an index for the 208 * indexes). Saves seeks especially around file open when there is a flurry of reading in hfile 209 * metadata. 210 */ 211 private int nextBlockOnDiskSize = UNSET; 212 213 private ByteBuffAllocator allocator; 214 215 /** 216 * On a checksum failure, do these many succeeding read requests using hdfs checksums before 217 * auto-reenabling hbase checksum verification. 218 */ 219 static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3; 220 221 private static int UNSET = -1; 222 public static final boolean FILL_HEADER = true; 223 public static final boolean DONT_FILL_HEADER = false; 224 225 // How to get the estimate correctly? if it is a singleBB? 226 public static final int MULTI_BYTE_BUFFER_HEAP_SIZE = 227 (int) ClassSize.estimateBase(MultiByteBuff.class, false); 228 229 /** 230 * Space for metadata on a block that gets stored along with the block when we cache it. There are 231 * a few bytes stuck on the end of the HFileBlock that we pull in from HDFS. 8 bytes are for the 232 * offset of this block (long) in the file. Offset is important because is is used when we remake 233 * the CacheKey when we return block to the cache when done. There is also a flag on whether 234 * checksumming is being done by hbase or not. See class comment for note on uncertain state of 235 * checksumming of blocks that come out of cache (should we or should we not?). Finally there are 236 * 4 bytes to hold the length of the next block which can save a seek on occasion if available. 237 * (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was formerly 238 * known as EXTRA_SERIALIZATION_SPACE). 239 */ 240 public static final int BLOCK_METADATA_SPACE = 241 Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT; 242 243 /** 244 * Each checksum value is an integer that can be stored in 4 bytes. 245 */ 246 static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT; 247 248 static final byte[] DUMMY_HEADER_NO_CHECKSUM = 249 new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM]; 250 251 /** 252 * Used deserializing blocks from Cache. <code> 253 * ++++++++++++++ 254 * + HFileBlock + 255 * ++++++++++++++ 256 * + Checksums + <= Optional 257 * ++++++++++++++ 258 * + Metadata! + <= See note on BLOCK_METADATA_SPACE above. 259 * ++++++++++++++ 260 * </code> 261 * @see #serialize(ByteBuffer, boolean) 262 */ 263 public static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER = new BlockDeserializer(); 264 265 public static final class BlockDeserializer implements CacheableDeserializer<Cacheable> { 266 private BlockDeserializer() { 267 } 268 269 @Override 270 public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc) throws IOException { 271 // The buf has the file block followed by block metadata. 272 // Set limit to just before the BLOCK_METADATA_SPACE then rewind. 273 buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind(); 274 // Get a new buffer to pass the HFileBlock for it to 'own'. 275 ByteBuff newByteBuff = buf.slice(); 276 // Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock. 
277 buf.position(buf.limit()); 278 buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE); 279 boolean usesChecksum = buf.get() == (byte) 1; 280 long offset = buf.getLong(); 281 int nextBlockOnDiskSize = buf.getInt(); 282 return createFromBuff(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc); 283 } 284 285 @Override 286 public int getDeserializerIdentifier() { 287 return DESERIALIZER_IDENTIFIER; 288 } 289 } 290 291 private static final int DESERIALIZER_IDENTIFIER; 292 static { 293 DESERIALIZER_IDENTIFIER = 294 CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER); 295 } 296 297 /** 298 * Creates a new {@link HFile} block from the given fields. This constructor is used only while 299 * writing blocks and caching, and is sitting in a byte buffer and we want to stuff the block into 300 * cache. See {@link Writer#getBlockForCaching(CacheConfig)}. 301 * <p> 302 * TODO: The caller presumes no checksumming 303 * <p> 304 * TODO: HFile block writer can also off-heap ? 305 * </p> 306 * required of this block instance since going into cache; checksum already verified on underlying 307 * block data pulled in from filesystem. Is that correct? What if cache is SSD? 308 * @param blockType the type of this block, see {@link BlockType} 309 * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader} 310 * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader} 311 * @param prevBlockOffset see {@link #prevBlockOffset} 312 * @param buf block buffer with header 313 * ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) 314 * @param fillHeader when true, write the first 4 header fields into passed 315 * buffer. 316 * @param offset the file offset the block was read from 317 * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader} 318 * @param fileContext HFile meta data 319 */ 320 public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, 321 int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader, 322 long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext, 323 ByteBuffAllocator allocator) { 324 this.blockType = blockType; 325 this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader; 326 this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader; 327 this.prevBlockOffset = prevBlockOffset; 328 this.offset = offset; 329 this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader; 330 this.nextBlockOnDiskSize = nextBlockOnDiskSize; 331 this.fileContext = fileContext; 332 this.allocator = allocator; 333 this.buf = buf; 334 if (fillHeader) { 335 overwriteHeader(); 336 } 337 this.buf.rewind(); 338 } 339 340 /** 341 * Creates a block from an existing buffer starting with a header. Rewinds and takes ownership of 342 * the buffer. By definition of rewind, ignores the buffer position, but if you slice the buffer 343 * beforehand, it will rewind to that point. 344 * @param buf Has header, content, and trailing checksums if present. 
345 */ 346 static HFileBlock createFromBuff(ByteBuff buf, boolean usesHBaseChecksum, final long offset, 347 final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator) 348 throws IOException { 349 buf.rewind(); 350 final BlockType blockType = BlockType.read(buf); 351 final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX); 352 final int uncompressedSizeWithoutHeader = 353 buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX); 354 final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX); 355 // This constructor is called when we deserialize a block from cache and when we read a block in 356 // from the fs. fileCache is null when deserialized from cache so need to make up one. 357 HFileContextBuilder fileContextBuilder = 358 fileContext != null ? new HFileContextBuilder(fileContext) : new HFileContextBuilder(); 359 fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum); 360 int onDiskDataSizeWithHeader; 361 if (usesHBaseChecksum) { 362 byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX); 363 int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX); 364 onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX); 365 // Use the checksum type and bytes per checksum from header, not from fileContext. 366 fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType)); 367 fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum); 368 } else { 369 fileContextBuilder.withChecksumType(ChecksumType.NULL); 370 fileContextBuilder.withBytesPerCheckSum(0); 371 // Need to fix onDiskDataSizeWithHeader; there are not checksums after-block-data 372 onDiskDataSizeWithHeader = onDiskSizeWithoutHeader + headerSize(usesHBaseChecksum); 373 } 374 fileContext = fileContextBuilder.build(); 375 assert usesHBaseChecksum == fileContext.isUseHBaseChecksum(); 376 return new HFileBlockBuilder().withBlockType(blockType) 377 .withOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader) 378 .withUncompressedSizeWithoutHeader(uncompressedSizeWithoutHeader) 379 .withPrevBlockOffset(prevBlockOffset).withOffset(offset) 380 .withOnDiskDataSizeWithHeader(onDiskDataSizeWithHeader) 381 .withNextBlockOnDiskSize(nextBlockOnDiskSize).withHFileContext(fileContext) 382 .withByteBuffAllocator(allocator).withByteBuff(buf.rewind()).withShared(!buf.hasArray()) 383 .build(); 384 } 385 386 /** 387 * Parse total on disk size including header and checksum. 388 * @param headerBuf Header ByteBuffer. Presumed exact size of header. 389 * @param verifyChecksum true if checksum verification is in use. 390 * @return Size of the block with header included. 391 */ 392 private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf, boolean verifyChecksum) { 393 return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(verifyChecksum); 394 } 395 396 /** 397 * @return the on-disk size of the next block (including the header size and any checksums if 398 * present) read by peeking into the next block's header; use as a hint when doing a read 399 * of the next block when scanning or running over a file. 
400 */ 401 int getNextBlockOnDiskSize() { 402 return nextBlockOnDiskSize; 403 } 404 405 @Override 406 public BlockType getBlockType() { 407 return blockType; 408 } 409 410 @Override 411 public int refCnt() { 412 return buf.refCnt(); 413 } 414 415 @Override 416 public HFileBlock retain() { 417 buf.retain(); 418 return this; 419 } 420 421 /** 422 * Call {@link ByteBuff#release()} to decrease the reference count, if no other reference, it will 423 * return back the {@link ByteBuffer} to {@link org.apache.hadoop.hbase.io.ByteBuffAllocator} 424 */ 425 @Override 426 public boolean release() { 427 return buf.release(); 428 } 429 430 /** 431 * Calling this method in strategic locations where HFileBlocks are referenced may help diagnose 432 * potential buffer leaks. We pass the block itself as a default hint, but one can use 433 * {@link #touch(Object)} to pass their own hint as well. 434 */ 435 @Override 436 public HFileBlock touch() { 437 return touch(this); 438 } 439 440 @Override 441 public HFileBlock touch(Object hint) { 442 buf.touch(hint); 443 return this; 444 } 445 446 /** Returns get data block encoding id that was used to encode this block */ 447 short getDataBlockEncodingId() { 448 if (blockType != BlockType.ENCODED_DATA) { 449 throw new IllegalArgumentException("Querying encoder ID of a block " + "of type other than " 450 + BlockType.ENCODED_DATA + ": " + blockType); 451 } 452 return buf.getShort(headerSize()); 453 } 454 455 /** Returns the on-disk size of header + data part + checksum. */ 456 public int getOnDiskSizeWithHeader() { 457 return onDiskSizeWithoutHeader + headerSize(); 458 } 459 460 /** Returns the on-disk size of the data part + checksum (header excluded). */ 461 int getOnDiskSizeWithoutHeader() { 462 return onDiskSizeWithoutHeader; 463 } 464 465 /** Returns the uncompressed size of data part (header and checksum excluded). */ 466 int getUncompressedSizeWithoutHeader() { 467 return uncompressedSizeWithoutHeader; 468 } 469 470 /** Returns the offset of the previous block of the same type in the file, or -1 if unknown */ 471 long getPrevBlockOffset() { 472 return prevBlockOffset; 473 } 474 475 /** 476 * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position is modified as 477 * side-effect. 478 */ 479 private void overwriteHeader() { 480 buf.rewind(); 481 blockType.write(buf); 482 buf.putInt(onDiskSizeWithoutHeader); 483 buf.putInt(uncompressedSizeWithoutHeader); 484 buf.putLong(prevBlockOffset); 485 if (this.fileContext.isUseHBaseChecksum()) { 486 buf.put(fileContext.getChecksumType().getCode()); 487 buf.putInt(fileContext.getBytesPerChecksum()); 488 buf.putInt(onDiskDataSizeWithHeader); 489 } 490 } 491 492 /** 493 * Returns a buffer that does not include the header and checksum. 494 * @return the buffer with header skipped and checksum omitted. 495 */ 496 public ByteBuff getBufferWithoutHeader() { 497 ByteBuff dup = getBufferReadOnly(); 498 return dup.position(headerSize()).slice(); 499 } 500 501 /** 502 * Returns a read-only duplicate of the buffer this block stores internally ready to be read. 503 * Clients must not modify the buffer object though they may set position and limit on the 504 * returned buffer since we pass back a duplicate. This method has to be public because it is used 505 * in {@link CompoundBloomFilter} to avoid object creation on every Bloom filter lookup, but has 506 * to be used with caution. Buffer holds header, block content, and any follow-on checksums if 507 * present. 
508 * @return the buffer of this block for read-only operations 509 */ 510 public ByteBuff getBufferReadOnly() { 511 // TODO: ByteBuf does not support asReadOnlyBuffer(). Fix. 512 ByteBuff dup = this.buf.duplicate(); 513 assert dup.position() == 0; 514 return dup; 515 } 516 517 public ByteBuffAllocator getByteBuffAllocator() { 518 return this.allocator; 519 } 520 521 private void sanityCheckAssertion(long valueFromBuf, long valueFromField, String fieldName) 522 throws IOException { 523 if (valueFromBuf != valueFromField) { 524 throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf 525 + ") is different from that in the field (" + valueFromField + ")"); 526 } 527 } 528 529 private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField) 530 throws IOException { 531 if (valueFromBuf != valueFromField) { 532 throw new IOException("Block type stored in the buffer: " + valueFromBuf 533 + ", block type field: " + valueFromField); 534 } 535 } 536 537 /** 538 * Checks if the block is internally consistent, i.e. the first 539 * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a valid header consistent 540 * with the fields. Assumes a packed block structure. This function is primary for testing and 541 * debugging, and is not thread-safe, because it alters the internal buffer pointer. Used by tests 542 * only. 543 */ 544 void sanityCheck() throws IOException { 545 // Duplicate so no side-effects 546 ByteBuff dup = this.buf.duplicate().rewind(); 547 sanityCheckAssertion(BlockType.read(dup), blockType); 548 549 sanityCheckAssertion(dup.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader"); 550 551 sanityCheckAssertion(dup.getInt(), uncompressedSizeWithoutHeader, 552 "uncompressedSizeWithoutHeader"); 553 554 sanityCheckAssertion(dup.getLong(), prevBlockOffset, "prevBlockOffset"); 555 if (this.fileContext.isUseHBaseChecksum()) { 556 sanityCheckAssertion(dup.get(), this.fileContext.getChecksumType().getCode(), "checksumType"); 557 sanityCheckAssertion(dup.getInt(), this.fileContext.getBytesPerChecksum(), 558 "bytesPerChecksum"); 559 sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader"); 560 } 561 562 if (dup.limit() != onDiskDataSizeWithHeader) { 563 throw new AssertionError( 564 "Expected limit " + onDiskDataSizeWithHeader + ", got " + dup.limit()); 565 } 566 567 // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next 568 // block's header, so there are two sensible values for buffer capacity. 
569 int hdrSize = headerSize(); 570 dup.rewind(); 571 if ( 572 dup.remaining() != onDiskDataSizeWithHeader 573 && dup.remaining() != onDiskDataSizeWithHeader + hdrSize 574 ) { 575 throw new AssertionError("Invalid buffer capacity: " + dup.remaining() + ", expected " 576 + onDiskDataSizeWithHeader + " or " + (onDiskDataSizeWithHeader + hdrSize)); 577 } 578 } 579 580 @Override 581 public String toString() { 582 StringBuilder sb = new StringBuilder().append("[").append("blockType=").append(blockType) 583 .append(", fileOffset=").append(offset).append(", headerSize=").append(headerSize()) 584 .append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader) 585 .append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader) 586 .append(", prevBlockOffset=").append(prevBlockOffset).append(", isUseHBaseChecksum=") 587 .append(fileContext.isUseHBaseChecksum()); 588 if (fileContext.isUseHBaseChecksum()) { 589 sb.append(", checksumType=").append(ChecksumType.codeToType(this.buf.get(24))) 590 .append(", bytesPerChecksum=").append(this.buf.getInt(24 + 1)) 591 .append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader); 592 } else { 593 sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader).append("(") 594 .append(onDiskSizeWithoutHeader).append("+") 595 .append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")"); 596 } 597 String dataBegin; 598 if (buf.hasArray()) { 599 dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(), 600 Math.min(32, buf.limit() - buf.arrayOffset() - headerSize())); 601 } else { 602 ByteBuff bufWithoutHeader = getBufferWithoutHeader(); 603 byte[] dataBeginBytes = 604 new byte[Math.min(32, bufWithoutHeader.limit() - bufWithoutHeader.position())]; 605 bufWithoutHeader.get(dataBeginBytes); 606 dataBegin = Bytes.toStringBinary(dataBeginBytes); 607 } 608 sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader()) 609 .append(", totalChecksumBytes=").append(totalChecksumBytes()).append(", isUnpacked=") 610 .append(isUnpacked()).append(", buf=[").append(buf).append("]").append(", dataBeginsWith=") 611 .append(dataBegin).append(", fileContext=").append(fileContext) 612 .append(", nextBlockOnDiskSize=").append(nextBlockOnDiskSize).append("]"); 613 return sb.toString(); 614 } 615 616 /** 617 * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its 618 * encoded structure. Internal structures are shared between instances where applicable. 619 */ 620 HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException { 621 if (!fileContext.isCompressedOrEncrypted()) { 622 // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean), 623 // which is used for block serialization to L2 cache, does not preserve encoding and 624 // encryption details. 625 return this; 626 } 627 628 ByteBuff newBuf = allocateBufferForUnpacking(); // allocates space for the decompressed block 629 HFileBlock unpacked = shallowClone(this, newBuf); 630 631 boolean succ = false; 632 final Context context = 633 Context.current().with(CONTEXT_KEY, new HFileContextAttributesBuilderConsumer(fileContext)); 634 try (Scope ignored = context.makeCurrent()) { 635 HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA 636 ? reader.getBlockDecodingContext() 637 : reader.getDefaultBlockDecodingContext(); 638 // Create a duplicated buffer without the header part. 
639 int headerSize = this.headerSize(); 640 ByteBuff dup = this.buf.duplicate(); 641 dup.position(headerSize); 642 dup = dup.slice(); 643 // Decode the dup into unpacked#buf 644 ctx.prepareDecoding(unpacked.getOnDiskDataSizeWithHeader() - headerSize, 645 unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), dup); 646 succ = true; 647 return unpacked; 648 } finally { 649 if (!succ) { 650 unpacked.release(); 651 } 652 } 653 } 654 655 /** 656 * Always allocates a new buffer of the correct size. Copies header bytes from the existing 657 * buffer. Does not change header fields. Reserve room to keep checksum bytes too. 658 */ 659 private ByteBuff allocateBufferForUnpacking() { 660 int headerSize = headerSize(); 661 int capacityNeeded = headerSize + uncompressedSizeWithoutHeader; 662 663 ByteBuff source = buf.duplicate(); 664 ByteBuff newBuf = allocator.allocate(capacityNeeded); 665 666 // Copy header bytes into newBuf. 667 source.position(0); 668 newBuf.put(0, source, 0, headerSize); 669 670 // set limit to exclude next block's header 671 newBuf.limit(capacityNeeded); 672 return newBuf; 673 } 674 675 /** 676 * Return true when this block's buffer has been unpacked, false otherwise. Note this is a 677 * calculated heuristic, not tracked attribute of the block. 678 */ 679 public boolean isUnpacked() { 680 final int headerSize = headerSize(); 681 final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader; 682 final int bufCapacity = buf.remaining(); 683 return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize; 684 } 685 686 /** 687 * Cannot be {@link #UNSET}. Must be a legitimate value. Used re-making the {@link BlockCacheKey} 688 * when block is returned to the cache. 689 * @return the offset of this block in the file it was read from 690 */ 691 long getOffset() { 692 if (offset < 0) { 693 throw new IllegalStateException("HFile block offset not initialized properly"); 694 } 695 return offset; 696 } 697 698 /** Returns a byte stream reading the data + checksum of this block */ 699 DataInputStream getByteStream() { 700 ByteBuff dup = this.buf.duplicate(); 701 dup.position(this.headerSize()); 702 return new DataInputStream(new ByteBuffInputStream(dup)); 703 } 704 705 @Override 706 public long heapSize() { 707 long size = FIXED_OVERHEAD; 708 size += fileContext.heapSize(); 709 if (buf != null) { 710 // Deep overhead of the byte buffer. Needs to be aligned separately. 711 size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE); 712 } 713 return ClassSize.align(size); 714 } 715 716 /** 717 * Will be override by {@link SharedMemHFileBlock} or {@link ExclusiveMemHFileBlock}. Return true 718 * by default. 719 */ 720 public boolean isSharedMem() { 721 return true; 722 } 723 724 /** 725 * Unified version 2 {@link HFile} block writer. The intended usage pattern is as follows: 726 * <ol> 727 * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm. 728 * <li>Call {@link Writer#startWriting} and get a data stream to write to. 729 * <li>Write your data into the stream. 730 * <li>Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to. store the 731 * serialized block into an external stream. 732 * <li>Repeat to write more blocks. 733 * </ol> 734 * <p> 735 */ 736 static class Writer implements ShipperListener { 737 private enum State { 738 INIT, 739 WRITING, 740 BLOCK_READY 741 }; 742 743 /** Writer state. Used to ensure the correct usage protocol. 
*/ 744 private State state = State.INIT; 745 746 /** Data block encoder used for data blocks */ 747 private final HFileDataBlockEncoder dataBlockEncoder; 748 749 private HFileBlockEncodingContext dataBlockEncodingCtx; 750 751 /** block encoding context for non-data blocks */ 752 private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx; 753 754 /** 755 * The stream we use to accumulate data into a block in an uncompressed format. We reset this 756 * stream at the end of each block and reuse it. The header is written as the first 757 * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this stream. 758 */ 759 private ByteArrayOutputStream baosInMemory; 760 761 /** 762 * Current block type. Set in {@link #startWriting(BlockType)}. Could be changed in 763 * {@link #finishBlock()} from {@link BlockType#DATA} to {@link BlockType#ENCODED_DATA}. 764 */ 765 private BlockType blockType; 766 767 /** 768 * A stream that we write uncompressed bytes to, which compresses them and writes them to 769 * {@link #baosInMemory}. 770 */ 771 private DataOutputStream userDataStream; 772 773 /** 774 * Bytes to be written to the file system, including the header. Compressed if compression is 775 * turned on. It also includes the checksum data that immediately follows the block data. 776 * (header + data + checksums) 777 */ 778 private ByteArrayOutputStream onDiskBlockBytesWithHeader; 779 780 /** 781 * The size of the checksum data on disk. It is used only if data is not compressed. If data is 782 * compressed, then the checksums are already part of onDiskBytesWithHeader. If data is 783 * uncompressed, then this variable stores the checksum data for this block. 784 */ 785 private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY; 786 787 /** 788 * Current block's start offset in the {@link HFile}. Set in 789 * {@link #writeHeaderAndData(FSDataOutputStream)}. 790 */ 791 private long startOffset; 792 793 /** 794 * Offset of previous block by block type. Updated when the next block is started. 795 */ 796 private long[] prevOffsetByType; 797 798 /** The offset of the previous block of the same type */ 799 private long prevOffset; 800 /** Meta data that holds information about the hfileblock **/ 801 private HFileContext fileContext; 802 803 private final ByteBuffAllocator allocator; 804 805 @Override 806 public void beforeShipped() { 807 if (getEncodingState() != null) { 808 getEncodingState().beforeShipped(); 809 } 810 } 811 812 EncodingState getEncodingState() { 813 return dataBlockEncodingCtx.getEncodingState(); 814 } 815 816 /** 817 * @param dataBlockEncoder data block encoding algorithm to use 818 */ 819 public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder, 820 HFileContext fileContext) { 821 this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP); 822 } 823 824 public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder, 825 HFileContext fileContext, ByteBuffAllocator allocator) { 826 if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) { 827 throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is " 828 + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " 829 + fileContext.getBytesPerChecksum()); 830 } 831 this.allocator = allocator; 832 this.dataBlockEncoder = 833 dataBlockEncoder != null ? 
dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE; 834 this.dataBlockEncodingCtx = this.dataBlockEncoder.newDataBlockEncodingContext(conf, 835 HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); 836 // TODO: This should be lazily instantiated 837 this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(conf, null, 838 HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); 839 // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum 840 baosInMemory = new ByteArrayOutputStream(); 841 prevOffsetByType = new long[BlockType.values().length]; 842 for (int i = 0; i < prevOffsetByType.length; ++i) { 843 prevOffsetByType[i] = UNSET; 844 } 845 // TODO: Why fileContext saved away when we have dataBlockEncoder and/or 846 // defaultDataBlockEncoder? 847 this.fileContext = fileContext; 848 } 849 850 /** 851 * Starts writing into the block. The previous block's data is discarded. 852 * @return the stream the user can write their data into 853 */ 854 DataOutputStream startWriting(BlockType newBlockType) throws IOException { 855 if (state == State.BLOCK_READY && startOffset != -1) { 856 // We had a previous block that was written to a stream at a specific 857 // offset. Save that offset as the last offset of a block of that type. 858 prevOffsetByType[blockType.getId()] = startOffset; 859 } 860 861 startOffset = -1; 862 blockType = newBlockType; 863 864 baosInMemory.reset(); 865 baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER); 866 867 state = State.WRITING; 868 869 // We will compress it later in finishBlock() 870 userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory); 871 if (newBlockType == BlockType.DATA) { 872 this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream); 873 } 874 return userDataStream; 875 } 876 877 /** 878 * Writes the Cell to this block 879 */ 880 void write(Cell cell) throws IOException { 881 expectState(State.WRITING); 882 this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream); 883 } 884 885 /** 886 * Transitions the block writer from the "writing" state to the "block ready" state. Does 887 * nothing if a block is already finished. 888 */ 889 void ensureBlockReady() throws IOException { 890 Preconditions.checkState(state != State.INIT, "Unexpected state: " + state); 891 892 if (state == State.BLOCK_READY) { 893 return; 894 } 895 896 // This will set state to BLOCK_READY. 897 finishBlock(); 898 } 899 900 /** 901 * Finish up writing of the block. Flushes the compressing stream (if using compression), fills 902 * out the header, does any compression/encryption of bytes to flush out to disk, and manages 903 * the cache on write content, if applicable. Sets block write state to "block ready". 904 */ 905 private void finishBlock() throws IOException { 906 if (blockType == BlockType.DATA) { 907 this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream, 908 baosInMemory.getBuffer(), blockType); 909 blockType = dataBlockEncodingCtx.getBlockType(); 910 } 911 userDataStream.flush(); 912 prevOffset = prevOffsetByType[blockType.getId()]; 913 914 // We need to set state before we can package the block up for cache-on-write. In a way, the 915 // block is ready, but not yet encoded or compressed. 
916 state = State.BLOCK_READY; 917 Bytes compressAndEncryptDat; 918 if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) { 919 compressAndEncryptDat = 920 dataBlockEncodingCtx.compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size()); 921 } else { 922 compressAndEncryptDat = defaultBlockEncodingCtx.compressAndEncrypt(baosInMemory.getBuffer(), 923 0, baosInMemory.size()); 924 } 925 if (compressAndEncryptDat == null) { 926 compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size()); 927 } 928 if (onDiskBlockBytesWithHeader == null) { 929 onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength()); 930 } 931 onDiskBlockBytesWithHeader.reset(); 932 onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(), 933 compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength()); 934 // Calculate how many bytes we need for checksum on the tail of the block. 935 int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(), 936 fileContext.getBytesPerChecksum()); 937 938 // Put the header for the on disk bytes; header currently is unfilled-out 939 putHeader(onDiskBlockBytesWithHeader, onDiskBlockBytesWithHeader.size() + numBytes, 940 baosInMemory.size(), onDiskBlockBytesWithHeader.size()); 941 if (onDiskChecksum.length != numBytes) { 942 onDiskChecksum = new byte[numBytes]; 943 } 944 ChecksumUtil.generateChecksums(onDiskBlockBytesWithHeader.getBuffer(), 0, 945 onDiskBlockBytesWithHeader.size(), onDiskChecksum, 0, fileContext.getChecksumType(), 946 fileContext.getBytesPerChecksum()); 947 } 948 949 /** 950 * Put the header into the given byte array at the given offset. 951 * @param onDiskSize size of the block on disk header + data + checksum 952 * @param uncompressedSize size of the block after decompression (but before optional data block 953 * decoding) including header 954 * @param onDiskDataSize size of the block on disk with header and data but not including the 955 * checksums 956 */ 957 private void putHeader(byte[] dest, int offset, int onDiskSize, int uncompressedSize, 958 int onDiskDataSize) { 959 offset = blockType.put(dest, offset); 960 offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE); 961 offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE); 962 offset = Bytes.putLong(dest, offset, prevOffset); 963 offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode()); 964 offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum()); 965 Bytes.putInt(dest, offset, onDiskDataSize); 966 } 967 968 private void putHeader(ByteBuff buff, int onDiskSize, int uncompressedSize, 969 int onDiskDataSize) { 970 buff.rewind(); 971 blockType.write(buff); 972 buff.putInt(onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE); 973 buff.putInt(uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE); 974 buff.putLong(prevOffset); 975 buff.put(fileContext.getChecksumType().getCode()); 976 buff.putInt(fileContext.getBytesPerChecksum()); 977 buff.putInt(onDiskDataSize); 978 } 979 980 private void putHeader(ByteArrayOutputStream dest, int onDiskSize, int uncompressedSize, 981 int onDiskDataSize) { 982 putHeader(dest.getBuffer(), 0, onDiskSize, uncompressedSize, onDiskDataSize); 983 } 984 985 /** 986 * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records the offset of this 987 * block so that it can be referenced in the next block of the same type. 
988 */ 989 void writeHeaderAndData(FSDataOutputStream out) throws IOException { 990 long offset = out.getPos(); 991 if (startOffset != UNSET && offset != startOffset) { 992 throw new IOException("A " + blockType + " block written to a " 993 + "stream twice, first at offset " + startOffset + ", then at " + offset); 994 } 995 startOffset = offset; 996 finishBlockAndWriteHeaderAndData(out); 997 } 998 999 /** 1000 * Writes the header and the compressed data of this block (or uncompressed data when not using 1001 * compression) into the given stream. Can be called in the "writing" state or in the "block 1002 * ready" state. If called in the "writing" state, transitions the writer to the "block ready" 1003 * state. 1004 * @param out the output stream to write the 1005 */ 1006 protected void finishBlockAndWriteHeaderAndData(DataOutputStream out) throws IOException { 1007 ensureBlockReady(); 1008 long startTime = EnvironmentEdgeManager.currentTime(); 1009 out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size()); 1010 out.write(onDiskChecksum); 1011 HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime); 1012 } 1013 1014 /** 1015 * Returns the header or the compressed data (or uncompressed data when not using compression) 1016 * as a byte array. Can be called in the "writing" state or in the "block ready" state. If 1017 * called in the "writing" state, transitions the writer to the "block ready" state. This 1018 * returns the header + data + checksums stored on disk. 1019 * @return header and data as they would be stored on disk in a byte array 1020 */ 1021 byte[] getHeaderAndDataForTest() throws IOException { 1022 ensureBlockReady(); 1023 // This is not very optimal, because we are doing an extra copy. 1024 // But this method is used only by unit tests. 1025 byte[] output = new byte[onDiskBlockBytesWithHeader.size() + onDiskChecksum.length]; 1026 System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0, 1027 onDiskBlockBytesWithHeader.size()); 1028 System.arraycopy(onDiskChecksum, 0, output, onDiskBlockBytesWithHeader.size(), 1029 onDiskChecksum.length); 1030 return output; 1031 } 1032 1033 /** 1034 * Releases resources used by this writer. 1035 */ 1036 void release() { 1037 if (dataBlockEncodingCtx != null) { 1038 dataBlockEncodingCtx.close(); 1039 dataBlockEncodingCtx = null; 1040 } 1041 if (defaultBlockEncodingCtx != null) { 1042 defaultBlockEncodingCtx.close(); 1043 defaultBlockEncodingCtx = null; 1044 } 1045 } 1046 1047 /** 1048 * Returns the on-disk size of the data portion of the block. This is the compressed size if 1049 * compression is enabled. Can only be called in the "block ready" state. Header is not 1050 * compressed, and its size is not included in the return value. 1051 * @return the on-disk size of the block, not including the header. 1052 */ 1053 int getOnDiskSizeWithoutHeader() { 1054 expectState(State.BLOCK_READY); 1055 return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length 1056 - HConstants.HFILEBLOCK_HEADER_SIZE; 1057 } 1058 1059 /** 1060 * Returns the on-disk size of the block. Can only be called in the "block ready" state. 1061 * @return the on-disk size of the block ready to be written, including the header size, the 1062 * data and the checksum data. 1063 */ 1064 int getOnDiskSizeWithHeader() { 1065 expectState(State.BLOCK_READY); 1066 return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length; 1067 } 1068 1069 /** 1070 * The uncompressed size of the block data. Does not include header size. 
1071 */ 1072 int getUncompressedSizeWithoutHeader() { 1073 expectState(State.BLOCK_READY); 1074 return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE; 1075 } 1076 1077 /** 1078 * The uncompressed size of the block data, including header size. 1079 */ 1080 int getUncompressedSizeWithHeader() { 1081 expectState(State.BLOCK_READY); 1082 return baosInMemory.size(); 1083 } 1084 1085 /** Returns true if a block is being written */ 1086 boolean isWriting() { 1087 return state == State.WRITING; 1088 } 1089 1090 /** 1091 * Returns the number of bytes written into the current block so far, or zero if not writing the 1092 * block at the moment. Note that this will return zero in the "block ready" state as well. 1093 * @return the number of bytes written 1094 */ 1095 public int encodedBlockSizeWritten() { 1096 return state != State.WRITING ? 0 : this.getEncodingState().getEncodedDataSizeWritten(); 1097 } 1098 1099 /** 1100 * Returns the number of bytes written into the current block so far, or zero if not writing the 1101 * block at the moment. Note that this will return zero in the "block ready" state as well. 1102 * @return the number of bytes written 1103 */ 1104 int blockSizeWritten() { 1105 return state != State.WRITING ? 0 : this.getEncodingState().getUnencodedDataSizeWritten(); 1106 } 1107 1108 /** 1109 * Clones the header followed by the uncompressed data, even if using compression. This is 1110 * needed for storing uncompressed blocks in the block cache. Can be called in the "writing" 1111 * state or the "block ready" state. Returns only the header and data, does not include checksum 1112 * data. 1113 * @return Returns an uncompressed block ByteBuff for caching on write 1114 */ 1115 ByteBuff cloneUncompressedBufferWithHeader() { 1116 expectState(State.BLOCK_READY); 1117 ByteBuff bytebuff = allocator.allocate(baosInMemory.size()); 1118 baosInMemory.toByteBuff(bytebuff); 1119 int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(), 1120 fileContext.getBytesPerChecksum()); 1121 putHeader(bytebuff, onDiskBlockBytesWithHeader.size() + numBytes, baosInMemory.size(), 1122 onDiskBlockBytesWithHeader.size()); 1123 bytebuff.rewind(); 1124 return bytebuff; 1125 } 1126 1127 /** 1128 * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is needed 1129 * for storing packed blocks in the block cache. Returns only the header and data, Does not 1130 * include checksum data. 1131 * @return Returns a copy of block bytes for caching on write 1132 */ 1133 private ByteBuff cloneOnDiskBufferWithHeader() { 1134 expectState(State.BLOCK_READY); 1135 ByteBuff bytebuff = allocator.allocate(onDiskBlockBytesWithHeader.size()); 1136 onDiskBlockBytesWithHeader.toByteBuff(bytebuff); 1137 bytebuff.rewind(); 1138 return bytebuff; 1139 } 1140 1141 private void expectState(State expectedState) { 1142 if (state != expectedState) { 1143 throw new IllegalStateException( 1144 "Expected state: " + expectedState + ", actual state: " + state); 1145 } 1146 } 1147 1148 /** 1149 * Takes the given {@link BlockWritable} instance, creates a new block of its appropriate type, 1150 * writes the writable into this block, and flushes the block into the output stream. The writer 1151 * is instructed not to buffer uncompressed bytes for cache-on-write. 
1152 * @param bw the block-writable object to write as a block 1153 * @param out the file system output stream 1154 */ 1155 void writeBlock(BlockWritable bw, FSDataOutputStream out) throws IOException { 1156 bw.writeToBlock(startWriting(bw.getBlockType())); 1157 writeHeaderAndData(out); 1158 } 1159 1160 /** 1161 * Creates a new HFileBlock. Checksums have already been validated, so the byte buffer passed 1162 * into the constructor of this newly created block does not have checksum data even though the 1163 * header minor version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a 0 value 1164 * in bytesPerChecksum. This method copies the on-disk or uncompressed data to build the 1165 * HFileBlock which is used only while writing blocks and caching. 1166 * <p> 1167 * TODO: Should there be an option where a cache can ask that hbase preserve block checksums for 1168 * checking after a block comes out of the cache? Otehrwise, cache is responsible for blocks 1169 * being wholesome (ECC memory or if file-backed, it does checksumming). 1170 */ 1171 HFileBlock getBlockForCaching(CacheConfig cacheConf) { 1172 HFileContext newContext = new HFileContextBuilder().withBlockSize(fileContext.getBlocksize()) 1173 .withBytesPerCheckSum(0).withChecksumType(ChecksumType.NULL) // no checksums in cached data 1174 .withCompression(fileContext.getCompression()) 1175 .withDataBlockEncoding(fileContext.getDataBlockEncoding()) 1176 .withHBaseCheckSum(fileContext.isUseHBaseChecksum()) 1177 .withCompressTags(fileContext.isCompressTags()) 1178 .withIncludesMvcc(fileContext.isIncludesMvcc()) 1179 .withIncludesTags(fileContext.isIncludesTags()) 1180 .withColumnFamily(fileContext.getColumnFamily()).withTableName(fileContext.getTableName()) 1181 .build(); 1182 // Build the HFileBlock. 1183 HFileBlockBuilder builder = new HFileBlockBuilder(); 1184 ByteBuff buff; 1185 if (cacheConf.shouldCacheCompressed(blockType.getCategory())) { 1186 buff = cloneOnDiskBufferWithHeader(); 1187 } else { 1188 buff = cloneUncompressedBufferWithHeader(); 1189 } 1190 return builder.withBlockType(blockType) 1191 .withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader()) 1192 .withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader()) 1193 .withPrevBlockOffset(prevOffset).withByteBuff(buff).withFillHeader(FILL_HEADER) 1194 .withOffset(startOffset).withNextBlockOnDiskSize(UNSET) 1195 .withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length) 1196 .withHFileContext(newContext).withByteBuffAllocator(cacheConf.getByteBuffAllocator()) 1197 .withShared(!buff.hasArray()).build(); 1198 } 1199 } 1200 1201 /** Something that can be written into a block. */ 1202 interface BlockWritable { 1203 /** The type of block this data should use. */ 1204 BlockType getBlockType(); 1205 1206 /** 1207 * Writes the block to the provided stream. Must not write any magic records. 1208 * @param out a stream to write uncompressed data into 1209 */ 1210 void writeToBlock(DataOutput out) throws IOException; 1211 } 1212 1213 /** 1214 * Iterator for reading {@link HFileBlock}s in load-on-open-section, such as root data index 1215 * block, meta index block, file info block etc. 1216 */ 1217 interface BlockIterator { 1218 /** 1219 * Get the next block, or null if there are no more blocks to iterate. 
1220 */ 1221 HFileBlock nextBlock() throws IOException; 1222 1223 /** 1224 * Similar to {@link #nextBlock()} but checks block type, throws an exception if incorrect, and 1225 * returns the HFile block 1226 */ 1227 HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException; 1228 1229 /** 1230 * Now we use the {@link ByteBuffAllocator} to manage the nio ByteBuffers for HFileBlocks, so we 1231 * must deallocate all of the ByteBuffers in the end life. the BlockIterator's life cycle is 1232 * starting from opening an HFileReader and stopped when the HFileReader#close, so we will keep 1233 * track all the read blocks until we call {@link BlockIterator#freeBlocks()} when closing the 1234 * HFileReader. Sum bytes of those blocks in load-on-open section should be quite small, so 1235 * tracking them should be OK. 1236 */ 1237 void freeBlocks(); 1238 } 1239 1240 /** An HFile block reader with iteration ability. */ 1241 interface FSReader { 1242 /** 1243 * Reads the block at the given offset in the file with the given on-disk size and uncompressed 1244 * size. 1245 * @param offset of the file to read 1246 * @param onDiskSize the on-disk size of the entire block, including all applicable headers, 1247 * or -1 if unknown 1248 * @param pread true to use pread, otherwise use the stream read. 1249 * @param updateMetrics update the metrics or not. 1250 * @param intoHeap allocate the block's ByteBuff by {@link ByteBuffAllocator} or JVM heap. 1251 * For LRUBlockCache, we must ensure that the block to cache is an heap 1252 * one, because the memory occupation is based on heap now, also for 1253 * {@link CombinedBlockCache}, we use the heap LRUBlockCache as L1 cache to 1254 * cache small blocks such as IndexBlock or MetaBlock for faster access. So 1255 * introduce an flag here to decide whether allocate from JVM heap or not 1256 * so that we can avoid an extra off-heap to heap memory copy when using 1257 * LRUBlockCache. For most cases, we known what's the expected block type 1258 * we'll read, while for some special case (Example: 1259 * HFileReaderImpl#readNextDataBlock()), we cannot pre-decide what's the 1260 * expected block type, then we can only allocate block's ByteBuff from 1261 * {@link ByteBuffAllocator} firstly, and then when caching it in 1262 * {@link LruBlockCache} we'll check whether the ByteBuff is from heap or 1263 * not, if not then we'll clone it to an heap one and cache it. 1264 * @return the newly read block 1265 */ 1266 HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics, 1267 boolean intoHeap) throws IOException; 1268 1269 /** 1270 * Creates a block iterator over the given portion of the {@link HFile}. The iterator returns 1271 * blocks starting with offset such that offset <= startOffset < endOffset. Returned 1272 * blocks are always unpacked. Used when no hfile index available; e.g. reading in the hfile 1273 * index blocks themselves on file open. 1274 * @param startOffset the offset of the block to start iteration with 1275 * @param endOffset the offset to end iteration at (exclusive) 1276 * @return an iterator of blocks between the two given offsets 1277 */ 1278 BlockIterator blockRange(long startOffset, long endOffset); 1279 1280 /** Closes the backing streams */ 1281 void closeStreams() throws IOException; 1282 1283 /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */ 1284 HFileBlockDecodingContext getBlockDecodingContext(); 1285 1286 /** Get the default decoder for blocks from this file. 
*/ 1287 HFileBlockDecodingContext getDefaultBlockDecodingContext(); 1288 1289 void setIncludesMemStoreTS(boolean includesMemstoreTS); 1290 1291 void setDataBlockEncoder(HFileDataBlockEncoder encoder, Configuration conf); 1292 1293 /** 1294 * To close the stream's socket. Note: This can be concurrently called from multiple threads and 1295 * implementation should take care of thread safety. 1296 */ 1297 void unbufferStream(); 1298 } 1299 1300 /** 1301 * Data-structure to use caching the header of the NEXT block. Only works if next read that comes 1302 * in here is next in sequence in this block. When we read, we read current block and the next 1303 * blocks' header. We do this so we have the length of the next block to read if the hfile index 1304 * is not available (rare, at hfile open only). 1305 */ 1306 private static class PrefetchedHeader { 1307 long offset = -1; 1308 byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE]; 1309 final ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(header, 0, header.length)); 1310 1311 @Override 1312 public String toString() { 1313 return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header); 1314 } 1315 } 1316 1317 /** 1318 * Reads version 2 HFile blocks from the filesystem. 1319 */ 1320 static class FSReaderImpl implements FSReader { 1321 /** 1322 * The file system stream of the underlying {@link HFile} that does or doesn't do checksum 1323 * validations in the filesystem 1324 */ 1325 private FSDataInputStreamWrapper streamWrapper; 1326 1327 private HFileBlockDecodingContext encodedBlockDecodingCtx; 1328 1329 /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */ 1330 private final HFileBlockDefaultDecodingContext defaultDecodingCtx; 1331 1332 /** 1333 * Cache of the NEXT header after this. Check it is indeed next blocks header before using it. 1334 * TODO: Review. This overread into next block to fetch next blocks header seems unnecessary 1335 * given we usually get the block size from the hfile index. Review! 1336 */ 1337 private AtomicReference<PrefetchedHeader> prefetchedHeader = 1338 new AtomicReference<>(new PrefetchedHeader()); 1339 1340 /** The size of the file we are reading from, or -1 if unknown. */ 1341 private long fileSize; 1342 1343 /** The size of the header */ 1344 protected final int hdrSize; 1345 1346 /** The filesystem used to access data */ 1347 private HFileSystem hfs; 1348 1349 private HFileContext fileContext; 1350 // Cache the fileName 1351 private String pathName; 1352 1353 private final ByteBuffAllocator allocator; 1354 1355 private final Lock streamLock = new ReentrantLock(); 1356 1357 private final boolean isPreadAllBytes; 1358 1359 FSReaderImpl(ReaderContext readerContext, HFileContext fileContext, ByteBuffAllocator allocator, 1360 Configuration conf) throws IOException { 1361 this.fileSize = readerContext.getFileSize(); 1362 this.hfs = readerContext.getFileSystem(); 1363 if (readerContext.getFilePath() != null) { 1364 this.pathName = readerContext.getFilePath().toString(); 1365 } 1366 this.fileContext = fileContext; 1367 this.hdrSize = headerSize(fileContext.isUseHBaseChecksum()); 1368 this.allocator = allocator; 1369 1370 this.streamWrapper = readerContext.getInputStreamWrapper(); 1371 // Older versions of HBase didn't support checksum. 
1372 this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum()); 1373 defaultDecodingCtx = new HFileBlockDefaultDecodingContext(conf, fileContext); 1374 encodedBlockDecodingCtx = defaultDecodingCtx; 1375 isPreadAllBytes = readerContext.isPreadAllBytes(); 1376 } 1377 1378 @Override 1379 public BlockIterator blockRange(final long startOffset, final long endOffset) { 1380 final FSReader owner = this; // handle for inner class 1381 return new BlockIterator() { 1382 private volatile boolean freed = false; 1383 // Tracking all read blocks until we call freeBlocks. 1384 private List<HFileBlock> blockTracker = new ArrayList<>(); 1385 private long offset = startOffset; 1386 // Cache length of next block. Current block has the length of next block in it. 1387 private long length = -1; 1388 1389 @Override 1390 public HFileBlock nextBlock() throws IOException { 1391 if (offset >= endOffset) { 1392 return null; 1393 } 1394 HFileBlock b = readBlockData(offset, length, false, false, true); 1395 offset += b.getOnDiskSizeWithHeader(); 1396 length = b.getNextBlockOnDiskSize(); 1397 HFileBlock uncompressed = b.unpack(fileContext, owner); 1398 if (uncompressed != b) { 1399 b.release(); // Need to release the compressed Block now. 1400 } 1401 blockTracker.add(uncompressed); 1402 return uncompressed; 1403 } 1404 1405 @Override 1406 public HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException { 1407 HFileBlock blk = nextBlock(); 1408 if (blk.getBlockType() != blockType) { 1409 throw new IOException( 1410 "Expected block of type " + blockType + " but found " + blk.getBlockType()); 1411 } 1412 return blk; 1413 } 1414 1415 @Override 1416 public void freeBlocks() { 1417 if (freed) { 1418 return; 1419 } 1420 blockTracker.forEach(HFileBlock::release); 1421 blockTracker = null; 1422 freed = true; 1423 } 1424 }; 1425 } 1426 1427 /** 1428 * Does a positional read or a seek and read into the given byte buffer. We must take care to 1429 * call {@link ByteBuff#release()} on every exit path to deallocate the ByteBuffers; 1430 * otherwise a memory leak may happen. 1431 * @param dest destination buffer 1432 * @param size size of read 1433 * @param peekIntoNextBlock whether to read the next block's on-disk size 1434 * @param fileOffset position in the stream to read at 1435 * @param pread whether we should do a positional read 1436 * @param istream The input source of data 1437 * @return true if the destination buffer includes the next block's header; false if it only 1438 * includes the current block's data without the next block's header. 1439 * @throws IOException if any IO error happens. 1440 */ 1441 protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size, 1442 boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException { 1443 if (!pread) { 1444 // Seek + read. Better for scanning. 1445 HFileUtil.seekOnMultipleSources(istream, fileOffset); 1446 long realOffset = istream.getPos(); 1447 if (realOffset != fileOffset) { 1448 throw new IOException("Tried to seek to " + fileOffset + " to read " + size 1449 + " bytes, but pos=" + realOffset + " after seek"); 1450 } 1451 if (!peekIntoNextBlock) { 1452 BlockIOUtils.readFully(dest, istream, size); 1453 return false; 1454 } 1455 1456 // Try to read the next block header 1457 if (!BlockIOUtils.readWithExtra(dest, istream, size, hdrSize)) { 1458 // did not read the next block header. 1459 return false; 1460 } 1461 } else { 1462 // Positional read.
Better for random reads; or when the streamLock is already locked. 1463 int extraSize = peekIntoNextBlock ? hdrSize : 0; 1464 if ( 1465 !BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize, isPreadAllBytes) 1466 ) { 1467 // did not read the next block header. 1468 return false; 1469 } 1470 } 1471 assert peekIntoNextBlock; 1472 return true; 1473 } 1474 1475 /** 1476 * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as 1477 * little memory allocation as possible, using the provided on-disk size. 1478 * @param offset the offset in the stream to read at 1479 * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header, or -1 if 1480 * unknown; i.e. when iterating over blocks reading in the file 1481 * metadata info. 1482 * @param pread whether to use a positional read 1483 * @param updateMetrics whether to update the metrics 1484 * @param intoHeap allocate ByteBuff of block from heap or off-heap. 1485 * @see FSReader#readBlockData(long, long, boolean, boolean, boolean) for more details about the 1486 * useHeap. 1487 */ 1488 @Override 1489 public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread, 1490 boolean updateMetrics, boolean intoHeap) throws IOException { 1491 // Get a copy of the current state of whether to validate 1492 // hbase checksums or not for this read call. This is not 1493 // thread-safe but the one constraint is that if we decide 1494 // to skip hbase checksum verification then we are 1495 // guaranteed to use hdfs checksum verification. 1496 boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum(); 1497 FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum); 1498 final Context context = Context.current().with(CONTEXT_KEY, 1499 new HFileContextAttributesBuilderConsumer(fileContext) 1500 .setSkipChecksum(doVerificationThruHBaseChecksum) 1501 .setReadType(pread ? ReadType.POSITIONAL_READ : ReadType.SEEK_PLUS_READ)); 1502 try (Scope ignored = context.makeCurrent()) { 1503 HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, 1504 doVerificationThruHBaseChecksum, updateMetrics, intoHeap); 1505 if (blk == null) { 1506 HFile.LOG.warn("HBase checksum verification failed for file {} at offset {} filesize {}." 1507 + " Retrying read with HDFS checksums turned on...", pathName, offset, fileSize); 1508 1509 if (!doVerificationThruHBaseChecksum) { 1510 String msg = "HBase checksum verification failed for file " + pathName + " at offset " 1511 + offset + " filesize " + fileSize + " but this cannot happen because doVerify is " 1512 + doVerificationThruHBaseChecksum; 1513 HFile.LOG.warn(msg); 1514 throw new IOException(msg); // cannot happen case here 1515 } 1516 HFile.CHECKSUM_FAILURES.increment(); // update metrics 1517 1518 // If we have a checksum failure, we fall back into a mode where 1519 // the next few reads use HDFS level checksums. We aim to make the 1520 // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid 1521 // hbase checksum verification, but since this value is set without 1522 // holding any locks, it can so happen that we might actually do 1523 // a few more than precisely this number. 
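/*
 * Illustrative recap of the fallback path taken below (a sketch of the behavior described in the
 * comments above; the exact bookkeeping lives in FSDataInputStreamWrapper): a read fails HBase
 * checksum validation, CHECKSUM_FAILURES is bumped, the reader switches to the HDFS-checksummed
 * stream for roughly the next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads and re-reads the same
 * block, and once reads verify cleanly again, streamWrapper.checksumOk() (called before this
 * method returns) lets the wrapper eventually switch HBase checksums back on.
 */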
1524 is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD); 1525 doVerificationThruHBaseChecksum = false; 1526 blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, 1527 doVerificationThruHBaseChecksum, updateMetrics, intoHeap); 1528 if (blk != null) { 1529 HFile.LOG.warn( 1530 "HDFS checksum verification succeeded for file {} at offset {} filesize" + " {}", 1531 pathName, offset, fileSize); 1532 } 1533 } 1534 if (blk == null && !doVerificationThruHBaseChecksum) { 1535 String msg = 1536 "readBlockData failed, possibly due to " + "checksum verification failing for file " 1537 + pathName + " at offset " + offset + " filesize " + fileSize; 1538 HFile.LOG.warn(msg); 1539 throw new IOException(msg); 1540 } 1541 1542 // If there was a checksum mismatch earlier, then retry with 1543 // HBase checksums switched off and use HDFS checksum verification. 1544 // This triggers HDFS to detect and fix corrupt replicas. The 1545 // next checksumOffCount read requests will use HDFS checksums. 1546 // The decrementing of this.checksumOffCount is not thread-safe, 1547 // but it is harmless because eventually checksumOffCount will be 1548 // a negative number. 1549 streamWrapper.checksumOk(); 1550 return blk; 1551 } 1552 } 1553 1554 /** 1555 * Checks that <code>onDiskSizeWithHeaderL</code> is a sane size and then returns it as an int. 1556 */ 1557 private static int checkAndGetSizeAsInt(final long onDiskSizeWithHeaderL, final int hdrSize) 1558 throws IOException { 1559 if ( 1560 (onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1) 1561 || onDiskSizeWithHeaderL >= Integer.MAX_VALUE 1562 ) { 1563 throw new IOException( 1564 "Invalid onDisksize=" + onDiskSizeWithHeaderL + ": expected to be at least " + hdrSize 1565 + " and at most " + Integer.MAX_VALUE + ", or -1"); 1566 } 1567 return (int) onDiskSizeWithHeaderL; 1568 } 1569 1570 /** 1571 * Verify that the passed-in onDiskSizeWithHeader aligns with what is in the header; if not, 1572 * something is wrong. 1573 */ 1574 private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuff headerBuf, 1575 final long offset, boolean verifyChecksum) throws IOException { 1576 // Assert size provided aligns with what is in the header 1577 int fromHeader = getOnDiskSizeWithHeader(headerBuf, verifyChecksum); 1578 if (passedIn != fromHeader) { 1579 throw new IOException("Passed in onDiskSizeWithHeader=" + passedIn + " != " + fromHeader 1580 + ", offset=" + offset + ", fileContext=" + this.fileContext); 1581 } 1582 } 1583 1584 /** 1585 * Check the atomic reference cache for this block's header. The cache is only good if the next read 1586 * coming through is the next in sequence in the file. We read the next block's header on the tail of 1587 * reading the previous block to save a seek. Otherwise, we have to do a seek to read the header before 1588 * we can pull in the block OR we have to backup the stream because we over-read (the next 1589 * block's header). 1590 * @see PrefetchedHeader 1591 * @return The cached block header or null if not found. 1592 * @see #cacheNextBlockHeader(long, ByteBuff, int, int) 1593 */ 1594 private ByteBuff getCachedHeader(final long offset) { 1595 PrefetchedHeader ph = this.prefetchedHeader.get(); 1596 return ph != null && ph.offset == offset ? ph.buf : null; 1597 } 1598 1599 /** 1600 * Save away the next block's header in an atomic reference.
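 * <p>
 * A short sketch of the prefetch handshake, using the names in this class (illustrative only):
 * <pre>{@code
 * // While reading the block at offset O with on-disk size S, readAtOffset() peeks hdrSize
 * // extra bytes, and readBlockDataInternal() stashes them:
 * cacheNextBlockHeader(O + S, onDiskBlock, S, hdrSize);
 * // A later sequential read of the block at O + S then finds the header without an extra seek:
 * ByteBuff cached = getCachedHeader(O + S);
 * }</pre>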
1601 * @see #getCachedHeader(long) 1602 * @see PrefetchedHeader 1603 */ 1604 private void cacheNextBlockHeader(final long offset, ByteBuff onDiskBlock, 1605 int onDiskSizeWithHeader, int headerLength) { 1606 PrefetchedHeader ph = new PrefetchedHeader(); 1607 ph.offset = offset; 1608 onDiskBlock.get(onDiskSizeWithHeader, ph.header, 0, headerLength); 1609 this.prefetchedHeader.set(ph); 1610 } 1611 1612 private int getNextBlockOnDiskSize(boolean readNextHeader, ByteBuff onDiskBlock, 1613 int onDiskSizeWithHeader) { 1614 int nextBlockOnDiskSize = -1; 1615 if (readNextHeader) { 1616 nextBlockOnDiskSize = 1617 onDiskBlock.getIntAfterPosition(onDiskSizeWithHeader + BlockType.MAGIC_LENGTH) + hdrSize; 1618 } 1619 return nextBlockOnDiskSize; 1620 } 1621 1622 private ByteBuff allocate(int size, boolean intoHeap) { 1623 return intoHeap ? HEAP.allocate(size) : allocator.allocate(size); 1624 } 1625 1626 /** 1627 * Reads a version 2 block. 1628 * @param offset the offset in the stream to read at. 1629 * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header and 1630 * checksums if present, or -1 if unknown (as a long). It can be -1 when 1631 * we are doing raw iteration of blocks, e.g. when loading up file 1632 * metadata on the first read of a new file. Usually it is known, taken 1633 * from the file index. 1634 * @param pread whether to use a positional read 1635 * @param verifyChecksum whether to use HBase checksums. If HBase checksums are switched 1636 * off, HDFS checksums are used instead. This can flip on/off while 1637 * reading the same file if we hit a troublesome patch in an hfile. 1638 * @param updateMetrics whether to update the metrics. 1639 * @param intoHeap whether to allocate the block's ByteBuff on the JVM heap or off-heap. 1640 * @return the HFileBlock or null if there is an HBase checksum mismatch 1641 */ 1642 protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, 1643 long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics, 1644 boolean intoHeap) throws IOException { 1645 if (offset < 0) { 1646 throw new IOException("Invalid offset=" + offset + " trying to read " + "block (onDiskSize=" 1647 + onDiskSizeWithHeaderL + ")"); 1648 } 1649 1650 final Span span = Span.current(); 1651 final AttributesBuilder attributesBuilder = Attributes.builder(); 1652 Optional.of(Context.current()).map(val -> val.get(CONTEXT_KEY)) 1653 .ifPresent(c -> c.accept(attributesBuilder)); 1654 int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize); 1655 // Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1 1656 // and will save us having to seek the stream backwards to reread the header we 1657 // read the last time through here. 1658 ByteBuff headerBuf = getCachedHeader(offset); 1659 LOG.trace( 1660 "Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, " 1661 + "onDiskSizeWithHeader={}", 1662 this.fileContext.getHFileName(), offset, pread, verifyChecksum, headerBuf, 1663 onDiskSizeWithHeader); 1664 // This is NOT the same as verifyChecksum. The latter is whether to do hbase 1665 // checksums. Can change with circumstances. The below flag is whether the 1666 // file has support for checksums (version 2+). 1667 boolean checksumSupport = this.fileContext.isUseHBaseChecksum(); 1668 long startTime = EnvironmentEdgeManager.currentTime(); 1669 if (onDiskSizeWithHeader <= 0) { 1670 // We were not passed the block size. Need to get it from the header.
If header was 1671 // not cached (see getCachedHeader above), need to seek to pull it in. This is costly 1672 // and should happen very rarely. Currently happens on open of a hfile reader where we 1673 // read the trailer blocks to pull in the indices. Otherwise, we are reading block sizes 1674 // out of the hfile index. To check, enable TRACE in this file and you'll get an exception 1675 // in a LOG every time we seek. See HBASE-17072 for more detail. 1676 if (headerBuf == null) { 1677 if (LOG.isTraceEnabled()) { 1678 LOG.trace("Extra seek to get block size!", new RuntimeException()); 1679 } 1680 span.addEvent("Extra seek to get block size!", attributesBuilder.build()); 1681 headerBuf = HEAP.allocate(hdrSize); 1682 readAtOffset(is, headerBuf, hdrSize, false, offset, pread); 1683 headerBuf.rewind(); 1684 } 1685 onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport); 1686 } 1687 int preReadHeaderSize = headerBuf == null ? 0 : hdrSize; 1688 // Allocate enough space to fit the next block's header too; saves a seek next time through. 1689 // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header; 1690 // onDiskSizeWithHeader is header, body, and any checksums if present. preReadHeaderSize 1691 // says where to start reading. If we have the header cached, then we don't need to read 1692 // it again and we can likely read from last place we left off w/o need to backup and reread 1693 // the header we read last time through here. 1694 ByteBuff onDiskBlock = this.allocate(onDiskSizeWithHeader + hdrSize, intoHeap); 1695 boolean initHFileBlockSuccess = false; 1696 try { 1697 if (headerBuf != null) { 1698 onDiskBlock.put(0, headerBuf, 0, hdrSize).position(hdrSize); 1699 } 1700 boolean readNextHeader = readAtOffset(is, onDiskBlock, 1701 onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread); 1702 onDiskBlock.rewind(); // in case of moving position when copying a cached header 1703 int nextBlockOnDiskSize = 1704 getNextBlockOnDiskSize(readNextHeader, onDiskBlock, onDiskSizeWithHeader); 1705 if (headerBuf == null) { 1706 headerBuf = onDiskBlock.duplicate().position(0).limit(hdrSize); 1707 } 1708 // Do a few checks before we go instantiate HFileBlock. 1709 assert onDiskSizeWithHeader > this.hdrSize; 1710 verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport); 1711 ByteBuff curBlock = onDiskBlock.duplicate().position(0).limit(onDiskSizeWithHeader); 1712 // Verify checksum of the data before using it for building HFileBlock. 1713 if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) { 1714 return null; 1715 } 1716 // remove checksum from buffer now that it's verified 1717 int sizeWithoutChecksum = curBlock.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX); 1718 curBlock.limit(sizeWithoutChecksum); 1719 long duration = EnvironmentEdgeManager.currentTime() - startTime; 1720 if (updateMetrics) { 1721 HFile.updateReadLatency(duration, pread); 1722 } 1723 // The onDiskBlock will become the headerAndDataBuffer for this block. 1724 // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already 1725 // contains the header of next block, so no need to set next block's header in it. 1726 HFileBlock hFileBlock = createFromBuff(curBlock, checksumSupport, offset, 1727 nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator); 1728 // Run check on uncompressed sizings. 
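/*
 * Layout recap for the buffer assembled above (an illustrative summary of the comments in this
 * method, not new behavior):
 *
 *   onDiskBlock: [ header (hdrSize) | data | checksums, if any ][ next block's header, if peeked ]
 *                |<------------- onDiskSizeWithHeader --------->|<------ up to hdrSize extra ---->|
 *
 * curBlock above is a duplicate limited to onDiskSizeWithHeader and, once the checksums have been
 * verified, further limited to onDiskDataSizeWithHeader (the size without checksums).
 */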
1729 if (!fileContext.isCompressedOrEncrypted()) { 1730 hFileBlock.sanityCheckUncompressed(); 1731 } 1732 LOG.trace("Read {} in {} ms", hFileBlock, duration); 1733 span.addEvent("Read block", attributesBuilder.build()); 1734 // Cache next block header if we read it for the next time through here. 1735 if (nextBlockOnDiskSize != -1) { 1736 cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), onDiskBlock, 1737 onDiskSizeWithHeader, hdrSize); 1738 } 1739 initHFileBlockSuccess = true; 1740 return hFileBlock; 1741 } finally { 1742 if (!initHFileBlockSuccess) { 1743 onDiskBlock.release(); 1744 } 1745 } 1746 } 1747 1748 @Override 1749 public void setIncludesMemStoreTS(boolean includesMemstoreTS) { 1750 this.fileContext = 1751 new HFileContextBuilder(this.fileContext).withIncludesMvcc(includesMemstoreTS).build(); 1752 } 1753 1754 @Override 1755 public void setDataBlockEncoder(HFileDataBlockEncoder encoder, Configuration conf) { 1756 encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(conf, fileContext); 1757 } 1758 1759 @Override 1760 public HFileBlockDecodingContext getBlockDecodingContext() { 1761 return this.encodedBlockDecodingCtx; 1762 } 1763 1764 @Override 1765 public HFileBlockDecodingContext getDefaultBlockDecodingContext() { 1766 return this.defaultDecodingCtx; 1767 } 1768 1769 /** 1770 * Generates the checksum for the header as well as the data and then validates it. If the block 1771 * doesn't use checksums, returns false. 1772 * @return True if checksum matches, else false. 1773 */ 1774 private boolean validateChecksum(long offset, ByteBuff data, int hdrSize) { 1775 // If this is an older version of the block that does not have checksums, then return false 1776 // indicating that checksum verification did not succeed. Actually, this method should never 1777 // be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen 1778 // case. Since this is a cannot-happen case, it is better to return false to indicate a 1779 // checksum validation failure. 1780 if (!fileContext.isUseHBaseChecksum()) { 1781 return false; 1782 } 1783 return ChecksumUtil.validateChecksum(data, pathName, offset, hdrSize); 1784 } 1785 1786 @Override 1787 public void closeStreams() throws IOException { 1788 streamWrapper.close(); 1789 } 1790 1791 @Override 1792 public void unbufferStream() { 1793 // To handle concurrent reads, ensure that no other client is accessing the stream while we 1794 // unbuffer it. 1795 if (streamLock.tryLock()) { 1796 try { 1797 this.streamWrapper.unbuffer(); 1798 } finally { 1799 streamLock.unlock(); 1800 } 1801 } 1802 } 1803 1804 @Override 1805 public String toString() { 1806 return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext; 1807 } 1808 } 1809 1810 /** An additional sanity-check in case no compression or encryption is being used. */ 1811 void sanityCheckUncompressed() throws IOException { 1812 if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) { 1813 throw new IOException("Using no compression but " + "onDiskSizeWithoutHeader=" 1814 + onDiskSizeWithoutHeader + ", " + "uncompressedSizeWithoutHeader=" 1815 + uncompressedSizeWithoutHeader + ", numChecksumbytes=" + totalChecksumBytes()); 1816 } 1817 } 1818 1819 // Cacheable implementation 1820 @Override 1821 public int getSerializedLength() { 1822 if (buf != null) { 1823 // Include extra bytes for block metadata.
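/*
 * For illustration: the extra space corresponds to the fields appended by addMetaData() below --
 * a 1-byte 'uses hbase checksum' flag, the 8-byte offset, and (when included) the 4-byte
 * nextBlockOnDiskSize -- i.e. BLOCK_METADATA_SPACE amounts to 1 + 8 + 4 = 13 bytes on top of
 * buf.limit().
 */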
1824 return this.buf.limit() + BLOCK_METADATA_SPACE; 1825 } 1826 return 0; 1827 } 1828 1829 // Cacheable implementation 1830 @Override 1831 public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) { 1832 this.buf.get(destination, 0, getSerializedLength() - BLOCK_METADATA_SPACE); 1833 destination = addMetaData(destination, includeNextBlockMetadata); 1834 1835 // Make it ready for reading. flip sets position to zero and limit to current position which 1836 // is what we want if we do not want to serialize the block plus checksums if present plus 1837 // metadata. 1838 destination.flip(); 1839 } 1840 1841 /** 1842 * For use by bucketcache. This exposes internals. 1843 */ 1844 public ByteBuffer getMetaData(ByteBuffer bb) { 1845 bb = addMetaData(bb, true); 1846 bb.flip(); 1847 return bb; 1848 } 1849 1850 /** 1851 * Adds metadata at current position (position is moved forward). Does not flip or reset. 1852 * @return The passed <code>destination</code> with metadata added. 1853 */ 1854 private ByteBuffer addMetaData(final ByteBuffer destination, boolean includeNextBlockMetadata) { 1855 destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0); 1856 destination.putLong(this.offset); 1857 if (includeNextBlockMetadata) { 1858 destination.putInt(this.nextBlockOnDiskSize); 1859 } 1860 return destination; 1861 } 1862 1863 // Cacheable implementation 1864 @Override 1865 public CacheableDeserializer<Cacheable> getDeserializer() { 1866 return HFileBlock.BLOCK_DESERIALIZER; 1867 } 1868 1869 @Override 1870 public int hashCode() { 1871 int result = 1; 1872 result = result * 31 + blockType.hashCode(); 1873 result = result * 31 + nextBlockOnDiskSize; 1874 result = result * 31 + (int) (offset ^ (offset >>> 32)); 1875 result = result * 31 + onDiskSizeWithoutHeader; 1876 result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32)); 1877 result = result * 31 + uncompressedSizeWithoutHeader; 1878 result = result * 31 + buf.hashCode(); 1879 return result; 1880 } 1881 1882 @Override 1883 public boolean equals(Object comparison) { 1884 if (this == comparison) { 1885 return true; 1886 } 1887 if (comparison == null) { 1888 return false; 1889 } 1890 if (!(comparison instanceof HFileBlock)) { 1891 return false; 1892 } 1893 1894 HFileBlock castedComparison = (HFileBlock) comparison; 1895 1896 if (castedComparison.blockType != this.blockType) { 1897 return false; 1898 } 1899 if (castedComparison.nextBlockOnDiskSize != this.nextBlockOnDiskSize) { 1900 return false; 1901 } 1902 // Offset is important. Needed when we have to remake cachekey when block is returned to cache. 
1903 if (castedComparison.offset != this.offset) { 1904 return false; 1905 } 1906 if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) { 1907 return false; 1908 } 1909 if (castedComparison.prevBlockOffset != this.prevBlockOffset) { 1910 return false; 1911 } 1912 if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) { 1913 return false; 1914 } 1915 if ( 1916 ByteBuff.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0, 1917 castedComparison.buf.limit()) != 0 1918 ) { 1919 return false; 1920 } 1921 return true; 1922 } 1923 1924 DataBlockEncoding getDataBlockEncoding() { 1925 if (blockType == BlockType.ENCODED_DATA) { 1926 return DataBlockEncoding.getEncodingById(getDataBlockEncodingId()); 1927 } 1928 return DataBlockEncoding.NONE; 1929 } 1930 1931 byte getChecksumType() { 1932 return this.fileContext.getChecksumType().getCode(); 1933 } 1934 1935 int getBytesPerChecksum() { 1936 return this.fileContext.getBytesPerChecksum(); 1937 } 1938 1939 /** Returns the size of data on disk + header. Excludes checksum. */ 1940 int getOnDiskDataSizeWithHeader() { 1941 return this.onDiskDataSizeWithHeader; 1942 } 1943 1944 /** 1945 * Calculate the number of bytes required to store all the checksums for this block. Each checksum 1946 * value is a 4 byte integer. 1947 */ 1948 int totalChecksumBytes() { 1949 // If the hfile block has minorVersion 0, then there is no checksum 1950 // data to validate. Similarly, a zero value in this.bytesPerChecksum 1951 // indicates that cached blocks do not have checksum data because 1952 // checksums were already validated when the block was read from disk. 1953 if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) { 1954 return 0; 1955 } 1956 return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader, 1957 this.fileContext.getBytesPerChecksum()); 1958 } 1959 1960 /** 1961 * Returns the size of this block header. 1962 */ 1963 public int headerSize() { 1964 return headerSize(this.fileContext.isUseHBaseChecksum()); 1965 } 1966 1967 /** 1968 * Maps a minor version to the size of the header. 1969 */ 1970 public static int headerSize(boolean usesHBaseChecksum) { 1971 return usesHBaseChecksum 1972 ? HConstants.HFILEBLOCK_HEADER_SIZE 1973 : HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM; 1974 } 1975 1976 /** 1977 * Return the appropriate DUMMY_HEADER for the minor version 1978 */ 1979 // TODO: Why is this in here? 1980 byte[] getDummyHeaderForVersion() { 1981 return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum()); 1982 } 1983 1984 /** 1985 * Return the appropriate DUMMY_HEADER for the minor version 1986 */ 1987 static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) { 1988 return usesHBaseChecksum ? HConstants.HFILEBLOCK_DUMMY_HEADER : DUMMY_HEADER_NO_CHECKSUM; 1989 } 1990 1991 /** 1992 * @return This HFileBlock's fileContext, which will be a derivative of the fileContext for the file 1993 * from which this block's data was originally read. 1994 */ 1995 public HFileContext getHFileContext() { 1996 return this.fileContext; 1997 } 1998 1999 /** 2000 * Convert the contents of the block header into a human readable string. This is mostly helpful 2001 * for debugging. This assumes that the block has minor version > 0.
2002 */ 2003 static String toStringHeader(ByteBuff buf) throws IOException { 2004 byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)]; 2005 buf.get(magicBuf); 2006 BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH); 2007 int compressedBlockSizeNoHeader = buf.getInt(); 2008 int uncompressedBlockSizeNoHeader = buf.getInt(); 2009 long prevBlockOffset = buf.getLong(); 2010 byte cksumtype = buf.get(); 2011 long bytesPerChecksum = buf.getInt(); 2012 long onDiskDataSizeWithHeader = buf.getInt(); 2013 return " Header dump: magic: " + Bytes.toString(magicBuf) + " blockType " + bt 2014 + " compressedBlockSizeNoHeader " + compressedBlockSizeNoHeader 2015 + " uncompressedBlockSizeNoHeader " + uncompressedBlockSizeNoHeader + " prevBlockOffset " 2016 + prevBlockOffset + " checksumType " + ChecksumType.codeToType(cksumtype) 2017 + " bytesPerChecksum " + bytesPerChecksum + " onDiskDataSizeWithHeader " 2018 + onDiskDataSizeWithHeader; 2019 } 2020 2021 /** 2022 * Creates a new HFileBlockBuilder from the existing block and a new ByteBuff. The builder will be 2023 * loaded with all of the original fields from blk, except now using the newBuff and setting 2024 * isSharedMem based on the source of the passed in newBuff. An existing HFileBlock may have been 2025 * an {@link ExclusiveMemHFileBlock}, but the new buffer might call for a 2026 * {@link SharedMemHFileBlock}. Or vice versa. 2027 * @param blk the block to clone from 2028 * @param newBuff the new buffer to use 2029 */ 2030 private static HFileBlockBuilder createBuilder(HFileBlock blk, ByteBuff newBuff) { 2031 return new HFileBlockBuilder().withBlockType(blk.blockType) 2032 .withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader) 2033 .withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader) 2034 .withPrevBlockOffset(blk.prevBlockOffset).withByteBuff(newBuff).withOffset(blk.offset) 2035 .withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader) 2036 .withNextBlockOnDiskSize(blk.nextBlockOnDiskSize).withHFileContext(blk.fileContext) 2037 .withByteBuffAllocator(blk.allocator).withShared(!newBuff.hasArray()); 2038 } 2039 2040 private static HFileBlock shallowClone(HFileBlock blk, ByteBuff newBuf) { 2041 return createBuilder(blk, newBuf).build(); 2042 } 2043 2044 static HFileBlock deepCloneOnHeap(HFileBlock blk) { 2045 ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit()))); 2046 return createBuilder(blk, deepCloned).build(); 2047 } 2048}