001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
021import static org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer.CONTEXT_KEY;
022
023import io.opentelemetry.api.common.Attributes;
024import io.opentelemetry.api.common.AttributesBuilder;
025import io.opentelemetry.api.trace.Span;
026import io.opentelemetry.context.Context;
027import io.opentelemetry.context.Scope;
028import java.io.DataInputStream;
029import java.io.DataOutput;
030import java.io.DataOutputStream;
031import java.io.IOException;
032import java.nio.ByteBuffer;
033import java.util.ArrayList;
034import java.util.List;
035import java.util.Optional;
036import java.util.concurrent.atomic.AtomicReference;
037import java.util.concurrent.locks.Lock;
038import java.util.concurrent.locks.ReentrantLock;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.fs.FSDataInputStream;
041import org.apache.hadoop.fs.FSDataOutputStream;
042import org.apache.hadoop.hbase.Cell;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.fs.HFileSystem;
045import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
046import org.apache.hadoop.hbase.io.ByteBuffAllocator;
047import org.apache.hadoop.hbase.io.ByteBuffInputStream;
048import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream;
049import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
050import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
051import org.apache.hadoop.hbase.io.encoding.EncodingState;
052import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
053import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
054import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
055import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
056import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer;
057import org.apache.hadoop.hbase.io.util.BlockIOUtils;
058import org.apache.hadoop.hbase.nio.ByteBuff;
059import org.apache.hadoop.hbase.nio.MultiByteBuff;
060import org.apache.hadoop.hbase.nio.SingleByteBuff;
061import org.apache.hadoop.hbase.regionserver.ShipperListener;
062import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ReadType;
063import org.apache.hadoop.hbase.util.Bytes;
064import org.apache.hadoop.hbase.util.ChecksumType;
065import org.apache.hadoop.hbase.util.ClassSize;
066import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
067import org.apache.yetus.audience.InterfaceAudience;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070
071import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
072
073/**
074 * Cacheable Blocks of an {@link HFile} version 2 file. Version 2 was introduced in hbase-0.92.0.
075 * <p>
076 * Version 1 was the original file block. Version 2 was introduced when we changed the hbase file
077 * format to support multi-level block indexes and compound bloom filters (HBASE-3857). Support for
078 * Version 1 was removed in hbase-1.3.0.
079 * <h3>HFileBlock: Version 2</h3> In version 2, a block is structured as follows:
080 * <ul>
081 * <li><b>Header:</b> See Writer#putHeader() for where header is written; header total size is
082 * HFILEBLOCK_HEADER_SIZE
083 * <ul>
084 * <li>0. blockType: Magic record identifying the {@link BlockType} (8 bytes): e.g.
085 * <code>DATABLK*</code>
086 * <li>1. onDiskSizeWithoutHeader: Compressed -- a.k.a 'on disk' -- block size, excluding header,
087 * but including tailing checksum bytes (4 bytes)
088 * <li>2. uncompressedSizeWithoutHeader: Uncompressed block size, excluding header, and excluding
089 * checksum bytes (4 bytes)
090 * <li>3. prevBlockOffset: The offset of the previous block of the same type (8 bytes). This is used
091 * to navigate to the previous block without having to go to the block index
092 * <li>4: For minorVersions &gt;=1, the ordinal describing checksum type (1 byte)
093 * <li>5: For minorVersions &gt;=1, the number of data bytes/checksum chunk (4 bytes)
094 * <li>6: onDiskDataSizeWithHeader: For minorVersions &gt;=1, the size of data 'on disk', including
095 * header, excluding checksums (4 bytes)
096 * </ul>
097 * </li>
098 * <li><b>Raw/Compressed/Encrypted/Encoded data:</b> The compression algorithm is the same for all
099 * the blocks in an {@link HFile}. If compression is NONE, this is just raw, serialized Cells.
100 * <li><b>Tail:</b> For minorVersions &gt;=1, a series of 4 byte checksums, one each for the number
101 * of bytes specified by bytesPerChecksum.
102 * </ul>
103 * <h3>Caching</h3> Caches cache whole blocks with trailing checksums if any. We then tag on some
104 * metadata, the content of BLOCK_METADATA_SPACE: a flag for whether we are doing 'hbase' checksums,
105 * and then the offset into the file, which is needed when we re-make a cache key when we return the
106 * block to the cache as 'done'. See {@link Cacheable#serialize(ByteBuffer, boolean)} and
107 * {@link Cacheable#getDeserializer()}.
108 * <p>
109 * TODO: Should we cache the checksums? Down in Writer#getBlockForCaching(CacheConfig) where we make
110 * a block to cache-on-write, there is an attempt at turning off checksums. This is not the only
111 * place we get blocks to cache. We also will cache the raw return from an hdfs read. In this case,
112 * the checksums may be present. If the cache is backed by something that doesn't do ECC, say an
113 * SSD, we might want to preserve checksums. For now this is an open question.
114 * <p>
115 * TODO: Over in BucketCache, we save a block allocation by doing a custom serialization. Be sure to
116 * change it if serialization changes in here. Could we add a method here that takes an IOEngine and
117 * that then serializes to it rather than expose our internals over in BucketCache? IOEngine is in
118 * the bucket subpackage. Pull it up? Then this class knows about bucketcache. Ugh.
119 */
120@InterfaceAudience.Private
121public class HFileBlock implements Cacheable {
122  private static final Logger LOG = LoggerFactory.getLogger(HFileBlock.class);
123  public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HFileBlock.class, false);
124
125  // Block Header fields.
126
127  // TODO: encapsulate Header related logic in this inner class.
128  static class Header {
129    // Format of header is:
130    // 8 bytes - block magic
131    // 4 bytes int - onDiskSizeWithoutHeader
132    // 4 bytes int - uncompressedSizeWithoutHeader
133    // 8 bytes long - prevBlockOffset
134    // The following 3 are only present if header contains checksum information
135    // 1 byte - checksum type
136    // 4 byte int - bytes per checksum
137    // 4 byte int - onDiskDataSizeWithHeader
138    static int BLOCK_MAGIC_INDEX = 0;
139    static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8;
140    static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12;
141    static int PREV_BLOCK_OFFSET_INDEX = 16;
142    static int CHECKSUM_TYPE_INDEX = 24;
143    static int BYTES_PER_CHECKSUM_INDEX = 25;
144    static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29;
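    // These offsets are read straight off a header ByteBuff, e.g.
    // headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX); see getOnDiskSizeWithHeader()
    // and createFromBuff() below for actual uses.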
145  }
146
147  /** Type of block. Header field 0. */
148  private BlockType blockType;
149
150  /**
151   * Size on disk excluding header, including checksum. Header field 1.
152   * @see Writer#putHeader(byte[], int, int, int, int)
153   */
154  private int onDiskSizeWithoutHeader;
155
156  /**
157   * Size of pure data. Does not include header or checksums. Header field 2.
158   * @see Writer#putHeader(byte[], int, int, int, int)
159   */
160  private int uncompressedSizeWithoutHeader;
161
162  /**
163   * The offset of the previous block on disk. Header field 3.
164   * @see Writer#putHeader(byte[], int, int, int, int)
165   */
166  private long prevBlockOffset;
167
168  /**
169   * Size on disk of header + data. Excludes checksum. Header field 6, OR calculated from
170   * {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
171   * @see Writer#putHeader(byte[], int, int, int, int)
172   */
173  private int onDiskDataSizeWithHeader;
174  // End of Block Header fields.
175
176  /**
177   * The in-memory representation of the hfile block. Can be on or offheap. Can be backed by a
178   * single ByteBuffer or by many. Make no assumptions.
179   * <p>
180   * Be careful reading from this <code>buf</code>. Duplicate and work on the duplicate or if not,
181   * be sure to reset position and limit else trouble down the road.
182   * <p>
183   * TODO: Make this read-only once made.
184   * <p>
185   * We are using the ByteBuff type. ByteBuffer is not extensible yet we need to be able to have a
186   * ByteBuffer-like API across multiple ByteBuffers reading from a cache such as BucketCache. So,
187   * we have this ByteBuff type. Unfortunately, it is spread all about HFileBlock. It would be good
188   * if it could be confined to cache use only, but that is hard to do.
189   */
190  private ByteBuff buf;
191
192  /**
193   * Meta data that holds meta information on the hfileblock.
194   */
195  private HFileContext fileContext;
196
197  /**
198   * The offset of this block in the file. Populated by the reader for convenience of access. This
199   * offset is not part of the block header.
200   */
201  private long offset = UNSET;
202
203  /**
204   * The on-disk size of the next block, including the header and checksums if present. UNSET if
205   * unknown. Blocks try to carry the size of the next block to read in this data member. Usually we
206   * get block sizes from the hfile index but sometimes the index is not available: e.g. when we
207   * read the indexes themselves (indexes are stored in blocks, we do not have an index for the
208   * indexes). Saves seeks especially around file open when there is a flurry of reading in hfile
209   * metadata.
210   */
211  private int nextBlockOnDiskSize = UNSET;
212
213  private ByteBuffAllocator allocator;
214
215  /**
216   * On a checksum failure, do this many succeeding read requests using hdfs checksums before
217   * auto-reenabling hbase checksum verification.
218   */
219  static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
220
221  private static int UNSET = -1;
222  public static final boolean FILL_HEADER = true;
223  public static final boolean DONT_FILL_HEADER = false;
224
225  // How do we get the estimate correct if it is a SingleByteBuff?
226  public static final int MULTI_BYTE_BUFFER_HEAP_SIZE =
227    (int) ClassSize.estimateBase(MultiByteBuff.class, false);
228
229  /**
230   * Space for metadata on a block that gets stored along with the block when we cache it. There are
231   * a few bytes stuck on the end of the HFileBlock that we pull in from HDFS. 8 bytes are for the
232 * offset of this block (long) in the file. Offset is important because it is used when we remake
233 * the CacheKey when we return the block to the cache when done. There is also a flag on whether
234   * checksumming is being done by hbase or not. See class comment for note on uncertain state of
235   * checksumming of blocks that come out of cache (should we or should we not?). Finally there are
236   * 4 bytes to hold the length of the next block which can save a seek on occasion if available.
237   * (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was formerly
238   * known as EXTRA_SERIALIZATION_SPACE).
239   */
240  public static final int BLOCK_METADATA_SPACE =
241    Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
242
243  /**
244   * Each checksum value is an integer that can be stored in 4 bytes.
245   */
246  static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
247
248  static final byte[] DUMMY_HEADER_NO_CHECKSUM =
249    new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
250
251  /**
252   * Used when deserializing blocks from the cache. <code>
253   * ++++++++++++++
254   * + HFileBlock +
255   * ++++++++++++++
256   * + Checksums  + <= Optional
257   * ++++++++++++++
258   * + Metadata!  + <= See note on BLOCK_METADATA_SPACE above.
259   * ++++++++++++++
260   * </code>
261   * @see #serialize(ByteBuffer, boolean)
262   */
263  public static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER = new BlockDeserializer();
264
265  public static final class BlockDeserializer implements CacheableDeserializer<Cacheable> {
266    private BlockDeserializer() {
267    }
268
269    @Override
270    public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc) throws IOException {
271      // The buf has the file block followed by block metadata.
272      // Set limit to just before the BLOCK_METADATA_SPACE then rewind.
273      buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind();
274      // Get a new buffer to pass the HFileBlock for it to 'own'.
275      ByteBuff newByteBuff = buf.slice();
276      // Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock.
277      buf.position(buf.limit());
278      buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE);
279      boolean usesChecksum = buf.get() == (byte) 1;
280      long offset = buf.getLong();
281      int nextBlockOnDiskSize = buf.getInt();
282      return createFromBuff(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
283    }
284
285    @Override
286    public int getDeserializerIdentifier() {
287      return DESERIALIZER_IDENTIFIER;
288    }
289  }
290
291  private static final int DESERIALIZER_IDENTIFIER;
292  static {
293    DESERIALIZER_IDENTIFIER =
294      CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
295  }
296
297  /**
298   * Creates a new {@link HFile} block from the given fields. This constructor is used only while
299   * writing blocks and caching, when the block data is already sitting in a byte buffer and we want
300   * to stuff the block into cache. See {@link Writer#getBlockForCaching(CacheConfig)}.
301   * <p>
302   * TODO: The caller presumes no checksumming is required of this block instance since it is going
303   * into cache; the checksum has already been verified on the underlying block data pulled in from
304   * the filesystem. Is that correct? What if the cache is SSD backed?
305   * <p>
306   * TODO: HFile block writer can also off-heap?
307   * </p>
308   * @param blockType                     the type of this block, see {@link BlockType}
309   * @param onDiskSizeWithoutHeader       see {@link #onDiskSizeWithoutHeader}
310   * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
311   * @param prevBlockOffset               see {@link #prevBlockOffset}
312   * @param buf                           block buffer with header
313   *                                      ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
314   * @param fillHeader                    when true, write the first 4 header fields into passed
315   *                                      buffer.
316   * @param offset                        the file offset the block was read from
317   * @param onDiskDataSizeWithHeader      see {@link #onDiskDataSizeWithHeader}
318   * @param fileContext                   HFile meta data
319   */
320  public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
321    int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
322    long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
323    ByteBuffAllocator allocator) {
324    this.blockType = blockType;
325    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
326    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
327    this.prevBlockOffset = prevBlockOffset;
328    this.offset = offset;
329    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
330    this.nextBlockOnDiskSize = nextBlockOnDiskSize;
331    this.fileContext = fileContext;
332    this.allocator = allocator;
333    this.buf = buf;
334    if (fillHeader) {
335      overwriteHeader();
336    }
337    this.buf.rewind();
338  }
339
340  /**
341   * Creates a block from an existing buffer starting with a header. Rewinds and takes ownership of
342   * the buffer. By definition of rewind, ignores the buffer position, but if you slice the buffer
343   * beforehand, it will rewind to that point.
344   * @param buf Has header, content, and trailing checksums if present.
345   */
346  static HFileBlock createFromBuff(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
347    final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator)
348    throws IOException {
349    buf.rewind();
350    final BlockType blockType = BlockType.read(buf);
351    final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
352    final int uncompressedSizeWithoutHeader =
353      buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
354    final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
355    // This constructor is called when we deserialize a block from cache and when we read a block in
356    // from the fs. fileContext is null when deserialized from cache so we need to make one up.
357    HFileContextBuilder fileContextBuilder =
358      fileContext != null ? new HFileContextBuilder(fileContext) : new HFileContextBuilder();
359    fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum);
360    int onDiskDataSizeWithHeader;
361    if (usesHBaseChecksum) {
362      byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX);
363      int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
364      onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
365      // Use the checksum type and bytes per checksum from header, not from fileContext.
366      fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType));
367      fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum);
368    } else {
369      fileContextBuilder.withChecksumType(ChecksumType.NULL);
370      fileContextBuilder.withBytesPerCheckSum(0);
371      // Need to fix onDiskDataSizeWithHeader; there are no checksums after the block data
372      onDiskDataSizeWithHeader = onDiskSizeWithoutHeader + headerSize(usesHBaseChecksum);
373    }
374    fileContext = fileContextBuilder.build();
375    assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
376    return new HFileBlockBuilder().withBlockType(blockType)
377      .withOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader)
378      .withUncompressedSizeWithoutHeader(uncompressedSizeWithoutHeader)
379      .withPrevBlockOffset(prevBlockOffset).withOffset(offset)
380      .withOnDiskDataSizeWithHeader(onDiskDataSizeWithHeader)
381      .withNextBlockOnDiskSize(nextBlockOnDiskSize).withHFileContext(fileContext)
382      .withByteBuffAllocator(allocator).withByteBuff(buf.rewind()).withShared(!buf.hasArray())
383      .build();
384  }
385
386  /**
387   * Parse total on disk size including header and checksum.
388   * @param headerBuf      Header ByteBuffer. Presumed exact size of header.
389   * @param verifyChecksum true if checksum verification is in use.
390   * @return Size of the block with header included.
391   */
392  private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf, boolean verifyChecksum) {
393    return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(verifyChecksum);
394  }
395
396  /**
397   * @return the on-disk size of the next block (including the header size and any checksums if
398   *         present) read by peeking into the next block's header; use as a hint when doing a read
399   *         of the next block when scanning or running over a file.
400   */
401  int getNextBlockOnDiskSize() {
402    return nextBlockOnDiskSize;
403  }
404
405  @Override
406  public BlockType getBlockType() {
407    return blockType;
408  }
409
410  @Override
411  public int refCnt() {
412    return buf.refCnt();
413  }
414
415  @Override
416  public HFileBlock retain() {
417    buf.retain();
418    return this;
419  }
420
421  /**
422   * Call {@link ByteBuff#release()} to decrease the reference count; if there is no other reference,
423   * it will return the {@link ByteBuffer} back to the {@link org.apache.hadoop.hbase.io.ByteBuffAllocator}
424   */
425  @Override
426  public boolean release() {
427    return buf.release();
428  }
429
430  /**
431   * Calling this method in strategic locations where HFileBlocks are referenced may help diagnose
432   * potential buffer leaks. We pass the block itself as a default hint, but one can use
433   * {@link #touch(Object)} to pass their own hint as well.
434   */
435  @Override
436  public HFileBlock touch() {
437    return touch(this);
438  }
439
440  @Override
441  public HFileBlock touch(Object hint) {
442    buf.touch(hint);
443    return this;
444  }
445
446  /** Returns the data block encoding id that was used to encode this block */
447  short getDataBlockEncodingId() {
448    if (blockType != BlockType.ENCODED_DATA) {
449      throw new IllegalArgumentException("Querying encoder ID of a block " + "of type other than "
450        + BlockType.ENCODED_DATA + ": " + blockType);
451    }
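    // The 2-byte data block encoding id is stored immediately after the block header.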
452    return buf.getShort(headerSize());
453  }
454
455  /** Returns the on-disk size of header + data part + checksum. */
456  public int getOnDiskSizeWithHeader() {
457    return onDiskSizeWithoutHeader + headerSize();
458  }
459
460  /** Returns the on-disk size of the data part + checksum (header excluded). */
461  int getOnDiskSizeWithoutHeader() {
462    return onDiskSizeWithoutHeader;
463  }
464
465  /** Returns the uncompressed size of data part (header and checksum excluded). */
466  int getUncompressedSizeWithoutHeader() {
467    return uncompressedSizeWithoutHeader;
468  }
469
470  /** Returns the offset of the previous block of the same type in the file, or -1 if unknown */
471  long getPrevBlockOffset() {
472    return prevBlockOffset;
473  }
474
475  /**
476   * Rewinds {@code buf} and writes the header fields (the first 4 always, plus the checksum fields
477   * when HBase checksums are in use). {@code buf} position is modified as a side-effect.
478   */
479  private void overwriteHeader() {
480    buf.rewind();
481    blockType.write(buf);
482    buf.putInt(onDiskSizeWithoutHeader);
483    buf.putInt(uncompressedSizeWithoutHeader);
484    buf.putLong(prevBlockOffset);
485    if (this.fileContext.isUseHBaseChecksum()) {
486      buf.put(fileContext.getChecksumType().getCode());
487      buf.putInt(fileContext.getBytesPerChecksum());
488      buf.putInt(onDiskDataSizeWithHeader);
489    }
490  }
491
492  /**
493   * Returns a buffer that does not include the header and checksum.
494   * @return the buffer with header skipped and checksum omitted.
495   */
496  public ByteBuff getBufferWithoutHeader() {
497    ByteBuff dup = getBufferReadOnly();
498    return dup.position(headerSize()).slice();
499  }
500
501  /**
502   * Returns a read-only duplicate of the buffer this block stores internally ready to be read.
503   * Clients must not modify the buffer object though they may set position and limit on the
504   * returned buffer since we pass back a duplicate. This method has to be public because it is used
505   * in {@link CompoundBloomFilter} to avoid object creation on every Bloom filter lookup, but has
506   * to be used with caution. Buffer holds header, block content, and any follow-on checksums if
507   * present.
508   * @return the buffer of this block for read-only operations
509   */
510  public ByteBuff getBufferReadOnly() {
511    // TODO: ByteBuf does not support asReadOnlyBuffer(). Fix.
512    ByteBuff dup = this.buf.duplicate();
513    assert dup.position() == 0;
514    return dup;
515  }
516
517  public ByteBuffAllocator getByteBuffAllocator() {
518    return this.allocator;
519  }
520
521  private void sanityCheckAssertion(long valueFromBuf, long valueFromField, String fieldName)
522    throws IOException {
523    if (valueFromBuf != valueFromField) {
524      throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
525        + ") is different from that in the field (" + valueFromField + ")");
526    }
527  }
528
529  private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
530    throws IOException {
531    if (valueFromBuf != valueFromField) {
532      throw new IOException("Block type stored in the buffer: " + valueFromBuf
533        + ", block type field: " + valueFromField);
534    }
535  }
536
537  /**
538   * Checks if the block is internally consistent, i.e. the first
539   * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a valid header consistent
540   * with the fields. Assumes a packed block structure. This function is primarily for testing and
541   * debugging, and is not thread-safe, because it alters the internal buffer pointer. Used by tests
542   * only.
543   */
544  void sanityCheck() throws IOException {
545    // Duplicate so no side-effects
546    ByteBuff dup = this.buf.duplicate().rewind();
547    sanityCheckAssertion(BlockType.read(dup), blockType);
548
549    sanityCheckAssertion(dup.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader");
550
551    sanityCheckAssertion(dup.getInt(), uncompressedSizeWithoutHeader,
552      "uncompressedSizeWithoutHeader");
553
554    sanityCheckAssertion(dup.getLong(), prevBlockOffset, "prevBlockOffset");
555    if (this.fileContext.isUseHBaseChecksum()) {
556      sanityCheckAssertion(dup.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
557      sanityCheckAssertion(dup.getInt(), this.fileContext.getBytesPerChecksum(),
558        "bytesPerChecksum");
559      sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
560    }
561
562    if (dup.limit() != onDiskDataSizeWithHeader) {
563      throw new AssertionError(
564        "Expected limit " + onDiskDataSizeWithHeader + ", got " + dup.limit());
565    }
566
567    // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
568    // block's header, so there are two sensible values for buffer capacity.
569    int hdrSize = headerSize();
570    dup.rewind();
571    if (
572      dup.remaining() != onDiskDataSizeWithHeader
573        && dup.remaining() != onDiskDataSizeWithHeader + hdrSize
574    ) {
575      throw new AssertionError("Invalid buffer capacity: " + dup.remaining() + ", expected "
576        + onDiskDataSizeWithHeader + " or " + (onDiskDataSizeWithHeader + hdrSize));
577    }
578  }
579
580  @Override
581  public String toString() {
582    StringBuilder sb = new StringBuilder().append("[").append("blockType=").append(blockType)
583      .append(", fileOffset=").append(offset).append(", headerSize=").append(headerSize())
584      .append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
585      .append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
586      .append(", prevBlockOffset=").append(prevBlockOffset).append(", isUseHBaseChecksum=")
587      .append(fileContext.isUseHBaseChecksum());
588    if (fileContext.isUseHBaseChecksum()) {
589      sb.append(", checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
590        .append(", bytesPerChecksum=").append(this.buf.getInt(24 + 1))
591        .append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
592    } else {
593      sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader).append("(")
594        .append(onDiskSizeWithoutHeader).append("+")
595        .append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
596    }
597    String dataBegin;
598    if (buf.hasArray()) {
599      dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
600        Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
601    } else {
602      ByteBuff bufWithoutHeader = getBufferWithoutHeader();
603      byte[] dataBeginBytes =
604        new byte[Math.min(32, bufWithoutHeader.limit() - bufWithoutHeader.position())];
605      bufWithoutHeader.get(dataBeginBytes);
606      dataBegin = Bytes.toStringBinary(dataBeginBytes);
607    }
608    sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader())
609      .append(", totalChecksumBytes=").append(totalChecksumBytes()).append(", isUnpacked=")
610      .append(isUnpacked()).append(", buf=[").append(buf).append("]").append(", dataBeginsWith=")
611      .append(dataBegin).append(", fileContext=").append(fileContext)
612      .append(", nextBlockOnDiskSize=").append(nextBlockOnDiskSize).append("]");
613    return sb.toString();
614  }
615
616  /**
617   * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
618   * encoded structure. Internal structures are shared between instances where applicable.
619   */
620  HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
621    if (!fileContext.isCompressedOrEncrypted()) {
622      // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
623      // which is used for block serialization to L2 cache, does not preserve encoding and
624      // encryption details.
625      return this;
626    }
627
628    ByteBuff newBuf = allocateBufferForUnpacking(); // allocates space for the decompressed block
629    HFileBlock unpacked = shallowClone(this, newBuf);
630
631    boolean succ = false;
632    final Context context =
633      Context.current().with(CONTEXT_KEY, new HFileContextAttributesBuilderConsumer(fileContext));
634    try (Scope ignored = context.makeCurrent()) {
635      HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
636        ? reader.getBlockDecodingContext()
637        : reader.getDefaultBlockDecodingContext();
638      // Create a duplicated buffer without the header part.
639      int headerSize = this.headerSize();
640      ByteBuff dup = this.buf.duplicate();
641      dup.position(headerSize);
642      dup = dup.slice();
643      // Decode the dup into unpacked#buf
644      ctx.prepareDecoding(unpacked.getOnDiskDataSizeWithHeader() - headerSize,
645        unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), dup);
646      succ = true;
647      return unpacked;
648    } finally {
649      if (!succ) {
650        unpacked.release();
651      }
652    }
653  }
654
655  /**
656   * Always allocates a new buffer of the correct size. Copies header bytes from the existing
657   * buffer. Does not change header fields.
658   */
659  private ByteBuff allocateBufferForUnpacking() {
660    int headerSize = headerSize();
661    int capacityNeeded = headerSize + uncompressedSizeWithoutHeader;
662
663    ByteBuff source = buf.duplicate();
664    ByteBuff newBuf = allocator.allocate(capacityNeeded);
665
666    // Copy header bytes into newBuf.
667    source.position(0);
668    newBuf.put(0, source, 0, headerSize);
669
670    // set limit to exclude next block's header
671    newBuf.limit(capacityNeeded);
672    return newBuf;
673  }
674
675  /**
676   * Return true when this block's buffer has been unpacked, false otherwise. Note this is a
677   * calculated heuristic, not a tracked attribute of the block.
678   */
679  public boolean isUnpacked() {
680    final int headerSize = headerSize();
681    final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader;
682    final int bufCapacity = buf.remaining();
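    // An unpacked buffer holds exactly header + uncompressed data, possibly followed by the next
    // block's header if it was read ahead; see the capacity check in sanityCheck().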
683    return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
684  }
685
686  /**
687   * Cannot be {@link #UNSET}. Must be a legitimate value. Used when re-making the
688   * {@link BlockCacheKey} when the block is returned to the cache.
689   * @return the offset of this block in the file it was read from
690   */
691  long getOffset() {
692    if (offset < 0) {
693      throw new IllegalStateException("HFile block offset not initialized properly");
694    }
695    return offset;
696  }
697
698  /** Returns a byte stream reading the data + checksum of this block */
699  DataInputStream getByteStream() {
700    ByteBuff dup = this.buf.duplicate();
701    dup.position(this.headerSize());
702    return new DataInputStream(new ByteBuffInputStream(dup));
703  }
704
705  @Override
706  public long heapSize() {
707    long size = FIXED_OVERHEAD;
708    size += fileContext.heapSize();
709    if (buf != null) {
710      // Deep overhead of the byte buffer. Needs to be aligned separately.
711      size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE);
712    }
713    return ClassSize.align(size);
714  }
715
716  /**
717   * Will be overridden by {@link SharedMemHFileBlock} or {@link ExclusiveMemHFileBlock}. Returns
718   * true by default.
719   */
720  public boolean isSharedMem() {
721    return true;
722  }
723
724  /**
725   * Unified version 2 {@link HFile} block writer. The intended usage pattern is as follows:
726   * <ol>
727   * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
728   * <li>Call {@link Writer#startWriting} and get a data stream to write to.
729   * <li>Write your data into the stream.
730   * <li>Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to store the
731   * serialized block into an external stream.
732   * <li>Repeat to write more blocks.
733   * </ol>
734   * <p>
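   * For illustration only, a rough sketch of the call sequence described above; the {@code conf},
   * {@code fileContext}, {@code out} and {@code payloadBytes} values are assumed to already exist:
   *
   * <pre>{@code
   * HFileBlock.Writer writer =
   *   new HFileBlock.Writer(conf, NoOpDataBlockEncoder.INSTANCE, fileContext);
   * DataOutputStream dos = writer.startWriting(BlockType.META);
   * dos.write(payloadBytes);        // write the block payload into the returned stream
   * writer.writeHeaderAndData(out); // flush header + data (+ checksums) to the FSDataOutputStream
   * // repeat startWriting()/writeHeaderAndData() for further blocks
   * }</pre>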
735   */
736  static class Writer implements ShipperListener {
737    private enum State {
738      INIT,
739      WRITING,
740      BLOCK_READY
741    }
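    // State transitions, matching the usage pattern in the class comment above:
    //   INIT -> WRITING (startWriting) -> BLOCK_READY (ensureBlockReady/finishBlock) -> WRITING
    //   (next startWriting call), and so on for each block written.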
742
743    /** Writer state. Used to ensure the correct usage protocol. */
744    private State state = State.INIT;
745
746    /** Data block encoder used for data blocks */
747    private final HFileDataBlockEncoder dataBlockEncoder;
748
749    private HFileBlockEncodingContext dataBlockEncodingCtx;
750
751    /** block encoding context for non-data blocks */
752    private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
753
754    /**
755     * The stream we use to accumulate data into a block in an uncompressed format. We reset this
756     * stream at the end of each block and reuse it. The header is written as the first
757     * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this stream.
758     */
759    private ByteArrayOutputStream baosInMemory;
760
761    /**
762     * Current block type. Set in {@link #startWriting(BlockType)}. Could be changed in
763     * {@link #finishBlock()} from {@link BlockType#DATA} to {@link BlockType#ENCODED_DATA}.
764     */
765    private BlockType blockType;
766
767    /**
768     * A stream that we write uncompressed bytes to; it accumulates them in {@link #baosInMemory}.
769     * Compression and encryption, if any, happen later in {@link #finishBlock()}.
770     */
771    private DataOutputStream userDataStream;
772
773    /**
774     * Bytes to be written to the file system, including the header. Compressed if compression is
775     * turned on. Does not include the checksum data; that is kept separately in
776     * {@link #onDiskChecksum} and written out immediately after these bytes. (header + data)
777     */
778    private ByteArrayOutputStream onDiskBlockBytesWithHeader;
779
780    /**
781     * The checksum bytes for the current block, generated in {@link #finishBlock()} over the
782     * contents of {@link #onDiskBlockBytesWithHeader} and written to disk immediately after that
783     * data.
784     */
785    private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;
786
787    /**
788     * Current block's start offset in the {@link HFile}. Set in
789     * {@link #writeHeaderAndData(FSDataOutputStream)}.
790     */
791    private long startOffset;
792
793    /**
794     * Offset of previous block by block type. Updated when the next block is started.
795     */
796    private long[] prevOffsetByType;
797
798    /** The offset of the previous block of the same type */
799    private long prevOffset;
800    /** Meta data that holds information about the hfileblock **/
801    private HFileContext fileContext;
802
803    private final ByteBuffAllocator allocator;
804
805    @Override
806    public void beforeShipped() {
807      if (getEncodingState() != null) {
808        getEncodingState().beforeShipped();
809      }
810    }
811
812    EncodingState getEncodingState() {
813      return dataBlockEncodingCtx.getEncodingState();
814    }
815
816    /**
817     * @param dataBlockEncoder data block encoding algorithm to use
818     */
819    public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
820      HFileContext fileContext) {
821      this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
822    }
823
824    public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
825      HFileContext fileContext, ByteBuffAllocator allocator) {
826      if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
827        throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is "
828          + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is "
829          + fileContext.getBytesPerChecksum());
830      }
831      this.allocator = allocator;
832      this.dataBlockEncoder =
833        dataBlockEncoder != null ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
834      this.dataBlockEncodingCtx = this.dataBlockEncoder.newDataBlockEncodingContext(conf,
835        HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
836      // TODO: This should be lazily instantiated
837      this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(conf, null,
838        HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
839      // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum
840      baosInMemory = new ByteArrayOutputStream();
841      prevOffsetByType = new long[BlockType.values().length];
842      for (int i = 0; i < prevOffsetByType.length; ++i) {
843        prevOffsetByType[i] = UNSET;
844      }
845      // TODO: Why fileContext saved away when we have dataBlockEncoder and/or
846      // defaultDataBlockEncoder?
847      this.fileContext = fileContext;
848    }
849
850    /**
851     * Starts writing into the block. The previous block's data is discarded.
852     * @return the stream the user can write their data into
853     */
854    DataOutputStream startWriting(BlockType newBlockType) throws IOException {
855      if (state == State.BLOCK_READY && startOffset != -1) {
856        // We had a previous block that was written to a stream at a specific
857        // offset. Save that offset as the last offset of a block of that type.
858        prevOffsetByType[blockType.getId()] = startOffset;
859      }
860
861      startOffset = -1;
862      blockType = newBlockType;
863
864      baosInMemory.reset();
865      baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
866
867      state = State.WRITING;
868
869      // We will compress it later in finishBlock()
870      userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory);
871      if (newBlockType == BlockType.DATA) {
872        this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
873      }
874      return userDataStream;
875    }
876
877    /**
878     * Writes the Cell to this block
879     */
880    void write(Cell cell) throws IOException {
881      expectState(State.WRITING);
882      this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream);
883    }
884
885    /**
886     * Transitions the block writer from the "writing" state to the "block ready" state. Does
887     * nothing if a block is already finished.
888     */
889    void ensureBlockReady() throws IOException {
890      Preconditions.checkState(state != State.INIT, "Unexpected state: " + state);
891
892      if (state == State.BLOCK_READY) {
893        return;
894      }
895
896      // This will set state to BLOCK_READY.
897      finishBlock();
898    }
899
900    /**
901     * Finish up writing of the block. Flushes the compressing stream (if using compression), fills
902     * out the header, does any compression/encryption of bytes to flush out to disk, and manages
903     * the cache on write content, if applicable. Sets block write state to "block ready".
904     */
905    private void finishBlock() throws IOException {
906      if (blockType == BlockType.DATA) {
907        this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
908          baosInMemory.getBuffer(), blockType);
909        blockType = dataBlockEncodingCtx.getBlockType();
910      }
911      userDataStream.flush();
912      prevOffset = prevOffsetByType[blockType.getId()];
913
914      // We need to set state before we can package the block up for cache-on-write. In a way, the
915      // block is ready, but not yet encoded or compressed.
916      state = State.BLOCK_READY;
917      Bytes compressAndEncryptDat;
918      if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
919        compressAndEncryptDat =
920          dataBlockEncodingCtx.compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size());
921      } else {
922        compressAndEncryptDat = defaultBlockEncodingCtx.compressAndEncrypt(baosInMemory.getBuffer(),
923          0, baosInMemory.size());
924      }
925      if (compressAndEncryptDat == null) {
926        compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size());
927      }
928      if (onDiskBlockBytesWithHeader == null) {
929        onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength());
930      }
931      onDiskBlockBytesWithHeader.reset();
932      onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(),
933        compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength());
934      // Calculate how many bytes we need for checksum on the tail of the block.
935      int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(),
936        fileContext.getBytesPerChecksum());
937
938      // Put the header for the on disk bytes; header currently is unfilled-out
939      putHeader(onDiskBlockBytesWithHeader, onDiskBlockBytesWithHeader.size() + numBytes,
940        baosInMemory.size(), onDiskBlockBytesWithHeader.size());
941      if (onDiskChecksum.length != numBytes) {
942        onDiskChecksum = new byte[numBytes];
943      }
944      ChecksumUtil.generateChecksums(onDiskBlockBytesWithHeader.getBuffer(), 0,
945        onDiskBlockBytesWithHeader.size(), onDiskChecksum, 0, fileContext.getChecksumType(),
946        fileContext.getBytesPerChecksum());
947    }
948
949    /**
950     * Put the header into the given byte array at the given offset.
951     * @param onDiskSize       size of the block on disk header + data + checksum
952     * @param uncompressedSize size of the block after decompression (but before optional data block
953     *                         decoding) including header
954     * @param onDiskDataSize   size of the block on disk with header and data but not including the
955     *                         checksums
956     */
957    private void putHeader(byte[] dest, int offset, int onDiskSize, int uncompressedSize,
958      int onDiskDataSize) {
959      offset = blockType.put(dest, offset);
960      offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
961      offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
962      offset = Bytes.putLong(dest, offset, prevOffset);
963      offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
964      offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
965      Bytes.putInt(dest, offset, onDiskDataSize);
966    }
967
968    private void putHeader(ByteBuff buff, int onDiskSize, int uncompressedSize,
969      int onDiskDataSize) {
970      buff.rewind();
971      blockType.write(buff);
972      buff.putInt(onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
973      buff.putInt(uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
974      buff.putLong(prevOffset);
975      buff.put(fileContext.getChecksumType().getCode());
976      buff.putInt(fileContext.getBytesPerChecksum());
977      buff.putInt(onDiskDataSize);
978    }
979
980    private void putHeader(ByteArrayOutputStream dest, int onDiskSize, int uncompressedSize,
981      int onDiskDataSize) {
982      putHeader(dest.getBuffer(), 0, onDiskSize, uncompressedSize, onDiskDataSize);
983    }
984
985    /**
986     * Similar to {@link #finishBlockAndWriteHeaderAndData(DataOutputStream)}, but also records the
987     * offset of this block so that it can be referenced in the next block of the same type.
988     */
989    void writeHeaderAndData(FSDataOutputStream out) throws IOException {
990      long offset = out.getPos();
991      if (startOffset != UNSET && offset != startOffset) {
992        throw new IOException("A " + blockType + " block written to a "
993          + "stream twice, first at offset " + startOffset + ", then at " + offset);
994      }
995      startOffset = offset;
996      finishBlockAndWriteHeaderAndData(out);
997    }
998
999    /**
1000     * Writes the header and the compressed data of this block (or uncompressed data when not using
1001     * compression) into the given stream. Can be called in the "writing" state or in the "block
1002     * ready" state. If called in the "writing" state, transitions the writer to the "block ready"
1003     * state.
1004     * @param out the output stream to write the block to
1005     */
1006    protected void finishBlockAndWriteHeaderAndData(DataOutputStream out) throws IOException {
1007      ensureBlockReady();
1008      long startTime = EnvironmentEdgeManager.currentTime();
1009      out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size());
1010      out.write(onDiskChecksum);
1011      HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime);
1012    }
1013
1014    /**
1015     * Returns the header and the compressed data (or uncompressed data when not using compression)
1016     * as a byte array. Can be called in the "writing" state or in the "block ready" state. If
1017     * called in the "writing" state, transitions the writer to the "block ready" state. This
1018     * returns the header + data + checksums stored on disk.
1019     * @return header and data as they would be stored on disk in a byte array
1020     */
1021    byte[] getHeaderAndDataForTest() throws IOException {
1022      ensureBlockReady();
1023      // This is not very optimal, because we are doing an extra copy.
1024      // But this method is used only by unit tests.
1025      byte[] output = new byte[onDiskBlockBytesWithHeader.size() + onDiskChecksum.length];
1026      System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0,
1027        onDiskBlockBytesWithHeader.size());
1028      System.arraycopy(onDiskChecksum, 0, output, onDiskBlockBytesWithHeader.size(),
1029        onDiskChecksum.length);
1030      return output;
1031    }
1032
1033    /**
1034     * Releases resources used by this writer.
1035     */
1036    void release() {
1037      if (dataBlockEncodingCtx != null) {
1038        dataBlockEncodingCtx.close();
1039        dataBlockEncodingCtx = null;
1040      }
1041      if (defaultBlockEncodingCtx != null) {
1042        defaultBlockEncodingCtx.close();
1043        defaultBlockEncodingCtx = null;
1044      }
1045    }
1046
1047    /**
1048     * Returns the on-disk size of the data portion of the block. This is the compressed size if
1049     * compression is enabled. Can only be called in the "block ready" state. Header is not
1050     * compressed, and its size is not included in the return value.
1051     * @return the on-disk size of the block, not including the header.
1052     */
1053    int getOnDiskSizeWithoutHeader() {
1054      expectState(State.BLOCK_READY);
1055      return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length
1056        - HConstants.HFILEBLOCK_HEADER_SIZE;
1057    }
1058
1059    /**
1060     * Returns the on-disk size of the block. Can only be called in the "block ready" state.
1061     * @return the on-disk size of the block ready to be written, including the header size, the
1062     *         data and the checksum data.
1063     */
1064    int getOnDiskSizeWithHeader() {
1065      expectState(State.BLOCK_READY);
1066      return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length;
1067    }
1068
1069    /**
1070     * The uncompressed size of the block data. Does not include header size.
1071     */
1072    int getUncompressedSizeWithoutHeader() {
1073      expectState(State.BLOCK_READY);
1074      return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE;
1075    }
1076
1077    /**
1078     * The uncompressed size of the block data, including header size.
1079     */
1080    int getUncompressedSizeWithHeader() {
1081      expectState(State.BLOCK_READY);
1082      return baosInMemory.size();
1083    }
1084
1085    /** Returns true if a block is being written */
1086    boolean isWriting() {
1087      return state == State.WRITING;
1088    }
1089
1090    /**
1091     * Returns the number of encoded bytes written into the current block so far, or zero if not
1092     * writing a block at the moment. Note that this will return zero in the "block ready" state as well.
1093     * @return the number of encoded bytes written
1094     */
1095    public int encodedBlockSizeWritten() {
1096      return state != State.WRITING ? 0 : this.getEncodingState().getEncodedDataSizeWritten();
1097    }
1098
1099    /**
1100     * Returns the number of unencoded bytes written into the current block so far, or zero if not
1101     * writing a block at the moment. Note that this will return zero in the "block ready" state as well.
1102     * @return the number of unencoded bytes written
1103     */
1104    int blockSizeWritten() {
1105      return state != State.WRITING ? 0 : this.getEncodingState().getUnencodedDataSizeWritten();
1106    }
1107
1108    /**
1109     * Clones the header followed by the uncompressed data, even if using compression. This is
1110     * needed for storing uncompressed blocks in the block cache. Can be called in the "writing"
1111     * state or the "block ready" state. Returns only the header and data, does not include checksum
1112     * data.
1113     * @return Returns an uncompressed block ByteBuff for caching on write
1114     */
1115    ByteBuff cloneUncompressedBufferWithHeader() {
1116      expectState(State.BLOCK_READY);
1117      ByteBuff bytebuff = allocator.allocate(baosInMemory.size());
1118      baosInMemory.toByteBuff(bytebuff);
1119      int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(),
1120        fileContext.getBytesPerChecksum());
1121      putHeader(bytebuff, onDiskBlockBytesWithHeader.size() + numBytes, baosInMemory.size(),
1122        onDiskBlockBytesWithHeader.size());
1123      bytebuff.rewind();
1124      return bytebuff;
1125    }
1126
1127    /**
1128     * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is needed
1129     * for storing packed blocks in the block cache. Returns only the header and data, does not
1130     * include checksum data.
1131     * @return Returns a copy of block bytes for caching on write
1132     */
1133    private ByteBuff cloneOnDiskBufferWithHeader() {
1134      expectState(State.BLOCK_READY);
1135      ByteBuff bytebuff = allocator.allocate(onDiskBlockBytesWithHeader.size());
1136      onDiskBlockBytesWithHeader.toByteBuff(bytebuff);
1137      bytebuff.rewind();
1138      return bytebuff;
1139    }
1140
1141    private void expectState(State expectedState) {
1142      if (state != expectedState) {
1143        throw new IllegalStateException(
1144          "Expected state: " + expectedState + ", actual state: " + state);
1145      }
1146    }
1147
1148    /**
1149     * Takes the given {@link BlockWritable} instance, creates a new block of its appropriate type,
1150     * writes the writable into this block, and flushes the block into the output stream. The writer
1151     * is instructed not to buffer uncompressed bytes for cache-on-write.
1152     * @param bw  the block-writable object to write as a block
1153     * @param out the file system output stream
1154     */
1155    void writeBlock(BlockWritable bw, FSDataOutputStream out) throws IOException {
1156      bw.writeToBlock(startWriting(bw.getBlockType()));
1157      writeHeaderAndData(out);
1158    }
1159
1160    /**
1161     * Creates a new HFileBlock. Checksums have already been validated, so the byte buffer passed
1162     * into the constructor of this newly created block does not have checksum data even though the
1163     * header minor version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a 0 value
1164     * in bytesPerChecksum. This method copies the on-disk or uncompressed data to build the
1165     * HFileBlock which is used only while writing blocks and caching.
1166     * <p>
1167     * TODO: Should there be an option where a cache can ask that hbase preserve block checksums for
1168     * checking after a block comes out of the cache? Otherwise, the cache is responsible for blocks
1169     * being wholesome (ECC memory or if file-backed, it does checksumming).
1170     */
1171    HFileBlock getBlockForCaching(CacheConfig cacheConf) {
1172      HFileContext newContext = new HFileContextBuilder().withBlockSize(fileContext.getBlocksize())
1173        .withBytesPerCheckSum(0).withChecksumType(ChecksumType.NULL) // no checksums in cached data
1174        .withCompression(fileContext.getCompression())
1175        .withDataBlockEncoding(fileContext.getDataBlockEncoding())
1176        .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
1177        .withCompressTags(fileContext.isCompressTags())
1178        .withIncludesMvcc(fileContext.isIncludesMvcc())
1179        .withIncludesTags(fileContext.isIncludesTags())
1180        .withColumnFamily(fileContext.getColumnFamily()).withTableName(fileContext.getTableName())
1181        .build();
1182      // Build the HFileBlock.
1183      HFileBlockBuilder builder = new HFileBlockBuilder();
1184      ByteBuff buff;
1185      if (cacheConf.shouldCacheCompressed(blockType.getCategory())) {
1186        buff = cloneOnDiskBufferWithHeader();
1187      } else {
1188        buff = cloneUncompressedBufferWithHeader();
1189      }
1190      return builder.withBlockType(blockType)
1191        .withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader())
1192        .withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader())
1193        .withPrevBlockOffset(prevOffset).withByteBuff(buff).withFillHeader(FILL_HEADER)
1194        .withOffset(startOffset).withNextBlockOnDiskSize(UNSET)
1195        .withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length)
1196        .withHFileContext(newContext).withByteBuffAllocator(cacheConf.getByteBuffAllocator())
1197        .withShared(!buff.hasArray()).build();
1198    }
1199  }
1200
1201  /** Something that can be written into a block. */
1202  interface BlockWritable {
1203    /** The type of block this data should use. */
1204    BlockType getBlockType();
1205
1206    /**
1207     * Writes the block to the provided stream. Must not write any magic records.
1208     * @param out a stream to write uncompressed data into
1209     */
1210    void writeToBlock(DataOutput out) throws IOException;
1211  }
1212
1213  /**
1214   * Iterator for reading {@link HFileBlock}s in load-on-open-section, such as root data index
1215   * block, meta index block, file info block etc.
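   * <p>
   * For illustration only, the typical iteration pattern; {@code reader} stands for an
   * {@link FSReader} obtained elsewhere:
   *
   * <pre>{@code
   * BlockIterator it = reader.blockRange(startOffset, endOffset);
   * try {
   *   for (HFileBlock b = it.nextBlock(); b != null; b = it.nextBlock()) {
   *     // ... use the block ...
   *   }
   * } finally {
   *   it.freeBlocks(); // release the ByteBuffs of every block this iterator handed out
   * }
   * }</pre>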
1216   */
1217  interface BlockIterator {
1218    /**
1219     * Get the next block, or null if there are no more blocks to iterate.
1220     */
1221    HFileBlock nextBlock() throws IOException;
1222
1223    /**
1224     * Similar to {@link #nextBlock()} but checks block type, throws an exception if incorrect, and
1225     * returns the HFile block
1226     */
1227    HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1228
1229    /**
1230     * Now we use the {@link ByteBuffAllocator} to manage the nio ByteBuffers for HFileBlocks, so
1231     * all of those ByteBuffers must be deallocated at the end of their life. The BlockIterator's
1232     * life cycle starts when an HFileReader is opened and ends at HFileReader#close, so we keep
1233     * track of all the blocks read until {@link BlockIterator#freeBlocks()} is called when closing
1234     * the HFileReader. The total size of the blocks in the load-on-open section should be quite
1235     * small, so tracking them should be OK (see the usage sketch after this interface).
1236     */
1237    void freeBlocks();
1238  }
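  // Illustrative usage sketch only (not from the original source): how a reader might drive a
  // BlockIterator over the load-on-open section and release the tracked blocks when done. The
  // variables `fsReader`, `loadOnOpenOffset` and `trailerOffset` are hypothetical.
  //
  //   BlockIterator it = fsReader.blockRange(loadOnOpenOffset, trailerOffset);
  //   try {
  //     for (HFileBlock b = it.nextBlock(); b != null; b = it.nextBlock()) {
  //       // ... consume the unpacked block, e.g. parse the root index or file info ...
  //     }
  //   } finally {
  //     it.freeBlocks(); // releases the ByteBuffs of every block returned above
  //   }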
1239
1240  /** An HFile block reader with iteration ability. */
1241  interface FSReader {
1242    /**
1243     * Reads the block at the given offset in the file with the given on-disk size and uncompressed
1244     * size.
1245     * @param offset        of the file to read
1246     * @param onDiskSize    the on-disk size of the entire block, including all applicable headers,
1247     *                      or -1 if unknown
1248     * @param pread         true to use pread, otherwise use the stream read.
1249     * @param updateMetrics update the metrics or not.
1250     * @param intoHeap      whether to allocate the block's ByteBuff from the JVM heap instead of
1251     *                      from the {@link ByteBuffAllocator}. For LRUBlockCache, we must ensure
1252     *                      that the block to cache is an on-heap one, because memory accounting is
1253     *                      heap-based today; also, {@link CombinedBlockCache} uses the heap
1254     *                      LRUBlockCache as its L1 cache for small blocks such as index or meta
1255     *                      blocks, for faster access. So we introduce a flag here to decide whether
1256     *                      to allocate from the JVM heap or not, so that we can avoid an extra
1257     *                      off-heap to heap memory copy when using LRUBlockCache. In most cases we
1258     *                      know the expected block type before reading, while in some special cases
1259     *                      (e.g. HFileReaderImpl#readNextDataBlock()) we cannot pre-decide it; then
1260     *                      we can only allocate the block's ByteBuff from the
1261     *                      {@link ByteBuffAllocator} first, and when caching it in
1262     *                      {@link LruBlockCache} we check whether the ByteBuff is from the heap or
1263     *                      not; if not, we clone it to an on-heap one and cache that.
1264     * @return the newly read block
1265     */
1266    HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics,
1267      boolean intoHeap) throws IOException;
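    // Illustrative sketch only (a hypothetical caller, not the actual reader logic): how the
    // intoHeap flag maps onto the description above, assuming `reader`, `offset` and `onDiskSize`
    // are in scope.
    //
    //   // Index/meta block headed for the heap-based LRUBlockCache (L1): allocate on-heap.
    //   HFileBlock indexBlock = reader.readBlockData(offset, onDiskSize, true, false, true);
    //   // Data block whose destination cache is not known to be on-heap: let the allocator decide.
    //   HFileBlock dataBlock = reader.readBlockData(offset, onDiskSize, true, true, false);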
1268
1269    /**
1270     * Creates a block iterator over the given portion of the {@link HFile}. The iterator returns
1271     * blocks whose offsets satisfy startOffset &lt;= offset &lt; endOffset. Returned blocks are
1272     * always unpacked. Used when no hfile index is available; e.g. when reading in the hfile
1273     * index blocks themselves on file open.
1274     * @param startOffset the offset of the block to start iteration with
1275     * @param endOffset   the offset to end iteration at (exclusive)
1276     * @return an iterator of blocks between the two given offsets
1277     */
1278    BlockIterator blockRange(long startOffset, long endOffset);
1279
1280    /** Closes the backing streams */
1281    void closeStreams() throws IOException;
1282
1283    /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
1284    HFileBlockDecodingContext getBlockDecodingContext();
1285
1286    /** Get the default decoder for blocks from this file. */
1287    HFileBlockDecodingContext getDefaultBlockDecodingContext();
1288
1289    void setIncludesMemStoreTS(boolean includesMemstoreTS);
1290
1291    void setDataBlockEncoder(HFileDataBlockEncoder encoder, Configuration conf);
1292
1293    /**
1294     * Closes the stream's socket. Note: this can be called concurrently from multiple threads, and
1295     * implementations should take care of thread safety.
1296     */
1297    void unbufferStream();
1298  }
1299
1300  /**
1301   * Data structure used to cache the header of the NEXT block. Only useful if the next read that
1302   * comes in is for the block immediately following. When we read, we read the current block and
1303   * the next block's header. We do this so we have the length of the next block to read when the
1304   * hfile index is not available (rare; at hfile open only).
1305   */
1306  private static class PrefetchedHeader {
1307    long offset = -1;
1308    byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1309    final ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(header, 0, header.length));
1310
1311    @Override
1312    public String toString() {
1313      return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header);
1314    }
1315  }
1316
1317  /**
1318   * Reads version 2 HFile blocks from the filesystem.
1319   */
1320  static class FSReaderImpl implements FSReader {
1321    /**
1322     * The file system stream of the underlying {@link HFile} that does or doesn't do checksum
1323     * validations in the filesystem
1324     */
1325    private FSDataInputStreamWrapper streamWrapper;
1326
1327    private HFileBlockDecodingContext encodedBlockDecodingCtx;
1328
1329    /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
1330    private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
1331
1332    /**
1333     * Cache of the NEXT block's header. Check that it is indeed the next block's header before
1334     * using it. TODO: Review. This over-read into the next block to fetch its header seems
1335     * unnecessary given we usually get the block size from the hfile index. Review!
1336     */
1337    private AtomicReference<PrefetchedHeader> prefetchedHeader =
1338      new AtomicReference<>(new PrefetchedHeader());
1339
1340    /** The size of the file we are reading from, or -1 if unknown. */
1341    private long fileSize;
1342
1343    /** The size of the header */
1344    protected final int hdrSize;
1345
1346    /** The filesystem used to access data */
1347    private HFileSystem hfs;
1348
1349    private HFileContext fileContext;
1350    // Cache the fileName
1351    private String pathName;
1352
1353    private final ByteBuffAllocator allocator;
1354
1355    private final Lock streamLock = new ReentrantLock();
1356
1357    private final boolean isPreadAllBytes;
1358
1359    FSReaderImpl(ReaderContext readerContext, HFileContext fileContext, ByteBuffAllocator allocator,
1360      Configuration conf) throws IOException {
1361      this.fileSize = readerContext.getFileSize();
1362      this.hfs = readerContext.getFileSystem();
1363      if (readerContext.getFilePath() != null) {
1364        this.pathName = readerContext.getFilePath().toString();
1365      }
1366      this.fileContext = fileContext;
1367      this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1368      this.allocator = allocator;
1369
1370      this.streamWrapper = readerContext.getInputStreamWrapper();
1371      // Older versions of HBase didn't support checksum.
1372      this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1373      defaultDecodingCtx = new HFileBlockDefaultDecodingContext(conf, fileContext);
1374      encodedBlockDecodingCtx = defaultDecodingCtx;
1375      isPreadAllBytes = readerContext.isPreadAllBytes();
1376    }
1377
1378    @Override
1379    public BlockIterator blockRange(final long startOffset, final long endOffset) {
1380      final FSReader owner = this; // handle for inner class
1381      return new BlockIterator() {
1382        private volatile boolean freed = false;
1383        // Tracking all read blocks until we call freeBlocks.
1384        private List<HFileBlock> blockTracker = new ArrayList<>();
1385        private long offset = startOffset;
1386        // Cache length of next block. Current block has the length of next block in it.
1387        private long length = -1;
1388
1389        @Override
1390        public HFileBlock nextBlock() throws IOException {
1391          if (offset >= endOffset) {
1392            return null;
1393          }
1394          HFileBlock b = readBlockData(offset, length, false, false, true);
1395          offset += b.getOnDiskSizeWithHeader();
1396          length = b.getNextBlockOnDiskSize();
1397          HFileBlock uncompressed = b.unpack(fileContext, owner);
1398          if (uncompressed != b) {
1399            b.release(); // Need to release the compressed Block now.
1400          }
1401          blockTracker.add(uncompressed);
1402          return uncompressed;
1403        }
1404
1405        @Override
1406        public HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException {
1407          HFileBlock blk = nextBlock();
1408          if (blk.getBlockType() != blockType) {
1409            throw new IOException(
1410              "Expected block of type " + blockType + " but found " + blk.getBlockType());
1411          }
1412          return blk;
1413        }
1414
1415        @Override
1416        public void freeBlocks() {
1417          if (freed) {
1418            return;
1419          }
1420          blockTracker.forEach(HFileBlock::release);
1421          blockTracker = null;
1422          freed = true;
1423        }
1424      };
1425    }
1426
1427    /**
1428     * Does a positional read or a seek-and-read into the given byte buffer. We need to take care
1429     * to call {@link ByteBuff#release()} on every exit path to deallocate the ByteBuffers;
1430     * otherwise memory may leak.
1431     * @param dest              destination buffer
1432     * @param size              size of the read
1433     * @param peekIntoNextBlock whether to read the next block's on-disk size
1434     * @param fileOffset        position in the stream to read at
1435     * @param pread             whether we should do a positional read
1436     * @param istream           the input source of data
1437     * @return true if the destination buffer includes the next block's header, false if it only
1438     *         includes the current block's data without the next block's header.
1439     * @throws IOException if any IO error happens.
1440     */
1441    protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size,
1442      boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
1443      if (!pread) {
1444        // Seek + read. Better for scanning.
1445        HFileUtil.seekOnMultipleSources(istream, fileOffset);
1446        long realOffset = istream.getPos();
1447        if (realOffset != fileOffset) {
1448          throw new IOException("Tried to seek to " + fileOffset + " to read " + size
1449            + " bytes, but pos=" + realOffset + " after seek");
1450        }
1451        if (!peekIntoNextBlock) {
1452          BlockIOUtils.readFully(dest, istream, size);
1453          return false;
1454        }
1455
1456        // Try to read the next block header
1457        if (!BlockIOUtils.readWithExtra(dest, istream, size, hdrSize)) {
1458          // did not read the next block header.
1459          return false;
1460        }
1461      } else {
1462        // Positional read. Better for random reads; or when the streamLock is already locked.
1463        int extraSize = peekIntoNextBlock ? hdrSize : 0;
1464        if (
1465          !BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize, isPreadAllBytes)
1466        ) {
1467          // did not read the next block header.
1468          return false;
1469        }
1470      }
1471      assert peekIntoNextBlock;
1472      return true;
1473    }
1474
1475    /**
1476     * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
1477     * little memory allocation as possible, using the provided on-disk size.
1478     * @param offset                the offset in the stream to read at
1479     * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header, or -1 if
1480     *                              unknown; i.e. when iterating over blocks reading in the file
1481     *                              metadata info.
1482     * @param pread                 whether to use a positional read
1483     * @param updateMetrics         whether to update the metrics
1484     * @param intoHeap              whether to allocate the block's ByteBuff from heap or off-heap.
1485     * @see FSReader#readBlockData(long, long, boolean, boolean, boolean) for more details about
1486     *      intoHeap.
1487     */
1488    @Override
1489    public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread,
1490      boolean updateMetrics, boolean intoHeap) throws IOException {
1491      // Get a copy of the current state of whether to validate
1492      // hbase checksums or not for this read call. This is not
1493      // thread-safe but the one constraint is that if we decide
1494      // to skip hbase checksum verification then we are
1495      // guaranteed to use hdfs checksum verification.
1496      boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1497      FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1498      final Context context = Context.current().with(CONTEXT_KEY,
1499        new HFileContextAttributesBuilderConsumer(fileContext)
1500          .setSkipChecksum(doVerificationThruHBaseChecksum)
1501          .setReadType(pread ? ReadType.POSITIONAL_READ : ReadType.SEEK_PLUS_READ));
1502      try (Scope ignored = context.makeCurrent()) {
1503        HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
1504          doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
1505        if (blk == null) {
1506          HFile.LOG.warn("HBase checksum verification failed for file {} at offset {} filesize {}."
1507            + " Retrying read with HDFS checksums turned on...", pathName, offset, fileSize);
1508
1509          if (!doVerificationThruHBaseChecksum) {
1510            String msg = "HBase checksum verification failed for file " + pathName + " at offset "
1511              + offset + " filesize " + fileSize + " but this cannot happen because doVerify is "
1512              + doVerificationThruHBaseChecksum;
1513            HFile.LOG.warn(msg);
1514            throw new IOException(msg); // cannot happen case here
1515          }
1516          HFile.CHECKSUM_FAILURES.increment(); // update metrics
1517
1518          // If we have a checksum failure, we fall back into a mode where
1519          // the next few reads use HDFS level checksums. We aim to make the
1520          // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
1521          // hbase checksum verification, but since this value is set without
1522          // holding any locks, it can so happen that we might actually do
1523          // a few more than precisely this number.
1524          is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1525          doVerificationThruHBaseChecksum = false;
1526          blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
1527            doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
1528          if (blk != null) {
1529            HFile.LOG.warn(
1530              "HDFS checksum verification succeeded for file {} at offset {} filesize" + " {}",
1531              pathName, offset, fileSize);
1532          }
1533        }
1534        if (blk == null && !doVerificationThruHBaseChecksum) {
1535          String msg =
1536            "readBlockData failed, possibly due to " + "checksum verification failed for file "
1537              + pathName + " at offset " + offset + " filesize " + fileSize;
1538          HFile.LOG.warn(msg);
1539          throw new IOException(msg);
1540        }
1541
1542        // If there is a checksum mismatch earlier, then retry with
1543        // HBase checksums switched off and use HDFS checksum verification.
1544        // This triggers HDFS to detect and fix corrupt replicas. The
1545        // next checksumOffCount read requests will use HDFS checksums.
1546        // The decrementing of this.checksumOffCount is not thread-safe,
1547        // but it is harmless because eventually checksumOffCount will be
1548        // a negative number.
1549        streamWrapper.checksumOk();
1550        return blk;
1551      }
1552    }
1553
1554    /**
1555     * Checks that <code>onDiskSizeWithHeaderL</code> is a sane size and then returns it as an int.
1556     */
1557    private static int checkAndGetSizeAsInt(final long onDiskSizeWithHeaderL, final int hdrSize)
1558      throws IOException {
1559      if (
1560        (onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
1561          || onDiskSizeWithHeaderL >= Integer.MAX_VALUE
1562      ) {
1563        throw new IOException(
1564          "Invalid onDisksize=" + onDiskSizeWithHeaderL + ": expected to be at least " + hdrSize
1565            + " and at most " + Integer.MAX_VALUE + ", or -1");
1566      }
1567      return (int) onDiskSizeWithHeaderL;
1568    }
1569
1570    /**
1571     * Verifies that the passed-in onDiskSizeWithHeader matches what is in the header; otherwise
1572     * something is not right.
1573     */
1574    private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuff headerBuf,
1575      final long offset, boolean verifyChecksum) throws IOException {
1576      // Assert size provided aligns with what is in the header
1577      int fromHeader = getOnDiskSizeWithHeader(headerBuf, verifyChecksum);
1578      if (passedIn != fromHeader) {
1579        throw new IOException("Passed in onDiskSizeWithHeader=" + passedIn + " != " + fromHeader
1580          + ", offset=" + offset + ", fileContext=" + this.fileContext);
1581      }
1582    }
1583
1584    /**
1585     * Check the atomic reference cache for this block's header. The cache is only good if the next
1586     * read coming through is for the next block in sequence. We read the next block's header on the
1587     * tail of reading the previous block to save a seek. Otherwise, we have to do a seek to read the
1588     * header before we can pull in the block, OR we have to back up the stream because we over-read
1589     * (the next block's header).
1590     * @see PrefetchedHeader
1591     * @return The cached block header or null if not found.
1592     * @see #cacheNextBlockHeader(long, ByteBuff, int, int)
1593     */
1594    private ByteBuff getCachedHeader(final long offset) {
1595      PrefetchedHeader ph = this.prefetchedHeader.get();
1596      return ph != null && ph.offset == offset ? ph.buf : null;
1597    }
1598
1599    /**
1600     * Saves away the next block's header in the atomic reference.
1601     * @see #getCachedHeader(long)
1602     * @see PrefetchedHeader
1603     */
1604    private void cacheNextBlockHeader(final long offset, ByteBuff onDiskBlock,
1605      int onDiskSizeWithHeader, int headerLength) {
1606      PrefetchedHeader ph = new PrefetchedHeader();
1607      ph.offset = offset;
1608      onDiskBlock.get(onDiskSizeWithHeader, ph.header, 0, headerLength);
1609      this.prefetchedHeader.set(ph);
1610    }
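    // Illustrative sketch of the prefetch round trip using the two methods above (variable names
    // follow readBlockDataInternal below): after reading the block at `offset`, the hdrSize bytes
    // that were over-read are stashed keyed by the NEXT block's offset, and a subsequent
    // sequential read finds them without an extra seek.
    //
    //   cacheNextBlockHeader(offset + onDiskSizeWithHeader, onDiskBlock, onDiskSizeWithHeader,
    //     hdrSize);
    //   // ... later, when asked to read the block that starts right after this one ...
    //   ByteBuff cached = getCachedHeader(offset + onDiskSizeWithHeader); // non-null on this path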
1611
1612    private int getNextBlockOnDiskSize(boolean readNextHeader, ByteBuff onDiskBlock,
1613      int onDiskSizeWithHeader) {
1614      int nextBlockOnDiskSize = -1;
1615      if (readNextHeader) {
1616        nextBlockOnDiskSize =
1617          onDiskBlock.getIntAfterPosition(onDiskSizeWithHeader + BlockType.MAGIC_LENGTH) + hdrSize;
1618      }
1619      return nextBlockOnDiskSize;
1620    }
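    // Note on the arithmetic above: the next block's header begins at position onDiskSizeWithHeader
    // in onDiskBlock; the 4-byte on-disk-size-without-header field sits just after the MAGIC_LENGTH
    // block magic, so adding hdrSize yields the next block's full on-disk size including its header.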
1621
1622    private ByteBuff allocate(int size, boolean intoHeap) {
1623      return intoHeap ? HEAP.allocate(size) : allocator.allocate(size);
1624    }
1625
1626    /**
1627     * Reads a version 2 block.
1628     * @param offset                the offset in the stream to read at.
1629     * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header and
1630     *                              checksums if present, or -1 if unknown (as a long). Can be -1
1631     *                              when doing raw iteration of blocks, as when loading up file
1632     *                              metadata; i.e. the first read of a new file. It is usually
1633     *                              obtained from the hfile index.
1634     * @param pread                 whether to use a positional read
1635     * @param verifyChecksum        whether to use HBase checksums. If HBase checksums are switched
1636     *                              off, then HDFS checksums are used. Can also flip on/off while
1637     *                              reading the same file if we hit a troublesome patch in an hfile.
1638     * @param updateMetrics         whether to update the metrics.
1639     * @param intoHeap              allocate the ByteBuff of the block from heap or off-heap.
1640     * @return the HFileBlock or null if there is an HBase checksum mismatch
1641     */
1642    protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
1643      long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics,
1644      boolean intoHeap) throws IOException {
1645      if (offset < 0) {
1646        throw new IOException("Invalid offset=" + offset + " trying to read " + "block (onDiskSize="
1647          + onDiskSizeWithHeaderL + ")");
1648      }
1649
1650      final Span span = Span.current();
1651      final AttributesBuilder attributesBuilder = Attributes.builder();
1652      Optional.of(Context.current()).map(val -> val.get(CONTEXT_KEY))
1653        .ifPresent(c -> c.accept(attributesBuilder));
1654      int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize);
1655      // Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1
1656      // and will save us having to seek the stream backwards to reread the header we
1657      // read the last time through here.
1658      ByteBuff headerBuf = getCachedHeader(offset);
1659      LOG.trace(
1660        "Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, "
1661          + "onDiskSizeWithHeader={}",
1662        this.fileContext.getHFileName(), offset, pread, verifyChecksum, headerBuf,
1663        onDiskSizeWithHeader);
1664      // This is NOT the same as verifyChecksum. The latter is whether to do hbase
1665      // checksums and can change with circumstances. The below flag is whether the
1666      // file has support for checksums (version 2+).
1667      boolean checksumSupport = this.fileContext.isUseHBaseChecksum();
1668      long startTime = EnvironmentEdgeManager.currentTime();
1669      if (onDiskSizeWithHeader <= 0) {
1670        // We were not passed the block size. Need to get it from the header. If header was
1671        // not cached (see getCachedHeader above), need to seek to pull it in. This is costly
1672        // and should happen very rarely. Currently happens on open of a hfile reader where we
1673        // read the trailer blocks to pull in the indices. Otherwise, we are reading block sizes
1674        // out of the hfile index. To check, enable TRACE in this file and you'll get an exception
1675        // in a LOG every time we seek. See HBASE-17072 for more detail.
1676        if (headerBuf == null) {
1677          if (LOG.isTraceEnabled()) {
1678            LOG.trace("Extra seek to get block size!", new RuntimeException());
1679          }
1680          span.addEvent("Extra seek to get block size!", attributesBuilder.build());
1681          headerBuf = HEAP.allocate(hdrSize);
1682          readAtOffset(is, headerBuf, hdrSize, false, offset, pread);
1683          headerBuf.rewind();
1684        }
1685        onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport);
1686      }
1687      int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
1688      // Allocate enough space to fit the next block's header too; saves a seek next time through.
1689      // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header;
1690      // onDiskSizeWithHeader is header, body, and any checksums if present. preReadHeaderSize
1691      // says where to start reading. If we have the header cached, then we don't need to read
1692      // it again and we can likely read from last place we left off w/o need to backup and reread
1693      // the header we read last time through here.
1694      ByteBuff onDiskBlock = this.allocate(onDiskSizeWithHeader + hdrSize, intoHeap);
1695      boolean initHFileBlockSuccess = false;
1696      try {
1697        if (headerBuf != null) {
1698          onDiskBlock.put(0, headerBuf, 0, hdrSize).position(hdrSize);
1699        }
1700        boolean readNextHeader = readAtOffset(is, onDiskBlock,
1701          onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread);
1702        onDiskBlock.rewind(); // in case of moving position when copying a cached header
1703        int nextBlockOnDiskSize =
1704          getNextBlockOnDiskSize(readNextHeader, onDiskBlock, onDiskSizeWithHeader);
1705        if (headerBuf == null) {
1706          headerBuf = onDiskBlock.duplicate().position(0).limit(hdrSize);
1707        }
1708        // Do a few checks before we go instantiate HFileBlock.
1709        assert onDiskSizeWithHeader > this.hdrSize;
1710        verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport);
1711        ByteBuff curBlock = onDiskBlock.duplicate().position(0).limit(onDiskSizeWithHeader);
1712        // Verify checksum of the data before using it for building HFileBlock.
1713        if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) {
1714          return null;
1715        }
1716        // remove checksum from buffer now that it's verified
1717        int sizeWithoutChecksum = curBlock.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
1718        curBlock.limit(sizeWithoutChecksum);
1719        long duration = EnvironmentEdgeManager.currentTime() - startTime;
1720        if (updateMetrics) {
1721          HFile.updateReadLatency(duration, pread);
1722        }
1723        // The onDiskBlock will become the headerAndDataBuffer for this block.
1724        // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
1725        // contains the header of next block, so no need to set next block's header in it.
1726        HFileBlock hFileBlock = createFromBuff(curBlock, checksumSupport, offset,
1727          nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
1728        // Run check on uncompressed sizings.
1729        if (!fileContext.isCompressedOrEncrypted()) {
1730          hFileBlock.sanityCheckUncompressed();
1731        }
1732        LOG.trace("Read {} in {} ms", hFileBlock, duration);
1733        span.addEvent("Read block", attributesBuilder.build());
1734        // Cache next block header if we read it for the next time through here.
1735        if (nextBlockOnDiskSize != -1) {
1736          cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), onDiskBlock,
1737            onDiskSizeWithHeader, hdrSize);
1738        }
1739        initHFileBlockSuccess = true;
1740        return hFileBlock;
1741      } finally {
1742        if (!initHFileBlockSuccess) {
1743          onDiskBlock.release();
1744        }
1745      }
1746    }
1747
1748    @Override
1749    public void setIncludesMemStoreTS(boolean includesMemstoreTS) {
1750      this.fileContext =
1751        new HFileContextBuilder(this.fileContext).withIncludesMvcc(includesMemstoreTS).build();
1752    }
1753
1754    @Override
1755    public void setDataBlockEncoder(HFileDataBlockEncoder encoder, Configuration conf) {
1756      encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(conf, fileContext);
1757    }
1758
1759    @Override
1760    public HFileBlockDecodingContext getBlockDecodingContext() {
1761      return this.encodedBlockDecodingCtx;
1762    }
1763
1764    @Override
1765    public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
1766      return this.defaultDecodingCtx;
1767    }
1768
1769    /**
1770     * Generates the checksum for the header as well as the data and then validates it against the
1771     * stored value. If the block does not use checksums, returns false.
1772     * @return True if checksum matches, else false.
1773     */
1774    private boolean validateChecksum(long offset, ByteBuff data, int hdrSize) {
1775      // If this is an older version of the block that does not have checksums, then return false
1776      // indicating that checksum verification did not succeed. Actually, this method should never
1777      // be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen
1778      // case. Since this is a cannot-happen case, it is better to return false to indicate a
1779      // checksum validation failure.
1780      if (!fileContext.isUseHBaseChecksum()) {
1781        return false;
1782      }
1783      return ChecksumUtil.validateChecksum(data, pathName, offset, hdrSize);
1784    }
1785
1786    @Override
1787    public void closeStreams() throws IOException {
1788      streamWrapper.close();
1789    }
1790
1791    @Override
1792    public void unbufferStream() {
1793      // To handle concurrent reads, ensure that no other client is accessing the streams while we
1794      // unbuffer it.
1795      if (streamLock.tryLock()) {
1796        try {
1797          this.streamWrapper.unbuffer();
1798        } finally {
1799          streamLock.unlock();
1800        }
1801      }
1802    }
1803
1804    @Override
1805    public String toString() {
1806      return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext;
1807    }
1808  }
1809
1810  /** An additional sanity-check in case no compression or encryption is being used. */
1811  void sanityCheckUncompressed() throws IOException {
1812    if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) {
1813      throw new IOException("Using no compression but " + "onDiskSizeWithoutHeader="
1814        + onDiskSizeWithoutHeader + ", " + "uncompressedSizeWithoutHeader="
1815        + uncompressedSizeWithoutHeader + ", numChecksumbytes=" + totalChecksumBytes());
1816    }
1817  }
1818
1819  // Cacheable implementation
1820  @Override
1821  public int getSerializedLength() {
1822    if (buf != null) {
1823      // Include extra bytes for block metadata.
1824      return this.buf.limit() + BLOCK_METADATA_SPACE;
1825    }
1826    return 0;
1827  }
1828
1829  // Cacheable implementation
1830  @Override
1831  public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
1832    this.buf.get(destination, 0, getSerializedLength() - BLOCK_METADATA_SPACE);
1833    destination = addMetaData(destination, includeNextBlockMetadata);
1834
1835    // Make it ready for reading. flip sets position to zero and limit to the current position,
1836    // which is what we want: we serialize nothing beyond the block, its checksums if present,
1837    // and the metadata.
1838    destination.flip();
1839  }
1840
1841  /**
1842   * For use by bucketcache. This exposes internals.
1843   */
1844  public ByteBuffer getMetaData(ByteBuffer bb) {
1845    bb = addMetaData(bb, true);
1846    bb.flip();
1847    return bb;
1848  }
1849
1850  /**
1851   * Adds metadata at current position (position is moved forward). Does not flip or reset.
1852   * @return The passed <code>destination</code> with metadata added.
1853   */
1854  private ByteBuffer addMetaData(final ByteBuffer destination, boolean includeNextBlockMetadata) {
1855    destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1856    destination.putLong(this.offset);
1857    if (includeNextBlockMetadata) {
1858      destination.putInt(this.nextBlockOnDiskSize);
1859    }
1860    return destination;
1861  }
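  // For reference, addMetaData above appends the following, in order:
  //   byte  usesHBaseChecksum    (1 byte)
  //   long  offset               (8 bytes)
  //   int   nextBlockOnDiskSize  (4 bytes, written only when includeNextBlockMetadata is set)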
1862
1863  // Cacheable implementation
1864  @Override
1865  public CacheableDeserializer<Cacheable> getDeserializer() {
1866    return HFileBlock.BLOCK_DESERIALIZER;
1867  }
1868
1869  @Override
1870  public int hashCode() {
1871    int result = 1;
1872    result = result * 31 + blockType.hashCode();
1873    result = result * 31 + nextBlockOnDiskSize;
1874    result = result * 31 + (int) (offset ^ (offset >>> 32));
1875    result = result * 31 + onDiskSizeWithoutHeader;
1876    result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32));
1877    result = result * 31 + uncompressedSizeWithoutHeader;
1878    result = result * 31 + buf.hashCode();
1879    return result;
1880  }
1881
1882  @Override
1883  public boolean equals(Object comparison) {
1884    if (this == comparison) {
1885      return true;
1886    }
1887    if (comparison == null) {
1888      return false;
1889    }
1890    if (!(comparison instanceof HFileBlock)) {
1891      return false;
1892    }
1893
1894    HFileBlock castedComparison = (HFileBlock) comparison;
1895
1896    if (castedComparison.blockType != this.blockType) {
1897      return false;
1898    }
1899    if (castedComparison.nextBlockOnDiskSize != this.nextBlockOnDiskSize) {
1900      return false;
1901    }
1902    // Offset is important. Needed when we have to remake cachekey when block is returned to cache.
1903    if (castedComparison.offset != this.offset) {
1904      return false;
1905    }
1906    if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1907      return false;
1908    }
1909    if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1910      return false;
1911    }
1912    if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1913      return false;
1914    }
1915    if (
1916      ByteBuff.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
1917        castedComparison.buf.limit()) != 0
1918    ) {
1919      return false;
1920    }
1921    return true;
1922  }
1923
1924  DataBlockEncoding getDataBlockEncoding() {
1925    if (blockType == BlockType.ENCODED_DATA) {
1926      return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
1927    }
1928    return DataBlockEncoding.NONE;
1929  }
1930
1931  byte getChecksumType() {
1932    return this.fileContext.getChecksumType().getCode();
1933  }
1934
1935  int getBytesPerChecksum() {
1936    return this.fileContext.getBytesPerChecksum();
1937  }
1938
1939  /** Returns the size of data on disk + header. Excludes checksum. */
1940  int getOnDiskDataSizeWithHeader() {
1941    return this.onDiskDataSizeWithHeader;
1942  }
1943
1944  /**
1945   * Calculates the number of bytes required to store all the checksums for this block. Each
1946   * checksum value is a 4-byte integer.
1947   */
1948  int totalChecksumBytes() {
1949    // If the hfile block has minorVersion 0, then there are no checksum
1950    // data to validate. Similarly, a zero value in this.bytesPerChecksum
1951    // indicates that cached blocks do not have checksum data because
1952    // checksums were already validated when the block was read from disk.
1953    if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
1954      return 0;
1955    }
1956    return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
1957      this.fileContext.getBytesPerChecksum());
1958  }
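  // Worked example of the arithmetic above (a sketch; the figures are hypothetical): with
  // bytesPerChecksum = 16384 and onDiskDataSizeWithHeader = 65_569, there is one 4-byte checksum
  // per chunk, i.e. ceil(65_569 / 16_384) = 5 chunks, so totalChecksumBytes() returns 5 * 4 = 20.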
1959
1960  /**
1961   * Returns the size of this block header.
1962   */
1963  public int headerSize() {
1964    return headerSize(this.fileContext.isUseHBaseChecksum());
1965  }
1966
1967  /**
1968   * Maps a minor version to the size of the header.
1969   */
1970  public static int headerSize(boolean usesHBaseChecksum) {
1971    return usesHBaseChecksum
1972      ? HConstants.HFILEBLOCK_HEADER_SIZE
1973      : HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
1974  }
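  // For reference (assuming the standard v2 header layout, as also parsed by toStringHeader below):
  // the no-checksum header is 8 (magic) + 4 (onDiskSizeWithoutHeader) + 4 (uncompressedSizeWithoutHeader)
  // + 8 (prevBlockOffset) = 24 bytes, and the checksum-bearing header adds a 1-byte checksum type,
  // a 4-byte bytesPerChecksum and a 4-byte onDiskDataSizeWithHeader, for 33 bytes in total.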
1975
1976  /**
1977   * Return the appropriate DUMMY_HEADER for the minor version
1978   */
1979  // TODO: Why is this in here?
1980  byte[] getDummyHeaderForVersion() {
1981    return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
1982  }
1983
1984  /**
1985   * Return the appropriate DUMMY_HEADER for the minor version
1986   */
1987  static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
1988    return usesHBaseChecksum ? HConstants.HFILEBLOCK_DUMMY_HEADER : DUMMY_HEADER_NO_CHECKSUM;
1989  }
1990
1991  /**
1992   * @return this HFileBlock's fileContext, which will be a derivative of the fileContext for the
1993   *         file from which this block's data was originally read.
1994   */
1995  public HFileContext getHFileContext() {
1996    return this.fileContext;
1997  }
1998
1999  /**
2000   * Converts the contents of the block header into a human-readable string. This is mostly helpful
2001   * for debugging. This assumes that the block has minor version &gt; 0.
2002   */
2003  static String toStringHeader(ByteBuff buf) throws IOException {
2004    byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
2005    buf.get(magicBuf);
2006    BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
2007    int compressedBlockSizeNoHeader = buf.getInt();
2008    int uncompressedBlockSizeNoHeader = buf.getInt();
2009    long prevBlockOffset = buf.getLong();
2010    byte cksumtype = buf.get();
2011    long bytesPerChecksum = buf.getInt();
2012    long onDiskDataSizeWithHeader = buf.getInt();
2013    return " Header dump: magic: " + Bytes.toString(magicBuf) + " blockType " + bt
2014      + " compressedBlockSizeNoHeader " + compressedBlockSizeNoHeader
2015      + " uncompressedBlockSizeNoHeader " + uncompressedBlockSizeNoHeader + " prevBlockOffset "
2016      + prevBlockOffset + " checksumType " + ChecksumType.codeToType(cksumtype)
2017      + " bytesPerChecksum " + bytesPerChecksum + " onDiskDataSizeWithHeader "
2018      + onDiskDataSizeWithHeader;
2019  }
2020
2021  /**
2022   * Creates a new HFileBlockBuilder from the existing block and a new ByteBuff. The builder will be
2023   * loaded with all of the original fields from blk, except now using the newBuff and setting
2024   * isSharedMem based on the source of the passed in newBuff. An existing HFileBlock may have been
2025   * an {@link ExclusiveMemHFileBlock}, but the new buffer might call for a
2026   * {@link SharedMemHFileBlock}. Or vice versa.
2027   * @param blk     the block to clone from
2028   * @param newBuff the new buffer to use
2029   */
2030  private static HFileBlockBuilder createBuilder(HFileBlock blk, ByteBuff newBuff) {
2031    return new HFileBlockBuilder().withBlockType(blk.blockType)
2032      .withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader)
2033      .withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader)
2034      .withPrevBlockOffset(blk.prevBlockOffset).withByteBuff(newBuff).withOffset(blk.offset)
2035      .withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
2036      .withNextBlockOnDiskSize(blk.nextBlockOnDiskSize).withHFileContext(blk.fileContext)
2037      .withByteBuffAllocator(blk.allocator).withShared(!newBuff.hasArray());
2038  }
2039
2040  private static HFileBlock shallowClone(HFileBlock blk, ByteBuff newBuf) {
2041    return createBuilder(blk, newBuf).build();
2042  }
2043
2044  static HFileBlock deepCloneOnHeap(HFileBlock blk) {
2045    ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
2046    return createBuilder(blk, deepCloned).build();
2047  }
2048}