001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
021import java.io.DataInputStream;
022import java.io.DataOutput;
023import java.io.DataOutputStream;
024import java.io.IOException;
025import java.nio.ByteBuffer;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.concurrent.atomic.AtomicReference;
029import java.util.concurrent.locks.Lock;
030import java.util.concurrent.locks.ReentrantLock;
031import org.apache.hadoop.fs.FSDataInputStream;
032import org.apache.hadoop.fs.FSDataOutputStream;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.fs.HFileSystem;
036import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
037import org.apache.hadoop.hbase.io.ByteBuffAllocator;
038import org.apache.hadoop.hbase.io.ByteBuffInputStream;
039import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream;
040import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
041import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
042import org.apache.hadoop.hbase.io.encoding.EncodingState;
043import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
044import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
045import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
046import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
047import org.apache.hadoop.hbase.io.util.BlockIOUtils;
048import org.apache.hadoop.hbase.nio.ByteBuff;
049import org.apache.hadoop.hbase.nio.MultiByteBuff;
050import org.apache.hadoop.hbase.nio.SingleByteBuff;
051import org.apache.hadoop.hbase.regionserver.ShipperListener;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.ChecksumType;
054import org.apache.hadoop.hbase.util.ClassSize;
055import org.apache.yetus.audience.InterfaceAudience;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
060
061/**
062 * Cacheable Blocks of an {@link HFile} version 2 file.
063 * Version 2 was introduced in hbase-0.92.0.
064 *
065 * <p>Version 1 was the original file block. Version 2 was introduced when we changed the hbase file
066 * format to support multi-level block indexes and compound bloom filters (HBASE-3857). Support
067 * for Version 1 was removed in hbase-1.3.0.
068 *
069 * <h3>HFileBlock: Version 2</h3>
070 * In version 2, a block is structured as follows:
071 * <ul>
072 * <li><b>Header:</b> See Writer#putHeader() for where header is written; header total size is
073 * HFILEBLOCK_HEADER_SIZE
074 * <ul>
075 * <li>0. blockType: Magic record identifying the {@link BlockType} (8 bytes):
076 * e.g. <code>DATABLK*</code>
077 * <li>1. onDiskSizeWithoutHeader: Compressed -- a.k.a 'on disk' -- block size, excluding header,
078 * but including tailing checksum bytes (4 bytes)
079 * <li>2. uncompressedSizeWithoutHeader: Uncompressed block size, excluding header, and excluding
080 * checksum bytes (4 bytes)
081 * <li>3. prevBlockOffset: The offset of the previous block of the same type (8 bytes). This is
082 * used to navigate to the previous block without having to go to the block index
083 * <li>4: For minorVersions &gt;=1, the ordinal describing checksum type (1 byte)
084 * <li>5: For minorVersions &gt;=1, the number of data bytes/checksum chunk (4 bytes)
085 * <li>6: onDiskDataSizeWithHeader: For minorVersions &gt;=1, the size of data 'on disk', including
086 * header, excluding checksums (4 bytes)
087 * </ul>
088 * </li>
089 * <li><b>Raw/Compressed/Encrypted/Encoded data:</b> The compression
090 * algorithm is the same for all the blocks in an {@link HFile}. If compression is NONE, this is
091 * just raw, serialized Cells.
092 * <li><b>Tail:</b> For minorVersions &gt;=1, a series of 4 byte checksums, one each for
093 * the number of bytes specified by bytesPerChecksum.
094 * </ul>
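 *
 * <p>For reference, a sketch of the header byte offsets implied by the list above (these match
 * the {@link Header} constants defined below; the last three fields are present only for
 * minorVersions &gt;= 1):
 * <pre>
 *  0: blockType magic                 (8 bytes)
 *  8: onDiskSizeWithoutHeader         (4 byte int)
 * 12: uncompressedSizeWithoutHeader   (4 byte int)
 * 16: prevBlockOffset                 (8 byte long)
 * 24: checksumType                    (1 byte)
 * 25: bytesPerChecksum                (4 byte int)
 * 29: onDiskDataSizeWithHeader        (4 byte int)
 * </pre>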
095 *
096 * <h3>Caching</h3>
 * Caches cache whole blocks, including trailing checksums if present. We then tag on some
 * metadata, the content of BLOCK_METADATA_SPACE: a flag for whether we are doing 'hbase'
 * checksums, and the offset into the file, which is needed when we re-make the cache key
 * when we return the block to the cache as 'done'.
101 * See {@link Cacheable#serialize(ByteBuffer, boolean)} and {@link Cacheable#getDeserializer()}.
102 *
103 * <p>TODO: Should we cache the checksums? Down in Writer#getBlockForCaching(CacheConfig) where
104 * we make a block to cache-on-write, there is an attempt at turning off checksums. This is not the
105 * only place we get blocks to cache. We also will cache the raw return from an hdfs read. In this
106 * case, the checksums may be present. If the cache is backed by something that doesn't do ECC,
 * say an SSD, we might want to preserve checksums. For now this is an open question.
108 * <p>TODO: Over in BucketCache, we save a block allocation by doing a custom serialization.
109 * Be sure to change it if serialization changes in here. Could we add a method here that takes an
110 * IOEngine and that then serializes to it rather than expose our internals over in BucketCache?
111 * IOEngine is in the bucket subpackage. Pull it up? Then this class knows about bucketcache. Ugh.
112 */
113@InterfaceAudience.Private
114public class HFileBlock implements Cacheable {
115  private static final Logger LOG = LoggerFactory.getLogger(HFileBlock.class);
116  public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HFileBlock.class, false);
117
118  // Block Header fields.
119
120  // TODO: encapsulate Header related logic in this inner class.
121  static class Header {
122    // Format of header is:
123    // 8 bytes - block magic
124    // 4 bytes int - onDiskSizeWithoutHeader
125    // 4 bytes int - uncompressedSizeWithoutHeader
126    // 8 bytes long - prevBlockOffset
127    // The following 3 are only present if header contains checksum information
128    // 1 byte - checksum type
129    // 4 byte int - bytes per checksum
130    // 4 byte int - onDiskDataSizeWithHeader
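    // For example, getOnDiskSizeWithHeader(ByteBuff, boolean) below reads the first size field as
    // headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX).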
131    static int BLOCK_MAGIC_INDEX = 0;
132    static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8;
133    static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12;
134    static int PREV_BLOCK_OFFSET_INDEX = 16;
135    static int CHECKSUM_TYPE_INDEX = 24;
136    static int BYTES_PER_CHECKSUM_INDEX = 25;
137    static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29;
138  }
139
140  /** Type of block. Header field 0. */
141  private BlockType blockType;
142
143  /**
144   * Size on disk excluding header, including checksum. Header field 1.
145   * @see Writer#putHeader(byte[], int, int, int, int)
146   */
147  private int onDiskSizeWithoutHeader;
148
149  /**
150   * Size of pure data. Does not include header or checksums. Header field 2.
151   * @see Writer#putHeader(byte[], int, int, int, int)
152   */
153  private int uncompressedSizeWithoutHeader;
154
155  /**
156   * The offset of the previous block on disk. Header field 3.
157   * @see Writer#putHeader(byte[], int, int, int, int)
158   */
159  private long prevBlockOffset;
160
161  /**
162   * Size on disk of header + data. Excludes checksum. Header field 6,
163   * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
164   * @see Writer#putHeader(byte[], int, int, int, int)
165   */
166  private int onDiskDataSizeWithHeader;
167  // End of Block Header fields.
168
169  /**
170   * The in-memory representation of the hfile block. Can be on or offheap. Can be backed by
171   * a single ByteBuffer or by many. Make no assumptions.
172   *
173   * <p>Be careful reading from this <code>buf</code>. Duplicate and work on the duplicate or if
174   * not, be sure to reset position and limit else trouble down the road.
175   *
176   * <p>TODO: Make this read-only once made.
177   *
178   * <p>We are using the ByteBuff type. ByteBuffer is not extensible yet we need to be able to have
179   * a ByteBuffer-like API across multiple ByteBuffers reading from a cache such as BucketCache.
180   * So, we have this ByteBuff type. Unfortunately, it is spread all about HFileBlock. Would be
181   * good if could be confined to cache-use only but hard-to-do.
182   */
183  private ByteBuff buf;
184
185  /** Meta data that holds meta information on the hfileblock.
186   */
187  private HFileContext fileContext;
188
189  /**
190   * The offset of this block in the file. Populated by the reader for
191   * convenience of access. This offset is not part of the block header.
192   */
193  private long offset = UNSET;
194
195  /**
196   * The on-disk size of the next block, including the header and checksums if present.
197   * UNSET if unknown.
198   *
199   * Blocks try to carry the size of the next block to read in this data member. Usually
200   * we get block sizes from the hfile index but sometimes the index is not available:
201   * e.g. when we read the indexes themselves (indexes are stored in blocks, we do not
202   * have an index for the indexes). Saves seeks especially around file open when
203   * there is a flurry of reading in hfile metadata.
204   */
205  private int nextBlockOnDiskSize = UNSET;
206
207  private ByteBuffAllocator allocator;
208
209  /**
210   * On a checksum failure, do these many succeeding read requests using hdfs checksums before
211   * auto-reenabling hbase checksum verification.
212   */
213  static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
214
215  private static int UNSET = -1;
216  public static final boolean FILL_HEADER = true;
217  public static final boolean DONT_FILL_HEADER = false;
218
  // TODO: How do we get this estimate right if the backing buffer is a SingleByteBuff?
220  public static final int MULTI_BYTE_BUFFER_HEAP_SIZE =
221      (int)ClassSize.estimateBase(MultiByteBuff.class, false);
222
223  /**
224   * Space for metadata on a block that gets stored along with the block when we cache it.
225   * There are a few bytes stuck on the end of the HFileBlock that we pull in from HDFS.
   * 8 bytes are for the offset of this block (long) in the file. Offset is important because it is
   * used when we remake the CacheKey when we return the block to the cache when done. There is also
228   * a flag on whether checksumming is being done by hbase or not. See class comment for note on
229   * uncertain state of checksumming of blocks that come out of cache (should we or should we not?).
230   * Finally there are 4 bytes to hold the length of the next block which can save a seek on
231   * occasion if available.
232   * (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was
233   * formerly known as EXTRA_SERIALIZATION_SPACE).
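   * <p>A sketch of the layout, in the order it is read back by {@link BlockDeserializer#deserialize}:
   * <pre>
   * 1 byte  : whether hbase checksums are in use (serialized as 0 or 1)
   * 8 bytes : offset of this block in the file (long)
   * 4 bytes : on-disk size of the next block (int), or UNSET (-1) if unknown
   * </pre>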
234   */
235  static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
236
237  /**
238   * Each checksum value is an integer that can be stored in 4 bytes.
239   */
240  static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
241
242  static final byte[] DUMMY_HEADER_NO_CHECKSUM =
243      new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
244
245  /**
246   * Used deserializing blocks from Cache.
247   *
248   * <code>
249   * ++++++++++++++
250   * + HFileBlock +
251   * ++++++++++++++
252   * + Checksums  + <= Optional
253   * ++++++++++++++
254   * + Metadata!  + <= See note on BLOCK_METADATA_SPACE above.
255   * ++++++++++++++
256   * </code>
257   * @see #serialize(ByteBuffer, boolean)
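   *
   * <p>A minimal sketch of use when reading a cached block back out (the {@code cachedBuf} and
   * {@code allocator} names here are hypothetical):
   * <pre>{@code
   * HFileBlock block = (HFileBlock) BLOCK_DESERIALIZER.deserialize(cachedBuf, allocator);
   * }</pre>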
258   */
259  public static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER = new BlockDeserializer();
260
261  public static final class BlockDeserializer implements CacheableDeserializer<Cacheable> {
262    private BlockDeserializer() {
263    }
264
265    @Override
266    public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc)
267        throws IOException {
268      // The buf has the file block followed by block metadata.
269      // Set limit to just before the BLOCK_METADATA_SPACE then rewind.
270      buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind();
271      // Get a new buffer to pass the HFileBlock for it to 'own'.
272      ByteBuff newByteBuff = buf.slice();
273      // Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock.
274      buf.position(buf.limit());
275      buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE);
276      boolean usesChecksum = buf.get() == (byte) 1;
277      long offset = buf.getLong();
278      int nextBlockOnDiskSize = buf.getInt();
279      return createFromBuff(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
280    }
281
282    @Override
283    public int getDeserializerIdentifier() {
284      return DESERIALIZER_IDENTIFIER;
285    }
286  }
287
288  private static final int DESERIALIZER_IDENTIFIER;
289  static {
290    DESERIALIZER_IDENTIFIER =
291        CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
292  }
293
294  /**
   * Creates a new {@link HFile} block from the given fields. This constructor is used only while
   * writing blocks and caching, when the block data is already sitting in a byte buffer and we
   * want to stuff the block into cache. See {@link Writer#getBlockForCaching(CacheConfig)}.
   *
   * <p>TODO: The caller presumes no checksumming is required of this block instance since it is
   * going into cache; the checksum has already been verified on the underlying block data pulled
   * in from the filesystem. Is that correct? What if the cache is SSD?
   * <p>TODO: Can the HFile block writer also be off-heap?
304   *
305   * @param blockType the type of this block, see {@link BlockType}
306   * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
307   * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
308   * @param prevBlockOffset see {@link #prevBlockOffset}
309   * @param buf block buffer with header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
310   * @param fillHeader when true, write the first 4 header fields into passed buffer.
311   * @param offset the file offset the block was read from
   * @param nextBlockOnDiskSize the on-disk size of the next block, including header and
   *          checksums, or UNSET if unknown
   * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
   * @param fileContext HFile meta data
   * @param allocator the {@link ByteBuffAllocator} to use for this block
314   */
315  public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
316      int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
317      long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
318      ByteBuffAllocator allocator) {
319    this.blockType = blockType;
320    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
321    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
322    this.prevBlockOffset = prevBlockOffset;
323    this.offset = offset;
324    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
325    this.nextBlockOnDiskSize = nextBlockOnDiskSize;
326    this.fileContext = fileContext;
327    this.allocator = allocator;
328    this.buf = buf;
329    if (fillHeader) {
330      overwriteHeader();
331    }
332    this.buf.rewind();
333  }
334
335  /**
336   * Creates a block from an existing buffer starting with a header. Rewinds
337   * and takes ownership of the buffer. By definition of rewind, ignores the
338   * buffer position, but if you slice the buffer beforehand, it will rewind
339   * to that point.
340   * @param buf Has header, content, and trailing checksums if present.
341   */
342  static HFileBlock createFromBuff(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
343      final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator)
344      throws IOException {
345    buf.rewind();
346    final BlockType blockType = BlockType.read(buf);
347    final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
348    final int uncompressedSizeWithoutHeader =
349        buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
350    final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
    // This method is called when we deserialize a block from cache and when we read a block in
    // from the fs. fileContext is null when deserialized from cache so we need to make one up.
353    HFileContextBuilder fileContextBuilder = fileContext != null ?
354      new HFileContextBuilder(fileContext) : new HFileContextBuilder();
355    fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum);
356    int onDiskDataSizeWithHeader;
357    if (usesHBaseChecksum) {
358      byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX);
359      int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
360      onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
361      // Use the checksum type and bytes per checksum from header, not from fileContext.
362      fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType));
363      fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum);
364    } else {
365      fileContextBuilder.withChecksumType(ChecksumType.NULL);
366      fileContextBuilder.withBytesPerCheckSum(0);
367      // Need to fix onDiskDataSizeWithHeader; there are not checksums after-block-data
368      onDiskDataSizeWithHeader = onDiskSizeWithoutHeader + headerSize(usesHBaseChecksum);
369    }
370    fileContext = fileContextBuilder.build();
371    assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
372    return new HFileBlockBuilder()
373        .withBlockType(blockType)
374        .withOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader)
375        .withUncompressedSizeWithoutHeader(uncompressedSizeWithoutHeader)
376        .withPrevBlockOffset(prevBlockOffset)
377        .withOffset(offset)
378        .withOnDiskDataSizeWithHeader(onDiskDataSizeWithHeader)
379        .withNextBlockOnDiskSize(nextBlockOnDiskSize)
380        .withHFileContext(fileContext)
381        .withByteBuffAllocator(allocator)
382        .withByteBuff(buf.rewind())
383        .withShared(!buf.hasArray())
384        .build();
385  }
386
387  /**
388   * Parse total on disk size including header and checksum.
389   * @param headerBuf Header ByteBuffer. Presumed exact size of header.
390   * @param verifyChecksum true if checksum verification is in use.
391   * @return Size of the block with header included.
392   */
393  private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf,
394      boolean verifyChecksum) {
395    return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(verifyChecksum);
396  }
397
398  /**
399   * @return the on-disk size of the next block (including the header size and any checksums if
400   *   present) read by peeking into the next block's header; use as a hint when doing
401   *   a read of the next block when scanning or running over a file.
402   */
403  int getNextBlockOnDiskSize() {
404    return nextBlockOnDiskSize;
405  }
406
407  @Override
408  public BlockType getBlockType() {
409    return blockType;
410  }
411
412  @Override
413  public int refCnt() {
414    return buf.refCnt();
415  }
416
417  @Override
418  public HFileBlock retain() {
419    buf.retain();
420    return this;
421  }
422
423  /**
424   * Call {@link ByteBuff#release()} to decrease the reference count, if no other reference, it will
425   * return back the {@link ByteBuffer} to {@link org.apache.hadoop.hbase.io.ByteBuffAllocator}
426   */
427  @Override
428  public boolean release() {
429    return buf.release();
430  }
431
  /** @return the data block encoding id that was used to encode this block */
433  short getDataBlockEncodingId() {
434    if (blockType != BlockType.ENCODED_DATA) {
435      throw new IllegalArgumentException("Querying encoder ID of a block " +
436          "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
437    }
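    // For ENCODED_DATA blocks, the two bytes immediately after the header hold the
    // DataBlockEncoding id; see DataBlockEncoding#getId().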
438    return buf.getShort(headerSize());
439  }
440
441  /**
442   * @return the on-disk size of header + data part + checksum.
443   */
444  public int getOnDiskSizeWithHeader() {
445    return onDiskSizeWithoutHeader + headerSize();
446  }
447
448  /**
449   * @return the on-disk size of the data part + checksum (header excluded).
450   */
451  int getOnDiskSizeWithoutHeader() {
452    return onDiskSizeWithoutHeader;
453  }
454
455  /**
456   * @return the uncompressed size of data part (header and checksum excluded).
457   */
458  int getUncompressedSizeWithoutHeader() {
459    return uncompressedSizeWithoutHeader;
460  }
461
462  /**
463   * @return the offset of the previous block of the same type in the file, or
464   *         -1 if unknown
465   */
466  long getPrevBlockOffset() {
467    return prevBlockOffset;
468  }
469
470  /**
471   * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position
472   * is modified as side-effect.
473   */
474  private void overwriteHeader() {
475    buf.rewind();
476    blockType.write(buf);
477    buf.putInt(onDiskSizeWithoutHeader);
478    buf.putInt(uncompressedSizeWithoutHeader);
479    buf.putLong(prevBlockOffset);
480    if (this.fileContext.isUseHBaseChecksum()) {
481      buf.put(fileContext.getChecksumType().getCode());
482      buf.putInt(fileContext.getBytesPerChecksum());
483      buf.putInt(onDiskDataSizeWithHeader);
484    }
485  }
486
487  /**
488   * Returns a buffer that does not include the header and checksum.
489   * @return the buffer with header skipped and checksum omitted.
490   */
491  public ByteBuff getBufferWithoutHeader() {
492    return this.getBufferWithoutHeader(false);
493  }
494
495  /**
   * Returns a buffer that does not include the header, optionally including the checksum.
   * @param withChecksum whether to include the trailing checksum bytes.
   * @return the buffer with the header skipped and the checksum included or omitted per
   *         {@code withChecksum}.
499   */
500  public ByteBuff getBufferWithoutHeader(boolean withChecksum) {
501    ByteBuff dup = getBufferReadOnly();
502    int delta = withChecksum ? 0 : totalChecksumBytes();
503    return dup.position(headerSize()).limit(buf.limit() - delta).slice();
504  }
505
506  /**
507   * Returns a read-only duplicate of the buffer this block stores internally ready to be read.
508   * Clients must not modify the buffer object though they may set position and limit on the
509   * returned buffer since we pass back a duplicate. This method has to be public because it is used
510   * in {@link CompoundBloomFilter} to avoid object creation on every Bloom
511   * filter lookup, but has to be used with caution. Buffer holds header, block content,
512   * and any follow-on checksums if present.
513   *
514   * @return the buffer of this block for read-only operations
515   */
516  public ByteBuff getBufferReadOnly() {
517    // TODO: ByteBuf does not support asReadOnlyBuffer(). Fix.
518    ByteBuff dup = this.buf.duplicate();
519    assert dup.position() == 0;
520    return dup;
521  }
522
523  public ByteBuffAllocator getByteBuffAllocator() {
524    return this.allocator;
525  }
526
527  private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
528      String fieldName) throws IOException {
529    if (valueFromBuf != valueFromField) {
530      throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
531          + ") is different from that in the field (" + valueFromField + ")");
532    }
533  }
534
535  private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
536      throws IOException {
537    if (valueFromBuf != valueFromField) {
538      throw new IOException("Block type stored in the buffer: " +
539        valueFromBuf + ", block type field: " + valueFromField);
540    }
541  }
542
543  /**
544   * Checks if the block is internally consistent, i.e. the first
545   * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a
546   * valid header consistent with the fields. Assumes a packed block structure.
   * This function is primarily for testing and debugging, and is not
548   * thread-safe, because it alters the internal buffer pointer.
549   * Used by tests only.
550   */
551  void sanityCheck() throws IOException {
552    // Duplicate so no side-effects
553    ByteBuff dup = this.buf.duplicate().rewind();
554    sanityCheckAssertion(BlockType.read(dup), blockType);
555
556    sanityCheckAssertion(dup.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader");
557
558    sanityCheckAssertion(dup.getInt(), uncompressedSizeWithoutHeader,
559        "uncompressedSizeWithoutHeader");
560
561    sanityCheckAssertion(dup.getLong(), prevBlockOffset, "prevBlockOffset");
562    if (this.fileContext.isUseHBaseChecksum()) {
563      sanityCheckAssertion(dup.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
564      sanityCheckAssertion(dup.getInt(), this.fileContext.getBytesPerChecksum(),
565          "bytesPerChecksum");
566      sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
567    }
568
569    int cksumBytes = totalChecksumBytes();
570    int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
571    if (dup.limit() != expectedBufLimit) {
572      throw new AssertionError("Expected limit " + expectedBufLimit + ", got " + dup.limit());
573    }
574
575    // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
576    // block's header, so there are two sensible values for buffer capacity.
577    int hdrSize = headerSize();
578    dup.rewind();
579    if (dup.remaining() != expectedBufLimit && dup.remaining() != expectedBufLimit + hdrSize) {
580      throw new AssertionError("Invalid buffer capacity: " + dup.remaining() +
581          ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
582    }
583  }
584
585  @Override
586  public String toString() {
587    StringBuilder sb = new StringBuilder()
588      .append("[")
589      .append("blockType=").append(blockType)
590      .append(", fileOffset=").append(offset)
591      .append(", headerSize=").append(headerSize())
592      .append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
593      .append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
594      .append(", prevBlockOffset=").append(prevBlockOffset)
595      .append(", isUseHBaseChecksum=").append(fileContext.isUseHBaseChecksum());
596    if (fileContext.isUseHBaseChecksum()) {
597      sb.append(", checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
598        .append(", bytesPerChecksum=").append(this.buf.getInt(24 + 1))
599        .append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
600    } else {
601      sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
602        .append("(").append(onDiskSizeWithoutHeader)
603        .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
604    }
605    String dataBegin;
606    if (buf.hasArray()) {
607      dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
608          Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
609    } else {
610      ByteBuff bufWithoutHeader = getBufferWithoutHeader();
611      byte[] dataBeginBytes = new byte[Math.min(32,
612          bufWithoutHeader.limit() - bufWithoutHeader.position())];
613      bufWithoutHeader.get(dataBeginBytes);
614      dataBegin = Bytes.toStringBinary(dataBeginBytes);
615    }
616    sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader())
617      .append(", totalChecksumBytes=").append(totalChecksumBytes())
618      .append(", isUnpacked=").append(isUnpacked())
619      .append(", buf=[").append(buf).append("]")
620      .append(", dataBeginsWith=").append(dataBegin)
621      .append(", fileContext=").append(fileContext)
622      .append(", nextBlockOnDiskSize=").append(nextBlockOnDiskSize)
623      .append("]");
624    return sb.toString();
625  }
626
627  /**
628   * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
629   * encoded structure. Internal structures are shared between instances where applicable.
630   */
631  HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
632    if (!fileContext.isCompressedOrEncrypted()) {
633      // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
634      // which is used for block serialization to L2 cache, does not preserve encoding and
635      // encryption details.
636      return this;
637    }
638
639    HFileBlock unpacked = shallowClone(this);
640    unpacked.allocateBuffer(); // allocates space for the decompressed block
641    boolean succ = false;
642    try {
643      HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
644          ? reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
645      // Create a duplicated buffer without the header part.
646      ByteBuff dup = this.buf.duplicate();
647      dup.position(this.headerSize());
648      dup = dup.slice();
649      // Decode the dup into unpacked#buf
650      ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
651        unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup);
652      succ = true;
653      return unpacked;
654    } finally {
655      if (!succ) {
656        unpacked.release();
657      }
658    }
659  }
660
661  /**
662   * Always allocates a new buffer of the correct size. Copies header bytes
663   * from the existing buffer. Does not change header fields.
   * Reserves room to keep checksum bytes too.
665   */
666  private void allocateBuffer() {
667    int cksumBytes = totalChecksumBytes();
668    int headerSize = headerSize();
669    int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
670
671    ByteBuff newBuf = allocator.allocate(capacityNeeded);
672
673    // Copy header bytes into newBuf.
674    buf.position(0);
675    newBuf.put(0, buf, 0, headerSize);
676
677    buf = newBuf;
678    // set limit to exclude next block's header
679    buf.limit(capacityNeeded);
680  }
681
682  /**
683   * Return true when this block's buffer has been unpacked, false otherwise. Note this is a
   * calculated heuristic, not a tracked attribute of the block.
685   */
686  public boolean isUnpacked() {
687    final int cksumBytes = totalChecksumBytes();
688    final int headerSize = headerSize();
689    final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
690    final int bufCapacity = buf.remaining();
691    return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
692  }
693
694  /**
   * Cannot be {@link #UNSET}. Must be a legitimate value. Used when re-making the
   * {@link BlockCacheKey} when the block is returned to the cache.
697   * @return the offset of this block in the file it was read from
698   */
699  long getOffset() {
700    if (offset < 0) {
701      throw new IllegalStateException("HFile block offset not initialized properly");
702    }
703    return offset;
704  }
705
706  /**
707   * @return a byte stream reading the data + checksum of this block
708   */
709  DataInputStream getByteStream() {
710    ByteBuff dup = this.buf.duplicate();
711    dup.position(this.headerSize());
712    return new DataInputStream(new ByteBuffInputStream(dup));
713  }
714
715  @Override
716  public long heapSize() {
717    long size = FIXED_OVERHEAD;
718    size += fileContext.heapSize();
719    if (buf != null) {
720      // Deep overhead of the byte buffer. Needs to be aligned separately.
721      size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE);
722    }
723    return ClassSize.align(size);
724  }
725
726  /**
   * Will be overridden by {@link SharedMemHFileBlock} or {@link ExclusiveMemHFileBlock}. Returns
   * true by default.
729   */
730  public boolean isSharedMem() {
731    if (this instanceof SharedMemHFileBlock) {
732      return true;
733    } else if (this instanceof ExclusiveMemHFileBlock) {
734      return false;
735    }
736    return true;
737  }
738
739  /**
740   * Unified version 2 {@link HFile} block writer. The intended usage pattern
741   * is as follows:
742   * <ol>
743   * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
744   * <li>Call {@link Writer#startWriting} and get a data stream to write to.
745   * <li>Write your data into the stream.
   * <li>Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to store
   * the serialized block into an external stream.
748   * <li>Repeat to write more blocks.
749   * </ol>
750   * <p>
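   * A minimal sketch of that sequence (the {@code fileContext}, {@code cell} and {@code out}
   * names are hypothetical; {@code out} is the FSDataOutputStream being written to):
   * <pre>{@code
   * HFileBlock.Writer writer = new HFileBlock.Writer(NoOpDataBlockEncoder.INSTANCE, fileContext);
   * DataOutputStream dos = writer.startWriting(BlockType.DATA);
   * // ... write serialized Cells, e.g. via writer.write(cell), or write bytes into dos ...
   * writer.writeHeaderAndData(out);
   * }</pre>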
751   */
752  static class Writer implements ShipperListener {
753    private enum State {
754      INIT,
755      WRITING,
756      BLOCK_READY
757    };
758
759    /** Writer state. Used to ensure the correct usage protocol. */
760    private State state = State.INIT;
761
762    /** Data block encoder used for data blocks */
763    private final HFileDataBlockEncoder dataBlockEncoder;
764
765    private HFileBlockEncodingContext dataBlockEncodingCtx;
766
767    /** block encoding context for non-data blocks*/
768    private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
769
770    /**
771     * The stream we use to accumulate data into a block in an uncompressed format.
772     * We reset this stream at the end of each block and reuse it. The
773     * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this
774     * stream.
775     */
776    private ByteArrayOutputStream baosInMemory;
777
778    /**
779     * Current block type. Set in {@link #startWriting(BlockType)}. Could be
780     * changed in {@link #finishBlock()} from {@link BlockType#DATA}
781     * to {@link BlockType#ENCODED_DATA}.
782     */
783    private BlockType blockType;
784
785    /**
     * A stream that we write uncompressed bytes to; the bytes accumulate in {@link #baosInMemory}
     * and are compressed and encrypted later, in {@link #finishBlock()}.
788     */
789    private DataOutputStream userDataStream;
790
791    /**
792     * Bytes to be written to the file system, including the header. Compressed
793     * if compression is turned on. It also includes the checksum data that
794     * immediately follows the block data. (header + data + checksums)
795     */
796    private ByteArrayOutputStream onDiskBlockBytesWithHeader;
797
798    /**
     * The checksum data for the current block, computed over the contents of
     * {@link #onDiskBlockBytesWithHeader} in {@link #finishBlock()} and written out to the
     * stream immediately after the block data.
803     */
804    private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;
805
806    /**
807     * Current block's start offset in the {@link HFile}. Set in
808     * {@link #writeHeaderAndData(FSDataOutputStream)}.
809     */
810    private long startOffset;
811
812    /**
813     * Offset of previous block by block type. Updated when the next block is
814     * started.
815     */
816    private long[] prevOffsetByType;
817
818    /** The offset of the previous block of the same type */
819    private long prevOffset;
    /** Meta data that holds information about the hfileblock. */
821    private HFileContext fileContext;
822
823    private final ByteBuffAllocator allocator;
824
825    @Override
826    public void beforeShipped() {
827      if (getEncodingState() != null) {
828        getEncodingState().beforeShipped();
829      }
830    }
831
832    EncodingState getEncodingState() {
833      return dataBlockEncodingCtx.getEncodingState();
834    }
835
836    /**
     * @param dataBlockEncoder data block encoding algorithm to use
     * @param fileContext HFile meta data to be used by this writer
838     */
839    public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
840      this(dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
841    }
842
843    public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext,
844        ByteBuffAllocator allocator) {
845      if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
846        throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
847            " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
848            fileContext.getBytesPerChecksum());
849      }
850      this.allocator = allocator;
851      this.dataBlockEncoder = dataBlockEncoder != null?
852          dataBlockEncoder: NoOpDataBlockEncoder.INSTANCE;
853      this.dataBlockEncodingCtx = this.dataBlockEncoder.
854          newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
855      // TODO: This should be lazily instantiated since we usually do NOT need this default encoder
856      this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
857          HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
858      // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum
859      baosInMemory = new ByteArrayOutputStream();
860      prevOffsetByType = new long[BlockType.values().length];
861      for (int i = 0; i < prevOffsetByType.length; ++i) {
862        prevOffsetByType[i] = UNSET;
863      }
864      // TODO: Why fileContext saved away when we have dataBlockEncoder and/or
865      // defaultDataBlockEncoder?
866      this.fileContext = fileContext;
867    }
868
869    /**
870     * Starts writing into the block. The previous block's data is discarded.
871     *
872     * @return the stream the user can write their data into
873     */
874    DataOutputStream startWriting(BlockType newBlockType)
875        throws IOException {
876      if (state == State.BLOCK_READY && startOffset != -1) {
877        // We had a previous block that was written to a stream at a specific
878        // offset. Save that offset as the last offset of a block of that type.
879        prevOffsetByType[blockType.getId()] = startOffset;
880      }
881
882      startOffset = -1;
883      blockType = newBlockType;
884
885      baosInMemory.reset();
886      baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
887
888      state = State.WRITING;
889
890      // We will compress it later in finishBlock()
891      userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory);
892      if (newBlockType == BlockType.DATA) {
893        this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
894      }
895      return userDataStream;
896    }
897
898    /**
899     * Writes the Cell to this block
900     */
901    void write(Cell cell) throws IOException{
902      expectState(State.WRITING);
903      this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream);
904    }
905
906    /**
907     * Transitions the block writer from the "writing" state to the "block
908     * ready" state.  Does nothing if a block is already finished.
909     */
910    void ensureBlockReady() throws IOException {
911      Preconditions.checkState(state != State.INIT,
912          "Unexpected state: " + state);
913
914      if (state == State.BLOCK_READY) {
915        return;
916      }
917
918      // This will set state to BLOCK_READY.
919      finishBlock();
920    }
921
922    /**
923     * Finish up writing of the block.
924     * Flushes the compressing stream (if using compression), fills out the header,
925     * does any compression/encryption of bytes to flush out to disk, and manages
926     * the cache on write content, if applicable. Sets block write state to "block ready".
927     */
928    private void finishBlock() throws IOException {
929      if (blockType == BlockType.DATA) {
930        this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
931            baosInMemory.getBuffer(), blockType);
932        blockType = dataBlockEncodingCtx.getBlockType();
933      }
934      userDataStream.flush();
935      prevOffset = prevOffsetByType[blockType.getId()];
936
937      // We need to set state before we can package the block up for cache-on-write. In a way, the
938      // block is ready, but not yet encoded or compressed.
939      state = State.BLOCK_READY;
940      Bytes compressAndEncryptDat;
941      if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
942        compressAndEncryptDat = dataBlockEncodingCtx.
943            compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size());
944      } else {
945        compressAndEncryptDat = defaultBlockEncodingCtx.
946            compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size());
947      }
948      if (compressAndEncryptDat == null) {
949        compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size());
950      }
951      if (onDiskBlockBytesWithHeader == null) {
952        onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength());
953      }
954      onDiskBlockBytesWithHeader.reset();
955      onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(),
956            compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength());
957      // Calculate how many bytes we need for checksum on the tail of the block.
958      int numBytes = (int) ChecksumUtil.numBytes(
959          onDiskBlockBytesWithHeader.size(),
960          fileContext.getBytesPerChecksum());
961
962      // Put the header for the on disk bytes; header currently is unfilled-out
963      putHeader(onDiskBlockBytesWithHeader,
964          onDiskBlockBytesWithHeader.size() + numBytes,
965          baosInMemory.size(), onDiskBlockBytesWithHeader.size());
966      if (onDiskChecksum.length != numBytes) {
967        onDiskChecksum = new byte[numBytes];
968      }
969      ChecksumUtil.generateChecksums(
970          onDiskBlockBytesWithHeader.getBuffer(), 0,onDiskBlockBytesWithHeader.size(),
971          onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
972    }
973
974    /**
     * Put the header into the given byte array at the given offset.
     * @param dest the byte array to write the header into
     * @param offset the offset within {@code dest} at which to start writing
     * @param onDiskSize size of the block on disk: header + data + checksum
977     * @param uncompressedSize size of the block after decompression (but
978     *          before optional data block decoding) including header
979     * @param onDiskDataSize size of the block on disk with header
980     *        and data but not including the checksums
981     */
982    private void putHeader(byte[] dest, int offset, int onDiskSize,
983        int uncompressedSize, int onDiskDataSize) {
984      offset = blockType.put(dest, offset);
985      offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
986      offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
987      offset = Bytes.putLong(dest, offset, prevOffset);
988      offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
989      offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
990      Bytes.putInt(dest, offset, onDiskDataSize);
991    }
992
993    private void putHeader(ByteBuff buff, int onDiskSize,
994        int uncompressedSize, int onDiskDataSize) {
995      buff.rewind();
996      blockType.write(buff);
997      buff.putInt(onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
998      buff.putInt(uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
999      buff.putLong(prevOffset);
1000      buff.put(fileContext.getChecksumType().getCode());
1001      buff.putInt(fileContext.getBytesPerChecksum());
1002      buff.putInt(onDiskDataSize);
1003    }
1004
1005    private void putHeader(ByteArrayOutputStream dest, int onDiskSize,
1006        int uncompressedSize, int onDiskDataSize) {
      putHeader(dest.getBuffer(), 0, onDiskSize, uncompressedSize, onDiskDataSize);
1008    }
1009
1010    /**
     * Writes the header and data of this block into the given stream, and records the offset of
     * this block so that it can be referenced in the next block of the same type.
1014     */
1015    void writeHeaderAndData(FSDataOutputStream out) throws IOException {
1016      long offset = out.getPos();
1017      if (startOffset != UNSET && offset != startOffset) {
1018        throw new IOException("A " + blockType + " block written to a "
1019            + "stream twice, first at offset " + startOffset + ", then at "
1020            + offset);
1021      }
1022      startOffset = offset;
1023      finishBlockAndWriteHeaderAndData(out);
1024    }
1025
1026    /**
1027     * Writes the header and the compressed data of this block (or uncompressed
1028     * data when not using compression) into the given stream. Can be called in
1029     * the "writing" state or in the "block ready" state. If called in the
1030     * "writing" state, transitions the writer to the "block ready" state.
     * @param out the output stream to write the block to
1032     */
1033    protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
1034      throws IOException {
1035      ensureBlockReady();
1036      long startTime = System.currentTimeMillis();
1037      out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size());
1038      out.write(onDiskChecksum);
1039      HFile.updateWriteLatency(System.currentTimeMillis() - startTime);
1040    }
1041
1042    /**
     * Returns the header and the compressed data (or uncompressed data when not
1044     * using compression) as a byte array. Can be called in the "writing" state
1045     * or in the "block ready" state. If called in the "writing" state,
1046     * transitions the writer to the "block ready" state. This returns
1047     * the header + data + checksums stored on disk.
1048     *
1049     * @return header and data as they would be stored on disk in a byte array
1050     */
1051    byte[] getHeaderAndDataForTest() throws IOException {
1052      ensureBlockReady();
1053      // This is not very optimal, because we are doing an extra copy.
1054      // But this method is used only by unit tests.
1055      byte[] output =
1056          new byte[onDiskBlockBytesWithHeader.size()
1057              + onDiskChecksum.length];
1058      System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0,
1059          onDiskBlockBytesWithHeader.size());
1060      System.arraycopy(onDiskChecksum, 0, output,
1061          onDiskBlockBytesWithHeader.size(), onDiskChecksum.length);
1062      return output;
1063    }
1064
1065    /**
1066     * Releases resources used by this writer.
1067     */
1068    void release() {
1069      if (dataBlockEncodingCtx != null) {
1070        dataBlockEncodingCtx.close();
1071        dataBlockEncodingCtx = null;
1072      }
1073      if (defaultBlockEncodingCtx != null) {
1074        defaultBlockEncodingCtx.close();
1075        defaultBlockEncodingCtx = null;
1076      }
1077    }
1078
1079    /**
1080     * Returns the on-disk size of the data portion of the block. This is the
1081     * compressed size if compression is enabled. Can only be called in the
1082     * "block ready" state. Header is not compressed, and its size is not
1083     * included in the return value.
1084     *
1085     * @return the on-disk size of the block, not including the header.
1086     */
1087    int getOnDiskSizeWithoutHeader() {
1088      expectState(State.BLOCK_READY);
1089      return onDiskBlockBytesWithHeader.size() +
1090          onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE;
1091    }
1092
1093    /**
1094     * Returns the on-disk size of the block. Can only be called in the
1095     * "block ready" state.
1096     *
1097     * @return the on-disk size of the block ready to be written, including the
1098     *         header size, the data and the checksum data.
1099     */
1100    int getOnDiskSizeWithHeader() {
1101      expectState(State.BLOCK_READY);
1102      return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length;
1103    }
1104
1105    /**
1106     * The uncompressed size of the block data. Does not include header size.
1107     */
1108    int getUncompressedSizeWithoutHeader() {
1109      expectState(State.BLOCK_READY);
1110      return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE;
1111    }
1112
1113    /**
1114     * The uncompressed size of the block data, including header size.
1115     */
1116    int getUncompressedSizeWithHeader() {
1117      expectState(State.BLOCK_READY);
1118      return baosInMemory.size();
1119    }
1120
1121    /** @return true if a block is being written  */
1122    boolean isWriting() {
1123      return state == State.WRITING;
1124    }
1125
1126    /**
     * Returns the number of encoded bytes written into the current block so far, or
     * zero if not writing the block at the moment. Note that this will return
     * zero in the "block ready" state as well.
     *
     * @return the number of encoded bytes written
1132     */
1133    public int encodedBlockSizeWritten() {
1134      return state != State.WRITING ? 0 : this.getEncodingState().getEncodedDataSizeWritten();
1135    }
1136
1137    /**
     * Returns the number of unencoded bytes written into the current block so far, or
     * zero if not writing the block at the moment. Note that this will return
     * zero in the "block ready" state as well.
     *
     * @return the number of unencoded bytes written
1143     */
1144    int blockSizeWritten() {
1145      return state != State.WRITING ? 0 : this.getEncodingState().getUnencodedDataSizeWritten();
1146    }
1147
1148    /**
1149     * Clones the header followed by the uncompressed data, even if using
1150     * compression. This is needed for storing uncompressed blocks in the block
     * cache. Can only be called in the "block ready" state.
1152     * Returns only the header and data, does not include checksum data.
1153     *
1154     * @return Returns an uncompressed block ByteBuff for caching on write
1155     */
1156    ByteBuff cloneUncompressedBufferWithHeader() {
1157      expectState(State.BLOCK_READY);
1158      ByteBuff bytebuff = allocator.allocate(baosInMemory.size());
1159      baosInMemory.toByteBuff(bytebuff);
1160      int numBytes = (int) ChecksumUtil.numBytes(
1161          onDiskBlockBytesWithHeader.size(),
1162          fileContext.getBytesPerChecksum());
1163      putHeader(bytebuff, onDiskBlockBytesWithHeader.size() + numBytes,
1164          baosInMemory.size(), onDiskBlockBytesWithHeader.size());
1165      bytebuff.rewind();
1166      return bytebuff;
1167    }
1168
1169    /**
1170     * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is needed
     * for storing packed blocks in the block cache. Returns only the header and data; does not
     * include checksum data.
1173     * @return Returns a copy of block bytes for caching on write
1174     */
1175    private ByteBuff cloneOnDiskBufferWithHeader() {
1176      expectState(State.BLOCK_READY);
1177      ByteBuff bytebuff = allocator.allocate(onDiskBlockBytesWithHeader.size());
1178      onDiskBlockBytesWithHeader.toByteBuff(bytebuff);
1179      bytebuff.rewind();
1180      return bytebuff;
1181    }
1182
1183    private void expectState(State expectedState) {
1184      if (state != expectedState) {
1185        throw new IllegalStateException("Expected state: " + expectedState +
1186            ", actual state: " + state);
1187      }
1188    }
1189
1190    /**
1191     * Takes the given {@link BlockWritable} instance, creates a new block of
1192     * its appropriate type, writes the writable into this block, and flushes
1193     * the block into the output stream. The writer is instructed not to buffer
1194     * uncompressed bytes for cache-on-write.
1195     *
1196     * @param bw the block-writable object to write as a block
1197     * @param out the file system output stream
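     *
     * <p>A minimal sketch of a caller-supplied {@link BlockWritable} (the {@code payloadBytes}
     * and {@code fsOut} names here are hypothetical):
     * <pre>{@code
     * writer.writeBlock(new BlockWritable() {
     *   public BlockType getBlockType() {
     *     return BlockType.META;
     *   }
     *   public void writeToBlock(DataOutput out) throws IOException {
     *     out.write(payloadBytes);
     *   }
     * }, fsOut);
     * }</pre>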
1198     */
1199    void writeBlock(BlockWritable bw, FSDataOutputStream out)
1200        throws IOException {
1201      bw.writeToBlock(startWriting(bw.getBlockType()));
1202      writeHeaderAndData(out);
1203    }
1204
1205    /**
1206     * Creates a new HFileBlock. Checksums have already been validated, so
1207     * the byte buffer passed into the constructor of this newly created
1208     * block does not have checksum data even though the header minor
1209     * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
1210     * 0 value in bytesPerChecksum. This method copies the on-disk or
1211     * uncompressed data to build the HFileBlock which is used only
1212     * while writing blocks and caching.
1213     *
1214     * <p>TODO: Should there be an option where a cache can ask that hbase preserve block
     * checksums for checking after a block comes out of the cache? Otherwise, the cache is responsible
1216     * for blocks being wholesome (ECC memory or if file-backed, it does checksumming).
1217     */
1218    HFileBlock getBlockForCaching(CacheConfig cacheConf) {
1219      HFileContext newContext = new HFileContextBuilder()
1220                                .withBlockSize(fileContext.getBlocksize())
1221                                .withBytesPerCheckSum(0)
1222                                .withChecksumType(ChecksumType.NULL) // no checksums in cached data
1223                                .withCompression(fileContext.getCompression())
1224                                .withDataBlockEncoding(fileContext.getDataBlockEncoding())
1225                                .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
1226                                .withCompressTags(fileContext.isCompressTags())
1227                                .withIncludesMvcc(fileContext.isIncludesMvcc())
1228                                .withIncludesTags(fileContext.isIncludesTags())
1229                                .withColumnFamily(fileContext.getColumnFamily())
1230                                .withTableName(fileContext.getTableName())
1231                                .build();
1232      // Build the HFileBlock.
1233      HFileBlockBuilder builder = new HFileBlockBuilder();
1234      ByteBuff buff;
1235      if (cacheConf.shouldCacheCompressed(blockType.getCategory())) {
1236        buff = cloneOnDiskBufferWithHeader();
1237      } else {
1238        buff = cloneUncompressedBufferWithHeader();
1239      }
1240      return builder.withBlockType(blockType)
1241          .withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader())
1242          .withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader())
1243          .withPrevBlockOffset(prevOffset)
1244          .withByteBuff(buff)
1245          .withFillHeader(FILL_HEADER)
1246          .withOffset(startOffset)
1247          .withNextBlockOnDiskSize(UNSET)
1248          .withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length)
1249          .withHFileContext(newContext)
1250          .withByteBuffAllocator(cacheConf.getByteBuffAllocator())
1251          .withShared(!buff.hasArray())
1252          .build();
1253    }
1254  }
1255
1256  /** Something that can be written into a block. */
1257  interface BlockWritable {
1258    /** The type of block this data should use. */
1259    BlockType getBlockType();
1260
1261    /**
1262     * Writes the block to the provided stream. Must not write any magic
1263     * records.
1264     *
1265     * @param out a stream to write uncompressed data into
1266     */
1267    void writeToBlock(DataOutput out) throws IOException;
1268  }
1269
1270  /**
1271   * Iterator for reading {@link HFileBlock}s in load-on-open-section, such as root data index
1272   * block, meta index block, file info block etc.
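   *
   * <p>Typical use is a read-until-null loop over an iterator obtained from
   * {@link FSReader#blockRange(long, long)}, followed by {@link #freeBlocks()} (a sketch; the
   * offset names are hypothetical):
   * <pre>{@code
   * BlockIterator it = fsReader.blockRange(loadOnOpenOffset, fileSizeMinusTrailer);
   * for (HFileBlock b = it.nextBlock(); b != null; b = it.nextBlock()) {
   *   // process b
   * }
   * it.freeBlocks();
   * }</pre>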
1273   */
1274  interface BlockIterator {
1275    /**
1276     * Get the next block, or null if there are no more blocks to iterate.
1277     */
1278    HFileBlock nextBlock() throws IOException;
1279
1280    /**
1281     * Similar to {@link #nextBlock()} but checks block type, throws an exception if incorrect, and
1282     * returns the HFile block.
1283     */
1284    HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1285
1286    /**
1287     * Because we now use the {@link ByteBuffAllocator} to manage the nio ByteBuffers backing
1288     * HFileBlocks, we must release all of those ByteBuffers at the end of their life cycle. A
1289     * BlockIterator's life cycle starts when an HFileReader is opened and ends when
1290     * HFileReader#close is called, so we track every block read until
1291     * {@link BlockIterator#freeBlocks()} is invoked on close. The total size of the blocks in the
1292     * load-on-open section should be quite small, so tracking them should be OK.
1293     */
1294    void freeBlocks();
1295  }
1296
1297  /** An HFile block reader with iteration ability. */
1298  interface FSReader {
1299    /**
1300     * Reads the block at the given offset in the file with the given on-disk size and uncompressed
1301     * size.
1302     * @param offset the offset in the file at which to start reading
1303     * @param onDiskSize the on-disk size of the entire block, including all applicable headers, or
1304     *          -1 if unknown
1305     * @param pread true to use pread, otherwise use the stream read.
1306     * @param updateMetrics whether to update read metrics.
1307     * @param intoHeap whether to allocate the block's ByteBuff from the JVM heap rather than from
1308     *          the {@link ByteBuffAllocator}. For LRUBlockCache, the block to cache must be a heap
1309     *          block because its memory accounting is heap-based; likewise, {@link CombinedBlockCache}
1310     *          uses the on-heap LRUBlockCache as the L1 cache for small blocks such as index or
1311     *          meta blocks for faster access. This flag lets the caller decide whether to allocate
1312     *          from the JVM heap so that an extra off-heap to heap copy can be avoided when using
1313     *          LRUBlockCache. In most cases the expected block type is known in advance, but in some
1314     *          special cases (for example HFileReaderImpl#readNextDataBlock()) the block type cannot
1315     *          be pre-decided; then the block's ByteBuff can only be allocated from the
1316     *          {@link ByteBuffAllocator} first, and when caching it in {@link LruBlockCache} we check
1317     *          whether the ByteBuff is on heap; if it is not, we clone it to a heap buffer and cache
1318     *          that instead.
1319     * @return the newly read block
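     * <p>A hedged usage sketch (the offset and size would normally come from the hfile index;
     * variable names here are assumptions):
     * <pre>{@code
     * HFileBlock block = fsReader.readBlockData(indexEntryOffset, indexEntryOnDiskSize,
     *     true,   // pread
     *     true,   // updateMetrics
     *     false); // intoHeap=false: allocate via the ByteBuffAllocator, possibly off-heap
     * try {
     *   // use the block
     * } finally {
     *   block.release();
     * }
     * }</pre>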
1320     */
1321    HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics,
1322        boolean intoHeap) throws IOException;
1323
1324    /**
1325     * Creates a block iterator over the given portion of the {@link HFile}.
1326     * The iterator returns blocks starting with offset such that offset &lt;=
1327     * startOffset &lt; endOffset. Returned blocks are always unpacked.
1328     * Used when no hfile index is available; e.g. when reading the hfile index
1329     * blocks themselves on file open.
1330     *
1331     * @param startOffset the offset of the block to start iteration with
1332     * @param endOffset the offset to end iteration at (exclusive)
1333     * @return an iterator of blocks between the two given offsets
1334     */
1335    BlockIterator blockRange(long startOffset, long endOffset);
1336
1337    /** Closes the backing streams */
1338    void closeStreams() throws IOException;
1339
1340    /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
1341    HFileBlockDecodingContext getBlockDecodingContext();
1342
1343    /** Get the default decoder for blocks from this file. */
1344    HFileBlockDecodingContext getDefaultBlockDecodingContext();
1345
1346    void setIncludesMemStoreTS(boolean includesMemstoreTS);
1347    void setDataBlockEncoder(HFileDataBlockEncoder encoder);
1348
1349    /**
1350     * Closes the stream's underlying socket. Note: this can be called concurrently from multiple
1351     * threads, so implementations must take care of thread safety.
1352     */
1353    void unbufferStream();
1354  }
1355
1356  /**
1357   * Data structure used to cache the header of the NEXT block. Only useful if the next read
1358   * that comes in here is for the block that immediately follows the one just read.
1359   *
1360   * When we read a block, we also read the next block's header. We do this so we have
1361   * the length of the next block to read if the hfile index is not available (rare, at
1362   * hfile open only).
1363   */
1364  private static class PrefetchedHeader {
1365    long offset = -1;
1366    byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1367    final ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(header, 0, header.length));
1368
1369    @Override
1370    public String toString() {
1371      return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header);
1372    }
1373  }
1374
1375  /**
1376   * Reads version 2 HFile blocks from the filesystem.
1377   */
1378  static class FSReaderImpl implements FSReader {
1379    /** The file system stream of the underlying {@link HFile} that
1380     * does or doesn't do checksum validations in the filesystem */
1381    private FSDataInputStreamWrapper streamWrapper;
1382
1383    private HFileBlockDecodingContext encodedBlockDecodingCtx;
1384
1385    /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
1386    private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
1387
1388    /**
1389     * Cache of the NEXT block's header. Check that it is indeed the next block's header
1390     * before using it. TODO: Review. This over-read into the next block to fetch the
1391     * next block's header seems unnecessary given we usually get the block size
1392     * from the hfile index. Review!
1393     */
1394    private AtomicReference<PrefetchedHeader> prefetchedHeader =
1395      new AtomicReference<>(new PrefetchedHeader());
1396
1397    /** The size of the file we are reading from, or -1 if unknown. */
1398    private long fileSize;
1399
1400    /** The size of the header */
1401    protected final int hdrSize;
1402
1403    /** The filesystem used to access data */
1404    private HFileSystem hfs;
1405
1406    private HFileContext fileContext;
1407    // Cache the fileName
1408    private String pathName;
1409
1410    private final ByteBuffAllocator allocator;
1411
1412    private final Lock streamLock = new ReentrantLock();
1413
1414    FSReaderImpl(ReaderContext readerContext, HFileContext fileContext,
1415        ByteBuffAllocator allocator) throws IOException {
1416      this.fileSize = readerContext.getFileSize();
1417      this.hfs = readerContext.getFileSystem();
1418      if (readerContext.getFilePath() != null) {
1419        this.pathName = readerContext.getFilePath().toString();
1420      }
1421      this.fileContext = fileContext;
1422      this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1423      this.allocator = allocator;
1424
1425      this.streamWrapper = readerContext.getInputStreamWrapper();
1426      // Older versions of HBase didn't support checksum.
1427      this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1428      defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
1429      encodedBlockDecodingCtx = defaultDecodingCtx;
1430    }
1431
1432    @Override
1433    public BlockIterator blockRange(final long startOffset, final long endOffset) {
1434      final FSReader owner = this; // handle for inner class
1435      return new BlockIterator() {
1436        private volatile boolean freed = false;
1437        // Tracking all read blocks until we call freeBlocks.
1438        private List<HFileBlock> blockTracker = new ArrayList<>();
1439        private long offset = startOffset;
1440        // Cache length of next block. Current block has the length of next block in it.
1441        private long length = -1;
1442
1443        @Override
1444        public HFileBlock nextBlock() throws IOException {
1445          if (offset >= endOffset) {
1446            return null;
1447          }
1448          HFileBlock b = readBlockData(offset, length, false, false, true);
1449          offset += b.getOnDiskSizeWithHeader();
1450          length = b.getNextBlockOnDiskSize();
1451          HFileBlock uncompressed = b.unpack(fileContext, owner);
1452          if (uncompressed != b) {
1453            b.release(); // Need to release the compressed Block now.
1454          }
1455          blockTracker.add(uncompressed);
1456          return uncompressed;
1457        }
1458
1459        @Override
1460        public HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException {
1461          HFileBlock blk = nextBlock();
1462          if (blk.getBlockType() != blockType) {
1463            throw new IOException(
1464                "Expected block of type " + blockType + " but found " + blk.getBlockType());
1465          }
1466          return blk;
1467        }
1468
1469        @Override
1470        public void freeBlocks() {
1471          if (freed) {
1472            return;
1473          }
1474          blockTracker.forEach(HFileBlock::release);
1475          blockTracker = null;
1476          freed = true;
1477        }
1478      };
1479    }
1480
1481    /**
1482     * Does a positional read or a seek-and-read into the given byte buffer. Callers must take care
1483     * to call {@link ByteBuff#release()} on every exit path so the ByteBuffers are deallocated;
1484     * otherwise a memory leak may occur.
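     * <p>As a hedged, illustrative example: reading a 4096-byte block at file offset 1024 with
     * {@code peekIntoNextBlock=true} reads {@code 4096 + hdrSize} bytes, so the next block's
     * header ends up at the tail of {@code dest} and a later seek for it can be avoided.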
1485     * @param dest destination buffer
1486     * @param size size of read
1487     * @param peekIntoNextBlock whether to read the next block's on-disk size
1488     * @param fileOffset position in the stream to read at
1489     * @param pread whether we should do a positional read
1490     * @param istream The input source of data
1491     * @return true if the destination buffer includes the next block's header, false if it only
1492     *         includes the current block's data without the next block's header.
1493     * @throws IOException if any IO error happens.
1494     */
1495    protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size,
1496        boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
1497      if (!pread) {
1498        // Seek + read. Better for scanning.
1499        HFileUtil.seekOnMultipleSources(istream, fileOffset);
1500        long realOffset = istream.getPos();
1501        if (realOffset != fileOffset) {
1502          throw new IOException("Tried to seek to " + fileOffset + " to read " + size
1503              + " bytes, but pos=" + realOffset + " after seek");
1504        }
1505        if (!peekIntoNextBlock) {
1506          BlockIOUtils.readFully(dest, istream, size);
1507          return false;
1508        }
1509
1510        // Try to read the next block header
1511        if (!BlockIOUtils.readWithExtra(dest, istream, size, hdrSize)) {
1512          // did not read the next block header.
1513          return false;
1514        }
1515      } else {
1516        // Positional read. Better for random reads; or when the streamLock is already locked.
1517        int extraSize = peekIntoNextBlock ? hdrSize : 0;
1518        if (!BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize)) {
1519          // did not read the next block header.
1520          return false;
1521        }
1522      }
1523      assert peekIntoNextBlock;
1524      return true;
1525    }
1526
1527    /**
1528     * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
1529     * little memory allocation as possible, using the provided on-disk size.
1530     * @param offset the offset in the stream to read at
1531     * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header, or -1 if
1532     *          unknown; i.e. when iterating over blocks reading in the file metadata info.
1533     * @param pread whether to use a positional read
1534     * @param updateMetrics whether to update the metrics
1535     * @param intoHeap allocate ByteBuff of block from heap or off-heap.
1536     * @see FSReader#readBlockData(long, long, boolean, boolean, boolean) for more details about the
1537     *      intoHeap parameter.
1538     */
1539    @Override
1540    public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread,
1541        boolean updateMetrics, boolean intoHeap) throws IOException {
1542      // Get a copy of the current state of whether to validate
1543      // hbase checksums or not for this read call. This is not
1544      // thread-safe but the one constraint is that if we decide
1545      // to skip hbase checksum verification then we are
1546      // guaranteed to use hdfs checksum verification.
1547      boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1548      FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1549
1550      HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
1551        doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
1552      if (blk == null) {
1553        HFile.LOG.warn("HBase checksum verification failed for file " +
1554                       pathName + " at offset " +
1555                       offset + " filesize " + fileSize +
1556                       ". Retrying read with HDFS checksums turned on...");
1557
1558        if (!doVerificationThruHBaseChecksum) {
1559          String msg = "HBase checksum verification failed for file " +
1560                       pathName + " at offset " +
1561                       offset + " filesize " + fileSize +
1562                       " but this cannot happen because doVerify is " +
1563                       doVerificationThruHBaseChecksum;
1564          HFile.LOG.warn(msg);
1565          throw new IOException(msg); // cannot happen case here
1566        }
1567        HFile.CHECKSUM_FAILURES.increment(); // update metrics
1568
1569        // If we have a checksum failure, we fall back into a mode where
1570        // the next few reads use HDFS level checksums. We aim to make the
1571        // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
1572        // hbase checksum verification, but since this value is set without
1573        // holding any locks, it can so happen that we might actually do
1574        // a few more than precisely this number.
1575        is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1576        doVerificationThruHBaseChecksum = false;
1577        blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
1578          doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
1579        if (blk != null) {
1580          HFile.LOG.warn("HDFS checksum verification succeeded for file " +
1581                         pathName + " at offset " +
1582                         offset + " filesize " + fileSize);
1583        }
1584      }
1585      if (blk == null && !doVerificationThruHBaseChecksum) {
1586        String msg = "readBlockData failed, possibly due to " +
1587                     "checksum verification failed for file " + pathName +
1588                     " at offset " + offset + " filesize " + fileSize;
1589        HFile.LOG.warn(msg);
1590        throw new IOException(msg);
1591      }
1592
1593      // If there is a checksum mismatch earlier, then retry with
1594      // HBase checksums switched off and use HDFS checksum verification.
1595      // This triggers HDFS to detect and fix corrupt replicas. The
1596      // next checksumOffCount read requests will use HDFS checksums.
1597      // The decrementing of this.checksumOffCount is not thread-safe,
1598      // but it is harmless because eventually checksumOffCount will be
1599      // a negative number.
1600      streamWrapper.checksumOk();
1601      return blk;
1602    }
1603
1604    /**
1605     * @return the passed <code>onDiskSizeWithHeaderL</code> as an int, after checking that the size is sane
1606     */
1607    private static int checkAndGetSizeAsInt(final long onDiskSizeWithHeaderL, final int hdrSize)
1608        throws IOException {
1609      if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
1610          || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) {
1611        throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL
1612            + ": expected to be at least " + hdrSize
1613            + " and at most " + Integer.MAX_VALUE + ", or -1");
1614      }
1615      return (int)onDiskSizeWithHeaderL;
1616    }
1617
1618    /**
1619     * Verify that the passed-in onDiskSizeWithHeader aligns with what is in the header; otherwise
1620     * something is not right.
1621     */
1622    private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuff headerBuf,
1623           final long offset, boolean verifyChecksum)
1624         throws IOException {
1625      // Assert size provided aligns with what is in the header
1626      int fromHeader = getOnDiskSizeWithHeader(headerBuf, verifyChecksum);
1627      if (passedIn != fromHeader) {
1628        throw new IOException("Passed in onDiskSizeWithHeader=" + passedIn + " != " + fromHeader +
1629            ", offset=" + offset + ", fileContext=" + this.fileContext);
1630      }
1631    }
1632
1633    /**
1634     * Check the atomic reference cache for this block's header. The cache is only good if the next
1635     * read coming through is next in sequence in the file. We read the next block's
1636     * header on the tail of reading the previous block to save a seek. Otherwise,
1637     * we have to do a seek to read the header before we can pull in the block OR
1638     * we have to back up the stream because we over-read (the next block's header).
1639     * @see PrefetchedHeader
1640     * @return The cached block header or null if not found.
1641     * @see #cacheNextBlockHeader(long, ByteBuff, int, int)
1642     */
1643    private ByteBuff getCachedHeader(final long offset) {
1644      PrefetchedHeader ph = this.prefetchedHeader.get();
1645      return ph != null && ph.offset == offset ? ph.buf : null;
1646    }
1647
1648    /**
1649     * Save away the next block's header in the atomic reference.
1650     * @see #getCachedHeader(long)
1651     * @see PrefetchedHeader
1652     */
1653    private void cacheNextBlockHeader(final long offset,
1654        ByteBuff onDiskBlock, int onDiskSizeWithHeader, int headerLength) {
1655      PrefetchedHeader ph = new PrefetchedHeader();
1656      ph.offset = offset;
1657      onDiskBlock.get(onDiskSizeWithHeader, ph.header, 0, headerLength);
1658      this.prefetchedHeader.set(ph);
1659    }
1660
1661    private int getNextBlockOnDiskSize(boolean readNextHeader, ByteBuff onDiskBlock,
1662        int onDiskSizeWithHeader) {
1663      int nextBlockOnDiskSize = -1;
1664      if (readNextHeader) {
1665        nextBlockOnDiskSize =
1666            onDiskBlock.getIntAfterPosition(onDiskSizeWithHeader + BlockType.MAGIC_LENGTH)
1667                + hdrSize;
1668      }
1669      return nextBlockOnDiskSize;
1670    }
1671
1672    private ByteBuff allocate(int size, boolean intoHeap) {
1673      return intoHeap ? HEAP.allocate(size) : allocator.allocate(size);
1674    }
1675
1676    /**
1677     * Reads a version 2 block.
1678     * @param offset the offset in the stream to read at.
1679     * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header and
1680     *          checksums if present, or -1 if unknown (as a long). Can be -1 when doing raw
1681     *          iteration of blocks, as when loading up file metadata; i.e. the first read of a new
1682     *          file. Usually known, obtained from the file index.
1683     * @param pread whether to use a positional read
1684     * @param verifyChecksum whether to use HBase checksums. If HBase checksums are switched off,
1685     *          HDFS checksums are used instead. Can flip on/off while reading the same file if we
1686     *          hit a troublesome patch in an hfile.
1687     * @param updateMetrics whether to update the metrics.
1688     * @param intoHeap whether to allocate the block's ByteBuff on heap rather than off-heap.
1689     * @return the HFileBlock, or null if there is an HBase checksum mismatch
1690     */
1691    protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
1692        long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics,
1693        boolean intoHeap) throws IOException {
1694      if (offset < 0) {
1695        throw new IOException("Invalid offset=" + offset + " trying to read "
1696            + "block (onDiskSize=" + onDiskSizeWithHeaderL + ")");
1697      }
1698      int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize);
1699      // Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1
1700      // and will save us having to seek the stream backwards to reread the header we
1701      // read the last time through here.
1702      ByteBuff headerBuf = getCachedHeader(offset);
1703      LOG.trace("Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, " +
1704          "onDiskSizeWithHeader={}", this.fileContext.getHFileName(), offset, pread,
1705          verifyChecksum, headerBuf, onDiskSizeWithHeader);
1706      // This is NOT the same as verifyChecksum. The latter is whether to do hbase
1707      // checksums and can change with circumstances. The below flag is whether the
1708      // file has support for checksums (version 2+).
1709      boolean checksumSupport = this.fileContext.isUseHBaseChecksum();
1710      long startTime = System.currentTimeMillis();
1711      if (onDiskSizeWithHeader <= 0) {
1712        // We were not passed the block size. Need to get it from the header. If header was
1713        // not cached (see getCachedHeader above), need to seek to pull it in. This is costly
1714        // and should happen very rarely. Currently happens on open of a hfile reader where we
1715        // read the trailer blocks to pull in the indices. Otherwise, we are reading block sizes
1716        // out of the hfile index. To check, enable TRACE in this file and you'll get an exception
1717        // in a LOG every time we seek. See HBASE-17072 for more detail.
1718        if (headerBuf == null) {
1719          if (LOG.isTraceEnabled()) {
1720            LOG.trace("Extra seek to get block size!", new RuntimeException());
1721          }
1722          headerBuf = HEAP.allocate(hdrSize);
1723          readAtOffset(is, headerBuf, hdrSize, false, offset, pread);
1724          headerBuf.rewind();
1725        }
1726        onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport);
1727      }
1728      int preReadHeaderSize = headerBuf == null? 0 : hdrSize;
1729      // Allocate enough space to fit the next block's header too; saves a seek next time through.
1730      // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header;
1731      // onDiskSizeWithHeader is header, body, and any checksums if present. preReadHeaderSize
1732      // says where to start reading. If we have the header cached, then we don't need to read
1733      // it again and we can likely read from last place we left off w/o need to backup and reread
1734      // the header we read last time through here.
1735      ByteBuff onDiskBlock = this.allocate(onDiskSizeWithHeader + hdrSize, intoHeap);
1736      boolean initHFileBlockSuccess = false;
1737      try {
1738        if (headerBuf != null) {
1739          onDiskBlock.put(0, headerBuf, 0, hdrSize).position(hdrSize);
1740        }
1741        boolean readNextHeader = readAtOffset(is, onDiskBlock,
1742          onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread);
1743        onDiskBlock.rewind(); // in case of moving position when copying a cached header
1744        int nextBlockOnDiskSize =
1745            getNextBlockOnDiskSize(readNextHeader, onDiskBlock, onDiskSizeWithHeader);
1746        if (headerBuf == null) {
1747          headerBuf = onDiskBlock.duplicate().position(0).limit(hdrSize);
1748        }
1749        // Do a few checks before we go instantiate HFileBlock.
1750        assert onDiskSizeWithHeader > this.hdrSize;
1751        verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport);
1752        ByteBuff curBlock = onDiskBlock.duplicate().position(0).limit(onDiskSizeWithHeader);
1753        // Verify checksum of the data before using it for building HFileBlock.
1754        if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) {
1755          return null;
1756        }
1757        long duration = System.currentTimeMillis() - startTime;
1758        if (updateMetrics) {
1759          HFile.updateReadLatency(duration, pread);
1760        }
1761        // The onDiskBlock will become the headerAndDataBuffer for this block.
1762        // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
1763        // contains the header of next block, so no need to set next block's header in it.
1764        HFileBlock hFileBlock = createFromBuff(curBlock, checksumSupport, offset,
1765          nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
1766        // Run check on uncompressed sizings.
1767        if (!fileContext.isCompressedOrEncrypted()) {
1768          hFileBlock.sanityCheckUncompressed();
1769        }
1770        LOG.trace("Read {} in {} ns", hFileBlock, duration);
1771        // Cache next block header if we read it for the next time through here.
1772        if (nextBlockOnDiskSize != -1) {
1773          cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), onDiskBlock,
1774            onDiskSizeWithHeader, hdrSize);
1775        }
1776        initHFileBlockSuccess = true;
1777        return hFileBlock;
1778      } finally {
1779        if (!initHFileBlockSuccess) {
1780          onDiskBlock.release();
1781        }
1782      }
1783    }
1784
1785    @Override
1786    public void setIncludesMemStoreTS(boolean includesMemstoreTS) {
1787      this.fileContext = new HFileContextBuilder(this.fileContext)
1788        .withIncludesMvcc(includesMemstoreTS).build();
1789    }
1790
1791    @Override
1792    public void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
1793      encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
1794    }
1795
1796    @Override
1797    public HFileBlockDecodingContext getBlockDecodingContext() {
1798      return this.encodedBlockDecodingCtx;
1799    }
1800
1801    @Override
1802    public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
1803      return this.defaultDecodingCtx;
1804    }
1805
1806    /**
1807     * Generates the checksum for the header as well as the data and then validates it.
1808     * If the block doesn't use checksums, returns false.
1809     * @return True if checksum matches, else false.
1810     */
1811    private boolean validateChecksum(long offset, ByteBuff data, int hdrSize) {
1812      // If this is an older version of the block that does not have checksums, then return false
1813      // indicating that checksum verification did not succeed. Actually, this method should never
1814      // be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen
1815      // case. Since this is a cannot-happen case, it is better to return false to indicate a
1816      // checksum validation failure.
1817      if (!fileContext.isUseHBaseChecksum()) {
1818        return false;
1819      }
1820      return ChecksumUtil.validateChecksum(data, pathName, offset, hdrSize);
1821    }
1822
1823    @Override
1824    public void closeStreams() throws IOException {
1825      streamWrapper.close();
1826    }
1827
1828    @Override
1829    public void unbufferStream() {
1830      // To handle concurrent reads, ensure that no other client is accessing the streams while we
1831      // unbuffer it.
1832      if (streamLock.tryLock()) {
1833        try {
1834          this.streamWrapper.unbuffer();
1835        } finally {
1836          streamLock.unlock();
1837        }
1838      }
1839    }
1840
1841    @Override
1842    public String toString() {
1843      return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext;
1844    }
1845  }
1846
1847  /** An additional sanity-check in case no compression or encryption is being used. */
1848  void sanityCheckUncompressed() throws IOException {
1849    if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader +
1850        totalChecksumBytes()) {
1851      throw new IOException("Using no compression but "
1852          + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
1853          + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
1854          + ", numChecksumbytes=" + totalChecksumBytes());
1855    }
1856  }
1857
1858  // Cacheable implementation
1859  @Override
1860  public int getSerializedLength() {
1861    if (buf != null) {
1862      // Include extra bytes for block metadata.
1863      return this.buf.limit() + BLOCK_METADATA_SPACE;
1864    }
1865    return 0;
1866  }
1867
1868  // Cacheable implementation
1869  @Override
1870  public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
1871    this.buf.get(destination, 0, getSerializedLength() - BLOCK_METADATA_SPACE);
1872    destination = addMetaData(destination, includeNextBlockMetadata);
1873
1874    // Make it ready for reading. flip sets position to zero and limit to the current position,
1875    // which covers exactly what we serialized: the block, plus checksums if present, plus the
1876    // metadata.
1877    destination.flip();
1878  }
1879
1880  /**
1881   * For use by bucketcache. This exposes internals.
1882   */
1883  public ByteBuffer getMetaData() {
1884    ByteBuffer bb = ByteBuffer.allocate(BLOCK_METADATA_SPACE);
1885    bb = addMetaData(bb, true);
1886    bb.flip();
1887    return bb;
1888  }
1889
1890  /**
1891   * Adds metadata at current position (position is moved forward). Does not flip or reset.
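   * <p>What gets written (a description of the statements below): one byte for the
   * usesHBaseChecksum flag, eight bytes for this block's file offset, and, if
   * <code>includeNextBlockMetadata</code> is true, four bytes for the next block's on-disk size.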
1892   * @return The passed <code>destination</code> with metadata added.
1893   */
1894  private ByteBuffer addMetaData(final ByteBuffer destination, boolean includeNextBlockMetadata) {
1895    destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1896    destination.putLong(this.offset);
1897    if (includeNextBlockMetadata) {
1898      destination.putInt(this.nextBlockOnDiskSize);
1899    }
1900    return destination;
1901  }
1902
1903  // Cacheable implementation
1904  @Override
1905  public CacheableDeserializer<Cacheable> getDeserializer() {
1906    return HFileBlock.BLOCK_DESERIALIZER;
1907  }
1908
1909  @Override
1910  public int hashCode() {
1911    int result = 1;
1912    result = result * 31 + blockType.hashCode();
1913    result = result * 31 + nextBlockOnDiskSize;
1914    result = result * 31 + (int) (offset ^ (offset >>> 32));
1915    result = result * 31 + onDiskSizeWithoutHeader;
1916    result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32));
1917    result = result * 31 + uncompressedSizeWithoutHeader;
1918    result = result * 31 + buf.hashCode();
1919    return result;
1920  }
1921
1922  @Override
1923  public boolean equals(Object comparison) {
1924    if (this == comparison) {
1925      return true;
1926    }
1927    if (comparison == null) {
1928      return false;
1929    }
1930    if (!(comparison instanceof HFileBlock)) {
1931      return false;
1932    }
1933
1934    HFileBlock castedComparison = (HFileBlock) comparison;
1935
1936    if (castedComparison.blockType != this.blockType) {
1937      return false;
1938    }
1939    if (castedComparison.nextBlockOnDiskSize != this.nextBlockOnDiskSize) {
1940      return false;
1941    }
1942    // Offset is important. Needed when we have to remake cachekey when block is returned to cache.
1943    if (castedComparison.offset != this.offset) {
1944      return false;
1945    }
1946    if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1947      return false;
1948    }
1949    if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1950      return false;
1951    }
1952    if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1953      return false;
1954    }
1955    if (ByteBuff.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
1956        castedComparison.buf.limit()) != 0) {
1957      return false;
1958    }
1959    return true;
1960  }
1961
1962  DataBlockEncoding getDataBlockEncoding() {
1963    if (blockType == BlockType.ENCODED_DATA) {
1964      return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
1965    }
1966    return DataBlockEncoding.NONE;
1967  }
1968
1969  byte getChecksumType() {
1970    return this.fileContext.getChecksumType().getCode();
1971  }
1972
1973  int getBytesPerChecksum() {
1974    return this.fileContext.getBytesPerChecksum();
1975  }
1976
1977  /** @return the size of data on disk + header. Excludes checksum. */
1978  int getOnDiskDataSizeWithHeader() {
1979    return this.onDiskDataSizeWithHeader;
1980  }
1981
1982  /**
1983   * Calculate the number of bytes required to store all the checksums
1984   * for this block. Each checksum value is a 4 byte integer.
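   * <p>As an illustrative example (numbers are assumptions): with {@code bytesPerChecksum = 16384}
   * and {@code onDiskDataSizeWithHeader = 66000} there are {@code ceil(66000 / 16384) = 5} checksum
   * chunks, so {@code 5 * 4 = 20} checksum bytes are appended to the block.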
1985   */
1986  int totalChecksumBytes() {
1987    // If the hfile block has minorVersion 0, then there is no checksum
1988    // data to validate. Similarly, a zero value of bytesPerChecksum
1989    // indicates that cached blocks do not have checksum data because
1990    // checksums were already validated when the block was read from disk.
1991    if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
1992      return 0;
1993    }
1994    return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
1995        this.fileContext.getBytesPerChecksum());
1996  }
1997
1998  /**
1999   * Returns the size of this block header.
2000   */
2001  public int headerSize() {
2002    return headerSize(this.fileContext.isUseHBaseChecksum());
2003  }
2004
2005  /**
2006   * Maps a minor version to the size of the header.
2007   */
2008  public static int headerSize(boolean usesHBaseChecksum) {
2009    return usesHBaseChecksum?
2010        HConstants.HFILEBLOCK_HEADER_SIZE: HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
2011  }
2012
2013  /**
2014   * Return the appropriate DUMMY_HEADER for the minor version
2015   */
2016  // TODO: Why is this in here?
2017  byte[] getDummyHeaderForVersion() {
2018    return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
2019  }
2020
2021  /**
2022   * Return the appropriate DUMMY_HEADER for the minor version
2023   */
2024  static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
2025    return usesHBaseChecksum? HConstants.HFILEBLOCK_DUMMY_HEADER: DUMMY_HEADER_NO_CHECKSUM;
2026  }
2027
2028  /**
2029   * @return This HFileBlock's fileContext, which will be a derivative of the
2030   *   fileContext for the file from which this block's data was originally read.
2031   */
2032  HFileContext getHFileContext() {
2033    return this.fileContext;
2034  }
2035
2036  /**
2037   * Convert the contents of the block header into a human readable string.
2038   * This is mostly helpful for debugging. This assumes that the block
2039   * has minor version > 0.
2040   */
2041  static String toStringHeader(ByteBuff buf) throws IOException {
2042    byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
2043    buf.get(magicBuf);
2044    BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
2045    int compressedBlockSizeNoHeader = buf.getInt();
2046    int uncompressedBlockSizeNoHeader = buf.getInt();
2047    long prevBlockOffset = buf.getLong();
2048    byte cksumtype = buf.get();
2049    long bytesPerChecksum = buf.getInt();
2050    long onDiskDataSizeWithHeader = buf.getInt();
2051    return " Header dump: magic: " + Bytes.toString(magicBuf) +
2052                   " blockType " + bt +
2053                   " compressedBlockSizeNoHeader " +
2054                   compressedBlockSizeNoHeader +
2055                   " uncompressedBlockSizeNoHeader " +
2056                   uncompressedBlockSizeNoHeader +
2057                   " prevBlockOffset " + prevBlockOffset +
2058                   " checksumType " + ChecksumType.codeToType(cksumtype) +
2059                   " bytesPerChecksum " + bytesPerChecksum +
2060                   " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
2061  }
2062
2063  private static HFileBlockBuilder createBuilder(HFileBlock blk) {
2064    return new HFileBlockBuilder()
2065          .withBlockType(blk.blockType)
2066          .withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader)
2067          .withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader)
2068          .withPrevBlockOffset(blk.prevBlockOffset)
2069          .withByteBuff(blk.buf.duplicate()) // Duplicate the buffer.
2070          .withOffset(blk.offset)
2071          .withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
2072          .withNextBlockOnDiskSize(blk.nextBlockOnDiskSize)
2073          .withHFileContext(blk.fileContext)
2074          .withByteBuffAllocator(blk.allocator)
2075          .withShared(blk.isSharedMem());
2076  }
2077
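  /** Returns a shallow clone of {@code blk}; the clone's ByteBuff is a duplicate sharing the same memory. */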
2078  static HFileBlock shallowClone(HFileBlock blk) {
2079    return createBuilder(blk).build();
2080  }
2081
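  /** Returns a deep clone of {@code blk} whose ByteBuff is a fresh on-heap copy of the block's bytes. */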
2082  static HFileBlock deepCloneOnHeap(HFileBlock blk) {
2083    ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
2084    return createBuilder(blk).withByteBuff(deepCloned).withShared(false).build();
2085  }
2086}