001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP; 021 022import java.io.DataInputStream; 023import java.io.DataOutput; 024import java.io.DataOutputStream; 025import java.io.IOException; 026import java.nio.ByteBuffer; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.concurrent.atomic.AtomicReference; 030import java.util.concurrent.locks.Lock; 031import java.util.concurrent.locks.ReentrantLock; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FSDataInputStream; 034import org.apache.hadoop.fs.FSDataOutputStream; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.fs.HFileSystem; 038import org.apache.hadoop.hbase.io.ByteArrayOutputStream; 039import org.apache.hadoop.hbase.io.ByteBuffAllocator; 040import org.apache.hadoop.hbase.io.ByteBuffInputStream; 041import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream; 042import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 043import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 044import org.apache.hadoop.hbase.io.encoding.EncodingState; 045import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; 046import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; 047import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; 048import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; 049import org.apache.hadoop.hbase.io.util.BlockIOUtils; 050import org.apache.hadoop.hbase.nio.ByteBuff; 051import org.apache.hadoop.hbase.nio.MultiByteBuff; 052import org.apache.hadoop.hbase.nio.SingleByteBuff; 053import org.apache.hadoop.hbase.regionserver.ShipperListener; 054import org.apache.hadoop.hbase.util.Bytes; 055import org.apache.hadoop.hbase.util.ChecksumType; 056import org.apache.hadoop.hbase.util.ClassSize; 057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 058import org.apache.yetus.audience.InterfaceAudience; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 063 064/** 065 * Cacheable Blocks of an {@link HFile} version 2 file. Version 2 was introduced in hbase-0.92.0. 066 * <p> 067 * Version 1 was the original file block. Version 2 was introduced when we changed the hbase file 068 * format to support multi-level block indexes and compound bloom filters (HBASE-3857). Support for 069 * Version 1 was removed in hbase-1.3.0. 
 * <h3>HFileBlock: Version 2</h3> In version 2, a block is structured as follows:
 * <ul>
 * <li><b>Header:</b> See Writer#putHeader() for where the header is written; header total size is
 * HFILEBLOCK_HEADER_SIZE
 * <ul>
 * <li>0. blockType: Magic record identifying the {@link BlockType} (8 bytes): e.g.
 * <code>DATABLK*</code>
 * <li>1. onDiskSizeWithoutHeader: Compressed -- a.k.a 'on disk' -- block size, excluding header,
 * but including tailing checksum bytes (4 bytes)
 * <li>2. uncompressedSizeWithoutHeader: Uncompressed block size, excluding header, and excluding
 * checksum bytes (4 bytes)
 * <li>3. prevBlockOffset: The offset of the previous block of the same type (8 bytes). This is
 * used to navigate to the previous block without having to go to the block index
 * <li>4: For minorVersions >=1, the ordinal describing checksum type (1 byte)
 * <li>5: For minorVersions >=1, the number of data bytes/checksum chunk (4 bytes)
 * <li>6: onDiskDataSizeWithHeader: For minorVersions >=1, the size of data 'on disk', including
 * header, excluding checksums (4 bytes)
 * </ul>
 * </li>
 * <li><b>Raw/Compressed/Encrypted/Encoded data:</b> The compression algorithm is the same for all
 * the blocks in an {@link HFile}. If compression is NONE, this is just raw, serialized Cells.
 * <li><b>Tail:</b> For minorVersions >=1, a series of 4 byte checksums, one each for the number
 * of bytes specified by bytesPerChecksum.
 * </ul>
 * <h3>Caching</h3> Caches cache whole blocks, with trailing checksums if any. We then tag on some
 * metadata, the content of BLOCK_METADATA_SPACE: a flag recording whether we are doing 'hbase'
 * checksums, and then the offset into the file, which is needed when we re-make a cache key when
 * we return the block to the cache as 'done'. See {@link Cacheable#serialize(ByteBuffer, boolean)}
 * and {@link Cacheable#getDeserializer()}.
 * <p>
 * TODO: Should we cache the checksums? Down in Writer#getBlockForCaching(CacheConfig) where we
 * make a block to cache-on-write, there is an attempt at turning off checksums. This is not the
 * only place we get blocks to cache. We also will cache the raw return from an hdfs read. In this
 * case, the checksums may be present. If the cache is backed by something that doesn't do ECC,
 * say an SSD, we might want to preserve checksums. For now this is an open question.
 * <p>
 * TODO: Over in BucketCache, we save a block allocation by doing a custom serialization. Be sure
 * to change it if serialization changes in here. Could we add a method here that takes an
 * IOEngine and that then serializes to it rather than expose our internals over in BucketCache?
 * IOEngine is in the bucket subpackage. Pull it up? Then this class knows about bucketcache. Ugh.
 */
@InterfaceAudience.Private
public class HFileBlock implements Cacheable {
  private static final Logger LOG = LoggerFactory.getLogger(HFileBlock.class);
  public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HFileBlock.class, false);

  // Block Header fields.

  // TODO: encapsulate Header related logic in this inner class.
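  /**
   * Byte offsets of the fixed header fields within a serialized block, matching the layout in the
   * class comment above. As an illustrative (not normative) sketch, the numeric fields can be read
   * back out of a header-sized {@link ByteBuff} positioned at the start of a block like so
   * ({@code headerBuf} is a placeholder name, not a member of this class):
   *
   * <pre>{@code
   * int onDiskSizeWithoutHeader = headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
   * int uncompressedSizeWithoutHeader = headerBuf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
   * long prevBlockOffset = headerBuf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
   * // The last three fields are only present when hbase checksums are in use (minor version >= 1).
   * byte checksumType = headerBuf.get(Header.CHECKSUM_TYPE_INDEX);
   * int bytesPerChecksum = headerBuf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
   * int onDiskDataSizeWithHeader = headerBuf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
   * }</pre>
   */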
119 static class Header { 120 // Format of header is: 121 // 8 bytes - block magic 122 // 4 bytes int - onDiskSizeWithoutHeader 123 // 4 bytes int - uncompressedSizeWithoutHeader 124 // 8 bytes long - prevBlockOffset 125 // The following 3 are only present if header contains checksum information 126 // 1 byte - checksum type 127 // 4 byte int - bytes per checksum 128 // 4 byte int - onDiskDataSizeWithHeader 129 static int BLOCK_MAGIC_INDEX = 0; 130 static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8; 131 static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12; 132 static int PREV_BLOCK_OFFSET_INDEX = 16; 133 static int CHECKSUM_TYPE_INDEX = 24; 134 static int BYTES_PER_CHECKSUM_INDEX = 25; 135 static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29; 136 } 137 138 /** Type of block. Header field 0. */ 139 private BlockType blockType; 140 141 /** 142 * Size on disk excluding header, including checksum. Header field 1. 143 * @see Writer#putHeader(byte[], int, int, int, int) 144 */ 145 private int onDiskSizeWithoutHeader; 146 147 /** 148 * Size of pure data. Does not include header or checksums. Header field 2. 149 * @see Writer#putHeader(byte[], int, int, int, int) 150 */ 151 private int uncompressedSizeWithoutHeader; 152 153 /** 154 * The offset of the previous block on disk. Header field 3. 155 * @see Writer#putHeader(byte[], int, int, int, int) 156 */ 157 private long prevBlockOffset; 158 159 /** 160 * Size on disk of header + data. Excludes checksum. Header field 6, OR calculated from 161 * {@link #onDiskSizeWithoutHeader} when using HDFS checksum. 162 * @see Writer#putHeader(byte[], int, int, int, int) 163 */ 164 private int onDiskDataSizeWithHeader; 165 // End of Block Header fields. 166 167 /** 168 * The in-memory representation of the hfile block. Can be on or offheap. Can be backed by a 169 * single ByteBuffer or by many. Make no assumptions. 170 * <p> 171 * Be careful reading from this <code>buf</code>. Duplicate and work on the duplicate or if not, 172 * be sure to reset position and limit else trouble down the road. 173 * <p> 174 * TODO: Make this read-only once made. 175 * <p> 176 * We are using the ByteBuff type. ByteBuffer is not extensible yet we need to be able to have a 177 * ByteBuffer-like API across multiple ByteBuffers reading from a cache such as BucketCache. So, 178 * we have this ByteBuff type. Unfortunately, it is spread all about HFileBlock. Would be good if 179 * could be confined to cache-use only but hard-to-do. 180 */ 181 private ByteBuff buf; 182 183 /** 184 * Meta data that holds meta information on the hfileblock. 185 */ 186 private HFileContext fileContext; 187 188 /** 189 * The offset of this block in the file. Populated by the reader for convenience of access. This 190 * offset is not part of the block header. 191 */ 192 private long offset = UNSET; 193 194 /** 195 * The on-disk size of the next block, including the header and checksums if present. UNSET if 196 * unknown. Blocks try to carry the size of the next block to read in this data member. Usually we 197 * get block sizes from the hfile index but sometimes the index is not available: e.g. when we 198 * read the indexes themselves (indexes are stored in blocks, we do not have an index for the 199 * indexes). Saves seeks especially around file open when there is a flurry of reading in hfile 200 * metadata. 
   */
  private int nextBlockOnDiskSize = UNSET;

  private ByteBuffAllocator allocator;

  /**
   * On a checksum failure, do this many succeeding read requests using hdfs checksums before
   * auto-reenabling hbase checksum verification.
   */
  static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;

  private static int UNSET = -1;
  public static final boolean FILL_HEADER = true;
  public static final boolean DONT_FILL_HEADER = false;

  // How to get the estimate correctly? if it is a singleBB?
  public static final int MULTI_BYTE_BUFFER_HEAP_SIZE =
    (int) ClassSize.estimateBase(MultiByteBuff.class, false);

  /**
   * Space for metadata on a block that gets stored along with the block when we cache it. There
   * are a few bytes stuck on the end of the HFileBlock that we pull in from HDFS. 8 bytes are for
   * the offset of this block (long) in the file. Offset is important because it is used when we
   * remake the CacheKey when we return the block to the cache when done. There is also a flag on
   * whether checksumming is being done by hbase or not. See class comment for note on uncertain
   * state of checksumming of blocks that come out of cache (should we or should we not?). Finally
   * there are 4 bytes to hold the length of the next block, which can save a seek on occasion if
   * available. (This EXTRA info came in with the original commit of the bucketcache, HBASE-7404.
   * It was formerly known as EXTRA_SERIALIZATION_SPACE).
   */
  public static final int BLOCK_METADATA_SPACE =
    Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;

  /**
   * Each checksum value is an integer that can be stored in 4 bytes.
   */
  static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;

  static final byte[] DUMMY_HEADER_NO_CHECKSUM =
    new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];

  /**
   * Used deserializing blocks from Cache. <code>
   * ++++++++++++++
   * + HFileBlock +
   * ++++++++++++++
   * + Checksums  + <= Optional
   * ++++++++++++++
   * + Metadata!  + <= See note on BLOCK_METADATA_SPACE above.
   * ++++++++++++++
   * </code>
   * @see #serialize(ByteBuffer, boolean)
   */
  public static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER = new BlockDeserializer();

  public static final class BlockDeserializer implements CacheableDeserializer<Cacheable> {
    private BlockDeserializer() {
    }

    @Override
    public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc) throws IOException {
      // The buf has the file block followed by block metadata.
      // Set limit to just before the BLOCK_METADATA_SPACE then rewind.
      buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind();
      // Get a new buffer to pass the HFileBlock for it to 'own'.
      ByteBuff newByteBuff = buf.slice();
      // Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock.
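      // The trailing BLOCK_METADATA_SPACE bytes are, in order: a one-byte flag recording whether
      // hbase checksums were in use, the block's file offset (long), and the on-disk size of the
      // next block (int); see the BLOCK_METADATA_SPACE javadoc above.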
      buf.position(buf.limit());
      buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE);
      boolean usesChecksum = buf.get() == (byte) 1;
      long offset = buf.getLong();
      int nextBlockOnDiskSize = buf.getInt();
      return createFromBuff(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
    }

    @Override
    public int getDeserializerIdentifier() {
      return DESERIALIZER_IDENTIFIER;
    }
  }

  private static final int DESERIALIZER_IDENTIFIER;
  static {
    DESERIALIZER_IDENTIFIER =
      CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
  }

  /**
   * Creates a new {@link HFile} block from the given fields. This constructor is used only while
   * writing blocks and caching: the block is already sitting in a byte buffer and we want to stuff
   * the block into cache.
   * <p>
   * TODO: The caller presumes no checksumming is required of this block instance since it is going
   * into cache; the checksum has already been verified on the underlying block data pulled in from
   * the filesystem. Is that correct? What if the cache is SSD?
   * <p>
   * TODO: Can the HFile block writer also be off-heap?
   * @param blockType                     the type of this block, see {@link BlockType}
   * @param onDiskSizeWithoutHeader       see {@link #onDiskSizeWithoutHeader}
   * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
   * @param prevBlockOffset               see {@link #prevBlockOffset}
   * @param buf                           block buffer with header
   *                                      ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
   * @param fillHeader                    when true, write the first 4 header fields into passed
   *                                      buffer.
   * @param offset                        the file offset the block was read from
   * @param onDiskDataSizeWithHeader      see {@link #onDiskDataSizeWithHeader}
   * @param fileContext                   HFile meta data
   */
  public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
    int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
    long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
    ByteBuffAllocator allocator) {
    this.blockType = blockType;
    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
    this.prevBlockOffset = prevBlockOffset;
    this.offset = offset;
    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
    this.nextBlockOnDiskSize = nextBlockOnDiskSize;
    this.fileContext = fileContext;
    this.allocator = allocator;
    this.buf = buf;
    if (fillHeader) {
      overwriteHeader();
    }
    this.buf.rewind();
  }

  /**
   * Creates a block from an existing buffer starting with a header. Rewinds and takes ownership of
   * the buffer. By definition of rewind, ignores the buffer position, but if you slice the buffer
   * beforehand, it will rewind to that point.
   * @param buf Has header, content, and trailing checksums if present.
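   * @param usesHBaseChecksum whether hbase checksums are present on the tail of the block
   * @param offset the file offset the block was read from
   * @param nextBlockOnDiskSize the on-disk size of the next block, or -1 (UNSET) if unknown
   * @param fileContext HFile meta data; may be null when deserializing from cache, in which case a
   *          new context is built from the header fields
   * @param allocator the {@link ByteBuffAllocator} to associate with the new block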
   */
  static HFileBlock createFromBuff(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
    final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator)
    throws IOException {
    buf.rewind();
    final BlockType blockType = BlockType.read(buf);
    final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
    final int uncompressedSizeWithoutHeader =
      buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
    final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
    // This method is called when we deserialize a block from cache and when we read a block in
    // from the fs. fileContext is null when deserialized from cache so need to make up one.
    HFileContextBuilder fileContextBuilder =
      fileContext != null ? new HFileContextBuilder(fileContext) : new HFileContextBuilder();
    fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum);
    int onDiskDataSizeWithHeader;
    if (usesHBaseChecksum) {
      byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX);
      int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
      onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
      // Use the checksum type and bytes per checksum from header, not from fileContext.
      fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType));
      fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum);
    } else {
      fileContextBuilder.withChecksumType(ChecksumType.NULL);
      fileContextBuilder.withBytesPerCheckSum(0);
      // Need to fix onDiskDataSizeWithHeader; there are no checksums after the block data
      onDiskDataSizeWithHeader = onDiskSizeWithoutHeader + headerSize(usesHBaseChecksum);
    }
    fileContext = fileContextBuilder.build();
    assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
    return new HFileBlockBuilder().withBlockType(blockType)
      .withOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader)
      .withUncompressedSizeWithoutHeader(uncompressedSizeWithoutHeader)
      .withPrevBlockOffset(prevBlockOffset).withOffset(offset)
      .withOnDiskDataSizeWithHeader(onDiskDataSizeWithHeader)
      .withNextBlockOnDiskSize(nextBlockOnDiskSize).withHFileContext(fileContext)
      .withByteBuffAllocator(allocator).withByteBuff(buf.rewind()).withShared(!buf.hasArray())
      .build();
  }

  /**
   * Parse total on disk size including header and checksum.
   * @param headerBuf      Header ByteBuffer. Presumed exact size of header.
   * @param verifyChecksum true if checksum verification is in use.
   * @return Size of the block with header included.
   */
  private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf, boolean verifyChecksum) {
    return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(verifyChecksum);
  }

  /**
   * @return the on-disk size of the next block (including the header size and any checksums if
   *         present) read by peeking into the next block's header; use as a hint when doing a read
   *         of the next block when scanning or running over a file.
391 */ 392 int getNextBlockOnDiskSize() { 393 return nextBlockOnDiskSize; 394 } 395 396 @Override 397 public BlockType getBlockType() { 398 return blockType; 399 } 400 401 @Override 402 public int refCnt() { 403 return buf.refCnt(); 404 } 405 406 @Override 407 public HFileBlock retain() { 408 buf.retain(); 409 return this; 410 } 411 412 /** 413 * Call {@link ByteBuff#release()} to decrease the reference count, if no other reference, it will 414 * return back the {@link ByteBuffer} to {@link org.apache.hadoop.hbase.io.ByteBuffAllocator} 415 */ 416 @Override 417 public boolean release() { 418 return buf.release(); 419 } 420 421 /** @return get data block encoding id that was used to encode this block */ 422 short getDataBlockEncodingId() { 423 if (blockType != BlockType.ENCODED_DATA) { 424 throw new IllegalArgumentException("Querying encoder ID of a block " + "of type other than " 425 + BlockType.ENCODED_DATA + ": " + blockType); 426 } 427 return buf.getShort(headerSize()); 428 } 429 430 /** 431 * @return the on-disk size of header + data part + checksum. 432 */ 433 public int getOnDiskSizeWithHeader() { 434 return onDiskSizeWithoutHeader + headerSize(); 435 } 436 437 /** 438 * @return the on-disk size of the data part + checksum (header excluded). 439 */ 440 int getOnDiskSizeWithoutHeader() { 441 return onDiskSizeWithoutHeader; 442 } 443 444 /** 445 * @return the uncompressed size of data part (header and checksum excluded). 446 */ 447 int getUncompressedSizeWithoutHeader() { 448 return uncompressedSizeWithoutHeader; 449 } 450 451 /** 452 * @return the offset of the previous block of the same type in the file, or -1 if unknown 453 */ 454 long getPrevBlockOffset() { 455 return prevBlockOffset; 456 } 457 458 /** 459 * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position is modified as 460 * side-effect. 461 */ 462 private void overwriteHeader() { 463 buf.rewind(); 464 blockType.write(buf); 465 buf.putInt(onDiskSizeWithoutHeader); 466 buf.putInt(uncompressedSizeWithoutHeader); 467 buf.putLong(prevBlockOffset); 468 if (this.fileContext.isUseHBaseChecksum()) { 469 buf.put(fileContext.getChecksumType().getCode()); 470 buf.putInt(fileContext.getBytesPerChecksum()); 471 buf.putInt(onDiskDataSizeWithHeader); 472 } 473 } 474 475 /** 476 * Returns a buffer that does not include the header and checksum. 477 * @return the buffer with header skipped and checksum omitted. 478 */ 479 public ByteBuff getBufferWithoutHeader() { 480 return this.getBufferWithoutHeader(false); 481 } 482 483 /** 484 * Returns a buffer that does not include the header or checksum. 485 * @param withChecksum to indicate whether include the checksum or not. 486 * @return the buffer with header skipped and checksum omitted. 487 */ 488 public ByteBuff getBufferWithoutHeader(boolean withChecksum) { 489 ByteBuff dup = getBufferReadOnly(); 490 int delta = withChecksum ? 0 : totalChecksumBytes(); 491 return dup.position(headerSize()).limit(buf.limit() - delta).slice(); 492 } 493 494 /** 495 * Returns a read-only duplicate of the buffer this block stores internally ready to be read. 496 * Clients must not modify the buffer object though they may set position and limit on the 497 * returned buffer since we pass back a duplicate. This method has to be public because it is used 498 * in {@link CompoundBloomFilter} to avoid object creation on every Bloom filter lookup, but has 499 * to be used with caution. Buffer holds header, block content, and any follow-on checksums if 500 * present. 
501 * @return the buffer of this block for read-only operations 502 */ 503 public ByteBuff getBufferReadOnly() { 504 // TODO: ByteBuf does not support asReadOnlyBuffer(). Fix. 505 ByteBuff dup = this.buf.duplicate(); 506 assert dup.position() == 0; 507 return dup; 508 } 509 510 public ByteBuffAllocator getByteBuffAllocator() { 511 return this.allocator; 512 } 513 514 private void sanityCheckAssertion(long valueFromBuf, long valueFromField, String fieldName) 515 throws IOException { 516 if (valueFromBuf != valueFromField) { 517 throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf 518 + ") is different from that in the field (" + valueFromField + ")"); 519 } 520 } 521 522 private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField) 523 throws IOException { 524 if (valueFromBuf != valueFromField) { 525 throw new IOException("Block type stored in the buffer: " + valueFromBuf 526 + ", block type field: " + valueFromField); 527 } 528 } 529 530 /** 531 * Checks if the block is internally consistent, i.e. the first 532 * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a valid header consistent 533 * with the fields. Assumes a packed block structure. This function is primary for testing and 534 * debugging, and is not thread-safe, because it alters the internal buffer pointer. Used by tests 535 * only. 536 */ 537 void sanityCheck() throws IOException { 538 // Duplicate so no side-effects 539 ByteBuff dup = this.buf.duplicate().rewind(); 540 sanityCheckAssertion(BlockType.read(dup), blockType); 541 542 sanityCheckAssertion(dup.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader"); 543 544 sanityCheckAssertion(dup.getInt(), uncompressedSizeWithoutHeader, 545 "uncompressedSizeWithoutHeader"); 546 547 sanityCheckAssertion(dup.getLong(), prevBlockOffset, "prevBlockOffset"); 548 if (this.fileContext.isUseHBaseChecksum()) { 549 sanityCheckAssertion(dup.get(), this.fileContext.getChecksumType().getCode(), "checksumType"); 550 sanityCheckAssertion(dup.getInt(), this.fileContext.getBytesPerChecksum(), 551 "bytesPerChecksum"); 552 sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader"); 553 } 554 555 int cksumBytes = totalChecksumBytes(); 556 int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes; 557 if (dup.limit() != expectedBufLimit) { 558 throw new AssertionError("Expected limit " + expectedBufLimit + ", got " + dup.limit()); 559 } 560 561 // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next 562 // block's header, so there are two sensible values for buffer capacity. 
563 int hdrSize = headerSize(); 564 dup.rewind(); 565 if (dup.remaining() != expectedBufLimit && dup.remaining() != expectedBufLimit + hdrSize) { 566 throw new AssertionError("Invalid buffer capacity: " + dup.remaining() + ", expected " 567 + expectedBufLimit + " or " + (expectedBufLimit + hdrSize)); 568 } 569 } 570 571 @Override 572 public String toString() { 573 StringBuilder sb = new StringBuilder().append("[").append("blockType=").append(blockType) 574 .append(", fileOffset=").append(offset).append(", headerSize=").append(headerSize()) 575 .append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader) 576 .append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader) 577 .append(", prevBlockOffset=").append(prevBlockOffset).append(", isUseHBaseChecksum=") 578 .append(fileContext.isUseHBaseChecksum()); 579 if (fileContext.isUseHBaseChecksum()) { 580 sb.append(", checksumType=").append(ChecksumType.codeToType(this.buf.get(24))) 581 .append(", bytesPerChecksum=").append(this.buf.getInt(24 + 1)) 582 .append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader); 583 } else { 584 sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader).append("(") 585 .append(onDiskSizeWithoutHeader).append("+") 586 .append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")"); 587 } 588 String dataBegin; 589 if (buf.hasArray()) { 590 dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(), 591 Math.min(32, buf.limit() - buf.arrayOffset() - headerSize())); 592 } else { 593 ByteBuff bufWithoutHeader = getBufferWithoutHeader(); 594 byte[] dataBeginBytes = 595 new byte[Math.min(32, bufWithoutHeader.limit() - bufWithoutHeader.position())]; 596 bufWithoutHeader.get(dataBeginBytes); 597 dataBegin = Bytes.toStringBinary(dataBeginBytes); 598 } 599 sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader()) 600 .append(", totalChecksumBytes=").append(totalChecksumBytes()).append(", isUnpacked=") 601 .append(isUnpacked()).append(", buf=[").append(buf).append("]").append(", dataBeginsWith=") 602 .append(dataBegin).append(", fileContext=").append(fileContext) 603 .append(", nextBlockOnDiskSize=").append(nextBlockOnDiskSize).append("]"); 604 return sb.toString(); 605 } 606 607 /** 608 * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its 609 * encoded structure. Internal structures are shared between instances where applicable. 610 */ 611 HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException { 612 if (!fileContext.isCompressedOrEncrypted()) { 613 // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean), 614 // which is used for block serialization to L2 cache, does not preserve encoding and 615 // encryption details. 616 return this; 617 } 618 619 HFileBlock unpacked = shallowClone(this); 620 unpacked.allocateBuffer(); // allocates space for the decompressed block 621 boolean succ = false; 622 try { 623 HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA 624 ? reader.getBlockDecodingContext() 625 : reader.getDefaultBlockDecodingContext(); 626 // Create a duplicated buffer without the header part. 
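      // Duplicating (rather than reading this.buf directly) keeps this block's own position and
      // limit untouched; see the caution on the buf field javadoc above.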
627 ByteBuff dup = this.buf.duplicate(); 628 dup.position(this.headerSize()); 629 dup = dup.slice(); 630 // Decode the dup into unpacked#buf 631 ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(), 632 unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup); 633 succ = true; 634 return unpacked; 635 } finally { 636 if (!succ) { 637 unpacked.release(); 638 } 639 } 640 } 641 642 /** 643 * Always allocates a new buffer of the correct size. Copies header bytes from the existing 644 * buffer. Does not change header fields. Reserve room to keep checksum bytes too. 645 */ 646 private void allocateBuffer() { 647 int cksumBytes = totalChecksumBytes(); 648 int headerSize = headerSize(); 649 int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes; 650 651 ByteBuff newBuf = allocator.allocate(capacityNeeded); 652 653 // Copy header bytes into newBuf. 654 buf.position(0); 655 newBuf.put(0, buf, 0, headerSize); 656 657 buf = newBuf; 658 // set limit to exclude next block's header 659 buf.limit(capacityNeeded); 660 } 661 662 /** 663 * Return true when this block's buffer has been unpacked, false otherwise. Note this is a 664 * calculated heuristic, not tracked attribute of the block. 665 */ 666 public boolean isUnpacked() { 667 final int cksumBytes = totalChecksumBytes(); 668 final int headerSize = headerSize(); 669 final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes; 670 final int bufCapacity = buf.remaining(); 671 return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize; 672 } 673 674 /** 675 * Cannot be {@link #UNSET}. Must be a legitimate value. Used re-making the {@link BlockCacheKey} 676 * when block is returned to the cache. 677 * @return the offset of this block in the file it was read from 678 */ 679 long getOffset() { 680 if (offset < 0) { 681 throw new IllegalStateException("HFile block offset not initialized properly"); 682 } 683 return offset; 684 } 685 686 /** 687 * @return a byte stream reading the data + checksum of this block 688 */ 689 DataInputStream getByteStream() { 690 ByteBuff dup = this.buf.duplicate(); 691 dup.position(this.headerSize()); 692 return new DataInputStream(new ByteBuffInputStream(dup)); 693 } 694 695 @Override 696 public long heapSize() { 697 long size = FIXED_OVERHEAD; 698 size += fileContext.heapSize(); 699 if (buf != null) { 700 // Deep overhead of the byte buffer. Needs to be aligned separately. 701 size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE); 702 } 703 return ClassSize.align(size); 704 } 705 706 /** 707 * Will be override by {@link SharedMemHFileBlock} or {@link ExclusiveMemHFileBlock}. Return true 708 * by default. 709 */ 710 public boolean isSharedMem() { 711 if (this instanceof SharedMemHFileBlock) { 712 return true; 713 } else if (this instanceof ExclusiveMemHFileBlock) { 714 return false; 715 } 716 return true; 717 } 718 719 /** 720 * Unified version 2 {@link HFile} block writer. The intended usage pattern is as follows: 721 * <ol> 722 * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm. 723 * <li>Call {@link Writer#startWriting} and get a data stream to write to. 724 * <li>Write your data into the stream. 725 * <li>Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to. store the 726 * serialized block into an external stream. 727 * <li>Repeat to write more blocks. 
728 * </ol> 729 * <p> 730 */ 731 static class Writer implements ShipperListener { 732 private enum State { 733 INIT, 734 WRITING, 735 BLOCK_READY 736 } 737 738 /** Writer state. Used to ensure the correct usage protocol. */ 739 private State state = State.INIT; 740 741 /** Data block encoder used for data blocks */ 742 private final HFileDataBlockEncoder dataBlockEncoder; 743 744 private HFileBlockEncodingContext dataBlockEncodingCtx; 745 746 /** block encoding context for non-data blocks */ 747 private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx; 748 749 /** 750 * The stream we use to accumulate data into a block in an uncompressed format. We reset this 751 * stream at the end of each block and reuse it. The header is written as the first 752 * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this stream. 753 */ 754 private ByteArrayOutputStream baosInMemory; 755 756 /** 757 * Current block type. Set in {@link #startWriting(BlockType)}. Could be changed in 758 * {@link #finishBlock()} from {@link BlockType#DATA} to {@link BlockType#ENCODED_DATA}. 759 */ 760 private BlockType blockType; 761 762 /** 763 * A stream that we write uncompressed bytes to, which compresses them and writes them to 764 * {@link #baosInMemory}. 765 */ 766 private DataOutputStream userDataStream; 767 768 /** 769 * Bytes to be written to the file system, including the header. Compressed if compression is 770 * turned on. It also includes the checksum data that immediately follows the block data. 771 * (header + data + checksums) 772 */ 773 private ByteArrayOutputStream onDiskBlockBytesWithHeader; 774 775 /** 776 * The size of the checksum data on disk. It is used only if data is not compressed. If data is 777 * compressed, then the checksums are already part of onDiskBytesWithHeader. If data is 778 * uncompressed, then this variable stores the checksum data for this block. 779 */ 780 private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY; 781 782 /** 783 * Current block's start offset in the {@link HFile}. Set in 784 * {@link #writeHeaderAndData(FSDataOutputStream)}. 785 */ 786 private long startOffset; 787 788 /** 789 * Offset of previous block by block type. Updated when the next block is started. 790 */ 791 private long[] prevOffsetByType; 792 793 /** The offset of the previous block of the same type */ 794 private long prevOffset; 795 /** Meta data that holds information about the hfileblock **/ 796 private HFileContext fileContext; 797 798 private final ByteBuffAllocator allocator; 799 800 @Override 801 public void beforeShipped() { 802 if (getEncodingState() != null) { 803 getEncodingState().beforeShipped(); 804 } 805 } 806 807 EncodingState getEncodingState() { 808 return dataBlockEncodingCtx.getEncodingState(); 809 } 810 811 /** 812 * @param dataBlockEncoder data block encoding algorithm to use 813 */ 814 public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder, 815 HFileContext fileContext) { 816 this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP); 817 } 818 819 public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder, 820 HFileContext fileContext, ByteBuffAllocator allocator) { 821 if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) { 822 throw new RuntimeException("Unsupported value of bytesPerChecksum. 
" + " Minimum is " 823 + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " 824 + fileContext.getBytesPerChecksum()); 825 } 826 this.allocator = allocator; 827 this.dataBlockEncoder = 828 dataBlockEncoder != null ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE; 829 this.dataBlockEncodingCtx = this.dataBlockEncoder.newDataBlockEncodingContext(conf, 830 HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); 831 // TODO: This should be lazily instantiated 832 this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(conf, null, 833 HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); 834 // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum 835 baosInMemory = new ByteArrayOutputStream(); 836 prevOffsetByType = new long[BlockType.values().length]; 837 for (int i = 0; i < prevOffsetByType.length; ++i) { 838 prevOffsetByType[i] = UNSET; 839 } 840 // TODO: Why fileContext saved away when we have dataBlockEncoder and/or 841 // defaultDataBlockEncoder? 842 this.fileContext = fileContext; 843 } 844 845 /** 846 * Starts writing into the block. The previous block's data is discarded. 847 * @return the stream the user can write their data into 848 */ 849 DataOutputStream startWriting(BlockType newBlockType) throws IOException { 850 if (state == State.BLOCK_READY && startOffset != -1) { 851 // We had a previous block that was written to a stream at a specific 852 // offset. Save that offset as the last offset of a block of that type. 853 prevOffsetByType[blockType.getId()] = startOffset; 854 } 855 856 startOffset = -1; 857 blockType = newBlockType; 858 859 baosInMemory.reset(); 860 baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER); 861 862 state = State.WRITING; 863 864 // We will compress it later in finishBlock() 865 userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory); 866 if (newBlockType == BlockType.DATA) { 867 this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream); 868 } 869 return userDataStream; 870 } 871 872 /** 873 * Writes the Cell to this block 874 */ 875 void write(Cell cell) throws IOException { 876 expectState(State.WRITING); 877 this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream); 878 } 879 880 /** 881 * Transitions the block writer from the "writing" state to the "block ready" state. Does 882 * nothing if a block is already finished. 883 */ 884 void ensureBlockReady() throws IOException { 885 Preconditions.checkState(state != State.INIT, "Unexpected state: " + state); 886 887 if (state == State.BLOCK_READY) { 888 return; 889 } 890 891 // This will set state to BLOCK_READY. 892 finishBlock(); 893 } 894 895 /** 896 * Finish up writing of the block. Flushes the compressing stream (if using compression), fills 897 * out the header, does any compression/encryption of bytes to flush out to disk, and manages 898 * the cache on write content, if applicable. Sets block write state to "block ready". 899 */ 900 private void finishBlock() throws IOException { 901 if (blockType == BlockType.DATA) { 902 this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream, 903 baosInMemory.getBuffer(), blockType); 904 blockType = dataBlockEncodingCtx.getBlockType(); 905 } 906 userDataStream.flush(); 907 prevOffset = prevOffsetByType[blockType.getId()]; 908 909 // We need to set state before we can package the block up for cache-on-write. In a way, the 910 // block is ready, but not yet encoded or compressed. 
911 state = State.BLOCK_READY; 912 Bytes compressAndEncryptDat; 913 if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) { 914 compressAndEncryptDat = 915 dataBlockEncodingCtx.compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size()); 916 } else { 917 compressAndEncryptDat = defaultBlockEncodingCtx.compressAndEncrypt(baosInMemory.getBuffer(), 918 0, baosInMemory.size()); 919 } 920 if (compressAndEncryptDat == null) { 921 compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size()); 922 } 923 if (onDiskBlockBytesWithHeader == null) { 924 onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength()); 925 } 926 onDiskBlockBytesWithHeader.reset(); 927 onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(), 928 compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength()); 929 // Calculate how many bytes we need for checksum on the tail of the block. 930 int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(), 931 fileContext.getBytesPerChecksum()); 932 933 // Put the header for the on disk bytes; header currently is unfilled-out 934 putHeader(onDiskBlockBytesWithHeader, onDiskBlockBytesWithHeader.size() + numBytes, 935 baosInMemory.size(), onDiskBlockBytesWithHeader.size()); 936 if (onDiskChecksum.length != numBytes) { 937 onDiskChecksum = new byte[numBytes]; 938 } 939 ChecksumUtil.generateChecksums(onDiskBlockBytesWithHeader.getBuffer(), 0, 940 onDiskBlockBytesWithHeader.size(), onDiskChecksum, 0, fileContext.getChecksumType(), 941 fileContext.getBytesPerChecksum()); 942 } 943 944 /** 945 * Put the header into the given byte array at the given offset. 946 * @param onDiskSize size of the block on disk header + data + checksum 947 * @param uncompressedSize size of the block after decompression (but before optional data block 948 * decoding) including header 949 * @param onDiskDataSize size of the block on disk with header and data but not including the 950 * checksums 951 */ 952 private void putHeader(byte[] dest, int offset, int onDiskSize, int uncompressedSize, 953 int onDiskDataSize) { 954 offset = blockType.put(dest, offset); 955 offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE); 956 offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE); 957 offset = Bytes.putLong(dest, offset, prevOffset); 958 offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode()); 959 offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum()); 960 Bytes.putInt(dest, offset, onDiskDataSize); 961 } 962 963 private void putHeader(ByteBuff buff, int onDiskSize, int uncompressedSize, 964 int onDiskDataSize) { 965 buff.rewind(); 966 blockType.write(buff); 967 buff.putInt(onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE); 968 buff.putInt(uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE); 969 buff.putLong(prevOffset); 970 buff.put(fileContext.getChecksumType().getCode()); 971 buff.putInt(fileContext.getBytesPerChecksum()); 972 buff.putInt(onDiskDataSize); 973 } 974 975 private void putHeader(ByteArrayOutputStream dest, int onDiskSize, int uncompressedSize, 976 int onDiskDataSize) { 977 putHeader(dest.getBuffer(), 0, onDiskSize, uncompressedSize, onDiskDataSize); 978 } 979 980 /** 981 * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records the offset of this 982 * block so that it can be referenced in the next block of the same type. 
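     * <p>
     * For illustration only, the write loop sketched in the {@link Writer} class comment looks
     * roughly like the following ({@code conf}, {@code dataBlockEncoder}, {@code fileContext},
     * {@code out} and the block payload are caller-supplied placeholders):
     *
     * <pre>{@code
     * HFileBlock.Writer writer = new HFileBlock.Writer(conf, dataBlockEncoder, fileContext);
     * DataOutputStream dos = writer.startWriting(BlockType.META);
     * // ... write the block payload to dos (DATA blocks instead go through writer.write(Cell)) ...
     * writer.writeHeaderAndData(out); // finishes the block; appends header + data + checksums
     * // repeat startWriting(...)/writeHeaderAndData(out) for further blocks
     * }</pre>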
983 */ 984 void writeHeaderAndData(FSDataOutputStream out) throws IOException { 985 long offset = out.getPos(); 986 if (startOffset != UNSET && offset != startOffset) { 987 throw new IOException("A " + blockType + " block written to a " 988 + "stream twice, first at offset " + startOffset + ", then at " + offset); 989 } 990 startOffset = offset; 991 finishBlockAndWriteHeaderAndData(out); 992 } 993 994 /** 995 * Writes the header and the compressed data of this block (or uncompressed data when not using 996 * compression) into the given stream. Can be called in the "writing" state or in the "block 997 * ready" state. If called in the "writing" state, transitions the writer to the "block ready" 998 * state. 999 * @param out the output stream to write the 1000 */ 1001 protected void finishBlockAndWriteHeaderAndData(DataOutputStream out) throws IOException { 1002 ensureBlockReady(); 1003 long startTime = EnvironmentEdgeManager.currentTime(); 1004 out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size()); 1005 out.write(onDiskChecksum); 1006 HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime); 1007 } 1008 1009 /** 1010 * Returns the header or the compressed data (or uncompressed data when not using compression) 1011 * as a byte array. Can be called in the "writing" state or in the "block ready" state. If 1012 * called in the "writing" state, transitions the writer to the "block ready" state. This 1013 * returns the header + data + checksums stored on disk. 1014 * @return header and data as they would be stored on disk in a byte array 1015 */ 1016 byte[] getHeaderAndDataForTest() throws IOException { 1017 ensureBlockReady(); 1018 // This is not very optimal, because we are doing an extra copy. 1019 // But this method is used only by unit tests. 1020 byte[] output = new byte[onDiskBlockBytesWithHeader.size() + onDiskChecksum.length]; 1021 System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0, 1022 onDiskBlockBytesWithHeader.size()); 1023 System.arraycopy(onDiskChecksum, 0, output, onDiskBlockBytesWithHeader.size(), 1024 onDiskChecksum.length); 1025 return output; 1026 } 1027 1028 /** 1029 * Releases resources used by this writer. 1030 */ 1031 void release() { 1032 if (dataBlockEncodingCtx != null) { 1033 dataBlockEncodingCtx.close(); 1034 dataBlockEncodingCtx = null; 1035 } 1036 if (defaultBlockEncodingCtx != null) { 1037 defaultBlockEncodingCtx.close(); 1038 defaultBlockEncodingCtx = null; 1039 } 1040 } 1041 1042 /** 1043 * Returns the on-disk size of the data portion of the block. This is the compressed size if 1044 * compression is enabled. Can only be called in the "block ready" state. Header is not 1045 * compressed, and its size is not included in the return value. 1046 * @return the on-disk size of the block, not including the header. 1047 */ 1048 int getOnDiskSizeWithoutHeader() { 1049 expectState(State.BLOCK_READY); 1050 return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length 1051 - HConstants.HFILEBLOCK_HEADER_SIZE; 1052 } 1053 1054 /** 1055 * Returns the on-disk size of the block. Can only be called in the "block ready" state. 1056 * @return the on-disk size of the block ready to be written, including the header size, the 1057 * data and the checksum data. 1058 */ 1059 int getOnDiskSizeWithHeader() { 1060 expectState(State.BLOCK_READY); 1061 return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length; 1062 } 1063 1064 /** 1065 * The uncompressed size of the block data. Does not include header size. 
1066 */ 1067 int getUncompressedSizeWithoutHeader() { 1068 expectState(State.BLOCK_READY); 1069 return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE; 1070 } 1071 1072 /** 1073 * The uncompressed size of the block data, including header size. 1074 */ 1075 int getUncompressedSizeWithHeader() { 1076 expectState(State.BLOCK_READY); 1077 return baosInMemory.size(); 1078 } 1079 1080 /** @return true if a block is being written */ 1081 boolean isWriting() { 1082 return state == State.WRITING; 1083 } 1084 1085 /** 1086 * Returns the number of bytes written into the current block so far, or zero if not writing the 1087 * block at the moment. Note that this will return zero in the "block ready" state as well. 1088 * @return the number of bytes written 1089 */ 1090 public int encodedBlockSizeWritten() { 1091 return state != State.WRITING ? 0 : this.getEncodingState().getEncodedDataSizeWritten(); 1092 } 1093 1094 /** 1095 * Returns the number of bytes written into the current block so far, or zero if not writing the 1096 * block at the moment. Note that this will return zero in the "block ready" state as well. 1097 * @return the number of bytes written 1098 */ 1099 int blockSizeWritten() { 1100 return state != State.WRITING ? 0 : this.getEncodingState().getUnencodedDataSizeWritten(); 1101 } 1102 1103 /** 1104 * Clones the header followed by the uncompressed data, even if using compression. This is 1105 * needed for storing uncompressed blocks in the block cache. Can be called in the "writing" 1106 * state or the "block ready" state. Returns only the header and data, does not include checksum 1107 * data. 1108 * @return Returns an uncompressed block ByteBuff for caching on write 1109 */ 1110 ByteBuff cloneUncompressedBufferWithHeader() { 1111 expectState(State.BLOCK_READY); 1112 ByteBuff bytebuff = allocator.allocate(baosInMemory.size()); 1113 baosInMemory.toByteBuff(bytebuff); 1114 int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(), 1115 fileContext.getBytesPerChecksum()); 1116 putHeader(bytebuff, onDiskBlockBytesWithHeader.size() + numBytes, baosInMemory.size(), 1117 onDiskBlockBytesWithHeader.size()); 1118 bytebuff.rewind(); 1119 return bytebuff; 1120 } 1121 1122 /** 1123 * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is needed 1124 * for storing packed blocks in the block cache. Returns only the header and data, Does not 1125 * include checksum data. 1126 * @return Returns a copy of block bytes for caching on write 1127 */ 1128 private ByteBuff cloneOnDiskBufferWithHeader() { 1129 expectState(State.BLOCK_READY); 1130 ByteBuff bytebuff = allocator.allocate(onDiskBlockBytesWithHeader.size()); 1131 onDiskBlockBytesWithHeader.toByteBuff(bytebuff); 1132 bytebuff.rewind(); 1133 return bytebuff; 1134 } 1135 1136 private void expectState(State expectedState) { 1137 if (state != expectedState) { 1138 throw new IllegalStateException( 1139 "Expected state: " + expectedState + ", actual state: " + state); 1140 } 1141 } 1142 1143 /** 1144 * Takes the given {@link BlockWritable} instance, creates a new block of its appropriate type, 1145 * writes the writable into this block, and flushes the block into the output stream. The writer 1146 * is instructed not to buffer uncompressed bytes for cache-on-write. 
1147 * @param bw the block-writable object to write as a block 1148 * @param out the file system output stream 1149 */ 1150 void writeBlock(BlockWritable bw, FSDataOutputStream out) throws IOException { 1151 bw.writeToBlock(startWriting(bw.getBlockType())); 1152 writeHeaderAndData(out); 1153 } 1154 1155 /** 1156 * Creates a new HFileBlock. Checksums have already been validated, so the byte buffer passed 1157 * into the constructor of this newly created block does not have checksum data even though the 1158 * header minor version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a 0 value 1159 * in bytesPerChecksum. This method copies the on-disk or uncompressed data to build the 1160 * HFileBlock which is used only while writing blocks and caching. 1161 * <p> 1162 * TODO: Should there be an option where a cache can ask that hbase preserve block checksums for 1163 * checking after a block comes out of the cache? Otehrwise, cache is responsible for blocks 1164 * being wholesome (ECC memory or if file-backed, it does checksumming). 1165 */ 1166 HFileBlock getBlockForCaching(CacheConfig cacheConf) { 1167 HFileContext newContext = new HFileContextBuilder().withBlockSize(fileContext.getBlocksize()) 1168 .withBytesPerCheckSum(0).withChecksumType(ChecksumType.NULL) // no checksums in cached data 1169 .withCompression(fileContext.getCompression()) 1170 .withDataBlockEncoding(fileContext.getDataBlockEncoding()) 1171 .withHBaseCheckSum(fileContext.isUseHBaseChecksum()) 1172 .withCompressTags(fileContext.isCompressTags()) 1173 .withIncludesMvcc(fileContext.isIncludesMvcc()) 1174 .withIncludesTags(fileContext.isIncludesTags()) 1175 .withColumnFamily(fileContext.getColumnFamily()).withTableName(fileContext.getTableName()) 1176 .build(); 1177 // Build the HFileBlock. 1178 HFileBlockBuilder builder = new HFileBlockBuilder(); 1179 ByteBuff buff; 1180 if (cacheConf.shouldCacheCompressed(blockType.getCategory())) { 1181 buff = cloneOnDiskBufferWithHeader(); 1182 } else { 1183 buff = cloneUncompressedBufferWithHeader(); 1184 } 1185 return builder.withBlockType(blockType) 1186 .withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader()) 1187 .withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader()) 1188 .withPrevBlockOffset(prevOffset).withByteBuff(buff).withFillHeader(FILL_HEADER) 1189 .withOffset(startOffset).withNextBlockOnDiskSize(UNSET) 1190 .withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length) 1191 .withHFileContext(newContext).withByteBuffAllocator(cacheConf.getByteBuffAllocator()) 1192 .withShared(!buff.hasArray()).build(); 1193 } 1194 } 1195 1196 /** Something that can be written into a block. */ 1197 interface BlockWritable { 1198 /** The type of block this data should use. */ 1199 BlockType getBlockType(); 1200 1201 /** 1202 * Writes the block to the provided stream. Must not write any magic records. 1203 * @param out a stream to write uncompressed data into 1204 */ 1205 void writeToBlock(DataOutput out) throws IOException; 1206 } 1207 1208 /** 1209 * Iterator for reading {@link HFileBlock}s in load-on-open-section, such as root data index 1210 * block, meta index block, file info block etc. 1211 */ 1212 interface BlockIterator { 1213 /** 1214 * Get the next block, or null if there are no more blocks to iterate. 
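     * <p>
     * For illustration only, an iterator obtained from {@link FSReader#blockRange(long, long)} is
     * typically driven as below ({@code fsReader}, {@code startOffset} and {@code endOffset} are
     * placeholders, not members of this interface):
     *
     * <pre>{@code
     * BlockIterator it = fsReader.blockRange(startOffset, endOffset);
     * try {
     *   for (HFileBlock block = it.nextBlock(); block != null; block = it.nextBlock()) {
     *     // ... inspect the unpacked block ...
     *   }
     * } finally {
     *   it.freeBlocks(); // release the ByteBuffs of all blocks handed out above
     * }
     * }</pre>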
     */
    HFileBlock nextBlock() throws IOException;

    /**
     * Similar to {@link #nextBlock()} but checks block type, throws an exception if incorrect, and
     * returns the HFile block
     */
    HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;

    /**
     * Now we use the {@link ByteBuffAllocator} to manage the nio ByteBuffers for HFileBlocks, so
     * we must deallocate all of the ByteBuffers at the end of their life. The BlockIterator's life
     * cycle starts when an HFileReader is opened and stops at HFileReader#close, so we keep track
     * of all the read blocks until we call {@link BlockIterator#freeBlocks()} when closing the
     * HFileReader. The total size of the blocks in the load-on-open section should be quite small,
     * so tracking them should be OK.
     */
    void freeBlocks();
  }

  /** An HFile block reader with iteration ability. */
  interface FSReader {
    /**
     * Reads the block at the given offset in the file with the given on-disk size and uncompressed
     * size.
     * @param offset        of the file to read
     * @param onDiskSize    the on-disk size of the entire block, including all applicable headers,
     *                      or -1 if unknown
     * @param pread         true to use pread, otherwise use the stream read.
     * @param updateMetrics update the metrics or not.
     * @param intoHeap      allocate the block's ByteBuff by {@link ByteBuffAllocator} or JVM heap.
     *                      For LRUBlockCache, we must ensure that the block to cache is a heap
     *                      one, because the memory occupation is based on heap now; also for
     *                      {@link CombinedBlockCache}, we use the heap LRUBlockCache as L1 cache
     *                      to cache small blocks such as IndexBlock or MetaBlock for faster
     *                      access. So we introduce a flag here to decide whether to allocate from
     *                      the JVM heap or not, so that we can avoid an extra off-heap to heap
     *                      memory copy when using LRUBlockCache. For most cases, we know the
     *                      expected block type we'll read, while for some special cases (example:
     *                      HFileReaderImpl#readNextDataBlock()), we cannot pre-decide the expected
     *                      block type, so we can only allocate the block's ByteBuff from
     *                      {@link ByteBuffAllocator} first, and then, when caching it in
     *                      {@link LruBlockCache}, we'll check whether the ByteBuff is from the
     *                      heap or not; if not, we'll clone it to a heap one and cache it.
     * @return the newly read block
     */
    HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics,
      boolean intoHeap) throws IOException;

    /**
     * Creates a block iterator over the given portion of the {@link HFile}. The iterator returns
     * blocks with offset such that startOffset <= offset < endOffset. Returned blocks are
     * always unpacked. Used when no hfile index is available; e.g. reading in the hfile index
     * blocks themselves on file open.
     * @param startOffset the offset of the block to start iteration with
     * @param endOffset   the offset to end iteration at (exclusive)
     * @return an iterator of blocks between the two given offsets
     */
    BlockIterator blockRange(long startOffset, long endOffset);

    /** Closes the backing streams */
    void closeStreams() throws IOException;

    /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
    HFileBlockDecodingContext getBlockDecodingContext();

    /** Get the default decoder for blocks from this file.
*/ 1282 HFileBlockDecodingContext getDefaultBlockDecodingContext(); 1283 1284 void setIncludesMemStoreTS(boolean includesMemstoreTS); 1285 1286 void setDataBlockEncoder(HFileDataBlockEncoder encoder, Configuration conf); 1287 1288 /** 1289 * To close the stream's socket. Note: This can be concurrently called from multiple threads and 1290 * implementation should take care of thread safety. 1291 */ 1292 void unbufferStream(); 1293 } 1294 1295 /** 1296 * Data-structure to use caching the header of the NEXT block. Only works if next read that comes 1297 * in here is next in sequence in this block. When we read, we read current block and the next 1298 * blocks' header. We do this so we have the length of the next block to read if the hfile index 1299 * is not available (rare, at hfile open only). 1300 */ 1301 private static class PrefetchedHeader { 1302 long offset = -1; 1303 byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE]; 1304 final ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(header, 0, header.length)); 1305 1306 @Override 1307 public String toString() { 1308 return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header); 1309 } 1310 } 1311 1312 /** 1313 * Reads version 2 HFile blocks from the filesystem. 1314 */ 1315 static class FSReaderImpl implements FSReader { 1316 /** 1317 * The file system stream of the underlying {@link HFile} that does or doesn't do checksum 1318 * validations in the filesystem 1319 */ 1320 private FSDataInputStreamWrapper streamWrapper; 1321 1322 private HFileBlockDecodingContext encodedBlockDecodingCtx; 1323 1324 /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */ 1325 private final HFileBlockDefaultDecodingContext defaultDecodingCtx; 1326 1327 /** 1328 * Cache of the NEXT header after this. Check it is indeed next blocks header before using it. 1329 * TODO: Review. This overread into next block to fetch next blocks header seems unnecessary 1330 * given we usually get the block size from the hfile index. Review! 1331 */ 1332 private AtomicReference<PrefetchedHeader> prefetchedHeader = 1333 new AtomicReference<>(new PrefetchedHeader()); 1334 1335 /** The size of the file we are reading from, or -1 if unknown. */ 1336 private long fileSize; 1337 1338 /** The size of the header */ 1339 protected final int hdrSize; 1340 1341 /** The filesystem used to access data */ 1342 private HFileSystem hfs; 1343 1344 private HFileContext fileContext; 1345 // Cache the fileName 1346 private String pathName; 1347 1348 private final ByteBuffAllocator allocator; 1349 1350 private final Lock streamLock = new ReentrantLock(); 1351 1352 private final boolean isPreadAllBytes; 1353 1354 FSReaderImpl(ReaderContext readerContext, HFileContext fileContext, ByteBuffAllocator allocator, 1355 Configuration conf) throws IOException { 1356 this.fileSize = readerContext.getFileSize(); 1357 this.hfs = readerContext.getFileSystem(); 1358 if (readerContext.getFilePath() != null) { 1359 this.pathName = readerContext.getFilePath().toString(); 1360 } 1361 this.fileContext = fileContext; 1362 this.hdrSize = headerSize(fileContext.isUseHBaseChecksum()); 1363 this.allocator = allocator; 1364 1365 this.streamWrapper = readerContext.getInputStreamWrapper(); 1366 // Older versions of HBase didn't support checksum. 
1367 this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum()); 1368 defaultDecodingCtx = new HFileBlockDefaultDecodingContext(conf, fileContext); 1369 encodedBlockDecodingCtx = defaultDecodingCtx; 1370 isPreadAllBytes = readerContext.isPreadAllBytes(); 1371 } 1372 1373 @Override 1374 public BlockIterator blockRange(final long startOffset, final long endOffset) { 1375 final FSReader owner = this; // handle for inner class 1376 return new BlockIterator() { 1377 private volatile boolean freed = false; 1378 // Tracking all read blocks until we call freeBlocks. 1379 private List<HFileBlock> blockTracker = new ArrayList<>(); 1380 private long offset = startOffset; 1381 // Cache length of next block. Current block has the length of next block in it. 1382 private long length = -1; 1383 1384 @Override 1385 public HFileBlock nextBlock() throws IOException { 1386 if (offset >= endOffset) { 1387 return null; 1388 } 1389 HFileBlock b = readBlockData(offset, length, false, false, true); 1390 offset += b.getOnDiskSizeWithHeader(); 1391 length = b.getNextBlockOnDiskSize(); 1392 HFileBlock uncompressed = b.unpack(fileContext, owner); 1393 if (uncompressed != b) { 1394 b.release(); // Need to release the compressed Block now. 1395 } 1396 blockTracker.add(uncompressed); 1397 return uncompressed; 1398 } 1399 1400 @Override 1401 public HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException { 1402 HFileBlock blk = nextBlock(); 1403 if (blk.getBlockType() != blockType) { 1404 throw new IOException( 1405 "Expected block of type " + blockType + " but found " + blk.getBlockType()); 1406 } 1407 return blk; 1408 } 1409 1410 @Override 1411 public void freeBlocks() { 1412 if (freed) { 1413 return; 1414 } 1415 blockTracker.forEach(HFileBlock::release); 1416 blockTracker = null; 1417 freed = true; 1418 } 1419 }; 1420 } 1421 1422 /** 1423 * Does a positional read or a seek and read into the given byte buffer. We need take care that 1424 * we will call the {@link ByteBuff#release()} for every exit to deallocate the ByteBuffers, 1425 * otherwise the memory leak may happen. 1426 * @param dest destination buffer 1427 * @param size size of read 1428 * @param peekIntoNextBlock whether to read the next block's on-disk size 1429 * @param fileOffset position in the stream to read at 1430 * @param pread whether we should do a positional read 1431 * @param istream The input source of data 1432 * @return true to indicate the destination buffer include the next block header, otherwise only 1433 * include the current block data without the next block header. 1434 * @throws IOException if any IO error happen. 1435 */ 1436 protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size, 1437 boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException { 1438 if (!pread) { 1439 // Seek + read. Better for scanning. 1440 HFileUtil.seekOnMultipleSources(istream, fileOffset); 1441 long realOffset = istream.getPos(); 1442 if (realOffset != fileOffset) { 1443 throw new IOException("Tried to seek to " + fileOffset + " to read " + size 1444 + " bytes, but pos=" + realOffset + " after seek"); 1445 } 1446 if (!peekIntoNextBlock) { 1447 BlockIOUtils.readFully(dest, istream, size); 1448 return false; 1449 } 1450 1451 // Try to read the next block header 1452 if (!BlockIOUtils.readWithExtra(dest, istream, size, hdrSize)) { 1453 // did not read the next block header. 1454 return false; 1455 } 1456 } else { 1457 // Positional read. 
Better for random reads; or when the streamLock is already locked. 1458 int extraSize = peekIntoNextBlock ? hdrSize : 0; 1459 if ( 1460 !BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize, isPreadAllBytes) 1461 ) { 1462 // did not read the next block header. 1463 return false; 1464 } 1465 } 1466 assert peekIntoNextBlock; 1467 return true; 1468 } 1469 1470 /** 1471 * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as 1472 * little memory allocation as possible, using the provided on-disk size. 1473 * @param offset the offset in the stream to read at 1474 * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header, or -1 if 1475 * unknown; i.e. when iterating over blocks reading in the file 1476 * metadata info. 1477 * @param pread whether to use a positional read 1478 * @param updateMetrics whether to update the metrics 1479 * @param intoHeap allocate ByteBuff of block from heap or off-heap. 1480 * @see FSReader#readBlockData(long, long, boolean, boolean, boolean) for more details about the 1481 * useHeap. 1482 */ 1483 @Override 1484 public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread, 1485 boolean updateMetrics, boolean intoHeap) throws IOException { 1486 // Get a copy of the current state of whether to validate 1487 // hbase checksums or not for this read call. This is not 1488 // thread-safe but the one constaint is that if we decide 1489 // to skip hbase checksum verification then we are 1490 // guaranteed to use hdfs checksum verification. 1491 boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum(); 1492 FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum); 1493 1494 HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, 1495 doVerificationThruHBaseChecksum, updateMetrics, intoHeap); 1496 if (blk == null) { 1497 HFile.LOG.warn("HBase checksum verification failed for file " + pathName + " at offset " 1498 + offset + " filesize " + fileSize + ". Retrying read with HDFS checksums turned on..."); 1499 1500 if (!doVerificationThruHBaseChecksum) { 1501 String msg = "HBase checksum verification failed for file " + pathName + " at offset " 1502 + offset + " filesize " + fileSize + " but this cannot happen because doVerify is " 1503 + doVerificationThruHBaseChecksum; 1504 HFile.LOG.warn(msg); 1505 throw new IOException(msg); // cannot happen case here 1506 } 1507 HFile.CHECKSUM_FAILURES.increment(); // update metrics 1508 1509 // If we have a checksum failure, we fall back into a mode where 1510 // the next few reads use HDFS level checksums. We aim to make the 1511 // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid 1512 // hbase checksum verification, but since this value is set without 1513 // holding any locks, it can so happen that we might actually do 1514 // a few more than precisely this number. 
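      // Illustrative summary of the fallback cycle implemented by this method (no new behavior):
      //   1. read with HBase checksum verification -> mismatch detected (blk == null above)
      //   2. switch this stream to HDFS checksums for roughly the next
      //      CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads and retry the same block
      //   3. once those reads complete cleanly, checksumOk() near the end of this method lets
      //      HBase checksum verification be turned back on for subsequent reads.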
1515 is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD); 1516 doVerificationThruHBaseChecksum = false; 1517 blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, 1518 doVerificationThruHBaseChecksum, updateMetrics, intoHeap); 1519 if (blk != null) { 1520 HFile.LOG.warn("HDFS checksum verification succeeded for file " + pathName + " at offset " 1521 + offset + " filesize " + fileSize); 1522 } 1523 } 1524 if (blk == null && !doVerificationThruHBaseChecksum) { 1525 String msg = 1526 "readBlockData failed, possibly due to " + "checksum verification failed for file " 1527 + pathName + " at offset " + offset + " filesize " + fileSize; 1528 HFile.LOG.warn(msg); 1529 throw new IOException(msg); 1530 } 1531 1532 // If there is a checksum mismatch earlier, then retry with 1533 // HBase checksums switched off and use HDFS checksum verification. 1534 // This triggers HDFS to detect and fix corrupt replicas. The 1535 // next checksumOffCount read requests will use HDFS checksums. 1536 // The decrementing of this.checksumOffCount is not thread-safe, 1537 // but it is harmless because eventually checksumOffCount will be 1538 // a negative number. 1539 streamWrapper.checksumOk(); 1540 return blk; 1541 } 1542 1543 /** 1544 * @return Check <code>onDiskSizeWithHeaderL</code> size is healthy and then return it as an int 1545 */ 1546 private static int checkAndGetSizeAsInt(final long onDiskSizeWithHeaderL, final int hdrSize) 1547 throws IOException { 1548 if ( 1549 (onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1) 1550 || onDiskSizeWithHeaderL >= Integer.MAX_VALUE 1551 ) { 1552 throw new IOException( 1553 "Invalid onDisksize=" + onDiskSizeWithHeaderL + ": expected to be at least " + hdrSize 1554 + " and at most " + Integer.MAX_VALUE + ", or -1"); 1555 } 1556 return (int) onDiskSizeWithHeaderL; 1557 } 1558 1559 /** 1560 * Verify the passed in onDiskSizeWithHeader aligns with what is in the header else something is 1561 * not right. 1562 */ 1563 private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuff headerBuf, 1564 final long offset, boolean verifyChecksum) throws IOException { 1565 // Assert size provided aligns with what is in the header 1566 int fromHeader = getOnDiskSizeWithHeader(headerBuf, verifyChecksum); 1567 if (passedIn != fromHeader) { 1568 throw new IOException("Passed in onDiskSizeWithHeader=" + passedIn + " != " + fromHeader 1569 + ", offset=" + offset + ", fileContext=" + this.fileContext); 1570 } 1571 } 1572 1573 /** 1574 * Check atomic reference cache for this block's header. Cache only good if next read coming 1575 * through is next in sequence in the block. We read next block's header on the tail of reading 1576 * the previous block to save a seek. Otherwise, we have to do a seek to read the header before 1577 * we can pull in the block OR we have to backup the stream because we over-read (the next 1578 * block's header). 1579 * @see PrefetchedHeader 1580 * @return The cached block header or null if not found. 1581 * @see #cacheNextBlockHeader(long, ByteBuff, int, int) 1582 */ 1583 private ByteBuff getCachedHeader(final long offset) { 1584 PrefetchedHeader ph = this.prefetchedHeader.get(); 1585 return ph != null && ph.offset == offset ? ph.buf : null; 1586 } 1587 1588 /** 1589 * Save away the next blocks header in atomic reference. 
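   * <p>
   * Illustrative sequence (a sketch mirroring how readBlockDataInternal uses this cache, not new
   * behavior): after reading the block at {@code offset} whose on-disk size with header is
   * {@code size}, the {@code hdrSize} bytes that were over-read are stashed keyed by the next
   * block's offset:
   * <pre>
   *   cacheNextBlockHeader(offset + size, onDiskBlock, size, hdrSize);
   *   // a later sequential read of the block at (offset + size) then hits the cache:
   *   ByteBuff hdr = getCachedHeader(offset + size); // non-null, so no extra seek is needed
   * </pre>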
1590 * @see #getCachedHeader(long) 1591 * @see PrefetchedHeader 1592 */ 1593 private void cacheNextBlockHeader(final long offset, ByteBuff onDiskBlock, 1594 int onDiskSizeWithHeader, int headerLength) { 1595 PrefetchedHeader ph = new PrefetchedHeader(); 1596 ph.offset = offset; 1597 onDiskBlock.get(onDiskSizeWithHeader, ph.header, 0, headerLength); 1598 this.prefetchedHeader.set(ph); 1599 } 1600 1601 private int getNextBlockOnDiskSize(boolean readNextHeader, ByteBuff onDiskBlock, 1602 int onDiskSizeWithHeader) { 1603 int nextBlockOnDiskSize = -1; 1604 if (readNextHeader) { 1605 nextBlockOnDiskSize = 1606 onDiskBlock.getIntAfterPosition(onDiskSizeWithHeader + BlockType.MAGIC_LENGTH) + hdrSize; 1607 } 1608 return nextBlockOnDiskSize; 1609 } 1610 1611 private ByteBuff allocate(int size, boolean intoHeap) { 1612 return intoHeap ? HEAP.allocate(size) : allocator.allocate(size); 1613 } 1614 1615 /** 1616 * Reads a version 2 block. 1617 * @param offset the offset in the stream to read at. 1618 * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header and 1619 * checksums if present or -1 if unknown (as a long). Can be -1 if 1620 * we are doing raw iteration of blocks as when loading up file 1621 * metadata; i.e. the first read of a new file. Usually non-null 1622 * gotten from the file index. 1623 * @param pread whether to use a positional read 1624 * @param verifyChecksum Whether to use HBase checksums. If HBase checksum is switched 1625 * off, then use HDFS checksum. Can also flip on/off reading same 1626 * file if we hit a troublesome patch in an hfile. 1627 * @param updateMetrics whether need to update the metrics. 1628 * @param intoHeap allocate the ByteBuff of block from heap or off-heap. 1629 * @return the HFileBlock or null if there is a HBase checksum mismatch 1630 */ 1631 protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, 1632 long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics, 1633 boolean intoHeap) throws IOException { 1634 if (offset < 0) { 1635 throw new IOException("Invalid offset=" + offset + " trying to read " + "block (onDiskSize=" 1636 + onDiskSizeWithHeaderL + ")"); 1637 } 1638 int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize); 1639 // Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1 1640 // and will save us having to seek the stream backwards to reread the header we 1641 // read the last time through here. 1642 ByteBuff headerBuf = getCachedHeader(offset); 1643 LOG.trace( 1644 "Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, " 1645 + "onDiskSizeWithHeader={}", 1646 this.fileContext.getHFileName(), offset, pread, verifyChecksum, headerBuf, 1647 onDiskSizeWithHeader); 1648 // This is NOT same as verifyChecksum. This latter is whether to do hbase 1649 // checksums. Can change with circumstances. The below flag is whether the 1650 // file has support for checksums (version 2+). 1651 boolean checksumSupport = this.fileContext.isUseHBaseChecksum(); 1652 long startTime = EnvironmentEdgeManager.currentTime(); 1653 if (onDiskSizeWithHeader <= 0) { 1654 // We were not passed the block size. Need to get it from the header. If header was 1655 // not cached (see getCachedHeader above), need to seek to pull it in. This is costly 1656 // and should happen very rarely. Currently happens on open of a hfile reader where we 1657 // read the trailer blocks to pull in the indices. 
Otherwise, we are reading block sizes 1658 // out of the hfile index. To check, enable TRACE in this file and you'll get an exception 1659 // in a LOG every time we seek. See HBASE-17072 for more detail. 1660 if (headerBuf == null) { 1661 if (LOG.isTraceEnabled()) { 1662 LOG.trace("Extra see to get block size!", new RuntimeException()); 1663 } 1664 headerBuf = HEAP.allocate(hdrSize); 1665 readAtOffset(is, headerBuf, hdrSize, false, offset, pread); 1666 headerBuf.rewind(); 1667 } 1668 onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport); 1669 } 1670 int preReadHeaderSize = headerBuf == null ? 0 : hdrSize; 1671 // Allocate enough space to fit the next block's header too; saves a seek next time through. 1672 // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header; 1673 // onDiskSizeWithHeader is header, body, and any checksums if present. preReadHeaderSize 1674 // says where to start reading. If we have the header cached, then we don't need to read 1675 // it again and we can likely read from last place we left off w/o need to backup and reread 1676 // the header we read last time through here. 1677 ByteBuff onDiskBlock = this.allocate(onDiskSizeWithHeader + hdrSize, intoHeap); 1678 boolean initHFileBlockSuccess = false; 1679 try { 1680 if (headerBuf != null) { 1681 onDiskBlock.put(0, headerBuf, 0, hdrSize).position(hdrSize); 1682 } 1683 boolean readNextHeader = readAtOffset(is, onDiskBlock, 1684 onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread); 1685 onDiskBlock.rewind(); // in case of moving position when copying a cached header 1686 int nextBlockOnDiskSize = 1687 getNextBlockOnDiskSize(readNextHeader, onDiskBlock, onDiskSizeWithHeader); 1688 if (headerBuf == null) { 1689 headerBuf = onDiskBlock.duplicate().position(0).limit(hdrSize); 1690 } 1691 // Do a few checks before we go instantiate HFileBlock. 1692 assert onDiskSizeWithHeader > this.hdrSize; 1693 verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport); 1694 ByteBuff curBlock = onDiskBlock.duplicate().position(0).limit(onDiskSizeWithHeader); 1695 // Verify checksum of the data before using it for building HFileBlock. 1696 if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) { 1697 return null; 1698 } 1699 long duration = EnvironmentEdgeManager.currentTime() - startTime; 1700 if (updateMetrics) { 1701 HFile.updateReadLatency(duration, pread); 1702 } 1703 // The onDiskBlock will become the headerAndDataBuffer for this block. 1704 // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already 1705 // contains the header of next block, so no need to set next block's header in it. 1706 HFileBlock hFileBlock = createFromBuff(curBlock, checksumSupport, offset, 1707 nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator); 1708 // Run check on uncompressed sizings. 1709 if (!fileContext.isCompressedOrEncrypted()) { 1710 hFileBlock.sanityCheckUncompressed(); 1711 } 1712 LOG.trace("Read {} in {} ms", hFileBlock, duration); 1713 // Cache next block header if we read it for the next time through here. 
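        // (Descriptive note, no behavior change.) The over-read header occupies bytes
        // [onDiskSizeWithHeader, onDiskSizeWithHeader + hdrSize) of onDiskBlock, and it is cached
        // keyed by the NEXT block's offset, i.e. offset + getOnDiskSizeWithHeader().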
1714 if (nextBlockOnDiskSize != -1) { 1715 cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), onDiskBlock, 1716 onDiskSizeWithHeader, hdrSize); 1717 } 1718 initHFileBlockSuccess = true; 1719 return hFileBlock; 1720 } finally { 1721 if (!initHFileBlockSuccess) { 1722 onDiskBlock.release(); 1723 } 1724 } 1725 } 1726 1727 @Override 1728 public void setIncludesMemStoreTS(boolean includesMemstoreTS) { 1729 this.fileContext = 1730 new HFileContextBuilder(this.fileContext).withIncludesMvcc(includesMemstoreTS).build(); 1731 } 1732 1733 @Override 1734 public void setDataBlockEncoder(HFileDataBlockEncoder encoder, Configuration conf) { 1735 encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(conf, fileContext); 1736 } 1737 1738 @Override 1739 public HFileBlockDecodingContext getBlockDecodingContext() { 1740 return this.encodedBlockDecodingCtx; 1741 } 1742 1743 @Override 1744 public HFileBlockDecodingContext getDefaultBlockDecodingContext() { 1745 return this.defaultDecodingCtx; 1746 } 1747 1748 /** 1749 * Generates the checksum for the header as well as the data and then validates it. If the block 1750 * doesn't uses checksum, returns false. 1751 * @return True if checksum matches, else false. 1752 */ 1753 private boolean validateChecksum(long offset, ByteBuff data, int hdrSize) { 1754 // If this is an older version of the block that does not have checksums, then return false 1755 // indicating that checksum verification did not succeed. Actually, this method should never 1756 // be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen 1757 // case. Since this is a cannot-happen case, it is better to return false to indicate a 1758 // checksum validation failure. 1759 if (!fileContext.isUseHBaseChecksum()) { 1760 return false; 1761 } 1762 return ChecksumUtil.validateChecksum(data, pathName, offset, hdrSize); 1763 } 1764 1765 @Override 1766 public void closeStreams() throws IOException { 1767 streamWrapper.close(); 1768 } 1769 1770 @Override 1771 public void unbufferStream() { 1772 // To handle concurrent reads, ensure that no other client is accessing the streams while we 1773 // unbuffer it. 1774 if (streamLock.tryLock()) { 1775 try { 1776 this.streamWrapper.unbuffer(); 1777 } finally { 1778 streamLock.unlock(); 1779 } 1780 } 1781 } 1782 1783 @Override 1784 public String toString() { 1785 return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext; 1786 } 1787 } 1788 1789 /** An additional sanity-check in case no compression or encryption is being used. */ 1790 void sanityCheckUncompressed() throws IOException { 1791 if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) { 1792 throw new IOException("Using no compression but " + "onDiskSizeWithoutHeader=" 1793 + onDiskSizeWithoutHeader + ", " + "uncompressedSizeWithoutHeader=" 1794 + uncompressedSizeWithoutHeader + ", numChecksumbytes=" + totalChecksumBytes()); 1795 } 1796 } 1797 1798 // Cacheable implementation 1799 @Override 1800 public int getSerializedLength() { 1801 if (buf != null) { 1802 // Include extra bytes for block metadata. 1803 return this.buf.limit() + BLOCK_METADATA_SPACE; 1804 } 1805 return 0; 1806 } 1807 1808 // Cacheable implementation 1809 @Override 1810 public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) { 1811 this.buf.get(destination, 0, getSerializedLength() - BLOCK_METADATA_SPACE); 1812 destination = addMetaData(destination, includeNextBlockMetadata); 1813 1814 // Make it ready for reading. 
flip sets position to zero and limit to current position which 1815 // is what we want if we do not want to serialize the block plus checksums if present plus 1816 // metadata. 1817 destination.flip(); 1818 } 1819 1820 /** 1821 * For use by bucketcache. This exposes internals. 1822 */ 1823 public ByteBuffer getMetaData(ByteBuffer bb) { 1824 bb = addMetaData(bb, true); 1825 bb.flip(); 1826 return bb; 1827 } 1828 1829 /** 1830 * Adds metadata at current position (position is moved forward). Does not flip or reset. 1831 * @return The passed <code>destination</code> with metadata added. 1832 */ 1833 private ByteBuffer addMetaData(final ByteBuffer destination, boolean includeNextBlockMetadata) { 1834 destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0); 1835 destination.putLong(this.offset); 1836 if (includeNextBlockMetadata) { 1837 destination.putInt(this.nextBlockOnDiskSize); 1838 } 1839 return destination; 1840 } 1841 1842 // Cacheable implementation 1843 @Override 1844 public CacheableDeserializer<Cacheable> getDeserializer() { 1845 return HFileBlock.BLOCK_DESERIALIZER; 1846 } 1847 1848 @Override 1849 public int hashCode() { 1850 int result = 1; 1851 result = result * 31 + blockType.hashCode(); 1852 result = result * 31 + nextBlockOnDiskSize; 1853 result = result * 31 + (int) (offset ^ (offset >>> 32)); 1854 result = result * 31 + onDiskSizeWithoutHeader; 1855 result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32)); 1856 result = result * 31 + uncompressedSizeWithoutHeader; 1857 result = result * 31 + buf.hashCode(); 1858 return result; 1859 } 1860 1861 @Override 1862 public boolean equals(Object comparison) { 1863 if (this == comparison) { 1864 return true; 1865 } 1866 if (comparison == null) { 1867 return false; 1868 } 1869 if (!(comparison instanceof HFileBlock)) { 1870 return false; 1871 } 1872 1873 HFileBlock castedComparison = (HFileBlock) comparison; 1874 1875 if (castedComparison.blockType != this.blockType) { 1876 return false; 1877 } 1878 if (castedComparison.nextBlockOnDiskSize != this.nextBlockOnDiskSize) { 1879 return false; 1880 } 1881 // Offset is important. Needed when we have to remake cachekey when block is returned to cache. 1882 if (castedComparison.offset != this.offset) { 1883 return false; 1884 } 1885 if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) { 1886 return false; 1887 } 1888 if (castedComparison.prevBlockOffset != this.prevBlockOffset) { 1889 return false; 1890 } 1891 if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) { 1892 return false; 1893 } 1894 if ( 1895 ByteBuff.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0, 1896 castedComparison.buf.limit()) != 0 1897 ) { 1898 return false; 1899 } 1900 return true; 1901 } 1902 1903 DataBlockEncoding getDataBlockEncoding() { 1904 if (blockType == BlockType.ENCODED_DATA) { 1905 return DataBlockEncoding.getEncodingById(getDataBlockEncodingId()); 1906 } 1907 return DataBlockEncoding.NONE; 1908 } 1909 1910 byte getChecksumType() { 1911 return this.fileContext.getChecksumType().getCode(); 1912 } 1913 1914 int getBytesPerChecksum() { 1915 return this.fileContext.getBytesPerChecksum(); 1916 } 1917 1918 /** @return the size of data on disk + header. Excludes checksum. */ 1919 int getOnDiskDataSizeWithHeader() { 1920 return this.onDiskDataSizeWithHeader; 1921 } 1922 1923 /** 1924 * Calculate the number of bytes required to store all the checksums for this block. 
Each checksum 1925 * value is a 4 byte integer. 1926 */ 1927 int totalChecksumBytes() { 1928 // If the hfile block has minorVersion 0, then there are no checksum 1929 // data to validate. Similarly, a zero value in this.bytesPerChecksum 1930 // indicates that cached blocks do not have checksum data because 1931 // checksums were already validated when the block was read from disk. 1932 if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) { 1933 return 0; 1934 } 1935 return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader, 1936 this.fileContext.getBytesPerChecksum()); 1937 } 1938 1939 /** 1940 * Returns the size of this block header. 1941 */ 1942 public int headerSize() { 1943 return headerSize(this.fileContext.isUseHBaseChecksum()); 1944 } 1945 1946 /** 1947 * Maps a minor version to the size of the header. 1948 */ 1949 public static int headerSize(boolean usesHBaseChecksum) { 1950 return usesHBaseChecksum 1951 ? HConstants.HFILEBLOCK_HEADER_SIZE 1952 : HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM; 1953 } 1954 1955 /** 1956 * Return the appropriate DUMMY_HEADER for the minor version 1957 */ 1958 // TODO: Why is this in here? 1959 byte[] getDummyHeaderForVersion() { 1960 return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum()); 1961 } 1962 1963 /** 1964 * Return the appropriate DUMMY_HEADER for the minor version 1965 */ 1966 static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) { 1967 return usesHBaseChecksum ? HConstants.HFILEBLOCK_DUMMY_HEADER : DUMMY_HEADER_NO_CHECKSUM; 1968 } 1969 1970 /** 1971 * @return This HFileBlocks fileContext which will a derivative of the fileContext for the file 1972 * from which this block's data was originally read. 1973 */ 1974 public HFileContext getHFileContext() { 1975 return this.fileContext; 1976 } 1977 1978 /** 1979 * Convert the contents of the block header into a human readable string. This is mostly helpful 1980 * for debugging. This assumes that the block has minor version > 0. 1981 */ 1982 static String toStringHeader(ByteBuff buf) throws IOException { 1983 byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)]; 1984 buf.get(magicBuf); 1985 BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH); 1986 int compressedBlockSizeNoHeader = buf.getInt(); 1987 int uncompressedBlockSizeNoHeader = buf.getInt(); 1988 long prevBlockOffset = buf.getLong(); 1989 byte cksumtype = buf.get(); 1990 long bytesPerChecksum = buf.getInt(); 1991 long onDiskDataSizeWithHeader = buf.getInt(); 1992 return " Header dump: magic: " + Bytes.toString(magicBuf) + " blockType " + bt 1993 + " compressedBlockSizeNoHeader " + compressedBlockSizeNoHeader 1994 + " uncompressedBlockSizeNoHeader " + uncompressedBlockSizeNoHeader + " prevBlockOffset " 1995 + prevBlockOffset + " checksumType " + ChecksumType.codeToType(cksumtype) 1996 + " bytesPerChecksum " + bytesPerChecksum + " onDiskDataSizeWithHeader " 1997 + onDiskDataSizeWithHeader; 1998 } 1999 2000 private static HFileBlockBuilder createBuilder(HFileBlock blk) { 2001 return new HFileBlockBuilder().withBlockType(blk.blockType) 2002 .withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader) 2003 .withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader) 2004 .withPrevBlockOffset(blk.prevBlockOffset).withByteBuff(blk.buf.duplicate()) // Duplicate the 2005 // buffer. 
2006 .withOffset(blk.offset).withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader) 2007 .withNextBlockOnDiskSize(blk.nextBlockOnDiskSize).withHFileContext(blk.fileContext) 2008 .withByteBuffAllocator(blk.allocator).withShared(blk.isSharedMem()); 2009 } 2010 2011 static HFileBlock shallowClone(HFileBlock blk) { 2012 return createBuilder(blk).build(); 2013 } 2014 2015 static HFileBlock deepCloneOnHeap(HFileBlock blk) { 2016 ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit()))); 2017 return createBuilder(blk).withByteBuff(deepCloned).withShared(false).build(); 2018 } 2019}
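// Illustrative usage of FSReader#blockRange and BlockIterator (a sketch under the assumption of a
// caller that scans the load-on-open section; not code that ships with this class):
//
//   HFileBlock.BlockIterator it = fsReader.blockRange(startOffset, endOffset);
//   try {
//     for (HFileBlock b = it.nextBlock(); b != null; b = it.nextBlock()) {
//       // ... consume the (always unpacked) block ...
//     }
//   } finally {
//     it.freeBlocks(); // release every block handed out by this iterator
//   }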