001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP; 021import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR; 022import static org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer.CONTEXT_KEY; 023 024import io.opentelemetry.api.common.Attributes; 025import io.opentelemetry.api.common.AttributesBuilder; 026import io.opentelemetry.api.trace.Span; 027import io.opentelemetry.context.Context; 028import io.opentelemetry.context.Scope; 029import java.io.DataInputStream; 030import java.io.DataOutput; 031import java.io.DataOutputStream; 032import java.io.IOException; 033import java.nio.ByteBuffer; 034import java.util.ArrayList; 035import java.util.List; 036import java.util.Optional; 037import java.util.concurrent.atomic.AtomicReference; 038import java.util.concurrent.locks.Lock; 039import java.util.concurrent.locks.ReentrantLock; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.FSDataInputStream; 042import org.apache.hadoop.fs.FSDataOutputStream; 043import org.apache.hadoop.hbase.ExtendedCell; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.fs.HFileSystem; 046import org.apache.hadoop.hbase.io.ByteArrayOutputStream; 047import org.apache.hadoop.hbase.io.ByteBuffAllocator; 048import org.apache.hadoop.hbase.io.ByteBuffInputStream; 049import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream; 050import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 051import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 052import org.apache.hadoop.hbase.io.encoding.EncodingState; 053import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; 054import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; 055import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; 056import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; 057import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer; 058import org.apache.hadoop.hbase.io.util.BlockIOUtils; 059import org.apache.hadoop.hbase.nio.ByteBuff; 060import org.apache.hadoop.hbase.nio.MultiByteBuff; 061import org.apache.hadoop.hbase.nio.SingleByteBuff; 062import org.apache.hadoop.hbase.regionserver.ShipperListener; 063import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ReadType; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.hbase.util.ChecksumType; 066import org.apache.hadoop.hbase.util.ClassSize; 067import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 068import 
org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * Cacheable Blocks of an {@link HFile} version 2 file. Version 2 was introduced in hbase-0.92.0.
 * <p>
 * Version 1 was the original file block. Version 2 was introduced when we changed the hbase file
 * format to support multi-level block indexes and compound bloom filters (HBASE-3857). Support for
 * Version 1 was removed in hbase-1.3.0.
 * <h3>HFileBlock: Version 2</h3> In version 2, a block is structured as follows:
 * <ul>
 * <li><b>Header:</b> See Writer#putHeader() for where the header is written; the total header size
 * is HFILEBLOCK_HEADER_SIZE
 * <ul>
 * <li>0. blockType: Magic record identifying the {@link BlockType} (8 bytes): e.g.
 * <code>DATABLK*</code>
 * <li>1. onDiskSizeWithoutHeader: Compressed -- a.k.a 'on disk' -- block size, excluding header,
 * but including tailing checksum bytes (4 bytes)
 * <li>2. uncompressedSizeWithoutHeader: Uncompressed block size, excluding header, and excluding
 * checksum bytes (4 bytes)
 * <li>3. prevBlockOffset: The offset of the previous block of the same type (8 bytes). This is
 * used to navigate to the previous block without having to go to the block index
 * <li>4: For minorVersions >=1, the ordinal describing checksum type (1 byte)
 * <li>5: For minorVersions >=1, the number of data bytes/checksum chunk (4 bytes)
 * <li>6: onDiskDataSizeWithHeader: For minorVersions >=1, the size of data 'on disk', including
 * header, excluding checksums (4 bytes)
 * </ul>
 * </li>
 * <li><b>Raw/Compressed/Encrypted/Encoded data:</b> The compression algorithm is the same for all
 * the blocks in an {@link HFile}. If compression is NONE, this is just raw, serialized Cells.
 * <li><b>Tail:</b> For minorVersions >=1, a series of 4 byte checksums, one each for the number
 * of bytes specified by bytesPerChecksum.
 * </ul>
 * <h3>Caching</h3> Caches cache whole blocks with trailing checksums if any. We then tag on some
 * metadata, the content of BLOCK_METADATA_SPACE: a flag saying whether we are doing 'hbase'
 * checksums, and the offset into the file, which is needed when we re-make a cache key when we
 * return the block to the cache as 'done'. See {@link Cacheable#serialize(ByteBuffer, boolean)}
 * and {@link Cacheable#getDeserializer()}.
 * <p>
 * TODO: Should we cache the checksums? Down in Writer#getBlockForCaching(CacheConfig) where we
 * make a block to cache-on-write, there is an attempt at turning off checksums. This is not the
 * only place we get blocks to cache. We also cache the raw bytes returned from an hdfs read; in
 * this case, the checksums may be present. If the cache is backed by something that doesn't do
 * ECC, say an SSD, we might want to preserve checksums. For now this is an open question.
 * <p>
 * TODO: Over in BucketCache, we save a block allocation by doing a custom serialization. Be sure
 * to change it if serialization changes in here. Could we add a method here that takes an IOEngine
 * and that then serializes to it rather than expose our internals over in BucketCache? IOEngine is
 * in the bucket subpackage. Pull it up? Then this class knows about bucketcache. Ugh.
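 * <p>
 * A rough, illustrative sketch (not the reader implementation; the variable names are
 * hypothetical) of how the fixed header fields above map onto a buffer positioned at the start of
 * a block. The absolute offsets mirror the {@code Header} inner class below:
 *
 * <pre>
 * // bytes 0..7 hold the 8 byte block magic, e.g. DATABLK*
 * int onDiskSizeWithoutHeader = buf.getInt(8);
 * int uncompressedSizeWithoutHeader = buf.getInt(12);
 * long prevBlockOffset = buf.getLong(16);
 * byte checksumType = buf.get(24);               // minor versions >=1 only
 * int bytesPerChecksum = buf.getInt(25);         // minor versions >=1 only
 * int onDiskDataSizeWithHeader = buf.getInt(29); // minor versions >=1 only
 * </pre>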
121 */ 122@InterfaceAudience.Private 123public class HFileBlock implements Cacheable { 124 private static final Logger LOG = LoggerFactory.getLogger(HFileBlock.class); 125 public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HFileBlock.class, false); 126 127 // Block Header fields. 128 129 // TODO: encapsulate Header related logic in this inner class. 130 static class Header { 131 // Format of header is: 132 // 8 bytes - block magic 133 // 4 bytes int - onDiskSizeWithoutHeader 134 // 4 bytes int - uncompressedSizeWithoutHeader 135 // 8 bytes long - prevBlockOffset 136 // The following 3 are only present if header contains checksum information 137 // 1 byte - checksum type 138 // 4 byte int - bytes per checksum 139 // 4 byte int - onDiskDataSizeWithHeader 140 static int BLOCK_MAGIC_INDEX = 0; 141 static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8; 142 static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12; 143 static int PREV_BLOCK_OFFSET_INDEX = 16; 144 static int CHECKSUM_TYPE_INDEX = 24; 145 static int BYTES_PER_CHECKSUM_INDEX = 25; 146 static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29; 147 } 148 149 /** Type of block. Header field 0. */ 150 private BlockType blockType; 151 152 /** 153 * Size on disk excluding header, including checksum. Header field 1. 154 * @see Writer#putHeader(byte[], int, int, int, int) 155 */ 156 private int onDiskSizeWithoutHeader; 157 158 /** 159 * Size of pure data. Does not include header or checksums. Header field 2. 160 * @see Writer#putHeader(byte[], int, int, int, int) 161 */ 162 private int uncompressedSizeWithoutHeader; 163 164 /** 165 * The offset of the previous block on disk. Header field 3. 166 * @see Writer#putHeader(byte[], int, int, int, int) 167 */ 168 private long prevBlockOffset; 169 170 /** 171 * Size on disk of header + data. Excludes checksum. Header field 6, OR calculated from 172 * {@link #onDiskSizeWithoutHeader} when using HDFS checksum. 173 * @see Writer#putHeader(byte[], int, int, int, int) 174 */ 175 private final int onDiskDataSizeWithHeader; 176 // End of Block Header fields. 177 178 /** 179 * The in-memory representation of the hfile block. Can be on or offheap. Can be backed by a 180 * single ByteBuffer or by many. Make no assumptions. 181 * <p> 182 * Be careful reading from this <code>buf</code>. Duplicate and work on the duplicate or if not, 183 * be sure to reset position and limit else trouble down the road. 184 * <p> 185 * TODO: Make this read-only once made. 186 * <p> 187 * We are using the ByteBuff type. ByteBuffer is not extensible yet we need to be able to have a 188 * ByteBuffer-like API across multiple ByteBuffers reading from a cache such as BucketCache. So, 189 * we have this ByteBuff type. Unfortunately, it is spread all about HFileBlock. Would be good if 190 * could be confined to cache-use only but hard-to-do. 191 * <p> 192 * NOTE: this byteBuff including HFileBlock header and data, but excluding checksum. 193 */ 194 private ByteBuff bufWithoutChecksum; 195 196 /** 197 * Meta data that holds meta information on the hfileblock. 198 */ 199 private final HFileContext fileContext; 200 201 /** 202 * The offset of this block in the file. Populated by the reader for convenience of access. This 203 * offset is not part of the block header. 204 */ 205 private long offset = UNSET; 206 207 /** 208 * The on-disk size of the next block, including the header and checksums if present. UNSET if 209 * unknown. Blocks try to carry the size of the next block to read in this data member. 
Usually we 210 * get block sizes from the hfile index but sometimes the index is not available: e.g. when we 211 * read the indexes themselves (indexes are stored in blocks, we do not have an index for the 212 * indexes). Saves seeks especially around file open when there is a flurry of reading in hfile 213 * metadata. 214 */ 215 private int nextBlockOnDiskSize = UNSET; 216 217 private ByteBuffAllocator allocator; 218 219 /** 220 * On a checksum failure, do these many succeeding read requests using hdfs checksums before 221 * auto-reenabling hbase checksum verification. 222 */ 223 static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3; 224 225 private static int UNSET = -1; 226 public static final boolean FILL_HEADER = true; 227 public static final boolean DONT_FILL_HEADER = false; 228 229 // How to get the estimate correctly? if it is a singleBB? 230 public static final int MULTI_BYTE_BUFFER_HEAP_SIZE = 231 (int) ClassSize.estimateBase(MultiByteBuff.class, false); 232 233 /** 234 * Space for metadata on a block that gets stored along with the block when we cache it. There are 235 * a few bytes stuck on the end of the HFileBlock that we pull in from HDFS. 8 bytes are for the 236 * offset of this block (long) in the file. Offset is important because is is used when we remake 237 * the CacheKey when we return block to the cache when done. There is also a flag on whether 238 * checksumming is being done by hbase or not. See class comment for note on uncertain state of 239 * checksumming of blocks that come out of cache (should we or should we not?). Finally there are 240 * 4 bytes to hold the length of the next block which can save a seek on occasion if available. 241 * (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was formerly 242 * known as EXTRA_SERIALIZATION_SPACE). 243 */ 244 public static final int BLOCK_METADATA_SPACE = 245 Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT; 246 247 /** 248 * Each checksum value is an integer that can be stored in 4 bytes. 249 */ 250 static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT; 251 252 static final byte[] DUMMY_HEADER_NO_CHECKSUM = 253 new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM]; 254 255 /** 256 * Used deserializing blocks from Cache. <code> 257 * ++++++++++++++ 258 * + HFileBlock + 259 * ++++++++++++++ 260 * + Checksums + <= Optional 261 * ++++++++++++++ 262 * + Metadata! + <= See note on BLOCK_METADATA_SPACE above. 263 * ++++++++++++++ 264 * </code> 265 * @see #serialize(ByteBuffer, boolean) 266 */ 267 public static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER = new BlockDeserializer(); 268 269 public static final class BlockDeserializer implements CacheableDeserializer<Cacheable> { 270 private BlockDeserializer() { 271 } 272 273 @Override 274 public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc) throws IOException { 275 // The buf has the file block followed by block metadata. 276 // Set limit to just before the BLOCK_METADATA_SPACE then rewind. 277 buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind(); 278 // Get a new buffer to pass the HFileBlock for it to 'own'. 279 ByteBuff newByteBuff = buf.slice(); 280 // Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock. 
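      // The metadata tail is BLOCK_METADATA_SPACE bytes: a 1 byte flag saying whether hbase
      // checksums are in use, the 8 byte file offset of this block, and the 4 byte on-disk size
      // of the next block.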
      buf.position(buf.limit());
      buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE);
      boolean usesChecksum = buf.get() == (byte) 1;
      long offset = buf.getLong();
      int nextBlockOnDiskSize = buf.getInt();
      return createFromBuff(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
    }

    @Override
    public int getDeserializerIdentifier() {
      return DESERIALIZER_IDENTIFIER;
    }
  }

  private static final int DESERIALIZER_IDENTIFIER;
  static {
    DESERIALIZER_IDENTIFIER =
      CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
  }

  private final int totalChecksumBytes;

  /**
   * Creates a new {@link HFile} block from the given fields. This constructor is used only while
   * writing blocks and caching, when the block data is already sitting in a byte buffer and we
   * want to stuff the block into the cache.
   * <p>
   * TODO: The caller presumes no checksumming is required of this block instance since it is going
   * into the cache; the checksum has already been verified on the underlying block data pulled in
   * from the filesystem. Is that correct? What if the cache is SSD?
   * <p>
   * TODO: Can the HFile block writer also be off-heap?
   * @param blockType the type of this block, see {@link BlockType}
   * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
   * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
   * @param prevBlockOffset see {@link #prevBlockOffset}
   * @param buf block buffer with header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
   * @param fillHeader when true, write the first 4 header fields into the passed buffer.
   * @param offset the file offset the block was read from
   * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
   * @param fileContext HFile meta data
   */
  public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
    int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
    long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
    ByteBuffAllocator allocator) {
    this.blockType = blockType;
    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
    this.prevBlockOffset = prevBlockOffset;
    this.offset = offset;
    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
    this.nextBlockOnDiskSize = nextBlockOnDiskSize;
    this.fileContext = fileContext;
    this.allocator = allocator;
    this.bufWithoutChecksum = buf;
    if (fillHeader) {
      overwriteHeader();
    }
    this.bufWithoutChecksum.rewind();
    this.totalChecksumBytes = computeTotalChecksumBytes();
  }

  /**
   * Creates a block from an existing buffer starting with a header. Rewinds and takes ownership of
   * the buffer. By definition of rewind, ignores the buffer position, but if you slice the buffer
   * beforehand, it will rewind to that point.
   * @param buf Has header, content, and trailing checksums if present.
352 */ 353 static HFileBlock createFromBuff(ByteBuff buf, boolean usesHBaseChecksum, final long offset, 354 final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator) 355 throws IOException { 356 buf.rewind(); 357 final BlockType blockType = BlockType.read(buf); 358 final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX); 359 final int uncompressedSizeWithoutHeader = 360 buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX); 361 final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX); 362 // This constructor is called when we deserialize a block from cache and when we read a block in 363 // from the fs. fileCache is null when deserialized from cache so need to make up one. 364 HFileContextBuilder fileContextBuilder = 365 fileContext != null ? new HFileContextBuilder(fileContext) : new HFileContextBuilder(); 366 fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum); 367 int onDiskDataSizeWithHeader; 368 if (usesHBaseChecksum) { 369 byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX); 370 int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX); 371 onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX); 372 // Use the checksum type and bytes per checksum from header, not from fileContext. 373 fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType)); 374 fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum); 375 } else { 376 fileContextBuilder.withChecksumType(ChecksumType.NULL); 377 fileContextBuilder.withBytesPerCheckSum(0); 378 // Need to fix onDiskDataSizeWithHeader; there are not checksums after-block-data 379 onDiskDataSizeWithHeader = onDiskSizeWithoutHeader + headerSize(usesHBaseChecksum); 380 } 381 fileContext = fileContextBuilder.build(); 382 assert usesHBaseChecksum == fileContext.isUseHBaseChecksum(); 383 return new HFileBlockBuilder().withBlockType(blockType) 384 .withOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader) 385 .withUncompressedSizeWithoutHeader(uncompressedSizeWithoutHeader) 386 .withPrevBlockOffset(prevBlockOffset).withOffset(offset) 387 .withOnDiskDataSizeWithHeader(onDiskDataSizeWithHeader) 388 .withNextBlockOnDiskSize(nextBlockOnDiskSize).withHFileContext(fileContext) 389 .withByteBuffAllocator(allocator).withByteBuff(buf.rewind()).withShared(!buf.hasArray()) 390 .build(); 391 } 392 393 /** 394 * Parse total on disk size including header and checksum. 395 * @param headerBuf Header ByteBuffer. Presumed exact size of header. 396 * @param checksumSupport true if checksum verification is in use. 397 * @return Size of the block with header included. 398 */ 399 private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf, boolean checksumSupport) { 400 return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(checksumSupport); 401 } 402 403 /** 404 * @return the on-disk size of the next block (including the header size and any checksums if 405 * present) read by peeking into the next block's header; use as a hint when doing a read 406 * of the next block when scanning or running over a file. 
407 */ 408 int getNextBlockOnDiskSize() { 409 return nextBlockOnDiskSize; 410 } 411 412 @Override 413 public BlockType getBlockType() { 414 return blockType; 415 } 416 417 @Override 418 public int refCnt() { 419 return bufWithoutChecksum.refCnt(); 420 } 421 422 @Override 423 public HFileBlock retain() { 424 bufWithoutChecksum.retain(); 425 return this; 426 } 427 428 /** 429 * Call {@link ByteBuff#release()} to decrease the reference count, if no other reference, it will 430 * return back the {@link ByteBuffer} to {@link org.apache.hadoop.hbase.io.ByteBuffAllocator} 431 */ 432 @Override 433 public boolean release() { 434 return bufWithoutChecksum.release(); 435 } 436 437 /** 438 * Calling this method in strategic locations where HFileBlocks are referenced may help diagnose 439 * potential buffer leaks. We pass the block itself as a default hint, but one can use 440 * {@link #touch(Object)} to pass their own hint as well. 441 */ 442 @Override 443 public HFileBlock touch() { 444 return touch(this); 445 } 446 447 @Override 448 public HFileBlock touch(Object hint) { 449 bufWithoutChecksum.touch(hint); 450 return this; 451 } 452 453 /** Returns get data block encoding id that was used to encode this block */ 454 short getDataBlockEncodingId() { 455 if (blockType != BlockType.ENCODED_DATA) { 456 throw new IllegalArgumentException("Querying encoder ID of a block " + "of type other than " 457 + BlockType.ENCODED_DATA + ": " + blockType); 458 } 459 return bufWithoutChecksum.getShort(headerSize()); 460 } 461 462 /** Returns the on-disk size of header + data part + checksum. */ 463 public int getOnDiskSizeWithHeader() { 464 return onDiskSizeWithoutHeader + headerSize(); 465 } 466 467 /** Returns the on-disk size of the data part + checksum (header excluded). */ 468 int getOnDiskSizeWithoutHeader() { 469 return onDiskSizeWithoutHeader; 470 } 471 472 /** Returns the uncompressed size of data part (header and checksum excluded). */ 473 public int getUncompressedSizeWithoutHeader() { 474 return uncompressedSizeWithoutHeader; 475 } 476 477 /** Returns the offset of the previous block of the same type in the file, or -1 if unknown */ 478 long getPrevBlockOffset() { 479 return prevBlockOffset; 480 } 481 482 /** 483 * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position is modified as 484 * side-effect. 485 */ 486 private void overwriteHeader() { 487 bufWithoutChecksum.rewind(); 488 blockType.write(bufWithoutChecksum); 489 bufWithoutChecksum.putInt(onDiskSizeWithoutHeader); 490 bufWithoutChecksum.putInt(uncompressedSizeWithoutHeader); 491 bufWithoutChecksum.putLong(prevBlockOffset); 492 if (this.fileContext.isUseHBaseChecksum()) { 493 bufWithoutChecksum.put(fileContext.getChecksumType().getCode()); 494 bufWithoutChecksum.putInt(fileContext.getBytesPerChecksum()); 495 bufWithoutChecksum.putInt(onDiskDataSizeWithHeader); 496 } 497 } 498 499 /** 500 * Returns a buffer that does not include the header and checksum. 501 * @return the buffer with header skipped and checksum omitted. 502 */ 503 public ByteBuff getBufferWithoutHeader() { 504 ByteBuff dup = getBufferReadOnly(); 505 return dup.position(headerSize()).slice(); 506 } 507 508 /** 509 * Returns a read-only duplicate of the buffer this block stores internally ready to be read. 510 * Clients must not modify the buffer object though they may set position and limit on the 511 * returned buffer since we pass back a duplicate. 
This method has to be public because it is used 512 * in {@link CompoundBloomFilter} to avoid object creation on every Bloom filter lookup, but has 513 * to be used with caution. Buffer holds header, block content, and any follow-on checksums if 514 * present. 515 * @return the buffer of this block for read-only operations,the buffer includes header,but not 516 * checksum. 517 */ 518 public ByteBuff getBufferReadOnly() { 519 // TODO: ByteBuf does not support asReadOnlyBuffer(). Fix. 520 ByteBuff dup = this.bufWithoutChecksum.duplicate(); 521 assert dup.position() == 0; 522 return dup; 523 } 524 525 public ByteBuffAllocator getByteBuffAllocator() { 526 return this.allocator; 527 } 528 529 private void sanityCheckAssertion(long valueFromBuf, long valueFromField, String fieldName) 530 throws IOException { 531 if (valueFromBuf != valueFromField) { 532 throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf 533 + ") is different from that in the field (" + valueFromField + ")"); 534 } 535 } 536 537 private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField) 538 throws IOException { 539 if (valueFromBuf != valueFromField) { 540 throw new IOException("Block type stored in the buffer: " + valueFromBuf 541 + ", block type field: " + valueFromField); 542 } 543 } 544 545 /** 546 * Checks if the block is internally consistent, i.e. the first 547 * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a valid header consistent 548 * with the fields. Assumes a packed block structure. This function is primary for testing and 549 * debugging, and is not thread-safe, because it alters the internal buffer pointer. Used by tests 550 * only. 551 */ 552 void sanityCheck() throws IOException { 553 // Duplicate so no side-effects 554 ByteBuff dup = this.bufWithoutChecksum.duplicate().rewind(); 555 sanityCheckAssertion(BlockType.read(dup), blockType); 556 557 sanityCheckAssertion(dup.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader"); 558 559 sanityCheckAssertion(dup.getInt(), uncompressedSizeWithoutHeader, 560 "uncompressedSizeWithoutHeader"); 561 562 sanityCheckAssertion(dup.getLong(), prevBlockOffset, "prevBlockOffset"); 563 if (this.fileContext.isUseHBaseChecksum()) { 564 sanityCheckAssertion(dup.get(), this.fileContext.getChecksumType().getCode(), "checksumType"); 565 sanityCheckAssertion(dup.getInt(), this.fileContext.getBytesPerChecksum(), 566 "bytesPerChecksum"); 567 sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader"); 568 } 569 570 if (dup.limit() != onDiskDataSizeWithHeader) { 571 throw new AssertionError( 572 "Expected limit " + onDiskDataSizeWithHeader + ", got " + dup.limit()); 573 } 574 575 // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next 576 // block's header, so there are two sensible values for buffer capacity. 
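    // For example, a block that was read together with a peek at the next block's header leaves
    // remaining() == onDiskDataSizeWithHeader + hdrSize after the rewind below, while a plain
    // read leaves remaining() == onDiskDataSizeWithHeader.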
577 int hdrSize = headerSize(); 578 dup.rewind(); 579 if ( 580 dup.remaining() != onDiskDataSizeWithHeader 581 && dup.remaining() != onDiskDataSizeWithHeader + hdrSize 582 ) { 583 throw new AssertionError("Invalid buffer capacity: " + dup.remaining() + ", expected " 584 + onDiskDataSizeWithHeader + " or " + (onDiskDataSizeWithHeader + hdrSize)); 585 } 586 } 587 588 @Override 589 public String toString() { 590 StringBuilder sb = new StringBuilder().append("[").append("blockType=").append(blockType) 591 .append(", fileOffset=").append(offset).append(", headerSize=").append(headerSize()) 592 .append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader) 593 .append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader) 594 .append(", prevBlockOffset=").append(prevBlockOffset).append(", isUseHBaseChecksum=") 595 .append(fileContext.isUseHBaseChecksum()); 596 if (fileContext.isUseHBaseChecksum()) { 597 sb.append(", checksumType=").append(ChecksumType.codeToType(this.bufWithoutChecksum.get(24))) 598 .append(", bytesPerChecksum=").append(this.bufWithoutChecksum.getInt(24 + 1)) 599 .append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader); 600 } else { 601 sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader).append("(") 602 .append(onDiskSizeWithoutHeader).append("+") 603 .append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")"); 604 } 605 String dataBegin; 606 if (bufWithoutChecksum.hasArray()) { 607 dataBegin = Bytes.toStringBinary(bufWithoutChecksum.array(), 608 bufWithoutChecksum.arrayOffset() + headerSize(), 609 Math.min(32, bufWithoutChecksum.limit() - bufWithoutChecksum.arrayOffset() - headerSize())); 610 } else { 611 ByteBuff bufWithoutHeader = getBufferWithoutHeader(); 612 byte[] dataBeginBytes = 613 new byte[Math.min(32, bufWithoutHeader.limit() - bufWithoutHeader.position())]; 614 bufWithoutHeader.get(dataBeginBytes); 615 dataBegin = Bytes.toStringBinary(dataBeginBytes); 616 } 617 sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader()) 618 .append(", totalChecksumBytes=").append(totalChecksumBytes()).append(", isUnpacked=") 619 .append(isUnpacked()).append(", buf=[").append(bufWithoutChecksum).append("]") 620 .append(", dataBeginsWith=").append(dataBegin).append(", fileContext=").append(fileContext) 621 .append(", nextBlockOnDiskSize=").append(nextBlockOnDiskSize).append("]"); 622 return sb.toString(); 623 } 624 625 /** 626 * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its 627 * encoded structure. Internal structures are shared between instances where applicable. 628 */ 629 HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException { 630 if (!fileContext.isCompressedOrEncrypted()) { 631 // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean), 632 // which is used for block serialization to L2 cache, does not preserve encoding and 633 // encryption details. 634 return this; 635 } 636 637 ByteBuff newBuf = allocateBufferForUnpacking(); // allocates space for the decompressed block 638 HFileBlock unpacked = shallowClone(this, newBuf); 639 640 boolean succ = false; 641 final Context context = 642 Context.current().with(CONTEXT_KEY, new HFileContextAttributesBuilderConsumer(fileContext)); 643 try (Scope ignored = context.makeCurrent()) { 644 HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA 645 ? 
reader.getBlockDecodingContext() 646 : reader.getDefaultBlockDecodingContext(); 647 // Create a duplicated buffer without the header part. 648 int headerSize = this.headerSize(); 649 ByteBuff dup = this.bufWithoutChecksum.duplicate(); 650 dup.position(headerSize); 651 dup = dup.slice(); 652 // Decode the dup into unpacked#buf 653 ctx.prepareDecoding(unpacked.getOnDiskDataSizeWithHeader() - headerSize, 654 unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), dup); 655 succ = true; 656 return unpacked; 657 } finally { 658 if (!succ) { 659 unpacked.release(); 660 } 661 } 662 } 663 664 /** 665 * Always allocates a new buffer of the correct size. Copies header bytes from the existing 666 * buffer. Does not change header fields. Reserve room to keep checksum bytes too. 667 */ 668 private ByteBuff allocateBufferForUnpacking() { 669 int headerSize = headerSize(); 670 int capacityNeeded = headerSize + uncompressedSizeWithoutHeader; 671 672 ByteBuff source = bufWithoutChecksum.duplicate(); 673 ByteBuff newBuf = allocator.allocate(capacityNeeded); 674 675 // Copy header bytes into newBuf. 676 source.position(0); 677 newBuf.put(0, source, 0, headerSize); 678 679 // set limit to exclude next block's header 680 newBuf.limit(capacityNeeded); 681 return newBuf; 682 } 683 684 /** 685 * Return true when this block's buffer has been unpacked, false otherwise. Note this is a 686 * calculated heuristic, not tracked attribute of the block. 687 */ 688 public boolean isUnpacked() { 689 final int headerSize = headerSize(); 690 final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader; 691 final int bufCapacity = bufWithoutChecksum.remaining(); 692 return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize; 693 } 694 695 /** 696 * Cannot be {@link #UNSET}. Must be a legitimate value. Used re-making the {@link BlockCacheKey} 697 * when block is returned to the cache. 698 * @return the offset of this block in the file it was read from 699 */ 700 public long getOffset() { 701 if (offset < 0) { 702 throw new IllegalStateException("HFile block offset not initialized properly"); 703 } 704 return offset; 705 } 706 707 /** Returns a byte stream reading the data(excluding header and checksum) of this block */ 708 DataInputStream getByteStream() { 709 ByteBuff dup = this.bufWithoutChecksum.duplicate(); 710 dup.position(this.headerSize()); 711 return new DataInputStream(new ByteBuffInputStream(dup)); 712 } 713 714 @Override 715 public long heapSize() { 716 long size = FIXED_OVERHEAD; 717 size += fileContext.heapSize(); 718 if (bufWithoutChecksum != null) { 719 // Deep overhead of the byte buffer. Needs to be aligned separately. 720 size += ClassSize.align(bufWithoutChecksum.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE); 721 } 722 return ClassSize.align(size); 723 } 724 725 /** 726 * Will be override by {@link SharedMemHFileBlock} or {@link ExclusiveMemHFileBlock}. Return true 727 * by default. 728 */ 729 public boolean isSharedMem() { 730 return true; 731 } 732 733 /** 734 * Unified version 2 {@link HFile} block writer. The intended usage pattern is as follows: 735 * <ol> 736 * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm. 737 * <li>Call {@link Writer#startWriting} and get a data stream to write to. 738 * <li>Write your data into the stream. 739 * <li>Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to. store the 740 * serialized block into an external stream. 741 * <li>Repeat to write more blocks. 
742 * </ol> 743 * <p> 744 */ 745 static class Writer implements ShipperListener { 746 private enum State { 747 INIT, 748 WRITING, 749 BLOCK_READY 750 } 751 752 private int maxSizeUnCompressed; 753 754 private BlockCompressedSizePredicator compressedSizePredicator; 755 756 /** Writer state. Used to ensure the correct usage protocol. */ 757 private State state = State.INIT; 758 759 /** Data block encoder used for data blocks */ 760 private final HFileDataBlockEncoder dataBlockEncoder; 761 762 private HFileBlockEncodingContext dataBlockEncodingCtx; 763 764 /** block encoding context for non-data blocks */ 765 private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx; 766 767 /** 768 * The stream we use to accumulate data into a block in an uncompressed format. We reset this 769 * stream at the end of each block and reuse it. The header is written as the first 770 * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this stream. 771 */ 772 private ByteArrayOutputStream baosInMemory; 773 774 /** 775 * Current block type. Set in {@link #startWriting(BlockType)}. Could be changed in 776 * {@link #finishBlock()} from {@link BlockType#DATA} to {@link BlockType#ENCODED_DATA}. 777 */ 778 private BlockType blockType; 779 780 /** 781 * A stream that we write uncompressed bytes to, which compresses them and writes them to 782 * {@link #baosInMemory}. 783 */ 784 private DataOutputStream userDataStream; 785 786 /** 787 * Bytes to be written to the file system, including the header. Compressed if compression is 788 * turned on. It also includes the checksum data that immediately follows the block data. 789 * (header + data + checksums) 790 */ 791 private ByteArrayOutputStream onDiskBlockBytesWithHeader; 792 793 /** 794 * The size of the checksum data on disk. It is used only if data is not compressed. If data is 795 * compressed, then the checksums are already part of onDiskBytesWithHeader. If data is 796 * uncompressed, then this variable stores the checksum data for this block. 797 */ 798 private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY; 799 800 /** 801 * Current block's start offset in the {@link HFile}. Set in 802 * {@link #writeHeaderAndData(FSDataOutputStream)}. 803 */ 804 private long startOffset; 805 806 /** 807 * Offset of previous block by block type. Updated when the next block is started. 808 */ 809 private long[] prevOffsetByType; 810 811 /** The offset of the previous block of the same type */ 812 private long prevOffset; 813 /** Meta data that holds information about the hfileblock **/ 814 private HFileContext fileContext; 815 816 private final ByteBuffAllocator allocator; 817 818 @Override 819 public void beforeShipped() { 820 if (getEncodingState() != null) { 821 getEncodingState().beforeShipped(); 822 } 823 } 824 825 EncodingState getEncodingState() { 826 return dataBlockEncodingCtx.getEncodingState(); 827 } 828 829 /** 830 * @param dataBlockEncoder data block encoding algorithm to use 831 */ 832 public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder, 833 HFileContext fileContext) { 834 this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP, fileContext.getBlocksize()); 835 } 836 837 public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder, 838 HFileContext fileContext, ByteBuffAllocator allocator, int maxSizeUnCompressed) { 839 if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) { 840 throw new RuntimeException("Unsupported value of bytesPerChecksum. 
" + " Minimum is " 841 + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " 842 + fileContext.getBytesPerChecksum()); 843 } 844 this.allocator = allocator; 845 this.dataBlockEncoder = 846 dataBlockEncoder != null ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE; 847 this.dataBlockEncodingCtx = this.dataBlockEncoder.newDataBlockEncodingContext(conf, 848 HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); 849 // TODO: This should be lazily instantiated 850 this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(conf, null, 851 HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); 852 // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum 853 baosInMemory = new ByteArrayOutputStream(); 854 prevOffsetByType = new long[BlockType.values().length]; 855 for (int i = 0; i < prevOffsetByType.length; ++i) { 856 prevOffsetByType[i] = UNSET; 857 } 858 // TODO: Why fileContext saved away when we have dataBlockEncoder and/or 859 // defaultDataBlockEncoder? 860 this.fileContext = fileContext; 861 this.compressedSizePredicator = (BlockCompressedSizePredicator) ReflectionUtils.newInstance( 862 conf.getClass(BLOCK_COMPRESSED_SIZE_PREDICATOR, UncompressedBlockSizePredicator.class), 863 new Configuration(conf)); 864 this.maxSizeUnCompressed = maxSizeUnCompressed; 865 } 866 867 /** 868 * Starts writing into the block. The previous block's data is discarded. 869 * @return the stream the user can write their data into 870 */ 871 DataOutputStream startWriting(BlockType newBlockType) throws IOException { 872 if (state == State.BLOCK_READY && startOffset != -1) { 873 // We had a previous block that was written to a stream at a specific 874 // offset. Save that offset as the last offset of a block of that type. 875 prevOffsetByType[blockType.getId()] = startOffset; 876 } 877 878 startOffset = -1; 879 blockType = newBlockType; 880 881 baosInMemory.reset(); 882 baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER); 883 884 state = State.WRITING; 885 886 // We will compress it later in finishBlock() 887 userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory); 888 if (newBlockType == BlockType.DATA) { 889 this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream); 890 } 891 return userDataStream; 892 } 893 894 /** 895 * Writes the Cell to this block 896 */ 897 void write(ExtendedCell cell) throws IOException { 898 expectState(State.WRITING); 899 this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream); 900 } 901 902 /** 903 * Transitions the block writer from the "writing" state to the "block ready" state. Does 904 * nothing if a block is already finished. 905 */ 906 void ensureBlockReady() throws IOException { 907 Preconditions.checkState(state != State.INIT, "Unexpected state: " + state); 908 909 if (state == State.BLOCK_READY) { 910 return; 911 } 912 913 // This will set state to BLOCK_READY. 914 finishBlock(); 915 } 916 917 public boolean checkBoundariesWithPredicate() { 918 int rawBlockSize = encodedBlockSizeWritten(); 919 if (rawBlockSize >= maxSizeUnCompressed) { 920 return true; 921 } else { 922 return compressedSizePredicator.shouldFinishBlock(rawBlockSize); 923 } 924 } 925 926 /** 927 * Finish up writing of the block. Flushes the compressing stream (if using compression), fills 928 * out the header, does any compression/encryption of bytes to flush out to disk, and manages 929 * the cache on write content, if applicable. Sets block write state to "block ready". 
930 */ 931 private void finishBlock() throws IOException { 932 if (blockType == BlockType.DATA) { 933 this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream, 934 baosInMemory.getBuffer(), blockType); 935 blockType = dataBlockEncodingCtx.getBlockType(); 936 } 937 userDataStream.flush(); 938 prevOffset = prevOffsetByType[blockType.getId()]; 939 940 // We need to cache the unencoded/uncompressed size before changing the block state 941 int rawBlockSize = 0; 942 if (this.getEncodingState() != null) { 943 rawBlockSize = encodedBlockSizeWritten(); 944 } 945 // We need to set state before we can package the block up for cache-on-write. In a way, the 946 // block is ready, but not yet encoded or compressed. 947 state = State.BLOCK_READY; 948 Bytes compressAndEncryptDat; 949 if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) { 950 compressAndEncryptDat = 951 dataBlockEncodingCtx.compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size()); 952 } else { 953 compressAndEncryptDat = defaultBlockEncodingCtx.compressAndEncrypt(baosInMemory.getBuffer(), 954 0, baosInMemory.size()); 955 } 956 if (compressAndEncryptDat == null) { 957 compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size()); 958 } 959 if (onDiskBlockBytesWithHeader == null) { 960 onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength()); 961 } 962 onDiskBlockBytesWithHeader.reset(); 963 onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(), 964 compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength()); 965 // Update raw and compressed sizes in the predicate 966 compressedSizePredicator.updateLatestBlockSizes(fileContext, rawBlockSize, 967 onDiskBlockBytesWithHeader.size()); 968 969 // Calculate how many bytes we need for checksum on the tail of the block. 970 int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(), 971 fileContext.getBytesPerChecksum()); 972 973 // Put the header for the on disk bytes; header currently is unfilled-out 974 putHeader(onDiskBlockBytesWithHeader, onDiskBlockBytesWithHeader.size() + numBytes, 975 baosInMemory.size(), onDiskBlockBytesWithHeader.size()); 976 977 if (onDiskChecksum.length != numBytes) { 978 onDiskChecksum = new byte[numBytes]; 979 } 980 ChecksumUtil.generateChecksums(onDiskBlockBytesWithHeader.getBuffer(), 0, 981 onDiskBlockBytesWithHeader.size(), onDiskChecksum, 0, fileContext.getChecksumType(), 982 fileContext.getBytesPerChecksum()); 983 } 984 985 /** 986 * Put the header into the given byte array at the given offset. 
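     * <p>
     * The layout written here mirrors the {@code Header} index constants at the top of this
     * class: the 8 byte block magic, then onDiskSize and uncompressedSize each stored minus
     * {@link HConstants#HFILEBLOCK_HEADER_SIZE}, the previous block offset, and the three
     * checksum fields (type, bytesPerChecksum, onDiskDataSize).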
987 * @param onDiskSize size of the block on disk header + data + checksum 988 * @param uncompressedSize size of the block after decompression (but before optional data block 989 * decoding) including header 990 * @param onDiskDataSize size of the block on disk with header and data but not including the 991 * checksums 992 */ 993 private void putHeader(byte[] dest, int offset, int onDiskSize, int uncompressedSize, 994 int onDiskDataSize) { 995 offset = blockType.put(dest, offset); 996 offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE); 997 offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE); 998 offset = Bytes.putLong(dest, offset, prevOffset); 999 offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode()); 1000 offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum()); 1001 Bytes.putInt(dest, offset, onDiskDataSize); 1002 } 1003 1004 private void putHeader(ByteBuff buff, int onDiskSize, int uncompressedSize, 1005 int onDiskDataSize) { 1006 buff.rewind(); 1007 blockType.write(buff); 1008 buff.putInt(onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE); 1009 buff.putInt(uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE); 1010 buff.putLong(prevOffset); 1011 buff.put(fileContext.getChecksumType().getCode()); 1012 buff.putInt(fileContext.getBytesPerChecksum()); 1013 buff.putInt(onDiskDataSize); 1014 } 1015 1016 private void putHeader(ByteArrayOutputStream dest, int onDiskSize, int uncompressedSize, 1017 int onDiskDataSize) { 1018 putHeader(dest.getBuffer(), 0, onDiskSize, uncompressedSize, onDiskDataSize); 1019 } 1020 1021 /** 1022 * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records the offset of this 1023 * block so that it can be referenced in the next block of the same type. 1024 */ 1025 void writeHeaderAndData(FSDataOutputStream out) throws IOException { 1026 long offset = out.getPos(); 1027 if (startOffset != UNSET && offset != startOffset) { 1028 throw new IOException("A " + blockType + " block written to a " 1029 + "stream twice, first at offset " + startOffset + ", then at " + offset); 1030 } 1031 startOffset = offset; 1032 finishBlockAndWriteHeaderAndData(out); 1033 } 1034 1035 /** 1036 * Writes the header and the compressed data of this block (or uncompressed data when not using 1037 * compression) into the given stream. Can be called in the "writing" state or in the "block 1038 * ready" state. If called in the "writing" state, transitions the writer to the "block ready" 1039 * state. 1040 * @param out the output stream to write the 1041 */ 1042 protected void finishBlockAndWriteHeaderAndData(DataOutputStream out) throws IOException { 1043 ensureBlockReady(); 1044 long startTime = EnvironmentEdgeManager.currentTime(); 1045 out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size()); 1046 out.write(onDiskChecksum); 1047 HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime); 1048 } 1049 1050 /** 1051 * Returns the header or the compressed data (or uncompressed data when not using compression) 1052 * as a byte array. Can be called in the "writing" state or in the "block ready" state. If 1053 * called in the "writing" state, transitions the writer to the "block ready" state. This 1054 * returns the header + data + checksums stored on disk. 
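     * <p>
     * Illustrative only (hypothetical test code, not part of this class): once the block is
     * ready, the returned array has the same length that {@link #getOnDiskSizeWithHeader()}
     * reports.
     * <pre>
     * byte[] onDisk = writer.getHeaderAndDataForTest();
     * assert onDisk.length == writer.getOnDiskSizeWithHeader();
     * </pre>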
1055 * @return header and data as they would be stored on disk in a byte array 1056 */ 1057 byte[] getHeaderAndDataForTest() throws IOException { 1058 ensureBlockReady(); 1059 // This is not very optimal, because we are doing an extra copy. 1060 // But this method is used only by unit tests. 1061 byte[] output = new byte[onDiskBlockBytesWithHeader.size() + onDiskChecksum.length]; 1062 System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0, 1063 onDiskBlockBytesWithHeader.size()); 1064 System.arraycopy(onDiskChecksum, 0, output, onDiskBlockBytesWithHeader.size(), 1065 onDiskChecksum.length); 1066 return output; 1067 } 1068 1069 /** 1070 * Releases resources used by this writer. 1071 */ 1072 void release() { 1073 if (dataBlockEncodingCtx != null) { 1074 dataBlockEncodingCtx.close(); 1075 dataBlockEncodingCtx = null; 1076 } 1077 if (defaultBlockEncodingCtx != null) { 1078 defaultBlockEncodingCtx.close(); 1079 defaultBlockEncodingCtx = null; 1080 } 1081 } 1082 1083 /** 1084 * Returns the on-disk size of the data portion of the block. This is the compressed size if 1085 * compression is enabled. Can only be called in the "block ready" state. Header is not 1086 * compressed, and its size is not included in the return value. 1087 * @return the on-disk size of the block, not including the header. 1088 */ 1089 int getOnDiskSizeWithoutHeader() { 1090 expectState(State.BLOCK_READY); 1091 return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length 1092 - HConstants.HFILEBLOCK_HEADER_SIZE; 1093 } 1094 1095 /** 1096 * Returns the on-disk size of the block. Can only be called in the "block ready" state. 1097 * @return the on-disk size of the block ready to be written, including the header size, the 1098 * data and the checksum data. 1099 */ 1100 int getOnDiskSizeWithHeader() { 1101 expectState(State.BLOCK_READY); 1102 return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length; 1103 } 1104 1105 /** 1106 * The uncompressed size of the block data. Does not include header size. 1107 */ 1108 int getUncompressedSizeWithoutHeader() { 1109 expectState(State.BLOCK_READY); 1110 return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE; 1111 } 1112 1113 /** 1114 * The uncompressed size of the block data, including header size. 1115 */ 1116 public int getUncompressedSizeWithHeader() { 1117 expectState(State.BLOCK_READY); 1118 return baosInMemory.size(); 1119 } 1120 1121 /** Returns true if a block is being written */ 1122 boolean isWriting() { 1123 return state == State.WRITING; 1124 } 1125 1126 /** 1127 * Returns the number of bytes written into the current block so far, or zero if not writing the 1128 * block at the moment. Note that this will return zero in the "block ready" state as well. 1129 * @return the number of bytes written 1130 */ 1131 public int encodedBlockSizeWritten() { 1132 return state != State.WRITING ? 0 : this.getEncodingState().getEncodedDataSizeWritten(); 1133 } 1134 1135 /** 1136 * Returns the number of bytes written into the current block so far, or zero if not writing the 1137 * block at the moment. Note that this will return zero in the "block ready" state as well. 1138 * @return the number of bytes written 1139 */ 1140 public int blockSizeWritten() { 1141 return state != State.WRITING ? 0 : this.getEncodingState().getUnencodedDataSizeWritten(); 1142 } 1143 1144 /** 1145 * Clones the header followed by the uncompressed data, even if using compression. This is 1146 * needed for storing uncompressed blocks in the block cache. 
Can be called in the "writing" 1147 * state or the "block ready" state. Returns only the header and data, does not include checksum 1148 * data. 1149 * @return Returns an uncompressed block ByteBuff for caching on write 1150 */ 1151 ByteBuff cloneUncompressedBufferWithHeader() { 1152 expectState(State.BLOCK_READY); 1153 ByteBuff bytebuff = allocator.allocate(baosInMemory.size()); 1154 baosInMemory.toByteBuff(bytebuff); 1155 int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(), 1156 fileContext.getBytesPerChecksum()); 1157 putHeader(bytebuff, onDiskBlockBytesWithHeader.size() + numBytes, baosInMemory.size(), 1158 onDiskBlockBytesWithHeader.size()); 1159 bytebuff.rewind(); 1160 return bytebuff; 1161 } 1162 1163 /** 1164 * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is needed 1165 * for storing packed blocks in the block cache. Returns only the header and data, Does not 1166 * include checksum data. 1167 * @return Returns a copy of block bytes for caching on write 1168 */ 1169 private ByteBuff cloneOnDiskBufferWithHeader() { 1170 expectState(State.BLOCK_READY); 1171 ByteBuff bytebuff = allocator.allocate(onDiskBlockBytesWithHeader.size()); 1172 onDiskBlockBytesWithHeader.toByteBuff(bytebuff); 1173 bytebuff.rewind(); 1174 return bytebuff; 1175 } 1176 1177 private void expectState(State expectedState) { 1178 if (state != expectedState) { 1179 throw new IllegalStateException( 1180 "Expected state: " + expectedState + ", actual state: " + state); 1181 } 1182 } 1183 1184 /** 1185 * Takes the given {@link BlockWritable} instance, creates a new block of its appropriate type, 1186 * writes the writable into this block, and flushes the block into the output stream. The writer 1187 * is instructed not to buffer uncompressed bytes for cache-on-write. 1188 * @param bw the block-writable object to write as a block 1189 * @param out the file system output stream 1190 */ 1191 void writeBlock(BlockWritable bw, FSDataOutputStream out) throws IOException { 1192 bw.writeToBlock(startWriting(bw.getBlockType())); 1193 writeHeaderAndData(out); 1194 } 1195 1196 /** 1197 * Creates a new HFileBlock. Checksums have already been validated, so the byte buffer passed 1198 * into the constructor of this newly created block does not have checksum data even though the 1199 * header minor version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a 0 value 1200 * in bytesPerChecksum. This method copies the on-disk or uncompressed data to build the 1201 * HFileBlock which is used only while writing blocks and caching. 1202 * <p> 1203 * TODO: Should there be an option where a cache can ask that hbase preserve block checksums for 1204 * checking after a block comes out of the cache? Otehrwise, cache is responsible for blocks 1205 * being wholesome (ECC memory or if file-backed, it does checksumming). 1206 */ 1207 HFileBlock getBlockForCaching(CacheConfig cacheConf) { 1208 HFileContext newContext = BlockCacheUtil.cloneContext(fileContext); 1209 // Build the HFileBlock. 
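      // Depending on cacheConf.shouldCacheCompressed(), cache-on-write keeps either the packed
      // (compressed/encrypted) bytes or the uncompressed bytes; either way the cloned buffer
      // carries a header but no trailing checksums.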
1210 HFileBlockBuilder builder = new HFileBlockBuilder(); 1211 ByteBuff buff; 1212 if (cacheConf.shouldCacheCompressed(blockType.getCategory())) { 1213 buff = cloneOnDiskBufferWithHeader(); 1214 } else { 1215 buff = cloneUncompressedBufferWithHeader(); 1216 } 1217 return builder.withBlockType(blockType) 1218 .withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader()) 1219 .withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader()) 1220 .withPrevBlockOffset(prevOffset).withByteBuff(buff).withFillHeader(FILL_HEADER) 1221 .withOffset(startOffset).withNextBlockOnDiskSize(UNSET) 1222 .withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length) 1223 .withHFileContext(newContext).withByteBuffAllocator(cacheConf.getByteBuffAllocator()) 1224 .withShared(!buff.hasArray()).build(); 1225 } 1226 } 1227 1228 /** Something that can be written into a block. */ 1229 interface BlockWritable { 1230 /** The type of block this data should use. */ 1231 BlockType getBlockType(); 1232 1233 /** 1234 * Writes the block to the provided stream. Must not write any magic records. 1235 * @param out a stream to write uncompressed data into 1236 */ 1237 void writeToBlock(DataOutput out) throws IOException; 1238 } 1239 1240 /** 1241 * Iterator for reading {@link HFileBlock}s in load-on-open-section, such as root data index 1242 * block, meta index block, file info block etc. 1243 */ 1244 interface BlockIterator { 1245 /** 1246 * Get the next block, or null if there are no more blocks to iterate. 1247 */ 1248 HFileBlock nextBlock() throws IOException; 1249 1250 /** 1251 * Similar to {@link #nextBlock()} but checks block type, throws an exception if incorrect, and 1252 * returns the HFile block 1253 */ 1254 HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException; 1255 1256 /** 1257 * Now we use the {@link ByteBuffAllocator} to manage the nio ByteBuffers for HFileBlocks, so we 1258 * must deallocate all of the ByteBuffers in the end life. the BlockIterator's life cycle is 1259 * starting from opening an HFileReader and stopped when the HFileReader#close, so we will keep 1260 * track all the read blocks until we call {@link BlockIterator#freeBlocks()} when closing the 1261 * HFileReader. Sum bytes of those blocks in load-on-open section should be quite small, so 1262 * tracking them should be OK. 1263 */ 1264 void freeBlocks(); 1265 } 1266 1267 /** An HFile block reader with iteration ability. */ 1268 interface FSReader { 1269 /** 1270 * Reads the block at the given offset in the file with the given on-disk size and uncompressed 1271 * size. 1272 * @param offset of the file to read 1273 * @param onDiskSize the on-disk size of the entire block, including all applicable headers, 1274 * or -1 if unknown 1275 * @param pread true to use pread, otherwise use the stream read. 1276 * @param updateMetrics update the metrics or not. 1277 * @param intoHeap allocate the block's ByteBuff by {@link ByteBuffAllocator} or JVM heap. 1278 * For LRUBlockCache, we must ensure that the block to cache is an heap 1279 * one, because the memory occupation is based on heap now, also for 1280 * {@link CombinedBlockCache}, we use the heap LRUBlockCache as L1 cache to 1281 * cache small blocks such as IndexBlock or MetaBlock for faster access. So 1282 * introduce an flag here to decide whether allocate from JVM heap or not 1283 * so that we can avoid an extra off-heap to heap memory copy when using 1284 * LRUBlockCache. 
For most cases we know what the expected block type is, but for some
   *                      special cases (for example HFileReaderImpl#readNextDataBlock()) we
   *                      cannot pre-decide the expected block type. Then we can only allocate the
   *                      block's ByteBuff from {@link ByteBuffAllocator} first, and when caching
   *                      it in {@link LruBlockCache} we check whether the ByteBuff is on the heap
   *                      or not; if not, we clone it to a heap one and cache that.
   * @return the newly read block
   */
  HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics,
    boolean intoHeap) throws IOException;

  /**
   * Creates a block iterator over the given portion of the {@link HFile}. The iterator returns
   * blocks starting at offsets such that startOffset <= offset < endOffset. Returned blocks are
   * always unpacked. Used when no hfile index is available; e.g. reading the hfile index blocks
   * themselves on file open.
   * @param startOffset the offset of the block to start iteration with
   * @param endOffset   the offset to end iteration at (exclusive)
   * @return an iterator of blocks between the two given offsets
   */
  BlockIterator blockRange(long startOffset, long endOffset);

  /** Closes the backing streams */
  void closeStreams() throws IOException;

  /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
  HFileBlockDecodingContext getBlockDecodingContext();

  /** Get the default decoder for blocks from this file. */
  HFileBlockDecodingContext getDefaultBlockDecodingContext();

  void setIncludesMemStoreTS(boolean includesMemstoreTS);

  void setDataBlockEncoder(HFileDataBlockEncoder encoder, Configuration conf);

  /**
   * To close the stream's socket. Note: This can be concurrently called from multiple threads and
   * implementations should take care of thread safety.
   */
  void unbufferStream();
}

/**
 * Data structure used to cache the header of the NEXT block. Only useful if the next read that
 * comes in is for the block immediately following, in sequence. When we read, we read the current
 * block and the next block's header. We do this so we have the length of the next block to read
 * if the hfile index is not available (rare, at hfile open only).
 */
private static class PrefetchedHeader {
  long offset = -1;
  byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
  final ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(header, 0, header.length));

  @Override
  public String toString() {
    return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header);
  }
}

/**
 * Reads version 2 HFile blocks from the filesystem.
 */
static class FSReaderImpl implements FSReader {
  /**
   * The file system stream of the underlying {@link HFile} that does or doesn't do checksum
   * validations in the filesystem
   */
  private FSDataInputStreamWrapper streamWrapper;

  private HFileBlockDecodingContext encodedBlockDecodingCtx;

  /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
  private final HFileBlockDefaultDecodingContext defaultDecodingCtx;

  /**
   * Cache of the NEXT header after this one. Check that it is indeed the next block's header
   * before using it. TODO: Review.
This overread into next block to fetch next blocks header seems unnecessary 1362 * given we usually get the block size from the hfile index. Review! 1363 */ 1364 private AtomicReference<PrefetchedHeader> prefetchedHeader = 1365 new AtomicReference<>(new PrefetchedHeader()); 1366 1367 /** The size of the file we are reading from, or -1 if unknown. */ 1368 private long fileSize; 1369 1370 /** The size of the header */ 1371 protected final int hdrSize; 1372 1373 /** The filesystem used to access data */ 1374 private HFileSystem hfs; 1375 1376 private HFileContext fileContext; 1377 // Cache the fileName 1378 private String pathName; 1379 1380 private final ByteBuffAllocator allocator; 1381 1382 private final Lock streamLock = new ReentrantLock(); 1383 1384 private final boolean isPreadAllBytes; 1385 1386 private final long readWarnTime; 1387 1388 /** 1389 * If reading block cost time in milliseconds more than the threshold, a warning will be logged. 1390 */ 1391 public static final String FS_READER_WARN_TIME_MS = "hbase.fs.reader.warn.time.ms"; 1392 1393 FSReaderImpl(ReaderContext readerContext, HFileContext fileContext, ByteBuffAllocator allocator, 1394 Configuration conf) throws IOException { 1395 this.fileSize = readerContext.getFileSize(); 1396 this.hfs = readerContext.getFileSystem(); 1397 if (readerContext.getFilePath() != null) { 1398 this.pathName = readerContext.getFilePath().toString(); 1399 } 1400 this.fileContext = fileContext; 1401 this.hdrSize = headerSize(fileContext.isUseHBaseChecksum()); 1402 this.allocator = allocator; 1403 1404 this.streamWrapper = readerContext.getInputStreamWrapper(); 1405 // Older versions of HBase didn't support checksum. 1406 this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum()); 1407 defaultDecodingCtx = new HFileBlockDefaultDecodingContext(conf, fileContext); 1408 encodedBlockDecodingCtx = defaultDecodingCtx; 1409 isPreadAllBytes = readerContext.isPreadAllBytes(); 1410 // Default warn threshold set to -1, it means skipping record the read block slow warning log. 1411 readWarnTime = conf.getLong(FS_READER_WARN_TIME_MS, -1L); 1412 } 1413 1414 @Override 1415 public BlockIterator blockRange(final long startOffset, final long endOffset) { 1416 final FSReader owner = this; // handle for inner class 1417 return new BlockIterator() { 1418 private volatile boolean freed = false; 1419 // Tracking all read blocks until we call freeBlocks. 1420 private List<HFileBlock> blockTracker = new ArrayList<>(); 1421 private long offset = startOffset; 1422 // Cache length of next block. Current block has the length of next block in it. 1423 private long length = -1; 1424 1425 @Override 1426 public HFileBlock nextBlock() throws IOException { 1427 if (offset >= endOffset) { 1428 return null; 1429 } 1430 HFileBlock b = readBlockData(offset, length, false, false, true); 1431 offset += b.getOnDiskSizeWithHeader(); 1432 length = b.getNextBlockOnDiskSize(); 1433 HFileBlock uncompressed = b.unpack(fileContext, owner); 1434 if (uncompressed != b) { 1435 b.release(); // Need to release the compressed Block now. 
1436 } 1437 blockTracker.add(uncompressed); 1438 return uncompressed; 1439 } 1440 1441 @Override 1442 public HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException { 1443 HFileBlock blk = nextBlock(); 1444 if (blk.getBlockType() != blockType) { 1445 throw new IOException( 1446 "Expected block of type " + blockType + " but found " + blk.getBlockType()); 1447 } 1448 return blk; 1449 } 1450 1451 @Override 1452 public void freeBlocks() { 1453 if (freed) { 1454 return; 1455 } 1456 blockTracker.forEach(HFileBlock::release); 1457 blockTracker = null; 1458 freed = true; 1459 } 1460 }; 1461 } 1462 1463 /** 1464 * Does a positional read or a seek and read into the given byte buffer. We need take care that 1465 * we will call the {@link ByteBuff#release()} for every exit to deallocate the ByteBuffers, 1466 * otherwise the memory leak may happen. 1467 * @param dest destination buffer 1468 * @param size size of read 1469 * @param peekIntoNextBlock whether to read the next block's on-disk size 1470 * @param fileOffset position in the stream to read at 1471 * @param pread whether we should do a positional read 1472 * @param istream The input source of data 1473 * @return true to indicate the destination buffer include the next block header, otherwise only 1474 * include the current block data without the next block header. 1475 * @throws IOException if any IO error happen. 1476 */ 1477 protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size, 1478 boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException { 1479 if (!pread) { 1480 // Seek + read. Better for scanning. 1481 istream.seek(fileOffset); 1482 long realOffset = istream.getPos(); 1483 if (realOffset != fileOffset) { 1484 throw new IOException("Tried to seek to " + fileOffset + " to read " + size 1485 + " bytes, but pos=" + realOffset + " after seek"); 1486 } 1487 if (!peekIntoNextBlock) { 1488 BlockIOUtils.readFully(dest, istream, size); 1489 return false; 1490 } 1491 1492 // Try to read the next block header 1493 if (!BlockIOUtils.readWithExtra(dest, istream, size, hdrSize)) { 1494 // did not read the next block header. 1495 return false; 1496 } 1497 } else { 1498 // Positional read. Better for random reads; or when the streamLock is already locked. 1499 int extraSize = peekIntoNextBlock ? hdrSize : 0; 1500 if ( 1501 !BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize, isPreadAllBytes) 1502 ) { 1503 // did not read the next block header. 1504 return false; 1505 } 1506 } 1507 assert peekIntoNextBlock; 1508 return true; 1509 } 1510 1511 /** 1512 * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as 1513 * little memory allocation as possible, using the provided on-disk size. 1514 * @param offset the offset in the stream to read at 1515 * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header, or -1 if 1516 * unknown; i.e. when iterating over blocks reading in the file 1517 * metadata info. 1518 * @param pread whether to use a positional read 1519 * @param updateMetrics whether to update the metrics 1520 * @param intoHeap allocate ByteBuff of block from heap or off-heap. 1521 * @see FSReader#readBlockData(long, long, boolean, boolean, boolean) for more details about the 1522 * useHeap. 
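 * <p>
 * A minimal usage sketch (illustrative only; the {@code reader} instance, {@code offset} and
 * {@code onDiskSize} are assumed to be supplied by the caller, typically from the block index):
 *
 * <pre>
 * // pread=true for a positional read, updateMetrics=true, intoHeap=true when the block may
 * // end up in the on-heap LRUBlockCache.
 * HFileBlock block = reader.readBlockData(offset, onDiskSize, true, true, true);
 * try {
 *   // ... use the block ...
 * } finally {
 *   block.release(); // release when done so the backing ByteBuff can go back to the allocator
 * }
 * </pre>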
1523 */ 1524 @Override 1525 public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread, 1526 boolean updateMetrics, boolean intoHeap) throws IOException { 1527 // Get a copy of the current state of whether to validate 1528 // hbase checksums or not for this read call. This is not 1529 // thread-safe but the one constraint is that if we decide 1530 // to skip hbase checksum verification then we are 1531 // guaranteed to use hdfs checksum verification. 1532 boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum(); 1533 FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum); 1534 final Context context = Context.current().with(CONTEXT_KEY, 1535 new HFileContextAttributesBuilderConsumer(fileContext) 1536 .setSkipChecksum(doVerificationThruHBaseChecksum) 1537 .setReadType(pread ? ReadType.POSITIONAL_READ : ReadType.SEEK_PLUS_READ)); 1538 try (Scope ignored = context.makeCurrent()) { 1539 HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, 1540 doVerificationThruHBaseChecksum, updateMetrics, intoHeap); 1541 if (blk == null) { 1542 HFile.LOG.warn("HBase checksum verification failed for file {} at offset {} filesize {}." 1543 + " Retrying read with HDFS checksums turned on...", pathName, offset, fileSize); 1544 1545 if (!doVerificationThruHBaseChecksum) { 1546 String msg = "HBase checksum verification failed for file " + pathName + " at offset " 1547 + offset + " filesize " + fileSize + " but this cannot happen because doVerify is " 1548 + doVerificationThruHBaseChecksum; 1549 HFile.LOG.warn(msg); 1550 throw new IOException(msg); // cannot happen case here 1551 } 1552 HFile.CHECKSUM_FAILURES.increment(); // update metrics 1553 1554 // If we have a checksum failure, we fall back into a mode where 1555 // the next few reads use HDFS level checksums. We aim to make the 1556 // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid 1557 // hbase checksum verification, but since this value is set without 1558 // holding any locks, it can so happen that we might actually do 1559 // a few more than precisely this number. 1560 is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD); 1561 doVerificationThruHBaseChecksum = false; 1562 blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, 1563 doVerificationThruHBaseChecksum, updateMetrics, intoHeap); 1564 if (blk != null) { 1565 HFile.LOG.warn( 1566 "HDFS checksum verification succeeded for file {} at offset {} filesize" + " {}", 1567 pathName, offset, fileSize); 1568 } 1569 } 1570 if (blk == null && !doVerificationThruHBaseChecksum) { 1571 String msg = 1572 "readBlockData failed, possibly due to " + "checksum verification failed for file " 1573 + pathName + " at offset " + offset + " filesize " + fileSize; 1574 HFile.LOG.warn(msg); 1575 throw new IOException(msg); 1576 } 1577 1578 // If there is a checksum mismatch earlier, then retry with 1579 // HBase checksums switched off and use HDFS checksum verification. 1580 // This triggers HDFS to detect and fix corrupt replicas. The 1581 // next checksumOffCount read requests will use HDFS checksums. 1582 // The decrementing of this.checksumOffCount is not thread-safe, 1583 // but it is harmless because eventually checksumOffCount will be 1584 // a negative number. 
1585 streamWrapper.checksumOk(); 1586 return blk; 1587 } 1588 } 1589 1590 /** 1591 * Check that checksumType on {@code headerBuf} read from a block header seems reasonable, 1592 * within the known value range. 1593 * @return {@code true} if the headerBuf is safe to proceed, {@code false} otherwise. 1594 */ 1595 private boolean checkCheckSumTypeOnHeaderBuf(ByteBuff headerBuf) { 1596 if (headerBuf == null) { 1597 return true; 1598 } 1599 byte b = headerBuf.get(HFileBlock.Header.CHECKSUM_TYPE_INDEX); 1600 for (ChecksumType t : ChecksumType.values()) { 1601 if (t.getCode() == b) { 1602 return true; 1603 } 1604 } 1605 return false; 1606 } 1607 1608 /** 1609 * Check that {@code value} read from a block header seems reasonable, within a large margin of 1610 * error. 1611 * @return {@code true} if the value is safe to proceed, {@code false} otherwise. 1612 */ 1613 private boolean checkOnDiskSizeWithHeader(int value) { 1614 if (value < 0) { 1615 if (LOG.isTraceEnabled()) { 1616 LOG.trace( 1617 "onDiskSizeWithHeader={}; value represents a size, so it should never be negative.", 1618 value); 1619 } 1620 return false; 1621 } 1622 if (value - hdrSize < 0) { 1623 if (LOG.isTraceEnabled()) { 1624 LOG.trace("onDiskSizeWithHeader={}, hdrSize={}; don't accept a value that is negative" 1625 + " after the header size is excluded.", value, hdrSize); 1626 } 1627 return false; 1628 } 1629 return true; 1630 } 1631 1632 /** 1633 * Check that {@code value} provided by the calling context seems reasonable, within a large 1634 * margin of error. 1635 * @return {@code true} if the value is safe to proceed, {@code false} otherwise. 1636 */ 1637 private boolean checkCallerProvidedOnDiskSizeWithHeader(long value) { 1638 // same validation logic as is used by Math.toIntExact(long) 1639 int intValue = (int) value; 1640 if (intValue != value) { 1641 if (LOG.isTraceEnabled()) { 1642 LOG.trace("onDiskSizeWithHeaderL={}; value exceeds int size limits.", value); 1643 } 1644 return false; 1645 } 1646 if (intValue == -1) { 1647 // a magic value we expect to see. 1648 return true; 1649 } 1650 return checkOnDiskSizeWithHeader(intValue); 1651 } 1652 1653 /** 1654 * Check atomic reference cache for this block's header. Cache only good if next read coming 1655 * through is next in sequence in the block. We read next block's header on the tail of reading 1656 * the previous block to save a seek. Otherwise, we have to do a seek to read the header before 1657 * we can pull in the block OR we have to backup the stream because we over-read (the next 1658 * block's header). 1659 * @see PrefetchedHeader 1660 * @return The cached block header or null if not found. 1661 * @see #cacheNextBlockHeader(long, ByteBuff, int, int) 1662 */ 1663 private ByteBuff getCachedHeader(final long offset) { 1664 PrefetchedHeader ph = this.prefetchedHeader.get(); 1665 return ph != null && ph.offset == offset ? ph.buf : null; 1666 } 1667 1668 /** 1669 * Save away the next blocks header in atomic reference. 1670 * @see #getCachedHeader(long) 1671 * @see PrefetchedHeader 1672 */ 1673 private void cacheNextBlockHeader(final long offset, ByteBuff onDiskBlock, 1674 int onDiskSizeWithHeader, int headerLength) { 1675 PrefetchedHeader ph = new PrefetchedHeader(); 1676 ph.offset = offset; 1677 onDiskBlock.get(onDiskSizeWithHeader, ph.header, 0, headerLength); 1678 this.prefetchedHeader.set(ph); 1679 } 1680 1681 /** 1682 * Clear the cached value when its integrity is suspect. 
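 * <p>
 * A rough sketch of how the prefetched-header cache is exercised on the read path (the exact
 * flow lives in {@code readBlockDataInternal}; the names below mirror its locals):
 *
 * <pre>
 * ByteBuff headerBuf = getCachedHeader(offset);   // non-null only when reading sequentially
 * // ... read the current block, over-reading the next block's header when possible ...
 * cacheNextBlockHeader(offset + onDiskSizeWithHeader, onDiskBlock, onDiskSizeWithHeader, hdrSize);
 * // ... and on a checksum failure or a suspect header:
 * invalidateNextBlockHeader();
 * </pre>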
1683 */ 1684 private void invalidateNextBlockHeader() { 1685 prefetchedHeader.set(null); 1686 } 1687 1688 private int getNextBlockOnDiskSize(ByteBuff onDiskBlock, int onDiskSizeWithHeader) { 1689 return onDiskBlock.getIntAfterPosition(onDiskSizeWithHeader + BlockType.MAGIC_LENGTH) 1690 + hdrSize; 1691 } 1692 1693 private ByteBuff allocate(int size, boolean intoHeap) { 1694 return intoHeap ? HEAP.allocate(size) : allocator.allocate(size); 1695 } 1696 1697 /** 1698 * Reads a version 2 block. 1699 * @param offset the offset in the stream to read at. 1700 * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header and 1701 * checksums if present or -1 if unknown (as a long). Can be -1 if 1702 * we are doing raw iteration of blocks as when loading up file 1703 * metadata; i.e. the first read of a new file. Usually non-null 1704 * gotten from the file index. 1705 * @param pread whether to use a positional read 1706 * @param verifyChecksum Whether to use HBase checksums. If HBase checksum is switched 1707 * off, then use HDFS checksum. Can also flip on/off reading same 1708 * file if we hit a troublesome patch in an hfile. 1709 * @param updateMetrics whether need to update the metrics. 1710 * @param intoHeap allocate the ByteBuff of block from heap or off-heap. 1711 * @return the HFileBlock or null if there is a HBase checksum mismatch 1712 */ 1713 protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, 1714 long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics, 1715 boolean intoHeap) throws IOException { 1716 final Span span = Span.current(); 1717 final AttributesBuilder attributesBuilder = Attributes.builder(); 1718 Optional.of(Context.current()).map(val -> val.get(CONTEXT_KEY)) 1719 .ifPresent(c -> c.accept(attributesBuilder)); 1720 if (offset < 0) { 1721 throw new IOException("Invalid offset=" + offset + " trying to read " + "block (onDiskSize=" 1722 + onDiskSizeWithHeaderL + ")"); 1723 } 1724 if (!checkCallerProvidedOnDiskSizeWithHeader(onDiskSizeWithHeaderL)) { 1725 LOG.trace("Caller provided invalid onDiskSizeWithHeaderL={}", onDiskSizeWithHeaderL); 1726 onDiskSizeWithHeaderL = -1; 1727 } 1728 int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL; 1729 1730 // Try to use the cached header. Will serve us in rare case where onDiskSizeWithHeaderL==-1 1731 // and will save us having to seek the stream backwards to reread the header we 1732 // read the last time through here. 1733 ByteBuff headerBuf = getCachedHeader(offset); 1734 LOG.trace( 1735 "Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, " 1736 + "onDiskSizeWithHeader={}", 1737 this.fileContext.getHFileName(), offset, pread, verifyChecksum, headerBuf, 1738 onDiskSizeWithHeader); 1739 // This is NOT same as verifyChecksum. This latter is whether to do hbase 1740 // checksums. Can change with circumstances. The below flag is whether the 1741 // file has support for checksums (version 2+). 1742 boolean checksumSupport = this.fileContext.isUseHBaseChecksum(); 1743 long startTime = EnvironmentEdgeManager.currentTime(); 1744 if (onDiskSizeWithHeader == -1) { 1745 // The caller does not know the block size. Need to get it from the header. If header was 1746 // not cached (see getCachedHeader above), need to seek to pull it in. This is costly 1747 // and should happen very rarely. Currently happens on open of a hfile reader where we 1748 // read the trailer blocks to pull in the indices. 
Otherwise, we are reading block sizes 1749 // out of the hfile index. To check, enable TRACE in this file and you'll get an exception 1750 // in a LOG every time we seek. See HBASE-17072 for more detail. 1751 if (headerBuf == null) { 1752 if (LOG.isTraceEnabled()) { 1753 LOG.trace("Extra seek to get block size!", new RuntimeException()); 1754 } 1755 span.addEvent("Extra seek to get block size!", attributesBuilder.build()); 1756 headerBuf = HEAP.allocate(hdrSize); 1757 readAtOffset(is, headerBuf, hdrSize, false, offset, pread); 1758 headerBuf.rewind(); 1759 } 1760 onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport); 1761 } 1762 1763 // Inspect the header's checksumType for known valid values. If we don't find such a value, 1764 // assume that the bytes read are corrupted. We will clear the cached value and fall back to 1765 // HDFS checksum verification. 1766 if (!checkCheckSumTypeOnHeaderBuf(headerBuf)) { 1767 if (verifyChecksum) { 1768 invalidateNextBlockHeader(); 1769 span.addEvent("Falling back to HDFS checksumming.", attributesBuilder.build()); 1770 return null; 1771 } else { 1772 throw new IOException( 1773 "Unknown checksum type code " + headerBuf.get(HFileBlock.Header.CHECKSUM_TYPE_INDEX) 1774 + " for file " + pathName + "; the headerBuf of the HFileBlock may be corrupted."); 1775 } 1776 } 1777 1778 // The common case is that onDiskSizeWithHeader was produced by a read without checksum 1779 // validation, so give it a sanity check before trying to use it. 1780 if (!checkOnDiskSizeWithHeader(onDiskSizeWithHeader)) { 1781 if (verifyChecksum) { 1782 invalidateNextBlockHeader(); 1783 span.addEvent("Falling back to HDFS checksumming.", attributesBuilder.build()); 1784 return null; 1785 } else { 1786 throw new IOException("Invalid onDiskSizeWithHeader=" + onDiskSizeWithHeader); 1787 } 1788 } 1789 1790 int preReadHeaderSize = headerBuf == null ? 0 : hdrSize; 1791 // Allocate enough space to fit the next block's header too; saves a seek next time through. 1792 // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header; 1793 // onDiskSizeWithHeader is header, body, and any checksums if present. preReadHeaderSize 1794 // says where to start reading. If we have the header cached, then we don't need to read 1795 // it again and we can likely read from the last place we left off w/o need to backup and reread 1796 // the header we read last time through here. 1797 ByteBuff onDiskBlock = this.allocate(onDiskSizeWithHeader + hdrSize, intoHeap); 1798 boolean initHFileBlockSuccess = false; 1799 try { 1800 if (headerBuf != null) { 1801 onDiskBlock.put(0, headerBuf, 0, hdrSize).position(hdrSize); 1802 } 1803 boolean readNextHeader = readAtOffset(is, onDiskBlock, 1804 onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread); 1805 onDiskBlock.rewind(); // in case of moving position when copying a cached header 1806 1807 // the call to validateChecksum for this block excludes the next block header over-read, so 1808 // no reason to delay extracting this value.
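// At this point (assuming readAtOffset succeeded) onDiskBlock holds the whole current block
// -- header, data, and any checksums -- in its first onDiskSizeWithHeader bytes, followed by
// the next block's header in the trailing hdrSize bytes when readNextHeader is true.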
1809 int nextBlockOnDiskSize = -1; 1810 if (readNextHeader) { 1811 int parsedVal = getNextBlockOnDiskSize(onDiskBlock, onDiskSizeWithHeader); 1812 if (checkOnDiskSizeWithHeader(parsedVal)) { 1813 nextBlockOnDiskSize = parsedVal; 1814 } 1815 } 1816 if (headerBuf == null) { 1817 headerBuf = onDiskBlock.duplicate().position(0).limit(hdrSize); 1818 } 1819 1820 ByteBuff curBlock = onDiskBlock.duplicate().position(0).limit(onDiskSizeWithHeader); 1821 // Verify checksum of the data before using it for building HFileBlock. 1822 if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) { 1823 invalidateNextBlockHeader(); 1824 span.addEvent("Falling back to HDFS checksumming.", attributesBuilder.build()); 1825 return null; 1826 } 1827 1828 // TODO: is this check necessary or can we proceed with a provided value regardless of 1829 // what is in the header? 1830 int fromHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport); 1831 if (onDiskSizeWithHeader != fromHeader) { 1832 if (LOG.isTraceEnabled()) { 1833 LOG.trace("Passed in onDiskSizeWithHeader={} != {}, offset={}, fileContext={}", 1834 onDiskSizeWithHeader, fromHeader, offset, this.fileContext); 1835 } 1836 if (checksumSupport && verifyChecksum) { 1837 // This file supports HBase checksums and verification of those checksums was 1838 // requested. The block size provided by the caller (presumably from the block index) 1839 // does not match the block size written to the block header. treat this as 1840 // HBase-checksum failure. 1841 span.addEvent("Falling back to HDFS checksumming.", attributesBuilder.build()); 1842 invalidateNextBlockHeader(); 1843 return null; 1844 } 1845 throw new IOException("Passed in onDiskSizeWithHeader=" + onDiskSizeWithHeader + " != " 1846 + fromHeader + ", offset=" + offset + ", fileContext=" + this.fileContext); 1847 } 1848 1849 // remove checksum from buffer now that it's verified 1850 int sizeWithoutChecksum = curBlock.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX); 1851 curBlock.limit(sizeWithoutChecksum); 1852 long duration = EnvironmentEdgeManager.currentTime() - startTime; 1853 boolean tooSlow = this.readWarnTime >= 0 && duration > this.readWarnTime; 1854 if (updateMetrics) { 1855 HFile.updateReadLatency(duration, pread, tooSlow); 1856 } 1857 // The onDiskBlock will become the headerAndDataBuffer for this block. 1858 // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already 1859 // contains the header of next block, so no need to set next block's header in it. 1860 HFileBlock hFileBlock = createFromBuff(curBlock, checksumSupport, offset, 1861 nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator); 1862 // Run check on uncompressed sizings. 1863 if (!fileContext.isCompressedOrEncrypted()) { 1864 hFileBlock.sanityCheckUncompressed(); 1865 } 1866 LOG.trace("Read {} in {} ms", hFileBlock, duration); 1867 if (!LOG.isTraceEnabled() && tooSlow) { 1868 LOG.warn("Read Block Slow: read {} cost {} ms, threshold = {} ms", hFileBlock, duration, 1869 this.readWarnTime); 1870 } 1871 span.addEvent("Read block", attributesBuilder.build()); 1872 // Cache next block header if we read it for the next time through here. 
1873 if (nextBlockOnDiskSize != -1) { 1874 cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), onDiskBlock, 1875 onDiskSizeWithHeader, hdrSize); 1876 } 1877 initHFileBlockSuccess = true; 1878 return hFileBlock; 1879 } finally { 1880 if (!initHFileBlockSuccess) { 1881 onDiskBlock.release(); 1882 } 1883 } 1884 } 1885 1886 @Override 1887 public void setIncludesMemStoreTS(boolean includesMemstoreTS) { 1888 this.fileContext = 1889 new HFileContextBuilder(this.fileContext).withIncludesMvcc(includesMemstoreTS).build(); 1890 } 1891 1892 @Override 1893 public void setDataBlockEncoder(HFileDataBlockEncoder encoder, Configuration conf) { 1894 encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(conf, fileContext); 1895 } 1896 1897 @Override 1898 public HFileBlockDecodingContext getBlockDecodingContext() { 1899 return this.encodedBlockDecodingCtx; 1900 } 1901 1902 @Override 1903 public HFileBlockDecodingContext getDefaultBlockDecodingContext() { 1904 return this.defaultDecodingCtx; 1905 } 1906 1907 /** 1908 * Generates the checksum for the header as well as the data and then validates it. If the block 1909 * doesn't uses checksum, returns false. 1910 * @return True if checksum matches, else false. 1911 */ 1912 private boolean validateChecksum(long offset, ByteBuff data, int hdrSize) { 1913 // If this is an older version of the block that does not have checksums, then return false 1914 // indicating that checksum verification did not succeed. Actually, this method should never 1915 // be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen 1916 // case. Since this is a cannot-happen case, it is better to return false to indicate a 1917 // checksum validation failure. 1918 if (!fileContext.isUseHBaseChecksum()) { 1919 return false; 1920 } 1921 1922 // If the checksumType of the read block header is incorrect, it indicates that the block is 1923 // corrupted and can be directly rolled back to HDFS checksum verification 1924 if (!checkCheckSumTypeOnHeaderBuf(data)) { 1925 HFile.LOG.warn( 1926 "HBase checksumType verification failed for file {} at offset {} filesize {}" 1927 + " checksumType {}. Retrying read with HDFS checksums turned on...", 1928 pathName, offset, fileSize, data.get(HFileBlock.Header.CHECKSUM_TYPE_INDEX)); 1929 return false; 1930 } 1931 1932 return ChecksumUtil.validateChecksum(data, pathName, offset, hdrSize); 1933 } 1934 1935 @Override 1936 public void closeStreams() throws IOException { 1937 streamWrapper.close(); 1938 } 1939 1940 @Override 1941 public void unbufferStream() { 1942 // To handle concurrent reads, ensure that no other client is accessing the streams while we 1943 // unbuffer it. 1944 if (streamLock.tryLock()) { 1945 try { 1946 this.streamWrapper.unbuffer(); 1947 } finally { 1948 streamLock.unlock(); 1949 } 1950 } 1951 } 1952 1953 @Override 1954 public String toString() { 1955 return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext; 1956 } 1957 } 1958 1959 /** An additional sanity-check in case no compression or encryption is being used. 
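 * The invariant checked is
 * {@code onDiskSizeWithoutHeader == uncompressedSizeWithoutHeader + totalChecksumBytes()};
 * if it does not hold, the recorded sizes are inconsistent for an uncompressed, unencrypted
 * block and an IOException is thrown.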
*/ 1960 void sanityCheckUncompressed() throws IOException { 1961 if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) { 1962 throw new IOException("Using no compression but " + "onDiskSizeWithoutHeader=" 1963 + onDiskSizeWithoutHeader + ", " + "uncompressedSizeWithoutHeader=" 1964 + uncompressedSizeWithoutHeader + ", numChecksumbytes=" + totalChecksumBytes()); 1965 } 1966 } 1967 1968 // Cacheable implementation 1969 @Override 1970 public int getSerializedLength() { 1971 if (bufWithoutChecksum != null) { 1972 // Include extra bytes for block metadata. 1973 return this.bufWithoutChecksum.limit() + BLOCK_METADATA_SPACE; 1974 } 1975 return 0; 1976 } 1977 1978 // Cacheable implementation 1979 @Override 1980 public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) { 1981 this.bufWithoutChecksum.get(destination, 0, getSerializedLength() - BLOCK_METADATA_SPACE); 1982 destination = addMetaData(destination, includeNextBlockMetadata); 1983 1984 // Make it ready for reading. flip sets position to zero and limit to current position which 1985 // is what we want if we do not want to serialize the block plus checksums if present plus 1986 // metadata. 1987 destination.flip(); 1988 } 1989 1990 /** 1991 * For use by bucketcache. This exposes internals. 1992 */ 1993 public ByteBuffer getMetaData(ByteBuffer bb) { 1994 bb = addMetaData(bb, true); 1995 bb.flip(); 1996 return bb; 1997 } 1998 1999 /** 2000 * Adds metadata at current position (position is moved forward). Does not flip or reset. 2001 * @return The passed <code>destination</code> with metadata added. 2002 */ 2003 private ByteBuffer addMetaData(final ByteBuffer destination, boolean includeNextBlockMetadata) { 2004 destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0); 2005 destination.putLong(this.offset); 2006 if (includeNextBlockMetadata) { 2007 destination.putInt(this.nextBlockOnDiskSize); 2008 } 2009 return destination; 2010 } 2011 2012 // Cacheable implementation 2013 @Override 2014 public CacheableDeserializer<Cacheable> getDeserializer() { 2015 return HFileBlock.BLOCK_DESERIALIZER; 2016 } 2017 2018 @Override 2019 public int hashCode() { 2020 int result = 1; 2021 result = result * 31 + blockType.hashCode(); 2022 result = result * 31 + nextBlockOnDiskSize; 2023 result = result * 31 + (int) (offset ^ (offset >>> 32)); 2024 result = result * 31 + onDiskSizeWithoutHeader; 2025 result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32)); 2026 result = result * 31 + uncompressedSizeWithoutHeader; 2027 result = result * 31 + bufWithoutChecksum.hashCode(); 2028 return result; 2029 } 2030 2031 @Override 2032 public boolean equals(Object comparison) { 2033 if (this == comparison) { 2034 return true; 2035 } 2036 if (comparison == null) { 2037 return false; 2038 } 2039 if (!(comparison instanceof HFileBlock)) { 2040 return false; 2041 } 2042 2043 HFileBlock castedComparison = (HFileBlock) comparison; 2044 2045 if (castedComparison.blockType != this.blockType) { 2046 return false; 2047 } 2048 if (castedComparison.nextBlockOnDiskSize != this.nextBlockOnDiskSize) { 2049 return false; 2050 } 2051 // Offset is important. Needed when we have to remake cachekey when block is returned to cache. 
2052 if (castedComparison.offset != this.offset) { 2053 return false; 2054 } 2055 if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) { 2056 return false; 2057 } 2058 if (castedComparison.prevBlockOffset != this.prevBlockOffset) { 2059 return false; 2060 } 2061 if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) { 2062 return false; 2063 } 2064 if ( 2065 ByteBuff.compareTo(this.bufWithoutChecksum, 0, this.bufWithoutChecksum.limit(), 2066 castedComparison.bufWithoutChecksum, 0, castedComparison.bufWithoutChecksum.limit()) != 0 2067 ) { 2068 return false; 2069 } 2070 return true; 2071 } 2072 2073 DataBlockEncoding getDataBlockEncoding() { 2074 if (blockType == BlockType.ENCODED_DATA) { 2075 return DataBlockEncoding.getEncodingById(getDataBlockEncodingId()); 2076 } 2077 return DataBlockEncoding.NONE; 2078 } 2079 2080 byte getChecksumType() { 2081 return this.fileContext.getChecksumType().getCode(); 2082 } 2083 2084 int getBytesPerChecksum() { 2085 return this.fileContext.getBytesPerChecksum(); 2086 } 2087 2088 /** Returns the size of data on disk + header. Excludes checksum. */ 2089 int getOnDiskDataSizeWithHeader() { 2090 return this.onDiskDataSizeWithHeader; 2091 } 2092 2093 /** 2094 * Return the number of bytes required to store all the checksums for this block. Each checksum 2095 * value is a 4 byte integer. <br/> 2096 * NOTE: ByteBuff returned by {@link HFileBlock#getBufferWithoutHeader()} and 2097 * {@link HFileBlock#getBufferReadOnly} or DataInputStream returned by 2098 * {@link HFileBlock#getByteStream()} does not include checksum. 2099 */ 2100 int totalChecksumBytes() { 2101 return totalChecksumBytes; 2102 } 2103 2104 private int computeTotalChecksumBytes() { 2105 // If the hfile block has minorVersion 0, then there are no checksum 2106 // data to validate. Similarly, a zero value in this.bytesPerChecksum 2107 // indicates that cached blocks do not have checksum data because 2108 // checksums were already validated when the block was read from disk. 2109 if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) { 2110 return 0; 2111 } 2112 return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader, 2113 this.fileContext.getBytesPerChecksum()); 2114 } 2115 2116 /** 2117 * Returns the size of this block header. 2118 */ 2119 public int headerSize() { 2120 return headerSize(this.fileContext.isUseHBaseChecksum()); 2121 } 2122 2123 /** 2124 * Maps a minor version to the size of the header. 2125 */ 2126 public static int headerSize(boolean usesHBaseChecksum) { 2127 return usesHBaseChecksum 2128 ? HConstants.HFILEBLOCK_HEADER_SIZE 2129 : HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM; 2130 } 2131 2132 /** 2133 * Return the appropriate DUMMY_HEADER for the minor version 2134 */ 2135 // TODO: Why is this in here? 2136 byte[] getDummyHeaderForVersion() { 2137 return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum()); 2138 } 2139 2140 /** 2141 * Return the appropriate DUMMY_HEADER for the minor version 2142 */ 2143 static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) { 2144 return usesHBaseChecksum ? HConstants.HFILEBLOCK_DUMMY_HEADER : DUMMY_HEADER_NO_CHECKSUM; 2145 } 2146 2147 /** 2148 * @return This HFileBlocks fileContext which will a derivative of the fileContext for the file 2149 * from which this block's data was originally read. 
2150 */ 2151 public HFileContext getHFileContext() { 2152 return this.fileContext; 2153 } 2154 2155 /** 2156 * Convert the contents of the block header into a human readable string. This is mostly helpful 2157 * for debugging. This assumes that the block has minor version > 0. 2158 */ 2159 static String toStringHeader(ByteBuff buf) throws IOException { 2160 byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)]; 2161 buf.get(magicBuf); 2162 BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH); 2163 int compressedBlockSizeNoHeader = buf.getInt(); 2164 int uncompressedBlockSizeNoHeader = buf.getInt(); 2165 long prevBlockOffset = buf.getLong(); 2166 byte cksumtype = buf.get(); 2167 long bytesPerChecksum = buf.getInt(); 2168 long onDiskDataSizeWithHeader = buf.getInt(); 2169 return " Header dump: magic: " + Bytes.toString(magicBuf) + " blockType " + bt 2170 + " compressedBlockSizeNoHeader " + compressedBlockSizeNoHeader 2171 + " uncompressedBlockSizeNoHeader " + uncompressedBlockSizeNoHeader + " prevBlockOffset " 2172 + prevBlockOffset + " checksumType " + ChecksumType.codeToType(cksumtype) 2173 + " bytesPerChecksum " + bytesPerChecksum + " onDiskDataSizeWithHeader " 2174 + onDiskDataSizeWithHeader; 2175 } 2176 2177 /** 2178 * Creates a new HFileBlockBuilder from the existing block and a new ByteBuff. The builder will be 2179 * loaded with all of the original fields from blk, except now using the newBuff and setting 2180 * isSharedMem based on the source of the passed in newBuff. An existing HFileBlock may have been 2181 * an {@link ExclusiveMemHFileBlock}, but the new buffer might call for a 2182 * {@link SharedMemHFileBlock}. Or vice versa. 2183 * @param blk the block to clone from 2184 * @param newBuff the new buffer to use 2185 */ 2186 private static HFileBlockBuilder createBuilder(HFileBlock blk, ByteBuff newBuff) { 2187 return new HFileBlockBuilder().withBlockType(blk.blockType) 2188 .withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader) 2189 .withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader) 2190 .withPrevBlockOffset(blk.prevBlockOffset).withByteBuff(newBuff).withOffset(blk.offset) 2191 .withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader) 2192 .withNextBlockOnDiskSize(blk.nextBlockOnDiskSize).withHFileContext(blk.fileContext) 2193 .withByteBuffAllocator(blk.allocator).withShared(!newBuff.hasArray()); 2194 } 2195 2196 private static HFileBlock shallowClone(HFileBlock blk, ByteBuff newBuf) { 2197 return createBuilder(blk, newBuf).build(); 2198 } 2199 2200 static HFileBlock deepCloneOnHeap(HFileBlock blk) { 2201 ByteBuff deepCloned = ByteBuff 2202 .wrap(ByteBuffer.wrap(blk.bufWithoutChecksum.toBytes(0, blk.bufWithoutChecksum.limit()))); 2203 return createBuilder(blk, deepCloned).build(); 2204 } 2205}