1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.io.hfile;
19  
20  import java.io.ByteArrayOutputStream;
21  import java.io.DataInputStream;
22  import java.io.DataOutput;
23  import java.io.DataOutputStream;
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.nio.ByteBuffer;
27  import java.util.concurrent.locks.Lock;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import org.apache.hadoop.fs.FSDataInputStream;
31  import org.apache.hadoop.fs.FSDataOutputStream;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.classification.InterfaceAudience;
36  import org.apache.hadoop.hbase.fs.HFileSystem;
37  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
38  import org.apache.hadoop.hbase.io.ByteBuffInputStream;
39  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
40  import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
41  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
42  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
43  import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
44  import org.apache.hadoop.hbase.nio.ByteBuff;
45  import org.apache.hadoop.hbase.nio.MultiByteBuff;
46  import org.apache.hadoop.hbase.nio.SingleByteBuff;
47  import org.apache.hadoop.hbase.util.Bytes;
48  import org.apache.hadoop.hbase.util.ChecksumType;
49  import org.apache.hadoop.hbase.util.ClassSize;
50  import org.apache.hadoop.io.IOUtils;
51  
52  import com.google.common.base.Preconditions;
53  
54  /**
55   * Reading {@link HFile} version 1 and 2 blocks, and writing version 2 blocks.
56   * <ul>
57   * <li>In version 1 all blocks are always compressed or uncompressed, as
58   * specified by the {@link HFile}'s compression algorithm, with a type-specific
59   * magic record stored in the beginning of the compressed data (i.e. one needs
60   * to uncompress the compressed block to determine the block type). There is
61   * only a single compression algorithm setting for all blocks. Offset and size
62   * information from the block index are required to read a block.
63   * <li>In version 2 a block is structured as follows:
64   * <ul>
65   * <li>header (see Writer#finishBlock())
66   * <ul>
67   * <li>Magic record identifying the block type (8 bytes)
68   * <li>Compressed block size, excluding header, including checksum (4 bytes)
69   * <li>Uncompressed block size, excluding header, excluding checksum (4 bytes)
70   * <li>The offset of the previous block of the same type (8 bytes). This is
71   * used to be able to navigate to the previous block without going to the block index.
72   * <li>For minorVersions &gt;=1, the ordinal describing checksum type (1 byte)
73   * <li>For minorVersions &gt;=1, the number of data bytes/checksum chunk (4 bytes)
74   * <li>For minorVersions &gt;=1, the size of data on disk, including header,
75   * excluding checksums (4 bytes)
76   * </ul>
77   * </li>
78   * <li>Raw/Compressed/Encrypted/Encoded data. The compression algorithm is the
79   * same for all the blocks in the {@link HFile}, similarly to what was done in
80   * version 1.
81   * <li>For minorVersions &gt;=1, a series of 4 byte checksums, one each for
82   * the number of bytes specified by bytesPerChecksum.
83   * </ul>
84   * </ul>
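 * <p>As a rough illustration of the version 2 layout described above, the fixed header
 * fields can be decoded from a {@code java.nio.ByteBuffer} positioned at the start of a
 * block, mirroring the checksum-aware constructor in this class. This is a sketch only;
 * the buffer and the minor-version check are assumed to be supplied by the caller:
 * <pre>{@code
 * static void readV2Header(ByteBuffer b) {      // b positioned at the block's first byte
 *   byte[] magic = new byte[8];
 *   b.get(magic);                               // block type magic record
 *   int onDiskSizeWithoutHeader = b.getInt();   // excludes header, includes checksums
 *   int uncompressedSizeWithoutHeader = b.getInt();
 *   long prevBlockOffset = b.getLong();
 *   // the next three fields are present only for minorVersion >= 1
 *   byte checksumTypeCode = b.get();
 *   int bytesPerChecksum = b.getInt();
 *   int onDiskDataSizeWithHeader = b.getInt();
 * }
 * }</pre>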
85   */
86  @InterfaceAudience.Private
87  public class HFileBlock implements Cacheable {
88  
89    /**
90     * On a checksum failure on a Reader, this many succeeding read
91     * requests switch back to using HDFS checksums before auto-reenabling
92     * HBase checksum verification.
93     */
94    static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
95  
96    public static final boolean FILL_HEADER = true;
97    public static final boolean DONT_FILL_HEADER = false;
98  
99    /**
100    * The size of block header when blockType is {@link BlockType#ENCODED_DATA}.
101    * This extends normal header by adding the id of encoder.
102    */
103   public static final int ENCODED_HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE
104       + DataBlockEncoding.ID_SIZE;
105 
106   static final byte[] DUMMY_HEADER_NO_CHECKSUM =
107      new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
108 
109   // TODO: how do we get this estimate right when the backing buffer is a SingleByteBuff?
110   public static final int MULTI_BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
111       new MultiByteBuff(ByteBuffer.wrap(new byte[0], 0, 0)).getClass(), false);
112 
113   // meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader
114   public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT
115       + Bytes.SIZEOF_LONG;
116 
117   /**
118    * Each checksum value is an integer that can be stored in 4 bytes.
119    */
120   static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
121 
122   static final CacheableDeserializer<Cacheable> blockDeserializer =
123       new CacheableDeserializer<Cacheable>() {
124         public HFileBlock deserialize(ByteBuff buf, boolean reuse, MemoryType memType)
125             throws IOException {
126           buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
127           ByteBuff newByteBuffer;
128           if (reuse) {
129             newByteBuffer = buf.slice();
130           } else {
131             // Used only in tests
132             int len = buf.limit();
133             newByteBuffer = new SingleByteBuff(ByteBuffer.allocate(len));
134             newByteBuffer.put(0, buf, buf.position(), len);
135           }
136           buf.position(buf.limit());
137           buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
138           boolean usesChecksum = buf.get() == (byte)1;
139           HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum, memType);
140           hFileBlock.offset = buf.getLong();
141           hFileBlock.nextBlockOnDiskSizeWithHeader = buf.getInt();
142           if (hFileBlock.hasNextBlockHeader()) {
143             hFileBlock.buf.limit(hFileBlock.buf.limit() - hFileBlock.headerSize());
144           }
145           return hFileBlock;
146         }
147 
148         @Override
149         public int getDeserialiserIdentifier() {
150           return deserializerIdentifier;
151         }
152 
153         @Override
154         public HFileBlock deserialize(ByteBuff b) throws IOException {
155           // Used only in tests
156           return deserialize(b, false, MemoryType.EXCLUSIVE);
157         }
158       };
159   private static final int deserializerIdentifier;
160   static {
161     deserializerIdentifier = CacheableDeserializerIdManager
162         .registerDeserializer(blockDeserializer);
163   }
164 
165   /** Type of block. Header field 0. */
166   private BlockType blockType;
167 
168   /** Size on disk excluding header, including checksum. Header field 1. */
169   private int onDiskSizeWithoutHeader;
170 
171   /** Size of pure data. Does not include header or checksums. Header field 2. */
172   private final int uncompressedSizeWithoutHeader;
173 
174   /** The offset of the previous block on disk. Header field 3. */
175   private final long prevBlockOffset;
176 
177   /**
178    * Size on disk of header + data. Excludes checksum. Header field 6,
179    * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
180    */
181   private final int onDiskDataSizeWithHeader;
182 
183   /** The in-memory representation of the hfile block */
184   private ByteBuff buf;
185 
186   /** Metadata that holds information about this HFile block */
187   private HFileContext fileContext;
188 
189   /**
190    * The offset of this block in the file. Populated by the reader for
191    * convenience of access. This offset is not part of the block header.
192    */
193   private long offset = -1;
194 
195   /**
196    * The on-disk size of the next block, including the header, obtained by
197    * peeking into the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the next block's
198    * header, or -1 if unknown.
199    */
200   private int nextBlockOnDiskSizeWithHeader = -1;
201 
202   private MemoryType memType = MemoryType.EXCLUSIVE;
203 
204   /**
205    * Creates a new {@link HFile} block from the given fields. This constructor
206    * is mostly used when the block data has already been read and uncompressed,
207    * and is sitting in a byte buffer.
208    *
209    * @param blockType the type of this block, see {@link BlockType}
210    * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
211    * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
212    * @param prevBlockOffset see {@link #prevBlockOffset}
213    * @param buf block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by
214    *          uncompressed data.
215    * @param fillHeader when true, write the header fields into the beginning of {@code buf}
216    * @param offset the file offset the block was read from
217    * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
218    * @param fileContext HFile meta data
219    */
220   HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
221       long prevBlockOffset, ByteBuff buf, boolean fillHeader, long offset,
222       int onDiskDataSizeWithHeader, HFileContext fileContext) {
223     this.blockType = blockType;
224     this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
225     this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
226     this.prevBlockOffset = prevBlockOffset;
227     this.buf = buf;
228     this.offset = offset;
229     this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
230     this.fileContext = fileContext;
231     if (fillHeader)
232       overwriteHeader();
233     this.buf.rewind();
234   }
235 
236   HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
237       long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset,
238       int onDiskDataSizeWithHeader, HFileContext fileContext) {
239     this(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset,
240         new SingleByteBuff(buf), fillHeader, offset, onDiskDataSizeWithHeader, fileContext);
241   }
242 
243   /**
244    * Copy constructor. Creates a shallow copy of {@code that}'s buffer.
245    */
246   HFileBlock(HFileBlock that) {
247     this.blockType = that.blockType;
248     this.onDiskSizeWithoutHeader = that.onDiskSizeWithoutHeader;
249     this.uncompressedSizeWithoutHeader = that.uncompressedSizeWithoutHeader;
250     this.prevBlockOffset = that.prevBlockOffset;
251     this.buf = that.buf.duplicate();
252     this.offset = that.offset;
253     this.onDiskDataSizeWithHeader = that.onDiskDataSizeWithHeader;
254     this.fileContext = that.fileContext;
255     this.nextBlockOnDiskSizeWithHeader = that.nextBlockOnDiskSizeWithHeader;
256   }
257 
258   HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException {
259     this(new SingleByteBuff(b), usesHBaseChecksum);
260   }
261 
262   /**
263    * Creates a block from an existing buffer starting with a header. Rewinds
264    * and takes ownership of the buffer. By definition of rewind, ignores the
265    * buffer position, but if you slice the buffer beforehand, it will rewind
266    * to that point.
267    */
268   HFileBlock(ByteBuff b, boolean usesHBaseChecksum) throws IOException {
269     this(b, usesHBaseChecksum, MemoryType.EXCLUSIVE);
270   }
271 
272   /**
273    * Creates a block from an existing buffer starting with a header. Rewinds
274    * and takes ownership of the buffer. By definition of rewind, ignores the
275    * buffer position, but if you slice the buffer beforehand, it will rewind
276    * to that point.
277    */
278   HFileBlock(ByteBuff b, boolean usesHBaseChecksum, MemoryType memType) throws IOException {
279     b.rewind();
280     blockType = BlockType.read(b);
281     onDiskSizeWithoutHeader = b.getInt();
282     uncompressedSizeWithoutHeader = b.getInt();
283     prevBlockOffset = b.getLong();
284     HFileContextBuilder contextBuilder = new HFileContextBuilder();
285     contextBuilder.withHBaseCheckSum(usesHBaseChecksum);
286     if (usesHBaseChecksum) {
287       contextBuilder.withChecksumType(ChecksumType.codeToType(b.get()));
288       contextBuilder.withBytesPerCheckSum(b.getInt());
289       this.onDiskDataSizeWithHeader = b.getInt();
290     } else {
291       contextBuilder.withChecksumType(ChecksumType.NULL);
292       contextBuilder.withBytesPerCheckSum(0);
293       this.onDiskDataSizeWithHeader = onDiskSizeWithoutHeader +
294                                        HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
295     }
296     this.fileContext = contextBuilder.build();
297     this.memType = memType;
298     buf = b;
299     buf.rewind();
300   }
301 
302   public BlockType getBlockType() {
303     return blockType;
304   }
305 
306   /** @return the data block encoding id that was used to encode this block */
307   public short getDataBlockEncodingId() {
308     if (blockType != BlockType.ENCODED_DATA) {
309       throw new IllegalArgumentException("Querying encoder ID of a block " +
310           "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
311     }
312     return buf.getShort(headerSize());
313   }
314 
315   /**
316    * @return the on-disk size of header + data part + checksum.
317    */
318   public int getOnDiskSizeWithHeader() {
319     return onDiskSizeWithoutHeader + headerSize();
320   }
321 
322   /**
323    * @return the on-disk size of the data part + checksum (header excluded).
324    */
325   public int getOnDiskSizeWithoutHeader() {
326     return onDiskSizeWithoutHeader;
327   }
328 
329   /**
330    * @return the uncompressed size of data part (header and checksum excluded).
331    */
332    public int getUncompressedSizeWithoutHeader() {
333     return uncompressedSizeWithoutHeader;
334   }
335 
336   /**
337    * @return the offset of the previous block of the same type in the file, or
338    *         -1 if unknown
339    */
340   public long getPrevBlockOffset() {
341     return prevBlockOffset;
342   }
343 
344   /**
345    * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position
346    * is modified as side-effect.
347    */
348   private void overwriteHeader() {
349     buf.rewind();
350     blockType.write(buf);
351     buf.putInt(onDiskSizeWithoutHeader);
352     buf.putInt(uncompressedSizeWithoutHeader);
353     buf.putLong(prevBlockOffset);
354     if (this.fileContext.isUseHBaseChecksum()) {
355       buf.put(fileContext.getChecksumType().getCode());
356       buf.putInt(fileContext.getBytesPerChecksum());
357       buf.putInt(onDiskDataSizeWithHeader);
358     }
359   }
360 
361   /**
362    * Returns a buffer that does not include the header or checksum.
363    *
364    * @return the buffer with header skipped and checksum omitted.
365    */
366   public ByteBuff getBufferWithoutHeader() {
367     ByteBuff dup = this.buf.duplicate();
368     dup.position(headerSize());
369     dup.limit(buf.limit() - totalChecksumBytes());
370     return dup.slice();
371   }
372 
373   /**
374    * Returns the buffer this block stores internally. The clients must not
375    * modify the buffer object. This method has to be public because it is used
376    * in {@link CompoundBloomFilter} to avoid object creation on every Bloom
377    * filter lookup, but has to be used with caution. Checksum data is not
378    * included in the returned buffer but header data is.
379    *
380    * @return the buffer of this block for read-only operations
381    */
382   public ByteBuff getBufferReadOnly() {
383     ByteBuff dup = this.buf.duplicate();
384     dup.limit(buf.limit() - totalChecksumBytes());
385     return dup.slice();
386   }
387 
388   /**
389    * Returns the buffer of this block, including header data. The clients must
390    * not modify the buffer object. This method has to be public because it is
391    * used in {@link org.apache.hadoop.hbase.io.hfile.bucket.BucketCache} to avoid buffer copy.
392    *
393    * @return the buffer with header and checksum included for read-only operations
394    */
395   public ByteBuff getBufferReadOnlyWithHeader() {
396     ByteBuff dup = this.buf.duplicate();
397     return dup.slice();
398   }
399 
400   /**
401    * Returns a byte buffer of this block, including header data and checksum, positioned at
402    * the beginning of header. The underlying data array is not copied.
403    *
404    * @return the byte buffer with header and checksum included
405    */
406   ByteBuff getBufferWithHeader() {
407     ByteBuff dupBuf = buf.duplicate();
408     dupBuf.rewind();
409     return dupBuf;
410   }
411 
412   private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
413       String fieldName) throws IOException {
414     if (valueFromBuf != valueFromField) {
415       throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
416           + ") is different from that in the field (" + valueFromField + ")");
417     }
418   }
419 
420   private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
421       throws IOException {
422     if (valueFromBuf != valueFromField) {
423       throw new IOException("Block type stored in the buffer: " +
424         valueFromBuf + ", block type field: " + valueFromField);
425     }
426   }
427 
428   /**
429    * Checks if the block is internally consistent, i.e. the first
430    * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a
431    * valid header consistent with the fields. Assumes a packed block structure.
432    * This function is primarily for testing and debugging, and is not
433    * thread-safe, because it alters the internal buffer pointer.
434    */
435   void sanityCheck() throws IOException {
436     buf.rewind();
437 
438     sanityCheckAssertion(BlockType.read(buf), blockType);
439 
440     sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader,
441         "onDiskSizeWithoutHeader");
442 
443     sanityCheckAssertion(buf.getInt(), uncompressedSizeWithoutHeader,
444         "uncompressedSizeWithoutHeader");
445 
446     sanityCheckAssertion(buf.getLong(), prevBlockOffset, "prevBlockOffset");
447     if (this.fileContext.isUseHBaseChecksum()) {
448       sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
449       sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(),
450           "bytesPerChecksum");
451       sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
452     }
453 
454     int cksumBytes = totalChecksumBytes();
455     int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
456     if (buf.limit() != expectedBufLimit) {
457       throw new AssertionError("Expected buffer limit " + expectedBufLimit
458           + ", got " + buf.limit());
459     }
460 
461     // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
462     // block's header, so there are two sensible values for buffer capacity.
463     int hdrSize = headerSize();
464     if (buf.capacity() != expectedBufLimit &&
465         buf.capacity() != expectedBufLimit + hdrSize) {
466       throw new AssertionError("Invalid buffer capacity: " + buf.capacity() +
467           ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
468     }
469   }
470 
471   @Override
472   public String toString() {
473     StringBuilder sb = new StringBuilder()
474       .append("HFileBlock [")
475       .append(" fileOffset=").append(offset)
476       .append(" headerSize()=").append(headerSize())
477       .append(" blockType=").append(blockType)
478       .append(" onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
479       .append(" uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
480       .append(" prevBlockOffset=").append(prevBlockOffset)
481       .append(" isUseHBaseChecksum()=").append(fileContext.isUseHBaseChecksum());
482     if (fileContext.isUseHBaseChecksum()) {
483       sb.append(" checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
484         .append(" bytesPerChecksum=").append(this.buf.getInt(24 + 1))
485         .append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
486     } else {
487       sb.append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
488         .append("(").append(onDiskSizeWithoutHeader)
489         .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
490     }
491     String dataBegin = null;
492     if (buf.hasArray()) {
493       dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
494           Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
495     } else {
496       ByteBuff bufWithoutHeader = getBufferWithoutHeader();
497       byte[] dataBeginBytes = new byte[Math.min(32,
498           bufWithoutHeader.limit() - bufWithoutHeader.position())];
499       bufWithoutHeader.get(dataBeginBytes);
500       dataBegin = Bytes.toStringBinary(dataBeginBytes);
501     }
502     sb.append(" getOnDiskSizeWithHeader()=").append(getOnDiskSizeWithHeader())
503       .append(" totalChecksumBytes()=").append(totalChecksumBytes())
504       .append(" isUnpacked()=").append(isUnpacked())
505       .append(" buf=[ ").append(buf).append(" ]")
506       .append(" dataBeginsWith=").append(dataBegin)
507       .append(" fileContext=").append(fileContext)
508       .append(" ]");
509     return sb.toString();
510   }
511 
512   /**
513    * Called after reading a block with provided onDiskSizeWithHeader.
514    */
515   private void validateOnDiskSizeWithoutHeader(int expectedOnDiskSizeWithoutHeader)
516   throws IOException {
517     if (onDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) {
518       String dataBegin = null;
519       if (buf.hasArray()) {
520         dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset(), Math.min(32, buf.limit()));
521       } else {
522         ByteBuff bufDup = getBufferReadOnly();
523         byte[] dataBeginBytes = new byte[Math.min(32, bufDup.limit() - bufDup.position())];
524         bufDup.get(dataBeginBytes);
525         dataBegin = Bytes.toStringBinary(dataBeginBytes);
526       }
527       String blockInfoMsg =
528         "Block offset: " + offset + ", data starts with: " + dataBegin;
529       throw new IOException("On-disk size without header provided is "
530           + expectedOnDiskSizeWithoutHeader + ", but block "
531           + "header contains " + onDiskSizeWithoutHeader + ". " +
532           blockInfoMsg);
533     }
534   }
535 
536   /**
537    * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
538    * encoded structure. Internal structures are shared between instances where applicable.
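   * <p>A minimal usage sketch (illustrative only; {@code fileContext} and {@code fsReader}
   * are assumed to be supplied by the caller):
   * <pre>{@code
   * HFileBlock readable = rawBlock.unpack(fileContext, fsReader);
   * }</pre>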
539    */
540   HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
541     if (!fileContext.isCompressedOrEncrypted()) {
542       // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
543       // which is used for block serialization to L2 cache, does not preserve encoding and
544       // encryption details.
545       return this;
546     }
547 
548     HFileBlock unpacked = new HFileBlock(this);
549     unpacked.allocateBuffer(); // allocates space for the decompressed block
550 
551     HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
552       reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
553 
554     ByteBuff dup = this.buf.duplicate();
555     dup.position(this.headerSize());
556     dup = dup.slice();
557     ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
558       unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
559       dup);
560 
561     // Preserve the next block's header bytes in the new block if we have them.
562     if (unpacked.hasNextBlockHeader()) {
563       // Both the buffers are limited till checksum bytes and avoid the next block's header.
564       // Below call to copyFromBufferToBuffer() will try positional read/write from/to buffers when
565       // any of the buffer is DBB. So we change the limit on a dup buffer. No copying just create
566       // new BB objects
567       ByteBuff inDup = this.buf.duplicate();
568       inDup.limit(inDup.limit() + headerSize());
569       ByteBuff outDup = unpacked.buf.duplicate();
570       outDup.limit(outDup.limit() + unpacked.headerSize());
571       outDup.put(
572           unpacked.headerSize() + unpacked.uncompressedSizeWithoutHeader
573               + unpacked.totalChecksumBytes(), inDup, this.onDiskDataSizeWithHeader,
574           unpacked.headerSize());
575     }
576     return unpacked;
577   }
578 
579   /**
580    * Return true when this buffer includes the next block's header.
581    */
582   private boolean hasNextBlockHeader() {
583     return nextBlockOnDiskSizeWithHeader > 0;
584   }
585 
586   /**
587    * Always allocates a new buffer of the correct size. Copies header bytes
588    * from the existing buffer. Does not change header fields.
589    * Reserves room for checksum bytes as well.
590    */
591   private void allocateBuffer() {
592     int cksumBytes = totalChecksumBytes();
593     int headerSize = headerSize();
594     int capacityNeeded = headerSize + uncompressedSizeWithoutHeader +
595         cksumBytes + (hasNextBlockHeader() ? headerSize : 0);
596 
597     // TODO: should we consider allocating this buffer off-heap?
598     ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
599 
600     // Copy header bytes into newBuf.
601     // newBuf is HBB so no issue in calling array()
602     buf.position(0);
603     buf.get(newBuf.array(), newBuf.arrayOffset(), headerSize);
604 
605     buf = new SingleByteBuff(newBuf);
606     // set limit to exclude next block's header
607     buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
608   }
609 
610   /**
611    * Return true when this block's buffer has been unpacked, false otherwise. Note this is a
612    * calculated heuristic, not a tracked attribute of the block.
613    */
614   public boolean isUnpacked() {
615     final int cksumBytes = totalChecksumBytes();
616     final int headerSize = headerSize();
617     final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
618     final int bufCapacity = buf.capacity();
619     return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
620   }
621 
622   /** An additional sanity-check in case no compression or encryption is being used. */
623   public void assumeUncompressed() throws IOException {
624     if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader +
625         totalChecksumBytes()) {
626       throw new IOException("Using no compression but "
627           + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
628           + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
629           + ", numChecksumBytes=" + totalChecksumBytes());
630     }
631   }
632 
633   /**
634    * @param expectedType the expected type of this block
635    * @throws IOException if this block's type is different than expected
636    */
637   public void expectType(BlockType expectedType) throws IOException {
638     if (blockType != expectedType) {
639       throw new IOException("Invalid block type: expected=" + expectedType
640           + ", actual=" + blockType);
641     }
642   }
643 
644   /** @return the offset of this block in the file it was read from */
645   public long getOffset() {
646     if (offset < 0) {
647       throw new IllegalStateException(
648           "HFile block offset not initialized properly");
649     }
650     return offset;
651   }
652 
653   /**
654    * @return a byte stream reading the data + checksum of this block
655    */
656   public DataInputStream getByteStream() {
657     ByteBuff dup = this.buf.duplicate();
658     dup.position(this.headerSize());
659     return new DataInputStream(new ByteBuffInputStream(dup));
660   }
661 
662   @Override
663   public long heapSize() {
664     long size = ClassSize.align(
665         ClassSize.OBJECT +
666         // Block type, multi byte buffer, MemoryType and meta references
667         4 * ClassSize.REFERENCE +
668         // On-disk size, uncompressed size, and next block's on-disk size
669         // bytePerChecksum and onDiskDataSize
670         4 * Bytes.SIZEOF_INT +
671         // This and previous block offset
672         2 * Bytes.SIZEOF_LONG +
673         // Heap size of the meta object. meta will be always not null.
674         fileContext.heapSize()
675     );
676 
677     if (buf != null) {
678       // Deep overhead of the byte buffer. Needs to be aligned separately.
679       size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE);
680     }
681 
682     return ClassSize.align(size);
683   }
684 
685   /**
686    * Read from an input stream. Analogous to
687    * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a
688    * number of "extra" bytes that would be desirable but not absolutely
689    * necessary to read.
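   * <p>An illustrative sketch (names such as {@code in}, {@code onDiskSizeWithHeader} and
   * {@code hdrSize} are assumed to come from the caller): a reader can request the block's
   * bytes as the necessary length plus one extra header's worth of bytes, so that the next
   * block's header is prefetched whenever the stream already has it:
   * <pre>{@code
   * byte[] dst = new byte[onDiskSizeWithHeader + hdrSize];
   * boolean nextHeaderAvailable =
   *     HFileBlock.readWithExtra(in, dst, 0, onDiskSizeWithHeader, hdrSize);
   * }</pre>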
690    *
691    * @param in the input stream to read from
692    * @param buf the buffer to read into
693    * @param bufOffset the destination offset in the buffer
694    * @param necessaryLen the number of bytes that are absolutely necessary to
695    *          read
696    * @param extraLen the number of extra bytes that would be nice to read
697    * @return true if succeeded reading the extra bytes
698    * @throws IOException if failed to read the necessary bytes
699    */
700   public static boolean readWithExtra(InputStream in, byte[] buf,
701       int bufOffset, int necessaryLen, int extraLen) throws IOException {
702     int bytesRemaining = necessaryLen + extraLen;
703     while (bytesRemaining > 0) {
704       int ret = in.read(buf, bufOffset, bytesRemaining);
705       if (ret == -1 && bytesRemaining <= extraLen) {
706         // We could not read the "extra data", but that is OK.
707         break;
708       }
709 
710       if (ret < 0) {
711         throw new IOException("Premature EOF from inputStream (read "
712             + "returned " + ret + ", was trying to read " + necessaryLen
713             + " necessary bytes and " + extraLen + " extra bytes, "
714             + "successfully read "
715             + (necessaryLen + extraLen - bytesRemaining));
716       }
717       bufOffset += ret;
718       bytesRemaining -= ret;
719     }
720     return bytesRemaining <= 0;
721   }
722 
723   /**
724    * @return the on-disk size of the next block (including the header size)
725    *         that was read by peeking into the next block's header
726    */
727   public int getNextBlockOnDiskSizeWithHeader() {
728     return nextBlockOnDiskSizeWithHeader;
729   }
730 
731   /**
732    * Unified version 2 {@link HFile} block writer. The intended usage pattern
733    * is as follows:
734    * <ol>
735    * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
736    * <li>Call {@link Writer#startWriting} and get a data stream to write to.
737    * <li>Write your data into the stream.
738    * <li>Call {@link Writer#writeHeaderAndData(FSDataOutputStream)} as many times as you need to
739    * store the serialized block into an external stream (see the sketch below).
740    * <li>Repeat to write more blocks.
741    * </ol>
742    * <p>
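   * A rough sketch of the sequence above (illustrative only; {@code fileContext} and the
   * {@code FSDataOutputStream out} are assumed to be set up by the caller, and data blocks
   * would normally go through {@link Writer#write(Cell)} rather than the raw stream):
   * <pre>{@code
   * HFileBlock.Writer writer = new HFileBlock.Writer(NoOpDataBlockEncoder.INSTANCE, fileContext);
   * DataOutputStream dos = writer.startWriting(BlockType.META);
   * dos.write(Bytes.toBytes("block payload"));
   * writer.writeHeaderAndData(out);  // finishes the block and appends it to the stream
   * // repeat startWriting(...) / writeHeaderAndData(...) for more blocks, then:
   * writer.release();
   * }</pre>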
743    */
744   public static class Writer {
745 
746     private enum State {
747       INIT,
748       WRITING,
749       BLOCK_READY
750     };
751 
752     /** Writer state. Used to ensure the correct usage protocol. */
753     private State state = State.INIT;
754 
755     /** Data block encoder used for data blocks */
756     private final HFileDataBlockEncoder dataBlockEncoder;
757 
758     private HFileBlockEncodingContext dataBlockEncodingCtx;
759 
760     /** block encoding context for non-data blocks */
761     private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
762 
763     /**
764      * The stream we use to accumulate data in uncompressed format for each
765      * block. We reset this stream at the end of each block and reuse it. The
766      * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this
767      * stream.
768      */
769     private ByteArrayOutputStream baosInMemory;
770 
771     /**
772      * Current block type. Set in {@link #startWriting(BlockType)}. Could be
773      * changed in {@link #finishBlock()} from {@link BlockType#DATA}
774      * to {@link BlockType#ENCODED_DATA}.
775      */
776     private BlockType blockType;
777 
778     /**
779      * A stream that we write uncompressed bytes to, which compresses them and
780      * writes them to {@link #baosInMemory}.
781      */
782     private DataOutputStream userDataStream;
783 
784     // Size of actual data being written. Not considering the block encoding/compression. This
785     // includes the header size also.
786     private int unencodedDataSizeWritten;
787 
788     /**
789      * Bytes to be written to the file system, including the header. Compressed
790      * if compression is turned on. It also includes the checksum data that
791      * immediately follows the block data. (header + data + checksums)
792      */
793     private byte[] onDiskBytesWithHeader;
794 
795     /**
796      * The checksum data for this block as stored on disk. It is used only if data is
797      * not compressed. If data is compressed, then the checksums are already
798      * part of onDiskBytesWithHeader. If data is uncompressed, then this
799      * variable stores the checksum data for this block.
800      */
801     private byte[] onDiskChecksum;
802 
803     /**
804      * Valid in the READY state. Contains the header and the uncompressed (but
805      * potentially encoded, if this is a data block) bytes, so the length is
806      * {@link #uncompressedSizeWithoutHeader} +
807      * {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}.
808      * Does not store checksums.
809      */
810     private byte[] uncompressedBytesWithHeader;
811 
812     /**
813      * Current block's start offset in the {@link HFile}. Set in
814      * {@link #writeHeaderAndData(FSDataOutputStream)}.
815      */
816     private long startOffset;
817 
818     /**
819      * Offset of previous block by block type. Updated when the next block is
820      * started.
821      */
822     private long[] prevOffsetByType;
823 
824     /** The offset of the previous block of the same type */
825     private long prevOffset;
826     /** Metadata that holds information about the HFile block */
827     private HFileContext fileContext;
828 
829     /**
830      * @param dataBlockEncoder data block encoding algorithm to use
831      */
832     public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
833       this.dataBlockEncoder = dataBlockEncoder != null
834           ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
835       defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
836           HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
837       dataBlockEncodingCtx = this.dataBlockEncoder
838           .newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
839 
840       if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
841         throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
842             " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
843             fileContext.getBytesPerChecksum());
844       }
845 
846       baosInMemory = new ByteArrayOutputStream();
847 
848       prevOffsetByType = new long[BlockType.values().length];
849       for (int i = 0; i < prevOffsetByType.length; ++i)
850         prevOffsetByType[i] = -1;
851 
852       this.fileContext = fileContext;
853     }
854 
855     /**
856      * Starts writing into the block. The previous block's data is discarded.
857      *
858      * @return the stream the user can write their data into
859      * @throws IOException
860      */
861     public DataOutputStream startWriting(BlockType newBlockType)
862         throws IOException {
863       if (state == State.BLOCK_READY && startOffset != -1) {
864         // We had a previous block that was written to a stream at a specific
865         // offset. Save that offset as the last offset of a block of that type.
866         prevOffsetByType[blockType.getId()] = startOffset;
867       }
868 
869       startOffset = -1;
870       blockType = newBlockType;
871 
872       baosInMemory.reset();
873       baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
874 
875       state = State.WRITING;
876 
877       // We will compress it later in finishBlock()
878       userDataStream = new DataOutputStream(baosInMemory);
879       if (newBlockType == BlockType.DATA) {
880         this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
881       }
882       this.unencodedDataSizeWritten = 0;
883       return userDataStream;
884     }
885 
886     /**
887      * Writes the Cell to this block
888      * @param cell
889      * @throws IOException
890      */
891     public void write(Cell cell) throws IOException{
892       expectState(State.WRITING);
893       this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx,
894           this.userDataStream);
895     }
896 
897     /**
898      * Returns the stream for the user to write to. The block writer takes care
899      * of handling compression and buffering for caching on write. Can only be
900      * called in the "writing" state.
901      *
902      * @return the data output stream for the user to write to
903      */
904     DataOutputStream getUserDataStream() {
905       expectState(State.WRITING);
906       return userDataStream;
907     }
908 
909     /**
910      * Transitions the block writer from the "writing" state to the "block
911      * ready" state.  Does nothing if a block is already finished.
912      */
913     void ensureBlockReady() throws IOException {
914       Preconditions.checkState(state != State.INIT,
915           "Unexpected state: " + state);
916 
917       if (state == State.BLOCK_READY)
918         return;
919 
920       // This will set state to BLOCK_READY.
921       finishBlock();
922     }
923 
924     /**
925      * An internal method that flushes the compressing stream (if using
926      * compression), serializes the header, and takes care of the separate
927      * uncompressed stream for caching on write, if applicable. Sets block
928      * write state to "block ready".
929      */
930     private void finishBlock() throws IOException {
931       if (blockType == BlockType.DATA) {
932         BufferGrabbingByteArrayOutputStream baosInMemoryCopy =
933             new BufferGrabbingByteArrayOutputStream();
934         baosInMemory.writeTo(baosInMemoryCopy);
935         this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
936             baosInMemoryCopy.buf, blockType);
937         blockType = dataBlockEncodingCtx.getBlockType();
938       }
939       userDataStream.flush();
940       // This does an array copy, so it is safe to cache this byte array.
941       uncompressedBytesWithHeader = baosInMemory.toByteArray();
942       prevOffset = prevOffsetByType[blockType.getId()];
943 
944       // We need to set state before we can package the block up for
945       // cache-on-write. In a way, the block is ready, but not yet encoded or
946       // compressed.
947       state = State.BLOCK_READY;
948       if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
949         onDiskBytesWithHeader = dataBlockEncodingCtx
950             .compressAndEncrypt(uncompressedBytesWithHeader);
951       } else {
952         onDiskBytesWithHeader = defaultBlockEncodingCtx
953             .compressAndEncrypt(uncompressedBytesWithHeader);
954       }
955       int numBytes = (int) ChecksumUtil.numBytes(
956           onDiskBytesWithHeader.length,
957           fileContext.getBytesPerChecksum());
958 
959       // put the header for on disk bytes
960       putHeader(onDiskBytesWithHeader, 0,
961           onDiskBytesWithHeader.length + numBytes,
962           uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
963       // set the header for the uncompressed bytes (for cache-on-write)
964       putHeader(uncompressedBytesWithHeader, 0,
965           onDiskBytesWithHeader.length + numBytes,
966           uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
967 
968       onDiskChecksum = new byte[numBytes];
969       ChecksumUtil.generateChecksums(
970           onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
971           onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
972     }
973 
974     public static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
975       private byte[] buf;
976 
977       @Override
978       public void write(byte[] b, int off, int len) {
979         this.buf = b;
980       }
981 
982       public byte[] getBuffer() {
983         return this.buf;
984       }
985     }
986 
987     /**
988      * Put the header into the given byte array at the given offset.
989      * @param onDiskSize size of the block on disk header + data + checksum
990      * @param uncompressedSize size of the block after decompression (but
991      *          before optional data block decoding) including header
992      * @param onDiskDataSize size of the block on disk with header
993      *        and data but not including the checksums
994      */
995     private void putHeader(byte[] dest, int offset, int onDiskSize,
996         int uncompressedSize, int onDiskDataSize) {
997       offset = blockType.put(dest, offset);
998       offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
999       offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1000       offset = Bytes.putLong(dest, offset, prevOffset);
1001       offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
1002       offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
1003       Bytes.putInt(dest, offset, onDiskDataSize);
1004     }
1005 
1006     /**
1007      * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records
1008      * the offset of this block so that it can be referenced in the next block
1009      * of the same type.
1010      *
1011      * @param out
1012      * @throws IOException
1013      */
1014     public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
1015       long offset = out.getPos();
1016       if (startOffset != -1 && offset != startOffset) {
1017         throw new IOException("A " + blockType + " block written to a "
1018             + "stream twice, first at offset " + startOffset + ", then at "
1019             + offset);
1020       }
1021       startOffset = offset;
1022 
1023       finishBlockAndWriteHeaderAndData((DataOutputStream) out);
1024     }
1025 
1026     /**
1027      * Writes the header and the compressed data of this block (or uncompressed
1028      * data when not using compression) into the given stream. Can be called in
1029      * the "writing" state or in the "block ready" state. If called in the
1030      * "writing" state, transitions the writer to the "block ready" state.
1031      *
1032      * @param out the output stream to write the block to
1033      * @throws IOException
1034      */
1035     protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
1036       throws IOException {
1037       ensureBlockReady();
1038       out.write(onDiskBytesWithHeader);
1039       out.write(onDiskChecksum);
1040     }
1041 
1042     /**
1043      * Returns the header and the compressed data (or uncompressed data when not
1044      * using compression) as a byte array. Can be called in the "writing" state
1045      * or in the "block ready" state. If called in the "writing" state,
1046      * transitions the writer to the "block ready" state. This returns
1047      * the header + data + checksums stored on disk.
1048      *
1049      * @return header and data as they would be stored on disk in a byte array
1050      * @throws IOException
1051      */
1052     byte[] getHeaderAndDataForTest() throws IOException {
1053       ensureBlockReady();
1054       // This is not very optimal, because we are doing an extra copy.
1055       // But this method is used only by unit tests.
1056       byte[] output =
1057           new byte[onDiskBytesWithHeader.length
1058               + onDiskChecksum.length];
1059       System.arraycopy(onDiskBytesWithHeader, 0, output, 0,
1060           onDiskBytesWithHeader.length);
1061       System.arraycopy(onDiskChecksum, 0, output,
1062           onDiskBytesWithHeader.length, onDiskChecksum.length);
1063       return output;
1064     }
1065 
1066     /**
1067      * Releases resources used by this writer.
1068      */
1069     public void release() {
1070       if (dataBlockEncodingCtx != null) {
1071         dataBlockEncodingCtx.close();
1072         dataBlockEncodingCtx = null;
1073       }
1074       if (defaultBlockEncodingCtx != null) {
1075         defaultBlockEncodingCtx.close();
1076         defaultBlockEncodingCtx = null;
1077       }
1078     }
1079 
1080     /**
1081      * Returns the on-disk size of the data portion of the block. This is the
1082      * compressed size if compression is enabled. Can only be called in the
1083      * "block ready" state. Header is not compressed, and its size is not
1084      * included in the return value.
1085      *
1086      * @return the on-disk size of the block, not including the header.
1087      */
1088     int getOnDiskSizeWithoutHeader() {
1089       expectState(State.BLOCK_READY);
1090       return onDiskBytesWithHeader.length
1091           + onDiskChecksum.length
1092           - HConstants.HFILEBLOCK_HEADER_SIZE;
1093     }
1094 
1095     /**
1096      * Returns the on-disk size of the block. Can only be called in the
1097      * "block ready" state.
1098      *
1099      * @return the on-disk size of the block ready to be written, including the
1100      *         header size, the data and the checksum data.
1101      */
1102     int getOnDiskSizeWithHeader() {
1103       expectState(State.BLOCK_READY);
1104       return onDiskBytesWithHeader.length + onDiskChecksum.length;
1105     }
1106 
1107     /**
1108      * The uncompressed size of the block data. Does not include header size.
1109      */
1110     int getUncompressedSizeWithoutHeader() {
1111       expectState(State.BLOCK_READY);
1112       return uncompressedBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE;
1113     }
1114 
1115     /**
1116      * The uncompressed size of the block data, including header size.
1117      */
1118     int getUncompressedSizeWithHeader() {
1119       expectState(State.BLOCK_READY);
1120       return uncompressedBytesWithHeader.length;
1121     }
1122 
1123     /** @return true if a block is being written  */
1124     public boolean isWriting() {
1125       return state == State.WRITING;
1126     }
1127 
1128     /**
1129      * Returns the number of bytes written into the current block so far, or
1130      * zero if not writing the block at the moment. Note that this will return
1131      * zero in the "block ready" state as well.
1132      *
1133      * @return the number of bytes written
1134      */
1135     public int blockSizeWritten() {
1136       if (state != State.WRITING) return 0;
1137       return this.unencodedDataSizeWritten;
1138     }
1139 
1140     /**
1141      * Returns the header followed by the uncompressed data, even if using
1142      * compression. This is needed for storing uncompressed blocks in the block
1143      * cache. Can be called in the "writing" state or the "block ready" state.
1144      * Returns only the header and data, does not include checksum data.
1145      *
1146      * @return uncompressed block bytes for caching on write
1147      */
1148     ByteBuffer getUncompressedBufferWithHeader() {
1149       expectState(State.BLOCK_READY);
1150       return ByteBuffer.wrap(uncompressedBytesWithHeader);
1151     }
1152 
1153     /**
1154      * Returns the header followed by the on-disk (compressed/encoded/encrypted) data. This is
1155      * needed for storing packed blocks in the block cache. Expects calling semantics identical to
1156      * {@link #getUncompressedBufferWithHeader()}. Returns only the header and data,
1157      * not the checksum data.
1158      *
1159      * @return packed block bytes for caching on write
1160      */
1161     ByteBuffer getOnDiskBufferWithHeader() {
1162       expectState(State.BLOCK_READY);
1163       return ByteBuffer.wrap(onDiskBytesWithHeader);
1164     }
1165 
1166     private void expectState(State expectedState) {
1167       if (state != expectedState) {
1168         throw new IllegalStateException("Expected state: " + expectedState +
1169             ", actual state: " + state);
1170       }
1171     }
1172 
1173     /**
1174      * Takes the given {@link BlockWritable} instance, creates a new block of
1175      * its appropriate type, writes the writable into this block, and flushes
1176      * the block into the output stream. The writer is instructed not to buffer
1177      * uncompressed bytes for cache-on-write.
1178      *
1179      * @param bw the block-writable object to write as a block
1180      * @param out the file system output stream
1181      * @throws IOException
1182      */
1183     public void writeBlock(BlockWritable bw, FSDataOutputStream out)
1184         throws IOException {
1185       bw.writeToBlock(startWriting(bw.getBlockType()));
1186       writeHeaderAndData(out);
1187     }
1188 
1189     /**
1190      * Creates a new HFileBlock. Checksums have already been validated, so
1191      * the byte buffer passed into the constructor of this newly created
1192      * block does not have checksum data even though the header minor
1193      * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
1194      * 0 value in bytesPerChecksum.
1195      */
1196     public HFileBlock getBlockForCaching(CacheConfig cacheConf) {
1197       HFileContext newContext = new HFileContextBuilder()
1198                                 .withBlockSize(fileContext.getBlocksize())
1199                                 .withBytesPerCheckSum(0)
1200                                 .withChecksumType(ChecksumType.NULL) // no checksums in cached data
1201                                 .withCompression(fileContext.getCompression())
1202                                 .withDataBlockEncoding(fileContext.getDataBlockEncoding())
1203                                 .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
1204                                 .withCompressTags(fileContext.isCompressTags())
1205                                 .withIncludesMvcc(fileContext.isIncludesMvcc())
1206                                 .withIncludesTags(fileContext.isIncludesTags())
1207                                 .build();
1208       return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
1209           getUncompressedSizeWithoutHeader(), prevOffset,
1210           cacheConf.shouldCacheCompressed(blockType.getCategory()) ?
1211             getOnDiskBufferWithHeader() :
1212             getUncompressedBufferWithHeader(),
1213           FILL_HEADER, startOffset,
1214           onDiskBytesWithHeader.length + onDiskChecksum.length, newContext);
1215     }
1216   }
1217 
1218   /** Something that can be written into a block. */
1219   public interface BlockWritable {
1220 
1221     /** The type of block this data should use. */
1222     BlockType getBlockType();
1223 
1224     /**
1225      * Writes the block to the provided stream. Must not write any magic
1226      * records.
1227      *
1228      * @param out a stream to write uncompressed data into
1229      */
1230     void writeToBlock(DataOutput out) throws IOException;
1231   }
1232 
1233   // Block readers and writers
1234 
1235   /** An interface for iterating over {@link HFileBlock}s. */
1236   public interface BlockIterator {
1237 
1238     /**
1239      * Get the next block, or null if there are no more blocks to iterate.
1240      */
1241     HFileBlock nextBlock() throws IOException;
1242 
1243     /**
1244      * Similar to {@link #nextBlock()} but checks block type, throws an
1245      * exception if incorrect, and returns the HFile block
1246      */
1247     HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1248   }
1249 
1250   /** A full-fledged reader with iteration ability. */
1251   public interface FSReader {
1252 
1253     /**
1254      * Reads the block at the given offset in the file with the given on-disk
1255      * size and uncompressed size.
1256      *
1257      * @param offset
1258      * @param onDiskSize the on-disk size of the entire block, including all
1259      *          applicable headers, or -1 if unknown
1260      * @param uncompressedSize the uncompressed size of the compressed part of
1261      *          the block, or -1 if unknown
1262      * @return the newly read block
1263      */
1264     HFileBlock readBlockData(long offset, long onDiskSize,
1265         int uncompressedSize, boolean pread) throws IOException;
1266 
1267     /**
1268      * Creates a block iterator over the given portion of the {@link HFile}.
1269      * The iterator returns blocks whose offsets satisfy startOffset &lt;= offset &lt;
1270      * endOffset. Returned blocks are always unpacked.
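     * <p>Illustrative usage (exception handling elided; {@code fsReader}, {@code startOffset}
     * and {@code endOffset} are assumed to come from the caller):
     * <pre>{@code
     * BlockIterator it = fsReader.blockRange(startOffset, endOffset);
     * for (HFileBlock block = it.nextBlock(); block != null; block = it.nextBlock()) {
     *   // each block returned here is already unpacked
     * }
     * }</pre>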
1271      *
1272      * @param startOffset the offset of the block to start iteration with
1273      * @param endOffset the offset to end iteration at (exclusive)
1274      * @return an iterator of blocks between the two given offsets
1275      */
1276     BlockIterator blockRange(long startOffset, long endOffset);
1277 
1278     /** Closes the backing streams */
1279     void closeStreams() throws IOException;
1280 
1281     /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
1282     HFileBlockDecodingContext getBlockDecodingContext();
1283 
1284     /** Get the default decoder for blocks from this file. */
1285     HFileBlockDecodingContext getDefaultBlockDecodingContext();
1286 
1287     void setIncludesMemstoreTS(boolean includesMemstoreTS);
1288     void setDataBlockEncoder(HFileDataBlockEncoder encoder);
1289   }
1290 
1291   /**
1292    * We always prefetch the header of the next block, so that we know its
1293    * on-disk size in advance and can read it in one operation.
1294    */
1295   private static class PrefetchedHeader {
1296     long offset = -1;
1297     byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1298     final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
1299   }
1300 
1301   /** Reads version 2 blocks from the filesystem. */
1302   static class FSReaderImpl implements FSReader {
1303     /** The file system stream of the underlying {@link HFile} that
1304      * does or doesn't do checksum validations in the filesystem */
1305     protected FSDataInputStreamWrapper streamWrapper;
1306 
1307     private HFileBlockDecodingContext encodedBlockDecodingCtx;
1308 
1309     /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
1310     private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
1311 
1312     private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
1313         new ThreadLocal<PrefetchedHeader>() {
1314       @Override
1315       public PrefetchedHeader initialValue() {
1316         return new PrefetchedHeader();
1317       }
1318     };
1319 
1322     /** The size of the file we are reading from, or -1 if unknown. */
1323     protected long fileSize;
1324 
1325     /** The size of the header */
1326     protected final int hdrSize;
1327 
1328     /** The filesystem used to access data */
1329     protected HFileSystem hfs;
1330 
1331     /** The path (if any) where this data is coming from */
1332     protected Path path;
1333 
1334     private final Lock streamLock = new ReentrantLock();
1335 
1336     /** The default buffer size for our buffered streams */
1337     public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
1338 
1339     protected HFileContext fileContext;
1340 
1341     public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
1342         HFileContext fileContext) throws IOException {
1343       this.fileSize = fileSize;
1344       this.hfs = hfs;
1345       this.path = path;
1346       this.fileContext = fileContext;
1347       this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1348 
1349       this.streamWrapper = stream;
1350       // Older versions of HBase didn't support checksum.
1351       this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1352       defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
1353       encodedBlockDecodingCtx = defaultDecodingCtx;
1354     }
1355 
1356     /**
1357      * A constructor that reads files with the latest minor version.
1358      * This is used by unit tests only.
1359      */
1360     FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
1361     throws IOException {
1362       this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
1363     }
1364 
1365     public BlockIterator blockRange(final long startOffset, final long endOffset) {
1366       final FSReader owner = this; // handle for inner class
1367       return new BlockIterator() {
1368         private long offset = startOffset;
1369 
1370         @Override
1371         public HFileBlock nextBlock() throws IOException {
1372           if (offset >= endOffset)
1373             return null;
1374           HFileBlock b = readBlockData(offset, -1, -1, false);
1375           offset += b.getOnDiskSizeWithHeader();
1376           return b.unpack(fileContext, owner);
1377         }
1378 
1379         @Override
1380         public HFileBlock nextBlockWithBlockType(BlockType blockType)
1381             throws IOException {
1382           HFileBlock blk = nextBlock();
1383           if (blk.getBlockType() != blockType) {
1384             throw new IOException("Expected block of type " + blockType
1385                 + " but found " + blk.getBlockType());
1386           }
1387           return blk;
1388         }
1389       };
1390     }
1391 
1392     /**
1393      * Does a positional read or a seek and read into the given buffer. Returns
1394      * the on-disk size of the next block, or -1 if it could not be determined.
1395      *
1396      * @param dest destination buffer
1397      * @param destOffset offset in the destination buffer
1398      * @param size size of the block to be read
1399      * @param peekIntoNextBlock whether to read the next block's on-disk size
1400      * @param fileOffset position in the stream to read at
1401      * @param pread whether we should do a positional read
1402      * @param istream The input source of data
1403      * @return the on-disk size of the next block with header size included, or
1404      *         -1 if it could not be determined
1405      * @throws IOException if the read fails
1406      */
1407     protected int readAtOffset(FSDataInputStream istream,
1408         byte[] dest, int destOffset, int size,
1409         boolean peekIntoNextBlock, long fileOffset, boolean pread)
1410         throws IOException {
1411       if (peekIntoNextBlock &&
1412           destOffset + size + hdrSize > dest.length) {
1413         // We are asked to read the next block's header as well, but there is
1414         // not enough room in the array.
1415         throw new IOException("Attempted to read " + size + " bytes and " +
1416             hdrSize + " bytes of next header into a " + dest.length +
1417             "-byte array at offset " + destOffset);
1418       }
1419 
1420       if (!pread && streamLock.tryLock()) {
1421         // Seek + read. Better for scanning.
1422         try {
1423           istream.seek(fileOffset);
1424 
1425           long realOffset = istream.getPos();
1426           if (realOffset != fileOffset) {
1427             throw new IOException("Tried to seek to " + fileOffset + " to "
1428                 + "read " + size + " bytes, but pos=" + realOffset
1429                 + " after seek");
1430           }
1431 
1432           if (!peekIntoNextBlock) {
1433             IOUtils.readFully(istream, dest, destOffset, size);
1434             return -1;
1435           }
1436 
1437           // Try to read the next block header.
1438           if (!readWithExtra(istream, dest, destOffset, size, hdrSize))
1439             return -1;
1440         } finally {
1441           streamLock.unlock();
1442         }
1443       } else {
1444         // Positional read. Better for random reads, or when the streamLock is already held.
1445         int extraSize = peekIntoNextBlock ? hdrSize : 0;
1446         int ret = istream.read(fileOffset, dest, destOffset, size + extraSize);
1447         if (ret < size) {
1448           throw new IOException("Positional read of " + size + " bytes " +
1449               "failed at offset " + fileOffset + " (returned " + ret + ")");
1450         }
1451 
1452         if (ret == size || ret < size + extraSize) {
1453           // Could not read the next block's header, or did not try.
1454           return -1;
1455         }
1456       }
1457 
1458       assert peekIntoNextBlock;
1459       return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
1460     }
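
    /**
     * Illustrative sketch only, not part of the original source: mirrors the arithmetic
     * at the end of readAtOffset above. Given the peeked bytes of the next block's
     * version 2 header (an 8-byte magic record followed by a 4-byte on-disk size that
     * excludes the header), recover that block's full on-disk size including its header.
     */
    private static int exampleNextBlockOnDiskSize(byte[] peekedHeader, int headerSize) {
      // The 4-byte on-disk-size-without-header field sits right after the magic record.
      int onDiskSizeWithoutHeader = Bytes.toInt(peekedHeader, BlockType.MAGIC_LENGTH);
      return onDiskSizeWithoutHeader + headerSize;
    }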
1461 
1462     /**
1463      * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
1464      * little memory allocation as possible, using the provided on-disk size.
1465      *
1466      * @param offset the offset in the stream to read at
1467      * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1468      *          the header, or -1 if unknown
1469      * @param uncompressedSize the uncompressed size of the block. Always
1470      *          expected to be -1. This parameter is only used in version 1.
1471      * @param pread whether to use a positional read
1472      */
1473     @Override
1474     public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL,
1475         int uncompressedSize, boolean pread)
1476     throws IOException {
1477 
1478       // get a copy of the current state of whether to validate
1479       // hbase checksums or not for this read call. This is not
1480       // thread-safe but the one constraint is that if we decide
1481       // to skip hbase checksum verification then we are
1482       // guaranteed to use hdfs checksum verification.
1483       boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1484       FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1485 
1486       HFileBlock blk = readBlockDataInternal(is, offset,
1487                          onDiskSizeWithHeaderL,
1488                          uncompressedSize, pread,
1489                          doVerificationThruHBaseChecksum);
1490       if (blk == null) {
1491         HFile.LOG.warn("HBase checksum verification failed for file " +
1492                        path + " at offset " +
1493                        offset + " filesize " + fileSize +
1494                        ". Retrying read with HDFS checksums turned on...");
1495 
1496         if (!doVerificationThruHBaseChecksum) {
1497           String msg = "HBase checksum verification failed for file " +
1498                        path + " at offset " +
1499                        offset + " filesize " + fileSize +
1500                        " but this cannot happen because doVerify is " +
1501                        doVerificationThruHBaseChecksum;
1502           HFile.LOG.warn(msg);
1503           throw new IOException(msg); // cannot happen case here
1504         }
1505         HFile.checksumFailures.incrementAndGet(); // update metrics
1506 
1507         // If we have a checksum failure, we fall back into a mode where
1508         // the next few reads use HDFS level checksums. We aim to make the
1509         // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
1510         // hbase checksum verification, but since this value is set without
1511         // holding any locks, we may actually end up doing a few
1512         // more reads than precisely this number.
1513         is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1514         doVerificationThruHBaseChecksum = false;
1515         blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL,
1516                                     uncompressedSize, pread,
1517                                     doVerificationThruHBaseChecksum);
1518         if (blk != null) {
1519           HFile.LOG.warn("HDFS checksum verification succeeded for file " +
1520                          path + " at offset " +
1521                          offset + " filesize " + fileSize);
1522         }
1523       }
1524       if (blk == null && !doVerificationThruHBaseChecksum) {
1525         String msg = "readBlockData failed, possibly due to a " +
1526                      "checksum verification failure, for file " + path +
1527                      " at offset " + offset + " filesize " + fileSize;
1528         HFile.LOG.warn(msg);
1529         throw new IOException(msg);
1530       }
1531 
1532       // Getting here means the read (possibly the retry above with HDFS
1533       // checksums) succeeded. Report that the checksums were ok so that the
1534       // stream wrapper can consider switching back to HBase checksum
1535       // verification once the fallback period has passed. The bookkeeping
1536       // inside the wrapper is not thread-safe, but that is harmless: at
1537       // worst a few extra reads are served with HDFS checksum verification
1538       // instead of HBase checksums.
1539       streamWrapper.checksumOk();
1540       return blk;
1541     }
1542 
1543     /**
1544      * Reads a version 2 block.
1545      *
1546      * @param offset the offset in the stream to read at
1547      * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1548      *          the header, or -1 if unknown
1549      * @param uncompressedSize the uncompressed size of the block. Always
1550      *          expected to be -1. This parameter is only used in version 1.
1551      * @param pread whether to use a positional read
1552      * @param verifyChecksum Whether to use HBase checksums.
1553      *        If HBase checksum is switched off, then use HDFS checksum.
1554      * @return the HFileBlock or null if there is a HBase checksum mismatch
1555      */
1556     private HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
1557         long onDiskSizeWithHeaderL, int uncompressedSize, boolean pread,
1558         boolean verifyChecksum)
1559     throws IOException {
1560       if (offset < 0) {
1561         throw new IOException("Invalid offset=" + offset + " trying to read "
1562             + "block (onDiskSize=" + onDiskSizeWithHeaderL
1563             + ", uncompressedSize=" + uncompressedSize + ")");
1564       }
1565 
1566       if (uncompressedSize != -1) {
1567         throw new IOException("Version 2 block reader API does not need " +
1568             "the uncompressed size parameter");
1569       }
1570 
1571       if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
1572           || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) {
1573         throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL
1574             + ": expected to be at least " + hdrSize
1575             + " and at most " + Integer.MAX_VALUE + ", or -1 (offset="
1576             + offset + ", uncompressedSize=" + uncompressedSize + ")");
1577       }
1578 
1579       int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL;
1580       // See if we can avoid reading the header. This is desirable, because
1581       // we will not incur a backward seek operation if we have already
1582       // read this block's header as part of the previous read's look-ahead.
1583       // And we also want to skip reading the header again if it has already
1584       // been read.
1585       // TODO: How often does this optimization fire? Has to be same thread so the thread local
1586       // is pertinent and we have to be reading next block as in a big scan.
1587       PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
1588       ByteBuffer headerBuf = prefetchedHeader.offset == offset? prefetchedHeader.buf: null;
1589 
1590       // Allocate enough space to fit the next block's header too.
1591       int nextBlockOnDiskSize = 0;
1592       byte[] onDiskBlock = null;
1593 
1594       HFileBlock b = null;
1595       if (onDiskSizeWithHeader > 0) {
1596         // We know the total on-disk size. Read the entire block into memory,
1597         // then parse the header. This code path is used when
1598         // doing a random read operation relying on the block index, as well as
1599         // when the client knows the on-disk size from peeking into the next
1600         // block's header (e.g. this block's header) when reading the previous
1601         // block. This is the faster and preferred case.
1602 
1603         // Size that we have to skip in case we have already read the header.
1604         int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
1605         onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; // room for this block plus the
1606                                                                 // next block's header
1607         nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
1608             preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
1609             true, offset + preReadHeaderSize, pread);
1610         if (headerBuf != null) {
1611           // the header has been read when reading the previous block, copy
1612           // to this block's header
1613           // headerBuf is HBB
1614           assert headerBuf.hasArray();
1615           System.arraycopy(headerBuf.array(),
1616               headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1617         } else {
1618           headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
1619         }
1620         // We know the total on-disk size but not the uncompressed size. Parse the header.
1621         try {
1622           // TODO: FIX!!! Expensive parse just to get a length
1623           b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
1624         } catch (IOException ex) {
1625           // Seen in load testing. Provide comprehensive debug info.
1626           throw new IOException("Failed to read compressed block at "
1627               + offset
1628               + ", onDiskSizeWithHeader="
1629               + onDiskSizeWithHeader
1630               + ", headerSize="
1631               + hdrSize
1632               + ", header.length="
1633               + prefetchedHeader.header.length
1634               + ", header bytes: "
1635               + Bytes.toStringBinary(prefetchedHeader.header, 0,
1636                   hdrSize), ex);
1637         }
1638         // if the caller specifies an onDiskSizeWithHeader, validate it.
1639         int onDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize;
1640         assert onDiskSizeWithoutHeader >= 0;
1641         b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
1642       } else {
1643         // Check headerBuf to see if we have read this block's header as part of
1644         // reading the previous block. This is an optimization of peeking into
1645         // the next block's header (e.g. this block's header) when reading the
1646         // previous block. This is the faster and preferred case. If the
1647         // header is already there, don't read the header again.
1648 
1649         // Unfortunately, we still have to do a separate read operation to
1650         // read the header.
1651         if (headerBuf == null) {
1652           // From the header, determine the on-disk size of the given hfile
1653           // block, and read the remaining data, thereby incurring two read
1654           // operations. This might happen when we are doing the first read
1655           // in a series of reads or a random read, and we don't have access
1656           // to the block index. This is costly and should happen very rarely.
1657           headerBuf = ByteBuffer.allocate(hdrSize);
1658           // headerBuf is HBB
1659           readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(),
1660               hdrSize, false, offset, pread);
1661         }
1662         // TODO: FIX!!! Expensive parse just to get a length
1663         b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
1664         onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize];
1665         // headerBuf is HBB
1666         System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1667         nextBlockOnDiskSize =
1668           readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader()
1669               - hdrSize, true, offset + hdrSize, pread);
1670         onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize;
1671       }
1672 
1673       if (!fileContext.isCompressedOrEncrypted()) {
1674         b.assumeUncompressed();
1675       }
1676 
1677       if (verifyChecksum && !validateBlockChecksum(b, onDiskBlock, hdrSize)) {
1678         return null;             // checksum mismatch
1679       }
1680 
1681       // The onDiskBlock will become the headerAndDataBuffer for this block.
1682       // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
1683       // contains the header of the next block, so there is no need to set the
1684       // next block's header in it.
1685       b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader),
1686         this.fileContext.isUseHBaseChecksum());
1687 
1688       b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
1689 
1690       // Set prefetched header
1691       if (b.hasNextBlockHeader()) {
1692         prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
1693         System.arraycopy(onDiskBlock, onDiskSizeWithHeader, prefetchedHeader.header, 0, hdrSize);
1694       }
1695 
1696       b.offset = offset;
1697       b.fileContext.setIncludesTags(this.fileContext.isIncludesTags());
1698       b.fileContext.setIncludesMvcc(this.fileContext.isIncludesMvcc());
1699       return b;
1700     }
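
    // Illustrative summary, not part of the original source: readBlockDataInternal above
    // takes one of two paths. When the caller already knows the on-disk size (from the
    // block index or from the previous block's look-ahead), the whole block plus the next
    // block's header is fetched in a single read. When the size is unknown, the header is
    // read first (unless it was prefetched), parsed for the on-disk size, and a second
    // read fetches the remainder of the block.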
1701 
1702     public void setIncludesMemstoreTS(boolean includesMemstoreTS) {
1703       this.fileContext.setIncludesMvcc(includesMemstoreTS);
1704     }
1705 
1706     public void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
1707       encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
1708     }
1709 
1710     @Override
1711     public HFileBlockDecodingContext getBlockDecodingContext() {
1712       return this.encodedBlockDecodingCtx;
1713     }
1714 
1715     @Override
1716     public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
1717       return this.defaultDecodingCtx;
1718     }
1719 
1720     /**
1721      * Generates the checksum for the header as well as the data and
1722      * then validates that it matches the value stored in the header.
1723      * If there is a checksum mismatch, this returns false; otherwise
1724      * it returns true.
1725      */
1726     protected boolean validateBlockChecksum(HFileBlock block,  byte[] data, int hdrSize)
1727         throws IOException {
1728       return ChecksumUtil.validateBlockChecksum(path, block, data, hdrSize);
1729     }
1730 
1731     @Override
1732     public void closeStreams() throws IOException {
1733       streamWrapper.close();
1734     }
1735 
1736     @Override
1737     public String toString() {
1738       return "hfs=" + hfs + ", path=" + path + ", fileContext=" + fileContext;
1739     }
1740   }
1741 
1742   @Override
1743   public int getSerializedLength() {
1744     if (buf != null) {
1745       // include extra bytes for the next header when it's available.
1746       int extraSpace = hasNextBlockHeader() ? headerSize() : 0;
1747       return this.buf.limit() + extraSpace + HFileBlock.EXTRA_SERIALIZATION_SPACE;
1748     }
1749     return 0;
1750   }
1751 
1752   @Override
1753   public void serialize(ByteBuffer destination) {
1754     this.buf.get(destination, 0, getSerializedLength()
1755         - EXTRA_SERIALIZATION_SPACE);
1756     serializeExtraInfo(destination);
1757   }
1758 
1759   public void serializeExtraInfo(ByteBuffer destination) {
1760     destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1761     destination.putLong(this.offset);
1762     destination.putInt(this.nextBlockOnDiskSizeWithHeader);
1763     destination.rewind();
1764   }
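
  // Illustrative note, not part of the original source: serializeExtraInfo above appends
  // 1 byte (HBase-checksum flag) + 8 bytes (offset) + 4 bytes (next block's on-disk size
  // with header) after the block bytes; EXTRA_SERIALIZATION_SPACE in getSerializedLength
  // presumably accounts for these 13 extra bytes.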
1765 
1766   @Override
1767   public CacheableDeserializer<Cacheable> getDeserializer() {
1768     return HFileBlock.blockDeserializer;
1769   }
1770 
1771   @Override
1772   public int hashCode() {
1773     int result = 1;
1774     result = result * 31 + blockType.hashCode();
1775     result = result * 31 + nextBlockOnDiskSizeWithHeader;
1776     result = result * 31 + (int) (offset ^ (offset >>> 32));
1777     result = result * 31 + onDiskSizeWithoutHeader;
1778     result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32));
1779     result = result * 31 + uncompressedSizeWithoutHeader;
1780     result = result * 31 + buf.hashCode();
1781     return result;
1782   }
1783 
1784   @Override
1785   public boolean equals(Object comparison) {
1786     if (this == comparison) {
1787       return true;
1788     }
1789     if (comparison == null) {
1790       return false;
1791     }
1792     if (comparison.getClass() != this.getClass()) {
1793       return false;
1794     }
1795 
1796     HFileBlock castedComparison = (HFileBlock) comparison;
1797 
1798     if (castedComparison.blockType != this.blockType) {
1799       return false;
1800     }
1801     if (castedComparison.nextBlockOnDiskSizeWithHeader != this.nextBlockOnDiskSizeWithHeader) {
1802       return false;
1803     }
1804     if (castedComparison.offset != this.offset) {
1805       return false;
1806     }
1807     if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1808       return false;
1809     }
1810     if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1811       return false;
1812     }
1813     if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1814       return false;
1815     }
1816     if (ByteBuff.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
1817         castedComparison.buf.limit()) != 0) {
1818       return false;
1819     }
1820     return true;
1821   }
1822 
1823   public DataBlockEncoding getDataBlockEncoding() {
1824     if (blockType == BlockType.ENCODED_DATA) {
1825       return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
1826     }
1827     return DataBlockEncoding.NONE;
1828   }
1829 
1830   byte getChecksumType() {
1831     return this.fileContext.getChecksumType().getCode();
1832   }
1833 
1834   int getBytesPerChecksum() {
1835     return this.fileContext.getBytesPerChecksum();
1836   }
1837 
1838   /** @return the size of data on disk + header. Excludes checksum. */
1839   int getOnDiskDataSizeWithHeader() {
1840     return this.onDiskDataSizeWithHeader;
1841   }
1842 
1843   /**
1844    * Calculate the number of bytes required to store all the checksums
1845    * for this block. Each checksum value is a 4 byte integer.
1846    */
1847   int totalChecksumBytes() {
1848     // If the hfile block has minorVersion 0, then there are no checksum
1849     // data to validate. Similarly, a zero value in this.bytesPerChecksum
1850     // indicates that cached blocks do not have checksum data because
1851     // checksums were already validated when the block was read from disk.
1852     if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
1853       return 0;
1854     }
1855     return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
1856         this.fileContext.getBytesPerChecksum());
1857   }
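
  /**
   * Illustrative sketch only, not part of the original source: the checksum space computed
   * by totalChecksumBytes above, under the assumption that ChecksumUtil.numBytes rounds the
   * on-disk data size up to whole bytesPerChecksum chunks and charges one 4-byte checksum
   * value per chunk (each checksum value is a 4-byte integer, per the javadoc above).
   * For example, 66560 bytes of data with bytesPerChecksum = 16384 spans 5 chunks,
   * i.e. 20 checksum bytes.
   */
  private static long exampleChecksumBytes(long onDiskDataSizeWithHeader, int bytesPerChecksum) {
    long chunks = (onDiskDataSizeWithHeader + bytesPerChecksum - 1) / bytesPerChecksum;
    return chunks * Bytes.SIZEOF_INT; // 4 bytes per checksum value
  }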
1858 
1859   /**
1860    * Returns the size of this block header.
1861    */
1862   public int headerSize() {
1863     return headerSize(this.fileContext.isUseHBaseChecksum());
1864   }
1865 
1866   /**
1867    * Returns the size of the header, depending on whether HBase checksums are in use.
1868    */
1869   public static int headerSize(boolean usesHBaseChecksum) {
1870     if (usesHBaseChecksum) {
1871       return HConstants.HFILEBLOCK_HEADER_SIZE;
1872     }
1873     return HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
1874   }
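
  // Illustrative note, not part of the original source: with the version 2 header layout
  // read by toStringHeader below (8-byte magic, 4-byte on-disk size, 4-byte uncompressed
  // size, 8-byte previous-block offset), the checksum-less header comes to 24 bytes; the
  // additional 1-byte checksum type, 4-byte bytesPerChecksum and 4-byte
  // onDiskDataSizeWithHeader fields bring the HBase-checksum header to 33 bytes.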
1875 
1876   /**
1877    * Return the appropriate DUMMY_HEADER for this block's checksum setting
1878    */
1879   public byte[] getDummyHeaderForVersion() {
1880     return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
1881   }
1882 
1883   /**
1884    * Return the appropriate DUMMY_HEADER for the given checksum setting
1885    */
1886   static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
1887     if (usesHBaseChecksum) {
1888       return HConstants.HFILEBLOCK_DUMMY_HEADER;
1889     }
1890     return DUMMY_HEADER_NO_CHECKSUM;
1891   }
1892 
1893   /**
1894    * @return the HFileContext used to create this HFileBlock. Not necessarily the
1895    * fileContext for the file from which this block's data was originally read.
1896    */
1897   public HFileContext getHFileContext() {
1898     return this.fileContext;
1899   }
1900 
1901   @Override
1902   public MemoryType getMemoryType() {
1903     return this.memType;
1904   }
1905 
1906   /**
1907    * Convert the contents of the block header into a human readable string.
1908    * This is mostly helpful for debugging. This assumes that the block
1909    * has minor version > 0.
1910    */
1911   static String toStringHeader(ByteBuff buf) throws IOException {
1912     byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
1913     buf.get(magicBuf);
1914     BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
1915     int compressedBlockSizeNoHeader = buf.getInt();
1916     int uncompressedBlockSizeNoHeader = buf.getInt();
1917     long prevBlockOffset = buf.getLong();
1918     byte cksumtype = buf.get();
1919     long bytesPerChecksum = buf.getInt();
1920     long onDiskDataSizeWithHeader = buf.getInt();
1921     return " Header dump: magic: " + Bytes.toString(magicBuf) +
1922                    " blockType " + bt +
1923                    " compressedBlockSizeNoHeader " +
1924                    compressedBlockSizeNoHeader +
1925                    " uncompressedBlockSizeNoHeader " +
1926                    uncompressedBlockSizeNoHeader +
1927                    " prevBlockOffset " + prevBlockOffset +
1928                    " checksumType " + ChecksumType.codeToType(cksumtype) +
1929                    " bytesPerChecksum " + bytesPerChecksum +
1930                    " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
1931   }
1932 }