001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
021import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR;
022import static org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer.CONTEXT_KEY;
023
024import io.opentelemetry.api.common.Attributes;
025import io.opentelemetry.api.common.AttributesBuilder;
026import io.opentelemetry.api.trace.Span;
027import io.opentelemetry.context.Context;
028import io.opentelemetry.context.Scope;
029import java.io.DataInputStream;
030import java.io.DataOutput;
031import java.io.DataOutputStream;
032import java.io.IOException;
033import java.nio.ByteBuffer;
034import java.util.ArrayList;
035import java.util.List;
036import java.util.Optional;
037import java.util.concurrent.atomic.AtomicReference;
038import java.util.concurrent.locks.Lock;
039import java.util.concurrent.locks.ReentrantLock;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FSDataInputStream;
042import org.apache.hadoop.fs.FSDataOutputStream;
043import org.apache.hadoop.hbase.Cell;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.fs.HFileSystem;
046import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
047import org.apache.hadoop.hbase.io.ByteBuffAllocator;
048import org.apache.hadoop.hbase.io.ByteBuffInputStream;
049import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream;
050import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
051import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
052import org.apache.hadoop.hbase.io.encoding.EncodingState;
053import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
054import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
055import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
056import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
057import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer;
058import org.apache.hadoop.hbase.io.util.BlockIOUtils;
059import org.apache.hadoop.hbase.nio.ByteBuff;
060import org.apache.hadoop.hbase.nio.MultiByteBuff;
061import org.apache.hadoop.hbase.nio.SingleByteBuff;
062import org.apache.hadoop.hbase.regionserver.ShipperListener;
063import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ReadType;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.ChecksumType;
066import org.apache.hadoop.hbase.util.ClassSize;
067import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
068import org.apache.hadoop.util.ReflectionUtils;
069import org.apache.yetus.audience.InterfaceAudience;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072
073import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
074
075/**
076 * Cacheable Blocks of an {@link HFile} version 2 file. Version 2 was introduced in hbase-0.92.0.
077 * <p>
078 * Version 1 was the original file block. Version 2 was introduced when we changed the hbase file
079 * format to support multi-level block indexes and compound bloom filters (HBASE-3857). Support for
080 * Version 1 was removed in hbase-1.3.0.
081 * <h3>HFileBlock: Version 2</h3> In version 2, a block is structured as follows:
082 * <ul>
083 * <li><b>Header:</b> See Writer#putHeader() for where header is written; header total size is
084 * HFILEBLOCK_HEADER_SIZE
085 * <ul>
086 * <li>0. blockType: Magic record identifying the {@link BlockType} (8 bytes): e.g.
087 * <code>DATABLK*</code>
088 * <li>1. onDiskSizeWithoutHeader: Compressed -- a.k.a 'on disk' -- block size, excluding header,
089 * but including tailing checksum bytes (4 bytes)
090 * <li>2. uncompressedSizeWithoutHeader: Uncompressed block size, excluding header, and excluding
091 * checksum bytes (4 bytes)
092 * <li>3. prevBlockOffset: The offset of the previous block of the same type (8 bytes). This is used
093 * to navigate to the previous block without having to go to the block index
094 * <li>4: For minorVersions &gt;=1, the ordinal describing checksum type (1 byte)
095 * <li>5: For minorVersions &gt;=1, the number of data bytes/checksum chunk (4 bytes)
096 * <li>6: onDiskDataSizeWithHeader: For minorVersions &gt;=1, the size of data 'on disk', including
097 * header, excluding checksums (4 bytes)
098 * </ul>
099 * </li>
100 * <li><b>Raw/Compressed/Encrypted/Encoded data:</b> The compression algorithm is the same for all
101 * the blocks in an {@link HFile}. If compression is NONE, this is just raw, serialized Cells.
102 * <li><b>Tail:</b> For minorVersions &gt;=1, a series of 4 byte checksums, one each for the number
103 * of bytes specified by bytesPerChecksum.
104 * </ul>
105 * <h3>Caching</h3> Caches cache whole blocks with trailing checksums if any. We then tag on some
106 * metadata, the content of BLOCK_METADATA_SPACE which will be flag on if we are doing 'hbase'
107 * checksums and then the offset into the file which is needed when we re-make a cache key when we
108 * return the block to the cache as 'done'. See {@link Cacheable#serialize(ByteBuffer, boolean)} and
109 * {@link Cacheable#getDeserializer()}.
110 * <p>
111 * TODO: Should we cache the checksums? Down in Writer#getBlockForCaching(CacheConfig) where we make
112 * a block to cache-on-write, there is an attempt at turning off checksums. This is not the only
113 * place we get blocks to cache. We also will cache the raw return from an hdfs read. In this case,
114 * the checksums may be present. If the cache is backed by something that doesn't do ECC, say an
115 * SSD, we might want to preserve checksums. For now this is open question.
116 * <p>
117 * TODO: Over in BucketCache, we save a block allocation by doing a custom serialization. Be sure to
118 * change it if serialization changes in here. Could we add a method here that takes an IOEngine and
119 * that then serializes to it rather than expose our internals over in BucketCache? IOEngine is in
120 * the bucket subpackage. Pull it up? Then this class knows about bucketcache. Ugh.
121 */
122@InterfaceAudience.Private
123public class HFileBlock implements Cacheable {
124  private static final Logger LOG = LoggerFactory.getLogger(HFileBlock.class);
125  public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HFileBlock.class, false);
126
127  // Block Header fields.
128
129  // TODO: encapsulate Header related logic in this inner class.
130  static class Header {
131    // Format of header is:
132    // 8 bytes - block magic
133    // 4 bytes int - onDiskSizeWithoutHeader
134    // 4 bytes int - uncompressedSizeWithoutHeader
135    // 8 bytes long - prevBlockOffset
136    // The following 3 are only present if header contains checksum information
137    // 1 byte - checksum type
138    // 4 byte int - bytes per checksum
139    // 4 byte int - onDiskDataSizeWithHeader
140    static int BLOCK_MAGIC_INDEX = 0;
141    static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8;
142    static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12;
143    static int PREV_BLOCK_OFFSET_INDEX = 16;
144    static int CHECKSUM_TYPE_INDEX = 24;
145    static int BYTES_PER_CHECKSUM_INDEX = 25;
146    static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29;
147  }
148
149  /** Type of block. Header field 0. */
150  private BlockType blockType;
151
152  /**
153   * Size on disk excluding header, including checksum. Header field 1.
154   * @see Writer#putHeader(byte[], int, int, int, int)
155   */
156  private int onDiskSizeWithoutHeader;
157
158  /**
159   * Size of pure data. Does not include header or checksums. Header field 2.
160   * @see Writer#putHeader(byte[], int, int, int, int)
161   */
162  private int uncompressedSizeWithoutHeader;
163
164  /**
165   * The offset of the previous block on disk. Header field 3.
166   * @see Writer#putHeader(byte[], int, int, int, int)
167   */
168  private long prevBlockOffset;
169
170  /**
171   * Size on disk of header + data. Excludes checksum. Header field 6, OR calculated from
172   * {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
173   * @see Writer#putHeader(byte[], int, int, int, int)
174   */
175  private int onDiskDataSizeWithHeader;
176  // End of Block Header fields.
177
178  /**
179   * The in-memory representation of the hfile block. Can be on or offheap. Can be backed by a
180   * single ByteBuffer or by many. Make no assumptions.
181   * <p>
182   * Be careful reading from this <code>buf</code>. Duplicate and work on the duplicate or if not,
183   * be sure to reset position and limit else trouble down the road.
184   * <p>
185   * TODO: Make this read-only once made.
186   * <p>
187   * We are using the ByteBuff type. ByteBuffer is not extensible yet we need to be able to have a
188   * ByteBuffer-like API across multiple ByteBuffers reading from a cache such as BucketCache. So,
189   * we have this ByteBuff type. Unfortunately, it is spread all about HFileBlock. Would be good if
190   * could be confined to cache-use only but hard-to-do.
191   */
192  private ByteBuff buf;
193
194  /**
195   * Meta data that holds meta information on the hfileblock.
196   */
197  private HFileContext fileContext;
198
199  /**
200   * The offset of this block in the file. Populated by the reader for convenience of access. This
201   * offset is not part of the block header.
202   */
203  private long offset = UNSET;
204
205  /**
206   * The on-disk size of the next block, including the header and checksums if present. UNSET if
207   * unknown. Blocks try to carry the size of the next block to read in this data member. Usually we
208   * get block sizes from the hfile index but sometimes the index is not available: e.g. when we
209   * read the indexes themselves (indexes are stored in blocks, we do not have an index for the
210   * indexes). Saves seeks especially around file open when there is a flurry of reading in hfile
211   * metadata.
212   */
213  private int nextBlockOnDiskSize = UNSET;
214
215  private ByteBuffAllocator allocator;
216
217  /**
218   * On a checksum failure, do these many succeeding read requests using hdfs checksums before
219   * auto-reenabling hbase checksum verification.
220   */
221  static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
222
223  private static int UNSET = -1;
224  public static final boolean FILL_HEADER = true;
225  public static final boolean DONT_FILL_HEADER = false;
226
227  // How to get the estimate correctly? if it is a singleBB?
228  public static final int MULTI_BYTE_BUFFER_HEAP_SIZE =
229    (int) ClassSize.estimateBase(MultiByteBuff.class, false);
230
231  /**
232   * Space for metadata on a block that gets stored along with the block when we cache it. There are
233   * a few bytes stuck on the end of the HFileBlock that we pull in from HDFS. 8 bytes are for the
234   * offset of this block (long) in the file. Offset is important because is is used when we remake
235   * the CacheKey when we return block to the cache when done. There is also a flag on whether
236   * checksumming is being done by hbase or not. See class comment for note on uncertain state of
237   * checksumming of blocks that come out of cache (should we or should we not?). Finally there are
238   * 4 bytes to hold the length of the next block which can save a seek on occasion if available.
239   * (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was formerly
240   * known as EXTRA_SERIALIZATION_SPACE).
241   */
242  public static final int BLOCK_METADATA_SPACE =
243    Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
244
245  /**
246   * Each checksum value is an integer that can be stored in 4 bytes.
247   */
248  static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
249
250  static final byte[] DUMMY_HEADER_NO_CHECKSUM =
251    new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
252
253  /**
254   * Used deserializing blocks from Cache. <code>
255   * ++++++++++++++
256   * + HFileBlock +
257   * ++++++++++++++
258   * + Checksums  + <= Optional
259   * ++++++++++++++
260   * + Metadata!  + <= See note on BLOCK_METADATA_SPACE above.
261   * ++++++++++++++
262   * </code>
263   * @see #serialize(ByteBuffer, boolean)
264   */
265  public static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER = new BlockDeserializer();
266
267  public static final class BlockDeserializer implements CacheableDeserializer<Cacheable> {
268    private BlockDeserializer() {
269    }
270
271    @Override
272    public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc) throws IOException {
273      // The buf has the file block followed by block metadata.
274      // Set limit to just before the BLOCK_METADATA_SPACE then rewind.
275      buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind();
276      // Get a new buffer to pass the HFileBlock for it to 'own'.
277      ByteBuff newByteBuff = buf.slice();
278      // Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock.
279      buf.position(buf.limit());
280      buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE);
281      boolean usesChecksum = buf.get() == (byte) 1;
282      long offset = buf.getLong();
283      int nextBlockOnDiskSize = buf.getInt();
284      return createFromBuff(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
285    }
286
287    @Override
288    public int getDeserializerIdentifier() {
289      return DESERIALIZER_IDENTIFIER;
290    }
291  }
292
293  private static final int DESERIALIZER_IDENTIFIER;
294  static {
295    DESERIALIZER_IDENTIFIER =
296      CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
297  }
298
299  /**
300   * Creates a new {@link HFile} block from the given fields. This constructor is used only while
301   * writing blocks and caching, and is sitting in a byte buffer and we want to stuff the block into
302   * cache.
303   * <p>
304   * TODO: The caller presumes no checksumming
305   * <p>
306   * TODO: HFile block writer can also off-heap ?
307   * </p>
308   * required of this block instance since going into cache; checksum already verified on underlying
309   * block data pulled in from filesystem. Is that correct? What if cache is SSD?
310   * @param blockType                     the type of this block, see {@link BlockType}
311   * @param onDiskSizeWithoutHeader       see {@link #onDiskSizeWithoutHeader}
312   * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
313   * @param prevBlockOffset               see {@link #prevBlockOffset}
314   * @param buf                           block buffer with header
315   *                                      ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
316   * @param fillHeader                    when true, write the first 4 header fields into passed
317   *                                      buffer.
318   * @param offset                        the file offset the block was read from
319   * @param onDiskDataSizeWithHeader      see {@link #onDiskDataSizeWithHeader}
320   * @param fileContext                   HFile meta data
321   */
322  public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
323    int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
324    long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
325    ByteBuffAllocator allocator) {
326    this.blockType = blockType;
327    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
328    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
329    this.prevBlockOffset = prevBlockOffset;
330    this.offset = offset;
331    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
332    this.nextBlockOnDiskSize = nextBlockOnDiskSize;
333    this.fileContext = fileContext;
334    this.allocator = allocator;
335    this.buf = buf;
336    if (fillHeader) {
337      overwriteHeader();
338    }
339    this.buf.rewind();
340  }
341
342  /**
343   * Creates a block from an existing buffer starting with a header. Rewinds and takes ownership of
344   * the buffer. By definition of rewind, ignores the buffer position, but if you slice the buffer
345   * beforehand, it will rewind to that point.
346   * @param buf Has header, content, and trailing checksums if present.
347   */
348  static HFileBlock createFromBuff(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
349    final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator)
350    throws IOException {
351    buf.rewind();
352    final BlockType blockType = BlockType.read(buf);
353    final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
354    final int uncompressedSizeWithoutHeader =
355      buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
356    final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
357    // This constructor is called when we deserialize a block from cache and when we read a block in
358    // from the fs. fileCache is null when deserialized from cache so need to make up one.
359    HFileContextBuilder fileContextBuilder =
360      fileContext != null ? new HFileContextBuilder(fileContext) : new HFileContextBuilder();
361    fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum);
362    int onDiskDataSizeWithHeader;
363    if (usesHBaseChecksum) {
364      byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX);
365      int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
366      onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
367      // Use the checksum type and bytes per checksum from header, not from fileContext.
368      fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType));
369      fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum);
370    } else {
371      fileContextBuilder.withChecksumType(ChecksumType.NULL);
372      fileContextBuilder.withBytesPerCheckSum(0);
373      // Need to fix onDiskDataSizeWithHeader; there are not checksums after-block-data
374      onDiskDataSizeWithHeader = onDiskSizeWithoutHeader + headerSize(usesHBaseChecksum);
375    }
376    fileContext = fileContextBuilder.build();
377    assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
378    return new HFileBlockBuilder().withBlockType(blockType)
379      .withOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader)
380      .withUncompressedSizeWithoutHeader(uncompressedSizeWithoutHeader)
381      .withPrevBlockOffset(prevBlockOffset).withOffset(offset)
382      .withOnDiskDataSizeWithHeader(onDiskDataSizeWithHeader)
383      .withNextBlockOnDiskSize(nextBlockOnDiskSize).withHFileContext(fileContext)
384      .withByteBuffAllocator(allocator).withByteBuff(buf.rewind()).withShared(!buf.hasArray())
385      .build();
386  }
387
388  /**
389   * Parse total on disk size including header and checksum.
390   * @param headerBuf      Header ByteBuffer. Presumed exact size of header.
391   * @param verifyChecksum true if checksum verification is in use.
392   * @return Size of the block with header included.
393   */
394  private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf, boolean verifyChecksum) {
395    return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(verifyChecksum);
396  }
397
398  /**
399   * @return the on-disk size of the next block (including the header size and any checksums if
400   *         present) read by peeking into the next block's header; use as a hint when doing a read
401   *         of the next block when scanning or running over a file.
402   */
403  int getNextBlockOnDiskSize() {
404    return nextBlockOnDiskSize;
405  }
406
407  @Override
408  public BlockType getBlockType() {
409    return blockType;
410  }
411
412  @Override
413  public int refCnt() {
414    return buf.refCnt();
415  }
416
417  @Override
418  public HFileBlock retain() {
419    buf.retain();
420    return this;
421  }
422
423  /**
424   * Call {@link ByteBuff#release()} to decrease the reference count, if no other reference, it will
425   * return back the {@link ByteBuffer} to {@link org.apache.hadoop.hbase.io.ByteBuffAllocator}
426   */
427  @Override
428  public boolean release() {
429    return buf.release();
430  }
431
432  /**
433   * Calling this method in strategic locations where HFileBlocks are referenced may help diagnose
434   * potential buffer leaks. We pass the block itself as a default hint, but one can use
435   * {@link #touch(Object)} to pass their own hint as well.
436   */
437  @Override
438  public HFileBlock touch() {
439    return touch(this);
440  }
441
442  @Override
443  public HFileBlock touch(Object hint) {
444    buf.touch(hint);
445    return this;
446  }
447
448  /** Returns get data block encoding id that was used to encode this block */
449  short getDataBlockEncodingId() {
450    if (blockType != BlockType.ENCODED_DATA) {
451      throw new IllegalArgumentException("Querying encoder ID of a block " + "of type other than "
452        + BlockType.ENCODED_DATA + ": " + blockType);
453    }
454    return buf.getShort(headerSize());
455  }
456
457  /** Returns the on-disk size of header + data part + checksum. */
458  public int getOnDiskSizeWithHeader() {
459    return onDiskSizeWithoutHeader + headerSize();
460  }
461
462  /** Returns the on-disk size of the data part + checksum (header excluded). */
463  int getOnDiskSizeWithoutHeader() {
464    return onDiskSizeWithoutHeader;
465  }
466
467  /** Returns the uncompressed size of data part (header and checksum excluded). */
468  public int getUncompressedSizeWithoutHeader() {
469    return uncompressedSizeWithoutHeader;
470  }
471
472  /** Returns the offset of the previous block of the same type in the file, or -1 if unknown */
473  long getPrevBlockOffset() {
474    return prevBlockOffset;
475  }
476
477  /**
478   * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position is modified as
479   * side-effect.
480   */
481  private void overwriteHeader() {
482    buf.rewind();
483    blockType.write(buf);
484    buf.putInt(onDiskSizeWithoutHeader);
485    buf.putInt(uncompressedSizeWithoutHeader);
486    buf.putLong(prevBlockOffset);
487    if (this.fileContext.isUseHBaseChecksum()) {
488      buf.put(fileContext.getChecksumType().getCode());
489      buf.putInt(fileContext.getBytesPerChecksum());
490      buf.putInt(onDiskDataSizeWithHeader);
491    }
492  }
493
494  /**
495   * Returns a buffer that does not include the header and checksum.
496   * @return the buffer with header skipped and checksum omitted.
497   */
498  public ByteBuff getBufferWithoutHeader() {
499    ByteBuff dup = getBufferReadOnly();
500    return dup.position(headerSize()).slice();
501  }
502
503  /**
504   * Returns a read-only duplicate of the buffer this block stores internally ready to be read.
505   * Clients must not modify the buffer object though they may set position and limit on the
506   * returned buffer since we pass back a duplicate. This method has to be public because it is used
507   * in {@link CompoundBloomFilter} to avoid object creation on every Bloom filter lookup, but has
508   * to be used with caution. Buffer holds header, block content, and any follow-on checksums if
509   * present.
510   * @return the buffer of this block for read-only operations
511   */
512  public ByteBuff getBufferReadOnly() {
513    // TODO: ByteBuf does not support asReadOnlyBuffer(). Fix.
514    ByteBuff dup = this.buf.duplicate();
515    assert dup.position() == 0;
516    return dup;
517  }
518
519  public ByteBuffAllocator getByteBuffAllocator() {
520    return this.allocator;
521  }
522
523  private void sanityCheckAssertion(long valueFromBuf, long valueFromField, String fieldName)
524    throws IOException {
525    if (valueFromBuf != valueFromField) {
526      throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
527        + ") is different from that in the field (" + valueFromField + ")");
528    }
529  }
530
531  private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
532    throws IOException {
533    if (valueFromBuf != valueFromField) {
534      throw new IOException("Block type stored in the buffer: " + valueFromBuf
535        + ", block type field: " + valueFromField);
536    }
537  }
538
539  /**
540   * Checks if the block is internally consistent, i.e. the first
541   * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a valid header consistent
542   * with the fields. Assumes a packed block structure. This function is primary for testing and
543   * debugging, and is not thread-safe, because it alters the internal buffer pointer. Used by tests
544   * only.
545   */
546  void sanityCheck() throws IOException {
547    // Duplicate so no side-effects
548    ByteBuff dup = this.buf.duplicate().rewind();
549    sanityCheckAssertion(BlockType.read(dup), blockType);
550
551    sanityCheckAssertion(dup.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader");
552
553    sanityCheckAssertion(dup.getInt(), uncompressedSizeWithoutHeader,
554      "uncompressedSizeWithoutHeader");
555
556    sanityCheckAssertion(dup.getLong(), prevBlockOffset, "prevBlockOffset");
557    if (this.fileContext.isUseHBaseChecksum()) {
558      sanityCheckAssertion(dup.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
559      sanityCheckAssertion(dup.getInt(), this.fileContext.getBytesPerChecksum(),
560        "bytesPerChecksum");
561      sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
562    }
563
564    if (dup.limit() != onDiskDataSizeWithHeader) {
565      throw new AssertionError(
566        "Expected limit " + onDiskDataSizeWithHeader + ", got " + dup.limit());
567    }
568
569    // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
570    // block's header, so there are two sensible values for buffer capacity.
571    int hdrSize = headerSize();
572    dup.rewind();
573    if (
574      dup.remaining() != onDiskDataSizeWithHeader
575        && dup.remaining() != onDiskDataSizeWithHeader + hdrSize
576    ) {
577      throw new AssertionError("Invalid buffer capacity: " + dup.remaining() + ", expected "
578        + onDiskDataSizeWithHeader + " or " + (onDiskDataSizeWithHeader + hdrSize));
579    }
580  }
581
582  @Override
583  public String toString() {
584    StringBuilder sb = new StringBuilder().append("[").append("blockType=").append(blockType)
585      .append(", fileOffset=").append(offset).append(", headerSize=").append(headerSize())
586      .append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
587      .append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
588      .append(", prevBlockOffset=").append(prevBlockOffset).append(", isUseHBaseChecksum=")
589      .append(fileContext.isUseHBaseChecksum());
590    if (fileContext.isUseHBaseChecksum()) {
591      sb.append(", checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
592        .append(", bytesPerChecksum=").append(this.buf.getInt(24 + 1))
593        .append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
594    } else {
595      sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader).append("(")
596        .append(onDiskSizeWithoutHeader).append("+")
597        .append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
598    }
599    String dataBegin;
600    if (buf.hasArray()) {
601      dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
602        Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
603    } else {
604      ByteBuff bufWithoutHeader = getBufferWithoutHeader();
605      byte[] dataBeginBytes =
606        new byte[Math.min(32, bufWithoutHeader.limit() - bufWithoutHeader.position())];
607      bufWithoutHeader.get(dataBeginBytes);
608      dataBegin = Bytes.toStringBinary(dataBeginBytes);
609    }
610    sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader())
611      .append(", totalChecksumBytes=").append(totalChecksumBytes()).append(", isUnpacked=")
612      .append(isUnpacked()).append(", buf=[").append(buf).append("]").append(", dataBeginsWith=")
613      .append(dataBegin).append(", fileContext=").append(fileContext)
614      .append(", nextBlockOnDiskSize=").append(nextBlockOnDiskSize).append("]");
615    return sb.toString();
616  }
617
618  /**
619   * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
620   * encoded structure. Internal structures are shared between instances where applicable.
621   */
622  HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
623    if (!fileContext.isCompressedOrEncrypted()) {
624      // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
625      // which is used for block serialization to L2 cache, does not preserve encoding and
626      // encryption details.
627      return this;
628    }
629
630    ByteBuff newBuf = allocateBufferForUnpacking(); // allocates space for the decompressed block
631    HFileBlock unpacked = shallowClone(this, newBuf);
632
633    boolean succ = false;
634    final Context context =
635      Context.current().with(CONTEXT_KEY, new HFileContextAttributesBuilderConsumer(fileContext));
636    try (Scope ignored = context.makeCurrent()) {
637      HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
638        ? reader.getBlockDecodingContext()
639        : reader.getDefaultBlockDecodingContext();
640      // Create a duplicated buffer without the header part.
641      int headerSize = this.headerSize();
642      ByteBuff dup = this.buf.duplicate();
643      dup.position(headerSize);
644      dup = dup.slice();
645      // Decode the dup into unpacked#buf
646      ctx.prepareDecoding(unpacked.getOnDiskDataSizeWithHeader() - headerSize,
647        unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), dup);
648      succ = true;
649      return unpacked;
650    } finally {
651      if (!succ) {
652        unpacked.release();
653      }
654    }
655  }
656
657  /**
658   * Always allocates a new buffer of the correct size. Copies header bytes from the existing
659   * buffer. Does not change header fields. Reserve room to keep checksum bytes too.
660   */
661  private ByteBuff allocateBufferForUnpacking() {
662    int headerSize = headerSize();
663    int capacityNeeded = headerSize + uncompressedSizeWithoutHeader;
664
665    ByteBuff source = buf.duplicate();
666    ByteBuff newBuf = allocator.allocate(capacityNeeded);
667
668    // Copy header bytes into newBuf.
669    source.position(0);
670    newBuf.put(0, source, 0, headerSize);
671
672    // set limit to exclude next block's header
673    newBuf.limit(capacityNeeded);
674    return newBuf;
675  }
676
677  /**
678   * Return true when this block's buffer has been unpacked, false otherwise. Note this is a
679   * calculated heuristic, not tracked attribute of the block.
680   */
681  public boolean isUnpacked() {
682    final int headerSize = headerSize();
683    final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader;
684    final int bufCapacity = buf.remaining();
685    return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
686  }
687
688  /**
689   * Cannot be {@link #UNSET}. Must be a legitimate value. Used re-making the {@link BlockCacheKey}
690   * when block is returned to the cache.
691   * @return the offset of this block in the file it was read from
692   */
693  long getOffset() {
694    if (offset < 0) {
695      throw new IllegalStateException("HFile block offset not initialized properly");
696    }
697    return offset;
698  }
699
700  /** Returns a byte stream reading the data + checksum of this block */
701  DataInputStream getByteStream() {
702    ByteBuff dup = this.buf.duplicate();
703    dup.position(this.headerSize());
704    return new DataInputStream(new ByteBuffInputStream(dup));
705  }
706
707  @Override
708  public long heapSize() {
709    long size = FIXED_OVERHEAD;
710    size += fileContext.heapSize();
711    if (buf != null) {
712      // Deep overhead of the byte buffer. Needs to be aligned separately.
713      size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE);
714    }
715    return ClassSize.align(size);
716  }
717
718  /**
719   * Will be override by {@link SharedMemHFileBlock} or {@link ExclusiveMemHFileBlock}. Return true
720   * by default.
721   */
722  public boolean isSharedMem() {
723    return true;
724  }
725
726  /**
727   * Unified version 2 {@link HFile} block writer. The intended usage pattern is as follows:
728   * <ol>
729   * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
730   * <li>Call {@link Writer#startWriting} and get a data stream to write to.
731   * <li>Write your data into the stream.
732   * <li>Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to. store the
733   * serialized block into an external stream.
734   * <li>Repeat to write more blocks.
735   * </ol>
736   * <p>
737   */
738  static class Writer implements ShipperListener {
739    private enum State {
740      INIT,
741      WRITING,
742      BLOCK_READY
743    }
744
745    private int maxSizeUnCompressed;
746
747    private BlockCompressedSizePredicator compressedSizePredicator;
748
749    /** Writer state. Used to ensure the correct usage protocol. */
750    private State state = State.INIT;
751
752    /** Data block encoder used for data blocks */
753    private final HFileDataBlockEncoder dataBlockEncoder;
754
755    private HFileBlockEncodingContext dataBlockEncodingCtx;
756
757    /** block encoding context for non-data blocks */
758    private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
759
760    /**
761     * The stream we use to accumulate data into a block in an uncompressed format. We reset this
762     * stream at the end of each block and reuse it. The header is written as the first
763     * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this stream.
764     */
765    private ByteArrayOutputStream baosInMemory;
766
767    /**
768     * Current block type. Set in {@link #startWriting(BlockType)}. Could be changed in
769     * {@link #finishBlock()} from {@link BlockType#DATA} to {@link BlockType#ENCODED_DATA}.
770     */
771    private BlockType blockType;
772
773    /**
774     * A stream that we write uncompressed bytes to, which compresses them and writes them to
775     * {@link #baosInMemory}.
776     */
777    private DataOutputStream userDataStream;
778
779    /**
780     * Bytes to be written to the file system, including the header. Compressed if compression is
781     * turned on. It also includes the checksum data that immediately follows the block data.
782     * (header + data + checksums)
783     */
784    private ByteArrayOutputStream onDiskBlockBytesWithHeader;
785
786    /**
787     * The size of the checksum data on disk. It is used only if data is not compressed. If data is
788     * compressed, then the checksums are already part of onDiskBytesWithHeader. If data is
789     * uncompressed, then this variable stores the checksum data for this block.
790     */
791    private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;
792
793    /**
794     * Current block's start offset in the {@link HFile}. Set in
795     * {@link #writeHeaderAndData(FSDataOutputStream)}.
796     */
797    private long startOffset;
798
799    /**
800     * Offset of previous block by block type. Updated when the next block is started.
801     */
802    private long[] prevOffsetByType;
803
804    /** The offset of the previous block of the same type */
805    private long prevOffset;
806    /** Meta data that holds information about the hfileblock **/
807    private HFileContext fileContext;
808
809    private final ByteBuffAllocator allocator;
810
811    @Override
812    public void beforeShipped() {
813      if (getEncodingState() != null) {
814        getEncodingState().beforeShipped();
815      }
816    }
817
818    EncodingState getEncodingState() {
819      return dataBlockEncodingCtx.getEncodingState();
820    }
821
822    /**
823     * @param dataBlockEncoder data block encoding algorithm to use
824     */
825    public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
826      HFileContext fileContext) {
827      this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP, fileContext.getBlocksize());
828    }
829
830    public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
831      HFileContext fileContext, ByteBuffAllocator allocator, int maxSizeUnCompressed) {
832      if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
833        throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is "
834          + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is "
835          + fileContext.getBytesPerChecksum());
836      }
837      this.allocator = allocator;
838      this.dataBlockEncoder =
839        dataBlockEncoder != null ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
840      this.dataBlockEncodingCtx = this.dataBlockEncoder.newDataBlockEncodingContext(conf,
841        HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
842      // TODO: This should be lazily instantiated
843      this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(conf, null,
844        HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
845      // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum
846      baosInMemory = new ByteArrayOutputStream();
847      prevOffsetByType = new long[BlockType.values().length];
848      for (int i = 0; i < prevOffsetByType.length; ++i) {
849        prevOffsetByType[i] = UNSET;
850      }
851      // TODO: Why fileContext saved away when we have dataBlockEncoder and/or
852      // defaultDataBlockEncoder?
853      this.fileContext = fileContext;
854      this.compressedSizePredicator = (BlockCompressedSizePredicator) ReflectionUtils.newInstance(
855        conf.getClass(BLOCK_COMPRESSED_SIZE_PREDICATOR, UncompressedBlockSizePredicator.class),
856        new Configuration(conf));
857      this.maxSizeUnCompressed = maxSizeUnCompressed;
858    }
859
860    /**
861     * Starts writing into the block. The previous block's data is discarded.
862     * @return the stream the user can write their data into
863     */
864    DataOutputStream startWriting(BlockType newBlockType) throws IOException {
865      if (state == State.BLOCK_READY && startOffset != -1) {
866        // We had a previous block that was written to a stream at a specific
867        // offset. Save that offset as the last offset of a block of that type.
868        prevOffsetByType[blockType.getId()] = startOffset;
869      }
870
871      startOffset = -1;
872      blockType = newBlockType;
873
874      baosInMemory.reset();
875      baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
876
877      state = State.WRITING;
878
879      // We will compress it later in finishBlock()
880      userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory);
881      if (newBlockType == BlockType.DATA) {
882        this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
883      }
884      return userDataStream;
885    }
886
887    /**
888     * Writes the Cell to this block
889     */
890    void write(Cell cell) throws IOException {
891      expectState(State.WRITING);
892      this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream);
893    }
894
895    /**
896     * Transitions the block writer from the "writing" state to the "block ready" state. Does
897     * nothing if a block is already finished.
898     */
899    void ensureBlockReady() throws IOException {
900      Preconditions.checkState(state != State.INIT, "Unexpected state: " + state);
901
902      if (state == State.BLOCK_READY) {
903        return;
904      }
905
906      // This will set state to BLOCK_READY.
907      finishBlock();
908    }
909
910    public boolean checkBoundariesWithPredicate() {
911      int rawBlockSize = encodedBlockSizeWritten();
912      if (rawBlockSize >= maxSizeUnCompressed) {
913        return true;
914      } else {
915        return compressedSizePredicator.shouldFinishBlock(rawBlockSize);
916      }
917    }
918
919    /**
920     * Finish up writing of the block. Flushes the compressing stream (if using compression), fills
921     * out the header, does any compression/encryption of bytes to flush out to disk, and manages
922     * the cache on write content, if applicable. Sets block write state to "block ready".
923     */
924    private void finishBlock() throws IOException {
925      if (blockType == BlockType.DATA) {
926        this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
927          baosInMemory.getBuffer(), blockType);
928        blockType = dataBlockEncodingCtx.getBlockType();
929      }
930      userDataStream.flush();
931      prevOffset = prevOffsetByType[blockType.getId()];
932
933      // We need to cache the unencoded/uncompressed size before changing the block state
934      int rawBlockSize = 0;
935      if (this.getEncodingState() != null) {
936        rawBlockSize = encodedBlockSizeWritten();
937      }
938      // We need to set state before we can package the block up for cache-on-write. In a way, the
939      // block is ready, but not yet encoded or compressed.
940      state = State.BLOCK_READY;
941      Bytes compressAndEncryptDat;
942      if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
943        compressAndEncryptDat =
944          dataBlockEncodingCtx.compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size());
945      } else {
946        compressAndEncryptDat = defaultBlockEncodingCtx.compressAndEncrypt(baosInMemory.getBuffer(),
947          0, baosInMemory.size());
948      }
949      if (compressAndEncryptDat == null) {
950        compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size());
951      }
952      if (onDiskBlockBytesWithHeader == null) {
953        onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength());
954      }
955      onDiskBlockBytesWithHeader.reset();
956      onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(),
957        compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength());
958      // Update raw and compressed sizes in the predicate
959      compressedSizePredicator.updateLatestBlockSizes(fileContext, rawBlockSize,
960        onDiskBlockBytesWithHeader.size());
961
962      // Calculate how many bytes we need for checksum on the tail of the block.
963      int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(),
964        fileContext.getBytesPerChecksum());
965
966      // Put the header for the on disk bytes; header currently is unfilled-out
967      putHeader(onDiskBlockBytesWithHeader, onDiskBlockBytesWithHeader.size() + numBytes,
968        baosInMemory.size(), onDiskBlockBytesWithHeader.size());
969
970      if (onDiskChecksum.length != numBytes) {
971        onDiskChecksum = new byte[numBytes];
972      }
973      ChecksumUtil.generateChecksums(onDiskBlockBytesWithHeader.getBuffer(), 0,
974        onDiskBlockBytesWithHeader.size(), onDiskChecksum, 0, fileContext.getChecksumType(),
975        fileContext.getBytesPerChecksum());
976    }
977
978    /**
979     * Put the header into the given byte array at the given offset.
980     * @param onDiskSize       size of the block on disk header + data + checksum
981     * @param uncompressedSize size of the block after decompression (but before optional data block
982     *                         decoding) including header
983     * @param onDiskDataSize   size of the block on disk with header and data but not including the
984     *                         checksums
985     */
986    private void putHeader(byte[] dest, int offset, int onDiskSize, int uncompressedSize,
987      int onDiskDataSize) {
988      offset = blockType.put(dest, offset);
989      offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
990      offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
991      offset = Bytes.putLong(dest, offset, prevOffset);
992      offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
993      offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
994      Bytes.putInt(dest, offset, onDiskDataSize);
995    }
996
997    private void putHeader(ByteBuff buff, int onDiskSize, int uncompressedSize,
998      int onDiskDataSize) {
999      buff.rewind();
1000      blockType.write(buff);
1001      buff.putInt(onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1002      buff.putInt(uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1003      buff.putLong(prevOffset);
1004      buff.put(fileContext.getChecksumType().getCode());
1005      buff.putInt(fileContext.getBytesPerChecksum());
1006      buff.putInt(onDiskDataSize);
1007    }
1008
1009    private void putHeader(ByteArrayOutputStream dest, int onDiskSize, int uncompressedSize,
1010      int onDiskDataSize) {
1011      putHeader(dest.getBuffer(), 0, onDiskSize, uncompressedSize, onDiskDataSize);
1012    }
1013
1014    /**
1015     * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records the offset of this
1016     * block so that it can be referenced in the next block of the same type.
1017     */
1018    void writeHeaderAndData(FSDataOutputStream out) throws IOException {
1019      long offset = out.getPos();
1020      if (startOffset != UNSET && offset != startOffset) {
1021        throw new IOException("A " + blockType + " block written to a "
1022          + "stream twice, first at offset " + startOffset + ", then at " + offset);
1023      }
1024      startOffset = offset;
1025      finishBlockAndWriteHeaderAndData(out);
1026    }
1027
1028    /**
1029     * Writes the header and the compressed data of this block (or uncompressed data when not using
1030     * compression) into the given stream. Can be called in the "writing" state or in the "block
1031     * ready" state. If called in the "writing" state, transitions the writer to the "block ready"
1032     * state.
1033     * @param out the output stream to write the
1034     */
1035    protected void finishBlockAndWriteHeaderAndData(DataOutputStream out) throws IOException {
1036      ensureBlockReady();
1037      long startTime = EnvironmentEdgeManager.currentTime();
1038      out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size());
1039      out.write(onDiskChecksum);
1040      HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime);
1041    }
1042
1043    /**
1044     * Returns the header or the compressed data (or uncompressed data when not using compression)
1045     * as a byte array. Can be called in the "writing" state or in the "block ready" state. If
1046     * called in the "writing" state, transitions the writer to the "block ready" state. This
1047     * returns the header + data + checksums stored on disk.
1048     * @return header and data as they would be stored on disk in a byte array
1049     */
1050    byte[] getHeaderAndDataForTest() throws IOException {
1051      ensureBlockReady();
1052      // This is not very optimal, because we are doing an extra copy.
1053      // But this method is used only by unit tests.
1054      byte[] output = new byte[onDiskBlockBytesWithHeader.size() + onDiskChecksum.length];
1055      System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0,
1056        onDiskBlockBytesWithHeader.size());
1057      System.arraycopy(onDiskChecksum, 0, output, onDiskBlockBytesWithHeader.size(),
1058        onDiskChecksum.length);
1059      return output;
1060    }
1061
1062    /**
1063     * Releases resources used by this writer.
1064     */
1065    void release() {
1066      if (dataBlockEncodingCtx != null) {
1067        dataBlockEncodingCtx.close();
1068        dataBlockEncodingCtx = null;
1069      }
1070      if (defaultBlockEncodingCtx != null) {
1071        defaultBlockEncodingCtx.close();
1072        defaultBlockEncodingCtx = null;
1073      }
1074    }
1075
1076    /**
1077     * Returns the on-disk size of the data portion of the block. This is the compressed size if
1078     * compression is enabled. Can only be called in the "block ready" state. Header is not
1079     * compressed, and its size is not included in the return value.
1080     * @return the on-disk size of the block, not including the header.
1081     */
1082    int getOnDiskSizeWithoutHeader() {
1083      expectState(State.BLOCK_READY);
1084      return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length
1085        - HConstants.HFILEBLOCK_HEADER_SIZE;
1086    }
1087
1088    /**
1089     * Returns the on-disk size of the block. Can only be called in the "block ready" state.
1090     * @return the on-disk size of the block ready to be written, including the header size, the
1091     *         data and the checksum data.
1092     */
1093    int getOnDiskSizeWithHeader() {
1094      expectState(State.BLOCK_READY);
1095      return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length;
1096    }
1097
1098    /**
1099     * The uncompressed size of the block data. Does not include header size.
1100     */
1101    int getUncompressedSizeWithoutHeader() {
1102      expectState(State.BLOCK_READY);
1103      return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE;
1104    }
1105
1106    /**
1107     * The uncompressed size of the block data, including header size.
1108     */
1109    public int getUncompressedSizeWithHeader() {
1110      expectState(State.BLOCK_READY);
1111      return baosInMemory.size();
1112    }
1113
1114    /** Returns true if a block is being written */
1115    boolean isWriting() {
1116      return state == State.WRITING;
1117    }
1118
1119    /**
1120     * Returns the number of bytes written into the current block so far, or zero if not writing the
1121     * block at the moment. Note that this will return zero in the "block ready" state as well.
1122     * @return the number of bytes written
1123     */
1124    public int encodedBlockSizeWritten() {
1125      return state != State.WRITING ? 0 : this.getEncodingState().getEncodedDataSizeWritten();
1126    }
1127
1128    /**
1129     * Returns the number of bytes written into the current block so far, or zero if not writing the
1130     * block at the moment. Note that this will return zero in the "block ready" state as well.
1131     * @return the number of bytes written
1132     */
1133    public int blockSizeWritten() {
1134      return state != State.WRITING ? 0 : this.getEncodingState().getUnencodedDataSizeWritten();
1135    }
1136
1137    /**
1138     * Clones the header followed by the uncompressed data, even if using compression. This is
1139     * needed for storing uncompressed blocks in the block cache. Can be called in the "writing"
1140     * state or the "block ready" state. Returns only the header and data, does not include checksum
1141     * data.
1142     * @return Returns an uncompressed block ByteBuff for caching on write
1143     */
1144    ByteBuff cloneUncompressedBufferWithHeader() {
1145      expectState(State.BLOCK_READY);
1146      ByteBuff bytebuff = allocator.allocate(baosInMemory.size());
1147      baosInMemory.toByteBuff(bytebuff);
1148      int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(),
1149        fileContext.getBytesPerChecksum());
1150      putHeader(bytebuff, onDiskBlockBytesWithHeader.size() + numBytes, baosInMemory.size(),
1151        onDiskBlockBytesWithHeader.size());
1152      bytebuff.rewind();
1153      return bytebuff;
1154    }
1155
1156    /**
1157     * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is needed
1158     * for storing packed blocks in the block cache. Returns only the header and data, Does not
1159     * include checksum data.
1160     * @return Returns a copy of block bytes for caching on write
1161     */
1162    private ByteBuff cloneOnDiskBufferWithHeader() {
1163      expectState(State.BLOCK_READY);
1164      ByteBuff bytebuff = allocator.allocate(onDiskBlockBytesWithHeader.size());
1165      onDiskBlockBytesWithHeader.toByteBuff(bytebuff);
1166      bytebuff.rewind();
1167      return bytebuff;
1168    }
1169
1170    private void expectState(State expectedState) {
1171      if (state != expectedState) {
1172        throw new IllegalStateException(
1173          "Expected state: " + expectedState + ", actual state: " + state);
1174      }
1175    }
1176
1177    /**
1178     * Takes the given {@link BlockWritable} instance, creates a new block of its appropriate type,
1179     * writes the writable into this block, and flushes the block into the output stream. The writer
1180     * is instructed not to buffer uncompressed bytes for cache-on-write.
1181     * @param bw  the block-writable object to write as a block
1182     * @param out the file system output stream
1183     */
1184    void writeBlock(BlockWritable bw, FSDataOutputStream out) throws IOException {
1185      bw.writeToBlock(startWriting(bw.getBlockType()));
1186      writeHeaderAndData(out);
1187    }
1188
1189    /**
1190     * Creates a new HFileBlock. Checksums have already been validated, so the byte buffer passed
1191     * into the constructor of this newly created block does not have checksum data even though the
1192     * header minor version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a 0 value
1193     * in bytesPerChecksum. This method copies the on-disk or uncompressed data to build the
1194     * HFileBlock which is used only while writing blocks and caching.
1195     * <p>
1196     * TODO: Should there be an option where a cache can ask that hbase preserve block checksums for
1197     * checking after a block comes out of the cache? Otehrwise, cache is responsible for blocks
1198     * being wholesome (ECC memory or if file-backed, it does checksumming).
1199     */
1200    HFileBlock getBlockForCaching(CacheConfig cacheConf) {
1201      HFileContext newContext = new HFileContextBuilder().withBlockSize(fileContext.getBlocksize())
1202        .withBytesPerCheckSum(0).withChecksumType(ChecksumType.NULL) // no checksums in cached data
1203        .withCompression(fileContext.getCompression())
1204        .withDataBlockEncoding(fileContext.getDataBlockEncoding())
1205        .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
1206        .withCompressTags(fileContext.isCompressTags())
1207        .withIncludesMvcc(fileContext.isIncludesMvcc())
1208        .withIncludesTags(fileContext.isIncludesTags())
1209        .withColumnFamily(fileContext.getColumnFamily()).withTableName(fileContext.getTableName())
1210        .build();
1211      // Build the HFileBlock.
1212      HFileBlockBuilder builder = new HFileBlockBuilder();
1213      ByteBuff buff;
1214      if (cacheConf.shouldCacheCompressed(blockType.getCategory())) {
1215        buff = cloneOnDiskBufferWithHeader();
1216      } else {
1217        buff = cloneUncompressedBufferWithHeader();
1218      }
1219      return builder.withBlockType(blockType)
1220        .withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader())
1221        .withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader())
1222        .withPrevBlockOffset(prevOffset).withByteBuff(buff).withFillHeader(FILL_HEADER)
1223        .withOffset(startOffset).withNextBlockOnDiskSize(UNSET)
1224        .withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length)
1225        .withHFileContext(newContext).withByteBuffAllocator(cacheConf.getByteBuffAllocator())
1226        .withShared(!buff.hasArray()).build();
1227    }
1228  }
1229
1230  /** Something that can be written into a block. */
1231  interface BlockWritable {
1232    /** The type of block this data should use. */
1233    BlockType getBlockType();
1234
1235    /**
1236     * Writes the block to the provided stream. Must not write any magic records.
1237     * @param out a stream to write uncompressed data into
1238     */
1239    void writeToBlock(DataOutput out) throws IOException;
1240  }
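  // Illustrative sketch only (not part of the API): a BlockWritable can be paired with
  // Writer#writeBlock roughly as below, where "writer", "outputStream" and "payload" stand in for
  // a hypothetical Writer, FSDataOutputStream and already-serialized byte[]:
  //
  //   BlockWritable bw = new BlockWritable() {
  //     @Override
  //     public BlockType getBlockType() {
  //       return BlockType.META;
  //     }
  //
  //     @Override
  //     public void writeToBlock(DataOutput out) throws IOException {
  //       out.write(payload); // no magic record; the Writer lays down the block header itself
  //     }
  //   };
  //   writer.writeBlock(bw, outputStream);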
1241
1242  /**
1243   * Iterator for reading {@link HFileBlock}s in the load-on-open section, such as the root data
1244   * index block, meta index block, file info block, etc.
1245   */
1246  interface BlockIterator {
1247    /**
1248     * Get the next block, or null if there are no more blocks to iterate.
1249     */
1250    HFileBlock nextBlock() throws IOException;
1251
1252    /**
1253     * Similar to {@link #nextBlock()} but checks the block type, throws an exception if it does
1254     * not match, and otherwise returns the HFile block.
1255     */
1256    HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1257
1258    /**
1259     * Because we now use the {@link ByteBuffAllocator} to manage the nio ByteBuffers backing
1260     * HFileBlocks, we must deallocate all of those ByteBuffers at the end of their life. A
1261     * BlockIterator's life cycle starts when an HFileReader is opened and ends at HFileReader#close,
1262     * so we keep track of all the blocks read through it and release them in
1263     * {@link BlockIterator#freeBlocks()} when closing the HFileReader. The total size of the blocks
1264     * in the load-on-open section should be quite small, so tracking them is cheap.
1265     */
1266    void freeBlocks();
1267  }
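  // Illustrative usage sketch only, assuming an FSReader named "fsReader" and known offsets for the
  // load-on-open section: iterate until nextBlock() returns null, then release the tracked
  // ByteBuffs via freeBlocks() once the blocks are no longer needed.
  //
  //   BlockIterator it = fsReader.blockRange(loadOnOpenOffset, trailerOffset);
  //   try {
  //     for (HFileBlock b = it.nextBlock(); b != null; b = it.nextBlock()) {
  //       // consume block b; no need to release it individually, the iterator tracks it
  //     }
  //   } finally {
  //     it.freeBlocks();
  //   }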
1268
1269  /** An HFile block reader with iteration ability. */
1270  interface FSReader {
1271    /**
1272     * Reads the block at the given offset in the file with the given on-disk size and uncompressed
1273     * size.
1274     * @param offset        offset in the file at which to read
1275     * @param onDiskSize    the on-disk size of the entire block, including all applicable headers,
1276     *                      or -1 if unknown
1277     * @param pread         true to use pread, otherwise use the stream read.
1278     * @param updateMetrics whether to update the metrics.
1279     * @param intoHeap      whether to allocate the block's ByteBuff from the JVM heap rather than
1280     *                      from the {@link ByteBuffAllocator}. For LRUBlockCache we must ensure the
1281     *                      block to cache is a heap one, because its memory accounting is heap
1282     *                      based; likewise, {@link CombinedBlockCache} uses the heap LRUBlockCache
1283     *                      as the L1 cache for small blocks such as index or meta blocks for faster
1284     *                      access. This flag lets us decide whether to allocate from the JVM heap,
1285     *                      so that we can avoid an extra off-heap to heap memory copy when using
1286     *                      LRUBlockCache. In most cases we know the expected block type before we
1287     *                      read, but in some special cases (for example,
1288     *                      HFileReaderImpl#readNextDataBlock()) we cannot pre-decide the block
1289     *                      type; then we can only allocate the block's ByteBuff from the
1290     *                      {@link ByteBuffAllocator} first, and when caching it in
1291     *                      {@link LruBlockCache} we check whether the ByteBuff is on heap; if not,
1292     *                      we clone it to a heap one and cache that.
1293     * @return the newly read block
1294     */
1295    HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics,
1296      boolean intoHeap) throws IOException;
1297
1298    /**
1299     * Creates a block iterator over the given portion of the {@link HFile}. The iterator returns
1300     * blocks whose offsets satisfy startOffset &lt;= offset &lt; endOffset. Returned blocks are
1301     * always unpacked. Used when no hfile index is available, e.g. when reading in the hfile index
1302     * blocks themselves on file open.
1303     * @param startOffset the offset of the block to start iteration with
1304     * @param endOffset   the offset to end iteration at (exclusive)
1305     * @return an iterator of blocks between the two given offsets
1306     */
1307    BlockIterator blockRange(long startOffset, long endOffset);
1308
1309    /** Closes the backing streams */
1310    void closeStreams() throws IOException;
1311
1312    /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
1313    HFileBlockDecodingContext getBlockDecodingContext();
1314
1315    /** Get the default decoder for blocks from this file. */
1316    HFileBlockDecodingContext getDefaultBlockDecodingContext();
1317
1318    void setIncludesMemStoreTS(boolean includesMemstoreTS);
1319
1320    void setDataBlockEncoder(HFileDataBlockEncoder encoder, Configuration conf);
1321
1322    /**
1323     * Closes the stream's socket. Note: this can be called concurrently from multiple threads and
1324     * implementations should take care of thread safety.
1325     */
1326    void unbufferStream();
1327  }
1328
1329  /**
1330   * Data structure used to cache the header of the NEXT block. Only useful if the next read that
1331   * comes in is for the block immediately following in sequence. When we read a block, we also read
1332   * the next block's header, so that we have the length of the next block to read when the hfile
1333   * index is not available (rare, at hfile open only).
1334   */
1335  private static class PrefetchedHeader {
1336    long offset = -1;
1337    byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1338    final ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(header, 0, header.length));
1339
1340    @Override
1341    public String toString() {
1342      return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header);
1343    }
1344  }
1345
1346  /**
1347   * Reads version 2 HFile blocks from the filesystem.
1348   */
1349  static class FSReaderImpl implements FSReader {
1350    /**
1351     * The file system stream of the underlying {@link HFile}; it may or may not do checksum
1352     * validation in the filesystem.
1353     */
1354    private FSDataInputStreamWrapper streamWrapper;
1355
1356    private HFileBlockDecodingContext encodedBlockDecodingCtx;
1357
1358    /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
1359    private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
1360
1361    /**
1362     * Cache of the NEXT block's header. Check that it is indeed the next block's header before
1363     * using it. TODO: Review. This over-read into the next block to fetch its header seems
1364     * unnecessary given we usually get the block size from the hfile index.
1365     */
1366    private AtomicReference<PrefetchedHeader> prefetchedHeader =
1367      new AtomicReference<>(new PrefetchedHeader());
1368
1369    /** The size of the file we are reading from, or -1 if unknown. */
1370    private long fileSize;
1371
1372    /** The size of the header */
1373    protected final int hdrSize;
1374
1375    /** The filesystem used to access data */
1376    private HFileSystem hfs;
1377
1378    private HFileContext fileContext;
1379    // Cache the fileName
1380    private String pathName;
1381
1382    private final ByteBuffAllocator allocator;
1383
1384    private final Lock streamLock = new ReentrantLock();
1385
1386    private final boolean isPreadAllBytes;
1387
1388    FSReaderImpl(ReaderContext readerContext, HFileContext fileContext, ByteBuffAllocator allocator,
1389      Configuration conf) throws IOException {
1390      this.fileSize = readerContext.getFileSize();
1391      this.hfs = readerContext.getFileSystem();
1392      if (readerContext.getFilePath() != null) {
1393        this.pathName = readerContext.getFilePath().toString();
1394      }
1395      this.fileContext = fileContext;
1396      this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1397      this.allocator = allocator;
1398
1399      this.streamWrapper = readerContext.getInputStreamWrapper();
1400      // Older versions of HBase didn't support checksum.
1401      this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1402      defaultDecodingCtx = new HFileBlockDefaultDecodingContext(conf, fileContext);
1403      encodedBlockDecodingCtx = defaultDecodingCtx;
1404      isPreadAllBytes = readerContext.isPreadAllBytes();
1405    }
1406
1407    @Override
1408    public BlockIterator blockRange(final long startOffset, final long endOffset) {
1409      final FSReader owner = this; // handle for inner class
1410      return new BlockIterator() {
1411        private volatile boolean freed = false;
1412        // Tracking all read blocks until we call freeBlocks.
1413        private List<HFileBlock> blockTracker = new ArrayList<>();
1414        private long offset = startOffset;
1415        // Cache length of next block. Current block has the length of next block in it.
1416        private long length = -1;
1417
1418        @Override
1419        public HFileBlock nextBlock() throws IOException {
1420          if (offset >= endOffset) {
1421            return null;
1422          }
1423          HFileBlock b = readBlockData(offset, length, false, false, true);
1424          offset += b.getOnDiskSizeWithHeader();
1425          length = b.getNextBlockOnDiskSize();
1426          HFileBlock uncompressed = b.unpack(fileContext, owner);
1427          if (uncompressed != b) {
1428            b.release(); // Need to release the compressed Block now.
1429          }
1430          blockTracker.add(uncompressed);
1431          return uncompressed;
1432        }
1433
1434        @Override
1435        public HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException {
1436          HFileBlock blk = nextBlock();
1437          if (blk.getBlockType() != blockType) {
1438            throw new IOException(
1439              "Expected block of type " + blockType + " but found " + blk.getBlockType());
1440          }
1441          return blk;
1442        }
1443
1444        @Override
1445        public void freeBlocks() {
1446          if (freed) {
1447            return;
1448          }
1449          blockTracker.forEach(HFileBlock::release);
1450          blockTracker = null;
1451          freed = true;
1452        }
1453      };
1454    }
1455
1456    /**
1457     * Does a positional read or a seek-and-read into the given byte buffer. Callers must take care
1458     * to call {@link ByteBuff#release()} on every exit path to deallocate the ByteBuffers, otherwise
1459     * a memory leak may occur.
1460     * @param dest              destination buffer
1461     * @param size              size of read
1462     * @param peekIntoNextBlock whether to read the next block's on-disk size
1463     * @param fileOffset        position in the stream to read at
1464     * @param pread             whether we should do a positional read
1465     * @param istream           the input source of data
1466     * @return true if the destination buffer includes the next block's header, false if it only
1467     *         includes the current block's data without the next block's header.
1468     * @throws IOException if any IO error happens.
1469     */
1470    protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size,
1471      boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
1472      if (!pread) {
1473        // Seek + read. Better for scanning.
1474        istream.seek(fileOffset);
1475        long realOffset = istream.getPos();
1476        if (realOffset != fileOffset) {
1477          throw new IOException("Tried to seek to " + fileOffset + " to read " + size
1478            + " bytes, but pos=" + realOffset + " after seek");
1479        }
1480        if (!peekIntoNextBlock) {
1481          BlockIOUtils.readFully(dest, istream, size);
1482          return false;
1483        }
1484
1485        // Try to read the next block header
1486        if (!BlockIOUtils.readWithExtra(dest, istream, size, hdrSize)) {
1487          // did not read the next block header.
1488          return false;
1489        }
1490      } else {
1491        // Positional read. Better for random reads; or when the streamLock is already locked.
1492        int extraSize = peekIntoNextBlock ? hdrSize : 0;
1493        if (
1494          !BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize, isPreadAllBytes)
1495        ) {
1496          // did not read the next block header.
1497          return false;
1498        }
1499      }
1500      assert peekIntoNextBlock;
1501      return true;
1502    }
1503
1504    /**
1505     * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
1506     * little memory allocation as possible, using the provided on-disk size.
1507     * @param offset                the offset in the stream to read at
1508     * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header, or -1 if
1509     *                              unknown; i.e. when iterating over blocks reading in the file
1510     *                              metadata info.
1511     * @param pread                 whether to use a positional read
1512     * @param updateMetrics         whether to update the metrics
1513     * @param intoHeap              whether to allocate the block's ByteBuff on heap or off-heap.
1514     * @see FSReader#readBlockData(long, long, boolean, boolean, boolean) for more details about
1515     *      intoHeap.
1516     */
1517    @Override
1518    public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread,
1519      boolean updateMetrics, boolean intoHeap) throws IOException {
1520      // Get a copy of the current state of whether to validate
1521      // hbase checksums or not for this read call. This is not
1522      // thread-safe but the one constraint is that if we decide
1523      // to skip hbase checksum verification then we are
1524      // guaranteed to use hdfs checksum verification.
1525      boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1526      FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1527      final Context context = Context.current().with(CONTEXT_KEY,
1528        new HFileContextAttributesBuilderConsumer(fileContext)
1529          .setSkipChecksum(doVerificationThruHBaseChecksum)
1530          .setReadType(pread ? ReadType.POSITIONAL_READ : ReadType.SEEK_PLUS_READ));
1531      try (Scope ignored = context.makeCurrent()) {
1532        HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
1533          doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
1534        if (blk == null) {
1535          HFile.LOG.warn("HBase checksum verification failed for file {} at offset {} filesize {}."
1536            + " Retrying read with HDFS checksums turned on...", pathName, offset, fileSize);
1537
1538          if (!doVerificationThruHBaseChecksum) {
1539            String msg = "HBase checksum verification failed for file " + pathName + " at offset "
1540              + offset + " filesize " + fileSize + " but this cannot happen because doVerify is "
1541              + doVerificationThruHBaseChecksum;
1542            HFile.LOG.warn(msg);
1543            throw new IOException(msg); // cannot happen case here
1544          }
1545          HFile.CHECKSUM_FAILURES.increment(); // update metrics
1546
1547          // If we have a checksum failure, we fall back into a mode where
1548          // the next few reads use HDFS level checksums. We aim to make the
1549          // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
1550          // hbase checksum verification, but since this value is set without
1551          // holding any locks, it can so happen that we might actually do
1552          // a few more than precisely this number.
1553          is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1554          doVerificationThruHBaseChecksum = false;
1555          blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
1556            doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
1557          if (blk != null) {
1558            HFile.LOG.warn(
1559              "HDFS checksum verification succeeded for file {} at offset {} filesize" + " {}",
1560              pathName, offset, fileSize);
1561          }
1562        }
1563        if (blk == null && !doVerificationThruHBaseChecksum) {
1564          String msg =
1565            "readBlockData failed, possibly due to a checksum verification failure, for file "
1566              + pathName + " at offset " + offset + " filesize " + fileSize;
1567          HFile.LOG.warn(msg);
1568          throw new IOException(msg);
1569        }
1570
1571        // If there is a checksum mismatch earlier, then retry with
1572        // HBase checksums switched off and use HDFS checksum verification.
1573        // This triggers HDFS to detect and fix corrupt replicas. The
1574        // next checksumOffCount read requests will use HDFS checksums.
1575        // The decrementing of this.checksumOffCount is not thread-safe,
1576        // but it is harmless because eventually checksumOffCount will be
1577        // a negative number.
1578        streamWrapper.checksumOk();
1579        return blk;
1580      }
1581    }
1582
1583    /**
1584     * Checks that <code>onDiskSizeWithHeaderL</code> is a sane size and then returns it as an int.
1585     */
1586    private static int checkAndGetSizeAsInt(final long onDiskSizeWithHeaderL, final int hdrSize)
1587      throws IOException {
1588      if (
1589        (onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
1590          || onDiskSizeWithHeaderL >= Integer.MAX_VALUE
1591      ) {
1592        throw new IOException(
1593          "Invalid onDisksize=" + onDiskSizeWithHeaderL + ": expected to be at least " + hdrSize
1594            + " and at most " + Integer.MAX_VALUE + ", or -1");
1595      }
1596      return (int) onDiskSizeWithHeaderL;
1597    }
1598
1599    /**
1600     * Verify that the passed-in onDiskSizeWithHeader matches what is in the header; otherwise
1601     * something is not right.
1602     */
1603    private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuff headerBuf,
1604      final long offset, boolean verifyChecksum) throws IOException {
1605      // Assert size provided aligns with what is in the header
1606      int fromHeader = getOnDiskSizeWithHeader(headerBuf, verifyChecksum);
1607      if (passedIn != fromHeader) {
1608        throw new IOException("Passed in onDiskSizeWithHeader=" + passedIn + " != " + fromHeader
1609          + ", offset=" + offset + ", fileContext=" + this.fileContext);
1610      }
1611    }
1612
1613    /**
1614     * Check the atomic reference cache for this block's header. The cache is only good if the next
1615     * read coming through is for the block next in sequence. We read the next block's header on the
1616     * tail of reading the previous block to save a seek. Otherwise, we either have to seek to read
1617     * the header before we can pull in the block, or we have to back up the stream because we
1618     * over-read (the next block's header).
1619     * @see PrefetchedHeader
1620     * @return The cached block header or null if not found.
1621     * @see #cacheNextBlockHeader(long, ByteBuff, int, int)
1622     */
1623    private ByteBuff getCachedHeader(final long offset) {
1624      PrefetchedHeader ph = this.prefetchedHeader.get();
1625      return ph != null && ph.offset == offset ? ph.buf : null;
1626    }
1627
1628    /**
1629     * Save away the next block's header in the atomic reference.
1630     * @see #getCachedHeader(long)
1631     * @see PrefetchedHeader
1632     */
1633    private void cacheNextBlockHeader(final long offset, ByteBuff onDiskBlock,
1634      int onDiskSizeWithHeader, int headerLength) {
1635      PrefetchedHeader ph = new PrefetchedHeader();
1636      ph.offset = offset;
1637      onDiskBlock.get(onDiskSizeWithHeader, ph.header, 0, headerLength);
1638      this.prefetchedHeader.set(ph);
1639    }
1640
1641    private int getNextBlockOnDiskSize(boolean readNextHeader, ByteBuff onDiskBlock,
1642      int onDiskSizeWithHeader) {
1643      int nextBlockOnDiskSize = -1;
1644      if (readNextHeader) {
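        // The next block's header begins right after this block's onDiskSizeWithHeader bytes in
        // onDiskBlock. Its 4-byte on-disk-size-without-header field sits immediately after the
        // 8-byte block-type magic, so adding hdrSize yields that block's full on-disk size
        // including its own header.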
1645        nextBlockOnDiskSize =
1646          onDiskBlock.getIntAfterPosition(onDiskSizeWithHeader + BlockType.MAGIC_LENGTH) + hdrSize;
1647      }
1648      return nextBlockOnDiskSize;
1649    }
1650
1651    private ByteBuff allocate(int size, boolean intoHeap) {
1652      return intoHeap ? HEAP.allocate(size) : allocator.allocate(size);
1653    }
1654
1655    /**
1656     * Reads a version 2 block.
1657     * @param offset                the offset in the stream to read at.
1658     * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header and
1659     *                              checksums if present, or -1 if unknown (as a long). Can be -1
1660     *                              when we are doing a raw iteration of blocks, as when loading up
1661     *                              file metadata on the first read of a new file. Usually it is
1662     *                              known, obtained from the file index.
1663     * @param pread                 whether to use a positional read
1664     * @param verifyChecksum        whether to use HBase checksums. If HBase checksums are switched
1665     *                              off, HDFS checksums are used instead. This can flip on/off while
1666     *                              reading the same file if we hit a troublesome patch in an hfile.
1667     * @param updateMetrics         whether to update the metrics.
1668     * @param intoHeap              whether to allocate the block's ByteBuff on heap or off-heap.
1669     * @return the HFileBlock or null if there is an HBase checksum mismatch
1670     */
1671    protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
1672      long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics,
1673      boolean intoHeap) throws IOException {
1674      if (offset < 0) {
1675        throw new IOException("Invalid offset=" + offset + " trying to read " + "block (onDiskSize="
1676          + onDiskSizeWithHeaderL + ")");
1677      }
1678
1679      final Span span = Span.current();
1680      final AttributesBuilder attributesBuilder = Attributes.builder();
1681      Optional.of(Context.current()).map(val -> val.get(CONTEXT_KEY))
1682        .ifPresent(c -> c.accept(attributesBuilder));
1683      int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize);
1684      // Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1
1685      // and will save us having to seek the stream backwards to reread the header we
1686      // read the last time through here.
1687      ByteBuff headerBuf = getCachedHeader(offset);
1688      LOG.trace(
1689        "Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, "
1690          + "onDiskSizeWithHeader={}",
1691        this.fileContext.getHFileName(), offset, pread, verifyChecksum, headerBuf,
1692        onDiskSizeWithHeader);
1693      // This is NOT the same as verifyChecksum. The latter is whether to do hbase
1694      // checksums, and can change with circumstances. The flag below is whether the
1695      // file has support for checksums (version 2+).
1696      boolean checksumSupport = this.fileContext.isUseHBaseChecksum();
1697      long startTime = EnvironmentEdgeManager.currentTime();
1698      if (onDiskSizeWithHeader <= 0) {
1699        // We were not passed the block size. Need to get it from the header. If header was
1700        // not cached (see getCachedHeader above), need to seek to pull it in. This is costly
1701        // and should happen very rarely. Currently happens on open of a hfile reader where we
1702        // read the trailer blocks to pull in the indices. Otherwise, we are reading block sizes
1703        // out of the hfile index. To check, enable TRACE in this file and you'll get an exception
1704        // in a LOG every time we seek. See HBASE-17072 for more detail.
1705        if (headerBuf == null) {
1706          if (LOG.isTraceEnabled()) {
1707            LOG.trace("Extra seek to get block size!", new RuntimeException());
1708          }
1709          span.addEvent("Extra seek to get block size!", attributesBuilder.build());
1710          headerBuf = HEAP.allocate(hdrSize);
1711          readAtOffset(is, headerBuf, hdrSize, false, offset, pread);
1712          headerBuf.rewind();
1713        }
1714        onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport);
1715      }
1716      int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
1717      // Allocate enough space to fit the next block's header too; saves a seek next time through.
1718      // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header;
1719      // onDiskSizeWithHeader is header, body, and any checksums if present. preReadHeaderSize
1720      // says where to start reading. If we have the header cached, then we don't need to read
1721      // it again and we can likely read from last place we left off w/o need to backup and reread
1722      // the header we read last time through here.
1723      ByteBuff onDiskBlock = this.allocate(onDiskSizeWithHeader + hdrSize, intoHeap);
1724      boolean initHFileBlockSuccess = false;
1725      try {
1726        if (headerBuf != null) {
1727          onDiskBlock.put(0, headerBuf, 0, hdrSize).position(hdrSize);
1728        }
1729        boolean readNextHeader = readAtOffset(is, onDiskBlock,
1730          onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread);
1731        onDiskBlock.rewind(); // in case of moving position when copying a cached header
1732        int nextBlockOnDiskSize =
1733          getNextBlockOnDiskSize(readNextHeader, onDiskBlock, onDiskSizeWithHeader);
1734        if (headerBuf == null) {
1735          headerBuf = onDiskBlock.duplicate().position(0).limit(hdrSize);
1736        }
1737        // Do a few checks before we go instantiate HFileBlock.
1738        assert onDiskSizeWithHeader > this.hdrSize;
1739        verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport);
1740        ByteBuff curBlock = onDiskBlock.duplicate().position(0).limit(onDiskSizeWithHeader);
1741        // Verify checksum of the data before using it for building HFileBlock.
1742        if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) {
1743          return null;
1744        }
1745        // remove checksum from buffer now that it's verified
1746        int sizeWithoutChecksum = curBlock.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
1747        curBlock.limit(sizeWithoutChecksum);
1748        long duration = EnvironmentEdgeManager.currentTime() - startTime;
1749        if (updateMetrics) {
1750          HFile.updateReadLatency(duration, pread);
1751        }
1752        // The onDiskBlock will become the headerAndDataBuffer for this block.
1753        // If nextBlockOnDiskSize is not -1, the onDiskBlock already contains the header of the
1754        // next block, so there is no need to set the next block's header in it.
1755        HFileBlock hFileBlock = createFromBuff(curBlock, checksumSupport, offset,
1756          nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
1757        // Run check on uncompressed sizings.
1758        if (!fileContext.isCompressedOrEncrypted()) {
1759          hFileBlock.sanityCheckUncompressed();
1760        }
1761        LOG.trace("Read {} in {} ms", hFileBlock, duration);
1762        span.addEvent("Read block", attributesBuilder.build());
1763        // Cache next block header if we read it for the next time through here.
1764        if (nextBlockOnDiskSize != -1) {
1765          cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), onDiskBlock,
1766            onDiskSizeWithHeader, hdrSize);
1767        }
1768        initHFileBlockSuccess = true;
1769        return hFileBlock;
1770      } finally {
1771        if (!initHFileBlockSuccess) {
1772          onDiskBlock.release();
1773        }
1774      }
1775    }
1776
1777    @Override
1778    public void setIncludesMemStoreTS(boolean includesMemstoreTS) {
1779      this.fileContext =
1780        new HFileContextBuilder(this.fileContext).withIncludesMvcc(includesMemstoreTS).build();
1781    }
1782
1783    @Override
1784    public void setDataBlockEncoder(HFileDataBlockEncoder encoder, Configuration conf) {
1785      encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(conf, fileContext);
1786    }
1787
1788    @Override
1789    public HFileBlockDecodingContext getBlockDecodingContext() {
1790      return this.encodedBlockDecodingCtx;
1791    }
1792
1793    @Override
1794    public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
1795      return this.defaultDecodingCtx;
1796    }
1797
1798    /**
1799     * Generates the checksum for the header as well as the data and then validates it. If the block
1800     * doesn't use checksums, returns false.
1801     * @return True if checksum matches, else false.
1802     */
1803    private boolean validateChecksum(long offset, ByteBuff data, int hdrSize) {
1804      // If this is an older version of the block that does not have checksums, then return false
1805      // indicating that checksum verification did not succeed. Actually, this method should never
1806      // be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen
1807      // case. Since this is a cannot-happen case, it is better to return false to indicate a
1808      // checksum validation failure.
1809      if (!fileContext.isUseHBaseChecksum()) {
1810        return false;
1811      }
1812      return ChecksumUtil.validateChecksum(data, pathName, offset, hdrSize);
1813    }
1814
1815    @Override
1816    public void closeStreams() throws IOException {
1817      streamWrapper.close();
1818    }
1819
1820    @Override
1821    public void unbufferStream() {
1822      // To handle concurrent reads, ensure that no other client is accessing the streams while we
1823      // unbuffer it.
1824      if (streamLock.tryLock()) {
1825        try {
1826          this.streamWrapper.unbuffer();
1827        } finally {
1828          streamLock.unlock();
1829        }
1830      }
1831    }
1832
1833    @Override
1834    public String toString() {
1835      return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext;
1836    }
1837  }
1838
1839  /** An additional sanity-check in case no compression or encryption is being used. */
1840  void sanityCheckUncompressed() throws IOException {
1841    if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) {
1842      throw new IOException("Using no compression but " + "onDiskSizeWithoutHeader="
1843        + onDiskSizeWithoutHeader + ", " + "uncompressedSizeWithoutHeader="
1844        + uncompressedSizeWithoutHeader + ", numChecksumbytes=" + totalChecksumBytes());
1845    }
1846  }
1847
1848  // Cacheable implementation
1849  @Override
1850  public int getSerializedLength() {
1851    if (buf != null) {
1852      // Include extra bytes for block metadata.
1853      return this.buf.limit() + BLOCK_METADATA_SPACE;
1854    }
1855    return 0;
1856  }
1857
1858  // Cacheable implementation
1859  @Override
1860  public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
1861    this.buf.get(destination, 0, getSerializedLength() - BLOCK_METADATA_SPACE);
1862    destination = addMetaData(destination, includeNextBlockMetadata);
1863
1864    // Make it ready for reading. flip sets position to zero and limit to the current position,
1865    // which covers exactly what we just wrote: the block (plus checksums if present) followed by
1866    // the metadata.
1867    destination.flip();
1868  }
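  // The inverse of this layout is handled by HFileBlock.BLOCK_DESERIALIZER (returned by
  // getDeserializer() below), which rebuilds an HFileBlock from the serialized bytes followed by
  // the metadata appended in addMetaData().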
1869
1870  /**
1871   * For use by bucketcache. This exposes internals.
1872   */
1873  public ByteBuffer getMetaData(ByteBuffer bb) {
1874    bb = addMetaData(bb, true);
1875    bb.flip();
1876    return bb;
1877  }
1878
1879  /**
1880   * Adds metadata at current position (position is moved forward). Does not flip or reset.
1881   * @return The passed <code>destination</code> with metadata added.
1882   */
1883  private ByteBuffer addMetaData(final ByteBuffer destination, boolean includeNextBlockMetadata) {
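    // Metadata layout: a 1-byte "uses HBase checksum" flag, the 8-byte on-file offset, and
    // optionally the 4-byte next-block on-disk size; together these are the BLOCK_METADATA_SPACE
    // bytes accounted for in getSerializedLength().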
1884    destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1885    destination.putLong(this.offset);
1886    if (includeNextBlockMetadata) {
1887      destination.putInt(this.nextBlockOnDiskSize);
1888    }
1889    return destination;
1890  }
1891
1892  // Cacheable implementation
1893  @Override
1894  public CacheableDeserializer<Cacheable> getDeserializer() {
1895    return HFileBlock.BLOCK_DESERIALIZER;
1896  }
1897
1898  @Override
1899  public int hashCode() {
1900    int result = 1;
1901    result = result * 31 + blockType.hashCode();
1902    result = result * 31 + nextBlockOnDiskSize;
1903    result = result * 31 + (int) (offset ^ (offset >>> 32));
1904    result = result * 31 + onDiskSizeWithoutHeader;
1905    result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32));
1906    result = result * 31 + uncompressedSizeWithoutHeader;
1907    result = result * 31 + buf.hashCode();
1908    return result;
1909  }
1910
1911  @Override
1912  public boolean equals(Object comparison) {
1913    if (this == comparison) {
1914      return true;
1915    }
1916    if (comparison == null) {
1917      return false;
1918    }
1919    if (!(comparison instanceof HFileBlock)) {
1920      return false;
1921    }
1922
1923    HFileBlock castedComparison = (HFileBlock) comparison;
1924
1925    if (castedComparison.blockType != this.blockType) {
1926      return false;
1927    }
1928    if (castedComparison.nextBlockOnDiskSize != this.nextBlockOnDiskSize) {
1929      return false;
1930    }
1931    // Offset is important. Needed when we have to remake cachekey when block is returned to cache.
1932    if (castedComparison.offset != this.offset) {
1933      return false;
1934    }
1935    if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1936      return false;
1937    }
1938    if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1939      return false;
1940    }
1941    if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1942      return false;
1943    }
1944    if (
1945      ByteBuff.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
1946        castedComparison.buf.limit()) != 0
1947    ) {
1948      return false;
1949    }
1950    return true;
1951  }
1952
1953  DataBlockEncoding getDataBlockEncoding() {
1954    if (blockType == BlockType.ENCODED_DATA) {
1955      return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
1956    }
1957    return DataBlockEncoding.NONE;
1958  }
1959
1960  byte getChecksumType() {
1961    return this.fileContext.getChecksumType().getCode();
1962  }
1963
1964  int getBytesPerChecksum() {
1965    return this.fileContext.getBytesPerChecksum();
1966  }
1967
1968  /** Returns the size of data on disk + header. Excludes checksum. */
1969  int getOnDiskDataSizeWithHeader() {
1970    return this.onDiskDataSizeWithHeader;
1971  }
1972
1973  /**
1974   * Calculate the number of bytes required to store all the checksums for this block. Each
1975   * checksum value is a 4-byte integer.
1976   */
1977  int totalChecksumBytes() {
1978    // If the hfile block has minorVersion 0, then there are no checksum
1979    // data to validate. Similarly, a zero value in this.bytesPerChecksum
1980    // indicates that cached blocks do not have checksum data because
1981    // checksums were already validated when the block was read from disk.
1982    if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
1983      return 0;
1984    }
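    // One 4-byte checksum is stored per bytesPerChecksum chunk of on-disk data (header included),
    // i.e. ceil(onDiskDataSizeWithHeader / bytesPerChecksum) * 4 bytes in total. For example, with
    // an assumed 16 KB bytesPerChecksum, 48 KB of on-disk data plus its header needs 4 chunks,
    // i.e. 16 checksum bytes.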
1985    return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
1986      this.fileContext.getBytesPerChecksum());
1987  }
1988
1989  /**
1990   * Returns the size of this block header.
1991   */
1992  public int headerSize() {
1993    return headerSize(this.fileContext.isUseHBaseChecksum());
1994  }
1995
1996  /**
1997   * Maps a minor version to the size of the header.
1998   */
1999  public static int headerSize(boolean usesHBaseChecksum) {
2000    return usesHBaseChecksum
2001      ? HConstants.HFILEBLOCK_HEADER_SIZE
2002      : HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
2003  }
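  // For reference: with HBase checksums the header is HConstants.HFILEBLOCK_HEADER_SIZE (33) bytes,
  // laid out as the 8-byte block-type magic, two 4-byte sizes, the 8-byte previous-block offset, a
  // 1-byte checksum type, a 4-byte bytesPerChecksum, and a 4-byte onDiskDataSizeWithHeader. Without
  // HBase checksums only the first 24 bytes are present.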
2004
2005  /**
2006   * Return the appropriate DUMMY_HEADER for the minor version
2007   */
2008  // TODO: Why is this in here?
2009  byte[] getDummyHeaderForVersion() {
2010    return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
2011  }
2012
2013  /**
2014   * Return the appropriate DUMMY_HEADER for the minor version
2015   */
2016  static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
2017    return usesHBaseChecksum ? HConstants.HFILEBLOCK_DUMMY_HEADER : DUMMY_HEADER_NO_CHECKSUM;
2018  }
2019
2020  /**
2021   * @return This HFileBlock's fileContext, which will be a derivative of the fileContext for the
2022   *         file from which this block's data was originally read.
2023   */
2024  public HFileContext getHFileContext() {
2025    return this.fileContext;
2026  }
2027
2028  /**
2029   * Convert the contents of the block header into a human readable string. This is mostly helpful
2030   * for debugging. This assumes that the block has minor version > 0.
2031   */
2032  static String toStringHeader(ByteBuff buf) throws IOException {
2033    byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
2034    buf.get(magicBuf);
2035    BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
2036    int compressedBlockSizeNoHeader = buf.getInt();
2037    int uncompressedBlockSizeNoHeader = buf.getInt();
2038    long prevBlockOffset = buf.getLong();
2039    byte cksumtype = buf.get();
2040    long bytesPerChecksum = buf.getInt();
2041    long onDiskDataSizeWithHeader = buf.getInt();
2042    return " Header dump: magic: " + Bytes.toString(magicBuf) + " blockType " + bt
2043      + " compressedBlockSizeNoHeader " + compressedBlockSizeNoHeader
2044      + " uncompressedBlockSizeNoHeader " + uncompressedBlockSizeNoHeader + " prevBlockOffset "
2045      + prevBlockOffset + " checksumType " + ChecksumType.codeToType(cksumtype)
2046      + " bytesPerChecksum " + bytesPerChecksum + " onDiskDataSizeWithHeader "
2047      + onDiskDataSizeWithHeader;
2048  }
2049
2050  /**
2051   * Creates a new HFileBlockBuilder from the existing block and a new ByteBuff. The builder will be
2052   * loaded with all of the original fields from blk, except now using the newBuff and setting
2053   * isSharedMem based on the source of the passed in newBuff. An existing HFileBlock may have been
2054   * an {@link ExclusiveMemHFileBlock}, but the new buffer might call for a
2055   * {@link SharedMemHFileBlock}. Or vice versa.
2056   * @param blk     the block to clone from
2057   * @param newBuff the new buffer to use
2058   */
2059  private static HFileBlockBuilder createBuilder(HFileBlock blk, ByteBuff newBuff) {
2060    return new HFileBlockBuilder().withBlockType(blk.blockType)
2061      .withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader)
2062      .withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader)
2063      .withPrevBlockOffset(blk.prevBlockOffset).withByteBuff(newBuff).withOffset(blk.offset)
2064      .withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
2065      .withNextBlockOnDiskSize(blk.nextBlockOnDiskSize).withHFileContext(blk.fileContext)
2066      .withByteBuffAllocator(blk.allocator).withShared(!newBuff.hasArray());
2067  }
2068
2069  private static HFileBlock shallowClone(HFileBlock blk, ByteBuff newBuf) {
2070    return createBuilder(blk, newBuf).build();
2071  }
2072
2073  static HFileBlock deepCloneOnHeap(HFileBlock blk) {
2074    ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
2075    return createBuilder(blk, deepCloned).build();
2076  }
2077}