001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
021
022import java.io.DataInputStream;
023import java.io.DataOutput;
024import java.io.DataOutputStream;
025import java.io.IOException;
026import java.nio.ByteBuffer;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.concurrent.atomic.AtomicReference;
030import java.util.concurrent.locks.Lock;
031import java.util.concurrent.locks.ReentrantLock;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FSDataInputStream;
034import org.apache.hadoop.fs.FSDataOutputStream;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.fs.HFileSystem;
038import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
039import org.apache.hadoop.hbase.io.ByteBuffAllocator;
040import org.apache.hadoop.hbase.io.ByteBuffInputStream;
041import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream;
042import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
043import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
044import org.apache.hadoop.hbase.io.encoding.EncodingState;
045import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
046import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
047import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
048import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
049import org.apache.hadoop.hbase.io.util.BlockIOUtils;
050import org.apache.hadoop.hbase.nio.ByteBuff;
051import org.apache.hadoop.hbase.nio.MultiByteBuff;
052import org.apache.hadoop.hbase.nio.SingleByteBuff;
053import org.apache.hadoop.hbase.regionserver.ShipperListener;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.hbase.util.ChecksumType;
056import org.apache.hadoop.hbase.util.ClassSize;
057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
058import org.apache.yetus.audience.InterfaceAudience;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
063
064/**
065 * Cacheable Blocks of an {@link HFile} version 2 file. Version 2 was introduced in hbase-0.92.0.
066 * <p>
067 * Version 1 was the original file block. Version 2 was introduced when we changed the hbase file
068 * format to support multi-level block indexes and compound bloom filters (HBASE-3857). Support for
069 * Version 1 was removed in hbase-1.3.0.
070 * <h3>HFileBlock: Version 2</h3> In version 2, a block is structured as follows:
071 * <ul>
072 * <li><b>Header:</b> See Writer#putHeader() for where header is written; header total size is
073 * HFILEBLOCK_HEADER_SIZE
074 * <ul>
075 * <li>0. blockType: Magic record identifying the {@link BlockType} (8 bytes): e.g.
076 * <code>DATABLK*</code>
 * <li>1. onDiskSizeWithoutHeader: Compressed -- a.k.a. 'on disk' -- block size, excluding header,
 * but including trailing checksum bytes (4 bytes)
079 * <li>2. uncompressedSizeWithoutHeader: Uncompressed block size, excluding header, and excluding
080 * checksum bytes (4 bytes)
081 * <li>3. prevBlockOffset: The offset of the previous block of the same type (8 bytes). This is used
082 * to navigate to the previous block without having to go to the block index
083 * <li>4: For minorVersions &gt;=1, the ordinal describing checksum type (1 byte)
084 * <li>5: For minorVersions &gt;=1, the number of data bytes/checksum chunk (4 bytes)
085 * <li>6: onDiskDataSizeWithHeader: For minorVersions &gt;=1, the size of data 'on disk', including
086 * header, excluding checksums (4 bytes)
087 * </ul>
088 * </li>
089 * <li><b>Raw/Compressed/Encrypted/Encoded data:</b> The compression algorithm is the same for all
090 * the blocks in an {@link HFile}. If compression is NONE, this is just raw, serialized Cells.
 * <li><b>Tail:</b> For minorVersions &gt;=1, a series of 4 byte checksums, one each for the number
 * of bytes specified by bytesPerChecksum; a sizing sketch follows this list.
093 * </ul>
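 * <p>
 * A worked sizing sketch for that tail (assuming, say, bytesPerChecksum = 16384): a block whose
 * header plus data occupy 40000 bytes on disk is covered by ceil(40000 / 16384) = 3 checksum
 * chunks, so the tail holds 3 * 4 = 12 checksum bytes.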
 * <h3>Caching</h3> Caches store whole blocks, with trailing checksums if any. We then tag on some
 * metadata, the content of BLOCK_METADATA_SPACE: a flag that is on if we are doing 'hbase'
 * checksums, and the offset into the file, which is needed when we re-make the cache key when we
 * return the block to the cache as 'done'. See {@link Cacheable#serialize(ByteBuffer, boolean)} and
 * {@link Cacheable#getDeserializer()}.
099 * <p>
100 * TODO: Should we cache the checksums? Down in Writer#getBlockForCaching(CacheConfig) where we make
101 * a block to cache-on-write, there is an attempt at turning off checksums. This is not the only
102 * place we get blocks to cache. We also will cache the raw return from an hdfs read. In this case,
103 * the checksums may be present. If the cache is backed by something that doesn't do ECC, say an
 * SSD, we might want to preserve checksums. For now this is an open question.
105 * <p>
106 * TODO: Over in BucketCache, we save a block allocation by doing a custom serialization. Be sure to
107 * change it if serialization changes in here. Could we add a method here that takes an IOEngine and
108 * that then serializes to it rather than expose our internals over in BucketCache? IOEngine is in
109 * the bucket subpackage. Pull it up? Then this class knows about bucketcache. Ugh.
110 */
111@InterfaceAudience.Private
112public class HFileBlock implements Cacheable {
113  private static final Logger LOG = LoggerFactory.getLogger(HFileBlock.class);
114  public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HFileBlock.class, false);
115
116  // Block Header fields.
117
118  // TODO: encapsulate Header related logic in this inner class.
119  static class Header {
120    // Format of header is:
121    // 8 bytes - block magic
122    // 4 bytes int - onDiskSizeWithoutHeader
123    // 4 bytes int - uncompressedSizeWithoutHeader
124    // 8 bytes long - prevBlockOffset
125    // The following 3 are only present if header contains checksum information
126    // 1 byte - checksum type
127    // 4 byte int - bytes per checksum
128    // 4 byte int - onDiskDataSizeWithHeader
129    static int BLOCK_MAGIC_INDEX = 0;
130    static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8;
131    static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12;
132    static int PREV_BLOCK_OFFSET_INDEX = 16;
133    static int CHECKSUM_TYPE_INDEX = 24;
134    static int BYTES_PER_CHECKSUM_INDEX = 25;
135    static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29;
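    // A short sketch of how these indices are used (mirroring createFromBuff(...) below): given a
    // ByteBuff positioned at the start of a block,
    //   int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
    //   long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);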
136  }
137
138  /** Type of block. Header field 0. */
139  private BlockType blockType;
140
141  /**
142   * Size on disk excluding header, including checksum. Header field 1.
143   * @see Writer#putHeader(byte[], int, int, int, int)
144   */
145  private int onDiskSizeWithoutHeader;
146
147  /**
148   * Size of pure data. Does not include header or checksums. Header field 2.
149   * @see Writer#putHeader(byte[], int, int, int, int)
150   */
151  private int uncompressedSizeWithoutHeader;
152
153  /**
154   * The offset of the previous block on disk. Header field 3.
155   * @see Writer#putHeader(byte[], int, int, int, int)
156   */
157  private long prevBlockOffset;
158
159  /**
160   * Size on disk of header + data. Excludes checksum. Header field 6, OR calculated from
161   * {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
162   * @see Writer#putHeader(byte[], int, int, int, int)
163   */
164  private int onDiskDataSizeWithHeader;
165  // End of Block Header fields.
166
167  /**
168   * The in-memory representation of the hfile block. Can be on or offheap. Can be backed by a
169   * single ByteBuffer or by many. Make no assumptions.
170   * <p>
171   * Be careful reading from this <code>buf</code>. Duplicate and work on the duplicate or if not,
172   * be sure to reset position and limit else trouble down the road.
173   * <p>
174   * TODO: Make this read-only once made.
175   * <p>
176   * We are using the ByteBuff type. ByteBuffer is not extensible yet we need to be able to have a
177   * ByteBuffer-like API across multiple ByteBuffers reading from a cache such as BucketCache. So,
178   * we have this ByteBuff type. Unfortunately, it is spread all about HFileBlock. Would be good if
179   * could be confined to cache-use only but hard-to-do.
180   */
181  private ByteBuff buf;
182
183  /**
184   * Meta data that holds meta information on the hfileblock.
185   */
186  private HFileContext fileContext;
187
188  /**
189   * The offset of this block in the file. Populated by the reader for convenience of access. This
190   * offset is not part of the block header.
191   */
192  private long offset = UNSET;
193
194  /**
195   * The on-disk size of the next block, including the header and checksums if present. UNSET if
196   * unknown. Blocks try to carry the size of the next block to read in this data member. Usually we
197   * get block sizes from the hfile index but sometimes the index is not available: e.g. when we
198   * read the indexes themselves (indexes are stored in blocks, we do not have an index for the
199   * indexes). Saves seeks especially around file open when there is a flurry of reading in hfile
200   * metadata.
201   */
202  private int nextBlockOnDiskSize = UNSET;
203
204  private ByteBuffAllocator allocator;
205
206  /**
   * On a checksum failure, do this many succeeding read requests using hdfs checksums before
   * auto-reenabling hbase checksum verification.
209   */
210  static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
211
212  private static int UNSET = -1;
213  public static final boolean FILL_HEADER = true;
214  public static final boolean DONT_FILL_HEADER = false;
215
  // How do we get this estimate right if the buffer is a SingleByteBuff and not a MultiByteBuff?
217  public static final int MULTI_BYTE_BUFFER_HEAP_SIZE =
218    (int) ClassSize.estimateBase(MultiByteBuff.class, false);
219
220  /**
221   * Space for metadata on a block that gets stored along with the block when we cache it. There are
222   * a few bytes stuck on the end of the HFileBlock that we pull in from HDFS. 8 bytes are for the
   * offset of this block (long) in the file. Offset is important because it is used when we remake
   * the CacheKey when we return the block to the cache when done. There is also a flag on whether
225   * checksumming is being done by hbase or not. See class comment for note on uncertain state of
226   * checksumming of blocks that come out of cache (should we or should we not?). Finally there are
227   * 4 bytes to hold the length of the next block which can save a seek on occasion if available.
228   * (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was formerly
229   * known as EXTRA_SERIALIZATION_SPACE).
230   */
231  public static final int BLOCK_METADATA_SPACE =
232    Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
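  // That is: 1 byte (hbase checksum flag) + 8 bytes (block offset in the file) + 4 bytes (next
  // block's on-disk size) = 13 bytes of metadata appended to the block when it is cached.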
233
234  /**
235   * Each checksum value is an integer that can be stored in 4 bytes.
236   */
237  static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
238
239  static final byte[] DUMMY_HEADER_NO_CHECKSUM =
240    new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
241
242  /**
243   * Used deserializing blocks from Cache. <code>
244   * ++++++++++++++
245   * + HFileBlock +
246   * ++++++++++++++
247   * + Checksums  + <= Optional
248   * ++++++++++++++
249   * + Metadata!  + <= See note on BLOCK_METADATA_SPACE above.
250   * ++++++++++++++
251   * </code>
252   * @see #serialize(ByteBuffer, boolean)
253   */
254  public static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER = new BlockDeserializer();
255
256  public static final class BlockDeserializer implements CacheableDeserializer<Cacheable> {
257    private BlockDeserializer() {
258    }
259
260    @Override
261    public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc) throws IOException {
262      // The buf has the file block followed by block metadata.
263      // Set limit to just before the BLOCK_METADATA_SPACE then rewind.
264      buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind();
265      // Get a new buffer to pass the HFileBlock for it to 'own'.
266      ByteBuff newByteBuff = buf.slice();
267      // Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock.
268      buf.position(buf.limit());
269      buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE);
270      boolean usesChecksum = buf.get() == (byte) 1;
271      long offset = buf.getLong();
272      int nextBlockOnDiskSize = buf.getInt();
273      return createFromBuff(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
274    }
275
276    @Override
277    public int getDeserializerIdentifier() {
278      return DESERIALIZER_IDENTIFIER;
279    }
280  }
281
282  private static final int DESERIALIZER_IDENTIFIER;
283  static {
284    DESERIALIZER_IDENTIFIER =
285      CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
286  }
287
  /**
   * Creates a new {@link HFile} block from the given fields. This constructor is used only while
   * writing blocks and caching, when the block data is sitting in a byte buffer and we want to
   * stuff the block into cache.
   * <p>
   * TODO: The caller presumes no checksumming is required of this block instance since it is going
   * into the cache; the checksum was already verified on the underlying block data pulled in from
   * the filesystem. Is that correct? What if the cache is SSD?
   * <p>
   * TODO: Can the HFile block writer also be off-heap?
299   * @param blockType                     the type of this block, see {@link BlockType}
300   * @param onDiskSizeWithoutHeader       see {@link #onDiskSizeWithoutHeader}
301   * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
302   * @param prevBlockOffset               see {@link #prevBlockOffset}
303   * @param buf                           block buffer with header
304   *                                      ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
   * @param fillHeader                    when true, write the first 4 header fields into the passed
   *                                      buffer.
   * @param offset                        the file offset the block was read from
   * @param nextBlockOnDiskSize           the on-disk size of the next block, or -1 (UNSET) if
   *                                      unknown
   * @param onDiskDataSizeWithHeader      see {@link #onDiskDataSizeWithHeader}
   * @param fileContext                   HFile meta data
   * @param allocator                     the {@link ByteBuffAllocator} to use for this block's
   *                                      buffer
310   */
311  public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
312    int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
313    long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
314    ByteBuffAllocator allocator) {
315    this.blockType = blockType;
316    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
317    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
318    this.prevBlockOffset = prevBlockOffset;
319    this.offset = offset;
320    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
321    this.nextBlockOnDiskSize = nextBlockOnDiskSize;
322    this.fileContext = fileContext;
323    this.allocator = allocator;
324    this.buf = buf;
325    if (fillHeader) {
326      overwriteHeader();
327    }
328    this.buf.rewind();
329  }
330
331  /**
332   * Creates a block from an existing buffer starting with a header. Rewinds and takes ownership of
333   * the buffer. By definition of rewind, ignores the buffer position, but if you slice the buffer
334   * beforehand, it will rewind to that point.
335   * @param buf Has header, content, and trailing checksums if present.
336   */
337  static HFileBlock createFromBuff(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
338    final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator)
339    throws IOException {
340    buf.rewind();
341    final BlockType blockType = BlockType.read(buf);
342    final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
343    final int uncompressedSizeWithoutHeader =
344      buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
345    final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
    // This method is called both when we deserialize a block from cache and when we read a block
    // in from the fs. fileContext is null when deserialized from cache, so we need to make one up.
348    HFileContextBuilder fileContextBuilder =
349      fileContext != null ? new HFileContextBuilder(fileContext) : new HFileContextBuilder();
350    fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum);
351    int onDiskDataSizeWithHeader;
352    if (usesHBaseChecksum) {
353      byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX);
354      int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
355      onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
356      // Use the checksum type and bytes per checksum from header, not from fileContext.
357      fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType));
358      fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum);
359    } else {
360      fileContextBuilder.withChecksumType(ChecksumType.NULL);
361      fileContextBuilder.withBytesPerCheckSum(0);
      // Need to fix onDiskDataSizeWithHeader; there are no checksums after the block data.
363      onDiskDataSizeWithHeader = onDiskSizeWithoutHeader + headerSize(usesHBaseChecksum);
364    }
365    fileContext = fileContextBuilder.build();
366    assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
367    return new HFileBlockBuilder().withBlockType(blockType)
368      .withOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader)
369      .withUncompressedSizeWithoutHeader(uncompressedSizeWithoutHeader)
370      .withPrevBlockOffset(prevBlockOffset).withOffset(offset)
371      .withOnDiskDataSizeWithHeader(onDiskDataSizeWithHeader)
372      .withNextBlockOnDiskSize(nextBlockOnDiskSize).withHFileContext(fileContext)
373      .withByteBuffAllocator(allocator).withByteBuff(buf.rewind()).withShared(!buf.hasArray())
374      .build();
375  }
376
377  /**
378   * Parse total on disk size including header and checksum.
379   * @param headerBuf      Header ByteBuffer. Presumed exact size of header.
380   * @param verifyChecksum true if checksum verification is in use.
381   * @return Size of the block with header included.
382   */
383  private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf, boolean verifyChecksum) {
384    return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(verifyChecksum);
385  }
386
387  /**
388   * @return the on-disk size of the next block (including the header size and any checksums if
389   *         present) read by peeking into the next block's header; use as a hint when doing a read
390   *         of the next block when scanning or running over a file.
391   */
392  int getNextBlockOnDiskSize() {
393    return nextBlockOnDiskSize;
394  }
395
396  @Override
397  public BlockType getBlockType() {
398    return blockType;
399  }
400
401  @Override
402  public int refCnt() {
403    return buf.refCnt();
404  }
405
406  @Override
407  public HFileBlock retain() {
408    buf.retain();
409    return this;
410  }
411
412  /**
   * Call {@link ByteBuff#release()} to decrease the reference count. If there are no other
   * references, the underlying {@link ByteBuffer} is returned to the
   * {@link org.apache.hadoop.hbase.io.ByteBuffAllocator}.
415   */
416  @Override
417  public boolean release() {
418    return buf.release();
419  }
420
  /** @return the data block encoding id that was used to encode this block */
422  short getDataBlockEncodingId() {
423    if (blockType != BlockType.ENCODED_DATA) {
424      throw new IllegalArgumentException("Querying encoder ID of a block " + "of type other than "
425        + BlockType.ENCODED_DATA + ": " + blockType);
426    }
427    return buf.getShort(headerSize());
428  }
429
430  /**
431   * @return the on-disk size of header + data part + checksum.
432   */
433  public int getOnDiskSizeWithHeader() {
434    return onDiskSizeWithoutHeader + headerSize();
435  }
436
437  /**
438   * @return the on-disk size of the data part + checksum (header excluded).
439   */
440  int getOnDiskSizeWithoutHeader() {
441    return onDiskSizeWithoutHeader;
442  }
443
444  /**
445   * @return the uncompressed size of data part (header and checksum excluded).
446   */
447  int getUncompressedSizeWithoutHeader() {
448    return uncompressedSizeWithoutHeader;
449  }
450
451  /**
452   * @return the offset of the previous block of the same type in the file, or -1 if unknown
453   */
454  long getPrevBlockOffset() {
455    return prevBlockOffset;
456  }
457
458  /**
   * Rewinds {@code buf} and writes the first 4 header fields (plus the checksum fields when HBase
   * checksums are in use). The {@code buf} position is modified as a side effect.
461   */
462  private void overwriteHeader() {
463    buf.rewind();
464    blockType.write(buf);
465    buf.putInt(onDiskSizeWithoutHeader);
466    buf.putInt(uncompressedSizeWithoutHeader);
467    buf.putLong(prevBlockOffset);
468    if (this.fileContext.isUseHBaseChecksum()) {
469      buf.put(fileContext.getChecksumType().getCode());
470      buf.putInt(fileContext.getBytesPerChecksum());
471      buf.putInt(onDiskDataSizeWithHeader);
472    }
473  }
474
475  /**
476   * Returns a buffer that does not include the header and checksum.
477   * @return the buffer with header skipped and checksum omitted.
478   */
479  public ByteBuff getBufferWithoutHeader() {
480    return this.getBufferWithoutHeader(false);
481  }
482
483  /**
   * Returns a buffer that does not include the header, optionally including the checksum.
   * @param withChecksum whether to include the checksum in the returned buffer.
   * @return the buffer with the header skipped and the checksum included or omitted according to
   *         {@code withChecksum}.
487   */
488  public ByteBuff getBufferWithoutHeader(boolean withChecksum) {
489    ByteBuff dup = getBufferReadOnly();
490    int delta = withChecksum ? 0 : totalChecksumBytes();
491    return dup.position(headerSize()).limit(buf.limit() - delta).slice();
492  }
493
494  /**
495   * Returns a read-only duplicate of the buffer this block stores internally ready to be read.
496   * Clients must not modify the buffer object though they may set position and limit on the
497   * returned buffer since we pass back a duplicate. This method has to be public because it is used
498   * in {@link CompoundBloomFilter} to avoid object creation on every Bloom filter lookup, but has
499   * to be used with caution. Buffer holds header, block content, and any follow-on checksums if
500   * present.
501   * @return the buffer of this block for read-only operations
502   */
503  public ByteBuff getBufferReadOnly() {
504    // TODO: ByteBuf does not support asReadOnlyBuffer(). Fix.
505    ByteBuff dup = this.buf.duplicate();
506    assert dup.position() == 0;
507    return dup;
508  }
509
510  public ByteBuffAllocator getByteBuffAllocator() {
511    return this.allocator;
512  }
513
514  private void sanityCheckAssertion(long valueFromBuf, long valueFromField, String fieldName)
515    throws IOException {
516    if (valueFromBuf != valueFromField) {
517      throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
518        + ") is different from that in the field (" + valueFromField + ")");
519    }
520  }
521
522  private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
523    throws IOException {
524    if (valueFromBuf != valueFromField) {
525      throw new IOException("Block type stored in the buffer: " + valueFromBuf
526        + ", block type field: " + valueFromField);
527    }
528  }
529
530  /**
531   * Checks if the block is internally consistent, i.e. the first
532   * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a valid header consistent
   * with the fields. Assumes a packed block structure. This function is primarily for testing and
534   * debugging, and is not thread-safe, because it alters the internal buffer pointer. Used by tests
535   * only.
536   */
537  void sanityCheck() throws IOException {
538    // Duplicate so no side-effects
539    ByteBuff dup = this.buf.duplicate().rewind();
540    sanityCheckAssertion(BlockType.read(dup), blockType);
541
542    sanityCheckAssertion(dup.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader");
543
544    sanityCheckAssertion(dup.getInt(), uncompressedSizeWithoutHeader,
545      "uncompressedSizeWithoutHeader");
546
547    sanityCheckAssertion(dup.getLong(), prevBlockOffset, "prevBlockOffset");
548    if (this.fileContext.isUseHBaseChecksum()) {
549      sanityCheckAssertion(dup.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
550      sanityCheckAssertion(dup.getInt(), this.fileContext.getBytesPerChecksum(),
551        "bytesPerChecksum");
552      sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
553    }
554
555    int cksumBytes = totalChecksumBytes();
556    int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
557    if (dup.limit() != expectedBufLimit) {
558      throw new AssertionError("Expected limit " + expectedBufLimit + ", got " + dup.limit());
559    }
560
561    // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
562    // block's header, so there are two sensible values for buffer capacity.
563    int hdrSize = headerSize();
564    dup.rewind();
565    if (dup.remaining() != expectedBufLimit && dup.remaining() != expectedBufLimit + hdrSize) {
566      throw new AssertionError("Invalid buffer capacity: " + dup.remaining() + ", expected "
567        + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
568    }
569  }
570
571  @Override
572  public String toString() {
573    StringBuilder sb = new StringBuilder().append("[").append("blockType=").append(blockType)
574      .append(", fileOffset=").append(offset).append(", headerSize=").append(headerSize())
575      .append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
576      .append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
577      .append(", prevBlockOffset=").append(prevBlockOffset).append(", isUseHBaseChecksum=")
578      .append(fileContext.isUseHBaseChecksum());
579    if (fileContext.isUseHBaseChecksum()) {
580      sb.append(", checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
581        .append(", bytesPerChecksum=").append(this.buf.getInt(24 + 1))
582        .append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
583    } else {
584      sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader).append("(")
585        .append(onDiskSizeWithoutHeader).append("+")
586        .append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
587    }
588    String dataBegin;
589    if (buf.hasArray()) {
590      dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
591        Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
592    } else {
593      ByteBuff bufWithoutHeader = getBufferWithoutHeader();
594      byte[] dataBeginBytes =
595        new byte[Math.min(32, bufWithoutHeader.limit() - bufWithoutHeader.position())];
596      bufWithoutHeader.get(dataBeginBytes);
597      dataBegin = Bytes.toStringBinary(dataBeginBytes);
598    }
599    sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader())
600      .append(", totalChecksumBytes=").append(totalChecksumBytes()).append(", isUnpacked=")
601      .append(isUnpacked()).append(", buf=[").append(buf).append("]").append(", dataBeginsWith=")
602      .append(dataBegin).append(", fileContext=").append(fileContext)
603      .append(", nextBlockOnDiskSize=").append(nextBlockOnDiskSize).append("]");
604    return sb.toString();
605  }
606
607  /**
608   * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
609   * encoded structure. Internal structures are shared between instances where applicable.
610   */
611  HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
612    if (!fileContext.isCompressedOrEncrypted()) {
613      // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
614      // which is used for block serialization to L2 cache, does not preserve encoding and
615      // encryption details.
616      return this;
617    }
618
619    HFileBlock unpacked = shallowClone(this);
620    unpacked.allocateBuffer(); // allocates space for the decompressed block
621    boolean succ = false;
622    try {
623      HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
624        ? reader.getBlockDecodingContext()
625        : reader.getDefaultBlockDecodingContext();
626      // Create a duplicated buffer without the header part.
627      ByteBuff dup = this.buf.duplicate();
628      dup.position(this.headerSize());
629      dup = dup.slice();
630      // Decode the dup into unpacked#buf
631      ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
632        unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup);
633      succ = true;
634      return unpacked;
635    } finally {
636      if (!succ) {
637        unpacked.release();
638      }
639    }
640  }
641
642  /**
643   * Always allocates a new buffer of the correct size. Copies header bytes from the existing
   * buffer. Does not change header fields. Reserves room for the checksum bytes too.
645   */
646  private void allocateBuffer() {
647    int cksumBytes = totalChecksumBytes();
648    int headerSize = headerSize();
649    int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
650
651    ByteBuff newBuf = allocator.allocate(capacityNeeded);
652
653    // Copy header bytes into newBuf.
654    buf.position(0);
655    newBuf.put(0, buf, 0, headerSize);
656
657    buf = newBuf;
658    // set limit to exclude next block's header
659    buf.limit(capacityNeeded);
660  }
661
662  /**
663   * Return true when this block's buffer has been unpacked, false otherwise. Note this is a
   * calculated heuristic, not a tracked attribute of the block.
665   */
666  public boolean isUnpacked() {
667    final int cksumBytes = totalChecksumBytes();
668    final int headerSize = headerSize();
669    final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
670    final int bufCapacity = buf.remaining();
671    return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
672  }
673
674  /**
   * Cannot be {@link #UNSET}. Must be a legitimate value. Used when re-making the
   * {@link BlockCacheKey} when the block is returned to the cache.
677   * @return the offset of this block in the file it was read from
678   */
679  long getOffset() {
680    if (offset < 0) {
681      throw new IllegalStateException("HFile block offset not initialized properly");
682    }
683    return offset;
684  }
685
686  /**
687   * @return a byte stream reading the data + checksum of this block
688   */
689  DataInputStream getByteStream() {
690    ByteBuff dup = this.buf.duplicate();
691    dup.position(this.headerSize());
692    return new DataInputStream(new ByteBuffInputStream(dup));
693  }
694
695  @Override
696  public long heapSize() {
697    long size = FIXED_OVERHEAD;
698    size += fileContext.heapSize();
699    if (buf != null) {
700      // Deep overhead of the byte buffer. Needs to be aligned separately.
701      size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE);
702    }
703    return ClassSize.align(size);
704  }
705
706  /**
   * Will be overridden by {@link SharedMemHFileBlock} or {@link ExclusiveMemHFileBlock}. Returns
   * true by default.
709   */
710  public boolean isSharedMem() {
711    if (this instanceof SharedMemHFileBlock) {
712      return true;
713    } else if (this instanceof ExclusiveMemHFileBlock) {
714      return false;
715    }
716    return true;
717  }
718
719  /**
720   * Unified version 2 {@link HFile} block writer. The intended usage pattern is as follows:
721   * <ol>
722   * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
723   * <li>Call {@link Writer#startWriting} and get a data stream to write to.
724   * <li>Write your data into the stream.
   * <li>Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to store the
   * serialized block into an external stream.
727   * <li>Repeat to write more blocks.
728   * </ol>
729   * <p>
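   * A rough usage sketch of that sequence (the {@code conf}, {@code fileContext} and {@code out}
   * objects are assumed to be supplied by the caller; illustrative only, not a prescribed API):
   *
   * <pre>
   * HFileBlock.Writer writer =
   *   new HFileBlock.Writer(conf, NoOpDataBlockEncoder.INSTANCE, fileContext);
   * DataOutputStream dos = writer.startWriting(BlockType.DATA);
   * // write the block payload into dos (DATA blocks are normally fed through write(Cell))
   * writer.writeHeaderAndData(out); // out is the hfile's FSDataOutputStream
   * // call startWriting(...) again to begin the next block
   * </pre>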
730   */
731  static class Writer implements ShipperListener {
732    private enum State {
733      INIT,
734      WRITING,
735      BLOCK_READY
736    }
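
    // State transitions, as implemented below: INIT -> WRITING via startWriting(...),
    // WRITING -> BLOCK_READY via ensureBlockReady()/finishBlock(), and back to WRITING when
    // startWriting(...) is called again for the next block.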
737
738    /** Writer state. Used to ensure the correct usage protocol. */
739    private State state = State.INIT;
740
741    /** Data block encoder used for data blocks */
742    private final HFileDataBlockEncoder dataBlockEncoder;
743
744    private HFileBlockEncodingContext dataBlockEncodingCtx;
745
746    /** block encoding context for non-data blocks */
747    private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
748
749    /**
750     * The stream we use to accumulate data into a block in an uncompressed format. We reset this
751     * stream at the end of each block and reuse it. The header is written as the first
752     * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this stream.
753     */
754    private ByteArrayOutputStream baosInMemory;
755
756    /**
757     * Current block type. Set in {@link #startWriting(BlockType)}. Could be changed in
758     * {@link #finishBlock()} from {@link BlockType#DATA} to {@link BlockType#ENCODED_DATA}.
759     */
760    private BlockType blockType;
761
762    /**
     * A stream that we write uncompressed bytes to; the bytes accumulate in {@link #baosInMemory}
     * and are compressed/encrypted later, in {@link #finishBlock()}.
765     */
766    private DataOutputStream userDataStream;
767
768    /**
769     * Bytes to be written to the file system, including the header. Compressed if compression is
770     * turned on. It also includes the checksum data that immediately follows the block data.
771     * (header + data + checksums)
772     */
773    private ByteArrayOutputStream onDiskBlockBytesWithHeader;
774
775    /**
     * The checksum data for this block, written to disk after the block data. The checksums are
     * computed over the bytes in {@link #onDiskBlockBytesWithHeader}, i.e. the (possibly
     * compressed) header + data.
779     */
780    private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;
781
782    /**
783     * Current block's start offset in the {@link HFile}. Set in
784     * {@link #writeHeaderAndData(FSDataOutputStream)}.
785     */
786    private long startOffset;
787
788    /**
789     * Offset of previous block by block type. Updated when the next block is started.
790     */
791    private long[] prevOffsetByType;
792
793    /** The offset of the previous block of the same type */
794    private long prevOffset;
795    /** Meta data that holds information about the hfileblock **/
796    private HFileContext fileContext;
797
798    private final ByteBuffAllocator allocator;
799
800    @Override
801    public void beforeShipped() {
802      if (getEncodingState() != null) {
803        getEncodingState().beforeShipped();
804      }
805    }
806
807    EncodingState getEncodingState() {
808      return dataBlockEncodingCtx.getEncodingState();
809    }
810
811    /**
812     * @param dataBlockEncoder data block encoding algorithm to use
813     */
814    public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
815      HFileContext fileContext) {
816      this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
817    }
818
819    public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
820      HFileContext fileContext, ByteBuffAllocator allocator) {
821      if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
822        throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is "
823          + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is "
824          + fileContext.getBytesPerChecksum());
825      }
826      this.allocator = allocator;
827      this.dataBlockEncoder =
828        dataBlockEncoder != null ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
829      this.dataBlockEncodingCtx = this.dataBlockEncoder.newDataBlockEncodingContext(conf,
830        HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
831      // TODO: This should be lazily instantiated
832      this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(conf, null,
833        HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
834      // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum
835      baosInMemory = new ByteArrayOutputStream();
836      prevOffsetByType = new long[BlockType.values().length];
837      for (int i = 0; i < prevOffsetByType.length; ++i) {
838        prevOffsetByType[i] = UNSET;
839      }
840      // TODO: Why fileContext saved away when we have dataBlockEncoder and/or
841      // defaultDataBlockEncoder?
842      this.fileContext = fileContext;
843    }
844
845    /**
846     * Starts writing into the block. The previous block's data is discarded.
847     * @return the stream the user can write their data into
848     */
849    DataOutputStream startWriting(BlockType newBlockType) throws IOException {
850      if (state == State.BLOCK_READY && startOffset != -1) {
851        // We had a previous block that was written to a stream at a specific
852        // offset. Save that offset as the last offset of a block of that type.
853        prevOffsetByType[blockType.getId()] = startOffset;
854      }
855
856      startOffset = -1;
857      blockType = newBlockType;
858
859      baosInMemory.reset();
860      baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
861
862      state = State.WRITING;
863
864      // We will compress it later in finishBlock()
865      userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory);
866      if (newBlockType == BlockType.DATA) {
867        this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
868      }
869      return userDataStream;
870    }
871
872    /**
873     * Writes the Cell to this block
874     */
875    void write(Cell cell) throws IOException {
876      expectState(State.WRITING);
877      this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream);
878    }
879
880    /**
881     * Transitions the block writer from the "writing" state to the "block ready" state. Does
882     * nothing if a block is already finished.
883     */
884    void ensureBlockReady() throws IOException {
885      Preconditions.checkState(state != State.INIT, "Unexpected state: " + state);
886
887      if (state == State.BLOCK_READY) {
888        return;
889      }
890
891      // This will set state to BLOCK_READY.
892      finishBlock();
893    }
894
895    /**
896     * Finish up writing of the block. Flushes the compressing stream (if using compression), fills
897     * out the header, does any compression/encryption of bytes to flush out to disk, and manages
898     * the cache on write content, if applicable. Sets block write state to "block ready".
899     */
900    private void finishBlock() throws IOException {
901      if (blockType == BlockType.DATA) {
902        this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
903          baosInMemory.getBuffer(), blockType);
904        blockType = dataBlockEncodingCtx.getBlockType();
905      }
906      userDataStream.flush();
907      prevOffset = prevOffsetByType[blockType.getId()];
908
909      // We need to set state before we can package the block up for cache-on-write. In a way, the
910      // block is ready, but not yet encoded or compressed.
911      state = State.BLOCK_READY;
912      Bytes compressAndEncryptDat;
913      if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
914        compressAndEncryptDat =
915          dataBlockEncodingCtx.compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size());
916      } else {
917        compressAndEncryptDat = defaultBlockEncodingCtx.compressAndEncrypt(baosInMemory.getBuffer(),
918          0, baosInMemory.size());
919      }
920      if (compressAndEncryptDat == null) {
921        compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size());
922      }
923      if (onDiskBlockBytesWithHeader == null) {
924        onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength());
925      }
926      onDiskBlockBytesWithHeader.reset();
927      onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(),
928        compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength());
929      // Calculate how many bytes we need for checksum on the tail of the block.
930      int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(),
931        fileContext.getBytesPerChecksum());
932
933      // Put the header for the on disk bytes; header currently is unfilled-out
934      putHeader(onDiskBlockBytesWithHeader, onDiskBlockBytesWithHeader.size() + numBytes,
935        baosInMemory.size(), onDiskBlockBytesWithHeader.size());
936      if (onDiskChecksum.length != numBytes) {
937        onDiskChecksum = new byte[numBytes];
938      }
939      ChecksumUtil.generateChecksums(onDiskBlockBytesWithHeader.getBuffer(), 0,
940        onDiskBlockBytesWithHeader.size(), onDiskChecksum, 0, fileContext.getChecksumType(),
941        fileContext.getBytesPerChecksum());
942    }
943
944    /**
     * Put the header into the given byte array at the given offset.
     * @param dest             the byte array to write the header into
     * @param offset           the offset in {@code dest} at which to start writing
     * @param onDiskSize       size of the block on disk: header + data + checksum
947     * @param uncompressedSize size of the block after decompression (but before optional data block
948     *                         decoding) including header
949     * @param onDiskDataSize   size of the block on disk with header and data but not including the
950     *                         checksums
951     */
952    private void putHeader(byte[] dest, int offset, int onDiskSize, int uncompressedSize,
953      int onDiskDataSize) {
954      offset = blockType.put(dest, offset);
955      offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
956      offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
957      offset = Bytes.putLong(dest, offset, prevOffset);
958      offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
959      offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
960      Bytes.putInt(dest, offset, onDiskDataSize);
961    }
962
963    private void putHeader(ByteBuff buff, int onDiskSize, int uncompressedSize,
964      int onDiskDataSize) {
965      buff.rewind();
966      blockType.write(buff);
967      buff.putInt(onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
968      buff.putInt(uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
969      buff.putLong(prevOffset);
970      buff.put(fileContext.getChecksumType().getCode());
971      buff.putInt(fileContext.getBytesPerChecksum());
972      buff.putInt(onDiskDataSize);
973    }
974
975    private void putHeader(ByteArrayOutputStream dest, int onDiskSize, int uncompressedSize,
976      int onDiskDataSize) {
977      putHeader(dest.getBuffer(), 0, onDiskSize, uncompressedSize, onDiskDataSize);
978    }
979
980    /**
     * Similar to {@link #finishBlockAndWriteHeaderAndData(DataOutputStream)}, but also records the
     * offset of this block so that it can be referenced in the next block of the same type.
983     */
984    void writeHeaderAndData(FSDataOutputStream out) throws IOException {
985      long offset = out.getPos();
986      if (startOffset != UNSET && offset != startOffset) {
987        throw new IOException("A " + blockType + " block written to a "
988          + "stream twice, first at offset " + startOffset + ", then at " + offset);
989      }
990      startOffset = offset;
991      finishBlockAndWriteHeaderAndData(out);
992    }
993
994    /**
995     * Writes the header and the compressed data of this block (or uncompressed data when not using
996     * compression) into the given stream. Can be called in the "writing" state or in the "block
997     * ready" state. If called in the "writing" state, transitions the writer to the "block ready"
998     * state.
     * @param out the output stream to write the block to
1000     */
1001    protected void finishBlockAndWriteHeaderAndData(DataOutputStream out) throws IOException {
1002      ensureBlockReady();
1003      long startTime = EnvironmentEdgeManager.currentTime();
1004      out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size());
1005      out.write(onDiskChecksum);
1006      HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime);
1007    }
1008
1009    /**
     * Returns the header and the compressed data (or uncompressed data when not using compression)
1011     * as a byte array. Can be called in the "writing" state or in the "block ready" state. If
1012     * called in the "writing" state, transitions the writer to the "block ready" state. This
1013     * returns the header + data + checksums stored on disk.
1014     * @return header and data as they would be stored on disk in a byte array
1015     */
1016    byte[] getHeaderAndDataForTest() throws IOException {
1017      ensureBlockReady();
1018      // This is not very optimal, because we are doing an extra copy.
1019      // But this method is used only by unit tests.
1020      byte[] output = new byte[onDiskBlockBytesWithHeader.size() + onDiskChecksum.length];
1021      System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0,
1022        onDiskBlockBytesWithHeader.size());
1023      System.arraycopy(onDiskChecksum, 0, output, onDiskBlockBytesWithHeader.size(),
1024        onDiskChecksum.length);
1025      return output;
1026    }
1027
1028    /**
1029     * Releases resources used by this writer.
1030     */
1031    void release() {
1032      if (dataBlockEncodingCtx != null) {
1033        dataBlockEncodingCtx.close();
1034        dataBlockEncodingCtx = null;
1035      }
1036      if (defaultBlockEncodingCtx != null) {
1037        defaultBlockEncodingCtx.close();
1038        defaultBlockEncodingCtx = null;
1039      }
1040    }
1041
1042    /**
1043     * Returns the on-disk size of the data portion of the block. This is the compressed size if
1044     * compression is enabled. Can only be called in the "block ready" state. Header is not
1045     * compressed, and its size is not included in the return value.
1046     * @return the on-disk size of the block, not including the header.
1047     */
1048    int getOnDiskSizeWithoutHeader() {
1049      expectState(State.BLOCK_READY);
1050      return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length
1051        - HConstants.HFILEBLOCK_HEADER_SIZE;
1052    }
1053
1054    /**
1055     * Returns the on-disk size of the block. Can only be called in the "block ready" state.
1056     * @return the on-disk size of the block ready to be written, including the header size, the
1057     *         data and the checksum data.
1058     */
1059    int getOnDiskSizeWithHeader() {
1060      expectState(State.BLOCK_READY);
1061      return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length;
1062    }
1063
1064    /**
1065     * The uncompressed size of the block data. Does not include header size.
1066     */
1067    int getUncompressedSizeWithoutHeader() {
1068      expectState(State.BLOCK_READY);
1069      return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE;
1070    }
1071
1072    /**
1073     * The uncompressed size of the block data, including header size.
1074     */
1075    int getUncompressedSizeWithHeader() {
1076      expectState(State.BLOCK_READY);
1077      return baosInMemory.size();
1078    }
1079
1080    /** @return true if a block is being written */
1081    boolean isWriting() {
1082      return state == State.WRITING;
1083    }
1084
1085    /**
     * Returns the number of encoded bytes written into the current block so far, or zero if not
     * writing the block at the moment. Note that this will return zero in the "block ready" state
     * as well.
     * @return the number of encoded bytes written
1089     */
1090    public int encodedBlockSizeWritten() {
1091      return state != State.WRITING ? 0 : this.getEncodingState().getEncodedDataSizeWritten();
1092    }
1093
1094    /**
     * Returns the number of unencoded bytes written into the current block so far, or zero if not
     * writing the block at the moment. Note that this will return zero in the "block ready" state
     * as well.
     * @return the number of unencoded bytes written
1098     */
1099    int blockSizeWritten() {
1100      return state != State.WRITING ? 0 : this.getEncodingState().getUnencodedDataSizeWritten();
1101    }
1102
1103    /**
1104     * Clones the header followed by the uncompressed data, even if using compression. This is
1105     * needed for storing uncompressed blocks in the block cache. Can be called in the "writing"
1106     * state or the "block ready" state. Returns only the header and data, does not include checksum
1107     * data.
1108     * @return Returns an uncompressed block ByteBuff for caching on write
1109     */
1110    ByteBuff cloneUncompressedBufferWithHeader() {
1111      expectState(State.BLOCK_READY);
1112      ByteBuff bytebuff = allocator.allocate(baosInMemory.size());
1113      baosInMemory.toByteBuff(bytebuff);
1114      int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(),
1115        fileContext.getBytesPerChecksum());
1116      putHeader(bytebuff, onDiskBlockBytesWithHeader.size() + numBytes, baosInMemory.size(),
1117        onDiskBlockBytesWithHeader.size());
1118      bytebuff.rewind();
1119      return bytebuff;
1120    }
1121
1122    /**
1123     * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is needed
     * for storing packed blocks in the block cache. Returns only the header and data; does not
     * include checksum data.
1126     * @return Returns a copy of block bytes for caching on write
1127     */
1128    private ByteBuff cloneOnDiskBufferWithHeader() {
1129      expectState(State.BLOCK_READY);
1130      ByteBuff bytebuff = allocator.allocate(onDiskBlockBytesWithHeader.size());
1131      onDiskBlockBytesWithHeader.toByteBuff(bytebuff);
1132      bytebuff.rewind();
1133      return bytebuff;
1134    }
1135
1136    private void expectState(State expectedState) {
1137      if (state != expectedState) {
1138        throw new IllegalStateException(
1139          "Expected state: " + expectedState + ", actual state: " + state);
1140      }
1141    }
1142
1143    /**
1144     * Takes the given {@link BlockWritable} instance, creates a new block of its appropriate type,
1145     * writes the writable into this block, and flushes the block into the output stream. The writer
1146     * is instructed not to buffer uncompressed bytes for cache-on-write.
1147     * @param bw  the block-writable object to write as a block
1148     * @param out the file system output stream
1149     */
1150    void writeBlock(BlockWritable bw, FSDataOutputStream out) throws IOException {
1151      bw.writeToBlock(startWriting(bw.getBlockType()));
1152      writeHeaderAndData(out);
1153    }
1154
1155    /**
1156     * Creates a new HFileBlock. Checksums have already been validated, so the byte buffer passed
1157     * into the constructor of this newly created block does not have checksum data even though the
1158     * header minor version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a 0 value
1159     * in bytesPerChecksum. This method copies the on-disk or uncompressed data to build the
1160     * HFileBlock which is used only while writing blocks and caching.
1161     * <p>
1162     * TODO: Should there be an option where a cache can ask that hbase preserve block checksums for
     * checking after a block comes out of the cache? Otherwise, the cache is responsible for blocks
1164     * being wholesome (ECC memory or if file-backed, it does checksumming).
1165     */
1166    HFileBlock getBlockForCaching(CacheConfig cacheConf) {
1167      HFileContext newContext = new HFileContextBuilder().withBlockSize(fileContext.getBlocksize())
1168        .withBytesPerCheckSum(0).withChecksumType(ChecksumType.NULL) // no checksums in cached data
1169        .withCompression(fileContext.getCompression())
1170        .withDataBlockEncoding(fileContext.getDataBlockEncoding())
1171        .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
1172        .withCompressTags(fileContext.isCompressTags())
1173        .withIncludesMvcc(fileContext.isIncludesMvcc())
1174        .withIncludesTags(fileContext.isIncludesTags())
1175        .withColumnFamily(fileContext.getColumnFamily()).withTableName(fileContext.getTableName())
1176        .build();
1177      // Build the HFileBlock.
1178      HFileBlockBuilder builder = new HFileBlockBuilder();
1179      ByteBuff buff;
1180      if (cacheConf.shouldCacheCompressed(blockType.getCategory())) {
1181        buff = cloneOnDiskBufferWithHeader();
1182      } else {
1183        buff = cloneUncompressedBufferWithHeader();
1184      }
1185      return builder.withBlockType(blockType)
1186        .withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader())
1187        .withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader())
1188        .withPrevBlockOffset(prevOffset).withByteBuff(buff).withFillHeader(FILL_HEADER)
1189        .withOffset(startOffset).withNextBlockOnDiskSize(UNSET)
1190        .withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length)
1191        .withHFileContext(newContext).withByteBuffAllocator(cacheConf.getByteBuffAllocator())
1192        .withShared(!buff.hasArray()).build();
1193    }
1194  }
1195
1196  /** Something that can be written into a block. */
1197  interface BlockWritable {
1198    /** The type of block this data should use. */
1199    BlockType getBlockType();
1200
1201    /**
1202     * Writes the block to the provided stream. Must not write any magic records.
1203     * @param out a stream to write uncompressed data into
1204     */
1205    void writeToBlock(DataOutput out) throws IOException;
1206  }
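
  // A hypothetical BlockWritable sketch (illustrative only, not an implementation in this file):
  //   class PayloadWritable implements BlockWritable {
  //     private final byte[] payload;
  //     PayloadWritable(byte[] payload) { this.payload = payload; }
  //     public BlockType getBlockType() { return BlockType.META; }
  //     public void writeToBlock(DataOutput out) throws IOException { out.write(payload); }
  //   }
  // Such a writable can then be flushed with Writer#writeBlock(bw, out).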
1207
1208  /**
   * Iterator for reading {@link HFileBlock}s in the load-on-open section, such as the root data
   * index block, meta index block, file info block, etc.
1211   */
1212  interface BlockIterator {
1213    /**
1214     * Get the next block, or null if there are no more blocks to iterate.
1215     */
1216    HFileBlock nextBlock() throws IOException;
1217
1218    /**
1219     * Similar to {@link #nextBlock()} but checks block type, throws an exception if incorrect, and
1220     * returns the HFile block
1221     */
1222    HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1223
1224    /**
     * Now we use the {@link ByteBuffAllocator} to manage the nio ByteBuffers for HFileBlocks, so we
     * must deallocate all of the ByteBuffers at the end of their life. The BlockIterator's life
     * cycle starts when an HFileReader is opened and ends when HFileReader#close is called, so we
     * keep track of all the blocks read until we call {@link BlockIterator#freeBlocks()} when
     * closing the HFileReader. The total size of the blocks in the load-on-open section should be
     * quite small, so tracking them should be OK.
1231     */
1232    void freeBlocks();
1233  }
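
  // A hedged usage sketch for BlockIterator: walk the load-on-open section and release the
  // tracked blocks afterwards. The "reader" variable and the two offsets ("loadOnOpenOffset",
  // "loadOnOpenEnd") are hypothetical names, typically taken from the fixed file trailer:
  //
  //   BlockIterator it = reader.blockRange(loadOnOpenOffset, loadOnOpenEnd);
  //   try {
  //     HFileBlock rootIndex = it.nextBlockWithBlockType(BlockType.ROOT_INDEX);
  //     for (HFileBlock b = it.nextBlock(); b != null; b = it.nextBlock()) {
  //       // consume meta index, file info, bloom meta blocks, etc.
  //     }
  //   } finally {
  //     it.freeBlocks(); // release the ByteBuffs of every block handed out above
  //   }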
1234
1235  /** An HFile block reader with iteration ability. */
1236  interface FSReader {
1237    /**
1238     * Reads the block at the given offset in the file with the given on-disk size and uncompressed
1239     * size.
1240     * @param offset        of the file to read
1241     * @param onDiskSize    the on-disk size of the entire block, including all applicable headers,
1242     *                      or -1 if unknown
1243     * @param pread         true to use pread, otherwise use the stream read.
1244     * @param updateMetrics update the metrics or not.
1245     * @param intoHeap      allocate the block's ByteBuff from the JVM heap rather than via the
1246     *                      {@link ByteBuffAllocator}. For LRUBlockCache, we must ensure that the
1247     *                      block to cache is an on-heap one, because its memory occupancy is
1248     *                      accounted on the heap; also, for {@link CombinedBlockCache}, we use the
1249     *                      on-heap LRUBlockCache as the L1 cache to cache small blocks such as
1250     *                      index or meta blocks for faster access. So we introduce a flag here to
1251     *                      decide whether to allocate from the JVM heap or not, so that we can
1252     *                      avoid an extra off-heap to heap memory copy when using LRUBlockCache.
1253     *                      In most cases we know the expected block type we'll read, but in some
1254     *                      special cases (for example, HFileReaderImpl#readNextDataBlock()) we
1255     *                      cannot decide in advance; then we can only allocate the block's ByteBuff
1256     *                      from {@link ByteBuffAllocator} first, and when caching it in
1257     *                      {@link LruBlockCache} we check whether the ByteBuff is on-heap; if not,
1258     *                      we clone it to an on-heap one and cache that.
1259     * @return the newly read block
1260     */
1261    HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics,
1262      boolean intoHeap) throws IOException;
1263
1264    /**
1265     * Creates a block iterator over the given portion of the {@link HFile}. The iterator returns
1266     * blocks whose offsets satisfy startOffset &lt;= offset &lt; endOffset. Returned
1267     * blocks are always unpacked. Used when no hfile index is available; e.g. reading in the hfile
1268     * index blocks themselves on file open.
1269     * @param startOffset the offset of the block to start iteration with
1270     * @param endOffset   the offset to end iteration at (exclusive)
1271     * @return an iterator of blocks between the two given offsets
1272     */
1273    BlockIterator blockRange(long startOffset, long endOffset);
1274
1275    /** Closes the backing streams */
1276    void closeStreams() throws IOException;
1277
1278    /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
1279    HFileBlockDecodingContext getBlockDecodingContext();
1280
1281    /** Get the default decoder for blocks from this file. */
1282    HFileBlockDecodingContext getDefaultBlockDecodingContext();
1283
1284    void setIncludesMemStoreTS(boolean includesMemstoreTS);
1285
1286    void setDataBlockEncoder(HFileDataBlockEncoder encoder, Configuration conf);
1287
1288    /**
1289     * Closes the stream's socket. Note: this can be called concurrently from multiple threads and the
1290     * implementation should take care of thread safety.
1291     */
1292    void unbufferStream();
1293  }
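
  // A hedged sketch of a single read through FSReader#readBlockData. The "fsReader", "offset" and
  // "sizeFromIndex" names are assumptions for illustration; sizeFromIndex would normally come
  // from the hfile index (or be -1 if unknown). Passing intoHeap=true forces an on-heap ByteBuff,
  // e.g. when the block is destined for the on-heap LRUBlockCache:
  //
  //   HFileBlock block = fsReader.readBlockData(offset, sizeFromIndex, /* pread = */ true,
  //     /* updateMetrics = */ true, /* intoHeap = */ true);
  //   try {
  //     // ... use the block ...
  //   } finally {
  //     block.release(); // return the block's ByteBuff to its allocator
  //   }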
1294
1295  /**
1296   * Data structure used to cache the header of the NEXT block. Only useful if the next read that
1297   * comes in here is for the next block in sequence. When we read, we read the current block and the
1298   * next block's header. We do this so we have the length of the next block to read even if the hfile
1299   * index is not available (rare, at hfile open only).
1300   */
1301  private static class PrefetchedHeader {
1302    long offset = -1;
1303    byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1304    final ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(header, 0, header.length));
1305
1306    @Override
1307    public String toString() {
1308      return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header);
1309    }
1310  }
1311
1312  /**
1313   * Reads version 2 HFile blocks from the filesystem.
1314   */
1315  static class FSReaderImpl implements FSReader {
1316    /**
1317     * The file system stream of the underlying {@link HFile} that does or doesn't do checksum
1318     * validations in the filesystem
1319     */
1320    private FSDataInputStreamWrapper streamWrapper;
1321
1322    private HFileBlockDecodingContext encodedBlockDecodingCtx;
1323
1324    /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
1325    private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
1326
1327    /**
1328     * Cache of the NEXT header after this one. Check it is indeed the next block's header before
1329     * using it. TODO: Review. This over-read into the next block to fetch the next block's header
1330     * seems unnecessary given we usually get the block size from the hfile index. Review!
1331     */
1332    private AtomicReference<PrefetchedHeader> prefetchedHeader =
1333      new AtomicReference<>(new PrefetchedHeader());
1334
1335    /** The size of the file we are reading from, or -1 if unknown. */
1336    private long fileSize;
1337
1338    /** The size of the header */
1339    protected final int hdrSize;
1340
1341    /** The filesystem used to access data */
1342    private HFileSystem hfs;
1343
1344    private HFileContext fileContext;
1345    // Cache the fileName
1346    private String pathName;
1347
1348    private final ByteBuffAllocator allocator;
1349
1350    private final Lock streamLock = new ReentrantLock();
1351
1352    private final boolean isPreadAllBytes;
1353
1354    FSReaderImpl(ReaderContext readerContext, HFileContext fileContext, ByteBuffAllocator allocator,
1355      Configuration conf) throws IOException {
1356      this.fileSize = readerContext.getFileSize();
1357      this.hfs = readerContext.getFileSystem();
1358      if (readerContext.getFilePath() != null) {
1359        this.pathName = readerContext.getFilePath().toString();
1360      }
1361      this.fileContext = fileContext;
1362      this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1363      this.allocator = allocator;
1364
1365      this.streamWrapper = readerContext.getInputStreamWrapper();
1366      // Older versions of HBase didn't support checksums.
1367      this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1368      defaultDecodingCtx = new HFileBlockDefaultDecodingContext(conf, fileContext);
1369      encodedBlockDecodingCtx = defaultDecodingCtx;
1370      isPreadAllBytes = readerContext.isPreadAllBytes();
1371    }
1372
1373    @Override
1374    public BlockIterator blockRange(final long startOffset, final long endOffset) {
1375      final FSReader owner = this; // handle for inner class
1376      return new BlockIterator() {
1377        private volatile boolean freed = false;
1378        // Tracking all read blocks until we call freeBlocks.
1379        private List<HFileBlock> blockTracker = new ArrayList<>();
1380        private long offset = startOffset;
1381        // Cache length of next block. Current block has the length of next block in it.
1382        private long length = -1;
1383
1384        @Override
1385        public HFileBlock nextBlock() throws IOException {
1386          if (offset >= endOffset) {
1387            return null;
1388          }
1389          HFileBlock b = readBlockData(offset, length, false, false, true);
1390          offset += b.getOnDiskSizeWithHeader();
1391          length = b.getNextBlockOnDiskSize();
1392          HFileBlock uncompressed = b.unpack(fileContext, owner);
1393          if (uncompressed != b) {
1394            b.release(); // Need to release the compressed Block now.
1395          }
1396          blockTracker.add(uncompressed);
1397          return uncompressed;
1398        }
1399
1400        @Override
1401        public HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException {
1402          HFileBlock blk = nextBlock();
1403          if (blk.getBlockType() != blockType) {
1404            throw new IOException(
1405              "Expected block of type " + blockType + " but found " + blk.getBlockType());
1406          }
1407          return blk;
1408        }
1409
1410        @Override
1411        public void freeBlocks() {
1412          if (freed) {
1413            return;
1414          }
1415          blockTracker.forEach(HFileBlock::release);
1416          blockTracker = null;
1417          freed = true;
1418        }
1419      };
1420    }
1421
1422    /**
1423     * Does a positional read or a seek-and-read into the given byte buffer. We must take care to call
1424     * {@link ByteBuff#release()} on every exit path to deallocate the ByteBuffers, otherwise a memory
1425     * leak may happen.
1426     * @param dest              destination buffer
1427     * @param size              size of read
1428     * @param peekIntoNextBlock whether to read the next block's on-disk size
1429     * @param fileOffset        position in the stream to read at
1430     * @param pread             whether we should do a positional read
1431     * @param istream           The input source of data
1432     * @return true if the destination buffer includes the next block's header, otherwise it only
1433     *         includes the current block's data without the next block's header.
1434     * @throws IOException if any IO error happens.
1435     */
1436    protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size,
1437      boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
1438      if (!pread) {
1439        // Seek + read. Better for scanning.
1440        HFileUtil.seekOnMultipleSources(istream, fileOffset);
1441        long realOffset = istream.getPos();
1442        if (realOffset != fileOffset) {
1443          throw new IOException("Tried to seek to " + fileOffset + " to read " + size
1444            + " bytes, but pos=" + realOffset + " after seek");
1445        }
1446        if (!peekIntoNextBlock) {
1447          BlockIOUtils.readFully(dest, istream, size);
1448          return false;
1449        }
1450
1451        // Try to read the next block header
1452        if (!BlockIOUtils.readWithExtra(dest, istream, size, hdrSize)) {
1453          // did not read the next block header.
1454          return false;
1455        }
1456      } else {
1457        // Positional read. Better for random reads; or when the streamLock is already locked.
1458        int extraSize = peekIntoNextBlock ? hdrSize : 0;
1459        if (
1460          !BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize, isPreadAllBytes)
1461        ) {
1462          // did not read the next block header.
1463          return false;
1464        }
1465      }
1466      assert peekIntoNextBlock;
1467      return true;
1468    }
1469
1470    /**
1471     * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
1472     * little memory allocation as possible, using the provided on-disk size.
1473     * @param offset                the offset in the stream to read at
1474     * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header, or -1 if
1475     *                              unknown; i.e. when iterating over blocks reading in the file
1476     *                              metadata info.
1477     * @param pread                 whether to use a positional read
1478     * @param updateMetrics         whether to update the metrics
1479     * @param intoHeap              allocate ByteBuff of block from heap or off-heap.
1480     * @see FSReader#readBlockData(long, long, boolean, boolean, boolean) for more details about the
1481     *      intoHeap flag.
1482     */
1483    @Override
1484    public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread,
1485      boolean updateMetrics, boolean intoHeap) throws IOException {
1486      // Get a copy of the current state of whether to validate
1487      // hbase checksums or not for this read call. This is not
1488      // thread-safe but the one constraint is that if we decide
1489      // to skip hbase checksum verification then we are
1490      // guaranteed to use hdfs checksum verification.
1491      boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1492      FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1493
1494      HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
1495        doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
1496      if (blk == null) {
1497        HFile.LOG.warn("HBase checksum verification failed for file " + pathName + " at offset "
1498          + offset + " filesize " + fileSize + ". Retrying read with HDFS checksums turned on...");
1499
1500        if (!doVerificationThruHBaseChecksum) {
1501          String msg = "HBase checksum verification failed for file " + pathName + " at offset "
1502            + offset + " filesize " + fileSize + " but this cannot happen because doVerify is "
1503            + doVerificationThruHBaseChecksum;
1504          HFile.LOG.warn(msg);
1505          throw new IOException(msg); // cannot happen case here
1506        }
1507        HFile.CHECKSUM_FAILURES.increment(); // update metrics
1508
1509        // If we have a checksum failure, we fall back into a mode where
1510        // the next few reads use HDFS level checksums. We aim to make the
1511        // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
1512        // hbase checksum verification, but since this value is set without
1513        // holding any locks, it can so happen that we might actually do
1514        // a few more than precisely this number.
1515        is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1516        doVerificationThruHBaseChecksum = false;
1517        blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
1518          doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
1519        if (blk != null) {
1520          HFile.LOG.warn("HDFS checksum verification succeeded for file " + pathName + " at offset "
1521            + offset + " filesize " + fileSize);
1522        }
1523      }
1524      if (blk == null && !doVerificationThruHBaseChecksum) {
1525        String msg =
1526          "readBlockData failed, possibly due to checksum verification failing for file "
1527            + pathName + " at offset " + offset + " filesize " + fileSize;
1528        HFile.LOG.warn(msg);
1529        throw new IOException(msg);
1530      }
1531
1532      // If there is a checksum mismatch earlier, then retry with
1533      // HBase checksums switched off and use HDFS checksum verification.
1534      // This triggers HDFS to detect and fix corrupt replicas. The
1535      // next checksumOffCount read requests will use HDFS checksums.
1536      // The decrementing of this.checksumOffCount is not thread-safe,
1537      // but it is harmless because eventually checksumOffCount will be
1538      // a negative number.
1539      streamWrapper.checksumOk();
1540      return blk;
1541    }
1542
1543    /**
1544     * @return <code>onDiskSizeWithHeaderL</code> as an int, after checking that it is a sane value
1545     */
1546    private static int checkAndGetSizeAsInt(final long onDiskSizeWithHeaderL, final int hdrSize)
1547      throws IOException {
1548      if (
1549        (onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
1550          || onDiskSizeWithHeaderL >= Integer.MAX_VALUE
1551      ) {
1552        throw new IOException(
1553          "Invalid onDisksize=" + onDiskSizeWithHeaderL + ": expected to be at least " + hdrSize
1554            + " and at most " + Integer.MAX_VALUE + ", or -1");
1555      }
1556      return (int) onDiskSizeWithHeaderL;
1557    }
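
    // For illustration (assuming a checksummed header, hdrSize = 33): -1 passes and means
    // "unknown"; any value in [33, Integer.MAX_VALUE) passes; values such as 10 or
    // Integer.MAX_VALUE are rejected with an IOException.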
1558
1559    /**
1560     * Verify that the passed-in onDiskSizeWithHeader aligns with what is in the header; otherwise
1561     * something is not right.
1562     */
1563    private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuff headerBuf,
1564      final long offset, boolean verifyChecksum) throws IOException {
1565      // Assert size provided aligns with what is in the header
1566      int fromHeader = getOnDiskSizeWithHeader(headerBuf, verifyChecksum);
1567      if (passedIn != fromHeader) {
1568        throw new IOException("Passed in onDiskSizeWithHeader=" + passedIn + " != " + fromHeader
1569          + ", offset=" + offset + ", fileContext=" + this.fileContext);
1570      }
1571    }
1572
1573    /**
1574     * Check the atomic reference cache for this block's header. The cache is only good if the next
1575     * read coming through is for the next block in sequence. We read the next block's header on the
1576     * tail of reading the previous block to save a seek. Otherwise, we have to do a seek to read the
1577     * header before we can pull in the block, OR we have to back up the stream because we over-read
1578     * (the next block's header).
1579     * @see PrefetchedHeader
1580     * @return The cached block header or null if not found.
1581     * @see #cacheNextBlockHeader(long, ByteBuff, int, int)
1582     */
1583    private ByteBuff getCachedHeader(final long offset) {
1584      PrefetchedHeader ph = this.prefetchedHeader.get();
1585      return ph != null && ph.offset == offset ? ph.buf : null;
1586    }
1587
1588    /**
1589     * Save away the next block's header in the atomic reference.
1590     * @see #getCachedHeader(long)
1591     * @see PrefetchedHeader
1592     */
1593    private void cacheNextBlockHeader(final long offset, ByteBuff onDiskBlock,
1594      int onDiskSizeWithHeader, int headerLength) {
1595      PrefetchedHeader ph = new PrefetchedHeader();
1596      ph.offset = offset;
1597      onDiskBlock.get(onDiskSizeWithHeader, ph.header, 0, headerLength);
1598      this.prefetchedHeader.set(ph);
1599    }
1600
1601    private int getNextBlockOnDiskSize(boolean readNextHeader, ByteBuff onDiskBlock,
1602      int onDiskSizeWithHeader) {
1603      int nextBlockOnDiskSize = -1;
1604      if (readNextHeader) {
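        // The extra hdrSize bytes we over-read start at position onDiskSizeWithHeader in
        // onDiskBlock. Skipping the magic record lands on the next header's
        // onDiskSizeWithoutHeader field, so adding hdrSize back yields the next block's full
        // on-disk size.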
1605        nextBlockOnDiskSize =
1606          onDiskBlock.getIntAfterPosition(onDiskSizeWithHeader + BlockType.MAGIC_LENGTH) + hdrSize;
1607      }
1608      return nextBlockOnDiskSize;
1609    }
1610
1611    private ByteBuff allocate(int size, boolean intoHeap) {
1612      return intoHeap ? HEAP.allocate(size) : allocator.allocate(size);
1613    }
1614
1615    /**
1616     * Reads a version 2 block.
1617     * @param offset                the offset in the stream to read at.
1618     * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header and
1619     *                              checksums if present, or -1 if unknown (as a long). Can be -1 if
1620     *                              we are doing raw iteration of blocks, as when loading up file
1621     *                              metadata; i.e. the first read of a new file. Usually obtained
1622     *                              from the file index.
1623     * @param pread                 whether to use a positional read
1624     * @param verifyChecksum        whether to use HBase checksums. If HBase checksums are switched
1625     *                              off, HDFS checksums are used instead. Can flip on/off while
1626     *                              reading the same file if we hit a troublesome patch in an hfile.
1627     * @param updateMetrics         whether to update the metrics.
1628     * @param intoHeap              whether to allocate the block's ByteBuff on the heap or off-heap.
1629     * @return the HFileBlock or null if there is an HBase checksum mismatch
1630     */
1631    protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
1632      long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics,
1633      boolean intoHeap) throws IOException {
1634      if (offset < 0) {
1635        throw new IOException("Invalid offset=" + offset + " trying to read " + "block (onDiskSize="
1636          + onDiskSizeWithHeaderL + ")");
1637      }
1638      int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize);
1639      // Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1
1640      // and will save us having to seek the stream backwards to reread the header we
1641      // read the last time through here.
1642      ByteBuff headerBuf = getCachedHeader(offset);
1643      LOG.trace(
1644        "Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, "
1645          + "onDiskSizeWithHeader={}",
1646        this.fileContext.getHFileName(), offset, pread, verifyChecksum, headerBuf,
1647        onDiskSizeWithHeader);
1648      // This is NOT the same as verifyChecksum. The latter is whether to do hbase
1649      // checksums. Can change with circumstances. The below flag is whether the
1650      // file has support for checksums (version 2+).
1651      boolean checksumSupport = this.fileContext.isUseHBaseChecksum();
1652      long startTime = EnvironmentEdgeManager.currentTime();
1653      if (onDiskSizeWithHeader <= 0) {
1654        // We were not passed the block size. Need to get it from the header. If header was
1655        // not cached (see getCachedHeader above), need to seek to pull it in. This is costly
1656        // and should happen very rarely. Currently happens on open of a hfile reader where we
1657        // read the trailer blocks to pull in the indices. Otherwise, we are reading block sizes
1658        // out of the hfile index. To check, enable TRACE in this file and you'll get an exception
1659        // in a LOG every time we seek. See HBASE-17072 for more detail.
1660        if (headerBuf == null) {
1661          if (LOG.isTraceEnabled()) {
1662            LOG.trace("Extra seek to get block size!", new RuntimeException());
1663          }
1664          headerBuf = HEAP.allocate(hdrSize);
1665          readAtOffset(is, headerBuf, hdrSize, false, offset, pread);
1666          headerBuf.rewind();
1667        }
1668        onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport);
1669      }
1670      int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
1671      // Allocate enough space to fit the next block's header too; saves a seek next time through.
1672      // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header;
1673      // onDiskSizeWithHeader is header, body, and any checksums if present. preReadHeaderSize
1674      // says where to start reading. If we have the header cached, then we don't need to read
1675      // it again and we can likely read from last place we left off w/o need to backup and reread
1676      // the header we read last time through here.
1677      ByteBuff onDiskBlock = this.allocate(onDiskSizeWithHeader + hdrSize, intoHeap);
1678      boolean initHFileBlockSuccess = false;
1679      try {
1680        if (headerBuf != null) {
1681          onDiskBlock.put(0, headerBuf, 0, hdrSize).position(hdrSize);
1682        }
1683        boolean readNextHeader = readAtOffset(is, onDiskBlock,
1684          onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread);
1685        onDiskBlock.rewind(); // in case of moving position when copying a cached header
1686        int nextBlockOnDiskSize =
1687          getNextBlockOnDiskSize(readNextHeader, onDiskBlock, onDiskSizeWithHeader);
1688        if (headerBuf == null) {
1689          headerBuf = onDiskBlock.duplicate().position(0).limit(hdrSize);
1690        }
1691        // Do a few checks before we go instantiate HFileBlock.
1692        assert onDiskSizeWithHeader > this.hdrSize;
1693        verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport);
1694        ByteBuff curBlock = onDiskBlock.duplicate().position(0).limit(onDiskSizeWithHeader);
1695        // Verify checksum of the data before using it for building HFileBlock.
1696        if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) {
1697          return null;
1698        }
1699        long duration = EnvironmentEdgeManager.currentTime() - startTime;
1700        if (updateMetrics) {
1701          HFile.updateReadLatency(duration, pread);
1702        }
1703        // The onDiskBlock will become the headerAndDataBuffer for this block.
1704        // If nextBlockOnDiskSize is not -1, the onDiskBlock already contains the header of the
1705        // next block, so there is no need to set the next block's header in it.
1706        HFileBlock hFileBlock = createFromBuff(curBlock, checksumSupport, offset,
1707          nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
1708        // Run check on uncompressed sizings.
1709        if (!fileContext.isCompressedOrEncrypted()) {
1710          hFileBlock.sanityCheckUncompressed();
1711        }
1712        LOG.trace("Read {} in {} ms", hFileBlock, duration);
1713        // Cache next block header if we read it for the next time through here.
1714        if (nextBlockOnDiskSize != -1) {
1715          cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), onDiskBlock,
1716            onDiskSizeWithHeader, hdrSize);
1717        }
1718        initHFileBlockSuccess = true;
1719        return hFileBlock;
1720      } finally {
1721        if (!initHFileBlockSuccess) {
1722          onDiskBlock.release();
1723        }
1724      }
1725    }
1726
1727    @Override
1728    public void setIncludesMemStoreTS(boolean includesMemstoreTS) {
1729      this.fileContext =
1730        new HFileContextBuilder(this.fileContext).withIncludesMvcc(includesMemstoreTS).build();
1731    }
1732
1733    @Override
1734    public void setDataBlockEncoder(HFileDataBlockEncoder encoder, Configuration conf) {
1735      encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(conf, fileContext);
1736    }
1737
1738    @Override
1739    public HFileBlockDecodingContext getBlockDecodingContext() {
1740      return this.encodedBlockDecodingCtx;
1741    }
1742
1743    @Override
1744    public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
1745      return this.defaultDecodingCtx;
1746    }
1747
1748    /**
1749     * Generates the checksum for the header as well as the data and then validates it. If the block
1750     * doesn't use checksums, returns false.
1751     * @return True if checksum matches, else false.
1752     */
1753    private boolean validateChecksum(long offset, ByteBuff data, int hdrSize) {
1754      // If this is an older version of the block that does not have checksums, then return false
1755      // indicating that checksum verification did not succeed. Actually, this method should never
1756      // be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen
1757      // case. Since this is a cannot-happen case, it is better to return false to indicate a
1758      // checksum validation failure.
1759      if (!fileContext.isUseHBaseChecksum()) {
1760        return false;
1761      }
1762      return ChecksumUtil.validateChecksum(data, pathName, offset, hdrSize);
1763    }
1764
1765    @Override
1766    public void closeStreams() throws IOException {
1767      streamWrapper.close();
1768    }
1769
1770    @Override
1771    public void unbufferStream() {
1772      // To handle concurrent reads, ensure that no other client is accessing the streams while we
1773      // unbuffer it.
1774      if (streamLock.tryLock()) {
1775        try {
1776          this.streamWrapper.unbuffer();
1777        } finally {
1778          streamLock.unlock();
1779        }
1780      }
1781    }
1782
1783    @Override
1784    public String toString() {
1785      return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext;
1786    }
1787  }
1788
1789  /** An additional sanity-check in case no compression or encryption is being used. */
1790  void sanityCheckUncompressed() throws IOException {
1791    if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) {
1792      throw new IOException("Using no compression but " + "onDiskSizeWithoutHeader="
1793        + onDiskSizeWithoutHeader + ", " + "uncompressedSizeWithoutHeader="
1794        + uncompressedSizeWithoutHeader + ", numChecksumbytes=" + totalChecksumBytes());
1795    }
1796  }
1797
1798  // Cacheable implementation
1799  @Override
1800  public int getSerializedLength() {
1801    if (buf != null) {
1802      // Include extra bytes for block metadata.
1803      return this.buf.limit() + BLOCK_METADATA_SPACE;
1804    }
1805    return 0;
1806  }
1807
1808  // Cacheable implementation
1809  @Override
1810  public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
1811    this.buf.get(destination, 0, getSerializedLength() - BLOCK_METADATA_SPACE);
1812    destination = addMetaData(destination, includeNextBlockMetadata);
1813
1814    // Make it ready for reading. flip sets position to zero and limit to the current position,
1815    // which is what we want now that we have written the block data plus checksums (if present)
1816    // plus the metadata.
1817    destination.flip();
1818  }
1819
1820  /**
1821   * For use by bucketcache. This exposes internals.
1822   */
1823  public ByteBuffer getMetaData(ByteBuffer bb) {
1824    bb = addMetaData(bb, true);
1825    bb.flip();
1826    return bb;
1827  }
1828
1829  /**
1830   * Adds metadata at current position (position is moved forward). Does not flip or reset.
1831   * @return The passed <code>destination</code> with metadata added.
1832   */
1833  private ByteBuffer addMetaData(final ByteBuffer destination, boolean includeNextBlockMetadata) {
1834    destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1835    destination.putLong(this.offset);
1836    if (includeNextBlockMetadata) {
1837      destination.putInt(this.nextBlockOnDiskSize);
1838    }
1839    return destination;
1840  }
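
  // For reference, the metadata appended above is 1 byte (checksum flag) + 8 bytes (offset)
  // + 4 bytes (nextBlockOnDiskSize, when included); this is the space that BLOCK_METADATA_SPACE
  // reserves in getSerializedLength().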
1841
1842  // Cacheable implementation
1843  @Override
1844  public CacheableDeserializer<Cacheable> getDeserializer() {
1845    return HFileBlock.BLOCK_DESERIALIZER;
1846  }
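
  // A hedged sketch of how a cache might use the Cacheable contract above to persist a block;
  // "block" and the heap ByteBuffer are assumptions for illustration (a real bucket cache writes
  // into its own backing storage):
  //
  //   int len = block.getSerializedLength();              // block bytes + BLOCK_METADATA_SPACE
  //   ByteBuffer backing = ByteBuffer.allocate(len);
  //   block.serialize(backing, true);                     // include next-block metadata
  //   // later, HFileBlock.BLOCK_DESERIALIZER turns those bytes back into an HFileBlock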
1847
1848  @Override
1849  public int hashCode() {
1850    int result = 1;
1851    result = result * 31 + blockType.hashCode();
1852    result = result * 31 + nextBlockOnDiskSize;
1853    result = result * 31 + (int) (offset ^ (offset >>> 32));
1854    result = result * 31 + onDiskSizeWithoutHeader;
1855    result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32));
1856    result = result * 31 + uncompressedSizeWithoutHeader;
1857    result = result * 31 + buf.hashCode();
1858    return result;
1859  }
1860
1861  @Override
1862  public boolean equals(Object comparison) {
1863    if (this == comparison) {
1864      return true;
1865    }
1866    if (comparison == null) {
1867      return false;
1868    }
1869    if (!(comparison instanceof HFileBlock)) {
1870      return false;
1871    }
1872
1873    HFileBlock castedComparison = (HFileBlock) comparison;
1874
1875    if (castedComparison.blockType != this.blockType) {
1876      return false;
1877    }
1878    if (castedComparison.nextBlockOnDiskSize != this.nextBlockOnDiskSize) {
1879      return false;
1880    }
1881    // Offset is important. Needed when we have to remake cachekey when block is returned to cache.
1882    if (castedComparison.offset != this.offset) {
1883      return false;
1884    }
1885    if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1886      return false;
1887    }
1888    if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1889      return false;
1890    }
1891    if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1892      return false;
1893    }
1894    if (
1895      ByteBuff.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
1896        castedComparison.buf.limit()) != 0
1897    ) {
1898      return false;
1899    }
1900    return true;
1901  }
1902
1903  DataBlockEncoding getDataBlockEncoding() {
1904    if (blockType == BlockType.ENCODED_DATA) {
1905      return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
1906    }
1907    return DataBlockEncoding.NONE;
1908  }
1909
1910  byte getChecksumType() {
1911    return this.fileContext.getChecksumType().getCode();
1912  }
1913
1914  int getBytesPerChecksum() {
1915    return this.fileContext.getBytesPerChecksum();
1916  }
1917
1918  /** @return the size of data on disk + header. Excludes checksum. */
1919  int getOnDiskDataSizeWithHeader() {
1920    return this.onDiskDataSizeWithHeader;
1921  }
1922
1923  /**
1924   * Calculate the number of bytes required to store all the checksums for this block. Each checksum
1925   * value is a 4 byte integer.
1926   */
1927  int totalChecksumBytes() {
1928    // If the hfile block has minorVersion 0, then there is no checksum
1929    // data to validate. Similarly, a zero value in this.bytesPerChecksum
1930    // indicates that cached blocks do not have checksum data because
1931    // checksums were already validated when the block was read from disk.
1932    if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
1933      return 0;
1934    }
1935    return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
1936      this.fileContext.getBytesPerChecksum());
1937  }
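
  // Worked example with assumed figures: for bytesPerChecksum = 16384 and
  // onDiskDataSizeWithHeader = 65,569 (a 64 KiB payload plus a 33-byte header), the data spans
  // ceil(65569 / 16384) = 5 checksum chunks, so totalChecksumBytes() returns 5 * 4 = 20.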
1938
1939  /**
1940   * Returns the size of this block header.
1941   */
1942  public int headerSize() {
1943    return headerSize(this.fileContext.isUseHBaseChecksum());
1944  }
1945
1946  /**
1947   * Maps a minor version to the size of the header.
1948   */
1949  public static int headerSize(boolean usesHBaseChecksum) {
1950    return usesHBaseChecksum
1951      ? HConstants.HFILEBLOCK_HEADER_SIZE
1952      : HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
1953  }
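
  // For reference, the two header sizes differ only by the checksum fields: the base header is
  // 24 bytes (8-byte magic, two 4-byte sizes, 8-byte prevBlockOffset), and enabling HBase
  // checksums appends a 1-byte checksum type, a 4-byte bytesPerChecksum and a 4-byte
  // onDiskDataSizeWithHeader, for 33 bytes in total (see toStringHeader below for the layout).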
1954
1955  /**
1956   * Return the appropriate DUMMY_HEADER for the minor version
1957   */
1958  // TODO: Why is this in here?
1959  byte[] getDummyHeaderForVersion() {
1960    return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
1961  }
1962
1963  /**
1964   * Return the appropriate DUMMY_HEADER for the minor version
1965   */
1966  static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
1967    return usesHBaseChecksum ? HConstants.HFILEBLOCK_DUMMY_HEADER : DUMMY_HEADER_NO_CHECKSUM;
1968  }
1969
1970  /**
1971   * @return This HFileBlock's fileContext, which will be a derivative of the fileContext for the file
1972   *         from which this block's data was originally read.
1973   */
1974  public HFileContext getHFileContext() {
1975    return this.fileContext;
1976  }
1977
1978  /**
1979   * Convert the contents of the block header into a human readable string. This is mostly helpful
1980   * for debugging. This assumes that the block has minor version > 0.
1981   */
1982  static String toStringHeader(ByteBuff buf) throws IOException {
1983    byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
1984    buf.get(magicBuf);
1985    BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
1986    int compressedBlockSizeNoHeader = buf.getInt();
1987    int uncompressedBlockSizeNoHeader = buf.getInt();
1988    long prevBlockOffset = buf.getLong();
1989    byte cksumtype = buf.get();
1990    long bytesPerChecksum = buf.getInt();
1991    long onDiskDataSizeWithHeader = buf.getInt();
1992    return " Header dump: magic: " + Bytes.toString(magicBuf) + " blockType " + bt
1993      + " compressedBlockSizeNoHeader " + compressedBlockSizeNoHeader
1994      + " uncompressedBlockSizeNoHeader " + uncompressedBlockSizeNoHeader + " prevBlockOffset "
1995      + prevBlockOffset + " checksumType " + ChecksumType.codeToType(cksumtype)
1996      + " bytesPerChecksum " + bytesPerChecksum + " onDiskDataSizeWithHeader "
1997      + onDiskDataSizeWithHeader;
1998  }
1999
2000  private static HFileBlockBuilder createBuilder(HFileBlock blk) {
2001    return new HFileBlockBuilder().withBlockType(blk.blockType)
2002      .withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader)
2003      .withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader)
2004      .withPrevBlockOffset(blk.prevBlockOffset).withByteBuff(blk.buf.duplicate()) // Duplicate the
2005                                                                                  // buffer.
2006      .withOffset(blk.offset).withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
2007      .withNextBlockOnDiskSize(blk.nextBlockOnDiskSize).withHFileContext(blk.fileContext)
2008      .withByteBuffAllocator(blk.allocator).withShared(blk.isSharedMem());
2009  }
2010
2011  static HFileBlock shallowClone(HFileBlock blk) {
2012    return createBuilder(blk).build();
2013  }
2014
2015  static HFileBlock deepCloneOnHeap(HFileBlock blk) {
2016    ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
2017    return createBuilder(blk).withByteBuff(deepCloned).withShared(false).build();
2018  }
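
  // Note the difference between the two clones above: shallowClone() duplicates the backing
  // ByteBuff (sharing the underlying memory) and keeps the original shared/allocator state,
  // while deepCloneOnHeap() copies the bytes into a fresh on-heap buffer and marks the result
  // as not shared.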
2019}