001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import java.io.DataInputStream;
021import java.io.DataOutput;
022import java.io.DataOutputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.nio.ByteBuffer;
026import java.util.concurrent.atomic.AtomicReference;
027import java.util.concurrent.locks.Lock;
028import java.util.concurrent.locks.ReentrantLock;
029
030import org.apache.hadoop.fs.FSDataInputStream;
031import org.apache.hadoop.fs.FSDataOutputStream;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038import org.apache.hadoop.hbase.fs.HFileSystem;
039import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
040import org.apache.hadoop.hbase.io.ByteBuffInputStream;
041import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream;
042import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
043import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
044import org.apache.hadoop.hbase.io.encoding.EncodingState;
045import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
046import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
047import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
048import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
049import org.apache.hadoop.hbase.nio.ByteBuff;
050import org.apache.hadoop.hbase.nio.MultiByteBuff;
051import org.apache.hadoop.hbase.nio.SingleByteBuff;
052import org.apache.hadoop.hbase.regionserver.ShipperListener;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.ChecksumType;
055import org.apache.hadoop.hbase.util.ClassSize;
056import org.apache.hadoop.io.IOUtils;
057
058import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
060
061/**
062 * Cacheable Blocks of an {@link HFile} version 2 file.
063 * Version 2 was introduced in hbase-0.92.0.
064 *
065 * <p>Version 1 was the original file block. Version 2 was introduced when we changed the hbase file
066 * format to support multi-level block indexes and compound bloom filters (HBASE-3857). Support
067 * for Version 1 was removed in hbase-1.3.0.
068 *
069 * <h3>HFileBlock: Version 2</h3>
070 * In version 2, a block is structured as follows:
071 * <ul>
 * <li><b>Header:</b> See Writer#putHeader() for where the header is written; the total header size
 * is HFILEBLOCK_HEADER_SIZE. A byte-offset sketch of the header follows this list.
074 * <ul>
075 * <li>0. blockType: Magic record identifying the {@link BlockType} (8 bytes):
076 * e.g. <code>DATABLK*</code>
 * <li>1. onDiskSizeWithoutHeader: Compressed -- a.k.a. 'on disk' -- block size, excluding header,
 * but including trailing checksum bytes (4 bytes)
079 * <li>2. uncompressedSizeWithoutHeader: Uncompressed block size, excluding header, and excluding
080 * checksum bytes (4 bytes)
081 * <li>3. prevBlockOffset: The offset of the previous block of the same type (8 bytes). This is
082 * used to navigate to the previous block without having to go to the block index
083 * <li>4: For minorVersions &gt;=1, the ordinal describing checksum type (1 byte)
084 * <li>5: For minorVersions &gt;=1, the number of data bytes/checksum chunk (4 bytes)
085 * <li>6: onDiskDataSizeWithHeader: For minorVersions &gt;=1, the size of data 'on disk', including
086 * header, excluding checksums (4 bytes)
087 * </ul>
088 * </li>
089 * <li><b>Raw/Compressed/Encrypted/Encoded data:</b> The compression
090 * algorithm is the same for all the blocks in an {@link HFile}. If compression is NONE, this is
091 * just raw, serialized Cells.
092 * <li><b>Tail:</b> For minorVersions &gt;=1, a series of 4 byte checksums, one each for
093 * the number of bytes specified by bytesPerChecksum.
094 * </ul>
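 *
 * <p>As a sketch, the header byte offsets (for minorVersions &gt;= 1, i.e. with HBase checksums;
 * these match the {@code Header} index constants defined below) are:
 * <pre>
 *  0: blockType magic                 (8 bytes)
 *  8: onDiskSizeWithoutHeader         (4 bytes, int)
 * 12: uncompressedSizeWithoutHeader   (4 bytes, int)
 * 16: prevBlockOffset                 (8 bytes, long)
 * 24: checksum type ordinal           (1 byte)
 * 25: bytesPerChecksum                (4 bytes, int)
 * 29: onDiskDataSizeWithHeader        (4 bytes, int)
 * </pre>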
095 *
096 * <h3>Caching</h3>
 * Caches cache whole blocks, with trailing checksums if any. We then tag on some metadata, the
 * content of BLOCK_METADATA_SPACE: a flag for whether we are doing 'hbase' checksums, the offset
 * into the file (needed when we re-make a cache key as the block is returned to the cache as
 * 'done'), and the on-disk size of the next block.
101 * See {@link Cacheable#serialize(ByteBuffer, boolean)} and {@link Cacheable#getDeserializer()}.
102 *
103 * <p>TODO: Should we cache the checksums? Down in Writer#getBlockForCaching(CacheConfig) where
104 * we make a block to cache-on-write, there is an attempt at turning off checksums. This is not the
105 * only place we get blocks to cache. We also will cache the raw return from an hdfs read. In this
106 * case, the checksums may be present. If the cache is backed by something that doesn't do ECC,
 * say an SSD, we might want to preserve checksums. For now this is an open question.
108 * <p>TODO: Over in BucketCache, we save a block allocation by doing a custom serialization.
109 * Be sure to change it if serialization changes in here. Could we add a method here that takes an
110 * IOEngine and that then serializes to it rather than expose our internals over in BucketCache?
111 * IOEngine is in the bucket subpackage. Pull it up? Then this class knows about bucketcache. Ugh.
112 */
113@InterfaceAudience.Private
114public class HFileBlock implements Cacheable {
115  private static final Logger LOG = LoggerFactory.getLogger(HFileBlock.class);
116
117  // Block Header fields.
118
119  // TODO: encapsulate Header related logic in this inner class.
120  static class Header {
121    // Format of header is:
122    // 8 bytes - block magic
123    // 4 bytes int - onDiskSizeWithoutHeader
124    // 4 bytes int - uncompressedSizeWithoutHeader
125    // 8 bytes long - prevBlockOffset
126    // The following 3 are only present if header contains checksum information
127    // 1 byte - checksum type
128    // 4 byte int - bytes per checksum
129    // 4 byte int - onDiskDataSizeWithHeader
130    static int BLOCK_MAGIC_INDEX = 0;
131    static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8;
132    static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12;
133    static int PREV_BLOCK_OFFSET_INDEX = 16;
134    static int CHECKSUM_TYPE_INDEX = 24;
135    static int BYTES_PER_CHECKSUM_INDEX = 25;
136    static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29;
137  }
138
139  /** Type of block. Header field 0. */
140  private BlockType blockType;
141
142  /**
143   * Size on disk excluding header, including checksum. Header field 1.
144   * @see Writer#putHeader(byte[], int, int, int, int)
145   */
146  private int onDiskSizeWithoutHeader;
147
148  /**
149   * Size of pure data. Does not include header or checksums. Header field 2.
150   * @see Writer#putHeader(byte[], int, int, int, int)
151   */
152  private int uncompressedSizeWithoutHeader;
153
154  /**
155   * The offset of the previous block on disk. Header field 3.
156   * @see Writer#putHeader(byte[], int, int, int, int)
157   */
158  private long prevBlockOffset;
159
160  /**
161   * Size on disk of header + data. Excludes checksum. Header field 6,
162   * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
163   * @see Writer#putHeader(byte[], int, int, int, int)
164   */
165  private int onDiskDataSizeWithHeader;
166  // End of Block Header fields.
167
168  /**
169   * The in-memory representation of the hfile block. Can be on or offheap. Can be backed by
170   * a single ByteBuffer or by many. Make no assumptions.
171   *
   * <p>Be careful reading from this <code>buf</code>. Duplicate it and work on the duplicate, or
   * else be sure to reset position and limit; otherwise there will be trouble down the road.
174   *
175   * <p>TODO: Make this read-only once made.
176   *
177   * <p>We are using the ByteBuff type. ByteBuffer is not extensible yet we need to be able to have
178   * a ByteBuffer-like API across multiple ByteBuffers reading from a cache such as BucketCache.
   * So, we have this ByteBuff type. Unfortunately, it is spread all about HFileBlock. It would be
   * good if it could be confined to cache-use only, but that is hard to do.
181   */
182  private ByteBuff buf;
183
  /** Metadata that holds information about this HFileBlock. */
186  private HFileContext fileContext;
187
188  /**
189   * The offset of this block in the file. Populated by the reader for
190   * convenience of access. This offset is not part of the block header.
191   */
192  private long offset = UNSET;
193
194  private MemoryType memType = MemoryType.EXCLUSIVE;
195
196  /**
197   * The on-disk size of the next block, including the header and checksums if present.
198   * UNSET if unknown.
199   *
200   * Blocks try to carry the size of the next block to read in this data member. Usually
201   * we get block sizes from the hfile index but sometimes the index is not available:
202   * e.g. when we read the indexes themselves (indexes are stored in blocks, we do not
203   * have an index for the indexes). Saves seeks especially around file open when
204   * there is a flurry of reading in hfile metadata.
205   */
206  private int nextBlockOnDiskSize = UNSET;
207
208  /**
   * On a checksum failure, do this many succeeding read requests using hdfs checksums before
   * auto-reenabling hbase checksum verification.
211   */
212  static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
213
  private static final int UNSET = -1;
215  public static final boolean FILL_HEADER = true;
216  public static final boolean DONT_FILL_HEADER = false;
217
  // TODO: How do we get this estimate correct if the backing buffer is a SingleByteBuff?
219  public static final int MULTI_BYTE_BUFFER_HEAP_SIZE =
220      (int)ClassSize.estimateBase(MultiByteBuff.class, false);
221
222  /**
   * Space for metadata on a block that gets stored along with the block when we cache it.
   * There are a few bytes stuck on the end of the HFileBlock that we pull in from HDFS.
   * 8 bytes are for the offset of this block (long) in the file. The offset is important because
   * it is used when we remake the CacheKey when we return the block to the cache when done. There
   * is also a flag on whether checksumming is being done by hbase or not. See the class comment
   * for a note on the uncertain state of checksumming of blocks that come out of the cache
   * (should we or should we not?). Finally there are 4 bytes to hold the length of the next block,
   * which can save a seek on occasion if available.
231   * (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was
232   * formerly known as EXTRA_SERIALIZATION_SPACE).
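   *
   * <p>A sketch of the appended metadata layout, in the order it is read back by
   * {@link #BLOCK_DESERIALIZER} (13 bytes total):
   * <pre>
   * 1 byte  - whether HBase checksums are in use
   * 8 bytes - offset of this block in the file (long)
   * 4 bytes - on-disk size of the next block (int); UNSET (-1) if unknown
   * </pre>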
233   */
234  static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
235
236  /**
237   * Each checksum value is an integer that can be stored in 4 bytes.
238   */
239  static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
240
241  static final byte[] DUMMY_HEADER_NO_CHECKSUM =
242      new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
243
244  /**
   * Used when deserializing blocks from the cache.
246   *
247   * <code>
248   * ++++++++++++++
249   * + HFileBlock +
250   * ++++++++++++++
251   * + Checksums  + <= Optional
252   * ++++++++++++++
253   * + Metadata!  + <= See note on BLOCK_METADATA_SPACE above.
254   * ++++++++++++++
255   * </code>
256   * @see #serialize(ByteBuffer)
257   */
258  static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER =
259      new CacheableDeserializer<Cacheable>() {
260    @Override
261    public HFileBlock deserialize(ByteBuff buf, boolean reuse, MemoryType memType)
262        throws IOException {
263      // The buf has the file block followed by block metadata.
264      // Set limit to just before the BLOCK_METADATA_SPACE then rewind.
265      buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind();
      // Get a new buffer to pass to the HFileBlock for it to 'own'.
267      ByteBuff newByteBuff;
268      if (reuse) {
269        newByteBuff = buf.slice();
270      } else {
271        int len = buf.limit();
272        newByteBuff = new SingleByteBuff(ByteBuffer.allocate(len));
273        newByteBuff.put(0, buf, buf.position(), len);
274      }
275      // Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock.
276      buf.position(buf.limit());
277      buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE);
278      boolean usesChecksum = buf.get() == (byte) 1;
279      long offset = buf.getLong();
280      int nextBlockOnDiskSize = buf.getInt();
281      HFileBlock hFileBlock =
282          new HFileBlock(newByteBuff, usesChecksum, memType, offset, nextBlockOnDiskSize, null);
283      return hFileBlock;
284    }
285
286    @Override
287    public int getDeserialiserIdentifier() {
288      return DESERIALIZER_IDENTIFIER;
289    }
290
291    @Override
292    public HFileBlock deserialize(ByteBuff b) throws IOException {
293      // Used only in tests
294      return deserialize(b, false, MemoryType.EXCLUSIVE);
295    }
296  };
297
298  private static final int DESERIALIZER_IDENTIFIER;
299  static {
300    DESERIALIZER_IDENTIFIER =
301        CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
302  }
303
304  /**
305   * Copy constructor. Creates a shallow copy of {@code that}'s buffer.
306   */
307  private HFileBlock(HFileBlock that) {
308    this(that, false);
309  }
310
311  /**
312   * Copy constructor. Creates a shallow/deep copy of {@code that}'s buffer as per the boolean
313   * param.
314   */
315  private HFileBlock(HFileBlock that, boolean bufCopy) {
316    init(that.blockType, that.onDiskSizeWithoutHeader,
317        that.uncompressedSizeWithoutHeader, that.prevBlockOffset,
318        that.offset, that.onDiskDataSizeWithHeader, that.nextBlockOnDiskSize, that.fileContext);
319    if (bufCopy) {
320      this.buf = new SingleByteBuff(ByteBuffer.wrap(that.buf.toBytes(0, that.buf.limit())));
321    } else {
322      this.buf = that.buf.duplicate();
323    }
324  }
325
326  /**
   * Creates a new {@link HFile} block from the given fields. This constructor
   * is used only while writing blocks and caching, when the block data is already
   * sitting in a byte buffer and we want to stuff the block into the cache.
330   * See {@link Writer#getBlockForCaching(CacheConfig)}.
331   *
332   * <p>TODO: The caller presumes no checksumming
333   * required of this block instance since going into cache; checksum already verified on
334   * underlying block data pulled in from filesystem. Is that correct? What if cache is SSD?
335   *
336   * @param blockType the type of this block, see {@link BlockType}
337   * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
338   * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
339   * @param prevBlockOffset see {@link #prevBlockOffset}
   * @param b block buffer, starting with the {@link HConstants#HFILEBLOCK_HEADER_SIZE}-byte header
   * @param fillHeader when true, write the first 4 header fields into the passed buffer.
   * @param offset the file offset the block was read from
   * @param nextBlockOnDiskSize see {@link #nextBlockOnDiskSize}
343   * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
344   * @param fileContext HFile meta data
345   */
346  @VisibleForTesting
347  public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
348      int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
349      long offset, final int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
350      HFileContext fileContext) {
351    init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
352        prevBlockOffset, offset, onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext);
353    this.buf = new SingleByteBuff(b);
354    if (fillHeader) {
355      overwriteHeader();
356    }
357    this.buf.rewind();
358  }
359
360  /**
361   * Creates a block from an existing buffer starting with a header. Rewinds
362   * and takes ownership of the buffer. By definition of rewind, ignores the
363   * buffer position, but if you slice the buffer beforehand, it will rewind
364   * to that point.
365   * @param buf Has header, content, and trailing checksums if present.
366   */
367  HFileBlock(ByteBuff buf, boolean usesHBaseChecksum, MemoryType memType, final long offset,
368      final int nextBlockOnDiskSize, HFileContext fileContext) throws IOException {
369    buf.rewind();
370    final BlockType blockType = BlockType.read(buf);
371    final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
372    final int uncompressedSizeWithoutHeader =
373        buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
374    final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
    // This constructor is called when we deserialize a block from cache and when we read a block
    // in from the fs. fileContext is null when deserialized from cache so we need to make one up.
377    HFileContextBuilder fileContextBuilder = fileContext != null?
378        new HFileContextBuilder(fileContext): new HFileContextBuilder();
379    fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum);
380    int onDiskDataSizeWithHeader;
381    if (usesHBaseChecksum) {
382      byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX);
383      int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
384      onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
385      // Use the checksum type and bytes per checksum from header, not from filecontext.
386      fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType));
387      fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum);
388    } else {
389      fileContextBuilder.withChecksumType(ChecksumType.NULL);
390      fileContextBuilder.withBytesPerCheckSum(0);
      // Need to fix onDiskDataSizeWithHeader; there are no checksums after the block data
392      onDiskDataSizeWithHeader = onDiskSizeWithoutHeader + headerSize(usesHBaseChecksum);
393    }
394    fileContext = fileContextBuilder.build();
395    assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
396    init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
397        prevBlockOffset, offset, onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext);
398    this.memType = memType;
399    this.offset = offset;
400    this.buf = buf;
401    this.buf.rewind();
402  }
403
404  /**
405   * Called from constructors.
406   */
407  private void init(BlockType blockType, int onDiskSizeWithoutHeader,
408      int uncompressedSizeWithoutHeader, long prevBlockOffset,
409      long offset, int onDiskDataSizeWithHeader, final int nextBlockOnDiskSize,
410      HFileContext fileContext) {
411    this.blockType = blockType;
412    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
413    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
414    this.prevBlockOffset = prevBlockOffset;
415    this.offset = offset;
416    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
417    this.nextBlockOnDiskSize = nextBlockOnDiskSize;
418    this.fileContext = fileContext;
419  }
420
421  /**
422   * Parse total on disk size including header and checksum.
423   * @param headerBuf Header ByteBuffer. Presumed exact size of header.
424   * @param verifyChecksum true if checksum verification is in use.
425   * @return Size of the block with header included.
426   */
427  private static int getOnDiskSizeWithHeader(final ByteBuffer headerBuf,
428      boolean verifyChecksum) {
429    return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) +
430      headerSize(verifyChecksum);
431  }
432
433  /**
434   * @return the on-disk size of the next block (including the header size and any checksums if
435   * present) read by peeking into the next block's header; use as a hint when doing
436   * a read of the next block when scanning or running over a file.
437   */
438  int getNextBlockOnDiskSize() {
439    return nextBlockOnDiskSize;
440  }
441
442  @Override
443  public BlockType getBlockType() {
444    return blockType;
445  }
446
  /** @return the data block encoding id that was used to encode this block */
448  short getDataBlockEncodingId() {
449    if (blockType != BlockType.ENCODED_DATA) {
450      throw new IllegalArgumentException("Querying encoder ID of a block " +
451          "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
452    }
453    return buf.getShort(headerSize());
454  }
455
456  /**
457   * @return the on-disk size of header + data part + checksum.
458   */
459  public int getOnDiskSizeWithHeader() {
460    return onDiskSizeWithoutHeader + headerSize();
461  }
462
463  /**
464   * @return the on-disk size of the data part + checksum (header excluded).
465   */
466  int getOnDiskSizeWithoutHeader() {
467    return onDiskSizeWithoutHeader;
468  }
469
470  /**
471   * @return the uncompressed size of data part (header and checksum excluded).
472   */
  int getUncompressedSizeWithoutHeader() {
474    return uncompressedSizeWithoutHeader;
475  }
476
477  /**
478   * @return the offset of the previous block of the same type in the file, or
479   *         -1 if unknown
480   */
481  long getPrevBlockOffset() {
482    return prevBlockOffset;
483  }
484
485  /**
   * Rewinds {@code buf} and writes the header fields (including the checksum fields when HBase
   * checksums are in use). The {@code buf} position is modified as a side-effect.
488   */
489  private void overwriteHeader() {
490    buf.rewind();
491    blockType.write(buf);
492    buf.putInt(onDiskSizeWithoutHeader);
493    buf.putInt(uncompressedSizeWithoutHeader);
494    buf.putLong(prevBlockOffset);
495    if (this.fileContext.isUseHBaseChecksum()) {
496      buf.put(fileContext.getChecksumType().getCode());
497      buf.putInt(fileContext.getBytesPerChecksum());
498      buf.putInt(onDiskDataSizeWithHeader);
499    }
500  }
501
502  /**
503   * Returns a buffer that does not include the header or checksum.
504   *
505   * @return the buffer with header skipped and checksum omitted.
506   */
507  public ByteBuff getBufferWithoutHeader() {
508    ByteBuff dup = getBufferReadOnly();
509    // Now set it up so Buffer spans content only -- no header or no checksums.
510    return dup.position(headerSize()).limit(buf.limit() - totalChecksumBytes()).slice();
511  }
512
513  /**
514   * Returns a read-only duplicate of the buffer this block stores internally ready to be read.
515   * Clients must not modify the buffer object though they may set position and limit on the
516   * returned buffer since we pass back a duplicate. This method has to be public because it is used
517   * in {@link CompoundBloomFilter} to avoid object creation on every Bloom
518   * filter lookup, but has to be used with caution. Buffer holds header, block content,
519   * and any follow-on checksums if present.
520   *
521   * @return the buffer of this block for read-only operations
522   */
523  public ByteBuff getBufferReadOnly() {
    // TODO: ByteBuff does not support asReadOnlyBuffer(). Fix.
525    ByteBuff dup = this.buf.duplicate();
526    assert dup.position() == 0;
527    return dup;
528  }
529
530  @VisibleForTesting
531  private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
532      String fieldName) throws IOException {
533    if (valueFromBuf != valueFromField) {
534      throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
535          + ") is different from that in the field (" + valueFromField + ")");
536    }
537  }
538
539  @VisibleForTesting
540  private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
541      throws IOException {
542    if (valueFromBuf != valueFromField) {
543      throw new IOException("Block type stored in the buffer: " +
544        valueFromBuf + ", block type field: " + valueFromField);
545    }
546  }
547
548  /**
549   * Checks if the block is internally consistent, i.e. the first
550   * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a
551   * valid header consistent with the fields. Assumes a packed block structure.
   * This function is primarily for testing and debugging, and is not
553   * thread-safe, because it alters the internal buffer pointer.
554   * Used by tests only.
555   */
556  @VisibleForTesting
557  void sanityCheck() throws IOException {
558    // Duplicate so no side-effects
559    ByteBuff dup = this.buf.duplicate().rewind();
560    sanityCheckAssertion(BlockType.read(dup), blockType);
561
562    sanityCheckAssertion(dup.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader");
563
564    sanityCheckAssertion(dup.getInt(), uncompressedSizeWithoutHeader,
565        "uncompressedSizeWithoutHeader");
566
567    sanityCheckAssertion(dup.getLong(), prevBlockOffset, "prevBlockOffset");
568    if (this.fileContext.isUseHBaseChecksum()) {
569      sanityCheckAssertion(dup.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
570      sanityCheckAssertion(dup.getInt(), this.fileContext.getBytesPerChecksum(),
571          "bytesPerChecksum");
572      sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
573    }
574
575    int cksumBytes = totalChecksumBytes();
576    int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
577    if (dup.limit() != expectedBufLimit) {
578      throw new AssertionError("Expected limit " + expectedBufLimit + ", got " + dup.limit());
579    }
580
581    // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
582    // block's header, so there are two sensible values for buffer capacity.
583    int hdrSize = headerSize();
584    if (dup.capacity() != expectedBufLimit && dup.capacity() != expectedBufLimit + hdrSize) {
585      throw new AssertionError("Invalid buffer capacity: " + dup.capacity() +
586          ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
587    }
588  }
589
590  @Override
591  public String toString() {
592    StringBuilder sb = new StringBuilder()
593      .append("[")
594      .append("blockType=").append(blockType)
595      .append(", fileOffset=").append(offset)
596      .append(", headerSize=").append(headerSize())
597      .append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
598      .append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
599      .append(", prevBlockOffset=").append(prevBlockOffset)
600      .append(", isUseHBaseChecksum=").append(fileContext.isUseHBaseChecksum());
601    if (fileContext.isUseHBaseChecksum()) {
602      sb.append(", checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
603        .append(", bytesPerChecksum=").append(this.buf.getInt(24 + 1))
604        .append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
605    } else {
606      sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
607        .append("(").append(onDiskSizeWithoutHeader)
608        .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
609    }
610    String dataBegin = null;
611    if (buf.hasArray()) {
612      dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
613          Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
614    } else {
615      ByteBuff bufWithoutHeader = getBufferWithoutHeader();
616      byte[] dataBeginBytes = new byte[Math.min(32,
617          bufWithoutHeader.limit() - bufWithoutHeader.position())];
618      bufWithoutHeader.get(dataBeginBytes);
619      dataBegin = Bytes.toStringBinary(dataBeginBytes);
620    }
621    sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader())
622      .append(", totalChecksumBytes=").append(totalChecksumBytes())
623      .append(", isUnpacked=").append(isUnpacked())
624      .append(", buf=[").append(buf).append("]")
625      .append(", dataBeginsWith=").append(dataBegin)
626      .append(", fileContext=").append(fileContext)
627      .append(", nextBlockOnDiskSize=").append(nextBlockOnDiskSize)
628      .append("]");
629    return sb.toString();
630  }
631
632  /**
633   * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
634   * encoded structure. Internal structures are shared between instances where applicable.
635   */
636  HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
637    if (!fileContext.isCompressedOrEncrypted()) {
638      // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
639      // which is used for block serialization to L2 cache, does not preserve encoding and
640      // encryption details.
641      return this;
642    }
643
644    HFileBlock unpacked = new HFileBlock(this);
645    unpacked.allocateBuffer(); // allocates space for the decompressed block
646
647    HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
648      reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
649
650    ByteBuff dup = this.buf.duplicate();
651    dup.position(this.headerSize());
652    dup = dup.slice();
653    ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
654      unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
655      dup);
656    return unpacked;
657  }
658
659  /**
660   * Always allocates a new buffer of the correct size. Copies header bytes
661   * from the existing buffer. Does not change header fields.
   * Reserves room to keep checksum bytes too.
663   */
664  private void allocateBuffer() {
665    int cksumBytes = totalChecksumBytes();
666    int headerSize = headerSize();
667    int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
668
    // TODO: do we need to consider allocating this offheap?
670    ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
671
672    // Copy header bytes into newBuf.
673    // newBuf is HBB so no issue in calling array()
674    buf.position(0);
675    buf.get(newBuf.array(), newBuf.arrayOffset(), headerSize);
676
677    buf = new SingleByteBuff(newBuf);
678    // set limit to exclude next block's header
679    buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
680  }
681
682  /**
   * Return true when this block's buffer has been unpacked, false otherwise. Note this is a
   * calculated heuristic, not a tracked attribute of the block.
685   */
686  public boolean isUnpacked() {
687    final int cksumBytes = totalChecksumBytes();
688    final int headerSize = headerSize();
689    final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
690    final int bufCapacity = buf.capacity();
691    return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
692  }
693
694  /** An additional sanity-check in case no compression or encryption is being used. */
695  @VisibleForTesting
696  void sanityCheckUncompressedSize() throws IOException {
697    if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) {
698      throw new IOException("Using no compression but "
699          + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
700          + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
701          + ", numChecksumbytes=" + totalChecksumBytes());
702    }
703  }
704
705  /**
   * Cannot be {@link #UNSET}. Must be a legitimate value. Used when re-making the
   * {@link BlockCacheKey} when the block is returned to the cache.
708   * @return the offset of this block in the file it was read from
709   */
710  long getOffset() {
711    if (offset < 0) {
712      throw new IllegalStateException("HFile block offset not initialized properly");
713    }
714    return offset;
715  }
716
717  /**
718   * @return a byte stream reading the data + checksum of this block
719   */
720  DataInputStream getByteStream() {
721    ByteBuff dup = this.buf.duplicate();
722    dup.position(this.headerSize());
723    return new DataInputStream(new ByteBuffInputStream(dup));
724  }
725
726  @Override
727  public long heapSize() {
728    long size = ClassSize.align(
729        ClassSize.OBJECT +
730        // Block type, multi byte buffer, MemoryType and meta references
731        4 * ClassSize.REFERENCE +
        // onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, onDiskDataSizeWithHeader
        // and nextBlockOnDiskSize
734        4 * Bytes.SIZEOF_INT +
735        // This and previous block offset
736        2 * Bytes.SIZEOF_LONG +
        // Heap size of the meta object. fileContext is never null.
738        fileContext.heapSize()
739    );
740
741    if (buf != null) {
742      // Deep overhead of the byte buffer. Needs to be aligned separately.
743      size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE);
744    }
745
746    return ClassSize.align(size);
747  }
748
749  /**
750   * Read from an input stream at least <code>necessaryLen</code> and if possible,
751   * <code>extraLen</code> also if available. Analogous to
752   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a
753   * number of "extra" bytes to also optionally read.
754   *
755   * @param in the input stream to read from
756   * @param buf the buffer to read into
757   * @param bufOffset the destination offset in the buffer
758   * @param necessaryLen the number of bytes that are absolutely necessary to read
759   * @param extraLen the number of extra bytes that would be nice to read
760   * @return true if succeeded reading the extra bytes
761   * @throws IOException if failed to read the necessary bytes
762   */
763  static boolean readWithExtra(InputStream in, byte[] buf,
764      int bufOffset, int necessaryLen, int extraLen) throws IOException {
765    int bytesRemaining = necessaryLen + extraLen;
766    while (bytesRemaining > 0) {
767      int ret = in.read(buf, bufOffset, bytesRemaining);
768      if (ret == -1 && bytesRemaining <= extraLen) {
769        // We could not read the "extra data", but that is OK.
770        break;
771      }
772      if (ret < 0) {
773        throw new IOException("Premature EOF from inputStream (read "
774            + "returned " + ret + ", was trying to read " + necessaryLen
775            + " necessary bytes and " + extraLen + " extra bytes, "
776            + "successfully read "
777            + (necessaryLen + extraLen - bytesRemaining));
778      }
779      bufOffset += ret;
780      bytesRemaining -= ret;
781    }
782    return bytesRemaining <= 0;
783  }
784
785  /**
786   * Read from an input stream at least <code>necessaryLen</code> and if possible,
787   * <code>extraLen</code> also if available. Analogous to
788   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses
789   * positional read and specifies a number of "extra" bytes that would be
790   * desirable but not absolutely necessary to read.
791   *
792   * @param in the input stream to read from
793   * @param position the position within the stream from which to start reading
794   * @param buf the buffer to read into
795   * @param bufOffset the destination offset in the buffer
796   * @param necessaryLen the number of bytes that are absolutely necessary to
797   *     read
798   * @param extraLen the number of extra bytes that would be nice to read
799   * @return true if and only if extraLen is > 0 and reading those extra bytes
800   *     was successful
801   * @throws IOException if failed to read the necessary bytes
802   */
803  @VisibleForTesting
804  static boolean positionalReadWithExtra(FSDataInputStream in,
805      long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen)
806      throws IOException {
807    int bytesRemaining = necessaryLen + extraLen;
808    int bytesRead = 0;
809    while (bytesRead < necessaryLen) {
810      int ret = in.read(position, buf, bufOffset, bytesRemaining);
811      if (ret < 0) {
812        throw new IOException("Premature EOF from inputStream (positional read "
813            + "returned " + ret + ", was trying to read " + necessaryLen
814            + " necessary bytes and " + extraLen + " extra bytes, "
815            + "successfully read " + bytesRead);
816      }
817      position += ret;
818      bufOffset += ret;
819      bytesRemaining -= ret;
820      bytesRead += ret;
821    }
822    return bytesRead != necessaryLen && bytesRemaining <= 0;
823  }
824
825  /**
826   * Unified version 2 {@link HFile} block writer. The intended usage pattern
827   * is as follows:
828   * <ol>
829   * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
830   * <li>Call {@link Writer#startWriting} and get a data stream to write to.
831   * <li>Write your data into the stream.
 * <li>Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to
 * store the serialized block into an external stream.
834   * <li>Repeat to write more blocks.
835   * </ol>
836   * <p>
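 * A minimal usage sketch; {@code fileContext}, {@code fsOut} and {@code cells} below are
 * illustrative placeholders, not members of this class:
 * <pre>{@code
 * HFileBlock.Writer writer = new HFileBlock.Writer(NoOpDataBlockEncoder.INSTANCE, fileContext);
 * DataOutputStream dos = writer.startWriting(BlockType.DATA);
 * for (Cell cell : cells) {
 *   writer.write(cell); // DATA blocks go through the configured block encoder
 * }
 * writer.writeHeaderAndData(fsOut); // finishes the block: compress/encrypt, checksum, then write
 * // Repeat startWriting()/writeHeaderAndData() for further blocks; call release() when done.
 * }</pre>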
837   */
838  static class Writer implements ShipperListener {
839    private enum State {
840      INIT,
841      WRITING,
842      BLOCK_READY
843    };
844
845    /** Writer state. Used to ensure the correct usage protocol. */
846    private State state = State.INIT;
847
848    /** Data block encoder used for data blocks */
849    private final HFileDataBlockEncoder dataBlockEncoder;
850
851    private HFileBlockEncodingContext dataBlockEncodingCtx;
852
853    /** block encoding context for non-data blocks*/
854    private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
855
856    /**
857     * The stream we use to accumulate data into a block in an uncompressed format.
858     * We reset this stream at the end of each block and reuse it. The
859     * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this
860     * stream.
861     */
862    private ByteArrayOutputStream baosInMemory;
863
864    /**
865     * Current block type. Set in {@link #startWriting(BlockType)}. Could be
866     * changed in {@link #finishBlock()} from {@link BlockType#DATA}
867     * to {@link BlockType#ENCODED_DATA}.
868     */
869    private BlockType blockType;
870
871    /**
     * A stream that we write uncompressed bytes to; they accumulate in {@link #baosInMemory} and
     * are compressed/encrypted later, in {@link #finishBlock()}.
874     */
875    private DataOutputStream userDataStream;
876
877    // Size of actual data being written. Not considering the block encoding/compression. This
878    // includes the header size also.
879    private int unencodedDataSizeWritten;
880
    // Size of actual data being written, considering the block encoding. This
882    // includes the header size also.
883    private int encodedDataSizeWritten;
884
885    /**
886     * Bytes to be written to the file system, including the header. Compressed
887     * if compression is turned on. It also includes the checksum data that
888     * immediately follows the block data. (header + data + checksums)
889     */
890    private ByteArrayOutputStream onDiskBlockBytesWithHeader;
891
892    /**
     * The checksum data for this block, computed over the contents of
     * {@link #onDiskBlockBytesWithHeader} and written out to disk immediately after it.
897     */
898    private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;
899
900    /**
901     * Current block's start offset in the {@link HFile}. Set in
902     * {@link #writeHeaderAndData(FSDataOutputStream)}.
903     */
904    private long startOffset;
905
906    /**
907     * Offset of previous block by block type. Updated when the next block is
908     * started.
909     */
910    private long[] prevOffsetByType;
911
912    /** The offset of the previous block of the same type */
913    private long prevOffset;
    /** Meta data that holds information about the HFileBlock. */
915    private HFileContext fileContext;
916
917    @Override
918    public void beforeShipped() {
919      if (getEncodingState() != null) {
920        getEncodingState().beforeShipped();
921      }
922    }
923
924    EncodingState getEncodingState() {
925      return dataBlockEncodingCtx.getEncodingState();
926    }
927
928    /**
     * @param dataBlockEncoder data block encoding algorithm to use
     * @param fileContext HFile meta data
930     */
931    public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
932      if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
933        throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
934            " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
935            fileContext.getBytesPerChecksum());
936      }
937      this.dataBlockEncoder = dataBlockEncoder != null?
938          dataBlockEncoder: NoOpDataBlockEncoder.INSTANCE;
939      this.dataBlockEncodingCtx = this.dataBlockEncoder.
940          newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
941      // TODO: This should be lazily instantiated since we usually do NOT need this default encoder
942      this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
943          HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
944      // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum
945      baosInMemory = new ByteArrayOutputStream();
946      prevOffsetByType = new long[BlockType.values().length];
947      for (int i = 0; i < prevOffsetByType.length; ++i) {
948        prevOffsetByType[i] = UNSET;
949      }
950      // TODO: Why fileContext saved away when we have dataBlockEncoder and/or
951      // defaultDataBlockEncoder?
952      this.fileContext = fileContext;
953    }
954
955    /**
956     * Starts writing into the block. The previous block's data is discarded.
957     *
958     * @return the stream the user can write their data into
959     * @throws IOException
960     */
961    DataOutputStream startWriting(BlockType newBlockType)
962        throws IOException {
      if (state == State.BLOCK_READY && startOffset != UNSET) {
964        // We had a previous block that was written to a stream at a specific
965        // offset. Save that offset as the last offset of a block of that type.
966        prevOffsetByType[blockType.getId()] = startOffset;
967      }
968
      startOffset = UNSET;
970      blockType = newBlockType;
971
972      baosInMemory.reset();
973      baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
974
975      state = State.WRITING;
976
977      // We will compress it later in finishBlock()
978      userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory);
979      if (newBlockType == BlockType.DATA) {
980        this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
981      }
982      this.unencodedDataSizeWritten = 0;
983      this.encodedDataSizeWritten = 0;
984      return userDataStream;
985    }
986
987    /**
988     * Writes the Cell to this block
     * @param cell the Cell to write
     * @throws IOException
     */
    void write(Cell cell) throws IOException {
993      expectState(State.WRITING);
994      int posBeforeEncode = this.userDataStream.size();
995      this.unencodedDataSizeWritten +=
996          this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream);
997      this.encodedDataSizeWritten += this.userDataStream.size() - posBeforeEncode;
998    }
999
1000    /**
1001     * Returns the stream for the user to write to. The block writer takes care
1002     * of handling compression and buffering for caching on write. Can only be
1003     * called in the "writing" state.
1004     *
1005     * @return the data output stream for the user to write to
1006     */
1007    DataOutputStream getUserDataStream() {
1008      expectState(State.WRITING);
1009      return userDataStream;
1010    }
1011
1012    /**
1013     * Transitions the block writer from the "writing" state to the "block
1014     * ready" state.  Does nothing if a block is already finished.
1015     */
1016    void ensureBlockReady() throws IOException {
1017      Preconditions.checkState(state != State.INIT,
1018          "Unexpected state: " + state);
1019
1020      if (state == State.BLOCK_READY) {
1021        return;
1022      }
1023
1024      // This will set state to BLOCK_READY.
1025      finishBlock();
1026    }
1027
1028    /**
1029     * Finish up writing of the block.
1030     * Flushes the compressing stream (if using compression), fills out the header,
1031     * does any compression/encryption of bytes to flush out to disk, and manages
1032     * the cache on write content, if applicable. Sets block write state to "block ready".
1033     */
1034    private void finishBlock() throws IOException {
1035      if (blockType == BlockType.DATA) {
1036        this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
1037            baosInMemory.getBuffer(), blockType);
1038        blockType = dataBlockEncodingCtx.getBlockType();
1039      }
1040      userDataStream.flush();
1041      prevOffset = prevOffsetByType[blockType.getId()];
1042
1043      // We need to set state before we can package the block up for cache-on-write. In a way, the
1044      // block is ready, but not yet encoded or compressed.
1045      state = State.BLOCK_READY;
1046      Bytes compressAndEncryptDat;
1047      if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
1048        compressAndEncryptDat = dataBlockEncodingCtx.
1049            compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size());
1050      } else {
1051        compressAndEncryptDat = defaultBlockEncodingCtx.
1052            compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size());
1053      }
1054      if (compressAndEncryptDat == null) {
1055        compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size());
1056      }
1057      if (onDiskBlockBytesWithHeader == null) {
1058        onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength());
1059      }
1060      onDiskBlockBytesWithHeader.reset();
1061      onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(),
1062            compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength());
1063      // Calculate how many bytes we need for checksum on the tail of the block.
1064      int numBytes = (int) ChecksumUtil.numBytes(
1065          onDiskBlockBytesWithHeader.size(),
1066          fileContext.getBytesPerChecksum());
1067
1068      // Put the header for the on disk bytes; header currently is unfilled-out
1069      putHeader(onDiskBlockBytesWithHeader,
1070          onDiskBlockBytesWithHeader.size() + numBytes,
1071          baosInMemory.size(), onDiskBlockBytesWithHeader.size());
1072      if (onDiskChecksum.length != numBytes) {
1073        onDiskChecksum = new byte[numBytes];
1074      }
1075      ChecksumUtil.generateChecksums(
          onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size(),
1077          onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
1078    }
1079
1080    /**
1081     * Put the header into the given byte array at the given offset.
     * @param dest the byte array to write the header into
     * @param offset the offset in {@code dest} at which to start writing
     * @param onDiskSize size of the block on disk: header + data + checksum
1083     * @param uncompressedSize size of the block after decompression (but
1084     *          before optional data block decoding) including header
1085     * @param onDiskDataSize size of the block on disk with header
1086     *        and data but not including the checksums
1087     */
1088    private void putHeader(byte[] dest, int offset, int onDiskSize,
1089        int uncompressedSize, int onDiskDataSize) {
1090      offset = blockType.put(dest, offset);
1091      offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1092      offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1093      offset = Bytes.putLong(dest, offset, prevOffset);
1094      offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
1095      offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
1096      Bytes.putInt(dest, offset, onDiskDataSize);
1097    }
1098
1099    private void putHeader(ByteArrayOutputStream dest, int onDiskSize,
1100        int uncompressedSize, int onDiskDataSize) {
      putHeader(dest.getBuffer(), 0, onDiskSize, uncompressedSize, onDiskDataSize);
1102    }
1103
1104    /**
     * Similar to {@link #finishBlockAndWriteHeaderAndData(DataOutputStream)}, but also records
1106     * the offset of this block so that it can be referenced in the next block
1107     * of the same type.
1108     *
1109     * @param out
1110     * @throws IOException
1111     */
1112    void writeHeaderAndData(FSDataOutputStream out) throws IOException {
1113      long offset = out.getPos();
1114      if (startOffset != UNSET && offset != startOffset) {
1115        throw new IOException("A " + blockType + " block written to a "
1116            + "stream twice, first at offset " + startOffset + ", then at "
1117            + offset);
1118      }
1119      startOffset = offset;
1120
1121      finishBlockAndWriteHeaderAndData((DataOutputStream) out);
1122    }
1123
1124    /**
1125     * Writes the header and the compressed data of this block (or uncompressed
1126     * data when not using compression) into the given stream. Can be called in
1127     * the "writing" state or in the "block ready" state. If called in the
1128     * "writing" state, transitions the writer to the "block ready" state.
1129     *
     * @param out the output stream to write the block to
1131     * @throws IOException
1132     */
1133    protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
1134      throws IOException {
1135      ensureBlockReady();
1136      long startTime = System.currentTimeMillis();
1137      out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size());
1138      out.write(onDiskChecksum);
1139      HFile.updateWriteLatency(System.currentTimeMillis() - startTime);
1140    }
1141
1142    /**
     * Returns the header and the compressed data (or uncompressed data when not
1144     * using compression) as a byte array. Can be called in the "writing" state
1145     * or in the "block ready" state. If called in the "writing" state,
1146     * transitions the writer to the "block ready" state. This returns
1147     * the header + data + checksums stored on disk.
1148     *
1149     * @return header and data as they would be stored on disk in a byte array
1150     * @throws IOException
1151     */
1152    byte[] getHeaderAndDataForTest() throws IOException {
1153      ensureBlockReady();
1154      // This is not very optimal, because we are doing an extra copy.
1155      // But this method is used only by unit tests.
1156      byte[] output =
1157          new byte[onDiskBlockBytesWithHeader.size()
1158              + onDiskChecksum.length];
1159      System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0,
1160          onDiskBlockBytesWithHeader.size());
1161      System.arraycopy(onDiskChecksum, 0, output,
1162          onDiskBlockBytesWithHeader.size(), onDiskChecksum.length);
1163      return output;
1164    }
1165
1166    /**
1167     * Releases resources used by this writer.
1168     */
1169    void release() {
1170      if (dataBlockEncodingCtx != null) {
1171        dataBlockEncodingCtx.close();
1172        dataBlockEncodingCtx = null;
1173      }
1174      if (defaultBlockEncodingCtx != null) {
1175        defaultBlockEncodingCtx.close();
1176        defaultBlockEncodingCtx = null;
1177      }
1178    }
1179
1180    /**
1181     * Returns the on-disk size of the data portion of the block. This is the
1182     * compressed size if compression is enabled. Can only be called in the
1183     * "block ready" state. Header is not compressed, and its size is not
1184     * included in the return value.
1185     *
1186     * @return the on-disk size of the block, not including the header.
1187     */
1188    int getOnDiskSizeWithoutHeader() {
1189      expectState(State.BLOCK_READY);
1190      return onDiskBlockBytesWithHeader.size() +
1191          onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE;
1192    }
1193
1194    /**
1195     * Returns the on-disk size of the block. Can only be called in the
1196     * "block ready" state.
1197     *
1198     * @return the on-disk size of the block ready to be written, including the
1199     *         header size, the data and the checksum data.
1200     */
1201    int getOnDiskSizeWithHeader() {
1202      expectState(State.BLOCK_READY);
1203      return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length;
1204    }
1205
1206    /**
1207     * The uncompressed size of the block data. Does not include header size.
1208     */
1209    int getUncompressedSizeWithoutHeader() {
1210      expectState(State.BLOCK_READY);
1211      return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE;
1212    }
1213
1214    /**
1215     * The uncompressed size of the block data, including header size.
1216     */
1217    int getUncompressedSizeWithHeader() {
1218      expectState(State.BLOCK_READY);
1219      return baosInMemory.size();
1220    }
1221
1222    /** @return true if a block is being written  */
1223    boolean isWriting() {
1224      return state == State.WRITING;
1225    }
1226
1227    /**
     * Returns the number of bytes written into the current block so far, considering the block
     * encoding, or zero if not writing the block at the moment. Note that this will return
1230     * zero in the "block ready" state as well.
1231     *
1232     * @return the number of bytes written
1233     */
1234    public int encodedBlockSizeWritten() {
      if (state != State.WRITING) {
        return 0;
      }
1237      return this.encodedDataSizeWritten;
1238    }
1239
1240    /**
     * Returns the number of unencoded, uncompressed bytes written into the current block so far,
     * or zero if not writing the block at the moment. Note that this will return
1243     * zero in the "block ready" state as well.
1244     *
1245     * @return the number of bytes written
1246     */
1247    int blockSizeWritten() {
1248      if (state != State.WRITING) return 0;
1249      return this.unencodedDataSizeWritten;
1250    }
1251
1252    /**
1253     * Clones the header followed by the uncompressed data, even if using
1254     * compression. This is needed for storing uncompressed blocks in the block
1255     * cache. Can be called in the "writing" state or the "block ready" state.
1256     * Returns only the header and data, does not include checksum data.
1257     *
1258     * @return Returns a copy of uncompressed block bytes for caching on write
1259     */
1260    @VisibleForTesting
1261    ByteBuffer cloneUncompressedBufferWithHeader() {
1262      expectState(State.BLOCK_READY);
1263      byte[] uncompressedBlockBytesWithHeader = baosInMemory.toByteArray();
1264      int numBytes = (int) ChecksumUtil.numBytes(
1265          onDiskBlockBytesWithHeader.size(),
1266          fileContext.getBytesPerChecksum());
1267      putHeader(uncompressedBlockBytesWithHeader, 0,
1268        onDiskBlockBytesWithHeader.size() + numBytes,
1269        baosInMemory.size(), onDiskBlockBytesWithHeader.size());
1270      return ByteBuffer.wrap(uncompressedBlockBytesWithHeader);
1271    }
1272
1273    /**
1274     * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is
1275     * needed for storing packed blocks in the block cache. Expects calling semantics identical to
     * {@link #cloneUncompressedBufferWithHeader()}. Returns only the header and data,
1277     * Does not include checksum data.
1278     *
1279     * @return Returns a copy of block bytes for caching on write
1280     */
1281    private ByteBuffer cloneOnDiskBufferWithHeader() {
1282      expectState(State.BLOCK_READY);
1283      return ByteBuffer.wrap(onDiskBlockBytesWithHeader.toByteArray());
1284    }
1285
1286    private void expectState(State expectedState) {
1287      if (state != expectedState) {
1288        throw new IllegalStateException("Expected state: " + expectedState +
1289            ", actual state: " + state);
1290      }
1291    }
1292
1293    /**
1294     * Takes the given {@link BlockWritable} instance, creates a new block of
1295     * its appropriate type, writes the writable into this block, and flushes
1296     * the block into the output stream. The writer is instructed not to buffer
1297     * uncompressed bytes for cache-on-write.
1298     *
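     * <p>A minimal usage sketch (the writer, output stream, and payload here are hypothetical
     * and assumed to be already set up; names are illustrative only):
     * <pre>{@code
     * HFileBlock.Writer writer = ...;      // an initialized Writer
     * FSDataOutputStream outStream = ...;  // the hfile's open output stream
     * writer.writeBlock(new BlockWritable() {
     *   public BlockType getBlockType() {
     *     return BlockType.META;
     *   }
     *   public void writeToBlock(DataOutput out) throws IOException {
     *     out.writeUTF("example payload"); // block body only; no magic record
     *   }
     * }, outStream);
     * }</pre>
     *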
1299     * @param bw the block-writable object to write as a block
1300     * @param out the file system output stream
1301     * @throws IOException
1302     */
1303    void writeBlock(BlockWritable bw, FSDataOutputStream out)
1304        throws IOException {
1305      bw.writeToBlock(startWriting(bw.getBlockType()));
1306      writeHeaderAndData(out);
1307    }
1308
1309    /**
1310     * Creates a new HFileBlock. Checksums have already been validated, so
1311     * the byte buffer passed into the constructor of this newly created
1312     * block does not have checksum data even though the header minor
1313     * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
1314     * 0 value in bytesPerChecksum. This method copies the on-disk or
1315     * uncompressed data to build the HFileBlock which is used only
1316     * while writing blocks and caching.
1317     *
1318     * <p>TODO: Should there be an option where a cache can ask that hbase preserve block
     * checksums for checking after a block comes out of the cache? Otherwise, cache is responsible
1320     * for blocks being wholesome (ECC memory or if file-backed, it does checksumming).
1321     */
1322    HFileBlock getBlockForCaching(CacheConfig cacheConf) {
1323      HFileContext newContext = new HFileContextBuilder()
1324                                .withBlockSize(fileContext.getBlocksize())
1325                                .withBytesPerCheckSum(0)
1326                                .withChecksumType(ChecksumType.NULL) // no checksums in cached data
1327                                .withCompression(fileContext.getCompression())
1328                                .withDataBlockEncoding(fileContext.getDataBlockEncoding())
1329                                .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
1330                                .withCompressTags(fileContext.isCompressTags())
1331                                .withIncludesMvcc(fileContext.isIncludesMvcc())
1332                                .withIncludesTags(fileContext.isIncludesTags())
1333                                .build();
1334       return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
1335          getUncompressedSizeWithoutHeader(), prevOffset,
1336          cacheConf.shouldCacheCompressed(blockType.getCategory())?
1337            cloneOnDiskBufferWithHeader() :
1338            cloneUncompressedBufferWithHeader(),
1339          FILL_HEADER, startOffset, UNSET,
1340          onDiskBlockBytesWithHeader.size() + onDiskChecksum.length, newContext);
1341    }
1342  }
1343
1344  /** Something that can be written into a block. */
1345  interface BlockWritable {
1346    /** The type of block this data should use. */
1347    BlockType getBlockType();
1348
1349    /**
1350     * Writes the block to the provided stream. Must not write any magic
1351     * records.
1352     *
1353     * @param out a stream to write uncompressed data into
1354     */
1355    void writeToBlock(DataOutput out) throws IOException;
1356  }
1357
1358  /** Iterator for {@link HFileBlock}s. */
1359  interface BlockIterator {
1360    /**
1361     * Get the next block, or null if there are no more blocks to iterate.
1362     */
1363    HFileBlock nextBlock() throws IOException;
1364
1365    /**
1366     * Similar to {@link #nextBlock()} but checks block type, throws an
1367     * exception if incorrect, and returns the HFile block
1368     */
1369    HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1370  }
1371
1372  /** An HFile block reader with iteration ability. */
1373  interface FSReader {
1374    /**
     * Reads the block at the given offset in the file with the given on-disk size.
1377     *
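     * <p>Illustrative call, assuming {@code reader} is an open {@link FSReader}; the offset and
     * size would normally come from the hfile index, and the placeholder values are assumptions:
     * <pre>{@code
     * long offset = ...;                // block offset from the index
     * long onDiskSizeWithHeader = ...;  // on-disk size from the index, or -1 if unknown
     * HFileBlock block = reader.readBlockData(offset, onDiskSizeWithHeader, true, false);
     * // pread=true, updateMetrics=false
     * }</pre>
     *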
     * @param offset the offset in the file at which the block starts
     * @param onDiskSize the on-disk size of the entire block, including all
     *          applicable headers, or -1 if unknown
     * @param pread whether to use a positional read
     * @param updateMetrics whether to update read-latency metrics
     * @return the newly read block
1382     */
1383    HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics)
1384        throws IOException;
1385
1386    /**
     * Creates a block iterator over the given portion of the {@link HFile}.
     * The iterator begins at <code>startOffset</code> and returns blocks whose
     * offsets satisfy startOffset &lt;= offset &lt; endOffset. Returned blocks
     * are always unpacked. Used when no hfile index is available; e.g. when
     * reading the hfile index blocks themselves on file open.
1392     *
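     * <p>A sketch of typical use, assuming {@code reader} is an open {@link FSReader}; variable
     * names are illustrative, and the offsets would usually come from the fixed file trailer:
     * <pre>{@code
     * HFileBlock.BlockIterator it = reader.blockRange(startOffset, endOffset);
     * for (HFileBlock b = it.nextBlock(); b != null; b = it.nextBlock()) {
     *   // each returned block is already unpacked; e.g. load index blocks here
     * }
     * }</pre>
     *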
1393     * @param startOffset the offset of the block to start iteration with
1394     * @param endOffset the offset to end iteration at (exclusive)
1395     * @return an iterator of blocks between the two given offsets
1396     */
1397    BlockIterator blockRange(long startOffset, long endOffset);
1398
1399    /** Closes the backing streams */
1400    void closeStreams() throws IOException;
1401
1402    /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
1403    HFileBlockDecodingContext getBlockDecodingContext();
1404
1405    /** Get the default decoder for blocks from this file. */
1406    HFileBlockDecodingContext getDefaultBlockDecodingContext();
1407
1408    void setIncludesMemStoreTS(boolean includesMemstoreTS);
1409    void setDataBlockEncoder(HFileDataBlockEncoder encoder);
1410
1411    /**
     * Closes the socket held open by the stream ("unbuffers" it). Note: this can be called
     * concurrently from multiple threads, so implementations must take care of thread safety.
1414     */
1415    void unbufferStream();
1416  }
1417
1418  /**
   * Data structure used to cache the header of the NEXT block. Only useful if the next read
   * to come through here is for the next block in sequence in the file.
   *
   * When we read a block, we also read the next block's header. We do this so we have
   * the length of the next block to read if the hfile index is not available (rare; at
   * hfile open only).
1425   */
1426  private static class PrefetchedHeader {
1427    long offset = -1;
1428    byte [] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1429    final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
1430
1431    @Override
1432    public String toString() {
1433      return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header);
1434    }
1435  }
1436
1437  /**
1438   * Reads version 2 HFile blocks from the filesystem.
1439   */
1440  static class FSReaderImpl implements FSReader {
1441    /** The file system stream of the underlying {@link HFile} that
1442     * does or doesn't do checksum validations in the filesystem */
1443    private FSDataInputStreamWrapper streamWrapper;
1444
1445    private HFileBlockDecodingContext encodedBlockDecodingCtx;
1446
1447    /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
1448    private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
1449
1450    /**
     * Cache of the NEXT block's header. Check it is indeed the next block's header
     * before using it. TODO: Review. This over-read into the next block to fetch
     * the next block's header seems unnecessary given we usually get the block size
     * from the hfile index. Review!
1455     */
    private AtomicReference<PrefetchedHeader> prefetchedHeader =
        new AtomicReference<>(new PrefetchedHeader());
1457
1458    /** The size of the file we are reading from, or -1 if unknown. */
1459    private long fileSize;
1460
1461    /** The size of the header */
1462    @VisibleForTesting
1463    protected final int hdrSize;
1464
1465    /** The filesystem used to access data */
1466    private HFileSystem hfs;
1467
1468    private HFileContext fileContext;
1469    // Cache the fileName
1470    private String pathName;
1471
1472    private final Lock streamLock = new ReentrantLock();
1473
1474    FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
1475        HFileContext fileContext) throws IOException {
1476      this.fileSize = fileSize;
1477      this.hfs = hfs;
1478      if (path != null) {
1479        this.pathName = path.toString();
1480      }
1481      this.fileContext = fileContext;
1482      this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1483
1484      this.streamWrapper = stream;
1485      // Older versions of HBase didn't support checksum.
1486      this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1487      defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
1488      encodedBlockDecodingCtx = defaultDecodingCtx;
1489    }
1490
1491    /**
1492     * A constructor that reads files with the latest minor version.
1493     * This is used by unit tests only.
1494     */
1495    FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
1496    throws IOException {
1497      this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
1498    }
1499
1500    @Override
1501    public BlockIterator blockRange(final long startOffset, final long endOffset) {
1502      final FSReader owner = this; // handle for inner class
1503      return new BlockIterator() {
1504        private long offset = startOffset;
1505        // Cache length of next block. Current block has the length of next block in it.
1506        private long length = -1;
1507
1508        @Override
1509        public HFileBlock nextBlock() throws IOException {
1510          if (offset >= endOffset) {
1511            return null;
1512          }
1513          HFileBlock b = readBlockData(offset, length, false, false);
1514          offset += b.getOnDiskSizeWithHeader();
1515          length = b.getNextBlockOnDiskSize();
1516          return b.unpack(fileContext, owner);
1517        }
1518
1519        @Override
1520        public HFileBlock nextBlockWithBlockType(BlockType blockType)
1521            throws IOException {
1522          HFileBlock blk = nextBlock();
1523          if (blk.getBlockType() != blockType) {
1524            throw new IOException("Expected block of type " + blockType
1525                + " but found " + blk.getBlockType());
1526          }
1527          return blk;
1528        }
1529      };
1530    }
1531
1532    /**
1533     * Does a positional read or a seek and read into the given buffer. Returns
1534     * the on-disk size of the next block, or -1 if it could not be read/determined; e.g. EOF.
1535     *
1536     * @param dest destination buffer
1537     * @param destOffset offset into the destination buffer at where to put the bytes we read
1538     * @param size size of read
1539     * @param peekIntoNextBlock whether to read the next block's on-disk size
1540     * @param fileOffset position in the stream to read at
1541     * @param pread whether we should do a positional read
1542     * @param istream The input source of data
1543     * @return the on-disk size of the next block with header size included, or
1544     *         -1 if it could not be determined; if not -1, the <code>dest</code> INCLUDES the
1545     *         next header
1546     * @throws IOException
1547     */
1548    @VisibleForTesting
1549    protected int readAtOffset(FSDataInputStream istream, byte[] dest, int destOffset, int size,
1550        boolean peekIntoNextBlock, long fileOffset, boolean pread)
1551        throws IOException {
1552      if (peekIntoNextBlock && destOffset + size + hdrSize > dest.length) {
1553        // We are asked to read the next block's header as well, but there is
1554        // not enough room in the array.
1555        throw new IOException("Attempted to read " + size + " bytes and " + hdrSize +
1556            " bytes of next header into a " + dest.length + "-byte array at offset " + destOffset);
1557      }
1558
1559      if (!pread) {
1560        // Seek + read. Better for scanning.
1561        HFileUtil.seekOnMultipleSources(istream, fileOffset);
1562        // TODO: do we need seek time latencies?
1563        long realOffset = istream.getPos();
1564        if (realOffset != fileOffset) {
1565          throw new IOException("Tried to seek to " + fileOffset + " to " + "read " + size +
1566              " bytes, but pos=" + realOffset + " after seek");
1567        }
1568
1569        if (!peekIntoNextBlock) {
1570          IOUtils.readFully(istream, dest, destOffset, size);
1571          return -1;
1572        }
1573
1574        // Try to read the next block header.
1575        if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) {
1576          return -1;
1577        }
1578      } else {
1579        // Positional read. Better for random reads; or when the streamLock is already locked.
1580        int extraSize = peekIntoNextBlock ? hdrSize : 0;
1581        if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset, size, extraSize)) {
1582          return -1;
1583        }
1584      }
1585      assert peekIntoNextBlock;
1586      return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
1587    }
1588
1589    /**
1590     * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
1591     * little memory allocation as possible, using the provided on-disk size.
1592     *
1593     * @param offset the offset in the stream to read at
1594     * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1595     *          the header, or -1 if unknown; i.e. when iterating over blocks reading
1596     *          in the file metadata info.
1597     * @param pread whether to use a positional read
1598     */
1599    @Override
1600    public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread,
1601                                    boolean updateMetrics) throws IOException {
1602      // Get a copy of the current state of whether to validate
1603      // hbase checksums or not for this read call. This is not
      // thread-safe but the one constraint is that if we decide
1605      // to skip hbase checksum verification then we are
1606      // guaranteed to use hdfs checksum verification.
1607      boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1608      FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1609
1610      HFileBlock blk = readBlockDataInternal(is, offset,
1611                         onDiskSizeWithHeaderL, pread,
1612                         doVerificationThruHBaseChecksum, updateMetrics);
1613      if (blk == null) {
1614        HFile.LOG.warn("HBase checksum verification failed for file " +
1615                       pathName + " at offset " +
1616                       offset + " filesize " + fileSize +
1617                       ". Retrying read with HDFS checksums turned on...");
1618
1619        if (!doVerificationThruHBaseChecksum) {
1620          String msg = "HBase checksum verification failed for file " +
1621                       pathName + " at offset " +
1622                       offset + " filesize " + fileSize +
1623                       " but this cannot happen because doVerify is " +
1624                       doVerificationThruHBaseChecksum;
1625          HFile.LOG.warn(msg);
1626          throw new IOException(msg); // cannot happen case here
1627        }
1628        HFile.CHECKSUM_FAILURES.increment(); // update metrics
1629
1630        // If we have a checksum failure, we fall back into a mode where
1631        // the next few reads use HDFS level checksums. We aim to make the
1632        // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
1633        // hbase checksum verification, but since this value is set without
1634        // holding any locks, it can so happen that we might actually do
1635        // a few more than precisely this number.
1636        is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1637        doVerificationThruHBaseChecksum = false;
1638        blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
1639                                    doVerificationThruHBaseChecksum, updateMetrics);
1640        if (blk != null) {
1641          HFile.LOG.warn("HDFS checksum verification succeeded for file " +
1642                         pathName + " at offset " +
1643                         offset + " filesize " + fileSize);
1644        }
1645      }
1646      if (blk == null && !doVerificationThruHBaseChecksum) {
        String msg = "readBlockData failed, possibly due to a " +
                     "checksum verification failure, for file " + pathName +
1649                     " at offset " + offset + " filesize " + fileSize;
1650        HFile.LOG.warn(msg);
1651        throw new IOException(msg);
1652      }
1653
1654      // If there is a checksum mismatch earlier, then retry with
1655      // HBase checksums switched off and use HDFS checksum verification.
1656      // This triggers HDFS to detect and fix corrupt replicas. The
1657      // next checksumOffCount read requests will use HDFS checksums.
1658      // The decrementing of this.checksumOffCount is not thread-safe,
1659      // but it is harmless because eventually checksumOffCount will be
1660      // a negative number.
1661      streamWrapper.checksumOk();
1662      return blk;
1663    }
1664
1665    /**
     * Checks that <code>onDiskSizeWithHeaderL</code> is within sane bounds.
     * @return <code>onDiskSizeWithHeaderL</code> cast to an int
     * @throws IOException if the size is less than the header size (and not -1) or does not fit in an int
1668     */
1669    private static int checkAndGetSizeAsInt(final long onDiskSizeWithHeaderL, final int hdrSize)
1670    throws IOException {
1671      if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
1672          || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) {
1673        throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL
1674            + ": expected to be at least " + hdrSize
1675            + " and at most " + Integer.MAX_VALUE + ", or -1");
1676      }
1677      return (int)onDiskSizeWithHeaderL;
1678    }
1679
1680    /**
     * Verify that the passed-in onDiskSizeWithHeader matches what is recorded in the block header;
     * otherwise something is not right.
     * @throws IOException if the passed-in size does not match the size from the header
1684     */
1685    private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuffer headerBuf,
1686        final long offset, boolean verifyChecksum)
1687    throws IOException {
1688      // Assert size provided aligns with what is in the header
1689      int fromHeader = getOnDiskSizeWithHeader(headerBuf, verifyChecksum);
1690      if (passedIn != fromHeader) {
1691        throw new IOException("Passed in onDiskSizeWithHeader=" + passedIn + " != " + fromHeader +
1692            ", offset=" + offset + ", fileContext=" + this.fileContext);
1693      }
1694    }
1695
1696    /**
     * Check the atomic reference cache for this block's header. The cache is only good if the
     * next read coming through is for the next block in sequence in the file. We read the next
     * block's header on the tail of reading the previous block to save a seek. Otherwise, we
     * have to do a seek to read the header before we can pull in the block, OR we have to back
     * up the stream because we over-read (the next block's header).
1702     * @see PrefetchedHeader
1703     * @return The cached block header or null if not found.
1704     * @see #cacheNextBlockHeader(long, byte[], int, int)
1705     */
1706    private ByteBuffer getCachedHeader(final long offset) {
1707      PrefetchedHeader ph = this.prefetchedHeader.get();
1708      return ph != null && ph.offset == offset? ph.buf: null;
1709    }
1710
1711    /**
     * Save away the next block's header in the atomic reference.
1713     * @see #getCachedHeader(long)
1714     * @see PrefetchedHeader
1715     */
1716    private void cacheNextBlockHeader(final long offset,
1717        final byte [] header, final int headerOffset, final int headerLength) {
1718      PrefetchedHeader ph = new PrefetchedHeader();
1719      ph.offset = offset;
1720      System.arraycopy(header, headerOffset, ph.header, 0, headerLength);
1721      this.prefetchedHeader.set(ph);
1722    }
1723
1724    /**
1725     * Reads a version 2 block.
1726     *
1727     * @param offset the offset in the stream to read at.
1728     * @param onDiskSizeWithHeaderL the on-disk size of the block, including
     *          the header and checksums if present, or -1 if unknown (as a long). Can be -1
     *          if we are doing raw iteration of blocks as when loading up file metadata; i.e.
     *          the first read of a new file. Usually a positive value obtained from the hfile index.
1732     * @param pread whether to use a positional read
1733     * @param verifyChecksum Whether to use HBase checksums.
1734     *        If HBase checksum is switched off, then use HDFS checksum. Can also flip on/off
1735     *        reading same file if we hit a troublesome patch in an hfile.
     * @return the HFileBlock or null if there is an HBase checksum mismatch
1737     */
1738    @VisibleForTesting
1739    protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
1740        long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics)
1741     throws IOException {
1742      if (offset < 0) {
1743        throw new IOException("Invalid offset=" + offset + " trying to read "
1744            + "block (onDiskSize=" + onDiskSizeWithHeaderL + ")");
1745      }
1746      int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize);
1747      // Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1
1748      // and will save us having to seek the stream backwards to reread the header we
1749      // read the last time through here.
1750      ByteBuffer headerBuf = getCachedHeader(offset);
1751      LOG.trace("Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, " +
1752          "onDiskSizeWithHeader={}", this.fileContext.getHFileName(), offset, pread,
1753          verifyChecksum, headerBuf, onDiskSizeWithHeader);
      // This is NOT the same as verifyChecksum. The latter is whether to do hbase
      // checksum verification; it can change with circumstances. The flag below is whether
      // the file has support for checksums (version 2+).
1757      boolean checksumSupport = this.fileContext.isUseHBaseChecksum();
1758      long startTime = System.currentTimeMillis();
1759      if (onDiskSizeWithHeader <= 0) {
1760        // We were not passed the block size. Need to get it from the header. If header was
1761        // not cached (see getCachedHeader above), need to seek to pull it in. This is costly
1762        // and should happen very rarely. Currently happens on open of a hfile reader where we
1763        // read the trailer blocks to pull in the indices. Otherwise, we are reading block sizes
1764        // out of the hfile index. To check, enable TRACE in this file and you'll get an exception
1765        // in a LOG every time we seek. See HBASE-17072 for more detail.
1766        if (headerBuf == null) {
1767          if (LOG.isTraceEnabled()) {
1768            LOG.trace("Extra see to get block size!", new RuntimeException());
1769          }
1770          headerBuf = ByteBuffer.allocate(hdrSize);
1771          readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize, false,
1772              offset, pread);
1773        }
1774        onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport);
1775      }
1776      int preReadHeaderSize = headerBuf == null? 0 : hdrSize;
1777      // Allocate enough space to fit the next block's header too; saves a seek next time through.
1778      // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header;
1779      // onDiskSizeWithHeader is header, body, and any checksums if present. preReadHeaderSize
1780      // says where to start reading. If we have the header cached, then we don't need to read
1781      // it again and we can likely read from last place we left off w/o need to backup and reread
1782      // the header we read last time through here.
1783      // TODO: Make this ByteBuffer-based. Will make it easier to go to HDFS with BBPool (offheap).
1784      byte [] onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
1785      int nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, preReadHeaderSize,
1786          onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread);
1787      if (headerBuf != null) {
1788        // The header has been read when reading the previous block OR in a distinct header-only
1789        // read. Copy to this block's header.
1790        System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1791      } else {
1792        headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
1793      }
1794      // Do a few checks before we go instantiate HFileBlock.
1795      assert onDiskSizeWithHeader > this.hdrSize;
1796      verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport);
1797      ByteBuffer onDiskBlockByteBuffer = ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader);
1798      // Verify checksum of the data before using it for building HFileBlock.
1799      if (verifyChecksum &&
1800          !validateChecksum(offset, onDiskBlockByteBuffer, hdrSize)) {
1801        return null;
1802      }
1803      long duration = System.currentTimeMillis() - startTime;
1804      if (updateMetrics) {
1805        HFile.updateReadLatency(duration, pread);
1806      }
1807      // The onDiskBlock will become the headerAndDataBuffer for this block.
      // If nextBlockOnDiskSize is not -1, the onDiskBlock already contains the header
      // of the next block, so there is no need to set the next block's header in it.
1810      HFileBlock hFileBlock =
1811          new HFileBlock(new SingleByteBuff(onDiskBlockByteBuffer), checksumSupport,
1812              MemoryType.EXCLUSIVE, offset, nextBlockOnDiskSize, fileContext);
1813      // Run check on uncompressed sizings.
1814      if (!fileContext.isCompressedOrEncrypted()) {
1815        hFileBlock.sanityCheckUncompressed();
1816      }
1817      LOG.trace("Read {} in {} ns", hFileBlock, duration);
1818      // Cache next block header if we read it for the next time through here.
1819      if (nextBlockOnDiskSize != -1) {
1820        cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(),
1821            onDiskBlock, onDiskSizeWithHeader, hdrSize);
1822      }
1823      return hFileBlock;
1824    }
1825
1826    @Override
1827    public void setIncludesMemStoreTS(boolean includesMemstoreTS) {
1828      this.fileContext.setIncludesMvcc(includesMemstoreTS);
1829    }
1830
1831    @Override
1832    public void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
1833      encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
1834    }
1835
1836    @Override
1837    public HFileBlockDecodingContext getBlockDecodingContext() {
1838      return this.encodedBlockDecodingCtx;
1839    }
1840
1841    @Override
1842    public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
1843      return this.defaultDecodingCtx;
1844    }
1845
1846    /**
1847     * Generates the checksum for the header as well as the data and then validates it.
     * If the block doesn't use checksums, returns false.
1849     * @return True if checksum matches, else false.
1850     */
1851    private boolean validateChecksum(long offset, ByteBuffer data, int hdrSize)
1852        throws IOException {
1853      // If this is an older version of the block that does not have checksums, then return false
1854      // indicating that checksum verification did not succeed. Actually, this method should never
1855      // be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen
1856      // case. Since this is a cannot-happen case, it is better to return false to indicate a
1857      // checksum validation failure.
1858      if (!fileContext.isUseHBaseChecksum()) {
1859        return false;
1860      }
1861      return ChecksumUtil.validateChecksum(data, pathName, offset, hdrSize);
1862    }
1863
1864    @Override
1865    public void closeStreams() throws IOException {
1866      streamWrapper.close();
1867    }
1868
1869    @Override
1870    public void unbufferStream() {
1871      // To handle concurrent reads, ensure that no other client is accessing the streams while we
1872      // unbuffer it.
1873      if (streamLock.tryLock()) {
1874        try {
1875          this.streamWrapper.unbuffer();
1876        } finally {
1877          streamLock.unlock();
1878        }
1879      }
1880    }
1881
1882    @Override
1883    public String toString() {
1884      return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext;
1885    }
1886  }
1887
1888  /** An additional sanity-check in case no compression or encryption is being used. */
1889  @VisibleForTesting
1890  void sanityCheckUncompressed() throws IOException {
1891    if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader +
1892        totalChecksumBytes()) {
1893      throw new IOException("Using no compression but "
1894          + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
1895          + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
1896          + ", numChecksumbytes=" + totalChecksumBytes());
1897    }
1898  }
1899
1900  // Cacheable implementation
1901  @Override
1902  public int getSerializedLength() {
1903    if (buf != null) {
1904      // Include extra bytes for block metadata.
1905      return this.buf.limit() + BLOCK_METADATA_SPACE;
1906    }
1907    return 0;
1908  }
1909
1910  // Cacheable implementation
1911  @Override
1912  public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
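    // Copy this block's backing buffer (header, data, and checksums if present) into the
    // destination, then append the block metadata (see addMetaData).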
1913    this.buf.get(destination, 0, getSerializedLength() - BLOCK_METADATA_SPACE);
1914    destination = addMetaData(destination, includeNextBlockMetadata);
1915
    // Make it ready for reading. flip sets position to zero and limit to the current position,
    // so the destination now covers exactly what we wrote: the block bytes followed by the
    // metadata.
1919    destination.flip();
1920  }
1921
1922  /**
1923   * For use by bucketcache. This exposes internals.
1924   */
1925  public ByteBuffer getMetaData() {
1926    ByteBuffer bb = ByteBuffer.allocate(BLOCK_METADATA_SPACE);
1927    bb = addMetaData(bb, true);
1928    bb.flip();
1929    return bb;
1930  }
1931
1932  /**
1933   * Adds metadata at current position (position is moved forward). Does not flip or reset.
1934   * @return The passed <code>destination</code> with metadata added.
1935   */
1936  private ByteBuffer addMetaData(final ByteBuffer destination, boolean includeNextBlockMetadata) {
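    // Metadata layout appended at the current position:
    //   1 byte  - whether this block's file uses HBase checksums
    //   8 bytes - this block's offset in the hfile
    //   4 bytes - on-disk size of the next block (only when includeNextBlockMetadata)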
1937    destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1938    destination.putLong(this.offset);
1939    if (includeNextBlockMetadata) {
1940      destination.putInt(this.nextBlockOnDiskSize);
1941    }
1942    return destination;
1943  }
1944
1945  // Cacheable implementation
1946  @Override
1947  public CacheableDeserializer<Cacheable> getDeserializer() {
1948    return HFileBlock.BLOCK_DESERIALIZER;
1949  }
1950
1951  @Override
1952  public int hashCode() {
1953    int result = 1;
1954    result = result * 31 + blockType.hashCode();
1955    result = result * 31 + nextBlockOnDiskSize;
1956    result = result * 31 + (int) (offset ^ (offset >>> 32));
1957    result = result * 31 + onDiskSizeWithoutHeader;
1958    result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32));
1959    result = result * 31 + uncompressedSizeWithoutHeader;
1960    result = result * 31 + buf.hashCode();
1961    return result;
1962  }
1963
1964  @Override
1965  public boolean equals(Object comparison) {
1966    if (this == comparison) {
1967      return true;
1968    }
1969    if (comparison == null) {
1970      return false;
1971    }
1972    if (comparison.getClass() != this.getClass()) {
1973      return false;
1974    }
1975
1976    HFileBlock castedComparison = (HFileBlock) comparison;
1977
1978    if (castedComparison.blockType != this.blockType) {
1979      return false;
1980    }
1981    if (castedComparison.nextBlockOnDiskSize != this.nextBlockOnDiskSize) {
1982      return false;
1983    }
1984    // Offset is important. Needed when we have to remake cachekey when block is returned to cache.
1985    if (castedComparison.offset != this.offset) {
1986      return false;
1987    }
1988    if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1989      return false;
1990    }
1991    if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1992      return false;
1993    }
1994    if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1995      return false;
1996    }
1997    if (ByteBuff.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
1998        castedComparison.buf.limit()) != 0) {
1999      return false;
2000    }
2001    return true;
2002  }
2003
2004  DataBlockEncoding getDataBlockEncoding() {
2005    if (blockType == BlockType.ENCODED_DATA) {
2006      return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
2007    }
2008    return DataBlockEncoding.NONE;
2009  }
2010
2011  @VisibleForTesting
2012  byte getChecksumType() {
2013    return this.fileContext.getChecksumType().getCode();
2014  }
2015
2016  int getBytesPerChecksum() {
2017    return this.fileContext.getBytesPerChecksum();
2018  }
2019
2020  /** @return the size of data on disk + header. Excludes checksum. */
2021  @VisibleForTesting
2022  int getOnDiskDataSizeWithHeader() {
2023    return this.onDiskDataSizeWithHeader;
2024  }
2025
2026  /**
2027   * Calculate the number of bytes required to store all the checksums
2028   * for this block. Each checksum value is a 4 byte integer.
2029   */
2030  int totalChecksumBytes() {
2031    // If the hfile block has minorVersion 0, then there are no checksum
2032    // data to validate. Similarly, a zero value in this.bytesPerChecksum
2033    // indicates that cached blocks do not have checksum data because
2034    // checksums were already validated when the block was read from disk.
2035    if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
2036      return 0;
2037    }
2038    return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
2039        this.fileContext.getBytesPerChecksum());
2040  }
2041
2042  /**
2043   * Returns the size of this block header.
2044   */
2045  public int headerSize() {
2046    return headerSize(this.fileContext.isUseHBaseChecksum());
2047  }
2048
2049  /**
2050   * Maps a minor version to the size of the header.
2051   */
2052  public static int headerSize(boolean usesHBaseChecksum) {
2053    return usesHBaseChecksum?
2054        HConstants.HFILEBLOCK_HEADER_SIZE: HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
2055  }
2056
2057  /**
2058   * Return the appropriate DUMMY_HEADER for the minor version
2059   */
2060  @VisibleForTesting
2061  // TODO: Why is this in here?
2062  byte[] getDummyHeaderForVersion() {
2063    return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
2064  }
2065
2066  /**
2067   * Return the appropriate DUMMY_HEADER for the minor version
2068   */
2069  static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
2070    return usesHBaseChecksum? HConstants.HFILEBLOCK_DUMMY_HEADER: DUMMY_HEADER_NO_CHECKSUM;
2071  }
2072
2073  /**
   * @return This HFileBlock's fileContext, which will be a derivative of the
   * fileContext for the file from which this block's data was originally read.
2076   */
2077  HFileContext getHFileContext() {
2078    return this.fileContext;
2079  }
2080
2081  @Override
2082  public MemoryType getMemoryType() {
2083    return this.memType;
2084  }
2085
2086  /**
   * @return true if this block is backed by a shared memory area (such as that of a BucketCache).
2088   */
2089  boolean usesSharedMemory() {
2090    return this.memType == MemoryType.SHARED;
2091  }
2092
2093  /**
2094   * Convert the contents of the block header into a human readable string.
2095   * This is mostly helpful for debugging. This assumes that the block
2096   * has minor version > 0.
2097   */
2098  @VisibleForTesting
2099  static String toStringHeader(ByteBuff buf) throws IOException {
2100    byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
2101    buf.get(magicBuf);
2102    BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
2103    int compressedBlockSizeNoHeader = buf.getInt();
2104    int uncompressedBlockSizeNoHeader = buf.getInt();
2105    long prevBlockOffset = buf.getLong();
2106    byte cksumtype = buf.get();
2107    long bytesPerChecksum = buf.getInt();
2108    long onDiskDataSizeWithHeader = buf.getInt();
2109    return " Header dump: magic: " + Bytes.toString(magicBuf) +
2110                   " blockType " + bt +
2111                   " compressedBlockSizeNoHeader " +
2112                   compressedBlockSizeNoHeader +
2113                   " uncompressedBlockSizeNoHeader " +
2114                   uncompressedBlockSizeNoHeader +
2115                   " prevBlockOffset " + prevBlockOffset +
2116                   " checksumType " + ChecksumType.codeToType(cksumtype) +
2117                   " bytesPerChecksum " + bytesPerChecksum +
2118                   " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
2119  }
2120
2121  public HFileBlock deepClone() {
2122    return new HFileBlock(this, true);
2123  }
2124}