001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import java.io.DataInputStream;
021import java.io.DataOutput;
022import java.io.DataOutputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.nio.ByteBuffer;
026import java.util.concurrent.atomic.AtomicReference;
027import java.util.concurrent.locks.Lock;
028import java.util.concurrent.locks.ReentrantLock;
029
030import org.apache.hadoop.fs.FSDataInputStream;
031import org.apache.hadoop.fs.FSDataOutputStream;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038import org.apache.hadoop.hbase.fs.HFileSystem;
039import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
040import org.apache.hadoop.hbase.io.ByteBuffInputStream;
041import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream;
042import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
043import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
044import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
045import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
046import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
047import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
048import org.apache.hadoop.hbase.nio.ByteBuff;
049import org.apache.hadoop.hbase.nio.MultiByteBuff;
050import org.apache.hadoop.hbase.nio.SingleByteBuff;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.util.ChecksumType;
053import org.apache.hadoop.hbase.util.ClassSize;
054import org.apache.hadoop.io.IOUtils;
055
056import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
057import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
058
059/**
060 * Cacheable Blocks of an {@link HFile} version 2 file.
061 * Version 2 was introduced in hbase-0.92.0.
062 *
063 * <p>Version 1 was the original file block. Version 2 was introduced when we changed the hbase file
064 * format to support multi-level block indexes and compound bloom filters (HBASE-3857). Support
065 * for Version 1 was removed in hbase-1.3.0.
066 *
067 * <h3>HFileBlock: Version 2</h3>
068 * In version 2, a block is structured as follows:
069 * <ul>
070 * <li><b>Header:</b> See Writer#putHeader() for where header is written; header total size is
071 * HFILEBLOCK_HEADER_SIZE
072 * <ul>
073 * <li>0. blockType: Magic record identifying the {@link BlockType} (8 bytes):
074 * e.g. <code>DATABLK*</code>
075 * <li>1. onDiskSizeWithoutHeader: Compressed -- a.k.a 'on disk' -- block size, excluding header,
076 * but including tailing checksum bytes (4 bytes)
077 * <li>2. uncompressedSizeWithoutHeader: Uncompressed block size, excluding header, and excluding
078 * checksum bytes (4 bytes)
079 * <li>3. prevBlockOffset: The offset of the previous block of the same type (8 bytes). This is
080 * used to navigate to the previous block without having to go to the block index
081 * <li>4: For minorVersions &gt;=1, the ordinal describing checksum type (1 byte)
082 * <li>5: For minorVersions &gt;=1, the number of data bytes/checksum chunk (4 bytes)
083 * <li>6: onDiskDataSizeWithHeader: For minorVersions &gt;=1, the size of data 'on disk', including
084 * header, excluding checksums (4 bytes)
085 * </ul>
086 * </li>
087 * <li><b>Raw/Compressed/Encrypted/Encoded data:</b> The compression
088 * algorithm is the same for all the blocks in an {@link HFile}. If compression is NONE, this is
089 * just raw, serialized Cells.
090 * <li><b>Tail:</b> For minorVersions &gt;=1, a series of 4 byte checksums, one each for
091 * the number of bytes specified by bytesPerChecksum.
092 * </ul>
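 *
 * <p>As a hedged illustration only (the buffer name <code>headerBuf</code> is hypothetical), the
 * header fields above line up with the {@link Header} index constants defined in this class:
 * <pre>
 *   // headerBuf holds at least the first HFILEBLOCK_HEADER_SIZE bytes of a block
 *   int onDiskSizeWithoutHeader = headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
 *   int uncompressedSizeWithoutHeader =
 *       headerBuf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
 *   long prevBlockOffset = headerBuf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
 *   // The next three fields are present only for minorVersions &gt;= 1 (hbase checksums in use)
 *   byte checksumType = headerBuf.get(Header.CHECKSUM_TYPE_INDEX);
 *   int bytesPerChecksum = headerBuf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
 *   int onDiskDataSizeWithHeader = headerBuf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
 * </pre>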
093 *
094 * <h3>Caching</h3>
 * Caches cache whole blocks, with trailing checksums if any. We then tag on some metadata: the
 * content of BLOCK_METADATA_SPACE, which holds a flag for whether we are doing 'hbase'
 * checksums, and then the offset into the file, which is needed when we re-make a cache key
 * when we return the block to the cache as 'done'.
099 * See {@link Cacheable#serialize(ByteBuffer, boolean)} and {@link Cacheable#getDeserializer()}.
100 *
101 * <p>TODO: Should we cache the checksums? Down in Writer#getBlockForCaching(CacheConfig) where
102 * we make a block to cache-on-write, there is an attempt at turning off checksums. This is not the
103 * only place we get blocks to cache. We also will cache the raw return from an hdfs read. In this
104 * case, the checksums may be present. If the cache is backed by something that doesn't do ECC,
 * say an SSD, we might want to preserve checksums. For now this is an open question.
106 * <p>TODO: Over in BucketCache, we save a block allocation by doing a custom serialization.
107 * Be sure to change it if serialization changes in here. Could we add a method here that takes an
108 * IOEngine and that then serializes to it rather than expose our internals over in BucketCache?
109 * IOEngine is in the bucket subpackage. Pull it up? Then this class knows about bucketcache. Ugh.
110 */
111@InterfaceAudience.Private
112public class HFileBlock implements Cacheable {
113  private static final Logger LOG = LoggerFactory.getLogger(HFileBlock.class);
114
115  // Block Header fields.
116
117  // TODO: encapsulate Header related logic in this inner class.
118  static class Header {
119    // Format of header is:
120    // 8 bytes - block magic
121    // 4 bytes int - onDiskSizeWithoutHeader
122    // 4 bytes int - uncompressedSizeWithoutHeader
123    // 8 bytes long - prevBlockOffset
124    // The following 3 are only present if header contains checksum information
125    // 1 byte - checksum type
126    // 4 byte int - bytes per checksum
127    // 4 byte int - onDiskDataSizeWithHeader
128    static int BLOCK_MAGIC_INDEX = 0;
129    static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8;
130    static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12;
131    static int PREV_BLOCK_OFFSET_INDEX = 16;
132    static int CHECKSUM_TYPE_INDEX = 24;
133    static int BYTES_PER_CHECKSUM_INDEX = 25;
134    static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29;
135  }
136
137  /** Type of block. Header field 0. */
138  private BlockType blockType;
139
140  /**
141   * Size on disk excluding header, including checksum. Header field 1.
142   * @see Writer#putHeader(byte[], int, int, int, int)
143   */
144  private int onDiskSizeWithoutHeader;
145
146  /**
147   * Size of pure data. Does not include header or checksums. Header field 2.
148   * @see Writer#putHeader(byte[], int, int, int, int)
149   */
150  private int uncompressedSizeWithoutHeader;
151
152  /**
153   * The offset of the previous block on disk. Header field 3.
154   * @see Writer#putHeader(byte[], int, int, int, int)
155   */
156  private long prevBlockOffset;
157
158  /**
159   * Size on disk of header + data. Excludes checksum. Header field 6,
160   * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
161   * @see Writer#putHeader(byte[], int, int, int, int)
162   */
163  private int onDiskDataSizeWithHeader;
164  // End of Block Header fields.
165
166  /**
167   * The in-memory representation of the hfile block. Can be on or offheap. Can be backed by
168   * a single ByteBuffer or by many. Make no assumptions.
169   *
170   * <p>Be careful reading from this <code>buf</code>. Duplicate and work on the duplicate or if
171   * not, be sure to reset position and limit else trouble down the road.
172   *
173   * <p>TODO: Make this read-only once made.
174   *
175   * <p>We are using the ByteBuff type. ByteBuffer is not extensible yet we need to be able to have
176   * a ByteBuffer-like API across multiple ByteBuffers reading from a cache such as BucketCache.
177   * So, we have this ByteBuff type. Unfortunately, it is spread all about HFileBlock. Would be
178   * good if could be confined to cache-use only but hard-to-do.
179   */
180  private ByteBuff buf;
181
182  /** Meta data that holds meta information on the hfileblock.
183   */
184  private HFileContext fileContext;
185
186  /**
187   * The offset of this block in the file. Populated by the reader for
188   * convenience of access. This offset is not part of the block header.
189   */
190  private long offset = UNSET;
191
192  private MemoryType memType = MemoryType.EXCLUSIVE;
193
194  /**
195   * The on-disk size of the next block, including the header and checksums if present.
196   * UNSET if unknown.
197   *
198   * Blocks try to carry the size of the next block to read in this data member. Usually
199   * we get block sizes from the hfile index but sometimes the index is not available:
200   * e.g. when we read the indexes themselves (indexes are stored in blocks, we do not
201   * have an index for the indexes). Saves seeks especially around file open when
202   * there is a flurry of reading in hfile metadata.
203   */
204  private int nextBlockOnDiskSize = UNSET;
205
206  /**
207   * On a checksum failure, do these many succeeding read requests using hdfs checksums before
208   * auto-reenabling hbase checksum verification.
209   */
210  static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
211
  private static final int UNSET = -1;
213  public static final boolean FILL_HEADER = true;
214  public static final boolean DONT_FILL_HEADER = false;
215
  // How do we get the estimate right if the backing buffer is a SingleByteBuff?
217  public static final int MULTI_BYTE_BUFFER_HEAP_SIZE =
218      (int)ClassSize.estimateBase(MultiByteBuff.class, false);
219
220  /**
221   * Space for metadata on a block that gets stored along with the block when we cache it.
222   * There are a few bytes stuck on the end of the HFileBlock that we pull in from HDFS.
   * 8 bytes are for the offset of this block (long) in the file. Offset is important because it is
224   * used when we remake the CacheKey when we return block to the cache when done. There is also
225   * a flag on whether checksumming is being done by hbase or not. See class comment for note on
226   * uncertain state of checksumming of blocks that come out of cache (should we or should we not?).
227   * Finally there are 4 bytes to hold the length of the next block which can save a seek on
228   * occasion if available.
229   * (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was
230   * formerly known as EXTRA_SERIALIZATION_SPACE).
231   */
232  static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
233
234  /**
235   * Each checksum value is an integer that can be stored in 4 bytes.
236   */
237  static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
238
239  static final byte[] DUMMY_HEADER_NO_CHECKSUM =
240      new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
241
242  /**
243   * Used deserializing blocks from Cache.
244   *
245   * <code>
246   * ++++++++++++++
247   * + HFileBlock +
248   * ++++++++++++++
249   * + Checksums  + <= Optional
250   * ++++++++++++++
251   * + Metadata!  + <= See note on BLOCK_METADATA_SPACE above.
252   * ++++++++++++++
253   * </code>
   * @see #serialize(ByteBuffer, boolean)
255   */
256  static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER =
257      new CacheableDeserializer<Cacheable>() {
258    @Override
259    public HFileBlock deserialize(ByteBuff buf, boolean reuse, MemoryType memType)
260        throws IOException {
261      // The buf has the file block followed by block metadata.
262      // Set limit to just before the BLOCK_METADATA_SPACE then rewind.
263      buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind();
264      // Get a new buffer to pass the HFileBlock for it to 'own'.
265      ByteBuff newByteBuff;
266      if (reuse) {
267        newByteBuff = buf.slice();
268      } else {
269        int len = buf.limit();
270        newByteBuff = new SingleByteBuff(ByteBuffer.allocate(len));
271        newByteBuff.put(0, buf, buf.position(), len);
272      }
273      // Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock.
274      buf.position(buf.limit());
275      buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE);
276      boolean usesChecksum = buf.get() == (byte) 1;
277      long offset = buf.getLong();
278      int nextBlockOnDiskSize = buf.getInt();
279      HFileBlock hFileBlock =
280          new HFileBlock(newByteBuff, usesChecksum, memType, offset, nextBlockOnDiskSize, null);
281      return hFileBlock;
282    }
283
284    @Override
285    public int getDeserialiserIdentifier() {
286      return DESERIALIZER_IDENTIFIER;
287    }
288
289    @Override
290    public HFileBlock deserialize(ByteBuff b) throws IOException {
291      // Used only in tests
292      return deserialize(b, false, MemoryType.EXCLUSIVE);
293    }
294  };
295
296  private static final int DESERIALIZER_IDENTIFIER;
297  static {
298    DESERIALIZER_IDENTIFIER =
299        CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
300  }
301
302  /**
303   * Copy constructor. Creates a shallow copy of {@code that}'s buffer.
304   */
305  private HFileBlock(HFileBlock that) {
306    this(that, false);
307  }
308
309  /**
310   * Copy constructor. Creates a shallow/deep copy of {@code that}'s buffer as per the boolean
311   * param.
312   */
313  private HFileBlock(HFileBlock that, boolean bufCopy) {
314    init(that.blockType, that.onDiskSizeWithoutHeader,
315        that.uncompressedSizeWithoutHeader, that.prevBlockOffset,
316        that.offset, that.onDiskDataSizeWithHeader, that.nextBlockOnDiskSize, that.fileContext);
317    if (bufCopy) {
318      this.buf = new SingleByteBuff(ByteBuffer.wrap(that.buf.toBytes(0, that.buf.limit())));
319    } else {
320      this.buf = that.buf.duplicate();
321    }
322  }
323
324  /**
325   * Creates a new {@link HFile} block from the given fields. This constructor
   * is used only while writing blocks and caching, when the block data is already
   * sitting in a byte buffer and we want to stuff the block into the cache.
328   * See {@link Writer#getBlockForCaching(CacheConfig)}.
329   *
330   * <p>TODO: The caller presumes no checksumming
331   * required of this block instance since going into cache; checksum already verified on
332   * underlying block data pulled in from filesystem. Is that correct? What if cache is SSD?
333   *
334   * @param blockType the type of this block, see {@link BlockType}
335   * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
336   * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
337   * @param prevBlockOffset see {@link #prevBlockOffset}
   * @param b block buffer, starting with the {@link HConstants#HFILEBLOCK_HEADER_SIZE}-byte header
   * @param fillHeader when true, write the header fields into the passed buffer
   * @param offset the file offset the block was read from
   * @param nextBlockOnDiskSize the on-disk size of the next block, including header and checksums
   *          if present, or -1 (UNSET) if unknown
   * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
342   * @param fileContext HFile meta data
343   */
344  @VisibleForTesting
345  public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
346      int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
347      long offset, final int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
348      HFileContext fileContext) {
349    init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
350        prevBlockOffset, offset, onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext);
351    this.buf = new SingleByteBuff(b);
352    if (fillHeader) {
353      overwriteHeader();
354    }
355    this.buf.rewind();
356  }
357
358  /**
359   * Creates a block from an existing buffer starting with a header. Rewinds
360   * and takes ownership of the buffer. By definition of rewind, ignores the
361   * buffer position, but if you slice the buffer beforehand, it will rewind
362   * to that point.
363   * @param buf Has header, content, and trailing checksums if present.
364   */
365  HFileBlock(ByteBuff buf, boolean usesHBaseChecksum, MemoryType memType, final long offset,
366      final int nextBlockOnDiskSize, HFileContext fileContext) throws IOException {
367    buf.rewind();
368    final BlockType blockType = BlockType.read(buf);
369    final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
370    final int uncompressedSizeWithoutHeader =
371        buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
372    final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
    // This constructor is called when we deserialize a block from cache and when we read a block
    // in from the fs. fileContext is null when deserialized from cache so we need to make one up.
375    HFileContextBuilder fileContextBuilder = fileContext != null?
376        new HFileContextBuilder(fileContext): new HFileContextBuilder();
377    fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum);
378    int onDiskDataSizeWithHeader;
379    if (usesHBaseChecksum) {
380      byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX);
381      int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
382      onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
383      // Use the checksum type and bytes per checksum from header, not from filecontext.
384      fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType));
385      fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum);
386    } else {
387      fileContextBuilder.withChecksumType(ChecksumType.NULL);
388      fileContextBuilder.withBytesPerCheckSum(0);
      // Need to fix onDiskDataSizeWithHeader; there are no checksums after the block data
390      onDiskDataSizeWithHeader = onDiskSizeWithoutHeader + headerSize(usesHBaseChecksum);
391    }
392    fileContext = fileContextBuilder.build();
393    assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
394    init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
395        prevBlockOffset, offset, onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext);
396    this.memType = memType;
397    this.offset = offset;
398    this.buf = buf;
399    this.buf.rewind();
400  }
401
402  /**
403   * Called from constructors.
404   */
405  private void init(BlockType blockType, int onDiskSizeWithoutHeader,
406      int uncompressedSizeWithoutHeader, long prevBlockOffset,
407      long offset, int onDiskDataSizeWithHeader, final int nextBlockOnDiskSize,
408      HFileContext fileContext) {
409    this.blockType = blockType;
410    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
411    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
412    this.prevBlockOffset = prevBlockOffset;
413    this.offset = offset;
414    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
415    this.nextBlockOnDiskSize = nextBlockOnDiskSize;
416    this.fileContext = fileContext;
417  }
418
419  /**
420   * Parse total on disk size including header and checksum.
421   * @param headerBuf Header ByteBuffer. Presumed exact size of header.
422   * @param verifyChecksum true if checksum verification is in use.
423   * @return Size of the block with header included.
424   */
425  private static int getOnDiskSizeWithHeader(final ByteBuffer headerBuf,
426      boolean verifyChecksum) {
427    return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) +
428      headerSize(verifyChecksum);
429  }
430
431  /**
432   * @return the on-disk size of the next block (including the header size and any checksums if
433   * present) read by peeking into the next block's header; use as a hint when doing
434   * a read of the next block when scanning or running over a file.
435   */
436  int getNextBlockOnDiskSize() {
437    return nextBlockOnDiskSize;
438  }
439
440  @Override
441  public BlockType getBlockType() {
442    return blockType;
443  }
444
  /** @return the data block encoding id that was used to encode this block */
446  short getDataBlockEncodingId() {
447    if (blockType != BlockType.ENCODED_DATA) {
448      throw new IllegalArgumentException("Querying encoder ID of a block " +
449          "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
450    }
451    return buf.getShort(headerSize());
452  }
453
454  /**
455   * @return the on-disk size of header + data part + checksum.
456   */
457  public int getOnDiskSizeWithHeader() {
458    return onDiskSizeWithoutHeader + headerSize();
459  }
460
461  /**
462   * @return the on-disk size of the data part + checksum (header excluded).
463   */
464  int getOnDiskSizeWithoutHeader() {
465    return onDiskSizeWithoutHeader;
466  }
467
468  /**
469   * @return the uncompressed size of data part (header and checksum excluded).
470   */
  int getUncompressedSizeWithoutHeader() {
472    return uncompressedSizeWithoutHeader;
473  }
474
475  /**
476   * @return the offset of the previous block of the same type in the file, or
477   *         -1 if unknown
478   */
479  long getPrevBlockOffset() {
480    return prevBlockOffset;
481  }
482
483  /**
   * Rewinds {@code buf} and writes the header fields (including the checksum fields when HBase
   * checksums are in use). {@code buf} position is modified as a side-effect.
486   */
487  private void overwriteHeader() {
488    buf.rewind();
489    blockType.write(buf);
490    buf.putInt(onDiskSizeWithoutHeader);
491    buf.putInt(uncompressedSizeWithoutHeader);
492    buf.putLong(prevBlockOffset);
493    if (this.fileContext.isUseHBaseChecksum()) {
494      buf.put(fileContext.getChecksumType().getCode());
495      buf.putInt(fileContext.getBytesPerChecksum());
496      buf.putInt(onDiskDataSizeWithHeader);
497    }
498  }
499
500  /**
501   * Returns a buffer that does not include the header or checksum.
502   *
503   * @return the buffer with header skipped and checksum omitted.
504   */
505  public ByteBuff getBufferWithoutHeader() {
506    ByteBuff dup = getBufferReadOnly();
507    // Now set it up so Buffer spans content only -- no header or no checksums.
508    return dup.position(headerSize()).limit(buf.limit() - totalChecksumBytes()).slice();
509  }
510
511  /**
512   * Returns a read-only duplicate of the buffer this block stores internally ready to be read.
513   * Clients must not modify the buffer object though they may set position and limit on the
514   * returned buffer since we pass back a duplicate. This method has to be public because it is used
515   * in {@link CompoundBloomFilter} to avoid object creation on every Bloom
516   * filter lookup, but has to be used with caution. Buffer holds header, block content,
517   * and any follow-on checksums if present.
518   *
519   * @return the buffer of this block for read-only operations
520   */
521  public ByteBuff getBufferReadOnly() {
522    // TODO: ByteBuf does not support asReadOnlyBuffer(). Fix.
523    ByteBuff dup = this.buf.duplicate();
524    assert dup.position() == 0;
525    return dup;
526  }
527
528  @VisibleForTesting
529  private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
530      String fieldName) throws IOException {
531    if (valueFromBuf != valueFromField) {
532      throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
533          + ") is different from that in the field (" + valueFromField + ")");
534    }
535  }
536
537  @VisibleForTesting
538  private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
539      throws IOException {
540    if (valueFromBuf != valueFromField) {
541      throw new IOException("Block type stored in the buffer: " +
542        valueFromBuf + ", block type field: " + valueFromField);
543    }
544  }
545
546  /**
547   * Checks if the block is internally consistent, i.e. the first
548   * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a
549   * valid header consistent with the fields. Assumes a packed block structure.
   * This function is primarily for testing and debugging; it works on a duplicate of the
   * internal buffer so it does not modify the block's state.
   * Used by tests only.
553   */
554  @VisibleForTesting
555  void sanityCheck() throws IOException {
556    // Duplicate so no side-effects
557    ByteBuff dup = this.buf.duplicate().rewind();
558    sanityCheckAssertion(BlockType.read(dup), blockType);
559
560    sanityCheckAssertion(dup.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader");
561
562    sanityCheckAssertion(dup.getInt(), uncompressedSizeWithoutHeader,
563        "uncompressedSizeWithoutHeader");
564
565    sanityCheckAssertion(dup.getLong(), prevBlockOffset, "prevBlockOffset");
566    if (this.fileContext.isUseHBaseChecksum()) {
567      sanityCheckAssertion(dup.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
568      sanityCheckAssertion(dup.getInt(), this.fileContext.getBytesPerChecksum(),
569          "bytesPerChecksum");
570      sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
571    }
572
573    int cksumBytes = totalChecksumBytes();
574    int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
575    if (dup.limit() != expectedBufLimit) {
576      throw new AssertionError("Expected limit " + expectedBufLimit + ", got " + dup.limit());
577    }
578
579    // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
580    // block's header, so there are two sensible values for buffer capacity.
581    int hdrSize = headerSize();
582    if (dup.capacity() != expectedBufLimit && dup.capacity() != expectedBufLimit + hdrSize) {
583      throw new AssertionError("Invalid buffer capacity: " + dup.capacity() +
584          ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
585    }
586  }
587
588  @Override
589  public String toString() {
590    StringBuilder sb = new StringBuilder()
591      .append("[")
592      .append("blockType=").append(blockType)
593      .append(", fileOffset=").append(offset)
594      .append(", headerSize=").append(headerSize())
595      .append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
596      .append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
597      .append(", prevBlockOffset=").append(prevBlockOffset)
598      .append(", isUseHBaseChecksum=").append(fileContext.isUseHBaseChecksum());
599    if (fileContext.isUseHBaseChecksum()) {
600      sb.append(", checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
601        .append(", bytesPerChecksum=").append(this.buf.getInt(24 + 1))
602        .append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
603    } else {
604      sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
605        .append("(").append(onDiskSizeWithoutHeader)
606        .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
607    }
608    String dataBegin = null;
609    if (buf.hasArray()) {
610      dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
611          Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
612    } else {
613      ByteBuff bufWithoutHeader = getBufferWithoutHeader();
614      byte[] dataBeginBytes = new byte[Math.min(32,
615          bufWithoutHeader.limit() - bufWithoutHeader.position())];
616      bufWithoutHeader.get(dataBeginBytes);
617      dataBegin = Bytes.toStringBinary(dataBeginBytes);
618    }
619    sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader())
620      .append(", totalChecksumBytes=").append(totalChecksumBytes())
621      .append(", isUnpacked=").append(isUnpacked())
622      .append(", buf=[").append(buf).append("]")
623      .append(", dataBeginsWith=").append(dataBegin)
624      .append(", fileContext=").append(fileContext)
625      .append(", nextBlockOnDiskSize=").append(nextBlockOnDiskSize)
626      .append("]");
627    return sb.toString();
628  }
629
630  /**
631   * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
632   * encoded structure. Internal structures are shared between instances where applicable.
633   */
634  HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
635    if (!fileContext.isCompressedOrEncrypted()) {
636      // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
637      // which is used for block serialization to L2 cache, does not preserve encoding and
638      // encryption details.
639      return this;
640    }
641
642    HFileBlock unpacked = new HFileBlock(this);
643    unpacked.allocateBuffer(); // allocates space for the decompressed block
644
645    HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
646      reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
647
648    ByteBuff dup = this.buf.duplicate();
649    dup.position(this.headerSize());
650    dup = dup.slice();
651    ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
652      unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
653      dup);
654    return unpacked;
655  }
656
657  /**
658   * Always allocates a new buffer of the correct size. Copies header bytes
659   * from the existing buffer. Does not change header fields.
660   * Reserve room to keep checksum bytes too.
661   */
662  private void allocateBuffer() {
663    int cksumBytes = totalChecksumBytes();
664    int headerSize = headerSize();
665    int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
666
    // TODO: should we consider allocating this buffer offheap?
668    ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
669
670    // Copy header bytes into newBuf.
671    // newBuf is HBB so no issue in calling array()
672    buf.position(0);
673    buf.get(newBuf.array(), newBuf.arrayOffset(), headerSize);
674
675    buf = new SingleByteBuff(newBuf);
676    // set limit to exclude next block's header
677    buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
678  }
679
680  /**
681   * Return true when this block's buffer has been unpacked, false otherwise. Note this is a
   * calculated heuristic, not a tracked attribute of the block.
683   */
684  public boolean isUnpacked() {
685    final int cksumBytes = totalChecksumBytes();
686    final int headerSize = headerSize();
687    final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
688    final int bufCapacity = buf.capacity();
689    return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
690  }
691
692  /** An additional sanity-check in case no compression or encryption is being used. */
693  @VisibleForTesting
694  void sanityCheckUncompressedSize() throws IOException {
695    if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) {
696      throw new IOException("Using no compression but "
697          + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
698          + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
699          + ", numChecksumbytes=" + totalChecksumBytes());
700    }
701  }
702
703  /**
   * Cannot be {@link #UNSET}. Must be a legitimate value. Used when re-making the
   * {@link BlockCacheKey} as the block is returned to the cache.
706   * @return the offset of this block in the file it was read from
707   */
708  long getOffset() {
709    if (offset < 0) {
710      throw new IllegalStateException("HFile block offset not initialized properly");
711    }
712    return offset;
713  }
714
715  /**
716   * @return a byte stream reading the data + checksum of this block
717   */
718  DataInputStream getByteStream() {
719    ByteBuff dup = this.buf.duplicate();
720    dup.position(this.headerSize());
721    return new DataInputStream(new ByteBuffInputStream(dup));
722  }
723
724  @Override
725  public long heapSize() {
726    long size = ClassSize.align(
727        ClassSize.OBJECT +
728        // Block type, multi byte buffer, MemoryType and meta references
729        4 * ClassSize.REFERENCE +
        // On-disk size, uncompressed size, next block's on-disk size and
        // onDiskDataSizeWithHeader
732        4 * Bytes.SIZEOF_INT +
733        // This and previous block offset
734        2 * Bytes.SIZEOF_LONG +
735        // Heap size of the meta object. meta will be always not null.
736        fileContext.heapSize()
737    );
738
739    if (buf != null) {
740      // Deep overhead of the byte buffer. Needs to be aligned separately.
741      size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE);
742    }
743
744    return ClassSize.align(size);
745  }
746
747  /**
   * Read at least <code>necessaryLen</code> bytes from an input stream and, if possible,
   * <code>extraLen</code> additional bytes. Analogous to
   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a
   * number of "extra" bytes to also optionally read.
752   *
753   * @param in the input stream to read from
754   * @param buf the buffer to read into
755   * @param bufOffset the destination offset in the buffer
756   * @param necessaryLen the number of bytes that are absolutely necessary to read
757   * @param extraLen the number of extra bytes that would be nice to read
758   * @return true if succeeded reading the extra bytes
759   * @throws IOException if failed to read the necessary bytes
760   */
761  static boolean readWithExtra(InputStream in, byte[] buf,
762      int bufOffset, int necessaryLen, int extraLen) throws IOException {
763    int bytesRemaining = necessaryLen + extraLen;
764    while (bytesRemaining > 0) {
765      int ret = in.read(buf, bufOffset, bytesRemaining);
766      if (ret == -1 && bytesRemaining <= extraLen) {
767        // We could not read the "extra data", but that is OK.
768        break;
769      }
770      if (ret < 0) {
771        throw new IOException("Premature EOF from inputStream (read "
772            + "returned " + ret + ", was trying to read " + necessaryLen
773            + " necessary bytes and " + extraLen + " extra bytes, "
774            + "successfully read "
775            + (necessaryLen + extraLen - bytesRemaining));
776      }
777      bufOffset += ret;
778      bytesRemaining -= ret;
779    }
780    return bytesRemaining <= 0;
781  }
782
783  /**
   * Read at least <code>necessaryLen</code> bytes from an input stream and, if possible,
   * <code>extraLen</code> additional bytes. Analogous to
786   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses
787   * positional read and specifies a number of "extra" bytes that would be
788   * desirable but not absolutely necessary to read.
789   *
790   * @param in the input stream to read from
791   * @param position the position within the stream from which to start reading
792   * @param buf the buffer to read into
793   * @param bufOffset the destination offset in the buffer
794   * @param necessaryLen the number of bytes that are absolutely necessary to
795   *     read
796   * @param extraLen the number of extra bytes that would be nice to read
   * @return true if and only if extraLen is &gt; 0 and reading those extra bytes
798   *     was successful
799   * @throws IOException if failed to read the necessary bytes
800   */
801  @VisibleForTesting
802  static boolean positionalReadWithExtra(FSDataInputStream in,
803      long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen)
804      throws IOException {
805    int bytesRemaining = necessaryLen + extraLen;
806    int bytesRead = 0;
807    while (bytesRead < necessaryLen) {
808      int ret = in.read(position, buf, bufOffset, bytesRemaining);
809      if (ret < 0) {
810        throw new IOException("Premature EOF from inputStream (positional read "
811            + "returned " + ret + ", was trying to read " + necessaryLen
812            + " necessary bytes and " + extraLen + " extra bytes, "
813            + "successfully read " + bytesRead);
814      }
815      position += ret;
816      bufOffset += ret;
817      bytesRemaining -= ret;
818      bytesRead += ret;
819    }
820    return bytesRead != necessaryLen && bytesRemaining <= 0;
821  }
822
823  /**
824   * Unified version 2 {@link HFile} block writer. The intended usage pattern
825   * is as follows:
826   * <ol>
827   * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
828   * <li>Call {@link Writer#startWriting} and get a data stream to write to.
829   * <li>Write your data into the stream.
   * <li>Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to
   * store the serialized block into an external stream.
832   * <li>Repeat to write more blocks.
833   * </ol>
834   * <p>
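   * A minimal sketch of the pattern above; the cell source <code>cells</code> and the open
   * output stream <code>out</code> are hypothetical, and the {@link HFileContext} uses defaults:
   * <pre>
   *   HFileContext ctx = new HFileContextBuilder().build();
   *   HFileBlock.Writer writer = new HFileBlock.Writer(NoOpDataBlockEncoder.INSTANCE, ctx);
   *   DataOutputStream dos = writer.startWriting(BlockType.DATA);
   *   for (Cell cell : cells) {
   *     writer.write(cell); // Cells go through write(Cell); other block types write to 'dos'
   *   }
   *   writer.writeHeaderAndData(out); // 'out' is an FSDataOutputStream
   *   writer.release();
   * </pre>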
835   */
836  static class Writer {
837    private enum State {
838      INIT,
839      WRITING,
840      BLOCK_READY
841    };
842
843    /** Writer state. Used to ensure the correct usage protocol. */
844    private State state = State.INIT;
845
846    /** Data block encoder used for data blocks */
847    private final HFileDataBlockEncoder dataBlockEncoder;
848
849    private HFileBlockEncodingContext dataBlockEncodingCtx;
850
851    /** block encoding context for non-data blocks*/
852    private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
853
854    /**
855     * The stream we use to accumulate data into a block in an uncompressed format.
856     * We reset this stream at the end of each block and reuse it. The
857     * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this
858     * stream.
859     */
860    private ByteArrayOutputStream baosInMemory;
861
862    /**
863     * Current block type. Set in {@link #startWriting(BlockType)}. Could be
864     * changed in {@link #finishBlock()} from {@link BlockType#DATA}
865     * to {@link BlockType#ENCODED_DATA}.
866     */
867    private BlockType blockType;
868
869    /**
870     * A stream that we write uncompressed bytes to, which compresses them and
871     * writes them to {@link #baosInMemory}.
872     */
873    private DataOutputStream userDataStream;
874
    // Size of actual data being written, not considering the block encoding/compression. This
    // includes the header size also.
877    private int unencodedDataSizeWritten;
878
    // Size of actual data being written, considering the block encoding. This
    // includes the header size also.
881    private int encodedDataSizeWritten;
882
883    /**
884     * Bytes to be written to the file system, including the header. Compressed
885     * if compression is turned on. It also includes the checksum data that
886     * immediately follows the block data. (header + data + checksums)
887     */
888    private ByteArrayOutputStream onDiskBlockBytesWithHeader;
889
890    /**
     * The checksum data on disk for this block. It is used only if data is
     * not compressed. If data is compressed, then the checksums are already
     * part of onDiskBlockBytesWithHeader. If data is uncompressed, then this
894     * variable stores the checksum data for this block.
895     */
896    private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;
897
898    /**
899     * Current block's start offset in the {@link HFile}. Set in
900     * {@link #writeHeaderAndData(FSDataOutputStream)}.
901     */
902    private long startOffset;
903
904    /**
905     * Offset of previous block by block type. Updated when the next block is
906     * started.
907     */
908    private long[] prevOffsetByType;
909
910    /** The offset of the previous block of the same type */
911    private long prevOffset;
    /** Meta data that holds information about the hfile block. */
913    private HFileContext fileContext;
914
915    /**
916     * @param dataBlockEncoder data block encoding algorithm to use
917     */
918    public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
919      if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
920        throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
921            " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
922            fileContext.getBytesPerChecksum());
923      }
924      this.dataBlockEncoder = dataBlockEncoder != null?
925          dataBlockEncoder: NoOpDataBlockEncoder.INSTANCE;
926      this.dataBlockEncodingCtx = this.dataBlockEncoder.
927          newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
928      // TODO: This should be lazily instantiated since we usually do NOT need this default encoder
929      this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
930          HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
931      // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum
932      baosInMemory = new ByteArrayOutputStream();
933      prevOffsetByType = new long[BlockType.values().length];
934      for (int i = 0; i < prevOffsetByType.length; ++i) {
935        prevOffsetByType[i] = UNSET;
936      }
937      // TODO: Why fileContext saved away when we have dataBlockEncoder and/or
938      // defaultDataBlockEncoder?
939      this.fileContext = fileContext;
940    }
941
942    /**
943     * Starts writing into the block. The previous block's data is discarded.
944     *
945     * @return the stream the user can write their data into
946     * @throws IOException
947     */
948    DataOutputStream startWriting(BlockType newBlockType)
949        throws IOException {
950      if (state == State.BLOCK_READY && startOffset != -1) {
951        // We had a previous block that was written to a stream at a specific
952        // offset. Save that offset as the last offset of a block of that type.
953        prevOffsetByType[blockType.getId()] = startOffset;
954      }
955
956      startOffset = -1;
957      blockType = newBlockType;
958
959      baosInMemory.reset();
960      baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
961
962      state = State.WRITING;
963
964      // We will compress it later in finishBlock()
965      userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory);
966      if (newBlockType == BlockType.DATA) {
967        this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
968      }
969      this.unencodedDataSizeWritten = 0;
970      this.encodedDataSizeWritten = 0;
971      return userDataStream;
972    }
973
974    /**
975     * Writes the Cell to this block
976     * @param cell
977     * @throws IOException
978     */
979    void write(Cell cell) throws IOException{
980      expectState(State.WRITING);
981      int posBeforeEncode = this.userDataStream.size();
982      this.unencodedDataSizeWritten +=
983          this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream);
984      this.encodedDataSizeWritten += this.userDataStream.size() - posBeforeEncode;
985    }
986
987    /**
988     * Returns the stream for the user to write to. The block writer takes care
989     * of handling compression and buffering for caching on write. Can only be
990     * called in the "writing" state.
991     *
992     * @return the data output stream for the user to write to
993     */
994    DataOutputStream getUserDataStream() {
995      expectState(State.WRITING);
996      return userDataStream;
997    }
998
999    /**
1000     * Transitions the block writer from the "writing" state to the "block
1001     * ready" state.  Does nothing if a block is already finished.
1002     */
1003    void ensureBlockReady() throws IOException {
1004      Preconditions.checkState(state != State.INIT,
1005          "Unexpected state: " + state);
1006
1007      if (state == State.BLOCK_READY) {
1008        return;
1009      }
1010
1011      // This will set state to BLOCK_READY.
1012      finishBlock();
1013    }
1014
1015    /**
1016     * Finish up writing of the block.
1017     * Flushes the compressing stream (if using compression), fills out the header,
1018     * does any compression/encryption of bytes to flush out to disk, and manages
1019     * the cache on write content, if applicable. Sets block write state to "block ready".
1020     */
1021    private void finishBlock() throws IOException {
1022      if (blockType == BlockType.DATA) {
1023        this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
1024            baosInMemory.getBuffer(), blockType);
1025        blockType = dataBlockEncodingCtx.getBlockType();
1026      }
1027      userDataStream.flush();
1028      prevOffset = prevOffsetByType[blockType.getId()];
1029
1030      // We need to set state before we can package the block up for cache-on-write. In a way, the
1031      // block is ready, but not yet encoded or compressed.
1032      state = State.BLOCK_READY;
1033      Bytes compressAndEncryptDat;
1034      if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
1035        compressAndEncryptDat = dataBlockEncodingCtx.
1036            compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size());
1037      } else {
1038        compressAndEncryptDat = defaultBlockEncodingCtx.
1039            compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size());
1040      }
1041      if (compressAndEncryptDat == null) {
1042        compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size());
1043      }
1044      if (onDiskBlockBytesWithHeader == null) {
1045        onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength());
1046      }
1047      onDiskBlockBytesWithHeader.reset();
1048      onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(),
1049            compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength());
1050      // Calculate how many bytes we need for checksum on the tail of the block.
1051      int numBytes = (int) ChecksumUtil.numBytes(
1052          onDiskBlockBytesWithHeader.size(),
1053          fileContext.getBytesPerChecksum());
1054
1055      // Put the header for the on disk bytes; header currently is unfilled-out
1056      putHeader(onDiskBlockBytesWithHeader,
1057          onDiskBlockBytesWithHeader.size() + numBytes,
1058          baosInMemory.size(), onDiskBlockBytesWithHeader.size());
1059      if (onDiskChecksum.length != numBytes) {
1060        onDiskChecksum = new byte[numBytes];
1061      }
1062      ChecksumUtil.generateChecksums(
          onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size(),
1064          onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
1065    }
1066
1067    /**
1068     * Put the header into the given byte array at the given offset.
     * @param dest the byte array to write the header into
     * @param offset the offset in {@code dest} at which to start writing
     * @param onDiskSize size of the block on disk: header + data + checksums
1070     * @param uncompressedSize size of the block after decompression (but
1071     *          before optional data block decoding) including header
1072     * @param onDiskDataSize size of the block on disk with header
1073     *        and data but not including the checksums
1074     */
1075    private void putHeader(byte[] dest, int offset, int onDiskSize,
1076        int uncompressedSize, int onDiskDataSize) {
1077      offset = blockType.put(dest, offset);
1078      offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1079      offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1080      offset = Bytes.putLong(dest, offset, prevOffset);
1081      offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
1082      offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
1083      Bytes.putInt(dest, offset, onDiskDataSize);
1084    }
1085
1086    private void putHeader(ByteArrayOutputStream dest, int onDiskSize,
1087        int uncompressedSize, int onDiskDataSize) {
      putHeader(dest.getBuffer(), 0, onDiskSize, uncompressedSize, onDiskDataSize);
1089    }
1090
1091    /**
     * Similar to {@link #finishBlockAndWriteHeaderAndData(DataOutputStream)}, but records
1093     * the offset of this block so that it can be referenced in the next block
1094     * of the same type.
1095     *
1096     * @param out
1097     * @throws IOException
1098     */
1099    void writeHeaderAndData(FSDataOutputStream out) throws IOException {
1100      long offset = out.getPos();
1101      if (startOffset != UNSET && offset != startOffset) {
1102        throw new IOException("A " + blockType + " block written to a "
1103            + "stream twice, first at offset " + startOffset + ", then at "
1104            + offset);
1105      }
1106      startOffset = offset;
1107
1108      finishBlockAndWriteHeaderAndData((DataOutputStream) out);
1109    }
1110
1111    /**
1112     * Writes the header and the compressed data of this block (or uncompressed
1113     * data when not using compression) into the given stream. Can be called in
1114     * the "writing" state or in the "block ready" state. If called in the
1115     * "writing" state, transitions the writer to the "block ready" state.
1116     *
     * @param out the output stream to write the block to
1118     * @throws IOException
1119     */
1120    protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
1121      throws IOException {
1122      ensureBlockReady();
1123      long startTime = System.currentTimeMillis();
1124      out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size());
1125      out.write(onDiskChecksum);
1126      HFile.updateWriteLatency(System.currentTimeMillis() - startTime);
1127    }
1128
1129    /**
     * Returns the header and the compressed data (or uncompressed data when not
1131     * using compression) as a byte array. Can be called in the "writing" state
1132     * or in the "block ready" state. If called in the "writing" state,
1133     * transitions the writer to the "block ready" state. This returns
1134     * the header + data + checksums stored on disk.
1135     *
1136     * @return header and data as they would be stored on disk in a byte array
1137     * @throws IOException
1138     */
1139    byte[] getHeaderAndDataForTest() throws IOException {
1140      ensureBlockReady();
1141      // This is not very optimal, because we are doing an extra copy.
1142      // But this method is used only by unit tests.
1143      byte[] output =
1144          new byte[onDiskBlockBytesWithHeader.size()
1145              + onDiskChecksum.length];
1146      System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0,
1147          onDiskBlockBytesWithHeader.size());
1148      System.arraycopy(onDiskChecksum, 0, output,
1149          onDiskBlockBytesWithHeader.size(), onDiskChecksum.length);
1150      return output;
1151    }
1152
1153    /**
1154     * Releases resources used by this writer.
1155     */
1156    void release() {
1157      if (dataBlockEncodingCtx != null) {
1158        dataBlockEncodingCtx.close();
1159        dataBlockEncodingCtx = null;
1160      }
1161      if (defaultBlockEncodingCtx != null) {
1162        defaultBlockEncodingCtx.close();
1163        defaultBlockEncodingCtx = null;
1164      }
1165    }
1166
1167    /**
1168     * Returns the on-disk size of the data portion of the block. This is the
1169     * compressed size if compression is enabled. Can only be called in the
1170     * "block ready" state. Header is not compressed, and its size is not
1171     * included in the return value.
1172     *
1173     * @return the on-disk size of the block, not including the header.
1174     */
1175    int getOnDiskSizeWithoutHeader() {
1176      expectState(State.BLOCK_READY);
1177      return onDiskBlockBytesWithHeader.size() +
1178          onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE;
1179    }
1180
1181    /**
1182     * Returns the on-disk size of the block. Can only be called in the
1183     * "block ready" state.
1184     *
1185     * @return the on-disk size of the block ready to be written, including the
1186     *         header size, the data and the checksum data.
1187     */
1188    int getOnDiskSizeWithHeader() {
1189      expectState(State.BLOCK_READY);
1190      return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length;
1191    }
1192
1193    /**
1194     * The uncompressed size of the block data. Does not include header size.
1195     */
1196    int getUncompressedSizeWithoutHeader() {
1197      expectState(State.BLOCK_READY);
1198      return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE;
1199    }
1200
1201    /**
1202     * The uncompressed size of the block data, including header size.
1203     */
1204    int getUncompressedSizeWithHeader() {
1205      expectState(State.BLOCK_READY);
1206      return baosInMemory.size();
1207    }
1208
1209    /** @return true if a block is being written  */
1210    boolean isWriting() {
1211      return state == State.WRITING;
1212    }
1213
1214    /**
     * Returns the number of encoded bytes written into the current block so far, or
     * zero if not writing the block at the moment. Note that this will return
     * zero in the "block ready" state as well.
     *
     * @return the number of encoded bytes written
1220     */
1221    public int encodedBlockSizeWritten() {
      if (state != State.WRITING) return 0;
1224      return this.encodedDataSizeWritten;
1225    }
1226
1227    /**
1228     * Returns the number of bytes written into the current block so far, or
1229     * zero if not writing the block at the moment. Note that this will return
1230     * zero in the "block ready" state as well.
1231     *
1232     * @return the number of bytes written
1233     */
1234    int blockSizeWritten() {
1235      if (state != State.WRITING) return 0;
1236      return this.unencodedDataSizeWritten;
1237    }
1238
1239    /**
1240     * Clones the header followed by the uncompressed data, even if using
1241     * compression. This is needed for storing uncompressed blocks in the block
1242     * cache. Can be called in the "writing" state or the "block ready" state.
1243     * Returns only the header and data, does not include checksum data.
1244     *
1245     * @return Returns a copy of uncompressed block bytes for caching on write
1246     */
1247    @VisibleForTesting
1248    ByteBuffer cloneUncompressedBufferWithHeader() {
1249      expectState(State.BLOCK_READY);
1250      byte[] uncompressedBlockBytesWithHeader = baosInMemory.toByteArray();
1251      int numBytes = (int) ChecksumUtil.numBytes(
1252          onDiskBlockBytesWithHeader.size(),
1253          fileContext.getBytesPerChecksum());
1254      putHeader(uncompressedBlockBytesWithHeader, 0,
1255        onDiskBlockBytesWithHeader.size() + numBytes,
1256        baosInMemory.size(), onDiskBlockBytesWithHeader.size());
1257      return ByteBuffer.wrap(uncompressedBlockBytesWithHeader);
1258    }
1259
1260    /**
1261     * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is
1262     * needed for storing packed blocks in the block cache. Expects calling semantics identical to
     * {@link #cloneUncompressedBufferWithHeader()}. Returns only the header and data,
1264     * Does not include checksum data.
1265     *
1266     * @return Returns a copy of block bytes for caching on write
1267     */
1268    private ByteBuffer cloneOnDiskBufferWithHeader() {
1269      expectState(State.BLOCK_READY);
1270      return ByteBuffer.wrap(onDiskBlockBytesWithHeader.toByteArray());
1271    }
1272
1273    private void expectState(State expectedState) {
1274      if (state != expectedState) {
1275        throw new IllegalStateException("Expected state: " + expectedState +
1276            ", actual state: " + state);
1277      }
1278    }
1279
1280    /**
1281     * Takes the given {@link BlockWritable} instance, creates a new block of
1282     * its appropriate type, writes the writable into this block, and flushes
1283     * the block into the output stream. The writer is instructed not to buffer
1284     * uncompressed bytes for cache-on-write.
1285     *
1286     * @param bw the block-writable object to write as a block
1287     * @param out the file system output stream
1288     * @throws IOException
1289     */
1290    void writeBlock(BlockWritable bw, FSDataOutputStream out)
1291        throws IOException {
1292      bw.writeToBlock(startWriting(bw.getBlockType()));
1293      writeHeaderAndData(out);
1294    }
1295
1296    /**
1297     * Creates a new HFileBlock. Checksums have already been validated, so
1298     * the byte buffer passed into the constructor of this newly created
1299     * block does not have checksum data even though the header minor
1300     * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
1301     * 0 value in bytesPerChecksum. This method copies the on-disk or
1302     * uncompressed data to build the HFileBlock which is used only
1303     * while writing blocks and caching.
1304     *
1305     * <p>TODO: Should there be an option where a cache can ask that hbase preserve block
1306     * checksums for checking after a block comes out of the cache? Otherwise, the cache is responsible
1307     * for blocks being wholesome (ECC memory or if file-backed, it does checksumming).
1308     */
1309    HFileBlock getBlockForCaching(CacheConfig cacheConf) {
1310      HFileContext newContext = new HFileContextBuilder()
1311                                .withBlockSize(fileContext.getBlocksize())
1312                                .withBytesPerCheckSum(0)
1313                                .withChecksumType(ChecksumType.NULL) // no checksums in cached data
1314                                .withCompression(fileContext.getCompression())
1315                                .withDataBlockEncoding(fileContext.getDataBlockEncoding())
1316                                .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
1317                                .withCompressTags(fileContext.isCompressTags())
1318                                .withIncludesMvcc(fileContext.isIncludesMvcc())
1319                                .withIncludesTags(fileContext.isIncludesTags())
1320                                .build();
1321      return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
1322          getUncompressedSizeWithoutHeader(), prevOffset,
1323          cacheConf.shouldCacheCompressed(blockType.getCategory())?
1324            cloneOnDiskBufferWithHeader() :
1325            cloneUncompressedBufferWithHeader(),
1326          FILL_HEADER, startOffset, UNSET,
1327          onDiskBlockBytesWithHeader.size() + onDiskChecksum.length, newContext);
1328    }
1329  }
1330
1331  /** Something that can be written into a block. */
1332  interface BlockWritable {
1333    /** The type of block this data should use. */
1334    BlockType getBlockType();
1335
1336    /**
1337     * Writes the block to the provided stream. Must not write any magic
1338     * records.
1339     *
1340     * @param out a stream to write uncompressed data into
1341     */
1342    void writeToBlock(DataOutput out) throws IOException;
1343  }
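
  /*
   * Illustrative usage sketch (not part of this file's API surface; the 'payload' array and the
   * META block type below are hypothetical choices): a BlockWritable is typically handed to
   * Writer#writeBlock(BlockWritable, FSDataOutputStream), which starts a block of the right
   * type, writes these bytes into it, and flushes it to the output stream.
   *
   *   final byte[] payload = ...;
   *   BlockWritable bw = new BlockWritable() {
   *     @Override
   *     public BlockType getBlockType() {
   *       return BlockType.META;
   *     }
   *     @Override
   *     public void writeToBlock(DataOutput out) throws IOException {
   *       out.write(payload); // raw bytes only; no magic records
   *     }
   *   };
   */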
1344
1345  /** Iterator for {@link HFileBlock}s. */
1346  interface BlockIterator {
1347    /**
1348     * Get the next block, or null if there are no more blocks to iterate.
1349     */
1350    HFileBlock nextBlock() throws IOException;
1351
1352    /**
1353     * Similar to {@link #nextBlock()} but checks block type, throws an
1354     * exception if incorrect, and returns the HFile block.
1355     */
1356    HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1357  }
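
  /*
   * Illustrative iteration sketch (assumes an FSReader named 'reader'; the offsets are
   * hypothetical): walk every block between two file offsets, as is done when reading the
   * index blocks on file open.
   *
   *   BlockIterator it = reader.blockRange(startOffset, endOffset);
   *   for (HFileBlock b = it.nextBlock(); b != null; b = it.nextBlock()) {
   *     // blocks returned here are always unpacked; see FSReader#blockRange
   *   }
   */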
1358
1359  /** An HFile block reader with iteration ability. */
1360  interface FSReader {
1361    /**
1362     * Reads the block at the given offset in the file with the given on-disk
1363     * size and uncompressed size.
1364     *
1365     * @param offset the offset in the file at which the block starts
1366     * @param onDiskSize the on-disk size of the entire block, including all
1367     *          applicable headers, or -1 if unknown
1368     * @return the newly read block
1369     */
1370    HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics)
1371        throws IOException;
1372
1373    /**
1374     * Creates a block iterator over the given portion of the {@link HFile}.
1375     * The iterator returns blocks at offsets such that startOffset &lt;=
1376     * offset &lt; endOffset. Returned blocks are always unpacked.
1377     * Used when no hfile index is available; e.g. reading in the hfile index
1378     * blocks themselves on file open.
1379     *
1380     * @param startOffset the offset of the block to start iteration with
1381     * @param endOffset the offset to end iteration at (exclusive)
1382     * @return an iterator of blocks between the two given offsets
1383     */
1384    BlockIterator blockRange(long startOffset, long endOffset);
1385
1386    /** Closes the backing streams */
1387    void closeStreams() throws IOException;
1388
1389    /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
1390    HFileBlockDecodingContext getBlockDecodingContext();
1391
1392    /** Get the default decoder for blocks from this file. */
1393    HFileBlockDecodingContext getDefaultBlockDecodingContext();
1394
1395    void setIncludesMemStoreTS(boolean includesMemstoreTS);
1396    void setDataBlockEncoder(HFileDataBlockEncoder encoder);
1397
1398    /**
1399     * Closes the stream's socket. Note: this can be called concurrently from multiple threads and
1400     * implementations must take care of thread safety.
1401     */
1402    void unbufferStream();
1403  }
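
  /*
   * Illustrative read sketch for FSReader (a hypothetical 'reader' instance; the offset and
   * on-disk size would normally come from the hfile index): read one block and unpack it.
   *
   *   HFileBlock packed = reader.readBlockData(offset, onDiskSizeWithHeader, true, false);
   *   HFileBlock block = packed.unpack(fileContext, reader); // decompress/decrypt if needed
   */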
1404
1405  /**
1406   * Data structure used to cache the header of the NEXT block. Only useful if the next read
1407   * that comes in here is for the block immediately following, in sequence, the one just read.
1408   *
1409   * When we read a block, we also read the next block's header. We do this so we have
1410   * the length of the next block to read if the hfile index is not available (rare, at
1411   * hfile open only).
1412   */
1413  private static class PrefetchedHeader {
1414    long offset = -1;
1415    byte [] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1416    final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
1417
1418    @Override
1419    public String toString() {
1420      return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header);
1421    }
1422  }
1423
1424  /**
1425   * Reads version 2 HFile blocks from the filesystem.
1426   */
1427  static class FSReaderImpl implements FSReader {
1428    /** The file system stream of the underlying {@link HFile} that
1429     * does or doesn't do checksum validations in the filesystem */
1430    private FSDataInputStreamWrapper streamWrapper;
1431
1432    private HFileBlockDecodingContext encodedBlockDecodingCtx;
1433
1434    /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
1435    private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
1436
1437    /**
1438     * Cache of the NEXT header after this. Check it is indeed the next block's header
1439     * before using it. TODO: Review. This overread into the next block to fetch the
1440     * next block's header seems unnecessary given we usually get the block size
1441     * from the hfile index. Review!
1442     */
1443    private AtomicReference<PrefetchedHeader> prefetchedHeader = new AtomicReference<>(new PrefetchedHeader());
1444
1445    /** The size of the file we are reading from, or -1 if unknown. */
1446    private long fileSize;
1447
1448    /** The size of the header */
1449    @VisibleForTesting
1450    protected final int hdrSize;
1451
1452    /** The filesystem used to access data */
1453    private HFileSystem hfs;
1454
1455    private HFileContext fileContext;
1456    // Cache the fileName
1457    private String pathName;
1458
1459    private final Lock streamLock = new ReentrantLock();
1460
1461    FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
1462        HFileContext fileContext) throws IOException {
1463      this.fileSize = fileSize;
1464      this.hfs = hfs;
1465      if (path != null) {
1466        this.pathName = path.toString();
1467      }
1468      this.fileContext = fileContext;
1469      this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1470
1471      this.streamWrapper = stream;
1472      // Older versions of HBase didn't support checksum.
1473      this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1474      defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
1475      encodedBlockDecodingCtx = defaultDecodingCtx;
1476    }
1477
1478    /**
1479     * A constructor that reads files with the latest minor version.
1480     * This is used by unit tests only.
1481     */
1482    FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
1483    throws IOException {
1484      this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
1485    }
1486
1487    @Override
1488    public BlockIterator blockRange(final long startOffset, final long endOffset) {
1489      final FSReader owner = this; // handle for inner class
1490      return new BlockIterator() {
1491        private long offset = startOffset;
1492        // Cache length of next block. Current block has the length of next block in it.
1493        private long length = -1;
1494
1495        @Override
1496        public HFileBlock nextBlock() throws IOException {
1497          if (offset >= endOffset) {
1498            return null;
1499          }
1500          HFileBlock b = readBlockData(offset, length, false, false);
1501          offset += b.getOnDiskSizeWithHeader();
1502          length = b.getNextBlockOnDiskSize();
1503          return b.unpack(fileContext, owner);
1504        }
1505
1506        @Override
1507        public HFileBlock nextBlockWithBlockType(BlockType blockType)
1508            throws IOException {
1509          HFileBlock blk = nextBlock();
1510          if (blk.getBlockType() != blockType) {
1511            throw new IOException("Expected block of type " + blockType
1512                + " but found " + blk.getBlockType());
1513          }
1514          return blk;
1515        }
1516      };
1517    }
1518
1519    /**
1520     * Does a positional read or a seek and read into the given buffer. Returns
1521     * the on-disk size of the next block, or -1 if it could not be read/determined; e.g. EOF.
1522     *
1523     * @param dest destination buffer
1524     * @param destOffset offset into the destination buffer at which to put the bytes we read
1525     * @param size number of bytes to read
1526     * @param peekIntoNextBlock whether to read the next block's on-disk size
1527     * @param fileOffset position in the stream to read at
1528     * @param pread whether we should do a positional read
1529     * @param istream The input source of data
1530     * @return the on-disk size of the next block with header size included, or
1531     *         -1 if it could not be determined; if not -1, the <code>dest</code> INCLUDES the
1532     *         next header
1533     * @throws IOException
1534     */
1535    @VisibleForTesting
1536    protected int readAtOffset(FSDataInputStream istream, byte[] dest, int destOffset, int size,
1537        boolean peekIntoNextBlock, long fileOffset, boolean pread)
1538        throws IOException {
1539      if (peekIntoNextBlock && destOffset + size + hdrSize > dest.length) {
1540        // We are asked to read the next block's header as well, but there is
1541        // not enough room in the array.
1542        throw new IOException("Attempted to read " + size + " bytes and " + hdrSize +
1543            " bytes of next header into a " + dest.length + "-byte array at offset " + destOffset);
1544      }
1545
1546      if (!pread) {
1547        // Seek + read. Better for scanning.
1548        HFileUtil.seekOnMultipleSources(istream, fileOffset);
1549        // TODO: do we need seek time latencies?
1550        long realOffset = istream.getPos();
1551        if (realOffset != fileOffset) {
1552          throw new IOException("Tried to seek to " + fileOffset + " to read " + size +
1553              " bytes, but pos=" + realOffset + " after seek");
1554        }
1555
1556        if (!peekIntoNextBlock) {
1557          IOUtils.readFully(istream, dest, destOffset, size);
1558          return -1;
1559        }
1560
1561        // Try to read the next block header.
1562        if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) {
1563          return -1;
1564        }
1565      } else {
1566        // Positional read. Better for random reads; or when the streamLock is already locked.
1567        int extraSize = peekIntoNextBlock ? hdrSize : 0;
1568        if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset, size, extraSize)) {
1569          return -1;
1570        }
1571      }
1572      assert peekIntoNextBlock;
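      // The 4-byte int just past the peeked header's magic record is the next block's
      // onDiskSizeWithoutHeader; adding hdrSize gives its full on-disk size. For example
      // (illustrative numbers), a header value of 32000 with a 33-byte checksum-enabled
      // header makes this method return 32033.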
1573      return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
1574    }
1575
1576    /**
1577     * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
1578     * little memory allocation as possible, using the provided on-disk size.
1579     *
1580     * @param offset the offset in the stream to read at
1581     * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1582     *          the header, or -1 if unknown; e.g. when iterating over blocks while reading
1583     *          in the file metadata on open.
1584     * @param pread whether to use a positional read
1585     */
1586    @Override
1587    public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread,
1588                                    boolean updateMetrics) throws IOException {
1589      // Get a copy of the current state of whether to validate
1590      // hbase checksums or not for this read call. This is not
1591      // thread-safe but the one constraint is that if we decide
1592      // to skip hbase checksum verification then we are
1593      // guaranteed to use hdfs checksum verification.
1594      boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1595      FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1596
1597      HFileBlock blk = readBlockDataInternal(is, offset,
1598                         onDiskSizeWithHeaderL, pread,
1599                         doVerificationThruHBaseChecksum, updateMetrics);
1600      if (blk == null) {
1601        HFile.LOG.warn("HBase checksum verification failed for file " +
1602                       pathName + " at offset " +
1603                       offset + " filesize " + fileSize +
1604                       ". Retrying read with HDFS checksums turned on...");
1605
1606        if (!doVerificationThruHBaseChecksum) {
1607          String msg = "HBase checksum verification failed for file " +
1608                       pathName + " at offset " +
1609                       offset + " filesize " + fileSize +
1610                       " but this cannot happen because doVerify is " +
1611                       doVerificationThruHBaseChecksum;
1612          HFile.LOG.warn(msg);
1613          throw new IOException(msg); // cannot happen case here
1614        }
1615        HFile.CHECKSUM_FAILURES.increment(); // update metrics
1616
1617        // If we have a checksum failure, we fall back into a mode where
1618        // the next few reads use HDFS level checksums. We aim to make the
1619        // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
1620        // hbase checksum verification, but since this value is set without
1621        // holding any locks, it can so happen that we might actually do
1622        // a few more than precisely this number.
1623        is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1624        doVerificationThruHBaseChecksum = false;
1625        blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
1626                                    doVerificationThruHBaseChecksum, updateMetrics);
1627        if (blk != null) {
1628          HFile.LOG.warn("HDFS checksum verification succeeded for file " +
1629                         pathName + " at offset " +
1630                         offset + " filesize " + fileSize);
1631        }
1632      }
1633      if (blk == null && !doVerificationThruHBaseChecksum) {
1634        String msg = "readBlockData failed, possibly due to " +
1635                     "checksum verification failed for file " + pathName +
1636                     " at offset " + offset + " filesize " + fileSize;
1637        HFile.LOG.warn(msg);
1638        throw new IOException(msg);
1639      }
1640
1641      // If there is a checksum mismatch earlier, then retry with
1642      // HBase checksums switched off and use HDFS checksum verification.
1643      // This triggers HDFS to detect and fix corrupt replicas. The
1644      // next checksumOffCount read requests will use HDFS checksums.
1645      // The decrementing of this.checksumOffCount is not thread-safe,
1646      // but it is harmless because eventually checksumOffCount will be
1647      // a negative number.
1648      streamWrapper.checksumOk();
1649      return blk;
1650    }
1651
1652    /**
1653     * @return the passed-in <code>onDiskSizeWithHeaderL</code> as an int, after checking it is sane
1654     * @throws IOException if the size is out of the expected range
1655     */
1656    private static int checkAndGetSizeAsInt(final long onDiskSizeWithHeaderL, final int hdrSize)
1657    throws IOException {
1658      if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
1659          || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) {
1660        throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL
1661            + ": expected to be at least " + hdrSize
1662            + " and at most " + Integer.MAX_VALUE + ", or -1");
1663      }
1664      return (int)onDiskSizeWithHeaderL;
1665    }
1666
1667    /**
1668     * Verify the passed-in onDiskSizeWithHeader matches what is in the header, else something
1669     * is not right.
1670     * @throws IOException if the sizes do not match
1671     */
1672    private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuffer headerBuf,
1673        final long offset, boolean verifyChecksum)
1674    throws IOException {
1675      // Assert size provided aligns with what is in the header
1676      int fromHeader = getOnDiskSizeWithHeader(headerBuf, verifyChecksum);
1677      if (passedIn != fromHeader) {
1678        throw new IOException("Passed in onDiskSizeWithHeader=" + passedIn + " != " + fromHeader +
1679            ", offset=" + offset + ", fileContext=" + this.fileContext);
1680      }
1681    }
1682
1683    /**
1684     * Check the atomic reference cache for this block's header. The cache is only good if the
1685     * next read coming through is for the block that immediately follows the one just read. We
1686     * read the next block's header on the tail of reading the previous block to save a seek.
1687     * Otherwise, we have to do a seek to read the header before we can pull in the block OR
1688     * we have to back up the stream because we over-read (the next block's header).
1689     * @see PrefetchedHeader
1690     * @return The cached block header or null if not found.
1691     * @see #cacheNextBlockHeader(long, byte[], int, int)
1692     */
1693    private ByteBuffer getCachedHeader(final long offset) {
1694      PrefetchedHeader ph = this.prefetchedHeader.get();
1695      return ph != null && ph.offset == offset? ph.buf: null;
1696    }
1697
1698    /**
1699     * Save away the next block's header in the atomic reference.
1700     * @see #getCachedHeader(long)
1701     * @see PrefetchedHeader
1702     */
1703    private void cacheNextBlockHeader(final long offset,
1704        final byte [] header, final int headerOffset, final int headerLength) {
1705      PrefetchedHeader ph = new PrefetchedHeader();
1706      ph.offset = offset;
1707      System.arraycopy(header, headerOffset, ph.header, 0, headerLength);
1708      this.prefetchedHeader.set(ph);
1709    }
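
    /*
     * Illustrative sequence (offsets are hypothetical): reading the block at offset 0 whose
     * on-disk size with header is 4096 also pulls in the 33-byte header of the block at
     * offset 4096, so cacheNextBlockHeader(4096, ...) is called. If the next read that comes
     * through is for offset 4096 with an unknown size, getCachedHeader(4096) supplies that
     * header and no extra seek is needed.
     */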
1710
1711    /**
1712     * Reads a version 2 block.
1713     *
1714     * @param offset the offset in the stream to read at.
1715     * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1716     *          the header and checksums if present or -1 if unknown (as a long). Can be -1
1717     *          if we are doing raw iteration of blocks as when loading up file metadata; i.e.
1718     *          the first read of a new file. Usually known, gotten from the hfile index.
1719     * @param pread whether to use a positional read
1720     * @param verifyChecksum Whether to use HBase checksums.
1721     *        If HBase checksums are switched off, then HDFS checksums are used. Can also flip
1722     *        on/off while reading the same file if we hit a troublesome patch in an hfile.
1723     * @return the HFileBlock or null if there is an HBase checksum mismatch
1724     */
1725    @VisibleForTesting
1726    protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
1727        long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics)
1728     throws IOException {
1729      if (offset < 0) {
1730        throw new IOException("Invalid offset=" + offset + " trying to read "
1731            + "block (onDiskSize=" + onDiskSizeWithHeaderL + ")");
1732      }
1733      int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize);
1734      // Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1
1735      // and will save us having to seek the stream backwards to reread the header we
1736      // read the last time through here.
1737      ByteBuffer headerBuf = getCachedHeader(offset);
1738      LOG.trace("Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, " +
1739          "onDiskSizeWithHeader={}", this.fileContext.getHFileName(), offset, pread,
1740          verifyChecksum, headerBuf, onDiskSizeWithHeader);
1741      // This is NOT the same as verifyChecksum. The latter is whether to do hbase
1742      // checksums. Can change with circumstances. The below flag is whether the
1743      // file has support for checksums (version 2+).
1744      boolean checksumSupport = this.fileContext.isUseHBaseChecksum();
1745      long startTime = System.currentTimeMillis();
1746      if (onDiskSizeWithHeader <= 0) {
1747        // We were not passed the block size. Need to get it from the header. If header was
1748        // not cached (see getCachedHeader above), need to seek to pull it in. This is costly
1749        // and should happen very rarely. Currently happens on open of a hfile reader where we
1750        // read the trailer blocks to pull in the indices. Otherwise, we are reading block sizes
1751        // out of the hfile index. To check, enable TRACE in this file and you'll get an exception
1752        // in a LOG every time we seek. See HBASE-17072 for more detail.
1753        if (headerBuf == null) {
1754          if (LOG.isTraceEnabled()) {
1755            LOG.trace("Extra seek to get block size!", new RuntimeException());
1756          }
1757          headerBuf = ByteBuffer.allocate(hdrSize);
1758          readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize, false,
1759              offset, pread);
1760        }
1761        onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport);
1762      }
1763      int preReadHeaderSize = headerBuf == null? 0 : hdrSize;
1764      // Allocate enough space to fit the next block's header too; saves a seek next time through.
1765      // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header;
1766      // onDiskSizeWithHeader is header, body, and any checksums if present. preReadHeaderSize
1767      // says where to start reading. If we have the header cached, then we don't need to read
1768      // it again and we can likely read from last place we left off w/o need to backup and reread
1769      // the header we read last time through here.
1770      // TODO: Make this ByteBuffer-based. Will make it easier to go to HDFS with BBPool (offheap).
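      // For example (illustrative numbers): with onDiskSizeWithHeader = 65569 and hdrSize = 33
      // we allocate 65602 bytes; if the header was already cached (preReadHeaderSize = 33) the
      // read below starts at offset + 33, pulls in the remaining 65536 block bytes, and may
      // read up to 33 further bytes of the next block's header into the tail of the array.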
1771      byte [] onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
1772      int nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, preReadHeaderSize,
1773          onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread);
1774      if (headerBuf != null) {
1775        // The header has been read when reading the previous block OR in a distinct header-only
1776        // read. Copy to this block's header.
1777        System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1778      } else {
1779        headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
1780      }
1781      // Do a few checks before we go instantiate HFileBlock.
1782      assert onDiskSizeWithHeader > this.hdrSize;
1783      verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport);
1784      ByteBuffer onDiskBlockByteBuffer = ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader);
1785      // Verify checksum of the data before using it for building HFileBlock.
1786      if (verifyChecksum &&
1787          !validateChecksum(offset, onDiskBlockByteBuffer, hdrSize)) {
1788        return null;
1789      }
1790      long duration = System.currentTimeMillis() - startTime;
1791      if (updateMetrics) {
1792        HFile.updateReadLatency(duration, pread);
1793      }
1794      // The onDiskBlock will become the headerAndDataBuffer for this block.
1795      // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
1796      // contains the header of next block, so no need to set next block's header in it.
1797      HFileBlock hFileBlock =
1798          new HFileBlock(new SingleByteBuff(onDiskBlockByteBuffer), checksumSupport,
1799              MemoryType.EXCLUSIVE, offset, nextBlockOnDiskSize, fileContext);
1800      // Run check on uncompressed sizings.
1801      if (!fileContext.isCompressedOrEncrypted()) {
1802        hFileBlock.sanityCheckUncompressed();
1803      }
1804      LOG.trace("Read {} in {} ms", hFileBlock, duration);
1805      // Cache next block header if we read it for the next time through here.
1806      if (nextBlockOnDiskSize != -1) {
1807        cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(),
1808            onDiskBlock, onDiskSizeWithHeader, hdrSize);
1809      }
1810      return hFileBlock;
1811    }
1812
1813    @Override
1814    public void setIncludesMemStoreTS(boolean includesMemstoreTS) {
1815      this.fileContext.setIncludesMvcc(includesMemstoreTS);
1816    }
1817
1818    @Override
1819    public void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
1820      encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
1821    }
1822
1823    @Override
1824    public HFileBlockDecodingContext getBlockDecodingContext() {
1825      return this.encodedBlockDecodingCtx;
1826    }
1827
1828    @Override
1829    public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
1830      return this.defaultDecodingCtx;
1831    }
1832
1833    /**
1834     * Generates the checksum for the header as well as the data and then validates it.
1835     * If the block does not use checksums, returns false.
1836     * @return True if checksum matches, else false.
1837     */
1838    private boolean validateChecksum(long offset, ByteBuffer data, int hdrSize)
1839        throws IOException {
1840      // If this is an older version of the block that does not have checksums, then return false
1841      // indicating that checksum verification did not succeed. Actually, this method should never
1842      // be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen
1843      // case. Since this is a cannot-happen case, it is better to return false to indicate a
1844      // checksum validation failure.
1845      if (!fileContext.isUseHBaseChecksum()) {
1846        return false;
1847      }
1848      return ChecksumUtil.validateChecksum(data, pathName, offset, hdrSize);
1849    }
1850
1851    @Override
1852    public void closeStreams() throws IOException {
1853      streamWrapper.close();
1854    }
1855
1856    @Override
1857    public void unbufferStream() {
1858      // To handle concurrent reads, ensure that no other client is accessing the stream while we
1859      // unbuffer it.
1860      if (streamLock.tryLock()) {
1861        try {
1862          this.streamWrapper.unbuffer();
1863        } finally {
1864          streamLock.unlock();
1865        }
1866      }
1867    }
1868
1869    @Override
1870    public String toString() {
1871      return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext;
1872    }
1873  }
1874
1875  /** An additional sanity-check in case no compression or encryption is being used. */
1876  @VisibleForTesting
1877  void sanityCheckUncompressed() throws IOException {
1878    if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader +
1879        totalChecksumBytes()) {
1880      throw new IOException("Using no compression but "
1881          + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
1882          + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
1883          + ", numChecksumbytes=" + totalChecksumBytes());
1884    }
1885  }
1886
1887  // Cacheable implementation
1888  @Override
1889  public int getSerializedLength() {
1890    if (buf != null) {
1891      // Include extra bytes for block metadata.
1892      return this.buf.limit() + BLOCK_METADATA_SPACE;
1893    }
1894    return 0;
1895  }
1896
1897  // Cacheable implementation
1898  @Override
1899  public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
1900    this.buf.get(destination, 0, getSerializedLength() - BLOCK_METADATA_SPACE);
1901    destination = addMetaData(destination, includeNextBlockMetadata);
1902
1903    // Make it ready for reading. flip sets position to zero and limit to current position which
1904    // is what we want if we do not want to serialize the block plus checksums if present plus
1905    // metadata.
1906    destination.flip();
1907  }
1908
1909  /**
1910   * For use by bucketcache. This exposes internals.
1911   */
1912  public ByteBuffer getMetaData() {
1913    ByteBuffer bb = ByteBuffer.allocate(BLOCK_METADATA_SPACE);
1914    bb = addMetaData(bb, true);
1915    bb.flip();
1916    return bb;
1917  }
1918
1919  /**
1920   * Adds metadata at current position (position is moved forward). Does not flip or reset.
1921   * @return The passed <code>destination</code> with metadata added.
1922   */
1923  private ByteBuffer addMetaData(final ByteBuffer destination, boolean includeNextBlockMetadata) {
1924    destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1925    destination.putLong(this.offset);
1926    if (includeNextBlockMetadata) {
1927      destination.putInt(this.nextBlockOnDiskSize);
1928    }
1929    return destination;
1930  }
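
  /*
   * Illustrative sketch of consuming the metadata written above, mirroring the put order
   * (1-byte checksum flag, 8-byte offset, optional 4-byte next-block size; 'metaData' is a
   * hypothetical buffer, e.g. the one returned by getMetaData()):
   *
   *   boolean usesHBaseChecksum = metaData.get() == (byte) 1;
   *   long offset = metaData.getLong();
   *   int nextBlockOnDiskSize = metaData.getInt(); // present only if it was included
   */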
1931
1932  // Cacheable implementation
1933  @Override
1934  public CacheableDeserializer<Cacheable> getDeserializer() {
1935    return HFileBlock.BLOCK_DESERIALIZER;
1936  }
1937
1938  @Override
1939  public int hashCode() {
1940    int result = 1;
1941    result = result * 31 + blockType.hashCode();
1942    result = result * 31 + nextBlockOnDiskSize;
1943    result = result * 31 + (int) (offset ^ (offset >>> 32));
1944    result = result * 31 + onDiskSizeWithoutHeader;
1945    result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32));
1946    result = result * 31 + uncompressedSizeWithoutHeader;
1947    result = result * 31 + buf.hashCode();
1948    return result;
1949  }
1950
1951  @Override
1952  public boolean equals(Object comparison) {
1953    if (this == comparison) {
1954      return true;
1955    }
1956    if (comparison == null) {
1957      return false;
1958    }
1959    if (comparison.getClass() != this.getClass()) {
1960      return false;
1961    }
1962
1963    HFileBlock castedComparison = (HFileBlock) comparison;
1964
1965    if (castedComparison.blockType != this.blockType) {
1966      return false;
1967    }
1968    if (castedComparison.nextBlockOnDiskSize != this.nextBlockOnDiskSize) {
1969      return false;
1970    }
1971    // Offset is important. Needed when we have to remake cachekey when block is returned to cache.
1972    if (castedComparison.offset != this.offset) {
1973      return false;
1974    }
1975    if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1976      return false;
1977    }
1978    if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1979      return false;
1980    }
1981    if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1982      return false;
1983    }
1984    if (ByteBuff.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
1985        castedComparison.buf.limit()) != 0) {
1986      return false;
1987    }
1988    return true;
1989  }
1990
1991  DataBlockEncoding getDataBlockEncoding() {
1992    if (blockType == BlockType.ENCODED_DATA) {
1993      return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
1994    }
1995    return DataBlockEncoding.NONE;
1996  }
1997
1998  @VisibleForTesting
1999  byte getChecksumType() {
2000    return this.fileContext.getChecksumType().getCode();
2001  }
2002
2003  int getBytesPerChecksum() {
2004    return this.fileContext.getBytesPerChecksum();
2005  }
2006
2007  /** @return the size of data on disk + header. Excludes checksum. */
2008  @VisibleForTesting
2009  int getOnDiskDataSizeWithHeader() {
2010    return this.onDiskDataSizeWithHeader;
2011  }
2012
2013  /**
2014   * Calculate the number of bytes required to store all the checksums
2015   * for this block. Each checksum value is a 4 byte integer.
2016   */
2017  int totalChecksumBytes() {
2018    // If the hfile block has minorVersion 0, then there are no checksum
2019    // data to validate. Similarly, a zero value in this.bytesPerChecksum
2020    // indicates that cached blocks do not have checksum data because
2021    // checksums were already validated when the block was read from disk.
2022    if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
2023      return 0;
2024    }
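    // For example (illustrative numbers): with onDiskDataSizeWithHeader = 65569 and
    // bytesPerChecksum = 16384, the data covers ceil(65569 / 16384) = 5 chunks, and at
    // 4 bytes per checksum value this method returns 5 * 4 = 20.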
2025    return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
2026        this.fileContext.getBytesPerChecksum());
2027  }
2028
2029  /**
2030   * Returns the size of this block header.
2031   */
2032  public int headerSize() {
2033    return headerSize(this.fileContext.isUseHBaseChecksum());
2034  }
2035
2036  /**
2037   * Maps a minor version to the size of the header.
2038   */
2039  public static int headerSize(boolean usesHBaseChecksum) {
2040    return usesHBaseChecksum?
2041        HConstants.HFILEBLOCK_HEADER_SIZE: HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
2042  }
2043
2044  /**
2045   * Return the appropriate DUMMY_HEADER for the minor version
2046   */
2047  @VisibleForTesting
2048  // TODO: Why is this in here?
2049  byte[] getDummyHeaderForVersion() {
2050    return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
2051  }
2052
2053  /**
2054   * Return the appropriate DUMMY_HEADER for the minor version
2055   */
2056  static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
2057    return usesHBaseChecksum? HConstants.HFILEBLOCK_DUMMY_HEADER: DUMMY_HEADER_NO_CHECKSUM;
2058  }
2059
2060  /**
2061   * @return This HFileBlock's fileContext, which will be a derivative of the
2062   * fileContext for the file from which this block's data was originally read.
2063   */
2064  HFileContext getHFileContext() {
2065    return this.fileContext;
2066  }
2067
2068  @Override
2069  public MemoryType getMemoryType() {
2070    return this.memType;
2071  }
2072
2073  /**
2074   * @return true if this block is backed by a shared memory area (such as that of a BucketCache).
2075   */
2076  boolean usesSharedMemory() {
2077    return this.memType == MemoryType.SHARED;
2078  }
2079
2080  /**
2081   * Convert the contents of the block header into a human readable string.
2082   * This is mostly helpful for debugging. This assumes that the block
2083   * has minor version > 0.
2084   */
2085  @VisibleForTesting
2086  static String toStringHeader(ByteBuff buf) throws IOException {
2087    byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
2088    buf.get(magicBuf);
2089    BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
2090    int compressedBlockSizeNoHeader = buf.getInt();
2091    int uncompressedBlockSizeNoHeader = buf.getInt();
2092    long prevBlockOffset = buf.getLong();
2093    byte cksumtype = buf.get();
2094    long bytesPerChecksum = buf.getInt();
2095    long onDiskDataSizeWithHeader = buf.getInt();
2096    return " Header dump: magic: " + Bytes.toString(magicBuf) +
2097                   " blockType " + bt +
2098                   " compressedBlockSizeNoHeader " +
2099                   compressedBlockSizeNoHeader +
2100                   " uncompressedBlockSizeNoHeader " +
2101                   uncompressedBlockSizeNoHeader +
2102                   " prevBlockOffset " + prevBlockOffset +
2103                   " checksumType " + ChecksumType.codeToType(cksumtype) +
2104                   " bytesPerChecksum " + bytesPerChecksum +
2105                   " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
2106  }
2107
2108  public HFileBlock deepClone() {
2109    return new HFileBlock(this, true);
2110  }
2111}