001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP; 021import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR; 022import static org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer.CONTEXT_KEY; 023 024import io.opentelemetry.api.common.Attributes; 025import io.opentelemetry.api.common.AttributesBuilder; 026import io.opentelemetry.api.trace.Span; 027import io.opentelemetry.context.Context; 028import io.opentelemetry.context.Scope; 029import java.io.DataInputStream; 030import java.io.DataOutput; 031import java.io.DataOutputStream; 032import java.io.IOException; 033import java.nio.ByteBuffer; 034import java.util.ArrayList; 035import java.util.List; 036import java.util.Optional; 037import java.util.concurrent.atomic.AtomicReference; 038import java.util.concurrent.locks.Lock; 039import java.util.concurrent.locks.ReentrantLock; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.FSDataInputStream; 042import org.apache.hadoop.fs.FSDataOutputStream; 043import org.apache.hadoop.hbase.ExtendedCell; 044import org.apache.hadoop.hbase.HBaseInterfaceAudience; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.fs.HFileSystem; 047import org.apache.hadoop.hbase.io.ByteArrayOutputStream; 048import org.apache.hadoop.hbase.io.ByteBuffAllocator; 049import org.apache.hadoop.hbase.io.ByteBuffInputStream; 050import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream; 051import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 052import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 053import org.apache.hadoop.hbase.io.encoding.EncodingState; 054import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; 055import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; 056import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; 057import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; 058import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer; 059import org.apache.hadoop.hbase.io.util.BlockIOUtils; 060import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics; 061import org.apache.hadoop.hbase.nio.ByteBuff; 062import org.apache.hadoop.hbase.nio.MultiByteBuff; 063import org.apache.hadoop.hbase.nio.SingleByteBuff; 064import org.apache.hadoop.hbase.regionserver.ShipperListener; 065import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ReadType; 066import org.apache.hadoop.hbase.util.Bytes; 067import org.apache.hadoop.hbase.util.ChecksumType; 068import 
org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * Cacheable Blocks of an {@link HFile} version 2 file. Version 2 was introduced in hbase-0.92.0.
 * <p>
 * Version 1 was the original file block. Version 2 was introduced when we changed the hbase file
 * format to support multi-level block indexes and compound bloom filters (HBASE-3857). Support for
 * Version 1 was removed in hbase-1.3.0.
 * <h3>HFileBlock: Version 2</h3> In version 2, a block is structured as follows:
 * <ul>
 * <li><b>Header:</b> See Writer#putHeader() for where the header is written; total header size is
 * HFILEBLOCK_HEADER_SIZE
 * <ul>
 * <li>0. blockType: Magic record identifying the {@link BlockType} (8 bytes): e.g.
 * <code>DATABLK*</code>
 * <li>1. onDiskSizeWithoutHeader: Compressed -- a.k.a 'on disk' -- block size, excluding header,
 * but including tailing checksum bytes (4 bytes)
 * <li>2. uncompressedSizeWithoutHeader: Uncompressed block size, excluding header, and excluding
 * checksum bytes (4 bytes)
 * <li>3. prevBlockOffset: The offset of the previous block of the same type (8 bytes). This is
 * used to navigate to the previous block without having to go to the block index
 * <li>4: For minorVersions >=1, the ordinal describing checksum type (1 byte)
 * <li>5: For minorVersions >=1, the number of data bytes/checksum chunk (4 bytes)
 * <li>6: onDiskDataSizeWithHeader: For minorVersions >=1, the size of data 'on disk', including
 * header, excluding checksums (4 bytes)
 * </ul>
 * </li>
 * <li><b>Raw/Compressed/Encrypted/Encoded data:</b> The compression algorithm is the same for all
 * the blocks in an {@link HFile}. If compression is NONE, this is just raw, serialized Cells.
 * <li><b>Tail:</b> For minorVersions >=1, a series of 4 byte checksums, one each for the number
 * of bytes specified by bytesPerChecksum.
 * </ul>
 * <h3>Caching</h3> Caches cache whole blocks with trailing checksums if any. We then tag on some
 * metadata, the content of BLOCK_METADATA_SPACE: a flag indicating whether we are doing 'hbase'
 * checksums, and then the offset into the file, which is needed when we re-make a cache key when
 * we return the block to the cache as 'done'. See {@link Cacheable#serialize(ByteBuffer, boolean)}
 * and {@link Cacheable#getDeserializer()}.
 * <p>
 * TODO: Should we cache the checksums? Down in Writer#getBlockForCaching(CacheConfig) where we
 * make a block to cache-on-write, there is an attempt at turning off checksums. This is not the
 * only place we get blocks to cache. We also will cache the raw return from an hdfs read. In this
 * case, the checksums may be present. If the cache is backed by something that doesn't do ECC,
 * say an SSD, we might want to preserve checksums. For now this is an open question.
 * <p>
 * TODO: Over in BucketCache, we save a block allocation by doing a custom serialization. Be sure
 * to change it if serialization changes in here. Could we add a method here that takes an IOEngine
 * and that then serializes to it rather than expose our internals over in BucketCache? IOEngine is
 * in the bucket subpackage. Pull it up?
Then this class knows about bucketcache. Ugh. 123 */ 124@InterfaceAudience.Private 125public class HFileBlock implements Cacheable { 126 private static final Logger LOG = LoggerFactory.getLogger(HFileBlock.class); 127 public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HFileBlock.class, false); 128 129 // Block Header fields. 130 131 // TODO: encapsulate Header related logic in this inner class. 132 static class Header { 133 // Format of header is: 134 // 8 bytes - block magic 135 // 4 bytes int - onDiskSizeWithoutHeader 136 // 4 bytes int - uncompressedSizeWithoutHeader 137 // 8 bytes long - prevBlockOffset 138 // The following 3 are only present if header contains checksum information 139 // 1 byte - checksum type 140 // 4 byte int - bytes per checksum 141 // 4 byte int - onDiskDataSizeWithHeader 142 static int BLOCK_MAGIC_INDEX = 0; 143 static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8; 144 static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12; 145 static int PREV_BLOCK_OFFSET_INDEX = 16; 146 static int CHECKSUM_TYPE_INDEX = 24; 147 static int BYTES_PER_CHECKSUM_INDEX = 25; 148 static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29; 149 } 150 151 /** Type of block. Header field 0. */ 152 private BlockType blockType; 153 154 /** 155 * Size on disk excluding header, including checksum. Header field 1. 156 * @see Writer#putHeader(byte[], int, int, int, int) 157 */ 158 private int onDiskSizeWithoutHeader; 159 160 /** 161 * Size of pure data. Does not include header or checksums. Header field 2. 162 * @see Writer#putHeader(byte[], int, int, int, int) 163 */ 164 private int uncompressedSizeWithoutHeader; 165 166 /** 167 * The offset of the previous block on disk. Header field 3. 168 * @see Writer#putHeader(byte[], int, int, int, int) 169 */ 170 private long prevBlockOffset; 171 172 /** 173 * Size on disk of header + data. Excludes checksum. Header field 6, OR calculated from 174 * {@link #onDiskSizeWithoutHeader} when using HDFS checksum. 175 * @see Writer#putHeader(byte[], int, int, int, int) 176 */ 177 private final int onDiskDataSizeWithHeader; 178 // End of Block Header fields. 179 180 /** 181 * The in-memory representation of the hfile block. Can be on or offheap. Can be backed by a 182 * single ByteBuffer or by many. Make no assumptions. 183 * <p> 184 * Be careful reading from this <code>buf</code>. Duplicate and work on the duplicate or if not, 185 * be sure to reset position and limit else trouble down the road. 186 * <p> 187 * TODO: Make this read-only once made. 188 * <p> 189 * We are using the ByteBuff type. ByteBuffer is not extensible yet we need to be able to have a 190 * ByteBuffer-like API across multiple ByteBuffers reading from a cache such as BucketCache. So, 191 * we have this ByteBuff type. Unfortunately, it is spread all about HFileBlock. Would be good if 192 * could be confined to cache-use only but hard-to-do. 193 * <p> 194 * NOTE: this byteBuff including HFileBlock header and data, but excluding checksum. 195 */ 196 private ByteBuff bufWithoutChecksum; 197 198 /** 199 * Meta data that holds meta information on the hfileblock. 200 */ 201 private final HFileContext fileContext; 202 203 /** 204 * The offset of this block in the file. Populated by the reader for convenience of access. This 205 * offset is not part of the block header. 206 */ 207 private long offset = UNSET; 208 209 /** 210 * The on-disk size of the next block, including the header and checksums if present. UNSET if 211 * unknown. 
Blocks try to carry the size of the next block to read in this data member. Usually we 212 * get block sizes from the hfile index but sometimes the index is not available: e.g. when we 213 * read the indexes themselves (indexes are stored in blocks, we do not have an index for the 214 * indexes). Saves seeks especially around file open when there is a flurry of reading in hfile 215 * metadata. 216 */ 217 private int nextBlockOnDiskSize = UNSET; 218 219 private ByteBuffAllocator allocator; 220 221 /** 222 * On a checksum failure, do these many succeeding read requests using hdfs checksums before 223 * auto-reenabling hbase checksum verification. 224 */ 225 static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3; 226 227 private static int UNSET = -1; 228 public static final boolean FILL_HEADER = true; 229 public static final boolean DONT_FILL_HEADER = false; 230 231 // How to get the estimate correctly? if it is a singleBB? 232 public static final int MULTI_BYTE_BUFFER_HEAP_SIZE = 233 (int) ClassSize.estimateBase(MultiByteBuff.class, false); 234 235 /** 236 * Space for metadata on a block that gets stored along with the block when we cache it. There are 237 * a few bytes stuck on the end of the HFileBlock that we pull in from HDFS. 8 bytes are for the 238 * offset of this block (long) in the file. Offset is important because is is used when we remake 239 * the CacheKey when we return block to the cache when done. There is also a flag on whether 240 * checksumming is being done by hbase or not. See class comment for note on uncertain state of 241 * checksumming of blocks that come out of cache (should we or should we not?). Finally there are 242 * 4 bytes to hold the length of the next block which can save a seek on occasion if available. 243 * (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was formerly 244 * known as EXTRA_SERIALIZATION_SPACE). 245 */ 246 public static final int BLOCK_METADATA_SPACE = 247 Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT; 248 249 /** 250 * Each checksum value is an integer that can be stored in 4 bytes. 251 */ 252 static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT; 253 254 static final byte[] DUMMY_HEADER_NO_CHECKSUM = 255 new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM]; 256 257 /** 258 * Used deserializing blocks from Cache. <code> 259 * ++++++++++++++ 260 * + HFileBlock + 261 * ++++++++++++++ 262 * + Checksums + <= Optional 263 * ++++++++++++++ 264 * + Metadata! + <= See note on BLOCK_METADATA_SPACE above. 265 * ++++++++++++++ 266 * </code> 267 * @see #serialize(ByteBuffer, boolean) 268 */ 269 public static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER = new BlockDeserializer(); 270 271 public static final class BlockDeserializer implements CacheableDeserializer<Cacheable> { 272 private BlockDeserializer() { 273 } 274 275 @Override 276 public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc) throws IOException { 277 // The buf has the file block followed by block metadata. 278 // Set limit to just before the BLOCK_METADATA_SPACE then rewind. 279 buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind(); 280 // Get a new buffer to pass the HFileBlock for it to 'own'. 281 ByteBuff newByteBuff = buf.slice(); 282 // Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock. 
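      // Layout of the trailing BLOCK_METADATA_SPACE (see that constant's javadoc above): a 1-byte
      // flag saying whether hbase checksums are in use, the 8-byte file offset of this block, and
      // the 4-byte on-disk size of the next block. The reads that follow consume them in that
      // order.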
      buf.position(buf.limit());
      buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE);
      boolean usesChecksum = buf.get() == (byte) 1;
      long offset = buf.getLong();
      int nextBlockOnDiskSize = buf.getInt();
      return createFromBuff(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
    }

    @Override
    public int getDeserializerIdentifier() {
      return DESERIALIZER_IDENTIFIER;
    }
  }

  private static final int DESERIALIZER_IDENTIFIER;
  static {
    DESERIALIZER_IDENTIFIER =
      CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
  }

  private final int totalChecksumBytes;

  /**
   * Creates a new {@link HFile} block from the given fields. This constructor is used only while
   * writing blocks and caching, when the block data is already sitting in a byte buffer and we
   * want to stuff the block into cache.
   * <p>
   * TODO: The caller presumes no checksumming is required of this block instance since it is
   * going into cache; the checksum has already been verified on the underlying block data pulled
   * in from the filesystem. Is that correct? What if the cache is SSD?
   * <p>
   * TODO: HFile block writer can also off-heap ?
   * </p>
   * @param blockType                     the type of this block, see {@link BlockType}
   * @param onDiskSizeWithoutHeader       see {@link #onDiskSizeWithoutHeader}
   * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
   * @param prevBlockOffset               see {@link #prevBlockOffset}
   * @param buf                           block buffer with header
   *                                      ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
   * @param fillHeader                    when true, write the first 4 header fields into passed
   *                                      buffer.
   * @param offset                        the file offset the block was read from
   * @param onDiskDataSizeWithHeader      see {@link #onDiskDataSizeWithHeader}
   * @param fileContext                   HFile meta data
   */
  public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
    int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
    long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
    ByteBuffAllocator allocator) {
    this.blockType = blockType;
    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
    this.prevBlockOffset = prevBlockOffset;
    this.offset = offset;
    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
    this.nextBlockOnDiskSize = nextBlockOnDiskSize;
    this.fileContext = fileContext;
    this.allocator = allocator;
    this.bufWithoutChecksum = buf;
    if (fillHeader) {
      overwriteHeader();
    }
    this.bufWithoutChecksum.rewind();
    this.totalChecksumBytes = computeTotalChecksumBytes();
  }

  /**
   * Creates a block from an existing buffer starting with a header. Rewinds and takes ownership
   * of the buffer. By definition of rewind, ignores the buffer position, but if you slice the
   * buffer beforehand, it will rewind to that point.
   * @param buf Has header, content, and trailing checksums if present.
354 */ 355 static HFileBlock createFromBuff(ByteBuff buf, boolean usesHBaseChecksum, final long offset, 356 final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator) 357 throws IOException { 358 buf.rewind(); 359 final BlockType blockType = BlockType.read(buf); 360 final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX); 361 final int uncompressedSizeWithoutHeader = 362 buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX); 363 final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX); 364 // This constructor is called when we deserialize a block from cache and when we read a block in 365 // from the fs. fileCache is null when deserialized from cache so need to make up one. 366 HFileContextBuilder fileContextBuilder = 367 fileContext != null ? new HFileContextBuilder(fileContext) : new HFileContextBuilder(); 368 fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum); 369 int onDiskDataSizeWithHeader; 370 if (usesHBaseChecksum) { 371 byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX); 372 int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX); 373 onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX); 374 // Use the checksum type and bytes per checksum from header, not from fileContext. 375 fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType)); 376 fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum); 377 } else { 378 fileContextBuilder.withChecksumType(ChecksumType.NULL); 379 fileContextBuilder.withBytesPerCheckSum(0); 380 // Need to fix onDiskDataSizeWithHeader; there are not checksums after-block-data 381 onDiskDataSizeWithHeader = onDiskSizeWithoutHeader + headerSize(usesHBaseChecksum); 382 } 383 fileContext = fileContextBuilder.build(); 384 assert usesHBaseChecksum == fileContext.isUseHBaseChecksum(); 385 return new HFileBlockBuilder().withBlockType(blockType) 386 .withOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader) 387 .withUncompressedSizeWithoutHeader(uncompressedSizeWithoutHeader) 388 .withPrevBlockOffset(prevBlockOffset).withOffset(offset) 389 .withOnDiskDataSizeWithHeader(onDiskDataSizeWithHeader) 390 .withNextBlockOnDiskSize(nextBlockOnDiskSize).withHFileContext(fileContext) 391 .withByteBuffAllocator(allocator).withByteBuff(buf.rewind()).withShared(!buf.hasArray()) 392 .build(); 393 } 394 395 /** 396 * Parse total on disk size including header and checksum. 397 * @param headerBuf Header ByteBuffer. Presumed exact size of header. 398 * @param checksumSupport true if checksum verification is in use. 399 * @return Size of the block with header included. 400 */ 401 private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf, boolean checksumSupport) { 402 return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(checksumSupport); 403 } 404 405 /** 406 * @return the on-disk size of the next block (including the header size and any checksums if 407 * present) read by peeking into the next block's header; use as a hint when doing a read 408 * of the next block when scanning or running over a file. 
409 */ 410 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST) 411 public int getNextBlockOnDiskSize() { 412 return nextBlockOnDiskSize; 413 } 414 415 @Override 416 public BlockType getBlockType() { 417 return blockType; 418 } 419 420 @Override 421 public int refCnt() { 422 return bufWithoutChecksum.refCnt(); 423 } 424 425 @Override 426 public HFileBlock retain() { 427 bufWithoutChecksum.retain(); 428 return this; 429 } 430 431 /** 432 * Call {@link ByteBuff#release()} to decrease the reference count, if no other reference, it will 433 * return back the {@link ByteBuffer} to {@link org.apache.hadoop.hbase.io.ByteBuffAllocator} 434 */ 435 @Override 436 public boolean release() { 437 return bufWithoutChecksum.release(); 438 } 439 440 /** 441 * Calling this method in strategic locations where HFileBlocks are referenced may help diagnose 442 * potential buffer leaks. We pass the block itself as a default hint, but one can use 443 * {@link #touch(Object)} to pass their own hint as well. 444 */ 445 @Override 446 public HFileBlock touch() { 447 return touch(this); 448 } 449 450 @Override 451 public HFileBlock touch(Object hint) { 452 bufWithoutChecksum.touch(hint); 453 return this; 454 } 455 456 /** Returns get data block encoding id that was used to encode this block */ 457 short getDataBlockEncodingId() { 458 if (blockType != BlockType.ENCODED_DATA) { 459 throw new IllegalArgumentException("Querying encoder ID of a block " + "of type other than " 460 + BlockType.ENCODED_DATA + ": " + blockType); 461 } 462 return bufWithoutChecksum.getShort(headerSize()); 463 } 464 465 /** Returns the on-disk size of header + data part + checksum. */ 466 public int getOnDiskSizeWithHeader() { 467 return onDiskSizeWithoutHeader + headerSize(); 468 } 469 470 /** Returns the on-disk size of the data part + checksum (header excluded). */ 471 int getOnDiskSizeWithoutHeader() { 472 return onDiskSizeWithoutHeader; 473 } 474 475 /** Returns the uncompressed size of data part (header and checksum excluded). */ 476 public int getUncompressedSizeWithoutHeader() { 477 return uncompressedSizeWithoutHeader; 478 } 479 480 /** Returns the offset of the previous block of the same type in the file, or -1 if unknown */ 481 long getPrevBlockOffset() { 482 return prevBlockOffset; 483 } 484 485 /** 486 * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position is modified as 487 * side-effect. 488 */ 489 private void overwriteHeader() { 490 bufWithoutChecksum.rewind(); 491 blockType.write(bufWithoutChecksum); 492 bufWithoutChecksum.putInt(onDiskSizeWithoutHeader); 493 bufWithoutChecksum.putInt(uncompressedSizeWithoutHeader); 494 bufWithoutChecksum.putLong(prevBlockOffset); 495 if (this.fileContext.isUseHBaseChecksum()) { 496 bufWithoutChecksum.put(fileContext.getChecksumType().getCode()); 497 bufWithoutChecksum.putInt(fileContext.getBytesPerChecksum()); 498 bufWithoutChecksum.putInt(onDiskDataSizeWithHeader); 499 } 500 } 501 502 /** 503 * Returns a buffer that does not include the header and checksum. 504 * @return the buffer with header skipped and checksum omitted. 505 */ 506 public ByteBuff getBufferWithoutHeader() { 507 ByteBuff dup = getBufferReadOnly(); 508 return dup.position(headerSize()).slice(); 509 } 510 511 /** 512 * Returns a read-only duplicate of the buffer this block stores internally ready to be read. 513 * Clients must not modify the buffer object though they may set position and limit on the 514 * returned buffer since we pass back a duplicate. 
This method has to be public because it is used 515 * in {@link CompoundBloomFilter} to avoid object creation on every Bloom filter lookup, but has 516 * to be used with caution. Buffer holds header, block content, and any follow-on checksums if 517 * present. 518 * @return the buffer of this block for read-only operations,the buffer includes header,but not 519 * checksum. 520 */ 521 public ByteBuff getBufferReadOnly() { 522 // TODO: ByteBuf does not support asReadOnlyBuffer(). Fix. 523 ByteBuff dup = this.bufWithoutChecksum.duplicate(); 524 assert dup.position() == 0; 525 return dup; 526 } 527 528 public ByteBuffAllocator getByteBuffAllocator() { 529 return this.allocator; 530 } 531 532 private void sanityCheckAssertion(long valueFromBuf, long valueFromField, String fieldName) 533 throws IOException { 534 if (valueFromBuf != valueFromField) { 535 throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf 536 + ") is different from that in the field (" + valueFromField + ")"); 537 } 538 } 539 540 private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField) 541 throws IOException { 542 if (valueFromBuf != valueFromField) { 543 throw new IOException("Block type stored in the buffer: " + valueFromBuf 544 + ", block type field: " + valueFromField); 545 } 546 } 547 548 /** 549 * Checks if the block is internally consistent, i.e. the first 550 * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a valid header consistent 551 * with the fields. Assumes a packed block structure. This function is primary for testing and 552 * debugging, and is not thread-safe, because it alters the internal buffer pointer. Used by tests 553 * only. 554 */ 555 void sanityCheck() throws IOException { 556 // Duplicate so no side-effects 557 ByteBuff dup = this.bufWithoutChecksum.duplicate().rewind(); 558 sanityCheckAssertion(BlockType.read(dup), blockType); 559 560 sanityCheckAssertion(dup.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader"); 561 562 sanityCheckAssertion(dup.getInt(), uncompressedSizeWithoutHeader, 563 "uncompressedSizeWithoutHeader"); 564 565 sanityCheckAssertion(dup.getLong(), prevBlockOffset, "prevBlockOffset"); 566 if (this.fileContext.isUseHBaseChecksum()) { 567 sanityCheckAssertion(dup.get(), this.fileContext.getChecksumType().getCode(), "checksumType"); 568 sanityCheckAssertion(dup.getInt(), this.fileContext.getBytesPerChecksum(), 569 "bytesPerChecksum"); 570 sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader"); 571 } 572 573 if (dup.limit() != onDiskDataSizeWithHeader) { 574 throw new AssertionError( 575 "Expected limit " + onDiskDataSizeWithHeader + ", got " + dup.limit()); 576 } 577 578 // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next 579 // block's header, so there are two sensible values for buffer capacity. 
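    // For example (assuming hbase checksums, where the header is HFILEBLOCK_HEADER_SIZE = 33
    // bytes): a block with onDiskDataSizeWithHeader = 65569 may arrive in a buffer holding either
    // 65569 bytes, or 65569 + 33 bytes when the next block's header was read in along with it.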
580 int hdrSize = headerSize(); 581 dup.rewind(); 582 if ( 583 dup.remaining() != onDiskDataSizeWithHeader 584 && dup.remaining() != onDiskDataSizeWithHeader + hdrSize 585 ) { 586 throw new AssertionError("Invalid buffer capacity: " + dup.remaining() + ", expected " 587 + onDiskDataSizeWithHeader + " or " + (onDiskDataSizeWithHeader + hdrSize)); 588 } 589 } 590 591 @Override 592 public String toString() { 593 StringBuilder sb = new StringBuilder().append("[").append("blockType=").append(blockType) 594 .append(", fileOffset=").append(offset).append(", headerSize=").append(headerSize()) 595 .append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader) 596 .append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader) 597 .append(", prevBlockOffset=").append(prevBlockOffset).append(", isUseHBaseChecksum=") 598 .append(fileContext.isUseHBaseChecksum()); 599 if (fileContext.isUseHBaseChecksum()) { 600 sb.append(", checksumType=").append(ChecksumType.codeToType(this.bufWithoutChecksum.get(24))) 601 .append(", bytesPerChecksum=").append(this.bufWithoutChecksum.getInt(24 + 1)) 602 .append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader); 603 } else { 604 sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader).append("(") 605 .append(onDiskSizeWithoutHeader).append("+") 606 .append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")"); 607 } 608 String dataBegin; 609 if (bufWithoutChecksum.hasArray()) { 610 dataBegin = Bytes.toStringBinary(bufWithoutChecksum.array(), 611 bufWithoutChecksum.arrayOffset() + headerSize(), 612 Math.min(32, bufWithoutChecksum.limit() - bufWithoutChecksum.arrayOffset() - headerSize())); 613 } else { 614 ByteBuff bufWithoutHeader = getBufferWithoutHeader(); 615 byte[] dataBeginBytes = 616 new byte[Math.min(32, bufWithoutHeader.limit() - bufWithoutHeader.position())]; 617 bufWithoutHeader.get(dataBeginBytes); 618 dataBegin = Bytes.toStringBinary(dataBeginBytes); 619 } 620 sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader()) 621 .append(", totalChecksumBytes=").append(totalChecksumBytes()).append(", isUnpacked=") 622 .append(isUnpacked()).append(", buf=[").append(bufWithoutChecksum).append("]") 623 .append(", dataBeginsWith=").append(dataBegin).append(", fileContext=").append(fileContext) 624 .append(", nextBlockOnDiskSize=").append(nextBlockOnDiskSize).append("]"); 625 return sb.toString(); 626 } 627 628 /** 629 * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its 630 * encoded structure. Internal structures are shared between instances where applicable. 631 */ 632 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST) 633 public HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException { 634 if (!fileContext.isCompressedOrEncrypted()) { 635 // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean), 636 // which is used for block serialization to L2 cache, does not preserve encoding and 637 // encryption details. 638 return this; 639 } 640 641 ByteBuff newBuf = allocateBufferForUnpacking(); // allocates space for the decompressed block 642 HFileBlock unpacked = shallowClone(this, newBuf); 643 644 boolean succ = false; 645 final Context context = 646 Context.current().with(CONTEXT_KEY, new HFileContextAttributesBuilderConsumer(fileContext)); 647 try (Scope ignored = context.makeCurrent()) { 648 HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA 649 ? 
reader.getBlockDecodingContext() 650 : reader.getDefaultBlockDecodingContext(); 651 // Create a duplicated buffer without the header part. 652 int headerSize = this.headerSize(); 653 ByteBuff dup = this.bufWithoutChecksum.duplicate(); 654 dup.position(headerSize); 655 dup = dup.slice(); 656 // Decode the dup into unpacked#buf 657 ctx.prepareDecoding(unpacked.getOnDiskDataSizeWithHeader() - headerSize, 658 unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), dup); 659 succ = true; 660 return unpacked; 661 } finally { 662 if (!succ) { 663 unpacked.release(); 664 } 665 } 666 } 667 668 /** 669 * Always allocates a new buffer of the correct size. Copies header bytes from the existing 670 * buffer. Does not change header fields. Reserve room to keep checksum bytes too. 671 */ 672 private ByteBuff allocateBufferForUnpacking() { 673 int headerSize = headerSize(); 674 int capacityNeeded = headerSize + uncompressedSizeWithoutHeader; 675 676 ByteBuff source = bufWithoutChecksum.duplicate(); 677 ByteBuff newBuf = allocator.allocate(capacityNeeded); 678 679 // Copy header bytes into newBuf. 680 source.position(0); 681 newBuf.put(0, source, 0, headerSize); 682 683 // set limit to exclude next block's header 684 newBuf.limit(capacityNeeded); 685 return newBuf; 686 } 687 688 /** 689 * Return true when this block's buffer has been unpacked, false otherwise. Note this is a 690 * calculated heuristic, not tracked attribute of the block. 691 */ 692 public boolean isUnpacked() { 693 final int headerSize = headerSize(); 694 final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader; 695 final int bufCapacity = bufWithoutChecksum.remaining(); 696 return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize; 697 } 698 699 /** 700 * Cannot be {@link #UNSET}. Must be a legitimate value. Used re-making the {@link BlockCacheKey} 701 * when block is returned to the cache. 702 * @return the offset of this block in the file it was read from 703 */ 704 public long getOffset() { 705 if (offset < 0) { 706 throw new IllegalStateException("HFile block offset not initialized properly"); 707 } 708 return offset; 709 } 710 711 /** Returns a byte stream reading the data(excluding header and checksum) of this block */ 712 DataInputStream getByteStream() { 713 ByteBuff dup = this.bufWithoutChecksum.duplicate(); 714 dup.position(this.headerSize()); 715 return new DataInputStream(new ByteBuffInputStream(dup)); 716 } 717 718 @Override 719 public long heapSize() { 720 long size = FIXED_OVERHEAD; 721 size += fileContext.heapSize(); 722 if (bufWithoutChecksum != null) { 723 // Deep overhead of the byte buffer. Needs to be aligned separately. 724 size += ClassSize.align(bufWithoutChecksum.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE); 725 } 726 return ClassSize.align(size); 727 } 728 729 /** 730 * Will be override by {@link SharedMemHFileBlock} or {@link ExclusiveMemHFileBlock}. Return true 731 * by default. 732 */ 733 public boolean isSharedMem() { 734 return true; 735 } 736 737 /** 738 * Unified version 2 {@link HFile} block writer. The intended usage pattern is as follows: 739 * <ol> 740 * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm. 741 * <li>Call {@link Writer#startWriting} and get a data stream to write to. 742 * <li>Write your data into the stream. 743 * <li>Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to. store the 744 * serialized block into an external stream. 745 * <li>Repeat to write more blocks. 
746 * </ol> 747 * <p> 748 */ 749 static class Writer implements ShipperListener { 750 private enum State { 751 INIT, 752 WRITING, 753 BLOCK_READY 754 } 755 756 private int maxSizeUnCompressed; 757 758 private BlockCompressedSizePredicator compressedSizePredicator; 759 760 /** Writer state. Used to ensure the correct usage protocol. */ 761 private State state = State.INIT; 762 763 /** Data block encoder used for data blocks */ 764 private final HFileDataBlockEncoder dataBlockEncoder; 765 766 private HFileBlockEncodingContext dataBlockEncodingCtx; 767 768 /** block encoding context for non-data blocks */ 769 private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx; 770 771 /** 772 * The stream we use to accumulate data into a block in an uncompressed format. We reset this 773 * stream at the end of each block and reuse it. The header is written as the first 774 * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this stream. 775 */ 776 private ByteArrayOutputStream baosInMemory; 777 778 /** 779 * Current block type. Set in {@link #startWriting(BlockType)}. Could be changed in 780 * {@link #finishBlock()} from {@link BlockType#DATA} to {@link BlockType#ENCODED_DATA}. 781 */ 782 private BlockType blockType; 783 784 /** 785 * A stream that we write uncompressed bytes to, which compresses them and writes them to 786 * {@link #baosInMemory}. 787 */ 788 private DataOutputStream userDataStream; 789 790 /** 791 * Bytes to be written to the file system, including the header. Compressed if compression is 792 * turned on. It also includes the checksum data that immediately follows the block data. 793 * (header + data + checksums) 794 */ 795 private ByteArrayOutputStream onDiskBlockBytesWithHeader; 796 797 /** 798 * The size of the checksum data on disk. It is used only if data is not compressed. If data is 799 * compressed, then the checksums are already part of onDiskBytesWithHeader. If data is 800 * uncompressed, then this variable stores the checksum data for this block. 801 */ 802 private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY; 803 804 /** 805 * Current block's start offset in the {@link HFile}. Set in 806 * {@link #writeHeaderAndData(FSDataOutputStream)}. 807 */ 808 private long startOffset; 809 810 /** 811 * Offset of previous block by block type. Updated when the next block is started. 812 */ 813 private long[] prevOffsetByType; 814 815 /** The offset of the previous block of the same type */ 816 private long prevOffset; 817 /** Meta data that holds information about the hfileblock **/ 818 private HFileContext fileContext; 819 820 private final ByteBuffAllocator allocator; 821 822 @Override 823 public void beforeShipped() { 824 if (getEncodingState() != null) { 825 getEncodingState().beforeShipped(); 826 } 827 } 828 829 EncodingState getEncodingState() { 830 return dataBlockEncodingCtx.getEncodingState(); 831 } 832 833 /** 834 * @param dataBlockEncoder data block encoding algorithm to use 835 */ 836 public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder, 837 HFileContext fileContext) { 838 this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP, fileContext.getBlocksize()); 839 } 840 841 public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder, 842 HFileContext fileContext, ByteBuffAllocator allocator, int maxSizeUnCompressed) { 843 if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) { 844 throw new RuntimeException("Unsupported value of bytesPerChecksum. 
" + " Minimum is " 845 + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " 846 + fileContext.getBytesPerChecksum()); 847 } 848 this.allocator = allocator; 849 this.dataBlockEncoder = 850 dataBlockEncoder != null ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE; 851 this.dataBlockEncodingCtx = this.dataBlockEncoder.newDataBlockEncodingContext(conf, 852 HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); 853 // TODO: This should be lazily instantiated 854 this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(conf, null, 855 HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); 856 // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum 857 baosInMemory = new ByteArrayOutputStream(); 858 prevOffsetByType = new long[BlockType.values().length]; 859 for (int i = 0; i < prevOffsetByType.length; ++i) { 860 prevOffsetByType[i] = UNSET; 861 } 862 // TODO: Why fileContext saved away when we have dataBlockEncoder and/or 863 // defaultDataBlockEncoder? 864 this.fileContext = fileContext; 865 this.compressedSizePredicator = (BlockCompressedSizePredicator) ReflectionUtils.newInstance( 866 conf.getClass(BLOCK_COMPRESSED_SIZE_PREDICATOR, UncompressedBlockSizePredicator.class), 867 new Configuration(conf)); 868 this.maxSizeUnCompressed = maxSizeUnCompressed; 869 } 870 871 /** 872 * Starts writing into the block. The previous block's data is discarded. 873 * @return the stream the user can write their data into 874 */ 875 DataOutputStream startWriting(BlockType newBlockType) throws IOException { 876 if (state == State.BLOCK_READY && startOffset != -1) { 877 // We had a previous block that was written to a stream at a specific 878 // offset. Save that offset as the last offset of a block of that type. 879 prevOffsetByType[blockType.getId()] = startOffset; 880 } 881 882 startOffset = -1; 883 blockType = newBlockType; 884 885 baosInMemory.reset(); 886 baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER); 887 888 state = State.WRITING; 889 890 // We will compress it later in finishBlock() 891 userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory); 892 if (newBlockType == BlockType.DATA) { 893 this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream); 894 } 895 return userDataStream; 896 } 897 898 /** 899 * Writes the Cell to this block 900 */ 901 void write(ExtendedCell cell) throws IOException { 902 expectState(State.WRITING); 903 this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream); 904 } 905 906 /** 907 * Transitions the block writer from the "writing" state to the "block ready" state. Does 908 * nothing if a block is already finished. 909 */ 910 void ensureBlockReady() throws IOException { 911 Preconditions.checkState(state != State.INIT, "Unexpected state: " + state); 912 913 if (state == State.BLOCK_READY) { 914 return; 915 } 916 917 // This will set state to BLOCK_READY. 918 finishBlock(); 919 } 920 921 public boolean checkBoundariesWithPredicate() { 922 int rawBlockSize = encodedBlockSizeWritten(); 923 if (rawBlockSize >= maxSizeUnCompressed) { 924 return true; 925 } else { 926 return compressedSizePredicator.shouldFinishBlock(rawBlockSize); 927 } 928 } 929 930 /** 931 * Finish up writing of the block. Flushes the compressing stream (if using compression), fills 932 * out the header, does any compression/encryption of bytes to flush out to disk, and manages 933 * the cache on write content, if applicable. Sets block write state to "block ready". 
934 */ 935 private void finishBlock() throws IOException { 936 if (blockType == BlockType.DATA) { 937 this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream, 938 baosInMemory.getBuffer(), blockType); 939 blockType = dataBlockEncodingCtx.getBlockType(); 940 } 941 userDataStream.flush(); 942 prevOffset = prevOffsetByType[blockType.getId()]; 943 944 // We need to cache the unencoded/uncompressed size before changing the block state 945 int rawBlockSize = 0; 946 if (this.getEncodingState() != null) { 947 rawBlockSize = encodedBlockSizeWritten(); 948 } 949 // We need to set state before we can package the block up for cache-on-write. In a way, the 950 // block is ready, but not yet encoded or compressed. 951 state = State.BLOCK_READY; 952 Bytes compressAndEncryptDat; 953 if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) { 954 compressAndEncryptDat = 955 dataBlockEncodingCtx.compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size()); 956 } else { 957 compressAndEncryptDat = defaultBlockEncodingCtx.compressAndEncrypt(baosInMemory.getBuffer(), 958 0, baosInMemory.size()); 959 } 960 if (compressAndEncryptDat == null) { 961 compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size()); 962 } 963 if (onDiskBlockBytesWithHeader == null) { 964 onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength()); 965 } 966 onDiskBlockBytesWithHeader.reset(); 967 onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(), 968 compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength()); 969 // Update raw and compressed sizes in the predicate 970 compressedSizePredicator.updateLatestBlockSizes(fileContext, rawBlockSize, 971 onDiskBlockBytesWithHeader.size()); 972 973 // Calculate how many bytes we need for checksum on the tail of the block. 974 int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(), 975 fileContext.getBytesPerChecksum()); 976 977 // Put the header for the on disk bytes; header currently is unfilled-out 978 putHeader(onDiskBlockBytesWithHeader, onDiskBlockBytesWithHeader.size() + numBytes, 979 baosInMemory.size(), onDiskBlockBytesWithHeader.size()); 980 981 if (onDiskChecksum.length != numBytes) { 982 onDiskChecksum = new byte[numBytes]; 983 } 984 ChecksumUtil.generateChecksums(onDiskBlockBytesWithHeader.getBuffer(), 0, 985 onDiskBlockBytesWithHeader.size(), onDiskChecksum, 0, fileContext.getChecksumType(), 986 fileContext.getBytesPerChecksum()); 987 } 988 989 /** 990 * Put the header into the given byte array at the given offset. 
991 * @param onDiskSize size of the block on disk header + data + checksum 992 * @param uncompressedSize size of the block after decompression (but before optional data block 993 * decoding) including header 994 * @param onDiskDataSize size of the block on disk with header and data but not including the 995 * checksums 996 */ 997 private void putHeader(byte[] dest, int offset, int onDiskSize, int uncompressedSize, 998 int onDiskDataSize) { 999 offset = blockType.put(dest, offset); 1000 offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE); 1001 offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE); 1002 offset = Bytes.putLong(dest, offset, prevOffset); 1003 offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode()); 1004 offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum()); 1005 Bytes.putInt(dest, offset, onDiskDataSize); 1006 } 1007 1008 private void putHeader(ByteBuff buff, int onDiskSize, int uncompressedSize, 1009 int onDiskDataSize) { 1010 buff.rewind(); 1011 blockType.write(buff); 1012 buff.putInt(onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE); 1013 buff.putInt(uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE); 1014 buff.putLong(prevOffset); 1015 buff.put(fileContext.getChecksumType().getCode()); 1016 buff.putInt(fileContext.getBytesPerChecksum()); 1017 buff.putInt(onDiskDataSize); 1018 } 1019 1020 private void putHeader(ByteArrayOutputStream dest, int onDiskSize, int uncompressedSize, 1021 int onDiskDataSize) { 1022 putHeader(dest.getBuffer(), 0, onDiskSize, uncompressedSize, onDiskDataSize); 1023 } 1024 1025 /** 1026 * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records the offset of this 1027 * block so that it can be referenced in the next block of the same type. 1028 */ 1029 void writeHeaderAndData(FSDataOutputStream out) throws IOException { 1030 long offset = out.getPos(); 1031 if (startOffset != UNSET && offset != startOffset) { 1032 throw new IOException("A " + blockType + " block written to a " 1033 + "stream twice, first at offset " + startOffset + ", then at " + offset); 1034 } 1035 startOffset = offset; 1036 finishBlockAndWriteHeaderAndData(out); 1037 } 1038 1039 /** 1040 * Writes the header and the compressed data of this block (or uncompressed data when not using 1041 * compression) into the given stream. Can be called in the "writing" state or in the "block 1042 * ready" state. If called in the "writing" state, transitions the writer to the "block ready" 1043 * state. 1044 * @param out the output stream to write the 1045 */ 1046 protected void finishBlockAndWriteHeaderAndData(DataOutputStream out) throws IOException { 1047 ensureBlockReady(); 1048 long startTime = EnvironmentEdgeManager.currentTime(); 1049 out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size()); 1050 out.write(onDiskChecksum); 1051 HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime); 1052 } 1053 1054 /** 1055 * Returns the header or the compressed data (or uncompressed data when not using compression) 1056 * as a byte array. Can be called in the "writing" state or in the "block ready" state. If 1057 * called in the "writing" state, transitions the writer to the "block ready" state. This 1058 * returns the header + data + checksums stored on disk. 
1059 * @return header and data as they would be stored on disk in a byte array 1060 */ 1061 byte[] getHeaderAndDataForTest() throws IOException { 1062 ensureBlockReady(); 1063 // This is not very optimal, because we are doing an extra copy. 1064 // But this method is used only by unit tests. 1065 byte[] output = new byte[onDiskBlockBytesWithHeader.size() + onDiskChecksum.length]; 1066 System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0, 1067 onDiskBlockBytesWithHeader.size()); 1068 System.arraycopy(onDiskChecksum, 0, output, onDiskBlockBytesWithHeader.size(), 1069 onDiskChecksum.length); 1070 return output; 1071 } 1072 1073 /** 1074 * Releases resources used by this writer. 1075 */ 1076 void release() { 1077 if (dataBlockEncodingCtx != null) { 1078 dataBlockEncodingCtx.close(); 1079 dataBlockEncodingCtx = null; 1080 } 1081 if (defaultBlockEncodingCtx != null) { 1082 defaultBlockEncodingCtx.close(); 1083 defaultBlockEncodingCtx = null; 1084 } 1085 } 1086 1087 /** 1088 * Returns the on-disk size of the data portion of the block. This is the compressed size if 1089 * compression is enabled. Can only be called in the "block ready" state. Header is not 1090 * compressed, and its size is not included in the return value. 1091 * @return the on-disk size of the block, not including the header. 1092 */ 1093 int getOnDiskSizeWithoutHeader() { 1094 expectState(State.BLOCK_READY); 1095 return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length 1096 - HConstants.HFILEBLOCK_HEADER_SIZE; 1097 } 1098 1099 /** 1100 * Returns the on-disk size of the block. Can only be called in the "block ready" state. 1101 * @return the on-disk size of the block ready to be written, including the header size, the 1102 * data and the checksum data. 1103 */ 1104 int getOnDiskSizeWithHeader() { 1105 expectState(State.BLOCK_READY); 1106 return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length; 1107 } 1108 1109 /** 1110 * The uncompressed size of the block data. Does not include header size. 1111 */ 1112 int getUncompressedSizeWithoutHeader() { 1113 expectState(State.BLOCK_READY); 1114 return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE; 1115 } 1116 1117 /** 1118 * The uncompressed size of the block data, including header size. 1119 */ 1120 public int getUncompressedSizeWithHeader() { 1121 expectState(State.BLOCK_READY); 1122 return baosInMemory.size(); 1123 } 1124 1125 /** Returns true if a block is being written */ 1126 boolean isWriting() { 1127 return state == State.WRITING; 1128 } 1129 1130 /** 1131 * Returns the number of bytes written into the current block so far, or zero if not writing the 1132 * block at the moment. Note that this will return zero in the "block ready" state as well. 1133 * @return the number of bytes written 1134 */ 1135 public int encodedBlockSizeWritten() { 1136 return state != State.WRITING ? 0 : this.getEncodingState().getEncodedDataSizeWritten(); 1137 } 1138 1139 /** 1140 * Returns the number of bytes written into the current block so far, or zero if not writing the 1141 * block at the moment. Note that this will return zero in the "block ready" state as well. 1142 * @return the number of bytes written 1143 */ 1144 public int blockSizeWritten() { 1145 return state != State.WRITING ? 0 : this.getEncodingState().getUnencodedDataSizeWritten(); 1146 } 1147 1148 /** 1149 * Clones the header followed by the uncompressed data, even if using compression. This is 1150 * needed for storing uncompressed blocks in the block cache. 
Can be called in the "writing" 1151 * state or the "block ready" state. Returns only the header and data, does not include checksum 1152 * data. 1153 * @return Returns an uncompressed block ByteBuff for caching on write 1154 */ 1155 ByteBuff cloneUncompressedBufferWithHeader() { 1156 expectState(State.BLOCK_READY); 1157 ByteBuff bytebuff = allocator.allocate(baosInMemory.size()); 1158 baosInMemory.toByteBuff(bytebuff); 1159 int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(), 1160 fileContext.getBytesPerChecksum()); 1161 putHeader(bytebuff, onDiskBlockBytesWithHeader.size() + numBytes, baosInMemory.size(), 1162 onDiskBlockBytesWithHeader.size()); 1163 bytebuff.rewind(); 1164 return bytebuff; 1165 } 1166 1167 /** 1168 * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is needed 1169 * for storing packed blocks in the block cache. Returns only the header and data, Does not 1170 * include checksum data. 1171 * @return Returns a copy of block bytes for caching on write 1172 */ 1173 private ByteBuff cloneOnDiskBufferWithHeader() { 1174 expectState(State.BLOCK_READY); 1175 ByteBuff bytebuff = allocator.allocate(onDiskBlockBytesWithHeader.size()); 1176 onDiskBlockBytesWithHeader.toByteBuff(bytebuff); 1177 bytebuff.rewind(); 1178 return bytebuff; 1179 } 1180 1181 private void expectState(State expectedState) { 1182 if (state != expectedState) { 1183 throw new IllegalStateException( 1184 "Expected state: " + expectedState + ", actual state: " + state); 1185 } 1186 } 1187 1188 /** 1189 * Takes the given {@link BlockWritable} instance, creates a new block of its appropriate type, 1190 * writes the writable into this block, and flushes the block into the output stream. The writer 1191 * is instructed not to buffer uncompressed bytes for cache-on-write. 1192 * @param bw the block-writable object to write as a block 1193 * @param out the file system output stream 1194 */ 1195 void writeBlock(BlockWritable bw, FSDataOutputStream out) throws IOException { 1196 bw.writeToBlock(startWriting(bw.getBlockType())); 1197 writeHeaderAndData(out); 1198 } 1199 1200 /** 1201 * Creates a new HFileBlock. Checksums have already been validated, so the byte buffer passed 1202 * into the constructor of this newly created block does not have checksum data even though the 1203 * header minor version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a 0 value 1204 * in bytesPerChecksum. This method copies the on-disk or uncompressed data to build the 1205 * HFileBlock which is used only while writing blocks and caching. 1206 * <p> 1207 * TODO: Should there be an option where a cache can ask that hbase preserve block checksums for 1208 * checking after a block comes out of the cache? Otehrwise, cache is responsible for blocks 1209 * being wholesome (ECC memory or if file-backed, it does checksumming). 1210 */ 1211 HFileBlock getBlockForCaching(CacheConfig cacheConf) { 1212 HFileContext newContext = BlockCacheUtil.cloneContext(fileContext); 1213 // Build the HFileBlock. 
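      // Which buffer we clone below depends on cacheConf.shouldCacheCompressed() for this block's
      // category: either the on-disk (compressed/encoded) bytes or the uncompressed bytes. In
      // both cases only header + data are copied; trailing checksums are not included (see the
      // clone methods above).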
1214 HFileBlockBuilder builder = new HFileBlockBuilder(); 1215 ByteBuff buff; 1216 if (cacheConf.shouldCacheCompressed(blockType.getCategory())) { 1217 buff = cloneOnDiskBufferWithHeader(); 1218 } else { 1219 buff = cloneUncompressedBufferWithHeader(); 1220 } 1221 return builder.withBlockType(blockType) 1222 .withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader()) 1223 .withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader()) 1224 .withPrevBlockOffset(prevOffset).withByteBuff(buff).withFillHeader(FILL_HEADER) 1225 .withOffset(startOffset).withNextBlockOnDiskSize(UNSET) 1226 .withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length) 1227 .withHFileContext(newContext).withByteBuffAllocator(cacheConf.getByteBuffAllocator()) 1228 .withShared(!buff.hasArray()).build(); 1229 } 1230 } 1231 1232 /** Something that can be written into a block. */ 1233 interface BlockWritable { 1234 /** The type of block this data should use. */ 1235 BlockType getBlockType(); 1236 1237 /** 1238 * Writes the block to the provided stream. Must not write any magic records. 1239 * @param out a stream to write uncompressed data into 1240 */ 1241 void writeToBlock(DataOutput out) throws IOException; 1242 } 1243 1244 /** 1245 * Iterator for reading {@link HFileBlock}s in load-on-open-section, such as root data index 1246 * block, meta index block, file info block etc. 1247 */ 1248 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST) 1249 public interface BlockIterator { 1250 /** 1251 * Get the next block, or null if there are no more blocks to iterate. 1252 */ 1253 HFileBlock nextBlock() throws IOException; 1254 1255 /** 1256 * Similar to {@link #nextBlock()} but checks block type, throws an exception if incorrect, and 1257 * returns the HFile block 1258 */ 1259 HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException; 1260 1261 /** 1262 * Now we use the {@link ByteBuffAllocator} to manage the nio ByteBuffers for HFileBlocks, so we 1263 * must deallocate all of the ByteBuffers in the end life. the BlockIterator's life cycle is 1264 * starting from opening an HFileReader and stopped when the HFileReader#close, so we will keep 1265 * track all the read blocks until we call {@link BlockIterator#freeBlocks()} when closing the 1266 * HFileReader. Sum bytes of those blocks in load-on-open section should be quite small, so 1267 * tracking them should be OK. 1268 */ 1269 void freeBlocks(); 1270 } 1271 1272 /** An HFile block reader with iteration ability. */ 1273 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST) 1274 public interface FSReader { 1275 /** 1276 * Reads the block at the given offset in the file with the given on-disk size and uncompressed 1277 * size. 1278 * @param offset of the file to read 1279 * @param onDiskSize the on-disk size of the entire block, including all applicable headers, 1280 * or -1 if unknown 1281 * @param pread true to use pread, otherwise use the stream read. 1282 * @param updateMetrics update the metrics or not. 1283 * @param intoHeap allocate the block's ByteBuff by {@link ByteBuffAllocator} or JVM heap. 1284 * For LRUBlockCache, we must ensure that the block to cache is an heap 1285 * one, because the memory occupation is based on heap now, also for 1286 * {@link CombinedBlockCache}, we use the heap LRUBlockCache as L1 cache to 1287 * cache small blocks such as IndexBlock or MetaBlock for faster access. 
     *                      So a flag is introduced here to decide whether to allocate from the
     *                      JVM heap or not, so that we can avoid an extra off-heap to heap memory
     *                      copy when using LRUBlockCache. In most cases we know the expected
     *                      block type we'll read, while for some special cases (example:
     *                      HFileReaderImpl#readNextDataBlock()) we cannot pre-decide the expected
     *                      block type; then we can only allocate the block's ByteBuff from
     *                      {@link ByteBuffAllocator} first, and when caching it in
     *                      {@link LruBlockCache} we'll check whether the ByteBuff is from the
     *                      heap or not; if not, we'll clone it to a heap one and cache that.
     * @return the newly read block
     */
    HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics,
      boolean intoHeap) throws IOException;

    /**
     * Creates a block iterator over the given portion of the {@link HFile}. The iterator returns
     * blocks starting with offset such that offset <= startOffset < endOffset. Returned
     * blocks are always unpacked. Used when no hfile index is available; e.g. reading in the
     * hfile index blocks themselves on file open.
     * @param startOffset the offset of the block to start iteration with
     * @param endOffset   the offset to end iteration at (exclusive)
     * @return an iterator of blocks between the two given offsets
     */
    BlockIterator blockRange(long startOffset, long endOffset);

    /** Closes the backing streams */
    void closeStreams() throws IOException;

    /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
    HFileBlockDecodingContext getBlockDecodingContext();

    /** Get the default decoder for blocks from this file. */
    HFileBlockDecodingContext getDefaultBlockDecodingContext();

    void setIncludesMemStoreTS(boolean includesMemstoreTS);

    void setDataBlockEncoder(HFileDataBlockEncoder encoder, Configuration conf);

    /**
     * To close the stream's socket. Note: This can be concurrently called from multiple threads
     * and implementations should take care of thread safety.
     */
    void unbufferStream();
  }

  /**
   * Data structure used to cache the header of the NEXT block. Only works if the next read that
   * comes in here is the next in sequence in this block. When we read, we read the current block
   * and the next block's header. We do this so we have the length of the next block to read if
   * the hfile index is not available (rare, at hfile open only).
   */
  private static class PrefetchedHeader {
    long offset = -1;
    byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
    final ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(header, 0, header.length));

    @Override
    public String toString() {
      return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header);
    }
  }

  /**
   * Reads version 2 HFile blocks from the filesystem.
   */
  static class FSReaderImpl implements FSReader {
    /**
     * The file system stream of the underlying {@link HFile} that does or doesn't do checksum
     * validations in the filesystem
     */
    private FSDataInputStreamWrapper streamWrapper;

    private HFileBlockDecodingContext encodedBlockDecodingCtx;

    /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}.
*/ 1363 private final HFileBlockDefaultDecodingContext defaultDecodingCtx; 1364 1365 /** 1366 * Cache of the NEXT header after this. Check it is indeed next blocks header before using it. 1367 * TODO: Review. This overread into next block to fetch next blocks header seems unnecessary 1368 * given we usually get the block size from the hfile index. Review! 1369 */ 1370 private AtomicReference<PrefetchedHeader> prefetchedHeader = 1371 new AtomicReference<>(new PrefetchedHeader()); 1372 1373 /** The size of the file we are reading from, or -1 if unknown. */ 1374 private long fileSize; 1375 1376 /** The size of the header */ 1377 protected final int hdrSize; 1378 1379 /** The filesystem used to access data */ 1380 private HFileSystem hfs; 1381 1382 private HFileContext fileContext; 1383 // Cache the fileName 1384 private String pathName; 1385 1386 private final ByteBuffAllocator allocator; 1387 1388 private final Lock streamLock = new ReentrantLock(); 1389 1390 private final boolean isPreadAllBytes; 1391 1392 private final long readWarnTime; 1393 1394 /** 1395 * If reading block cost time in milliseconds more than the threshold, a warning will be logged. 1396 */ 1397 public static final String FS_READER_WARN_TIME_MS = "hbase.fs.reader.warn.time.ms"; 1398 1399 FSReaderImpl(ReaderContext readerContext, HFileContext fileContext, ByteBuffAllocator allocator, 1400 Configuration conf) throws IOException { 1401 this.fileSize = readerContext.getFileSize(); 1402 this.hfs = readerContext.getFileSystem(); 1403 if (readerContext.getFilePath() != null) { 1404 this.pathName = readerContext.getFilePath().toString(); 1405 } 1406 this.fileContext = fileContext; 1407 this.hdrSize = headerSize(fileContext.isUseHBaseChecksum()); 1408 this.allocator = allocator; 1409 1410 this.streamWrapper = readerContext.getInputStreamWrapper(); 1411 // Older versions of HBase didn't support checksum. 1412 this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum()); 1413 defaultDecodingCtx = new HFileBlockDefaultDecodingContext(conf, fileContext); 1414 encodedBlockDecodingCtx = defaultDecodingCtx; 1415 isPreadAllBytes = readerContext.isPreadAllBytes(); 1416 // Default warn threshold set to -1, it means skipping record the read block slow warning log. 1417 readWarnTime = conf.getLong(FS_READER_WARN_TIME_MS, -1L); 1418 } 1419 1420 @Override 1421 public BlockIterator blockRange(final long startOffset, final long endOffset) { 1422 final FSReader owner = this; // handle for inner class 1423 return new BlockIterator() { 1424 private volatile boolean freed = false; 1425 // Tracking all read blocks until we call freeBlocks. 1426 private List<HFileBlock> blockTracker = new ArrayList<>(); 1427 private long offset = startOffset; 1428 // Cache length of next block. Current block has the length of next block in it. 1429 private long length = -1; 1430 1431 @Override 1432 public HFileBlock nextBlock() throws IOException { 1433 if (offset >= endOffset) { 1434 return null; 1435 } 1436 HFileBlock b = readBlockData(offset, length, false, false, true); 1437 offset += b.getOnDiskSizeWithHeader(); 1438 length = b.getNextBlockOnDiskSize(); 1439 HFileBlock uncompressed = b.unpack(fileContext, owner); 1440 if (uncompressed != b) { 1441 b.release(); // Need to release the compressed Block now. 
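            // Note: unpack() may return a new block backed by a freshly allocated, uncompressed
            // buffer. In that case nothing references the compressed on-disk block any more, so it
            // was released above and only the unpacked block is tracked for freeBlocks().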
1442 } 1443 blockTracker.add(uncompressed); 1444 return uncompressed; 1445 } 1446 1447 @Override 1448 public HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException { 1449 HFileBlock blk = nextBlock(); 1450 if (blk.getBlockType() != blockType) { 1451 throw new IOException( 1452 "Expected block of type " + blockType + " but found " + blk.getBlockType()); 1453 } 1454 return blk; 1455 } 1456 1457 @Override 1458 public void freeBlocks() { 1459 if (freed) { 1460 return; 1461 } 1462 blockTracker.forEach(HFileBlock::release); 1463 blockTracker = null; 1464 freed = true; 1465 } 1466 }; 1467 } 1468 1469 /** 1470 * Does a positional read or a seek and read into the given byte buffer. We need take care that 1471 * we will call the {@link ByteBuff#release()} for every exit to deallocate the ByteBuffers, 1472 * otherwise the memory leak may happen. 1473 * @param dest destination buffer 1474 * @param size size of read 1475 * @param peekIntoNextBlock whether to read the next block's on-disk size 1476 * @param fileOffset position in the stream to read at 1477 * @param pread whether we should do a positional read 1478 * @param istream The input source of data 1479 * @return true to indicate the destination buffer include the next block header, otherwise only 1480 * include the current block data without the next block header. 1481 * @throws IOException if any IO error happen. 1482 */ 1483 protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size, 1484 boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException { 1485 if (!pread) { 1486 // Seek + read. Better for scanning. 1487 istream.seek(fileOffset); 1488 long realOffset = istream.getPos(); 1489 if (realOffset != fileOffset) { 1490 throw new IOException("Tried to seek to " + fileOffset + " to read " + size 1491 + " bytes, but pos=" + realOffset + " after seek"); 1492 } 1493 if (!peekIntoNextBlock) { 1494 BlockIOUtils.readFully(dest, istream, size); 1495 return false; 1496 } 1497 1498 // Try to read the next block header 1499 if (!BlockIOUtils.readWithExtra(dest, istream, size, hdrSize)) { 1500 // did not read the next block header. 1501 return false; 1502 } 1503 } else { 1504 // Positional read. Better for random reads; or when the streamLock is already locked. 1505 int extraSize = peekIntoNextBlock ? hdrSize : 0; 1506 if ( 1507 !BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize, isPreadAllBytes) 1508 ) { 1509 // did not read the next block header. 1510 return false; 1511 } 1512 } 1513 assert peekIntoNextBlock; 1514 return true; 1515 } 1516 1517 /** 1518 * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as 1519 * little memory allocation as possible, using the provided on-disk size. 1520 * @param offset the offset in the stream to read at 1521 * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header, or -1 if 1522 * unknown; i.e. when iterating over blocks reading in the file 1523 * metadata info. 1524 * @param pread whether to use a positional read 1525 * @param updateMetrics whether to update the metrics 1526 * @param intoHeap allocate ByteBuff of block from heap or off-heap. 1527 * @see FSReader#readBlockData(long, long, boolean, boolean, boolean) for more details about the 1528 * useHeap. 
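     * In outline (a hedged summary of the implementation below, not an additional contract): the
     * block is first read and verified with HBase checksums; on a mismatch the read is retried
     * while relying on HDFS checksum verification, roughly:
     * <pre>{@code
     * HFileBlock blk = readBlockDataInternal(is, offset, size, pread, true, updateMetrics, intoHeap);
     * if (blk == null) {                                   // HBase checksum mismatch
     *   is = streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
     *   blk = readBlockDataInternal(is, offset, size, pread, false, updateMetrics, intoHeap);
     * }
     * }</pre>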
1529 */ 1530 @Override 1531 public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread, 1532 boolean updateMetrics, boolean intoHeap) throws IOException { 1533 // Get a copy of the current state of whether to validate 1534 // hbase checksums or not for this read call. This is not 1535 // thread-safe but the one constraint is that if we decide 1536 // to skip hbase checksum verification then we are 1537 // guaranteed to use hdfs checksum verification. 1538 boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum(); 1539 FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum); 1540 final Context context = Context.current().with(CONTEXT_KEY, 1541 new HFileContextAttributesBuilderConsumer(fileContext) 1542 .setSkipChecksum(doVerificationThruHBaseChecksum) 1543 .setReadType(pread ? ReadType.POSITIONAL_READ : ReadType.SEEK_PLUS_READ)); 1544 try (Scope ignored = context.makeCurrent()) { 1545 HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, 1546 doVerificationThruHBaseChecksum, updateMetrics, intoHeap); 1547 if (blk == null) { 1548 HFile.LOG.warn("HBase checksum verification failed for file {} at offset {} filesize {}." 1549 + " Retrying read with HDFS checksums turned on...", pathName, offset, fileSize); 1550 1551 if (!doVerificationThruHBaseChecksum) { 1552 String msg = "HBase checksum verification failed for file " + pathName + " at offset " 1553 + offset + " filesize " + fileSize + " but this cannot happen because doVerify is " 1554 + doVerificationThruHBaseChecksum; 1555 HFile.LOG.warn(msg); 1556 throw new IOException(msg); // cannot happen case here 1557 } 1558 HFile.CHECKSUM_FAILURES.increment(); // update metrics 1559 1560 // If we have a checksum failure, we fall back into a mode where 1561 // the next few reads use HDFS level checksums. We aim to make the 1562 // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid 1563 // hbase checksum verification, but since this value is set without 1564 // holding any locks, it can so happen that we might actually do 1565 // a few more than precisely this number. 1566 is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD); 1567 doVerificationThruHBaseChecksum = false; 1568 blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, 1569 doVerificationThruHBaseChecksum, updateMetrics, intoHeap); 1570 if (blk != null) { 1571 HFile.LOG.warn( 1572 "HDFS checksum verification succeeded for file {} at offset {} filesize" + " {}", 1573 pathName, offset, fileSize); 1574 } 1575 } 1576 if (blk == null && !doVerificationThruHBaseChecksum) { 1577 String msg = 1578 "readBlockData failed, possibly due to " + "checksum verification failed for file " 1579 + pathName + " at offset " + offset + " filesize " + fileSize; 1580 HFile.LOG.warn(msg); 1581 throw new IOException(msg); 1582 } 1583 1584 // If there is a checksum mismatch earlier, then retry with 1585 // HBase checksums switched off and use HDFS checksum verification. 1586 // This triggers HDFS to detect and fix corrupt replicas. The 1587 // next checksumOffCount read requests will use HDFS checksums. 1588 // The decrementing of this.checksumOffCount is not thread-safe, 1589 // but it is harmless because eventually checksumOffCount will be 1590 // a negative number. 
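      // checksumOk() reports a clean read to the stream wrapper; once enough clean reads have been
      // seen while running on HDFS checksums, the wrapper may switch HBase checksum verification
      // back on (see FSDataInputStreamWrapper).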
1591 streamWrapper.checksumOk(); 1592 return blk; 1593 } 1594 } 1595 1596 /** 1597 * Check that checksumType on {@code headerBuf} read from a block header seems reasonable, 1598 * within the known value range. 1599 * @return {@code true} if the headerBuf is safe to proceed, {@code false} otherwise. 1600 */ 1601 private boolean checkCheckSumTypeOnHeaderBuf(ByteBuff headerBuf) { 1602 if (headerBuf == null) { 1603 return true; 1604 } 1605 byte b = headerBuf.get(HFileBlock.Header.CHECKSUM_TYPE_INDEX); 1606 for (ChecksumType t : ChecksumType.values()) { 1607 if (t.getCode() == b) { 1608 return true; 1609 } 1610 } 1611 return false; 1612 } 1613 1614 /** 1615 * Check that {@code value} read from a block header seems reasonable, within a large margin of 1616 * error. 1617 * @return {@code true} if the value is safe to proceed, {@code false} otherwise. 1618 */ 1619 private boolean checkOnDiskSizeWithHeader(int value) { 1620 if (value < 0) { 1621 if (LOG.isTraceEnabled()) { 1622 LOG.trace( 1623 "onDiskSizeWithHeader={}; value represents a size, so it should never be negative.", 1624 value); 1625 } 1626 return false; 1627 } 1628 if (value - hdrSize < 0) { 1629 if (LOG.isTraceEnabled()) { 1630 LOG.trace("onDiskSizeWithHeader={}, hdrSize={}; don't accept a value that is negative" 1631 + " after the header size is excluded.", value, hdrSize); 1632 } 1633 return false; 1634 } 1635 return true; 1636 } 1637 1638 /** 1639 * Check that {@code value} provided by the calling context seems reasonable, within a large 1640 * margin of error. 1641 * @return {@code true} if the value is safe to proceed, {@code false} otherwise. 1642 */ 1643 private boolean checkCallerProvidedOnDiskSizeWithHeader(long value) { 1644 // same validation logic as is used by Math.toIntExact(long) 1645 int intValue = (int) value; 1646 if (intValue != value) { 1647 if (LOG.isTraceEnabled()) { 1648 LOG.trace("onDiskSizeWithHeaderL={}; value exceeds int size limits.", value); 1649 } 1650 return false; 1651 } 1652 if (intValue == -1) { 1653 // a magic value we expect to see. 1654 return true; 1655 } 1656 return checkOnDiskSizeWithHeader(intValue); 1657 } 1658 1659 /** 1660 * Check atomic reference cache for this block's header. Cache only good if next read coming 1661 * through is next in sequence in the block. We read next block's header on the tail of reading 1662 * the previous block to save a seek. Otherwise, we have to do a seek to read the header before 1663 * we can pull in the block OR we have to backup the stream because we over-read (the next 1664 * block's header). 1665 * @see PrefetchedHeader 1666 * @return The cached block header or null if not found. 1667 * @see #cacheNextBlockHeader(long, ByteBuff, int, int) 1668 */ 1669 private ByteBuff getCachedHeader(final long offset) { 1670 PrefetchedHeader ph = this.prefetchedHeader.get(); 1671 return ph != null && ph.offset == offset ? ph.buf : null; 1672 } 1673 1674 /** 1675 * Save away the next blocks header in atomic reference. 1676 * @see #getCachedHeader(long) 1677 * @see PrefetchedHeader 1678 */ 1679 private void cacheNextBlockHeader(final long offset, ByteBuff onDiskBlock, 1680 int onDiskSizeWithHeader, int headerLength) { 1681 PrefetchedHeader ph = new PrefetchedHeader(); 1682 ph.offset = offset; 1683 onDiskBlock.get(onDiskSizeWithHeader, ph.header, 0, headerLength); 1684 this.prefetchedHeader.set(ph); 1685 } 1686 1687 /** 1688 * Clear the cached value when its integrity is suspect. 
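     * This is done whenever the bytes read for a block fail validation (unknown checksum type,
     * implausible on-disk size, or a checksum mismatch), since the prefetched header may have come
     * from the same corrupt read.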
1689 */ 1690 private void invalidateNextBlockHeader() { 1691 prefetchedHeader.set(null); 1692 } 1693 1694 private int getNextBlockOnDiskSize(ByteBuff onDiskBlock, int onDiskSizeWithHeader) { 1695 return onDiskBlock.getIntAfterPosition(onDiskSizeWithHeader + BlockType.MAGIC_LENGTH) 1696 + hdrSize; 1697 } 1698 1699 private ByteBuff allocate(int size, boolean intoHeap) { 1700 return intoHeap ? HEAP.allocate(size) : allocator.allocate(size); 1701 } 1702 1703 /** 1704 * Reads a version 2 block. 1705 * @param offset the offset in the stream to read at. 1706 * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header and 1707 * checksums if present or -1 if unknown (as a long). Can be -1 if 1708 * we are doing raw iteration of blocks as when loading up file 1709 * metadata; i.e. the first read of a new file. Usually non-null 1710 * gotten from the file index. 1711 * @param pread whether to use a positional read 1712 * @param verifyChecksum Whether to use HBase checksums. If HBase checksum is switched 1713 * off, then use HDFS checksum. Can also flip on/off reading same 1714 * file if we hit a troublesome patch in an hfile. 1715 * @param updateMetrics whether need to update the metrics. 1716 * @param intoHeap allocate the ByteBuff of block from heap or off-heap. 1717 * @return the HFileBlock or null if there is a HBase checksum mismatch 1718 */ 1719 protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, 1720 long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics, 1721 boolean intoHeap) throws IOException { 1722 final Span span = Span.current(); 1723 final Attributes attributes = getReadDataBlockInternalAttributes(span); 1724 if (offset < 0) { 1725 throw new IOException("Invalid offset=" + offset + " trying to read " + "block (onDiskSize=" 1726 + onDiskSizeWithHeaderL + ")"); 1727 } 1728 if (!checkCallerProvidedOnDiskSizeWithHeader(onDiskSizeWithHeaderL)) { 1729 LOG.trace("Caller provided invalid onDiskSizeWithHeaderL={}", onDiskSizeWithHeaderL); 1730 onDiskSizeWithHeaderL = -1; 1731 } 1732 int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL; 1733 1734 // Try to use the cached header. Will serve us in rare case where onDiskSizeWithHeaderL==-1 1735 // and will save us having to seek the stream backwards to reread the header we 1736 // read the last time through here. 1737 ByteBuff headerBuf = getCachedHeader(offset); 1738 LOG.trace( 1739 "Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, " 1740 + "onDiskSizeWithHeader={}", 1741 this.fileContext.getHFileName(), offset, pread, verifyChecksum, headerBuf, 1742 onDiskSizeWithHeader); 1743 // This is NOT same as verifyChecksum. This latter is whether to do hbase 1744 // checksums. Can change with circumstances. The below flag is whether the 1745 // file has support for checksums (version 2+). 1746 boolean checksumSupport = this.fileContext.isUseHBaseChecksum(); 1747 boolean isScanMetricsEnabled = ThreadLocalServerSideScanMetrics.isScanMetricsEnabled(); 1748 long startTime = EnvironmentEdgeManager.currentTime(); 1749 if (onDiskSizeWithHeader == -1) { 1750 // The caller does not know the block size. Need to get it from the header. If header was 1751 // not cached (see getCachedHeader above), need to seek to pull it in. This is costly 1752 // and should happen very rarely. Currently happens on open of a hfile reader where we 1753 // read the trailer blocks to pull in the indices. Otherwise, we are reading block sizes 1754 // out of the hfile index. 
To check, enable TRACE in this file and you'll get an exception 1755 // in a LOG every time we seek. See HBASE-17072 for more detail. 1756 if (headerBuf == null) { 1757 if (LOG.isTraceEnabled()) { 1758 LOG.trace("Extra seek to get block size!", new RuntimeException()); 1759 } 1760 span.addEvent("Extra seek to get block size!", attributes); 1761 headerBuf = HEAP.allocate(hdrSize); 1762 readAtOffset(is, headerBuf, hdrSize, false, offset, pread); 1763 headerBuf.rewind(); 1764 if (isScanMetricsEnabled) { 1765 ThreadLocalServerSideScanMetrics.addBytesReadFromFs(hdrSize); 1766 } 1767 } 1768 onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport); 1769 } 1770 1771 // Inspect the header's checksumType for known valid values. If we don't find such a value, 1772 // assume that the bytes read are corrupted. We will clear the cached value and fall back to 1773 // HDFS checksums. 1774 if (!checkCheckSumTypeOnHeaderBuf(headerBuf)) { 1775 if (verifyChecksum) { 1776 invalidateNextBlockHeader(); 1777 span.addEvent("Falling back to HDFS checksumming.", attributes); 1778 return null; 1779 } else { 1780 throw new IOException( 1781 "Unknown checksum type code " + headerBuf.get(HFileBlock.Header.CHECKSUM_TYPE_INDEX) 1782 + " for file " + pathName + ", the headerBuf of the HFileBlock may be corrupted."); 1783 } 1784 } 1785 1786 // The common case is that onDiskSizeWithHeader was produced by a read without checksum 1787 // validation, so give it a sanity check before trying to use it. 1788 if (!checkOnDiskSizeWithHeader(onDiskSizeWithHeader)) { 1789 if (verifyChecksum) { 1790 invalidateNextBlockHeader(); 1791 span.addEvent("Falling back to HDFS checksumming.", attributes); 1792 return null; 1793 } else { 1794 throw new IOException("Invalid onDiskSizeWithHeader=" + onDiskSizeWithHeader); 1795 } 1796 } 1797 1798 int preReadHeaderSize = headerBuf == null ? 0 : hdrSize; 1799 // Allocate enough space to fit the next block's header too; saves a seek next time through. 1800 // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header; 1801 // onDiskSizeWithHeader is header, body, and any checksums if present. preReadHeaderSize 1802 // says where to start reading. If we have the header cached, then we don't need to read 1803 // it again and we can likely read from the last place we left off without needing to back up and reread 1804 // the header we read last time through here. 1805 ByteBuff onDiskBlock = this.allocate(onDiskSizeWithHeader + hdrSize, intoHeap); 1806 boolean initHFileBlockSuccess = false; 1807 try { 1808 if (headerBuf != null) { 1809 onDiskBlock.put(0, headerBuf, 0, hdrSize).position(hdrSize); 1810 } 1811 boolean readNextHeader = readAtOffset(is, onDiskBlock, 1812 onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread); 1813 onDiskBlock.rewind(); // in case of moving position when copying a cached header 1814 if (isScanMetricsEnabled) { 1815 long bytesRead = 1816 (onDiskSizeWithHeader - preReadHeaderSize) + (readNextHeader ? hdrSize : 0); 1817 ThreadLocalServerSideScanMetrics.addBytesReadFromFs(bytesRead); 1818 ThreadLocalServerSideScanMetrics.addBlockReadOpsCount(1); 1819 } 1820 1821 // the call to validateChecksum for this block excludes the next block header over-read, so 1822 // no reason to delay extracting this value.
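        // Sketch of onDiskBlock's contents at this point (assuming the reads above succeeded):
        //   [0, hdrSize)                      -> this block's header
        //   [hdrSize, onDiskSizeWithHeader)   -> block data plus trailing checksums, if any
        //   [onDiskSizeWithHeader, +hdrSize)  -> the NEXT block's header, only when readNextHeader is true
        // validateChecksum below runs against the first onDiskSizeWithHeader bytes only.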
1823 int nextBlockOnDiskSize = -1; 1824 if (readNextHeader) { 1825 int parsedVal = getNextBlockOnDiskSize(onDiskBlock, onDiskSizeWithHeader); 1826 if (checkOnDiskSizeWithHeader(parsedVal)) { 1827 nextBlockOnDiskSize = parsedVal; 1828 } 1829 } 1830 if (headerBuf == null) { 1831 headerBuf = onDiskBlock.duplicate().position(0).limit(hdrSize); 1832 } 1833 1834 ByteBuff curBlock = onDiskBlock.duplicate().position(0).limit(onDiskSizeWithHeader); 1835 // Verify checksum of the data before using it for building HFileBlock. 1836 if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) { 1837 invalidateNextBlockHeader(); 1838 span.addEvent("Falling back to HDFS checksumming.", attributes); 1839 return null; 1840 } 1841 1842 // TODO: is this check necessary or can we proceed with a provided value regardless of 1843 // what is in the header? 1844 int fromHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport); 1845 if (onDiskSizeWithHeader != fromHeader) { 1846 if (LOG.isTraceEnabled()) { 1847 LOG.trace("Passed in onDiskSizeWithHeader={} != {}, offset={}, fileContext={}", 1848 onDiskSizeWithHeader, fromHeader, offset, this.fileContext); 1849 } 1850 if (checksumSupport && verifyChecksum) { 1851 // This file supports HBase checksums and verification of those checksums was 1852 // requested. The block size provided by the caller (presumably from the block index) 1853 // does not match the block size written to the block header. treat this as 1854 // HBase-checksum failure. 1855 span.addEvent("Falling back to HDFS checksumming.", attributes); 1856 invalidateNextBlockHeader(); 1857 return null; 1858 } 1859 throw new IOException("Passed in onDiskSizeWithHeader=" + onDiskSizeWithHeader + " != " 1860 + fromHeader + ", offset=" + offset + ", fileContext=" + this.fileContext); 1861 } 1862 1863 // remove checksum from buffer now that it's verified 1864 int sizeWithoutChecksum = curBlock.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX); 1865 curBlock.limit(sizeWithoutChecksum); 1866 long duration = EnvironmentEdgeManager.currentTime() - startTime; 1867 boolean tooSlow = this.readWarnTime >= 0 && duration > this.readWarnTime; 1868 if (updateMetrics) { 1869 HFile.updateReadLatency(duration, pread, tooSlow); 1870 } 1871 // The onDiskBlock will become the headerAndDataBuffer for this block. 1872 // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already 1873 // contains the header of next block, so no need to set next block's header in it. 1874 HFileBlock hFileBlock = createFromBuff(curBlock, checksumSupport, offset, 1875 nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator); 1876 // Run check on uncompressed sizings. 1877 if (!fileContext.isCompressedOrEncrypted()) { 1878 hFileBlock.sanityCheckUncompressed(); 1879 } 1880 LOG.trace("Read {} in {} ms", hFileBlock, duration); 1881 if (!LOG.isTraceEnabled() && tooSlow) { 1882 LOG.warn("Read Block Slow: read {} cost {} ms, threshold = {} ms", hFileBlock, duration, 1883 this.readWarnTime); 1884 } 1885 span.addEvent("Read block", attributes); 1886 // Cache next block header if we read it for the next time through here. 
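        // The cached entry is keyed by the next block's offset (current offset plus this block's
        // on-disk size) and holds the hdrSize bytes that were over-read past onDiskSizeWithHeader.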
1887 if (nextBlockOnDiskSize != -1) { 1888 cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), onDiskBlock, 1889 onDiskSizeWithHeader, hdrSize); 1890 } 1891 initHFileBlockSuccess = true; 1892 return hFileBlock; 1893 } finally { 1894 if (!initHFileBlockSuccess) { 1895 onDiskBlock.release(); 1896 } 1897 } 1898 } 1899 1900 @Override 1901 public void setIncludesMemStoreTS(boolean includesMemstoreTS) { 1902 this.fileContext = 1903 new HFileContextBuilder(this.fileContext).withIncludesMvcc(includesMemstoreTS).build(); 1904 } 1905 1906 @Override 1907 public void setDataBlockEncoder(HFileDataBlockEncoder encoder, Configuration conf) { 1908 encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(conf, fileContext); 1909 } 1910 1911 @Override 1912 public HFileBlockDecodingContext getBlockDecodingContext() { 1913 return this.encodedBlockDecodingCtx; 1914 } 1915 1916 @Override 1917 public HFileBlockDecodingContext getDefaultBlockDecodingContext() { 1918 return this.defaultDecodingCtx; 1919 } 1920 1921 /** 1922 * Generates the checksum for the header as well as the data and then validates it. If the block 1923 * doesn't uses checksum, returns false. 1924 * @return True if checksum matches, else false. 1925 */ 1926 private boolean validateChecksum(long offset, ByteBuff data, int hdrSize) { 1927 // If this is an older version of the block that does not have checksums, then return false 1928 // indicating that checksum verification did not succeed. Actually, this method should never 1929 // be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen 1930 // case. Since this is a cannot-happen case, it is better to return false to indicate a 1931 // checksum validation failure. 1932 if (!fileContext.isUseHBaseChecksum()) { 1933 return false; 1934 } 1935 1936 // If the checksumType of the read block header is incorrect, it indicates that the block is 1937 // corrupted and can be directly rolled back to HDFS checksum verification 1938 if (!checkCheckSumTypeOnHeaderBuf(data)) { 1939 HFile.LOG.warn( 1940 "HBase checksumType verification failed for file {} at offset {} filesize {}" 1941 + " checksumType {}. Retrying read with HDFS checksums turned on...", 1942 pathName, offset, fileSize, data.get(HFileBlock.Header.CHECKSUM_TYPE_INDEX)); 1943 return false; 1944 } 1945 1946 return ChecksumUtil.validateChecksum(data, pathName, offset, hdrSize); 1947 } 1948 1949 @Override 1950 public void closeStreams() throws IOException { 1951 streamWrapper.close(); 1952 } 1953 1954 @Override 1955 public void unbufferStream() { 1956 // To handle concurrent reads, ensure that no other client is accessing the streams while we 1957 // unbuffer it. 1958 if (streamLock.tryLock()) { 1959 try { 1960 this.streamWrapper.unbuffer(); 1961 } finally { 1962 streamLock.unlock(); 1963 } 1964 } 1965 } 1966 1967 @Override 1968 public String toString() { 1969 return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext; 1970 } 1971 } 1972 1973 /** An additional sanity-check in case no compression or encryption is being used. 
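   * With no compression or encryption, the on-disk and uncompressed sizes can only differ by the
   * trailing checksums, i.e. onDiskSizeWithoutHeader == uncompressedSizeWithoutHeader +
   * totalChecksumBytes(). As a purely illustrative example (assuming one 4-byte checksum per
   * bytesPerChecksum chunk): with bytesPerChecksum = 16384 and 40000 bytes of on-disk data
   * including the header, there are 3 chunks and therefore 12 checksum bytes, so the two sizes
   * should differ by exactly 12.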
*/ 1974 void sanityCheckUncompressed() throws IOException { 1975 if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) { 1976 throw new IOException("Using no compression but " + "onDiskSizeWithoutHeader=" 1977 + onDiskSizeWithoutHeader + ", " + "uncompressedSizeWithoutHeader=" 1978 + uncompressedSizeWithoutHeader + ", numChecksumbytes=" + totalChecksumBytes()); 1979 } 1980 } 1981 1982 // Cacheable implementation 1983 @Override 1984 public int getSerializedLength() { 1985 if (bufWithoutChecksum != null) { 1986 // Include extra bytes for block metadata. 1987 return this.bufWithoutChecksum.limit() + BLOCK_METADATA_SPACE; 1988 } 1989 return 0; 1990 } 1991 1992 // Cacheable implementation 1993 @Override 1994 public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) { 1995 this.bufWithoutChecksum.get(destination, 0, getSerializedLength() - BLOCK_METADATA_SPACE); 1996 destination = addMetaData(destination, includeNextBlockMetadata); 1997 1998 // Make it ready for reading. flip sets position to zero and limit to current position which 1999 // is what we want if we do not want to serialize the block plus checksums if present plus 2000 // metadata. 2001 destination.flip(); 2002 } 2003 2004 /** 2005 * For use by bucketcache. This exposes internals. 2006 */ 2007 public ByteBuffer getMetaData(ByteBuffer bb) { 2008 bb = addMetaData(bb, true); 2009 bb.flip(); 2010 return bb; 2011 } 2012 2013 /** 2014 * Adds metadata at current position (position is moved forward). Does not flip or reset. 2015 * @return The passed <code>destination</code> with metadata added. 2016 */ 2017 private ByteBuffer addMetaData(final ByteBuffer destination, boolean includeNextBlockMetadata) { 2018 destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0); 2019 destination.putLong(this.offset); 2020 if (includeNextBlockMetadata) { 2021 destination.putInt(this.nextBlockOnDiskSize); 2022 } 2023 return destination; 2024 } 2025 2026 // Cacheable implementation 2027 @Override 2028 public CacheableDeserializer<Cacheable> getDeserializer() { 2029 return HFileBlock.BLOCK_DESERIALIZER; 2030 } 2031 2032 @Override 2033 public int hashCode() { 2034 int result = 1; 2035 result = result * 31 + blockType.hashCode(); 2036 result = result * 31 + nextBlockOnDiskSize; 2037 result = result * 31 + (int) (offset ^ (offset >>> 32)); 2038 result = result * 31 + onDiskSizeWithoutHeader; 2039 result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32)); 2040 result = result * 31 + uncompressedSizeWithoutHeader; 2041 result = result * 31 + bufWithoutChecksum.hashCode(); 2042 return result; 2043 } 2044 2045 @Override 2046 public boolean equals(Object comparison) { 2047 if (this == comparison) { 2048 return true; 2049 } 2050 if (comparison == null) { 2051 return false; 2052 } 2053 if (!(comparison instanceof HFileBlock)) { 2054 return false; 2055 } 2056 2057 HFileBlock castedComparison = (HFileBlock) comparison; 2058 2059 if (castedComparison.blockType != this.blockType) { 2060 return false; 2061 } 2062 if (castedComparison.nextBlockOnDiskSize != this.nextBlockOnDiskSize) { 2063 return false; 2064 } 2065 // Offset is important. Needed when we have to remake cachekey when block is returned to cache. 
2066 if (castedComparison.offset != this.offset) { 2067 return false; 2068 } 2069 if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) { 2070 return false; 2071 } 2072 if (castedComparison.prevBlockOffset != this.prevBlockOffset) { 2073 return false; 2074 } 2075 if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) { 2076 return false; 2077 } 2078 if ( 2079 ByteBuff.compareTo(this.bufWithoutChecksum, 0, this.bufWithoutChecksum.limit(), 2080 castedComparison.bufWithoutChecksum, 0, castedComparison.bufWithoutChecksum.limit()) != 0 2081 ) { 2082 return false; 2083 } 2084 return true; 2085 } 2086 2087 DataBlockEncoding getDataBlockEncoding() { 2088 if (blockType == BlockType.ENCODED_DATA) { 2089 return DataBlockEncoding.getEncodingById(getDataBlockEncodingId()); 2090 } 2091 return DataBlockEncoding.NONE; 2092 } 2093 2094 byte getChecksumType() { 2095 return this.fileContext.getChecksumType().getCode(); 2096 } 2097 2098 int getBytesPerChecksum() { 2099 return this.fileContext.getBytesPerChecksum(); 2100 } 2101 2102 /** Returns the size of data on disk + header. Excludes checksum. */ 2103 int getOnDiskDataSizeWithHeader() { 2104 return this.onDiskDataSizeWithHeader; 2105 } 2106 2107 /** 2108 * Return the number of bytes required to store all the checksums for this block. Each checksum 2109 * value is a 4 byte integer. <br/> 2110 * NOTE: ByteBuff returned by {@link HFileBlock#getBufferWithoutHeader()} and 2111 * {@link HFileBlock#getBufferReadOnly} or DataInputStream returned by 2112 * {@link HFileBlock#getByteStream()} does not include checksum. 2113 */ 2114 int totalChecksumBytes() { 2115 return totalChecksumBytes; 2116 } 2117 2118 private int computeTotalChecksumBytes() { 2119 // If the hfile block has minorVersion 0, then there are no checksum 2120 // data to validate. Similarly, a zero value in this.bytesPerChecksum 2121 // indicates that cached blocks do not have checksum data because 2122 // checksums were already validated when the block was read from disk. 2123 if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) { 2124 return 0; 2125 } 2126 return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader, 2127 this.fileContext.getBytesPerChecksum()); 2128 } 2129 2130 /** 2131 * Returns the size of this block header. 2132 */ 2133 public int headerSize() { 2134 return headerSize(this.fileContext.isUseHBaseChecksum()); 2135 } 2136 2137 /** 2138 * Maps a minor version to the size of the header. 2139 */ 2140 public static int headerSize(boolean usesHBaseChecksum) { 2141 return usesHBaseChecksum 2142 ? HConstants.HFILEBLOCK_HEADER_SIZE 2143 : HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM; 2144 } 2145 2146 /** 2147 * Return the appropriate DUMMY_HEADER for the minor version 2148 */ 2149 // TODO: Why is this in here? 2150 byte[] getDummyHeaderForVersion() { 2151 return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum()); 2152 } 2153 2154 /** 2155 * Return the appropriate DUMMY_HEADER for the minor version 2156 */ 2157 static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) { 2158 return usesHBaseChecksum ? HConstants.HFILEBLOCK_DUMMY_HEADER : DUMMY_HEADER_NO_CHECKSUM; 2159 } 2160 2161 /** 2162 * @return This HFileBlocks fileContext which will a derivative of the fileContext for the file 2163 * from which this block's data was originally read. 
2164 */ 2165 public HFileContext getHFileContext() { 2166 return this.fileContext; 2167 } 2168 2169 /** 2170 * Convert the contents of the block header into a human readable string. This is mostly helpful 2171 * for debugging. This assumes that the block has minor version > 0. 2172 */ 2173 static String toStringHeader(ByteBuff buf) throws IOException { 2174 byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)]; 2175 buf.get(magicBuf); 2176 BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH); 2177 int compressedBlockSizeNoHeader = buf.getInt(); 2178 int uncompressedBlockSizeNoHeader = buf.getInt(); 2179 long prevBlockOffset = buf.getLong(); 2180 byte cksumtype = buf.get(); 2181 long bytesPerChecksum = buf.getInt(); 2182 long onDiskDataSizeWithHeader = buf.getInt(); 2183 return " Header dump: magic: " + Bytes.toString(magicBuf) + " blockType " + bt 2184 + " compressedBlockSizeNoHeader " + compressedBlockSizeNoHeader 2185 + " uncompressedBlockSizeNoHeader " + uncompressedBlockSizeNoHeader + " prevBlockOffset " 2186 + prevBlockOffset + " checksumType " + ChecksumType.codeToType(cksumtype) 2187 + " bytesPerChecksum " + bytesPerChecksum + " onDiskDataSizeWithHeader " 2188 + onDiskDataSizeWithHeader; 2189 } 2190 2191 /** 2192 * Creates a new HFileBlockBuilder from the existing block and a new ByteBuff. The builder will be 2193 * loaded with all of the original fields from blk, except now using the newBuff and setting 2194 * isSharedMem based on the source of the passed in newBuff. An existing HFileBlock may have been 2195 * an {@link ExclusiveMemHFileBlock}, but the new buffer might call for a 2196 * {@link SharedMemHFileBlock}. Or vice versa. 2197 * @param blk the block to clone from 2198 * @param newBuff the new buffer to use 2199 */ 2200 private static HFileBlockBuilder createBuilder(HFileBlock blk, ByteBuff newBuff) { 2201 return new HFileBlockBuilder().withBlockType(blk.blockType) 2202 .withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader) 2203 .withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader) 2204 .withPrevBlockOffset(blk.prevBlockOffset).withByteBuff(newBuff).withOffset(blk.offset) 2205 .withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader) 2206 .withNextBlockOnDiskSize(blk.nextBlockOnDiskSize).withHFileContext(blk.fileContext) 2207 .withByteBuffAllocator(blk.allocator).withShared(!newBuff.hasArray()); 2208 } 2209 2210 private static HFileBlock shallowClone(HFileBlock blk, ByteBuff newBuf) { 2211 return createBuilder(blk, newBuf).build(); 2212 } 2213 2214 static HFileBlock deepCloneOnHeap(HFileBlock blk) { 2215 ByteBuff deepCloned = ByteBuff 2216 .wrap(ByteBuffer.wrap(blk.bufWithoutChecksum.toBytes(0, blk.bufWithoutChecksum.limit()))); 2217 return createBuilder(blk, deepCloned).build(); 2218 } 2219 2220 /** 2221 * Returns OpenTelemetry Attributes for a Span that is reading a data block with relevant 2222 * metadata. Will short-circuit if the span isn't going to be captured/OTEL isn't enabled. 
2223 */ 2224 private static Attributes getReadDataBlockInternalAttributes(Span span) { 2225 // It's expensive to record these attributes, so we avoid the cost of doing this if the span 2226 // isn't going to be persisted 2227 if (!span.isRecording()) { 2228 return Attributes.empty(); 2229 } 2230 2231 final AttributesBuilder attributesBuilder = Attributes.builder(); 2232 Optional.of(Context.current()).map(val -> val.get(CONTEXT_KEY)) 2233 .ifPresent(c -> c.accept(attributesBuilder)); 2234 return attributesBuilder.build(); 2235 } 2236}