1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.io.hfile;
19  
20  import java.io.ByteArrayOutputStream;
21  import java.io.DataInputStream;
22  import java.io.DataOutput;
23  import java.io.DataOutputStream;
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.nio.ByteBuffer;
27  import java.util.concurrent.locks.Lock;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import org.apache.hadoop.fs.FSDataInputStream;
31  import org.apache.hadoop.fs.FSDataOutputStream;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.classification.InterfaceAudience;
36  import org.apache.hadoop.hbase.fs.HFileSystem;
37  import org.apache.hadoop.hbase.io.ByteBufferInputStream;
38  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
39  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
40  import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
41  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
42  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
43  import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
44  import org.apache.hadoop.hbase.util.ByteBufferUtils;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.util.ChecksumType;
47  import org.apache.hadoop.hbase.util.ClassSize;
48  import org.apache.hadoop.io.IOUtils;
49  
50  import com.google.common.annotations.VisibleForTesting;
51  import com.google.common.base.Preconditions;
52  
53  /**
54   * Reading {@link HFile} version 1 and 2 blocks, and writing version 2 blocks.
55   * <ul>
56   * <li>In version 1 all blocks are always compressed or uncompressed, as
57   * specified by the {@link HFile}'s compression algorithm, with a type-specific
58   * magic record stored in the beginning of the compressed data (i.e. one needs
59   * to uncompress the compressed block to determine the block type). There is
60   * only a single compression algorithm setting for all blocks. Offset and size
61   * information from the block index are required to read a block.
62   * <li>In version 2 a block is structured as follows:
63   * <ul>
64   * <li>header (see Writer#finishBlock())
65   * <ul>
66   * <li>Magic record identifying the block type (8 bytes)
67   * <li>Compressed block size, excluding header, including checksum (4 bytes)
68   * <li>Uncompressed block size, excluding header, excluding checksum (4 bytes)
69   * <li>The offset of the previous block of the same type (8 bytes). This is
70   * used to be able to navigate to the previous block without going to the block index.
71   * <li>For minorVersions &gt;=1, the ordinal describing checksum type (1 byte)
72   * <li>For minorVersions &gt;=1, the number of data bytes/checksum chunk (4 bytes)
73   * <li>For minorVersions &gt;=1, the size of data on disk, including header,
74   * excluding checksums (4 bytes)
75   * </ul>
76   * </li>
77   * <li>Raw/Compressed/Encrypted/Encoded data. The compression algorithm is the
78   * same for all the blocks in the {@link HFile}, similarly to what was done in
79   * version 1.
80   * <li>For minorVersions &gt;=1, a series of 4 byte checksums, one each for
81   * the number of bytes specified by bytesPerChecksum.
82   * </ul>
83   * </ul>
84   */
85  @InterfaceAudience.Private
86  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="HE_EQUALS_USE_HASHCODE",
87    justification="Fix!!! Fine for now bug FIXXXXXXX!!!!")
88  public class HFileBlock implements Cacheable {
89  
90    /**
91    * On a checksum failure on a Reader, this many succeeding read
92    * requests switch back to using HDFS checksums before auto-reenabling
93    * HBase checksum verification.
94     */
95    static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
96  
97    public static final boolean FILL_HEADER = true;
98    public static final boolean DONT_FILL_HEADER = false;
99  
100   /**
101    * The size of block header when blockType is {@link BlockType#ENCODED_DATA}.
102    * This extends normal header by adding the id of encoder.
103    */
104   public static final int ENCODED_HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE
105       + DataBlockEncoding.ID_SIZE;
106 
107   static final byte[] DUMMY_HEADER_NO_CHECKSUM =
108      new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
109 
110   public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
111       ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
112 
113   // meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader
114   public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT
115       + Bytes.SIZEOF_LONG;
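      // For illustration: that is 1 + 4 + 8 = 13 extra bytes appended after the block payload when
      // a block is serialized for the cache; blockDeserializer below reads them back as a byte
      // (usesHBaseChecksum), a long (offset) and an int (nextBlockOnDiskSizeWithHeader).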
116 
117   /**
118    * Each checksum value is an integer that can be stored in 4 bytes.
119    */
120   static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
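      // Worked example (illustrative): with bytesPerChecksum = 16384, a block whose header + data
      // spans 65536 bytes needs 65536 / 16384 = 4 checksum chunks, i.e. 4 * CHECKSUM_SIZE = 16
      // trailing checksum bytes; see the ChecksumUtil.numBytes() calls further down.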
121 
122   static final CacheableDeserializer<Cacheable> blockDeserializer =
123       new CacheableDeserializer<Cacheable>() {
124         public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{
125           buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
126           ByteBuffer newByteBuffer;
127           if (reuse) {
128             newByteBuffer = buf.slice();
129           } else {
130            newByteBuffer = ByteBuffer.allocate(buf.limit());
131            newByteBuffer.put(buf);
132           }
133           buf.position(buf.limit());
134           buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
135           boolean usesChecksum = buf.get() == (byte)1;
136           HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum);
137           hFileBlock.offset = buf.getLong();
138           hFileBlock.nextBlockOnDiskSizeWithHeader = buf.getInt();
139           if (hFileBlock.hasNextBlockHeader()) {
140             hFileBlock.buf.limit(hFileBlock.buf.limit() - hFileBlock.headerSize());
141           }
142           return hFileBlock;
143         }
144 
145         @Override
146         public int getDeserialiserIdentifier() {
147           return deserializerIdentifier;
148         }
149 
150         @Override
151         public HFileBlock deserialize(ByteBuffer b) throws IOException {
152           return deserialize(b, false);
153         }
154       };
155   private static final int deserializerIdentifier;
156   static {
157     deserializerIdentifier = CacheableDeserializerIdManager
158         .registerDeserializer(blockDeserializer);
159   }
160 
161   // TODO: encapsulate Header-related logic in this inner class.
162   static class Header {
163     // Format of header is:
164     // 8 bytes - block magic
165     // 4 bytes int - onDiskSizeWithoutHeader
166     // 4 bytes int - uncompressedSizeWithoutHeader
167     // 8 bytes long - prevBlockOffset
168     // The following 3 are only present if header contains checksum information
169     // 1 byte - checksum type
170     // 4 byte int - bytes per checksum
171     // 4 byte int - onDiskDataSizeWithHeader
172     static int BLOCK_MAGIC_INDEX = 0;
173     static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8;
174     static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12;
175     static int PREV_BLOCK_OFFSET_INDEX = 16;
176     static int CHECKSUM_TYPE_INDEX = 24;
177     static int BYTES_PER_CHECKSUM_INDEX = 25;
178     static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29;
179   }
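      // For illustration, the index constants above imply a fixed header of 8 + 4 + 4 + 8 = 24
      // bytes without checksum support (HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM) and
      // 24 + 1 + 4 + 4 = 33 bytes with it (HConstants.HFILEBLOCK_HEADER_SIZE), which is why
      // CHECKSUM_TYPE_INDEX starts at offset 24.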
180 
181   /** Type of block. Header field 0. */
182   private BlockType blockType;
183 
184   /** Size on disk excluding header, including checksum. Header field 1. */
185   private int onDiskSizeWithoutHeader;
186 
187   /** Size of pure data. Does not include header or checksums. Header field 2. */
188   private final int uncompressedSizeWithoutHeader;
189 
190   /** The offset of the previous block on disk. Header field 3. */
191   private final long prevBlockOffset;
192 
193   /**
194    * Size on disk of header + data. Excludes checksum. Header field 6,
195    * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
196    */
197   private final int onDiskDataSizeWithHeader;
198 
199   /** The in-memory representation of the hfile block */
200   private ByteBuffer buf;
201 
202   /** Meta data that holds meta information on the hfileblock */
203   private HFileContext fileContext;
204 
205   /**
206    * The offset of this block in the file. Populated by the reader for
207    * convenience of access. This offset is not part of the block header.
208    */
209   private long offset = -1;
210 
211   /**
212    * The on-disk size of the next block, including the header, obtained by
213    * peeking into the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the next block's
214    * header, or -1 if unknown.
215    */
216   private int nextBlockOnDiskSizeWithHeader = -1;
217 
218   /**
219    * Creates a new {@link HFile} block from the given fields. This constructor
220    * is mostly used when the block data has already been read and uncompressed,
221    * and is sitting in a byte buffer.
222    *
223    * @param blockType the type of this block, see {@link BlockType}
224    * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
225    * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
226    * @param prevBlockOffset see {@link #prevBlockOffset}
227    * @param buf block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by
228    *          uncompressed data
229    * @param fillHeader when true, overwrite the header in {@code buf} with this block's field values
230    * @param offset the file offset the block was read from
231    * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
232    * @param fileContext HFile meta data
233    */
234   HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
235       long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset,
236       int onDiskDataSizeWithHeader, HFileContext fileContext) {
237     this.blockType = blockType;
238     this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
239     this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
240     this.prevBlockOffset = prevBlockOffset;
241     this.buf = buf;
242     this.offset = offset;
243     this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
244     this.fileContext = fileContext;
245     if (fillHeader)
246       overwriteHeader();
247     this.buf.rewind();
248   }
249 
250   /**
251    * Copy constructor. Creates a shallow copy of {@code that}'s buffer.
252    */
253   HFileBlock(HFileBlock that) {
254     this.blockType = that.blockType;
255     this.onDiskSizeWithoutHeader = that.onDiskSizeWithoutHeader;
256     this.uncompressedSizeWithoutHeader = that.uncompressedSizeWithoutHeader;
257     this.prevBlockOffset = that.prevBlockOffset;
258     this.buf = that.buf.duplicate();
259     this.offset = that.offset;
260     this.onDiskDataSizeWithHeader = that.onDiskDataSizeWithHeader;
261     this.fileContext = that.fileContext;
262     this.nextBlockOnDiskSizeWithHeader = that.nextBlockOnDiskSizeWithHeader;
263   }
264 
265   /**
266    * Creates a block from an existing buffer starting with a header. Rewinds
267    * and takes ownership of the buffer. By definition of rewind, ignores the
268    * buffer position, but if you slice the buffer beforehand, it will rewind
269    * to that point. The reason this has a minorNumber and not a majorNumber is
270    * because majorNumbers indicate the format of a HFile whereas minorNumbers
271    * indicate the format inside a HFileBlock.
272    */
273   HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException {
274     b.rewind();
275     blockType = BlockType.read(b);
276     onDiskSizeWithoutHeader = b.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
277     uncompressedSizeWithoutHeader = b.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
278     prevBlockOffset = b.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
279     HFileContextBuilder contextBuilder = new HFileContextBuilder();
280     contextBuilder.withHBaseCheckSum(usesHBaseChecksum);
281     if (usesHBaseChecksum) {
282       contextBuilder.withChecksumType(ChecksumType.codeToType(b.get(Header.CHECKSUM_TYPE_INDEX)));
283       contextBuilder.withBytesPerCheckSum(b.getInt(Header.BYTES_PER_CHECKSUM_INDEX));
284       this.onDiskDataSizeWithHeader = b.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
285     } else {
286       contextBuilder.withChecksumType(ChecksumType.NULL);
287       contextBuilder.withBytesPerCheckSum(0);
288       this.onDiskDataSizeWithHeader = onDiskSizeWithoutHeader +
289                                        HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
290     }
291     this.fileContext = contextBuilder.build();
292     buf = b;
293     buf.rewind();
294   }
295 
296   public BlockType getBlockType() {
297     return blockType;
298   }
299 
300   /** @return the data block encoding id that was used to encode this block */
301   public short getDataBlockEncodingId() {
302     if (blockType != BlockType.ENCODED_DATA) {
303       throw new IllegalArgumentException("Querying encoder ID of a block " +
304           "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
305     }
306     return buf.getShort(headerSize());
307   }
308 
309   /**
310    * @return the on-disk size of header + data part + checksum.
311    */
312   public int getOnDiskSizeWithHeader() {
313     return onDiskSizeWithoutHeader + headerSize();
314   }
315 
316   /**
317    * @return the on-disk size of the data part + checksum (header excluded).
318    */
319   public int getOnDiskSizeWithoutHeader() {
320     return onDiskSizeWithoutHeader;
321   }
322 
323   /**
324    * @return the uncompressed size of data part (header and checksum excluded).
325    */
326    public int getUncompressedSizeWithoutHeader() {
327     return uncompressedSizeWithoutHeader;
328   }
329 
330   /**
331    * @return the offset of the previous block of the same type in the file, or
332    *         -1 if unknown
333    */
334   public long getPrevBlockOffset() {
335     return prevBlockOffset;
336   }
337 
338   /**
339    * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position
340    * is modified as side-effect.
341    */
342   private void overwriteHeader() {
343     buf.rewind();
344     blockType.write(buf);
345     buf.putInt(onDiskSizeWithoutHeader);
346     buf.putInt(uncompressedSizeWithoutHeader);
347     buf.putLong(prevBlockOffset);
348     if (this.fileContext.isUseHBaseChecksum()) {
349       buf.put(fileContext.getChecksumType().getCode());
350       buf.putInt(fileContext.getBytesPerChecksum());
351       buf.putInt(onDiskDataSizeWithHeader);
352     }
353   }
354 
355   /**
356    * Returns a buffer that does not include the header or checksum.
357    *
358    * @return the buffer with header skipped and checksum omitted.
359    */
360   public ByteBuffer getBufferWithoutHeader() {
361     ByteBuffer dup = this.buf.duplicate();
362     dup.position(headerSize());
363     dup.limit(buf.limit() - totalChecksumBytes());
364     return dup.slice();
365   }
366 
367   /**
368    * Returns the buffer this block stores internally. The clients must not
369    * modify the buffer object. This method has to be public because it is
370    * used in {@link org.apache.hadoop.hbase.util.CompoundBloomFilter}
371    * to avoid object creation on every Bloom filter lookup, but has to
372    * be used with caution. Checksum data is not included in the returned
373    * buffer but header data is.
374    *
375    * @return the buffer of this block for read-only operations
376    */
377   public ByteBuffer getBufferReadOnly() {
378     ByteBuffer dup = this.buf.duplicate();
379     dup.limit(buf.limit() - totalChecksumBytes());
380     return dup.slice();
381   }
382 
383   /**
384    * Returns the buffer of this block, including header data. The clients must
385    * not modify the buffer object. This method has to be public because it is
386    * used in {@link org.apache.hadoop.hbase.io.hfile.bucket.BucketCache} to avoid buffer copy.
387    *
388    * @return the buffer with header and checksum included for read-only operations
389    */
390   public ByteBuffer getBufferReadOnlyWithHeader() {
391     ByteBuffer dup = this.buf.duplicate();
392     return dup.slice();
393   }
394 
395   /**
396    * Returns a byte buffer of this block, including header data and checksum, positioned at
397    * the beginning of header. The underlying data array is not copied.
398    *
399    * @return the byte buffer with header and checksum included
400    */
401   ByteBuffer getBufferWithHeader() {
402     ByteBuffer dupBuf = buf.duplicate();
403     dupBuf.rewind();
404     return dupBuf;
405   }
406 
407   private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
408       String fieldName) throws IOException {
409     if (valueFromBuf != valueFromField) {
410       throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
411           + ") is different from that in the field (" + valueFromField + ")");
412     }
413   }
414 
415   private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
416       throws IOException {
417     if (valueFromBuf != valueFromField) {
418       throw new IOException("Block type stored in the buffer: " +
419         valueFromBuf + ", block type field: " + valueFromField);
420     }
421   }
422 
423   /**
424    * Checks if the block is internally consistent, i.e. the first
425    * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a
426    * valid header consistent with the fields. Assumes a packed block structure.
427    * This function is primarily for testing and debugging, and is not
428    * thread-safe, because it alters the internal buffer pointer.
429    */
430   void sanityCheck() throws IOException {
431     buf.rewind();
432 
433     sanityCheckAssertion(BlockType.read(buf), blockType);
434 
435     sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader,
436         "onDiskSizeWithoutHeader");
437 
438     sanityCheckAssertion(buf.getInt(), uncompressedSizeWithoutHeader,
439         "uncompressedSizeWithoutHeader");
440 
441     sanityCheckAssertion(buf.getLong(), prevBlockOffset, "prevBlockOffset");
442     if (this.fileContext.isUseHBaseChecksum()) {
443       sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
444       sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), "bytesPerChecksum");
445       sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
446     }
447 
448     int cksumBytes = totalChecksumBytes();
449     int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
450     if (buf.limit() != expectedBufLimit) {
451       throw new AssertionError("Expected buffer limit " + expectedBufLimit
452           + ", got " + buf.limit());
453     }
454 
455     // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
456     // block's header, so there are two sensible values for buffer capacity.
457     int hdrSize = headerSize();
458     if (buf.capacity() != expectedBufLimit &&
459         buf.capacity() != expectedBufLimit + hdrSize) {
460       throw new AssertionError("Invalid buffer capacity: " + buf.capacity() +
461           ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
462     }
463   }
464 
465   @Override
466   public String toString() {
467     StringBuilder sb = new StringBuilder()
468       .append("HFileBlock [")
469       .append(" fileOffset=").append(offset)
470       .append(" headerSize()=").append(headerSize())
471       .append(" blockType=").append(blockType)
472       .append(" onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
473       .append(" uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
474       .append(" prevBlockOffset=").append(prevBlockOffset)
475       .append(" isUseHBaseChecksum()=").append(fileContext.isUseHBaseChecksum());
476     if (fileContext.isUseHBaseChecksum()) {
477       sb.append(" checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
478         .append(" bytesPerChecksum=").append(this.buf.getInt(24 + 1))
479         .append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
480     } else {
481       sb.append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
482         .append("(").append(onDiskSizeWithoutHeader)
483         .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
484     }
485     String dataBegin = null;
486     if (buf.hasArray()) {
487       dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
488           Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
489     } else {
490       ByteBuffer bufWithoutHeader = getBufferWithoutHeader();
491       byte[] dataBeginBytes = new byte[Math.min(32,
492           bufWithoutHeader.limit() - bufWithoutHeader.position())];
493       bufWithoutHeader.get(dataBeginBytes);
494       dataBegin = Bytes.toStringBinary(dataBeginBytes);
495     }
496     sb.append(" getOnDiskSizeWithHeader()=").append(getOnDiskSizeWithHeader())
497       .append(" totalChecksumBytes()=").append(totalChecksumBytes())
498       .append(" isUnpacked()=").append(isUnpacked())
499       .append(" buf=[ ").append(buf).append(" ]")
500       .append(" dataBeginsWith=").append(dataBegin)
501       .append(" fileContext=").append(fileContext)
502       .append(" ]");
503     return sb.toString();
504   }
505 
506   /**
507    * Called after reading a block with provided onDiskSizeWithHeader.
508    */
509   private static void validateOnDiskSizeWithoutHeader(int expectedOnDiskSizeWithoutHeader,
510       int actualOnDiskSizeWithoutHeader, ByteBuffer buf, long offset) throws IOException {
511     if (actualOnDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) {
512       // We make the read-only copy here instead of when passing the parameter to this function
513       // to make duplicates only in failure cases, instead of every single time.
514       ByteBuffer bufReadOnly = buf.asReadOnlyBuffer();
515       String dataBegin = null;
516       byte[] dataBeginBytes = new byte[Math.min(32, bufReadOnly.limit() - bufReadOnly.position())];
517       bufReadOnly.get(dataBeginBytes);
518       dataBegin = Bytes.toStringBinary(dataBeginBytes);
519       String blockInfoMsg =
520         "Block offset: " + offset + ", data starts with: " + dataBegin;
521       throw new IOException("On-disk size without header provided is "
522           + expectedOnDiskSizeWithoutHeader + ", but block "
523           + "header contains " + actualOnDiskSizeWithoutHeader + ". " +
524           blockInfoMsg);
525     }
526   }
527 
528   /**
529    * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
530    * encoded structure. Internal structures are shared between instances where applicable.
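       * <p>A minimal usage sketch (illustrative; {@code fileContext} and {@code reader} are
       * assumed to be the {@link HFileContext} and {@link FSReader} the block came from):
       * <pre>{@code
       * HFileBlock fromDisk = ...;                          // as returned by FSReader#readBlockData
       * HFileBlock readable = fromDisk.unpack(fileContext, reader);
       * // "readable" is decompressed/decrypted; encoded data blocks stay encoded
       * }</pre>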
531    */
532   HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
533     if (!fileContext.isCompressedOrEncrypted()) {
534       // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
535       // which is used for block serialization to L2 cache, does not preserve encoding and
536       // encryption details.
537       return this;
538     }
539 
540     HFileBlock unpacked = new HFileBlock(this);
541     unpacked.allocateBuffer(); // allocates space for the decompressed block
542 
543     HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
544       reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
545 
546     ByteBuffer dup = this.buf.duplicate();
547     dup.position(this.headerSize());
548     dup = dup.slice();
549     ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
550       unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
551       dup);
552 
553     // Preserve the next block's header bytes in the new block if we have them.
554     if (unpacked.hasNextBlockHeader()) {
555       // Both buffers are limited to the checksum bytes and exclude the next block's header.
556       // The copyFromBufferToBuffer() call below uses positional read/write when either buffer
557       // is a DirectByteBuffer, so we raise the limit on duplicate buffers instead. No data is
558       // copied; only new ByteBuffer objects are created.
559       ByteBuffer inDup = this.buf.duplicate();
560       inDup.limit(inDup.limit() + headerSize());
561       ByteBuffer outDup = unpacked.buf.duplicate();
562       outDup.limit(outDup.limit() + unpacked.headerSize());
563       ByteBufferUtils.copyFromBufferToBuffer(
564           outDup,
565           inDup,
566           this.onDiskDataSizeWithHeader,
567           unpacked.headerSize() + unpacked.uncompressedSizeWithoutHeader
568               + unpacked.totalChecksumBytes(), unpacked.headerSize());
569     }
570     return unpacked;
571   }
572 
573   /**
574    * Return true when this buffer includes next block's header.
575    */
576   private boolean hasNextBlockHeader() {
577     return nextBlockOnDiskSizeWithHeader > 0;
578   }
579 
580   /**
581    * Always allocates a new buffer of the correct size. Copies header bytes
582    * from the existing buffer. Does not change header fields.
583    * Reserve room to keep checksum bytes too.
584    */
585   private void allocateBuffer() {
586     int cksumBytes = totalChecksumBytes();
587     int headerSize = headerSize();
588     int capacityNeeded = headerSize + uncompressedSizeWithoutHeader +
589         cksumBytes + (hasNextBlockHeader() ? headerSize : 0);
590 
591     // TODO: do we need to consider allocating this buffer off-heap?
592     ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
593 
594     // Copy header bytes into newBuf.
595     // newBuf is HBB so no issue in calling array()
596     ByteBuffer dup = buf.duplicate();
597     dup.position(0);
598     dup.get(newBuf.array(), newBuf.arrayOffset(), headerSize);
599 
600     buf = newBuf;
601     // set limit to exclude next block's header
602     buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
603   }
604 
605   /**
606    * Return true when this block's buffer has been unpacked, false otherwise. Note this is a
607    * calculated heuristic, not tracked attribute of the block.
608    */
609   public boolean isUnpacked() {
610     final int cksumBytes = totalChecksumBytes();
611     final int headerSize = headerSize();
612     final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
613     final int bufCapacity = buf.capacity();
614     return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
615   }
616 
617   /** An additional sanity-check in case no compression or encryption is being used. */
618   public static void verifyUncompressed(ByteBuffer buf, boolean useHBaseChecksum)
619       throws IOException {
620     int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
621     int uncompressedSizeWithoutHeader = buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
622     int onDiskDataSizeWithHeader;
623     int checksumBytes = 0;
624     if (useHBaseChecksum) {
625       onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
626       checksumBytes = (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
627           buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX));
628     }
629 
630     if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + checksumBytes) {
631       throw new IOException("Using no compression but "
632           + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
633           + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
634           + ", numChecksumbytes=" + checksumBytes);
635     }
636   }
637 
638   /**
639    * @param expectedType the expected type of this block
640    * @throws IOException if this block's type is different than expected
641    */
642   public void expectType(BlockType expectedType) throws IOException {
643     if (blockType != expectedType) {
644       throw new IOException("Invalid block type: expected=" + expectedType
645           + ", actual=" + blockType);
646     }
647   }
648 
649   /** @return the offset of this block in the file it was read from */
650   public long getOffset() {
651     if (offset < 0) {
652       throw new IllegalStateException(
653           "HFile block offset not initialized properly");
654     }
655     return offset;
656   }
657 
658   /**
659    * @return a byte stream reading the data + checksum of this block
660    */
661   public DataInputStream getByteStream() {
662     ByteBuffer dup = this.buf.duplicate();
663     dup.position(this.headerSize());
664     return new DataInputStream(new ByteBufferInputStream(dup));
665   }
666 
667   @Override
668   public long heapSize() {
669     long size = ClassSize.align(
670         ClassSize.OBJECT +
671         // Block type, byte buffer and meta references
672         3 * ClassSize.REFERENCE +
673         // On-disk size, uncompressed size, and next block's on-disk size
674         // bytePerChecksum and onDiskDataSize
675         4 * Bytes.SIZEOF_INT +
676         // This and previous block offset
677         2 * Bytes.SIZEOF_LONG +
678         // Heap size of the meta object. meta will be always not null.
679         fileContext.heapSize()
680     );
681 
682     if (buf != null) {
683       // Deep overhead of the byte buffer. Needs to be aligned separately.
684       size += ClassSize.align(buf.capacity() + BYTE_BUFFER_HEAP_SIZE);
685     }
686 
687     return ClassSize.align(size);
688   }
689 
690   /**
691    * Read from an input stream. Analogous to
692    * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a
693    * number of "extra" bytes that would be desirable but not absolutely
694    * necessary to read.
695    *
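       * <p>A rough usage sketch (illustrative; {@code is}, {@code onDiskSizeWithHeader} and
       * {@code hdrSize} are assumed to exist in the caller):
       * <pre>{@code
       * // read one block and, opportunistically, the next block's header as the "extra" bytes
       * byte[] b = new byte[onDiskSizeWithHeader + hdrSize];
       * boolean nextHeaderRead = readWithExtra(is, b, 0, onDiskSizeWithHeader, hdrSize);
       * }</pre>
       *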
696    * @param in the input stream to read from
697    * @param buf the buffer to read into
698    * @param bufOffset the destination offset in the buffer
699    * @param necessaryLen the number of bytes that are absolutely necessary to
700    *          read
701    * @param extraLen the number of extra bytes that would be nice to read
702    * @return true if succeeded reading the extra bytes
703    * @throws IOException if failed to read the necessary bytes
704    */
705   public static boolean readWithExtra(InputStream in, byte[] buf,
706       int bufOffset, int necessaryLen, int extraLen) throws IOException {
707     int bytesRemaining = necessaryLen + extraLen;
708     while (bytesRemaining > 0) {
709       int ret = in.read(buf, bufOffset, bytesRemaining);
710       if (ret == -1 && bytesRemaining <= extraLen) {
711         // We could not read the "extra data", but that is OK.
712         break;
713       }
714 
715       if (ret < 0) {
716         throw new IOException("Premature EOF from inputStream (read "
717             + "returned " + ret + ", was trying to read " + necessaryLen
718             + " necessary bytes and " + extraLen + " extra bytes, "
719             + "successfully read "
720             + (necessaryLen + extraLen - bytesRemaining));
721       }
722       bufOffset += ret;
723       bytesRemaining -= ret;
724     }
725     return bytesRemaining <= 0;
726   }
727 
728   /**
729    * Read from an input stream. Analogous to
730    * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses
731    * positional read and specifies a number of "extra" bytes that would be
732    * desirable but not absolutely necessary to read.
733    *
734    * @param in the input stream to read from
735    * @param position the position within the stream from which to start reading
736    * @param buf the buffer to read into
737    * @param bufOffset the destination offset in the buffer
738    * @param necessaryLen the number of bytes that are absolutely necessary to
739    *     read
740    * @param extraLen the number of extra bytes that would be nice to read
741    * @return true if and only if extraLen is > 0 and reading those extra bytes
742    *     was successful
743    * @throws IOException if failed to read the necessary bytes
744    */
745   @VisibleForTesting
746   static boolean positionalReadWithExtra(FSDataInputStream in,
747       long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen)
748       throws IOException {
749     int bytesRemaining = necessaryLen + extraLen;
750     int bytesRead = 0;
751     while (bytesRead < necessaryLen) {
752       int ret = in.read(position, buf, bufOffset, bytesRemaining);
753       if (ret < 0) {
754         throw new IOException("Premature EOF from inputStream (positional read "
755             + "returned " + ret + ", was trying to read " + necessaryLen
756             + " necessary bytes and " + extraLen + " extra bytes, "
757             + "successfully read " + bytesRead);
758       }
759       position += ret;
760       bufOffset += ret;
761       bytesRemaining -= ret;
762       bytesRead += ret;
763     }
764     return bytesRead != necessaryLen && bytesRemaining <= 0;
765   }
766 
767   /**
768    * @return the on-disk size of the next block (including the header size)
769    *         that was read by peeking into the next block's header
770    */
771   public int getNextBlockOnDiskSizeWithHeader() {
772     return nextBlockOnDiskSizeWithHeader;
773   }
774 
775   /**
776    * Unified version 2 {@link HFile} block writer. The intended usage pattern
777    * is as follows:
778    * <ol>
779    * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
780    * <li>Call {@link Writer#startWriting} and get a data stream to write to.
781    * <li>Write your data into the stream.
782    * <li>Call {@link Writer#writeHeaderAndData(FSDataOutputStream)} as many times as you need to
783    * store the serialized block into an external stream.
784    * <li>Repeat to write more blocks.
785    * </ol>
786    * <p>
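       * A minimal sketch of that pattern (illustrative only; {@code fileContext} is assumed to be
       * an {@link HFileContext} with a valid bytesPerChecksum and {@code out} an open output
       * stream):
       * <pre>{@code
       * HFileBlock.Writer writer = new HFileBlock.Writer(null, fileContext); // null => NoOpDataBlockEncoder
       * DataOutputStream dos = writer.startWriting(BlockType.META);
       * dos.writeUTF("example payload");            // user data
       * writer.writeHeaderAndData(out);             // header + data + checksums go to the stream
       * // call startWriting()/writeHeaderAndData() again for further blocks
       * }</pre>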
787    */
788   public static class Writer {
789 
790     private enum State {
791       INIT,
792       WRITING,
793       BLOCK_READY
794     };
795 
796     /** Writer state. Used to ensure the correct usage protocol. */
797     private State state = State.INIT;
798 
799     /** Data block encoder used for data blocks */
800     private final HFileDataBlockEncoder dataBlockEncoder;
801 
802     private HFileBlockEncodingContext dataBlockEncodingCtx;
803 
804     /** block encoding context for non-data blocks */
805     private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
806 
807     /**
808      * The stream we use to accumulate data in uncompressed format for each
809      * block. We reset this stream at the end of each block and reuse it. The
810      * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this
811      * stream.
812      */
813     private ByteArrayOutputStream baosInMemory;
814 
815     /**
816      * Current block type. Set in {@link #startWriting(BlockType)}. Could be
817      * changed in {@link #finishBlock()} from {@link BlockType#DATA}
818      * to {@link BlockType#ENCODED_DATA}.
819      */
820     private BlockType blockType;
821 
822     /**
823      * A stream that we write uncompressed bytes to, which compresses them and
824      * writes them to {@link #baosInMemory}.
825      */
826     private DataOutputStream userDataStream;
827 
828     // Size of actual data being written. Not considering the block encoding/compression. This
829     // includes the header size also.
830     private int unencodedDataSizeWritten;
831 
832     /**
833      * Bytes to be written to the file system, including the header. Compressed
834      * if compression is turned on. It also includes the checksum data that
835      * immediately follows the block data. (header + data + checksums)
836      */
837     private byte[] onDiskBytesWithHeader;
838 
839     /**
840      * The size of the checksum data on disk. It is used only if data is
841      * not compressed. If data is compressed, then the checksums are already
842      * part of onDiskBytesWithHeader. If data is uncompressed, then this
843      * variable stores the checksum data for this block.
844      */
845     private byte[] onDiskChecksum;
846 
847     /**
848      * Valid in the READY state. Contains the header and the uncompressed (but
849      * potentially encoded, if this is a data block) bytes, so the length is
850      * {@link #uncompressedSizeWithoutHeader} +
851      * {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}.
852      * Does not store checksums.
853      */
854     private byte[] uncompressedBytesWithHeader;
855 
856     /**
857      * Current block's start offset in the {@link HFile}. Set in
858      * {@link #writeHeaderAndData(FSDataOutputStream)}.
859      */
860     private long startOffset;
861 
862     /**
863      * Offset of previous block by block type. Updated when the next block is
864      * started.
865      */
866     private long[] prevOffsetByType;
867 
868     /** The offset of the previous block of the same type */
869     private long prevOffset;
870     /** Meta data that holds information about the hfileblock**/
871     private HFileContext fileContext;
872 
873     /**
874      * @param dataBlockEncoder data block encoding algorithm to use
875      */
876     public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
877       this.dataBlockEncoder = dataBlockEncoder != null
878           ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
879       defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
880           HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
881       dataBlockEncodingCtx = this.dataBlockEncoder
882           .newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
883 
884       if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
885         throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
886             " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
887             fileContext.getBytesPerChecksum());
888       }
889 
890       baosInMemory = new ByteArrayOutputStream();
891 
892       prevOffsetByType = new long[BlockType.values().length];
893       for (int i = 0; i < prevOffsetByType.length; ++i)
894         prevOffsetByType[i] = -1;
895 
896       this.fileContext = fileContext;
897     }
898 
899     /**
900      * Starts writing into the block. The previous block's data is discarded.
901      *
902      * @return the stream the user can write their data into
903      * @throws IOException
904      */
905     public DataOutputStream startWriting(BlockType newBlockType)
906         throws IOException {
907       if (state == State.BLOCK_READY && startOffset != -1) {
908         // We had a previous block that was written to a stream at a specific
909         // offset. Save that offset as the last offset of a block of that type.
910         prevOffsetByType[blockType.getId()] = startOffset;
911       }
912 
913       startOffset = -1;
914       blockType = newBlockType;
915 
916       baosInMemory.reset();
917       baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
918 
919       state = State.WRITING;
920 
921       // We will compress it later in finishBlock()
922       userDataStream = new DataOutputStream(baosInMemory);
923       if (newBlockType == BlockType.DATA) {
924         this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
925       }
926       this.unencodedDataSizeWritten = 0;
927       return userDataStream;
928     }
929 
930     /**
931      * Writes the Cell to this block
932      * @param cell
933      * @throws IOException
934      */
935     public void write(Cell cell) throws IOException{
936       expectState(State.WRITING);
937       this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx,
938           this.userDataStream);
939     }
940 
941     /**
942      * Returns the stream for the user to write to. The block writer takes care
943      * of handling compression and buffering for caching on write. Can only be
944      * called in the "writing" state.
945      *
946      * @return the data output stream for the user to write to
947      */
948     DataOutputStream getUserDataStream() {
949       expectState(State.WRITING);
950       return userDataStream;
951     }
952 
953     /**
954      * Transitions the block writer from the "writing" state to the "block
955      * ready" state.  Does nothing if a block is already finished.
956      */
957     void ensureBlockReady() throws IOException {
958       Preconditions.checkState(state != State.INIT,
959           "Unexpected state: " + state);
960 
961       if (state == State.BLOCK_READY)
962         return;
963 
964       // This will set state to BLOCK_READY.
965       finishBlock();
966     }
967 
968     /**
969      * An internal method that flushes the compressing stream (if using
970      * compression), serializes the header, and takes care of the separate
971      * uncompressed stream for caching on write, if applicable. Sets block
972      * write state to "block ready".
973      */
974     private void finishBlock() throws IOException {
975       if (blockType == BlockType.DATA) {
976         BufferGrabbingByteArrayOutputStream baosInMemoryCopy =
977             new BufferGrabbingByteArrayOutputStream();
978         baosInMemory.writeTo(baosInMemoryCopy);
979         this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
980             baosInMemoryCopy.buf, blockType);
981         blockType = dataBlockEncodingCtx.getBlockType();
982       }
983       userDataStream.flush();
984       // This does an array copy, so it is safe to cache this byte array.
985       uncompressedBytesWithHeader = baosInMemory.toByteArray();
986       prevOffset = prevOffsetByType[blockType.getId()];
987 
988       // We need to set state before we can package the block up for
989       // cache-on-write. In a way, the block is ready, but not yet encoded or
990       // compressed.
991       state = State.BLOCK_READY;
992       if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
993         onDiskBytesWithHeader = dataBlockEncodingCtx
994             .compressAndEncrypt(uncompressedBytesWithHeader);
995       } else {
996         onDiskBytesWithHeader = defaultBlockEncodingCtx
997             .compressAndEncrypt(uncompressedBytesWithHeader);
998       }
999       int numBytes = (int) ChecksumUtil.numBytes(
1000           onDiskBytesWithHeader.length,
1001           fileContext.getBytesPerChecksum());
1002 
1003       // put the header for on disk bytes
1004       putHeader(onDiskBytesWithHeader, 0,
1005           onDiskBytesWithHeader.length + numBytes,
1006           uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
1007       // set the header for the uncompressed bytes (for cache-on-write)
1008       putHeader(uncompressedBytesWithHeader, 0,
1009           onDiskBytesWithHeader.length + numBytes,
1010           uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
1011 
1012       onDiskChecksum = new byte[numBytes];
1013       ChecksumUtil.generateChecksums(
1014           onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
1015           onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
1016     }
1017 
1018     public static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
1019       private byte[] buf;
1020 
1021       @Override
1022       public void write(byte[] b, int off, int len) {
1023         this.buf = b;
1024       }
1025 
1026       public byte[] getBuffer() {
1027         return this.buf;
1028       }
1029     }
1030 
1031     /**
1032      * Put the header into the given byte array at the given offset.
1033      * @param onDiskSize size of the block on disk header + data + checksum
1034      * @param uncompressedSize size of the block after decompression (but
1035      *          before optional data block decoding) including header
1036      * @param onDiskDataSize size of the block on disk with header
1037      *        and data but not including the checksums
1038      */
1039     private void putHeader(byte[] dest, int offset, int onDiskSize,
1040         int uncompressedSize, int onDiskDataSize) {
1041       offset = blockType.put(dest, offset);
1042       offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1043       offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1044       offset = Bytes.putLong(dest, offset, prevOffset);
1045       offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
1046       offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
1047       Bytes.putInt(dest, offset, onDiskDataSize);
1048     }
1049 
1050     /**
1051      * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records
1052      * the offset of this block so that it can be referenced in the next block
1053      * of the same type.
1054      *
1055      * @param out
1056      * @throws IOException
1057      */
1058     public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
1059       long offset = out.getPos();
1060       if (startOffset != -1 && offset != startOffset) {
1061         throw new IOException("A " + blockType + " block written to a "
1062             + "stream twice, first at offset " + startOffset + ", then at "
1063             + offset);
1064       }
1065       startOffset = offset;
1066 
1067       finishBlockAndWriteHeaderAndData((DataOutputStream) out);
1068     }
1069 
1070     /**
1071      * Writes the header and the compressed data of this block (or uncompressed
1072      * data when not using compression) into the given stream. Can be called in
1073      * the "writing" state or in the "block ready" state. If called in the
1074      * "writing" state, transitions the writer to the "block ready" state.
1075      *
1076      * @param out the output stream to write the block to
1077      * @throws IOException
1078      */
1079     protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
1080       throws IOException {
1081       ensureBlockReady();
1082       out.write(onDiskBytesWithHeader);
1083       out.write(onDiskChecksum);
1084     }
1085 
1086     /**
1087      * Returns the header or the compressed data (or uncompressed data when not
1088      * using compression) as a byte array. Can be called in the "writing" state
1089      * or in the "block ready" state. If called in the "writing" state,
1090      * transitions the writer to the "block ready" state. This returns
1091      * the header + data + checksums stored on disk.
1092      *
1093      * @return header and data as they would be stored on disk in a byte array
1094      * @throws IOException
1095      */
1096     byte[] getHeaderAndDataForTest() throws IOException {
1097       ensureBlockReady();
1098       // This is not very optimal, because we are doing an extra copy.
1099       // But this method is used only by unit tests.
1100       byte[] output =
1101           new byte[onDiskBytesWithHeader.length
1102               + onDiskChecksum.length];
1103       System.arraycopy(onDiskBytesWithHeader, 0, output, 0,
1104           onDiskBytesWithHeader.length);
1105       System.arraycopy(onDiskChecksum, 0, output,
1106           onDiskBytesWithHeader.length, onDiskChecksum.length);
1107       return output;
1108     }
1109 
1110     /**
1111      * Releases resources used by this writer.
1112      */
1113     public void release() {
1114       if (dataBlockEncodingCtx != null) {
1115         dataBlockEncodingCtx.close();
1116         dataBlockEncodingCtx = null;
1117       }
1118       if (defaultBlockEncodingCtx != null) {
1119         defaultBlockEncodingCtx.close();
1120         defaultBlockEncodingCtx = null;
1121       }
1122     }
1123 
1124     /**
1125      * Returns the on-disk size of the data portion of the block. This is the
1126      * compressed size if compression is enabled. Can only be called in the
1127      * "block ready" state. Header is not compressed, and its size is not
1128      * included in the return value.
1129      *
1130      * @return the on-disk size of the block, not including the header.
1131      */
1132     int getOnDiskSizeWithoutHeader() {
1133       expectState(State.BLOCK_READY);
1134       return onDiskBytesWithHeader.length
1135           + onDiskChecksum.length
1136           - HConstants.HFILEBLOCK_HEADER_SIZE;
1137     }
1138 
1139     /**
1140      * Returns the on-disk size of the block. Can only be called in the
1141      * "block ready" state.
1142      *
1143      * @return the on-disk size of the block ready to be written, including the
1144      *         header size, the data and the checksum data.
1145      */
1146     int getOnDiskSizeWithHeader() {
1147       expectState(State.BLOCK_READY);
1148       return onDiskBytesWithHeader.length + onDiskChecksum.length;
1149     }
1150 
1151     /**
1152      * The uncompressed size of the block data. Does not include header size.
1153      */
1154     int getUncompressedSizeWithoutHeader() {
1155       expectState(State.BLOCK_READY);
1156       return uncompressedBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE;
1157     }
1158 
1159     /**
1160      * The uncompressed size of the block data, including header size.
1161      */
1162     int getUncompressedSizeWithHeader() {
1163       expectState(State.BLOCK_READY);
1164       return uncompressedBytesWithHeader.length;
1165     }
1166 
1167     /** @return true if a block is being written  */
1168     public boolean isWriting() {
1169       return state == State.WRITING;
1170     }
1171 
1172     /**
1173      * Returns the number of bytes written into the current block so far, or
1174      * zero if not writing the block at the moment. Note that this will return
1175      * zero in the "block ready" state as well.
1176      *
1177      * @return the number of bytes written
1178      */
1179     public int blockSizeWritten() {
1180       if (state != State.WRITING) return 0;
1181       return this.unencodedDataSizeWritten;
1182     }
1183 
1184     /**
1185      * Returns the header followed by the uncompressed data, even if using
1186      * compression. This is needed for storing uncompressed blocks in the block
1187      * cache. Can be called in the "writing" state or the "block ready" state.
1188      * Returns only the header and data, does not include checksum data.
1189      *
1190      * @return uncompressed block bytes for caching on write
1191      */
1192     ByteBuffer getUncompressedBufferWithHeader() {
1193       expectState(State.BLOCK_READY);
1194       return ByteBuffer.wrap(uncompressedBytesWithHeader);
1195     }
1196 
1197     /**
1198      * Returns the header followed by the on-disk (compressed/encoded/encrypted) data. This is
1199      * needed for storing packed blocks in the block cache. Expects calling semantics identical to
1200      * {@link #getUncompressedBufferWithHeader()}. Returns only the header and data,
1201      * Does not include checksum data.
1202      *
1203      * @return packed block bytes for caching on write
1204      */
1205     ByteBuffer getOnDiskBufferWithHeader() {
1206       expectState(State.BLOCK_READY);
1207       return ByteBuffer.wrap(onDiskBytesWithHeader);
1208     }
1209 
1210     private void expectState(State expectedState) {
1211       if (state != expectedState) {
1212         throw new IllegalStateException("Expected state: " + expectedState +
1213             ", actual state: " + state);
1214       }
1215     }
1216 
1217     /**
1218      * Takes the given {@link BlockWritable} instance, creates a new block of
1219      * its appropriate type, writes the writable into this block, and flushes
1220      * the block into the output stream. The writer is instructed not to buffer
1221      * uncompressed bytes for cache-on-write.
1222      *
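          * <p>A rough sketch of a caller (illustrative; {@code StringWritable} and {@code fsOut}
          * are hypothetical):
          * <pre>{@code
          * final class StringWritable implements BlockWritable {
          *   public BlockType getBlockType() { return BlockType.META; }
          *   public void writeToBlock(DataOutput out) throws IOException { out.writeUTF("example"); }
          * }
          * writer.writeBlock(new StringWritable(), fsOut);
          * }</pre>
          *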
1223      * @param bw the block-writable object to write as a block
1224      * @param out the file system output stream
1225      * @throws IOException
1226      */
1227     public void writeBlock(BlockWritable bw, FSDataOutputStream out)
1228         throws IOException {
1229       bw.writeToBlock(startWriting(bw.getBlockType()));
1230       writeHeaderAndData(out);
1231     }
1232 
1233     /**
1234      * Creates a new HFileBlock. Checksums have already been validated, so
1235      * the byte buffer passed into the constructor of this newly created
1236      * block does not have checksum data even though the header minor
1237      * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
1238      * 0 value in bytesPerChecksum.
1239      */
1240     public HFileBlock getBlockForCaching(CacheConfig cacheConf) {
1241       HFileContext newContext = new HFileContextBuilder()
1242                                 .withBlockSize(fileContext.getBlocksize())
1243                                 .withBytesPerCheckSum(0)
1244                                 .withChecksumType(ChecksumType.NULL) // no checksums in cached data
1245                                 .withCompression(fileContext.getCompression())
1246                                 .withDataBlockEncoding(fileContext.getDataBlockEncoding())
1247                                 .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
1248                                 .withCompressTags(fileContext.isCompressTags())
1249                                 .withIncludesMvcc(fileContext.isIncludesMvcc())
1250                                 .withIncludesTags(fileContext.isIncludesTags())
1251                                 .build();
1252       return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
1253           getUncompressedSizeWithoutHeader(), prevOffset,
1254           cacheConf.shouldCacheCompressed(blockType.getCategory()) ?
1255             getOnDiskBufferWithHeader() :
1256             getUncompressedBufferWithHeader(),
1257           FILL_HEADER, startOffset,
1258           onDiskBytesWithHeader.length + onDiskChecksum.length, newContext);
1259     }
1260   }
1261 
1262   /** Something that can be written into a block. */
1263   public interface BlockWritable {
1264 
1265     /** The type of block this data should use. */
1266     BlockType getBlockType();
1267 
1268     /**
1269      * Writes the block to the provided stream. Must not write any magic
1270      * records.
1271      *
1272      * @param out a stream to write uncompressed data into
1273      */
1274     void writeToBlock(DataOutput out) throws IOException;
1275   }
1276 
1277   // Block readers and writers
1278 
1279   /** An interface allowing to iterate {@link HFileBlock}s. */
1280   public interface BlockIterator {
1281 
1282     /**
1283      * Get the next block, or null if there are no more blocks to iterate.
1284      */
1285     HFileBlock nextBlock() throws IOException;
1286 
1287     /**
1288      * Similar to {@link #nextBlock()} but checks block type, throws an
1289      * exception if incorrect, and returns the HFile block
1290      */
1291     HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1292   }
1293 
1294   /** A full-fledged reader with iteration ability. */
1295   public interface FSReader {
1296 
1297     /**
1298      * Reads the block at the given offset in the file with the given on-disk
1299      * size and uncompressed size.
1300      *
1301      * @param offset the offset in the stream to read the block at
1302      * @param onDiskSize the on-disk size of the entire block, including all
1303      *          applicable headers, or -1 if unknown
1304      * @param uncompressedSize the uncompressed size of the compressed part of
1305      *          the block, or -1 if unknown
1306      * @return the newly read block
1307      */
1308     HFileBlock readBlockData(long offset, long onDiskSize,
1309         int uncompressedSize, boolean pread) throws IOException;
1310 
1311     /**
1312      * Creates a block iterator over the given portion of the {@link HFile}.
1313      * The iterator returns blocks whose start offsets fall in the range
1314      * startOffset &lt;= offset &lt; endOffset. Returned blocks are always unpacked.
1315      *
1316      * @param startOffset the offset of the block to start iteration with
1317      * @param endOffset the offset to end iteration at (exclusive)
1318      * @return an iterator of blocks between the two given offsets
1319      */
1320     BlockIterator blockRange(long startOffset, long endOffset);
1321 
1322     /** Closes the backing streams */
1323     void closeStreams() throws IOException;
1324 
1325     /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
1326     HFileBlockDecodingContext getBlockDecodingContext();
1327 
1328     /** Get the default decoder for blocks from this file. */
1329     HFileBlockDecodingContext getDefaultBlockDecodingContext();
1330 
1331     /**
1332      * Unbuffers the underlying stream, releasing its socket. Note: this can be called
1333      * concurrently from multiple threads, and implementations must be thread-safe.
1334      */
1335     void unbufferStream();
1336   }
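
  /*
   * Illustrative sketch, not part of the original source: a single positional read through
   * FSReader when the on-disk size is already known (e.g. from the block index). Per the
   * version 2 contract, uncompressedSize is always passed as -1. The "fsReader",
   * "blockOffset", "onDiskSize" and "fileContext" names are hypothetical.
   *
   *   HFileBlock block = fsReader.readBlockData(blockOffset, onDiskSize, -1, true);
   *   block = block.unpack(fileContext, fsReader);  // decompress/decrypt if necessary
   */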
1337 
1338   /**
1339    * A common implementation of some methods of {@link FSReader} and some
1340    * tools for implementing HFile format version-specific block readers.
1341    */
1342   private abstract static class AbstractFSReader implements FSReader {
1344 
1345     /** The size of the file we are reading from, or -1 if unknown. */
1346     protected long fileSize;
1347 
1348     /** The size of the header */
1349     protected final int hdrSize;
1350 
1351     /** The filesystem used to access data */
1352     protected HFileSystem hfs;
1353 
1354     /** The path (if any) where this data is coming from */
1355     protected Path path;
1356 
1357     protected final Lock streamLock = new ReentrantLock();
1358 
1359     /** The default buffer size for our buffered streams */
1360     public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
1361 
1362     protected HFileContext fileContext;
1363 
1364     public AbstractFSReader(long fileSize, HFileSystem hfs, Path path, HFileContext fileContext)
1365         throws IOException {
1366       this.fileSize = fileSize;
1367       this.hfs = hfs;
1368       this.path = path;
1369       this.fileContext = fileContext;
1370       this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1371     }
1372 
1373     @Override
1374     public BlockIterator blockRange(final long startOffset,
1375         final long endOffset) {
1376       final FSReader owner = this; // handle for inner class
1377       return new BlockIterator() {
1378         private long offset = startOffset;
1379 
1380         @Override
1381         public HFileBlock nextBlock() throws IOException {
1382           if (offset >= endOffset)
1383             return null;
1384           HFileBlock b = readBlockData(offset, -1, -1, false);
1385           offset += b.getOnDiskSizeWithHeader();
1386           return b.unpack(fileContext, owner);
1387         }
1388 
1389         @Override
1390         public HFileBlock nextBlockWithBlockType(BlockType blockType)
1391             throws IOException {
1392           HFileBlock blk = nextBlock();
1393           if (blk.getBlockType() != blockType) {
1394             throw new IOException("Expected block of type " + blockType
1395                 + " but found " + blk.getBlockType());
1396           }
1397           return blk;
1398         }
1399       };
1400     }
1401 
1402     /**
1403      * Does a positional read or a seek and read into the given buffer. Returns
1404      * the on-disk size of the next block, or -1 if it could not be determined.
1405      *
1406      * @param dest destination buffer
1407      * @param destOffset offset in the destination buffer
1408      * @param size size of the block to be read
1409      * @param peekIntoNextBlock whether to read the next block's on-disk size
1410      * @param fileOffset position in the stream to read at
1411      * @param pread whether we should do a positional read
1412      * @param istream The input source of data
1413      * @return the on-disk size of the next block with header size included, or
1414      *         -1 if it could not be determined
1415      * @throws IOException
1416      */
1417     protected int readAtOffset(FSDataInputStream istream, byte[] dest, int destOffset, int size,
1418         boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
1419       if (peekIntoNextBlock &&
1420           destOffset + size + hdrSize > dest.length) {
1421         // We are asked to read the next block's header as well, but there is
1422         // not enough room in the array.
1423         throw new IOException("Attempted to read " + size + " bytes and " +
1424             hdrSize + " bytes of next header into a " + dest.length +
1425             "-byte array at offset " + destOffset);
1426       }
1427 
1428       if (!pread && streamLock.tryLock()) {
1429         // Seek + read. Better for scanning.
1430         try {
1431           HFileUtil.seekOnMultipleSources(istream, fileOffset);
1432 
1433           long realOffset = istream.getPos();
1434           if (realOffset != fileOffset) {
1435             throw new IOException("Tried to seek to " + fileOffset + " to "
1436                 + "read " + size + " bytes, but pos=" + realOffset
1437                 + " after seek");
1438           }
1439 
1440           if (!peekIntoNextBlock) {
1441             IOUtils.readFully(istream, dest, destOffset, size);
1442             return -1;
1443           }
1444 
1445           // Try to read the next block header.
1446           if (!readWithExtra(istream, dest, destOffset, size, hdrSize))
1447             return -1;
1448         } finally {
1449           streamLock.unlock();
1450         }
1451       } else {
1452         // Positional read. Better for random reads; or when the streamLock is already locked.
1453         int extraSize = peekIntoNextBlock ? hdrSize : 0;
1454         if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset,
1455             size, extraSize)) {
1456           return -1;
1457         }
1458       }
1459 
1460       assert peekIntoNextBlock;
1461       return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
1462     }
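
    /*
     * Worked example for the return value above (a sketch; the 33-byte header size assumes
     * HBase checksums are enabled): with peekIntoNextBlock set, the extra hdrSize bytes read
     * after this block start with the next block's 8-byte magic, followed by a 4-byte on-disk
     * size excluding the header. If that field holds 65571, the next block's full on-disk size
     * is 65571 + 33 = 65604 bytes, which can then be read in a single operation.
     */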
1463 
1464   }
1465 
1466   /**
1467    * We always prefetch the header of the next block, so that we know its
1468    * on-disk size in advance and can read it in one operation.
1469    */
1470   private static class PrefetchedHeader {
1471     long offset = -1;
1472     byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1473     final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
1474   }
1475 
1476   /** Reads version 2 blocks from the filesystem. */
1477   static class FSReaderImpl extends AbstractFSReader {
1478     /** The file system stream of the underlying {@link HFile}, which may or may not
1479      * perform checksum validation in the filesystem. */
1480     protected FSDataInputStreamWrapper streamWrapper;
1481 
1482     private HFileBlockDecodingContext encodedBlockDecodingCtx;
1483 
1484     /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
1485     private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
1486 
1487     private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
1488         new ThreadLocal<PrefetchedHeader>() {
1489           @Override
1490           public PrefetchedHeader initialValue() {
1491             return new PrefetchedHeader();
1492           }
1493         };
1494 
1495     public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
1496         HFileContext fileContext) throws IOException {
1497       super(fileSize, hfs, path, fileContext);
1498       this.streamWrapper = stream;
1499       // Older versions of HBase didn't support checksum.
1500       this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1501       defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
1502       encodedBlockDecodingCtx = defaultDecodingCtx;
1503     }
1504 
1505     /**
1506      * A constructor that reads files with the latest minor version.
1507      * This is used by unit tests only.
1508      */
1509     FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
1510     throws IOException {
1511       this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
1512     }
1513 
1514     /**
1515      * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
1516      * little memory allocation as possible, using the provided on-disk size.
1517      *
1518      * @param offset the offset in the stream to read at
1519      * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1520      *          the header, or -1 if unknown
1521      * @param uncompressedSize the uncompressed size of the block. Always
1522      *          expected to be -1. This parameter is only used in version 1.
1523      * @param pread whether to use a positional read
1524      */
1525     @Override
1526     public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL,
1527         int uncompressedSize, boolean pread)
1528     throws IOException {
1529 
1530       // get a copy of the current state of whether to validate
1531       // hbase checksums or not for this read call. This is not
1532      // thread-safe but the one constraint is that if we decide
1533       // to skip hbase checksum verification then we are
1534       // guaranteed to use hdfs checksum verification.
1535       boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1536       FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1537 
1538       HFileBlock blk = readBlockDataInternal(is, offset, (int) onDiskSizeWithHeaderL,
1539           uncompressedSize, pread, doVerificationThruHBaseChecksum);
1540       if (blk == null) {
1541         HFile.LOG.warn("HBase checksum verification failed for file " +
1542                        path + " at offset " +
1543                        offset + " filesize " + fileSize +
1544                        ". Retrying read with HDFS checksums turned on...");
1545 
1546         if (!doVerificationThruHBaseChecksum) {
1547           String msg = "HBase checksum verification failed for file " +
1548                        path + " at offset " +
1549                        offset + " filesize " + fileSize +
1550                        " but this cannot happen because doVerify is " +
1551                        doVerificationThruHBaseChecksum;
1552           HFile.LOG.warn(msg);
1553           throw new IOException(msg); // cannot happen case here
1554         }
1555         HFile.checksumFailures.incrementAndGet(); // update metrics
1556 
1557         // If we have a checksum failure, we fall back into a mode where
1558         // the next few reads use HDFS level checksums. We aim to make the
1559         // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
1560         // hbase checksum verification, but since this value is set without
1561         // holding any locks, it can so happen that we might actually do
1562         // a few more than precisely this number.
1563         is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1564         doVerificationThruHBaseChecksum = false;
1565         blk = readBlockDataInternal(is, offset, (int) onDiskSizeWithHeaderL, uncompressedSize,
1566             pread, doVerificationThruHBaseChecksum);
1567         if (blk != null) {
1568         HFile.LOG.warn("HDFS checksum verification succeeded for file " +
1569                          path + " at offset " +
1570                          offset + " filesize " + fileSize);
1571         }
1572       }
1573       if (blk == null && !doVerificationThruHBaseChecksum) {
1574         String msg = "readBlockData failed, possibly due to " +
1575                      "a checksum verification failure for file " + path +
1576                      " at offset " + offset + " filesize " + fileSize;
1577         HFile.LOG.warn(msg);
1578         throw new IOException(msg);
1579       }
1580 
1581       // If there is a checksum mismatch earlier, then retry with
1582       // HBase checksums switched off and use HDFS checksum verification.
1583       // This triggers HDFS to detect and fix corrupt replicas. The
1584       // next checksumOffCount read requests will use HDFS checksums.
1585       // The decrementing of this.checksumOffCount is not thread-safe,
1586       // but it is harmless because eventually checksumOffCount will be
1587       // a negative number.
1588       streamWrapper.checksumOk();
1589       return blk;
1590     }
1591 
1592     /**
1593      * Reads a version 2 block.
1594      *
1595      * @param offset the offset in the stream to read at
1596      * @param onDiskSizeWithHeader the on-disk size of the block, including
1597      *          the header, or -1 if unknown
1598      * @param uncompressedSize the uncompressed size of the block. Always
1599      *          expected to be -1. This parameter is only used in version 1.
1600      * @param pread whether to use a positional read
1601      * @param verifyChecksum Whether to use HBase checksums.
1602      *        If HBase checksum is switched off, then use HDFS checksum.
1603      * @return the HFileBlock or null if there is an HBase checksum mismatch
1604      */
1605     protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
1606         int onDiskSizeWithHeader, int uncompressedSize, boolean pread,
1607         boolean verifyChecksum)
1608     throws IOException {
1609       if (offset < 0) {
1610         throw new IOException("Invalid offset=" + offset + " trying to read "
1611             + "block (onDiskSize=" + onDiskSizeWithHeader
1612             + ", uncompressedSize=" + uncompressedSize + ")");
1613       }
1614 
1615       if (uncompressedSize != -1) {
1616         throw new IOException("Version 2 block reader API does not need " +
1617             "the uncompressed size parameter");
1618       }
1619 
1620       if ((onDiskSizeWithHeader < hdrSize && onDiskSizeWithHeader != -1)
1621           || onDiskSizeWithHeader >= Integer.MAX_VALUE) {
1622         throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeader
1623             + ": expected to be at least " + hdrSize
1624             + " and at most " + Integer.MAX_VALUE + ", or -1 (offset="
1625             + offset + ", uncompressedSize=" + uncompressedSize + ")");
1626       }
1627 
1628       // See if we can avoid reading the header. This is desirable, because
1629       // we will not incur a backward seek operation if we have already
1630       // read this block's header as part of the previous read's look-ahead.
1631       // And we also want to skip reading the header again if it has already
1632       // been read.
1633       // TODO: How often does this optimization fire? Has to be same thread so the thread local
1634       // is pertinent and we have to be reading next block as in a big scan.
1635       PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
1636       ByteBuffer headerBuf = prefetchedHeader.offset == offset? prefetchedHeader.buf: null;
1637 
1638       // Allocate enough space to fit the next block's header too.
1639       int nextBlockOnDiskSize = 0;
1640       byte[] onDiskBlock = null;
1641 
1642       if (onDiskSizeWithHeader > 0) {
1643         // We know the total on-disk size. Read the entire block into memory,
1644         // then parse the header. This code path is used when
1645         // doing a random read operation relying on the block index, as well as
1646         // when the client knows the on-disk size from peeking into the next
1647         // block's header (e.g. this block's header) when reading the previous
1648         // block. This is the faster and more preferable case.
1649 
1650         // Size that we have to skip in case we have already read the header.
1651         int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
1652         onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; // room for this block plus the
1653                                                                 // next block's header
1654         nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
1655             preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
1656             true, offset + preReadHeaderSize, pread);
1657         if (headerBuf != null) {
1658           // the header has been read when reading the previous block, copy
1659           // to this block's header
1660           // headerBuf is HBB
1661           assert headerBuf.hasArray();
1662           System.arraycopy(headerBuf.array(),
1663               headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1664         } else {
1665           headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
1666         }
1667       // if the caller specifies an onDiskSizeWithHeader, validate it.
1668         int expectedOnDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize;
1669         int actualOnDiskSizeWithoutHeader =
1670             headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
1671         validateOnDiskSizeWithoutHeader(expectedOnDiskSizeWithoutHeader,
1672             actualOnDiskSizeWithoutHeader, headerBuf, offset);
1673       } else {
1674         // Check headerBuf to see if we have read this block's header as part of
1675         // reading the previous block. This is an optimization of peeking into
1676      // the next block's header (e.g. this block's header) when reading the
1677         // previous block. This is the faster and more preferable case. If the
1678         // header is already there, don't read the header again.
1679 
1680         // Unfortunately, we still have to do a separate read operation to
1681         // read the header.
1682         if (headerBuf == null) {
1683           // From the header, determine the on-disk size of the given hfile
1684           // block, and read the remaining data, thereby incurring two read
1685           // operations. This might happen when we are doing the first read
1686           // in a series of reads or a random read, and we don't have access
1687           // to the block index. This is costly and should happen very rarely.
1688           headerBuf = ByteBuffer.allocate(hdrSize);
1689           // headerBuf is HBB
1690           readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(),
1691               hdrSize, false, offset, pread);
1692         }
1693         int onDiskSizeWithoutHeader = headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
1694         onDiskSizeWithHeader = onDiskSizeWithoutHeader + hdrSize;
1695         onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
1696         System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1697         nextBlockOnDiskSize =
1698           readAtOffset(is, onDiskBlock, hdrSize, onDiskSizeWithHeader - hdrSize, true,
1699               offset + hdrSize, pread);
1700       }
1701       ByteBuffer onDiskBlockByteBuffer = ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader);
1702 
1703       if (!fileContext.isCompressedOrEncrypted()) {
1704         verifyUncompressed(headerBuf, fileContext.isUseHBaseChecksum());
1705       }
1706 
1707       if (verifyChecksum && !validateChecksum(offset, onDiskBlockByteBuffer, hdrSize)) {
1708         return null;             // checksum mismatch
1709       }
1710 
1711       // The onDiskBlock will become the headerAndDataBuffer for this block.
1712       // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
1713       // contains the header of the next block, so there is no need to set the next
1714       // block's header in it.
1715       HFileBlock b = new HFileBlock(onDiskBlockByteBuffer, this.fileContext.isUseHBaseChecksum());
1716 
1717       b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
1718 
1719       // Set prefetched header
1720       if (b.hasNextBlockHeader()) {
1721         prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
1722         System.arraycopy(onDiskBlock, onDiskSizeWithHeader, prefetchedHeader.header, 0, hdrSize);
1723       }
1724 
1725       b.offset = offset;
1726       b.fileContext.setIncludesTags(this.fileContext.isIncludesTags());
1727       b.fileContext.setIncludesMvcc(this.fileContext.isIncludesMvcc());
1728       return b;
1729     }
1730 
1731     void setIncludesMemstoreTS(boolean includesMemstoreTS) {
1732       this.fileContext.setIncludesMvcc(includesMemstoreTS);
1733     }
1734 
1735     void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
1736       encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
1737     }
1738 
1739     @Override
1740     public HFileBlockDecodingContext getBlockDecodingContext() {
1741       return this.encodedBlockDecodingCtx;
1742     }
1743 
1744     @Override
1745     public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
1746       return this.defaultDecodingCtx;
1747     }
1748 
1749     /**
1750      * Generates the checksum for the header as well as the data and then validates it.
1751      * If the block does not use checksums, returns false.
1752      * @return True if checksum matches, else false.
1753      */
1754     protected boolean validateChecksum(long offset, ByteBuffer data, int hdrSize)
1755         throws IOException {
1756       // If this is an older version of the block that does not have checksums, then return false
1757       // indicating that checksum verification did not succeed. Actually, this method should never
1758       // be called when the minorVersion is 0, so this is a defensive check for a case that
1759       // cannot happen. If it somehow does, it is better to return false and report it as a
1760       // checksum validation failure.
1761       if (!fileContext.isUseHBaseChecksum()) {
1762         return false;
1763       }
1764       return ChecksumUtil.validateChecksum(data, path, offset, hdrSize);
1765     }
1766 
1767     @Override
1768     public void closeStreams() throws IOException {
1769       streamWrapper.close();
1770     }
1771 
1772     @Override
1773     public void unbufferStream() {
1774       // To handle concurrent reads, ensure that no other client is accessing the streams while we
1775       // unbuffer it.
1776       if (streamLock.tryLock()) {
1777         try {
1778           this.streamWrapper.unbuffer();
1779         } finally {
1780           streamLock.unlock();
1781         }
1782       }
1783     }
1784 
1785     @Override
1786     public String toString() {
1787       return "hfs=" + hfs + ", path=" + path + ", fileContext=" + fileContext;
1788     }
1789   }
1790 
1791   @Override
1792   public int getSerializedLength() {
1793     if (buf != null) {
1794       // include extra bytes for the next header when it's available.
1795       int extraSpace = hasNextBlockHeader() ? headerSize() : 0;
1796       return this.buf.limit() + extraSpace + HFileBlock.EXTRA_SERIALIZATION_SPACE;
1797     }
1798     return 0;
1799   }
1800 
1801   @Override
1802   public void serialize(ByteBuffer destination) {
1803     ByteBufferUtils.copyFromBufferToBuffer(destination, this.buf, 0, getSerializedLength()
1804         - EXTRA_SERIALIZATION_SPACE);
1805     serializeExtraInfo(destination);
1806   }
1807 
1808   public void serializeExtraInfo(ByteBuffer destination) {
1809     destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1810     destination.putLong(this.offset);
1811     destination.putInt(this.nextBlockOnDiskSizeWithHeader);
1812     destination.rewind();
1813   }
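
  /*
   * Sketch of the extra serialization layout written above, assuming EXTRA_SERIALIZATION_SPACE
   * covers exactly these trailing fields:
   *
   *   1 byte  - whether the block uses HBase checksums
   *   8 bytes - this block's offset in the file
   *   4 bytes - on-disk size (with header) of the next block
   *
   * i.e. 1 + 8 + 4 = 13 bytes appended after the block's own buffer.
   */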
1814 
1815   @Override
1816   public CacheableDeserializer<Cacheable> getDeserializer() {
1817     return HFileBlock.blockDeserializer;
1818   }
1819 
1820   @Override
1821   public boolean equals(Object comparison) {
1822     if (this == comparison) {
1823       return true;
1824     }
1825     if (comparison == null) {
1826       return false;
1827     }
1828     if (comparison.getClass() != this.getClass()) {
1829       return false;
1830     }
1831 
1832     HFileBlock castedComparison = (HFileBlock) comparison;
1833 
1834     if (castedComparison.blockType != this.blockType) {
1835       return false;
1836     }
1837     if (castedComparison.nextBlockOnDiskSizeWithHeader != this.nextBlockOnDiskSizeWithHeader) {
1838       return false;
1839     }
1840     if (castedComparison.offset != this.offset) {
1841       return false;
1842     }
1843     if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1844       return false;
1845     }
1846     if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1847       return false;
1848     }
1849     if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1850       return false;
1851     }
1852     if (ByteBufferUtils.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
1853         castedComparison.buf.limit()) != 0) {
1854       return false;
1855     }
1856     return true;
1857   }
1858 
1859   public DataBlockEncoding getDataBlockEncoding() {
1860     if (blockType == BlockType.ENCODED_DATA) {
1861       return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
1862     }
1863     return DataBlockEncoding.NONE;
1864   }
1865 
1866   byte getChecksumType() {
1867     return this.fileContext.getChecksumType().getCode();
1868   }
1869 
1870   int getBytesPerChecksum() {
1871     return this.fileContext.getBytesPerChecksum();
1872   }
1873 
1874   /** @return the size of data on disk + header. Excludes checksum. */
1875   int getOnDiskDataSizeWithHeader() {
1876     return this.onDiskDataSizeWithHeader;
1877   }
1878 
1879   /**
1880    * Calculate the number of bytes required to store all the checksums for this block. Each
1881    * checksum value is a 4 byte integer ({@link HFileBlock#CHECKSUM_SIZE}).
1882    */
1883   int totalChecksumBytes() {
1884     return HFileBlock.totalChecksumBytes(this.fileContext, onDiskDataSizeWithHeader);
1885   }
1886 
1887   private static int totalChecksumBytes(HFileContext fileContext, int onDiskDataSizeWithHeader) {
1888     // If the hfile block has minorVersion 0, then there are no checksum
1889     // data to validate. Similarly, a zero value in this.bytesPerChecksum
1890     // indicates that cached blocks do not have checksum data because
1891     // checksums were already validated when the block was read from disk.
1892     if (!fileContext.isUseHBaseChecksum() || fileContext.getBytesPerChecksum() == 0) {
1893       return 0;
1894     }
1895     return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
1896         fileContext.getBytesPerChecksum());
1897   }
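
  /*
   * Worked example (a sketch, assuming ChecksumUtil.numBytes emits one 4-byte checksum per
   * bytesPerChecksum-sized chunk): with bytesPerChecksum = 16384 and
   * onDiskDataSizeWithHeader = 66000, the data spans ceil(66000 / 16384) = 5 chunks, so
   * totalChecksumBytes() returns 5 * 4 = 20.
   */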
1898 
1899   /**
1900    * Returns the size of this block header.
1901    */
1902   public int headerSize() {
1903     return headerSize(this.fileContext.isUseHBaseChecksum());
1904   }
1905 
1906   /**
1907    * Maps checksum support (i.e. the block's minor version) to the size of the header.
1908    */
1909   public static int headerSize(boolean usesHBaseChecksum) {
1910     if (usesHBaseChecksum) {
1911       return HConstants.HFILEBLOCK_HEADER_SIZE;
1912     }
1913     return HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
1914   }
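
  /*
   * For reference (a sketch; sizes taken from HConstants): the checksum-aware header appends
   * a 1-byte checksum type, a 4-byte bytesPerChecksum and a 4-byte onDiskDataSizeWithHeader
   * to the 24-byte checksum-free header, i.e. 24 + 1 + 4 + 4 = 33 bytes.
   */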
1915 
1916   /**
1917    * Return the appropriate DUMMY_HEADER for the minor version
1918    */
1919   public byte[] getDummyHeaderForVersion() {
1920     return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
1921   }
1922 
1923   /**
1924    * Return the appropriate DUMMY_HEADER for the minor version
1925    */
1926   static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
1927     if (usesHBaseChecksum) {
1928       return HConstants.HFILEBLOCK_DUMMY_HEADER;
1929     }
1930     return DUMMY_HEADER_NO_CHECKSUM;
1931   }
1932 
1933   /**
1934    * @return the HFileContext used to create this HFileBlock. Not necessarily the
1935    * fileContext for the file from which this block's data was originally read.
1936    */
1937   public HFileContext getHFileContext() {
1938     return this.fileContext;
1939   }
1940 
1941   /**
1942    * Convert the contents of the block header into a human readable string.
1943    * This is mostly helpful for debugging. This assumes that the block
1944    * has minor version > 0.
1945    */
1946   static String toStringHeader(ByteBuffer buf) throws IOException {
1947     byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
1948     buf.get(magicBuf);
1949     BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
1950     int compressedBlockSizeNoHeader = buf.getInt();
1951     int uncompressedBlockSizeNoHeader = buf.getInt();
1952     long prevBlockOffset = buf.getLong();
1953     byte cksumtype = buf.get();
1954     long bytesPerChecksum = buf.getInt();
1955     long onDiskDataSizeWithHeader = buf.getInt();
1956     return " Header dump: magic: " + Bytes.toString(magicBuf) +
1957                    " blockType " + bt +
1958                    " compressedBlockSizeNoHeader " +
1959                    compressedBlockSizeNoHeader +
1960                    " uncompressedBlockSizeNoHeader " +
1961                    uncompressedBlockSizeNoHeader +
1962                    " prevBlockOffset " + prevBlockOffset +
1963                    " checksumType " + ChecksumType.codeToType(cksumtype) +
1964                    " bytesPerChecksum " + bytesPerChecksum +
1965                    " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
1966   }
1967 }