1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.io.hfile;
19  
20  import java.io.ByteArrayOutputStream;
21  import java.io.DataInputStream;
22  import java.io.DataOutput;
23  import java.io.DataOutputStream;
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.nio.ByteBuffer;
27  import java.util.concurrent.locks.Lock;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import org.apache.hadoop.fs.FSDataInputStream;
31  import org.apache.hadoop.fs.FSDataOutputStream;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.classification.InterfaceAudience;
36  import org.apache.hadoop.hbase.fs.HFileSystem;
37  import org.apache.hadoop.hbase.io.ByteBufferInputStream;
38  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
39  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
40  import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
41  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
42  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
43  import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
44  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
45  import org.apache.hadoop.hbase.util.ByteBufferUtils;
46  import org.apache.hadoop.hbase.util.Bytes;
47  import org.apache.hadoop.hbase.util.ChecksumType;
48  import org.apache.hadoop.hbase.util.ClassSize;
49  import org.apache.hadoop.hbase.util.CompoundBloomFilter;
50  import org.apache.hadoop.io.IOUtils;
51  
52  import com.google.common.base.Preconditions;
53  
54  /**
55   * Reading {@link HFile} version 1 and 2 blocks, and writing version 2 blocks.
56   * <ul>
57   * <li>In version 1 all blocks are always compressed or uncompressed, as
58   * specified by the {@link HFile}'s compression algorithm, with a type-specific
59   * magic record stored in the beginning of the compressed data (i.e. one needs
60   * to uncompress the compressed block to determine the block type). There is
61   * only a single compression algorithm setting for all blocks. Offset and size
62   * information from the block index are required to read a block.
63   * <li>In version 2 a block is structured as follows:
64   * <ul>
65   * <li>header (see Writer#finishBlock())
66   * <ul>
67   * <li>Magic record identifying the block type (8 bytes)
68   * <li>Compressed block size, excluding header, including checksum (4 bytes)
69   * <li>Uncompressed block size, excluding header, excluding checksum (4 bytes)
70   * <li>The offset of the previous block of the same type (8 bytes). This is
71   * used to be able to navigate to the previous block without going through the block index
72   * <li>For minorVersions >=1, the ordinal describing checksum type (1 byte)
73   * <li>For minorVersions >=1, the number of data bytes/checksum chunk (4 bytes)
74   * <li>For minorVersions >=1, the size of data on disk, including header,
75   * excluding checksums (4 bytes)
76   * </ul>
77   * </li>
78   * <li>Raw/Compressed/Encrypted/Encoded data. The compression algorithm is the
79   * same for all the blocks in the {@link HFile}, similarly to what was done in
80   * version 1.
81   * <li>For minorVersions >=1, a series of 4 byte checksums, one each for
82   * the number of bytes specified by bytesPerChecksum.
83   * </ul>
84   * </ul>
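   * <p>
   * As a hedged illustration (not an API of this class; the variable names are made up),
   * the version 2 header fields can be read back from a block buffer the same way the
   * {@code HFileBlock(ByteBuffer, boolean)} constructor does:
   * <pre>{@code
   * ByteBuffer b = ...; // positioned at the start of a version 2 block
   * BlockType blockType = BlockType.read(b);          // 8-byte magic record
   * int onDiskSizeWithoutHeader = b.getInt();         // includes checksums, excludes header
   * int uncompressedSizeWithoutHeader = b.getInt();   // excludes header and checksums
   * long prevBlockOffset = b.getLong();               // previous block of the same type
   * // minorVersion >= 1 only:
   * byte checksumType = b.get();
   * int bytesPerChecksum = b.getInt();
   * int onDiskDataSizeWithHeader = b.getInt();
   * }</pre>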
85   */
86  @InterfaceAudience.Private
87  public class HFileBlock implements Cacheable {
88  
89    /**
90     * On a checksum failure on a Reader, this many succeeding read
91     * requests fall back to HDFS checksums before HBase checksum
92     * verification is automatically re-enabled.
93     */
94    static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
95  
96    public static final boolean FILL_HEADER = true;
97    public static final boolean DONT_FILL_HEADER = false;
98  
99    /**
100    * The size of the block header when blockType is {@link BlockType#ENCODED_DATA}.
101    * This extends the normal header by adding the id of the encoder.
102    */
103   public static final int ENCODED_HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE
104       + DataBlockEncoding.ID_SIZE;
105 
106   static final byte[] DUMMY_HEADER_NO_CHECKSUM =
107      new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
108 
109   public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
110       ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
111 
112   // meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader
113   public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT
114       + Bytes.SIZEOF_LONG;
115 
116   /**
117    * Each checksum value is an integer that can be stored in 4 bytes.
118    */
119   static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
120 
121   private static final CacheableDeserializer<Cacheable> blockDeserializer =
122       new CacheableDeserializer<Cacheable>() {
123         public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{
124           buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
125           ByteBuffer newByteBuffer;
126           if (reuse) {
127             newByteBuffer = buf.slice();
128           } else {
129            newByteBuffer = ByteBuffer.allocate(buf.limit());
130            newByteBuffer.put(buf);
131           }
132           buf.position(buf.limit());
133           buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
134           boolean usesChecksum = buf.get() == (byte)1;
135           HFileBlock ourBuffer = new HFileBlock(newByteBuffer, usesChecksum);
136           ourBuffer.offset = buf.getLong();
137           ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt();
138           if (ourBuffer.hasNextBlockHeader()) {
139             ourBuffer.buf.limit(ourBuffer.buf.limit() - ourBuffer.headerSize());
140           }
141           return ourBuffer;
142         }
143         
144         @Override
145         public int getDeserialiserIdentifier() {
146           return deserializerIdentifier;
147         }
148 
149         @Override
150         public HFileBlock deserialize(ByteBuffer b) throws IOException {
151           return deserialize(b, false);
152         }
153       };
154   private static final int deserializerIdentifier;
155   static {
156     deserializerIdentifier = CacheableDeserializerIdManager
157         .registerDeserializer(blockDeserializer);
158   }
159 
160   /** Type of block. Header field 0. */
161   private BlockType blockType;
162 
163   /** Size on disk excluding header, including checksum. Header field 1. */
164   private int onDiskSizeWithoutHeader;
165 
166   /** Size of pure data. Does not include header or checksums. Header field 2. */
167   private final int uncompressedSizeWithoutHeader;
168 
169   /** The offset of the previous block on disk. Header field 3. */
170   private final long prevBlockOffset;
171 
172   /**
173    * Size on disk of header + data. Excludes checksum. Header field 6,
174    * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
175    */
176   private final int onDiskDataSizeWithHeader;
177 
178   /** The in-memory representation of the hfile block */
179   private ByteBuffer buf;
180 
181   /** Metadata about the HFile block. */
182   private HFileContext fileContext;
183 
184   /**
185    * The offset of this block in the file. Populated by the reader for
186    * convenience of access. This offset is not part of the block header.
187    */
188   private long offset = -1;
189 
190   /**
191    * The on-disk size of the next block, including the header, obtained by
192    * peeking into the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the next block's
193    * header, or -1 if unknown.
194    */
195   private int nextBlockOnDiskSizeWithHeader = -1;
196 
197   /**
198    * Creates a new {@link HFile} block from the given fields. This constructor
199    * is mostly used when the block data has already been read and uncompressed,
200    * and is sitting in a byte buffer. 
201    *
202    * @param blockType the type of this block, see {@link BlockType}
203    * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
204    * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
205    * @param prevBlockOffset see {@link #prevBlockOffset}
206    * @param buf block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by
207    *          uncompressed data
208    * @param fillHeader when true, overwrite the first four header fields of {@code buf} from the other arguments
209    * @param offset the file offset the block was read from
210    * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
211    * @param fileContext HFile meta data
212    */
213   HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
214       long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset,
215       int onDiskDataSizeWithHeader, HFileContext fileContext) {
216     this.blockType = blockType;
217     this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
218     this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
219     this.prevBlockOffset = prevBlockOffset;
220     this.buf = buf;
221     if (fillHeader)
222       overwriteHeader();
223     this.offset = offset;
224     this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
225     this.fileContext = fileContext;
226     this.buf.rewind();
227   }
228 
229   /**
230    * Copy constructor. Creates a shallow copy of {@code that}'s buffer.
231    */
232   HFileBlock(HFileBlock that) {
233     this.blockType = that.blockType;
234     this.onDiskSizeWithoutHeader = that.onDiskSizeWithoutHeader;
235     this.uncompressedSizeWithoutHeader = that.uncompressedSizeWithoutHeader;
236     this.prevBlockOffset = that.prevBlockOffset;
237     this.buf = that.buf.duplicate();
238     this.offset = that.offset;
239     this.onDiskDataSizeWithHeader = that.onDiskDataSizeWithHeader;
240     this.fileContext = that.fileContext;
241     this.nextBlockOnDiskSizeWithHeader = that.nextBlockOnDiskSizeWithHeader;
242   }
243 
244   /**
245    * Creates a block from an existing buffer starting with a header. Rewinds
246    * and takes ownership of the buffer. By definition of rewind, ignores the
247    * buffer position, but if you slice the buffer beforehand, it will rewind
248    * to that point. The reason this takes a minor version number and not a major
249    * version number is that major versions indicate the format of an HFile, whereas
250    * minor versions indicate the format inside an HFileBlock.
251    */
252   HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException {
253     b.rewind();
254     blockType = BlockType.read(b);
255     onDiskSizeWithoutHeader = b.getInt();
256     uncompressedSizeWithoutHeader = b.getInt();
257     prevBlockOffset = b.getLong();
258     HFileContextBuilder contextBuilder = new HFileContextBuilder();
259     contextBuilder.withHBaseCheckSum(usesHBaseChecksum);
260     if (usesHBaseChecksum) {
261       contextBuilder.withChecksumType(ChecksumType.codeToType(b.get()));
262       contextBuilder.withBytesPerCheckSum(b.getInt());
263       this.onDiskDataSizeWithHeader = b.getInt();
264     } else {
265       contextBuilder.withChecksumType(ChecksumType.NULL);
266       contextBuilder.withBytesPerCheckSum(0);
267       this.onDiskDataSizeWithHeader = onDiskSizeWithoutHeader +
268                                        HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
269     }
270     this.fileContext = contextBuilder.build();
271     buf = b;
272     buf.rewind();
273   }
274 
275   public BlockType getBlockType() {
276     return blockType;
277   }
278 
279   /** @return get data block encoding id that was used to encode this block */
280   public short getDataBlockEncodingId() {
281     if (blockType != BlockType.ENCODED_DATA) {
282       throw new IllegalArgumentException("Querying encoder ID of a block " +
283           "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
284     }
285     return buf.getShort(headerSize());
286   }
287 
288   /**
289    * @return the on-disk size of header + data part + checksum.
290    */
291   public int getOnDiskSizeWithHeader() {
292     return onDiskSizeWithoutHeader + headerSize();
293   }
294 
295   /**
296    * @return the on-disk size of the data part + checksum (header excluded).
297    */
298   public int getOnDiskSizeWithoutHeader() {
299     return onDiskSizeWithoutHeader;
300   }
301 
302   /**
303    * @return the uncompressed size of data part (header and checksum excluded).
304    */
305    public int getUncompressedSizeWithoutHeader() {
306     return uncompressedSizeWithoutHeader;
307   }
308 
309   /**
310    * @return the offset of the previous block of the same type in the file, or
311    *         -1 if unknown
312    */
313   public long getPrevBlockOffset() {
314     return prevBlockOffset;
315   }
316 
317   /**
318    * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position
319    * is modified as side-effect.
320    */
321   private void overwriteHeader() {
322     buf.rewind();
323     blockType.write(buf);
324     buf.putInt(onDiskSizeWithoutHeader);
325     buf.putInt(uncompressedSizeWithoutHeader);
326     buf.putLong(prevBlockOffset);
327   }
328 
329   /**
330    * Returns a buffer that does not include the header or checksum.
331    *
332    * @return the buffer with header skipped and checksum omitted.
333    */
334   public ByteBuffer getBufferWithoutHeader() {
335     ByteBuffer dup = this.buf.duplicate();
336     dup.position(headerSize());
337     dup.limit(buf.limit() - totalChecksumBytes());
338     return dup.slice();
339   }
340 
341   /**
342    * Returns the buffer this block stores internally. The clients must not
343    * modify the buffer object. This method has to be public because it is
344    * used in {@link CompoundBloomFilter} to avoid object creation on every
345    * Bloom filter lookup, but has to be used with caution. Checksum data
346    * is not included in the returned buffer but header data is.
347    *
348    * @return the buffer of this block for read-only operations
349    */
350   public ByteBuffer getBufferReadOnly() {
351     ByteBuffer dup = this.buf.duplicate();
352     dup.limit(buf.limit() - totalChecksumBytes());
353     return dup.slice();
354   }
355 
356   /**
357    * Returns the buffer of this block, including header data. The clients must
358    * not modify the buffer object. This method has to be public because it is
359    * used in {@link BucketCache} to avoid buffer copy.
360    * 
361    * @return the buffer with header and checksum included for read-only operations
362    */
363   public ByteBuffer getBufferReadOnlyWithHeader() {
364     ByteBuffer dup = this.buf.duplicate();
365     return dup.slice();
366   }
367 
368   /**
369    * Returns a byte buffer of this block, including header data and checksum, positioned at
370    * the beginning of header. The underlying data array is not copied.
371    *
372    * @return the byte buffer with header and checksum included
373    */
374   ByteBuffer getBufferWithHeader() {
375     ByteBuffer dupBuf = buf.duplicate();
376     dupBuf.rewind();
377     return dupBuf;
378   }
379 
380   private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
381       String fieldName) throws IOException {
382     if (valueFromBuf != valueFromField) {
383       throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
384           + ") is different from that in the field (" + valueFromField + ")");
385     }
386   }
387 
388   private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
389       throws IOException {
390     if (valueFromBuf != valueFromField) {
391       throw new IOException("Block type stored in the buffer: " +
392         valueFromBuf + ", block type field: " + valueFromField);
393     }
394   }
395 
396   /**
397    * Checks if the block is internally consistent, i.e. the first
398    * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a
399    * valid header consistent with the fields. Assumes a packed block structure.
400    * This function is primarily for testing and debugging, and is not
401    * thread-safe, because it alters the internal buffer pointer.
402    */
403   void sanityCheck() throws IOException {
404     buf.rewind();
405 
406     sanityCheckAssertion(BlockType.read(buf), blockType);
407 
408     sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader,
409         "onDiskSizeWithoutHeader");
410 
411     sanityCheckAssertion(buf.getInt(), uncompressedSizeWithoutHeader,
412         "uncompressedSizeWithoutHeader");
413 
414     sanityCheckAssertion(buf.getLong(), prevBlockOffset, "prevBlockOffset");
415     if (this.fileContext.isUseHBaseChecksum()) {
416       sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
417       sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), "bytesPerChecksum");
418       sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
419     }
420 
421     int cksumBytes = totalChecksumBytes();
422     int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
423     if (buf.limit() != expectedBufLimit) {
424       throw new AssertionError("Expected buffer limit " + expectedBufLimit
425           + ", got " + buf.limit());
426     }
427 
428     // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
429     // block's header, so there are two sensible values for buffer capacity.
430     int hdrSize = headerSize();
431     if (buf.capacity() != expectedBufLimit &&
432         buf.capacity() != expectedBufLimit + hdrSize) {
433       throw new AssertionError("Invalid buffer capacity: " + buf.capacity() +
434           ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
435     }
436   }
437 
438   @Override
439   public String toString() {
440     StringBuilder sb = new StringBuilder()
441       .append("HFileBlock [")
442       .append(" fileOffset=").append(offset)
443       .append(" headerSize()=").append(headerSize())
444       .append(" blockType=").append(blockType)
445       .append(" onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
446       .append(" uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
447       .append(" prevBlockOffset=").append(prevBlockOffset)
448       .append(" isUseHBaseChecksum()=").append(fileContext.isUseHBaseChecksum());
449     if (fileContext.isUseHBaseChecksum()) {
450       sb.append(" checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
451         .append(" bytesPerChecksum=").append(this.buf.getInt(24 + 1))
452         .append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
453     } else {
454       sb.append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
455         .append("(").append(onDiskSizeWithoutHeader)
456         .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
457     }
458     String dataBegin = null;
459     if (buf.hasArray()) {
460       dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
461           Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
462     } else {
463       ByteBuffer bufWithoutHeader = getBufferWithoutHeader();
464       byte[] dataBeginBytes = new byte[Math.min(32,
465           bufWithoutHeader.limit() - bufWithoutHeader.position())];
466       bufWithoutHeader.get(dataBeginBytes);
467       dataBegin = Bytes.toStringBinary(dataBeginBytes);
468     }
469     sb.append(" getOnDiskSizeWithHeader()=").append(getOnDiskSizeWithHeader())
470       .append(" totalChecksumBytes()=").append(totalChecksumBytes())
471       .append(" isUnpacked()=").append(isUnpacked())
472       .append(" buf=[ ").append(buf).append(" ]")
473       .append(" dataBeginsWith=").append(dataBegin)
474       .append(" fileContext=").append(fileContext)
475       .append(" ]");
476     return sb.toString();
477   }
478 
479   /**
480    * Called after reading a block with provided onDiskSizeWithHeader.
481    */
482   private void validateOnDiskSizeWithoutHeader(
483       int expectedOnDiskSizeWithoutHeader) throws IOException {
484     if (onDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) {
485       String dataBegin = null;
486       if (buf.hasArray()) {
487         dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset(), Math.min(32, buf.limit()));
488       } else {
489         ByteBuffer bufDup = getBufferReadOnly();
490         byte[] dataBeginBytes = new byte[Math.min(32, bufDup.limit() - bufDup.position())];
491         bufDup.get(dataBeginBytes);
492         dataBegin = Bytes.toStringBinary(dataBeginBytes);
493       }
494       String blockInfoMsg =
495         "Block offset: " + offset + ", data starts with: " + dataBegin;
496       throw new IOException("On-disk size without header provided is "
497           + expectedOnDiskSizeWithoutHeader + ", but block "
498           + "header contains " + onDiskSizeWithoutHeader + ". " +
499           blockInfoMsg);
500     }
501   }
502 
503   /**
504    * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
505    * encoded structure. Internal structures are shared between instances where applicable.
506    */
507   HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
508     if (!fileContext.isCompressedOrEncrypted()) {
509       // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
510       // which is used for block serialization to L2 cache, does not preserve encoding and
511       // encryption details.
512       return this;
513     }
514 
515     HFileBlock unpacked = new HFileBlock(this);
516     unpacked.allocateBuffer(); // allocates space for the decompressed block
517 
518     HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
519       reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
520 
521     ByteBuffer dup = this.buf.duplicate();
522     dup.position(this.headerSize());
523     dup = dup.slice();
524     ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
525       unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
526       dup);
527 
528     // Preserve the next block's header bytes in the new block if we have them.
529     if (unpacked.hasNextBlockHeader()) {
530       // Both buffers are limited to the checksum bytes, so they exclude the next block's header.
531       // The call to copyFromBufferToBuffer() below does positional reads/writes when either
532       // buffer is a DirectByteBuffer, so we change the limit on duplicate buffers instead.
533       // No data is copied; only new ByteBuffer views are created.
534       ByteBuffer inDup = this.buf.duplicate();
535       inDup.limit(inDup.limit() + headerSize());
536       ByteBuffer outDup = unpacked.buf.duplicate();
537       outDup.limit(outDup.limit() + unpacked.headerSize());
538       ByteBufferUtils.copyFromBufferToBuffer(
539           outDup,
540           inDup,
541           this.onDiskDataSizeWithHeader,
542           unpacked.headerSize() + unpacked.uncompressedSizeWithoutHeader
543               + unpacked.totalChecksumBytes(), unpacked.headerSize());
544     }
545     return unpacked;
546   }
547 
548   /**
549    * Return true when this buffer includes next block's header.
550    */
551   private boolean hasNextBlockHeader() {
552     return nextBlockOnDiskSizeWithHeader > 0;
553   }
554 
555   /**
556    * Always allocates a new buffer of the correct size. Copies header bytes
557    * from the existing buffer. Does not change header fields. 
558    * Also reserves room for the checksum bytes.
559    */
560   private void allocateBuffer() {
561     int cksumBytes = totalChecksumBytes();
562     int headerSize = headerSize();
563     int capacityNeeded = headerSize + uncompressedSizeWithoutHeader +
564         cksumBytes + (hasNextBlockHeader() ? headerSize : 0);
565 
566     // TODO: should we consider allocating this buffer off-heap?
567     ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
568 
569     // Copy header bytes into newBuf.
570     // newBuf is HBB so no issue in calling array()
571     ByteBuffer dup = buf.duplicate();
572     dup.position(0);
573     dup.get(newBuf.array(), newBuf.arrayOffset(), headerSize);
574 
575     buf = newBuf;
576     // set limit to exclude next block's header
577     buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
578   }
579 
580   /**
581    * Return true when this block's buffer has been unpacked, false otherwise. Note this is a
582    * calculated heuristic, not tracked attribute of the block.
583    */
584   public boolean isUnpacked() {
585     final int cksumBytes = totalChecksumBytes();
586     final int headerSize = headerSize();
587     final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
588     final int bufCapacity = buf.capacity();
589     return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
590   }
591 
592   /** An additional sanity-check in case no compression or encryption is being used. */
593   public void assumeUncompressed() throws IOException {
594     if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader +
595         totalChecksumBytes()) {
596       throw new IOException("Using no compression but "
597           + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
598           + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
599           + ", numChecksumbytes=" + totalChecksumBytes());
600     }
601   }
602 
603   /**
604    * @param expectedType the expected type of this block
605    * @throws IOException if this block's type is different than expected
606    */
607   public void expectType(BlockType expectedType) throws IOException {
608     if (blockType != expectedType) {
609       throw new IOException("Invalid block type: expected=" + expectedType
610           + ", actual=" + blockType);
611     }
612   }
613 
614   /** @return the offset of this block in the file it was read from */
615   public long getOffset() {
616     if (offset < 0) {
617       throw new IllegalStateException(
618           "HFile block offset not initialized properly");
619     }
620     return offset;
621   }
622 
623   /**
624    * @return a byte stream reading the data + checksum of this block
625    */
626   public DataInputStream getByteStream() {
627     ByteBuffer dup = this.buf.duplicate();
628     dup.position(this.headerSize());
629     return new DataInputStream(new ByteBufferInputStream(dup));
630   }
631 
632   @Override
633   public long heapSize() {
634     long size = ClassSize.align(
635         ClassSize.OBJECT +
636         // Block type, byte buffer and meta references
637         3 * ClassSize.REFERENCE +
638         // On-disk size, uncompressed size, and next block's on-disk size
639         // bytePerChecksum and onDiskDataSize
640         4 * Bytes.SIZEOF_INT +
641         // This and previous block offset
642         2 * Bytes.SIZEOF_LONG +
643         // Heap size of the meta object. meta will be always not null.
644         fileContext.heapSize()
645     );
646 
647     if (buf != null) {
648       // Deep overhead of the byte buffer. Needs to be aligned separately.
649       size += ClassSize.align(buf.capacity() + BYTE_BUFFER_HEAP_SIZE);
650     }
651 
652     return ClassSize.align(size);
653   }
654 
655   /**
656    * Read from an input stream. Analogous to
657    * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a
658    * number of "extra" bytes that would be desirable but not absolutely
659    * necessary to read.
660    *
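   * <p>
   * A hedged usage sketch ({@code blockSize} and {@code hdrSize} are illustrative
   * local variables, not part of this class):
   * <pre>{@code
   * byte[] dest = new byte[blockSize + hdrSize];
   * // must read blockSize bytes; also try to pick up the next block's header
   * boolean gotNextHeader = HFileBlock.readWithExtra(in, dest, 0, blockSize, hdrSize);
   * }</pre>
   *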
661    * @param in the input stream to read from
662    * @param buf the buffer to read into
663    * @param bufOffset the destination offset in the buffer
664    * @param necessaryLen the number of bytes that are absolutely necessary to
665    *          read
666    * @param extraLen the number of extra bytes that would be nice to read
667    * @return true if succeeded reading the extra bytes
668    * @throws IOException if failed to read the necessary bytes
669    */
670   public static boolean readWithExtra(InputStream in, byte buf[],
671       int bufOffset, int necessaryLen, int extraLen) throws IOException {
672     int bytesRemaining = necessaryLen + extraLen;
673     while (bytesRemaining > 0) {
674       int ret = in.read(buf, bufOffset, bytesRemaining);
675       if (ret == -1 && bytesRemaining <= extraLen) {
676         // We could not read the "extra data", but that is OK.
677         break;
678       }
679 
680       if (ret < 0) {
681         throw new IOException("Premature EOF from inputStream (read "
682             + "returned " + ret + ", was trying to read " + necessaryLen
683             + " necessary bytes and " + extraLen + " extra bytes, "
684             + "successfully read "
685             + (necessaryLen + extraLen - bytesRemaining));
686       }
687       bufOffset += ret;
688       bytesRemaining -= ret;
689     }
690     return bytesRemaining <= 0;
691   }
692 
693   /**
694    * @return the on-disk size of the next block (including the header size)
695    *         that was read by peeking into the next block's header
696    */
697   public int getNextBlockOnDiskSizeWithHeader() {
698     return nextBlockOnDiskSizeWithHeader;
699   }
700 
701   /**
702    * Unified version 2 {@link HFile} block writer. The intended usage pattern
703    * is as follows:
704    * <ol>
705    * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
706    * <li>Call {@link Writer#startWriting} and get a data stream to write to.
707    * <li>Write your data into the stream.
708    * <li>Call {@link Writer#writeHeaderAndData(FSDataOutputStream)} as many times as you need to
709    * store the serialized block into an external stream.
710    * <li>Repeat to write more blocks.
711    * </ol>
712    * <p>
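   * A minimal usage sketch (hedged; {@code fileContext}, {@code fsOut}, and the
   * payload are illustrative, not prescribed by this class):
   * <pre>{@code
   * HFileBlock.Writer writer = new HFileBlock.Writer(NoOpDataBlockEncoder.INSTANCE, fileContext);
   * DataOutputStream dos = writer.startWriting(BlockType.DATA);
   * // write key/values into dos, or call writer.write(cell) for data blocks
   * writer.writeHeaderAndData(fsOut); // finishes the block: header + data + checksums
   * // call startWriting(...) again for the next block
   * }</pre>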
713    */
714   public static class Writer {
715 
716     private enum State {
717       INIT,
718       WRITING,
719       BLOCK_READY
720     };
721 
722     /** Writer state. Used to ensure the correct usage protocol. */
723     private State state = State.INIT;
724 
725     /** Data block encoder used for data blocks */
726     private final HFileDataBlockEncoder dataBlockEncoder;
727 
728     private HFileBlockEncodingContext dataBlockEncodingCtx;
729 
730     /** block encoding context for non-data blocks */
731     private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
732 
733     /**
734      * The stream we use to accumulate data in uncompressed format for each
735      * block. We reset this stream at the end of each block and reuse it. The
736      * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this
737      * stream.
738      */
739     private ByteArrayOutputStream baosInMemory;
740 
741     /**
742      * Current block type. Set in {@link #startWriting(BlockType)}. Could be
743      * changed in {@link #finishBlock()} from {@link BlockType#DATA}
744      * to {@link BlockType#ENCODED_DATA}.
745      */
746     private BlockType blockType;
747 
748     /**
749      * A stream that we write uncompressed bytes to, which compresses them and
750      * writes them to {@link #baosInMemory}.
751      */
752     private DataOutputStream userDataStream;
753 
754     // Size of actual data being written. Not considering the block encoding/compression. This
755     // includes the header size also.
756     private int unencodedDataSizeWritten;
757 
758     /**
759      * Bytes to be written to the file system, including the header. Compressed
760      * if compression is turned on. It also includes the checksum data that
761      * immediately follows the block data. (header + data + checksums)
762      */
763     private byte[] onDiskBytesWithHeader;
764 
765     /**
766      * The checksum data for this block. These bytes are generated over
767      * {@link #onDiskBytesWithHeader} and are written to the output stream
768      * immediately after it; they are not part of that array.
770      */
771     private byte[] onDiskChecksum;
772 
773     /**
774      * Valid in the BLOCK_READY state. Contains the header and the uncompressed (but
775      * potentially encoded, if this is a data block) bytes, so the length is
776      * {@link #uncompressedSizeWithoutHeader} + {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}.
777      * Does not store checksums.
778      */
779     private byte[] uncompressedBytesWithHeader;
780 
781     /**
782      * Current block's start offset in the {@link HFile}. Set in
783      * {@link #writeHeaderAndData(FSDataOutputStream)}.
784      */
785     private long startOffset;
786 
787     /**
788      * Offset of previous block by block type. Updated when the next block is
789      * started.
790      */
791     private long[] prevOffsetByType;
792 
793     /** The offset of the previous block of the same type */
794     private long prevOffset;
795     /** Metadata about the HFile block. */
796     private HFileContext fileContext;
797 
798     /**
799      * @param dataBlockEncoder data block encoding algorithm to use
800      */
801     public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
802       this.dataBlockEncoder = dataBlockEncoder != null
803           ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
804       defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
805           HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
806       dataBlockEncodingCtx = this.dataBlockEncoder
807           .newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
808 
809       if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
810         throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
811             " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
812             fileContext.getBytesPerChecksum());
813       }
814 
815       baosInMemory = new ByteArrayOutputStream();
816       
817       prevOffsetByType = new long[BlockType.values().length];
818       for (int i = 0; i < prevOffsetByType.length; ++i)
819         prevOffsetByType[i] = -1;
820 
821       this.fileContext = fileContext;
822     }
823 
824     /**
825      * Starts writing into the block. The previous block's data is discarded.
826      *
827      * @return the stream the user can write their data into
828      * @throws IOException
829      */
830     public DataOutputStream startWriting(BlockType newBlockType)
831         throws IOException {
832       if (state == State.BLOCK_READY && startOffset != -1) {
833         // We had a previous block that was written to a stream at a specific
834         // offset. Save that offset as the last offset of a block of that type.
835         prevOffsetByType[blockType.getId()] = startOffset;
836       }
837 
838       startOffset = -1;
839       blockType = newBlockType;
840 
841       baosInMemory.reset();
842       baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
843 
844       state = State.WRITING;
845 
846       // We will compress it later in finishBlock()
847       userDataStream = new DataOutputStream(baosInMemory);
848       if (newBlockType == BlockType.DATA) {
849         this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
850       }
851       this.unencodedDataSizeWritten = 0;
852       return userDataStream;
853     }
854 
855     /**
856      * Writes the Cell to this block
857      * @param cell
858      * @throws IOException
859      */
860     public void write(Cell cell) throws IOException{
861       expectState(State.WRITING);
862       this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx,
863           this.userDataStream);
864     }
865 
866     /**
867      * Returns the stream for the user to write to. The block writer takes care
868      * of handling compression and buffering for caching on write. Can only be
869      * called in the "writing" state.
870      *
871      * @return the data output stream for the user to write to
872      */
873     DataOutputStream getUserDataStream() {
874       expectState(State.WRITING);
875       return userDataStream;
876     }
877 
878     /**
879      * Transitions the block writer from the "writing" state to the "block
880      * ready" state.  Does nothing if a block is already finished.
881      */
882     void ensureBlockReady() throws IOException {
883       Preconditions.checkState(state != State.INIT,
884           "Unexpected state: " + state);
885 
886       if (state == State.BLOCK_READY)
887         return;
888 
889       // This will set state to BLOCK_READY.
890       finishBlock();
891     }
892 
893     /**
894      * An internal method that flushes the compressing stream (if using
895      * compression), serializes the header, and takes care of the separate
896      * uncompressed stream for caching on write, if applicable. Sets block
897      * write state to "block ready".
898      */
899     private void finishBlock() throws IOException {
900       if (blockType == BlockType.DATA) {
901         BufferGrabbingByteArrayOutputStream baosInMemoryCopy = 
902             new BufferGrabbingByteArrayOutputStream();
903         baosInMemory.writeTo(baosInMemoryCopy);
904         this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
905             baosInMemoryCopy.buf, blockType);
906         blockType = dataBlockEncodingCtx.getBlockType();
907       }
908       userDataStream.flush();
909       // This does an array copy, so it is safe to cache this byte array.
910       uncompressedBytesWithHeader = baosInMemory.toByteArray();
911       prevOffset = prevOffsetByType[blockType.getId()];
912 
913       // We need to set state before we can package the block up for
914       // cache-on-write. In a way, the block is ready, but not yet encoded or
915       // compressed.
916       state = State.BLOCK_READY;
917       if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
918         onDiskBytesWithHeader = dataBlockEncodingCtx
919             .compressAndEncrypt(uncompressedBytesWithHeader);
920       } else {
921         onDiskBytesWithHeader = defaultBlockEncodingCtx
922             .compressAndEncrypt(uncompressedBytesWithHeader);
923       }
924       int numBytes = (int) ChecksumUtil.numBytes(
925           onDiskBytesWithHeader.length,
926           fileContext.getBytesPerChecksum());
927 
928       // put the header for on disk bytes
929       putHeader(onDiskBytesWithHeader, 0,
930           onDiskBytesWithHeader.length + numBytes,
931           uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
932       // set the header for the uncompressed bytes (for cache-on-write)
933       putHeader(uncompressedBytesWithHeader, 0,
934           onDiskBytesWithHeader.length + numBytes,
935           uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
936 
937       onDiskChecksum = new byte[numBytes];
938       ChecksumUtil.generateChecksums(
939           onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
940           onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
941     }
942 
943     public static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
944       private byte[] buf;
945 
946       @Override
947       public void write(byte[] b, int off, int len) {
948         this.buf = b;
949       }
950 
951       public byte[] getBuffer() {
952         return this.buf;
953       }
954     }
955 
956     /**
957      * Put the header into the given byte array at the given offset.
958      * @param onDiskSize size of the block on disk header + data + checksum
959      * @param uncompressedSize size of the block after decompression (but
960      *          before optional data block decoding) including header
961      * @param onDiskDataSize size of the block on disk with header
962      *        and data but not including the checksums
963      */
964     private void putHeader(byte[] dest, int offset, int onDiskSize,
965         int uncompressedSize, int onDiskDataSize) {
966       offset = blockType.put(dest, offset);
967       offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
968       offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
969       offset = Bytes.putLong(dest, offset, prevOffset);
970       offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
971       offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
972       Bytes.putInt(dest, offset, onDiskDataSize);
973     }
974 
975     /**
976      * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records
977      * the offset of this block so that it can be referenced in the next block
978      * of the same type.
979      *
980      * @param out
981      * @throws IOException
982      */
983     public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
984       long offset = out.getPos();
985       if (startOffset != -1 && offset != startOffset) {
986         throw new IOException("A " + blockType + " block written to a "
987             + "stream twice, first at offset " + startOffset + ", then at "
988             + offset);
989       }
990       startOffset = offset;
991 
992       finishBlockAndWriteHeaderAndData((DataOutputStream) out);
993     }
994 
995     /**
996      * Writes the header and the compressed data of this block (or uncompressed
997      * data when not using compression) into the given stream. Can be called in
998      * the "writing" state or in the "block ready" state. If called in the
999      * "writing" state, transitions the writer to the "block ready" state.
1000      *
1001      * @param out the output stream to write the block to
1002      * @throws IOException
1003      */
1004     protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
1005       throws IOException {
1006       ensureBlockReady();
1007       out.write(onDiskBytesWithHeader);
1008       out.write(onDiskChecksum);
1009     }
1010 
1011     /**
1012      * Returns the header and the compressed data (or uncompressed data when not
1013      * using compression) as a byte array. Can be called in the "writing" state
1014      * or in the "block ready" state. If called in the "writing" state,
1015      * transitions the writer to the "block ready" state. This returns
1016      * the header + data + checksums stored on disk.
1017      *
1018      * @return header and data as they would be stored on disk in a byte array
1019      * @throws IOException
1020      */
1021     byte[] getHeaderAndDataForTest() throws IOException {
1022       ensureBlockReady();
1023       // This is not very optimal, because we are doing an extra copy.
1024       // But this method is used only by unit tests.
1025       byte[] output =
1026           new byte[onDiskBytesWithHeader.length
1027               + onDiskChecksum.length];
1028       System.arraycopy(onDiskBytesWithHeader, 0, output, 0,
1029           onDiskBytesWithHeader.length);
1030       System.arraycopy(onDiskChecksum, 0, output,
1031           onDiskBytesWithHeader.length, onDiskChecksum.length);
1032       return output;
1033     }
1034 
1035     /**
1036      * Releases resources used by this writer.
1037      */
1038     public void release() {
1039       if (dataBlockEncodingCtx != null) {
1040         dataBlockEncodingCtx.close();
1041         dataBlockEncodingCtx = null;
1042       }
1043       if (defaultBlockEncodingCtx != null) {
1044         defaultBlockEncodingCtx.close();
1045         defaultBlockEncodingCtx = null;
1046       }
1047     }
1048 
1049     /**
1050      * Returns the on-disk size of the data portion of the block. This is the
1051      * compressed size if compression is enabled. Can only be called in the
1052      * "block ready" state. Header is not compressed, and its size is not
1053      * included in the return value.
1054      *
1055      * @return the on-disk size of the block, not including the header.
1056      */
1057     int getOnDiskSizeWithoutHeader() {
1058       expectState(State.BLOCK_READY);
1059       return onDiskBytesWithHeader.length + onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE;
1060     }
1061 
1062     /**
1063      * Returns the on-disk size of the block. Can only be called in the
1064      * "block ready" state.
1065      *
1066      * @return the on-disk size of the block ready to be written, including the
1067      *         header size, the data and the checksum data.
1068      */
1069     int getOnDiskSizeWithHeader() {
1070       expectState(State.BLOCK_READY);
1071       return onDiskBytesWithHeader.length + onDiskChecksum.length;
1072     }
1073 
1074     /**
1075      * The uncompressed size of the block data. Does not include header size.
1076      */
1077     int getUncompressedSizeWithoutHeader() {
1078       expectState(State.BLOCK_READY);
1079       return uncompressedBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE;
1080     }
1081 
1082     /**
1083      * The uncompressed size of the block data, including header size.
1084      */
1085     int getUncompressedSizeWithHeader() {
1086       expectState(State.BLOCK_READY);
1087       return uncompressedBytesWithHeader.length;
1088     }
1089 
1090     /** @return true if a block is being written  */
1091     public boolean isWriting() {
1092       return state == State.WRITING;
1093     }
1094 
1095     /**
1096      * Returns the number of bytes written into the current block so far, or
1097      * zero if not writing the block at the moment. Note that this will return
1098      * zero in the "block ready" state as well.
1099      *
1100      * @return the number of bytes written
1101      */
1102     public int blockSizeWritten() {
1103       if (state != State.WRITING) return 0;
1104       return this.unencodedDataSizeWritten;
1105     }
1106 
1107     /**
1108      * Returns the header followed by the uncompressed data, even if using
1109      * compression. This is needed for storing uncompressed blocks in the block
1110      * cache. Can be called in the "writing" state or the "block ready" state.
1111      * Returns only the header and data, does not include checksum data.
1112      *
1113      * @return uncompressed block bytes for caching on write
1114      */
1115     ByteBuffer getUncompressedBufferWithHeader() {
1116       expectState(State.BLOCK_READY);
1117       return ByteBuffer.wrap(uncompressedBytesWithHeader);
1118     }
1119 
1120     /**
1121      * Returns the header followed by the on-disk (compressed/encoded/encrypted) data. This is
1122      * needed for storing packed blocks in the block cache. Expects calling semantics identical to
1123      * {@link #getUncompressedBufferWithHeader()}. Returns only the header and data,
1124      * without the checksum data.
1125      *
1126      * @return packed block bytes for caching on write
1127      */
1128     ByteBuffer getOnDiskBufferWithHeader() {
1129       expectState(State.BLOCK_READY);
1130       return ByteBuffer.wrap(onDiskBytesWithHeader);
1131     }
1132 
1133     private void expectState(State expectedState) {
1134       if (state != expectedState) {
1135         throw new IllegalStateException("Expected state: " + expectedState +
1136             ", actual state: " + state);
1137       }
1138     }
1139 
1140     /**
1141      * Takes the given {@link BlockWritable} instance, creates a new block of
1142      * its appropriate type, writes the writable into this block, and flushes
1143      * the block into the output stream. The writer is instructed not to buffer
1144      * uncompressed bytes for cache-on-write.
1145      *
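     * For example (a hedged sketch; the anonymous writable and {@code fsOut} are
     * illustrative only):
     * <pre>{@code
     * writer.writeBlock(new BlockWritable() {
     *   public BlockType getBlockType() { return BlockType.META; }
     *   public void writeToBlock(DataOutput out) throws IOException {
     *     out.writeUTF("example payload");
     *   }
     * }, fsOut);
     * }</pre>
     *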
1146      * @param bw the block-writable object to write as a block
1147      * @param out the file system output stream
1148      * @throws IOException
1149      */
1150     public void writeBlock(BlockWritable bw, FSDataOutputStream out)
1151         throws IOException {
1152       bw.writeToBlock(startWriting(bw.getBlockType()));
1153       writeHeaderAndData(out);
1154     }
1155 
1156     /**
1157      * Creates a new HFileBlock. Checksums have already been validated, so
1158      * the byte buffer passed into the constructor of this newly created
1159      * block does not have checksum data even though the header minor 
1160      * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
1161      * 0 value in bytesPerChecksum.
1162      */
1163     public HFileBlock getBlockForCaching(CacheConfig cacheConf) {
1164       HFileContext newContext = new HFileContextBuilder()
1165                                 .withBlockSize(fileContext.getBlocksize())
1166                                 .withBytesPerCheckSum(0)
1167                                 .withChecksumType(ChecksumType.NULL) // no checksums in cached data
1168                                 .withCompression(fileContext.getCompression())
1169                                 .withDataBlockEncoding(fileContext.getDataBlockEncoding())
1170                                 .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
1171                                 .withCompressTags(fileContext.isCompressTags())
1172                                 .withIncludesMvcc(fileContext.isIncludesMvcc())
1173                                 .withIncludesTags(fileContext.isIncludesTags())
1174                                 .build();
1175       return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
1176           getUncompressedSizeWithoutHeader(), prevOffset,
1177           cacheConf.shouldCacheCompressed(blockType.getCategory()) ?
1178             getOnDiskBufferWithHeader() :
1179             getUncompressedBufferWithHeader(),
1180           DONT_FILL_HEADER, startOffset,
1181           onDiskBytesWithHeader.length + onDiskChecksum.length, newContext);
1182     }
1183   }
1184 
1185   /** Something that can be written into a block. */
1186   public interface BlockWritable {
1187 
1188     /** The type of block this data should use. */
1189     BlockType getBlockType();
1190 
1191     /**
1192      * Writes the block to the provided stream. Must not write any magic
1193      * records.
1194      *
1195      * @param out a stream to write uncompressed data into
1196      */
1197     void writeToBlock(DataOutput out) throws IOException;
1198   }
1199 
1200   // Block readers and writers
1201 
1202   /** An interface allowing to iterate {@link HFileBlock}s. */
1203   public interface BlockIterator {
1204 
1205     /**
1206      * Get the next block, or null if there are no more blocks to iterate.
1207      */
1208     HFileBlock nextBlock() throws IOException;
1209 
1210     /**
1211      * Similar to {@link #nextBlock()} but checks block type, throws an
1212      * exception if incorrect, and returns the HFile block
1213      */
1214     HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1215   }
1216 
1217   /** A full-fledged reader with iteration ability. */
1218   public interface FSReader {
1219 
1220     /**
1221      * Reads the block at the given offset in the file with the given on-disk
1222      * size and uncompressed size.
1223      *
1224      * @param offset the offset in the file at which the block starts
1225      * @param onDiskSize the on-disk size of the entire block, including all
1226      *          applicable headers, or -1 if unknown
1227      * @param uncompressedSize the uncompressed size of the compressed part of
1228      *          the block, or -1 if unknown
1229      * @return the newly read block
1230      */
1231     HFileBlock readBlockData(long offset, long onDiskSize,
1232         int uncompressedSize, boolean pread) throws IOException;
1233 
1234     /**
1235      * Creates a block iterator over the given portion of the {@link HFile}.
1236      * The iterator returns blocks whose offsets satisfy
1237      * startOffset <= offset < endOffset. Returned blocks are always unpacked.
1238      *
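     * <p>
     * A hedged usage sketch ({@code reader} is any {@link FSReader} implementation and
     * the offsets are illustrative):
     * <pre>{@code
     * BlockIterator it = reader.blockRange(startOffset, endOffset);
     * for (HFileBlock b = it.nextBlock(); b != null; b = it.nextBlock()) {
     *   // process the unpacked block
     * }
     * }</pre>
     *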
1239      * @param startOffset the offset of the block to start iteration with
1240      * @param endOffset the offset to end iteration at (exclusive)
1241      * @return an iterator of blocks between the two given offsets
1242      */
1243     BlockIterator blockRange(long startOffset, long endOffset);
1244 
1245     /** Closes the backing streams */
1246     void closeStreams() throws IOException;
1247 
1248     /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
1249     HFileBlockDecodingContext getBlockDecodingContext();
1250 
1251     /** Get the default decoder for blocks from this file. */
1252     HFileBlockDecodingContext getDefaultBlockDecodingContext();
1253   }
1254 
1255   /**
1256    * A common implementation of some methods of {@link FSReader} and some
1257    * tools for implementing HFile format version-specific block readers.
1258    */
1259   private abstract static class AbstractFSReader implements FSReader {
1260     /** Compression algorithm used by the {@link HFile} */
1261 
1262     /** The size of the file we are reading from, or -1 if unknown. */
1263     protected long fileSize;
1264 
1265     /** The size of the header */
1266     protected final int hdrSize;
1267 
1268     /** The filesystem used to access data */
1269     protected HFileSystem hfs;
1270 
1271     /** The path (if any) where this data is coming from */
1272     protected Path path;
1273 
1274     private final Lock streamLock = new ReentrantLock();
1275 
1276     /** The default buffer size for our buffered streams */
1277     public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
1278 
1279     protected HFileContext fileContext;
1280 
1281     public AbstractFSReader(long fileSize, HFileSystem hfs, Path path, HFileContext fileContext)
1282         throws IOException {
1283       this.fileSize = fileSize;
1284       this.hfs = hfs;
1285       this.path = path;
1286       this.fileContext = fileContext;
1287       this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1288     }
1289 
1290     @Override
1291     public BlockIterator blockRange(final long startOffset,
1292         final long endOffset) {
1293       final FSReader owner = this; // handle for inner class
1294       return new BlockIterator() {
1295         private long offset = startOffset;
1296 
1297         @Override
1298         public HFileBlock nextBlock() throws IOException {
1299           if (offset >= endOffset)
1300             return null;
1301           HFileBlock b = readBlockData(offset, -1, -1, false);
1302           offset += b.getOnDiskSizeWithHeader();
1303           return b.unpack(fileContext, owner);
1304         }
1305 
1306         @Override
1307         public HFileBlock nextBlockWithBlockType(BlockType blockType)
1308             throws IOException {
1309           HFileBlock blk = nextBlock();
1310           if (blk.getBlockType() != blockType) {
1311             throw new IOException("Expected block of type " + blockType
1312                 + " but found " + blk.getBlockType());
1313           }
1314           return blk;
1315         }
1316       };
1317     }
1318 
1319     /**
1320      * Does a positional read or a seek and read into the given buffer. Returns
1321      * the on-disk size of the next block, or -1 if it could not be determined.
1322      *
1323      * @param dest destination buffer
1324      * @param destOffset offset in the destination buffer
1325      * @param size size of the block to be read
1326      * @param peekIntoNextBlock whether to read the next block's on-disk size
1327      * @param fileOffset position in the stream to read at
1328      * @param pread whether we should do a positional read
1329      * @param istream The input source of data
1330      * @return the on-disk size of the next block with header size included, or
1331      *         -1 if it could not be determined
1332      * @throws IOException
1333      */
1334     protected int readAtOffset(FSDataInputStream istream,
1335         byte[] dest, int destOffset, int size,
1336         boolean peekIntoNextBlock, long fileOffset, boolean pread)
1337         throws IOException {
1338       if (peekIntoNextBlock &&
1339           destOffset + size + hdrSize > dest.length) {
1340         // We are asked to read the next block's header as well, but there is
1341         // not enough room in the array.
1342         throw new IOException("Attempted to read " + size + " bytes and " +
1343             hdrSize + " bytes of next header into a " + dest.length +
1344             "-byte array at offset " + destOffset);
1345       }
1346 
1347       if (!pread && streamLock.tryLock()) {
1348         // Seek + read. Better for scanning.
1349         try {
1350           istream.seek(fileOffset);
1351 
1352           long realOffset = istream.getPos();
1353           if (realOffset != fileOffset) {
1354             throw new IOException("Tried to seek to " + fileOffset + " to "
1355                 + "read " + size + " bytes, but pos=" + realOffset
1356                 + " after seek");
1357           }
1358 
1359           if (!peekIntoNextBlock) {
1360             IOUtils.readFully(istream, dest, destOffset, size);
1361             return -1;
1362           }
1363 
1364           // Try to read the next block header.
1365           if (!readWithExtra(istream, dest, destOffset, size, hdrSize))
1366             return -1;
1367         } finally {
1368           streamLock.unlock();
1369         }
1370       } else {
1371         // Positional read. Better for random reads; or when the streamLock is already locked.
1372         int extraSize = peekIntoNextBlock ? hdrSize : 0;
1373         int ret = istream.read(fileOffset, dest, destOffset, size + extraSize);
1374         if (ret < size) {
1375           throw new IOException("Positional read of " + size + " bytes " +
1376               "failed at offset " + fileOffset + " (returned " + ret + ")");
1377         }
1378 
1379         if (ret == size || ret < size + extraSize) {
1380           // Could not read the next block's header, or did not try.
1381           return -1;
1382         }
1383       }
1384 
1385       assert peekIntoNextBlock;
1386       return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
1387     }
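    /*
     * Illustrative sketch (not part of the original HFileBlock source): when
     * peekIntoNextBlock is true and enough extra bytes were read, the next
     * block's header occupies dest[destOffset + size .. destOffset + size + hdrSize).
     * Its on-disk-size-without-header field sits right after the 8-byte magic
     * record, so the return statement above is equivalent to:
     *
     *   int nextOnDiskSizeWithoutHeader =
     *       Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH);
     *   return nextOnDiskSizeWithoutHeader + hdrSize;  // next block's size including header
     */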
1388 
1389   }
1390 
1391   /**
1392    * We always prefetch the header of the next block, so that we know its
1393    * on-disk size in advance and can read it in one operation.
1394    */
1395   private static class PrefetchedHeader {
1396     long offset = -1;
1397     byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1398     final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
1399   }
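  /*
   * Illustrative sketch (not part of the original HFileBlock source): the
   * prefetched header is keyed by the absolute file offset it belongs to, so a
   * later read can reuse it only when the offsets match:
   *
   *   PrefetchedHeader ph = prefetchedHeaderForThread.get();
   *   ByteBuffer headerBuf = (ph.offset == offset) ? ph.buf : null;
   *
   * This is the check performed at the top of FSReaderV2#readBlockDataInternal.
   */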
1400 
1401   /** Reads version 2 blocks from the filesystem. */
1402   static class FSReaderV2 extends AbstractFSReader {
1403     /** The file system stream of the underlying {@link HFile}, which may
1404      * or may not perform checksum validation in the filesystem */
1405     protected FSDataInputStreamWrapper streamWrapper;
1406 
1407     private HFileBlockDecodingContext encodedBlockDecodingCtx;
1408 
1409     /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
1410     private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
1411 
1412     private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
1413         new ThreadLocal<PrefetchedHeader>() {
1414           @Override
1415           public PrefetchedHeader initialValue() {
1416             return new PrefetchedHeader();
1417           }
1418         };
1419 
1420     public FSReaderV2(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
1421         HFileContext fileContext) throws IOException {
1422       super(fileSize, hfs, path, fileContext);
1423       this.streamWrapper = stream;
1424       // Older versions of HBase didn't support checksum.
1425       this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1426       defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
1427       encodedBlockDecodingCtx = defaultDecodingCtx;
1428     }
1429 
1430     /**
1431      * A constructor that reads files with the latest minor version.
1432      * This is used by unit tests only.
1433      */
1434     FSReaderV2(FSDataInputStream istream, long fileSize, HFileContext fileContext) throws IOException {
1435       this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
1436     }
1437 
1438     /**
1439      * Reads a version 2 block. Tries to do as little memory allocation as
1440      * possible, using the provided on-disk size.
1441      *
1442      * @param offset the offset in the stream to read at
1443      * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1444      *          the header, or -1 if unknown
1445      * @param uncompressedSize the uncompressed size of the block. Always
1446      *          expected to be -1. This parameter is only used in version 1.
1447      * @param pread whether to use a positional read
1448      */
1449     @Override
1450     public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL,
1451         int uncompressedSize, boolean pread) throws IOException {
1452 
1453       // get a copy of the current state of whether to validate
1454       // hbase checksums or not for this read call. This is not 
1455       // thread-safe but the one constraint is that if we decide
1456       // to skip hbase checksum verification then we are 
1457       // guaranteed to use hdfs checksum verification.
1458       boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1459       FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1460 
1461       HFileBlock blk = readBlockDataInternal(is, offset, 
1462                          onDiskSizeWithHeaderL, 
1463                          uncompressedSize, pread,
1464                          doVerificationThruHBaseChecksum);
1465       if (blk == null) {
1466         HFile.LOG.warn("HBase checksum verification failed for file " +
1467                        path + " at offset " +
1468                        offset + " filesize " + fileSize +
1469                        ". Retrying read with HDFS checksums turned on...");
1470 
1471         if (!doVerificationThruHBaseChecksum) {
1472           String msg = "HBase checksum verification failed for file " +
1473                        path + " at offset " +
1474                        offset + " filesize " + fileSize + 
1475                        " but this cannot happen because doVerify is " +
1476                        doVerificationThruHBaseChecksum;
1477           HFile.LOG.warn(msg);
1478           throw new IOException(msg); // cannot happen case here
1479         }
1480         HFile.checksumFailures.incrementAndGet(); // update metrics
1481 
1482         // If we have a checksum failure, we fall back into a mode where
1483         // the next few reads use HDFS level checksums. We aim to make the
1484         // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
1485         // hbase checksum verification, but since this value is set without
1486         // holding any locks, it can so happen that we might actually do
1487         // a few more than precisely this number.
1488         is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1489         doVerificationThruHBaseChecksum = false;
1490         blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL,
1491                                     uncompressedSize, pread,
1492                                     doVerificationThruHBaseChecksum);
1493         if (blk != null) {
1494         HFile.LOG.warn("HDFS checksum verification succeeded for file " +
1495                          path + " at offset " +
1496                          offset + " filesize " + fileSize);
1497         }
1498       } 
1499       if (blk == null && !doVerificationThruHBaseChecksum) {
1500         String msg = "readBlockData failed, possibly due to a " +
1501                      "checksum verification failure for file " + path +
1502                      " at offset " + offset + " filesize " + fileSize;
1503         HFile.LOG.warn(msg);
1504         throw new IOException(msg);
1505       }
1506 
1507       // If there was a checksum mismatch earlier, then retry with
1508       // HBase checksums switched off and use HDFS checksum verification.
1509       // This triggers HDFS to detect and fix corrupt replicas. The
1510       // next checksumOffCount read requests will use HDFS checksums.
1511       // The decrementing of this.checksumOffCount is not thread-safe,
1512       // but it is harmless because eventually checksumOffCount will be
1513       // a negative number.
1514       streamWrapper.checksumOk();
1515       return blk;
1516     }
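    /*
     * Illustrative sketch (not part of the original HFileBlock source): the
     * checksum fallback implemented above reduces to this simplified control
     * flow (argument lists elided):
     *
     *   HFileBlock blk = readBlockDataInternal(is, ..., useHBaseChecksum);
     *   if (blk == null && useHBaseChecksum) {
     *     // HBase checksum mismatch: re-read with filesystem-level checksums.
     *     is = streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
     *     blk = readBlockDataInternal(is, ..., false);
     *   }
     *   if (blk == null) throw new IOException(...);  // both attempts failed
     *   streamWrapper.checksumOk();  // clean read; HBase checksums can eventually resume
     *   return blk;
     */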
1517 
1518     /**
1519      * Reads a version 2 block. 
1520      *
1521      * @param offset the offset in the stream to read at
1522      * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1523      *          the header, or -1 if unknown
1524      * @param uncompressedSize the uncompressed size of the block. Always
1525      *          expected to be -1. This parameter is only used in version 1.
1526      * @param pread whether to use a positional read
1527      * @param verifyChecksum Whether to use HBase checksums. 
1528      *        If HBase checksum is switched off, then use HDFS checksum.
1529      * @return the HFileBlock or null if there is an HBase checksum mismatch
1530      */
1531     private HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, 
1532         long onDiskSizeWithHeaderL, int uncompressedSize, boolean pread,
1533         boolean verifyChecksum) throws IOException {
1534       if (offset < 0) {
1535         throw new IOException("Invalid offset=" + offset + " trying to read "
1536             + "block (onDiskSize=" + onDiskSizeWithHeaderL
1537             + ", uncompressedSize=" + uncompressedSize + ")");
1538       }
1539       if (uncompressedSize != -1) {
1540         throw new IOException("Version 2 block reader API does not need " +
1541             "the uncompressed size parameter");
1542       }
1543 
1544       if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
1545           || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) {
1546         throw new IOException("Invalid onDiskSize=" + onDiskSizeWithHeaderL
1547             + ": expected to be at least " + hdrSize
1548             + " and at most " + Integer.MAX_VALUE + ", or -1 (offset="
1549             + offset + ", uncompressedSize=" + uncompressedSize + ")");
1550       }
1551 
1552       int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL;
1553       // See if we can avoid reading the header. This is desirable, because
1554       // we will not incur a backward seek operation if we have already
1555       // read this block's header as part of the previous read's look-ahead.
1556       // And we also want to skip reading the header again if it has already
1557       // been read.
1558       PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
1559       ByteBuffer headerBuf = prefetchedHeader.offset == offset ?
1560           prefetchedHeader.buf : null;
1561 
1562       int nextBlockOnDiskSize = 0;
1563       // Allocate enough space to fit the next block's header too.
1564       byte[] onDiskBlock = null;
1565 
1566       HFileBlock b = null;
1567       if (onDiskSizeWithHeader > 0) {
1568         // We know the total on-disk size. Read the entire block into memory,
1569         // then parse the header. This code path is used when
1570         // doing a random read operation relying on the block index, as well as
1571         // when the client knows the on-disk size from peeking into the next
1572         // block's header (e.g. this block's header) when reading the previous
1573         // block. This is the faster and more preferable case.
1574 
1575         // Size that we have to skip in case we have already read the header.
1576         int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
1577         onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; // room for this block plus the
1578                                                                 // next block's header
1579         nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
1580             preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
1581             true, offset + preReadHeaderSize, pread);
1582         if (headerBuf != null) {
1583           // the header has been read when reading the previous block, copy
1584           // to this block's header
1585           // headerBuf is HBB
1586           System.arraycopy(headerBuf.array(),
1587               headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1588         } else {
1589           headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
1590         }
1591         // We know the total on-disk size but not the uncompressed size. Read
1592         // the entire block into memory, then parse the header. Here we have
1593         // already read the block's header
1594         try {
1595           b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
1596         } catch (IOException ex) {
1597           // Seen in load testing. Provide comprehensive debug info.
1598           throw new IOException("Failed to read compressed block at "
1599               + offset
1600               + ", onDiskSizeWithHeader="
1601               + onDiskSizeWithHeader
1602               + ", headerSize="
1603               + hdrSize
1604               + ", header.length="
1605               + prefetchedHeader.header.length
1606               + ", header bytes: "
1607               + Bytes.toStringBinary(prefetchedHeader.header, 0,
1608                   hdrSize), ex);
1609         }
1610         // if the caller specifies an onDiskSizeWithHeader, validate it.
1611         int onDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize;
1612         assert onDiskSizeWithoutHeader >= 0;
1613         b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
1614       } else {
1615         // Check headerBuf to see if we have read this block's header as part of
1616         // reading the previous block. This is an optimization of peeking into
1617         // the next block's header (e.g. this block's header) when reading the
1618         // previous block. This is the faster and more preferable case. If the
1619         // header is already there, don't read the header again.
1620 
1621         // Unfortunately, we still have to do a separate read operation to
1622         // read the header.
1623         if (headerBuf == null) {
1624           // From the header, determine the on-disk size of the given hfile
1625           // block, and read the remaining data, thereby incurring two read
1626           // operations. This might happen when we are doing the first read
1627           // in a series of reads or a random read, and we don't have access
1628           // to the block index. This is costly and should happen very rarely.
1629           headerBuf = ByteBuffer.allocate(hdrSize);
1630           // headerBuf is HBB
1631           readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(),
1632               hdrSize, false, offset, pread);
1633         }
1634         b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
1635         onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize];
1636         // headerBuf is HBB
1637         System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1638         nextBlockOnDiskSize =
1639           readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader()
1640               - hdrSize, true, offset + hdrSize, pread);
1641         onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize;
1642       }
1643 
1644       if (!fileContext.isCompressedOrEncrypted()) {
1645         b.assumeUncompressed();
1646       }
1647 
1648       if (verifyChecksum && !validateBlockChecksum(b, onDiskBlock, hdrSize)) {
1649         return null;             // checksum mismatch
1650       }
1651 
1652       // The onDiskBlock will become the headerAndDataBuffer for this block.
1653       // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
1654       // contains the header of the next block, so no need to set the next
1655       // block's header in it.
1656       b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader),
1657         this.fileContext.isUseHBaseChecksum());
1658 
1659       b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
1660 
1661       // Set prefetched header
1662       if (b.hasNextBlockHeader()) {
1663         prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
1664         System.arraycopy(onDiskBlock, onDiskSizeWithHeader,
1665             prefetchedHeader.header, 0, hdrSize);
1666       }
1667 
1668       b.offset = offset;
1669       b.fileContext.setIncludesTags(this.fileContext.isIncludesTags());
1670       b.fileContext.setIncludesMvcc(this.fileContext.isIncludesMvcc());
1671       return b;
1672     }
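    /*
     * Illustrative sketch (not part of the original HFileBlock source): the two
     * branches above differ only in the number of filesystem reads required.
     * With a known on-disk size (block index hit or prefetched header) the block
     * and the following header are fetched in a single read; otherwise the
     * header is read first to learn the size, followed by a second read for the
     * remainder. The header-reuse arithmetic in the first branch is simply:
     *
     *   int preReadHeaderSize = (headerBuf == null) ? 0 : hdrSize;  // bytes already in hand
     *   // read onDiskSizeWithHeader - preReadHeaderSize bytes starting at
     *   // offset + preReadHeaderSize, i.e. skip the part we already have.
     */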
1673 
1674     void setIncludesMemstoreTS(boolean includesMemstoreTS) {
1675       this.fileContext.setIncludesMvcc(includesMemstoreTS);
1676     }
1677 
1678     void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
1679       encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
1680     }
1681 
1682     @Override
1683     public HFileBlockDecodingContext getBlockDecodingContext() {
1684       return this.encodedBlockDecodingCtx;
1685     }
1686 
1687     @Override
1688     public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
1689       return this.defaultDecodingCtx;
1690     }
1691 
1692     /**
1693      * Generates the checksum for the header as well as the data and
1694      * then validates that it matches the value stored in the header.
1695      * If there is a checksum mismatch, then return false. Otherwise
1696      * return true.
1697      */
1698     protected boolean validateBlockChecksum(HFileBlock block,  byte[] data, int hdrSize)
1699         throws IOException {
1700       return ChecksumUtil.validateBlockChecksum(path, block, data, hdrSize);
1701     }
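    /*
     * Illustrative sketch (not part of the original HFileBlock source): the real
     * work happens in ChecksumUtil. The general idea -- checksum the
     * header-plus-data region in bytesPerChecksum-sized chunks and compare each
     * stored 4-byte value -- might look like the following, with CRC32 assumed
     * purely for illustration:
     *
     *   java.util.zip.CRC32 crc = new java.util.zip.CRC32();
     *   int dataEnd = block.getOnDiskDataSizeWithHeader();  // header + data, checksums excluded
     *   int bytesPerChecksum = block.getBytesPerChecksum();
     *   int checksumPos = dataEnd;                          // stored checksums follow the data
     *   for (int pos = 0; pos < dataEnd; pos += bytesPerChecksum) {
     *     int len = Math.min(bytesPerChecksum, dataEnd - pos);
     *     crc.reset();
     *     crc.update(data, pos, len);
     *     if ((int) crc.getValue() != Bytes.toInt(data, checksumPos)) {
     *       return false;                                   // checksum mismatch
     *     }
     *     checksumPos += Bytes.SIZEOF_INT;
     *   }
     *   return true;
     */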
1702 
1703     @Override
1704     public void closeStreams() throws IOException {
1705       streamWrapper.close();
1706     }
1707 
1708     @Override
1709     public String toString() {
1710       return "FSReaderV2 [ hfs=" + hfs + " path=" + path + " fileContext=" + fileContext + " ]";
1711     }
1712   }
1713 
1714   @Override
1715   public int getSerializedLength() {
1716     if (buf != null) {
1717       // include extra bytes for the next header when it's available.
1718       int extraSpace = hasNextBlockHeader() ? headerSize() : 0;
1719       return this.buf.limit() + extraSpace + HFileBlock.EXTRA_SERIALIZATION_SPACE;
1720     }
1721     return 0;
1722   }
1723 
1724   @Override
1725   public void serialize(ByteBuffer destination) {
1726     ByteBufferUtils.copyFromBufferToBuffer(destination, this.buf, 0, getSerializedLength()
1727         - EXTRA_SERIALIZATION_SPACE);
1728     serializeExtraInfo(destination);
1729   }
1730 
1731   public void serializeExtraInfo(ByteBuffer destination) {
1732     destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1733     destination.putLong(this.offset);
1734     destination.putInt(this.nextBlockOnDiskSizeWithHeader);
1735     destination.rewind();
1736   }
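  /*
   * Illustrative note (not part of the original HFileBlock source): judging from
   * serializeExtraInfo() above, the extra serialization space appended to every
   * cached block holds, in order:
   *
   *   1 byte  -- whether HBase checksums are in use
   *   8 bytes -- this block's offset within the file
   *   4 bytes -- nextBlockOnDiskSizeWithHeader
   *
   * The block deserializer is expected to read these fields back in the same
   * order when the block is retrieved from a block cache.
   */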
1737 
1738   @Override
1739   public CacheableDeserializer<Cacheable> getDeserializer() {
1740     return HFileBlock.blockDeserializer;
1741   }
1742 
1743   @Override
1744   public boolean equals(Object comparison) {
1745     if (this == comparison) {
1746       return true;
1747     }
1748     if (comparison == null) {
1749       return false;
1750     }
1751     if (comparison.getClass() != this.getClass()) {
1752       return false;
1753     }
1754 
1755     HFileBlock castedComparison = (HFileBlock) comparison;
1756 
1757     if (castedComparison.blockType != this.blockType) {
1758       return false;
1759     }
1760     if (castedComparison.nextBlockOnDiskSizeWithHeader != this.nextBlockOnDiskSizeWithHeader) {
1761       return false;
1762     }
1763     if (castedComparison.offset != this.offset) {
1764       return false;
1765     }
1766     if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1767       return false;
1768     }
1769     if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1770       return false;
1771     }
1772     if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1773       return false;
1774     }
1775     if (ByteBufferUtils.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
1776         castedComparison.buf.limit()) != 0) {
1777       return false;
1778     }
1779     return true;
1780   }
1781 
1782   public DataBlockEncoding getDataBlockEncoding() {
1783     if (blockType == BlockType.ENCODED_DATA) {
1784       return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
1785     }
1786     return DataBlockEncoding.NONE;
1787   }
1788 
1789   byte getChecksumType() {
1790     return this.fileContext.getChecksumType().getCode();
1791   }
1792 
1793   int getBytesPerChecksum() {
1794     return this.fileContext.getBytesPerChecksum();
1795   }
1796 
1797   /** @return the size of data on disk + header. Excludes checksum. */
1798   int getOnDiskDataSizeWithHeader() {
1799     return this.onDiskDataSizeWithHeader;
1800   }
1801 
1802   /** 
1803    * Calculate the number of bytes required to store all the checksums
1804    * for this block. Each checksum value is a 4 byte integer.
1805    */
1806   int totalChecksumBytes() {
1807     // If the hfile block has minorVersion 0, then there are no checksum
1808     // data to validate. Similarly, a zero value in this.bytesPerChecksum
1809     // indicates that cached blocks do not have checksum data because
1810     // checksums were already validated when the block was read from disk.
1811     if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
1812       return 0;
1813     }
1814     return (int)ChecksumUtil.numBytes(onDiskDataSizeWithHeader, this.fileContext.getBytesPerChecksum());
1815   }
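  /*
   * Illustrative worked example (not part of the original HFileBlock source),
   * assuming ChecksumUtil.numBytes() charges 4 bytes per bytesPerChecksum-sized
   * chunk and rounds the final partial chunk up: with onDiskDataSizeWithHeader
   * = 65,569 (a 64 KiB block body plus a 33-byte header) and bytesPerChecksum
   * = 16,384, there are ceil(65569 / 16384) = 5 chunks, so 5 * 4 = 20 checksum
   * bytes are stored for the block.
   */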
1816 
1817   /**
1818    * Returns the size of this block header.
1819    */
1820   public int headerSize() {
1821     return headerSize(this.fileContext.isUseHBaseChecksum());
1822   }
1823 
1824   /**
1825    * Maps a minor version to the size of the header.
1826    */
1827   public static int headerSize(boolean usesHBaseChecksum) {
1828     if (usesHBaseChecksum) {
1829       return HConstants.HFILEBLOCK_HEADER_SIZE;
1830     }
1831     return HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
1832   }
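  /*
   * Illustrative note (not part of the original HFileBlock source): with HBase
   * checksums enabled the header carries three extra fields -- checksum type
   * (1 byte), bytes per checksum (4 bytes) and on-disk data size with header
   * (4 bytes) -- beyond the checksum-free layout of magic record (8), on-disk
   * size (4), uncompressed size (4) and previous block offset (8), matching the
   * fields decoded by toStringHeader() below. The two constants above should
   * differ by exactly those 9 bytes.
   */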
1833 
1834   /**
1835    * Return the appropriate DUMMY_HEADER for the minor version
1836    */
1837   public byte[] getDummyHeaderForVersion() {
1838     return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
1839   }
1840 
1841   /**
1842    * Return the appropriate DUMMY_HEADER for the minor version
1843    */
1844   static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
1845     if (usesHBaseChecksum) {
1846       return HConstants.HFILEBLOCK_DUMMY_HEADER;
1847     }
1848     return DUMMY_HEADER_NO_CHECKSUM;
1849   }
1850 
1851   /**
1852    * @return the HFileContext used to create this HFileBlock. Not necessarily the
1853    * fileContext for the file from which this block's data was originally read.
1854    */
1855   public HFileContext getHFileContext() {
1856     return this.fileContext;
1857   }
1858 
1859   /**
1860    * Convert the contents of the block header into a human readable string.
1861    * This is mostly helpful for debugging. This assumes that the block
1862    * has minor version > 0.
1863    */
1864   static String toStringHeader(ByteBuffer buf) throws IOException {
1865     byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
1866     buf.get(magicBuf);
1867     BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
1868     int compressedBlockSizeNoHeader = buf.getInt();
1869     int uncompressedBlockSizeNoHeader = buf.getInt();
1870     long prevBlockOffset = buf.getLong();
1871     byte cksumtype = buf.get();
1872     long bytesPerChecksum = buf.getInt();
1873     long onDiskDataSizeWithHeader = buf.getInt();
1874     return " Header dump: magic: " + Bytes.toString(magicBuf) +
1875                    " blockType " + bt +
1876                    " compressedBlockSizeNoHeader " + 
1877                    compressedBlockSizeNoHeader +
1878                    " uncompressedBlockSizeNoHeader " + 
1879                    uncompressedBlockSizeNoHeader +
1880                    " prevBlockOffset " + prevBlockOffset +
1881                    " checksumType " + ChecksumType.codeToType(cksumtype) +
1882                    " bytesPerChecksum " + bytesPerChecksum +
1883                    " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
1884   }
1885 }