View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.io.hfile;
19  
20  import java.io.ByteArrayOutputStream;
21  import java.io.DataInputStream;
22  import java.io.DataOutput;
23  import java.io.DataOutputStream;
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.nio.ByteBuffer;
27  import java.util.concurrent.locks.Lock;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import org.apache.hadoop.fs.FSDataInputStream;
31  import org.apache.hadoop.fs.FSDataOutputStream;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.classification.InterfaceAudience;
36  import org.apache.hadoop.hbase.fs.HFileSystem;
37  import org.apache.hadoop.hbase.io.ByteBufferInputStream;
38  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
39  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
40  import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
41  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
42  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
43  import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
44  import org.apache.hadoop.hbase.util.ByteBufferUtils;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.util.ChecksumType;
47  import org.apache.hadoop.hbase.util.ClassSize;
48  import org.apache.hadoop.io.IOUtils;
49  
50  import com.google.common.annotations.VisibleForTesting;
51  import com.google.common.base.Preconditions;
52  
53  /**
54   * Reading {@link HFile} version 1 and 2 blocks, and writing version 2 blocks.
55   * <ul>
56   * <li>In version 1 all blocks are always compressed or uncompressed, as
57   * specified by the {@link HFile}'s compression algorithm, with a type-specific
58   * magic record stored in the beginning of the compressed data (i.e. one needs
59   * to uncompress the compressed block to determine the block type). There is
60   * only a single compression algorithm setting for all blocks. Offset and size
61   * information from the block index are required to read a block.
62   * <li>In version 2 a block is structured as follows:
63   * <ul>
64   * <li>header (see Writer#finishBlock())
65   * <ul>
66   * <li>Magic record identifying the block type (8 bytes)
67   * <li>Compressed block size, excluding header, including checksum (4 bytes)
68   * <li>Uncompressed block size, excluding header, excluding checksum (4 bytes)
69   * <li>The offset of the previous block of the same type (8 bytes). This is
70   * used to be able to navigate to the previous block without going to the block
71   * <li>For minorVersions >=1, the ordinal describing checksum type (1 byte)
72   * <li>For minorVersions >=1, the number of data bytes/checksum chunk (4 bytes)
73   * <li>For minorVersions >=1, the size of data on disk, including header,
74   * excluding checksums (4 bytes)
75   * </ul>
76   * </li>
77   * <li>Raw/Compressed/Encrypted/Encoded data. The compression algorithm is the
78   * same for all the blocks in the {@link HFile}, similarly to what was done in
79   * version 1.
80   * <li>For minorVersions >=1, a series of 4 byte checksums, one each for
81   * the number of bytes specified by bytesPerChecksum.
82   * </ul>
83   * </ul>
84   */
85  @InterfaceAudience.Private
86  public class HFileBlock implements Cacheable {
87  
88    /**
89     * On a checksum failure on a Reader, these many suceeding read
90     * requests switch back to using hdfs checksums before auto-reenabling
91     * hbase checksum verification.
92     */
93    static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
94  
95    public static final boolean FILL_HEADER = true;
96    public static final boolean DONT_FILL_HEADER = false;
97  
98    /**
99     * The size of block header when blockType is {@link BlockType#ENCODED_DATA}.
100    * This extends normal header by adding the id of encoder.
101    */
102   public static final int ENCODED_HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE
103       + DataBlockEncoding.ID_SIZE;
104 
105   static final byte[] DUMMY_HEADER_NO_CHECKSUM =
106      new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
107 
108   public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
109       ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
110 
111   // meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader
112   public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT
113       + Bytes.SIZEOF_LONG;
114 
115   /**
116    * Each checksum value is an integer that can be stored in 4 bytes.
117    */
118   static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
119 
120   static final CacheableDeserializer<Cacheable> blockDeserializer =
121       new CacheableDeserializer<Cacheable>() {
122         public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{
123           buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
124           ByteBuffer newByteBuffer;
125           if (reuse) {
126             newByteBuffer = buf.slice();
127           } else {
128            newByteBuffer = ByteBuffer.allocate(buf.limit());
129            newByteBuffer.put(buf);
130           }
131           buf.position(buf.limit());
132           buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
133           boolean usesChecksum = buf.get() == (byte)1;
134           HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum);
135           hFileBlock.offset = buf.getLong();
136           hFileBlock.nextBlockOnDiskSizeWithHeader = buf.getInt();
137           if (hFileBlock.hasNextBlockHeader()) {
138             hFileBlock.buf.limit(hFileBlock.buf.limit() - hFileBlock.headerSize());
139           }
140           return hFileBlock;
141         }
142 
143         @Override
144         public int getDeserialiserIdentifier() {
145           return deserializerIdentifier;
146         }
147 
148         @Override
149         public HFileBlock deserialize(ByteBuffer b) throws IOException {
150           return deserialize(b, false);
151         }
152       };
153   private static final int deserializerIdentifier;
154   static {
155     deserializerIdentifier = CacheableDeserializerIdManager
156         .registerDeserializer(blockDeserializer);
157   }
158 
159     // Todo: encapsulate Header related logic in this inner class.
160   static class Header {
161     // Format of header is:
162     // 8 bytes - block magic
163     // 4 bytes int - onDiskSizeWithoutHeader
164     // 4 bytes int - uncompressedSizeWithoutHeader
165     // 8 bytes long - prevBlockOffset
166     // The following 3 are only present if header contains checksum information
167     // 1 byte - checksum type
168     // 4 byte int - bytes per checksum
169     // 4 byte int - onDiskDataSizeWithHeader
170     static int BLOCK_MAGIC_INDEX = 0;
171     static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8;
172     static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12;
173     static int PREV_BLOCK_OFFSET_INDEX = 16;
174     static int CHECKSUM_TYPE_INDEX = 24;
175     static int BYTES_PER_CHECKSUM_INDEX = 25;
176     static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29;
177   }
178 
179   /** Type of block. Header field 0. */
180   private BlockType blockType;
181 
182   /** Size on disk excluding header, including checksum. Header field 1. */
183   private int onDiskSizeWithoutHeader;
184 
185   /** Size of pure data. Does not include header or checksums. Header field 2. */
186   private final int uncompressedSizeWithoutHeader;
187 
188   /** The offset of the previous block on disk. Header field 3. */
189   private final long prevBlockOffset;
190 
191   /**
192    * Size on disk of header + data. Excludes checksum. Header field 6,
193    * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
194    */
195   private final int onDiskDataSizeWithHeader;
196 
197   /** The in-memory representation of the hfile block */
198   private ByteBuffer buf;
199 
200   /** Meta data that holds meta information on the hfileblock */
201   private HFileContext fileContext;
202 
203   /**
204    * The offset of this block in the file. Populated by the reader for
205    * convenience of access. This offset is not part of the block header.
206    */
207   private long offset = -1;
208 
209   /**
210    * The on-disk size of the next block, including the header, obtained by
211    * peeking into the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the next block's
212    * header, or -1 if unknown.
213    */
214   private int nextBlockOnDiskSizeWithHeader = -1;
215 
216   /**
217    * Creates a new {@link HFile} block from the given fields. This constructor
218    * is mostly used when the block data has already been read and uncompressed,
219    * and is sitting in a byte buffer.
220    *
221    * @param blockType the type of this block, see {@link BlockType}
222    * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
223    * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
224    * @param prevBlockOffset see {@link #prevBlockOffset}
225    * @param buf block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by
226    *          uncompressed data. This
227    * @param fillHeader when true, parse {@code buf} and override the first 4 header fields.
228    * @param offset the file offset the block was read from
229    * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
230    * @param fileContext HFile meta data
231    */
232   HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
233       long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset,
234       int onDiskDataSizeWithHeader, HFileContext fileContext) {
235     this.blockType = blockType;
236     this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
237     this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
238     this.prevBlockOffset = prevBlockOffset;
239     this.buf = buf;
240     this.offset = offset;
241     this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
242     this.fileContext = fileContext;
243     if (fillHeader)
244       overwriteHeader();
245     this.buf.rewind();
246   }
247 
248   /**
249    * Copy constructor. Creates a shallow copy of {@code that}'s buffer.
250    */
251   HFileBlock(HFileBlock that) {
252     this.blockType = that.blockType;
253     this.onDiskSizeWithoutHeader = that.onDiskSizeWithoutHeader;
254     this.uncompressedSizeWithoutHeader = that.uncompressedSizeWithoutHeader;
255     this.prevBlockOffset = that.prevBlockOffset;
256     this.buf = that.buf.duplicate();
257     this.offset = that.offset;
258     this.onDiskDataSizeWithHeader = that.onDiskDataSizeWithHeader;
259     this.fileContext = that.fileContext;
260     this.nextBlockOnDiskSizeWithHeader = that.nextBlockOnDiskSizeWithHeader;
261   }
262 
263   /**
264    * Creates a block from an existing buffer starting with a header. Rewinds
265    * and takes ownership of the buffer. By definition of rewind, ignores the
266    * buffer position, but if you slice the buffer beforehand, it will rewind
267    * to that point. The reason this has a minorNumber and not a majorNumber is
268    * because majorNumbers indicate the format of a HFile whereas minorNumbers
269    * indicate the format inside a HFileBlock.
270    */
271   HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException {
272     b.rewind();
273     blockType = BlockType.read(b);
274     onDiskSizeWithoutHeader = b.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
275     uncompressedSizeWithoutHeader = b.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
276     prevBlockOffset = b.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
277     HFileContextBuilder contextBuilder = new HFileContextBuilder();
278     contextBuilder.withHBaseCheckSum(usesHBaseChecksum);
279     if (usesHBaseChecksum) {
280       contextBuilder.withChecksumType(ChecksumType.codeToType(b.get(Header.CHECKSUM_TYPE_INDEX)));
281       contextBuilder.withBytesPerCheckSum(b.getInt(Header.BYTES_PER_CHECKSUM_INDEX));
282       this.onDiskDataSizeWithHeader = b.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
283     } else {
284       contextBuilder.withChecksumType(ChecksumType.NULL);
285       contextBuilder.withBytesPerCheckSum(0);
286       this.onDiskDataSizeWithHeader = onDiskSizeWithoutHeader +
287                                        HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
288     }
289     this.fileContext = contextBuilder.build();
290     buf = b;
291     buf.rewind();
292   }
293 
294   public BlockType getBlockType() {
295     return blockType;
296   }
297 
298   /** @return get data block encoding id that was used to encode this block */
299   public short getDataBlockEncodingId() {
300     if (blockType != BlockType.ENCODED_DATA) {
301       throw new IllegalArgumentException("Querying encoder ID of a block " +
302           "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
303     }
304     return buf.getShort(headerSize());
305   }
306 
307   /**
308    * @return the on-disk size of header + data part + checksum.
309    */
310   public int getOnDiskSizeWithHeader() {
311     return onDiskSizeWithoutHeader + headerSize();
312   }
313 
314   /**
315    * @return the on-disk size of the data part + checksum (header excluded).
316    */
317   public int getOnDiskSizeWithoutHeader() {
318     return onDiskSizeWithoutHeader;
319   }
320 
321   /**
322    * @return the uncompressed size of data part (header and checksum excluded).
323    */
324    public int getUncompressedSizeWithoutHeader() {
325     return uncompressedSizeWithoutHeader;
326   }
327 
328   /**
329    * @return the offset of the previous block of the same type in the file, or
330    *         -1 if unknown
331    */
332   public long getPrevBlockOffset() {
333     return prevBlockOffset;
334   }
335 
336   /**
337    * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position
338    * is modified as side-effect.
339    */
340   private void overwriteHeader() {
341     buf.rewind();
342     blockType.write(buf);
343     buf.putInt(onDiskSizeWithoutHeader);
344     buf.putInt(uncompressedSizeWithoutHeader);
345     buf.putLong(prevBlockOffset);
346     if (this.fileContext.isUseHBaseChecksum()) {
347       buf.put(fileContext.getChecksumType().getCode());
348       buf.putInt(fileContext.getBytesPerChecksum());
349       buf.putInt(onDiskDataSizeWithHeader);
350     }
351   }
352 
353   /**
354    * Returns a buffer that does not include the header or checksum.
355    *
356    * @return the buffer with header skipped and checksum omitted.
357    */
358   public ByteBuffer getBufferWithoutHeader() {
359     ByteBuffer dup = this.buf.duplicate();
360     dup.position(headerSize());
361     dup.limit(buf.limit() - totalChecksumBytes());
362     return dup.slice();
363   }
364 
365   /**
366    * Returns the buffer this block stores internally. The clients must not
367    * modify the buffer object. This method has to be public because it is
368    * used in {@link org.apache.hadoop.hbase.util.CompoundBloomFilter} 
369    * to avoid object creation on every Bloom filter lookup, but has to 
370    * be used with caution. Checksum data is not included in the returned 
371    * buffer but header data is.
372    *
373    * @return the buffer of this block for read-only operations
374    */
375   public ByteBuffer getBufferReadOnly() {
376     ByteBuffer dup = this.buf.duplicate();
377     dup.limit(buf.limit() - totalChecksumBytes());
378     return dup.slice();
379   }
380 
381   /**
382    * Returns the buffer of this block, including header data. The clients must
383    * not modify the buffer object. This method has to be public because it is
384    * used in {@link org.apache.hadoop.hbase.io.hfile.bucket.BucketCache} to avoid buffer copy.
385    *
386    * @return the buffer with header and checksum included for read-only operations
387    */
388   public ByteBuffer getBufferReadOnlyWithHeader() {
389     ByteBuffer dup = this.buf.duplicate();
390     return dup.slice();
391   }
392 
393   /**
394    * Returns a byte buffer of this block, including header data and checksum, positioned at
395    * the beginning of header. The underlying data array is not copied.
396    *
397    * @return the byte buffer with header and checksum included
398    */
399   ByteBuffer getBufferWithHeader() {
400     ByteBuffer dupBuf = buf.duplicate();
401     dupBuf.rewind();
402     return dupBuf;
403   }
404 
405   private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
406       String fieldName) throws IOException {
407     if (valueFromBuf != valueFromField) {
408       throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
409           + ") is different from that in the field (" + valueFromField + ")");
410     }
411   }
412 
413   private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
414       throws IOException {
415     if (valueFromBuf != valueFromField) {
416       throw new IOException("Block type stored in the buffer: " +
417         valueFromBuf + ", block type field: " + valueFromField);
418     }
419   }
420 
421   /**
422    * Checks if the block is internally consistent, i.e. the first
423    * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a
424    * valid header consistent with the fields. Assumes a packed block structure.
425    * This function is primary for testing and debugging, and is not
426    * thread-safe, because it alters the internal buffer pointer.
427    */
428   void sanityCheck() throws IOException {
429     buf.rewind();
430 
431     sanityCheckAssertion(BlockType.read(buf), blockType);
432 
433     sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader,
434         "onDiskSizeWithoutHeader");
435 
436     sanityCheckAssertion(buf.getInt(), uncompressedSizeWithoutHeader,
437         "uncompressedSizeWithoutHeader");
438 
439     sanityCheckAssertion(buf.getLong(), prevBlockOffset, "prevBlocKOffset");
440     if (this.fileContext.isUseHBaseChecksum()) {
441       sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
442       sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), "bytesPerChecksum");
443       sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
444     }
445 
446     int cksumBytes = totalChecksumBytes();
447     int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
448     if (buf.limit() != expectedBufLimit) {
449       throw new AssertionError("Expected buffer limit " + expectedBufLimit
450           + ", got " + buf.limit());
451     }
452 
453     // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
454     // block's header, so there are two sensible values for buffer capacity.
455     int hdrSize = headerSize();
456     if (buf.capacity() != expectedBufLimit &&
457         buf.capacity() != expectedBufLimit + hdrSize) {
458       throw new AssertionError("Invalid buffer capacity: " + buf.capacity() +
459           ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
460     }
461   }
462 
463   @Override
464   public String toString() {
465     StringBuilder sb = new StringBuilder()
466       .append("HFileBlock [")
467       .append(" fileOffset=").append(offset)
468       .append(" headerSize()=").append(headerSize())
469       .append(" blockType=").append(blockType)
470       .append(" onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
471       .append(" uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
472       .append(" prevBlockOffset=").append(prevBlockOffset)
473       .append(" isUseHBaseChecksum()=").append(fileContext.isUseHBaseChecksum());
474     if (fileContext.isUseHBaseChecksum()) {
475       sb.append(" checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
476         .append(" bytesPerChecksum=").append(this.buf.getInt(24 + 1))
477         .append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
478     } else {
479       sb.append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
480         .append("(").append(onDiskSizeWithoutHeader)
481         .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
482     }
483     String dataBegin = null;
484     if (buf.hasArray()) {
485       dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
486           Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
487     } else {
488       ByteBuffer bufWithoutHeader = getBufferWithoutHeader();
489       byte[] dataBeginBytes = new byte[Math.min(32,
490           bufWithoutHeader.limit() - bufWithoutHeader.position())];
491       bufWithoutHeader.get(dataBeginBytes);
492       dataBegin = Bytes.toStringBinary(dataBeginBytes);
493     }
494     sb.append(" getOnDiskSizeWithHeader()=").append(getOnDiskSizeWithHeader())
495       .append(" totalChecksumBytes()=").append(totalChecksumBytes())
496       .append(" isUnpacked()=").append(isUnpacked())
497       .append(" buf=[ ").append(buf).append(" ]")
498       .append(" dataBeginsWith=").append(dataBegin)
499       .append(" fileContext=").append(fileContext)
500       .append(" ]");
501     return sb.toString();
502   }
503 
504   /**
505    * Called after reading a block with provided onDiskSizeWithHeader.
506    */
507   private static void validateOnDiskSizeWithoutHeader(int expectedOnDiskSizeWithoutHeader,
508       int actualOnDiskSizeWithoutHeader, ByteBuffer buf, long offset) throws IOException {
509     if (actualOnDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) {
510       // We make the read-only copy here instead of when passing the parameter to this function
511       // to make duplicates only in failure cases, instead of every single time.
512       ByteBuffer bufReadOnly = buf.asReadOnlyBuffer();
513       String dataBegin = null;
514       byte[] dataBeginBytes = new byte[Math.min(32, bufReadOnly.limit() - bufReadOnly.position())];
515       bufReadOnly.get(dataBeginBytes);
516       dataBegin = Bytes.toStringBinary(dataBeginBytes);
517       String blockInfoMsg =
518         "Block offset: " + offset + ", data starts with: " + dataBegin;
519       throw new IOException("On-disk size without header provided is "
520           + expectedOnDiskSizeWithoutHeader + ", but block "
521           + "header contains " + actualOnDiskSizeWithoutHeader + ". " +
522           blockInfoMsg);
523     }
524   }
525 
526   /**
527    * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
528    * encoded structure. Internal structures are shared between instances where applicable.
529    */
530   HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
531     if (!fileContext.isCompressedOrEncrypted()) {
532       // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
533       // which is used for block serialization to L2 cache, does not preserve encoding and
534       // encryption details.
535       return this;
536     }
537 
538     HFileBlock unpacked = new HFileBlock(this);
539     unpacked.allocateBuffer(); // allocates space for the decompressed block
540 
541     HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
542       reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
543 
544     ByteBuffer dup = this.buf.duplicate();
545     dup.position(this.headerSize());
546     dup = dup.slice();
547     ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
548       unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
549       dup);
550 
551     // Preserve the next block's header bytes in the new block if we have them.
552     if (unpacked.hasNextBlockHeader()) {
553       // Both the buffers are limited till checksum bytes and avoid the next block's header.
554       // Below call to copyFromBufferToBuffer() will try positional read/write from/to buffers when
555       // any of the buffer is DBB. So we change the limit on a dup buffer. No copying just create
556       // new BB objects
557       ByteBuffer inDup = this.buf.duplicate();
558       inDup.limit(inDup.limit() + headerSize());
559       ByteBuffer outDup = unpacked.buf.duplicate();
560       outDup.limit(outDup.limit() + unpacked.headerSize());
561       ByteBufferUtils.copyFromBufferToBuffer(
562           outDup,
563           inDup,
564           this.onDiskDataSizeWithHeader,
565           unpacked.headerSize() + unpacked.uncompressedSizeWithoutHeader
566               + unpacked.totalChecksumBytes(), unpacked.headerSize());
567     }
568     return unpacked;
569   }
570 
571   /**
572    * Return true when this buffer includes next block's header.
573    */
574   private boolean hasNextBlockHeader() {
575     return nextBlockOnDiskSizeWithHeader > 0;
576   }
577 
578   /**
579    * Always allocates a new buffer of the correct size. Copies header bytes
580    * from the existing buffer. Does not change header fields.
581    * Reserve room to keep checksum bytes too.
582    */
583   private void allocateBuffer() {
584     int cksumBytes = totalChecksumBytes();
585     int headerSize = headerSize();
586     int capacityNeeded = headerSize + uncompressedSizeWithoutHeader +
587         cksumBytes + (hasNextBlockHeader() ? headerSize : 0);
588 
589     // TODO we need consider allocating offheap here?
590     ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
591 
592     // Copy header bytes into newBuf.
593     // newBuf is HBB so no issue in calling array()
594     ByteBuffer dup = buf.duplicate();
595     dup.position(0);
596     dup.get(newBuf.array(), newBuf.arrayOffset(), headerSize);
597 
598     buf = newBuf;
599     // set limit to exclude next block's header
600     buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
601   }
602 
603   /**
604    * Return true when this block's buffer has been unpacked, false otherwise. Note this is a
605    * calculated heuristic, not tracked attribute of the block.
606    */
607   public boolean isUnpacked() {
608     final int cksumBytes = totalChecksumBytes();
609     final int headerSize = headerSize();
610     final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
611     final int bufCapacity = buf.capacity();
612     return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
613   }
614 
615   /** An additional sanity-check in case no compression or encryption is being used. */
616   public static void verifyUncompressed(ByteBuffer buf, boolean useHBaseChecksum)
617       throws IOException {
618     int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
619     int uncompressedSizeWithoutHeader = buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
620     int onDiskDataSizeWithHeader;
621     int checksumBytes = 0;
622     if (useHBaseChecksum) {
623       onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
624       checksumBytes = (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
625           buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX));
626     }
627 
628     if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + checksumBytes) {
629       throw new IOException("Using no compression but "
630           + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
631           + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
632           + ", numChecksumbytes=" + checksumBytes);
633     }
634   }
635 
636   /**
637    * @param expectedType the expected type of this block
638    * @throws IOException if this block's type is different than expected
639    */
640   public void expectType(BlockType expectedType) throws IOException {
641     if (blockType != expectedType) {
642       throw new IOException("Invalid block type: expected=" + expectedType
643           + ", actual=" + blockType);
644     }
645   }
646 
647   /** @return the offset of this block in the file it was read from */
648   public long getOffset() {
649     if (offset < 0) {
650       throw new IllegalStateException(
651           "HFile block offset not initialized properly");
652     }
653     return offset;
654   }
655 
656   /**
657    * @return a byte stream reading the data + checksum of this block
658    */
659   public DataInputStream getByteStream() {
660     ByteBuffer dup = this.buf.duplicate();
661     dup.position(this.headerSize());
662     return new DataInputStream(new ByteBufferInputStream(dup));
663   }
664 
665   @Override
666   public long heapSize() {
667     long size = ClassSize.align(
668         ClassSize.OBJECT +
669         // Block type, byte buffer and meta references
670         3 * ClassSize.REFERENCE +
671         // On-disk size, uncompressed size, and next block's on-disk size
672         // bytePerChecksum and onDiskDataSize
673         4 * Bytes.SIZEOF_INT +
674         // This and previous block offset
675         2 * Bytes.SIZEOF_LONG +
676         // Heap size of the meta object. meta will be always not null.
677         fileContext.heapSize()
678     );
679 
680     if (buf != null) {
681       // Deep overhead of the byte buffer. Needs to be aligned separately.
682       size += ClassSize.align(buf.capacity() + BYTE_BUFFER_HEAP_SIZE);
683     }
684 
685     return ClassSize.align(size);
686   }
687 
688   /**
689    * Read from an input stream. Analogous to
690    * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a
691    * number of "extra" bytes that would be desirable but not absolutely
692    * necessary to read.
693    *
694    * @param in the input stream to read from
695    * @param buf the buffer to read into
696    * @param bufOffset the destination offset in the buffer
697    * @param necessaryLen the number of bytes that are absolutely necessary to
698    *          read
699    * @param extraLen the number of extra bytes that would be nice to read
700    * @return true if succeeded reading the extra bytes
701    * @throws IOException if failed to read the necessary bytes
702    */
703   public static boolean readWithExtra(InputStream in, byte[] buf,
704       int bufOffset, int necessaryLen, int extraLen) throws IOException {
705     int bytesRemaining = necessaryLen + extraLen;
706     while (bytesRemaining > 0) {
707       int ret = in.read(buf, bufOffset, bytesRemaining);
708       if (ret == -1 && bytesRemaining <= extraLen) {
709         // We could not read the "extra data", but that is OK.
710         break;
711       }
712 
713       if (ret < 0) {
714         throw new IOException("Premature EOF from inputStream (read "
715             + "returned " + ret + ", was trying to read " + necessaryLen
716             + " necessary bytes and " + extraLen + " extra bytes, "
717             + "successfully read "
718             + (necessaryLen + extraLen - bytesRemaining));
719       }
720       bufOffset += ret;
721       bytesRemaining -= ret;
722     }
723     return bytesRemaining <= 0;
724   }
725 
726   /**
727    * Read from an input stream. Analogous to
728    * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses
729    * positional read and specifies a number of "extra" bytes that would be
730    * desirable but not absolutely necessary to read.
731    *
732    * @param in the input stream to read from
733    * @param position the position within the stream from which to start reading
734    * @param buf the buffer to read into
735    * @param bufOffset the destination offset in the buffer
736    * @param necessaryLen the number of bytes that are absolutely necessary to
737    *     read
738    * @param extraLen the number of extra bytes that would be nice to read
739    * @return true if and only if extraLen is > 0 and reading those extra bytes
740    *     was successful
741    * @throws IOException if failed to read the necessary bytes
742    */
743   @VisibleForTesting
744   static boolean positionalReadWithExtra(FSDataInputStream in,
745       long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen)
746       throws IOException {
747     int bytesRemaining = necessaryLen + extraLen;
748     int bytesRead = 0;
749     while (bytesRead < necessaryLen) {
750       int ret = in.read(position, buf, bufOffset, bytesRemaining);
751       if (ret < 0) {
752         throw new IOException("Premature EOF from inputStream (positional read "
753             + "returned " + ret + ", was trying to read " + necessaryLen
754             + " necessary bytes and " + extraLen + " extra bytes, "
755             + "successfully read " + bytesRead);
756       }
757       position += ret;
758       bufOffset += ret;
759       bytesRemaining -= ret;
760       bytesRead += ret;
761     }
762     return bytesRead != necessaryLen && bytesRemaining <= 0;
763   }
764 
765   /**
766    * @return the on-disk size of the next block (including the header size)
767    *         that was read by peeking into the next block's header
768    */
769   public int getNextBlockOnDiskSizeWithHeader() {
770     return nextBlockOnDiskSizeWithHeader;
771   }
772 
773   /**
774    * Unified version 2 {@link HFile} block writer. The intended usage pattern
775    * is as follows:
776    * <ol>
777    * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
778    * <li>Call {@link Writer#startWriting} and get a data stream to write to.
779    * <li>Write your data into the stream.
780    * <li>Call {@link Writer#writeHeaderAndData(FSDataOutputStream)} as many times as you need to.
781    * store the serialized block into an external stream.
782    * <li>Repeat to write more blocks.
783    * </ol>
784    * <p>
785    */
786   public static class Writer {
787 
788     private enum State {
789       INIT,
790       WRITING,
791       BLOCK_READY
792     };
793 
794     /** Writer state. Used to ensure the correct usage protocol. */
795     private State state = State.INIT;
796 
797     /** Data block encoder used for data blocks */
798     private final HFileDataBlockEncoder dataBlockEncoder;
799 
800     private HFileBlockEncodingContext dataBlockEncodingCtx;
801 
802     /** block encoding context for non-data blocks */
803     private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
804 
805     /**
806      * The stream we use to accumulate data in uncompressed format for each
807      * block. We reset this stream at the end of each block and reuse it. The
808      * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this
809      * stream.
810      */
811     private ByteArrayOutputStream baosInMemory;
812 
813     /**
814      * Current block type. Set in {@link #startWriting(BlockType)}. Could be
815      * changed in {@link #finishBlock()} from {@link BlockType#DATA}
816      * to {@link BlockType#ENCODED_DATA}.
817      */
818     private BlockType blockType;
819 
820     /**
821      * A stream that we write uncompressed bytes to, which compresses them and
822      * writes them to {@link #baosInMemory}.
823      */
824     private DataOutputStream userDataStream;
825 
826     // Size of actual data being written. Not considering the block encoding/compression. This
827     // includes the header size also.
828     private int unencodedDataSizeWritten;
829 
830     /**
831      * Bytes to be written to the file system, including the header. Compressed
832      * if compression is turned on. It also includes the checksum data that
833      * immediately follows the block data. (header + data + checksums)
834      */
835     private byte[] onDiskBytesWithHeader;
836 
837     /**
838      * The size of the checksum data on disk. It is used only if data is
839      * not compressed. If data is compressed, then the checksums are already
840      * part of onDiskBytesWithHeader. If data is uncompressed, then this
841      * variable stores the checksum data for this block.
842      */
843     private byte[] onDiskChecksum;
844 
845     /**
846      * Valid in the READY state. Contains the header and the uncompressed (but
847      * potentially encoded, if this is a data block) bytes, so the length is
848      * {@link #uncompressedSizeWithoutHeader} +
849      * {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}.
850      * Does not store checksums.
851      */
852     private byte[] uncompressedBytesWithHeader;
853 
854     /**
855      * Current block's start offset in the {@link HFile}. Set in
856      * {@link #writeHeaderAndData(FSDataOutputStream)}.
857      */
858     private long startOffset;
859 
860     /**
861      * Offset of previous block by block type. Updated when the next block is
862      * started.
863      */
864     private long[] prevOffsetByType;
865 
866     /** The offset of the previous block of the same type */
867     private long prevOffset;
868     /** Meta data that holds information about the hfileblock**/
869     private HFileContext fileContext;
870 
871     /**
872      * @param dataBlockEncoder data block encoding algorithm to use
873      */
874     public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
875       this.dataBlockEncoder = dataBlockEncoder != null
876           ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
877       defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
878           HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
879       dataBlockEncodingCtx = this.dataBlockEncoder
880           .newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
881 
882       if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
883         throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
884             " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
885             fileContext.getBytesPerChecksum());
886       }
887 
888       baosInMemory = new ByteArrayOutputStream();
889 
890       prevOffsetByType = new long[BlockType.values().length];
891       for (int i = 0; i < prevOffsetByType.length; ++i)
892         prevOffsetByType[i] = -1;
893 
894       this.fileContext = fileContext;
895     }
896 
897     /**
898      * Starts writing into the block. The previous block's data is discarded.
899      *
900      * @return the stream the user can write their data into
901      * @throws IOException
902      */
903     public DataOutputStream startWriting(BlockType newBlockType)
904         throws IOException {
905       if (state == State.BLOCK_READY && startOffset != -1) {
906         // We had a previous block that was written to a stream at a specific
907         // offset. Save that offset as the last offset of a block of that type.
908         prevOffsetByType[blockType.getId()] = startOffset;
909       }
910 
911       startOffset = -1;
912       blockType = newBlockType;
913 
914       baosInMemory.reset();
915       baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
916 
917       state = State.WRITING;
918 
919       // We will compress it later in finishBlock()
920       userDataStream = new DataOutputStream(baosInMemory);
921       if (newBlockType == BlockType.DATA) {
922         this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
923       }
924       this.unencodedDataSizeWritten = 0;
925       return userDataStream;
926     }
927 
928     /**
929      * Writes the Cell to this block
930      * @param cell
931      * @throws IOException
932      */
933     public void write(Cell cell) throws IOException{
934       expectState(State.WRITING);
935       this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx,
936           this.userDataStream);
937     }
938 
939     /**
940      * Returns the stream for the user to write to. The block writer takes care
941      * of handling compression and buffering for caching on write. Can only be
942      * called in the "writing" state.
943      *
944      * @return the data output stream for the user to write to
945      */
946     DataOutputStream getUserDataStream() {
947       expectState(State.WRITING);
948       return userDataStream;
949     }
950 
951     /**
952      * Transitions the block writer from the "writing" state to the "block
953      * ready" state.  Does nothing if a block is already finished.
954      */
955     void ensureBlockReady() throws IOException {
956       Preconditions.checkState(state != State.INIT,
957           "Unexpected state: " + state);
958 
959       if (state == State.BLOCK_READY)
960         return;
961 
962       // This will set state to BLOCK_READY.
963       finishBlock();
964     }
965 
966     /**
967      * An internal method that flushes the compressing stream (if using
968      * compression), serializes the header, and takes care of the separate
969      * uncompressed stream for caching on write, if applicable. Sets block
970      * write state to "block ready".
971      */
972     private void finishBlock() throws IOException {
973       if (blockType == BlockType.DATA) {
974         BufferGrabbingByteArrayOutputStream baosInMemoryCopy =
975             new BufferGrabbingByteArrayOutputStream();
976         baosInMemory.writeTo(baosInMemoryCopy);
977         this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
978             baosInMemoryCopy.buf, blockType);
979         blockType = dataBlockEncodingCtx.getBlockType();
980       }
981       userDataStream.flush();
982       // This does an array copy, so it is safe to cache this byte array.
983       uncompressedBytesWithHeader = baosInMemory.toByteArray();
984       prevOffset = prevOffsetByType[blockType.getId()];
985 
986       // We need to set state before we can package the block up for
987       // cache-on-write. In a way, the block is ready, but not yet encoded or
988       // compressed.
989       state = State.BLOCK_READY;
990       if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
991         onDiskBytesWithHeader = dataBlockEncodingCtx
992             .compressAndEncrypt(uncompressedBytesWithHeader);
993       } else {
994         onDiskBytesWithHeader = defaultBlockEncodingCtx
995             .compressAndEncrypt(uncompressedBytesWithHeader);
996       }
997       int numBytes = (int) ChecksumUtil.numBytes(
998           onDiskBytesWithHeader.length,
999           fileContext.getBytesPerChecksum());
1000 
1001       // put the header for on disk bytes
1002       putHeader(onDiskBytesWithHeader, 0,
1003           onDiskBytesWithHeader.length + numBytes,
1004           uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
1005       // set the header for the uncompressed bytes (for cache-on-write)
1006       putHeader(uncompressedBytesWithHeader, 0,
1007           onDiskBytesWithHeader.length + numBytes,
1008           uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
1009 
1010       onDiskChecksum = new byte[numBytes];
1011       ChecksumUtil.generateChecksums(
1012           onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
1013           onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
1014     }
1015 
1016     public static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
1017       private byte[] buf;
1018 
1019       @Override
1020       public void write(byte[] b, int off, int len) {
1021         this.buf = b;
1022       }
1023 
1024       public byte[] getBuffer() {
1025         return this.buf;
1026       }
1027     }
1028 
1029     /**
1030      * Put the header into the given byte array at the given offset.
1031      * @param onDiskSize size of the block on disk header + data + checksum
1032      * @param uncompressedSize size of the block after decompression (but
1033      *          before optional data block decoding) including header
1034      * @param onDiskDataSize size of the block on disk with header
1035      *        and data but not including the checksums
1036      */
1037     private void putHeader(byte[] dest, int offset, int onDiskSize,
1038         int uncompressedSize, int onDiskDataSize) {
1039       offset = blockType.put(dest, offset);
1040       offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1041       offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1042       offset = Bytes.putLong(dest, offset, prevOffset);
1043       offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
1044       offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
1045       Bytes.putInt(dest, offset, onDiskDataSize);
1046     }
1047 
1048     /**
1049      * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records
1050      * the offset of this block so that it can be referenced in the next block
1051      * of the same type.
1052      *
1053      * @param out
1054      * @throws IOException
1055      */
1056     public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
1057       long offset = out.getPos();
1058       if (startOffset != -1 && offset != startOffset) {
1059         throw new IOException("A " + blockType + " block written to a "
1060             + "stream twice, first at offset " + startOffset + ", then at "
1061             + offset);
1062       }
1063       startOffset = offset;
1064 
1065       finishBlockAndWriteHeaderAndData((DataOutputStream) out);
1066     }
1067 
1068     /**
1069      * Writes the header and the compressed data of this block (or uncompressed
1070      * data when not using compression) into the given stream. Can be called in
1071      * the "writing" state or in the "block ready" state. If called in the
1072      * "writing" state, transitions the writer to the "block ready" state.
1073      *
1074      * @param out the output stream to write the
1075      * @throws IOException
1076      */
1077     protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
1078       throws IOException {
1079       ensureBlockReady();
1080       out.write(onDiskBytesWithHeader);
1081       out.write(onDiskChecksum);
1082     }
1083 
1084     /**
1085      * Returns the header or the compressed data (or uncompressed data when not
1086      * using compression) as a byte array. Can be called in the "writing" state
1087      * or in the "block ready" state. If called in the "writing" state,
1088      * transitions the writer to the "block ready" state. This returns
1089      * the header + data + checksums stored on disk.
1090      *
1091      * @return header and data as they would be stored on disk in a byte array
1092      * @throws IOException
1093      */
1094     byte[] getHeaderAndDataForTest() throws IOException {
1095       ensureBlockReady();
1096       // This is not very optimal, because we are doing an extra copy.
1097       // But this method is used only by unit tests.
1098       byte[] output =
1099           new byte[onDiskBytesWithHeader.length
1100               + onDiskChecksum.length];
1101       System.arraycopy(onDiskBytesWithHeader, 0, output, 0,
1102           onDiskBytesWithHeader.length);
1103       System.arraycopy(onDiskChecksum, 0, output,
1104           onDiskBytesWithHeader.length, onDiskChecksum.length);
1105       return output;
1106     }
1107 
1108     /**
1109      * Releases resources used by this writer.
1110      */
1111     public void release() {
1112       if (dataBlockEncodingCtx != null) {
1113         dataBlockEncodingCtx.close();
1114         dataBlockEncodingCtx = null;
1115       }
1116       if (defaultBlockEncodingCtx != null) {
1117         defaultBlockEncodingCtx.close();
1118         defaultBlockEncodingCtx = null;
1119       }
1120     }
1121 
1122     /**
1123      * Returns the on-disk size of the data portion of the block. This is the
1124      * compressed size if compression is enabled. Can only be called in the
1125      * "block ready" state. Header is not compressed, and its size is not
1126      * included in the return value.
1127      *
1128      * @return the on-disk size of the block, not including the header.
1129      */
1130     int getOnDiskSizeWithoutHeader() {
1131       expectState(State.BLOCK_READY);
1132       return onDiskBytesWithHeader.length
1133           + onDiskChecksum.length
1134           - HConstants.HFILEBLOCK_HEADER_SIZE;
1135     }
1136 
1137     /**
1138      * Returns the on-disk size of the block. Can only be called in the
1139      * "block ready" state.
1140      *
1141      * @return the on-disk size of the block ready to be written, including the
1142      *         header size, the data and the checksum data.
1143      */
1144     int getOnDiskSizeWithHeader() {
1145       expectState(State.BLOCK_READY);
1146       return onDiskBytesWithHeader.length + onDiskChecksum.length;
1147     }
1148 
1149     /**
1150      * The uncompressed size of the block data. Does not include header size.
1151      */
1152     int getUncompressedSizeWithoutHeader() {
1153       expectState(State.BLOCK_READY);
1154       return uncompressedBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE;
1155     }
1156 
1157     /**
1158      * The uncompressed size of the block data, including header size.
1159      */
1160     int getUncompressedSizeWithHeader() {
1161       expectState(State.BLOCK_READY);
1162       return uncompressedBytesWithHeader.length;
1163     }
1164 
1165     /** @return true if a block is being written  */
1166     public boolean isWriting() {
1167       return state == State.WRITING;
1168     }
1169 
1170     /**
1171      * Returns the number of bytes written into the current block so far, or
1172      * zero if not writing the block at the moment. Note that this will return
1173      * zero in the "block ready" state as well.
1174      *
1175      * @return the number of bytes written
1176      */
1177     public int blockSizeWritten() {
1178       if (state != State.WRITING) return 0;
1179       return this.unencodedDataSizeWritten;
1180     }
1181 
1182     /**
1183      * Returns the header followed by the uncompressed data, even if using
1184      * compression. This is needed for storing uncompressed blocks in the block
1185      * cache. Can be called in the "writing" state or the "block ready" state.
1186      * Returns only the header and data, does not include checksum data.
1187      *
1188      * @return uncompressed block bytes for caching on write
1189      */
1190     ByteBuffer getUncompressedBufferWithHeader() {
1191       expectState(State.BLOCK_READY);
1192       return ByteBuffer.wrap(uncompressedBytesWithHeader);
1193     }
1194 
1195     /**
1196      * Returns the header followed by the on-disk (compressed/encoded/encrypted) data. This is
1197      * needed for storing packed blocks in the block cache. Expects calling semantics identical to
1198      * {@link #getUncompressedBufferWithHeader()}. Returns only the header and data,
1199      * Does not include checksum data.
1200      *
1201      * @return packed block bytes for caching on write
1202      */
1203     ByteBuffer getOnDiskBufferWithHeader() {
1204       expectState(State.BLOCK_READY);
1205       return ByteBuffer.wrap(onDiskBytesWithHeader);
1206     }
1207 
1208     private void expectState(State expectedState) {
1209       if (state != expectedState) {
1210         throw new IllegalStateException("Expected state: " + expectedState +
1211             ", actual state: " + state);
1212       }
1213     }
1214 
1215     /**
1216      * Takes the given {@link BlockWritable} instance, creates a new block of
1217      * its appropriate type, writes the writable into this block, and flushes
1218      * the block into the output stream. The writer is instructed not to buffer
1219      * uncompressed bytes for cache-on-write.
1220      *
1221      * @param bw the block-writable object to write as a block
1222      * @param out the file system output stream
1223      * @throws IOException
1224      */
1225     public void writeBlock(BlockWritable bw, FSDataOutputStream out)
1226         throws IOException {
1227       bw.writeToBlock(startWriting(bw.getBlockType()));
1228       writeHeaderAndData(out);
1229     }
1230 
1231     /**
1232      * Creates a new HFileBlock. Checksums have already been validated, so
1233      * the byte buffer passed into the constructor of this newly created
1234      * block does not have checksum data even though the header minor
1235      * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
1236      * 0 value in bytesPerChecksum.
1237      */
1238     public HFileBlock getBlockForCaching(CacheConfig cacheConf) {
1239       HFileContext newContext = new HFileContextBuilder()
1240                                 .withBlockSize(fileContext.getBlocksize())
1241                                 .withBytesPerCheckSum(0)
1242                                 .withChecksumType(ChecksumType.NULL) // no checksums in cached data
1243                                 .withCompression(fileContext.getCompression())
1244                                 .withDataBlockEncoding(fileContext.getDataBlockEncoding())
1245                                 .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
1246                                 .withCompressTags(fileContext.isCompressTags())
1247                                 .withIncludesMvcc(fileContext.isIncludesMvcc())
1248                                 .withIncludesTags(fileContext.isIncludesTags())
1249                                 .build();
1250       return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
1251           getUncompressedSizeWithoutHeader(), prevOffset,
1252           cacheConf.shouldCacheCompressed(blockType.getCategory()) ?
1253             getOnDiskBufferWithHeader() :
1254             getUncompressedBufferWithHeader(),
1255           FILL_HEADER, startOffset,
1256           onDiskBytesWithHeader.length + onDiskChecksum.length, newContext);
1257     }
1258   }
1259 
1260   /** Something that can be written into a block. */
1261   public interface BlockWritable {
1262 
1263     /** The type of block this data should use. */
1264     BlockType getBlockType();
1265 
1266     /**
1267      * Writes the block to the provided stream. Must not write any magic
1268      * records.
1269      *
1270      * @param out a stream to write uncompressed data into
1271      */
1272     void writeToBlock(DataOutput out) throws IOException;
1273   }
1274 
1275   // Block readers and writers
1276 
1277   /** An interface allowing to iterate {@link HFileBlock}s. */
1278   public interface BlockIterator {
1279 
1280     /**
1281      * Get the next block, or null if there are no more blocks to iterate.
1282      */
1283     HFileBlock nextBlock() throws IOException;
1284 
1285     /**
1286      * Similar to {@link #nextBlock()} but checks block type, throws an
1287      * exception if incorrect, and returns the HFile block
1288      */
1289     HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1290   }
1291 
1292   /** A full-fledged reader with iteration ability. */
1293   public interface FSReader {
1294 
1295     /**
1296      * Reads the block at the given offset in the file with the given on-disk
1297      * size and uncompressed size.
1298      *
1299      * @param offset
1300      * @param onDiskSize the on-disk size of the entire block, including all
1301      *          applicable headers, or -1 if unknown
1302      * @param uncompressedSize the uncompressed size of the compressed part of
1303      *          the block, or -1 if unknown
1304      * @return the newly read block
1305      */
1306     HFileBlock readBlockData(long offset, long onDiskSize,
1307         int uncompressedSize, boolean pread) throws IOException;
1308 
1309     /**
1310      * Creates a block iterator over the given portion of the {@link HFile}.
1311      * The iterator returns blocks starting with offset such that offset <=
1312      * startOffset < endOffset. Returned blocks are always unpacked.
1313      *
1314      * @param startOffset the offset of the block to start iteration with
1315      * @param endOffset the offset to end iteration at (exclusive)
1316      * @return an iterator of blocks between the two given offsets
1317      */
1318     BlockIterator blockRange(long startOffset, long endOffset);
1319 
1320     /** Closes the backing streams */
1321     void closeStreams() throws IOException;
1322 
1323     /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
1324     HFileBlockDecodingContext getBlockDecodingContext();
1325 
1326     /** Get the default decoder for blocks from this file. */
1327     HFileBlockDecodingContext getDefaultBlockDecodingContext();
1328   }
1329 
1330   /**
1331    * A common implementation of some methods of {@link FSReader} and some
1332    * tools for implementing HFile format version-specific block readers.
1333    */
1334   private abstract static class AbstractFSReader implements FSReader {
1335     /** Compression algorithm used by the {@link HFile} */
1336 
1337     /** The size of the file we are reading from, or -1 if unknown. */
1338     protected long fileSize;
1339 
1340     /** The size of the header */
1341     protected final int hdrSize;
1342 
1343     /** The filesystem used to access data */
1344     protected HFileSystem hfs;
1345 
1346     /** The path (if any) where this data is coming from */
1347     protected Path path;
1348 
1349     private final Lock streamLock = new ReentrantLock();
1350 
1351     /** The default buffer size for our buffered streams */
1352     public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
1353 
1354     protected HFileContext fileContext;
1355 
1356     public AbstractFSReader(long fileSize, HFileSystem hfs, Path path, HFileContext fileContext)
1357         throws IOException {
1358       this.fileSize = fileSize;
1359       this.hfs = hfs;
1360       this.path = path;
1361       this.fileContext = fileContext;
1362       this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1363     }
1364 
1365     @Override
1366     public BlockIterator blockRange(final long startOffset,
1367         final long endOffset) {
1368       final FSReader owner = this; // handle for inner class
1369       return new BlockIterator() {
1370         private long offset = startOffset;
1371 
1372         @Override
1373         public HFileBlock nextBlock() throws IOException {
1374           if (offset >= endOffset)
1375             return null;
1376           HFileBlock b = readBlockData(offset, -1, -1, false);
1377           offset += b.getOnDiskSizeWithHeader();
1378           return b.unpack(fileContext, owner);
1379         }
1380 
1381         @Override
1382         public HFileBlock nextBlockWithBlockType(BlockType blockType)
1383             throws IOException {
1384           HFileBlock blk = nextBlock();
1385           if (blk.getBlockType() != blockType) {
1386             throw new IOException("Expected block of type " + blockType
1387                 + " but found " + blk.getBlockType());
1388           }
1389           return blk;
1390         }
1391       };
1392     }
1393 
1394     /**
1395      * Does a positional read or a seek and read into the given buffer. Returns
1396      * the on-disk size of the next block, or -1 if it could not be determined.
1397      *
1398      * @param dest destination buffer
1399      * @param destOffset offset in the destination buffer
1400      * @param size size of the block to be read
1401      * @param peekIntoNextBlock whether to read the next block's on-disk size
1402      * @param fileOffset position in the stream to read at
1403      * @param pread whether we should do a positional read
1404      * @param istream The input source of data
1405      * @return the on-disk size of the next block with header size included, or
1406      *         -1 if it could not be determined
1407      * @throws IOException
1408      */
1409     protected int readAtOffset(FSDataInputStream istream, byte[] dest, int destOffset, int size,
1410         boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
1411       if (peekIntoNextBlock &&
1412           destOffset + size + hdrSize > dest.length) {
1413         // We are asked to read the next block's header as well, but there is
1414         // not enough room in the array.
1415         throw new IOException("Attempted to read " + size + " bytes and " +
1416             hdrSize + " bytes of next header into a " + dest.length +
1417             "-byte array at offset " + destOffset);
1418       }
1419 
1420       if (!pread && streamLock.tryLock()) {
1421         // Seek + read. Better for scanning.
1422         try {
1423           istream.seek(fileOffset);
1424 
1425           long realOffset = istream.getPos();
1426           if (realOffset != fileOffset) {
1427             throw new IOException("Tried to seek to " + fileOffset + " to "
1428                 + "read " + size + " bytes, but pos=" + realOffset
1429                 + " after seek");
1430           }
1431 
1432           if (!peekIntoNextBlock) {
1433             IOUtils.readFully(istream, dest, destOffset, size);
1434             return -1;
1435           }
1436 
1437           // Try to read the next block header.
1438           if (!readWithExtra(istream, dest, destOffset, size, hdrSize))
1439             return -1;
1440         } finally {
1441           streamLock.unlock();
1442         }
1443       } else {
1444         // Positional read. Better for random reads; or when the streamLock is already locked.
1445         int extraSize = peekIntoNextBlock ? hdrSize : 0;
1446         if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset,
1447             size, extraSize)) {
1448           return -1;
1449         }
1450       }
1451 
1452       assert peekIntoNextBlock;
1453       return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
1454     }
1455 
1456   }
1457 
1458   /**
1459    * We always prefetch the header of the next block, so that we know its
1460    * on-disk size in advance and can read it in one operation.
1461    */
1462   private static class PrefetchedHeader {
1463     long offset = -1;
1464     byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1465     final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
1466   }
1467 
1468   /** Reads version 2 blocks from the filesystem. */
1469   static class FSReaderImpl extends AbstractFSReader {
1470     /** The file system stream of the underlying {@link HFile} that
1471      * does or doesn't do checksum validations in the filesystem */
1472     protected FSDataInputStreamWrapper streamWrapper;
1473 
1474     private HFileBlockDecodingContext encodedBlockDecodingCtx;
1475 
1476     /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
1477     private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
1478 
1479     private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
1480         new ThreadLocal<PrefetchedHeader>() {
1481           @Override
1482           public PrefetchedHeader initialValue() {
1483             return new PrefetchedHeader();
1484           }
1485         };
1486 
1487     public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
1488         HFileContext fileContext) throws IOException {
1489       super(fileSize, hfs, path, fileContext);
1490       this.streamWrapper = stream;
1491       // Older versions of HBase didn't support checksum.
1492       this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1493       defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
1494       encodedBlockDecodingCtx = defaultDecodingCtx;
1495     }
1496 
1497     /**
1498      * A constructor that reads files with the latest minor version.
1499      * This is used by unit tests only.
1500      */
1501     FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
1502     throws IOException {
1503       this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
1504     }
1505 
1506     /**
1507      * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
1508      * little memory allocation as possible, using the provided on-disk size.
1509      *
1510      * @param offset the offset in the stream to read at
1511      * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1512      *          the header, or -1 if unknown
1513      * @param uncompressedSize the uncompressed size of the the block. Always
1514      *          expected to be -1. This parameter is only used in version 1.
1515      * @param pread whether to use a positional read
1516      */
1517     @Override
1518     public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL,
1519         int uncompressedSize, boolean pread)
1520     throws IOException {
1521 
1522       // get a copy of the current state of whether to validate
1523       // hbase checksums or not for this read call. This is not
1524       // thread-safe but the one constaint is that if we decide
1525       // to skip hbase checksum verification then we are
1526       // guaranteed to use hdfs checksum verification.
1527       boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1528       FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1529 
1530       HFileBlock blk = readBlockDataInternal(is, offset, (int) onDiskSizeWithHeaderL,
1531           uncompressedSize, pread, doVerificationThruHBaseChecksum);
1532       if (blk == null) {
1533         HFile.LOG.warn("HBase checksum verification failed for file " +
1534                        path + " at offset " +
1535                        offset + " filesize " + fileSize +
1536                        ". Retrying read with HDFS checksums turned on...");
1537 
1538         if (!doVerificationThruHBaseChecksum) {
1539           String msg = "HBase checksum verification failed for file " +
1540                        path + " at offset " +
1541                        offset + " filesize " + fileSize +
1542                        " but this cannot happen because doVerify is " +
1543                        doVerificationThruHBaseChecksum;
1544           HFile.LOG.warn(msg);
1545           throw new IOException(msg); // cannot happen case here
1546         }
1547         HFile.checksumFailures.incrementAndGet(); // update metrics
1548 
1549         // If we have a checksum failure, we fall back into a mode where
1550         // the next few reads use HDFS level checksums. We aim to make the
1551         // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
1552         // hbase checksum verification, but since this value is set without
1553         // holding any locks, it can so happen that we might actually do
1554         // a few more than precisely this number.
1555         is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1556         doVerificationThruHBaseChecksum = false;
1557         blk = readBlockDataInternal(is, offset, (int) onDiskSizeWithHeaderL, uncompressedSize,
1558             pread, doVerificationThruHBaseChecksum);
1559         if (blk != null) {
1560           HFile.LOG.warn("HDFS checksum verification suceeded for file " +
1561                          path + " at offset " +
1562                          offset + " filesize " + fileSize);
1563         }
1564       }
1565       if (blk == null && !doVerificationThruHBaseChecksum) {
1566         String msg = "readBlockData failed, possibly due to " +
1567                      "checksum verification failed for file " + path +
1568                      " at offset " + offset + " filesize " + fileSize;
1569         HFile.LOG.warn(msg);
1570         throw new IOException(msg);
1571       }
1572 
1573       // If there is a checksum mismatch earlier, then retry with
1574       // HBase checksums switched off and use HDFS checksum verification.
1575       // This triggers HDFS to detect and fix corrupt replicas. The
1576       // next checksumOffCount read requests will use HDFS checksums.
1577       // The decrementing of this.checksumOffCount is not thread-safe,
1578       // but it is harmless because eventually checksumOffCount will be
1579       // a negative number.
1580       streamWrapper.checksumOk();
1581       return blk;
1582     }
1583 
1584     /**
1585      * Reads a version 2 block.
1586      *
1587      * @param offset the offset in the stream to read at
1588      * @param onDiskSizeWithHeader the on-disk size of the block, including
1589      *          the header, or -1 if unknown
1590      * @param uncompressedSize the uncompressed size of the the block. Always
1591      *          expected to be -1. This parameter is only used in version 1.
1592      * @param pread whether to use a positional read
1593      * @param verifyChecksum Whether to use HBase checksums.
1594      *        If HBase checksum is switched off, then use HDFS checksum.
1595      * @return the HFileBlock or null if there is a HBase checksum mismatch
1596      */
1597     protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
1598         int onDiskSizeWithHeader, int uncompressedSize, boolean pread,
1599         boolean verifyChecksum)
1600     throws IOException {
1601       if (offset < 0) {
1602         throw new IOException("Invalid offset=" + offset + " trying to read "
1603             + "block (onDiskSize=" + onDiskSizeWithHeader
1604             + ", uncompressedSize=" + uncompressedSize + ")");
1605       }
1606 
1607       if (uncompressedSize != -1) {
1608         throw new IOException("Version 2 block reader API does not need " +
1609             "the uncompressed size parameter");
1610       }
1611 
1612       if ((onDiskSizeWithHeader < hdrSize && onDiskSizeWithHeader != -1)
1613           || onDiskSizeWithHeader >= Integer.MAX_VALUE) {
1614         throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeader
1615             + ": expected to be at least " + hdrSize
1616             + " and at most " + Integer.MAX_VALUE + ", or -1 (offset="
1617             + offset + ", uncompressedSize=" + uncompressedSize + ")");
1618       }
1619 
1620       // See if we can avoid reading the header. This is desirable, because
1621       // we will not incur a backward seek operation if we have already
1622       // read this block's header as part of the previous read's look-ahead.
1623       // And we also want to skip reading the header again if it has already
1624       // been read.
1625       // TODO: How often does this optimization fire? Has to be same thread so the thread local
1626       // is pertinent and we have to be reading next block as in a big scan.
1627       PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
1628       ByteBuffer headerBuf = prefetchedHeader.offset == offset? prefetchedHeader.buf: null;
1629 
1630       // Allocate enough space to fit the next block's header too.
1631       int nextBlockOnDiskSize = 0;
1632       byte[] onDiskBlock = null;
1633 
1634       if (onDiskSizeWithHeader > 0) {
1635         // We know the total on-disk size. Read the entire block into memory,
1636         // then parse the header. This code path is used when
1637         // doing a random read operation relying on the block index, as well as
1638         // when the client knows the on-disk size from peeking into the next
1639         // block's header (e.g. this block's header) when reading the previous
1640         // block. This is the faster and more preferable case.
1641 
1642         // Size that we have to skip in case we have already read the header.
1643         int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
1644         onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; // room for this block plus the
1645                                                                 // next block's header
1646         nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
1647             preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
1648             true, offset + preReadHeaderSize, pread);
1649         if (headerBuf != null) {
1650           // the header has been read when reading the previous block, copy
1651           // to this block's header
1652           // headerBuf is HBB
1653           assert headerBuf.hasArray();
1654           System.arraycopy(headerBuf.array(),
1655               headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1656         } else {
1657           headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
1658         }
1659         // if the caller specifies a onDiskSizeWithHeader, validate it.
1660         int expectedOnDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize;
1661         int actualOnDiskSizeWithoutHeader =
1662             headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
1663         validateOnDiskSizeWithoutHeader(expectedOnDiskSizeWithoutHeader,
1664             actualOnDiskSizeWithoutHeader, headerBuf, offset);
1665       } else {
1666         // Check headerBuf to see if we have read this block's header as part of
1667         // reading the previous block. This is an optimization of peeking into
1668         // the next block's header (e.g.this block's header) when reading the
1669         // previous block. This is the faster and more preferable case. If the
1670         // header is already there, don't read the header again.
1671 
1672         // Unfortunately, we still have to do a separate read operation to
1673         // read the header.
1674         if (headerBuf == null) {
1675           // From the header, determine the on-disk size of the given hfile
1676           // block, and read the remaining data, thereby incurring two read
1677           // operations. This might happen when we are doing the first read
1678           // in a series of reads or a random read, and we don't have access
1679           // to the block index. This is costly and should happen very rarely.
1680           headerBuf = ByteBuffer.allocate(hdrSize);
1681           // headerBuf is HBB
1682           readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(),
1683               hdrSize, false, offset, pread);
1684         }
1685         int onDiskSizeWithoutHeader = headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
1686         onDiskSizeWithHeader = onDiskSizeWithoutHeader + hdrSize;
1687         onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
1688         System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1689         nextBlockOnDiskSize =
1690           readAtOffset(is, onDiskBlock, hdrSize, onDiskSizeWithHeader - hdrSize, true,
1691               offset + hdrSize, pread);
1692       }
1693       ByteBuffer onDiskBlockByteBuffer = ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader);
1694 
1695       if (!fileContext.isCompressedOrEncrypted()) {
1696         verifyUncompressed(headerBuf, fileContext.isUseHBaseChecksum());
1697       }
1698 
1699       if (verifyChecksum && !validateChecksum(offset, onDiskBlockByteBuffer, hdrSize)) {
1700         return null;             // checksum mismatch
1701       }
1702 
1703       // The onDiskBlock will become the headerAndDataBuffer for this block.
1704       // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
1705       // contains the header of next block, so no need to set next
1706       // block's header in it.
1707        HFileBlock b = new HFileBlock(onDiskBlockByteBuffer, this.fileContext.isUseHBaseChecksum());
1708 
1709       b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
1710 
1711       // Set prefetched header
1712       if (b.hasNextBlockHeader()) {
1713         prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
1714         System.arraycopy(onDiskBlock, onDiskSizeWithHeader, prefetchedHeader.header, 0, hdrSize);
1715       }
1716 
1717       b.offset = offset;
1718       b.fileContext.setIncludesTags(this.fileContext.isIncludesTags());
1719       b.fileContext.setIncludesMvcc(this.fileContext.isIncludesMvcc());
1720       return b;
1721     }
1722 
1723     void setIncludesMemstoreTS(boolean includesMemstoreTS) {
1724       this.fileContext.setIncludesMvcc(includesMemstoreTS);
1725     }
1726 
1727     void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
1728       encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
1729     }
1730 
1731     @Override
1732     public HFileBlockDecodingContext getBlockDecodingContext() {
1733       return this.encodedBlockDecodingCtx;
1734     }
1735 
1736     @Override
1737     public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
1738       return this.defaultDecodingCtx;
1739     }
1740 
1741     /**
1742      * Generates the checksum for the header as well as the data and then validates it.
1743      * If the block doesn't uses checksum, returns false.
1744      * @return True if checksum matches, else false.
1745      */
1746     protected boolean validateChecksum(long offset, ByteBuffer data, int hdrSize)
1747         throws IOException {
1748       // If this is an older version of the block that does not have checksums, then return false
1749       // indicating that checksum verification did not succeed. Actually, this method should never
1750       // be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen
1751       // case. Since this is a cannot-happen case, it is better to return false to indicate a
1752       // checksum validation failure.
1753       if (!fileContext.isUseHBaseChecksum()) {
1754         return false;
1755       }
1756       return ChecksumUtil.validateChecksum(data, path, offset, hdrSize);
1757     }
1758 
1759     @Override
1760     public void closeStreams() throws IOException {
1761       streamWrapper.close();
1762     }
1763 
1764     @Override
1765     public String toString() {
1766       return "hfs=" + hfs + ", path=" + path + ", fileContext=" + fileContext;
1767     }
1768   }
1769 
1770   @Override
1771   public int getSerializedLength() {
1772     if (buf != null) {
1773       // include extra bytes for the next header when it's available.
1774       int extraSpace = hasNextBlockHeader() ? headerSize() : 0;
1775       return this.buf.limit() + extraSpace + HFileBlock.EXTRA_SERIALIZATION_SPACE;
1776     }
1777     return 0;
1778   }
1779 
1780   @Override
1781   public void serialize(ByteBuffer destination) {
1782     ByteBufferUtils.copyFromBufferToBuffer(destination, this.buf, 0, getSerializedLength()
1783         - EXTRA_SERIALIZATION_SPACE);
1784     serializeExtraInfo(destination);
1785   }
1786 
1787   public void serializeExtraInfo(ByteBuffer destination) {
1788     destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1789     destination.putLong(this.offset);
1790     destination.putInt(this.nextBlockOnDiskSizeWithHeader);
1791     destination.rewind();
1792   }
1793 
1794   @Override
1795   public CacheableDeserializer<Cacheable> getDeserializer() {
1796     return HFileBlock.blockDeserializer;
1797   }
1798 
1799   @Override
1800   public boolean equals(Object comparison) {
1801     if (this == comparison) {
1802       return true;
1803     }
1804     if (comparison == null) {
1805       return false;
1806     }
1807     if (comparison.getClass() != this.getClass()) {
1808       return false;
1809     }
1810 
1811     HFileBlock castedComparison = (HFileBlock) comparison;
1812 
1813     if (castedComparison.blockType != this.blockType) {
1814       return false;
1815     }
1816     if (castedComparison.nextBlockOnDiskSizeWithHeader != this.nextBlockOnDiskSizeWithHeader) {
1817       return false;
1818     }
1819     if (castedComparison.offset != this.offset) {
1820       return false;
1821     }
1822     if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1823       return false;
1824     }
1825     if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1826       return false;
1827     }
1828     if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1829       return false;
1830     }
1831     if (ByteBufferUtils.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
1832         castedComparison.buf.limit()) != 0) {
1833       return false;
1834     }
1835     return true;
1836   }
1837 
1838   public DataBlockEncoding getDataBlockEncoding() {
1839     if (blockType == BlockType.ENCODED_DATA) {
1840       return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
1841     }
1842     return DataBlockEncoding.NONE;
1843   }
1844 
1845   byte getChecksumType() {
1846     return this.fileContext.getChecksumType().getCode();
1847   }
1848 
1849   int getBytesPerChecksum() {
1850     return this.fileContext.getBytesPerChecksum();
1851   }
1852 
1853   /** @return the size of data on disk + header. Excludes checksum. */
1854   int getOnDiskDataSizeWithHeader() {
1855     return this.onDiskDataSizeWithHeader;
1856   }
1857 
1858   /**
1859    * Calculate the number of bytes required to store all the checksums for this block. Each
1860    * checksum value is a 4 byte integer ({@link HFileBlock#CHECKSUM_SIZE}).
1861    */
1862   int totalChecksumBytes() {
1863     return HFileBlock.totalChecksumBytes(this.fileContext, onDiskDataSizeWithHeader);
1864   }
1865 
1866   private static int totalChecksumBytes(HFileContext fileContext, int onDiskDataSizeWithHeader) {
1867     // If the hfile block has minorVersion 0, then there are no checksum
1868     // data to validate. Similarly, a zero value in this.bytesPerChecksum
1869     // indicates that cached blocks do not have checksum data because
1870     // checksums were already validated when the block was read from disk.
1871     if (!fileContext.isUseHBaseChecksum() || fileContext.getBytesPerChecksum() == 0) {
1872       return 0;
1873     }
1874     return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
1875         fileContext.getBytesPerChecksum());
1876   }
1877 
1878   /**
1879    * Returns the size of this block header.
1880    */
1881   public int headerSize() {
1882     return headerSize(this.fileContext.isUseHBaseChecksum());
1883   }
1884 
1885   /**
1886    * Maps a minor version to the size of the header.
1887    */
1888   public static int headerSize(boolean usesHBaseChecksum) {
1889     if (usesHBaseChecksum) {
1890       return HConstants.HFILEBLOCK_HEADER_SIZE;
1891     }
1892     return HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
1893   }
1894 
1895   /**
1896    * Return the appropriate DUMMY_HEADER for the minor version
1897    */
1898   public byte[] getDummyHeaderForVersion() {
1899     return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
1900   }
1901 
1902   /**
1903    * Return the appropriate DUMMY_HEADER for the minor version
1904    */
1905   static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
1906     if (usesHBaseChecksum) {
1907       return HConstants.HFILEBLOCK_DUMMY_HEADER;
1908     }
1909     return DUMMY_HEADER_NO_CHECKSUM;
1910   }
1911 
1912   /**
1913    * @return the HFileContext used to create this HFileBlock. Not necessary the
1914    * fileContext for the file from which this block's data was originally read.
1915    */
1916   public HFileContext getHFileContext() {
1917     return this.fileContext;
1918   }
1919 
1920   /**
1921    * Convert the contents of the block header into a human readable string.
1922    * This is mostly helpful for debugging. This assumes that the block
1923    * has minor version > 0.
1924    */
1925   public static String toStringHeader(ByteBuffer buf) throws IOException {
1926     byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
1927     buf.get(magicBuf);
1928     int compressedBlockSizeNoHeader = buf.getInt();
1929     int uncompressedBlockSizeNoHeader = buf.getInt();
1930     long prevBlockOffset = buf.getLong();
1931     byte cksumtype = buf.get();
1932     long bytesPerChecksum = buf.getInt();
1933     long onDiskDataSizeWithHeader = buf.getInt();
1934     return " Header dump: magic: " + Bytes.toString(magicBuf) +
1935                    " blockType " + magicBuf +
1936                    " compressedBlockSizeNoHeader " +
1937                    compressedBlockSizeNoHeader +
1938                    " uncompressedBlockSizeNoHeader " +
1939                    uncompressedBlockSizeNoHeader +
1940                    " prevBlockOffset " + prevBlockOffset +
1941                    " checksumType " + ChecksumType.codeToType(cksumtype) +
1942                    " bytesPerChecksum " + bytesPerChecksum +
1943                    " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
1944   }
1945 }