
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.io.hfile;
19  
20  import java.io.ByteArrayOutputStream;
21  import java.io.DataInputStream;
22  import java.io.DataOutput;
23  import java.io.DataOutputStream;
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.nio.ByteBuffer;
27  import java.util.concurrent.locks.Lock;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import org.apache.hadoop.fs.FSDataInputStream;
31  import org.apache.hadoop.fs.FSDataOutputStream;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.classification.InterfaceAudience;
36  import org.apache.hadoop.hbase.fs.HFileSystem;
37  import org.apache.hadoop.hbase.io.ByteBufferInputStream;
38  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
39  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
40  import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
41  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
42  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
43  import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
44  import org.apache.hadoop.hbase.util.ByteBufferUtils;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.util.ChecksumType;
47  import org.apache.hadoop.hbase.util.ClassSize;
48  import org.apache.hadoop.io.IOUtils;
49  
50  import com.google.common.annotations.VisibleForTesting;
51  import com.google.common.base.Preconditions;
52  
53  /**
54   * Reading {@link HFile} version 1 and 2 blocks, and writing version 2 blocks.
55   * <ul>
56   * <li>In version 1 all blocks are always compressed or uncompressed, as
57   * specified by the {@link HFile}'s compression algorithm, with a type-specific
58   * magic record stored in the beginning of the compressed data (i.e. one needs
59   * to uncompress the compressed block to determine the block type). There is
60   * only a single compression algorithm setting for all blocks. Offset and size
61   * information from the block index are required to read a block.
62   * <li>In version 2 a block is structured as follows:
63   * <ul>
64   * <li>header (see Writer#finishBlock())
65   * <ul>
66   * <li>Magic record identifying the block type (8 bytes)
67   * <li>Compressed block size, excluding header, including checksum (4 bytes)
68   * <li>Uncompressed block size, excluding header, excluding checksum (4 bytes)
69   * <li>The offset of the previous block of the same type (8 bytes). This is
70   * used to be able to navigate to the previous block without going to the block index.
71   * <li>For minorVersions &gt;=1, the ordinal describing checksum type (1 byte)
72   * <li>For minorVersions &gt;=1, the number of data bytes/checksum chunk (4 bytes)
73   * <li>For minorVersions &gt;=1, the size of data on disk, including header,
74   * excluding checksums (4 bytes)
75   * </ul>
76   * </li>
77   * <li>Raw/Compressed/Encrypted/Encoded data. The compression algorithm is the
78   * same for all the blocks in the {@link HFile}, similarly to what was done in
79   * version 1.
80   * <li>For minorVersions &gt;=1, a series of 4 byte checksums, one each for
81   * the number of bytes specified by bytesPerChecksum.
82   * </ul>
83   * </ul>
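 * <p>
 * For illustration only, a sketch of reading the version 2 header fields of a checksummed
 * block from a buffer positioned at the start of the block (the offsets simply mirror the
 * {@code Header} index constants defined below; the buffer itself is a placeholder):
 * <pre>{@code
 * ByteBuffer b = ...;                                // begins at the block header
 * BlockType type = BlockType.read(b);               // 8-byte magic record
 * int onDiskSizeWithoutHeader = b.getInt(8);        // compressed size, excluding header
 * int uncompressedSizeWithoutHeader = b.getInt(12); // uncompressed size, excluding header/checksum
 * long prevBlockOffset = b.getLong(16);             // previous block of the same type
 * byte checksumType = b.get(24);                    // minorVersion >= 1 only
 * int bytesPerChecksum = b.getInt(25);              // minorVersion >= 1 only
 * int onDiskDataSizeWithHeader = b.getInt(29);      // minorVersion >= 1 only
 * }</pre>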
84   */
85  @InterfaceAudience.Private
86  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="HE_EQUALS_USE_HASHCODE",
87    justification="Fix!!! Fine for now bug FIXXXXXXX!!!!")
88  public class HFileBlock implements Cacheable {
89  
90    /**
91     * On a checksum failure on a Reader, this many succeeding read
92     * requests switch back to using HDFS checksums before auto-re-enabling
93     * HBase checksum verification.
94     */
95    static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
96  
97    public static final boolean FILL_HEADER = true;
98    public static final boolean DONT_FILL_HEADER = false;
99  
100   /**
101    * The size of block header when blockType is {@link BlockType#ENCODED_DATA}.
102    * This extends normal header by adding the id of encoder.
103    */
104   public static final int ENCODED_HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE
105       + DataBlockEncoding.ID_SIZE;
106 
107   static final byte[] DUMMY_HEADER_NO_CHECKSUM =
108      new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
109 
110   public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
111       ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
112 
113   // meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader
114   public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT
115       + Bytes.SIZEOF_LONG;
116 
117   /**
118    * Each checksum value is an integer that can be stored in 4 bytes.
119    */
120   static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
121 
122   static final CacheableDeserializer<Cacheable> blockDeserializer =
123       new CacheableDeserializer<Cacheable>() {
124         public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{
125           buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
126           ByteBuffer newByteBuffer;
127           if (reuse) {
128             newByteBuffer = buf.slice();
129           } else {
130            newByteBuffer = ByteBuffer.allocate(buf.limit());
131            newByteBuffer.put(buf);
132           }
133           buf.position(buf.limit());
134           buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
135           boolean usesChecksum = buf.get() == (byte)1;
136           HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum);
137           hFileBlock.offset = buf.getLong();
138           hFileBlock.nextBlockOnDiskSizeWithHeader = buf.getInt();
139           if (hFileBlock.hasNextBlockHeader()) {
140             hFileBlock.buf.limit(hFileBlock.buf.limit() - hFileBlock.headerSize());
141           }
142           return hFileBlock;
143         }
144 
145         @Override
146         public int getDeserialiserIdentifier() {
147           return deserializerIdentifier;
148         }
149 
150         @Override
151         public HFileBlock deserialize(ByteBuffer b) throws IOException {
152           return deserialize(b, false);
153         }
154       };
155   private static final int deserializerIdentifier;
156   static {
157     deserializerIdentifier = CacheableDeserializerIdManager
158         .registerDeserializer(blockDeserializer);
159   }
160 
161   // TODO: encapsulate Header-related logic in this inner class.
162   static class Header {
163     // Format of header is:
164     // 8 bytes - block magic
165     // 4 bytes int - onDiskSizeWithoutHeader
166     // 4 bytes int - uncompressedSizeWithoutHeader
167     // 8 bytes long - prevBlockOffset
168     // The following 3 are only present if header contains checksum information
169     // 1 byte - checksum type
170     // 4 byte int - bytes per checksum
171     // 4 byte int - onDiskDataSizeWithHeader
172     static int BLOCK_MAGIC_INDEX = 0;
173     static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8;
174     static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12;
175     static int PREV_BLOCK_OFFSET_INDEX = 16;
176     static int CHECKSUM_TYPE_INDEX = 24;
177     static int BYTES_PER_CHECKSUM_INDEX = 25;
178     static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29;
179   }
180 
181   /** Type of block. Header field 0. */
182   private BlockType blockType;
183 
184   /** Size on disk excluding header, including checksum. Header field 1. */
185   private int onDiskSizeWithoutHeader;
186 
187   /** Size of pure data. Does not include header or checksums. Header field 2. */
188   private final int uncompressedSizeWithoutHeader;
189 
190   /** The offset of the previous block on disk. Header field 3. */
191   private final long prevBlockOffset;
192 
193   /**
194    * Size on disk of header + data. Excludes checksum. Header field 6,
195    * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
196    */
197   private final int onDiskDataSizeWithHeader;
198 
199   /** The in-memory representation of the hfile block */
200   private ByteBuffer buf;
201 
202   /** Meta data that holds meta information on the hfileblock */
203   private HFileContext fileContext;
204 
205   /**
206    * The offset of this block in the file. Populated by the reader for
207    * convenience of access. This offset is not part of the block header.
208    */
209   private long offset = -1;
210 
211   /**
212    * The on-disk size of the next block, including the header, obtained by
213    * peeking into the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the next block's
214    * header, or -1 if unknown.
215    */
216   private int nextBlockOnDiskSizeWithHeader = -1;
217 
218   /**
219    * Creates a new {@link HFile} block from the given fields. This constructor
220    * is mostly used when the block data has already been read and uncompressed,
221    * and is sitting in a byte buffer.
222    *
223    * @param blockType the type of this block, see {@link BlockType}
224    * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
225    * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
226    * @param prevBlockOffset see {@link #prevBlockOffset}
227    * @param buf block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by
228    *          uncompressed data
229    * @param fillHeader when true, write the header fields into the beginning of {@code buf}
230    * @param offset the file offset the block was read from
231    * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
232    * @param fileContext HFile meta data
233    */
234   HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
235       long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset,
236       int onDiskDataSizeWithHeader, HFileContext fileContext) {
237     this.blockType = blockType;
238     this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
239     this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
240     this.prevBlockOffset = prevBlockOffset;
241     this.buf = buf;
242     this.offset = offset;
243     this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
244     this.fileContext = fileContext;
245     if (fillHeader)
246       overwriteHeader();
247     this.buf.rewind();
248   }
249 
250   /**
251    * Copy constructor. Creates a shallow copy of {@code that}'s buffer.
252    */
253   HFileBlock(HFileBlock that) {
254     this.blockType = that.blockType;
255     this.onDiskSizeWithoutHeader = that.onDiskSizeWithoutHeader;
256     this.uncompressedSizeWithoutHeader = that.uncompressedSizeWithoutHeader;
257     this.prevBlockOffset = that.prevBlockOffset;
258     this.buf = that.buf.duplicate();
259     this.offset = that.offset;
260     this.onDiskDataSizeWithHeader = that.onDiskDataSizeWithHeader;
261     this.fileContext = that.fileContext;
262     this.nextBlockOnDiskSizeWithHeader = that.nextBlockOnDiskSizeWithHeader;
263   }
264 
265   /**
266    * Creates a block from an existing buffer starting with a header. Rewinds
267    * and takes ownership of the buffer. By definition of rewind, ignores the
268    * buffer position, but if you slice the buffer beforehand, it will rewind
269    * to that point. The constructor takes a {@code usesHBaseChecksum} flag (a minor-version
270    * property) rather than an HFile major version, because major versions describe the format
271    * of an HFile whereas minor versions describe the format inside an HFileBlock.
272    */
273   HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException {
274     b.rewind();
275     blockType = BlockType.read(b);
276     onDiskSizeWithoutHeader = b.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
277     uncompressedSizeWithoutHeader = b.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
278     prevBlockOffset = b.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
279     HFileContextBuilder contextBuilder = new HFileContextBuilder();
280     contextBuilder.withHBaseCheckSum(usesHBaseChecksum);
281     if (usesHBaseChecksum) {
282       contextBuilder.withChecksumType(ChecksumType.codeToType(b.get(Header.CHECKSUM_TYPE_INDEX)));
283       contextBuilder.withBytesPerCheckSum(b.getInt(Header.BYTES_PER_CHECKSUM_INDEX));
284       this.onDiskDataSizeWithHeader = b.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
285     } else {
286       contextBuilder.withChecksumType(ChecksumType.NULL);
287       contextBuilder.withBytesPerCheckSum(0);
288       this.onDiskDataSizeWithHeader = onDiskSizeWithoutHeader +
289                                        HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
290     }
291     this.fileContext = contextBuilder.build();
292     buf = b;
293     buf.rewind();
294   }
295 
296   public BlockType getBlockType() {
297     return blockType;
298   }
299 
300   /** @return the data block encoding id that was used to encode this block */
301   public short getDataBlockEncodingId() {
302     if (blockType != BlockType.ENCODED_DATA) {
303       throw new IllegalArgumentException("Querying encoder ID of a block " +
304           "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
305     }
306     return buf.getShort(headerSize());
307   }
308 
309   /**
310    * @return the on-disk size of header + data part + checksum.
311    */
312   public int getOnDiskSizeWithHeader() {
313     return onDiskSizeWithoutHeader + headerSize();
314   }
315 
316   /**
317    * @return the on-disk size of the data part + checksum (header excluded).
318    */
319   public int getOnDiskSizeWithoutHeader() {
320     return onDiskSizeWithoutHeader;
321   }
322 
323   /**
324    * @return the uncompressed size of data part (header and checksum excluded).
325    */
326   public int getUncompressedSizeWithoutHeader() {
327     return uncompressedSizeWithoutHeader;
328   }
329 
330   /**
331    * @return the offset of the previous block of the same type in the file, or
332    *         -1 if unknown
333    */
334   public long getPrevBlockOffset() {
335     return prevBlockOffset;
336   }
337 
338   /**
339    * Rewinds {@code buf} and writes the first 4 header fields (plus the checksum fields when
340    * HBase checksums are in use). The position of {@code buf} is modified as a side effect.
341    */
342   private void overwriteHeader() {
343     buf.rewind();
344     blockType.write(buf);
345     buf.putInt(onDiskSizeWithoutHeader);
346     buf.putInt(uncompressedSizeWithoutHeader);
347     buf.putLong(prevBlockOffset);
348     if (this.fileContext.isUseHBaseChecksum()) {
349       buf.put(fileContext.getChecksumType().getCode());
350       buf.putInt(fileContext.getBytesPerChecksum());
351       buf.putInt(onDiskDataSizeWithHeader);
352     }
353   }
354 
355   /**
356    * Returns a buffer that does not include the header or checksum.
357    *
358    * @return the buffer with header skipped and checksum omitted.
359    */
360   public ByteBuffer getBufferWithoutHeader() {
361     ByteBuffer dup = this.buf.duplicate();
362     dup.position(headerSize());
363     dup.limit(buf.limit() - totalChecksumBytes());
364     return dup.slice();
365   }
366 
367   /**
368    * Returns the buffer this block stores internally. The clients must not
369    * modify the buffer object. This method has to be public because it is
370    * used in {@link org.apache.hadoop.hbase.util.CompoundBloomFilter}
371    * to avoid object creation on every Bloom filter lookup, but has to
372    * be used with caution. Checksum data is not included in the returned
373    * buffer but header data is.
374    *
375    * @return the buffer of this block for read-only operations
376    */
377   public ByteBuffer getBufferReadOnly() {
378     ByteBuffer dup = this.buf.duplicate();
379     dup.limit(buf.limit() - totalChecksumBytes());
380     return dup.slice();
381   }
382 
383   /**
384    * Returns the buffer of this block, including header data. The clients must
385    * not modify the buffer object. This method has to be public because it is
386    * used in {@link org.apache.hadoop.hbase.io.hfile.bucket.BucketCache} to avoid buffer copy.
387    *
388    * @return the buffer with header and checksum included for read-only operations
389    */
390   public ByteBuffer getBufferReadOnlyWithHeader() {
391     ByteBuffer dup = this.buf.duplicate();
392     return dup.slice();
393   }
394 
395   /**
396    * Returns a byte buffer of this block, including header data and checksum, positioned at
397    * the beginning of header. The underlying data array is not copied.
398    *
399    * @return the byte buffer with header and checksum included
400    */
401   ByteBuffer getBufferWithHeader() {
402     ByteBuffer dupBuf = buf.duplicate();
403     dupBuf.rewind();
404     return dupBuf;
405   }
406 
407   private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
408       String fieldName) throws IOException {
409     if (valueFromBuf != valueFromField) {
410       throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
411           + ") is different from that in the field (" + valueFromField + ")");
412     }
413   }
414 
415   private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
416       throws IOException {
417     if (valueFromBuf != valueFromField) {
418       throw new IOException("Block type stored in the buffer: " +
419         valueFromBuf + ", block type field: " + valueFromField);
420     }
421   }
422 
423   /**
424    * Checks if the block is internally consistent, i.e. the first
425    * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a
426    * valid header consistent with the fields. Assumes a packed block structure.
427    * This function is primarily for testing and debugging, and is not
428    * thread-safe, because it alters the internal buffer pointer.
429    */
430   void sanityCheck() throws IOException {
431     buf.rewind();
432 
433     sanityCheckAssertion(BlockType.read(buf), blockType);
434 
435     sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader,
436         "onDiskSizeWithoutHeader");
437 
438     sanityCheckAssertion(buf.getInt(), uncompressedSizeWithoutHeader,
439         "uncompressedSizeWithoutHeader");
440 
441     sanityCheckAssertion(buf.getLong(), prevBlockOffset, "prevBlockOffset");
442     if (this.fileContext.isUseHBaseChecksum()) {
443       sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
444       sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), "bytesPerChecksum");
445       sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
446     }
447 
448     int cksumBytes = totalChecksumBytes();
449     int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
450     if (buf.limit() != expectedBufLimit) {
451       throw new AssertionError("Expected buffer limit " + expectedBufLimit
452           + ", got " + buf.limit());
453     }
454 
455     // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
456     // block's header, so there are two sensible values for buffer capacity.
457     int hdrSize = headerSize();
458     if (buf.capacity() != expectedBufLimit &&
459         buf.capacity() != expectedBufLimit + hdrSize) {
460       throw new AssertionError("Invalid buffer capacity: " + buf.capacity() +
461           ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
462     }
463   }
464 
465   @Override
466   public String toString() {
467     StringBuilder sb = new StringBuilder()
468       .append("HFileBlock [")
469       .append(" fileOffset=").append(offset)
470       .append(" headerSize()=").append(headerSize())
471       .append(" blockType=").append(blockType)
472       .append(" onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
473       .append(" uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
474       .append(" prevBlockOffset=").append(prevBlockOffset)
475       .append(" isUseHBaseChecksum()=").append(fileContext.isUseHBaseChecksum());
476     if (fileContext.isUseHBaseChecksum()) {
477       sb.append(" checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
478         .append(" bytesPerChecksum=").append(this.buf.getInt(24 + 1))
479         .append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
480     } else {
481       sb.append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
482         .append("(").append(onDiskSizeWithoutHeader)
483         .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
484     }
485     String dataBegin = null;
486     if (buf.hasArray()) {
487       dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
488           Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
489     } else {
490       ByteBuffer bufWithoutHeader = getBufferWithoutHeader();
491       byte[] dataBeginBytes = new byte[Math.min(32,
492           bufWithoutHeader.limit() - bufWithoutHeader.position())];
493       bufWithoutHeader.get(dataBeginBytes);
494       dataBegin = Bytes.toStringBinary(dataBeginBytes);
495     }
496     sb.append(" getOnDiskSizeWithHeader()=").append(getOnDiskSizeWithHeader())
497       .append(" totalChecksumBytes()=").append(totalChecksumBytes())
498       .append(" isUnpacked()=").append(isUnpacked())
499       .append(" buf=[ ").append(buf).append(" ]")
500       .append(" dataBeginsWith=").append(dataBegin)
501       .append(" fileContext=").append(fileContext)
502       .append(" ]");
503     return sb.toString();
504   }
505 
506   /**
507    * Called after reading a block with provided onDiskSizeWithHeader.
508    */
509   private static void validateOnDiskSizeWithoutHeader(int expectedOnDiskSizeWithoutHeader,
510       int actualOnDiskSizeWithoutHeader, ByteBuffer buf, long offset) throws IOException {
511     if (actualOnDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) {
512       // We make the read-only copy here instead of when passing the parameter to this function
513       // to make duplicates only in failure cases, instead of every single time.
514       ByteBuffer bufReadOnly = buf.asReadOnlyBuffer();
515       String dataBegin = null;
516       byte[] dataBeginBytes = new byte[Math.min(32, bufReadOnly.limit() - bufReadOnly.position())];
517       bufReadOnly.get(dataBeginBytes);
518       dataBegin = Bytes.toStringBinary(dataBeginBytes);
519       String blockInfoMsg =
520         "Block offset: " + offset + ", data starts with: " + dataBegin;
521       throw new IOException("On-disk size without header provided is "
522           + expectedOnDiskSizeWithoutHeader + ", but block "
523           + "header contains " + actualOnDiskSizeWithoutHeader + ". " +
524           blockInfoMsg);
525     }
526   }
527 
528   /**
529    * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
530    * encoded structure. Internal structures are shared between instances where applicable.
531    */
532   HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
533     if (!fileContext.isCompressedOrEncrypted()) {
534       // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
535       // which is used for block serialization to L2 cache, does not preserve encoding and
536       // encryption details.
537       return this;
538     }
539 
540     HFileBlock unpacked = new HFileBlock(this);
541     unpacked.allocateBuffer(); // allocates space for the decompressed block
542 
543     HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
544       reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
545 
546     ByteBuffer dup = this.buf.duplicate();
547     dup.position(this.headerSize());
548     dup = dup.slice();
549     ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
550       unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
551       dup);
552 
553     // Preserve the next block's header bytes in the new block if we have them.
554     if (unpacked.hasNextBlockHeader()) {
555       // Both buffers are limited to the end of the checksum bytes and exclude the next block's
556       // header. The copyFromBufferToBuffer() call below does positional reads/writes when either
557       // buffer is a DirectByteBuffer, so we only raise the limit on duplicate buffers here; no
558       // data is copied, only new ByteBuffer objects are created.
559       ByteBuffer inDup = this.buf.duplicate();
560       inDup.limit(inDup.limit() + headerSize());
561       ByteBuffer outDup = unpacked.buf.duplicate();
562       outDup.limit(outDup.limit() + unpacked.headerSize());
563       ByteBufferUtils.copyFromBufferToBuffer(
564           outDup,
565           inDup,
566           this.onDiskDataSizeWithHeader,
567           unpacked.headerSize() + unpacked.uncompressedSizeWithoutHeader
568               + unpacked.totalChecksumBytes(), unpacked.headerSize());
569     }
570     return unpacked;
571   }
572 
573   /**
574    * Return true when this buffer includes next block's header.
575    */
576   private boolean hasNextBlockHeader() {
577     return nextBlockOnDiskSizeWithHeader > 0;
578   }
579 
580   /**
581    * Always allocates a new buffer of the correct size. Copies header bytes
582    * from the existing buffer. Does not change header fields.
583    * Reserves room for the checksum bytes as well.
584    */
585   private void allocateBuffer() {
586     int cksumBytes = totalChecksumBytes();
587     int headerSize = headerSize();
588     int capacityNeeded = headerSize + uncompressedSizeWithoutHeader +
589         cksumBytes + (hasNextBlockHeader() ? headerSize : 0);
590 
591     // TODO: should we consider allocating this off-heap?
592     ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
593 
594     // Copy header bytes into newBuf.
595     // newBuf is HBB so no issue in calling array()
596     ByteBuffer dup = buf.duplicate();
597     dup.position(0);
598     dup.get(newBuf.array(), newBuf.arrayOffset(), headerSize);
599 
600     buf = newBuf;
601     // set limit to exclude next block's header
602     buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
603   }
604 
605   /**
606    * Return true when this block's buffer has been unpacked, false otherwise. Note this is a
607    * calculated heuristic, not a tracked attribute of the block.
608    */
609   public boolean isUnpacked() {
610     final int cksumBytes = totalChecksumBytes();
611     final int headerSize = headerSize();
612     final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
613     final int bufCapacity = buf.capacity();
614     return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
615   }
616 
617   /** An additional sanity-check in case no compression or encryption is being used. */
618   public static void verifyUncompressed(ByteBuffer buf, boolean useHBaseChecksum)
619       throws IOException {
620     int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
621     int uncompressedSizeWithoutHeader = buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
622     int onDiskDataSizeWithHeader;
623     int checksumBytes = 0;
624     if (useHBaseChecksum) {
625       onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
626       checksumBytes = (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
627           buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX));
628     }
629 
630     if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + checksumBytes) {
631       throw new IOException("Using no compression but "
632           + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
633           + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
634           + ", numChecksumbytes=" + checksumBytes);
635     }
636   }
637 
638   /**
639    * @param expectedType the expected type of this block
640    * @throws IOException if this block's type is different than expected
641    */
642   public void expectType(BlockType expectedType) throws IOException {
643     if (blockType != expectedType) {
644       throw new IOException("Invalid block type: expected=" + expectedType
645           + ", actual=" + blockType);
646     }
647   }
648 
649   /** @return the offset of this block in the file it was read from */
650   public long getOffset() {
651     if (offset < 0) {
652       throw new IllegalStateException(
653           "HFile block offset not initialized properly");
654     }
655     return offset;
656   }
657 
658   /**
659    * @return a byte stream reading the data + checksum of this block
660    */
661   public DataInputStream getByteStream() {
662     ByteBuffer dup = this.buf.duplicate();
663     dup.position(this.headerSize());
664     return new DataInputStream(new ByteBufferInputStream(dup));
665   }
666 
667   @Override
668   public long heapSize() {
669     long size = ClassSize.align(
670         ClassSize.OBJECT +
671         // Block type, byte buffer and meta references
672         3 * ClassSize.REFERENCE +
673         // On-disk size, uncompressed size, and next block's on-disk size
674         // bytePerChecksum and onDiskDataSize
675         4 * Bytes.SIZEOF_INT +
676         // This and previous block offset
677         2 * Bytes.SIZEOF_LONG +
678         // Heap size of the meta object. meta will be always not null.
679         fileContext.heapSize()
680     );
681 
682     if (buf != null) {
683       // Deep overhead of the byte buffer. Needs to be aligned separately.
684       size += ClassSize.align(buf.capacity() + BYTE_BUFFER_HEAP_SIZE);
685     }
686 
687     return ClassSize.align(size);
688   }
689 
690   /**
691    * Read from an input stream. Analogous to
692    * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a
693    * number of "extra" bytes that would be desirable but not absolutely
694    * necessary to read.
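   * <p>
   * A hypothetical sketch (all variable names are illustrative) of reading one block while
   * opportunistically picking up the next block's header:
   * <pre>{@code
   * byte[] dest = new byte[onDiskSizeWithHeader + hdrSize];
   * boolean nextHeaderRead = HFileBlock.readWithExtra(in, dest, 0,
   *     onDiskSizeWithHeader,   // bytes we must have
   *     hdrSize);               // the next block's header, nice to have
   * }</pre>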
695    *
696    * @param in the input stream to read from
697    * @param buf the buffer to read into
698    * @param bufOffset the destination offset in the buffer
699    * @param necessaryLen the number of bytes that are absolutely necessary to
700    *          read
701    * @param extraLen the number of extra bytes that would be nice to read
702    * @return true if reading the extra bytes succeeded
703    * @throws IOException if failed to read the necessary bytes
704    */
705   public static boolean readWithExtra(InputStream in, byte[] buf,
706       int bufOffset, int necessaryLen, int extraLen) throws IOException {
707     int bytesRemaining = necessaryLen + extraLen;
708     while (bytesRemaining > 0) {
709       int ret = in.read(buf, bufOffset, bytesRemaining);
710       if (ret == -1 && bytesRemaining <= extraLen) {
711         // We could not read the "extra data", but that is OK.
712         break;
713       }
714 
715       if (ret < 0) {
716         throw new IOException("Premature EOF from inputStream (read "
717             + "returned " + ret + ", was trying to read " + necessaryLen
718             + " necessary bytes and " + extraLen + " extra bytes, "
719             + "successfully read "
720             + (necessaryLen + extraLen - bytesRemaining));
721       }
722       bufOffset += ret;
723       bytesRemaining -= ret;
724     }
725     return bytesRemaining <= 0;
726   }
727 
728   /**
729    * Read from an input stream. Analogous to
730    * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses
731    * positional read and specifies a number of "extra" bytes that would be
732    * desirable but not absolutely necessary to read.
733    *
734    * @param in the input stream to read from
735    * @param position the position within the stream from which to start reading
736    * @param buf the buffer to read into
737    * @param bufOffset the destination offset in the buffer
738    * @param necessaryLen the number of bytes that are absolutely necessary to
739    *     read
740    * @param extraLen the number of extra bytes that would be nice to read
741    * @return true if and only if extraLen is > 0 and reading those extra bytes
742    *     was successful
743    * @throws IOException if failed to read the necessary bytes
744    */
745   @VisibleForTesting
746   static boolean positionalReadWithExtra(FSDataInputStream in,
747       long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen)
748       throws IOException {
749     int bytesRemaining = necessaryLen + extraLen;
750     int bytesRead = 0;
751     while (bytesRead < necessaryLen) {
752       int ret = in.read(position, buf, bufOffset, bytesRemaining);
753       if (ret < 0) {
754         throw new IOException("Premature EOF from inputStream (positional read "
755             + "returned " + ret + ", was trying to read " + necessaryLen
756             + " necessary bytes and " + extraLen + " extra bytes, "
757             + "successfully read " + bytesRead);
758       }
759       position += ret;
760       bufOffset += ret;
761       bytesRemaining -= ret;
762       bytesRead += ret;
763     }
764     return bytesRead != necessaryLen && bytesRemaining <= 0;
765   }
766 
767   /**
768    * @return the on-disk size of the next block (including the header size)
769    *         that was read by peeking into the next block's header
770    */
771   public int getNextBlockOnDiskSizeWithHeader() {
772     return nextBlockOnDiskSizeWithHeader;
773   }
774 
775   /**
776    * Unified version 2 {@link HFile} block writer. The intended usage pattern
777    * is as follows:
778    * <ol>
779    * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
780    * <li>Call {@link Writer#startWriting} and get a data stream to write to.
781    * <li>Write your data into the stream.
782    * <li>Call {@link Writer#writeHeaderAndData(FSDataOutputStream)} as many times as you need to
783    * store the serialized block into an external stream; see the sketch below.
784    * <li>Repeat to write more blocks.
785    * </ol>
786    * <p>
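   * A minimal sketch of this pattern (the encoder, file context, output stream, and payload
   * below are illustrative placeholders):
   * <pre>{@code
   * HFileBlock.Writer writer = new HFileBlock.Writer(dataBlockEncoder, fileContext);
   * DataOutputStream dos = writer.startWriting(BlockType.META);
   * dos.write(metaPayload);          // step 3: write the block's data
   * writer.writeHeaderAndData(out);  // step 4: finish the block; writes header + data + checksums
   * // repeat startWriting()/writeHeaderAndData() for subsequent blocks
   * }</pre>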
787    */
788   public static class Writer {
789 
790     private enum State {
791       INIT,
792       WRITING,
793       BLOCK_READY
794     };
795 
796     /** Writer state. Used to ensure the correct usage protocol. */
797     private State state = State.INIT;
798 
799     /** Data block encoder used for data blocks */
800     private final HFileDataBlockEncoder dataBlockEncoder;
801 
802     private HFileBlockEncodingContext dataBlockEncodingCtx;
803 
804     /** block encoding context for non-data blocks */
805     private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
806 
807     /**
808      * The stream we use to accumulate data in uncompressed format for each
809      * block. We reset this stream at the end of each block and reuse it. The
810      * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this
811      * stream.
812      */
813     private ByteArrayOutputStream baosInMemory;
814 
815     /**
816      * Current block type. Set in {@link #startWriting(BlockType)}. Could be
817      * changed in {@link #finishBlock()} from {@link BlockType#DATA}
818      * to {@link BlockType#ENCODED_DATA}.
819      */
820     private BlockType blockType;
821 
822     /**
823      * A stream that we write uncompressed bytes to, which compresses them and
824      * writes them to {@link #baosInMemory}.
825      */
826     private DataOutputStream userDataStream;
827 
828     // Size of the actual data written so far, before block encoding/compression. This
829     // includes the header size as well.
830     private int unencodedDataSizeWritten;
831 
832     /**
833      * Bytes to be written to the file system, including the header. Compressed
834      * if compression is turned on. It also includes the checksum data that
835      * immediately follows the block data. (header + data + checksums)
836      */
837     private byte[] onDiskBytesWithHeader;
838 
839     /**
840      * The size of the checksum data on disk. It is used only if data is
841      * not compressed. If data is compressed, then the checksums are already
842      * part of onDiskBytesWithHeader. If data is uncompressed, then this
843      * variable stores the checksum data for this block.
844      */
845     private byte[] onDiskChecksum;
846 
847     /**
848      * Valid in the BLOCK_READY state. Contains the header and the uncompressed (but
849      * potentially encoded, if this is a data block) bytes, so the length is
850      * {@link #uncompressedSizeWithoutHeader} +
851      * {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}.
852      * Does not store checksums.
853      */
854     private byte[] uncompressedBytesWithHeader;
855 
856     /**
857      * Current block's start offset in the {@link HFile}. Set in
858      * {@link #writeHeaderAndData(FSDataOutputStream)}.
859      */
860     private long startOffset;
861 
862     /**
863      * Offset of previous block by block type. Updated when the next block is
864      * started.
865      */
866     private long[] prevOffsetByType;
867 
868     /** The offset of the previous block of the same type */
869     private long prevOffset;
870     /** Meta data that holds information about the hfileblock**/
871     private HFileContext fileContext;
872 
873     /**
874      * @param dataBlockEncoder data block encoding algorithm to use
875      */
876     public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
877       this.dataBlockEncoder = dataBlockEncoder != null
878           ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
879       defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
880           HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
881       dataBlockEncodingCtx = this.dataBlockEncoder
882           .newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
883 
884       if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
885         throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
886             " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
887             fileContext.getBytesPerChecksum());
888       }
889 
890       baosInMemory = new ByteArrayOutputStream();
891 
892       prevOffsetByType = new long[BlockType.values().length];
893       for (int i = 0; i < prevOffsetByType.length; ++i)
894         prevOffsetByType[i] = -1;
895 
896       this.fileContext = fileContext;
897     }
898 
899     /**
900      * Starts writing into the block. The previous block's data is discarded.
901      *
902      * @return the stream the user can write their data into
903      * @throws IOException
904      */
905     public DataOutputStream startWriting(BlockType newBlockType)
906         throws IOException {
907       if (state == State.BLOCK_READY && startOffset != -1) {
908         // We had a previous block that was written to a stream at a specific
909         // offset. Save that offset as the last offset of a block of that type.
910         prevOffsetByType[blockType.getId()] = startOffset;
911       }
912 
913       startOffset = -1;
914       blockType = newBlockType;
915 
916       baosInMemory.reset();
917       baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
918 
919       state = State.WRITING;
920 
921       // We will compress it later in finishBlock()
922       userDataStream = new DataOutputStream(baosInMemory);
923       if (newBlockType == BlockType.DATA) {
924         this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
925       }
926       this.unencodedDataSizeWritten = 0;
927       return userDataStream;
928     }
929 
930     /**
931      * Writes the Cell to this block
932      * @param cell
933      * @throws IOException
934      */
935     public void write(Cell cell) throws IOException{
936       expectState(State.WRITING);
937       this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx,
938           this.userDataStream);
939     }
940 
941     /**
942      * Returns the stream for the user to write to. The block writer takes care
943      * of handling compression and buffering for caching on write. Can only be
944      * called in the "writing" state.
945      *
946      * @return the data output stream for the user to write to
947      */
948     DataOutputStream getUserDataStream() {
949       expectState(State.WRITING);
950       return userDataStream;
951     }
952 
953     /**
954      * Transitions the block writer from the "writing" state to the "block
955      * ready" state.  Does nothing if a block is already finished.
956      */
957     void ensureBlockReady() throws IOException {
958       Preconditions.checkState(state != State.INIT,
959           "Unexpected state: " + state);
960 
961       if (state == State.BLOCK_READY)
962         return;
963 
964       // This will set state to BLOCK_READY.
965       finishBlock();
966     }
967 
968     /**
969      * An internal method that flushes the compressing stream (if using
970      * compression), serializes the header, and takes care of the separate
971      * uncompressed stream for caching on write, if applicable. Sets block
972      * write state to "block ready".
973      */
974     private void finishBlock() throws IOException {
975       if (blockType == BlockType.DATA) {
976         BufferGrabbingByteArrayOutputStream baosInMemoryCopy =
977             new BufferGrabbingByteArrayOutputStream();
978         baosInMemory.writeTo(baosInMemoryCopy);
979         this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
980             baosInMemoryCopy.buf, blockType);
981         blockType = dataBlockEncodingCtx.getBlockType();
982       }
983       userDataStream.flush();
984       // This does an array copy, so it is safe to cache this byte array.
985       uncompressedBytesWithHeader = baosInMemory.toByteArray();
986       prevOffset = prevOffsetByType[blockType.getId()];
987 
988       // We need to set state before we can package the block up for
989       // cache-on-write. In a way, the block is ready, but not yet encoded or
990       // compressed.
991       state = State.BLOCK_READY;
992       if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
993         onDiskBytesWithHeader = dataBlockEncodingCtx
994             .compressAndEncrypt(uncompressedBytesWithHeader);
995       } else {
996         onDiskBytesWithHeader = defaultBlockEncodingCtx
997             .compressAndEncrypt(uncompressedBytesWithHeader);
998       }
999       int numBytes = (int) ChecksumUtil.numBytes(
1000           onDiskBytesWithHeader.length,
1001           fileContext.getBytesPerChecksum());
1002 
1003       // put the header for on disk bytes
1004       putHeader(onDiskBytesWithHeader, 0,
1005           onDiskBytesWithHeader.length + numBytes,
1006           uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
1007       // set the header for the uncompressed bytes (for cache-on-write)
1008       putHeader(uncompressedBytesWithHeader, 0,
1009           onDiskBytesWithHeader.length + numBytes,
1010           uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
1011 
1012       onDiskChecksum = new byte[numBytes];
1013       ChecksumUtil.generateChecksums(
1014           onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
1015           onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
1016     }
1017 
1018     public static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
1019       private byte[] buf;
1020 
1021       @Override
1022       public void write(byte[] b, int off, int len) {
1023         this.buf = b;
1024       }
1025 
1026       public byte[] getBuffer() {
1027         return this.buf;
1028       }
1029     }
1030 
1031     /**
1032      * Put the header into the given byte array at the given offset.
1033      * @param onDiskSize size of the block on disk (header + data + checksum)
1034      * @param uncompressedSize size of the block after decompression (but
1035      *          before optional data block decoding) including header
1036      * @param onDiskDataSize size of the block on disk with header
1037      *        and data but not including the checksums
1038      */
1039     private void putHeader(byte[] dest, int offset, int onDiskSize,
1040         int uncompressedSize, int onDiskDataSize) {
1041       offset = blockType.put(dest, offset);
1042       offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1043       offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1044       offset = Bytes.putLong(dest, offset, prevOffset);
1045       offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
1046       offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
1047       Bytes.putInt(dest, offset, onDiskDataSize);
1048     }
1049 
1050     /**
1051      * Similar to {@link #finishBlockAndWriteHeaderAndData(DataOutputStream)}, but records
1052      * the offset of this block so that it can be referenced in the next block
1053      * of the same type.
1054      *
1055      * @param out
1056      * @throws IOException
1057      */
1058     public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
1059       long offset = out.getPos();
1060       if (startOffset != -1 && offset != startOffset) {
1061         throw new IOException("A " + blockType + " block written to a "
1062             + "stream twice, first at offset " + startOffset + ", then at "
1063             + offset);
1064       }
1065       startOffset = offset;
1066 
1067       finishBlockAndWriteHeaderAndData((DataOutputStream) out);
1068     }
1069 
1070     /**
1071      * Writes the header and the compressed data of this block (or uncompressed
1072      * data when not using compression) into the given stream. Can be called in
1073      * the "writing" state or in the "block ready" state. If called in the
1074      * "writing" state, transitions the writer to the "block ready" state.
1075      *
1076      * @param out the output stream to write the block to
1077      * @throws IOException
1078      */
1079     protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
1080       throws IOException {
1081       ensureBlockReady();
1082       out.write(onDiskBytesWithHeader);
1083       out.write(onDiskChecksum);
1084     }
1085 
1086     /**
1087      * Returns the header followed by the compressed data (or uncompressed data when not
1088      * using compression) as a byte array. Can be called in the "writing" state
1089      * or in the "block ready" state. If called in the "writing" state,
1090      * transitions the writer to the "block ready" state. This returns
1091      * the header + data + checksums stored on disk.
1092      *
1093      * @return header and data as they would be stored on disk in a byte array
1094      * @throws IOException
1095      */
1096     byte[] getHeaderAndDataForTest() throws IOException {
1097       ensureBlockReady();
1098       // This is not very optimal, because we are doing an extra copy.
1099       // But this method is used only by unit tests.
1100       byte[] output =
1101           new byte[onDiskBytesWithHeader.length
1102               + onDiskChecksum.length];
1103       System.arraycopy(onDiskBytesWithHeader, 0, output, 0,
1104           onDiskBytesWithHeader.length);
1105       System.arraycopy(onDiskChecksum, 0, output,
1106           onDiskBytesWithHeader.length, onDiskChecksum.length);
1107       return output;
1108     }
1109 
1110     /**
1111      * Releases resources used by this writer.
1112      */
1113     public void release() {
1114       if (dataBlockEncodingCtx != null) {
1115         dataBlockEncodingCtx.close();
1116         dataBlockEncodingCtx = null;
1117       }
1118       if (defaultBlockEncodingCtx != null) {
1119         defaultBlockEncodingCtx.close();
1120         defaultBlockEncodingCtx = null;
1121       }
1122     }
1123 
1124     /**
1125      * Returns the on-disk size of the data portion of the block. This is the
1126      * compressed size if compression is enabled. Can only be called in the
1127      * "block ready" state. Header is not compressed, and its size is not
1128      * included in the return value.
1129      *
1130      * @return the on-disk size of the block, not including the header.
1131      */
1132     int getOnDiskSizeWithoutHeader() {
1133       expectState(State.BLOCK_READY);
1134       return onDiskBytesWithHeader.length
1135           + onDiskChecksum.length
1136           - HConstants.HFILEBLOCK_HEADER_SIZE;
1137     }
1138 
1139     /**
1140      * Returns the on-disk size of the block. Can only be called in the
1141      * "block ready" state.
1142      *
1143      * @return the on-disk size of the block ready to be written, including the
1144      *         header size, the data and the checksum data.
1145      */
1146     int getOnDiskSizeWithHeader() {
1147       expectState(State.BLOCK_READY);
1148       return onDiskBytesWithHeader.length + onDiskChecksum.length;
1149     }
1150 
1151     /**
1152      * The uncompressed size of the block data. Does not include header size.
1153      */
1154     int getUncompressedSizeWithoutHeader() {
1155       expectState(State.BLOCK_READY);
1156       return uncompressedBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE;
1157     }
1158 
1159     /**
1160      * The uncompressed size of the block data, including header size.
1161      */
1162     int getUncompressedSizeWithHeader() {
1163       expectState(State.BLOCK_READY);
1164       return uncompressedBytesWithHeader.length;
1165     }
1166 
1167     /** @return true if a block is being written  */
1168     public boolean isWriting() {
1169       return state == State.WRITING;
1170     }
1171 
1172     /**
1173      * Returns the number of bytes written into the current block so far, or
1174      * zero if not writing the block at the moment. Note that this will return
1175      * zero in the "block ready" state as well.
1176      *
1177      * @return the number of bytes written
1178      */
1179     public int blockSizeWritten() {
1180       if (state != State.WRITING) return 0;
1181       return this.unencodedDataSizeWritten;
1182     }
1183 
1184     /**
1185      * Returns the header followed by the uncompressed data, even if using
1186      * compression. This is needed for storing uncompressed blocks in the block
1187      * cache. Can be called in the "writing" state or the "block ready" state.
1188      * Returns only the header and data, does not include checksum data.
1189      *
1190      * @return uncompressed block bytes for caching on write
1191      */
1192     ByteBuffer getUncompressedBufferWithHeader() {
1193       expectState(State.BLOCK_READY);
1194       return ByteBuffer.wrap(uncompressedBytesWithHeader);
1195     }
1196 
1197     /**
1198      * Returns the header followed by the on-disk (compressed/encoded/encrypted) data. This is
1199      * needed for storing packed blocks in the block cache. Expects calling semantics identical to
1200      * {@link #getUncompressedBufferWithHeader()}. Returns only the header and data,
1201      * Does not include checksum data.
1202      *
1203      * @return packed block bytes for caching on write
1204      */
1205     ByteBuffer getOnDiskBufferWithHeader() {
1206       expectState(State.BLOCK_READY);
1207       return ByteBuffer.wrap(onDiskBytesWithHeader);
1208     }
1209 
1210     private void expectState(State expectedState) {
1211       if (state != expectedState) {
1212         throw new IllegalStateException("Expected state: " + expectedState +
1213             ", actual state: " + state);
1214       }
1215     }
1216 
1217     /**
1218      * Takes the given {@link BlockWritable} instance, creates a new block of
1219      * its appropriate type, writes the writable into this block, and flushes
1220      * the block into the output stream. The writer is instructed not to buffer
1221      * uncompressed bytes for cache-on-write.
1222      *
1223      * @param bw the block-writable object to write as a block
1224      * @param out the file system output stream
1225      * @throws IOException
1226      */
1227     public void writeBlock(BlockWritable bw, FSDataOutputStream out)
1228         throws IOException {
1229       bw.writeToBlock(startWriting(bw.getBlockType()));
1230       writeHeaderAndData(out);
1231     }
1232 
1233     /**
1234      * Creates a new HFileBlock. Checksums have already been validated, so
1235      * the byte buffer passed into the constructor of this newly created
1236      * block does not have checksum data even though the header minor
1237      * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
1238      * 0 value in bytesPerChecksum.
1239      */
1240     public HFileBlock getBlockForCaching(CacheConfig cacheConf) {
1241       HFileContext newContext = new HFileContextBuilder()
1242                                 .withBlockSize(fileContext.getBlocksize())
1243                                 .withBytesPerCheckSum(0)
1244                                 .withChecksumType(ChecksumType.NULL) // no checksums in cached data
1245                                 .withCompression(fileContext.getCompression())
1246                                 .withDataBlockEncoding(fileContext.getDataBlockEncoding())
1247                                 .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
1248                                 .withCompressTags(fileContext.isCompressTags())
1249                                 .withIncludesMvcc(fileContext.isIncludesMvcc())
1250                                 .withIncludesTags(fileContext.isIncludesTags())
1251                                 .build();
1252       return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
1253           getUncompressedSizeWithoutHeader(), prevOffset,
1254           cacheConf.shouldCacheCompressed(blockType.getCategory()) ?
1255             getOnDiskBufferWithHeader() :
1256             getUncompressedBufferWithHeader(),
1257           FILL_HEADER, startOffset,
1258           onDiskBytesWithHeader.length + onDiskChecksum.length, newContext);
1259     }
1260   }
1261 
1262   /** Something that can be written into a block. */
1263   public interface BlockWritable {
1264 
1265     /** The type of block this data should use. */
1266     BlockType getBlockType();
1267 
1268     /**
1269      * Writes the block to the provided stream. Must not write any magic
1270      * records.
1271      *
1272      * @param out a stream to write uncompressed data into
1273      */
1274     void writeToBlock(DataOutput out) throws IOException;
1275   }
1276 
1277   // Block readers and writers
1278 
1279   /** An interface for iterating over {@link HFileBlock}s. */
1280   public interface BlockIterator {
1281 
1282     /**
1283      * Get the next block, or null if there are no more blocks to iterate.
1284      */
1285     HFileBlock nextBlock() throws IOException;
1286 
1287     /**
1288      * Similar to {@link #nextBlock()} but checks block type, throws an
1289      * exception if incorrect, and returns the HFile block
1290      */
1291     HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1292   }
1293 
1294   /** A full-fledged reader with iteration ability. */
1295   public interface FSReader {
1296 
1297     /**
1298      * Reads the block at the given offset in the file with the given on-disk
1299      * size and uncompressed size.
1300      *
1301      * @param offset the offset of the block in the file
1302      * @param onDiskSize the on-disk size of the entire block, including all
1303      *          applicable headers, or -1 if unknown
1304      * @param uncompressedSize the uncompressed size of the compressed part of
1305      *          the block, or -1 if unknown
1306      * @return the newly read block
1307      */
1308     HFileBlock readBlockData(long offset, long onDiskSize,
1309         int uncompressedSize, boolean pread) throws IOException;
1310 
1311     /**
1312      * Creates a block iterator over the given portion of the {@link HFile}.
1313      * The iterator returns blocks whose offset is in the range startOffset &lt;=
1314      * offset &lt; endOffset. Returned blocks are always unpacked.
1315      *
1316      * @param startOffset the offset of the block to start iteration with
1317      * @param endOffset the offset to end iteration at (exclusive)
1318      * @return an iterator of blocks between the two given offsets
1319      */
1320     BlockIterator blockRange(long startOffset, long endOffset);
1321 
1322     /** Closes the backing streams */
1323     void closeStreams() throws IOException;
1324 
1325     /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
1326     HFileBlockDecodingContext getBlockDecodingContext();
1327 
1328     /** Get the default decoder for blocks from this file. */
1329     HFileBlockDecodingContext getDefaultBlockDecodingContext();
1330   }
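  /*
   * A hedged sketch of a single random read through this interface; the index lookup
   * is assumed, only readBlockData and unpack come from this file:
   *
   *   long offset = ...;      // block offset, e.g. obtained from the block index
   *   long onDiskSize = ...;  // on-disk size including headers, or -1 if unknown
   *   HFileBlock raw = fsReader.readBlockData(offset, onDiskSize, -1, true); // pread
   *   HFileBlock usable = raw.unpack(fileContext, fsReader); // decompress/decrypt if needed
   */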
1331 
1332   /**
1333    * A common implementation of some methods of {@link FSReader} and some
1334    * tools for implementing HFile format version-specific block readers.
1335    */
1336   private abstract static class AbstractFSReader implements FSReader {
1337 
1338 
1339     /** The size of the file we are reading from, or -1 if unknown. */
1340     protected long fileSize;
1341 
1342     /** The size of the header */
1343     protected final int hdrSize;
1344 
1345     /** The filesystem used to access data */
1346     protected HFileSystem hfs;
1347 
1348     /** The path (if any) where this data is coming from */
1349     protected Path path;
1350 
1351     private final Lock streamLock = new ReentrantLock();
1352 
1353     /** The default buffer size for our buffered streams */
1354     public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
1355 
1356     protected HFileContext fileContext;
1357 
1358     public AbstractFSReader(long fileSize, HFileSystem hfs, Path path, HFileContext fileContext)
1359         throws IOException {
1360       this.fileSize = fileSize;
1361       this.hfs = hfs;
1362       this.path = path;
1363       this.fileContext = fileContext;
1364       this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1365     }
1366 
1367     @Override
1368     public BlockIterator blockRange(final long startOffset,
1369         final long endOffset) {
1370       final FSReader owner = this; // handle for inner class
1371       return new BlockIterator() {
1372         private long offset = startOffset;
1373 
1374         @Override
1375         public HFileBlock nextBlock() throws IOException {
1376           if (offset >= endOffset)
1377             return null;
1378           HFileBlock b = readBlockData(offset, -1, -1, false);
1379           offset += b.getOnDiskSizeWithHeader();
1380           return b.unpack(fileContext, owner);
1381         }
1382 
1383         @Override
1384         public HFileBlock nextBlockWithBlockType(BlockType blockType)
1385             throws IOException {
1386           HFileBlock blk = nextBlock();
1387           if (blk.getBlockType() != blockType) {
1388             throw new IOException("Expected block of type " + blockType
1389                 + " but found " + blk.getBlockType());
1390           }
1391           return blk;
1392         }
1393       };
1394     }
1395 
1396     /**
1397      * Does a positional read or a seek and read into the given buffer. Returns
1398      * the on-disk size of the next block, or -1 if it could not be determined.
1399      *
1400      * @param dest destination buffer
1401      * @param destOffset offset in the destination buffer
1402      * @param size size of the block to be read
1403      * @param peekIntoNextBlock whether to read the next block's on-disk size
1404      * @param fileOffset position in the stream to read at
1405      * @param pread whether we should do a positional read
1406      * @param istream The input source of data
1407      * @return the on-disk size of the next block with header size included, or
1408      *         -1 if it could not be determined
1409      * @throws IOException
1410      */
1411     protected int readAtOffset(FSDataInputStream istream, byte[] dest, int destOffset, int size,
1412         boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
1413       if (peekIntoNextBlock &&
1414           destOffset + size + hdrSize > dest.length) {
1415         // We are asked to read the next block's header as well, but there is
1416         // not enough room in the array.
1417         throw new IOException("Attempted to read " + size + " bytes and " +
1418             hdrSize + " bytes of next header into a " + dest.length +
1419             "-byte array at offset " + destOffset);
1420       }
1421 
1422       if (!pread && streamLock.tryLock()) {
1423         // Seek + read. Better for scanning.
1424         try {
1425           istream.seek(fileOffset);
1426 
1427           long realOffset = istream.getPos();
1428           if (realOffset != fileOffset) {
1429             throw new IOException("Tried to seek to " + fileOffset + " to "
1430                 + "read " + size + " bytes, but pos=" + realOffset
1431                 + " after seek");
1432           }
1433 
1434           if (!peekIntoNextBlock) {
1435             IOUtils.readFully(istream, dest, destOffset, size);
1436             return -1;
1437           }
1438 
1439           // Try to read the next block header.
1440           if (!readWithExtra(istream, dest, destOffset, size, hdrSize))
1441             return -1;
1442         } finally {
1443           streamLock.unlock();
1444         }
1445       } else {
1446         // Positional read. Better for random reads; or when the streamLock is already locked.
1447         int extraSize = peekIntoNextBlock ? hdrSize : 0;
1448         if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset,
1449             size, extraSize)) {
1450           return -1;
1451         }
1452       }
1453 
1454       assert peekIntoNextBlock;
1455       return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
1456     }
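    /*
     * The two underlying access patterns, sketched with plain FSDataInputStream calls
     * (readAtOffset itself goes through the readWithExtra/positionalReadWithExtra
     * helpers; this only illustrates the trade-off):
     *
     *   // Seek + read: moves the shared stream position, so it is guarded by streamLock.
     *   istream.seek(fileOffset);
     *   IOUtils.readFully(istream, dest, destOffset, size);
     *
     *   // Positional read: does not touch the shared stream position, safe to issue
     *   // concurrently, hence preferred for random reads or when the lock is contended.
     *   istream.readFully(fileOffset, dest, destOffset, size);
     */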
1457 
1458   }
1459 
1460   /**
1461    * We always prefetch the header of the next block, so that we know its
1462    * on-disk size in advance and can read it in one operation.
1463    */
1464   private static class PrefetchedHeader {
1465     long offset = -1;
1466     byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1467     final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
1468   }
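  /*
   * Sketch of the intended reuse, mirroring what readBlockDataInternal does below: the
   * prefetched header is only usable when the requested block starts exactly where the
   * previous read left off.
   *
   *   PrefetchedHeader ph = prefetchedHeaderForThread.get();
   *   ByteBuffer headerBuf = (ph.offset == offset) ? ph.buf : null; // hit only for the
   *                                                                 // block following the
   *                                                                 // one read last
   */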
1469 
1470   /** Reads version 2 blocks from the filesystem. */
1471   static class FSReaderImpl extends AbstractFSReader {
1472     /** The file system stream of the underlying {@link HFile} that
1473      * does or doesn't do checksum validations in the filesystem */
1474     protected FSDataInputStreamWrapper streamWrapper;
1475 
1476     private HFileBlockDecodingContext encodedBlockDecodingCtx;
1477 
1478     /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
1479     private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
1480 
1481     private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
1482         new ThreadLocal<PrefetchedHeader>() {
1483           @Override
1484           public PrefetchedHeader initialValue() {
1485             return new PrefetchedHeader();
1486           }
1487         };
1488 
1489     public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
1490         HFileContext fileContext) throws IOException {
1491       super(fileSize, hfs, path, fileContext);
1492       this.streamWrapper = stream;
1493       // Older versions of HBase didn't support checksum.
1494       this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1495       defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
1496       encodedBlockDecodingCtx = defaultDecodingCtx;
1497     }
1498 
1499     /**
1500      * A constructor that reads files with the latest minor version.
1501      * This is used by unit tests only.
1502      */
1503     FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
1504     throws IOException {
1505       this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
1506     }
1507 
1508     /**
1509      * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
1510      * little memory allocation as possible, using the provided on-disk size.
1511      *
1512      * @param offset the offset in the stream to read at
1513      * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1514      *          the header, or -1 if unknown
1515      * @param uncompressedSize the uncompressed size of the block. Always
1516      *          expected to be -1. This parameter is only used in version 1.
1517      * @param pread whether to use a positional read
1518      */
1519     @Override
1520     public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL,
1521         int uncompressedSize, boolean pread)
1522     throws IOException {
1523 
1524       // get a copy of the current state of whether to validate
1525       // hbase checksums or not for this read call. This is not
1526      // thread-safe but the one constraint is that if we decide
1527       // to skip hbase checksum verification then we are
1528       // guaranteed to use hdfs checksum verification.
1529       boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1530       FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1531 
1532       HFileBlock blk = readBlockDataInternal(is, offset, (int) onDiskSizeWithHeaderL,
1533           uncompressedSize, pread, doVerificationThruHBaseChecksum);
1534       if (blk == null) {
1535         HFile.LOG.warn("HBase checksum verification failed for file " +
1536                        path + " at offset " +
1537                        offset + " filesize " + fileSize +
1538                        ". Retrying read with HDFS checksums turned on...");
1539 
1540         if (!doVerificationThruHBaseChecksum) {
1541           String msg = "HBase checksum verification failed for file " +
1542                        path + " at offset " +
1543                        offset + " filesize " + fileSize +
1544                        " but this cannot happen because doVerify is " +
1545                        doVerificationThruHBaseChecksum;
1546           HFile.LOG.warn(msg);
1547           throw new IOException(msg); // cannot happen case here
1548         }
1549         HFile.checksumFailures.incrementAndGet(); // update metrics
1550 
1551         // If we have a checksum failure, we fall back into a mode where
1552         // the next few reads use HDFS level checksums. We aim to make the
1553         // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
1554         // hbase checksum verification, but since this value is set without
1555         // holding any locks, it can so happen that we might actually do
1556         // a few more than precisely this number.
1557         is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1558         doVerificationThruHBaseChecksum = false;
1559         blk = readBlockDataInternal(is, offset, (int) onDiskSizeWithHeaderL, uncompressedSize,
1560             pread, doVerificationThruHBaseChecksum);
1561         if (blk != null) {
1562         HFile.LOG.warn("HDFS checksum verification succeeded for file " +
1563                          path + " at offset " +
1564                          offset + " filesize " + fileSize);
1565         }
1566       }
1567       if (blk == null && !doVerificationThruHBaseChecksum) {
1568         String msg = "readBlockData failed, possibly due to " +
1569                      "a checksum verification failure for file " + path +
1570                      " at offset " + offset + " filesize " + fileSize;
1571         HFile.LOG.warn(msg);
1572         throw new IOException(msg);
1573       }
1574 
1575       // If there is a checksum mismatch earlier, then retry with
1576       // HBase checksums switched off and use HDFS checksum verification.
1577       // This triggers HDFS to detect and fix corrupt replicas. The
1578       // next checksumOffCount read requests will use HDFS checksums.
1579       // The decrementing of this.checksumOffCount is not thread-safe,
1580       // but it is harmless because eventually checksumOffCount will be
1581       // a negative number.
1582       streamWrapper.checksumOk();
1583       return blk;
1584     }
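    /*
     * The fallback contract above, condensed (a paraphrase of this method, with size
     * standing for the int-cast onDiskSizeWithHeaderL):
     *
     *   boolean useHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
     *   HFileBlock blk = readBlockDataInternal(is, offset, size, -1, pread, useHBaseChecksum);
     *   if (blk == null && useHBaseChecksum) {
     *     // route this and the next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads through
     *     // HDFS-level checksums, then retry once
     *     is = streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
     *     blk = readBlockDataInternal(is, offset, size, -1, pread, false);
     *   }
     *   if (blk == null) throw new IOException(...);  // both verification modes failed
     *   streamWrapper.checksumOk();                   // counts toward re-enabling HBase checksums
     */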
1585 
1586     /**
1587      * Reads a version 2 block.
1588      *
1589      * @param offset the offset in the stream to read at
1590      * @param onDiskSizeWithHeader the on-disk size of the block, including
1591      *          the header, or -1 if unknown
1592      * @param uncompressedSize the uncompressed size of the block. Always
1593      *          expected to be -1. This parameter is only used in version 1.
1594      * @param pread whether to use a positional read
1595      * @param verifyChecksum Whether to use HBase checksums.
1596      *        If HBase checksum is switched off, then use HDFS checksum.
1597      * @return the HFileBlock or null if there is an HBase checksum mismatch
1598      */
1599     protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
1600         int onDiskSizeWithHeader, int uncompressedSize, boolean pread,
1601         boolean verifyChecksum)
1602     throws IOException {
1603       if (offset < 0) {
1604         throw new IOException("Invalid offset=" + offset + " trying to read "
1605             + "block (onDiskSize=" + onDiskSizeWithHeader
1606             + ", uncompressedSize=" + uncompressedSize + ")");
1607       }
1608 
1609       if (uncompressedSize != -1) {
1610         throw new IOException("Version 2 block reader API does not need " +
1611             "the uncompressed size parameter");
1612       }
1613 
1614       if ((onDiskSizeWithHeader < hdrSize && onDiskSizeWithHeader != -1)
1615           || onDiskSizeWithHeader >= Integer.MAX_VALUE) {
1616         throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeader
1617             + ": expected to be at least " + hdrSize
1618             + " and at most " + Integer.MAX_VALUE + ", or -1 (offset="
1619             + offset + ", uncompressedSize=" + uncompressedSize + ")");
1620       }
1621 
1622       // See if we can avoid reading the header. This is desirable, because
1623       // we will not incur a backward seek operation if we have already
1624       // read this block's header as part of the previous read's look-ahead.
1625       // And we also want to skip reading the header again if it has already
1626       // been read.
1627       // TODO: How often does this optimization fire? Has to be same thread so the thread local
1628       // is pertinent and we have to be reading next block as in a big scan.
1629       PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
1630       ByteBuffer headerBuf = prefetchedHeader.offset == offset? prefetchedHeader.buf: null;
1631 
1632       // Allocate enough space to fit the next block's header too.
1633       int nextBlockOnDiskSize = 0;
1634       byte[] onDiskBlock = null;
1635 
1636       if (onDiskSizeWithHeader > 0) {
1637         // We know the total on-disk size. Read the entire block into memory,
1638         // then parse the header. This code path is used when
1639         // doing a random read operation relying on the block index, as well as
1640         // when the client knows the on-disk size from peeking into the next
1641         // block's header (e.g. this block's header) when reading the previous
1642         // block. This is the faster and more preferable case.
1643 
1644         // Size that we have to skip in case we have already read the header.
1645         int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
1646         onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; // room for this block plus the
1647                                                                 // next block's header
1648         nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
1649             preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
1650             true, offset + preReadHeaderSize, pread);
1651         if (headerBuf != null) {
1652           // the header has been read when reading the previous block, copy
1653           // to this block's header
1654           // headerBuf is HBB
1655           assert headerBuf.hasArray();
1656           System.arraycopy(headerBuf.array(),
1657               headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1658         } else {
1659           headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
1660         }
1661         // If the caller specifies an onDiskSizeWithHeader, validate it.
1662         int expectedOnDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize;
1663         int actualOnDiskSizeWithoutHeader =
1664             headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
1665         validateOnDiskSizeWithoutHeader(expectedOnDiskSizeWithoutHeader,
1666             actualOnDiskSizeWithoutHeader, headerBuf, offset);
1667       } else {
1668         // Check headerBuf to see if we have read this block's header as part of
1669         // reading the previous block. This is an optimization of peeking into
1670         // the next block's header (e.g. this block's header) when reading the
1671         // previous block. This is the faster and more preferable case. If the
1672         // header is already there, don't read the header again.
1673 
1674         // Unfortunately, we still have to do a separate read operation to
1675         // read the header.
1676         if (headerBuf == null) {
1677           // From the header, determine the on-disk size of the given hfile
1678           // block, and read the remaining data, thereby incurring two read
1679           // operations. This might happen when we are doing the first read
1680           // in a series of reads or a random read, and we don't have access
1681           // to the block index. This is costly and should happen very rarely.
1682           headerBuf = ByteBuffer.allocate(hdrSize);
1683           // headerBuf is HBB
1684           readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(),
1685               hdrSize, false, offset, pread);
1686         }
1687         int onDiskSizeWithoutHeader = headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
1688         onDiskSizeWithHeader = onDiskSizeWithoutHeader + hdrSize;
1689         onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
1690         System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1691         nextBlockOnDiskSize =
1692           readAtOffset(is, onDiskBlock, hdrSize, onDiskSizeWithHeader - hdrSize, true,
1693               offset + hdrSize, pread);
1694       }
1695       ByteBuffer onDiskBlockByteBuffer = ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader);
1696 
1697       if (!fileContext.isCompressedOrEncrypted()) {
1698         verifyUncompressed(headerBuf, fileContext.isUseHBaseChecksum());
1699       }
1700 
1701       if (verifyChecksum && !validateChecksum(offset, onDiskBlockByteBuffer, hdrSize)) {
1702         return null;             // checksum mismatch
1703       }
1704 
1705       // The onDiskBlock will become the headerAndDataBuffer for this block.
1706       // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
1707       // contains the header of the next block, so there is no need to set the
1708       // next block's header in it.
1709       HFileBlock b = new HFileBlock(onDiskBlockByteBuffer, this.fileContext.isUseHBaseChecksum());
1710 
1711       b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
1712 
1713       // Set prefetched header
1714       if (b.hasNextBlockHeader()) {
1715         prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
1716         System.arraycopy(onDiskBlock, onDiskSizeWithHeader, prefetchedHeader.header, 0, hdrSize);
1717       }
1718 
1719       b.offset = offset;
1720       b.fileContext.setIncludesTags(this.fileContext.isIncludesTags());
1721       b.fileContext.setIncludesMvcc(this.fileContext.isIncludesMvcc());
1722       return b;
1723     }
1724 
1725     void setIncludesMemstoreTS(boolean includesMemstoreTS) {
1726       this.fileContext.setIncludesMvcc(includesMemstoreTS);
1727     }
1728 
1729     void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
1730       encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
1731     }
1732 
1733     @Override
1734     public HFileBlockDecodingContext getBlockDecodingContext() {
1735       return this.encodedBlockDecodingCtx;
1736     }
1737 
1738     @Override
1739     public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
1740       return this.defaultDecodingCtx;
1741     }
1742 
1743     /**
1744      * Generates the checksum for the header as well as the data and then validates it.
1745      * If the block does not use checksums, returns false.
1746      * @return True if checksum matches, else false.
1747      */
1748     protected boolean validateChecksum(long offset, ByteBuffer data, int hdrSize)
1749         throws IOException {
1750       // If this is an older version of the block that does not have checksums, then return false
1751       // indicating that checksum verification did not succeed. This method should never be
1752       // called when the minorVersion is 0, so this is purely a defensive check for a
1753       // cannot-happen case; if it does happen, it is safer to report it as a checksum
1754       // validation failure.
1755       if (!fileContext.isUseHBaseChecksum()) {
1756         return false;
1757       }
1758       return ChecksumUtil.validateChecksum(data, path, offset, hdrSize);
1759     }
1760 
1761     @Override
1762     public void closeStreams() throws IOException {
1763       streamWrapper.close();
1764     }
1765 
1766     @Override
1767     public String toString() {
1768       return "hfs=" + hfs + ", path=" + path + ", fileContext=" + fileContext;
1769     }
1770   }
1771 
1772   @Override
1773   public int getSerializedLength() {
1774     if (buf != null) {
1775       // include extra bytes for the next header when it's available.
1776       int extraSpace = hasNextBlockHeader() ? headerSize() : 0;
1777       return this.buf.limit() + extraSpace + HFileBlock.EXTRA_SERIALIZATION_SPACE;
1778     }
1779     return 0;
1780   }
1781 
1782   @Override
1783   public void serialize(ByteBuffer destination) {
1784     ByteBufferUtils.copyFromBufferToBuffer(destination, this.buf, 0, getSerializedLength()
1785         - EXTRA_SERIALIZATION_SPACE);
1786     serializeExtraInfo(destination);
1787   }
1788 
1789   public void serializeExtraInfo(ByteBuffer destination) {
1790     destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1791     destination.putLong(this.offset);
1792     destination.putInt(this.nextBlockOnDiskSizeWithHeader);
1793     destination.rewind();
1794   }
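  /*
   * Wire layout produced by serialize()/serializeExtraInfo() above, as a sketch; field
   * order is taken from the two methods, sizes are the usual Java primitive sizes:
   *
   *   [ block bytes: getSerializedLength() - EXTRA_SERIALIZATION_SPACE ]
   *   [ usesHBaseChecksum             : 1 byte  ]
   *   [ offset                        : 8 bytes ]
   *   [ nextBlockOnDiskSizeWithHeader : 4 bytes ]
   */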
1795 
1796   @Override
1797   public CacheableDeserializer<Cacheable> getDeserializer() {
1798     return HFileBlock.blockDeserializer;
1799   }
1800 
1801   @Override
1802   public boolean equals(Object comparison) {
1803     if (this == comparison) {
1804       return true;
1805     }
1806     if (comparison == null) {
1807       return false;
1808     }
1809     if (comparison.getClass() != this.getClass()) {
1810       return false;
1811     }
1812 
1813     HFileBlock castedComparison = (HFileBlock) comparison;
1814 
1815     if (castedComparison.blockType != this.blockType) {
1816       return false;
1817     }
1818     if (castedComparison.nextBlockOnDiskSizeWithHeader != this.nextBlockOnDiskSizeWithHeader) {
1819       return false;
1820     }
1821     if (castedComparison.offset != this.offset) {
1822       return false;
1823     }
1824     if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1825       return false;
1826     }
1827     if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1828       return false;
1829     }
1830     if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1831       return false;
1832     }
1833     if (ByteBufferUtils.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
1834         castedComparison.buf.limit()) != 0) {
1835       return false;
1836     }
1837     return true;
1838   }
1839 
1840   public DataBlockEncoding getDataBlockEncoding() {
1841     if (blockType == BlockType.ENCODED_DATA) {
1842       return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
1843     }
1844     return DataBlockEncoding.NONE;
1845   }
1846 
1847   byte getChecksumType() {
1848     return this.fileContext.getChecksumType().getCode();
1849   }
1850 
1851   int getBytesPerChecksum() {
1852     return this.fileContext.getBytesPerChecksum();
1853   }
1854 
1855   /** @return the size of data on disk + header. Excludes checksum. */
1856   int getOnDiskDataSizeWithHeader() {
1857     return this.onDiskDataSizeWithHeader;
1858   }
1859 
1860   /**
1861    * Calculate the number of bytes required to store all the checksums for this block. Each
1862    * checksum value is a 4 byte integer ({@link HFileBlock#CHECKSUM_SIZE}).
1863    */
1864   int totalChecksumBytes() {
1865     return HFileBlock.totalChecksumBytes(this.fileContext, onDiskDataSizeWithHeader);
1866   }
1867 
1868   private static int totalChecksumBytes(HFileContext fileContext, int onDiskDataSizeWithHeader) {
1869     // If the hfile block has minorVersion 0, then there are no checksum
1870     // data to validate. Similarly, a zero value in this.bytesPerChecksum
1871     // indicates that cached blocks do not have checksum data because
1872     // checksums were already validated when the block was read from disk.
1873     if (!fileContext.isUseHBaseChecksum() || fileContext.getBytesPerChecksum() == 0) {
1874       return 0;
1875     }
1876     return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
1877         fileContext.getBytesPerChecksum());
1878   }
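  /*
   * A worked example under common defaults (assuming a 33-byte checksum-era header and
   * bytesPerChecksum = 16384; both are configuration-dependent):
   *
   *   onDiskDataSizeWithHeader = 65536 + 33           = 65569 bytes
   *   checksum chunks          = ceil(65569 / 16384)  = 5
   *   checksum bytes           = 5 * CHECKSUM_SIZE    = 5 * 4 = 20
   */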
1879 
1880   /**
1881    * Returns the size of this block header.
1882    */
1883   public int headerSize() {
1884     return headerSize(this.fileContext.isUseHBaseChecksum());
1885   }
1886 
1887   /**
1888    * Maps a minor version to the size of the header.
1889    */
1890   public static int headerSize(boolean usesHBaseChecksum) {
1891     if (usesHBaseChecksum) {
1892       return HConstants.HFILEBLOCK_HEADER_SIZE;
1893     }
1894     return HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
1895   }
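  /*
   * With the field layout read back by toStringHeader(ByteBuffer) below, the two sizes
   * work out as follows (a sketch; the authoritative values are the HConstants fields):
   *
   *   without checksums: 8 (magic) + 4 + 4 + 8 = 24 bytes
   *   with checksums   : 24 + 1 (type) + 4 (bytesPerChecksum) + 4 (onDiskDataSizeWithHeader) = 33 bytes
   */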
1896 
1897   /**
1898    * Return the appropriate DUMMY_HEADER for the minor version
1899    */
1900   public byte[] getDummyHeaderForVersion() {
1901     return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
1902   }
1903 
1904   /**
1905    * Return the appropriate DUMMY_HEADER for the minor version
1906    */
1907   static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
1908     if (usesHBaseChecksum) {
1909       return HConstants.HFILEBLOCK_DUMMY_HEADER;
1910     }
1911     return DUMMY_HEADER_NO_CHECKSUM;
1912   }
1913 
1914   /**
1915    * @return the HFileContext used to create this HFileBlock. Not necessarily the
1916    * fileContext for the file from which this block's data was originally read.
1917    */
1918   public HFileContext getHFileContext() {
1919     return this.fileContext;
1920   }
1921 
1922   /**
1923    * Convert the contents of the block header into a human readable string.
1924    * This is mostly helpful for debugging. This assumes that the block
1925    * has minor version > 0.
1926    */
1927   static String toStringHeader(ByteBuffer buf) throws IOException {
1928     byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
1929     buf.get(magicBuf);
1930     BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
1931     int compressedBlockSizeNoHeader = buf.getInt();
1932     int uncompressedBlockSizeNoHeader = buf.getInt();
1933     long prevBlockOffset = buf.getLong();
1934     byte cksumtype = buf.get();
1935     long bytesPerChecksum = buf.getInt();
1936     long onDiskDataSizeWithHeader = buf.getInt();
1937     return " Header dump: magic: " + Bytes.toString(magicBuf) +
1938                    " blockType " + bt +
1939                    " compressedBlockSizeNoHeader " +
1940                    compressedBlockSizeNoHeader +
1941                    " uncompressedBlockSizeNoHeader " +
1942                    uncompressedBlockSizeNoHeader +
1943                    " prevBlockOffset " + prevBlockOffset +
1944                    " checksumType " + ChecksumType.codeToType(cksumtype) +
1945                    " bytesPerChecksum " + bytesPerChecksum +
1946                    " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
1947   }
1948 }