001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
021import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR;
022import static org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer.CONTEXT_KEY;
023
024import io.opentelemetry.api.common.Attributes;
025import io.opentelemetry.api.common.AttributesBuilder;
026import io.opentelemetry.api.trace.Span;
027import io.opentelemetry.context.Context;
028import io.opentelemetry.context.Scope;
029import java.io.DataInputStream;
030import java.io.DataOutput;
031import java.io.DataOutputStream;
032import java.io.IOException;
033import java.nio.ByteBuffer;
034import java.util.ArrayList;
035import java.util.List;
036import java.util.Optional;
037import java.util.concurrent.atomic.AtomicReference;
038import java.util.concurrent.locks.Lock;
039import java.util.concurrent.locks.ReentrantLock;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FSDataInputStream;
042import org.apache.hadoop.fs.FSDataOutputStream;
043import org.apache.hadoop.hbase.ExtendedCell;
044import org.apache.hadoop.hbase.HBaseInterfaceAudience;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.fs.HFileSystem;
047import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
048import org.apache.hadoop.hbase.io.ByteBuffAllocator;
049import org.apache.hadoop.hbase.io.ByteBuffInputStream;
050import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream;
051import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
052import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
053import org.apache.hadoop.hbase.io.encoding.EncodingState;
054import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
055import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
056import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
057import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
058import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer;
059import org.apache.hadoop.hbase.io.util.BlockIOUtils;
060import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
061import org.apache.hadoop.hbase.nio.ByteBuff;
062import org.apache.hadoop.hbase.nio.MultiByteBuff;
063import org.apache.hadoop.hbase.nio.SingleByteBuff;
064import org.apache.hadoop.hbase.regionserver.ShipperListener;
065import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ReadType;
066import org.apache.hadoop.hbase.util.Bytes;
067import org.apache.hadoop.hbase.util.ChecksumType;
068import org.apache.hadoop.hbase.util.ClassSize;
069import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
070import org.apache.hadoop.util.ReflectionUtils;
071import org.apache.yetus.audience.InterfaceAudience;
072import org.slf4j.Logger;
073import org.slf4j.LoggerFactory;
074
075import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
076
077/**
078 * Cacheable Blocks of an {@link HFile} version 2 file. Version 2 was introduced in hbase-0.92.0.
079 * <p>
080 * Version 1 was the original file block. Version 2 was introduced when we changed the hbase file
081 * format to support multi-level block indexes and compound bloom filters (HBASE-3857). Support for
082 * Version 1 was removed in hbase-1.3.0.
083 * <h3>HFileBlock: Version 2</h3> In version 2, a block is structured as follows:
084 * <ul>
085 * <li><b>Header:</b> See Writer#putHeader() for where header is written; header total size is
086 * HFILEBLOCK_HEADER_SIZE
087 * <ul>
088 * <li>0. blockType: Magic record identifying the {@link BlockType} (8 bytes): e.g.
089 * <code>DATABLK*</code>
 * <li>1. onDiskSizeWithoutHeader: Compressed -- a.k.a 'on disk' -- block size, excluding header,
 * but including trailing checksum bytes (4 bytes)
092 * <li>2. uncompressedSizeWithoutHeader: Uncompressed block size, excluding header, and excluding
093 * checksum bytes (4 bytes)
094 * <li>3. prevBlockOffset: The offset of the previous block of the same type (8 bytes). This is used
095 * to navigate to the previous block without having to go to the block index
096 * <li>4: For minorVersions &gt;=1, the ordinal describing checksum type (1 byte)
097 * <li>5: For minorVersions &gt;=1, the number of data bytes/checksum chunk (4 bytes)
098 * <li>6: onDiskDataSizeWithHeader: For minorVersions &gt;=1, the size of data 'on disk', including
099 * header, excluding checksums (4 bytes)
100 * </ul>
101 * </li>
102 * <li><b>Raw/Compressed/Encrypted/Encoded data:</b> The compression algorithm is the same for all
103 * the blocks in an {@link HFile}. If compression is NONE, this is just raw, serialized Cells.
104 * <li><b>Tail:</b> For minorVersions &gt;=1, a series of 4 byte checksums, one each for the number
105 * of bytes specified by bytesPerChecksum.
106 * </ul>
 * <h3>Caching</h3> Caches cache whole blocks with trailing checksums, if any. We then tag on some
 * metadata, the content of BLOCK_METADATA_SPACE: a flag saying whether we are doing 'hbase'
 * checksums, and the offset into the file, which is needed to re-make the cache key when we
 * return the block to the cache as 'done'. See {@link Cacheable#serialize(ByteBuffer, boolean)} and
111 * {@link Cacheable#getDeserializer()}.
112 * <p>
113 * TODO: Should we cache the checksums? Down in Writer#getBlockForCaching(CacheConfig) where we make
114 * a block to cache-on-write, there is an attempt at turning off checksums. This is not the only
115 * place we get blocks to cache. We also will cache the raw return from an hdfs read. In this case,
116 * the checksums may be present. If the cache is backed by something that doesn't do ECC, say an
 * SSD, we might want to preserve checksums. For now this is an open question.
118 * <p>
119 * TODO: Over in BucketCache, we save a block allocation by doing a custom serialization. Be sure to
120 * change it if serialization changes in here. Could we add a method here that takes an IOEngine and
121 * that then serializes to it rather than expose our internals over in BucketCache? IOEngine is in
122 * the bucket subpackage. Pull it up? Then this class knows about bucketcache. Ugh.
123 */
124@InterfaceAudience.Private
125public class HFileBlock implements Cacheable {
126  private static final Logger LOG = LoggerFactory.getLogger(HFileBlock.class);
127  public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HFileBlock.class, false);
128
129  // Block Header fields.
130
131  // TODO: encapsulate Header related logic in this inner class.
132  static class Header {
133    // Format of header is:
134    // 8 bytes - block magic
135    // 4 bytes int - onDiskSizeWithoutHeader
136    // 4 bytes int - uncompressedSizeWithoutHeader
137    // 8 bytes long - prevBlockOffset
138    // The following 3 are only present if header contains checksum information
139    // 1 byte - checksum type
140    // 4 byte int - bytes per checksum
141    // 4 byte int - onDiskDataSizeWithHeader
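    // With checksums, the fields above add up to 8 + 4 + 4 + 8 + 1 + 4 + 4 = 33 bytes
    // (HConstants.HFILEBLOCK_HEADER_SIZE); without the checksum fields they add up to 24 bytes
    // (HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).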
142    static int BLOCK_MAGIC_INDEX = 0;
143    static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8;
144    static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12;
145    static int PREV_BLOCK_OFFSET_INDEX = 16;
146    static int CHECKSUM_TYPE_INDEX = 24;
147    static int BYTES_PER_CHECKSUM_INDEX = 25;
148    static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29;
149  }
150
151  /** Type of block. Header field 0. */
152  private BlockType blockType;
153
154  /**
155   * Size on disk excluding header, including checksum. Header field 1.
156   * @see Writer#putHeader(byte[], int, int, int, int)
157   */
158  private int onDiskSizeWithoutHeader;
159
160  /**
161   * Size of pure data. Does not include header or checksums. Header field 2.
162   * @see Writer#putHeader(byte[], int, int, int, int)
163   */
164  private int uncompressedSizeWithoutHeader;
165
166  /**
167   * The offset of the previous block on disk. Header field 3.
168   * @see Writer#putHeader(byte[], int, int, int, int)
169   */
170  private long prevBlockOffset;
171
172  /**
173   * Size on disk of header + data. Excludes checksum. Header field 6, OR calculated from
174   * {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
175   * @see Writer#putHeader(byte[], int, int, int, int)
176   */
177  private final int onDiskDataSizeWithHeader;
178  // End of Block Header fields.
179
180  /**
181   * The in-memory representation of the hfile block. Can be on or offheap. Can be backed by a
182   * single ByteBuffer or by many. Make no assumptions.
183   * <p>
184   * Be careful reading from this <code>buf</code>. Duplicate and work on the duplicate or if not,
185   * be sure to reset position and limit else trouble down the road.
186   * <p>
187   * TODO: Make this read-only once made.
188   * <p>
189   * We are using the ByteBuff type. ByteBuffer is not extensible yet we need to be able to have a
190   * ByteBuffer-like API across multiple ByteBuffers reading from a cache such as BucketCache. So,
191   * we have this ByteBuff type. Unfortunately, it is spread all about HFileBlock. Would be good if
192   * could be confined to cache-use only but hard-to-do.
193   * <p>
   * NOTE: this byteBuff includes the HFileBlock header and data, but excludes the checksum.
195   */
196  private ByteBuff bufWithoutChecksum;
197
198  /**
199   * Meta data that holds meta information on the hfileblock.
200   */
201  private final HFileContext fileContext;
202
203  /**
204   * The offset of this block in the file. Populated by the reader for convenience of access. This
205   * offset is not part of the block header.
206   */
207  private long offset = UNSET;
208
209  /**
210   * The on-disk size of the next block, including the header and checksums if present. UNSET if
211   * unknown. Blocks try to carry the size of the next block to read in this data member. Usually we
212   * get block sizes from the hfile index but sometimes the index is not available: e.g. when we
213   * read the indexes themselves (indexes are stored in blocks, we do not have an index for the
214   * indexes). Saves seeks especially around file open when there is a flurry of reading in hfile
215   * metadata.
216   */
217  private int nextBlockOnDiskSize = UNSET;
218
219  private ByteBuffAllocator allocator;
220
221  /**
   * On a checksum failure, do this many succeeding read requests using hdfs checksums before
223   * auto-reenabling hbase checksum verification.
224   */
225  static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
226
227  private static int UNSET = -1;
228  public static final boolean FILL_HEADER = true;
229  public static final boolean DONT_FILL_HEADER = false;
230
  // TODO: How do we get this estimate right if the backing buffer is a SingleByteBuff?
232  public static final int MULTI_BYTE_BUFFER_HEAP_SIZE =
233    (int) ClassSize.estimateBase(MultiByteBuff.class, false);
234
235  /**
236   * Space for metadata on a block that gets stored along with the block when we cache it. There are
237   * a few bytes stuck on the end of the HFileBlock that we pull in from HDFS. 8 bytes are for the
   * offset of this block (long) in the file. Offset is important because it is used when we remake
239   * the CacheKey when we return block to the cache when done. There is also a flag on whether
240   * checksumming is being done by hbase or not. See class comment for note on uncertain state of
241   * checksumming of blocks that come out of cache (should we or should we not?). Finally there are
242   * 4 bytes to hold the length of the next block which can save a seek on occasion if available.
243   * (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was formerly
244   * known as EXTRA_SERIALIZATION_SPACE).
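   * <p>
   * For illustration, the metadata layout as it is read back by {@code BlockDeserializer#deserialize}
   * (1 + 8 + 4 = 13 bytes):
   *
   * <pre>
   * 1 byte  : flag, 1 when hbase checksums are in use for this block
   * 8 bytes : offset of this block in its file (long)
   * 4 bytes : on-disk size of the next block, or -1 if unknown (int)
   * </pre>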
245   */
246  public static final int BLOCK_METADATA_SPACE =
247    Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
248
249  /**
250   * Each checksum value is an integer that can be stored in 4 bytes.
251   */
252  static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
253
254  static final byte[] DUMMY_HEADER_NO_CHECKSUM =
255    new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
256
257  /**
258   * Used deserializing blocks from Cache. <code>
259   * ++++++++++++++
260   * + HFileBlock +
261   * ++++++++++++++
262   * + Checksums  + <= Optional
263   * ++++++++++++++
264   * + Metadata!  + <= See note on BLOCK_METADATA_SPACE above.
265   * ++++++++++++++
266   * </code>
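   * <p>
   * A rough sketch of the round trip a block cache may perform, assuming {@code block} is an
   * {@link HFileBlock} about to be cached and that the destination buffer is sized via
   * {@link Cacheable#getSerializedLength()}:
   *
   * <pre>{@code
   * ByteBuffer dest = ByteBuffer.allocate(block.getSerializedLength());
   * block.serialize(dest, true); // block bytes followed by the BLOCK_METADATA_SPACE metadata
   * dest.flip();
   * HFileBlock copy = (HFileBlock) HFileBlock.BLOCK_DESERIALIZER
   *   .deserialize(new SingleByteBuff(dest), ByteBuffAllocator.HEAP);
   * }</pre>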
267   * @see #serialize(ByteBuffer, boolean)
268   */
269  public static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER = new BlockDeserializer();
270
271  public static final class BlockDeserializer implements CacheableDeserializer<Cacheable> {
272    private BlockDeserializer() {
273    }
274
275    @Override
276    public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc) throws IOException {
277      // The buf has the file block followed by block metadata.
278      // Set limit to just before the BLOCK_METADATA_SPACE then rewind.
279      buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind();
280      // Get a new buffer to pass the HFileBlock for it to 'own'.
281      ByteBuff newByteBuff = buf.slice();
282      // Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock.
283      buf.position(buf.limit());
284      buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE);
285      boolean usesChecksum = buf.get() == (byte) 1;
286      long offset = buf.getLong();
287      int nextBlockOnDiskSize = buf.getInt();
288      return createFromBuff(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
289    }
290
291    @Override
292    public int getDeserializerIdentifier() {
293      return DESERIALIZER_IDENTIFIER;
294    }
295  }
296
297  private static final int DESERIALIZER_IDENTIFIER;
298  static {
299    DESERIALIZER_IDENTIFIER =
300      CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
301  }
302
303  private final int totalChecksumBytes;
304
305  /**
   * Creates a new {@link HFile} block from the given fields. This constructor is used only while
   * writing blocks and caching, when the block data is sitting in a byte buffer and we want to
   * stuff the block into cache.
   * <p>
   * TODO: The caller presumes no checksumming is required of this block instance since it is going
   * into cache; the checksum has already been verified on the underlying block data pulled in from
   * the filesystem. Is that correct? What if the cache is SSD?
   * <p>
   * TODO: Can the HFile block writer also be off-heap?
   * </p>
316   * @param blockType                     the type of this block, see {@link BlockType}
317   * @param onDiskSizeWithoutHeader       see {@link #onDiskSizeWithoutHeader}
318   * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
319   * @param prevBlockOffset               see {@link #prevBlockOffset}
320   * @param buf                           block buffer with header
321   *                                      ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
322   * @param fillHeader                    when true, write the first 4 header fields into passed
323   *                                      buffer.
324   * @param offset                        the file offset the block was read from
325   * @param onDiskDataSizeWithHeader      see {@link #onDiskDataSizeWithHeader}
326   * @param fileContext                   HFile meta data
327   */
328  public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
329    int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
330    long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
331    ByteBuffAllocator allocator) {
332    this.blockType = blockType;
333    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
334    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
335    this.prevBlockOffset = prevBlockOffset;
336    this.offset = offset;
337    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
338    this.nextBlockOnDiskSize = nextBlockOnDiskSize;
339    this.fileContext = fileContext;
340    this.allocator = allocator;
341    this.bufWithoutChecksum = buf;
342    if (fillHeader) {
343      overwriteHeader();
344    }
345    this.bufWithoutChecksum.rewind();
346    this.totalChecksumBytes = computeTotalChecksumBytes();
347  }
348
349  /**
350   * Creates a block from an existing buffer starting with a header. Rewinds and takes ownership of
351   * the buffer. By definition of rewind, ignores the buffer position, but if you slice the buffer
352   * beforehand, it will rewind to that point.
353   * @param buf Has header, content, and trailing checksums if present.
354   */
355  static HFileBlock createFromBuff(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
356    final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator)
357    throws IOException {
358    buf.rewind();
359    final BlockType blockType = BlockType.read(buf);
360    final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
361    final int uncompressedSizeWithoutHeader =
362      buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
363    final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
364    // This constructor is called when we deserialize a block from cache and when we read a block in
    // from the fs. fileContext is null when deserialized from cache so we need to make up one.
366    HFileContextBuilder fileContextBuilder =
367      fileContext != null ? new HFileContextBuilder(fileContext) : new HFileContextBuilder();
368    fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum);
369    int onDiskDataSizeWithHeader;
370    if (usesHBaseChecksum) {
371      byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX);
372      int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
373      onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
374      // Use the checksum type and bytes per checksum from header, not from fileContext.
375      fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType));
376      fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum);
377    } else {
378      fileContextBuilder.withChecksumType(ChecksumType.NULL);
379      fileContextBuilder.withBytesPerCheckSum(0);
      // Need to fix onDiskDataSizeWithHeader; there are no checksums after the block data
381      onDiskDataSizeWithHeader = onDiskSizeWithoutHeader + headerSize(usesHBaseChecksum);
382    }
383    fileContext = fileContextBuilder.build();
384    assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
385    return new HFileBlockBuilder().withBlockType(blockType)
386      .withOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader)
387      .withUncompressedSizeWithoutHeader(uncompressedSizeWithoutHeader)
388      .withPrevBlockOffset(prevBlockOffset).withOffset(offset)
389      .withOnDiskDataSizeWithHeader(onDiskDataSizeWithHeader)
390      .withNextBlockOnDiskSize(nextBlockOnDiskSize).withHFileContext(fileContext)
391      .withByteBuffAllocator(allocator).withByteBuff(buf.rewind()).withShared(!buf.hasArray())
392      .build();
393  }
394
395  /**
396   * Parse total on disk size including header and checksum.
397   * @param headerBuf       Header ByteBuffer. Presumed exact size of header.
398   * @param checksumSupport true if checksum verification is in use.
399   * @return Size of the block with header included.
400   */
401  private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf, boolean checksumSupport) {
402    return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(checksumSupport);
403  }
404
405  /**
406   * @return the on-disk size of the next block (including the header size and any checksums if
407   *         present) read by peeking into the next block's header; use as a hint when doing a read
408   *         of the next block when scanning or running over a file.
409   */
410  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
411  public int getNextBlockOnDiskSize() {
412    return nextBlockOnDiskSize;
413  }
414
415  @Override
416  public BlockType getBlockType() {
417    return blockType;
418  }
419
420  @Override
421  public int refCnt() {
422    return bufWithoutChecksum.refCnt();
423  }
424
425  @Override
426  public HFileBlock retain() {
427    bufWithoutChecksum.retain();
428    return this;
429  }
430
431  /**
432   * Call {@link ByteBuff#release()} to decrease the reference count, if no other reference, it will
433   * return back the {@link ByteBuffer} to {@link org.apache.hadoop.hbase.io.ByteBuffAllocator}
434   */
435  @Override
436  public boolean release() {
437    return bufWithoutChecksum.release();
438  }
439
440  /**
441   * Calling this method in strategic locations where HFileBlocks are referenced may help diagnose
442   * potential buffer leaks. We pass the block itself as a default hint, but one can use
443   * {@link #touch(Object)} to pass their own hint as well.
444   */
445  @Override
446  public HFileBlock touch() {
447    return touch(this);
448  }
449
450  @Override
451  public HFileBlock touch(Object hint) {
452    bufWithoutChecksum.touch(hint);
453    return this;
454  }
455
  /** Returns the data block encoding id that was used to encode this block */
457  short getDataBlockEncodingId() {
458    if (blockType != BlockType.ENCODED_DATA) {
459      throw new IllegalArgumentException("Querying encoder ID of a block " + "of type other than "
460        + BlockType.ENCODED_DATA + ": " + blockType);
461    }
462    return bufWithoutChecksum.getShort(headerSize());
463  }
464
465  /** Returns the on-disk size of header + data part + checksum. */
466  public int getOnDiskSizeWithHeader() {
467    return onDiskSizeWithoutHeader + headerSize();
468  }
469
470  /** Returns the on-disk size of the data part + checksum (header excluded). */
471  int getOnDiskSizeWithoutHeader() {
472    return onDiskSizeWithoutHeader;
473  }
474
475  /** Returns the uncompressed size of data part (header and checksum excluded). */
476  public int getUncompressedSizeWithoutHeader() {
477    return uncompressedSizeWithoutHeader;
478  }
479
480  /** Returns the offset of the previous block of the same type in the file, or -1 if unknown */
481  long getPrevBlockOffset() {
482    return prevBlockOffset;
483  }
484
485  /**
   * Rewinds {@code bufWithoutChecksum} and writes the first 4 header fields (plus the checksum
   * fields when HBase checksums are in use). The buffer position is modified as a side-effect.
488   */
489  private void overwriteHeader() {
490    bufWithoutChecksum.rewind();
491    blockType.write(bufWithoutChecksum);
492    bufWithoutChecksum.putInt(onDiskSizeWithoutHeader);
493    bufWithoutChecksum.putInt(uncompressedSizeWithoutHeader);
494    bufWithoutChecksum.putLong(prevBlockOffset);
495    if (this.fileContext.isUseHBaseChecksum()) {
496      bufWithoutChecksum.put(fileContext.getChecksumType().getCode());
497      bufWithoutChecksum.putInt(fileContext.getBytesPerChecksum());
498      bufWithoutChecksum.putInt(onDiskDataSizeWithHeader);
499    }
500  }
501
502  /**
503   * Returns a buffer that does not include the header and checksum.
504   * @return the buffer with header skipped and checksum omitted.
505   */
506  public ByteBuff getBufferWithoutHeader() {
507    ByteBuff dup = getBufferReadOnly();
508    return dup.position(headerSize()).slice();
509  }
510
511  /**
512   * Returns a read-only duplicate of the buffer this block stores internally ready to be read.
513   * Clients must not modify the buffer object though they may set position and limit on the
514   * returned buffer since we pass back a duplicate. This method has to be public because it is used
515   * in {@link CompoundBloomFilter} to avoid object creation on every Bloom filter lookup, but has
   * to be used with caution. The buffer holds the header and block content; checksums are not
   * included.
   * @return the buffer of this block for read-only operations; the buffer includes the header but
   *         not the checksum.
520   */
521  public ByteBuff getBufferReadOnly() {
522    // TODO: ByteBuf does not support asReadOnlyBuffer(). Fix.
523    ByteBuff dup = this.bufWithoutChecksum.duplicate();
524    assert dup.position() == 0;
525    return dup;
526  }
527
528  public ByteBuffAllocator getByteBuffAllocator() {
529    return this.allocator;
530  }
531
532  private void sanityCheckAssertion(long valueFromBuf, long valueFromField, String fieldName)
533    throws IOException {
534    if (valueFromBuf != valueFromField) {
535      throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
536        + ") is different from that in the field (" + valueFromField + ")");
537    }
538  }
539
540  private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
541    throws IOException {
542    if (valueFromBuf != valueFromField) {
543      throw new IOException("Block type stored in the buffer: " + valueFromBuf
544        + ", block type field: " + valueFromField);
545    }
546  }
547
548  /**
549   * Checks if the block is internally consistent, i.e. the first
550   * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a valid header consistent
   * with the fields. Assumes a packed block structure. This function is primarily for testing and
552   * debugging, and is not thread-safe, because it alters the internal buffer pointer. Used by tests
553   * only.
554   */
555  void sanityCheck() throws IOException {
556    // Duplicate so no side-effects
557    ByteBuff dup = this.bufWithoutChecksum.duplicate().rewind();
558    sanityCheckAssertion(BlockType.read(dup), blockType);
559
560    sanityCheckAssertion(dup.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader");
561
562    sanityCheckAssertion(dup.getInt(), uncompressedSizeWithoutHeader,
563      "uncompressedSizeWithoutHeader");
564
565    sanityCheckAssertion(dup.getLong(), prevBlockOffset, "prevBlockOffset");
566    if (this.fileContext.isUseHBaseChecksum()) {
567      sanityCheckAssertion(dup.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
568      sanityCheckAssertion(dup.getInt(), this.fileContext.getBytesPerChecksum(),
569        "bytesPerChecksum");
570      sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
571    }
572
573    if (dup.limit() != onDiskDataSizeWithHeader) {
574      throw new AssertionError(
575        "Expected limit " + onDiskDataSizeWithHeader + ", got " + dup.limit());
576    }
577
578    // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
579    // block's header, so there are two sensible values for buffer capacity.
580    int hdrSize = headerSize();
581    dup.rewind();
582    if (
583      dup.remaining() != onDiskDataSizeWithHeader
584        && dup.remaining() != onDiskDataSizeWithHeader + hdrSize
585    ) {
586      throw new AssertionError("Invalid buffer capacity: " + dup.remaining() + ", expected "
587        + onDiskDataSizeWithHeader + " or " + (onDiskDataSizeWithHeader + hdrSize));
588    }
589  }
590
591  @Override
592  public String toString() {
593    StringBuilder sb = new StringBuilder().append("[").append("blockType=").append(blockType)
594      .append(", fileOffset=").append(offset).append(", headerSize=").append(headerSize())
595      .append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
596      .append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
597      .append(", prevBlockOffset=").append(prevBlockOffset).append(", isUseHBaseChecksum=")
598      .append(fileContext.isUseHBaseChecksum());
599    if (fileContext.isUseHBaseChecksum()) {
600      sb.append(", checksumType=").append(ChecksumType.codeToType(this.bufWithoutChecksum.get(24)))
601        .append(", bytesPerChecksum=").append(this.bufWithoutChecksum.getInt(24 + 1))
602        .append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
603    } else {
604      sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader).append("(")
605        .append(onDiskSizeWithoutHeader).append("+")
606        .append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
607    }
608    String dataBegin;
609    if (bufWithoutChecksum.hasArray()) {
610      dataBegin = Bytes.toStringBinary(bufWithoutChecksum.array(),
611        bufWithoutChecksum.arrayOffset() + headerSize(),
612        Math.min(32, bufWithoutChecksum.limit() - bufWithoutChecksum.arrayOffset() - headerSize()));
613    } else {
614      ByteBuff bufWithoutHeader = getBufferWithoutHeader();
615      byte[] dataBeginBytes =
616        new byte[Math.min(32, bufWithoutHeader.limit() - bufWithoutHeader.position())];
617      bufWithoutHeader.get(dataBeginBytes);
618      dataBegin = Bytes.toStringBinary(dataBeginBytes);
619    }
620    sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader())
621      .append(", totalChecksumBytes=").append(totalChecksumBytes()).append(", isUnpacked=")
622      .append(isUnpacked()).append(", buf=[").append(bufWithoutChecksum).append("]")
623      .append(", dataBeginsWith=").append(dataBegin).append(", fileContext=").append(fileContext)
624      .append(", nextBlockOnDiskSize=").append(nextBlockOnDiskSize).append("]");
625    return sb.toString();
626  }
627
628  /**
629   * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
630   * encoded structure. Internal structures are shared between instances where applicable.
631   */
632  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
633  public HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
634    if (!fileContext.isCompressedOrEncrypted()) {
635      // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
636      // which is used for block serialization to L2 cache, does not preserve encoding and
637      // encryption details.
638      return this;
639    }
640
641    ByteBuff newBuf = allocateBufferForUnpacking(); // allocates space for the decompressed block
642    HFileBlock unpacked = shallowClone(this, newBuf);
643
644    boolean succ = false;
645    final Context context =
646      Context.current().with(CONTEXT_KEY, new HFileContextAttributesBuilderConsumer(fileContext));
647    try (Scope ignored = context.makeCurrent()) {
648      HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
649        ? reader.getBlockDecodingContext()
650        : reader.getDefaultBlockDecodingContext();
651      // Create a duplicated buffer without the header part.
652      int headerSize = this.headerSize();
653      ByteBuff dup = this.bufWithoutChecksum.duplicate();
654      dup.position(headerSize);
655      dup = dup.slice();
656      // Decode the dup into unpacked#buf
657      ctx.prepareDecoding(unpacked.getOnDiskDataSizeWithHeader() - headerSize,
658        unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), dup);
659      succ = true;
660      return unpacked;
661    } finally {
662      if (!succ) {
663        unpacked.release();
664      }
665    }
666  }
667
668  /**
   * Always allocates a new buffer of the correct size. Copies header bytes from the existing
   * buffer. Does not change header fields. No room is reserved for checksum bytes.
671   */
672  private ByteBuff allocateBufferForUnpacking() {
673    int headerSize = headerSize();
674    int capacityNeeded = headerSize + uncompressedSizeWithoutHeader;
675
676    ByteBuff source = bufWithoutChecksum.duplicate();
677    ByteBuff newBuf = allocator.allocate(capacityNeeded);
678
679    // Copy header bytes into newBuf.
680    source.position(0);
681    newBuf.put(0, source, 0, headerSize);
682
683    // set limit to exclude next block's header
684    newBuf.limit(capacityNeeded);
685    return newBuf;
686  }
687
688  /**
689   * Return true when this block's buffer has been unpacked, false otherwise. Note this is a
   * calculated heuristic, not a tracked attribute of the block.
691   */
692  public boolean isUnpacked() {
693    final int headerSize = headerSize();
694    final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader;
695    final int bufCapacity = bufWithoutChecksum.remaining();
696    return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
697  }
698
699  /**
   * Cannot be {@link #UNSET}. Must be a legitimate value. Used when re-making the {@link BlockCacheKey}
701   * when block is returned to the cache.
702   * @return the offset of this block in the file it was read from
703   */
704  public long getOffset() {
705    if (offset < 0) {
706      throw new IllegalStateException("HFile block offset not initialized properly");
707    }
708    return offset;
709  }
710
  /** Returns a byte stream reading the data (excluding header and checksum) of this block */
712  DataInputStream getByteStream() {
713    ByteBuff dup = this.bufWithoutChecksum.duplicate();
714    dup.position(this.headerSize());
715    return new DataInputStream(new ByteBuffInputStream(dup));
716  }
717
718  @Override
719  public long heapSize() {
720    long size = FIXED_OVERHEAD;
721    size += fileContext.heapSize();
722    if (bufWithoutChecksum != null) {
723      // Deep overhead of the byte buffer. Needs to be aligned separately.
724      size += ClassSize.align(bufWithoutChecksum.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE);
725    }
726    return ClassSize.align(size);
727  }
728
729  /**
   * Will be overridden by {@link SharedMemHFileBlock} or {@link ExclusiveMemHFileBlock}. Returns
   * true by default.
732   */
733  public boolean isSharedMem() {
734    return true;
735  }
736
737  /**
738   * Unified version 2 {@link HFile} block writer. The intended usage pattern is as follows:
739   * <ol>
740   * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
741   * <li>Call {@link Writer#startWriting} and get a data stream to write to.
742   * <li>Write your data into the stream.
   * <li>Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to store the
   * serialized block into an external stream (see the sketch below).
745   * <li>Repeat to write more blocks.
746   * </ol>
747   * <p>
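   * A rough sketch of this protocol, assuming the caller supplies {@code conf}, {@code fileContext},
   * {@code cells} (the Cells for one data block) and {@code out} (the hfile's FSDataOutputStream):
   *
   * <pre>{@code
   * HFileBlock.Writer writer =
   *   new HFileBlock.Writer(conf, NoOpDataBlockEncoder.INSTANCE, fileContext);
   * writer.startWriting(BlockType.DATA);
   * for (ExtendedCell cell : cells) {
   *   writer.write(cell); // runs the cell through the configured data block encoder
   * }
   * writer.writeHeaderAndData(out); // finishes the block, appends header + data + checksums
   * // repeat startWriting()/write()/writeHeaderAndData() for further blocks, then:
   * writer.release();
   * }</pre>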
748   */
749  static class Writer implements ShipperListener {
750    private enum State {
751      INIT,
752      WRITING,
753      BLOCK_READY
754    }
755
756    private int maxSizeUnCompressed;
757
758    private BlockCompressedSizePredicator compressedSizePredicator;
759
760    /** Writer state. Used to ensure the correct usage protocol. */
761    private State state = State.INIT;
762
763    /** Data block encoder used for data blocks */
764    private final HFileDataBlockEncoder dataBlockEncoder;
765
766    private HFileBlockEncodingContext dataBlockEncodingCtx;
767
768    /** block encoding context for non-data blocks */
769    private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
770
771    /**
772     * The stream we use to accumulate data into a block in an uncompressed format. We reset this
773     * stream at the end of each block and reuse it. The header is written as the first
774     * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this stream.
775     */
776    private ByteArrayOutputStream baosInMemory;
777
778    /**
779     * Current block type. Set in {@link #startWriting(BlockType)}. Could be changed in
780     * {@link #finishBlock()} from {@link BlockType#DATA} to {@link BlockType#ENCODED_DATA}.
781     */
782    private BlockType blockType;
783
784    /**
     * A stream that we write uncompressed bytes to; the bytes accumulate in {@link #baosInMemory}
     * and are compressed, if applicable, later in {@link #finishBlock()}.
787     */
788    private DataOutputStream userDataStream;
789
790    /**
791     * Bytes to be written to the file system, including the header. Compressed if compression is
792     * turned on. It also includes the checksum data that immediately follows the block data.
793     * (header + data + checksums)
794     */
795    private ByteArrayOutputStream onDiskBlockBytesWithHeader;
796
797    /**
     * The checksum data for this block, stored separately from the (possibly compressed) block
     * bytes in {@link #onDiskBlockBytesWithHeader} and written to disk immediately after them.
801     */
802    private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;
803
804    /**
805     * Current block's start offset in the {@link HFile}. Set in
806     * {@link #writeHeaderAndData(FSDataOutputStream)}.
807     */
808    private long startOffset;
809
810    /**
811     * Offset of previous block by block type. Updated when the next block is started.
812     */
813    private long[] prevOffsetByType;
814
815    /** The offset of the previous block of the same type */
816    private long prevOffset;
817    /** Meta data that holds information about the hfileblock **/
818    private HFileContext fileContext;
819
820    private final ByteBuffAllocator allocator;
821
822    @Override
823    public void beforeShipped() {
824      if (getEncodingState() != null) {
825        getEncodingState().beforeShipped();
826      }
827    }
828
829    EncodingState getEncodingState() {
830      return dataBlockEncodingCtx.getEncodingState();
831    }
832
833    /**
834     * @param dataBlockEncoder data block encoding algorithm to use
835     */
836    public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
837      HFileContext fileContext) {
838      this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP, fileContext.getBlocksize());
839    }
840
841    public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
842      HFileContext fileContext, ByteBuffAllocator allocator, int maxSizeUnCompressed) {
843      if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
844        throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is "
845          + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is "
846          + fileContext.getBytesPerChecksum());
847      }
848      this.allocator = allocator;
849      this.dataBlockEncoder =
850        dataBlockEncoder != null ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
851      this.dataBlockEncodingCtx = this.dataBlockEncoder.newDataBlockEncodingContext(conf,
852        HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
853      // TODO: This should be lazily instantiated
854      this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(conf, null,
855        HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
856      // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum
857      baosInMemory = new ByteArrayOutputStream();
858      prevOffsetByType = new long[BlockType.values().length];
859      for (int i = 0; i < prevOffsetByType.length; ++i) {
860        prevOffsetByType[i] = UNSET;
861      }
862      // TODO: Why fileContext saved away when we have dataBlockEncoder and/or
863      // defaultDataBlockEncoder?
864      this.fileContext = fileContext;
865      this.compressedSizePredicator = (BlockCompressedSizePredicator) ReflectionUtils.newInstance(
866        conf.getClass(BLOCK_COMPRESSED_SIZE_PREDICATOR, UncompressedBlockSizePredicator.class),
867        new Configuration(conf));
868      this.maxSizeUnCompressed = maxSizeUnCompressed;
869    }
870
871    /**
872     * Starts writing into the block. The previous block's data is discarded.
873     * @return the stream the user can write their data into
874     */
875    DataOutputStream startWriting(BlockType newBlockType) throws IOException {
876      if (state == State.BLOCK_READY && startOffset != -1) {
877        // We had a previous block that was written to a stream at a specific
878        // offset. Save that offset as the last offset of a block of that type.
879        prevOffsetByType[blockType.getId()] = startOffset;
880      }
881
882      startOffset = -1;
883      blockType = newBlockType;
884
885      baosInMemory.reset();
886      baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
887
888      state = State.WRITING;
889
890      // We will compress it later in finishBlock()
891      userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory);
892      if (newBlockType == BlockType.DATA) {
893        this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
894      }
895      return userDataStream;
896    }
897
898    /**
899     * Writes the Cell to this block
900     */
901    void write(ExtendedCell cell) throws IOException {
902      expectState(State.WRITING);
903      this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream);
904    }
905
906    /**
907     * Transitions the block writer from the "writing" state to the "block ready" state. Does
908     * nothing if a block is already finished.
909     */
910    void ensureBlockReady() throws IOException {
911      Preconditions.checkState(state != State.INIT, "Unexpected state: " + state);
912
913      if (state == State.BLOCK_READY) {
914        return;
915      }
916
917      // This will set state to BLOCK_READY.
918      finishBlock();
919    }
920
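    /**
     * Returns true if the current block should be finished: either the data written so far has
     * reached {@code maxSizeUnCompressed}, or the configured {@link BlockCompressedSizePredicator}
     * indicates the block should be closed based on its predicted compressed size.
     */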
921    public boolean checkBoundariesWithPredicate() {
922      int rawBlockSize = encodedBlockSizeWritten();
923      if (rawBlockSize >= maxSizeUnCompressed) {
924        return true;
925      } else {
926        return compressedSizePredicator.shouldFinishBlock(rawBlockSize);
927      }
928    }
929
930    /**
931     * Finish up writing of the block. Flushes the compressing stream (if using compression), fills
932     * out the header, does any compression/encryption of bytes to flush out to disk, and manages
933     * the cache on write content, if applicable. Sets block write state to "block ready".
934     */
935    private void finishBlock() throws IOException {
936      if (blockType == BlockType.DATA) {
937        this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
938          baosInMemory.getBuffer(), blockType);
939        blockType = dataBlockEncodingCtx.getBlockType();
940      }
941      userDataStream.flush();
942      prevOffset = prevOffsetByType[blockType.getId()];
943
944      // We need to cache the unencoded/uncompressed size before changing the block state
945      int rawBlockSize = 0;
946      if (this.getEncodingState() != null) {
947        rawBlockSize = encodedBlockSizeWritten();
948      }
949      // We need to set state before we can package the block up for cache-on-write. In a way, the
950      // block is ready, but not yet encoded or compressed.
951      state = State.BLOCK_READY;
952      Bytes compressAndEncryptDat;
953      if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
954        compressAndEncryptDat =
955          dataBlockEncodingCtx.compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size());
956      } else {
957        compressAndEncryptDat = defaultBlockEncodingCtx.compressAndEncrypt(baosInMemory.getBuffer(),
958          0, baosInMemory.size());
959      }
960      if (compressAndEncryptDat == null) {
961        compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size());
962      }
963      if (onDiskBlockBytesWithHeader == null) {
964        onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength());
965      }
966      onDiskBlockBytesWithHeader.reset();
967      onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(),
968        compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength());
969      // Update raw and compressed sizes in the predicate
970      compressedSizePredicator.updateLatestBlockSizes(fileContext, rawBlockSize,
971        onDiskBlockBytesWithHeader.size());
972
973      // Calculate how many bytes we need for checksum on the tail of the block.
974      int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(),
975        fileContext.getBytesPerChecksum());
976
977      // Put the header for the on disk bytes; header currently is unfilled-out
978      putHeader(onDiskBlockBytesWithHeader, onDiskBlockBytesWithHeader.size() + numBytes,
979        baosInMemory.size(), onDiskBlockBytesWithHeader.size());
980
981      if (onDiskChecksum.length != numBytes) {
982        onDiskChecksum = new byte[numBytes];
983      }
984      ChecksumUtil.generateChecksums(onDiskBlockBytesWithHeader.getBuffer(), 0,
985        onDiskBlockBytesWithHeader.size(), onDiskChecksum, 0, fileContext.getChecksumType(),
986        fileContext.getBytesPerChecksum());
987    }
988
989    /**
990     * Put the header into the given byte array at the given offset.
     * @param onDiskSize       size of the block on disk: header + data + checksum
992     * @param uncompressedSize size of the block after decompression (but before optional data block
993     *                         decoding) including header
994     * @param onDiskDataSize   size of the block on disk with header and data but not including the
995     *                         checksums
996     */
997    private void putHeader(byte[] dest, int offset, int onDiskSize, int uncompressedSize,
998      int onDiskDataSize) {
999      offset = blockType.put(dest, offset);
1000      offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1001      offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1002      offset = Bytes.putLong(dest, offset, prevOffset);
1003      offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
1004      offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
1005      Bytes.putInt(dest, offset, onDiskDataSize);
1006    }
1007
1008    private void putHeader(ByteBuff buff, int onDiskSize, int uncompressedSize,
1009      int onDiskDataSize) {
1010      buff.rewind();
1011      blockType.write(buff);
1012      buff.putInt(onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1013      buff.putInt(uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1014      buff.putLong(prevOffset);
1015      buff.put(fileContext.getChecksumType().getCode());
1016      buff.putInt(fileContext.getBytesPerChecksum());
1017      buff.putInt(onDiskDataSize);
1018    }
1019
1020    private void putHeader(ByteArrayOutputStream dest, int onDiskSize, int uncompressedSize,
1021      int onDiskDataSize) {
1022      putHeader(dest.getBuffer(), 0, onDiskSize, uncompressedSize, onDiskDataSize);
1023    }
1024
1025    /**
     * Similar to {@link #finishBlockAndWriteHeaderAndData(DataOutputStream)}, but also records the
     * offset of this block so that it can be referenced by the next block of the same type.
1028     */
1029    void writeHeaderAndData(FSDataOutputStream out) throws IOException {
1030      long offset = out.getPos();
1031      if (startOffset != UNSET && offset != startOffset) {
1032        throw new IOException("A " + blockType + " block written to a "
1033          + "stream twice, first at offset " + startOffset + ", then at " + offset);
1034      }
1035      startOffset = offset;
1036      finishBlockAndWriteHeaderAndData(out);
1037    }
1038
1039    /**
1040     * Writes the header and the compressed data of this block (or uncompressed data when not using
1041     * compression) into the given stream. Can be called in the "writing" state or in the "block
1042     * ready" state. If called in the "writing" state, transitions the writer to the "block ready"
1043     * state.
     * @param out the output stream to write the header, block data and checksums to
1045     */
1046    protected void finishBlockAndWriteHeaderAndData(DataOutputStream out) throws IOException {
1047      ensureBlockReady();
1048      long startTime = EnvironmentEdgeManager.currentTime();
1049      out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size());
1050      out.write(onDiskChecksum);
1051      HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime);
1052    }
1053
1054    /**
     * Returns the header and the compressed data (or uncompressed data when not using compression)
1056     * as a byte array. Can be called in the "writing" state or in the "block ready" state. If
1057     * called in the "writing" state, transitions the writer to the "block ready" state. This
1058     * returns the header + data + checksums stored on disk.
1059     * @return header and data as they would be stored on disk in a byte array
1060     */
1061    byte[] getHeaderAndDataForTest() throws IOException {
1062      ensureBlockReady();
1063      // This is not very optimal, because we are doing an extra copy.
1064      // But this method is used only by unit tests.
1065      byte[] output = new byte[onDiskBlockBytesWithHeader.size() + onDiskChecksum.length];
1066      System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0,
1067        onDiskBlockBytesWithHeader.size());
1068      System.arraycopy(onDiskChecksum, 0, output, onDiskBlockBytesWithHeader.size(),
1069        onDiskChecksum.length);
1070      return output;
1071    }
1072
1073    /**
1074     * Releases resources used by this writer.
1075     */
1076    void release() {
1077      if (dataBlockEncodingCtx != null) {
1078        dataBlockEncodingCtx.close();
1079        dataBlockEncodingCtx = null;
1080      }
1081      if (defaultBlockEncodingCtx != null) {
1082        defaultBlockEncodingCtx.close();
1083        defaultBlockEncodingCtx = null;
1084      }
1085    }
1086
1087    /**
1088     * Returns the on-disk size of the data portion of the block. This is the compressed size if
1089     * compression is enabled. Can only be called in the "block ready" state. Header is not
1090     * compressed, and its size is not included in the return value.
1091     * @return the on-disk size of the block, not including the header.
1092     */
1093    int getOnDiskSizeWithoutHeader() {
1094      expectState(State.BLOCK_READY);
1095      return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length
1096        - HConstants.HFILEBLOCK_HEADER_SIZE;
1097    }
1098
1099    /**
1100     * Returns the on-disk size of the block. Can only be called in the "block ready" state.
1101     * @return the on-disk size of the block ready to be written, including the header size, the
1102     *         data and the checksum data.
1103     */
1104    int getOnDiskSizeWithHeader() {
1105      expectState(State.BLOCK_READY);
1106      return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length;
1107    }
1108
1109    /**
1110     * The uncompressed size of the block data. Does not include header size.
1111     */
1112    int getUncompressedSizeWithoutHeader() {
1113      expectState(State.BLOCK_READY);
1114      return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE;
1115    }
1116
1117    /**
1118     * The uncompressed size of the block data, including header size.
1119     */
1120    public int getUncompressedSizeWithHeader() {
1121      expectState(State.BLOCK_READY);
1122      return baosInMemory.size();
1123    }
1124
1125    /** Returns true if a block is being written */
1126    boolean isWriting() {
1127      return state == State.WRITING;
1128    }
1129
1130    /**
     * Returns the number of encoded bytes written into the current block so far, or zero if not
     * writing the block at the moment. Note that this will return zero in the "block ready" state
     * as well.
     * @return the number of encoded bytes written
1134     */
1135    public int encodedBlockSizeWritten() {
1136      return state != State.WRITING ? 0 : this.getEncodingState().getEncodedDataSizeWritten();
1137    }
1138
1139    /**
     * Returns the number of unencoded bytes written into the current block so far, or zero if not
     * writing the block at the moment. Note that this will return zero in the "block ready" state
     * as well.
     * @return the number of unencoded bytes written
1143     */
1144    public int blockSizeWritten() {
1145      return state != State.WRITING ? 0 : this.getEncodingState().getUnencodedDataSizeWritten();
1146    }
1147
1148    /**
1149     * Clones the header followed by the uncompressed data, even if using compression. This is
     * needed for storing uncompressed blocks in the block cache. Can only be called in the "block
     * ready" state. Returns only the header and data, does not include checksum
1152     * data.
1153     * @return Returns an uncompressed block ByteBuff for caching on write
1154     */
1155    ByteBuff cloneUncompressedBufferWithHeader() {
1156      expectState(State.BLOCK_READY);
1157      ByteBuff bytebuff = allocator.allocate(baosInMemory.size());
1158      baosInMemory.toByteBuff(bytebuff);
1159      int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(),
1160        fileContext.getBytesPerChecksum());
1161      putHeader(bytebuff, onDiskBlockBytesWithHeader.size() + numBytes, baosInMemory.size(),
1162        onDiskBlockBytesWithHeader.size());
1163      bytebuff.rewind();
1164      return bytebuff;
1165    }
1166
1167    /**
1168     * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is needed
     * for storing packed blocks in the block cache. Returns only the header and data, does not
1170     * include checksum data.
1171     * @return Returns a copy of block bytes for caching on write
1172     */
1173    private ByteBuff cloneOnDiskBufferWithHeader() {
1174      expectState(State.BLOCK_READY);
1175      ByteBuff bytebuff = allocator.allocate(onDiskBlockBytesWithHeader.size());
1176      onDiskBlockBytesWithHeader.toByteBuff(bytebuff);
1177      bytebuff.rewind();
1178      return bytebuff;
1179    }
1180
1181    private void expectState(State expectedState) {
1182      if (state != expectedState) {
1183        throw new IllegalStateException(
1184          "Expected state: " + expectedState + ", actual state: " + state);
1185      }
1186    }
1187
1188    /**
1189     * Takes the given {@link BlockWritable} instance, creates a new block of its appropriate type,
1190     * writes the writable into this block, and flushes the block into the output stream. The writer
1191     * is instructed not to buffer uncompressed bytes for cache-on-write.
1192     * @param bw  the block-writable object to write as a block
1193     * @param out the file system output stream
1194     */
1195    void writeBlock(BlockWritable bw, FSDataOutputStream out) throws IOException {
1196      bw.writeToBlock(startWriting(bw.getBlockType()));
1197      writeHeaderAndData(out);
1198    }
1199
1200    /**
1201     * Creates a new HFileBlock. Checksums have already been validated, so the byte buffer passed
1202     * into the constructor of this newly created block does not have checksum data even though the
1203     * header minor version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a 0 value
1204     * in bytesPerChecksum. This method copies the on-disk or uncompressed data to build the
1205     * HFileBlock which is used only while writing blocks and caching.
1206     * <p>
1207     * TODO: Should there be an option where a cache can ask that hbase preserve block checksums for
     * checking after a block comes out of the cache? Otherwise, the cache is responsible for blocks
1209     * being wholesome (ECC memory or if file-backed, it does checksumming).
1210     */
1211    HFileBlock getBlockForCaching(CacheConfig cacheConf) {
1212      HFileContext newContext = BlockCacheUtil.cloneContext(fileContext);
1213      // Build the HFileBlock.
1214      HFileBlockBuilder builder = new HFileBlockBuilder();
1215      ByteBuff buff;
1216      if (cacheConf.shouldCacheCompressed(blockType.getCategory())) {
1217        buff = cloneOnDiskBufferWithHeader();
1218      } else {
1219        buff = cloneUncompressedBufferWithHeader();
1220      }
1221      return builder.withBlockType(blockType)
1222        .withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader())
1223        .withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader())
1224        .withPrevBlockOffset(prevOffset).withByteBuff(buff).withFillHeader(FILL_HEADER)
1225        .withOffset(startOffset).withNextBlockOnDiskSize(UNSET)
1226        .withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length)
1227        .withHFileContext(newContext).withByteBuffAllocator(cacheConf.getByteBuffAllocator())
1228        .withShared(!buff.hasArray()).build();
1229    }
1230  }
1231
1232  /** Something that can be written into a block. */
1233  interface BlockWritable {
1234    /** The type of block this data should use. */
1235    BlockType getBlockType();
1236
1237    /**
1238     * Writes the block to the provided stream. Must not write any magic records.
1239     * @param out a stream to write uncompressed data into
1240     */
1241    void writeToBlock(DataOutput out) throws IOException;
1242  }
1243
1244  /**
1245   * Iterator for reading {@link HFileBlock}s in load-on-open-section, such as root data index
1246   * block, meta index block, file info block etc.
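   * <p>
   * A minimal usage sketch (the {@code fsReader} instance and the offsets are illustrative
   * assumptions, not values defined in this file):
   *
   * <pre>
   * BlockIterator it = fsReader.blockRange(loadOnOpenOffset, trailerOffset);
   * try {
   *   HFileBlock b;
   *   while ((b = it.nextBlock()) != null) {
   *     // inspect the unpacked block, e.g. b.getBlockType()
   *   }
   * } finally {
   *   it.freeBlocks(); // release the ByteBuffs tracked by the iterator
   * }
   * </pre>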
1247   */
1248  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
1249  public interface BlockIterator {
1250    /**
1251     * Get the next block, or null if there are no more blocks to iterate.
1252     */
1253    HFileBlock nextBlock() throws IOException;
1254
1255    /**
     * Similar to {@link #nextBlock()} but checks the block type, throws an exception if it is
     * incorrect, and returns the HFile block.
1258     */
1259    HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1260
1261    /**
     * Because we now use the {@link ByteBuffAllocator} to manage the NIO ByteBuffers backing
     * HFileBlocks, we must deallocate all of those ByteBuffers at the end of their life cycle. A
     * BlockIterator's life cycle starts when an HFileReader is opened and ends at
     * HFileReader#close, so we keep track of all the blocks read and release them in
     * {@link BlockIterator#freeBlocks()} when closing the HFileReader. The total size of the
     * blocks in the load-on-open section should be quite small, so tracking them should be OK.
1268     */
1269    void freeBlocks();
1270  }
1271
1272  /** An HFile block reader with iteration ability. */
1273  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
1274  public interface FSReader {
1275    /**
1276     * Reads the block at the given offset in the file with the given on-disk size and uncompressed
1277     * size.
1278     * @param offset        of the file to read
1279     * @param onDiskSize    the on-disk size of the entire block, including all applicable headers,
1280     *                      or -1 if unknown
1281     * @param pread         true to use pread, otherwise use the stream read.
1282     * @param updateMetrics update the metrics or not.
     * @param intoHeap      whether to allocate the block's ByteBuff from the JVM heap rather
     *                      than from the {@link ByteBuffAllocator}. For LRUBlockCache, the block
     *                      to cache must be an on-heap one, because its memory accounting is
     *                      based on the heap; also, {@link CombinedBlockCache} uses the heap
     *                      LRUBlockCache as its L1 cache for small blocks such as IndexBlock or
     *                      MetaBlock for faster access. So this flag decides whether to allocate
     *                      from the JVM heap or not, letting us avoid an extra off-heap to heap
     *                      memory copy when using LRUBlockCache. In most cases we know the
     *                      expected block type we will read, but in some special cases (for
     *                      example HFileReaderImpl#readNextDataBlock()) we cannot pre-decide it,
     *                      so we first allocate the block's ByteBuff from the
     *                      {@link ByteBuffAllocator}, and then, when caching it in
     *                      {@link LruBlockCache}, check whether the ByteBuff is on heap; if not,
     *                      we clone it to a heap one before caching.
1297     * @return the newly read block
1298     */
1299    HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics,
1300      boolean intoHeap) throws IOException;
1301
1302    /**
     * Creates a block iterator over the given portion of the {@link HFile}. The iterator returns
     * blocks whose offsets satisfy startOffset &lt;= offset &lt; endOffset. Returned blocks are
     * always unpacked. Used when no hfile index is available; e.g. reading the hfile index blocks
     * themselves on file open.
1307     * @param startOffset the offset of the block to start iteration with
1308     * @param endOffset   the offset to end iteration at (exclusive)
1309     * @return an iterator of blocks between the two given offsets
1310     */
1311    BlockIterator blockRange(long startOffset, long endOffset);
1312
1313    /** Closes the backing streams */
1314    void closeStreams() throws IOException;
1315
1316    /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
1317    HFileBlockDecodingContext getBlockDecodingContext();
1318
1319    /** Get the default decoder for blocks from this file. */
1320    HFileBlockDecodingContext getDefaultBlockDecodingContext();
1321
1322    void setIncludesMemStoreTS(boolean includesMemstoreTS);
1323
1324    void setDataBlockEncoder(HFileDataBlockEncoder encoder, Configuration conf);
1325
1326    /**
     * To close the stream's socket. Note: this can be called concurrently from multiple threads
     * and the implementation should take care of thread safety.
1329     */
1330    void unbufferStream();
1331  }
1332
1333  /**
   * Data structure used to cache the header of the NEXT block. Only works if the next read that
   * comes in here is for the block next in sequence after this one. When we read, we read the
   * current block and the next block's header. We do this so we have the length of the next block
   * to read when the hfile index is not available (rare; at hfile open only).
1338   */
1339  private static class PrefetchedHeader {
1340    long offset = -1;
1341    byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1342    final ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(header, 0, header.length));
1343
1344    @Override
1345    public String toString() {
1346      return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header);
1347    }
1348  }
1349
1350  /**
1351   * Reads version 2 HFile blocks from the filesystem.
1352   */
1353  static class FSReaderImpl implements FSReader {
1354    /**
     * The file system stream of the underlying {@link HFile}; it may or may not perform checksum
     * validations in the filesystem.
1357     */
1358    private FSDataInputStreamWrapper streamWrapper;
1359
1360    private HFileBlockDecodingContext encodedBlockDecodingCtx;
1361
1362    /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
1363    private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
1364
1365    /**
     * Cache of the NEXT header after this one. Check that it is indeed the next block's header
     * before using it. TODO: Review. This over-read into the next block to fetch its header seems
     * unnecessary given we usually get the block size from the hfile index. Review!
1369     */
1370    private AtomicReference<PrefetchedHeader> prefetchedHeader =
1371      new AtomicReference<>(new PrefetchedHeader());
1372
1373    /** The size of the file we are reading from, or -1 if unknown. */
1374    private long fileSize;
1375
1376    /** The size of the header */
1377    protected final int hdrSize;
1378
1379    /** The filesystem used to access data */
1380    private HFileSystem hfs;
1381
1382    private HFileContext fileContext;
1383    // Cache the fileName
1384    private String pathName;
1385
1386    private final ByteBuffAllocator allocator;
1387
1388    private final Lock streamLock = new ReentrantLock();
1389
1390    private final boolean isPreadAllBytes;
1391
1392    private final long readWarnTime;
1393
1394    /**
     * If reading a block takes longer than this threshold (in milliseconds), a warning is logged.
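     * <p>
     * A hedged configuration sketch (the 100 ms threshold is only an illustration, not a
     * recommended value):
     *
     * <pre>
     * conf.setLong(FS_READER_WARN_TIME_MS, 100L); // warn when a single block read exceeds 100 ms
     * </pre>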
1396     */
1397    public static final String FS_READER_WARN_TIME_MS = "hbase.fs.reader.warn.time.ms";
1398
1399    FSReaderImpl(ReaderContext readerContext, HFileContext fileContext, ByteBuffAllocator allocator,
1400      Configuration conf) throws IOException {
1401      this.fileSize = readerContext.getFileSize();
1402      this.hfs = readerContext.getFileSystem();
1403      if (readerContext.getFilePath() != null) {
1404        this.pathName = readerContext.getFilePath().toString();
1405      }
1406      this.fileContext = fileContext;
1407      this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1408      this.allocator = allocator;
1409
1410      this.streamWrapper = readerContext.getInputStreamWrapper();
1411      // Older versions of HBase didn't support checksum.
1412      this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1413      defaultDecodingCtx = new HFileBlockDefaultDecodingContext(conf, fileContext);
1414      encodedBlockDecodingCtx = defaultDecodingCtx;
1415      isPreadAllBytes = readerContext.isPreadAllBytes();
      // Default warn threshold is -1, meaning the slow block read warning log is skipped.
1417      readWarnTime = conf.getLong(FS_READER_WARN_TIME_MS, -1L);
1418    }
1419
1420    @Override
1421    public BlockIterator blockRange(final long startOffset, final long endOffset) {
1422      final FSReader owner = this; // handle for inner class
1423      return new BlockIterator() {
1424        private volatile boolean freed = false;
1425        // Tracking all read blocks until we call freeBlocks.
1426        private List<HFileBlock> blockTracker = new ArrayList<>();
1427        private long offset = startOffset;
1428        // Cache length of next block. Current block has the length of next block in it.
1429        private long length = -1;
1430
1431        @Override
1432        public HFileBlock nextBlock() throws IOException {
1433          if (offset >= endOffset) {
1434            return null;
1435          }
1436          HFileBlock b = readBlockData(offset, length, false, false, true);
1437          offset += b.getOnDiskSizeWithHeader();
1438          length = b.getNextBlockOnDiskSize();
1439          HFileBlock uncompressed = b.unpack(fileContext, owner);
1440          if (uncompressed != b) {
1441            b.release(); // Need to release the compressed Block now.
1442          }
1443          blockTracker.add(uncompressed);
1444          return uncompressed;
1445        }
1446
1447        @Override
1448        public HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException {
1449          HFileBlock blk = nextBlock();
1450          if (blk.getBlockType() != blockType) {
1451            throw new IOException(
1452              "Expected block of type " + blockType + " but found " + blk.getBlockType());
1453          }
1454          return blk;
1455        }
1456
1457        @Override
1458        public void freeBlocks() {
1459          if (freed) {
1460            return;
1461          }
1462          blockTracker.forEach(HFileBlock::release);
1463          blockTracker = null;
1464          freed = true;
1465        }
1466      };
1467    }
1468
1469    /**
     * Does a positional read or a seek and read into the given byte buffer. We must take care to
     * call {@link ByteBuff#release()} on every exit path to deallocate the ByteBuffers, otherwise
     * a memory leak may happen.
1473     * @param dest              destination buffer
1474     * @param size              size of read
1475     * @param peekIntoNextBlock whether to read the next block's on-disk size
1476     * @param fileOffset        position in the stream to read at
1477     * @param pread             whether we should do a positional read
1478     * @param istream           The input source of data
     * @return true if the destination buffer includes the next block's header, false if it
     *         contains only the current block's data without the next block's header.
     * @throws IOException if any IO error happens.
1482     */
1483    protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size,
1484      boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
1485      if (!pread) {
1486        // Seek + read. Better for scanning.
1487        istream.seek(fileOffset);
1488        long realOffset = istream.getPos();
1489        if (realOffset != fileOffset) {
1490          throw new IOException("Tried to seek to " + fileOffset + " to read " + size
1491            + " bytes, but pos=" + realOffset + " after seek");
1492        }
1493        if (!peekIntoNextBlock) {
1494          BlockIOUtils.readFully(dest, istream, size);
1495          return false;
1496        }
1497
1498        // Try to read the next block header
1499        if (!BlockIOUtils.readWithExtra(dest, istream, size, hdrSize)) {
1500          // did not read the next block header.
1501          return false;
1502        }
1503      } else {
1504        // Positional read. Better for random reads; or when the streamLock is already locked.
1505        int extraSize = peekIntoNextBlock ? hdrSize : 0;
1506        if (
1507          !BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize, isPreadAllBytes)
1508        ) {
1509          // did not read the next block header.
1510          return false;
1511        }
1512      }
1513      assert peekIntoNextBlock;
1514      return true;
1515    }
1516
1517    /**
1518     * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
1519     * little memory allocation as possible, using the provided on-disk size.
1520     * @param offset                the offset in the stream to read at
1521     * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header, or -1 if
1522     *                              unknown; i.e. when iterating over blocks reading in the file
1523     *                              metadata info.
1524     * @param pread                 whether to use a positional read
1525     * @param updateMetrics         whether to update the metrics
1526     * @param intoHeap              allocate ByteBuff of block from heap or off-heap.
     * @see FSReader#readBlockData(long, long, boolean, boolean, boolean) for more details about the
     *      intoHeap parameter.
1529     */
1530    @Override
1531    public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread,
1532      boolean updateMetrics, boolean intoHeap) throws IOException {
1533      // Get a copy of the current state of whether to validate
1534      // hbase checksums or not for this read call. This is not
1535      // thread-safe but the one constraint is that if we decide
1536      // to skip hbase checksum verification then we are
1537      // guaranteed to use hdfs checksum verification.
1538      boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1539      FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1540      final Context context = Context.current().with(CONTEXT_KEY,
1541        new HFileContextAttributesBuilderConsumer(fileContext)
1542          .setSkipChecksum(doVerificationThruHBaseChecksum)
1543          .setReadType(pread ? ReadType.POSITIONAL_READ : ReadType.SEEK_PLUS_READ));
1544      try (Scope ignored = context.makeCurrent()) {
1545        HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
1546          doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
1547        if (blk == null) {
1548          HFile.LOG.warn("HBase checksum verification failed for file {} at offset {} filesize {}."
1549            + " Retrying read with HDFS checksums turned on...", pathName, offset, fileSize);
1550
1551          if (!doVerificationThruHBaseChecksum) {
1552            String msg = "HBase checksum verification failed for file " + pathName + " at offset "
1553              + offset + " filesize " + fileSize + " but this cannot happen because doVerify is "
1554              + doVerificationThruHBaseChecksum;
1555            HFile.LOG.warn(msg);
1556            throw new IOException(msg); // cannot happen case here
1557          }
1558          HFile.CHECKSUM_FAILURES.increment(); // update metrics
1559
1560          // If we have a checksum failure, we fall back into a mode where
1561          // the next few reads use HDFS level checksums. We aim to make the
1562          // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
1563          // hbase checksum verification, but since this value is set without
1564          // holding any locks, it can so happen that we might actually do
1565          // a few more than precisely this number.
1566          is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1567          doVerificationThruHBaseChecksum = false;
1568          blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
1569            doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
1570          if (blk != null) {
1571            HFile.LOG.warn(
1572              "HDFS checksum verification succeeded for file {} at offset {} filesize" + " {}",
1573              pathName, offset, fileSize);
1574          }
1575        }
1576        if (blk == null && !doVerificationThruHBaseChecksum) {
          String msg =
            "readBlockData failed, possibly due to " + "checksum verification failure for file "
              + pathName + " at offset " + offset + " filesize " + fileSize;
1580          HFile.LOG.warn(msg);
1581          throw new IOException(msg);
1582        }
1583
1584        // If there is a checksum mismatch earlier, then retry with
1585        // HBase checksums switched off and use HDFS checksum verification.
1586        // This triggers HDFS to detect and fix corrupt replicas. The
1587        // next checksumOffCount read requests will use HDFS checksums.
1588        // The decrementing of this.checksumOffCount is not thread-safe,
1589        // but it is harmless because eventually checksumOffCount will be
1590        // a negative number.
1591        streamWrapper.checksumOk();
1592        return blk;
1593      }
1594    }
1595
1596    /**
1597     * Check that checksumType on {@code headerBuf} read from a block header seems reasonable,
1598     * within the known value range.
1599     * @return {@code true} if the headerBuf is safe to proceed, {@code false} otherwise.
1600     */
1601    private boolean checkCheckSumTypeOnHeaderBuf(ByteBuff headerBuf) {
1602      if (headerBuf == null) {
1603        return true;
1604      }
1605      byte b = headerBuf.get(HFileBlock.Header.CHECKSUM_TYPE_INDEX);
1606      for (ChecksumType t : ChecksumType.values()) {
1607        if (t.getCode() == b) {
1608          return true;
1609        }
1610      }
1611      return false;
1612    }
1613
1614    /**
1615     * Check that {@code value} read from a block header seems reasonable, within a large margin of
1616     * error.
1617     * @return {@code true} if the value is safe to proceed, {@code false} otherwise.
1618     */
1619    private boolean checkOnDiskSizeWithHeader(int value) {
1620      if (value < 0) {
1621        if (LOG.isTraceEnabled()) {
1622          LOG.trace(
1623            "onDiskSizeWithHeader={}; value represents a size, so it should never be negative.",
1624            value);
1625        }
1626        return false;
1627      }
1628      if (value - hdrSize < 0) {
1629        if (LOG.isTraceEnabled()) {
1630          LOG.trace("onDiskSizeWithHeader={}, hdrSize={}; don't accept a value that is negative"
1631            + " after the header size is excluded.", value, hdrSize);
1632        }
1633        return false;
1634      }
1635      return true;
1636    }
1637
1638    /**
1639     * Check that {@code value} provided by the calling context seems reasonable, within a large
1640     * margin of error.
1641     * @return {@code true} if the value is safe to proceed, {@code false} otherwise.
1642     */
1643    private boolean checkCallerProvidedOnDiskSizeWithHeader(long value) {
1644      // same validation logic as is used by Math.toIntExact(long)
1645      int intValue = (int) value;
1646      if (intValue != value) {
1647        if (LOG.isTraceEnabled()) {
1648          LOG.trace("onDiskSizeWithHeaderL={}; value exceeds int size limits.", value);
1649        }
1650        return false;
1651      }
1652      if (intValue == -1) {
1653        // a magic value we expect to see.
1654        return true;
1655      }
1656      return checkOnDiskSizeWithHeader(intValue);
1657    }
1658
1659    /**
     * Check the atomic reference cache for this block's header. The cache is only good if the next
     * read coming through is for the block immediately following in the file. We read the next
     * block's header on the tail of reading the previous block to save a seek. Otherwise, we would
     * have to seek to read the header before we could pull in the block, OR we would have to back
     * up the stream because we over-read (the next block's header).
1665     * @see PrefetchedHeader
1666     * @return The cached block header or null if not found.
1667     * @see #cacheNextBlockHeader(long, ByteBuff, int, int)
1668     */
1669    private ByteBuff getCachedHeader(final long offset) {
1670      PrefetchedHeader ph = this.prefetchedHeader.get();
1671      return ph != null && ph.offset == offset ? ph.buf : null;
1672    }
1673
1674    /**
     * Save away the next block's header in the atomic reference.
1676     * @see #getCachedHeader(long)
1677     * @see PrefetchedHeader
1678     */
1679    private void cacheNextBlockHeader(final long offset, ByteBuff onDiskBlock,
1680      int onDiskSizeWithHeader, int headerLength) {
1681      PrefetchedHeader ph = new PrefetchedHeader();
1682      ph.offset = offset;
1683      onDiskBlock.get(onDiskSizeWithHeader, ph.header, 0, headerLength);
1684      this.prefetchedHeader.set(ph);
1685    }
1686
1687    /**
1688     * Clear the cached value when its integrity is suspect.
1689     */
1690    private void invalidateNextBlockHeader() {
1691      prefetchedHeader.set(null);
1692    }
1693
1694    private int getNextBlockOnDiskSize(ByteBuff onDiskBlock, int onDiskSizeWithHeader) {
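      // The over-read header of the next block starts at onDiskSizeWithHeader within onDiskBlock;
      // its on-disk-size-without-header field sits right after the block type magic, so add our
      // header size back to get the next block's full on-disk size.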
1695      return onDiskBlock.getIntAfterPosition(onDiskSizeWithHeader + BlockType.MAGIC_LENGTH)
1696        + hdrSize;
1697    }
1698
1699    private ByteBuff allocate(int size, boolean intoHeap) {
1700      return intoHeap ? HEAP.allocate(size) : allocator.allocate(size);
1701    }
1702
1703    /**
1704     * Reads a version 2 block.
1705     * @param offset                the offset in the stream to read at.
     * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header and
     *                              checksums if present, or -1 if unknown (as a long). Can be -1 if
     *                              we are doing raw iteration of blocks, as when loading up file
     *                              metadata; i.e. the first read of a new file. Usually obtained
     *                              from the file index, in which case it is not -1.
1711     * @param pread                 whether to use a positional read
     * @param verifyChecksum        Whether to use HBase checksums. If HBase checksum is switched
     *                              off, then use HDFS checksum. Can also flip on/off while reading
     *                              the same file if we hit a troublesome patch in an hfile.
     * @param updateMetrics         whether to update the metrics.
     * @param intoHeap              whether to allocate the block's ByteBuff from heap or off-heap.
     * @return the HFileBlock or null if there is an HBase checksum mismatch
1718     */
1719    protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
1720      long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics,
1721      boolean intoHeap) throws IOException {
1722      final Span span = Span.current();
1723      final Attributes attributes = getReadDataBlockInternalAttributes(span);
1724      if (offset < 0) {
1725        throw new IOException("Invalid offset=" + offset + " trying to read " + "block (onDiskSize="
1726          + onDiskSizeWithHeaderL + ")");
1727      }
1728      if (!checkCallerProvidedOnDiskSizeWithHeader(onDiskSizeWithHeaderL)) {
1729        LOG.trace("Caller provided invalid onDiskSizeWithHeaderL={}", onDiskSizeWithHeaderL);
1730        onDiskSizeWithHeaderL = -1;
1731      }
1732      int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL;
1733
1734      // Try to use the cached header. Will serve us in rare case where onDiskSizeWithHeaderL==-1
1735      // and will save us having to seek the stream backwards to reread the header we
1736      // read the last time through here.
1737      ByteBuff headerBuf = getCachedHeader(offset);
1738      LOG.trace(
1739        "Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, "
1740          + "onDiskSizeWithHeader={}",
1741        this.fileContext.getHFileName(), offset, pread, verifyChecksum, headerBuf,
1742        onDiskSizeWithHeader);
      // This is NOT the same as verifyChecksum. The latter is whether to do hbase
      // checksums. Can change with circumstances. The below flag is whether the
      // file has support for checksums (version 2+).
1746      boolean checksumSupport = this.fileContext.isUseHBaseChecksum();
1747      boolean isScanMetricsEnabled = ThreadLocalServerSideScanMetrics.isScanMetricsEnabled();
1748      long startTime = EnvironmentEdgeManager.currentTime();
1749      if (onDiskSizeWithHeader == -1) {
1750        // The caller does not know the block size. Need to get it from the header. If header was
1751        // not cached (see getCachedHeader above), need to seek to pull it in. This is costly
1752        // and should happen very rarely. Currently happens on open of a hfile reader where we
1753        // read the trailer blocks to pull in the indices. Otherwise, we are reading block sizes
1754        // out of the hfile index. To check, enable TRACE in this file and you'll get an exception
1755        // in a LOG every time we seek. See HBASE-17072 for more detail.
1756        if (headerBuf == null) {
1757          if (LOG.isTraceEnabled()) {
1758            LOG.trace("Extra seek to get block size!", new RuntimeException());
1759          }
1760          span.addEvent("Extra seek to get block size!", attributes);
1761          headerBuf = HEAP.allocate(hdrSize);
1762          readAtOffset(is, headerBuf, hdrSize, false, offset, pread);
1763          headerBuf.rewind();
1764          if (isScanMetricsEnabled) {
1765            ThreadLocalServerSideScanMetrics.addBytesReadFromFs(hdrSize);
1766          }
1767        }
1768        onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport);
1769      }
1770
      // Inspect the header's checksumType for known valid values. If we don't find such a value,
      // assume that the bytes read are corrupted. We will clear the cached value and roll back to
      // HDFS checksums.
1774      if (!checkCheckSumTypeOnHeaderBuf(headerBuf)) {
1775        if (verifyChecksum) {
1776          invalidateNextBlockHeader();
1777          span.addEvent("Falling back to HDFS checksumming.", attributes);
1778          return null;
1779        } else {
          throw new IOException(
            "Unknown checksum type code " + headerBuf.get(HFileBlock.Header.CHECKSUM_TYPE_INDEX)
              + " for file " + pathName + ", the headerBuf of the HFileBlock may be corrupted.");
1783        }
1784      }
1785
1786      // The common case is that onDiskSizeWithHeader was produced by a read without checksum
1787      // validation, so give it a sanity check before trying to use it.
1788      if (!checkOnDiskSizeWithHeader(onDiskSizeWithHeader)) {
1789        if (verifyChecksum) {
1790          invalidateNextBlockHeader();
1791          span.addEvent("Falling back to HDFS checksumming.", attributes);
1792          return null;
1793        } else {
1794          throw new IOException("Invalid onDiskSizeWithHeader=" + onDiskSizeWithHeader);
1795        }
1796      }
1797
1798      int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
      // Allocate enough space to fit the next block's header too; saves a seek next time through.
      // onDiskBlock is the whole block (header, data and any checksums) plus an extra hdrSize to
      // read the next block's header; onDiskSizeWithHeader is header, body, and any checksums if
      // present. preReadHeaderSize says where to start reading. If we have the header cached, then
      // we don't need to read it again and we can likely read from the last place we left off
      // without needing to back up and reread the header we read the last time through here.
1805      ByteBuff onDiskBlock = this.allocate(onDiskSizeWithHeader + hdrSize, intoHeap);
1806      boolean initHFileBlockSuccess = false;
1807      try {
1808        if (headerBuf != null) {
1809          onDiskBlock.put(0, headerBuf, 0, hdrSize).position(hdrSize);
1810        }
1811        boolean readNextHeader = readAtOffset(is, onDiskBlock,
1812          onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread);
1813        onDiskBlock.rewind(); // in case of moving position when copying a cached header
1814        if (isScanMetricsEnabled) {
1815          long bytesRead =
1816            (onDiskSizeWithHeader - preReadHeaderSize) + (readNextHeader ? hdrSize : 0);
1817          ThreadLocalServerSideScanMetrics.addBytesReadFromFs(bytesRead);
1818          ThreadLocalServerSideScanMetrics.addBlockReadOpsCount(1);
1819        }
1820
1821        // the call to validateChecksum for this block excludes the next block header over-read, so
1822        // no reason to delay extracting this value.
1823        int nextBlockOnDiskSize = -1;
1824        if (readNextHeader) {
1825          int parsedVal = getNextBlockOnDiskSize(onDiskBlock, onDiskSizeWithHeader);
1826          if (checkOnDiskSizeWithHeader(parsedVal)) {
1827            nextBlockOnDiskSize = parsedVal;
1828          }
1829        }
1830        if (headerBuf == null) {
1831          headerBuf = onDiskBlock.duplicate().position(0).limit(hdrSize);
1832        }
1833
1834        ByteBuff curBlock = onDiskBlock.duplicate().position(0).limit(onDiskSizeWithHeader);
1835        // Verify checksum of the data before using it for building HFileBlock.
1836        if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) {
1837          invalidateNextBlockHeader();
1838          span.addEvent("Falling back to HDFS checksumming.", attributes);
1839          return null;
1840        }
1841
1842        // TODO: is this check necessary or can we proceed with a provided value regardless of
1843        // what is in the header?
1844        int fromHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport);
1845        if (onDiskSizeWithHeader != fromHeader) {
1846          if (LOG.isTraceEnabled()) {
1847            LOG.trace("Passed in onDiskSizeWithHeader={} != {}, offset={}, fileContext={}",
1848              onDiskSizeWithHeader, fromHeader, offset, this.fileContext);
1849          }
1850          if (checksumSupport && verifyChecksum) {
1851            // This file supports HBase checksums and verification of those checksums was
1852            // requested. The block size provided by the caller (presumably from the block index)
            // does not match the block size written to the block header. Treat this as an
            // HBase-checksum failure.
1855            span.addEvent("Falling back to HDFS checksumming.", attributes);
1856            invalidateNextBlockHeader();
1857            return null;
1858          }
1859          throw new IOException("Passed in onDiskSizeWithHeader=" + onDiskSizeWithHeader + " != "
1860            + fromHeader + ", offset=" + offset + ", fileContext=" + this.fileContext);
1861        }
1862
1863        // remove checksum from buffer now that it's verified
1864        int sizeWithoutChecksum = curBlock.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
1865        curBlock.limit(sizeWithoutChecksum);
1866        long duration = EnvironmentEdgeManager.currentTime() - startTime;
1867        boolean tooSlow = this.readWarnTime >= 0 && duration > this.readWarnTime;
1868        if (updateMetrics) {
1869          HFile.updateReadLatency(duration, pread, tooSlow);
1870        }
        // The onDiskBlock will become the headerAndDataBuffer for this block.
        // If nextBlockOnDiskSize is not -1, onDiskBlock already contains the header of the next
        // block, so there is no need to set the next block's header in it.
1874        HFileBlock hFileBlock = createFromBuff(curBlock, checksumSupport, offset,
1875          nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
1876        // Run check on uncompressed sizings.
1877        if (!fileContext.isCompressedOrEncrypted()) {
1878          hFileBlock.sanityCheckUncompressed();
1879        }
1880        LOG.trace("Read {} in {} ms", hFileBlock, duration);
1881        if (!LOG.isTraceEnabled() && tooSlow) {
1882          LOG.warn("Read Block Slow: read {} cost {} ms, threshold = {} ms", hFileBlock, duration,
1883            this.readWarnTime);
1884        }
1885        span.addEvent("Read block", attributes);
1886        // Cache next block header if we read it for the next time through here.
1887        if (nextBlockOnDiskSize != -1) {
1888          cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), onDiskBlock,
1889            onDiskSizeWithHeader, hdrSize);
1890        }
1891        initHFileBlockSuccess = true;
1892        return hFileBlock;
1893      } finally {
1894        if (!initHFileBlockSuccess) {
1895          onDiskBlock.release();
1896        }
1897      }
1898    }
1899
1900    @Override
1901    public void setIncludesMemStoreTS(boolean includesMemstoreTS) {
1902      this.fileContext =
1903        new HFileContextBuilder(this.fileContext).withIncludesMvcc(includesMemstoreTS).build();
1904    }
1905
1906    @Override
1907    public void setDataBlockEncoder(HFileDataBlockEncoder encoder, Configuration conf) {
1908      encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(conf, fileContext);
1909    }
1910
1911    @Override
1912    public HFileBlockDecodingContext getBlockDecodingContext() {
1913      return this.encodedBlockDecodingCtx;
1914    }
1915
1916    @Override
1917    public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
1918      return this.defaultDecodingCtx;
1919    }
1920
1921    /**
     * Generates the checksum for the header as well as the data and then validates it. If the block
     * doesn't use checksums, returns false.
1924     * @return True if checksum matches, else false.
1925     */
1926    private boolean validateChecksum(long offset, ByteBuff data, int hdrSize) {
1927      // If this is an older version of the block that does not have checksums, then return false
1928      // indicating that checksum verification did not succeed. Actually, this method should never
1929      // be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen
1930      // case. Since this is a cannot-happen case, it is better to return false to indicate a
1931      // checksum validation failure.
1932      if (!fileContext.isUseHBaseChecksum()) {
1933        return false;
1934      }
1935
1936      // If the checksumType of the read block header is incorrect, it indicates that the block is
1937      // corrupted and can be directly rolled back to HDFS checksum verification
1938      if (!checkCheckSumTypeOnHeaderBuf(data)) {
1939        HFile.LOG.warn(
1940          "HBase checksumType verification failed for file {} at offset {} filesize {}"
1941            + " checksumType {}. Retrying read with HDFS checksums turned on...",
1942          pathName, offset, fileSize, data.get(HFileBlock.Header.CHECKSUM_TYPE_INDEX));
1943        return false;
1944      }
1945
1946      return ChecksumUtil.validateChecksum(data, pathName, offset, hdrSize);
1947    }
1948
1949    @Override
1950    public void closeStreams() throws IOException {
1951      streamWrapper.close();
1952    }
1953
1954    @Override
1955    public void unbufferStream() {
1956      // To handle concurrent reads, ensure that no other client is accessing the streams while we
1957      // unbuffer it.
1958      if (streamLock.tryLock()) {
1959        try {
1960          this.streamWrapper.unbuffer();
1961        } finally {
1962          streamLock.unlock();
1963        }
1964      }
1965    }
1966
1967    @Override
1968    public String toString() {
1969      return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext;
1970    }
1971  }
1972
1973  /** An additional sanity-check in case no compression or encryption is being used. */
1974  void sanityCheckUncompressed() throws IOException {
1975    if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) {
1976      throw new IOException("Using no compression but " + "onDiskSizeWithoutHeader="
1977        + onDiskSizeWithoutHeader + ", " + "uncompressedSizeWithoutHeader="
1978        + uncompressedSizeWithoutHeader + ", numChecksumbytes=" + totalChecksumBytes());
1979    }
1980  }
1981
1982  // Cacheable implementation
1983  @Override
1984  public int getSerializedLength() {
1985    if (bufWithoutChecksum != null) {
1986      // Include extra bytes for block metadata.
1987      return this.bufWithoutChecksum.limit() + BLOCK_METADATA_SPACE;
1988    }
1989    return 0;
1990  }
1991
1992  // Cacheable implementation
1993  @Override
1994  public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
1995    this.bufWithoutChecksum.get(destination, 0, getSerializedLength() - BLOCK_METADATA_SPACE);
1996    destination = addMetaData(destination, includeNextBlockMetadata);
1997
    // Make it ready for reading. flip sets position to zero and limit to the current position,
    // which is what we want: the buffer then spans exactly the serialized block (without checksums)
    // plus the metadata just appended, and nothing beyond.
2001    destination.flip();
2002  }
2003
2004  /**
2005   * For use by bucketcache. This exposes internals.
2006   */
2007  public ByteBuffer getMetaData(ByteBuffer bb) {
2008    bb = addMetaData(bb, true);
2009    bb.flip();
2010    return bb;
2011  }
2012
2013  /**
2014   * Adds metadata at current position (position is moved forward). Does not flip or reset.
2015   * @return The passed <code>destination</code> with metadata added.
2016   */
2017  private ByteBuffer addMetaData(final ByteBuffer destination, boolean includeNextBlockMetadata) {
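    // Metadata layout appended after the block bytes: a 1-byte HBase-checksum flag, the 8-byte
    // block offset, and (optionally) the 4-byte on-disk size of the next block.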
2018    destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
2019    destination.putLong(this.offset);
2020    if (includeNextBlockMetadata) {
2021      destination.putInt(this.nextBlockOnDiskSize);
2022    }
2023    return destination;
2024  }
2025
2026  // Cacheable implementation
2027  @Override
2028  public CacheableDeserializer<Cacheable> getDeserializer() {
2029    return HFileBlock.BLOCK_DESERIALIZER;
2030  }
2031
2032  @Override
2033  public int hashCode() {
2034    int result = 1;
2035    result = result * 31 + blockType.hashCode();
2036    result = result * 31 + nextBlockOnDiskSize;
2037    result = result * 31 + (int) (offset ^ (offset >>> 32));
2038    result = result * 31 + onDiskSizeWithoutHeader;
2039    result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32));
2040    result = result * 31 + uncompressedSizeWithoutHeader;
2041    result = result * 31 + bufWithoutChecksum.hashCode();
2042    return result;
2043  }
2044
2045  @Override
2046  public boolean equals(Object comparison) {
2047    if (this == comparison) {
2048      return true;
2049    }
2050    if (comparison == null) {
2051      return false;
2052    }
2053    if (!(comparison instanceof HFileBlock)) {
2054      return false;
2055    }
2056
2057    HFileBlock castedComparison = (HFileBlock) comparison;
2058
2059    if (castedComparison.blockType != this.blockType) {
2060      return false;
2061    }
2062    if (castedComparison.nextBlockOnDiskSize != this.nextBlockOnDiskSize) {
2063      return false;
2064    }
2065    // Offset is important. Needed when we have to remake cachekey when block is returned to cache.
2066    if (castedComparison.offset != this.offset) {
2067      return false;
2068    }
2069    if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
2070      return false;
2071    }
2072    if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
2073      return false;
2074    }
2075    if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
2076      return false;
2077    }
2078    if (
2079      ByteBuff.compareTo(this.bufWithoutChecksum, 0, this.bufWithoutChecksum.limit(),
2080        castedComparison.bufWithoutChecksum, 0, castedComparison.bufWithoutChecksum.limit()) != 0
2081    ) {
2082      return false;
2083    }
2084    return true;
2085  }
2086
2087  DataBlockEncoding getDataBlockEncoding() {
2088    if (blockType == BlockType.ENCODED_DATA) {
2089      return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
2090    }
2091    return DataBlockEncoding.NONE;
2092  }
2093
2094  byte getChecksumType() {
2095    return this.fileContext.getChecksumType().getCode();
2096  }
2097
2098  int getBytesPerChecksum() {
2099    return this.fileContext.getBytesPerChecksum();
2100  }
2101
2102  /** Returns the size of data on disk + header. Excludes checksum. */
2103  int getOnDiskDataSizeWithHeader() {
2104    return this.onDiskDataSizeWithHeader;
2105  }
2106
2107  /**
2108   * Return the number of bytes required to store all the checksums for this block. Each checksum
2109   * value is a 4 byte integer. <br/>
   * NOTE: the ByteBuff returned by {@link HFileBlock#getBufferWithoutHeader()} and
   * {@link HFileBlock#getBufferReadOnly} or the DataInputStream returned by
   * {@link HFileBlock#getByteStream()} do not include checksum data.
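   * <p>
   * A rough worked example (assuming the checksum chunk count is rounded up to whole chunks): with
   * an onDiskDataSizeWithHeader of 65,600 bytes and bytesPerChecksum of 16,384, the block spans 5
   * chunks and therefore carries 5 * 4 = 20 checksum bytes.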
2113   */
2114  int totalChecksumBytes() {
2115    return totalChecksumBytes;
2116  }
2117
2118  private int computeTotalChecksumBytes() {
2119    // If the hfile block has minorVersion 0, then there are no checksum
2120    // data to validate. Similarly, a zero value in this.bytesPerChecksum
2121    // indicates that cached blocks do not have checksum data because
2122    // checksums were already validated when the block was read from disk.
2123    if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
2124      return 0;
2125    }
2126    return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
2127      this.fileContext.getBytesPerChecksum());
2128  }
2129
2130  /**
2131   * Returns the size of this block header.
2132   */
2133  public int headerSize() {
2134    return headerSize(this.fileContext.isUseHBaseChecksum());
2135  }
2136
2137  /**
2138   * Maps a minor version to the size of the header.
2139   */
2140  public static int headerSize(boolean usesHBaseChecksum) {
2141    return usesHBaseChecksum
2142      ? HConstants.HFILEBLOCK_HEADER_SIZE
2143      : HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
2144  }
2145
2146  /**
2147   * Return the appropriate DUMMY_HEADER for the minor version
2148   */
2149  // TODO: Why is this in here?
2150  byte[] getDummyHeaderForVersion() {
2151    return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
2152  }
2153
2154  /**
2155   * Return the appropriate DUMMY_HEADER for the minor version
2156   */
2157  static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
2158    return usesHBaseChecksum ? HConstants.HFILEBLOCK_DUMMY_HEADER : DUMMY_HEADER_NO_CHECKSUM;
2159  }
2160
2161  /**
   * @return This HFileBlock's fileContext, which will be a derivative of the fileContext for the
   *         file from which this block's data was originally read.
2164   */
2165  public HFileContext getHFileContext() {
2166    return this.fileContext;
2167  }
2168
2169  /**
2170   * Convert the contents of the block header into a human readable string. This is mostly helpful
2171   * for debugging. This assumes that the block has minor version > 0.
2172   */
2173  static String toStringHeader(ByteBuff buf) throws IOException {
2174    byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
2175    buf.get(magicBuf);
2176    BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
2177    int compressedBlockSizeNoHeader = buf.getInt();
2178    int uncompressedBlockSizeNoHeader = buf.getInt();
2179    long prevBlockOffset = buf.getLong();
2180    byte cksumtype = buf.get();
2181    long bytesPerChecksum = buf.getInt();
2182    long onDiskDataSizeWithHeader = buf.getInt();
2183    return " Header dump: magic: " + Bytes.toString(magicBuf) + " blockType " + bt
2184      + " compressedBlockSizeNoHeader " + compressedBlockSizeNoHeader
2185      + " uncompressedBlockSizeNoHeader " + uncompressedBlockSizeNoHeader + " prevBlockOffset "
2186      + prevBlockOffset + " checksumType " + ChecksumType.codeToType(cksumtype)
2187      + " bytesPerChecksum " + bytesPerChecksum + " onDiskDataSizeWithHeader "
2188      + onDiskDataSizeWithHeader;
2189  }
2190
2191  /**
2192   * Creates a new HFileBlockBuilder from the existing block and a new ByteBuff. The builder will be
2193   * loaded with all of the original fields from blk, except now using the newBuff and setting
2194   * isSharedMem based on the source of the passed in newBuff. An existing HFileBlock may have been
2195   * an {@link ExclusiveMemHFileBlock}, but the new buffer might call for a
2196   * {@link SharedMemHFileBlock}. Or vice versa.
2197   * @param blk     the block to clone from
2198   * @param newBuff the new buffer to use
2199   */
2200  private static HFileBlockBuilder createBuilder(HFileBlock blk, ByteBuff newBuff) {
2201    return new HFileBlockBuilder().withBlockType(blk.blockType)
2202      .withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader)
2203      .withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader)
2204      .withPrevBlockOffset(blk.prevBlockOffset).withByteBuff(newBuff).withOffset(blk.offset)
2205      .withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
2206      .withNextBlockOnDiskSize(blk.nextBlockOnDiskSize).withHFileContext(blk.fileContext)
2207      .withByteBuffAllocator(blk.allocator).withShared(!newBuff.hasArray());
2208  }
2209
2210  private static HFileBlock shallowClone(HFileBlock blk, ByteBuff newBuf) {
2211    return createBuilder(blk, newBuf).build();
2212  }
2213
2214  static HFileBlock deepCloneOnHeap(HFileBlock blk) {
2215    ByteBuff deepCloned = ByteBuff
2216      .wrap(ByteBuffer.wrap(blk.bufWithoutChecksum.toBytes(0, blk.bufWithoutChecksum.limit())));
2217    return createBuilder(blk, deepCloned).build();
2218  }
2219
2220  /**
2221   * Returns OpenTelemetry Attributes for a Span that is reading a data block with relevant
2222   * metadata. Will short-circuit if the span isn't going to be captured/OTEL isn't enabled.
2223   */
2224  private static Attributes getReadDataBlockInternalAttributes(Span span) {
2225    // It's expensive to record these attributes, so we avoid the cost of doing this if the span
2226    // isn't going to be persisted
2227    if (!span.isRecording()) {
2228      return Attributes.empty();
2229    }
2230
2231    final AttributesBuilder attributesBuilder = Attributes.builder();
2232    Optional.of(Context.current()).map(val -> val.get(CONTEXT_KEY))
2233      .ifPresent(c -> c.accept(attributesBuilder));
2234    return attributesBuilder.build();
2235  }
2236}