001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
021
022import java.io.DataInputStream;
023import java.io.DataOutput;
024import java.io.DataOutputStream;
025import java.io.IOException;
026import java.nio.ByteBuffer;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.concurrent.atomic.AtomicReference;
030import java.util.concurrent.locks.Lock;
031import java.util.concurrent.locks.ReentrantLock;
032
033import org.apache.hadoop.fs.FSDataInputStream;
034import org.apache.hadoop.fs.FSDataOutputStream;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.io.ByteBuffAllocator;
038import org.apache.hadoop.hbase.io.util.BlockIOUtils;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042import org.apache.hadoop.hbase.fs.HFileSystem;
043import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
044import org.apache.hadoop.hbase.io.ByteBuffInputStream;
045import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream;
046import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
047import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
048import org.apache.hadoop.hbase.io.encoding.EncodingState;
049import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
050import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
051import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
052import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
053import org.apache.hadoop.hbase.nio.ByteBuff;
054import org.apache.hadoop.hbase.nio.MultiByteBuff;
055import org.apache.hadoop.hbase.nio.SingleByteBuff;
056import org.apache.hadoop.hbase.regionserver.ShipperListener;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.ChecksumType;
059import org.apache.hadoop.hbase.util.ClassSize;
060
061import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
062import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
063
064/**
065 * Cacheable Blocks of an {@link HFile} version 2 file.
066 * Version 2 was introduced in hbase-0.92.0.
067 *
 * <p>Version 1 was the original file block format. Version 2 was introduced when we changed the
 * hbase file format to support multi-level block indexes and compound bloom filters (HBASE-3857).
 * Support for Version 1 was removed in hbase-1.3.0.
071 *
072 * <h3>HFileBlock: Version 2</h3>
073 * In version 2, a block is structured as follows:
074 * <ul>
075 * <li><b>Header:</b> See Writer#putHeader() for where header is written; header total size is
076 * HFILEBLOCK_HEADER_SIZE
077 * <ul>
078 * <li>0. blockType: Magic record identifying the {@link BlockType} (8 bytes):
079 * e.g. <code>DATABLK*</code>
 * <li>1. onDiskSizeWithoutHeader: Compressed -- a.k.a. 'on disk' -- block size, excluding header,
 * but including trailing checksum bytes (4 bytes)
082 * <li>2. uncompressedSizeWithoutHeader: Uncompressed block size, excluding header, and excluding
083 * checksum bytes (4 bytes)
084 * <li>3. prevBlockOffset: The offset of the previous block of the same type (8 bytes). This is
085 * used to navigate to the previous block without having to go to the block index
086 * <li>4: For minorVersions &gt;=1, the ordinal describing checksum type (1 byte)
087 * <li>5: For minorVersions &gt;=1, the number of data bytes/checksum chunk (4 bytes)
088 * <li>6: onDiskDataSizeWithHeader: For minorVersions &gt;=1, the size of data 'on disk', including
089 * header, excluding checksums (4 bytes)
090 * </ul>
091 * </li>
092 * <li><b>Raw/Compressed/Encrypted/Encoded data:</b> The compression
093 * algorithm is the same for all the blocks in an {@link HFile}. If compression is NONE, this is
094 * just raw, serialized Cells.
095 * <li><b>Tail:</b> For minorVersions &gt;=1, a series of 4 byte checksums, one each for
096 * the number of bytes specified by bytesPerChecksum.
097 * </ul>
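 *
 * <p>For illustration, a minimal sketch (mirroring what {@link #createFromBuff} does; the name
 * {@code headerBuf} is an assumption here, standing for a {@link ByteBuff} positioned at the start
 * of a block) of pulling the fixed header fields out using the offsets in {@link Header}:
 * <pre>{@code
 * BlockType blockType = BlockType.read(headerBuf); // consumes the 8 magic bytes
 * int onDiskSizeWithoutHeader = headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
 * int uncompressedSizeWithoutHeader = headerBuf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
 * long prevBlockOffset = headerBuf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
 * // The last three fields are only present when hbase checksums are in use (minorVersion >= 1).
 * byte checksumType = headerBuf.get(Header.CHECKSUM_TYPE_INDEX);
 * int bytesPerChecksum = headerBuf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
 * int onDiskDataSizeWithHeader = headerBuf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
 * }</pre>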
098 *
099 * <h3>Caching</h3>
 * Caches cache whole blocks with trailing checksums if any. We then tag on some metadata, the
 * content of BLOCK_METADATA_SPACE: a flag for whether we are doing 'hbase' checksums, the offset
 * into the file (needed when we re-make the cache key as the block is returned to the cache as
 * 'done'), and the on-disk size of the next block if known.
104 * See {@link Cacheable#serialize(ByteBuffer, boolean)} and {@link Cacheable#getDeserializer()}.
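 *
 * <p>As a rough sketch (this mirrors what {@link BlockDeserializer} below does), given a
 * {@code ByteBuff buf} holding a serialized cache entry, the trailing metadata can be read back
 * like so:
 * <pre>{@code
 * buf.position(buf.limit() - HFileBlock.BLOCK_METADATA_SPACE);
 * boolean usesHBaseChecksum = buf.get() == (byte) 1; // 1 byte flag
 * long offset = buf.getLong();                       // 8 byte offset of the block in the file
 * int nextBlockOnDiskSize = buf.getInt();            // 4 byte on-disk size of the next block
 * }</pre>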
105 *
106 * <p>TODO: Should we cache the checksums? Down in Writer#getBlockForCaching(CacheConfig) where
107 * we make a block to cache-on-write, there is an attempt at turning off checksums. This is not the
108 * only place we get blocks to cache. We also will cache the raw return from an hdfs read. In this
109 * case, the checksums may be present. If the cache is backed by something that doesn't do ECC,
 * say an SSD, we might want to preserve checksums. For now this is an open question.
111 * <p>TODO: Over in BucketCache, we save a block allocation by doing a custom serialization.
112 * Be sure to change it if serialization changes in here. Could we add a method here that takes an
113 * IOEngine and that then serializes to it rather than expose our internals over in BucketCache?
114 * IOEngine is in the bucket subpackage. Pull it up? Then this class knows about bucketcache. Ugh.
115 */
116@InterfaceAudience.Private
117public class HFileBlock implements Cacheable {
118  private static final Logger LOG = LoggerFactory.getLogger(HFileBlock.class);
119  public static final int FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT +
120     // BlockType, ByteBuff, MemoryType, HFileContext, ByteBuffAllocator
121      5 * ClassSize.REFERENCE +
      // onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, nextBlockOnDiskSize,
      // and onDiskDataSizeWithHeader
124      4 * Bytes.SIZEOF_INT +
125      // This and previous block offset
126      2 * Bytes.SIZEOF_LONG);
127
128  // Block Header fields.
129
130  // TODO: encapsulate Header related logic in this inner class.
131  static class Header {
132    // Format of header is:
133    // 8 bytes - block magic
134    // 4 bytes int - onDiskSizeWithoutHeader
135    // 4 bytes int - uncompressedSizeWithoutHeader
136    // 8 bytes long - prevBlockOffset
137    // The following 3 are only present if header contains checksum information
138    // 1 byte - checksum type
139    // 4 byte int - bytes per checksum
140    // 4 byte int - onDiskDataSizeWithHeader
141    static int BLOCK_MAGIC_INDEX = 0;
142    static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8;
143    static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12;
144    static int PREV_BLOCK_OFFSET_INDEX = 16;
145    static int CHECKSUM_TYPE_INDEX = 24;
146    static int BYTES_PER_CHECKSUM_INDEX = 25;
147    static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29;
148  }
149
150  /** Type of block. Header field 0. */
151  private BlockType blockType;
152
153  /**
154   * Size on disk excluding header, including checksum. Header field 1.
155   * @see Writer#putHeader(byte[], int, int, int, int)
156   */
157  private int onDiskSizeWithoutHeader;
158
159  /**
160   * Size of pure data. Does not include header or checksums. Header field 2.
161   * @see Writer#putHeader(byte[], int, int, int, int)
162   */
163  private int uncompressedSizeWithoutHeader;
164
165  /**
166   * The offset of the previous block on disk. Header field 3.
167   * @see Writer#putHeader(byte[], int, int, int, int)
168   */
169  private long prevBlockOffset;
170
171  /**
172   * Size on disk of header + data. Excludes checksum. Header field 6,
173   * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
174   * @see Writer#putHeader(byte[], int, int, int, int)
175   */
176  private int onDiskDataSizeWithHeader;
177  // End of Block Header fields.
178
179  /**
180   * The in-memory representation of the hfile block. Can be on or offheap. Can be backed by
181   * a single ByteBuffer or by many. Make no assumptions.
182   *
   * <p>Be careful reading from this <code>buf</code>. Duplicate and work on the duplicate, or if
   * not, be sure to reset position and limit; otherwise there will be trouble down the road.
185   *
186   * <p>TODO: Make this read-only once made.
187   *
188   * <p>We are using the ByteBuff type. ByteBuffer is not extensible yet we need to be able to have
189   * a ByteBuffer-like API across multiple ByteBuffers reading from a cache such as BucketCache.
   * So, we have this ByteBuff type. Unfortunately, it is spread all about HFileBlock. It would be
   * good if it could be confined to cache use only, but that is hard to do.
192   */
193  private ByteBuff buf;
194
  /** Metadata that holds information about this HFileBlock. */
197  private HFileContext fileContext;
198
199  /**
200   * The offset of this block in the file. Populated by the reader for
201   * convenience of access. This offset is not part of the block header.
202   */
203  private long offset = UNSET;
204
205  /**
206   * The on-disk size of the next block, including the header and checksums if present.
207   * UNSET if unknown.
208   *
209   * Blocks try to carry the size of the next block to read in this data member. Usually
210   * we get block sizes from the hfile index but sometimes the index is not available:
211   * e.g. when we read the indexes themselves (indexes are stored in blocks, we do not
212   * have an index for the indexes). Saves seeks especially around file open when
213   * there is a flurry of reading in hfile metadata.
214   */
215  private int nextBlockOnDiskSize = UNSET;
216
217  private ByteBuffAllocator allocator;
218
219  /**
   * On a checksum failure, do this many subsequent read requests using hdfs checksums before
221   * auto-reenabling hbase checksum verification.
222   */
223  static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
224
225  private static int UNSET = -1;
226  public static final boolean FILL_HEADER = true;
227  public static final boolean DONT_FILL_HEADER = false;
228
  // TODO: How do we get this estimate correct if the block is backed by a SingleByteBuff?
230  public static final int MULTI_BYTE_BUFFER_HEAP_SIZE =
231      (int)ClassSize.estimateBase(MultiByteBuff.class, false);
232
233  /**
234   * Space for metadata on a block that gets stored along with the block when we cache it.
235   * There are a few bytes stuck on the end of the HFileBlock that we pull in from HDFS.
   * 8 bytes are for the offset of this block (long) in the file. Offset is important because it is
   * used when we remake the CacheKey when we return the block to the cache when done. There is also
238   * a flag on whether checksumming is being done by hbase or not. See class comment for note on
239   * uncertain state of checksumming of blocks that come out of cache (should we or should we not?).
240   * Finally there are 4 bytes to hold the length of the next block which can save a seek on
241   * occasion if available.
242   * (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was
243   * formerly known as EXTRA_SERIALIZATION_SPACE).
244   */
245  static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
246
247  /**
248   * Each checksum value is an integer that can be stored in 4 bytes.
249   */
250  static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
251
252  static final byte[] DUMMY_HEADER_NO_CHECKSUM =
253      new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
254
255  /**
   * Used when deserializing blocks from the cache.
257   *
258   * <code>
259   * ++++++++++++++
260   * + HFileBlock +
261   * ++++++++++++++
262   * + Checksums  + <= Optional
263   * ++++++++++++++
264   * + Metadata!  + <= See note on BLOCK_METADATA_SPACE above.
265   * ++++++++++++++
266   * </code>
267   * @see #serialize(ByteBuffer, boolean)
268   */
269  public static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER = new BlockDeserializer();
270
271  public static final class BlockDeserializer implements CacheableDeserializer<Cacheable> {
272    private BlockDeserializer() {
273    }
274
275    @Override
276    public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc)
277        throws IOException {
278      // The buf has the file block followed by block metadata.
279      // Set limit to just before the BLOCK_METADATA_SPACE then rewind.
280      buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind();
281      // Get a new buffer to pass the HFileBlock for it to 'own'.
282      ByteBuff newByteBuff = buf.slice();
283      // Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock.
284      buf.position(buf.limit());
285      buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE);
286      boolean usesChecksum = buf.get() == (byte) 1;
287      long offset = buf.getLong();
288      int nextBlockOnDiskSize = buf.getInt();
289      return createFromBuff(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
290    }
291
292    @Override
293    public int getDeserializerIdentifier() {
294      return DESERIALIZER_IDENTIFIER;
295    }
296  }
297
298  private static final int DESERIALIZER_IDENTIFIER;
299  static {
300    DESERIALIZER_IDENTIFIER =
301        CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
302  }
303
304  /**
   * Creates a new {@link HFile} block from the given fields. This constructor is used only while
   * writing blocks and caching, when the block data is already sitting in a byte buffer and we
   * want to stuff the block into cache.
   *
   * <p>TODO: The caller presumes no checksumming is required of this block instance since it is
   * going into cache; the checksum has already been verified on the underlying block data pulled
   * in from the filesystem. Is that correct? What if the cache is SSD?
   * <p>TODO: Can the HFile block writer also go off-heap?</p>
313   *
314   * @param blockType the type of this block, see {@link BlockType}
315   * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
316   * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
317   * @param prevBlockOffset see {@link #prevBlockOffset}
318   * @param buf block buffer with header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
   * @param fillHeader when true, write the first 4 header fields into the passed buffer.
320   * @param offset the file offset the block was read from
321   * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
322   * @param fileContext HFile meta data
323   */
324  @VisibleForTesting
325  public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
326      int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
327      long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
328      ByteBuffAllocator allocator) {
329    this.blockType = blockType;
330    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
331    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
332    this.prevBlockOffset = prevBlockOffset;
333    this.offset = offset;
334    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
335    this.nextBlockOnDiskSize = nextBlockOnDiskSize;
336    this.fileContext = fileContext;
337    this.allocator = allocator;
338    this.buf = buf;
339    if (fillHeader) {
340      overwriteHeader();
341    }
342    this.buf.rewind();
343  }
344
345  /**
346   * Creates a block from an existing buffer starting with a header. Rewinds
347   * and takes ownership of the buffer. By definition of rewind, ignores the
348   * buffer position, but if you slice the buffer beforehand, it will rewind
349   * to that point.
350   * @param buf Has header, content, and trailing checksums if present.
351   */
352  static HFileBlock createFromBuff(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
353      final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator)
354      throws IOException {
355    buf.rewind();
356    final BlockType blockType = BlockType.read(buf);
357    final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
358    final int uncompressedSizeWithoutHeader =
359        buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
360    final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
    // This is called both when we deserialize a block from cache and when we read a block in
    // from the fs. fileContext is null when deserialized from cache so we need to make one up.
363    HFileContextBuilder fileContextBuilder =
364        fileContext != null ? new HFileContextBuilder(fileContext) : new HFileContextBuilder();
365    fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum);
366    int onDiskDataSizeWithHeader;
367    if (usesHBaseChecksum) {
368      byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX);
369      int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
370      onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
371      // Use the checksum type and bytes per checksum from header, not from fileContext.
372      fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType));
373      fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum);
374    } else {
375      fileContextBuilder.withChecksumType(ChecksumType.NULL);
376      fileContextBuilder.withBytesPerCheckSum(0);
      // Need to fix onDiskDataSizeWithHeader; there are no checksums after the block data
378      onDiskDataSizeWithHeader = onDiskSizeWithoutHeader + headerSize(usesHBaseChecksum);
379    }
380    fileContext = fileContextBuilder.build();
381    assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
382    return new HFileBlockBuilder()
383        .withBlockType(blockType)
384        .withOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader)
385        .withUncompressedSizeWithoutHeader(uncompressedSizeWithoutHeader)
386        .withPrevBlockOffset(prevBlockOffset)
387        .withOffset(offset)
388        .withOnDiskDataSizeWithHeader(onDiskDataSizeWithHeader)
389        .withNextBlockOnDiskSize(nextBlockOnDiskSize)
390        .withHFileContext(fileContext)
391        .withByteBuffAllocator(allocator)
392        .withByteBuff(buf.rewind())
393        .withShared(!buf.hasArray())
394        .build();
395  }
396
397  /**
398   * Parse total on disk size including header and checksum.
399   * @param headerBuf Header ByteBuffer. Presumed exact size of header.
400   * @param verifyChecksum true if checksum verification is in use.
401   * @return Size of the block with header included.
402   */
403  private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf,
404      boolean verifyChecksum) {
405    return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(verifyChecksum);
406  }
407
408  /**
409   * @return the on-disk size of the next block (including the header size and any checksums if
410   * present) read by peeking into the next block's header; use as a hint when doing
411   * a read of the next block when scanning or running over a file.
412   */
413  int getNextBlockOnDiskSize() {
414    return nextBlockOnDiskSize;
415  }
416
417  @Override
418  public BlockType getBlockType() {
419    return blockType;
420  }
421
422  @Override
423  public int refCnt() {
424    return buf.refCnt();
425  }
426
427  @Override
428  public HFileBlock retain() {
429    buf.retain();
430    return this;
431  }
432
433  /**
   * Call {@link ByteBuff#release()} to decrease the reference count. If there are no other
   * references, it will return the {@link ByteBuffer} back to the
   * {@link org.apache.hadoop.hbase.io.ByteBuffAllocator}.
436   */
437  @Override
438  public boolean release() {
439    return buf.release();
440  }
441
  /** @return the data block encoding id that was used to encode this block */
443  short getDataBlockEncodingId() {
444    if (blockType != BlockType.ENCODED_DATA) {
445      throw new IllegalArgumentException("Querying encoder ID of a block " +
446          "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
447    }
448    return buf.getShort(headerSize());
449  }
450
451  /**
452   * @return the on-disk size of header + data part + checksum.
453   */
454  public int getOnDiskSizeWithHeader() {
455    return onDiskSizeWithoutHeader + headerSize();
456  }
457
458  /**
459   * @return the on-disk size of the data part + checksum (header excluded).
460   */
461  int getOnDiskSizeWithoutHeader() {
462    return onDiskSizeWithoutHeader;
463  }
464
465  /**
466   * @return the uncompressed size of data part (header and checksum excluded).
467   */
  int getUncompressedSizeWithoutHeader() {
469    return uncompressedSizeWithoutHeader;
470  }
471
472  /**
473   * @return the offset of the previous block of the same type in the file, or
474   *         -1 if unknown
475   */
476  long getPrevBlockOffset() {
477    return prevBlockOffset;
478  }
479
480  /**
481   * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position
482   * is modified as side-effect.
483   */
484  private void overwriteHeader() {
485    buf.rewind();
486    blockType.write(buf);
487    buf.putInt(onDiskSizeWithoutHeader);
488    buf.putInt(uncompressedSizeWithoutHeader);
489    buf.putLong(prevBlockOffset);
490    if (this.fileContext.isUseHBaseChecksum()) {
491      buf.put(fileContext.getChecksumType().getCode());
492      buf.putInt(fileContext.getBytesPerChecksum());
493      buf.putInt(onDiskDataSizeWithHeader);
494    }
495  }
496
497  /**
498   * Returns a buffer that does not include the header and checksum.
499   * @return the buffer with header skipped and checksum omitted.
500   */
501  public ByteBuff getBufferWithoutHeader() {
502    return this.getBufferWithoutHeader(false);
503  }
504
505  /**
   * Returns a buffer that does not include the header, optionally including the trailing checksum.
   * @param withChecksum whether or not to include the checksum in the returned buffer.
   * @return the buffer with the header skipped and the checksum included or omitted per
   *         {@code withChecksum}.
509   */
510  public ByteBuff getBufferWithoutHeader(boolean withChecksum) {
511    ByteBuff dup = getBufferReadOnly();
512    int delta = withChecksum ? 0 : totalChecksumBytes();
513    return dup.position(headerSize()).limit(buf.limit() - delta).slice();
514  }
515
516  /**
517   * Returns a read-only duplicate of the buffer this block stores internally ready to be read.
518   * Clients must not modify the buffer object though they may set position and limit on the
519   * returned buffer since we pass back a duplicate. This method has to be public because it is used
520   * in {@link CompoundBloomFilter} to avoid object creation on every Bloom
521   * filter lookup, but has to be used with caution. Buffer holds header, block content,
522   * and any follow-on checksums if present.
523   *
524   * @return the buffer of this block for read-only operations
525   */
526  public ByteBuff getBufferReadOnly() {
527    // TODO: ByteBuf does not support asReadOnlyBuffer(). Fix.
528    ByteBuff dup = this.buf.duplicate();
529    assert dup.position() == 0;
530    return dup;
531  }
532
533  public ByteBuffAllocator getByteBuffAllocator() {
534    return this.allocator;
535  }
536
537  @VisibleForTesting
538  private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
539      String fieldName) throws IOException {
540    if (valueFromBuf != valueFromField) {
541      throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
542          + ") is different from that in the field (" + valueFromField + ")");
543    }
544  }
545
546  @VisibleForTesting
547  private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
548      throws IOException {
549    if (valueFromBuf != valueFromField) {
550      throw new IOException("Block type stored in the buffer: " +
551        valueFromBuf + ", block type field: " + valueFromField);
552    }
553  }
554
555  /**
556   * Checks if the block is internally consistent, i.e. the first
557   * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a
558   * valid header consistent with the fields. Assumes a packed block structure.
   * This function is primarily for testing and debugging, and is not
560   * thread-safe, because it alters the internal buffer pointer.
561   * Used by tests only.
562   */
563  @VisibleForTesting
564  void sanityCheck() throws IOException {
565    // Duplicate so no side-effects
566    ByteBuff dup = this.buf.duplicate().rewind();
567    sanityCheckAssertion(BlockType.read(dup), blockType);
568
569    sanityCheckAssertion(dup.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader");
570
571    sanityCheckAssertion(dup.getInt(), uncompressedSizeWithoutHeader,
572        "uncompressedSizeWithoutHeader");
573
574    sanityCheckAssertion(dup.getLong(), prevBlockOffset, "prevBlockOffset");
575    if (this.fileContext.isUseHBaseChecksum()) {
576      sanityCheckAssertion(dup.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
577      sanityCheckAssertion(dup.getInt(), this.fileContext.getBytesPerChecksum(),
578          "bytesPerChecksum");
579      sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
580    }
581
582    int cksumBytes = totalChecksumBytes();
583    int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
584    if (dup.limit() != expectedBufLimit) {
585      throw new AssertionError("Expected limit " + expectedBufLimit + ", got " + dup.limit());
586    }
587
588    // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
589    // block's header, so there are two sensible values for buffer capacity.
590    int hdrSize = headerSize();
591    dup.rewind();
592    if (dup.remaining() != expectedBufLimit && dup.remaining() != expectedBufLimit + hdrSize) {
593      throw new AssertionError("Invalid buffer capacity: " + dup.remaining() +
594          ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
595    }
596  }
597
598  @Override
599  public String toString() {
600    StringBuilder sb = new StringBuilder()
601      .append("[")
602      .append("blockType=").append(blockType)
603      .append(", fileOffset=").append(offset)
604      .append(", headerSize=").append(headerSize())
605      .append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
606      .append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
607      .append(", prevBlockOffset=").append(prevBlockOffset)
608      .append(", isUseHBaseChecksum=").append(fileContext.isUseHBaseChecksum());
609    if (fileContext.isUseHBaseChecksum()) {
610      sb.append(", checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
611        .append(", bytesPerChecksum=").append(this.buf.getInt(24 + 1))
612        .append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
613    } else {
614      sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
615        .append("(").append(onDiskSizeWithoutHeader)
616        .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
617    }
618    String dataBegin;
619    if (buf.hasArray()) {
620      dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
621          Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
622    } else {
623      ByteBuff bufWithoutHeader = getBufferWithoutHeader();
624      byte[] dataBeginBytes = new byte[Math.min(32,
625          bufWithoutHeader.limit() - bufWithoutHeader.position())];
626      bufWithoutHeader.get(dataBeginBytes);
627      dataBegin = Bytes.toStringBinary(dataBeginBytes);
628    }
629    sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader())
630      .append(", totalChecksumBytes=").append(totalChecksumBytes())
631      .append(", isUnpacked=").append(isUnpacked())
632      .append(", buf=[").append(buf).append("]")
633      .append(", dataBeginsWith=").append(dataBegin)
634      .append(", fileContext=").append(fileContext)
635      .append(", nextBlockOnDiskSize=").append(nextBlockOnDiskSize)
636      .append("]");
637    return sb.toString();
638  }
639
640  /**
641   * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
642   * encoded structure. Internal structures are shared between instances where applicable.
643   */
644  HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
645    if (!fileContext.isCompressedOrEncrypted()) {
646      // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
647      // which is used for block serialization to L2 cache, does not preserve encoding and
648      // encryption details.
649      return this;
650    }
651
652    HFileBlock unpacked = shallowClone(this);
653    unpacked.allocateBuffer(); // allocates space for the decompressed block
654    boolean succ = false;
655    try {
656      HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
657          ? reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
658      // Create a duplicated buffer without the header part.
659      ByteBuff dup = this.buf.duplicate();
660      dup.position(this.headerSize());
661      dup = dup.slice();
662      // Decode the dup into unpacked#buf
663      ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
664        unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup);
665      succ = true;
666      return unpacked;
667    } finally {
668      if (!succ) {
669        unpacked.release();
670      }
671    }
672  }
673
674  /**
675   * Always allocates a new buffer of the correct size. Copies header bytes
676   * from the existing buffer. Does not change header fields.
   * Reserves room to keep checksum bytes too.
678   */
679  private void allocateBuffer() {
680    int cksumBytes = totalChecksumBytes();
681    int headerSize = headerSize();
682    int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
683
684    ByteBuff newBuf = allocator.allocate(capacityNeeded);
685
686    // Copy header bytes into newBuf.
687    buf.position(0);
688    newBuf.put(0, buf, 0, headerSize);
689
690    buf = newBuf;
691    // set limit to exclude next block's header
692    buf.limit(capacityNeeded);
693  }
694
695  /**
696   * Return true when this block's buffer has been unpacked, false otherwise. Note this is a
   * calculated heuristic, not a tracked attribute of the block.
698   */
699  public boolean isUnpacked() {
700    final int cksumBytes = totalChecksumBytes();
701    final int headerSize = headerSize();
702    final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
703    final int bufCapacity = buf.remaining();
704    return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
705  }
706
707  /**
   * Cannot be {@link #UNSET}. Must be a legitimate value. Used when re-making the
   * {@link BlockCacheKey} as the block is returned to the cache.
710   * @return the offset of this block in the file it was read from
711   */
712  long getOffset() {
713    if (offset < 0) {
714      throw new IllegalStateException("HFile block offset not initialized properly");
715    }
716    return offset;
717  }
718
719  /**
720   * @return a byte stream reading the data + checksum of this block
721   */
722  DataInputStream getByteStream() {
723    ByteBuff dup = this.buf.duplicate();
724    dup.position(this.headerSize());
725    return new DataInputStream(new ByteBuffInputStream(dup));
726  }
727
728  @Override
729  public long heapSize() {
730    long size = FIXED_OVERHEAD;
731    size += fileContext.heapSize();
732    if (buf != null) {
733      // Deep overhead of the byte buffer. Needs to be aligned separately.
734      size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE);
735    }
736    return ClassSize.align(size);
737  }
738
739  /**
   * Will be overridden by {@link SharedMemHFileBlock} or {@link ExclusiveMemHFileBlock}. Returns
   * true by default.
742   */
743  public boolean isSharedMem() {
744    if (this instanceof SharedMemHFileBlock) {
745      return true;
746    } else if (this instanceof ExclusiveMemHFileBlock) {
747      return false;
748    }
749    return true;
750  }
751
752  /**
753   * Unified version 2 {@link HFile} block writer. The intended usage pattern
754   * is as follows:
755   * <ol>
756   * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
757   * <li>Call {@link Writer#startWriting} and get a data stream to write to.
758   * <li>Write your data into the stream.
   * <li>Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to
   * store the serialized block into an external stream.
761   * <li>Repeat to write more blocks.
762   * </ol>
763   * <p>
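   * For illustration, a rough sketch of that pattern ({@code dataBlockEncoder}, {@code fileContext},
   * {@code cells} and {@code out} are assumed to be set up by the caller):
   * <pre>{@code
   * HFileBlock.Writer writer = new HFileBlock.Writer(dataBlockEncoder, fileContext);
   * DataOutputStream dos = writer.startWriting(BlockType.DATA);
   * for (Cell cell : cells) {
   *   writer.write(cell); // appends the cell through the configured block encoder
   * }
   * writer.writeHeaderAndData(out); // finishes the block and writes it to the FSDataOutputStream
   * // repeat startWriting()/writeHeaderAndData() for subsequent blocks
   * }</pre>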
764   */
765  static class Writer implements ShipperListener {
766    private enum State {
767      INIT,
768      WRITING,
769      BLOCK_READY
770    }
771
772    /** Writer state. Used to ensure the correct usage protocol. */
773    private State state = State.INIT;
774
775    /** Data block encoder used for data blocks */
776    private final HFileDataBlockEncoder dataBlockEncoder;
777
778    private HFileBlockEncodingContext dataBlockEncodingCtx;
779
    /** Block encoding context for non-data blocks */
781    private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
782
783    /**
784     * The stream we use to accumulate data into a block in an uncompressed format.
785     * We reset this stream at the end of each block and reuse it. The
786     * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this
787     * stream.
788     */
789    private ByteArrayOutputStream baosInMemory;
790
791    /**
792     * Current block type. Set in {@link #startWriting(BlockType)}. Could be
793     * changed in {@link #finishBlock()} from {@link BlockType#DATA}
794     * to {@link BlockType#ENCODED_DATA}.
795     */
796    private BlockType blockType;
797
798    /**
799     * A stream that we write uncompressed bytes to, which compresses them and
800     * writes them to {@link #baosInMemory}.
801     */
802    private DataOutputStream userDataStream;
803
804    // Size of actual data being written. Not considering the block encoding/compression. This
805    // includes the header size also.
806    private int unencodedDataSizeWritten;
807
    // Size of actual data being written, considering the block encoding. This
809    // includes the header size also.
810    private int encodedDataSizeWritten;
811
812    /**
813     * Bytes to be written to the file system, including the header. Compressed
814     * if compression is turned on. It also includes the checksum data that
815     * immediately follows the block data. (header + data + checksums)
816     */
817    private ByteArrayOutputStream onDiskBlockBytesWithHeader;
818
819    /**
     * The checksum data for the current block, computed over the contents of
     * {@link #onDiskBlockBytesWithHeader} and written out to disk immediately after it.
824     */
825    private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;
826
827    /**
828     * Current block's start offset in the {@link HFile}. Set in
829     * {@link #writeHeaderAndData(FSDataOutputStream)}.
830     */
831    private long startOffset;
832
833    /**
834     * Offset of previous block by block type. Updated when the next block is
835     * started.
836     */
837    private long[] prevOffsetByType;
838
839    /** The offset of the previous block of the same type */
840    private long prevOffset;
    /** Metadata that holds information about the HFileBlock */
842    private HFileContext fileContext;
843
844    private final ByteBuffAllocator allocator;
845
846    @Override
847    public void beforeShipped() {
848      if (getEncodingState() != null) {
849        getEncodingState().beforeShipped();
850      }
851    }
852
853    EncodingState getEncodingState() {
854      return dataBlockEncodingCtx.getEncodingState();
855    }
856
857    /**
858     * @param dataBlockEncoder data block encoding algorithm to use
859     */
860    @VisibleForTesting
861    public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
862      this(dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
863    }
864
865    public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext,
866        ByteBuffAllocator allocator) {
867      if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
868        throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
869            " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
870            fileContext.getBytesPerChecksum());
871      }
872      this.allocator = allocator;
873      this.dataBlockEncoder = dataBlockEncoder != null?
874          dataBlockEncoder: NoOpDataBlockEncoder.INSTANCE;
875      this.dataBlockEncodingCtx = this.dataBlockEncoder.
876          newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
877      // TODO: This should be lazily instantiated since we usually do NOT need this default encoder
878      this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
879          HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
880      // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum
881      baosInMemory = new ByteArrayOutputStream();
882      prevOffsetByType = new long[BlockType.values().length];
883      for (int i = 0; i < prevOffsetByType.length; ++i) {
884        prevOffsetByType[i] = UNSET;
885      }
886      // TODO: Why fileContext saved away when we have dataBlockEncoder and/or
887      // defaultDataBlockEncoder?
888      this.fileContext = fileContext;
889    }
890
891    /**
892     * Starts writing into the block. The previous block's data is discarded.
893     *
894     * @return the stream the user can write their data into
895     * @throws IOException
896     */
897    DataOutputStream startWriting(BlockType newBlockType)
898        throws IOException {
899      if (state == State.BLOCK_READY && startOffset != -1) {
900        // We had a previous block that was written to a stream at a specific
901        // offset. Save that offset as the last offset of a block of that type.
902        prevOffsetByType[blockType.getId()] = startOffset;
903      }
904
905      startOffset = -1;
906      blockType = newBlockType;
907
908      baosInMemory.reset();
909      baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
910
911      state = State.WRITING;
912
913      // We will compress it later in finishBlock()
914      userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory);
915      if (newBlockType == BlockType.DATA) {
916        this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
917      }
918      this.unencodedDataSizeWritten = 0;
919      this.encodedDataSizeWritten = 0;
920      return userDataStream;
921    }
922
923    /**
924     * Writes the Cell to this block
925     * @param cell
926     * @throws IOException
927     */
928    void write(Cell cell) throws IOException{
929      expectState(State.WRITING);
930      int posBeforeEncode = this.userDataStream.size();
931      this.unencodedDataSizeWritten +=
932          this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream);
933      this.encodedDataSizeWritten += this.userDataStream.size() - posBeforeEncode;
934    }
935
936    /**
937     * Transitions the block writer from the "writing" state to the "block
938     * ready" state.  Does nothing if a block is already finished.
939     */
940    void ensureBlockReady() throws IOException {
941      Preconditions.checkState(state != State.INIT,
942          "Unexpected state: " + state);
943
944      if (state == State.BLOCK_READY) {
945        return;
946      }
947
948      // This will set state to BLOCK_READY.
949      finishBlock();
950    }
951
952    /**
953     * Finish up writing of the block.
954     * Flushes the compressing stream (if using compression), fills out the header,
955     * does any compression/encryption of bytes to flush out to disk, and manages
956     * the cache on write content, if applicable. Sets block write state to "block ready".
957     */
958    private void finishBlock() throws IOException {
959      if (blockType == BlockType.DATA) {
960        this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
961            baosInMemory.getBuffer(), blockType);
962        blockType = dataBlockEncodingCtx.getBlockType();
963      }
964      userDataStream.flush();
965      prevOffset = prevOffsetByType[blockType.getId()];
966
967      // We need to set state before we can package the block up for cache-on-write. In a way, the
968      // block is ready, but not yet encoded or compressed.
969      state = State.BLOCK_READY;
970      Bytes compressAndEncryptDat;
971      if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
972        compressAndEncryptDat = dataBlockEncodingCtx.
973            compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size());
974      } else {
975        compressAndEncryptDat = defaultBlockEncodingCtx.
976            compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size());
977      }
978      if (compressAndEncryptDat == null) {
979        compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size());
980      }
981      if (onDiskBlockBytesWithHeader == null) {
982        onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength());
983      }
984      onDiskBlockBytesWithHeader.reset();
985      onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(),
986            compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength());
987      // Calculate how many bytes we need for checksum on the tail of the block.
988      int numBytes = (int) ChecksumUtil.numBytes(
989          onDiskBlockBytesWithHeader.size(),
990          fileContext.getBytesPerChecksum());
991
992      // Put the header for the on disk bytes; header currently is unfilled-out
993      putHeader(onDiskBlockBytesWithHeader,
994          onDiskBlockBytesWithHeader.size() + numBytes,
995          baosInMemory.size(), onDiskBlockBytesWithHeader.size());
996      if (onDiskChecksum.length != numBytes) {
997        onDiskChecksum = new byte[numBytes];
998      }
999      ChecksumUtil.generateChecksums(
1000          onDiskBlockBytesWithHeader.getBuffer(), 0,onDiskBlockBytesWithHeader.size(),
1001          onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
1002    }
1003
1004    /**
1005     * Put the header into the given byte array at the given offset.
     * @param onDiskSize size of the block on disk: header + data + checksums
1007     * @param uncompressedSize size of the block after decompression (but
1008     *          before optional data block decoding) including header
1009     * @param onDiskDataSize size of the block on disk with header
1010     *        and data but not including the checksums
1011     */
1012    private void putHeader(byte[] dest, int offset, int onDiskSize,
1013        int uncompressedSize, int onDiskDataSize) {
1014      offset = blockType.put(dest, offset);
1015      offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1016      offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1017      offset = Bytes.putLong(dest, offset, prevOffset);
1018      offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
1019      offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
1020      Bytes.putInt(dest, offset, onDiskDataSize);
1021    }
1022
1023    private void putHeader(ByteBuff buff, int onDiskSize,
1024        int uncompressedSize, int onDiskDataSize) {
1025      buff.rewind();
1026      blockType.write(buff);
1027      buff.putInt(onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1028      buff.putInt(uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1029      buff.putLong(prevOffset);
1030      buff.put(fileContext.getChecksumType().getCode());
1031      buff.putInt(fileContext.getBytesPerChecksum());
1032      buff.putInt(onDiskDataSize);
1033    }
1034
1035    private void putHeader(ByteArrayOutputStream dest, int onDiskSize,
1036        int uncompressedSize, int onDiskDataSize) {
      putHeader(dest.getBuffer(), 0, onDiskSize, uncompressedSize, onDiskDataSize);
1038    }
1039
1040    /**
     * Similar to {@link #finishBlockAndWriteHeaderAndData(DataOutputStream)}, but also records
1042     * the offset of this block so that it can be referenced in the next block
1043     * of the same type.
1044     *
1045     * @param out
1046     * @throws IOException
1047     */
1048    void writeHeaderAndData(FSDataOutputStream out) throws IOException {
1049      long offset = out.getPos();
1050      if (startOffset != UNSET && offset != startOffset) {
1051        throw new IOException("A " + blockType + " block written to a "
1052            + "stream twice, first at offset " + startOffset + ", then at "
1053            + offset);
1054      }
1055      startOffset = offset;
1056      finishBlockAndWriteHeaderAndData(out);
1057    }
1058
1059    /**
1060     * Writes the header and the compressed data of this block (or uncompressed
1061     * data when not using compression) into the given stream. Can be called in
1062     * the "writing" state or in the "block ready" state. If called in the
1063     * "writing" state, transitions the writer to the "block ready" state.
1064     *
     * @param out the output stream to write the block to
1066     * @throws IOException
1067     */
1068    protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
1069      throws IOException {
1070      ensureBlockReady();
1071      long startTime = System.currentTimeMillis();
1072      out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size());
1073      out.write(onDiskChecksum);
1074      HFile.updateWriteLatency(System.currentTimeMillis() - startTime);
1075    }
1076
1077    /**
     * Returns the header and the compressed data (or uncompressed data when not
1079     * using compression) as a byte array. Can be called in the "writing" state
1080     * or in the "block ready" state. If called in the "writing" state,
1081     * transitions the writer to the "block ready" state. This returns
1082     * the header + data + checksums stored on disk.
1083     *
1084     * @return header and data as they would be stored on disk in a byte array
1085     * @throws IOException
1086     */
1087    byte[] getHeaderAndDataForTest() throws IOException {
1088      ensureBlockReady();
1089      // This is not very optimal, because we are doing an extra copy.
1090      // But this method is used only by unit tests.
1091      byte[] output =
1092          new byte[onDiskBlockBytesWithHeader.size()
1093              + onDiskChecksum.length];
1094      System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0,
1095          onDiskBlockBytesWithHeader.size());
1096      System.arraycopy(onDiskChecksum, 0, output,
1097          onDiskBlockBytesWithHeader.size(), onDiskChecksum.length);
1098      return output;
1099    }
1100
1101    /**
1102     * Releases resources used by this writer.
1103     */
1104    void release() {
1105      if (dataBlockEncodingCtx != null) {
1106        dataBlockEncodingCtx.close();
1107        dataBlockEncodingCtx = null;
1108      }
1109      if (defaultBlockEncodingCtx != null) {
1110        defaultBlockEncodingCtx.close();
1111        defaultBlockEncodingCtx = null;
1112      }
1113    }
1114
1115    /**
1116     * Returns the on-disk size of the data portion of the block. This is the
1117     * compressed size if compression is enabled. Can only be called in the
1118     * "block ready" state. Header is not compressed, and its size is not
1119     * included in the return value.
1120     *
1121     * @return the on-disk size of the block, not including the header.
1122     */
1123    int getOnDiskSizeWithoutHeader() {
1124      expectState(State.BLOCK_READY);
1125      return onDiskBlockBytesWithHeader.size() +
1126          onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE;
1127    }
1128
1129    /**
1130     * Returns the on-disk size of the block. Can only be called in the
1131     * "block ready" state.
1132     *
1133     * @return the on-disk size of the block ready to be written, including the
1134     *         header size, the data and the checksum data.
1135     */
1136    int getOnDiskSizeWithHeader() {
1137      expectState(State.BLOCK_READY);
1138      return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length;
1139    }
1140
1141    /**
1142     * The uncompressed size of the block data. Does not include header size.
1143     */
1144    int getUncompressedSizeWithoutHeader() {
1145      expectState(State.BLOCK_READY);
1146      return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE;
1147    }
1148
1149    /**
1150     * The uncompressed size of the block data, including header size.
1151     */
1152    int getUncompressedSizeWithHeader() {
1153      expectState(State.BLOCK_READY);
1154      return baosInMemory.size();
1155    }
1156
1157    /** @return true if a block is being written  */
1158    boolean isWriting() {
1159      return state == State.WRITING;
1160    }
1161
1162    /**
     * Returns the number of encoded bytes written into the current block so far, or
1164     * zero if not writing the block at the moment. Note that this will return
1165     * zero in the "block ready" state as well.
1166     *
1167     * @return the number of bytes written
1168     */
1169    public int encodedBlockSizeWritten() {
      if (state != State.WRITING) return 0;
1172      return this.encodedDataSizeWritten;
1173    }
1174
1175    /**
     * Returns the number of unencoded bytes written into the current block so far, or
1177     * zero if not writing the block at the moment. Note that this will return
1178     * zero in the "block ready" state as well.
1179     *
1180     * @return the number of bytes written
1181     */
1182    int blockSizeWritten() {
1183      if (state != State.WRITING) return 0;
1184      return this.unencodedDataSizeWritten;
1185    }
1186
1187    /**
1188     * Clones the header followed by the uncompressed data, even if using
1189     * compression. This is needed for storing uncompressed blocks in the block
1190     * cache. Can be called in the "writing" state or the "block ready" state.
1191     * Returns only the header and data, does not include checksum data.
1192     *
1193     * @return Returns an uncompressed block ByteBuff for caching on write
1194     */
1195    ByteBuff cloneUncompressedBufferWithHeader() {
1196      expectState(State.BLOCK_READY);
1197      ByteBuff bytebuff = allocator.allocate(baosInMemory.size());
1198      baosInMemory.toByteBuff(bytebuff);
1199      int numBytes = (int) ChecksumUtil.numBytes(
1200          onDiskBlockBytesWithHeader.size(),
1201          fileContext.getBytesPerChecksum());
1202      putHeader(bytebuff, onDiskBlockBytesWithHeader.size() + numBytes,
1203          baosInMemory.size(), onDiskBlockBytesWithHeader.size());
1204      bytebuff.rewind();
1205      return bytebuff;
1206    }
1207
1208    /**
1209     * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is needed
     * for storing packed blocks in the block cache. Returns only the header and data; does not
1211     * include checksum data.
1212     * @return Returns a copy of block bytes for caching on write
1213     */
1214    private ByteBuff cloneOnDiskBufferWithHeader() {
1215      expectState(State.BLOCK_READY);
1216      ByteBuff bytebuff = allocator.allocate(onDiskBlockBytesWithHeader.size());
1217      onDiskBlockBytesWithHeader.toByteBuff(bytebuff);
1218      bytebuff.rewind();
1219      return bytebuff;
1220    }
1221
1222    private void expectState(State expectedState) {
1223      if (state != expectedState) {
1224        throw new IllegalStateException("Expected state: " + expectedState +
1225            ", actual state: " + state);
1226      }
1227    }
1228
1229    /**
1230     * Takes the given {@link BlockWritable} instance, creates a new block of
1231     * its appropriate type, writes the writable into this block, and flushes
1232     * the block into the output stream. The writer is instructed not to buffer
1233     * uncompressed bytes for cache-on-write.
1234     *
1235     * @param bw the block-writable object to write as a block
1236     * @param out the file system output stream
1237     * @throws IOException
1238     */
1239    void writeBlock(BlockWritable bw, FSDataOutputStream out)
1240        throws IOException {
1241      bw.writeToBlock(startWriting(bw.getBlockType()));
1242      writeHeaderAndData(out);
1243    }
1244
1245    /**
1246     * Creates a new HFileBlock. Checksums have already been validated, so
1247     * the byte buffer passed into the constructor of this newly created
1248     * block does not have checksum data even though the header minor
1249     * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
1250     * 0 value in bytesPerChecksum. This method copies the on-disk or
1251     * uncompressed data to build the HFileBlock which is used only
1252     * while writing blocks and caching.
1253     *
1254     * <p>TODO: Should there be an option where a cache can ask that hbase preserve block
     * checksums for checking after a block comes out of the cache? Otherwise, the cache is
     * responsible for blocks being wholesome (ECC memory or if file-backed, it does checksumming).
1257     */
1258    HFileBlock getBlockForCaching(CacheConfig cacheConf) {
1259      HFileContext newContext = new HFileContextBuilder()
1260                                .withBlockSize(fileContext.getBlocksize())
1261                                .withBytesPerCheckSum(0)
1262                                .withChecksumType(ChecksumType.NULL) // no checksums in cached data
1263                                .withCompression(fileContext.getCompression())
1264                                .withDataBlockEncoding(fileContext.getDataBlockEncoding())
1265                                .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
1266                                .withCompressTags(fileContext.isCompressTags())
1267                                .withIncludesMvcc(fileContext.isIncludesMvcc())
1268                                .withIncludesTags(fileContext.isIncludesTags())
1269                                .withColumnFamily(fileContext.getColumnFamily())
1270                                .withTableName(fileContext.getTableName())
1271                                .build();
1272      // Build the HFileBlock.
1273      HFileBlockBuilder builder = new HFileBlockBuilder();
1274      ByteBuff buff;
1275      if (cacheConf.shouldCacheCompressed(blockType.getCategory())) {
1276        buff = cloneOnDiskBufferWithHeader();
1277      } else {
1278        buff = cloneUncompressedBufferWithHeader();
1279      }
1280      return builder.withBlockType(blockType)
1281          .withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader())
1282          .withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader())
1283          .withPrevBlockOffset(prevOffset)
1284          .withByteBuff(buff)
1285          .withFillHeader(FILL_HEADER)
1286          .withOffset(startOffset)
1287          .withNextBlockOnDiskSize(UNSET)
1288          .withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length)
1289          .withHFileContext(newContext)
1290          .withByteBuffAllocator(cacheConf.getByteBuffAllocator())
1291          .withShared(!buff.hasArray())
1292          .build();
1293    }
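
    /*
     * A minimal cache-on-write sketch, assuming a Writer "w" that has just flushed a block via
     * writeHeaderAndData() and a caller that hands the result to whatever BlockCache it has
     * configured (the cacheIt() call below is a hypothetical stand-in; only getBlockForCaching()
     * is defined in this file):
     *
     *   HFileBlock blockForCache = w.getBlockForCaching(cacheConf); // carries no checksum bytes
     *   cacheIt(blockForCache); // e.g. insert into the block cache under this block's cache key
     */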
1294  }
1295
1296  /** Something that can be written into a block. */
1297  interface BlockWritable {
1298    /** The type of block this data should use. */
1299    BlockType getBlockType();
1300
1301    /**
1302     * Writes the block to the provided stream. Must not write any magic
1303     * records.
1304     *
1305     * @param out a stream to write uncompressed data into
1306     */
1307    void writeToBlock(DataOutput out) throws IOException;
1308  }
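
  /*
   * A hedged sketch of implementing BlockWritable and handing it to Writer#writeBlock() above.
   * The class name and payload are illustrative; only the BlockWritable contract and
   * writeBlock(BlockWritable, FSDataOutputStream) come from this file:
   *
   *   class ExampleMetaWritable implements BlockWritable {
   *     private final byte[] payload;
   *     ExampleMetaWritable(byte[] payload) { this.payload = payload; }
   *
   *     @Override
   *     public BlockType getBlockType() { return BlockType.META; }
   *
   *     @Override
   *     public void writeToBlock(DataOutput out) throws IOException {
   *       out.write(payload); // uncompressed bytes only; no magic record, per the contract above
   *     }
   *   }
   *
   *   // With a Writer "w" and an FSDataOutputStream "out":
   *   w.writeBlock(new ExampleMetaWritable(metaBytes), out);
   */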
1309
1310  /**
1311   * Iterator for reading {@link HFileBlock}s in the load-on-open section, such as the root data
1312   * index block, meta index block, file info block, etc.
1313   */
1314  interface BlockIterator {
1315    /**
1316     * Get the next block, or null if there are no more blocks to iterate.
1317     */
1318    HFileBlock nextBlock() throws IOException;
1319
1320    /**
1321     * Similar to {@link #nextBlock()} but checks the block type, throws an exception if it is
1322     * incorrect, and returns the HFile block.
1323     */
1324    HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1325
1326    /**
1327     * Now we use the {@link ByteBuffAllocator} to manage the nio ByteBuffers for HFileBlocks, so we
1328     * must deallocate all of the ByteBuffers at the end of their life. The BlockIterator's life cycle
1329     * starts when an HFileReader is opened and ends at HFileReader#close, so we keep track of all the
1330     * blocks read until {@link BlockIterator#freeBlocks()} is called when closing the HFileReader.
1331     * The total size of the blocks in the load-on-open section should be quite small, so tracking
1332     * them should be OK.
1333     */
1334    void freeBlocks();
1335  }
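
  /*
   * A typical load-on-open read loop, sketched under the assumption that "reader" is an FSReader
   * over this hfile and [loadOnOpenOffset, trailerOffset) bounds the load-on-open section (those
   * two names are illustrative; the interfaces used are the ones defined in this file):
   *
   *   BlockIterator it = reader.blockRange(loadOnOpenOffset, trailerOffset);
   *   HFileBlock rootIndex = it.nextBlockWithBlockType(BlockType.ROOT_INDEX);
   *   HFileBlock b;
   *   while ((b = it.nextBlock()) != null) {
   *     // ... parse the meta index, file info block, etc. ...
   *   }
   *   // Later, when the HFileReader is closed:
   *   it.freeBlocks(); // releases the ByteBuffs of every block this iterator returned
   */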
1336
1337  /** An HFile block reader with iteration ability. */
1338  interface FSReader {
1339    /**
1340     * Reads the block at the given offset in the file with the given on-disk size and uncompressed
1341     * size.
1342     * @param offset the offset in the file at which to read
1343     * @param onDiskSize the on-disk size of the entire block, including all applicable headers, or
1344     *          -1 if unknown
1345     * @param pread true to use pread, otherwise use the stream read.
1346     * @param updateMetrics update the metrics or not.
1347     * @param intoHeap whether to allocate the block's ByteBuff from the JVM heap rather than from
1348     *          the {@link ByteBuffAllocator}. For LRUBlockCache we must ensure the block to cache is
1349     *          a heap one, because its memory accounting is heap-based; similarly, for
1350     *          {@link CombinedBlockCache} we use the heap LRUBlockCache as the L1 cache for small
1351     *          blocks such as IndexBlock or MetaBlock for faster access. So we introduce a flag here
1352     *          to decide whether to allocate from the JVM heap or not, so that we can avoid an extra
1353     *          off-heap to heap memory copy when using LRUBlockCache. In most cases we know the
1354     *          expected block type we'll read, while in some special cases (for example
1355     *          HFileReaderImpl#readNextDataBlock()) we cannot pre-decide the block type; then we can
1356     *          only allocate the block's ByteBuff from the {@link ByteBuffAllocator} first, and when
1357     *          caching it in {@link LruBlockCache} we check whether the ByteBuff is heap-backed; if
1358     *          not, we clone it to a heap one and cache that.
1359     * @return the newly read block
1360     */
1361    HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics,
1362        boolean intoHeap) throws IOException;
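
    /*
     * A sketch of how a caller might pick the intoHeap flag, assuming a hypothetical predicate
     * willBeCachedOnHeap() that captures the LRUBlockCache / L1-cache cases described above
     * (readBlockData() itself is the method declared just above):
     *
     *   boolean intoHeap = willBeCachedOnHeap(expectedBlockType); // e.g. index/meta blocks for L1
     *   HFileBlock block = fsReader.readBlockData(offset, onDiskSizeWithHeader,
     *       true,   // pread
     *       true,   // updateMetrics
     *       intoHeap);
     */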
1363
1364    /**
1365     * Creates a block iterator over the given portion of the {@link HFile}.
1366     * The iterator returns blocks starting at offsets such that startOffset &lt;=
1367     * offset &lt; endOffset. Returned blocks are always unpacked.
1368     * Used when no hfile index is available; e.g. when reading in the hfile index
1369     * blocks themselves on file open.
1370     *
1371     * @param startOffset the offset of the block to start iteration with
1372     * @param endOffset the offset to end iteration at (exclusive)
1373     * @return an iterator of blocks between the two given offsets
1374     */
1375    BlockIterator blockRange(long startOffset, long endOffset);
1376
1377    /** Closes the backing streams */
1378    void closeStreams() throws IOException;
1379
1380    /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
1381    HFileBlockDecodingContext getBlockDecodingContext();
1382
1383    /** Get the default decoder for blocks from this file. */
1384    HFileBlockDecodingContext getDefaultBlockDecodingContext();
1385
1386    void setIncludesMemStoreTS(boolean includesMemstoreTS);
1387    void setDataBlockEncoder(HFileDataBlockEncoder encoder);
1388
1389    /**
1390     * To close the stream's socket. Note: This can be called concurrently from multiple threads and
1391     * the implementation should take care of thread safety.
1392     */
1393    void unbufferStream();
1394  }
1395
1396  /**
1397   * Data structure used to cache the header of the NEXT block. Only useful if the next read that
1398   * comes in here is for the block that immediately follows the one just read.
1399   *
1400   * When we read, we read the current block and the next block's header. We do this so we have
1401   * the length of the next block to read if the hfile index is not available (rare, at
1402   * hfile open only).
1403   */
1404  private static class PrefetchedHeader {
1405    long offset = -1;
1406    byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1407    final ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(header, 0, header.length));
1408
1409    @Override
1410    public String toString() {
1411      return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header);
1412    }
1413  }
1414
1415  /**
1416   * Reads version 2 HFile blocks from the filesystem.
1417   */
1418  static class FSReaderImpl implements FSReader {
1419     /** The file system stream of the underlying {@link HFile} that
1420     * may or may not do checksum validation in the filesystem. */
1421    private FSDataInputStreamWrapper streamWrapper;
1422
1423    private HFileBlockDecodingContext encodedBlockDecodingCtx;
1424
1425    /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
1426    private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
1427
1428    /**
1429     * Cache of the NEXT header after this. Check it is indeed the next block's header
1430     * before using it. TODO: Review. This overread into the next block to fetch the
1431     * next block's header seems unnecessary given we usually get the block size
1432     * from the hfile index. Review!
1433     */
1434    private AtomicReference<PrefetchedHeader> prefetchedHeader = new AtomicReference<>(new PrefetchedHeader());
1435
1436    /** The size of the file we are reading from, or -1 if unknown. */
1437    private long fileSize;
1438
1439    /** The size of the header */
1440    @VisibleForTesting
1441    protected final int hdrSize;
1442
1443    /** The filesystem used to access data */
1444    private HFileSystem hfs;
1445
1446    private HFileContext fileContext;
1447    // Cache the fileName
1448    private String pathName;
1449
1450    private final ByteBuffAllocator allocator;
1451
1452    private final Lock streamLock = new ReentrantLock();
1453
1454    FSReaderImpl(ReaderContext readerContext, HFileContext fileContext,
1455        ByteBuffAllocator allocator) throws IOException {
1456      this.fileSize = readerContext.getFileSize();
1457      this.hfs = readerContext.getFileSystem();
1458      if (readerContext.getFilePath() != null) {
1459        this.pathName = readerContext.getFilePath().toString();
1460      }
1461      this.fileContext = fileContext;
1462      this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1463      this.allocator = allocator;
1464
1465      this.streamWrapper = readerContext.getInputStreamWrapper();
1466      // Older versions of HBase didn't support checksum.
1467      this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1468      defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
1469      encodedBlockDecodingCtx = defaultDecodingCtx;
1470    }
1471
1472    @Override
1473    public BlockIterator blockRange(final long startOffset, final long endOffset) {
1474      final FSReader owner = this; // handle for inner class
1475      return new BlockIterator() {
1476        private volatile boolean freed = false;
1477        // Tracking all read blocks until we call freeBlocks.
1478        private List<HFileBlock> blockTracker = new ArrayList<>();
1479        private long offset = startOffset;
1480        // Cache length of next block. Current block has the length of next block in it.
1481        private long length = -1;
1482
1483        @Override
1484        public HFileBlock nextBlock() throws IOException {
1485          if (offset >= endOffset) {
1486            return null;
1487          }
1488          HFileBlock b = readBlockData(offset, length, false, false, true);
1489          offset += b.getOnDiskSizeWithHeader();
1490          length = b.getNextBlockOnDiskSize();
1491          HFileBlock uncompressed = b.unpack(fileContext, owner);
1492          if (uncompressed != b) {
1493            b.release(); // Need to release the compressed Block now.
1494          }
1495          blockTracker.add(uncompressed);
1496          return uncompressed;
1497        }
1498
1499        @Override
1500        public HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException {
1501          HFileBlock blk = nextBlock();
1502          if (blk.getBlockType() != blockType) {
1503            throw new IOException(
1504                "Expected block of type " + blockType + " but found " + blk.getBlockType());
1505          }
1506          return blk;
1507        }
1508
1509        @Override
1510        public void freeBlocks() {
1511          if (freed) {
1512            return;
1513          }
1514          blockTracker.forEach(HFileBlock::release);
1515          blockTracker = null;
1516          freed = true;
1517        }
1518      };
1519    }
1520
1521    /**
1522     * Does a positional read or a seek and read into the given byte buffer. We must take care to
1523     * call {@link ByteBuff#release()} on every exit path to deallocate the ByteBuffers, otherwise
1524     * a memory leak may happen.
1525     * @param dest destination buffer
1526     * @param size size of read
1527     * @param peekIntoNextBlock whether to read the next block's on-disk size
1528     * @param fileOffset position in the stream to read at
1529     * @param pread whether we should do a positional read
1530     * @param istream The input source of data
1531     * @return true if the destination buffer includes the next block's header, otherwise it only
1532     *         includes the current block's data without the next block's header.
1533     * @throws IOException if any IO error happens.
1534     */
1535    protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size,
1536        boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
1537      if (!pread) {
1538        // Seek + read. Better for scanning.
1539        HFileUtil.seekOnMultipleSources(istream, fileOffset);
1540        long realOffset = istream.getPos();
1541        if (realOffset != fileOffset) {
1542          throw new IOException("Tried to seek to " + fileOffset + " to read " + size
1543              + " bytes, but pos=" + realOffset + " after seek");
1544        }
1545        if (!peekIntoNextBlock) {
1546          BlockIOUtils.readFully(dest, istream, size);
1547          return false;
1548        }
1549
1550        // Try to read the next block header
1551        if (!BlockIOUtils.readWithExtra(dest, istream, size, hdrSize)) {
1552          // did not read the next block header.
1553          return false;
1554        }
1555      } else {
1556        // Positional read. Better for random reads; or when the streamLock is already locked.
1557        int extraSize = peekIntoNextBlock ? hdrSize : 0;
1558        if (!BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize)) {
1559          // did not read the next block header.
1560          return false;
1561        }
1562      }
1563      assert peekIntoNextBlock;
1564      return true;
1565    }
1566
1567    /**
1568     * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
1569     * little memory allocation as possible, using the provided on-disk size.
1570     * @param offset the offset in the stream to read at
1571     * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header, or -1 if
1572     *          unknown; i.e. when iterating over blocks reading in the file metadata info.
1573     * @param pread whether to use a positional read
1574     * @param updateMetrics whether to update the metrics
1575     * @param intoHeap allocate ByteBuff of block from heap or off-heap.
1576     * @see FSReader#readBlockData(long, long, boolean, boolean, boolean) for more details about the
1577     *      intoHeap flag.
1578     */
1579    @Override
1580    public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread,
1581        boolean updateMetrics, boolean intoHeap) throws IOException {
1582      // Get a copy of the current state of whether to validate
1583      // hbase checksums or not for this read call. This is not
1584      // thread-safe but the one constraint is that if we decide
1585      // to skip hbase checksum verification then we are
1586      // guaranteed to use hdfs checksum verification.
1587      boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1588      FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1589
1590      HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
1591        doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
1592      if (blk == null) {
1593        HFile.LOG.warn("HBase checksum verification failed for file " +
1594                       pathName + " at offset " +
1595                       offset + " filesize " + fileSize +
1596                       ". Retrying read with HDFS checksums turned on...");
1597
1598        if (!doVerificationThruHBaseChecksum) {
1599          String msg = "HBase checksum verification failed for file " +
1600                       pathName + " at offset " +
1601                       offset + " filesize " + fileSize +
1602                       " but this cannot happen because doVerify is " +
1603                       doVerificationThruHBaseChecksum;
1604          HFile.LOG.warn(msg);
1605          throw new IOException(msg); // cannot happen case here
1606        }
1607        HFile.CHECKSUM_FAILURES.increment(); // update metrics
1608
1609        // If we have a checksum failure, we fall back into a mode where
1610        // the next few reads use HDFS level checksums. We aim to make the
1611        // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
1612        // hbase checksum verification, but since this value is set without
1613        // holding any locks, it can so happen that we might actually do
1614        // a few more than precisely this number.
1615        is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1616        doVerificationThruHBaseChecksum = false;
1617        blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
1618          doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
1619        if (blk != null) {
1620          HFile.LOG.warn("HDFS checksum verification succeeded for file " +
1621                         pathName + " at offset " +
1622                         offset + " filesize " + fileSize);
1623        }
1624      }
1625      if (blk == null && !doVerificationThruHBaseChecksum) {
1626        String msg = "readBlockData failed, possibly due to " +
1627                     "checksum verification failed for file " + pathName +
1628                     " at offset " + offset + " filesize " + fileSize;
1629        HFile.LOG.warn(msg);
1630        throw new IOException(msg);
1631      }
1632
1633      // If there is a checksum mismatch earlier, then retry with
1634      // HBase checksums switched off and use HDFS checksum verification.
1635      // This triggers HDFS to detect and fix corrupt replicas. The
1636      // next checksumOffCount read requests will use HDFS checksums.
1637      // The decrementing of this.checksumOffCount is not thread-safe,
1638      // but it is harmless because eventually checksumOffCount will be
1639      // a negative number.
1640      streamWrapper.checksumOk();
1641      return blk;
1642    }
1643
1644    /**
1645     * @return <code>onDiskSizeWithHeaderL</code> as an int, after checking that the size is sane
1646     * @throws IOException
1647     */
1648    private static int checkAndGetSizeAsInt(final long onDiskSizeWithHeaderL, final int hdrSize)
1649    throws IOException {
1650      if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
1651          || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) {
1652        throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL
1653            + ": expected to be at least " + hdrSize
1654            + " and at most " + Integer.MAX_VALUE + ", or -1");
1655      }
1656      return (int)onDiskSizeWithHeaderL;
1657    }
1658
1659    /**
1660     * Verify that the passed-in onDiskSizeWithHeader matches what is recorded in the block header;
1661     * otherwise something is not right.
1662     * @throws IOException
1663     */
1664    private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuff headerBuf,
1665        final long offset, boolean verifyChecksum)
1666    throws IOException {
1667      // Assert size provided aligns with what is in the header
1668      int fromHeader = getOnDiskSizeWithHeader(headerBuf, verifyChecksum);
1669      if (passedIn != fromHeader) {
1670        throw new IOException("Passed in onDiskSizeWithHeader=" + passedIn + " != " + fromHeader +
1671            ", offset=" + offset + ", fileContext=" + this.fileContext);
1672      }
1673    }
1674
1675    /**
1676     * Check the atomic reference cache for this block's header. The cache is only good if the
1677     * next read coming through is for the block that immediately follows. We read the next block's
1678     * header on the tail of reading the previous block to save a seek. Otherwise,
1679     * we have to do a seek to read the header before we can pull in the block OR
1680     * we have to back up the stream because we over-read (the next block's header).
1681     * @see PrefetchedHeader
1682     * @return The cached block header or null if not found.
1683     * @see #cacheNextBlockHeader(long, ByteBuff, int, int)
1684     */
1685    private ByteBuff getCachedHeader(final long offset) {
1686      PrefetchedHeader ph = this.prefetchedHeader.get();
1687      return ph != null && ph.offset == offset ? ph.buf : null;
1688    }
1689
1690    /**
1691     * Save away the next block's header in the atomic reference.
1692     * @see #getCachedHeader(long)
1693     * @see PrefetchedHeader
1694     */
1695    private void cacheNextBlockHeader(final long offset,
1696        ByteBuff onDiskBlock, int onDiskSizeWithHeader, int headerLength) {
1697      PrefetchedHeader ph = new PrefetchedHeader();
1698      ph.offset = offset;
1699      onDiskBlock.get(onDiskSizeWithHeader, ph.header, 0, headerLength);
1700      this.prefetchedHeader.set(ph);
1701    }
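
    /*
     * Worked example of the prefetch (numbers are illustrative): reading the block at offset 0
     * with onDiskSizeWithHeader=4096 also pulls in the following header, so we call
     * cacheNextBlockHeader(0 + 4096, onDiskBlock, 4096, hdrSize). The next sequential read at
     * offset 4096 then finds getCachedHeader(4096) non-null and can size the block without an
     * extra seek, while a read at any other offset simply misses and reads its header from the
     * stream.
     */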
1702
1703    private int getNextBlockOnDiskSize(boolean readNextHeader, ByteBuff onDiskBlock,
1704        int onDiskSizeWithHeader) {
1705      int nextBlockOnDiskSize = -1;
1706      if (readNextHeader) {
1707        nextBlockOnDiskSize =
1708            onDiskBlock.getIntAfterPosition(onDiskSizeWithHeader + BlockType.MAGIC_LENGTH)
1709                + hdrSize;
1710      }
1711      return nextBlockOnDiskSize;
1712    }
1713
1714    private ByteBuff allocate(int size, boolean intoHeap) {
1715      return intoHeap ? HEAP.allocate(size) : allocator.allocate(size);
1716    }
1717
1718    /**
1719     * Reads a version 2 block.
1720     * @param offset the offset in the stream to read at.
1721     * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header and
1722     *          checksums if present, or -1 if unknown (as a long). Can be -1 if we are doing raw
1723     *          iteration of blocks as when loading up file metadata; i.e. the first read of a new
1724     *          file. Usually it is a positive value obtained from the hfile index.
1725     * @param pread whether to use a positional read
1726     * @param verifyChecksum whether to use HBase checksums. If HBase checksums are switched off,
1727     *          HDFS checksums are used instead. This can also flip on/off while reading the same
1728     *          file if we hit a troublesome patch in the hfile.
1729     * @param updateMetrics whether to update the metrics.
1730     * @param intoHeap whether to allocate the block's ByteBuff from the JVM heap or off-heap.
1731     * @return the HFileBlock or null if there is an HBase checksum mismatch
1732     */
1733    @VisibleForTesting
1734    protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
1735        long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics,
1736        boolean intoHeap) throws IOException {
1737      if (offset < 0) {
1738        throw new IOException("Invalid offset=" + offset + " trying to read "
1739            + "block (onDiskSize=" + onDiskSizeWithHeaderL + ")");
1740      }
1741      int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize);
1742      // Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1
1743      // and will save us having to seek the stream backwards to reread the header we
1744      // read the last time through here.
1745      ByteBuff headerBuf = getCachedHeader(offset);
1746      LOG.trace("Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, " +
1747          "onDiskSizeWithHeader={}", this.fileContext.getHFileName(), offset, pread,
1748          verifyChecksum, headerBuf, onDiskSizeWithHeader);
1749      // This is NOT the same as verifyChecksum. The latter is whether to do hbase
1750      // checksums and can change with circumstances. The below flag is whether the
1751      // file has support for checksums (version 2+).
1752      boolean checksumSupport = this.fileContext.isUseHBaseChecksum();
1753      long startTime = System.currentTimeMillis();
1754      if (onDiskSizeWithHeader <= 0) {
1755        // We were not passed the block size. Need to get it from the header. If header was
1756        // not cached (see getCachedHeader above), need to seek to pull it in. This is costly
1757        // and should happen very rarely. Currently happens on open of a hfile reader where we
1758        // read the trailer blocks to pull in the indices. Otherwise, we are reading block sizes
1759        // out of the hfile index. To check, enable TRACE in this file and you'll get an exception
1760        // in a LOG every time we seek. See HBASE-17072 for more detail.
1761        if (headerBuf == null) {
1762          if (LOG.isTraceEnabled()) {
1763            LOG.trace("Extra seek to get block size!", new RuntimeException());
1764          }
1765          headerBuf = HEAP.allocate(hdrSize);
1766          readAtOffset(is, headerBuf, hdrSize, false, offset, pread);
1767          headerBuf.rewind();
1768        }
1769        onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport);
1770      }
1771      int preReadHeaderSize = headerBuf == null? 0 : hdrSize;
1772      // Allocate enough space to fit the next block's header too; saves a seek next time through.
1773      // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header;
1774      // onDiskSizeWithHeader is header, body, and any checksums if present. preReadHeaderSize
1775      // says where to start reading. If we have the header cached, then we don't need to read
1776      // it again and we can likely read from last place we left off w/o need to backup and reread
1777      // the header we read last time through here.
1778      ByteBuff onDiskBlock = this.allocate(onDiskSizeWithHeader + hdrSize, intoHeap);
1779      boolean initHFileBlockSuccess = false;
1780      try {
1781        if (headerBuf != null) {
1782          onDiskBlock.put(0, headerBuf, 0, hdrSize).position(hdrSize);
1783        }
1784        boolean readNextHeader = readAtOffset(is, onDiskBlock,
1785          onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread);
1786        onDiskBlock.rewind(); // in case of moving position when copying a cached header
1787        int nextBlockOnDiskSize =
1788            getNextBlockOnDiskSize(readNextHeader, onDiskBlock, onDiskSizeWithHeader);
1789        if (headerBuf == null) {
1790          headerBuf = onDiskBlock.duplicate().position(0).limit(hdrSize);
1791        }
1792        // Do a few checks before we go instantiate HFileBlock.
1793        assert onDiskSizeWithHeader > this.hdrSize;
1794        verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport);
1795        ByteBuff curBlock = onDiskBlock.duplicate().position(0).limit(onDiskSizeWithHeader);
1796        // Verify checksum of the data before using it for building HFileBlock.
1797        if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) {
1798          return null;
1799        }
1800        long duration = System.currentTimeMillis() - startTime;
1801        if (updateMetrics) {
1802          HFile.updateReadLatency(duration, pread);
1803        }
1804        // The onDiskBlock will become the headerAndDataBuffer for this block.
1805        // If nextBlockOnDiskSize is not -1, the onDiskBlock already contains the
1806        // header of the next block, so there is no need to set it separately.
1807        HFileBlock hFileBlock = createFromBuff(curBlock, checksumSupport, offset,
1808          nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
1809        // Run check on uncompressed sizings.
1810        if (!fileContext.isCompressedOrEncrypted()) {
1811          hFileBlock.sanityCheckUncompressed();
1812        }
1813        LOG.trace("Read {} in {} ms", hFileBlock, duration);
1814        // Cache next block header if we read it for the next time through here.
1815        if (nextBlockOnDiskSize != -1) {
1816          cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), onDiskBlock,
1817            onDiskSizeWithHeader, hdrSize);
1818        }
1819        initHFileBlockSuccess = true;
1820        return hFileBlock;
1821      } finally {
1822        if (!initHFileBlockSuccess) {
1823          onDiskBlock.release();
1824        }
1825      }
1826    }
1827
1828    @Override
1829    public void setIncludesMemStoreTS(boolean includesMemstoreTS) {
1830      this.fileContext.setIncludesMvcc(includesMemstoreTS);
1831    }
1832
1833    @Override
1834    public void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
1835      encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
1836    }
1837
1838    @Override
1839    public HFileBlockDecodingContext getBlockDecodingContext() {
1840      return this.encodedBlockDecodingCtx;
1841    }
1842
1843    @Override
1844    public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
1845      return this.defaultDecodingCtx;
1846    }
1847
1848    /**
1849     * Generates the checksum for the header as well as the data and then validates it.
1850     * If the block doesn't use checksums, returns false.
1851     * @return True if the checksum matches, else false.
1852     */
1853    private boolean validateChecksum(long offset, ByteBuff data, int hdrSize) {
1854      // If this is an older version of the block that does not have checksums, then return false
1855      // indicating that checksum verification did not succeed. Actually, this method should never
1856      // be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen
1857      // case. Since this is a cannot-happen case, it is better to return false to indicate a
1858      // checksum validation failure.
1859      if (!fileContext.isUseHBaseChecksum()) {
1860        return false;
1861      }
1862      return ChecksumUtil.validateChecksum(data, pathName, offset, hdrSize);
1863    }
1864
1865    @Override
1866    public void closeStreams() throws IOException {
1867      streamWrapper.close();
1868    }
1869
1870    @Override
1871    public void unbufferStream() {
1872      // To handle concurrent reads, ensure that no other client is accessing the streams while we
1873      // unbuffer it.
1874      if (streamLock.tryLock()) {
1875        try {
1876          this.streamWrapper.unbuffer();
1877        } finally {
1878          streamLock.unlock();
1879        }
1880      }
1881    }
1882
1883    @Override
1884    public String toString() {
1885      return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext;
1886    }
1887  }
1888
1889  /** An additional sanity-check in case no compression or encryption is being used. */
1890  @VisibleForTesting
1891  void sanityCheckUncompressed() throws IOException {
1892    if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader +
1893        totalChecksumBytes()) {
1894      throw new IOException("Using no compression but "
1895          + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
1896          + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
1897          + ", numChecksumBytes=" + totalChecksumBytes());
1898    }
1899  }
1900
1901  // Cacheable implementation
1902  @Override
1903  public int getSerializedLength() {
1904    if (buf != null) {
1905      // Include extra bytes for block metadata.
1906      return this.buf.limit() + BLOCK_METADATA_SPACE;
1907    }
1908    return 0;
1909  }
1910
1911  // Cacheable implementation
1912  @Override
1913  public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
1914    this.buf.get(destination, 0, getSerializedLength() - BLOCK_METADATA_SPACE);
1915    destination = addMetaData(destination, includeNextBlockMetadata);
1916
1917    // Make it ready for reading. flip sets position to zero and limit to the current position,
1918    // which is what we want: the buffer now holds the serialized block (plus checksums if
1919    // present) plus the metadata.
1920    destination.flip();
1921  }
1922
1923  /**
1924   * For use by bucketcache. This exposes internals.
1925   */
1926  public ByteBuffer getMetaData() {
1927    ByteBuffer bb = ByteBuffer.allocate(BLOCK_METADATA_SPACE);
1928    bb = addMetaData(bb, true);
1929    bb.flip();
1930    return bb;
1931  }
1932
1933  /**
1934   * Adds metadata at current position (position is moved forward). Does not flip or reset.
1935   * @return The passed <code>destination</code> with metadata added.
1936   */
1937  private ByteBuffer addMetaData(final ByteBuffer destination, boolean includeNextBlockMetadata) {
1938    destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1939    destination.putLong(this.offset);
1940    if (includeNextBlockMetadata) {
1941      destination.putInt(this.nextBlockOnDiskSize);
1942    }
1943    return destination;
1944  }
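
  /*
   * Layout sketch of the serialized cache entry produced by serialize(destination, true), derived
   * from getSerializedLength() and addMetaData() above:
   *
   *   [ block bytes: buf.limit() bytes ]   // header + data (+ checksums if present)
   *   [ usesHBaseChecksum flag: 1 byte ]
   *   [ block offset in file: 8 bytes  ]
   *   [ nextBlockOnDiskSize: 4 bytes   ]   // only when includeNextBlockMetadata
   *
   * i.e. BLOCK_METADATA_SPACE covers the trailing 13 bytes of metadata. A hedged round-trip
   * through a cache slot might look like:
   *
   *   ByteBuffer slot = ByteBuffer.allocate(block.getSerializedLength());
   *   block.serialize(slot, true); // leaves the buffer flipped, ready for reading
   *   // ... later, BLOCK_DESERIALIZER (see getDeserializer()) rebuilds an HFileBlock from it.
   */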
1945
1946  // Cacheable implementation
1947  @Override
1948  public CacheableDeserializer<Cacheable> getDeserializer() {
1949    return HFileBlock.BLOCK_DESERIALIZER;
1950  }
1951
1952  @Override
1953  public int hashCode() {
1954    int result = 1;
1955    result = result * 31 + blockType.hashCode();
1956    result = result * 31 + nextBlockOnDiskSize;
1957    result = result * 31 + (int) (offset ^ (offset >>> 32));
1958    result = result * 31 + onDiskSizeWithoutHeader;
1959    result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32));
1960    result = result * 31 + uncompressedSizeWithoutHeader;
1961    result = result * 31 + buf.hashCode();
1962    return result;
1963  }
1964
1965  @Override
1966  public boolean equals(Object comparison) {
1967    if (this == comparison) {
1968      return true;
1969    }
1970    if (comparison == null) {
1971      return false;
1972    }
1973    if (!(comparison instanceof HFileBlock)) {
1974      return false;
1975    }
1976
1977    HFileBlock castedComparison = (HFileBlock) comparison;
1978
1979    if (castedComparison.blockType != this.blockType) {
1980      return false;
1981    }
1982    if (castedComparison.nextBlockOnDiskSize != this.nextBlockOnDiskSize) {
1983      return false;
1984    }
1985    // Offset is important. Needed when we have to remake cachekey when block is returned to cache.
1986    if (castedComparison.offset != this.offset) {
1987      return false;
1988    }
1989    if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1990      return false;
1991    }
1992    if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1993      return false;
1994    }
1995    if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1996      return false;
1997    }
1998    if (ByteBuff.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
1999        castedComparison.buf.limit()) != 0) {
2000      return false;
2001    }
2002    return true;
2003  }
2004
2005  DataBlockEncoding getDataBlockEncoding() {
2006    if (blockType == BlockType.ENCODED_DATA) {
2007      return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
2008    }
2009    return DataBlockEncoding.NONE;
2010  }
2011
2012  @VisibleForTesting
2013  byte getChecksumType() {
2014    return this.fileContext.getChecksumType().getCode();
2015  }
2016
2017  int getBytesPerChecksum() {
2018    return this.fileContext.getBytesPerChecksum();
2019  }
2020
2021  /** @return the size of data on disk + header. Excludes checksum. */
2022  @VisibleForTesting
2023  int getOnDiskDataSizeWithHeader() {
2024    return this.onDiskDataSizeWithHeader;
2025  }
2026
2027  /**
2028   * Calculate the number of bytes required to store all the checksums
2029   * for this block. Each checksum value is a 4 byte integer.
2030   */
2031  int totalChecksumBytes() {
2032    // If the hfile block has minorVersion 0, then there is no checksum
2033    // data to validate. Similarly, a zero value in this.bytesPerChecksum
2034    // indicates that cached blocks do not have checksum data because
2035    // checksums were already validated when the block was read from disk.
2036    if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
2037      return 0;
2038    }
2039    return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
2040        this.fileContext.getBytesPerChecksum());
2041  }
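
  /*
   * Worked example (hedged, assuming one 4-byte checksum per bytesPerChecksum chunk as described
   * above): with onDiskDataSizeWithHeader = 65,569 bytes (a 64 KiB payload plus a 33-byte
   * version-2 header) and bytesPerChecksum = 16,384, the data spans ceil(65569 / 16384) = 5
   * chunks, so totalChecksumBytes() would return 5 * 4 = 20 bytes.
   */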
2042
2043  /**
2044   * Returns the size of this block header.
2045   */
2046  public int headerSize() {
2047    return headerSize(this.fileContext.isUseHBaseChecksum());
2048  }
2049
2050  /**
2051   * Maps the checksum setting (and thus the block's minor version) to the size of the header.
2052   */
2053  public static int headerSize(boolean usesHBaseChecksum) {
2054    return usesHBaseChecksum?
2055        HConstants.HFILEBLOCK_HEADER_SIZE: HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
2056  }
2057
2058  /**
2059   * Return the appropriate DUMMY_HEADER for the minor version
2060   */
2061  @VisibleForTesting
2062  // TODO: Why is this in here?
2063  byte[] getDummyHeaderForVersion() {
2064    return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
2065  }
2066
2067  /**
2068   * Return the appropriate DUMMY_HEADER for the minor version
2069   */
2070  static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
2071    return usesHBaseChecksum? HConstants.HFILEBLOCK_DUMMY_HEADER: DUMMY_HEADER_NO_CHECKSUM;
2072  }
2073
2074  /**
2075   * @return This HFileBlock's fileContext, which will be a derivative of the
2076   * fileContext for the file from which this block's data was originally read.
2077   */
2078  HFileContext getHFileContext() {
2079    return this.fileContext;
2080  }
2081
2082  /**
2083   * Convert the contents of the block header into a human readable string.
2084   * This is mostly helpful for debugging. This assumes that the block
2085   * has minor version > 0.
2086   */
2087  @VisibleForTesting
2088  static String toStringHeader(ByteBuff buf) throws IOException {
2089    byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
2090    buf.get(magicBuf);
2091    BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
2092    int compressedBlockSizeNoHeader = buf.getInt();
2093    int uncompressedBlockSizeNoHeader = buf.getInt();
2094    long prevBlockOffset = buf.getLong();
2095    byte cksumtype = buf.get();
2096    long bytesPerChecksum = buf.getInt();
2097    long onDiskDataSizeWithHeader = buf.getInt();
2098    return " Header dump: magic: " + Bytes.toString(magicBuf) +
2099                   " blockType " + bt +
2100                   " compressedBlockSizeNoHeader " +
2101                   compressedBlockSizeNoHeader +
2102                   " uncompressedBlockSizeNoHeader " +
2103                   uncompressedBlockSizeNoHeader +
2104                   " prevBlockOffset " + prevBlockOffset +
2105                   " checksumType " + ChecksumType.codeToType(cksumtype) +
2106                   " bytesPerChecksum " + bytesPerChecksum +
2107                   " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
2108  }
2109
2110  private static HFileBlockBuilder createBuilder(HFileBlock blk) {
2111    return new HFileBlockBuilder()
2112          .withBlockType(blk.blockType)
2113          .withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader)
2114          .withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader)
2115          .withPrevBlockOffset(blk.prevBlockOffset)
2116          .withByteBuff(blk.buf.duplicate()) // Duplicate the buffer.
2117          .withOffset(blk.offset)
2118          .withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
2119          .withNextBlockOnDiskSize(blk.nextBlockOnDiskSize)
2120          .withHFileContext(blk.fileContext)
2121          .withByteBuffAllocator(blk.allocator)
2122          .withShared(blk.isSharedMem());
2123  }
2124
2125  static HFileBlock shallowClone(HFileBlock blk) {
2126    return createBuilder(blk).build();
2127  }
2128
2129  static HFileBlock deepCloneOnHeap(HFileBlock blk) {
2130    ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
2131    return createBuilder(blk).withByteBuff(deepCloned).withShared(false).build();
2132  }
2133}