/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.IOUtils;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * Cacheable Blocks of an {@link HFile} version 2 file.
 * Version 2 was introduced in hbase-0.92.0.
 *
 * <p>Version 1 was the original file block. Version 2 was introduced when we changed the hbase file
 * format to support multi-level block indexes and compound bloom filters (HBASE-3857). Support
 * for Version 1 was removed in hbase-1.3.0.
 *
 * <h3>HFileBlock: Version 2</h3>
 * In version 2, a block is structured as follows:
 * <ul>
 * <li><b>Header:</b> See Writer#putHeader() for where header is written; header total size is
 * HFILEBLOCK_HEADER_SIZE
 * <ul>
 * <li>0. blockType: Magic record identifying the {@link BlockType} (8 bytes):
 * e.g. <code>DATABLK*</code>
 * <li>1. onDiskSizeWithoutHeader: Compressed -- a.k.a 'on disk' -- block size, excluding header,
 * but including trailing checksum bytes (4 bytes)
 * <li>2. uncompressedSizeWithoutHeader: Uncompressed block size, excluding header, and excluding
 * checksum bytes (4 bytes)
 * <li>3. prevBlockOffset: The offset of the previous block of the same type (8 bytes). This is
 * used to navigate to the previous block without having to go to the block index
 * <li>4: For minorVersions >=1, the ordinal describing checksum type (1 byte)
 * <li>5: For minorVersions >=1, the number of data bytes/checksum chunk (4 bytes)
 * <li>6: onDiskDataSizeWithHeader: For minorVersions >=1, the size of data 'on disk', including
 * header, excluding checksums (4 bytes)
 * </ul>
 * </li>
 * <li><b>Raw/Compressed/Encrypted/Encoded data:</b> The compression
 * algorithm is the same for all the blocks in an {@link HFile}. If compression is NONE, this is
 * just raw, serialized Cells.
 * <li><b>Tail:</b> For minorVersions >=1, a series of 4 byte checksums, one each for
 * the number of bytes specified by bytesPerChecksum.
 * </ul>
 *
 * <h3>Caching</h3>
 * Caches cache whole blocks with trailing checksums if any. We then tag on some metadata: the
 * content of BLOCK_METADATA_SPACE, which holds a flag for whether we are doing 'hbase'
 * checksums, and then the offset into the file, which is needed when we re-make a cache key
 * when we return the block to the cache as 'done'.
 * See {@link Cacheable#serialize(ByteBuffer, boolean)} and {@link Cacheable#getDeserializer()}.
 *
 * <p>TODO: Should we cache the checksums? Down in Writer#getBlockForCaching(CacheConfig) where
 * we make a block to cache-on-write, there is an attempt at turning off checksums. This is not the
 * only place we get blocks to cache. We also will cache the raw return from an hdfs read. In this
 * case, the checksums may be present. If the cache is backed by something that doesn't do ECC,
 * say an SSD, we might want to preserve checksums. For now this is an open question.
 * <p>TODO: Over in BucketCache, we save a block allocation by doing a custom serialization.
 * Be sure to change it if serialization changes in here. Could we add a method here that takes an
 * IOEngine and that then serializes to it rather than expose our internals over in BucketCache?
 * IOEngine is in the bucket subpackage. Pull it up? Then this class knows about bucketcache. Ugh.
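 *
 * <p>As a rough illustration only (not API of this class), the header fields enumerated above
 * could be pulled off a ByteBuffer positioned at the start of a block as in the sketch below;
 * {@code headerBytes} is an assumed byte array holding the first HFILEBLOCK_HEADER_SIZE bytes,
 * and the absolute offsets are the ones recorded in the Header inner class:
 * <pre>{@code
 * ByteBuffer header = ByteBuffer.wrap(headerBytes);
 * byte[] magic = new byte[8];
 * header.get(magic);                                      // field 0: block type magic, e.g. DATABLK*
 * int onDiskSizeWithoutHeader = header.getInt(8);         // field 1
 * int uncompressedSizeWithoutHeader = header.getInt(12);  // field 2
 * long prevBlockOffset = header.getLong(16);              // field 3
 * byte checksumType = header.get(24);                     // field 4, minorVersion >= 1 only
 * int bytesPerChecksum = header.getInt(25);               // field 5
 * int onDiskDataSizeWithHeader = header.getInt(29);       // field 6
 * }</pre>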
 */
@InterfaceAudience.Private
public class HFileBlock implements Cacheable {
  private static final Logger LOG = LoggerFactory.getLogger(HFileBlock.class);

  // Block Header fields.

  // TODO: encapsulate Header related logic in this inner class.
  static class Header {
    // Format of header is:
    // 8 bytes - block magic
    // 4 bytes int - onDiskSizeWithoutHeader
    // 4 bytes int - uncompressedSizeWithoutHeader
    // 8 bytes long - prevBlockOffset
    // The following 3 are only present if header contains checksum information
    // 1 byte - checksum type
    // 4 byte int - bytes per checksum
    // 4 byte int - onDiskDataSizeWithHeader
    static int BLOCK_MAGIC_INDEX = 0;
    static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8;
    static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12;
    static int PREV_BLOCK_OFFSET_INDEX = 16;
    static int CHECKSUM_TYPE_INDEX = 24;
    static int BYTES_PER_CHECKSUM_INDEX = 25;
    static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29;
  }

  /** Type of block. Header field 0. */
  private BlockType blockType;

  /**
   * Size on disk excluding header, including checksum. Header field 1.
   * @see Writer#putHeader(byte[], int, int, int, int)
   */
  private int onDiskSizeWithoutHeader;

  /**
   * Size of pure data. Does not include header or checksums. Header field 2.
   * @see Writer#putHeader(byte[], int, int, int, int)
   */
  private int uncompressedSizeWithoutHeader;

  /**
   * The offset of the previous block on disk. Header field 3.
   * @see Writer#putHeader(byte[], int, int, int, int)
   */
  private long prevBlockOffset;

  /**
   * Size on disk of header + data. Excludes checksum. Header field 6,
   * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
   * @see Writer#putHeader(byte[], int, int, int, int)
   */
  private int onDiskDataSizeWithHeader;
  // End of Block Header fields.

  /**
   * The in-memory representation of the hfile block. Can be on or offheap. Can be backed by
   * a single ByteBuffer or by many. Make no assumptions.
   *
   * <p>Be careful reading from this <code>buf</code>. Duplicate and work on the duplicate or if
   * not, be sure to reset position and limit else trouble down the road.
   *
   * <p>TODO: Make this read-only once made.
   *
   * <p>We are using the ByteBuff type. ByteBuffer is not extensible yet we need to be able to have
   * a ByteBuffer-like API across multiple ByteBuffers reading from a cache such as BucketCache.
   * So, we have this ByteBuff type. Unfortunately, it is spread all about HFileBlock. Would be
   * good if could be confined to cache-use only but hard-to-do.
   */
  private ByteBuff buf;

  /** Meta data that holds meta information on the hfileblock. */
  private HFileContext fileContext;

  /**
   * The offset of this block in the file. Populated by the reader for
   * convenience of access. This offset is not part of the block header.
   */
  private long offset = UNSET;

  private MemoryType memType = MemoryType.EXCLUSIVE;

  /**
   * The on-disk size of the next block, including the header and checksums if present.
   * UNSET if unknown.
   *
   * Blocks try to carry the size of the next block to read in this data member. Usually
   * we get block sizes from the hfile index but sometimes the index is not available:
   * e.g. when we read the indexes themselves (indexes are stored in blocks, we do not
   * have an index for the indexes). Saves seeks especially around file open when
   * there is a flurry of reading in hfile metadata.
   */
  private int nextBlockOnDiskSize = UNSET;

  /**
   * On a checksum failure, do this many succeeding read requests using hdfs checksums before
   * auto-reenabling hbase checksum verification.
   */
  static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;

  private static int UNSET = -1;
  public static final boolean FILL_HEADER = true;
  public static final boolean DONT_FILL_HEADER = false;

  // How to get the estimate correctly? if it is a singleBB?
  public static final int MULTI_BYTE_BUFFER_HEAP_SIZE =
      (int) ClassSize.estimateBase(MultiByteBuff.class, false);

  /**
   * Space for metadata on a block that gets stored along with the block when we cache it.
   * There are a few bytes stuck on the end of the HFileBlock that we pull in from HDFS.
   * 8 bytes are for the offset of this block (long) in the file. Offset is important because it is
   * used when we remake the CacheKey when we return the block to the cache when done. There is also
   * a flag on whether checksumming is being done by hbase or not. See class comment for note on
   * uncertain state of checksumming of blocks that come out of cache (should we or should we not?).
   * Finally there are 4 bytes to hold the length of the next block, which can save a seek on
   * occasion if available.
   * (This EXTRA info came in with the original commit of the bucketcache, HBASE-7404. It was
   * formerly known as EXTRA_SERIALIZATION_SPACE).
   */
  static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;

  /**
   * Each checksum value is an integer that can be stored in 4 bytes.
   */
  static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;

  static final byte[] DUMMY_HEADER_NO_CHECKSUM =
      new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];

  /**
   * Used when deserializing blocks from Cache.
   *
   * <code>
   * ++++++++++++++
   * + HFileBlock +
   * ++++++++++++++
   * + Checksums  + <= Optional
   * ++++++++++++++
   * + Metadata!  + <= See note on BLOCK_METADATA_SPACE above.
   * ++++++++++++++
   * </code>
   * @see #serialize(ByteBuffer)
   */
  static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER =
      new CacheableDeserializer<Cacheable>() {
        @Override
        public HFileBlock deserialize(ByteBuff buf, boolean reuse, MemoryType memType)
            throws IOException {
          // The buf has the file block followed by block metadata.
          // Set limit to just before the BLOCK_METADATA_SPACE then rewind.
          buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind();
          // Get a new buffer to pass the HFileBlock for it to 'own'.
          ByteBuff newByteBuff;
          if (reuse) {
            newByteBuff = buf.slice();
          } else {
            int len = buf.limit();
            newByteBuff = new SingleByteBuff(ByteBuffer.allocate(len));
            newByteBuff.put(0, buf, buf.position(), len);
          }
          // Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock.
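          // Layout of that trailing metadata (BLOCK_METADATA_SPACE = 1 + 8 + 4 bytes):
          // a 1-byte flag saying whether hbase checksums are in use, the 8-byte offset of this
          // block in its file, and the 4-byte on-disk size of the next block.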
          buf.position(buf.limit());
          buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE);
          boolean usesChecksum = buf.get() == (byte) 1;
          long offset = buf.getLong();
          int nextBlockOnDiskSize = buf.getInt();
          HFileBlock hFileBlock =
              new HFileBlock(newByteBuff, usesChecksum, memType, offset, nextBlockOnDiskSize, null);
          return hFileBlock;
        }

        @Override
        public int getDeserialiserIdentifier() {
          return DESERIALIZER_IDENTIFIER;
        }

        @Override
        public HFileBlock deserialize(ByteBuff b) throws IOException {
          // Used only in tests
          return deserialize(b, false, MemoryType.EXCLUSIVE);
        }
      };

  private static final int DESERIALIZER_IDENTIFIER;
  static {
    DESERIALIZER_IDENTIFIER =
        CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
  }

  /**
   * Copy constructor. Creates a shallow copy of {@code that}'s buffer.
   */
  private HFileBlock(HFileBlock that) {
    this(that, false);
  }

  /**
   * Copy constructor. Creates a shallow/deep copy of {@code that}'s buffer as per the boolean
   * param.
   */
  private HFileBlock(HFileBlock that, boolean bufCopy) {
    init(that.blockType, that.onDiskSizeWithoutHeader,
        that.uncompressedSizeWithoutHeader, that.prevBlockOffset,
        that.offset, that.onDiskDataSizeWithHeader, that.nextBlockOnDiskSize, that.fileContext);
    if (bufCopy) {
      this.buf = new SingleByteBuff(ByteBuffer.wrap(that.buf.toBytes(0, that.buf.limit())));
    } else {
      this.buf = that.buf.duplicate();
    }
  }

  /**
   * Creates a new {@link HFile} block from the given fields. This constructor
   * is used only while writing blocks and caching, when the block data is already
   * sitting in a byte buffer and we want to stuff the block into cache.
   * See {@link Writer#getBlockForCaching(CacheConfig)}.
   *
   * <p>TODO: The caller presumes no checksumming
   * required of this block instance since going into cache; checksum already verified on
   * underlying block data pulled in from filesystem. Is that correct? What if cache is SSD?
   *
   * @param blockType the type of this block, see {@link BlockType}
   * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
   * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
   * @param prevBlockOffset see {@link #prevBlockOffset}
   * @param b block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
   * @param fillHeader when true, write the first 4 header fields into passed buffer.
   * @param offset the file offset the block was read from
   * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
   * @param fileContext HFile meta data
   */
  @VisibleForTesting
  public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
      int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
      long offset, final int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
      HFileContext fileContext) {
    init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
        prevBlockOffset, offset, onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext);
    this.buf = new SingleByteBuff(b);
    if (fillHeader) {
      overwriteHeader();
    }
    this.buf.rewind();
  }

  /**
   * Creates a block from an existing buffer starting with a header. Rewinds
   * and takes ownership of the buffer.
   * By definition of rewind, ignores the
   * buffer position, but if you slice the buffer beforehand, it will rewind
   * to that point.
   * @param buf Has header, content, and trailing checksums if present.
   */
  HFileBlock(ByteBuff buf, boolean usesHBaseChecksum, MemoryType memType, final long offset,
      final int nextBlockOnDiskSize, HFileContext fileContext) throws IOException {
    buf.rewind();
    final BlockType blockType = BlockType.read(buf);
    final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
    final int uncompressedSizeWithoutHeader =
        buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
    final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
    // This constructor is called when we deserialize a block from cache and when we read a block in
    // from the fs. fileContext is null when deserialized from cache so need to make up one.
    HFileContextBuilder fileContextBuilder = fileContext != null ?
        new HFileContextBuilder(fileContext) : new HFileContextBuilder();
    fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum);
    int onDiskDataSizeWithHeader;
    if (usesHBaseChecksum) {
      byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX);
      int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
      onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
      // Use the checksum type and bytes per checksum from header, not from filecontext.
      fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType));
      fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum);
    } else {
      fileContextBuilder.withChecksumType(ChecksumType.NULL);
      fileContextBuilder.withBytesPerCheckSum(0);
      // Need to fix onDiskDataSizeWithHeader; there are no checksums after the block data
      onDiskDataSizeWithHeader = onDiskSizeWithoutHeader + headerSize(usesHBaseChecksum);
    }
    fileContext = fileContextBuilder.build();
    assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
    init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
        prevBlockOffset, offset, onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext);
    this.memType = memType;
    this.offset = offset;
    this.buf = buf;
    this.buf.rewind();
  }

  /**
   * Called from constructors.
   */
  private void init(BlockType blockType, int onDiskSizeWithoutHeader,
      int uncompressedSizeWithoutHeader, long prevBlockOffset,
      long offset, int onDiskDataSizeWithHeader, final int nextBlockOnDiskSize,
      HFileContext fileContext) {
    this.blockType = blockType;
    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
    this.prevBlockOffset = prevBlockOffset;
    this.offset = offset;
    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
    this.nextBlockOnDiskSize = nextBlockOnDiskSize;
    this.fileContext = fileContext;
  }

  /**
   * Parse total on disk size including header and checksum.
   * @param headerBuf Header ByteBuffer. Presumed exact size of header.
   * @param verifyChecksum true if checksum verification is in use.
   * @return Size of the block with header included.
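   *         For example, assuming hbase checksums are in use (33-byte header), a header that
   *         records onDiskSizeWithoutHeader=65536 yields 65536 + 33 = 65569 here.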
   */
  private static int getOnDiskSizeWithHeader(final ByteBuffer headerBuf,
      boolean verifyChecksum) {
    return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) +
        headerSize(verifyChecksum);
  }

  /**
   * @return the on-disk size of the next block (including the header size and any checksums if
   *         present) read by peeking into the next block's header; use as a hint when doing
   *         a read of the next block when scanning or running over a file.
   */
  int getNextBlockOnDiskSize() {
    return nextBlockOnDiskSize;
  }

  @Override
  public BlockType getBlockType() {
    return blockType;
  }

  /** @return get data block encoding id that was used to encode this block */
  short getDataBlockEncodingId() {
    if (blockType != BlockType.ENCODED_DATA) {
      throw new IllegalArgumentException("Querying encoder ID of a block " +
          "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
    }
    return buf.getShort(headerSize());
  }

  /**
   * @return the on-disk size of header + data part + checksum.
   */
  public int getOnDiskSizeWithHeader() {
    return onDiskSizeWithoutHeader + headerSize();
  }

  /**
   * @return the on-disk size of the data part + checksum (header excluded).
   */
  int getOnDiskSizeWithoutHeader() {
    return onDiskSizeWithoutHeader;
  }

  /**
   * @return the uncompressed size of data part (header and checksum excluded).
   */
  int getUncompressedSizeWithoutHeader() {
    return uncompressedSizeWithoutHeader;
  }

  /**
   * @return the offset of the previous block of the same type in the file, or
   *         -1 if unknown
   */
  long getPrevBlockOffset() {
    return prevBlockOffset;
  }

  /**
   * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position
   * is modified as side-effect.
   */
  private void overwriteHeader() {
    buf.rewind();
    blockType.write(buf);
    buf.putInt(onDiskSizeWithoutHeader);
    buf.putInt(uncompressedSizeWithoutHeader);
    buf.putLong(prevBlockOffset);
    if (this.fileContext.isUseHBaseChecksum()) {
      buf.put(fileContext.getChecksumType().getCode());
      buf.putInt(fileContext.getBytesPerChecksum());
      buf.putInt(onDiskDataSizeWithHeader);
    }
  }

  /**
   * Returns a buffer that does not include the header or checksum.
   *
   * @return the buffer with header skipped and checksum omitted.
   */
  public ByteBuff getBufferWithoutHeader() {
    ByteBuff dup = getBufferReadOnly();
    // Now set it up so Buffer spans content only -- no header or no checksums.
    return dup.position(headerSize()).limit(buf.limit() - totalChecksumBytes()).slice();
  }

  /**
   * Returns a read-only duplicate of the buffer this block stores internally ready to be read.
   * Clients must not modify the buffer object though they may set position and limit on the
   * returned buffer since we pass back a duplicate. This method has to be public because it is used
   * in {@link CompoundBloomFilter} to avoid object creation on every Bloom
   * filter lookup, but has to be used with caution. Buffer holds header, block content,
   * and any follow-on checksums if present.
   *
   * @return the buffer of this block for read-only operations
   */
  public ByteBuff getBufferReadOnly() {
    // TODO: ByteBuf does not support asReadOnlyBuffer(). Fix.
    ByteBuff dup = this.buf.duplicate();
    assert dup.position() == 0;
    return dup;
  }

  @VisibleForTesting
  private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
      String fieldName) throws IOException {
    if (valueFromBuf != valueFromField) {
      throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
          + ") is different from that in the field (" + valueFromField + ")");
    }
  }

  @VisibleForTesting
  private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
      throws IOException {
    if (valueFromBuf != valueFromField) {
      throw new IOException("Block type stored in the buffer: " +
          valueFromBuf + ", block type field: " + valueFromField);
    }
  }

  /**
   * Checks if the block is internally consistent, i.e. the first
   * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a
   * valid header consistent with the fields. Assumes a packed block structure.
   * This function is primarily for testing and debugging, and is not
   * thread-safe, because it alters the internal buffer pointer.
   * Used by tests only.
   */
  @VisibleForTesting
  void sanityCheck() throws IOException {
    // Duplicate so no side-effects
    ByteBuff dup = this.buf.duplicate().rewind();
    sanityCheckAssertion(BlockType.read(dup), blockType);

    sanityCheckAssertion(dup.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader");

    sanityCheckAssertion(dup.getInt(), uncompressedSizeWithoutHeader,
        "uncompressedSizeWithoutHeader");

    sanityCheckAssertion(dup.getLong(), prevBlockOffset, "prevBlockOffset");
    if (this.fileContext.isUseHBaseChecksum()) {
      sanityCheckAssertion(dup.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
      sanityCheckAssertion(dup.getInt(), this.fileContext.getBytesPerChecksum(),
          "bytesPerChecksum");
      sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
    }

    int cksumBytes = totalChecksumBytes();
    int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
    if (dup.limit() != expectedBufLimit) {
      throw new AssertionError("Expected limit " + expectedBufLimit + ", got " + dup.limit());
    }

    // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
    // block's header, so there are two sensible values for buffer capacity.
    int hdrSize = headerSize();
    if (dup.capacity() != expectedBufLimit && dup.capacity() != expectedBufLimit + hdrSize) {
      throw new AssertionError("Invalid buffer capacity: " + dup.capacity() +
          ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
    }
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder()
        .append("[")
        .append("blockType=").append(blockType)
        .append(", fileOffset=").append(offset)
        .append(", headerSize=").append(headerSize())
        .append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
        .append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
        .append(", prevBlockOffset=").append(prevBlockOffset)
        .append(", isUseHBaseChecksum=").append(fileContext.isUseHBaseChecksum());
    if (fileContext.isUseHBaseChecksum()) {
      sb.append(", checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
          .append(", bytesPerChecksum=").append(this.buf.getInt(24 + 1))
          .append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
    } else {
      sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
          .append("(").append(onDiskSizeWithoutHeader)
          .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
    }
    String dataBegin = null;
    if (buf.hasArray()) {
      dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
          Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
    } else {
      ByteBuff bufWithoutHeader = getBufferWithoutHeader();
      byte[] dataBeginBytes = new byte[Math.min(32,
          bufWithoutHeader.limit() - bufWithoutHeader.position())];
      bufWithoutHeader.get(dataBeginBytes);
      dataBegin = Bytes.toStringBinary(dataBeginBytes);
    }
    sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader())
        .append(", totalChecksumBytes=").append(totalChecksumBytes())
        .append(", isUnpacked=").append(isUnpacked())
        .append(", buf=[").append(buf).append("]")
        .append(", dataBeginsWith=").append(dataBegin)
        .append(", fileContext=").append(fileContext)
        .append(", nextBlockOnDiskSize=").append(nextBlockOnDiskSize)
        .append("]");
    return sb.toString();
  }

  /**
   * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
   * encoded structure. Internal structures are shared between instances where applicable.
   */
  HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
    if (!fileContext.isCompressedOrEncrypted()) {
      // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
      // which is used for block serialization to L2 cache, does not preserve encoding and
      // encryption details.
      return this;
    }

    HFileBlock unpacked = new HFileBlock(this);
    unpacked.allocateBuffer(); // allocates space for the decompressed block

    HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
        reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();

    ByteBuff dup = this.buf.duplicate();
    dup.position(this.headerSize());
    dup = dup.slice();
    ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
        unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
        dup);
    return unpacked;
  }

  /**
   * Always allocates a new buffer of the correct size. Copies header bytes
   * from the existing buffer.
   * Does not change header fields.
   * Reserve room to keep checksum bytes too.
   */
  private void allocateBuffer() {
    int cksumBytes = totalChecksumBytes();
    int headerSize = headerSize();
    int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;

    // TODO: do we need to consider allocating offheap here?
    ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);

    // Copy header bytes into newBuf.
    // newBuf is HBB so no issue in calling array()
    buf.position(0);
    buf.get(newBuf.array(), newBuf.arrayOffset(), headerSize);

    buf = new SingleByteBuff(newBuf);
    // set limit to exclude next block's header
    buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
  }

  /**
   * Return true when this block's buffer has been unpacked, false otherwise. Note this is a
   * calculated heuristic, not a tracked attribute of the block.
   */
  public boolean isUnpacked() {
    final int cksumBytes = totalChecksumBytes();
    final int headerSize = headerSize();
    final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
    final int bufCapacity = buf.capacity();
    return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
  }

  /** An additional sanity-check in case no compression or encryption is being used. */
  @VisibleForTesting
  void sanityCheckUncompressedSize() throws IOException {
    if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) {
      throw new IOException("Using no compression but "
          + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
          + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
          + ", numChecksumbytes=" + totalChecksumBytes());
    }
  }

  /**
   * Cannot be {@link #UNSET}. Must be a legitimate value. Used when re-making the
   * {@link BlockCacheKey} when the block is returned to the cache.
   * @return the offset of this block in the file it was read from
   */
  long getOffset() {
    if (offset < 0) {
      throw new IllegalStateException("HFile block offset not initialized properly");
    }
    return offset;
  }

  /**
   * @return a byte stream reading the data + checksum of this block
   */
  DataInputStream getByteStream() {
    ByteBuff dup = this.buf.duplicate();
    dup.position(this.headerSize());
    return new DataInputStream(new ByteBuffInputStream(dup));
  }

  @Override
  public long heapSize() {
    long size = ClassSize.align(
        ClassSize.OBJECT +
        // Block type, multi byte buffer, MemoryType and meta references
        4 * ClassSize.REFERENCE +
        // On-disk size, uncompressed size, next block's on-disk size,
        // and onDiskDataSize
        4 * Bytes.SIZEOF_INT +
        // This and previous block offset
        2 * Bytes.SIZEOF_LONG +
        // Heap size of the meta object. meta will be always not null.
        fileContext.heapSize()
    );

    if (buf != null) {
      // Deep overhead of the byte buffer. Needs to be aligned separately.
      size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE);
    }

    return ClassSize.align(size);
  }

  /**
   * Read from an input stream at least <code>necessaryLen</code> and if possible,
   * <code>extraLen</code> also if available. Analogous to
   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a
   * number of "extra" bytes to also optionally read.
   *
   * @param in the input stream to read from
   * @param buf the buffer to read into
   * @param bufOffset the destination offset in the buffer
   * @param necessaryLen the number of bytes that are absolutely necessary to read
   * @param extraLen the number of extra bytes that would be nice to read
   * @return true if succeeded reading the extra bytes
   * @throws IOException if failed to read the necessary bytes
   */
  static boolean readWithExtra(InputStream in, byte[] buf,
      int bufOffset, int necessaryLen, int extraLen) throws IOException {
    int bytesRemaining = necessaryLen + extraLen;
    while (bytesRemaining > 0) {
      int ret = in.read(buf, bufOffset, bytesRemaining);
      if (ret == -1 && bytesRemaining <= extraLen) {
        // We could not read the "extra data", but that is OK.
        break;
      }
      if (ret < 0) {
        throw new IOException("Premature EOF from inputStream (read "
            + "returned " + ret + ", was trying to read " + necessaryLen
            + " necessary bytes and " + extraLen + " extra bytes, "
            + "successfully read "
            + (necessaryLen + extraLen - bytesRemaining));
      }
      bufOffset += ret;
      bytesRemaining -= ret;
    }
    return bytesRemaining <= 0;
  }

  /**
   * Read from an input stream at least <code>necessaryLen</code> and if possible,
   * <code>extraLen</code> also if available. Analogous to
   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses
   * positional read and specifies a number of "extra" bytes that would be
   * desirable but not absolutely necessary to read.
   *
   * @param in the input stream to read from
   * @param position the position within the stream from which to start reading
   * @param buf the buffer to read into
   * @param bufOffset the destination offset in the buffer
   * @param necessaryLen the number of bytes that are absolutely necessary to
   *          read
   * @param extraLen the number of extra bytes that would be nice to read
   * @return true if and only if extraLen is > 0 and reading those extra bytes
   *         was successful
   * @throws IOException if failed to read the necessary bytes
   */
  @VisibleForTesting
  static boolean positionalReadWithExtra(FSDataInputStream in,
      long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen)
      throws IOException {
    int bytesRemaining = necessaryLen + extraLen;
    int bytesRead = 0;
    while (bytesRead < necessaryLen) {
      int ret = in.read(position, buf, bufOffset, bytesRemaining);
      if (ret < 0) {
        throw new IOException("Premature EOF from inputStream (positional read "
            + "returned " + ret + ", was trying to read " + necessaryLen
            + " necessary bytes and " + extraLen + " extra bytes, "
            + "successfully read " + bytesRead);
      }
      position += ret;
      bufOffset += ret;
      bytesRemaining -= ret;
      bytesRead += ret;
    }
    return bytesRead != necessaryLen && bytesRemaining <= 0;
  }

  /**
   * Unified version 2 {@link HFile} block writer. The intended usage pattern
   * is as follows:
   * <ol>
   * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
   * <li>Call {@link Writer#startWriting} and get a data stream to write to.
   * <li>Write your data into the stream.
   * <li>Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to
   * store the serialized block into an external stream.
   * <li>Repeat to write more blocks.
   * </ol>
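   * <p>
   * As a rough usage sketch only (the {@code dataBlockEncoder}, {@code fileContext}, {@code cell}
   * and {@code out} below are assumed to be supplied by the surrounding writer code):
   * <pre>{@code
   * HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder, fileContext);
   * hbw.startWriting(BlockType.DATA);
   * hbw.write(cell);                 // repeat for each Cell going into this block
   * hbw.writeHeaderAndData(out);     // out is the FSDataOutputStream for the hfile
   * }</pre>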
   * <p>
   */
  static class Writer {
    private enum State {
      INIT,
      WRITING,
      BLOCK_READY
    };

    /** Writer state. Used to ensure the correct usage protocol. */
    private State state = State.INIT;

    /** Data block encoder used for data blocks */
    private final HFileDataBlockEncoder dataBlockEncoder;

    private HFileBlockEncodingContext dataBlockEncodingCtx;

    /** block encoding context for non-data blocks */
    private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;

    /**
     * The stream we use to accumulate data into a block in an uncompressed format.
     * We reset this stream at the end of each block and reuse it. The
     * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this
     * stream.
     */
    private ByteArrayOutputStream baosInMemory;

    /**
     * Current block type. Set in {@link #startWriting(BlockType)}. Could be
     * changed in {@link #finishBlock()} from {@link BlockType#DATA}
     * to {@link BlockType#ENCODED_DATA}.
     */
    private BlockType blockType;

    /**
     * A stream that we write uncompressed bytes to, which compresses them and
     * writes them to {@link #baosInMemory}.
     */
    private DataOutputStream userDataStream;

    // Size of actual data being written. Not considering the block encoding/compression. This
    // includes the header size also.
    private int unencodedDataSizeWritten;

    // Size of actual data being written. Considering the block encoding. This
    // includes the header size also.
    private int encodedDataSizeWritten;

    /**
     * Bytes to be written to the file system, including the header. Compressed
     * if compression is turned on. It also includes the checksum data that
     * immediately follows the block data. (header + data + checksums)
     */
    private ByteArrayOutputStream onDiskBlockBytesWithHeader;

    /**
     * The size of the checksum data on disk. It is used only if data is
     * not compressed. If data is compressed, then the checksums are already
     * part of onDiskBytesWithHeader. If data is uncompressed, then this
     * variable stores the checksum data for this block.
     */
    private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;

    /**
     * Current block's start offset in the {@link HFile}. Set in
     * {@link #writeHeaderAndData(FSDataOutputStream)}.
     */
    private long startOffset;

    /**
     * Offset of previous block by block type. Updated when the next block is
     * started.
     */
    private long[] prevOffsetByType;

    /** The offset of the previous block of the same type */
    private long prevOffset;
    /** Meta data that holds information about the hfileblock */
    private HFileContext fileContext;

    /**
     * @param dataBlockEncoder data block encoding algorithm to use
     */
    public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
      if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
        throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
            " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
            fileContext.getBytesPerChecksum());
      }
      this.dataBlockEncoder = dataBlockEncoder != null ?
          dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
      this.dataBlockEncodingCtx = this.dataBlockEncoder.
          newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
      // TODO: This should be lazily instantiated since we usually do NOT need this default encoder
      this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
          HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
      // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum
      baosInMemory = new ByteArrayOutputStream();
      prevOffsetByType = new long[BlockType.values().length];
      for (int i = 0; i < prevOffsetByType.length; ++i) {
        prevOffsetByType[i] = UNSET;
      }
      // TODO: Why fileContext saved away when we have dataBlockEncoder and/or
      // defaultDataBlockEncoder?
      this.fileContext = fileContext;
    }

    /**
     * Starts writing into the block. The previous block's data is discarded.
     *
     * @return the stream the user can write their data into
     * @throws IOException
     */
    DataOutputStream startWriting(BlockType newBlockType)
        throws IOException {
      if (state == State.BLOCK_READY && startOffset != -1) {
        // We had a previous block that was written to a stream at a specific
        // offset. Save that offset as the last offset of a block of that type.
        prevOffsetByType[blockType.getId()] = startOffset;
      }

      startOffset = -1;
      blockType = newBlockType;

      baosInMemory.reset();
      baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);

      state = State.WRITING;

      // We will compress it later in finishBlock()
      userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory);
      if (newBlockType == BlockType.DATA) {
        this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
      }
      this.unencodedDataSizeWritten = 0;
      this.encodedDataSizeWritten = 0;
      return userDataStream;
    }

    /**
     * Writes the Cell to this block
     * @param cell
     * @throws IOException
     */
    void write(Cell cell) throws IOException {
      expectState(State.WRITING);
      int posBeforeEncode = this.userDataStream.size();
      this.unencodedDataSizeWritten +=
          this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream);
      this.encodedDataSizeWritten += this.userDataStream.size() - posBeforeEncode;
    }

    /**
     * Returns the stream for the user to write to. The block writer takes care
     * of handling compression and buffering for caching on write. Can only be
     * called in the "writing" state.
     *
     * @return the data output stream for the user to write to
     */
    DataOutputStream getUserDataStream() {
      expectState(State.WRITING);
      return userDataStream;
    }

    /**
     * Transitions the block writer from the "writing" state to the "block
     * ready" state. Does nothing if a block is already finished.
     */
    void ensureBlockReady() throws IOException {
      Preconditions.checkState(state != State.INIT,
          "Unexpected state: " + state);

      if (state == State.BLOCK_READY) {
        return;
      }

      // This will set state to BLOCK_READY.
      finishBlock();
    }

    /**
     * Finish up writing of the block.
     * Flushes the compressing stream (if using compression), fills out the header,
     * does any compression/encryption of bytes to flush out to disk, and manages
     * the cache on write content, if applicable. Sets block write state to "block ready".
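     * <p>For a sense of the checksum overhead added here: assuming the default bytesPerChecksum
     * of 16384, every 16KB run of the on-disk bytes (header included) contributes one 4-byte
     * checksum to the block tail, so a roughly 64KB block carries only a few tens of bytes
     * of checksums.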
     */
    private void finishBlock() throws IOException {
      if (blockType == BlockType.DATA) {
        this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
            baosInMemory.getBuffer(), blockType);
        blockType = dataBlockEncodingCtx.getBlockType();
      }
      userDataStream.flush();
      prevOffset = prevOffsetByType[blockType.getId()];

      // We need to set state before we can package the block up for cache-on-write. In a way, the
      // block is ready, but not yet encoded or compressed.
      state = State.BLOCK_READY;
      Bytes compressAndEncryptDat;
      if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
        compressAndEncryptDat = dataBlockEncodingCtx.
            compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size());
      } else {
        compressAndEncryptDat = defaultBlockEncodingCtx.
            compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size());
      }
      if (compressAndEncryptDat == null) {
        compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size());
      }
      if (onDiskBlockBytesWithHeader == null) {
        onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength());
      }
      onDiskBlockBytesWithHeader.reset();
      onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(),
          compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength());
      // Calculate how many bytes we need for checksum on the tail of the block.
      int numBytes = (int) ChecksumUtil.numBytes(
          onDiskBlockBytesWithHeader.size(),
          fileContext.getBytesPerChecksum());

      // Put the header for the on disk bytes; header currently is unfilled-out
      putHeader(onDiskBlockBytesWithHeader,
          onDiskBlockBytesWithHeader.size() + numBytes,
          baosInMemory.size(), onDiskBlockBytesWithHeader.size());
      if (onDiskChecksum.length != numBytes) {
        onDiskChecksum = new byte[numBytes];
      }
      ChecksumUtil.generateChecksums(
          onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size(),
          onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
    }

    /**
     * Put the header into the given byte array at the given offset.
     * @param onDiskSize size of the block on disk header + data + checksum
     * @param uncompressedSize size of the block after decompression (but
     *          before optional data block decoding) including header
     * @param onDiskDataSize size of the block on disk with header
     *          and data but not including the checksums
     */
    private void putHeader(byte[] dest, int offset, int onDiskSize,
        int uncompressedSize, int onDiskDataSize) {
      offset = blockType.put(dest, offset);
      offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
      offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
      offset = Bytes.putLong(dest, offset, prevOffset);
      offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
      offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
      Bytes.putInt(dest, offset, onDiskDataSize);
    }

    private void putHeader(ByteArrayOutputStream dest, int onDiskSize,
        int uncompressedSize, int onDiskDataSize) {
      putHeader(dest.getBuffer(), 0, onDiskSize, uncompressedSize, onDiskDataSize);
    }

    /**
     * Similar to {@link #finishBlockAndWriteHeaderAndData(DataOutputStream)}, but records
     * the offset of this block so that it can be referenced in the next block
     * of the same type.
     *
     * @param out
     * @throws IOException
     */
    void writeHeaderAndData(FSDataOutputStream out) throws IOException {
      long offset = out.getPos();
      if (startOffset != UNSET && offset != startOffset) {
        throw new IOException("A " + blockType + " block written to a "
            + "stream twice, first at offset " + startOffset + ", then at "
            + offset);
      }
      startOffset = offset;

      finishBlockAndWriteHeaderAndData((DataOutputStream) out);
    }

    /**
     * Writes the header and the compressed data of this block (or uncompressed
     * data when not using compression) into the given stream. Can be called in
     * the "writing" state or in the "block ready" state. If called in the
     * "writing" state, transitions the writer to the "block ready" state.
     *
     * @param out the output stream to write the block to
     * @throws IOException
     */
    protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
        throws IOException {
      ensureBlockReady();
      long startTime = System.currentTimeMillis();
      out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size());
      out.write(onDiskChecksum);
      HFile.updateWriteLatency(System.currentTimeMillis() - startTime);
    }

    /**
     * Returns the header and the compressed data (or uncompressed data when not
     * using compression) as a byte array. Can be called in the "writing" state
     * or in the "block ready" state. If called in the "writing" state,
     * transitions the writer to the "block ready" state. This returns
     * the header + data + checksums stored on disk.
     *
     * @return header and data as they would be stored on disk in a byte array
     * @throws IOException
     */
    byte[] getHeaderAndDataForTest() throws IOException {
      ensureBlockReady();
      // This is not very optimal, because we are doing an extra copy.
      // But this method is used only by unit tests.
      byte[] output =
          new byte[onDiskBlockBytesWithHeader.size()
              + onDiskChecksum.length];
      System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0,
          onDiskBlockBytesWithHeader.size());
      System.arraycopy(onDiskChecksum, 0, output,
          onDiskBlockBytesWithHeader.size(), onDiskChecksum.length);
      return output;
    }

    /**
     * Releases resources used by this writer.
     */
    void release() {
      if (dataBlockEncodingCtx != null) {
        dataBlockEncodingCtx.close();
        dataBlockEncodingCtx = null;
      }
      if (defaultBlockEncodingCtx != null) {
        defaultBlockEncodingCtx.close();
        defaultBlockEncodingCtx = null;
      }
    }

    /**
     * Returns the on-disk size of the data portion of the block. This is the
     * compressed size if compression is enabled. Can only be called in the
     * "block ready" state. Header is not compressed, and its size is not
     * included in the return value.
     *
     * @return the on-disk size of the block, not including the header.
     */
    int getOnDiskSizeWithoutHeader() {
      expectState(State.BLOCK_READY);
      return onDiskBlockBytesWithHeader.size() +
          onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE;
    }

    /**
     * Returns the on-disk size of the block. Can only be called in the
     * "block ready" state.
     *
     * @return the on-disk size of the block ready to be written, including the
     *         header size, the data and the checksum data.
     */
    int getOnDiskSizeWithHeader() {
      expectState(State.BLOCK_READY);
      return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length;
    }

    /**
     * The uncompressed size of the block data. Does not include header size.
     */
    int getUncompressedSizeWithoutHeader() {
      expectState(State.BLOCK_READY);
      return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE;
    }

    /**
     * The uncompressed size of the block data, including header size.
     */
    int getUncompressedSizeWithHeader() {
      expectState(State.BLOCK_READY);
      return baosInMemory.size();
    }

    /** @return true if a block is being written */
    boolean isWriting() {
      return state == State.WRITING;
    }

    /**
     * Returns the number of bytes written into the current block so far, or
     * zero if not writing the block at the moment. Note that this will return
     * zero in the "block ready" state as well.
     *
     * @return the number of bytes written
     */
    public int encodedBlockSizeWritten() {
      if (state != State.WRITING)
        return 0;
      return this.encodedDataSizeWritten;
    }

    /**
     * Returns the number of bytes written into the current block so far, or
     * zero if not writing the block at the moment. Note that this will return
     * zero in the "block ready" state as well.
     *
     * @return the number of bytes written
     */
    int blockSizeWritten() {
      if (state != State.WRITING) return 0;
      return this.unencodedDataSizeWritten;
    }

    /**
     * Clones the header followed by the uncompressed data, even if using
     * compression. This is needed for storing uncompressed blocks in the block
     * cache. Can be called in the "writing" state or the "block ready" state.
     * Returns only the header and data, does not include checksum data.
     *
     * @return Returns a copy of uncompressed block bytes for caching on write
     */
    @VisibleForTesting
    ByteBuffer cloneUncompressedBufferWithHeader() {
      expectState(State.BLOCK_READY);
      byte[] uncompressedBlockBytesWithHeader = baosInMemory.toByteArray();
      int numBytes = (int) ChecksumUtil.numBytes(
          onDiskBlockBytesWithHeader.size(),
          fileContext.getBytesPerChecksum());
      putHeader(uncompressedBlockBytesWithHeader, 0,
          onDiskBlockBytesWithHeader.size() + numBytes,
          baosInMemory.size(), onDiskBlockBytesWithHeader.size());
      return ByteBuffer.wrap(uncompressedBlockBytesWithHeader);
    }

    /**
     * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is
     * needed for storing packed blocks in the block cache. Expects calling semantics identical to
     * {@link #cloneUncompressedBufferWithHeader()}. Returns only the header and data,
     * does not include checksum data.
     *
     * @return Returns a copy of block bytes for caching on write
     */
    private ByteBuffer cloneOnDiskBufferWithHeader() {
      expectState(State.BLOCK_READY);
      return ByteBuffer.wrap(onDiskBlockBytesWithHeader.toByteArray());
    }

    private void expectState(State expectedState) {
      if (state != expectedState) {
        throw new IllegalStateException("Expected state: " + expectedState +
            ", actual state: " + state);
      }
    }

    /**
     * Takes the given {@link BlockWritable} instance, creates a new block of
     * its appropriate type, writes the writable into this block, and flushes
     * the block into the output stream. The writer is instructed not to buffer
     * uncompressed bytes for cache-on-write.
     *
     * @param bw the block-writable object to write as a block
     * @param out the file system output stream
     * @throws IOException
     */
    void writeBlock(BlockWritable bw, FSDataOutputStream out)
        throws IOException {
      bw.writeToBlock(startWriting(bw.getBlockType()));
      writeHeaderAndData(out);
    }

    /**
     * Creates a new HFileBlock. Checksums have already been validated, so
     * the byte buffer passed into the constructor of this newly created
     * block does not have checksum data even though the header minor
     * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
     * 0 value in bytesPerChecksum. This method copies the on-disk or
     * uncompressed data to build the HFileBlock which is used only
     * while writing blocks and caching.
     *
     * <p>TODO: Should there be an option where a cache can ask that hbase preserve block
     * checksums for checking after a block comes out of the cache? Otherwise, cache is responsible
     * for blocks being wholesome (ECC memory or if file-backed, it does checksumming).
     */
    HFileBlock getBlockForCaching(CacheConfig cacheConf) {
      HFileContext newContext = new HFileContextBuilder()
          .withBlockSize(fileContext.getBlocksize())
          .withBytesPerCheckSum(0)
          .withChecksumType(ChecksumType.NULL) // no checksums in cached data
          .withCompression(fileContext.getCompression())
          .withDataBlockEncoding(fileContext.getDataBlockEncoding())
          .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
          .withCompressTags(fileContext.isCompressTags())
          .withIncludesMvcc(fileContext.isIncludesMvcc())
          .withIncludesTags(fileContext.isIncludesTags())
          .build();
      return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
          getUncompressedSizeWithoutHeader(), prevOffset,
          cacheConf.shouldCacheCompressed(blockType.getCategory()) ?
              cloneOnDiskBufferWithHeader() :
              cloneUncompressedBufferWithHeader(),
          FILL_HEADER, startOffset, UNSET,
          onDiskBlockBytesWithHeader.size() + onDiskChecksum.length, newContext);
    }
  }

  /** Something that can be written into a block. */
  interface BlockWritable {
    /** The type of block this data should use. */
    BlockType getBlockType();

    /**
     * Writes the block to the provided stream. Must not write any magic
     * records.
     *
     * @param out a stream to write uncompressed data into
     */
    void writeToBlock(DataOutput out) throws IOException;
  }

  /** Iterator for {@link HFileBlock}s. */
  interface BlockIterator {
    /**
     * Get the next block, or null if there are no more blocks to iterate.
     */
    HFileBlock nextBlock() throws IOException;

    /**
     * Similar to {@link #nextBlock()} but checks block type, throws an
     * exception if incorrect, and returns the HFile block
     */
    HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
  }

  /** An HFile block reader with iteration ability. */
  interface FSReader {
    /**
     * Reads the block at the given offset in the file with the given on-disk
     * size and uncompressed size.
     *
     * @param offset
     * @param onDiskSize the on-disk size of the entire block, including all
     *          applicable headers, or -1 if unknown
     * @return the newly read block
     */
    HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics)
        throws IOException;

    /**
     * Creates a block iterator over the given portion of the {@link HFile}.
     * The iterator returns blocks starting at offsets such that
     * startOffset <= offset < endOffset. Returned blocks are always unpacked.
     * Used when no hfile index available; e.g. reading in the hfile index
     * blocks themselves on file open.
     *
     * @param startOffset the offset of the block to start iteration with
     * @param endOffset the offset to end iteration at (exclusive)
     * @return an iterator of blocks between the two given offsets
     */
    BlockIterator blockRange(long startOffset, long endOffset);

    /** Closes the backing streams */
    void closeStreams() throws IOException;

    /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
    HFileBlockDecodingContext getBlockDecodingContext();

    /** Get the default decoder for blocks from this file. */
     */
    HFileBlockDecodingContext getDefaultBlockDecodingContext();

    void setIncludesMemStoreTS(boolean includesMemstoreTS);
    void setDataBlockEncoder(HFileDataBlockEncoder encoder);

    /**
     * Closes the stream's socket. Note: this can be called concurrently from multiple threads and
     * the implementation should take care of thread safety.
     */
    void unbufferStream();
  }

  /**
   * Data structure used to cache the header of the NEXT block. Only works if the next read
   * that comes in here is for the block that is next in sequence in this file.
   *
   * When we read, we read the current block and the next block's header. We do this so we have
   * the length of the next block to read if the hfile index is not available (rare, at
   * hfile open only).
   */
  private static class PrefetchedHeader {
    long offset = -1;
    byte [] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
    final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);

    @Override
    public String toString() {
      return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header);
    }
  }

  /**
   * Reads version 2 HFile blocks from the filesystem.
   */
  static class FSReaderImpl implements FSReader {
    /** The file system stream of the underlying {@link HFile} that
     * does or doesn't do checksum validations in the filesystem */
    private FSDataInputStreamWrapper streamWrapper;

    private HFileBlockDecodingContext encodedBlockDecodingCtx;

    /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
    private final HFileBlockDefaultDecodingContext defaultDecodingCtx;

    /**
     * Cache of the NEXT header after this. Check that it is indeed the next block's header
     * before using it. TODO: Review. This over-read into the next block to fetch the
     * next block's header seems unnecessary given we usually get the block size
     * from the hfile index. Review!
     */
    private AtomicReference<PrefetchedHeader> prefetchedHeader = new AtomicReference<>(new PrefetchedHeader());

    /** The size of the file we are reading from, or -1 if unknown. */
    private long fileSize;

    /** The size of the header */
    @VisibleForTesting
    protected final int hdrSize;

    /** The filesystem used to access data */
    private HFileSystem hfs;

    private HFileContext fileContext;
    // Cache the fileName
    private String pathName;

    private final Lock streamLock = new ReentrantLock();

    FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
        HFileContext fileContext) throws IOException {
      this.fileSize = fileSize;
      this.hfs = hfs;
      if (path != null) {
        this.pathName = path.toString();
      }
      this.fileContext = fileContext;
      this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());

      this.streamWrapper = stream;
      // Older versions of HBase didn't support checksum.
      this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
      defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
      encodedBlockDecodingCtx = defaultDecodingCtx;
    }

    /**
     * A constructor that reads files with the latest minor version.
     * This is used by unit tests only.
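     *
     * <p>A minimal usage sketch, assuming a test already has an open stream and an
     * {@link HFileContext} (the names below are illustrative, not taken from any particular test):
     * <pre>
     *   FSReader reader = new FSReaderImpl(istream, fileSize, meta);
     *   BlockIterator it = reader.blockRange(0, fileSize);
     *   for (HFileBlock b = it.nextBlock(); b != null; b = it.nextBlock()) {
     *     // blocks come back unpacked; inspect b.getBlockType(), sizes, etc.
     *   }
     * </pre>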
     */
    FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
        throws IOException {
      this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
    }

    @Override
    public BlockIterator blockRange(final long startOffset, final long endOffset) {
      final FSReader owner = this; // handle for inner class
      return new BlockIterator() {
        private long offset = startOffset;
        // Cache length of next block. Current block has the length of next block in it.
        private long length = -1;

        @Override
        public HFileBlock nextBlock() throws IOException {
          if (offset >= endOffset) {
            return null;
          }
          HFileBlock b = readBlockData(offset, length, false, false);
          offset += b.getOnDiskSizeWithHeader();
          length = b.getNextBlockOnDiskSize();
          return b.unpack(fileContext, owner);
        }

        @Override
        public HFileBlock nextBlockWithBlockType(BlockType blockType)
            throws IOException {
          HFileBlock blk = nextBlock();
          if (blk.getBlockType() != blockType) {
            throw new IOException("Expected block of type " + blockType
                + " but found " + blk.getBlockType());
          }
          return blk;
        }
      };
    }

    /**
     * Does a positional read or a seek and read into the given buffer. Returns
     * the on-disk size of the next block, or -1 if it could not be read/determined; e.g. EOF.
     *
     * @param dest destination buffer
     * @param destOffset offset into the destination buffer at where to put the bytes we read
     * @param size size of read
     * @param peekIntoNextBlock whether to read the next block's on-disk size
     * @param fileOffset position in the stream to read at
     * @param pread whether we should do a positional read
     * @param istream The input source of data
     * @return the on-disk size of the next block with header size included, or
     *         -1 if it could not be determined; if not -1, the <code>dest</code> INCLUDES the
     *         next header
     * @throws IOException
     */
    @VisibleForTesting
    protected int readAtOffset(FSDataInputStream istream, byte[] dest, int destOffset, int size,
        boolean peekIntoNextBlock, long fileOffset, boolean pread)
        throws IOException {
      if (peekIntoNextBlock && destOffset + size + hdrSize > dest.length) {
        // We are asked to read the next block's header as well, but there is
        // not enough room in the array.
        throw new IOException("Attempted to read " + size + " bytes and " + hdrSize +
            " bytes of next header into a " + dest.length + "-byte array at offset " + destOffset);
      }

      if (!pread) {
        // Seek + read. Better for scanning.
        HFileUtil.seekOnMultipleSources(istream, fileOffset);
        // TODO: do we need seek time latencies?
        long realOffset = istream.getPos();
        if (realOffset != fileOffset) {
          throw new IOException("Tried to seek to " + fileOffset + " to " + "read " + size +
              " bytes, but pos=" + realOffset + " after seek");
        }

        if (!peekIntoNextBlock) {
          IOUtils.readFully(istream, dest, destOffset, size);
          return -1;
        }

        // Try to read the next block header.
        if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) {
          return -1;
        }
      } else {
        // Positional read. Better for random reads; or when the streamLock is already locked.
        int extraSize = peekIntoNextBlock ?
            hdrSize : 0;
        if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset, size, extraSize)) {
          return -1;
        }
      }
      assert peekIntoNextBlock;
      return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
    }

    /**
     * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
     * little memory allocation as possible, using the provided on-disk size.
     *
     * @param offset the offset in the stream to read at
     * @param onDiskSizeWithHeaderL the on-disk size of the block, including
     *          the header, or -1 if unknown; i.e. when iterating over blocks reading
     *          in the file metadata info.
     * @param pread whether to use a positional read
     */
    @Override
    public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread,
        boolean updateMetrics) throws IOException {
      // Get a copy of the current state of whether to validate
      // hbase checksums or not for this read call. This is not
      // thread-safe but the one constraint is that if we decide
      // to skip hbase checksum verification then we are
      // guaranteed to use hdfs checksum verification.
      boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
      FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);

      HFileBlock blk = readBlockDataInternal(is, offset,
          onDiskSizeWithHeaderL, pread,
          doVerificationThruHBaseChecksum, updateMetrics);
      if (blk == null) {
        HFile.LOG.warn("HBase checksum verification failed for file " +
            pathName + " at offset " +
            offset + " filesize " + fileSize +
            ". Retrying read with HDFS checksums turned on...");

        if (!doVerificationThruHBaseChecksum) {
          String msg = "HBase checksum verification failed for file " +
              pathName + " at offset " +
              offset + " filesize " + fileSize +
              " but this cannot happen because doVerify is " +
              doVerificationThruHBaseChecksum;
          HFile.LOG.warn(msg);
          throw new IOException(msg); // cannot happen case here
        }
        HFile.CHECKSUM_FAILURES.increment(); // update metrics

        // If we have a checksum failure, we fall back into a mode where
        // the next few reads use HDFS level checksums. We aim to make the
        // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
        // hbase checksum verification, but since this value is set without
        // holding any locks, it can so happen that we might actually do
        // a few more than precisely this number.
        is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
        doVerificationThruHBaseChecksum = false;
        blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
            doVerificationThruHBaseChecksum, updateMetrics);
        if (blk != null) {
          HFile.LOG.warn("HDFS checksum verification succeeded for file " +
              pathName + " at offset " +
              offset + " filesize " + fileSize);
        }
      }
      if (blk == null && !doVerificationThruHBaseChecksum) {
        String msg = "readBlockData failed, possibly due to " +
            "checksum verification failed for file " + pathName +
            " at offset " + offset + " filesize " + fileSize;
        HFile.LOG.warn(msg);
        throw new IOException(msg);
      }

      // If there is a checksum mismatch earlier, then retry with
      // HBase checksums switched off and use HDFS checksum verification.
      // This triggers HDFS to detect and fix corrupt replicas.
      // The next checksumOffCount read requests will use HDFS checksums.
      // The decrementing of this.checksumOffCount is not thread-safe,
      // but it is harmless because eventually checksumOffCount will be
      // a negative number.
      streamWrapper.checksumOk();
      return blk;
    }

    /**
     * @return Checks <code>onDiskSizeWithHeaderL</code> is a healthy size and then returns it as an int
     * @throws IOException
     */
    private static int checkAndGetSizeAsInt(final long onDiskSizeWithHeaderL, final int hdrSize)
        throws IOException {
      if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
          || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) {
        throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL
            + ": expected to be at least " + hdrSize
            + " and at most " + Integer.MAX_VALUE + ", or -1");
      }
      return (int)onDiskSizeWithHeaderL;
    }

    /**
     * Verify that the passed in onDiskSizeWithHeader aligns with what is in the header;
     * else something is not right.
     * @throws IOException
     */
    private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuffer headerBuf,
        final long offset, boolean verifyChecksum)
        throws IOException {
      // Assert size provided aligns with what is in the header
      int fromHeader = getOnDiskSizeWithHeader(headerBuf, verifyChecksum);
      if (passedIn != fromHeader) {
        throw new IOException("Passed in onDiskSizeWithHeader=" + passedIn + " != " + fromHeader +
            ", offset=" + offset + ", fileContext=" + this.fileContext);
      }
    }

    /**
     * Check the atomic reference cache for this block's header. The cached header is only good
     * if the next read coming through is for the next block in sequence in the file. We read the
     * next block's header on the tail of reading the previous block to save a seek. Otherwise,
     * we have to do a seek to read the header before we can pull in the block OR
     * we have to backup the stream because we over-read (the next block's header).
     * @see PrefetchedHeader
     * @return The cached block header or null if not found.
     * @see #cacheNextBlockHeader(long, byte[], int, int)
     */
    private ByteBuffer getCachedHeader(final long offset) {
      PrefetchedHeader ph = this.prefetchedHeader.get();
      return ph != null && ph.offset == offset? ph.buf: null;
    }

    /**
     * Save away the next block's header in the atomic reference.
     * @see #getCachedHeader(long)
     * @see PrefetchedHeader
     */
    private void cacheNextBlockHeader(final long offset,
        final byte [] header, final int headerOffset, final int headerLength) {
      PrefetchedHeader ph = new PrefetchedHeader();
      ph.offset = offset;
      System.arraycopy(header, headerOffset, ph.header, 0, headerLength);
      this.prefetchedHeader.set(ph);
    }

    /**
     * Reads a version 2 block.
     *
     * @param offset the offset in the stream to read at.
     * @param onDiskSizeWithHeaderL the on-disk size of the block, including
     *          the header and checksums if present, or -1 if unknown (as a long). Can be -1
     *          if we are doing raw iteration of blocks as when loading up file metadata; i.e.
     *          the first read of a new file. Usually non-negative, obtained from the hfile index.
     * @param pread whether to use a positional read
     * @param verifyChecksum Whether to use HBase checksums.
     *          If HBase checksum is switched off, then use HDFS checksum.
     *          Can also flip on/off while reading the same file if we hit a troublesome patch
     *          in an hfile.
     * @return the HFileBlock or null if there is an HBase checksum mismatch
     */
    @VisibleForTesting
    protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
        long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics)
        throws IOException {
      if (offset < 0) {
        throw new IOException("Invalid offset=" + offset + " trying to read "
            + "block (onDiskSize=" + onDiskSizeWithHeaderL + ")");
      }
      int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize);
      // Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1
      // and will save us having to seek the stream backwards to reread the header we
      // read the last time through here.
      ByteBuffer headerBuf = getCachedHeader(offset);
      LOG.trace("Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, " +
          "onDiskSizeWithHeader={}", this.fileContext.getHFileName(), offset, pread,
          verifyChecksum, headerBuf, onDiskSizeWithHeader);
      // This is NOT the same as verifyChecksum. The latter is whether to do hbase
      // checksums. Can change with circumstances. The below flag is whether the
      // file has support for checksums (version 2+).
      boolean checksumSupport = this.fileContext.isUseHBaseChecksum();
      long startTime = System.currentTimeMillis();
      if (onDiskSizeWithHeader <= 0) {
        // We were not passed the block size. Need to get it from the header. If header was
        // not cached (see getCachedHeader above), need to seek to pull it in. This is costly
        // and should happen very rarely. Currently happens on open of a hfile reader where we
        // read the trailer blocks to pull in the indices. Otherwise, we are reading block sizes
        // out of the hfile index. To check, enable TRACE in this file and you'll get an exception
        // in a LOG every time we seek. See HBASE-17072 for more detail.
        if (headerBuf == null) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Extra seek to get block size!", new RuntimeException());
          }
          headerBuf = ByteBuffer.allocate(hdrSize);
          readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize, false,
              offset, pread);
        }
        onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport);
      }
      int preReadHeaderSize = headerBuf == null? 0 : hdrSize;
      // Allocate enough space to fit the next block's header too; saves a seek next time through.
      // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header;
      // onDiskSizeWithHeader is header, body, and any checksums if present. preReadHeaderSize
      // says where to start reading. If we have the header cached, then we don't need to read
      // it again and we can likely read from last place we left off w/o need to backup and reread
      // the header we read last time through here.
      // TODO: Make this ByteBuffer-based. Will make it easier to go to HDFS with BBPool (offheap).
      byte [] onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
      int nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, preReadHeaderSize,
          onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread);
      if (headerBuf != null) {
        // The header has been read when reading the previous block OR in a distinct header-only
        // read. Copy to this block's header.
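        // (readAtOffset above started filling onDiskBlock at preReadHeaderSize, so the first
        // hdrSize bytes are still unset; fill them from the already-read header.)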
        System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
      } else {
        headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
      }
      // Do a few checks before we go instantiate HFileBlock.
      assert onDiskSizeWithHeader > this.hdrSize;
      verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport);
      ByteBuffer onDiskBlockByteBuffer = ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader);
      // Verify checksum of the data before using it for building HFileBlock.
      if (verifyChecksum &&
          !validateChecksum(offset, onDiskBlockByteBuffer, hdrSize)) {
        return null;
      }
      long duration = System.currentTimeMillis() - startTime;
      if (updateMetrics) {
        HFile.updateReadLatency(duration, pread);
      }
      // The onDiskBlock will become the headerAndDataBuffer for this block.
      // If nextBlockOnDiskSize is not -1, the onDiskBlock already
      // contains the header of the next block, so no need to set next block's header in it.
      HFileBlock hFileBlock =
          new HFileBlock(new SingleByteBuff(onDiskBlockByteBuffer), checksumSupport,
              MemoryType.EXCLUSIVE, offset, nextBlockOnDiskSize, fileContext);
      // Run check on uncompressed sizings.
      if (!fileContext.isCompressedOrEncrypted()) {
        hFileBlock.sanityCheckUncompressed();
      }
      LOG.trace("Read {} in {} ns", hFileBlock, duration);
      // Cache next block header if we read it for the next time through here.
      if (nextBlockOnDiskSize != -1) {
        cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(),
            onDiskBlock, onDiskSizeWithHeader, hdrSize);
      }
      return hFileBlock;
    }

    @Override
    public void setIncludesMemStoreTS(boolean includesMemstoreTS) {
      this.fileContext.setIncludesMvcc(includesMemstoreTS);
    }

    @Override
    public void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
      encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
    }

    @Override
    public HFileBlockDecodingContext getBlockDecodingContext() {
      return this.encodedBlockDecodingCtx;
    }

    @Override
    public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
      return this.defaultDecodingCtx;
    }

    /**
     * Generates the checksum for the header as well as the data and then validates it.
     * If the block doesn't use checksums, returns false.
     * @return True if checksum matches, else false.
     */
    private boolean validateChecksum(long offset, ByteBuffer data, int hdrSize)
        throws IOException {
      // If this is an older version of the block that does not have checksums, then return false
      // indicating that checksum verification did not succeed. Actually, this method should never
      // be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen
      // case. Since this is a cannot-happen case, it is better to return false to indicate a
      // checksum validation failure.
      if (!fileContext.isUseHBaseChecksum()) {
        return false;
      }
      return ChecksumUtil.validateChecksum(data, pathName, offset, hdrSize);
    }

    @Override
    public void closeStreams() throws IOException {
      streamWrapper.close();
    }

    @Override
    public void unbufferStream() {
      // To handle concurrent reads, ensure that no other client is accessing the streams while we
      // unbuffer it.
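      // tryLock() rather than lock(): if another thread currently holds streamLock, skip
      // unbuffering on this call instead of blocking the caller.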
      if (streamLock.tryLock()) {
        try {
          this.streamWrapper.unbuffer();
        } finally {
          streamLock.unlock();
        }
      }
    }

    @Override
    public String toString() {
      return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext;
    }
  }

  /** An additional sanity-check when no compression or encryption is being used. */
  @VisibleForTesting
  void sanityCheckUncompressed() throws IOException {
    if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader +
        totalChecksumBytes()) {
      throw new IOException("Using no compression but "
          + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
          + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
          + ", numChecksumbytes=" + totalChecksumBytes());
    }
  }

  // Cacheable implementation
  @Override
  public int getSerializedLength() {
    if (buf != null) {
      // Include extra bytes for block metadata.
      return this.buf.limit() + BLOCK_METADATA_SPACE;
    }
    return 0;
  }

  // Cacheable implementation
  @Override
  public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
    this.buf.get(destination, 0, getSerializedLength() - BLOCK_METADATA_SPACE);
    destination = addMetaData(destination, includeNextBlockMetadata);

    // Make it ready for reading: flip sets position to zero and limit to the current position,
    // which now covers the serialized block (with its checksums, if present) plus the
    // metadata just added.
    destination.flip();
  }

  /**
   * For use by bucketcache. This exposes internals.
   */
  public ByteBuffer getMetaData() {
    ByteBuffer bb = ByteBuffer.allocate(BLOCK_METADATA_SPACE);
    bb = addMetaData(bb, true);
    bb.flip();
    return bb;
  }

  /**
   * Adds metadata at current position (position is moved forward). Does not flip or reset.
   * @return The passed <code>destination</code> with metadata added.
   */
  private ByteBuffer addMetaData(final ByteBuffer destination, boolean includeNextBlockMetadata) {
    destination.put(this.fileContext.isUseHBaseChecksum() ?
        (byte) 1 : (byte) 0);
    destination.putLong(this.offset);
    if (includeNextBlockMetadata) {
      destination.putInt(this.nextBlockOnDiskSize);
    }
    return destination;
  }

  // Cacheable implementation
  @Override
  public CacheableDeserializer<Cacheable> getDeserializer() {
    return HFileBlock.BLOCK_DESERIALIZER;
  }

  @Override
  public int hashCode() {
    int result = 1;
    result = result * 31 + blockType.hashCode();
    result = result * 31 + nextBlockOnDiskSize;
    result = result * 31 + (int) (offset ^ (offset >>> 32));
    result = result * 31 + onDiskSizeWithoutHeader;
    result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32));
    result = result * 31 + uncompressedSizeWithoutHeader;
    result = result * 31 + buf.hashCode();
    return result;
  }

  @Override
  public boolean equals(Object comparison) {
    if (this == comparison) {
      return true;
    }
    if (comparison == null) {
      return false;
    }
    if (comparison.getClass() != this.getClass()) {
      return false;
    }

    HFileBlock castedComparison = (HFileBlock) comparison;

    if (castedComparison.blockType != this.blockType) {
      return false;
    }
    if (castedComparison.nextBlockOnDiskSize != this.nextBlockOnDiskSize) {
      return false;
    }
    // Offset is important. Needed when we have to remake cachekey when block is returned to cache.
    if (castedComparison.offset != this.offset) {
      return false;
    }
    if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
      return false;
    }
    if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
      return false;
    }
    if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
      return false;
    }
    if (ByteBuff.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
        castedComparison.buf.limit()) != 0) {
      return false;
    }
    return true;
  }

  DataBlockEncoding getDataBlockEncoding() {
    if (blockType == BlockType.ENCODED_DATA) {
      return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
    }
    return DataBlockEncoding.NONE;
  }

  @VisibleForTesting
  byte getChecksumType() {
    return this.fileContext.getChecksumType().getCode();
  }

  int getBytesPerChecksum() {
    return this.fileContext.getBytesPerChecksum();
  }

  /** @return the size of data on disk + header. Excludes checksum. */
  @VisibleForTesting
  int getOnDiskDataSizeWithHeader() {
    return this.onDiskDataSizeWithHeader;
  }

  /**
   * Calculate the number of bytes required to store all the checksums
   * for this block. Each checksum value is a 4 byte integer.
   */
  int totalChecksumBytes() {
    // If the hfile block has minorVersion 0, then there are no checksum
    // data to validate. Similarly, a zero value in this.bytesPerChecksum
    // indicates that cached blocks do not have checksum data because
    // checksums were already validated when the block was read from disk.
    if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
      return 0;
    }
    return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
        this.fileContext.getBytesPerChecksum());
  }

  /**
   * Returns the size of this block header.
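   * Delegates to {@link #headerSize(boolean)} based on whether this file uses HBase checksums.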
   */
  public int headerSize() {
    return headerSize(this.fileContext.isUseHBaseChecksum());
  }

  /**
   * Maps a minor version to the size of the header.
   */
  public static int headerSize(boolean usesHBaseChecksum) {
    return usesHBaseChecksum?
        HConstants.HFILEBLOCK_HEADER_SIZE: HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
  }

  /**
   * Return the appropriate DUMMY_HEADER for the minor version
   */
  @VisibleForTesting
  // TODO: Why is this in here?
  byte[] getDummyHeaderForVersion() {
    return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
  }

  /**
   * Return the appropriate DUMMY_HEADER for the minor version
   */
  static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
    return usesHBaseChecksum? HConstants.HFILEBLOCK_DUMMY_HEADER: DUMMY_HEADER_NO_CHECKSUM;
  }

  /**
   * @return This HFileBlock's fileContext, which will be a derivative of the
   *         fileContext for the file from which this block's data was originally read.
   */
  HFileContext getHFileContext() {
    return this.fileContext;
  }

  @Override
  public MemoryType getMemoryType() {
    return this.memType;
  }

  /**
   * @return true if this block is backed by a shared memory area (such as that of a BucketCache).
   */
  boolean usesSharedMemory() {
    return this.memType == MemoryType.SHARED;
  }

  /**
   * Convert the contents of the block header into a human readable string.
   * This is mostly helpful for debugging. This assumes that the block
   * has minor version > 0.
   */
  @VisibleForTesting
  static String toStringHeader(ByteBuff buf) throws IOException {
    byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
    buf.get(magicBuf);
    BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
    int compressedBlockSizeNoHeader = buf.getInt();
    int uncompressedBlockSizeNoHeader = buf.getInt();
    long prevBlockOffset = buf.getLong();
    byte cksumtype = buf.get();
    long bytesPerChecksum = buf.getInt();
    long onDiskDataSizeWithHeader = buf.getInt();
    return " Header dump: magic: " + Bytes.toString(magicBuf) +
        " blockType " + bt +
        " compressedBlockSizeNoHeader " +
        compressedBlockSizeNoHeader +
        " uncompressedBlockSizeNoHeader " +
        uncompressedBlockSizeNoHeader +
        " prevBlockOffset " + prevBlockOffset +
        " checksumType " + ChecksumType.codeToType(cksumtype) +
        " bytesPerChecksum " + bytesPerChecksum +
        " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
  }

  public HFileBlock deepClone() {
    return new HFileBlock(this, true);
  }
}