001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
021import java.io.DataInputStream;
022import java.io.DataOutput;
023import java.io.DataOutputStream;
024import java.io.IOException;
025import java.nio.ByteBuffer;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.concurrent.atomic.AtomicReference;
029import java.util.concurrent.locks.Lock;
030import java.util.concurrent.locks.ReentrantLock;
031import org.apache.hadoop.fs.FSDataInputStream;
032import org.apache.hadoop.fs.FSDataOutputStream;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.fs.HFileSystem;
036import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
037import org.apache.hadoop.hbase.io.ByteBuffAllocator;
038import org.apache.hadoop.hbase.io.ByteBuffInputStream;
039import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream;
040import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
041import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
042import org.apache.hadoop.hbase.io.encoding.EncodingState;
043import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
044import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
045import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
046import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
047import org.apache.hadoop.hbase.io.util.BlockIOUtils;
048import org.apache.hadoop.hbase.nio.ByteBuff;
049import org.apache.hadoop.hbase.nio.MultiByteBuff;
050import org.apache.hadoop.hbase.nio.SingleByteBuff;
051import org.apache.hadoop.hbase.regionserver.ShipperListener;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.ChecksumType;
054import org.apache.hadoop.hbase.util.ClassSize;
055import org.apache.yetus.audience.InterfaceAudience;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
060
061/**
062 * Cacheable Blocks of an {@link HFile} version 2 file.
063 * Version 2 was introduced in hbase-0.92.0.
064 *
065 * <p>Version 1 was the original file block. Version 2 was introduced when we changed the hbase file
066 * format to support multi-level block indexes and compound bloom filters (HBASE-3857). Support
067 * for Version 1 was removed in hbase-1.3.0.
068 *
069 * <h3>HFileBlock: Version 2</h3>
070 * In version 2, a block is structured as follows:
071 * <ul>
072 * <li><b>Header:</b> See Writer#putHeader() for where header is written; header total size is
073 * HFILEBLOCK_HEADER_SIZE
074 * <ul>
075 * <li>0. blockType: Magic record identifying the {@link BlockType} (8 bytes):
076 * e.g. <code>DATABLK*</code>
077 * <li>1. onDiskSizeWithoutHeader: Compressed -- a.k.a 'on disk' -- block size, excluding header,
078 * but including tailing checksum bytes (4 bytes)
079 * <li>2. uncompressedSizeWithoutHeader: Uncompressed block size, excluding header, and excluding
080 * checksum bytes (4 bytes)
081 * <li>3. prevBlockOffset: The offset of the previous block of the same type (8 bytes). This is
082 * used to navigate to the previous block without having to go to the block index
083 * <li>4: For minorVersions &gt;=1, the ordinal describing checksum type (1 byte)
084 * <li>5: For minorVersions &gt;=1, the number of data bytes/checksum chunk (4 bytes)
085 * <li>6: onDiskDataSizeWithHeader: For minorVersions &gt;=1, the size of data 'on disk', including
086 * header, excluding checksums (4 bytes)
087 * </ul>
088 * </li>
089 * <li><b>Raw/Compressed/Encrypted/Encoded data:</b> The compression
090 * algorithm is the same for all the blocks in an {@link HFile}. If compression is NONE, this is
091 * just raw, serialized Cells.
092 * <li><b>Tail:</b> For minorVersions &gt;=1, a series of 4 byte checksums, one each for
093 * the number of bytes specified by bytesPerChecksum.
094 * </ul>
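 *
 * <p>For illustration only (a sketch, not code used by this class; the variable names mirror the
 * header fields listed above), the size fields relate roughly as follows when hbase checksums are
 * enabled:
 * <pre>{@code
 *   int headerSize           = HConstants.HFILEBLOCK_HEADER_SIZE;      // 33 bytes with checksums
 *   int onDiskSizeWithHeader = onDiskSizeWithoutHeader + headerSize;
 *   int checksumBytes        = onDiskSizeWithoutHeader - (onDiskDataSizeWithHeader - headerSize);
 * }</pre>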
095 *
096 * <h3>Caching</h3>
097 * Caches cache whole blocks with trailing checksums if any. We then tag on some metadata, the
 * content of BLOCK_METADATA_SPACE: a flag saying whether we are doing 'hbase'
 * checksums, and then the offset into the file, which is needed when we re-make a cache key
 * when we return the block to the cache as 'done'.
101 * See {@link Cacheable#serialize(ByteBuffer, boolean)} and {@link Cacheable#getDeserializer()}.
102 *
103 * <p>TODO: Should we cache the checksums? Down in Writer#getBlockForCaching(CacheConfig) where
104 * we make a block to cache-on-write, there is an attempt at turning off checksums. This is not the
105 * only place we get blocks to cache. We also will cache the raw return from an hdfs read. In this
106 * case, the checksums may be present. If the cache is backed by something that doesn't do ECC,
 * say an SSD, we might want to preserve checksums. For now this is an open question.
108 * <p>TODO: Over in BucketCache, we save a block allocation by doing a custom serialization.
109 * Be sure to change it if serialization changes in here. Could we add a method here that takes an
110 * IOEngine and that then serializes to it rather than expose our internals over in BucketCache?
111 * IOEngine is in the bucket subpackage. Pull it up? Then this class knows about bucketcache. Ugh.
112 */
113@InterfaceAudience.Private
114public class HFileBlock implements Cacheable {
115  private static final Logger LOG = LoggerFactory.getLogger(HFileBlock.class);
116  public static final int FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT +
117     // BlockType, ByteBuff, MemoryType, HFileContext, ByteBuffAllocator
118      5 * ClassSize.REFERENCE +
119      // On-disk size, uncompressed size, and next block's on-disk size
120      // bytePerChecksum and onDiskDataSize
121      4 * Bytes.SIZEOF_INT +
122      // This and previous block offset
123      2 * Bytes.SIZEOF_LONG);
124
125  // Block Header fields.
126
127  // TODO: encapsulate Header related logic in this inner class.
128  static class Header {
129    // Format of header is:
130    // 8 bytes - block magic
131    // 4 bytes int - onDiskSizeWithoutHeader
132    // 4 bytes int - uncompressedSizeWithoutHeader
133    // 8 bytes long - prevBlockOffset
134    // The following 3 are only present if header contains checksum information
135    // 1 byte - checksum type
136    // 4 byte int - bytes per checksum
137    // 4 byte int - onDiskDataSizeWithHeader
138    static int BLOCK_MAGIC_INDEX = 0;
139    static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8;
140    static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12;
141    static int PREV_BLOCK_OFFSET_INDEX = 16;
142    static int CHECKSUM_TYPE_INDEX = 24;
143    static int BYTES_PER_CHECKSUM_INDEX = 25;
144    static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29;
145  }
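
  // Illustration only, mirroring what createFromBuff() below does (not used at runtime): given a
  // ByteBuff positioned at the start of a block, the fixed header fields can be read through the
  // offsets above, e.g.
  //   int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
  //   long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);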
146
147  /** Type of block. Header field 0. */
148  private BlockType blockType;
149
150  /**
151   * Size on disk excluding header, including checksum. Header field 1.
152   * @see Writer#putHeader(byte[], int, int, int, int)
153   */
154  private int onDiskSizeWithoutHeader;
155
156  /**
157   * Size of pure data. Does not include header or checksums. Header field 2.
158   * @see Writer#putHeader(byte[], int, int, int, int)
159   */
160  private int uncompressedSizeWithoutHeader;
161
162  /**
163   * The offset of the previous block on disk. Header field 3.
164   * @see Writer#putHeader(byte[], int, int, int, int)
165   */
166  private long prevBlockOffset;
167
168  /**
169   * Size on disk of header + data. Excludes checksum. Header field 6,
170   * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
171   * @see Writer#putHeader(byte[], int, int, int, int)
172   */
173  private int onDiskDataSizeWithHeader;
174  // End of Block Header fields.
175
176  /**
177   * The in-memory representation of the hfile block. Can be on or offheap. Can be backed by
178   * a single ByteBuffer or by many. Make no assumptions.
179   *
180   * <p>Be careful reading from this <code>buf</code>. Duplicate and work on the duplicate or if
181   * not, be sure to reset position and limit else trouble down the road.
182   *
183   * <p>TODO: Make this read-only once made.
184   *
185   * <p>We are using the ByteBuff type. ByteBuffer is not extensible yet we need to be able to have
186   * a ByteBuffer-like API across multiple ByteBuffers reading from a cache such as BucketCache.
187   * So, we have this ByteBuff type. Unfortunately, it is spread all about HFileBlock. Would be
188   * good if could be confined to cache-use only but hard-to-do.
189   */
190  private ByteBuff buf;
191
  /** Metadata that holds information about the hfileblock. */
194  private HFileContext fileContext;
195
196  /**
197   * The offset of this block in the file. Populated by the reader for
198   * convenience of access. This offset is not part of the block header.
199   */
200  private long offset = UNSET;
201
202  /**
203   * The on-disk size of the next block, including the header and checksums if present.
204   * UNSET if unknown.
205   *
206   * Blocks try to carry the size of the next block to read in this data member. Usually
207   * we get block sizes from the hfile index but sometimes the index is not available:
208   * e.g. when we read the indexes themselves (indexes are stored in blocks, we do not
209   * have an index for the indexes). Saves seeks especially around file open when
210   * there is a flurry of reading in hfile metadata.
211   */
212  private int nextBlockOnDiskSize = UNSET;
213
214  private ByteBuffAllocator allocator;
215
216  /**
217   * On a checksum failure, do these many succeeding read requests using hdfs checksums before
218   * auto-reenabling hbase checksum verification.
219   */
220  static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
221
  private static final int UNSET = -1;
223  public static final boolean FILL_HEADER = true;
224  public static final boolean DONT_FILL_HEADER = false;
225
  // How do we get the estimate correctly if it is a SingleByteBuff?
227  public static final int MULTI_BYTE_BUFFER_HEAP_SIZE =
228      (int)ClassSize.estimateBase(MultiByteBuff.class, false);
229
230  /**
231   * Space for metadata on a block that gets stored along with the block when we cache it.
232   * There are a few bytes stuck on the end of the HFileBlock that we pull in from HDFS.
 * 8 bytes are for the offset of this block (long) in the file. Offset is important because it is
234   * used when we remake the CacheKey when we return block to the cache when done. There is also
235   * a flag on whether checksumming is being done by hbase or not. See class comment for note on
236   * uncertain state of checksumming of blocks that come out of cache (should we or should we not?).
237   * Finally there are 4 bytes to hold the length of the next block which can save a seek on
238   * occasion if available.
239   * (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was
240   * formerly known as EXTRA_SERIALIZATION_SPACE).
241   */
242  static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
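
  // Illustration only: the BLOCK_METADATA_SPACE trailer tagged onto a cached block could be laid
  // out as below (metaBuf is a hypothetical ByteBuffer; the real write happens in this class's
  // Cacheable serialization path):
  //   metaBuf.put(usesHBaseChecksum ? (byte) 1 : (byte) 0); // 1 byte: hbase checksum flag
  //   metaBuf.putLong(offset);                              // 8 bytes: offset of block in file
  //   metaBuf.putInt(nextBlockOnDiskSize);                  // 4 bytes: next block's on-disk size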
243
244  /**
245   * Each checksum value is an integer that can be stored in 4 bytes.
246   */
247  static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
248
249  static final byte[] DUMMY_HEADER_NO_CHECKSUM =
250      new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
251
252  /**
253   * Used deserializing blocks from Cache.
254   *
255   * <code>
256   * ++++++++++++++
257   * + HFileBlock +
258   * ++++++++++++++
259   * + Checksums  + <= Optional
260   * ++++++++++++++
261   * + Metadata!  + <= See note on BLOCK_METADATA_SPACE above.
262   * ++++++++++++++
263   * </code>
264   * @see #serialize(ByteBuffer, boolean)
265   */
266  public static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER = new BlockDeserializer();
267
268  public static final class BlockDeserializer implements CacheableDeserializer<Cacheable> {
269    private BlockDeserializer() {
270    }
271
272    @Override
273    public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc)
274        throws IOException {
275      // The buf has the file block followed by block metadata.
276      // Set limit to just before the BLOCK_METADATA_SPACE then rewind.
277      buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind();
278      // Get a new buffer to pass the HFileBlock for it to 'own'.
279      ByteBuff newByteBuff = buf.slice();
280      // Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock.
281      buf.position(buf.limit());
282      buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE);
283      boolean usesChecksum = buf.get() == (byte) 1;
284      long offset = buf.getLong();
285      int nextBlockOnDiskSize = buf.getInt();
286      return createFromBuff(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
287    }
288
289    @Override
290    public int getDeserializerIdentifier() {
291      return DESERIALIZER_IDENTIFIER;
292    }
293  }
294
295  private static final int DESERIALIZER_IDENTIFIER;
296  static {
297    DESERIALIZER_IDENTIFIER =
298        CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
299  }
300
301  /**
302   * Creates a new {@link HFile} block from the given fields. This constructor
   * is used only while writing blocks and caching, when the block data is already sitting in a
   * byte buffer and we want to stuff the block into cache.
   * See {@link Writer#getBlockForCaching(CacheConfig)}.
   *
   * <p>TODO: The caller presumes no checksumming is required of this block instance since it is
   * going into cache; the checksum was already verified on the underlying block data pulled in
   * from the filesystem. Is that correct? What if the cache is SSD?
   * <p>TODO: Could the HFile block writer also go off-heap?</p>
311   *
312   * @param blockType the type of this block, see {@link BlockType}
313   * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
314   * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
315   * @param prevBlockOffset see {@link #prevBlockOffset}
316   * @param buf block buffer with header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
317   * @param fillHeader when true, write the first 4 header fields into passed buffer.
318   * @param offset the file offset the block was read from
319   * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
320   * @param fileContext HFile meta data
321   */
322  @VisibleForTesting
323  public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
324      int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
325      long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
326      ByteBuffAllocator allocator) {
327    this.blockType = blockType;
328    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
329    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
330    this.prevBlockOffset = prevBlockOffset;
331    this.offset = offset;
332    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
333    this.nextBlockOnDiskSize = nextBlockOnDiskSize;
334    this.fileContext = fileContext;
335    this.allocator = allocator;
336    this.buf = buf;
337    if (fillHeader) {
338      overwriteHeader();
339    }
340    this.buf.rewind();
341  }
342
343  /**
344   * Creates a block from an existing buffer starting with a header. Rewinds
345   * and takes ownership of the buffer. By definition of rewind, ignores the
346   * buffer position, but if you slice the buffer beforehand, it will rewind
347   * to that point.
348   * @param buf Has header, content, and trailing checksums if present.
349   */
350  static HFileBlock createFromBuff(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
351      final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator)
352      throws IOException {
353    buf.rewind();
354    final BlockType blockType = BlockType.read(buf);
355    final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
356    final int uncompressedSizeWithoutHeader =
357        buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
358    final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
    // This method is called both when we deserialize a block from cache and when we read a block
    // in from the fs. fileContext is null when deserialized from cache so we need to make one up.
361    HFileContextBuilder fileContextBuilder = fileContext != null ?
362      new HFileContextBuilder(fileContext) : new HFileContextBuilder();
363    fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum);
364    int onDiskDataSizeWithHeader;
365    if (usesHBaseChecksum) {
366      byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX);
367      int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
368      onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
369      // Use the checksum type and bytes per checksum from header, not from fileContext.
370      fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType));
371      fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum);
372    } else {
373      fileContextBuilder.withChecksumType(ChecksumType.NULL);
374      fileContextBuilder.withBytesPerCheckSum(0);
      // Need to fix onDiskDataSizeWithHeader; there are no checksums after the block data
376      onDiskDataSizeWithHeader = onDiskSizeWithoutHeader + headerSize(usesHBaseChecksum);
377    }
378    fileContext = fileContextBuilder.build();
379    assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
380    return new HFileBlockBuilder()
381        .withBlockType(blockType)
382        .withOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader)
383        .withUncompressedSizeWithoutHeader(uncompressedSizeWithoutHeader)
384        .withPrevBlockOffset(prevBlockOffset)
385        .withOffset(offset)
386        .withOnDiskDataSizeWithHeader(onDiskDataSizeWithHeader)
387        .withNextBlockOnDiskSize(nextBlockOnDiskSize)
388        .withHFileContext(fileContext)
389        .withByteBuffAllocator(allocator)
390        .withByteBuff(buf.rewind())
391        .withShared(!buf.hasArray())
392        .build();
393  }
394
395  /**
396   * Parse total on disk size including header and checksum.
397   * @param headerBuf Header ByteBuffer. Presumed exact size of header.
398   * @param verifyChecksum true if checksum verification is in use.
399   * @return Size of the block with header included.
400   */
401  private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf,
402      boolean verifyChecksum) {
403    return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(verifyChecksum);
404  }
405
406  /**
407   * @return the on-disk size of the next block (including the header size and any checksums if
408   *   present) read by peeking into the next block's header; use as a hint when doing
409   *   a read of the next block when scanning or running over a file.
410   */
411  int getNextBlockOnDiskSize() {
412    return nextBlockOnDiskSize;
413  }
414
415  @Override
416  public BlockType getBlockType() {
417    return blockType;
418  }
419
420  @Override
421  public int refCnt() {
422    return buf.refCnt();
423  }
424
425  @Override
426  public HFileBlock retain() {
427    buf.retain();
428    return this;
429  }
430
431  /**
   * Call {@link ByteBuff#release()} to decrease the reference count. If there is no other
   * reference, it will return the {@link ByteBuffer} back to
   * {@link org.apache.hadoop.hbase.io.ByteBuffAllocator}.
434   */
435  @Override
436  public boolean release() {
437    return buf.release();
438  }
439
  /** @return the data block encoding id that was used to encode this block */
441  short getDataBlockEncodingId() {
442    if (blockType != BlockType.ENCODED_DATA) {
443      throw new IllegalArgumentException("Querying encoder ID of a block " +
444          "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
445    }
446    return buf.getShort(headerSize());
447  }
448
449  /**
450   * @return the on-disk size of header + data part + checksum.
451   */
452  public int getOnDiskSizeWithHeader() {
453    return onDiskSizeWithoutHeader + headerSize();
454  }
455
456  /**
457   * @return the on-disk size of the data part + checksum (header excluded).
458   */
459  int getOnDiskSizeWithoutHeader() {
460    return onDiskSizeWithoutHeader;
461  }
462
463  /**
464   * @return the uncompressed size of data part (header and checksum excluded).
465   */
466  int getUncompressedSizeWithoutHeader() {
467    return uncompressedSizeWithoutHeader;
468  }
469
470  /**
471   * @return the offset of the previous block of the same type in the file, or
472   *         -1 if unknown
473   */
474  long getPrevBlockOffset() {
475    return prevBlockOffset;
476  }
477
478  /**
   * Rewinds {@code buf} and writes the header fields (including the checksum fields when hbase
   * checksums are in use). {@code buf} position is modified as a side-effect.
481   */
482  private void overwriteHeader() {
483    buf.rewind();
484    blockType.write(buf);
485    buf.putInt(onDiskSizeWithoutHeader);
486    buf.putInt(uncompressedSizeWithoutHeader);
487    buf.putLong(prevBlockOffset);
488    if (this.fileContext.isUseHBaseChecksum()) {
489      buf.put(fileContext.getChecksumType().getCode());
490      buf.putInt(fileContext.getBytesPerChecksum());
491      buf.putInt(onDiskDataSizeWithHeader);
492    }
493  }
494
495  /**
496   * Returns a buffer that does not include the header and checksum.
497   * @return the buffer with header skipped and checksum omitted.
498   */
499  public ByteBuff getBufferWithoutHeader() {
500    return this.getBufferWithoutHeader(false);
501  }
502
503  /**
   * Returns a buffer that does not include the header, optionally keeping the checksum.
   * @param withChecksum whether to include the trailing checksum bytes.
   * @return the buffer with the header skipped and the checksum included or omitted per
   *         {@code withChecksum}.
507   */
508  public ByteBuff getBufferWithoutHeader(boolean withChecksum) {
509    ByteBuff dup = getBufferReadOnly();
510    int delta = withChecksum ? 0 : totalChecksumBytes();
511    return dup.position(headerSize()).limit(buf.limit() - delta).slice();
512  }
513
514  /**
515   * Returns a read-only duplicate of the buffer this block stores internally ready to be read.
516   * Clients must not modify the buffer object though they may set position and limit on the
517   * returned buffer since we pass back a duplicate. This method has to be public because it is used
518   * in {@link CompoundBloomFilter} to avoid object creation on every Bloom
519   * filter lookup, but has to be used with caution. Buffer holds header, block content,
520   * and any follow-on checksums if present.
521   *
522   * @return the buffer of this block for read-only operations
523   */
524  public ByteBuff getBufferReadOnly() {
525    // TODO: ByteBuf does not support asReadOnlyBuffer(). Fix.
526    ByteBuff dup = this.buf.duplicate();
527    assert dup.position() == 0;
528    return dup;
529  }
530
531  public ByteBuffAllocator getByteBuffAllocator() {
532    return this.allocator;
533  }
534
535  @VisibleForTesting
536  private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
537      String fieldName) throws IOException {
538    if (valueFromBuf != valueFromField) {
539      throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
540          + ") is different from that in the field (" + valueFromField + ")");
541    }
542  }
543
544  @VisibleForTesting
545  private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
546      throws IOException {
547    if (valueFromBuf != valueFromField) {
548      throw new IOException("Block type stored in the buffer: " +
549        valueFromBuf + ", block type field: " + valueFromField);
550    }
551  }
552
553  /**
554   * Checks if the block is internally consistent, i.e. the first
555   * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a
556   * valid header consistent with the fields. Assumes a packed block structure.
   * This function is primarily for testing and debugging, and is not
558   * thread-safe, because it alters the internal buffer pointer.
559   * Used by tests only.
560   */
561  @VisibleForTesting
562  void sanityCheck() throws IOException {
563    // Duplicate so no side-effects
564    ByteBuff dup = this.buf.duplicate().rewind();
565    sanityCheckAssertion(BlockType.read(dup), blockType);
566
567    sanityCheckAssertion(dup.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader");
568
569    sanityCheckAssertion(dup.getInt(), uncompressedSizeWithoutHeader,
570        "uncompressedSizeWithoutHeader");
571
572    sanityCheckAssertion(dup.getLong(), prevBlockOffset, "prevBlockOffset");
573    if (this.fileContext.isUseHBaseChecksum()) {
574      sanityCheckAssertion(dup.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
575      sanityCheckAssertion(dup.getInt(), this.fileContext.getBytesPerChecksum(),
576          "bytesPerChecksum");
577      sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
578    }
579
580    int cksumBytes = totalChecksumBytes();
581    int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
582    if (dup.limit() != expectedBufLimit) {
583      throw new AssertionError("Expected limit " + expectedBufLimit + ", got " + dup.limit());
584    }
585
586    // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
587    // block's header, so there are two sensible values for buffer capacity.
588    int hdrSize = headerSize();
589    dup.rewind();
590    if (dup.remaining() != expectedBufLimit && dup.remaining() != expectedBufLimit + hdrSize) {
591      throw new AssertionError("Invalid buffer capacity: " + dup.remaining() +
592          ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
593    }
594  }
595
596  @Override
597  public String toString() {
598    StringBuilder sb = new StringBuilder()
599      .append("[")
600      .append("blockType=").append(blockType)
601      .append(", fileOffset=").append(offset)
602      .append(", headerSize=").append(headerSize())
603      .append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
604      .append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
605      .append(", prevBlockOffset=").append(prevBlockOffset)
606      .append(", isUseHBaseChecksum=").append(fileContext.isUseHBaseChecksum());
607    if (fileContext.isUseHBaseChecksum()) {
608      sb.append(", checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
609        .append(", bytesPerChecksum=").append(this.buf.getInt(24 + 1))
610        .append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
611    } else {
612      sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
613        .append("(").append(onDiskSizeWithoutHeader)
614        .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
615    }
616    String dataBegin;
617    if (buf.hasArray()) {
618      dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
619          Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
620    } else {
621      ByteBuff bufWithoutHeader = getBufferWithoutHeader();
622      byte[] dataBeginBytes = new byte[Math.min(32,
623          bufWithoutHeader.limit() - bufWithoutHeader.position())];
624      bufWithoutHeader.get(dataBeginBytes);
625      dataBegin = Bytes.toStringBinary(dataBeginBytes);
626    }
627    sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader())
628      .append(", totalChecksumBytes=").append(totalChecksumBytes())
629      .append(", isUnpacked=").append(isUnpacked())
630      .append(", buf=[").append(buf).append("]")
631      .append(", dataBeginsWith=").append(dataBegin)
632      .append(", fileContext=").append(fileContext)
633      .append(", nextBlockOnDiskSize=").append(nextBlockOnDiskSize)
634      .append("]");
635    return sb.toString();
636  }
637
638  /**
639   * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
640   * encoded structure. Internal structures are shared between instances where applicable.
641   */
642  HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
643    if (!fileContext.isCompressedOrEncrypted()) {
644      // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
645      // which is used for block serialization to L2 cache, does not preserve encoding and
646      // encryption details.
647      return this;
648    }
649
650    HFileBlock unpacked = shallowClone(this);
651    unpacked.allocateBuffer(); // allocates space for the decompressed block
652    boolean succ = false;
653    try {
654      HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
655          ? reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
656      // Create a duplicated buffer without the header part.
657      ByteBuff dup = this.buf.duplicate();
658      dup.position(this.headerSize());
659      dup = dup.slice();
660      // Decode the dup into unpacked#buf
661      ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
662        unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup);
663      succ = true;
664      return unpacked;
665    } finally {
666      if (!succ) {
667        unpacked.release();
668      }
669    }
670  }
671
672  /**
673   * Always allocates a new buffer of the correct size. Copies header bytes
674   * from the existing buffer. Does not change header fields.
   * Reserves room to keep checksum bytes too.
676   */
677  private void allocateBuffer() {
678    int cksumBytes = totalChecksumBytes();
679    int headerSize = headerSize();
680    int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
681
682    ByteBuff newBuf = allocator.allocate(capacityNeeded);
683
684    // Copy header bytes into newBuf.
685    buf.position(0);
686    newBuf.put(0, buf, 0, headerSize);
687
688    buf = newBuf;
689    // set limit to exclude next block's header
690    buf.limit(capacityNeeded);
691  }
692
693  /**
   * Return true when this block's buffer has been unpacked, false otherwise. Note this is a
   * calculated heuristic, not a tracked attribute of the block.
696   */
697  public boolean isUnpacked() {
698    final int cksumBytes = totalChecksumBytes();
699    final int headerSize = headerSize();
700    final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
701    final int bufCapacity = buf.remaining();
702    return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
703  }
704
705  /**
   * Cannot be {@link #UNSET}. Must be a legitimate value. Used when re-making the
   * {@link BlockCacheKey} when the block is returned to the cache.
708   * @return the offset of this block in the file it was read from
709   */
710  long getOffset() {
711    if (offset < 0) {
712      throw new IllegalStateException("HFile block offset not initialized properly");
713    }
714    return offset;
715  }
716
717  /**
718   * @return a byte stream reading the data + checksum of this block
719   */
720  DataInputStream getByteStream() {
721    ByteBuff dup = this.buf.duplicate();
722    dup.position(this.headerSize());
723    return new DataInputStream(new ByteBuffInputStream(dup));
724  }
725
726  @Override
727  public long heapSize() {
728    long size = FIXED_OVERHEAD;
729    size += fileContext.heapSize();
730    if (buf != null) {
731      // Deep overhead of the byte buffer. Needs to be aligned separately.
732      size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE);
733    }
734    return ClassSize.align(size);
735  }
736
737  /**
   * Will be overridden by {@link SharedMemHFileBlock} or {@link ExclusiveMemHFileBlock}. Returns
   * true by default.
740   */
741  public boolean isSharedMem() {
742    if (this instanceof SharedMemHFileBlock) {
743      return true;
744    } else if (this instanceof ExclusiveMemHFileBlock) {
745      return false;
746    }
747    return true;
748  }
749
750  /**
751   * Unified version 2 {@link HFile} block writer. The intended usage pattern
752   * is as follows:
753   * <ol>
754   * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
755   * <li>Call {@link Writer#startWriting} and get a data stream to write to.
756   * <li>Write your data into the stream.
   * <li>Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to
   * store the serialized block into an external stream.
759   * <li>Repeat to write more blocks.
760   * </ol>
761   * <p>
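   * A minimal usage sketch (illustrative only, not lifted from real callers; assumes an existing
   * {@code FSDataOutputStream out} and {@code HFileContext ctx}):
   * <pre>{@code
   *   HFileBlock.Writer writer = new HFileBlock.Writer(NoOpDataBlockEncoder.INSTANCE, ctx);
   *   DataOutputStream dos = writer.startWriting(BlockType.DATA);
   *   // append Cells via writer.write(cell); non-data blocks write their bytes to dos directly
   *   writer.writeHeaderAndData(out);  // finishes the block and appends it to the stream
   * }</pre>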
762   */
763  static class Writer implements ShipperListener {
764    private enum State {
765      INIT,
766      WRITING,
767      BLOCK_READY
768    };
769
770    /** Writer state. Used to ensure the correct usage protocol. */
771    private State state = State.INIT;
772
773    /** Data block encoder used for data blocks */
774    private final HFileDataBlockEncoder dataBlockEncoder;
775
776    private HFileBlockEncodingContext dataBlockEncodingCtx;
777
778    /** block encoding context for non-data blocks*/
779    private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
780
781    /**
782     * The stream we use to accumulate data into a block in an uncompressed format.
783     * We reset this stream at the end of each block and reuse it. The
784     * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this
785     * stream.
786     */
787    private ByteArrayOutputStream baosInMemory;
788
789    /**
790     * Current block type. Set in {@link #startWriting(BlockType)}. Could be
791     * changed in {@link #finishBlock()} from {@link BlockType#DATA}
792     * to {@link BlockType#ENCODED_DATA}.
793     */
794    private BlockType blockType;
795
796    /**
797     * A stream that we write uncompressed bytes to, which compresses them and
798     * writes them to {@link #baosInMemory}.
799     */
800    private DataOutputStream userDataStream;
801
802    /**
803     * Bytes to be written to the file system, including the header. Compressed
804     * if compression is turned on. It also includes the checksum data that
805     * immediately follows the block data. (header + data + checksums)
806     */
807    private ByteArrayOutputStream onDiskBlockBytesWithHeader;
808
809    /**
810     * The size of the checksum data on disk. It is used only if data is
811     * not compressed. If data is compressed, then the checksums are already
     * part of onDiskBlockBytesWithHeader. If data is uncompressed, then this
813     * variable stores the checksum data for this block.
814     */
815    private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;
816
817    /**
818     * Current block's start offset in the {@link HFile}. Set in
819     * {@link #writeHeaderAndData(FSDataOutputStream)}.
820     */
821    private long startOffset;
822
823    /**
824     * Offset of previous block by block type. Updated when the next block is
825     * started.
826     */
827    private long[] prevOffsetByType;
828
829    /** The offset of the previous block of the same type */
830    private long prevOffset;
    /** Metadata that holds information about the hfileblock. */
832    private HFileContext fileContext;
833
834    private final ByteBuffAllocator allocator;
835
836    @Override
837    public void beforeShipped() {
838      if (getEncodingState() != null) {
839        getEncodingState().beforeShipped();
840      }
841    }
842
843    EncodingState getEncodingState() {
844      return dataBlockEncodingCtx.getEncodingState();
845    }
846
847    /**
848     * @param dataBlockEncoder data block encoding algorithm to use
849     */
850    @VisibleForTesting
851    public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
852      this(dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
853    }
854
855    public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext,
856        ByteBuffAllocator allocator) {
857      if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
858        throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
859            " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
860            fileContext.getBytesPerChecksum());
861      }
862      this.allocator = allocator;
863      this.dataBlockEncoder = dataBlockEncoder != null?
864          dataBlockEncoder: NoOpDataBlockEncoder.INSTANCE;
865      this.dataBlockEncodingCtx = this.dataBlockEncoder.
866          newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
867      // TODO: This should be lazily instantiated since we usually do NOT need this default encoder
868      this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
869          HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
870      // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum
871      baosInMemory = new ByteArrayOutputStream();
872      prevOffsetByType = new long[BlockType.values().length];
873      for (int i = 0; i < prevOffsetByType.length; ++i) {
874        prevOffsetByType[i] = UNSET;
875      }
876      // TODO: Why fileContext saved away when we have dataBlockEncoder and/or
877      // defaultDataBlockEncoder?
878      this.fileContext = fileContext;
879    }
880
881    /**
882     * Starts writing into the block. The previous block's data is discarded.
883     *
884     * @return the stream the user can write their data into
885     */
886    DataOutputStream startWriting(BlockType newBlockType)
887        throws IOException {
888      if (state == State.BLOCK_READY && startOffset != -1) {
889        // We had a previous block that was written to a stream at a specific
890        // offset. Save that offset as the last offset of a block of that type.
891        prevOffsetByType[blockType.getId()] = startOffset;
892      }
893
894      startOffset = -1;
895      blockType = newBlockType;
896
897      baosInMemory.reset();
898      baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
899
900      state = State.WRITING;
901
902      // We will compress it later in finishBlock()
903      userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory);
904      if (newBlockType == BlockType.DATA) {
905        this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
906      }
907      return userDataStream;
908    }
909
910    /**
911     * Writes the Cell to this block
912     */
913    void write(Cell cell) throws IOException{
914      expectState(State.WRITING);
915      this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream);
916    }
917
918    /**
919     * Transitions the block writer from the "writing" state to the "block
920     * ready" state.  Does nothing if a block is already finished.
921     */
922    void ensureBlockReady() throws IOException {
923      Preconditions.checkState(state != State.INIT,
924          "Unexpected state: " + state);
925
926      if (state == State.BLOCK_READY) {
927        return;
928      }
929
930      // This will set state to BLOCK_READY.
931      finishBlock();
932    }
933
934    /**
935     * Finish up writing of the block.
936     * Flushes the compressing stream (if using compression), fills out the header,
937     * does any compression/encryption of bytes to flush out to disk, and manages
938     * the cache on write content, if applicable. Sets block write state to "block ready".
939     */
940    private void finishBlock() throws IOException {
941      if (blockType == BlockType.DATA) {
942        this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
943            baosInMemory.getBuffer(), blockType);
944        blockType = dataBlockEncodingCtx.getBlockType();
945      }
946      userDataStream.flush();
947      prevOffset = prevOffsetByType[blockType.getId()];
948
949      // We need to set state before we can package the block up for cache-on-write. In a way, the
950      // block is ready, but not yet encoded or compressed.
951      state = State.BLOCK_READY;
952      Bytes compressAndEncryptDat;
953      if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
954        compressAndEncryptDat = dataBlockEncodingCtx.
955            compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size());
956      } else {
957        compressAndEncryptDat = defaultBlockEncodingCtx.
958            compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size());
959      }
960      if (compressAndEncryptDat == null) {
961        compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size());
962      }
963      if (onDiskBlockBytesWithHeader == null) {
964        onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength());
965      }
966      onDiskBlockBytesWithHeader.reset();
967      onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(),
968            compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength());
969      // Calculate how many bytes we need for checksum on the tail of the block.
970      int numBytes = (int) ChecksumUtil.numBytes(
971          onDiskBlockBytesWithHeader.size(),
972          fileContext.getBytesPerChecksum());
973
974      // Put the header for the on disk bytes; header currently is unfilled-out
975      putHeader(onDiskBlockBytesWithHeader,
976          onDiskBlockBytesWithHeader.size() + numBytes,
977          baosInMemory.size(), onDiskBlockBytesWithHeader.size());
978      if (onDiskChecksum.length != numBytes) {
979        onDiskChecksum = new byte[numBytes];
980      }
981      ChecksumUtil.generateChecksums(
982          onDiskBlockBytesWithHeader.getBuffer(), 0,onDiskBlockBytesWithHeader.size(),
983          onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
984    }
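
    // Illustration of the resulting on-disk layout after finishBlock() (a sketch; see
    // finishBlockAndWriteHeaderAndData below for the actual write order):
    //   onDiskBlockBytesWithHeader = 33-byte header + (possibly compressed/encrypted) block data
    //   onDiskChecksum             = trailing checksums, 4 bytes per bytesPerChecksum chunk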
985
986    /**
     * Put the header into the given byte array at the given offset.
     * @param dest the destination byte array
     * @param offset the position in {@code dest} at which to start writing the header
     * @param onDiskSize size of the block on disk (header + data + checksum)
     * @param uncompressedSize size of the block after decompression (but
     *          before optional data block decoding) including header
     * @param onDiskDataSize size of the block on disk with header
     *        and data but not including the checksums
993     */
994    private void putHeader(byte[] dest, int offset, int onDiskSize,
995        int uncompressedSize, int onDiskDataSize) {
996      offset = blockType.put(dest, offset);
997      offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
998      offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
999      offset = Bytes.putLong(dest, offset, prevOffset);
1000      offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
1001      offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
1002      Bytes.putInt(dest, offset, onDiskDataSize);
1003    }
1004
1005    private void putHeader(ByteBuff buff, int onDiskSize,
1006        int uncompressedSize, int onDiskDataSize) {
1007      buff.rewind();
1008      blockType.write(buff);
1009      buff.putInt(onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1010      buff.putInt(uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1011      buff.putLong(prevOffset);
1012      buff.put(fileContext.getChecksumType().getCode());
1013      buff.putInt(fileContext.getBytesPerChecksum());
1014      buff.putInt(onDiskDataSize);
1015    }
1016
1017    private void putHeader(ByteArrayOutputStream dest, int onDiskSize,
1018        int uncompressedSize, int onDiskDataSize) {
      putHeader(dest.getBuffer(), 0, onDiskSize, uncompressedSize, onDiskDataSize);
1020    }
1021
1022    /**
1023     * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records
1024     * the offset of this block so that it can be referenced in the next block
1025     * of the same type.
1026     */
1027    void writeHeaderAndData(FSDataOutputStream out) throws IOException {
1028      long offset = out.getPos();
1029      if (startOffset != UNSET && offset != startOffset) {
1030        throw new IOException("A " + blockType + " block written to a "
1031            + "stream twice, first at offset " + startOffset + ", then at "
1032            + offset);
1033      }
1034      startOffset = offset;
1035      finishBlockAndWriteHeaderAndData(out);
1036    }
1037
1038    /**
1039     * Writes the header and the compressed data of this block (or uncompressed
1040     * data when not using compression) into the given stream. Can be called in
1041     * the "writing" state or in the "block ready" state. If called in the
1042     * "writing" state, transitions the writer to the "block ready" state.
     * @param out the output stream to write the block to
1044     */
1045    protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
1046      throws IOException {
1047      ensureBlockReady();
1048      long startTime = System.currentTimeMillis();
1049      out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size());
1050      out.write(onDiskChecksum);
1051      HFile.updateWriteLatency(System.currentTimeMillis() - startTime);
1052    }
1053
1054    /**
     * Returns the header and the compressed data (or uncompressed data when not
     * using compression) as a byte array. Can be called in the "writing" state
1057     * or in the "block ready" state. If called in the "writing" state,
1058     * transitions the writer to the "block ready" state. This returns
1059     * the header + data + checksums stored on disk.
1060     *
1061     * @return header and data as they would be stored on disk in a byte array
1062     */
1063    byte[] getHeaderAndDataForTest() throws IOException {
1064      ensureBlockReady();
1065      // This is not very optimal, because we are doing an extra copy.
1066      // But this method is used only by unit tests.
1067      byte[] output =
1068          new byte[onDiskBlockBytesWithHeader.size()
1069              + onDiskChecksum.length];
1070      System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0,
1071          onDiskBlockBytesWithHeader.size());
1072      System.arraycopy(onDiskChecksum, 0, output,
1073          onDiskBlockBytesWithHeader.size(), onDiskChecksum.length);
1074      return output;
1075    }
1076
1077    /**
1078     * Releases resources used by this writer.
1079     */
1080    void release() {
1081      if (dataBlockEncodingCtx != null) {
1082        dataBlockEncodingCtx.close();
1083        dataBlockEncodingCtx = null;
1084      }
1085      if (defaultBlockEncodingCtx != null) {
1086        defaultBlockEncodingCtx.close();
1087        defaultBlockEncodingCtx = null;
1088      }
1089    }
1090
1091    /**
1092     * Returns the on-disk size of the data portion of the block. This is the
1093     * compressed size if compression is enabled. Can only be called in the
1094     * "block ready" state. Header is not compressed, and its size is not
1095     * included in the return value.
1096     *
1097     * @return the on-disk size of the block, not including the header.
1098     */
1099    int getOnDiskSizeWithoutHeader() {
1100      expectState(State.BLOCK_READY);
1101      return onDiskBlockBytesWithHeader.size() +
1102          onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE;
1103    }
1104
1105    /**
1106     * Returns the on-disk size of the block. Can only be called in the
1107     * "block ready" state.
1108     *
1109     * @return the on-disk size of the block ready to be written, including the
1110     *         header size, the data and the checksum data.
1111     */
1112    int getOnDiskSizeWithHeader() {
1113      expectState(State.BLOCK_READY);
1114      return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length;
1115    }
1116
1117    /**
1118     * The uncompressed size of the block data. Does not include header size.
1119     */
1120    int getUncompressedSizeWithoutHeader() {
1121      expectState(State.BLOCK_READY);
1122      return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE;
1123    }
1124
1125    /**
1126     * The uncompressed size of the block data, including header size.
1127     */
1128    int getUncompressedSizeWithHeader() {
1129      expectState(State.BLOCK_READY);
1130      return baosInMemory.size();
1131    }
1132
1133    /** @return true if a block is being written  */
1134    boolean isWriting() {
1135      return state == State.WRITING;
1136    }
1137
1138    /**
     * Returns the number of encoded bytes written into the current block so far, or
     * zero if not writing the block at the moment. Note that this will return
     * zero in the "block ready" state as well.
     *
     * @return the number of encoded bytes written
1144     */
1145    public int encodedBlockSizeWritten() {
1146      return state != State.WRITING ? 0 : this.getEncodingState().getEncodedDataSizeWritten();
1147    }
1148
1149    /**
     * Returns the number of unencoded bytes written into the current block so far, or
     * zero if not writing the block at the moment. Note that this will return
     * zero in the "block ready" state as well.
     *
     * @return the number of unencoded bytes written
1155     */
1156    int blockSizeWritten() {
1157      return state != State.WRITING ? 0 : this.getEncodingState().getUnencodedDataSizeWritten();
1158    }
1159
1160    /**
1161     * Clones the header followed by the uncompressed data, even if using
1162     * compression. This is needed for storing uncompressed blocks in the block
1163     * cache. Can be called in the "writing" state or the "block ready" state.
1164     * Returns only the header and data, does not include checksum data.
1165     *
1166     * @return Returns an uncompressed block ByteBuff for caching on write
1167     */
1168    ByteBuff cloneUncompressedBufferWithHeader() {
1169      expectState(State.BLOCK_READY);
1170      ByteBuff bytebuff = allocator.allocate(baosInMemory.size());
1171      baosInMemory.toByteBuff(bytebuff);
1172      int numBytes = (int) ChecksumUtil.numBytes(
1173          onDiskBlockBytesWithHeader.size(),
1174          fileContext.getBytesPerChecksum());
1175      putHeader(bytebuff, onDiskBlockBytesWithHeader.size() + numBytes,
1176          baosInMemory.size(), onDiskBlockBytesWithHeader.size());
1177      bytebuff.rewind();
1178      return bytebuff;
1179    }
1180
1181    /**
1182     * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is needed
     * for storing packed blocks in the block cache. Returns only the header and data; does not
     * include checksum data.
1185     * @return Returns a copy of block bytes for caching on write
1186     */
1187    private ByteBuff cloneOnDiskBufferWithHeader() {
1188      expectState(State.BLOCK_READY);
1189      ByteBuff bytebuff = allocator.allocate(onDiskBlockBytesWithHeader.size());
1190      onDiskBlockBytesWithHeader.toByteBuff(bytebuff);
1191      bytebuff.rewind();
1192      return bytebuff;
1193    }
1194
1195    private void expectState(State expectedState) {
1196      if (state != expectedState) {
1197        throw new IllegalStateException("Expected state: " + expectedState +
1198            ", actual state: " + state);
1199      }
1200    }
1201
1202    /**
1203     * Takes the given {@link BlockWritable} instance, creates a new block of
1204     * its appropriate type, writes the writable into this block, and flushes
1205     * the block into the output stream. The writer is instructed not to buffer
1206     * uncompressed bytes for cache-on-write.
1207     *
1208     * @param bw the block-writable object to write as a block
1209     * @param out the file system output stream
1210     */
1211    void writeBlock(BlockWritable bw, FSDataOutputStream out)
1212        throws IOException {
1213      bw.writeToBlock(startWriting(bw.getBlockType()));
1214      writeHeaderAndData(out);
1215    }
1216
1217    /**
1218     * Creates a new HFileBlock. Checksums have already been validated, so
1219     * the byte buffer passed into the constructor of this newly created
1220     * block does not have checksum data even though the header minor
1221     * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
1222     * 0 value in bytesPerChecksum. This method copies the on-disk or
1223     * uncompressed data to build the HFileBlock which is used only
1224     * while writing blocks and caching.
1225     *
1226     * <p>TODO: Should there be an option where a cache can ask that hbase preserve block
     * checksums for checking after a block comes out of the cache? Otherwise, the cache is
     * responsible for blocks being wholesome (ECC memory or if file-backed, it does checksumming).
1229     */
1230    HFileBlock getBlockForCaching(CacheConfig cacheConf) {
1231      HFileContext newContext = new HFileContextBuilder()
1232                                .withBlockSize(fileContext.getBlocksize())
1233                                .withBytesPerCheckSum(0)
1234                                .withChecksumType(ChecksumType.NULL) // no checksums in cached data
1235                                .withCompression(fileContext.getCompression())
1236                                .withDataBlockEncoding(fileContext.getDataBlockEncoding())
1237                                .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
1238                                .withCompressTags(fileContext.isCompressTags())
1239                                .withIncludesMvcc(fileContext.isIncludesMvcc())
1240                                .withIncludesTags(fileContext.isIncludesTags())
1241                                .withColumnFamily(fileContext.getColumnFamily())
1242                                .withTableName(fileContext.getTableName())
1243                                .build();
1244      // Build the HFileBlock.
1245      HFileBlockBuilder builder = new HFileBlockBuilder();
1246      ByteBuff buff;
1247      if (cacheConf.shouldCacheCompressed(blockType.getCategory())) {
1248        buff = cloneOnDiskBufferWithHeader();
1249      } else {
1250        buff = cloneUncompressedBufferWithHeader();
1251      }
1252      return builder.withBlockType(blockType)
1253          .withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader())
1254          .withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader())
1255          .withPrevBlockOffset(prevOffset)
1256          .withByteBuff(buff)
1257          .withFillHeader(FILL_HEADER)
1258          .withOffset(startOffset)
1259          .withNextBlockOnDiskSize(UNSET)
1260          .withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length)
1261          .withHFileContext(newContext)
1262          .withByteBuffAllocator(cacheConf.getByteBuffAllocator())
1263          .withShared(!buff.hasArray())
1264          .build();
1265    }
1266  }
1267
1268  /** Something that can be written into a block. */
1269  interface BlockWritable {
1270    /** The type of block this data should use. */
1271    BlockType getBlockType();
1272
1273    /**
1274     * Writes the block to the provided stream. Must not write any magic
1275     * records.
1276     *
1277     * @param out a stream to write uncompressed data into
1278     */
1279    void writeToBlock(DataOutput out) throws IOException;
1280  }
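  /*
   * A minimal illustrative BlockWritable (the payload and the 'writer'/'fsOut' variables are
   * assumptions for this sketch): Writer#writeBlock starts a block of the declared type, lets the
   * implementation stream its uncompressed bytes, then writes the header and data to the file.
   *
   *   BlockWritable bw = new BlockWritable() {
   *     public BlockType getBlockType() {
   *       return BlockType.META;
   *     }
   *
   *     public void writeToBlock(DataOutput out) throws IOException {
   *       out.write(Bytes.toBytes("example-payload")); // no magic record; the Writer adds headers
   *     }
   *   };
   *   writer.writeBlock(bw, fsOut);
   */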
1281
1282  /**
   * Iterator for reading {@link HFileBlock}s in the load-on-open section, such as the root data
   * index block, meta index block, file info block, etc.
1285   */
1286  interface BlockIterator {
1287    /**
1288     * Get the next block, or null if there are no more blocks to iterate.
1289     */
1290    HFileBlock nextBlock() throws IOException;
1291
1292    /**
1293     * Similar to {@link #nextBlock()} but checks block type, throws an exception if incorrect, and
1294     * returns the HFile block
1295     */
1296    HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1297
1298    /**
     * Because we now use the {@link ByteBuffAllocator} to manage the NIO ByteBuffers backing
     * HFileBlocks, those ByteBuffers must all be deallocated at the end of their life. A
     * BlockIterator's life cycle starts when an HFileReader is opened and ends at HFileReader#close,
     * so we keep track of every block read through the iterator and release them all in
     * {@link BlockIterator#freeBlocks()} when the HFileReader closes. The total size of the blocks
     * in the load-on-open section should be quite small, so tracking them is cheap.
1305     */
1306    void freeBlocks();
1307  }
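  /*
   * Illustrative lifecycle sketch (the 'fsReader' and the offset variables are assumptions):
   * iterate the load-on-open section and release every tracked block when done.
   *
   *   BlockIterator it = fsReader.blockRange(loadOnOpenOffset, trailerOffset);
   *   try {
   *     for (HFileBlock b = it.nextBlock(); b != null; b = it.nextBlock()) {
   *       // inspect root index / meta index / file info blocks here
   *     }
   *   } finally {
   *     it.freeBlocks(); // releases the ByteBuffs of every block returned by this iterator
   *   }
   */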
1308
1309  /** An HFile block reader with iteration ability. */
1310  interface FSReader {
1311    /**
1312     * Reads the block at the given offset in the file with the given on-disk size and uncompressed
1313     * size.
1314     * @param offset of the file to read
1315     * @param onDiskSize the on-disk size of the entire block, including all applicable headers, or
1316     *          -1 if unknown
1317     * @param pread true to use pread, otherwise use the stream read.
1318     * @param updateMetrics update the metrics or not.
     * @param intoHeap whether to allocate the block's ByteBuff from the JVM heap rather than via the
     *          {@link ByteBuffAllocator}. For LRUBlockCache the block to cache must be an on-heap
     *          one, because its memory accounting is heap based; likewise {@link CombinedBlockCache}
     *          uses the on-heap LRUBlockCache as its L1 cache for small blocks such as index or meta
     *          blocks so they can be accessed faster. This flag lets callers allocate directly on the
     *          JVM heap and thereby avoid an extra off-heap to heap copy when the block is destined
     *          for LRUBlockCache. In most cases the expected block type is known before the read; in
     *          the special cases where it is not (for example HFileReaderImpl#readNextDataBlock()),
     *          the block's ByteBuff is first allocated from the {@link ByteBuffAllocator}, and when
     *          caching it in {@link LruBlockCache} we check whether the ByteBuff is on-heap; if not,
     *          it is cloned to an on-heap one before being cached.
1331     * @return the newly read block
1332     */
1333    HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics,
1334        boolean intoHeap) throws IOException;
1335
1336    /**
1337     * Creates a block iterator over the given portion of the {@link HFile}.
     * The iterator returns blocks whose offsets satisfy startOffset &lt;= offset &lt; endOffset.
     * Returned blocks are always unpacked. Used when no hfile index is available; e.g. when
     * reading the hfile index blocks themselves on file open.
1342     *
1343     * @param startOffset the offset of the block to start iteration with
1344     * @param endOffset the offset to end iteration at (exclusive)
1345     * @return an iterator of blocks between the two given offsets
1346     */
1347    BlockIterator blockRange(long startOffset, long endOffset);
1348
1349    /** Closes the backing streams */
1350    void closeStreams() throws IOException;
1351
1352    /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
1353    HFileBlockDecodingContext getBlockDecodingContext();
1354
1355    /** Get the default decoder for blocks from this file. */
1356    HFileBlockDecodingContext getDefaultBlockDecodingContext();
1357
1358    void setIncludesMemStoreTS(boolean includesMemstoreTS);
1359    void setDataBlockEncoder(HFileDataBlockEncoder encoder);
1360
1361    /**
     * Closes the stream's socket. Note: this can be called concurrently from multiple threads and
     * the implementation should take care of thread safety.
1364     */
1365    void unbufferStream();
1366  }
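  /*
   * Illustrative read sketch (the 'fsReader', 'offset' and 'onDiskSize' values are assumptions): a
   * single positional read that allocates on the JVM heap, e.g. for a block destined for the
   * on-heap LRUBlockCache.
   *
   *   HFileBlock block = fsReader.readBlockData(offset, onDiskSize, true, false, true);
   *   try {
   *     // use the block ...
   *   } finally {
   *     block.release();
   *   }
   */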
1367
1368  /**
   * Data structure used to cache the header of the NEXT block. Only useful if the next read to
   * come through here is for the next block in sequence.
   *
   * When we read a block, we also read the next block's header. We do this so that we have the
   * length of the next block to read when the hfile index is not available (rare; at hfile open
   * only).
1375   */
1376  private static class PrefetchedHeader {
1377    long offset = -1;
1378    byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1379    final ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(header, 0, header.length));
1380
1381    @Override
1382    public String toString() {
1383      return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header);
1384    }
1385  }
1386
1387  /**
1388   * Reads version 2 HFile blocks from the filesystem.
1389   */
1390  static class FSReaderImpl implements FSReader {
    /** The file system stream of the underlying {@link HFile}; it may or may not
     * do checksum validation in the filesystem. */
1393    private FSDataInputStreamWrapper streamWrapper;
1394
1395    private HFileBlockDecodingContext encodedBlockDecodingCtx;
1396
1397    /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
1398    private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
1399
1400    /**
     * Cache of the NEXT block's header, read after the current block. Check that it is indeed the
     * next block's header before using it. TODO: Review. This over-read into the next block to
     * fetch its header seems unnecessary given we usually get the block size from the hfile
     * index. Review!
1405     */
1406    private AtomicReference<PrefetchedHeader> prefetchedHeader =
1407      new AtomicReference<>(new PrefetchedHeader());
1408
1409    /** The size of the file we are reading from, or -1 if unknown. */
1410    private long fileSize;
1411
1412    /** The size of the header */
1413    @VisibleForTesting
1414    protected final int hdrSize;
1415
1416    /** The filesystem used to access data */
1417    private HFileSystem hfs;
1418
1419    private HFileContext fileContext;
1420    // Cache the fileName
1421    private String pathName;
1422
1423    private final ByteBuffAllocator allocator;
1424
1425    private final Lock streamLock = new ReentrantLock();
1426
1427    FSReaderImpl(ReaderContext readerContext, HFileContext fileContext,
1428        ByteBuffAllocator allocator) throws IOException {
1429      this.fileSize = readerContext.getFileSize();
1430      this.hfs = readerContext.getFileSystem();
1431      if (readerContext.getFilePath() != null) {
1432        this.pathName = readerContext.getFilePath().toString();
1433      }
1434      this.fileContext = fileContext;
1435      this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1436      this.allocator = allocator;
1437
1438      this.streamWrapper = readerContext.getInputStreamWrapper();
1439      // Older versions of HBase didn't support checksum.
1440      this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1441      defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
1442      encodedBlockDecodingCtx = defaultDecodingCtx;
1443    }
1444
1445    @Override
1446    public BlockIterator blockRange(final long startOffset, final long endOffset) {
1447      final FSReader owner = this; // handle for inner class
1448      return new BlockIterator() {
1449        private volatile boolean freed = false;
1450        // Tracking all read blocks until we call freeBlocks.
1451        private List<HFileBlock> blockTracker = new ArrayList<>();
1452        private long offset = startOffset;
1453        // Cache length of next block. Current block has the length of next block in it.
1454        private long length = -1;
1455
1456        @Override
1457        public HFileBlock nextBlock() throws IOException {
1458          if (offset >= endOffset) {
1459            return null;
1460          }
1461          HFileBlock b = readBlockData(offset, length, false, false, true);
1462          offset += b.getOnDiskSizeWithHeader();
1463          length = b.getNextBlockOnDiskSize();
1464          HFileBlock uncompressed = b.unpack(fileContext, owner);
1465          if (uncompressed != b) {
1466            b.release(); // Need to release the compressed Block now.
1467          }
1468          blockTracker.add(uncompressed);
1469          return uncompressed;
1470        }
1471
1472        @Override
1473        public HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException {
1474          HFileBlock blk = nextBlock();
1475          if (blk.getBlockType() != blockType) {
1476            throw new IOException(
1477                "Expected block of type " + blockType + " but found " + blk.getBlockType());
1478          }
1479          return blk;
1480        }
1481
1482        @Override
1483        public void freeBlocks() {
1484          if (freed) {
1485            return;
1486          }
1487          blockTracker.forEach(HFileBlock::release);
1488          blockTracker = null;
1489          freed = true;
1490        }
1491      };
1492    }
1493
1494    /**
     * Does a positional read or a seek-and-read into the given byte buffer. Take care to call
     * {@link ByteBuff#release()} on every exit path so that the ByteBuffers are deallocated;
     * otherwise a memory leak may result.
1498     * @param dest destination buffer
1499     * @param size size of read
1500     * @param peekIntoNextBlock whether to read the next block's on-disk size
1501     * @param fileOffset position in the stream to read at
1502     * @param pread whether we should do a positional read
1503     * @param istream The input source of data
     * @return true if the destination buffer includes the next block's header, otherwise it only
     *         contains the current block's data without the next block's header.
     * @throws IOException if any IO error happens.
1507     */
1508    protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size,
1509        boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
1510      if (!pread) {
1511        // Seek + read. Better for scanning.
1512        HFileUtil.seekOnMultipleSources(istream, fileOffset);
1513        long realOffset = istream.getPos();
1514        if (realOffset != fileOffset) {
1515          throw new IOException("Tried to seek to " + fileOffset + " to read " + size
1516              + " bytes, but pos=" + realOffset + " after seek");
1517        }
1518        if (!peekIntoNextBlock) {
1519          BlockIOUtils.readFully(dest, istream, size);
1520          return false;
1521        }
1522
1523        // Try to read the next block header
1524        if (!BlockIOUtils.readWithExtra(dest, istream, size, hdrSize)) {
1525          // did not read the next block header.
1526          return false;
1527        }
1528      } else {
1529        // Positional read. Better for random reads; or when the streamLock is already locked.
1530        int extraSize = peekIntoNextBlock ? hdrSize : 0;
1531        if (!BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize)) {
1532          // did not read the next block header.
1533          return false;
1534        }
1535      }
1536      assert peekIntoNextBlock;
1537      return true;
1538    }
1539
1540    /**
1541     * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
1542     * little memory allocation as possible, using the provided on-disk size.
1543     * @param offset the offset in the stream to read at
1544     * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header, or -1 if
1545     *          unknown; i.e. when iterating over blocks reading in the file metadata info.
1546     * @param pread whether to use a positional read
1547     * @param updateMetrics whether to update the metrics
1548     * @param intoHeap allocate ByteBuff of block from heap or off-heap.
     * @see FSReader#readBlockData(long, long, boolean, boolean, boolean) for more details about the
     *      intoHeap parameter.
1551     */
1552    @Override
1553    public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread,
1554        boolean updateMetrics, boolean intoHeap) throws IOException {
1555      // Get a copy of the current state of whether to validate
1556      // hbase checksums or not for this read call. This is not
      // thread-safe but the one constraint is that if we decide
1558      // to skip hbase checksum verification then we are
1559      // guaranteed to use hdfs checksum verification.
1560      boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1561      FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1562
1563      HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
1564        doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
1565      if (blk == null) {
1566        HFile.LOG.warn("HBase checksum verification failed for file " +
1567                       pathName + " at offset " +
1568                       offset + " filesize " + fileSize +
1569                       ". Retrying read with HDFS checksums turned on...");
1570
1571        if (!doVerificationThruHBaseChecksum) {
1572          String msg = "HBase checksum verification failed for file " +
1573                       pathName + " at offset " +
1574                       offset + " filesize " + fileSize +
1575                       " but this cannot happen because doVerify is " +
1576                       doVerificationThruHBaseChecksum;
1577          HFile.LOG.warn(msg);
1578          throw new IOException(msg); // cannot happen case here
1579        }
1580        HFile.CHECKSUM_FAILURES.increment(); // update metrics
1581
1582        // If we have a checksum failure, we fall back into a mode where
1583        // the next few reads use HDFS level checksums. We aim to make the
1584        // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
1585        // hbase checksum verification, but since this value is set without
1586        // holding any locks, it can so happen that we might actually do
1587        // a few more than precisely this number.
1588        is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1589        doVerificationThruHBaseChecksum = false;
1590        blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
1591          doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
1592        if (blk != null) {
1593          HFile.LOG.warn("HDFS checksum verification succeeded for file " +
1594                         pathName + " at offset " +
1595                         offset + " filesize " + fileSize);
1596        }
1597      }
1598      if (blk == null && !doVerificationThruHBaseChecksum) {
1599        String msg = "readBlockData failed, possibly due to " +
                     "checksum verification failing for file " + pathName +
1601                     " at offset " + offset + " filesize " + fileSize;
1602        HFile.LOG.warn(msg);
1603        throw new IOException(msg);
1604      }
1605
1606      // If there is a checksum mismatch earlier, then retry with
1607      // HBase checksums switched off and use HDFS checksum verification.
1608      // This triggers HDFS to detect and fix corrupt replicas. The
1609      // next checksumOffCount read requests will use HDFS checksums.
1610      // The decrementing of this.checksumOffCount is not thread-safe,
1611      // but it is harmless because eventually checksumOffCount will be
1612      // a negative number.
1613      streamWrapper.checksumOk();
1614      return blk;
1615    }
1616
1617    /**
     * @return <code>onDiskSizeWithHeaderL</code> as an int, after checking that it is a sane size
1619     */
1620    private static int checkAndGetSizeAsInt(final long onDiskSizeWithHeaderL, final int hdrSize)
1621        throws IOException {
1622      if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
1623          || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) {
1624        throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL
1625            + ": expected to be at least " + hdrSize
1626            + " and at most " + Integer.MAX_VALUE + ", or -1");
1627      }
1628      return (int)onDiskSizeWithHeaderL;
1629    }
1630
1631    /**
   * Verify that the passed-in onDiskSizeWithHeader aligns with what is in the header; otherwise
   * something is not right.
1634     */
1635    private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuff headerBuf,
1636           final long offset, boolean verifyChecksum)
1637         throws IOException {
1638      // Assert size provided aligns with what is in the header
1639      int fromHeader = getOnDiskSizeWithHeader(headerBuf, verifyChecksum);
1640      if (passedIn != fromHeader) {
1641        throw new IOException("Passed in onDiskSizeWithHeader=" + passedIn + " != " + fromHeader +
1642            ", offset=" + offset + ", fileContext=" + this.fileContext);
1643      }
1644    }
1645
1646    /**
1647     * Check atomic reference cache for this block's header. Cache only good if next
1648     * read coming through is next in sequence in the block. We read next block's
1649     * header on the tail of reading the previous block to save a seek. Otherwise,
1650     * we have to do a seek to read the header before we can pull in the block OR
1651     * we have to backup the stream because we over-read (the next block's header).
1652     * @see PrefetchedHeader
1653     * @return The cached block header or null if not found.
1654     * @see #cacheNextBlockHeader(long, ByteBuff, int, int)
1655     */
1656    private ByteBuff getCachedHeader(final long offset) {
1657      PrefetchedHeader ph = this.prefetchedHeader.get();
1658      return ph != null && ph.offset == offset ? ph.buf : null;
1659    }
1660
1661    /**
     * Save away the next block's header in the atomic reference.
1663     * @see #getCachedHeader(long)
1664     * @see PrefetchedHeader
1665     */
1666    private void cacheNextBlockHeader(final long offset,
1667        ByteBuff onDiskBlock, int onDiskSizeWithHeader, int headerLength) {
1668      PrefetchedHeader ph = new PrefetchedHeader();
1669      ph.offset = offset;
1670      onDiskBlock.get(onDiskSizeWithHeader, ph.header, 0, headerLength);
1671      this.prefetchedHeader.set(ph);
1672    }
1673
1674    private int getNextBlockOnDiskSize(boolean readNextHeader, ByteBuff onDiskBlock,
1675        int onDiskSizeWithHeader) {
1676      int nextBlockOnDiskSize = -1;
1677      if (readNextHeader) {
1678        nextBlockOnDiskSize =
1679            onDiskBlock.getIntAfterPosition(onDiskSizeWithHeader + BlockType.MAGIC_LENGTH)
1680                + hdrSize;
1681      }
1682      return nextBlockOnDiskSize;
1683    }
1684
1685    private ByteBuff allocate(int size, boolean intoHeap) {
1686      return intoHeap ? HEAP.allocate(size) : allocator.allocate(size);
1687    }
1688
1689    /**
1690     * Reads a version 2 block.
1691     * @param offset the offset in the stream to read at.
1692     * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header and
     *          checksums if present, or -1 if unknown (as a long). Can be -1 when doing raw
     *          iteration of blocks, e.g. when loading up file metadata on the first read of a new
     *          file; usually it comes from the hfile index.
     * @param pread whether to use a positional read
     * @param verifyChecksum whether to use HBase checksums. If HBase checksums are switched off,
     *          HDFS checksums are used instead. This can flip on and off within the same file if we
     *          hit a troublesome patch in an hfile.
     * @param updateMetrics whether to update the metrics.
     * @param intoHeap whether to allocate the block's ByteBuff from the JVM heap or off-heap.
     * @return the HFileBlock, or null if there is an HBase checksum mismatch
1703     */
1704    @VisibleForTesting
1705    protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
1706        long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics,
1707        boolean intoHeap) throws IOException {
1708      if (offset < 0) {
1709        throw new IOException("Invalid offset=" + offset + " trying to read "
1710            + "block (onDiskSize=" + onDiskSizeWithHeaderL + ")");
1711      }
1712      int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize);
1713      // Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1
1714      // and will save us having to seek the stream backwards to reread the header we
1715      // read the last time through here.
1716      ByteBuff headerBuf = getCachedHeader(offset);
1717      LOG.trace("Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, " +
1718          "onDiskSizeWithHeader={}", this.fileContext.getHFileName(), offset, pread,
1719          verifyChecksum, headerBuf, onDiskSizeWithHeader);
      // This is NOT the same as verifyChecksum. The latter is whether to do hbase
      // checksums and can change with circumstances. The below flag is whether the
      // file has support for checksums (version 2+).
1723      boolean checksumSupport = this.fileContext.isUseHBaseChecksum();
1724      long startTime = System.currentTimeMillis();
1725      if (onDiskSizeWithHeader <= 0) {
1726        // We were not passed the block size. Need to get it from the header. If header was
1727        // not cached (see getCachedHeader above), need to seek to pull it in. This is costly
1728        // and should happen very rarely. Currently happens on open of a hfile reader where we
1729        // read the trailer blocks to pull in the indices. Otherwise, we are reading block sizes
1730        // out of the hfile index. To check, enable TRACE in this file and you'll get an exception
1731        // in a LOG every time we seek. See HBASE-17072 for more detail.
1732        if (headerBuf == null) {
1733          if (LOG.isTraceEnabled()) {
            LOG.trace("Extra seek to get block size!", new RuntimeException());
1735          }
1736          headerBuf = HEAP.allocate(hdrSize);
1737          readAtOffset(is, headerBuf, hdrSize, false, offset, pread);
1738          headerBuf.rewind();
1739        }
1740        onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport);
1741      }
1742      int preReadHeaderSize = headerBuf == null? 0 : hdrSize;
1743      // Allocate enough space to fit the next block's header too; saves a seek next time through.
1744      // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header;
1745      // onDiskSizeWithHeader is header, body, and any checksums if present. preReadHeaderSize
1746      // says where to start reading. If we have the header cached, then we don't need to read
1747      // it again and we can likely read from last place we left off w/o need to backup and reread
1748      // the header we read last time through here.
1749      ByteBuff onDiskBlock = this.allocate(onDiskSizeWithHeader + hdrSize, intoHeap);
1750      boolean initHFileBlockSuccess = false;
1751      try {
1752        if (headerBuf != null) {
1753          onDiskBlock.put(0, headerBuf, 0, hdrSize).position(hdrSize);
1754        }
1755        boolean readNextHeader = readAtOffset(is, onDiskBlock,
1756          onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread);
1757        onDiskBlock.rewind(); // in case of moving position when copying a cached header
1758        int nextBlockOnDiskSize =
1759            getNextBlockOnDiskSize(readNextHeader, onDiskBlock, onDiskSizeWithHeader);
1760        if (headerBuf == null) {
1761          headerBuf = onDiskBlock.duplicate().position(0).limit(hdrSize);
1762        }
1763        // Do a few checks before we go instantiate HFileBlock.
1764        assert onDiskSizeWithHeader > this.hdrSize;
1765        verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport);
1766        ByteBuff curBlock = onDiskBlock.duplicate().position(0).limit(onDiskSizeWithHeader);
1767        // Verify checksum of the data before using it for building HFileBlock.
1768        if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) {
1769          return null;
1770        }
1771        long duration = System.currentTimeMillis() - startTime;
1772        if (updateMetrics) {
1773          HFile.updateReadLatency(duration, pread);
1774        }
1775        // The onDiskBlock will become the headerAndDataBuffer for this block.
1776        // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
1777        // contains the header of next block, so no need to set next block's header in it.
1778        HFileBlock hFileBlock = createFromBuff(curBlock, checksumSupport, offset,
1779          nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
1780        // Run check on uncompressed sizings.
1781        if (!fileContext.isCompressedOrEncrypted()) {
1782          hFileBlock.sanityCheckUncompressed();
1783        }
1784        LOG.trace("Read {} in {} ns", hFileBlock, duration);
1785        // Cache next block header if we read it for the next time through here.
1786        if (nextBlockOnDiskSize != -1) {
1787          cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), onDiskBlock,
1788            onDiskSizeWithHeader, hdrSize);
1789        }
1790        initHFileBlockSuccess = true;
1791        return hFileBlock;
1792      } finally {
1793        if (!initHFileBlockSuccess) {
1794          onDiskBlock.release();
1795        }
1796      }
1797    }
1798
1799    @Override
1800    public void setIncludesMemStoreTS(boolean includesMemstoreTS) {
1801      this.fileContext = new HFileContextBuilder(this.fileContext)
1802        .withIncludesMvcc(includesMemstoreTS).build();
1803    }
1804
1805    @Override
1806    public void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
1807      encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
1808    }
1809
1810    @Override
1811    public HFileBlockDecodingContext getBlockDecodingContext() {
1812      return this.encodedBlockDecodingCtx;
1813    }
1814
1815    @Override
1816    public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
1817      return this.defaultDecodingCtx;
1818    }
1819
1820    /**
     * Generates the checksum for the header as well as the data and then validates it
     * against the values stored in the block. If the block does not use checksums, returns false.
     * @return true if the checksum matches, else false.
1824     */
1825    private boolean validateChecksum(long offset, ByteBuff data, int hdrSize) {
1826      // If this is an older version of the block that does not have checksums, then return false
1827      // indicating that checksum verification did not succeed. Actually, this method should never
1828      // be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen
1829      // case. Since this is a cannot-happen case, it is better to return false to indicate a
1830      // checksum validation failure.
1831      if (!fileContext.isUseHBaseChecksum()) {
1832        return false;
1833      }
1834      return ChecksumUtil.validateChecksum(data, pathName, offset, hdrSize);
1835    }
1836
1837    @Override
1838    public void closeStreams() throws IOException {
1839      streamWrapper.close();
1840    }
1841
1842    @Override
1843    public void unbufferStream() {
1844      // To handle concurrent reads, ensure that no other client is accessing the streams while we
1845      // unbuffer it.
1846      if (streamLock.tryLock()) {
1847        try {
1848          this.streamWrapper.unbuffer();
1849        } finally {
1850          streamLock.unlock();
1851        }
1852      }
1853    }
1854
1855    @Override
1856    public String toString() {
1857      return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext;
1858    }
1859  }
1860
1861  /** An additional sanity-check in case no compression or encryption is being used. */
1862  @VisibleForTesting
1863  void sanityCheckUncompressed() throws IOException {
1864    if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader +
1865        totalChecksumBytes()) {
1866      throw new IOException("Using no compression but "
1867          + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
1868          + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
          + ", numChecksumBytes=" + totalChecksumBytes());
1870    }
1871  }
1872
1873  // Cacheable implementation
1874  @Override
1875  public int getSerializedLength() {
1876    if (buf != null) {
1877      // Include extra bytes for block metadata.
1878      return this.buf.limit() + BLOCK_METADATA_SPACE;
1879    }
1880    return 0;
1881  }
1882
1883  // Cacheable implementation
1884  @Override
1885  public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
1886    this.buf.get(destination, 0, getSerializedLength() - BLOCK_METADATA_SPACE);
1887    destination = addMetaData(destination, includeNextBlockMetadata);
1888
    // Make it ready for reading. flip() sets position to zero and limit to the current position,
    // which is what we want: the buffer now exposes exactly what was serialized, namely the block
    // bytes (plus checksums if present) followed by the metadata.
1892    destination.flip();
1893  }
1894
1895  /**
1896   * For use by bucketcache. This exposes internals.
1897   */
1898  public ByteBuffer getMetaData() {
1899    ByteBuffer bb = ByteBuffer.allocate(BLOCK_METADATA_SPACE);
1900    bb = addMetaData(bb, true);
1901    bb.flip();
1902    return bb;
1903  }
1904
1905  /**
1906   * Adds metadata at current position (position is moved forward). Does not flip or reset.
1907   * @return The passed <code>destination</code> with metadata added.
1908   */
1909  private ByteBuffer addMetaData(final ByteBuffer destination, boolean includeNextBlockMetadata) {
1910    destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1911    destination.putLong(this.offset);
1912    if (includeNextBlockMetadata) {
1913      destination.putInt(this.nextBlockOnDiskSize);
1914    }
1915    return destination;
1916  }
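  /*
   * Illustrative sketch (the 'block' variable is an assumption): serializing a block for a
   * file-backed cache such as the bucket cache. The destination ends up holding the block bytes
   * followed by BLOCK_METADATA_SPACE bytes of metadata, ready for reading after the flip.
   *
   *   ByteBuffer dest = ByteBuffer.allocate(block.getSerializedLength());
   *   block.serialize(dest, true);
   */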
1917
1918  // Cacheable implementation
1919  @Override
1920  public CacheableDeserializer<Cacheable> getDeserializer() {
1921    return HFileBlock.BLOCK_DESERIALIZER;
1922  }
1923
1924  @Override
1925  public int hashCode() {
1926    int result = 1;
1927    result = result * 31 + blockType.hashCode();
1928    result = result * 31 + nextBlockOnDiskSize;
1929    result = result * 31 + (int) (offset ^ (offset >>> 32));
1930    result = result * 31 + onDiskSizeWithoutHeader;
1931    result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32));
1932    result = result * 31 + uncompressedSizeWithoutHeader;
1933    result = result * 31 + buf.hashCode();
1934    return result;
1935  }
1936
1937  @Override
1938  public boolean equals(Object comparison) {
1939    if (this == comparison) {
1940      return true;
1941    }
1942    if (comparison == null) {
1943      return false;
1944    }
1945    if (!(comparison instanceof HFileBlock)) {
1946      return false;
1947    }
1948
1949    HFileBlock castedComparison = (HFileBlock) comparison;
1950
1951    if (castedComparison.blockType != this.blockType) {
1952      return false;
1953    }
1954    if (castedComparison.nextBlockOnDiskSize != this.nextBlockOnDiskSize) {
1955      return false;
1956    }
1957    // Offset is important. Needed when we have to remake cachekey when block is returned to cache.
1958    if (castedComparison.offset != this.offset) {
1959      return false;
1960    }
1961    if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1962      return false;
1963    }
1964    if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1965      return false;
1966    }
1967    if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1968      return false;
1969    }
1970    if (ByteBuff.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
1971        castedComparison.buf.limit()) != 0) {
1972      return false;
1973    }
1974    return true;
1975  }
1976
1977  DataBlockEncoding getDataBlockEncoding() {
1978    if (blockType == BlockType.ENCODED_DATA) {
1979      return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
1980    }
1981    return DataBlockEncoding.NONE;
1982  }
1983
1984  @VisibleForTesting
1985  byte getChecksumType() {
1986    return this.fileContext.getChecksumType().getCode();
1987  }
1988
1989  int getBytesPerChecksum() {
1990    return this.fileContext.getBytesPerChecksum();
1991  }
1992
1993  /** @return the size of data on disk + header. Excludes checksum. */
1994  @VisibleForTesting
1995  int getOnDiskDataSizeWithHeader() {
1996    return this.onDiskDataSizeWithHeader;
1997  }
1998
1999  /**
2000   * Calculate the number of bytes required to store all the checksums
2001   * for this block. Each checksum value is a 4 byte integer.
2002   */
2003  int totalChecksumBytes() {
2004    // If the hfile block has minorVersion 0, then there are no checksum
2005    // data to validate. Similarly, a zero value in this.bytesPerChecksum
2006    // indicates that cached blocks do not have checksum data because
2007    // checksums were already validated when the block was read from disk.
2008    if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
2009      return 0;
2010    }
2011    return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
2012        this.fileContext.getBytesPerChecksum());
2013  }
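  /*
   * For example (hypothetical numbers): with bytesPerChecksum = 16384 and onDiskDataSizeWithHeader
   * = 40000, there are ceil(40000 / 16384) = 3 checksum chunks, i.e. 3 * 4 = 12 checksum bytes
   * appended to the block.
   */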
2014
2015  /**
2016   * Returns the size of this block header.
2017   */
2018  public int headerSize() {
2019    return headerSize(this.fileContext.isUseHBaseChecksum());
2020  }
2021
2022  /**
   * Returns the size of the block header; the size depends on whether the block uses HBase
   * checksums (i.e. on its minor version).
2024   */
2025  public static int headerSize(boolean usesHBaseChecksum) {
    return usesHBaseChecksum ?
        HConstants.HFILEBLOCK_HEADER_SIZE : HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
2028  }
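  /*
   * With HBase checksums enabled the header carries three extra fields (checksum type, bytes per
   * checksum, and on-disk data size with header); with the current HConstants values this works
   * out to HFILEBLOCK_HEADER_SIZE = 33 bytes versus HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM = 24 bytes.
   */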
2029
2030  /**
2031   * Return the appropriate DUMMY_HEADER for the minor version
2032   */
2033  @VisibleForTesting
2034  // TODO: Why is this in here?
2035  byte[] getDummyHeaderForVersion() {
2036    return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
2037  }
2038
2039  /**
2040   * Return the appropriate DUMMY_HEADER for the minor version
2041   */
  private static byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
    return usesHBaseChecksum ? HConstants.HFILEBLOCK_DUMMY_HEADER : DUMMY_HEADER_NO_CHECKSUM;
2044  }
2045
2046  /**
   * @return This HFileBlock's fileContext, which will be a derivative of the
   *   fileContext of the file from which this block's data was originally read.
2049   */
2050  HFileContext getHFileContext() {
2051    return this.fileContext;
2052  }
2053
2054  /**
2055   * Convert the contents of the block header into a human readable string.
2056   * This is mostly helpful for debugging. This assumes that the block
2057   * has minor version > 0.
2058   */
2059  @VisibleForTesting
2060  static String toStringHeader(ByteBuff buf) throws IOException {
2061    byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
2062    buf.get(magicBuf);
2063    BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
2064    int compressedBlockSizeNoHeader = buf.getInt();
2065    int uncompressedBlockSizeNoHeader = buf.getInt();
2066    long prevBlockOffset = buf.getLong();
2067    byte cksumtype = buf.get();
2068    long bytesPerChecksum = buf.getInt();
2069    long onDiskDataSizeWithHeader = buf.getInt();
2070    return " Header dump: magic: " + Bytes.toString(magicBuf) +
2071                   " blockType " + bt +
2072                   " compressedBlockSizeNoHeader " +
2073                   compressedBlockSizeNoHeader +
2074                   " uncompressedBlockSizeNoHeader " +
2075                   uncompressedBlockSizeNoHeader +
2076                   " prevBlockOffset " + prevBlockOffset +
2077                   " checksumType " + ChecksumType.codeToType(cksumtype) +
2078                   " bytesPerChecksum " + bytesPerChecksum +
2079                   " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
2080  }
2081
  private static HFileBlockBuilder createBuilder(HFileBlock blk) {
2083    return new HFileBlockBuilder()
2084          .withBlockType(blk.blockType)
2085          .withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader)
2086          .withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader)
2087          .withPrevBlockOffset(blk.prevBlockOffset)
2088          .withByteBuff(blk.buf.duplicate()) // Duplicate the buffer.
2089          .withOffset(blk.offset)
2090          .withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
2091          .withNextBlockOnDiskSize(blk.nextBlockOnDiskSize)
2092          .withHFileContext(blk.fileContext)
2093          .withByteBuffAllocator(blk.allocator)
2094          .withShared(blk.isSharedMem());
2095  }
2096
2097  static HFileBlock shallowClone(HFileBlock blk) {
2098    return createBuilder(blk).build();
2099  }
2100
2101  static HFileBlock deepCloneOnHeap(HFileBlock blk) {
2102    ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
2103    return createBuilder(blk).withByteBuff(deepCloned).withShared(false).build();
2104  }
2105}