/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR;
import static org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer.CONTEXT_KEY;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.EncodingState;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer;
import org.apache.hadoop.hbase.io.util.BlockIOUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ReadType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * Cacheable Blocks of an {@link HFile} version 2 file. Version 2 was introduced in hbase-0.92.0.
 * <p>
 * Version 1 was the original file block. Version 2 was introduced when we changed the hbase file
 * format to support multi-level block indexes and compound bloom filters (HBASE-3857). Support for
 * Version 1 was removed in hbase-1.3.0.
 * <h3>HFileBlock: Version 2</h3> In version 2, a block is structured as follows:
 * <ul>
 * <li><b>Header:</b> See Writer#putHeader() for where header is written; header total size is
 * HFILEBLOCK_HEADER_SIZE
 * <ul>
 * <li>0. blockType: Magic record identifying the {@link BlockType} (8 bytes): e.g.
 * <code>DATABLK*</code>
 * <li>1. onDiskSizeWithoutHeader: Compressed -- a.k.a 'on disk' -- block size, excluding header,
 * but including tailing checksum bytes (4 bytes)
 * <li>2. uncompressedSizeWithoutHeader: Uncompressed block size, excluding header, and excluding
 * checksum bytes (4 bytes)
 * <li>3. prevBlockOffset: The offset of the previous block of the same type (8 bytes). This is
 * used to navigate to the previous block without having to go to the block index
 * <li>4: For minorVersions >=1, the ordinal describing checksum type (1 byte)
 * <li>5: For minorVersions >=1, the number of data bytes/checksum chunk (4 bytes)
 * <li>6: onDiskDataSizeWithHeader: For minorVersions >=1, the size of data 'on disk', including
 * header, excluding checksums (4 bytes)
 * </ul>
 * </li>
 * <li><b>Raw/Compressed/Encrypted/Encoded data:</b> The compression algorithm is the same for all
 * the blocks in an {@link HFile}. If compression is NONE, this is just raw, serialized Cells.
 * <li><b>Tail:</b> For minorVersions >=1, a series of 4 byte checksums, one each for the number
 * of bytes specified by bytesPerChecksum.
 * </ul>
 * <h3>Caching</h3> Caches cache whole blocks with trailing checksums if any. We then tag on some
 * metadata, the content of BLOCK_METADATA_SPACE, which is a flag on whether we are doing 'hbase'
 * checksums, and then the offset into the file, which is needed when we re-make a cache key when
 * we return the block to the cache as 'done'. See {@link Cacheable#serialize(ByteBuffer, boolean)}
 * and {@link Cacheable#getDeserializer()}.
 * <p>
 * TODO: Should we cache the checksums? Down in Writer#getBlockForCaching(CacheConfig) where we
 * make a block to cache-on-write, there is an attempt at turning off checksums. This is not the
 * only place we get blocks to cache. We also will cache the raw return from an hdfs read. In this
 * case, the checksums may be present. If the cache is backed by something that doesn't do ECC,
 * say an SSD, we might want to preserve checksums. For now this is an open question.
 * <p>
 * TODO: Over in BucketCache, we save a block allocation by doing a custom serialization. Be sure
 * to change it if serialization changes in here. Could we add a method here that takes an IOEngine
 * and that then serializes to it rather than expose our internals over in BucketCache? IOEngine is
 * in the bucket subpackage. Pull it up? Then this class knows about bucketcache. Ugh.
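 * <p>
 * Illustrative sketch only (not part of the API): assuming hbase checksums are in use, the fixed
 * header fields listed above could be read off a ByteBuffer positioned at the start of a block
 * roughly like this:
 *
 * <pre>{@code
 * ByteBuffer b = ...; // positioned at the first byte of the block
 * byte[] magic = new byte[8];
 * b.get(magic);                                    // 0. blockType magic
 * int onDiskSizeWithoutHeader = b.getInt();        // 1.
 * int uncompressedSizeWithoutHeader = b.getInt();  // 2.
 * long prevBlockOffset = b.getLong();              // 3.
 * byte checksumType = b.get();                     // 4.
 * int bytesPerChecksum = b.getInt();               // 5.
 * int onDiskDataSizeWithHeader = b.getInt();       // 6.
 * }</pre>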
 */
@InterfaceAudience.Private
public class HFileBlock implements Cacheable {
  private static final Logger LOG = LoggerFactory.getLogger(HFileBlock.class);
  public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HFileBlock.class, false);

  // Block Header fields.

  // TODO: encapsulate Header related logic in this inner class.
  static class Header {
    // Format of header is:
    // 8 bytes - block magic
    // 4 bytes int - onDiskSizeWithoutHeader
    // 4 bytes int - uncompressedSizeWithoutHeader
    // 8 bytes long - prevBlockOffset
    // The following 3 are only present if header contains checksum information
    // 1 byte - checksum type
    // 4 byte int - bytes per checksum
    // 4 byte int - onDiskDataSizeWithHeader
    static int BLOCK_MAGIC_INDEX = 0;
    static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8;
    static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12;
    static int PREV_BLOCK_OFFSET_INDEX = 16;
    static int CHECKSUM_TYPE_INDEX = 24;
    static int BYTES_PER_CHECKSUM_INDEX = 25;
    static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29;
  }

  /** Type of block. Header field 0. */
  private BlockType blockType;

  /**
   * Size on disk excluding header, including checksum. Header field 1.
   * @see Writer#putHeader(byte[], int, int, int, int)
   */
  private int onDiskSizeWithoutHeader;

  /**
   * Size of pure data. Does not include header or checksums. Header field 2.
   * @see Writer#putHeader(byte[], int, int, int, int)
   */
  private int uncompressedSizeWithoutHeader;

  /**
   * The offset of the previous block on disk. Header field 3.
   * @see Writer#putHeader(byte[], int, int, int, int)
   */
  private long prevBlockOffset;

  /**
   * Size on disk of header + data. Excludes checksum. Header field 6, OR calculated from
   * {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
   * @see Writer#putHeader(byte[], int, int, int, int)
   */
  private int onDiskDataSizeWithHeader;
  // End of Block Header fields.

  /**
   * The in-memory representation of the hfile block. Can be on or offheap. Can be backed by a
   * single ByteBuffer or by many. Make no assumptions.
   * <p>
   * Be careful reading from this <code>buf</code>. Duplicate and work on the duplicate or, if not,
   * be sure to reset position and limit else trouble down the road.
   * <p>
   * TODO: Make this read-only once made.
   * <p>
   * We are using the ByteBuff type. ByteBuffer is not extensible yet we need to be able to have a
   * ByteBuffer-like API across multiple ByteBuffers reading from a cache such as BucketCache. So,
   * we have this ByteBuff type. Unfortunately, it is spread all about HFileBlock. Would be good if
   * it could be confined to cache-use only but that is hard to do.
   */
  private ByteBuff buf;

  /**
   * Meta data that holds meta information on the hfileblock.
   */
  private HFileContext fileContext;

  /**
   * The offset of this block in the file. Populated by the reader for convenience of access. This
   * offset is not part of the block header.
   */
  private long offset = UNSET;

  /**
   * The on-disk size of the next block, including the header and checksums if present. UNSET if
   * unknown. Blocks try to carry the size of the next block to read in this data member. Usually
   * we get block sizes from the hfile index but sometimes the index is not available: e.g. when we
   * read the indexes themselves (indexes are stored in blocks, we do not have an index for the
   * indexes). Saves seeks especially around file open when there is a flurry of reading in hfile
   * metadata.
   */
  private int nextBlockOnDiskSize = UNSET;

  private ByteBuffAllocator allocator;

  /**
   * On a checksum failure, do this many succeeding read requests using hdfs checksums before
   * auto-reenabling hbase checksum verification.
   */
  static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;

  private static int UNSET = -1;
  public static final boolean FILL_HEADER = true;
  public static final boolean DONT_FILL_HEADER = false;

  // How to get the estimate correctly? What if it is a singleBB?
  public static final int MULTI_BYTE_BUFFER_HEAP_SIZE =
    (int) ClassSize.estimateBase(MultiByteBuff.class, false);

  /**
   * Space for metadata on a block that gets stored along with the block when we cache it. There
   * are a few bytes stuck on the end of the HFileBlock that we pull in from HDFS. 8 bytes are for
   * the offset of this block (long) in the file. Offset is important because it is used when we
   * remake the CacheKey when we return the block to the cache when done. There is also a flag on
   * whether checksumming is being done by hbase or not. See class comment for note on uncertain
   * state of checksumming of blocks that come out of cache (should we or should we not?). Finally
   * there are 4 bytes to hold the length of the next block, which can save a seek on occasion if
   * available. (This EXTRA info came in with the original commit of the bucketcache, HBASE-7404.
   * It was formerly known as EXTRA_SERIALIZATION_SPACE).
   */
  public static final int BLOCK_METADATA_SPACE =
    Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;

  /**
   * Each checksum value is an integer that can be stored in 4 bytes.
   */
  static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;

  static final byte[] DUMMY_HEADER_NO_CHECKSUM =
    new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];

  /**
   * Used deserializing blocks from Cache. <code>
   * ++++++++++++++
   * + HFileBlock +
   * ++++++++++++++
   * + Checksums  + <= Optional
   * ++++++++++++++
   * + Metadata!  + <= See note on BLOCK_METADATA_SPACE above.
   * ++++++++++++++
   * </code>
   * @see #serialize(ByteBuffer, boolean)
   */
  public static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER = new BlockDeserializer();

  public static final class BlockDeserializer implements CacheableDeserializer<Cacheable> {
    private BlockDeserializer() {
    }

    @Override
    public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc) throws IOException {
      // The buf has the file block followed by block metadata.
      // Set limit to just before the BLOCK_METADATA_SPACE then rewind.
      buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind();
      // Get a new buffer to pass the HFileBlock for it to 'own'.
      ByteBuff newByteBuff = buf.slice();
      // Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock.
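      // Metadata layout (BLOCK_METADATA_SPACE = 1 + 8 + 4 bytes):
      //   1 byte  - whether hbase checksums are in use for this block
      //   8 bytes - offset of this block in its file
      //   4 bytes - on-disk size of the next block, or -1 if unknown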
      buf.position(buf.limit());
      buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE);
      boolean usesChecksum = buf.get() == (byte) 1;
      long offset = buf.getLong();
      int nextBlockOnDiskSize = buf.getInt();
      return createFromBuff(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
    }

    @Override
    public int getDeserializerIdentifier() {
      return DESERIALIZER_IDENTIFIER;
    }
  }

  private static final int DESERIALIZER_IDENTIFIER;
  static {
    DESERIALIZER_IDENTIFIER =
      CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
  }

  /**
   * Creates a new {@link HFile} block from the given fields. This constructor is used only while
   * writing blocks and caching: the block is sitting in a byte buffer and we want to stuff the
   * block into cache.
   * <p>
   * TODO: The caller presumes no checksumming is required of this block instance since it is going
   * into cache; the checksum was already verified on the underlying block data pulled in from the
   * filesystem. Is that correct? What if cache is SSD?
   * <p>
   * TODO: HFile block writer can also off-heap ?
   * @param blockType                     the type of this block, see {@link BlockType}
   * @param onDiskSizeWithoutHeader       see {@link #onDiskSizeWithoutHeader}
   * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
   * @param prevBlockOffset               see {@link #prevBlockOffset}
   * @param buf                           block buffer with header
   *                                      ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
   * @param fillHeader                    when true, write the first 4 header fields into passed
   *                                      buffer.
   * @param offset                        the file offset the block was read from
   * @param onDiskDataSizeWithHeader      see {@link #onDiskDataSizeWithHeader}
   * @param fileContext                   HFile meta data
   */
  public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
    int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
    long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
    ByteBuffAllocator allocator) {
    this.blockType = blockType;
    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
    this.prevBlockOffset = prevBlockOffset;
    this.offset = offset;
    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
    this.nextBlockOnDiskSize = nextBlockOnDiskSize;
    this.fileContext = fileContext;
    this.allocator = allocator;
    this.buf = buf;
    if (fillHeader) {
      overwriteHeader();
    }
    this.buf.rewind();
  }

  /**
   * Creates a block from an existing buffer starting with a header. Rewinds and takes ownership of
   * the buffer. By definition of rewind, ignores the buffer position, but if you slice the buffer
   * beforehand, it will rewind to that point.
   * @param buf Has header, content, and trailing checksums if present.
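   * @param usesHBaseChecksum   whether the buffer carries hbase-level checksum fields in its
   *                            header
   * @param offset              the file offset the block was read from
   * @param nextBlockOnDiskSize on-disk size of the next block, or -1 (UNSET) if unknown
   * @param fileContext         HFile meta data; null when deserializing from the block cache, in
   *                            which case a context is built from the header
   * @param allocator           the {@link ByteBuffAllocator} to associate with the new block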
   */
  static HFileBlock createFromBuff(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
    final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator)
    throws IOException {
    buf.rewind();
    final BlockType blockType = BlockType.read(buf);
    final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
    final int uncompressedSizeWithoutHeader =
      buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
    final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
    // This method is called when we deserialize a block from cache and when we read a block in
    // from the fs. fileContext is null when deserialized from cache so we need to make up one.
    HFileContextBuilder fileContextBuilder =
      fileContext != null ? new HFileContextBuilder(fileContext) : new HFileContextBuilder();
    fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum);
    int onDiskDataSizeWithHeader;
    if (usesHBaseChecksum) {
      byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX);
      int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
      onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
      // Use the checksum type and bytes per checksum from header, not from fileContext.
      fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType));
      fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum);
    } else {
      fileContextBuilder.withChecksumType(ChecksumType.NULL);
      fileContextBuilder.withBytesPerCheckSum(0);
      // Need to fix onDiskDataSizeWithHeader; there are no checksums after the block data.
      onDiskDataSizeWithHeader = onDiskSizeWithoutHeader + headerSize(usesHBaseChecksum);
    }
    fileContext = fileContextBuilder.build();
    assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
    return new HFileBlockBuilder().withBlockType(blockType)
      .withOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader)
      .withUncompressedSizeWithoutHeader(uncompressedSizeWithoutHeader)
      .withPrevBlockOffset(prevBlockOffset).withOffset(offset)
      .withOnDiskDataSizeWithHeader(onDiskDataSizeWithHeader)
      .withNextBlockOnDiskSize(nextBlockOnDiskSize).withHFileContext(fileContext)
      .withByteBuffAllocator(allocator).withByteBuff(buf.rewind()).withShared(!buf.hasArray())
      .build();
  }

  /**
   * Parse total on disk size including header and checksum.
   * @param headerBuf      Header ByteBuffer. Presumed exact size of header.
   * @param verifyChecksum true if checksum verification is in use.
   * @return Size of the block with header included.
   */
  private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf, boolean verifyChecksum) {
    return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(verifyChecksum);
  }

  /**
   * @return the on-disk size of the next block (including the header size and any checksums if
   *         present) read by peeking into the next block's header; use as a hint when doing a read
   *         of the next block when scanning or running over a file.
   */
  int getNextBlockOnDiskSize() {
    return nextBlockOnDiskSize;
  }

  @Override
  public BlockType getBlockType() {
    return blockType;
  }

  @Override
  public int refCnt() {
    return buf.refCnt();
  }

  @Override
  public HFileBlock retain() {
    buf.retain();
    return this;
  }

  /**
   * Call {@link ByteBuff#release()} to decrease the reference count; if there is no other
   * reference, it will return the {@link ByteBuffer} back to
   * {@link org.apache.hadoop.hbase.io.ByteBuffAllocator}
   */
  @Override
  public boolean release() {
    return buf.release();
  }

  /**
   * Calling this method in strategic locations where HFileBlocks are referenced may help diagnose
   * potential buffer leaks. We pass the block itself as a default hint, but one can use
   * {@link #touch(Object)} to pass their own hint as well.
   */
  @Override
  public HFileBlock touch() {
    return touch(this);
  }

  @Override
  public HFileBlock touch(Object hint) {
    buf.touch(hint);
    return this;
  }

  /** Returns the data block encoding id that was used to encode this block */
  short getDataBlockEncodingId() {
    if (blockType != BlockType.ENCODED_DATA) {
      throw new IllegalArgumentException("Querying encoder ID of a block " + "of type other than "
        + BlockType.ENCODED_DATA + ": " + blockType);
    }
    return buf.getShort(headerSize());
  }

  /** Returns the on-disk size of header + data part + checksum. */
  public int getOnDiskSizeWithHeader() {
    return onDiskSizeWithoutHeader + headerSize();
  }

  /** Returns the on-disk size of the data part + checksum (header excluded). */
  int getOnDiskSizeWithoutHeader() {
    return onDiskSizeWithoutHeader;
  }

  /** Returns the uncompressed size of data part (header and checksum excluded). */
  public int getUncompressedSizeWithoutHeader() {
    return uncompressedSizeWithoutHeader;
  }

  /** Returns the offset of the previous block of the same type in the file, or -1 if unknown */
  long getPrevBlockOffset() {
    return prevBlockOffset;
  }

  /**
   * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position is modified as
   * side-effect.
   */
  private void overwriteHeader() {
    buf.rewind();
    blockType.write(buf);
    buf.putInt(onDiskSizeWithoutHeader);
    buf.putInt(uncompressedSizeWithoutHeader);
    buf.putLong(prevBlockOffset);
    if (this.fileContext.isUseHBaseChecksum()) {
      buf.put(fileContext.getChecksumType().getCode());
      buf.putInt(fileContext.getBytesPerChecksum());
      buf.putInt(onDiskDataSizeWithHeader);
    }
  }

  /**
   * Returns a buffer that does not include the header and checksum.
   * @return the buffer with header skipped and checksum omitted.
   */
  public ByteBuff getBufferWithoutHeader() {
    ByteBuff dup = getBufferReadOnly();
    return dup.position(headerSize()).slice();
  }

  /**
   * Returns a read-only duplicate of the buffer this block stores internally ready to be read.
   * Clients must not modify the buffer object though they may set position and limit on the
   * returned buffer since we pass back a duplicate. This method has to be public because it is
   * used in {@link CompoundBloomFilter} to avoid object creation on every Bloom filter lookup, but
   * has to be used with caution. Buffer holds header, block content, and any follow-on checksums
   * if present.
   * @return the buffer of this block for read-only operations
   */
  public ByteBuff getBufferReadOnly() {
    // TODO: ByteBuf does not support asReadOnlyBuffer(). Fix.
    ByteBuff dup = this.buf.duplicate();
    assert dup.position() == 0;
    return dup;
  }

  public ByteBuffAllocator getByteBuffAllocator() {
    return this.allocator;
  }

  private void sanityCheckAssertion(long valueFromBuf, long valueFromField, String fieldName)
    throws IOException {
    if (valueFromBuf != valueFromField) {
      throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
        + ") is different from that in the field (" + valueFromField + ")");
    }
  }

  private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
    throws IOException {
    if (valueFromBuf != valueFromField) {
      throw new IOException("Block type stored in the buffer: " + valueFromBuf
        + ", block type field: " + valueFromField);
    }
  }

  /**
   * Checks if the block is internally consistent, i.e. the first
   * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a valid header
   * consistent with the fields. Assumes a packed block structure. This function is primarily for
   * testing and debugging, and is not thread-safe, because it alters the internal buffer pointer.
   * Used by tests only.
   */
  void sanityCheck() throws IOException {
    // Duplicate so no side-effects
    ByteBuff dup = this.buf.duplicate().rewind();
    sanityCheckAssertion(BlockType.read(dup), blockType);

    sanityCheckAssertion(dup.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader");

    sanityCheckAssertion(dup.getInt(), uncompressedSizeWithoutHeader,
      "uncompressedSizeWithoutHeader");

    sanityCheckAssertion(dup.getLong(), prevBlockOffset, "prevBlockOffset");
    if (this.fileContext.isUseHBaseChecksum()) {
      sanityCheckAssertion(dup.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
      sanityCheckAssertion(dup.getInt(), this.fileContext.getBytesPerChecksum(),
        "bytesPerChecksum");
      sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
    }

    if (dup.limit() != onDiskDataSizeWithHeader) {
      throw new AssertionError(
        "Expected limit " + onDiskDataSizeWithHeader + ", got " + dup.limit());
    }

    // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
    // block's header, so there are two sensible values for buffer capacity.
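    // i.e. remaining() should equal either onDiskDataSizeWithHeader or
    // onDiskDataSizeWithHeader + hdrSize, which is exactly what is asserted below.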
    int hdrSize = headerSize();
    dup.rewind();
    if (
      dup.remaining() != onDiskDataSizeWithHeader
        && dup.remaining() != onDiskDataSizeWithHeader + hdrSize
    ) {
      throw new AssertionError("Invalid buffer capacity: " + dup.remaining() + ", expected "
        + onDiskDataSizeWithHeader + " or " + (onDiskDataSizeWithHeader + hdrSize));
    }
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder().append("[").append("blockType=").append(blockType)
      .append(", fileOffset=").append(offset).append(", headerSize=").append(headerSize())
      .append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
      .append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
      .append(", prevBlockOffset=").append(prevBlockOffset).append(", isUseHBaseChecksum=")
      .append(fileContext.isUseHBaseChecksum());
    if (fileContext.isUseHBaseChecksum()) {
      sb.append(", checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
        .append(", bytesPerChecksum=").append(this.buf.getInt(24 + 1))
        .append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
    } else {
      sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader).append("(")
        .append(onDiskSizeWithoutHeader).append("+")
        .append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
    }
    String dataBegin;
    if (buf.hasArray()) {
      dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
        Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
    } else {
      ByteBuff bufWithoutHeader = getBufferWithoutHeader();
      byte[] dataBeginBytes =
        new byte[Math.min(32, bufWithoutHeader.limit() - bufWithoutHeader.position())];
      bufWithoutHeader.get(dataBeginBytes);
      dataBegin = Bytes.toStringBinary(dataBeginBytes);
    }
    sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader())
      .append(", totalChecksumBytes=").append(totalChecksumBytes()).append(", isUnpacked=")
      .append(isUnpacked()).append(", buf=[").append(buf).append("]").append(", dataBeginsWith=")
      .append(dataBegin).append(", fileContext=").append(fileContext)
      .append(", nextBlockOnDiskSize=").append(nextBlockOnDiskSize).append("]");
    return sb.toString();
  }

  /**
   * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
   * encoded structure. Internal structures are shared between instances where applicable.
   */
  HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
    if (!fileContext.isCompressedOrEncrypted()) {
      // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
      // which is used for block serialization to L2 cache, does not preserve encoding and
      // encryption details.
      return this;
    }

    ByteBuff newBuf = allocateBufferForUnpacking(); // allocates space for the decompressed block
    HFileBlock unpacked = shallowClone(this, newBuf);

    boolean succ = false;
    final Context context =
      Context.current().with(CONTEXT_KEY, new HFileContextAttributesBuilderConsumer(fileContext));
    try (Scope ignored = context.makeCurrent()) {
      HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
        ? reader.getBlockDecodingContext()
        : reader.getDefaultBlockDecodingContext();
      // Create a duplicated buffer without the header part.
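      // prepareDecoding() below decompresses/decrypts the on-disk bytes in 'dup' into the
      // freshly allocated, uncompressed-sized buffer owned by 'unpacked'.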
      int headerSize = this.headerSize();
      ByteBuff dup = this.buf.duplicate();
      dup.position(headerSize);
      dup = dup.slice();
      // Decode the dup into unpacked#buf
      ctx.prepareDecoding(unpacked.getOnDiskDataSizeWithHeader() - headerSize,
        unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), dup);
      succ = true;
      return unpacked;
    } finally {
      if (!succ) {
        unpacked.release();
      }
    }
  }

  /**
   * Always allocates a new buffer of the correct size. Copies header bytes from the existing
   * buffer. Does not change header fields. Reserve room to keep checksum bytes too.
   */
  private ByteBuff allocateBufferForUnpacking() {
    int headerSize = headerSize();
    int capacityNeeded = headerSize + uncompressedSizeWithoutHeader;

    ByteBuff source = buf.duplicate();
    ByteBuff newBuf = allocator.allocate(capacityNeeded);

    // Copy header bytes into newBuf.
    source.position(0);
    newBuf.put(0, source, 0, headerSize);

    // set limit to exclude next block's header
    newBuf.limit(capacityNeeded);
    return newBuf;
  }

  /**
   * Return true when this block's buffer has been unpacked, false otherwise. Note this is a
   * calculated heuristic, not a tracked attribute of the block.
   */
  public boolean isUnpacked() {
    final int headerSize = headerSize();
    final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader;
    final int bufCapacity = buf.remaining();
    return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
  }

  /**
   * Cannot be {@link #UNSET}. Must be a legitimate value. Used re-making the {@link BlockCacheKey}
   * when block is returned to the cache.
   * @return the offset of this block in the file it was read from
   */
  long getOffset() {
    if (offset < 0) {
      throw new IllegalStateException("HFile block offset not initialized properly");
    }
    return offset;
  }

  /** Returns a byte stream reading the data + checksum of this block */
  DataInputStream getByteStream() {
    ByteBuff dup = this.buf.duplicate();
    dup.position(this.headerSize());
    return new DataInputStream(new ByteBuffInputStream(dup));
  }

  @Override
  public long heapSize() {
    long size = FIXED_OVERHEAD;
    size += fileContext.heapSize();
    if (buf != null) {
      // Deep overhead of the byte buffer. Needs to be aligned separately.
      size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE);
    }
    return ClassSize.align(size);
  }

  /**
   * Will be overridden by {@link SharedMemHFileBlock} or {@link ExclusiveMemHFileBlock}. Returns
   * true by default.
   */
  public boolean isSharedMem() {
    return true;
  }

  /**
   * Unified version 2 {@link HFile} block writer. The intended usage pattern is as follows (a
   * sketch follows the list):
   * <ol>
   * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
   * <li>Call {@link Writer#startWriting} and get a data stream to write to.
   * <li>Write your data into the stream.
   * <li>Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to store the
   * serialized block into an external stream.
   * <li>Repeat to write more blocks.
   * </ol>
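   * <p>
   * A minimal, illustrative sketch of that pattern (the {@code conf}, {@code encoder},
   * {@code fileContext} and {@code out} variables are assumed to exist in the caller):
   *
   * <pre>{@code
   * HFileBlock.Writer writer = new HFileBlock.Writer(conf, encoder, fileContext);
   * DataOutputStream dos = writer.startWriting(BlockType.DATA);
   * // ... write cells/bytes into dos ...
   * writer.writeHeaderAndData(out); // out is an FSDataOutputStream
   * // repeat startWriting()/writeHeaderAndData() for further blocks
   * }</pre>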
   */
  static class Writer implements ShipperListener {
    private enum State {
      INIT,
      WRITING,
      BLOCK_READY
    }

    private int maxSizeUnCompressed;

    private BlockCompressedSizePredicator compressedSizePredicator;

    /** Writer state. Used to ensure the correct usage protocol. */
    private State state = State.INIT;

    /** Data block encoder used for data blocks */
    private final HFileDataBlockEncoder dataBlockEncoder;

    private HFileBlockEncodingContext dataBlockEncodingCtx;

    /** block encoding context for non-data blocks */
    private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;

    /**
     * The stream we use to accumulate data into a block in an uncompressed format. We reset this
     * stream at the end of each block and reuse it. The header is written as the first
     * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this stream.
     */
    private ByteArrayOutputStream baosInMemory;

    /**
     * Current block type. Set in {@link #startWriting(BlockType)}. Could be changed in
     * {@link #finishBlock()} from {@link BlockType#DATA} to {@link BlockType#ENCODED_DATA}.
     */
    private BlockType blockType;

    /**
     * A stream that we write uncompressed bytes to, which compresses them and writes them to
     * {@link #baosInMemory}.
     */
    private DataOutputStream userDataStream;

    /**
     * Bytes to be written to the file system, including the header. Compressed if compression is
     * turned on. It also includes the checksum data that immediately follows the block data.
     * (header + data + checksums)
     */
    private ByteArrayOutputStream onDiskBlockBytesWithHeader;

    /**
     * The size of the checksum data on disk. It is used only if data is not compressed. If data
     * is compressed, then the checksums are already part of onDiskBlockBytesWithHeader. If data
     * is uncompressed, then this variable stores the checksum data for this block.
     */
    private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;

    /**
     * Current block's start offset in the {@link HFile}. Set in
     * {@link #writeHeaderAndData(FSDataOutputStream)}.
     */
    private long startOffset;

    /**
     * Offset of previous block by block type. Updated when the next block is started.
     */
    private long[] prevOffsetByType;

    /** The offset of the previous block of the same type */
    private long prevOffset;
    /** Meta data that holds information about the hfileblock **/
    private HFileContext fileContext;

    private final ByteBuffAllocator allocator;

    @Override
    public void beforeShipped() {
      if (getEncodingState() != null) {
        getEncodingState().beforeShipped();
      }
    }

    EncodingState getEncodingState() {
      return dataBlockEncodingCtx.getEncodingState();
    }

    /**
     * @param dataBlockEncoder data block encoding algorithm to use
     */
    public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
      HFileContext fileContext) {
      this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP, fileContext.getBlocksize());
    }

    public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
      HFileContext fileContext, ByteBuffAllocator allocator, int maxSizeUnCompressed) {
      if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
        throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is "
          + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is "
          + fileContext.getBytesPerChecksum());
      }
      this.allocator = allocator;
      this.dataBlockEncoder =
        dataBlockEncoder != null ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
      this.dataBlockEncodingCtx = this.dataBlockEncoder.newDataBlockEncodingContext(conf,
        HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
      // TODO: This should be lazily instantiated
      this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(conf, null,
        HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
      // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum
      baosInMemory = new ByteArrayOutputStream();
      prevOffsetByType = new long[BlockType.values().length];
      for (int i = 0; i < prevOffsetByType.length; ++i) {
        prevOffsetByType[i] = UNSET;
      }
      // TODO: Why fileContext saved away when we have dataBlockEncoder and/or
      // defaultDataBlockEncoder?
      this.fileContext = fileContext;
      this.compressedSizePredicator = (BlockCompressedSizePredicator) ReflectionUtils.newInstance(
        conf.getClass(BLOCK_COMPRESSED_SIZE_PREDICATOR, UncompressedBlockSizePredicator.class),
        new Configuration(conf));
      this.maxSizeUnCompressed = maxSizeUnCompressed;
    }

    /**
     * Starts writing into the block. The previous block's data is discarded.
     * @return the stream the user can write their data into
     */
    DataOutputStream startWriting(BlockType newBlockType) throws IOException {
      if (state == State.BLOCK_READY && startOffset != -1) {
        // We had a previous block that was written to a stream at a specific
        // offset. Save that offset as the last offset of a block of that type.
        prevOffsetByType[blockType.getId()] = startOffset;
      }

      startOffset = -1;
      blockType = newBlockType;

      baosInMemory.reset();
      baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);

      state = State.WRITING;

      // We will compress it later in finishBlock()
      userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory);
      if (newBlockType == BlockType.DATA) {
        this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
      }
      return userDataStream;
    }

    /**
     * Writes the Cell to this block
     */
    void write(Cell cell) throws IOException {
      expectState(State.WRITING);
      this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream);
    }

    /**
     * Transitions the block writer from the "writing" state to the "block ready" state. Does
     * nothing if a block is already finished.
     */
    void ensureBlockReady() throws IOException {
      Preconditions.checkState(state != State.INIT, "Unexpected state: " + state);

      if (state == State.BLOCK_READY) {
        return;
      }

      // This will set state to BLOCK_READY.
      finishBlock();
    }

    public boolean checkBoundariesWithPredicate() {
      int rawBlockSize = encodedBlockSizeWritten();
      if (rawBlockSize >= maxSizeUnCompressed) {
        return true;
      } else {
        return compressedSizePredicator.shouldFinishBlock(rawBlockSize);
      }
    }

    /**
     * Finish up writing of the block. Flushes the compressing stream (if using compression), fills
     * out the header, does any compression/encryption of bytes to flush out to disk, and manages
     * the cache-on-write content, if applicable. Sets block write state to "block ready".
     */
    private void finishBlock() throws IOException {
      if (blockType == BlockType.DATA) {
        this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
          baosInMemory.getBuffer(), blockType);
        blockType = dataBlockEncodingCtx.getBlockType();
      }
      userDataStream.flush();
      prevOffset = prevOffsetByType[blockType.getId()];

      // We need to cache the unencoded/uncompressed size before changing the block state
      int rawBlockSize = 0;
      if (this.getEncodingState() != null) {
        rawBlockSize = encodedBlockSizeWritten();
      }
      // We need to set state before we can package the block up for cache-on-write. In a way, the
      // block is ready, but not yet encoded or compressed.
      state = State.BLOCK_READY;
      Bytes compressAndEncryptDat;
      if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
        compressAndEncryptDat =
          dataBlockEncodingCtx.compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size());
      } else {
        compressAndEncryptDat = defaultBlockEncodingCtx.compressAndEncrypt(baosInMemory.getBuffer(),
          0, baosInMemory.size());
      }
      if (compressAndEncryptDat == null) {
        compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size());
      }
      if (onDiskBlockBytesWithHeader == null) {
        onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength());
      }
      onDiskBlockBytesWithHeader.reset();
      onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(),
        compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength());
      // Update raw and compressed sizes in the predicate
      compressedSizePredicator.updateLatestBlockSizes(fileContext, rawBlockSize,
        onDiskBlockBytesWithHeader.size());

      // Calculate how many bytes we need for checksum on the tail of the block.
      int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(),
        fileContext.getBytesPerChecksum());

      // Put the header for the on disk bytes; header currently is unfilled-out
      putHeader(onDiskBlockBytesWithHeader, onDiskBlockBytesWithHeader.size() + numBytes,
        baosInMemory.size(), onDiskBlockBytesWithHeader.size());

      if (onDiskChecksum.length != numBytes) {
        onDiskChecksum = new byte[numBytes];
      }
      ChecksumUtil.generateChecksums(onDiskBlockBytesWithHeader.getBuffer(), 0,
        onDiskBlockBytesWithHeader.size(), onDiskChecksum, 0, fileContext.getChecksumType(),
        fileContext.getBytesPerChecksum());
    }

    /**
     * Put the header into the given byte array at the given offset.
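     * @param dest             the byte array to write the header into
     * @param offset           the position in {@code dest} at which to start writing the header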
980 * @param onDiskSize size of the block on disk header + data + checksum 981 * @param uncompressedSize size of the block after decompression (but before optional data block 982 * decoding) including header 983 * @param onDiskDataSize size of the block on disk with header and data but not including the 984 * checksums 985 */ 986 private void putHeader(byte[] dest, int offset, int onDiskSize, int uncompressedSize, 987 int onDiskDataSize) { 988 offset = blockType.put(dest, offset); 989 offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE); 990 offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE); 991 offset = Bytes.putLong(dest, offset, prevOffset); 992 offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode()); 993 offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum()); 994 Bytes.putInt(dest, offset, onDiskDataSize); 995 } 996 997 private void putHeader(ByteBuff buff, int onDiskSize, int uncompressedSize, 998 int onDiskDataSize) { 999 buff.rewind(); 1000 blockType.write(buff); 1001 buff.putInt(onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE); 1002 buff.putInt(uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE); 1003 buff.putLong(prevOffset); 1004 buff.put(fileContext.getChecksumType().getCode()); 1005 buff.putInt(fileContext.getBytesPerChecksum()); 1006 buff.putInt(onDiskDataSize); 1007 } 1008 1009 private void putHeader(ByteArrayOutputStream dest, int onDiskSize, int uncompressedSize, 1010 int onDiskDataSize) { 1011 putHeader(dest.getBuffer(), 0, onDiskSize, uncompressedSize, onDiskDataSize); 1012 } 1013 1014 /** 1015 * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records the offset of this 1016 * block so that it can be referenced in the next block of the same type. 1017 */ 1018 void writeHeaderAndData(FSDataOutputStream out) throws IOException { 1019 long offset = out.getPos(); 1020 if (startOffset != UNSET && offset != startOffset) { 1021 throw new IOException("A " + blockType + " block written to a " 1022 + "stream twice, first at offset " + startOffset + ", then at " + offset); 1023 } 1024 startOffset = offset; 1025 finishBlockAndWriteHeaderAndData(out); 1026 } 1027 1028 /** 1029 * Writes the header and the compressed data of this block (or uncompressed data when not using 1030 * compression) into the given stream. Can be called in the "writing" state or in the "block 1031 * ready" state. If called in the "writing" state, transitions the writer to the "block ready" 1032 * state. 1033 * @param out the output stream to write the 1034 */ 1035 protected void finishBlockAndWriteHeaderAndData(DataOutputStream out) throws IOException { 1036 ensureBlockReady(); 1037 long startTime = EnvironmentEdgeManager.currentTime(); 1038 out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size()); 1039 out.write(onDiskChecksum); 1040 HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime); 1041 } 1042 1043 /** 1044 * Returns the header or the compressed data (or uncompressed data when not using compression) 1045 * as a byte array. Can be called in the "writing" state or in the "block ready" state. If 1046 * called in the "writing" state, transitions the writer to the "block ready" state. This 1047 * returns the header + data + checksums stored on disk. 
1048 * @return header and data as they would be stored on disk in a byte array 1049 */ 1050 byte[] getHeaderAndDataForTest() throws IOException { 1051 ensureBlockReady(); 1052 // This is not very optimal, because we are doing an extra copy. 1053 // But this method is used only by unit tests. 1054 byte[] output = new byte[onDiskBlockBytesWithHeader.size() + onDiskChecksum.length]; 1055 System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0, 1056 onDiskBlockBytesWithHeader.size()); 1057 System.arraycopy(onDiskChecksum, 0, output, onDiskBlockBytesWithHeader.size(), 1058 onDiskChecksum.length); 1059 return output; 1060 } 1061 1062 /** 1063 * Releases resources used by this writer. 1064 */ 1065 void release() { 1066 if (dataBlockEncodingCtx != null) { 1067 dataBlockEncodingCtx.close(); 1068 dataBlockEncodingCtx = null; 1069 } 1070 if (defaultBlockEncodingCtx != null) { 1071 defaultBlockEncodingCtx.close(); 1072 defaultBlockEncodingCtx = null; 1073 } 1074 } 1075 1076 /** 1077 * Returns the on-disk size of the data portion of the block. This is the compressed size if 1078 * compression is enabled. Can only be called in the "block ready" state. Header is not 1079 * compressed, and its size is not included in the return value. 1080 * @return the on-disk size of the block, not including the header. 1081 */ 1082 int getOnDiskSizeWithoutHeader() { 1083 expectState(State.BLOCK_READY); 1084 return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length 1085 - HConstants.HFILEBLOCK_HEADER_SIZE; 1086 } 1087 1088 /** 1089 * Returns the on-disk size of the block. Can only be called in the "block ready" state. 1090 * @return the on-disk size of the block ready to be written, including the header size, the 1091 * data and the checksum data. 1092 */ 1093 int getOnDiskSizeWithHeader() { 1094 expectState(State.BLOCK_READY); 1095 return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length; 1096 } 1097 1098 /** 1099 * The uncompressed size of the block data. Does not include header size. 1100 */ 1101 int getUncompressedSizeWithoutHeader() { 1102 expectState(State.BLOCK_READY); 1103 return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE; 1104 } 1105 1106 /** 1107 * The uncompressed size of the block data, including header size. 1108 */ 1109 public int getUncompressedSizeWithHeader() { 1110 expectState(State.BLOCK_READY); 1111 return baosInMemory.size(); 1112 } 1113 1114 /** Returns true if a block is being written */ 1115 boolean isWriting() { 1116 return state == State.WRITING; 1117 } 1118 1119 /** 1120 * Returns the number of bytes written into the current block so far, or zero if not writing the 1121 * block at the moment. Note that this will return zero in the "block ready" state as well. 1122 * @return the number of bytes written 1123 */ 1124 public int encodedBlockSizeWritten() { 1125 return state != State.WRITING ? 0 : this.getEncodingState().getEncodedDataSizeWritten(); 1126 } 1127 1128 /** 1129 * Returns the number of bytes written into the current block so far, or zero if not writing the 1130 * block at the moment. Note that this will return zero in the "block ready" state as well. 1131 * @return the number of bytes written 1132 */ 1133 public int blockSizeWritten() { 1134 return state != State.WRITING ? 0 : this.getEncodingState().getUnencodedDataSizeWritten(); 1135 } 1136 1137 /** 1138 * Clones the header followed by the uncompressed data, even if using compression. This is 1139 * needed for storing uncompressed blocks in the block cache. 
Can be called in the "writing" 1140 * state or the "block ready" state. Returns only the header and data, does not include checksum 1141 * data. 1142 * @return Returns an uncompressed block ByteBuff for caching on write 1143 */ 1144 ByteBuff cloneUncompressedBufferWithHeader() { 1145 expectState(State.BLOCK_READY); 1146 ByteBuff bytebuff = allocator.allocate(baosInMemory.size()); 1147 baosInMemory.toByteBuff(bytebuff); 1148 int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(), 1149 fileContext.getBytesPerChecksum()); 1150 putHeader(bytebuff, onDiskBlockBytesWithHeader.size() + numBytes, baosInMemory.size(), 1151 onDiskBlockBytesWithHeader.size()); 1152 bytebuff.rewind(); 1153 return bytebuff; 1154 } 1155 1156 /** 1157 * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is needed 1158 * for storing packed blocks in the block cache. Returns only the header and data, Does not 1159 * include checksum data. 1160 * @return Returns a copy of block bytes for caching on write 1161 */ 1162 private ByteBuff cloneOnDiskBufferWithHeader() { 1163 expectState(State.BLOCK_READY); 1164 ByteBuff bytebuff = allocator.allocate(onDiskBlockBytesWithHeader.size()); 1165 onDiskBlockBytesWithHeader.toByteBuff(bytebuff); 1166 bytebuff.rewind(); 1167 return bytebuff; 1168 } 1169 1170 private void expectState(State expectedState) { 1171 if (state != expectedState) { 1172 throw new IllegalStateException( 1173 "Expected state: " + expectedState + ", actual state: " + state); 1174 } 1175 } 1176 1177 /** 1178 * Takes the given {@link BlockWritable} instance, creates a new block of its appropriate type, 1179 * writes the writable into this block, and flushes the block into the output stream. The writer 1180 * is instructed not to buffer uncompressed bytes for cache-on-write. 1181 * @param bw the block-writable object to write as a block 1182 * @param out the file system output stream 1183 */ 1184 void writeBlock(BlockWritable bw, FSDataOutputStream out) throws IOException { 1185 bw.writeToBlock(startWriting(bw.getBlockType())); 1186 writeHeaderAndData(out); 1187 } 1188 1189 /** 1190 * Creates a new HFileBlock. Checksums have already been validated, so the byte buffer passed 1191 * into the constructor of this newly created block does not have checksum data even though the 1192 * header minor version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a 0 value 1193 * in bytesPerChecksum. This method copies the on-disk or uncompressed data to build the 1194 * HFileBlock which is used only while writing blocks and caching. 1195 * <p> 1196 * TODO: Should there be an option where a cache can ask that hbase preserve block checksums for 1197 * checking after a block comes out of the cache? Otehrwise, cache is responsible for blocks 1198 * being wholesome (ECC memory or if file-backed, it does checksumming). 
1199 */ 1200 HFileBlock getBlockForCaching(CacheConfig cacheConf) { 1201 HFileContext newContext = new HFileContextBuilder().withBlockSize(fileContext.getBlocksize()) 1202 .withBytesPerCheckSum(0).withChecksumType(ChecksumType.NULL) // no checksums in cached data 1203 .withCompression(fileContext.getCompression()) 1204 .withDataBlockEncoding(fileContext.getDataBlockEncoding()) 1205 .withHBaseCheckSum(fileContext.isUseHBaseChecksum()) 1206 .withCompressTags(fileContext.isCompressTags()) 1207 .withIncludesMvcc(fileContext.isIncludesMvcc()) 1208 .withIncludesTags(fileContext.isIncludesTags()) 1209 .withColumnFamily(fileContext.getColumnFamily()).withTableName(fileContext.getTableName()) 1210 .build(); 1211 // Build the HFileBlock. 1212 HFileBlockBuilder builder = new HFileBlockBuilder(); 1213 ByteBuff buff; 1214 if (cacheConf.shouldCacheCompressed(blockType.getCategory())) { 1215 buff = cloneOnDiskBufferWithHeader(); 1216 } else { 1217 buff = cloneUncompressedBufferWithHeader(); 1218 } 1219 return builder.withBlockType(blockType) 1220 .withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader()) 1221 .withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader()) 1222 .withPrevBlockOffset(prevOffset).withByteBuff(buff).withFillHeader(FILL_HEADER) 1223 .withOffset(startOffset).withNextBlockOnDiskSize(UNSET) 1224 .withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length) 1225 .withHFileContext(newContext).withByteBuffAllocator(cacheConf.getByteBuffAllocator()) 1226 .withShared(!buff.hasArray()).build(); 1227 } 1228 } 1229 1230 /** Something that can be written into a block. */ 1231 interface BlockWritable { 1232 /** The type of block this data should use. */ 1233 BlockType getBlockType(); 1234 1235 /** 1236 * Writes the block to the provided stream. Must not write any magic records. 1237 * @param out a stream to write uncompressed data into 1238 */ 1239 void writeToBlock(DataOutput out) throws IOException; 1240 } 1241 1242 /** 1243 * Iterator for reading {@link HFileBlock}s in load-on-open-section, such as root data index 1244 * block, meta index block, file info block etc. 1245 */ 1246 interface BlockIterator { 1247 /** 1248 * Get the next block, or null if there are no more blocks to iterate. 1249 */ 1250 HFileBlock nextBlock() throws IOException; 1251 1252 /** 1253 * Similar to {@link #nextBlock()} but checks block type, throws an exception if incorrect, and 1254 * returns the HFile block 1255 */ 1256 HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException; 1257 1258 /** 1259 * Now we use the {@link ByteBuffAllocator} to manage the nio ByteBuffers for HFileBlocks, so we 1260 * must deallocate all of the ByteBuffers in the end life. the BlockIterator's life cycle is 1261 * starting from opening an HFileReader and stopped when the HFileReader#close, so we will keep 1262 * track all the read blocks until we call {@link BlockIterator#freeBlocks()} when closing the 1263 * HFileReader. Sum bytes of those blocks in load-on-open section should be quite small, so 1264 * tracking them should be OK. 1265 */ 1266 void freeBlocks(); 1267 } 1268 1269 /** An HFile block reader with iteration ability. */ 1270 interface FSReader { 1271 /** 1272 * Reads the block at the given offset in the file with the given on-disk size and uncompressed 1273 * size. 
1274 * @param offset of the file to read 1275 * @param onDiskSize the on-disk size of the entire block, including all applicable headers, 1276 * or -1 if unknown 1277 * @param pread true to use pread, otherwise use the stream read. 1278 * @param updateMetrics update the metrics or not. 1279 * @param intoHeap allocate the block's ByteBuff by {@link ByteBuffAllocator} or JVM heap. 1280 * For LRUBlockCache, we must ensure that the block to cache is an heap 1281 * one, because the memory occupation is based on heap now, also for 1282 * {@link CombinedBlockCache}, we use the heap LRUBlockCache as L1 cache to 1283 * cache small blocks such as IndexBlock or MetaBlock for faster access. So 1284 * introduce an flag here to decide whether allocate from JVM heap or not 1285 * so that we can avoid an extra off-heap to heap memory copy when using 1286 * LRUBlockCache. For most cases, we known what's the expected block type 1287 * we'll read, while for some special case (Example: 1288 * HFileReaderImpl#readNextDataBlock()), we cannot pre-decide what's the 1289 * expected block type, then we can only allocate block's ByteBuff from 1290 * {@link ByteBuffAllocator} firstly, and then when caching it in 1291 * {@link LruBlockCache} we'll check whether the ByteBuff is from heap or 1292 * not, if not then we'll clone it to an heap one and cache it. 1293 * @return the newly read block 1294 */ 1295 HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics, 1296 boolean intoHeap) throws IOException; 1297 1298 /** 1299 * Creates a block iterator over the given portion of the {@link HFile}. The iterator returns 1300 * blocks starting with offset such that offset <= startOffset < endOffset. Returned 1301 * blocks are always unpacked. Used when no hfile index available; e.g. reading in the hfile 1302 * index blocks themselves on file open. 1303 * @param startOffset the offset of the block to start iteration with 1304 * @param endOffset the offset to end iteration at (exclusive) 1305 * @return an iterator of blocks between the two given offsets 1306 */ 1307 BlockIterator blockRange(long startOffset, long endOffset); 1308 1309 /** Closes the backing streams */ 1310 void closeStreams() throws IOException; 1311 1312 /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */ 1313 HFileBlockDecodingContext getBlockDecodingContext(); 1314 1315 /** Get the default decoder for blocks from this file. */ 1316 HFileBlockDecodingContext getDefaultBlockDecodingContext(); 1317 1318 void setIncludesMemStoreTS(boolean includesMemstoreTS); 1319 1320 void setDataBlockEncoder(HFileDataBlockEncoder encoder, Configuration conf); 1321 1322 /** 1323 * To close the stream's socket. Note: This can be concurrently called from multiple threads and 1324 * implementation should take care of thread safety. 1325 */ 1326 void unbufferStream(); 1327 } 1328 1329 /** 1330 * Data-structure to use caching the header of the NEXT block. Only works if next read that comes 1331 * in here is next in sequence in this block. When we read, we read current block and the next 1332 * blocks' header. We do this so we have the length of the next block to read if the hfile index 1333 * is not available (rare, at hfile open only). 
1334 */ 1335 private static class PrefetchedHeader { 1336 long offset = -1; 1337 byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE]; 1338 final ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(header, 0, header.length)); 1339 1340 @Override 1341 public String toString() { 1342 return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header); 1343 } 1344 } 1345 1346 /** 1347 * Reads version 2 HFile blocks from the filesystem. 1348 */ 1349 static class FSReaderImpl implements FSReader { 1350 /** 1351 * The file system stream of the underlying {@link HFile} that does or doesn't do checksum 1352 * validations in the filesystem 1353 */ 1354 private FSDataInputStreamWrapper streamWrapper; 1355 1356 private HFileBlockDecodingContext encodedBlockDecodingCtx; 1357 1358 /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */ 1359 private final HFileBlockDefaultDecodingContext defaultDecodingCtx; 1360 1361 /** 1362 * Cache of the NEXT header after this. Check it is indeed next blocks header before using it. 1363 * TODO: Review. This overread into next block to fetch next blocks header seems unnecessary 1364 * given we usually get the block size from the hfile index. Review! 1365 */ 1366 private AtomicReference<PrefetchedHeader> prefetchedHeader = 1367 new AtomicReference<>(new PrefetchedHeader()); 1368 1369 /** The size of the file we are reading from, or -1 if unknown. */ 1370 private long fileSize; 1371 1372 /** The size of the header */ 1373 protected final int hdrSize; 1374 1375 /** The filesystem used to access data */ 1376 private HFileSystem hfs; 1377 1378 private HFileContext fileContext; 1379 // Cache the fileName 1380 private String pathName; 1381 1382 private final ByteBuffAllocator allocator; 1383 1384 private final Lock streamLock = new ReentrantLock(); 1385 1386 private final boolean isPreadAllBytes; 1387 1388 FSReaderImpl(ReaderContext readerContext, HFileContext fileContext, ByteBuffAllocator allocator, 1389 Configuration conf) throws IOException { 1390 this.fileSize = readerContext.getFileSize(); 1391 this.hfs = readerContext.getFileSystem(); 1392 if (readerContext.getFilePath() != null) { 1393 this.pathName = readerContext.getFilePath().toString(); 1394 } 1395 this.fileContext = fileContext; 1396 this.hdrSize = headerSize(fileContext.isUseHBaseChecksum()); 1397 this.allocator = allocator; 1398 1399 this.streamWrapper = readerContext.getInputStreamWrapper(); 1400 // Older versions of HBase didn't support checksum. 1401 this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum()); 1402 defaultDecodingCtx = new HFileBlockDefaultDecodingContext(conf, fileContext); 1403 encodedBlockDecodingCtx = defaultDecodingCtx; 1404 isPreadAllBytes = readerContext.isPreadAllBytes(); 1405 } 1406 1407 @Override 1408 public BlockIterator blockRange(final long startOffset, final long endOffset) { 1409 final FSReader owner = this; // handle for inner class 1410 return new BlockIterator() { 1411 private volatile boolean freed = false; 1412 // Tracking all read blocks until we call freeBlocks. 1413 private List<HFileBlock> blockTracker = new ArrayList<>(); 1414 private long offset = startOffset; 1415 // Cache length of next block. Current block has the length of next block in it. 
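        // Illustrative flow: readBlockData(offset, length, ...) also peeks at the following
        // block's header, and b.getNextBlockOnDiskSize() hands that size back so the next
        // nextBlock() call can avoid an extra header read (-1 means "unknown, read the header").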
1416 private long length = -1; 1417 1418 @Override 1419 public HFileBlock nextBlock() throws IOException { 1420 if (offset >= endOffset) { 1421 return null; 1422 } 1423 HFileBlock b = readBlockData(offset, length, false, false, true); 1424 offset += b.getOnDiskSizeWithHeader(); 1425 length = b.getNextBlockOnDiskSize(); 1426 HFileBlock uncompressed = b.unpack(fileContext, owner); 1427 if (uncompressed != b) { 1428 b.release(); // Need to release the compressed Block now. 1429 } 1430 blockTracker.add(uncompressed); 1431 return uncompressed; 1432 } 1433 1434 @Override 1435 public HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException { 1436 HFileBlock blk = nextBlock(); 1437 if (blk.getBlockType() != blockType) { 1438 throw new IOException( 1439 "Expected block of type " + blockType + " but found " + blk.getBlockType()); 1440 } 1441 return blk; 1442 } 1443 1444 @Override 1445 public void freeBlocks() { 1446 if (freed) { 1447 return; 1448 } 1449 blockTracker.forEach(HFileBlock::release); 1450 blockTracker = null; 1451 freed = true; 1452 } 1453 }; 1454 } 1455 1456 /** 1457 * Does a positional read or a seek-and-read into the given byte buffer. We must take care to 1458 * call {@link ByteBuff#release()} on every exit path so the ByteBuffers are deallocated; 1459 * otherwise a memory leak may happen. 1460 * @param dest destination buffer 1461 * @param size size of read 1462 * @param peekIntoNextBlock whether to read the next block's on-disk size 1463 * @param fileOffset position in the stream to read at 1464 * @param pread whether we should do a positional read 1465 * @param istream the input source of data 1466 * @return true if the destination buffer includes the next block's header, false if it only 1467 * includes the current block's data without the next block's header. 1468 * @throws IOException if any IO error happens. 1469 */ 1470 protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size, 1471 boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException { 1472 if (!pread) { 1473 // Seek + read. Better for scanning. 1474 istream.seek(fileOffset); 1475 long realOffset = istream.getPos(); 1476 if (realOffset != fileOffset) { 1477 throw new IOException("Tried to seek to " + fileOffset + " to read " + size 1478 + " bytes, but pos=" + realOffset + " after seek"); 1479 } 1480 if (!peekIntoNextBlock) { 1481 BlockIOUtils.readFully(dest, istream, size); 1482 return false; 1483 } 1484 1485 // Try to read the next block header 1486 if (!BlockIOUtils.readWithExtra(dest, istream, size, hdrSize)) { 1487 // did not read the next block header. 1488 return false; 1489 } 1490 } else { 1491 // Positional read. Better for random reads; or when the streamLock is already locked. 1492 int extraSize = peekIntoNextBlock ? hdrSize : 0; 1493 if ( 1494 !BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize, isPreadAllBytes) 1495 ) { 1496 // did not read the next block header. 1497 return false; 1498 } 1499 } 1500 assert peekIntoNextBlock; 1501 return true; 1502 } 1503 1504 /** 1505 * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as 1506 * little memory allocation as possible, using the provided on-disk size. 1507 * @param offset the offset in the stream to read at 1508 * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header, or -1 if 1509 * unknown; i.e. when iterating over blocks reading in the file 1510 * metadata info.
1511 * @param pread whether to use a positional read 1512 * @param updateMetrics whether to update the metrics 1513 * @param intoHeap allocate ByteBuff of block from heap or off-heap. 1514 * @see FSReader#readBlockData(long, long, boolean, boolean, boolean) for more details about the 1515 * useHeap. 1516 */ 1517 @Override 1518 public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread, 1519 boolean updateMetrics, boolean intoHeap) throws IOException { 1520 // Get a copy of the current state of whether to validate 1521 // hbase checksums or not for this read call. This is not 1522 // thread-safe but the one constraint is that if we decide 1523 // to skip hbase checksum verification then we are 1524 // guaranteed to use hdfs checksum verification. 1525 boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum(); 1526 FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum); 1527 final Context context = Context.current().with(CONTEXT_KEY, 1528 new HFileContextAttributesBuilderConsumer(fileContext) 1529 .setSkipChecksum(doVerificationThruHBaseChecksum) 1530 .setReadType(pread ? ReadType.POSITIONAL_READ : ReadType.SEEK_PLUS_READ)); 1531 try (Scope ignored = context.makeCurrent()) { 1532 HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, 1533 doVerificationThruHBaseChecksum, updateMetrics, intoHeap); 1534 if (blk == null) { 1535 HFile.LOG.warn("HBase checksum verification failed for file {} at offset {} filesize {}." 1536 + " Retrying read with HDFS checksums turned on...", pathName, offset, fileSize); 1537 1538 if (!doVerificationThruHBaseChecksum) { 1539 String msg = "HBase checksum verification failed for file " + pathName + " at offset " 1540 + offset + " filesize " + fileSize + " but this cannot happen because doVerify is " 1541 + doVerificationThruHBaseChecksum; 1542 HFile.LOG.warn(msg); 1543 throw new IOException(msg); // cannot happen case here 1544 } 1545 HFile.CHECKSUM_FAILURES.increment(); // update metrics 1546 1547 // If we have a checksum failure, we fall back into a mode where 1548 // the next few reads use HDFS level checksums. We aim to make the 1549 // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid 1550 // hbase checksum verification, but since this value is set without 1551 // holding any locks, it can so happen that we might actually do 1552 // a few more than precisely this number. 1553 is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD); 1554 doVerificationThruHBaseChecksum = false; 1555 blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, 1556 doVerificationThruHBaseChecksum, updateMetrics, intoHeap); 1557 if (blk != null) { 1558 HFile.LOG.warn( 1559 "HDFS checksum verification succeeded for file {} at offset {} filesize" + " {}", 1560 pathName, offset, fileSize); 1561 } 1562 } 1563 if (blk == null && !doVerificationThruHBaseChecksum) { 1564 String msg = 1565 "readBlockData failed, possibly due to " + "checksum verification failed for file " 1566 + pathName + " at offset " + offset + " filesize " + fileSize; 1567 HFile.LOG.warn(msg); 1568 throw new IOException(msg); 1569 } 1570 1571 // If there is a checksum mismatch earlier, then retry with 1572 // HBase checksums switched off and use HDFS checksum verification. 1573 // This triggers HDFS to detect and fix corrupt replicas. The 1574 // next checksumOffCount read requests will use HDFS checksums. 
1575 // The decrementing of this.checksumOffCount is not thread-safe, 1576 // but it is harmless because eventually checksumOffCount will be 1577 // a negative number. 1578 streamWrapper.checksumOk(); 1579 return blk; 1580 } 1581 } 1582 1583 /** 1584 * Checks that <code>onDiskSizeWithHeaderL</code> is a sane size and then returns it as an int. 1585 */ 1586 private static int checkAndGetSizeAsInt(final long onDiskSizeWithHeaderL, final int hdrSize) 1587 throws IOException { 1588 if ( 1589 (onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1) 1590 || onDiskSizeWithHeaderL >= Integer.MAX_VALUE 1591 ) { 1592 throw new IOException( 1593 "Invalid onDisksize=" + onDiskSizeWithHeaderL + ": expected to be at least " + hdrSize 1594 + " and at most " + Integer.MAX_VALUE + ", or -1"); 1595 } 1596 return (int) onDiskSizeWithHeaderL; 1597 } 1598 1599 /** 1600 * Verify that the passed-in onDiskSizeWithHeader aligns with what is in the header; otherwise 1601 * something is not right. 1602 */ 1603 private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuff headerBuf, 1604 final long offset, boolean verifyChecksum) throws IOException { 1605 // Assert size provided aligns with what is in the header 1606 int fromHeader = getOnDiskSizeWithHeader(headerBuf, verifyChecksum); 1607 if (passedIn != fromHeader) { 1608 throw new IOException("Passed in onDiskSizeWithHeader=" + passedIn + " != " + fromHeader 1609 + ", offset=" + offset + ", fileContext=" + this.fileContext); 1610 } 1611 } 1612 1613 /** 1614 * Check the atomic reference cache for this block's header. The cache is only good if the next 1615 * read coming through is for the block that immediately follows. We read the next block's 1616 * header on the tail of reading the previous block to save a seek. Otherwise, we have to do a 1617 * seek to read the header before we can pull in the block, OR we have to back up the stream 1618 * because we over-read (the next block's header). 1619 * @see PrefetchedHeader 1620 * @return The cached block header or null if not found. 1621 * @see #cacheNextBlockHeader(long, ByteBuff, int, int) 1622 */ 1623 private ByteBuff getCachedHeader(final long offset) { 1624 PrefetchedHeader ph = this.prefetchedHeader.get(); 1625 return ph != null && ph.offset == offset ? ph.buf : null; 1626 } 1627 1628 /** 1629 * Save away the next block's header in the atomic reference. 1630 * @see #getCachedHeader(long) 1631 * @see PrefetchedHeader 1632 */ 1633 private void cacheNextBlockHeader(final long offset, ByteBuff onDiskBlock, 1634 int onDiskSizeWithHeader, int headerLength) { 1635 PrefetchedHeader ph = new PrefetchedHeader(); 1636 ph.offset = offset; 1637 onDiskBlock.get(onDiskSizeWithHeader, ph.header, 0, headerLength); 1638 this.prefetchedHeader.set(ph); 1639 } 1640 1641 private int getNextBlockOnDiskSize(boolean readNextHeader, ByteBuff onDiskBlock, 1642 int onDiskSizeWithHeader) { 1643 int nextBlockOnDiskSize = -1; 1644 if (readNextHeader) { 1645 nextBlockOnDiskSize = 1646 onDiskBlock.getIntAfterPosition(onDiskSizeWithHeader + BlockType.MAGIC_LENGTH) + hdrSize; 1647 } 1648 return nextBlockOnDiskSize; 1649 } 1650 1651 private ByteBuff allocate(int size, boolean intoHeap) { 1652 return intoHeap ? HEAP.allocate(size) : allocator.allocate(size); 1653 } 1654 1655 /** 1656 * Reads a version 2 block. 1657 * @param offset the offset in the stream to read at. 1658 * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header and 1659 * checksums if present or -1 if unknown (as a long).
Can be -1 1660 * when doing raw iteration of blocks, as when loading up the file 1661 * metadata; i.e. the first read of a new file. Usually the size is 1662 * known and taken from the file index. 1663 * @param pread whether to use a positional read 1664 * @param verifyChecksum whether to use HBase checksums. If HBase checksums are switched 1665 * off, HDFS checksums are used instead. This can also flip on/off while 1666 * reading the same file if we hit a troublesome patch in an hfile. 1667 * @param updateMetrics whether to update the metrics. 1668 * @param intoHeap allocate the ByteBuff of block from heap or off-heap. 1669 * @return the HFileBlock or null if there is an HBase checksum mismatch 1670 */ 1671 protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, 1672 long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics, 1673 boolean intoHeap) throws IOException { 1674 if (offset < 0) { 1675 throw new IOException("Invalid offset=" + offset + " trying to read " + "block (onDiskSize=" 1676 + onDiskSizeWithHeaderL + ")"); 1677 } 1678 1679 final Span span = Span.current(); 1680 final AttributesBuilder attributesBuilder = Attributes.builder(); 1681 Optional.of(Context.current()).map(val -> val.get(CONTEXT_KEY)) 1682 .ifPresent(c -> c.accept(attributesBuilder)); 1683 int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize); 1684 // Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1 1685 // and will save us having to seek the stream backwards to reread the header we 1686 // read the last time through here. 1687 ByteBuff headerBuf = getCachedHeader(offset); 1688 LOG.trace( 1689 "Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, " 1690 + "onDiskSizeWithHeader={}", 1691 this.fileContext.getHFileName(), offset, pread, verifyChecksum, headerBuf, 1692 onDiskSizeWithHeader); 1693 // This is NOT the same as verifyChecksum. The latter is whether to do hbase 1694 // checksums. Can change with circumstances. The below flag is whether the 1695 // file has support for checksums (version 2+). 1696 boolean checksumSupport = this.fileContext.isUseHBaseChecksum(); 1697 long startTime = EnvironmentEdgeManager.currentTime(); 1698 if (onDiskSizeWithHeader <= 0) { 1699 // We were not passed the block size. Need to get it from the header. If the header was 1700 // not cached (see getCachedHeader above), we need to seek to pull it in. This is costly 1701 // and should happen very rarely. Currently happens on open of an hfile reader where we 1702 // read the trailer blocks to pull in the indices. Otherwise, we are reading block sizes 1703 // out of the hfile index. To check, enable TRACE in this file and you'll get an exception 1704 // logged every time we seek. See HBASE-17072 for more detail. 1705 if (headerBuf == null) { 1706 if (LOG.isTraceEnabled()) { 1707 LOG.trace("Extra seek to get block size!", new RuntimeException()); 1708 } 1709 span.addEvent("Extra seek to get block size!", attributesBuilder.build()); 1710 headerBuf = HEAP.allocate(hdrSize); 1711 readAtOffset(is, headerBuf, hdrSize, false, offset, pread); 1712 headerBuf.rewind(); 1713 } 1714 onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport); 1715 } 1716 int preReadHeaderSize = headerBuf == null ? 0 : hdrSize; 1717 // Allocate enough space to fit the next block's header too; saves a seek next time through.
1718 // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header; 1719 // onDiskSizeWithHeader is header, body, and any checksums if present. preReadHeaderSize 1720 // says where to start reading. If we have the header cached, then we don't need to read 1721 // it again and we can likely read from last place we left off w/o need to backup and reread 1722 // the header we read last time through here. 1723 ByteBuff onDiskBlock = this.allocate(onDiskSizeWithHeader + hdrSize, intoHeap); 1724 boolean initHFileBlockSuccess = false; 1725 try { 1726 if (headerBuf != null) { 1727 onDiskBlock.put(0, headerBuf, 0, hdrSize).position(hdrSize); 1728 } 1729 boolean readNextHeader = readAtOffset(is, onDiskBlock, 1730 onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread); 1731 onDiskBlock.rewind(); // in case of moving position when copying a cached header 1732 int nextBlockOnDiskSize = 1733 getNextBlockOnDiskSize(readNextHeader, onDiskBlock, onDiskSizeWithHeader); 1734 if (headerBuf == null) { 1735 headerBuf = onDiskBlock.duplicate().position(0).limit(hdrSize); 1736 } 1737 // Do a few checks before we go instantiate HFileBlock. 1738 assert onDiskSizeWithHeader > this.hdrSize; 1739 verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport); 1740 ByteBuff curBlock = onDiskBlock.duplicate().position(0).limit(onDiskSizeWithHeader); 1741 // Verify checksum of the data before using it for building HFileBlock. 1742 if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) { 1743 return null; 1744 } 1745 // remove checksum from buffer now that it's verified 1746 int sizeWithoutChecksum = curBlock.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX); 1747 curBlock.limit(sizeWithoutChecksum); 1748 long duration = EnvironmentEdgeManager.currentTime() - startTime; 1749 if (updateMetrics) { 1750 HFile.updateReadLatency(duration, pread); 1751 } 1752 // The onDiskBlock will become the headerAndDataBuffer for this block. 1753 // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already 1754 // contains the header of next block, so no need to set next block's header in it. 1755 HFileBlock hFileBlock = createFromBuff(curBlock, checksumSupport, offset, 1756 nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator); 1757 // Run check on uncompressed sizings. 1758 if (!fileContext.isCompressedOrEncrypted()) { 1759 hFileBlock.sanityCheckUncompressed(); 1760 } 1761 LOG.trace("Read {} in {} ms", hFileBlock, duration); 1762 span.addEvent("Read block", attributesBuilder.build()); 1763 // Cache next block header if we read it for the next time through here. 
1764 if (nextBlockOnDiskSize != -1) { 1765 cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), onDiskBlock, 1766 onDiskSizeWithHeader, hdrSize); 1767 } 1768 initHFileBlockSuccess = true; 1769 return hFileBlock; 1770 } finally { 1771 if (!initHFileBlockSuccess) { 1772 onDiskBlock.release(); 1773 } 1774 } 1775 } 1776 1777 @Override 1778 public void setIncludesMemStoreTS(boolean includesMemstoreTS) { 1779 this.fileContext = 1780 new HFileContextBuilder(this.fileContext).withIncludesMvcc(includesMemstoreTS).build(); 1781 } 1782 1783 @Override 1784 public void setDataBlockEncoder(HFileDataBlockEncoder encoder, Configuration conf) { 1785 encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(conf, fileContext); 1786 } 1787 1788 @Override 1789 public HFileBlockDecodingContext getBlockDecodingContext() { 1790 return this.encodedBlockDecodingCtx; 1791 } 1792 1793 @Override 1794 public HFileBlockDecodingContext getDefaultBlockDecodingContext() { 1795 return this.defaultDecodingCtx; 1796 } 1797 1798 /** 1799 * Generates the checksum for the header as well as the data and then validates it. If the block 1800 * doesn't use checksums, returns false. 1801 * @return True if checksum matches, else false. 1802 */ 1803 private boolean validateChecksum(long offset, ByteBuff data, int hdrSize) { 1804 // If this is an older version of the block that does not have checksums, then return false 1805 // indicating that checksum verification did not succeed. Actually, this method should never 1806 // be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen 1807 // case. Since this is a cannot-happen case, it is better to return false to indicate a 1808 // checksum validation failure. 1809 if (!fileContext.isUseHBaseChecksum()) { 1810 return false; 1811 } 1812 return ChecksumUtil.validateChecksum(data, pathName, offset, hdrSize); 1813 } 1814 1815 @Override 1816 public void closeStreams() throws IOException { 1817 streamWrapper.close(); 1818 } 1819 1820 @Override 1821 public void unbufferStream() { 1822 // To handle concurrent reads, ensure that no other client is accessing the stream while we 1823 // unbuffer it. 1824 if (streamLock.tryLock()) { 1825 try { 1826 this.streamWrapper.unbuffer(); 1827 } finally { 1828 streamLock.unlock(); 1829 } 1830 } 1831 } 1832 1833 @Override 1834 public String toString() { 1835 return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext; 1836 } 1837 } 1838 1839 /** An additional sanity-check in case no compression or encryption is being used. */ 1840 void sanityCheckUncompressed() throws IOException { 1841 if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) { 1842 throw new IOException("Using no compression but " + "onDiskSizeWithoutHeader=" 1843 + onDiskSizeWithoutHeader + ", " + "uncompressedSizeWithoutHeader=" 1844 + uncompressedSizeWithoutHeader + ", numChecksumbytes=" + totalChecksumBytes()); 1845 } 1846 } 1847 1848 // Cacheable implementation 1849 @Override 1850 public int getSerializedLength() { 1851 if (buf != null) { 1852 // Include extra bytes for block metadata. 1853 return this.buf.limit() + BLOCK_METADATA_SPACE; 1854 } 1855 return 0; 1856 } 1857 1858 // Cacheable implementation 1859 @Override 1860 public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) { 1861 this.buf.get(destination, 0, getSerializedLength() - BLOCK_METADATA_SPACE); 1862 destination = addMetaData(destination, includeNextBlockMetadata); 1863 1864 // Make it ready for reading.
flip sets position to zero and limit to the current position, which 1865 // is what we want: the buffer now holds the serialized block, plus checksums if present, plus 1866 // metadata, ready to be read back. 1867 destination.flip(); 1868 } 1869 1870 /** 1871 * For use by bucketcache. This exposes internals. 1872 */ 1873 public ByteBuffer getMetaData(ByteBuffer bb) { 1874 bb = addMetaData(bb, true); 1875 bb.flip(); 1876 return bb; 1877 } 1878 1879 /** 1880 * Adds metadata at current position (position is moved forward). Does not flip or reset. 1881 * @return The passed <code>destination</code> with metadata added. 1882 */ 1883 private ByteBuffer addMetaData(final ByteBuffer destination, boolean includeNextBlockMetadata) { 1884 destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0); 1885 destination.putLong(this.offset); 1886 if (includeNextBlockMetadata) { 1887 destination.putInt(this.nextBlockOnDiskSize); 1888 } 1889 return destination; 1890 } 1891 1892 // Cacheable implementation 1893 @Override 1894 public CacheableDeserializer<Cacheable> getDeserializer() { 1895 return HFileBlock.BLOCK_DESERIALIZER; 1896 } 1897 1898 @Override 1899 public int hashCode() { 1900 int result = 1; 1901 result = result * 31 + blockType.hashCode(); 1902 result = result * 31 + nextBlockOnDiskSize; 1903 result = result * 31 + (int) (offset ^ (offset >>> 32)); 1904 result = result * 31 + onDiskSizeWithoutHeader; 1905 result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32)); 1906 result = result * 31 + uncompressedSizeWithoutHeader; 1907 result = result * 31 + buf.hashCode(); 1908 return result; 1909 } 1910 1911 @Override 1912 public boolean equals(Object comparison) { 1913 if (this == comparison) { 1914 return true; 1915 } 1916 if (comparison == null) { 1917 return false; 1918 } 1919 if (!(comparison instanceof HFileBlock)) { 1920 return false; 1921 } 1922 1923 HFileBlock castedComparison = (HFileBlock) comparison; 1924 1925 if (castedComparison.blockType != this.blockType) { 1926 return false; 1927 } 1928 if (castedComparison.nextBlockOnDiskSize != this.nextBlockOnDiskSize) { 1929 return false; 1930 } 1931 // Offset is important. Needed when we have to remake the cache key when the block is returned to the cache. 1932 if (castedComparison.offset != this.offset) { 1933 return false; 1934 } 1935 if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) { 1936 return false; 1937 } 1938 if (castedComparison.prevBlockOffset != this.prevBlockOffset) { 1939 return false; 1940 } 1941 if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) { 1942 return false; 1943 } 1944 if ( 1945 ByteBuff.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0, 1946 castedComparison.buf.limit()) != 0 1947 ) { 1948 return false; 1949 } 1950 return true; 1951 } 1952 1953 DataBlockEncoding getDataBlockEncoding() { 1954 if (blockType == BlockType.ENCODED_DATA) { 1955 return DataBlockEncoding.getEncodingById(getDataBlockEncodingId()); 1956 } 1957 return DataBlockEncoding.NONE; 1958 } 1959 1960 byte getChecksumType() { 1961 return this.fileContext.getChecksumType().getCode(); 1962 } 1963 1964 int getBytesPerChecksum() { 1965 return this.fileContext.getBytesPerChecksum(); 1966 } 1967 1968 /** Returns the size of data on disk + header. Excludes checksum. */ 1969 int getOnDiskDataSizeWithHeader() { 1970 return this.onDiskDataSizeWithHeader; 1971 } 1972 1973 /** 1974 * Calculate the number of bytes required to store all the checksums for this block.
Each checksum 1975 * value is a 4 byte integer. 1976 */ 1977 int totalChecksumBytes() { 1978 // If the hfile block has minorVersion 0, then there is no checksum 1979 // data to validate. Similarly, a zero value in this.bytesPerChecksum 1980 // indicates that cached blocks do not have checksum data because 1981 // checksums were already validated when the block was read from disk. 1982 if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) { 1983 return 0; 1984 } 1985 return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader, 1986 this.fileContext.getBytesPerChecksum()); 1987 } 1988 1989 /** 1990 * Returns the size of this block header. 1991 */ 1992 public int headerSize() { 1993 return headerSize(this.fileContext.isUseHBaseChecksum()); 1994 } 1995 1996 /** 1997 * Maps a minor version to the size of the header. 1998 */ 1999 public static int headerSize(boolean usesHBaseChecksum) { 2000 return usesHBaseChecksum 2001 ? HConstants.HFILEBLOCK_HEADER_SIZE 2002 : HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM; 2003 } 2004 2005 /** 2006 * Return the appropriate DUMMY_HEADER for the minor version 2007 */ 2008 // TODO: Why is this in here? 2009 byte[] getDummyHeaderForVersion() { 2010 return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum()); 2011 } 2012 2013 /** 2014 * Return the appropriate DUMMY_HEADER for the minor version 2015 */ 2016 static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) { 2017 return usesHBaseChecksum ? HConstants.HFILEBLOCK_DUMMY_HEADER : DUMMY_HEADER_NO_CHECKSUM; 2018 } 2019 2020 /** 2021 * @return This HFileBlock's fileContext, which will be a derivative of the fileContext for the 2022 * file from which this block's data was originally read. 2023 */ 2024 public HFileContext getHFileContext() { 2025 return this.fileContext; 2026 } 2027 2028 /** 2029 * Convert the contents of the block header into a human readable string. This is mostly helpful 2030 * for debugging. This assumes that the block has minor version > 0. 2031 */ 2032 static String toStringHeader(ByteBuff buf) throws IOException { 2033 byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)]; 2034 buf.get(magicBuf); 2035 BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH); 2036 int compressedBlockSizeNoHeader = buf.getInt(); 2037 int uncompressedBlockSizeNoHeader = buf.getInt(); 2038 long prevBlockOffset = buf.getLong(); 2039 byte cksumtype = buf.get(); 2040 long bytesPerChecksum = buf.getInt(); 2041 long onDiskDataSizeWithHeader = buf.getInt(); 2042 return " Header dump: magic: " + Bytes.toString(magicBuf) + " blockType " + bt 2043 + " compressedBlockSizeNoHeader " + compressedBlockSizeNoHeader 2044 + " uncompressedBlockSizeNoHeader " + uncompressedBlockSizeNoHeader + " prevBlockOffset " 2045 + prevBlockOffset + " checksumType " + ChecksumType.codeToType(cksumtype) 2046 + " bytesPerChecksum " + bytesPerChecksum + " onDiskDataSizeWithHeader " 2047 + onDiskDataSizeWithHeader; 2048 } 2049 2050 /** 2051 * Creates a new HFileBlockBuilder from the existing block and a new ByteBuff. The builder will be 2052 * loaded with all of the original fields from blk, except now using the newBuff and setting 2053 * isSharedMem based on the source of the passed in newBuff. An existing HFileBlock may have been 2054 * an {@link ExclusiveMemHFileBlock}, but the new buffer might call for a 2055 * {@link SharedMemHFileBlock}. Or vice versa.
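   * <p>
   * For example, a deep on-heap copy can be built like the sketch below (illustrative only; it
   * mirrors {@link #deepCloneOnHeap(HFileBlock)} further down):
   *
   * <pre>{@code
   * ByteBuff onHeap = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
   * HFileBlock copy = createBuilder(blk, onHeap).build(); // heap buffer, so not shared memory
   * }</pre>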
2056 * @param blk the block to clone from 2057 * @param newBuff the new buffer to use 2058 */ 2059 private static HFileBlockBuilder createBuilder(HFileBlock blk, ByteBuff newBuff) { 2060 return new HFileBlockBuilder().withBlockType(blk.blockType) 2061 .withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader) 2062 .withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader) 2063 .withPrevBlockOffset(blk.prevBlockOffset).withByteBuff(newBuff).withOffset(blk.offset) 2064 .withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader) 2065 .withNextBlockOnDiskSize(blk.nextBlockOnDiskSize).withHFileContext(blk.fileContext) 2066 .withByteBuffAllocator(blk.allocator).withShared(!newBuff.hasArray()); 2067 } 2068 2069 private static HFileBlock shallowClone(HFileBlock blk, ByteBuff newBuf) { 2070 return createBuilder(blk, newBuf).build(); 2071 } 2072 2073 static HFileBlock deepCloneOnHeap(HFileBlock blk) { 2074 ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit()))); 2075 return createBuilder(blk, deepCloned).build(); 2076 } 2077}