001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import java.io.DataInputStream; 021import java.io.DataOutput; 022import java.io.DataOutputStream; 023import java.io.IOException; 024import java.io.InputStream; 025import java.nio.ByteBuffer; 026import java.util.concurrent.atomic.AtomicReference; 027import java.util.concurrent.locks.Lock; 028import java.util.concurrent.locks.ReentrantLock; 029 030import org.apache.hadoop.fs.FSDataInputStream; 031import org.apache.hadoop.fs.FSDataOutputStream; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038import org.apache.hadoop.hbase.fs.HFileSystem; 039import org.apache.hadoop.hbase.io.ByteArrayOutputStream; 040import org.apache.hadoop.hbase.io.ByteBuffInputStream; 041import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream; 042import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 043import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 044import org.apache.hadoop.hbase.io.encoding.EncodingState; 045import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; 046import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; 047import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; 048import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; 049import org.apache.hadoop.hbase.nio.ByteBuff; 050import org.apache.hadoop.hbase.nio.MultiByteBuff; 051import org.apache.hadoop.hbase.nio.SingleByteBuff; 052import org.apache.hadoop.hbase.regionserver.ShipperListener; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.ChecksumType; 055import org.apache.hadoop.hbase.util.ClassSize; 056import org.apache.hadoop.io.IOUtils; 057 058import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 060 061/** 062 * Cacheable Blocks of an {@link HFile} version 2 file. 063 * Version 2 was introduced in hbase-0.92.0. 064 * 065 * <p>Version 1 was the original file block. Version 2 was introduced when we changed the hbase file 066 * format to support multi-level block indexes and compound bloom filters (HBASE-3857). Support 067 * for Version 1 was removed in hbase-1.3.0. 068 * 069 * <h3>HFileBlock: Version 2</h3> 070 * In version 2, a block is structured as follows: 071 * <ul> 072 * <li><b>Header:</b> See Writer#putHeader() for where header is written; header total size is 073 * HFILEBLOCK_HEADER_SIZE 074 * <ul> 075 * <li>0. 
blockType: Magic record identifying the {@link BlockType} (8 bytes): 076 * e.g. <code>DATABLK*</code> 077 * <li>1. onDiskSizeWithoutHeader: Compressed -- a.k.a. 'on disk' -- block size, excluding header, 078 * but including trailing checksum bytes (4 bytes) 079 * <li>2. uncompressedSizeWithoutHeader: Uncompressed block size, excluding header, and excluding 080 * checksum bytes (4 bytes) 081 * <li>3. prevBlockOffset: The offset of the previous block of the same type (8 bytes). This is 082 * used to navigate to the previous block without having to go to the block index 083 * <li>4: For minorVersions >=1, the ordinal describing checksum type (1 byte) 084 * <li>5: For minorVersions >=1, the number of data bytes/checksum chunk (4 bytes) 085 * <li>6: onDiskDataSizeWithHeader: For minorVersions >=1, the size of data 'on disk', including 086 * header, excluding checksums (4 bytes) 087 * </ul> 088 * </li> 089 * <li><b>Raw/Compressed/Encrypted/Encoded data:</b> The compression 090 * algorithm is the same for all the blocks in an {@link HFile}. If compression is NONE, this is 091 * just raw, serialized Cells. 092 * <li><b>Tail:</b> For minorVersions >=1, a series of 4 byte checksums, one each for 093 * the number of bytes specified by bytesPerChecksum. 094 * </ul> 095 * 096 * <h3>Caching</h3> 097 * Caches cache whole blocks with trailing checksums if any. We then tag on some metadata, the 098 * content of BLOCK_METADATA_SPACE, which includes a flag for whether we are doing 'hbase' 099 * checksums, and then the offset into the file, which is needed when we re-make a cache key 100 * when we return the block to the cache as 'done'. 101 * See {@link Cacheable#serialize(ByteBuffer, boolean)} and {@link Cacheable#getDeserializer()}. 102 * 103 * <p>TODO: Should we cache the checksums? Down in Writer#getBlockForCaching(CacheConfig), where 104 * we make a block to cache-on-write, there is an attempt at turning off checksums. This is not the 105 * only place we get blocks to cache. We will also cache the raw return from an hdfs read. In this 106 * case, the checksums may be present. If the cache is backed by something that doesn't do ECC, 107 * say an SSD, we might want to preserve checksums. For now this is an open question. 108 * <p>TODO: Over in BucketCache, we save a block allocation by doing a custom serialization. 109 * Be sure to change it if serialization changes in here. Could we add a method here that takes an 110 * IOEngine and that then serializes to it rather than expose our internals over in BucketCache? 111 * IOEngine is in the bucket subpackage. Pull it up? Then this class knows about bucketcache. Ugh. 112 */ 113@InterfaceAudience.Private 114public class HFileBlock implements Cacheable { 115 private static final Logger LOG = LoggerFactory.getLogger(HFileBlock.class); 116 117 // Block Header fields. 118 119 // TODO: encapsulate Header related logic in this inner class.
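  // As an illustration of how these index constants are used (the HFileBlock(ByteBuff, ...)
  // constructor further down does the real reads), a caller holding a ByteBuff positioned at a
  // block header -- call it headerBuf, a hypothetical local -- could do absolute gets such as:
  //   int onDiskSizeWithoutHeader = headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
  //   long prevBlockOffset = headerBuf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
  //   byte checksumType = headerBuf.get(Header.CHECKSUM_TYPE_INDEX); // minor versions >= 1 only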
120 static class Header { 121 // Format of header is: 122 // 8 bytes - block magic 123 // 4 bytes int - onDiskSizeWithoutHeader 124 // 4 bytes int - uncompressedSizeWithoutHeader 125 // 8 bytes long - prevBlockOffset 126 // The following 3 are only present if header contains checksum information 127 // 1 byte - checksum type 128 // 4 byte int - bytes per checksum 129 // 4 byte int - onDiskDataSizeWithHeader 130 static int BLOCK_MAGIC_INDEX = 0; 131 static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8; 132 static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12; 133 static int PREV_BLOCK_OFFSET_INDEX = 16; 134 static int CHECKSUM_TYPE_INDEX = 24; 135 static int BYTES_PER_CHECKSUM_INDEX = 25; 136 static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29; 137 } 138 139 /** Type of block. Header field 0. */ 140 private BlockType blockType; 141 142 /** 143 * Size on disk excluding header, including checksum. Header field 1. 144 * @see Writer#putHeader(byte[], int, int, int, int) 145 */ 146 private int onDiskSizeWithoutHeader; 147 148 /** 149 * Size of pure data. Does not include header or checksums. Header field 2. 150 * @see Writer#putHeader(byte[], int, int, int, int) 151 */ 152 private int uncompressedSizeWithoutHeader; 153 154 /** 155 * The offset of the previous block on disk. Header field 3. 156 * @see Writer#putHeader(byte[], int, int, int, int) 157 */ 158 private long prevBlockOffset; 159 160 /** 161 * Size on disk of header + data. Excludes checksum. Header field 6, 162 * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum. 163 * @see Writer#putHeader(byte[], int, int, int, int) 164 */ 165 private int onDiskDataSizeWithHeader; 166 // End of Block Header fields. 167 168 /** 169 * The in-memory representation of the hfile block. Can be on or offheap. Can be backed by 170 * a single ByteBuffer or by many. Make no assumptions. 171 * 172 * <p>Be careful reading from this <code>buf</code>. Duplicate and work on the duplicate or if 173 * not, be sure to reset position and limit else trouble down the road. 174 * 175 * <p>TODO: Make this read-only once made. 176 * 177 * <p>We are using the ByteBuff type. ByteBuffer is not extensible yet we need to be able to have 178 * a ByteBuffer-like API across multiple ByteBuffers reading from a cache such as BucketCache. 179 * So, we have this ByteBuff type. Unfortunately, it is spread all about HFileBlock. Would be 180 * good if could be confined to cache-use only but hard-to-do. 181 */ 182 private ByteBuff buf; 183 184 /** Meta data that holds meta information on the hfileblock. 185 */ 186 private HFileContext fileContext; 187 188 /** 189 * The offset of this block in the file. Populated by the reader for 190 * convenience of access. This offset is not part of the block header. 191 */ 192 private long offset = UNSET; 193 194 private MemoryType memType = MemoryType.EXCLUSIVE; 195 196 /** 197 * The on-disk size of the next block, including the header and checksums if present. 198 * UNSET if unknown. 199 * 200 * Blocks try to carry the size of the next block to read in this data member. Usually 201 * we get block sizes from the hfile index but sometimes the index is not available: 202 * e.g. when we read the indexes themselves (indexes are stored in blocks, we do not 203 * have an index for the indexes). Saves seeks especially around file open when 204 * there is a flurry of reading in hfile metadata. 
205 */ 206 private int nextBlockOnDiskSize = UNSET; 207 208 /** 209 * On a checksum failure, do this many succeeding read requests using hdfs checksums before 210 * auto-reenabling hbase checksum verification. 211 */ 212 static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3; 213 214 private static int UNSET = -1; 215 public static final boolean FILL_HEADER = true; 216 public static final boolean DONT_FILL_HEADER = false; 217 218 // How to get the estimate correctly? if it is a singleBB? 219 public static final int MULTI_BYTE_BUFFER_HEAP_SIZE = 220 (int)ClassSize.estimateBase(MultiByteBuff.class, false); 221 222 /** 223 * Space for metadata on a block that gets stored along with the block when we cache it. 224 * There are a few bytes stuck on the end of the HFileBlock that we pull in from HDFS. 225 * 8 bytes are for the offset of this block (long) in the file. Offset is important because it is 226 * used when we remake the CacheKey when we return the block to the cache when done. There is also 227 * a flag on whether checksumming is being done by hbase or not. See class comment for note on 228 * uncertain state of checksumming of blocks that come out of cache (should we or should we not?). 229 * Finally there are 4 bytes to hold the length of the next block, which can save a seek on 230 * occasion if available. 231 * (This EXTRA info came in with the original commit of the bucketcache, HBASE-7404. It was 232 * formerly known as EXTRA_SERIALIZATION_SPACE). 233 */ 234 static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT; 235 236 /** 237 * Each checksum value is an integer that can be stored in 4 bytes. 238 */ 239 static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT; 240 241 static final byte[] DUMMY_HEADER_NO_CHECKSUM = 242 new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM]; 243 244 /** 245 * Used when deserializing blocks from the cache. 246 * 247 * <code> 248 * ++++++++++++++ 249 * + HFileBlock + 250 * ++++++++++++++ 251 * + Checksums + <= Optional 252 * ++++++++++++++ 253 * + Metadata! + <= See note on BLOCK_METADATA_SPACE above. 254 * ++++++++++++++ 255 * </code> 256 * @see #serialize(ByteBuffer) 257 */ 258 static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER = 259 new CacheableDeserializer<Cacheable>() { 260 @Override 261 public HFileBlock deserialize(ByteBuff buf, boolean reuse, MemoryType memType) 262 throws IOException { 263 // The buf has the file block followed by block metadata. 264 // Set limit to just before the BLOCK_METADATA_SPACE then rewind. 265 buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind(); 266 // Get a new buffer to pass to the HFileBlock for it to 'own'. 267 ByteBuff newByteBuff; 268 if (reuse) { 269 newByteBuff = buf.slice(); 270 } else { 271 int len = buf.limit(); 272 newByteBuff = new SingleByteBuff(ByteBuffer.allocate(len)); 273 newByteBuff.put(0, buf, buf.position(), len); 274 } 275 // Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock.
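        // Metadata layout (BLOCK_METADATA_SPACE = 1 + 8 + 4 bytes): a one-byte flag saying whether
        // hbase checksums are in use, the long file offset of this block, and the int on-disk size
        // of the next block (or UNSET). They are read back in that order just below.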
276 buf.position(buf.limit()); 277 buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE); 278 boolean usesChecksum = buf.get() == (byte) 1; 279 long offset = buf.getLong(); 280 int nextBlockOnDiskSize = buf.getInt(); 281 HFileBlock hFileBlock = 282 new HFileBlock(newByteBuff, usesChecksum, memType, offset, nextBlockOnDiskSize, null); 283 return hFileBlock; 284 } 285 286 @Override 287 public int getDeserialiserIdentifier() { 288 return DESERIALIZER_IDENTIFIER; 289 } 290 291 @Override 292 public HFileBlock deserialize(ByteBuff b) throws IOException { 293 // Used only in tests 294 return deserialize(b, false, MemoryType.EXCLUSIVE); 295 } 296 }; 297 298 private static final int DESERIALIZER_IDENTIFIER; 299 static { 300 DESERIALIZER_IDENTIFIER = 301 CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER); 302 } 303 304 /** 305 * Copy constructor. Creates a shallow copy of {@code that}'s buffer. 306 */ 307 private HFileBlock(HFileBlock that) { 308 this(that, false); 309 } 310 311 /** 312 * Copy constructor. Creates a shallow/deep copy of {@code that}'s buffer as per the boolean 313 * param. 314 */ 315 private HFileBlock(HFileBlock that, boolean bufCopy) { 316 init(that.blockType, that.onDiskSizeWithoutHeader, 317 that.uncompressedSizeWithoutHeader, that.prevBlockOffset, 318 that.offset, that.onDiskDataSizeWithHeader, that.nextBlockOnDiskSize, that.fileContext); 319 if (bufCopy) { 320 this.buf = new SingleByteBuff(ByteBuffer.wrap(that.buf.toBytes(0, that.buf.limit()))); 321 } else { 322 this.buf = that.buf.duplicate(); 323 } 324 } 325 326 /** 327 * Creates a new {@link HFile} block from the given fields. This constructor 328 * is used only while writing blocks and caching, 329 * and is sitting in a byte buffer and we want to stuff the block into cache. 330 * See {@link Writer#getBlockForCaching(CacheConfig)}. 331 * 332 * <p>TODO: The caller presumes no checksumming 333 * required of this block instance since going into cache; checksum already verified on 334 * underlying block data pulled in from filesystem. Is that correct? What if cache is SSD? 335 * 336 * @param blockType the type of this block, see {@link BlockType} 337 * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader} 338 * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader} 339 * @param prevBlockOffset see {@link #prevBlockOffset} 340 * @param b block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) 341 * @param fillHeader when true, write the first 4 header fields into passed buffer. 342 * @param offset the file offset the block was read from 343 * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader} 344 * @param fileContext HFile meta data 345 */ 346 @VisibleForTesting 347 public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, 348 int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader, 349 long offset, final int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, 350 HFileContext fileContext) { 351 init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, 352 prevBlockOffset, offset, onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext); 353 this.buf = new SingleByteBuff(b); 354 if (fillHeader) { 355 overwriteHeader(); 356 } 357 this.buf.rewind(); 358 } 359 360 /** 361 * Creates a block from an existing buffer starting with a header. Rewinds 362 * and takes ownership of the buffer. 
By definition of rewind, ignores the 363 * buffer position, but if you slice the buffer beforehand, it will rewind 364 * to that point. 365 * @param buf Has header, content, and trailing checksums if present. 366 */ 367 HFileBlock(ByteBuff buf, boolean usesHBaseChecksum, MemoryType memType, final long offset, 368 final int nextBlockOnDiskSize, HFileContext fileContext) throws IOException { 369 buf.rewind(); 370 final BlockType blockType = BlockType.read(buf); 371 final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX); 372 final int uncompressedSizeWithoutHeader = 373 buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX); 374 final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX); 375 // This constructor is called when we deserialize a block from cache and when we read a block in 376 // from the fs. fileContext is null when deserialized from cache, so we need to make up one. 377 HFileContextBuilder fileContextBuilder = fileContext != null? 378 new HFileContextBuilder(fileContext): new HFileContextBuilder(); 379 fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum); 380 int onDiskDataSizeWithHeader; 381 if (usesHBaseChecksum) { 382 byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX); 383 int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX); 384 onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX); 385 // Use the checksum type and bytes per checksum from the header, not from the filecontext. 386 fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType)); 387 fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum); 388 } else { 389 fileContextBuilder.withChecksumType(ChecksumType.NULL); 390 fileContextBuilder.withBytesPerCheckSum(0); 391 // Need to fix onDiskDataSizeWithHeader; there are no checksums after the block data 392 onDiskDataSizeWithHeader = onDiskSizeWithoutHeader + headerSize(usesHBaseChecksum); 393 } 394 fileContext = fileContextBuilder.build(); 395 assert usesHBaseChecksum == fileContext.isUseHBaseChecksum(); 396 init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, 397 prevBlockOffset, offset, onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext); 398 this.memType = memType; 399 this.offset = offset; 400 this.buf = buf; 401 this.buf.rewind(); 402 } 403 404 /** 405 * Called from constructors. 406 */ 407 private void init(BlockType blockType, int onDiskSizeWithoutHeader, 408 int uncompressedSizeWithoutHeader, long prevBlockOffset, 409 long offset, int onDiskDataSizeWithHeader, final int nextBlockOnDiskSize, 410 HFileContext fileContext) { 411 this.blockType = blockType; 412 this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader; 413 this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader; 414 this.prevBlockOffset = prevBlockOffset; 415 this.offset = offset; 416 this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader; 417 this.nextBlockOnDiskSize = nextBlockOnDiskSize; 418 this.fileContext = fileContext; 419 } 420 421 /** 422 * Parse total on disk size including header and checksum. 423 * @param headerBuf Header ByteBuffer. Presumed exact size of header. 424 * @param verifyChecksum true if checksum verification is in use. 425 * @return Size of the block with header included.
426 */ 427 private static int getOnDiskSizeWithHeader(final ByteBuffer headerBuf, 428 boolean verifyChecksum) { 429 return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + 430 headerSize(verifyChecksum); 431 } 432 433 /** 434 * @return the on-disk size of the next block (including the header size and any checksums if 435 * present) read by peeking into the next block's header; use as a hint when doing 436 * a read of the next block when scanning or running over a file. 437 */ 438 int getNextBlockOnDiskSize() { 439 return nextBlockOnDiskSize; 440 } 441 442 @Override 443 public BlockType getBlockType() { 444 return blockType; 445 } 446 447 /** @return get data block encoding id that was used to encode this block */ 448 short getDataBlockEncodingId() { 449 if (blockType != BlockType.ENCODED_DATA) { 450 throw new IllegalArgumentException("Querying encoder ID of a block " + 451 "of type other than " + BlockType.ENCODED_DATA + ": " + blockType); 452 } 453 return buf.getShort(headerSize()); 454 } 455 456 /** 457 * @return the on-disk size of header + data part + checksum. 458 */ 459 public int getOnDiskSizeWithHeader() { 460 return onDiskSizeWithoutHeader + headerSize(); 461 } 462 463 /** 464 * @return the on-disk size of the data part + checksum (header excluded). 465 */ 466 int getOnDiskSizeWithoutHeader() { 467 return onDiskSizeWithoutHeader; 468 } 469 470 /** 471 * @return the uncompressed size of data part (header and checksum excluded). 472 */ 473 int getUncompressedSizeWithoutHeader() { 474 return uncompressedSizeWithoutHeader; 475 } 476 477 /** 478 * @return the offset of the previous block of the same type in the file, or 479 * -1 if unknown 480 */ 481 long getPrevBlockOffset() { 482 return prevBlockOffset; 483 } 484 485 /** 486 * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position 487 * is modified as side-effect. 488 */ 489 private void overwriteHeader() { 490 buf.rewind(); 491 blockType.write(buf); 492 buf.putInt(onDiskSizeWithoutHeader); 493 buf.putInt(uncompressedSizeWithoutHeader); 494 buf.putLong(prevBlockOffset); 495 if (this.fileContext.isUseHBaseChecksum()) { 496 buf.put(fileContext.getChecksumType().getCode()); 497 buf.putInt(fileContext.getBytesPerChecksum()); 498 buf.putInt(onDiskDataSizeWithHeader); 499 } 500 } 501 502 /** 503 * Returns a buffer that does not include the header or checksum. 504 * 505 * @return the buffer with header skipped and checksum omitted. 506 */ 507 public ByteBuff getBufferWithoutHeader() { 508 ByteBuff dup = getBufferReadOnly(); 509 // Now set it up so Buffer spans content only -- no header or no checksums. 510 return dup.position(headerSize()).limit(buf.limit() - totalChecksumBytes()).slice(); 511 } 512 513 /** 514 * Returns a read-only duplicate of the buffer this block stores internally ready to be read. 515 * Clients must not modify the buffer object though they may set position and limit on the 516 * returned buffer since we pass back a duplicate. This method has to be public because it is used 517 * in {@link CompoundBloomFilter} to avoid object creation on every Bloom 518 * filter lookup, but has to be used with caution. Buffer holds header, block content, 519 * and any follow-on checksums if present. 520 * 521 * @return the buffer of this block for read-only operations 522 */ 523 public ByteBuff getBufferReadOnly() { 524 // TODO: ByteBuf does not support asReadOnlyBuffer(). Fix. 
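    // The duplicate shares its underlying bytes with this block's buffer but has its own position
    // and limit, so callers may reposition it freely; per the javadoc above they must not write
    // through it.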
525 ByteBuff dup = this.buf.duplicate(); 526 assert dup.position() == 0; 527 return dup; 528 } 529 530 @VisibleForTesting 531 private void sanityCheckAssertion(long valueFromBuf, long valueFromField, 532 String fieldName) throws IOException { 533 if (valueFromBuf != valueFromField) { 534 throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf 535 + ") is different from that in the field (" + valueFromField + ")"); 536 } 537 } 538 539 @VisibleForTesting 540 private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField) 541 throws IOException { 542 if (valueFromBuf != valueFromField) { 543 throw new IOException("Block type stored in the buffer: " + 544 valueFromBuf + ", block type field: " + valueFromField); 545 } 546 } 547 548 /** 549 * Checks if the block is internally consistent, i.e. the first 550 * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a 551 * valid header consistent with the fields. Assumes a packed block structure. 552 * This function is primary for testing and debugging, and is not 553 * thread-safe, because it alters the internal buffer pointer. 554 * Used by tests only. 555 */ 556 @VisibleForTesting 557 void sanityCheck() throws IOException { 558 // Duplicate so no side-effects 559 ByteBuff dup = this.buf.duplicate().rewind(); 560 sanityCheckAssertion(BlockType.read(dup), blockType); 561 562 sanityCheckAssertion(dup.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader"); 563 564 sanityCheckAssertion(dup.getInt(), uncompressedSizeWithoutHeader, 565 "uncompressedSizeWithoutHeader"); 566 567 sanityCheckAssertion(dup.getLong(), prevBlockOffset, "prevBlockOffset"); 568 if (this.fileContext.isUseHBaseChecksum()) { 569 sanityCheckAssertion(dup.get(), this.fileContext.getChecksumType().getCode(), "checksumType"); 570 sanityCheckAssertion(dup.getInt(), this.fileContext.getBytesPerChecksum(), 571 "bytesPerChecksum"); 572 sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader"); 573 } 574 575 int cksumBytes = totalChecksumBytes(); 576 int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes; 577 if (dup.limit() != expectedBufLimit) { 578 throw new AssertionError("Expected limit " + expectedBufLimit + ", got " + dup.limit()); 579 } 580 581 // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next 582 // block's header, so there are two sensible values for buffer capacity. 
583 int hdrSize = headerSize(); 584 if (dup.capacity() != expectedBufLimit && dup.capacity() != expectedBufLimit + hdrSize) { 585 throw new AssertionError("Invalid buffer capacity: " + dup.capacity() + 586 ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize)); 587 } 588 } 589 590 @Override 591 public String toString() { 592 StringBuilder sb = new StringBuilder() 593 .append("[") 594 .append("blockType=").append(blockType) 595 .append(", fileOffset=").append(offset) 596 .append(", headerSize=").append(headerSize()) 597 .append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader) 598 .append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader) 599 .append(", prevBlockOffset=").append(prevBlockOffset) 600 .append(", isUseHBaseChecksum=").append(fileContext.isUseHBaseChecksum()); 601 if (fileContext.isUseHBaseChecksum()) { 602 sb.append(", checksumType=").append(ChecksumType.codeToType(this.buf.get(24))) 603 .append(", bytesPerChecksum=").append(this.buf.getInt(24 + 1)) 604 .append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader); 605 } else { 606 sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader) 607 .append("(").append(onDiskSizeWithoutHeader) 608 .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")"); 609 } 610 String dataBegin = null; 611 if (buf.hasArray()) { 612 dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(), 613 Math.min(32, buf.limit() - buf.arrayOffset() - headerSize())); 614 } else { 615 ByteBuff bufWithoutHeader = getBufferWithoutHeader(); 616 byte[] dataBeginBytes = new byte[Math.min(32, 617 bufWithoutHeader.limit() - bufWithoutHeader.position())]; 618 bufWithoutHeader.get(dataBeginBytes); 619 dataBegin = Bytes.toStringBinary(dataBeginBytes); 620 } 621 sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader()) 622 .append(", totalChecksumBytes=").append(totalChecksumBytes()) 623 .append(", isUnpacked=").append(isUnpacked()) 624 .append(", buf=[").append(buf).append("]") 625 .append(", dataBeginsWith=").append(dataBegin) 626 .append(", fileContext=").append(fileContext) 627 .append(", nextBlockOnDiskSize=").append(nextBlockOnDiskSize) 628 .append("]"); 629 return sb.toString(); 630 } 631 632 /** 633 * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its 634 * encoded structure. Internal structures are shared between instances where applicable. 635 */ 636 HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException { 637 if (!fileContext.isCompressedOrEncrypted()) { 638 // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean), 639 // which is used for block serialization to L2 cache, does not preserve encoding and 640 // encryption details. 641 return this; 642 } 643 644 HFileBlock unpacked = new HFileBlock(this); 645 unpacked.allocateBuffer(); // allocates space for the decompressed block 646 647 HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ? 648 reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext(); 649 650 ByteBuff dup = this.buf.duplicate(); 651 dup.position(this.headerSize()); 652 dup = dup.slice(); 653 ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(), 654 unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), 655 dup); 656 return unpacked; 657 } 658 659 /** 660 * Always allocates a new buffer of the correct size. Copies header bytes 661 * from the existing buffer. 
Does not change header fields. 662 * Reserve room to keep checksum bytes too. 663 */ 664 private void allocateBuffer() { 665 int cksumBytes = totalChecksumBytes(); 666 int headerSize = headerSize(); 667 int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes; 668 669 // TODO we need consider allocating offheap here? 670 ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded); 671 672 // Copy header bytes into newBuf. 673 // newBuf is HBB so no issue in calling array() 674 buf.position(0); 675 buf.get(newBuf.array(), newBuf.arrayOffset(), headerSize); 676 677 buf = new SingleByteBuff(newBuf); 678 // set limit to exclude next block's header 679 buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes); 680 } 681 682 /** 683 * Return true when this block's buffer has been unpacked, false otherwise. Note this is a 684 * calculated heuristic, not tracked attribute of the block. 685 */ 686 public boolean isUnpacked() { 687 final int cksumBytes = totalChecksumBytes(); 688 final int headerSize = headerSize(); 689 final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes; 690 final int bufCapacity = buf.capacity(); 691 return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize; 692 } 693 694 /** An additional sanity-check in case no compression or encryption is being used. */ 695 @VisibleForTesting 696 void sanityCheckUncompressedSize() throws IOException { 697 if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) { 698 throw new IOException("Using no compression but " 699 + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", " 700 + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader 701 + ", numChecksumbytes=" + totalChecksumBytes()); 702 } 703 } 704 705 /** 706 * Cannot be {@link #UNSET}. Must be a legitimate value. Used re-making the {@link BlockCacheKey} when 707 * block is returned to the cache. 708 * @return the offset of this block in the file it was read from 709 */ 710 long getOffset() { 711 if (offset < 0) { 712 throw new IllegalStateException("HFile block offset not initialized properly"); 713 } 714 return offset; 715 } 716 717 /** 718 * @return a byte stream reading the data + checksum of this block 719 */ 720 DataInputStream getByteStream() { 721 ByteBuff dup = this.buf.duplicate(); 722 dup.position(this.headerSize()); 723 return new DataInputStream(new ByteBuffInputStream(dup)); 724 } 725 726 @Override 727 public long heapSize() { 728 long size = ClassSize.align( 729 ClassSize.OBJECT + 730 // Block type, multi byte buffer, MemoryType and meta references 731 4 * ClassSize.REFERENCE + 732 // On-disk size, uncompressed size, and next block's on-disk size 733 // bytePerChecksum and onDiskDataSize 734 4 * Bytes.SIZEOF_INT + 735 // This and previous block offset 736 2 * Bytes.SIZEOF_LONG + 737 // Heap size of the meta object. meta will be always not null. 738 fileContext.heapSize() 739 ); 740 741 if (buf != null) { 742 // Deep overhead of the byte buffer. Needs to be aligned separately. 743 size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE); 744 } 745 746 return ClassSize.align(size); 747 } 748 749 /** 750 * Read from an input stream at least <code>necessaryLen</code> and if possible, 751 * <code>extraLen</code> also if available. Analogous to 752 * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a 753 * number of "extra" bytes to also optionally read. 
754 * 755 * @param in the input stream to read from 756 * @param buf the buffer to read into 757 * @param bufOffset the destination offset in the buffer 758 * @param necessaryLen the number of bytes that are absolutely necessary to read 759 * @param extraLen the number of extra bytes that would be nice to read 760 * @return true if succeeded reading the extra bytes 761 * @throws IOException if failed to read the necessary bytes 762 */ 763 static boolean readWithExtra(InputStream in, byte[] buf, 764 int bufOffset, int necessaryLen, int extraLen) throws IOException { 765 int bytesRemaining = necessaryLen + extraLen; 766 while (bytesRemaining > 0) { 767 int ret = in.read(buf, bufOffset, bytesRemaining); 768 if (ret == -1 && bytesRemaining <= extraLen) { 769 // We could not read the "extra data", but that is OK. 770 break; 771 } 772 if (ret < 0) { 773 throw new IOException("Premature EOF from inputStream (read " 774 + "returned " + ret + ", was trying to read " + necessaryLen 775 + " necessary bytes and " + extraLen + " extra bytes, " 776 + "successfully read " 777 + (necessaryLen + extraLen - bytesRemaining)); 778 } 779 bufOffset += ret; 780 bytesRemaining -= ret; 781 } 782 return bytesRemaining <= 0; 783 } 784 785 /** 786 * Read from an input stream at least <code>necessaryLen</code> and if possible, 787 * <code>extraLen</code> also if available. Analogous to 788 * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses 789 * positional read and specifies a number of "extra" bytes that would be 790 * desirable but not absolutely necessary to read. 791 * 792 * @param in the input stream to read from 793 * @param position the position within the stream from which to start reading 794 * @param buf the buffer to read into 795 * @param bufOffset the destination offset in the buffer 796 * @param necessaryLen the number of bytes that are absolutely necessary to 797 * read 798 * @param extraLen the number of extra bytes that would be nice to read 799 * @return true if and only if extraLen is > 0 and reading those extra bytes 800 * was successful 801 * @throws IOException if failed to read the necessary bytes 802 */ 803 @VisibleForTesting 804 static boolean positionalReadWithExtra(FSDataInputStream in, 805 long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen) 806 throws IOException { 807 int bytesRemaining = necessaryLen + extraLen; 808 int bytesRead = 0; 809 while (bytesRead < necessaryLen) { 810 int ret = in.read(position, buf, bufOffset, bytesRemaining); 811 if (ret < 0) { 812 throw new IOException("Premature EOF from inputStream (positional read " 813 + "returned " + ret + ", was trying to read " + necessaryLen 814 + " necessary bytes and " + extraLen + " extra bytes, " 815 + "successfully read " + bytesRead); 816 } 817 position += ret; 818 bufOffset += ret; 819 bytesRemaining -= ret; 820 bytesRead += ret; 821 } 822 return bytesRead != necessaryLen && bytesRemaining <= 0; 823 } 824 825 /** 826 * Unified version 2 {@link HFile} block writer. The intended usage pattern 827 * is as follows: 828 * <ol> 829 * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm. 830 * <li>Call {@link Writer#startWriting} and get a data stream to write to. 831 * <li>Write your data into the stream. 832 * <li>Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to. 833 * store the serialized block into an external stream. 834 * <li>Repeat to write more blocks. 
835 * </ol> 836 * <p> 837 */ 838 static class Writer implements ShipperListener { 839 private enum State { 840 INIT, 841 WRITING, 842 BLOCK_READY 843 }; 844 845 /** Writer state. Used to ensure the correct usage protocol. */ 846 private State state = State.INIT; 847 848 /** Data block encoder used for data blocks */ 849 private final HFileDataBlockEncoder dataBlockEncoder; 850 851 private HFileBlockEncodingContext dataBlockEncodingCtx; 852 853 /** block encoding context for non-data blocks*/ 854 private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx; 855 856 /** 857 * The stream we use to accumulate data into a block in an uncompressed format. 858 * We reset this stream at the end of each block and reuse it. The 859 * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this 860 * stream. 861 */ 862 private ByteArrayOutputStream baosInMemory; 863 864 /** 865 * Current block type. Set in {@link #startWriting(BlockType)}. Could be 866 * changed in {@link #finishBlock()} from {@link BlockType#DATA} 867 * to {@link BlockType#ENCODED_DATA}. 868 */ 869 private BlockType blockType; 870 871 /** 872 * A stream that we write uncompressed bytes to, which compresses them and 873 * writes them to {@link #baosInMemory}. 874 */ 875 private DataOutputStream userDataStream; 876 877 // Size of actual data being written. Not considering the block encoding/compression. This 878 // includes the header size also. 879 private int unencodedDataSizeWritten; 880 881 // Size of actual data being written. considering the block encoding. This 882 // includes the header size also. 883 private int encodedDataSizeWritten; 884 885 /** 886 * Bytes to be written to the file system, including the header. Compressed 887 * if compression is turned on. It also includes the checksum data that 888 * immediately follows the block data. (header + data + checksums) 889 */ 890 private ByteArrayOutputStream onDiskBlockBytesWithHeader; 891 892 /** 893 * The size of the checksum data on disk. It is used only if data is 894 * not compressed. If data is compressed, then the checksums are already 895 * part of onDiskBytesWithHeader. If data is uncompressed, then this 896 * variable stores the checksum data for this block. 897 */ 898 private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY; 899 900 /** 901 * Current block's start offset in the {@link HFile}. Set in 902 * {@link #writeHeaderAndData(FSDataOutputStream)}. 903 */ 904 private long startOffset; 905 906 /** 907 * Offset of previous block by block type. Updated when the next block is 908 * started. 909 */ 910 private long[] prevOffsetByType; 911 912 /** The offset of the previous block of the same type */ 913 private long prevOffset; 914 /** Meta data that holds information about the hfileblock**/ 915 private HFileContext fileContext; 916 917 @Override 918 public void beforeShipped() { 919 if (getEncodingState() != null) { 920 getEncodingState().beforeShipped(); 921 } 922 } 923 924 EncodingState getEncodingState() { 925 return dataBlockEncodingCtx.getEncodingState(); 926 } 927 928 /** 929 * @param dataBlockEncoder data block encoding algorithm to use 930 */ 931 public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) { 932 if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) { 933 throw new RuntimeException("Unsupported value of bytesPerChecksum. 
" + 934 " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " + 935 fileContext.getBytesPerChecksum()); 936 } 937 this.dataBlockEncoder = dataBlockEncoder != null? 938 dataBlockEncoder: NoOpDataBlockEncoder.INSTANCE; 939 this.dataBlockEncodingCtx = this.dataBlockEncoder. 940 newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); 941 // TODO: This should be lazily instantiated since we usually do NOT need this default encoder 942 this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null, 943 HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); 944 // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum 945 baosInMemory = new ByteArrayOutputStream(); 946 prevOffsetByType = new long[BlockType.values().length]; 947 for (int i = 0; i < prevOffsetByType.length; ++i) { 948 prevOffsetByType[i] = UNSET; 949 } 950 // TODO: Why fileContext saved away when we have dataBlockEncoder and/or 951 // defaultDataBlockEncoder? 952 this.fileContext = fileContext; 953 } 954 955 /** 956 * Starts writing into the block. The previous block's data is discarded. 957 * 958 * @return the stream the user can write their data into 959 * @throws IOException 960 */ 961 DataOutputStream startWriting(BlockType newBlockType) 962 throws IOException { 963 if (state == State.BLOCK_READY && startOffset != -1) { 964 // We had a previous block that was written to a stream at a specific 965 // offset. Save that offset as the last offset of a block of that type. 966 prevOffsetByType[blockType.getId()] = startOffset; 967 } 968 969 startOffset = -1; 970 blockType = newBlockType; 971 972 baosInMemory.reset(); 973 baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER); 974 975 state = State.WRITING; 976 977 // We will compress it later in finishBlock() 978 userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory); 979 if (newBlockType == BlockType.DATA) { 980 this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream); 981 } 982 this.unencodedDataSizeWritten = 0; 983 this.encodedDataSizeWritten = 0; 984 return userDataStream; 985 } 986 987 /** 988 * Writes the Cell to this block 989 * @param cell 990 * @throws IOException 991 */ 992 void write(Cell cell) throws IOException{ 993 expectState(State.WRITING); 994 int posBeforeEncode = this.userDataStream.size(); 995 this.unencodedDataSizeWritten += 996 this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream); 997 this.encodedDataSizeWritten += this.userDataStream.size() - posBeforeEncode; 998 } 999 1000 /** 1001 * Returns the stream for the user to write to. The block writer takes care 1002 * of handling compression and buffering for caching on write. Can only be 1003 * called in the "writing" state. 1004 * 1005 * @return the data output stream for the user to write to 1006 */ 1007 DataOutputStream getUserDataStream() { 1008 expectState(State.WRITING); 1009 return userDataStream; 1010 } 1011 1012 /** 1013 * Transitions the block writer from the "writing" state to the "block 1014 * ready" state. Does nothing if a block is already finished. 1015 */ 1016 void ensureBlockReady() throws IOException { 1017 Preconditions.checkState(state != State.INIT, 1018 "Unexpected state: " + state); 1019 1020 if (state == State.BLOCK_READY) { 1021 return; 1022 } 1023 1024 // This will set state to BLOCK_READY. 1025 finishBlock(); 1026 } 1027 1028 /** 1029 * Finish up writing of the block. 
1030 * Flushes the compressing stream (if using compression), fills out the header, 1031 * does any compression/encryption of bytes to flush out to disk, and manages 1032 * the cache on write content, if applicable. Sets block write state to "block ready". 1033 */ 1034 private void finishBlock() throws IOException { 1035 if (blockType == BlockType.DATA) { 1036 this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream, 1037 baosInMemory.getBuffer(), blockType); 1038 blockType = dataBlockEncodingCtx.getBlockType(); 1039 } 1040 userDataStream.flush(); 1041 prevOffset = prevOffsetByType[blockType.getId()]; 1042 1043 // We need to set state before we can package the block up for cache-on-write. In a way, the 1044 // block is ready, but not yet encoded or compressed. 1045 state = State.BLOCK_READY; 1046 Bytes compressAndEncryptDat; 1047 if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) { 1048 compressAndEncryptDat = dataBlockEncodingCtx. 1049 compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size()); 1050 } else { 1051 compressAndEncryptDat = defaultBlockEncodingCtx. 1052 compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size()); 1053 } 1054 if (compressAndEncryptDat == null) { 1055 compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size()); 1056 } 1057 if (onDiskBlockBytesWithHeader == null) { 1058 onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength()); 1059 } 1060 onDiskBlockBytesWithHeader.reset(); 1061 onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(), 1062 compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength()); 1063 // Calculate how many bytes we need for checksum on the tail of the block. 1064 int numBytes = (int) ChecksumUtil.numBytes( 1065 onDiskBlockBytesWithHeader.size(), 1066 fileContext.getBytesPerChecksum()); 1067 1068 // Put the header for the on disk bytes; header currently is unfilled-out 1069 putHeader(onDiskBlockBytesWithHeader, 1070 onDiskBlockBytesWithHeader.size() + numBytes, 1071 baosInMemory.size(), onDiskBlockBytesWithHeader.size()); 1072 if (onDiskChecksum.length != numBytes) { 1073 onDiskChecksum = new byte[numBytes]; 1074 } 1075 ChecksumUtil.generateChecksums( 1076 onDiskBlockBytesWithHeader.getBuffer(), 0,onDiskBlockBytesWithHeader.size(), 1077 onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum()); 1078 } 1079 1080 /** 1081 * Put the header into the given byte array at the given offset. 
1082 * @param onDiskSize size of the block on disk header + data + checksum 1083 * @param uncompressedSize size of the block after decompression (but 1084 * before optional data block decoding) including header 1085 * @param onDiskDataSize size of the block on disk with header 1086 * and data but not including the checksums 1087 */ 1088 private void putHeader(byte[] dest, int offset, int onDiskSize, 1089 int uncompressedSize, int onDiskDataSize) { 1090 offset = blockType.put(dest, offset); 1091 offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE); 1092 offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE); 1093 offset = Bytes.putLong(dest, offset, prevOffset); 1094 offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode()); 1095 offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum()); 1096 Bytes.putInt(dest, offset, onDiskDataSize); 1097 } 1098 1099 private void putHeader(ByteArrayOutputStream dest, int onDiskSize, 1100 int uncompressedSize, int onDiskDataSize) { 1101 putHeader(dest.getBuffer(),0, onDiskSize, uncompressedSize, onDiskDataSize); 1102 } 1103 1104 /** 1105 * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records 1106 * the offset of this block so that it can be referenced in the next block 1107 * of the same type. 1108 * 1109 * @param out 1110 * @throws IOException 1111 */ 1112 void writeHeaderAndData(FSDataOutputStream out) throws IOException { 1113 long offset = out.getPos(); 1114 if (startOffset != UNSET && offset != startOffset) { 1115 throw new IOException("A " + blockType + " block written to a " 1116 + "stream twice, first at offset " + startOffset + ", then at " 1117 + offset); 1118 } 1119 startOffset = offset; 1120 1121 finishBlockAndWriteHeaderAndData((DataOutputStream) out); 1122 } 1123 1124 /** 1125 * Writes the header and the compressed data of this block (or uncompressed 1126 * data when not using compression) into the given stream. Can be called in 1127 * the "writing" state or in the "block ready" state. If called in the 1128 * "writing" state, transitions the writer to the "block ready" state. 1129 * 1130 * @param out the output stream to write the 1131 * @throws IOException 1132 */ 1133 protected void finishBlockAndWriteHeaderAndData(DataOutputStream out) 1134 throws IOException { 1135 ensureBlockReady(); 1136 long startTime = System.currentTimeMillis(); 1137 out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size()); 1138 out.write(onDiskChecksum); 1139 HFile.updateWriteLatency(System.currentTimeMillis() - startTime); 1140 } 1141 1142 /** 1143 * Returns the header or the compressed data (or uncompressed data when not 1144 * using compression) as a byte array. Can be called in the "writing" state 1145 * or in the "block ready" state. If called in the "writing" state, 1146 * transitions the writer to the "block ready" state. This returns 1147 * the header + data + checksums stored on disk. 1148 * 1149 * @return header and data as they would be stored on disk in a byte array 1150 * @throws IOException 1151 */ 1152 byte[] getHeaderAndDataForTest() throws IOException { 1153 ensureBlockReady(); 1154 // This is not very optimal, because we are doing an extra copy. 1155 // But this method is used only by unit tests. 
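        // Lay out the cached header + data bytes followed by the separately held checksum bytes,
        // matching the on-disk order that finishBlockAndWriteHeaderAndData(DataOutputStream) emits.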
1156 byte[] output = 1157 new byte[onDiskBlockBytesWithHeader.size() 1158 + onDiskChecksum.length]; 1159 System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0, 1160 onDiskBlockBytesWithHeader.size()); 1161 System.arraycopy(onDiskChecksum, 0, output, 1162 onDiskBlockBytesWithHeader.size(), onDiskChecksum.length); 1163 return output; 1164 } 1165 1166 /** 1167 * Releases resources used by this writer. 1168 */ 1169 void release() { 1170 if (dataBlockEncodingCtx != null) { 1171 dataBlockEncodingCtx.close(); 1172 dataBlockEncodingCtx = null; 1173 } 1174 if (defaultBlockEncodingCtx != null) { 1175 defaultBlockEncodingCtx.close(); 1176 defaultBlockEncodingCtx = null; 1177 } 1178 } 1179 1180 /** 1181 * Returns the on-disk size of the data portion of the block. This is the 1182 * compressed size if compression is enabled. Can only be called in the 1183 * "block ready" state. Header is not compressed, and its size is not 1184 * included in the return value. 1185 * 1186 * @return the on-disk size of the block, not including the header. 1187 */ 1188 int getOnDiskSizeWithoutHeader() { 1189 expectState(State.BLOCK_READY); 1190 return onDiskBlockBytesWithHeader.size() + 1191 onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE; 1192 } 1193 1194 /** 1195 * Returns the on-disk size of the block. Can only be called in the 1196 * "block ready" state. 1197 * 1198 * @return the on-disk size of the block ready to be written, including the 1199 * header size, the data and the checksum data. 1200 */ 1201 int getOnDiskSizeWithHeader() { 1202 expectState(State.BLOCK_READY); 1203 return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length; 1204 } 1205 1206 /** 1207 * The uncompressed size of the block data. Does not include header size. 1208 */ 1209 int getUncompressedSizeWithoutHeader() { 1210 expectState(State.BLOCK_READY); 1211 return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE; 1212 } 1213 1214 /** 1215 * The uncompressed size of the block data, including header size. 1216 */ 1217 int getUncompressedSizeWithHeader() { 1218 expectState(State.BLOCK_READY); 1219 return baosInMemory.size(); 1220 } 1221 1222 /** @return true if a block is being written */ 1223 boolean isWriting() { 1224 return state == State.WRITING; 1225 } 1226 1227 /** 1228 * Returns the number of bytes written into the current block so far, or 1229 * zero if not writing the block at the moment. Note that this will return 1230 * zero in the "block ready" state as well. 1231 * 1232 * @return the number of bytes written 1233 */ 1234 public int encodedBlockSizeWritten() { 1235 if (state != State.WRITING) 1236 return 0; 1237 return this.encodedDataSizeWritten; 1238 } 1239 1240 /** 1241 * Returns the number of bytes written into the current block so far, or 1242 * zero if not writing the block at the moment. Note that this will return 1243 * zero in the "block ready" state as well. 1244 * 1245 * @return the number of bytes written 1246 */ 1247 int blockSizeWritten() { 1248 if (state != State.WRITING) return 0; 1249 return this.unencodedDataSizeWritten; 1250 } 1251 1252 /** 1253 * Clones the header followed by the uncompressed data, even if using 1254 * compression. This is needed for storing uncompressed blocks in the block 1255 * cache. Can be called in the "writing" state or the "block ready" state. 1256 * Returns only the header and data, does not include checksum data. 
1257 * 1258 * @return Returns a copy of uncompressed block bytes for caching on write 1259 */ 1260 @VisibleForTesting 1261 ByteBuffer cloneUncompressedBufferWithHeader() { 1262 expectState(State.BLOCK_READY); 1263 byte[] uncompressedBlockBytesWithHeader = baosInMemory.toByteArray(); 1264 int numBytes = (int) ChecksumUtil.numBytes( 1265 onDiskBlockBytesWithHeader.size(), 1266 fileContext.getBytesPerChecksum()); 1267 putHeader(uncompressedBlockBytesWithHeader, 0, 1268 onDiskBlockBytesWithHeader.size() + numBytes, 1269 baosInMemory.size(), onDiskBlockBytesWithHeader.size()); 1270 return ByteBuffer.wrap(uncompressedBlockBytesWithHeader); 1271 } 1272 1273 /** 1274 * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is 1275 * needed for storing packed blocks in the block cache. Expects calling semantics identical to 1276 * {@link #cloneUncompressedBufferWithHeader()}. Returns only the header and data; 1277 * does not include checksum data. 1278 * 1279 * @return Returns a copy of block bytes for caching on write 1280 */ 1281 private ByteBuffer cloneOnDiskBufferWithHeader() { 1282 expectState(State.BLOCK_READY); 1283 return ByteBuffer.wrap(onDiskBlockBytesWithHeader.toByteArray()); 1284 } 1285 1286 private void expectState(State expectedState) { 1287 if (state != expectedState) { 1288 throw new IllegalStateException("Expected state: " + expectedState + 1289 ", actual state: " + state); 1290 } 1291 } 1292 1293 /** 1294 * Takes the given {@link BlockWritable} instance, creates a new block of 1295 * its appropriate type, writes the writable into this block, and flushes 1296 * the block into the output stream. The writer is instructed not to buffer 1297 * uncompressed bytes for cache-on-write. 1298 * 1299 * @param bw the block-writable object to write as a block 1300 * @param out the file system output stream 1301 * @throws IOException 1302 */ 1303 void writeBlock(BlockWritable bw, FSDataOutputStream out) 1304 throws IOException { 1305 bw.writeToBlock(startWriting(bw.getBlockType())); 1306 writeHeaderAndData(out); 1307 } 1308 1309 /** 1310 * Creates a new HFileBlock. Checksums have already been validated, so 1311 * the byte buffer passed into the constructor of this newly created 1312 * block does not have checksum data even though the header minor 1313 * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a 1314 * 0 value in bytesPerChecksum. This method copies the on-disk or 1315 * uncompressed data to build the HFileBlock which is used only 1316 * while writing blocks and caching. 1317 * 1318 * <p>TODO: Should there be an option where a cache can ask that hbase preserve block 1319 * checksums for checking after a block comes out of the cache? Otherwise, the cache is responsible 1320 * for blocks being wholesome (ECC memory or if file-backed, it does checksumming).
1321 */ 1322 HFileBlock getBlockForCaching(CacheConfig cacheConf) { 1323 HFileContext newContext = new HFileContextBuilder() 1324 .withBlockSize(fileContext.getBlocksize()) 1325 .withBytesPerCheckSum(0) 1326 .withChecksumType(ChecksumType.NULL) // no checksums in cached data 1327 .withCompression(fileContext.getCompression()) 1328 .withDataBlockEncoding(fileContext.getDataBlockEncoding()) 1329 .withHBaseCheckSum(fileContext.isUseHBaseChecksum()) 1330 .withCompressTags(fileContext.isCompressTags()) 1331 .withIncludesMvcc(fileContext.isIncludesMvcc()) 1332 .withIncludesTags(fileContext.isIncludesTags()) 1333 .build(); 1334 return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(), 1335 getUncompressedSizeWithoutHeader(), prevOffset, 1336 cacheConf.shouldCacheCompressed(blockType.getCategory())? 1337 cloneOnDiskBufferWithHeader() : 1338 cloneUncompressedBufferWithHeader(), 1339 FILL_HEADER, startOffset, UNSET, 1340 onDiskBlockBytesWithHeader.size() + onDiskChecksum.length, newContext); 1341 } 1342 } 1343 1344 /** Something that can be written into a block. */ 1345 interface BlockWritable { 1346 /** The type of block this data should use. */ 1347 BlockType getBlockType(); 1348 1349 /** 1350 * Writes the block to the provided stream. Must not write any magic 1351 * records. 1352 * 1353 * @param out a stream to write uncompressed data into 1354 */ 1355 void writeToBlock(DataOutput out) throws IOException; 1356 } 1357 1358 /** Iterator for {@link HFileBlock}s. */ 1359 interface BlockIterator { 1360 /** 1361 * Get the next block, or null if there are no more blocks to iterate. 1362 */ 1363 HFileBlock nextBlock() throws IOException; 1364 1365 /** 1366 * Similar to {@link #nextBlock()} but checks block type, throws an 1367 * exception if incorrect, and returns the HFile block 1368 */ 1369 HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException; 1370 } 1371 1372 /** An HFile block reader with iteration ability. */ 1373 interface FSReader { 1374 /** 1375 * Reads the block at the given offset in the file with the given on-disk 1376 * size and uncompressed size. 1377 * 1378 * @param offset 1379 * @param onDiskSize the on-disk size of the entire block, including all 1380 * applicable headers, or -1 if unknown 1381 * @return the newly read block 1382 */ 1383 HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics) 1384 throws IOException; 1385 1386 /** 1387 * Creates a block iterator over the given portion of the {@link HFile}. 1388 * The iterator returns blocks starting with offset such that offset <= 1389 * startOffset < endOffset. Returned blocks are always unpacked. 1390 * Used when no hfile index available; e.g. reading in the hfile index 1391 * blocks themselves on file open. 1392 * 1393 * @param startOffset the offset of the block to start iteration with 1394 * @param endOffset the offset to end iteration at (exclusive) 1395 * @return an iterator of blocks between the two given offsets 1396 */ 1397 BlockIterator blockRange(long startOffset, long endOffset); 1398 1399 /** Closes the backing streams */ 1400 void closeStreams() throws IOException; 1401 1402 /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */ 1403 HFileBlockDecodingContext getBlockDecodingContext(); 1404 1405 /** Get the default decoder for blocks from this file. 
     */
1406    HFileBlockDecodingContext getDefaultBlockDecodingContext();
1407
1408    void setIncludesMemStoreTS(boolean includesMemstoreTS);
1409    void setDataBlockEncoder(HFileDataBlockEncoder encoder);
1410
1411    /**
1412     * To close the stream's socket. Note: this can be called concurrently from multiple threads
1413     * and the implementation should take care of thread safety.
1414     */
1415    void unbufferStream();
1416  }
1417
1418  /**
1419   * Data structure used to cache the header of the NEXT block. Only useful if the next read
1420   * that comes in here is for the block that immediately follows it in the file.
1421   *
1422   * When we read, we read the current block and the next block's header. We do this so we have
1423   * the length of the next block to read if the hfile index is not available (rare, at
1424   * hfile open only).
1425   */
1426  private static class PrefetchedHeader {
1427    long offset = -1;
1428    byte [] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1429    final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
1430
1431    @Override
1432    public String toString() {
1433      return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header);
1434    }
1435  }
1436
1437  /**
1438   * Reads version 2 HFile blocks from the filesystem.
1439   */
1440  static class FSReaderImpl implements FSReader {
1441    /** The file system stream of the underlying {@link HFile} that
1442     * does or doesn't do checksum validations in the filesystem */
1443    private FSDataInputStreamWrapper streamWrapper;
1444
1445    private HFileBlockDecodingContext encodedBlockDecodingCtx;
1446
1447    /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
1448    private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
1449
1450    /**
1451     * Cache of the NEXT header after this. Check it is indeed the next block's header
1452     * before using it. TODO: Review. This over-read into the next block to fetch the
1453     * next block's header seems unnecessary given we usually get the block size
1454     * from the hfile index. Review!
1455     */
1456    private AtomicReference<PrefetchedHeader> prefetchedHeader = new AtomicReference<>(new PrefetchedHeader());
1457
1458    /** The size of the file we are reading from, or -1 if unknown. */
1459    private long fileSize;
1460
1461    /** The size of the header */
1462    @VisibleForTesting
1463    protected final int hdrSize;
1464
1465    /** The filesystem used to access data */
1466    private HFileSystem hfs;
1467
1468    private HFileContext fileContext;
1469    // Cache the fileName
1470    private String pathName;
1471
1472    private final Lock streamLock = new ReentrantLock();
1473
1474    FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
1475        HFileContext fileContext) throws IOException {
1476      this.fileSize = fileSize;
1477      this.hfs = hfs;
1478      if (path != null) {
1479        this.pathName = path.toString();
1480      }
1481      this.fileContext = fileContext;
1482      this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1483
1484      this.streamWrapper = stream;
1485      // Older versions of HBase didn't support checksums.
1486      this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1487      defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
1488      encodedBlockDecodingCtx = defaultDecodingCtx;
1489    }
1490
1491    /**
1492     * A constructor that reads files with the latest minor version.
1493     * This is used by unit tests only.
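     * For example, a test might build one directly from an open stream (a sketch; {@code fs},
     * {@code path} and {@code meta} are whatever the test has at hand, not fields of this class):
     * <pre>
     *   FSDataInputStream in = fs.open(path);
     *   long fileSize = fs.getFileStatus(path).getLen();
     *   FSReader blockReader = new FSReaderImpl(in, fileSize, meta);
     * </pre>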
1494     */
1495    FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
1496        throws IOException {
1497      this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
1498    }
1499
1500    @Override
1501    public BlockIterator blockRange(final long startOffset, final long endOffset) {
1502      final FSReader owner = this; // handle for inner class
1503      return new BlockIterator() {
1504        private long offset = startOffset;
1505        // Cache length of next block. Current block has the length of next block in it.
1506        private long length = -1;
1507
1508        @Override
1509        public HFileBlock nextBlock() throws IOException {
1510          if (offset >= endOffset) {
1511            return null;
1512          }
1513          HFileBlock b = readBlockData(offset, length, false, false);
1514          offset += b.getOnDiskSizeWithHeader();
1515          length = b.getNextBlockOnDiskSize();
1516          return b.unpack(fileContext, owner);
1517        }
1518
1519        @Override
1520        public HFileBlock nextBlockWithBlockType(BlockType blockType)
1521            throws IOException {
1522          HFileBlock blk = nextBlock();
1523          if (blk.getBlockType() != blockType) {
1524            throw new IOException("Expected block of type " + blockType
1525                + " but found " + blk.getBlockType());
1526          }
1527          return blk;
1528        }
1529      };
1530    }
1531
1532    /**
1533     * Does a positional read or a seek and read into the given buffer. Returns
1534     * the on-disk size of the next block, or -1 if it could not be read/determined; e.g. EOF.
1535     *
1536     * @param dest destination buffer
1537     * @param destOffset offset into the destination buffer at where to put the bytes we read
1538     * @param size size of read
1539     * @param peekIntoNextBlock whether to read the next block's on-disk size
1540     * @param fileOffset position in the stream to read at
1541     * @param pread whether we should do a positional read
1542     * @param istream The input source of data
1543     * @return the on-disk size of the next block with header size included, or
1544     *         -1 if it could not be determined; if not -1, the <code>dest</code> INCLUDES the
1545     *         next header
1546     * @throws IOException
1547     */
1548    @VisibleForTesting
1549    protected int readAtOffset(FSDataInputStream istream, byte[] dest, int destOffset, int size,
1550        boolean peekIntoNextBlock, long fileOffset, boolean pread)
1551        throws IOException {
1552      if (peekIntoNextBlock && destOffset + size + hdrSize > dest.length) {
1553        // We are asked to read the next block's header as well, but there is
1554        // not enough room in the array.
1555        throw new IOException("Attempted to read " + size + " bytes and " + hdrSize +
1556            " bytes of next header into a " + dest.length + "-byte array at offset " + destOffset);
1557      }
1558
1559      if (!pread) {
1560        // Seek + read. Better for scanning.
1561        HFileUtil.seekOnMultipleSources(istream, fileOffset);
1562        // TODO: do we need seek time latencies?
1563        long realOffset = istream.getPos();
1564        if (realOffset != fileOffset) {
1565          throw new IOException("Tried to seek to " + fileOffset + " to " + "read " + size +
1566              " bytes, but pos=" + realOffset + " after seek");
1567        }
1568
1569        if (!peekIntoNextBlock) {
1570          IOUtils.readFully(istream, dest, destOffset, size);
1571          return -1;
1572        }
1573
1574        // Try to read the next block header.
1575        if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) {
1576          return -1;
1577        }
1578      } else {
1579        // Positional read. Better for random reads; or when the streamLock is already locked.
1580        int extraSize = peekIntoNextBlock ? hdrSize : 0;
1581        if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset, size, extraSize)) {
1582          return -1;
1583        }
1584      }
1585      assert peekIntoNextBlock;
1586      return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
1587    }
1588
1589    /**
1590     * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
1591     * little memory allocation as possible, using the provided on-disk size.
1592     *
1593     * @param offset the offset in the stream to read at
1594     * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1595     *          the header, or -1 if unknown; e.g. when iterating over blocks reading
1596     *          in the file metadata info.
1597     * @param pread whether to use a positional read
1598     */
1599    @Override
1600    public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread,
1601        boolean updateMetrics) throws IOException {
1602      // Get a copy of the current state of whether to validate
1603      // hbase checksums or not for this read call. This is not
1604      // thread-safe but the one constraint is that if we decide
1605      // to skip hbase checksum verification then we are
1606      // guaranteed to use hdfs checksum verification.
1607      boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1608      FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1609
1610      HFileBlock blk = readBlockDataInternal(is, offset,
1611          onDiskSizeWithHeaderL, pread,
1612          doVerificationThruHBaseChecksum, updateMetrics);
1613      if (blk == null) {
1614        HFile.LOG.warn("HBase checksum verification failed for file " +
1615            pathName + " at offset " +
1616            offset + " filesize " + fileSize +
1617            ". Retrying read with HDFS checksums turned on...");
1618
1619        if (!doVerificationThruHBaseChecksum) {
1620          String msg = "HBase checksum verification failed for file " +
1621              pathName + " at offset " +
1622              offset + " filesize " + fileSize +
1623              " but this cannot happen because doVerify is " +
1624              doVerificationThruHBaseChecksum;
1625          HFile.LOG.warn(msg);
1626          throw new IOException(msg); // cannot happen case here
1627        }
1628        HFile.CHECKSUM_FAILURES.increment(); // update metrics
1629
1630        // If we have a checksum failure, we fall back into a mode where
1631        // the next few reads use HDFS level checksums. We aim to make the
1632        // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
1633        // hbase checksum verification, but since this value is set without
1634        // holding any locks, it can so happen that we might actually do
1635        // a few more than precisely this number.
1636        is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1637        doVerificationThruHBaseChecksum = false;
1638        blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
1639            doVerificationThruHBaseChecksum, updateMetrics);
1640        if (blk != null) {
1641          HFile.LOG.warn("HDFS checksum verification succeeded for file " +
1642              pathName + " at offset " +
1643              offset + " filesize " + fileSize);
1644        }
1645      }
1646      if (blk == null && !doVerificationThruHBaseChecksum) {
1647        String msg = "readBlockData failed, possibly due to a " +
1648            "checksum verification failure, for file " + pathName +
1649            " at offset " + offset + " filesize " + fileSize;
1650        HFile.LOG.warn(msg);
1651        throw new IOException(msg);
1652      }
1653
1654      // If there was a checksum mismatch earlier, then we retried with
1655      // HBase checksums switched off and used HDFS checksum verification.
1656      // This triggers HDFS to detect and fix corrupt replicas. The
1657      // next checksumOffCount read requests will use HDFS checksums.
1658      // The decrementing of this.checksumOffCount is not thread-safe,
1659      // but it is harmless because eventually checksumOffCount will be
1660      // a negative number.
1661      streamWrapper.checksumOk();
1662      return blk;
1663    }
1664
1665    /**
1666     * @return <code>onDiskSizeWithHeaderL</code> as an int, after checking that it is a sane size
1667     * @throws IOException
1668     */
1669    private static int checkAndGetSizeAsInt(final long onDiskSizeWithHeaderL, final int hdrSize)
1670        throws IOException {
1671      if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
1672          || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) {
1673        throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL
1674            + ": expected to be at least " + hdrSize
1675            + " and at most " + Integer.MAX_VALUE + ", or -1");
1676      }
1677      return (int)onDiskSizeWithHeaderL;
1678    }
1679
1680    /**
1681     * Verify that the passed-in onDiskSizeWithHeader aligns with what is in the header;
1682     * if it does not, something is not right.
1683     * @throws IOException
1684     */
1685    private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuffer headerBuf,
1686        final long offset, boolean verifyChecksum)
1687        throws IOException {
1688      // Assert size provided aligns with what is in the header
1689      int fromHeader = getOnDiskSizeWithHeader(headerBuf, verifyChecksum);
1690      if (passedIn != fromHeader) {
1691        throw new IOException("Passed in onDiskSizeWithHeader=" + passedIn + " != " + fromHeader +
1692            ", offset=" + offset + ", fileContext=" + this.fileContext);
1693      }
1694    }
1695
1696    /**
1697     * Check the atomic reference cache for this block's header. The cache is only good if the
1698     * next read coming through is the next in sequence in the file. We read the next block's
1699     * header on the tail of reading the previous block to save a seek. Otherwise,
1700     * we have to do a seek to read the header before we can pull in the block OR
1701     * we have to back up the stream because we over-read (the next block's header).
1702     * @see PrefetchedHeader
1703     * @return The cached block header or null if not found.
1704     * @see #cacheNextBlockHeader(long, byte[], int, int)
1705     */
1706    private ByteBuffer getCachedHeader(final long offset) {
1707      PrefetchedHeader ph = this.prefetchedHeader.get();
1708      return ph != null && ph.offset == offset? ph.buf: null;
1709    }
1710
1711    /**
1712     * Save away the next block's header in the atomic reference.
1713     * @see #getCachedHeader(long)
1714     * @see PrefetchedHeader
1715     */
1716    private void cacheNextBlockHeader(final long offset,
1717        final byte [] header, final int headerOffset, final int headerLength) {
1718      PrefetchedHeader ph = new PrefetchedHeader();
1719      ph.offset = offset;
1720      System.arraycopy(header, headerOffset, ph.header, 0, headerLength);
1721      this.prefetchedHeader.set(ph);
1722    }
1723
1724    /**
1725     * Reads a version 2 block.
1726     *
1727     * @param offset the offset in the stream to read at.
1728     * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1729     *          the header and checksums if present, or -1 if unknown (as a long). Can be -1
1730     *          if we are doing raw iteration of blocks as when loading up file metadata; i.e.
1731     *          the first read of a new file. Usually known, gotten from the hfile index.
1732     * @param pread whether to use a positional read
1733     * @param verifyChecksum Whether to use HBase checksums.
1734     *          If HBase checksum is switched off, then use HDFS checksum. Can also flip on/off while
1735     *          reading the same file if we hit a troublesome patch in an hfile.
1736     * @return the HFileBlock or null if there is an HBase checksum mismatch
1737     */
1738    @VisibleForTesting
1739    protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
1740        long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics)
1741        throws IOException {
1742      if (offset < 0) {
1743        throw new IOException("Invalid offset=" + offset + " trying to read "
1744            + "block (onDiskSize=" + onDiskSizeWithHeaderL + ")");
1745      }
1746      int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize);
1747      // Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1
1748      // and will save us having to seek the stream backwards to reread the header we
1749      // read the last time through here.
1750      ByteBuffer headerBuf = getCachedHeader(offset);
1751      LOG.trace("Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, " +
1752          "onDiskSizeWithHeader={}", this.fileContext.getHFileName(), offset, pread,
1753          verifyChecksum, headerBuf, onDiskSizeWithHeader);
1754      // This is NOT the same as verifyChecksum. The latter is whether to do hbase
1755      // checksums, and can change with circumstances. The below flag is whether the
1756      // file has support for checksums (version 2+).
1757      boolean checksumSupport = this.fileContext.isUseHBaseChecksum();
1758      long startTime = System.currentTimeMillis();
1759      if (onDiskSizeWithHeader <= 0) {
1760        // We were not passed the block size. Need to get it from the header. If header was
1761        // not cached (see getCachedHeader above), need to seek to pull it in. This is costly
1762        // and should happen very rarely. Currently happens on open of an hfile reader where we
1763        // read the trailer blocks to pull in the indices. Otherwise, we are reading block sizes
1764        // out of the hfile index. To check, enable TRACE in this file and you'll get a stack
1765        // trace in the LOG every time we seek. See HBASE-17072 for more detail.
1766        if (headerBuf == null) {
1767          if (LOG.isTraceEnabled()) {
1768            LOG.trace("Extra seek to get block size!", new RuntimeException());
1769          }
1770          headerBuf = ByteBuffer.allocate(hdrSize);
1771          readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize, false,
1772              offset, pread);
1773        }
1774        onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport);
1775      }
1776      int preReadHeaderSize = headerBuf == null? 0 : hdrSize;
1777      // Allocate enough space to fit the next block's header too; saves a seek next time through.
1778      // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header;
1779      // onDiskSizeWithHeader is header, body, and any checksums if present. preReadHeaderSize
1780      // says where to start reading. If we have the header cached, then we don't need to read
1781      // it again and we can likely read from the last place we left off w/o need to back up and
1782      // reread the header we read last time through here.
1783      // TODO: Make this ByteBuffer-based. Will make it easier to go to HDFS with BBPool (offheap).
1784      byte [] onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
1785      int nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, preReadHeaderSize,
1786          onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread);
1787      if (headerBuf != null) {
1788        // The header has been read when reading the previous block OR in a distinct header-only
1789        // read. Copy to this block's header.
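        // (The body was read starting at index preReadHeaderSize -- i.e. hdrSize -- by the
        // readAtOffset call above, so copying the cached header into bytes [0, hdrSize) of
        // onDiskBlock completes the block.)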
1790        System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1791      } else {
1792        headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
1793      }
1794      // Do a few checks before we go instantiate HFileBlock.
1795      assert onDiskSizeWithHeader > this.hdrSize;
1796      verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport);
1797      ByteBuffer onDiskBlockByteBuffer = ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader);
1798      // Verify checksum of the data before using it for building HFileBlock.
1799      if (verifyChecksum &&
1800          !validateChecksum(offset, onDiskBlockByteBuffer, hdrSize)) {
1801        return null;
1802      }
1803      long duration = System.currentTimeMillis() - startTime;
1804      if (updateMetrics) {
1805        HFile.updateReadLatency(duration, pread);
1806      }
1807      // The onDiskBlock will become the headerAndDataBuffer for this block.
1808      // If nextBlockOnDiskSize is not -1, the onDiskBlock already
1809      // contains the header of the next block, so no need to set the next block's header in it.
1810      HFileBlock hFileBlock =
1811          new HFileBlock(new SingleByteBuff(onDiskBlockByteBuffer), checksumSupport,
1812              MemoryType.EXCLUSIVE, offset, nextBlockOnDiskSize, fileContext);
1813      // Run a check on the uncompressed sizes.
1814      if (!fileContext.isCompressedOrEncrypted()) {
1815        hFileBlock.sanityCheckUncompressed();
1816      }
1817      LOG.trace("Read {} in {} ms", hFileBlock, duration);
1818      // Cache next block header if we read it for the next time through here.
1819      if (nextBlockOnDiskSize != -1) {
1820        cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(),
1821            onDiskBlock, onDiskSizeWithHeader, hdrSize);
1822      }
1823      return hFileBlock;
1824    }
1825
1826    @Override
1827    public void setIncludesMemStoreTS(boolean includesMemstoreTS) {
1828      this.fileContext.setIncludesMvcc(includesMemstoreTS);
1829    }
1830
1831    @Override
1832    public void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
1833      encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
1834    }
1835
1836    @Override
1837    public HFileBlockDecodingContext getBlockDecodingContext() {
1838      return this.encodedBlockDecodingCtx;
1839    }
1840
1841    @Override
1842    public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
1843      return this.defaultDecodingCtx;
1844    }
1845
1846    /**
1847     * Generates the checksums for the header and data and then validates them against the
1848     * values stored in the block. If the block doesn't use checksums, returns false.
1849     * @return True if the checksum matches, else false.
1850     */
1851    private boolean validateChecksum(long offset, ByteBuffer data, int hdrSize)
1852        throws IOException {
1853      // If this is an older version of the block that does not have checksums, then return false
1854      // indicating that checksum verification did not succeed. Actually, this method should never
1855      // be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen
1856      // case. Since this is a cannot-happen case, it is better to return false to indicate a
1857      // checksum validation failure.
1858      if (!fileContext.isUseHBaseChecksum()) {
1859        return false;
1860      }
1861      return ChecksumUtil.validateChecksum(data, pathName, offset, hdrSize);
1862    }
1863
1864    @Override
1865    public void closeStreams() throws IOException {
1866      streamWrapper.close();
1867    }
1868
1869    @Override
1870    public void unbufferStream() {
1871      // To handle concurrent reads, ensure that no other client is accessing the streams while we
1872      // unbuffer it.
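      // tryLock, not lock: if another thread currently holds the lock we simply skip the
      // unbuffer this time around; it is only a best-effort release of the socket/buffer
      // resources, so skipping it is harmless.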
1873      if (streamLock.tryLock()) {
1874        try {
1875          this.streamWrapper.unbuffer();
1876        } finally {
1877          streamLock.unlock();
1878        }
1879      }
1880    }
1881
1882    @Override
1883    public String toString() {
1884      return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext;
1885    }
1886  }
1887
1888  /** An additional sanity-check in case no compression or encryption is being used. */
1889  @VisibleForTesting
1890  void sanityCheckUncompressed() throws IOException {
1891    if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader +
1892        totalChecksumBytes()) {
1893      throw new IOException("Using no compression but "
1894          + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
1895          + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
1896          + ", numChecksumbytes=" + totalChecksumBytes());
1897    }
1898  }
1899
1900  // Cacheable implementation
1901  @Override
1902  public int getSerializedLength() {
1903    if (buf != null) {
1904      // Include extra bytes for block metadata.
1905      return this.buf.limit() + BLOCK_METADATA_SPACE;
1906    }
1907    return 0;
1908  }
1909
1910  // Cacheable implementation
1911  @Override
1912  public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
1913    this.buf.get(destination, 0, getSerializedLength() - BLOCK_METADATA_SPACE);
1914    destination = addMetaData(destination, includeNextBlockMetadata);
1915
1916    // Make it ready for reading: flip sets position to zero and limit to the current position,
1917    // which covers exactly what we just wrote -- the block, its checksums if present, and the
1918    // metadata.
1919    destination.flip();
1920  }
1921
1922  /**
1923   * For use by bucketcache. This exposes internals.
1924   */
1925  public ByteBuffer getMetaData() {
1926    ByteBuffer bb = ByteBuffer.allocate(BLOCK_METADATA_SPACE);
1927    bb = addMetaData(bb, true);
1928    bb.flip();
1929    return bb;
1930  }
1931
1932  /**
1933   * Adds metadata at current position (position is moved forward). Does not flip or reset.
1934   * @return The passed <code>destination</code> with metadata added.
1935   */
1936  private ByteBuffer addMetaData(final ByteBuffer destination, boolean includeNextBlockMetadata) {
1937    destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1938    destination.putLong(this.offset);
1939    if (includeNextBlockMetadata) {
1940      destination.putInt(this.nextBlockOnDiskSize);
1941    }
1942    return destination;
1943  }
1944
1945  // Cacheable implementation
1946  @Override
1947  public CacheableDeserializer<Cacheable> getDeserializer() {
1948    return HFileBlock.BLOCK_DESERIALIZER;
1949  }
1950
1951  @Override
1952  public int hashCode() {
1953    int result = 1;
1954    result = result * 31 + blockType.hashCode();
1955    result = result * 31 + nextBlockOnDiskSize;
1956    result = result * 31 + (int) (offset ^ (offset >>> 32));
1957    result = result * 31 + onDiskSizeWithoutHeader;
1958    result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32));
1959    result = result * 31 + uncompressedSizeWithoutHeader;
1960    result = result * 31 + buf.hashCode();
1961    return result;
1962  }
1963
1964  @Override
1965  public boolean equals(Object comparison) {
1966    if (this == comparison) {
1967      return true;
1968    }
1969    if (comparison == null) {
1970      return false;
1971    }
1972    if (comparison.getClass() != this.getClass()) {
1973      return false;
1974    }
1975
1976    HFileBlock castedComparison = (HFileBlock) comparison;
1977
1978    if (castedComparison.blockType != this.blockType) {
1979      return false;
1980    }
1981    if (castedComparison.nextBlockOnDiskSize != this.nextBlockOnDiskSize) {
1982      return false;
1983    }
1984    // Offset is important. Needed when we have to remake the cache key when the block is returned to the cache.
1985    if (castedComparison.offset != this.offset) {
1986      return false;
1987    }
1988    if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1989      return false;
1990    }
1991    if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1992      return false;
1993    }
1994    if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1995      return false;
1996    }
1997    if (ByteBuff.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
1998        castedComparison.buf.limit()) != 0) {
1999      return false;
2000    }
2001    return true;
2002  }
2003
2004  DataBlockEncoding getDataBlockEncoding() {
2005    if (blockType == BlockType.ENCODED_DATA) {
2006      return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
2007    }
2008    return DataBlockEncoding.NONE;
2009  }
2010
2011  @VisibleForTesting
2012  byte getChecksumType() {
2013    return this.fileContext.getChecksumType().getCode();
2014  }
2015
2016  int getBytesPerChecksum() {
2017    return this.fileContext.getBytesPerChecksum();
2018  }
2019
2020  /** @return the size of data on disk + header. Excludes checksum. */
2021  @VisibleForTesting
2022  int getOnDiskDataSizeWithHeader() {
2023    return this.onDiskDataSizeWithHeader;
2024  }
2025
2026  /**
2027   * Calculate the number of bytes required to store all the checksums
2028   * for this block. Each checksum value is a 4 byte integer.
2029   */
2030  int totalChecksumBytes() {
2031    // If the hfile block has minorVersion 0, then there is no checksum
2032    // data to validate. Similarly, a zero value in this.bytesPerChecksum
2033    // indicates that cached blocks do not have checksum data because
2034    // checksums were already validated when the block was read from disk.
2035    if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
2036      return 0;
2037    }
2038    return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
2039        this.fileContext.getBytesPerChecksum());
2040  }
2041
2042  /**
2043   * Returns the size of this block header.
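   * With HBase checksums in use this is {@link HConstants#HFILEBLOCK_HEADER_SIZE} (33 bytes);
   * without, the trailing checksum-related fields are absent and it is
   * {@link HConstants#HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM} (24 bytes).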
2044   */
2045  public int headerSize() {
2046    return headerSize(this.fileContext.isUseHBaseChecksum());
2047  }
2048
2049  /**
2050   * Maps a minor version to the size of the header.
2051   */
2052  public static int headerSize(boolean usesHBaseChecksum) {
2053    return usesHBaseChecksum?
2054        HConstants.HFILEBLOCK_HEADER_SIZE: HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
2055  }
2056
2057  /**
2058   * Return the appropriate DUMMY_HEADER for the minor version
2059   */
2060  @VisibleForTesting
2061  // TODO: Why is this in here?
2062  byte[] getDummyHeaderForVersion() {
2063    return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
2064  }
2065
2066  /**
2067   * Return the appropriate DUMMY_HEADER for the minor version
2068   */
2069  private static byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
2070    return usesHBaseChecksum? HConstants.HFILEBLOCK_DUMMY_HEADER: DUMMY_HEADER_NO_CHECKSUM;
2071  }
2072
2073  /**
2074   * @return This HFileBlock's fileContext, which will be a derivative of the
2075   * fileContext for the file from which this block's data was originally read.
2076   */
2077  HFileContext getHFileContext() {
2078    return this.fileContext;
2079  }
2080
2081  @Override
2082  public MemoryType getMemoryType() {
2083    return this.memType;
2084  }
2085
2086  /**
2087   * @return true if this block is backed by a shared memory area (such as that of a BucketCache).
2088   */
2089  boolean usesSharedMemory() {
2090    return this.memType == MemoryType.SHARED;
2091  }
2092
2093  /**
2094   * Convert the contents of the block header into a human readable string.
2095   * This is mostly helpful for debugging. This assumes that the block
2096   * has minor version > 0.
2097   */
2098  @VisibleForTesting
2099  static String toStringHeader(ByteBuff buf) throws IOException {
2100    byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
2101    buf.get(magicBuf);
2102    BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
2103    int compressedBlockSizeNoHeader = buf.getInt();
2104    int uncompressedBlockSizeNoHeader = buf.getInt();
2105    long prevBlockOffset = buf.getLong();
2106    byte cksumtype = buf.get();
2107    long bytesPerChecksum = buf.getInt();
2108    long onDiskDataSizeWithHeader = buf.getInt();
2109    return " Header dump: magic: " + Bytes.toString(magicBuf) +
2110        " blockType " + bt +
2111        " compressedBlockSizeNoHeader " +
2112        compressedBlockSizeNoHeader +
2113        " uncompressedBlockSizeNoHeader " +
2114        uncompressedBlockSizeNoHeader +
2115        " prevBlockOffset " + prevBlockOffset +
2116        " checksumType " + ChecksumType.codeToType(cksumtype) +
2117        " bytesPerChecksum " + bytesPerChecksum +
2118        " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
2119  }
2120
2121  public HFileBlock deepClone() {
2122    return new HFileBlock(this, true);
2123  }
2124}