1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.io.hfile;
19  
20  import java.io.DataInputStream;
21  import java.io.DataOutput;
22  import java.io.DataOutputStream;
23  import java.io.IOException;
24  import java.io.InputStream;
25  import java.nio.ByteBuffer;
26  import java.util.concurrent.locks.Lock;
27  import java.util.concurrent.locks.ReentrantLock;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.fs.FSDataInputStream;
32  import org.apache.hadoop.fs.FSDataOutputStream;
33  import org.apache.hadoop.fs.Path;
34  import org.apache.hadoop.hbase.Cell;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.hbase.fs.HFileSystem;
38  import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
39  import org.apache.hadoop.hbase.io.ByteBuffInputStream;
40  import org.apache.hadoop.hbase.io.ByteBufferSupportDataOutputStream;
41  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
42  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
43  import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
44  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
45  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
46  import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
47  import org.apache.hadoop.hbase.nio.ByteBuff;
48  import org.apache.hadoop.hbase.nio.MultiByteBuff;
49  import org.apache.hadoop.hbase.nio.SingleByteBuff;
50  import org.apache.hadoop.hbase.util.Bytes;
51  import org.apache.hadoop.hbase.util.ChecksumType;
52  import org.apache.hadoop.hbase.util.ClassSize;
53  import org.apache.hadoop.io.IOUtils;
54  
55  import com.google.common.annotations.VisibleForTesting;
56  import com.google.common.base.Preconditions;
57  
58  /**
59   * Reads {@link HFile} version 2 blocks from HFiles and from caches via the {@link Cacheable} interface.
60   * Version 2 was introduced in hbase-0.92.0. No longer has support for version 1 blocks since
61   * hbase-1.3.0.
62   *
63   * <p>Version 1 was the original file block. Version 2 was introduced when we changed the hbase file
64   * format to support multi-level block indexes and compound bloom filters (HBASE-3857).
65   *
66   * <h3>HFileBlock: Version 2</h3>
67   * In version 2, a block is structured as follows:
68   * <ul>
69   * <li><b>Header:</b> See Writer#putHeader() for where header is written; header total size is
70   * HFILEBLOCK_HEADER_SIZE
71   * <ul>
72   * <li>0. blockType: Magic record identifying the {@link BlockType} (8 bytes):
73   * e.g. <code>DATABLK*</code>
74   * <li>1. onDiskSizeWithoutHeader: Compressed -- a.k.a 'on disk' -- block size, excluding header,
75   * but including tailing checksum bytes (4 bytes)
76   * <li>2. uncompressedSizeWithoutHeader: Uncompressed block size, excluding header, and excluding
77   * checksum bytes (4 bytes)
78   * <li>3. prevBlockOffset: The offset of the previous block of the same type (8 bytes). This is
79   * used to navigate to the previous block without having to go to the block index
80   * <li>4: For minorVersions &gt;=1, the ordinal describing checksum type (1 byte)
81   * <li>5: For minorVersions &gt;=1, the number of data bytes/checksum chunk (4 bytes)
82   * <li>6: onDiskDataSizeWithHeader: For minorVersions &gt;=1, the size of data 'on disk', including
83   * header, excluding checksums (4 bytes)
84   * </ul>
85   * </li>
86   * <li><b>Raw/Compressed/Encrypted/Encoded data:</b> The compression
87   * algorithm is the same for all the blocks in an {@link HFile}. If compression is NONE, this is
88   * just raw, serialized Cells.
89   * <li><b>Tail:</b> For minorVersions &gt;=1, a series of 4 byte checksums, one each for
90   * the number of bytes specified by bytesPerChecksum.
91   * </ul>
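     *
     * <p>Adding up the fields above, the full header is 8 + 4 + 4 + 8 + 1 + 4 + 4 = 33 bytes
     * (HFILEBLOCK_HEADER_SIZE) when hbase checksums are in use, and 8 + 4 + 4 + 8 = 24 bytes
     * (HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM) without the three checksum-related fields.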
92   *
93   * <h3>Caching</h3>
94   * Caches cache whole blocks with trailing checksums, if any. We then tag on some metadata: the
95   * content of BLOCK_METADATA_SPACE, which holds a flag on whether we are doing 'hbase'
96   * checksums, and then the offset into the file, which is needed when we re-make a cache key
97   * when we return the block to the cache as 'done'. See {@link Cacheable#serialize(ByteBuffer)} and
98   * {@link Cacheable#getDeserializer()}.
99   *
100  * <p>TODO: Should we cache the checksums? Down in Writer#getBlockForCaching(CacheConfig) where
101  * we make a block to cache-on-write, there is an attempt at turning off checksums. This is not the
102  * only place we get blocks to cache. We also will cache the raw return from an hdfs read. In this
103  * case, the checksums may be present. If the cache is backed by something that doesn't do ECC,
104  * say an SSD, we might want to preserve checksums. For now this is an open question.
105  * <p>TODO: Over in BucketCache, we save a block allocation by doing a custom serialization.
106  * Be sure to change it if serialization changes in here. Could we add a method here that takes an
107  * IOEngine and that then serializes to it rather than expose our internals over in BucketCache?
108  * IOEngine is in the bucket subpackage. Pull it up? Then this class knows about bucketcache. Ugh.
109  */
110 @InterfaceAudience.Private
111 public class HFileBlock implements Cacheable {
112   private static final Log LOG = LogFactory.getLog(HFileBlock.class);
113 
114   /** Type of block. Header field 0. */
115   private BlockType blockType;
116 
117   /**
118    * Size on disk excluding header, including checksum. Header field 1.
119    * @see Writer#putHeader(byte[], int, int, int, int)
120    */
121   private int onDiskSizeWithoutHeader;
122 
123   /**
124    * Size of pure data. Does not include header or checksums. Header field 2.
125    * @see Writer#putHeader(byte[], int, int, int, int)
126    */
127   private int uncompressedSizeWithoutHeader;
128 
129   /**
130    * The offset of the previous block on disk. Header field 3.
131    * @see Writer#putHeader(byte[], int, int, int, int)
132    */
133   private long prevBlockOffset;
134 
135   /**
136    * Size on disk of header + data. Excludes checksum. Header field 6,
137    * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
138    * @see Writer#putHeader(byte[], int, int, int, int)
139    */
140   private int onDiskDataSizeWithHeader;
141 
142 
143   /**
144    * The in-memory representation of the hfile block. Can be on or offheap. Can be backed by
145    * a single ByteBuffer or by many. Make no assumptions.
146    *
147    * <p>Be careful reading from this <code>buf</code>. Duplicate and work on the duplicate or if
148    * not, be sure to reset position and limit else trouble down the road.
149    *
150    * <p>TODO: Make this read-only once made.
151    *
152    * <p>We are using the ByteBuff type. ByteBuffer is not extensible yet we need to be able to have
153    * a ByteBuffer-like API across multiple ByteBuffers reading from a cache such as BucketCache.
154    * So, we have this ByteBuff type. Unfortunately, it is spread all about HFileBlock. Would be
155    * good if it could be confined to cache use only, but that is hard to do.
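       *
       * <p>For example, read via a duplicate: <code>ByteBuff dup = this.buf.duplicate();
       * dup.position(headerSize());</code> and then read from <code>dup</code>, leaving
       * <code>buf</code>'s own position and limit untouched, much as {@link #getByteStream()} does.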
156    */
157   private ByteBuff buf;
158 
159   /** Meta data that holds meta information on the hfileblock.
160    */
161   private HFileContext fileContext;
162 
163   /**
164    * The offset of this block in the file. Populated by the reader for
165    * convenience of access. This offset is not part of the block header.
166    */
167   private long offset = UNSET;
168 
169   private MemoryType memType = MemoryType.EXCLUSIVE;
170 
171   /**
172    * The on-disk size of the next block, including the header and checksums if present, obtained by
173    * peeking into the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the next block's
174    * header, or UNSET if unknown.
175    *
176    * Blocks try to carry the size of the next block to read in this data member. They will even have
177    * this value when served from cache. Could save a seek in the case where we are iterating through
178    * a file and some of the blocks come from cache. If from cache, then having this info to hand
179    * will save us doing a seek to read the header so we can read the body of a block.
180    * TODO: see how effective this is at saving seeks.
181    */
182   private int nextBlockOnDiskSize = UNSET;
183 
184   /**
185    * On a checksum failure, do these many succeeding read requests using hdfs checksums before
186    * auto-reenabling hbase checksum verification.
187    */
188   static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
189 
190   private static int UNSET = -1;
191   public static final boolean FILL_HEADER = true;
192   public static final boolean DONT_FILL_HEADER = false;
193 
194   // How do we get the estimate correctly if it is a SingleByteBuff?
195   public static final int MULTI_BYTE_BUFFER_HEAP_SIZE =
196       (int)ClassSize.estimateBase(MultiByteBuff.class, false);
197 
198   /**
199    * Space for metadata on a block that gets stored along with the block when we cache it.
200    * There are a few bytes stuck on the end of the HFileBlock that we pull in from HDFS (note,
201    * when we read from HDFS, we pull in an HFileBlock AND the header of the next block if one).
202    * 8 bytes are the offset of this block (long) in the file. The offset is important because it is
203    * used when we remake the CacheKey when we return the block to cache when done. There is also
204    * a flag on whether checksumming is being done by hbase or not. See class comment for note on
205    * uncertain state of checksumming of blocks that come out of cache (should we or should we not?).
206    * Finally there are 4 bytes to hold the length of the next block, which can save a seek on occasion.
207    * <p>This EXTRA came in with original commit of the bucketcache, HBASE-7404. Was formerly
208    * known as EXTRA_SERIALIZATION_SPACE.
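       * <p>That is: 1 byte (hbase checksum flag) + 8 bytes (file offset) + 4 bytes (next block's
       * on-disk size) = 13 bytes of metadata appended after the cached block.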
209    */
210   static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
211 
212   /**
213    * Each checksum value is an integer that can be stored in 4 bytes.
214    */
215   static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
216 
217   static final byte[] DUMMY_HEADER_NO_CHECKSUM =
218       new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
219 
220   /**
221    * Used deserializing blocks from Cache.
222    *
223    * <code>
224    * ++++++++++++++
225    * + HFileBlock +
226    * ++++++++++++++
227    * + Checksums  + <= Optional
228    * ++++++++++++++
229    * + Metadata!  +
230    * ++++++++++++++
231    * </code>
232    * @see #serialize(ByteBuffer)
233    */
234   static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER =
235       new CacheableDeserializer<Cacheable>() {
236         public HFileBlock deserialize(ByteBuff buf, boolean reuse, MemoryType memType)
237         throws IOException {
238           // The buf has the file block followed by block metadata.
239           // Set limit to just before the BLOCK_METADATA_SPACE then rewind.
240           buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind();
241           // Get a new buffer to pass the HFileBlock for it to 'own'.
242           ByteBuff newByteBuff;
243           if (reuse) {
244             newByteBuff = buf.slice();
245           } else {
246             int len = buf.limit();
247             newByteBuff = new SingleByteBuff(ByteBuffer.allocate(len));
248             newByteBuff.put(0, buf, buf.position(), len);
249           }
250           // Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock.
251           buf.position(buf.limit());
252           buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE);
253           boolean usesChecksum = buf.get() == (byte)1;
254           long offset = buf.getLong();
255           int nextBlockOnDiskSize = buf.getInt();
256           HFileBlock hFileBlock =
257               new HFileBlock(newByteBuff, usesChecksum, memType, offset, nextBlockOnDiskSize, null);
258           return hFileBlock;
259         }
260 
261         @Override
262         public int getDeserialiserIdentifier() {
263           return DESERIALIZER_IDENTIFIER;
264         }
265 
266         @Override
267         public HFileBlock deserialize(ByteBuff b) throws IOException {
268           // Used only in tests
269           return deserialize(b, false, MemoryType.EXCLUSIVE);
270         }
271       };
272 
273   private static final int DESERIALIZER_IDENTIFIER;
274   static {
275     DESERIALIZER_IDENTIFIER =
276         CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
277   }
278 
279   // Todo: encapsulate Header related logic in this inner class.
280   static class Header {
281     // Format of header is:
282     // 8 bytes - block magic
283     // 4 bytes int - onDiskSizeWithoutHeader
284     // 4 bytes int - uncompressedSizeWithoutHeader
285     // 8 bytes long - prevBlockOffset
286     // The following 3 are only present if header contains checksum information
287     // 1 byte - checksum type
288     // 4 byte int - bytes per checksum
289     // 4 byte int - onDiskDataSizeWithHeader
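        // The offsets below follow from the field sizes above: 0, 8, 12, 16, then 24, 25, 29;
        // the full header with checksum fields is 33 bytes.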
290     static int BLOCK_MAGIC_INDEX = 0;
291     static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8;
292     static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12;
293     static int PREV_BLOCK_OFFSET_INDEX = 16;
294     static int CHECKSUM_TYPE_INDEX = 24;
295     static int BYTES_PER_CHECKSUM_INDEX = 25;
296     static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29;
297   }
298
299   /**
300    * Copy constructor. Creates a shallow copy of {@code that}'s buffer.
301    */
302   private HFileBlock(HFileBlock that) {
303     this.blockType = that.blockType;
304     this.onDiskSizeWithoutHeader = that.onDiskSizeWithoutHeader;
305     this.uncompressedSizeWithoutHeader = that.uncompressedSizeWithoutHeader;
306     this.prevBlockOffset = that.prevBlockOffset;
307     this.buf = that.buf.duplicate();
308     this.offset = that.offset;
309     this.onDiskDataSizeWithHeader = that.onDiskDataSizeWithHeader;
310     this.fileContext = that.fileContext;
311     this.nextBlockOnDiskSize = that.nextBlockOnDiskSize;
312   }
313
314   /**
315    * Creates a new {@link HFile} block from the given fields. This constructor
316    * is used when the block data has already been read and uncompressed,
317    * and is sitting in a byte buffer and we want to stuff the block into cache.
318    * See {@link Writer#getBlockForCaching(CacheConfig)}.
319    *
320    * <p>TODO: The caller presumes no checksumming
321    * required of this block instance since going into cache; checksum already verified on
322    * underlying block data pulled in from filesystem. Is that correct? What if cache is SSD?
323    *
324    * @param blockType the type of this block, see {@link BlockType}
325    * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
326    * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
327    * @param prevBlockOffset see {@link #prevBlockOffset}
328    * @param b block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by
329    *          uncompressed data.
330    * @param fillHeader when true, write the first 4 header fields into passed buffer.
331    * @param offset the file offset the block was read from
332    * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
333    * @param fileContext HFile meta data
334    */
335   HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
336       long prevBlockOffset, ByteBuffer b, boolean fillHeader, long offset,
337       final int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext) {
338     init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
339         prevBlockOffset, offset, onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext);
340     this.buf = new SingleByteBuff(b);
341     if (fillHeader) {
342       overwriteHeader();
343     }
344     this.buf.rewind();
345   }
346
347   /**
348    * Creates a block from an existing buffer starting with a header. Rewinds
349    * and takes ownership of the buffer. By definition of rewind, ignores the
350    * buffer position, but if you slice the buffer beforehand, it will rewind
351    * to that point.
352    * @param buf Has header, content, and trailing checksums if present.
353    */
354   HFileBlock(ByteBuff buf, boolean usesHBaseChecksum, MemoryType memType, final long offset,
355       final int nextBlockOnDiskSize, HFileContext fileContext) throws IOException {
356     buf.rewind();
357     final BlockType blockType = BlockType.read(buf);
358     final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
359     final int uncompressedSizeWithoutHeader =
360         buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
361     final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
362     byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX);
363     int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
364     int onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
365     // This constructor is called when we deserialize a block from cache and when we read a block in
366     // from the fs. fileContext is null when deserialized from cache so we need to make up one.
367     HFileContextBuilder fileContextBuilder = fileContext != null?
368         new HFileContextBuilder(fileContext): new HFileContextBuilder();
369     fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum);
370     if (usesHBaseChecksum) {
371       // Use the checksum type and bytes per checksum from header, not from filecontext.
372       fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType));
373       fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum);
374     } else {
375       fileContextBuilder.withChecksumType(ChecksumType.NULL);
376       fileContextBuilder.withBytesPerCheckSum(0);
377       // Need to fix onDiskDataSizeWithHeader; there are no checksums after the block data
378       onDiskDataSizeWithHeader = onDiskSizeWithoutHeader + headerSize(usesHBaseChecksum);
379     }
380     fileContext = fileContextBuilder.build();
381     assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
382     init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
383         prevBlockOffset, offset, onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext);
384     this.memType = memType;
385     this.offset = offset;
386     this.buf = buf;
387     this.buf.rewind();
388   }
389
390   /**
391    * Called from constructors.
392    */
393   private void init(BlockType blockType, int onDiskSizeWithoutHeader,
394       int uncompressedSizeWithoutHeader, long prevBlockOffset,
395       long offset, int onDiskDataSizeWithHeader, final int nextBlockOnDiskSize,
396       HFileContext fileContext) {
397     this.blockType = blockType;
398     this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
399     this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
400     this.prevBlockOffset = prevBlockOffset;
401     this.offset = offset;
402     this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
403     this.nextBlockOnDiskSize = nextBlockOnDiskSize;
404     this.fileContext = fileContext;
405   }
406
407   /**
408    * Parse total ondisk size including header and checksum.
409    * @param headerBuf Header ByteBuffer. Presumed exact size of header.
410    * @return Size of the block with header included.
411    */
412   private static int getOnDiskSizeWithHeader(final ByteBuffer headerBuf) {
413     // Always pass true for the hbase checksum flag when calling headerSize here.
414     return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(true);
415   }
416
417   /**
418    * @return the on-disk size of the next block (including the header size and any checksums if
419    * present) read by peeking into the next block's header; use as a hint when doing
420    * a read of the next block when scanning or running over a file.
421    */
422   public int getNextBlockOnDiskSize() {
423     return nextBlockOnDiskSize;
424   }
425 
426   public BlockType getBlockType() {
427     return blockType;
428   }
429
430   /** @return the data block encoding id that was used to encode this block */
431   public short getDataBlockEncodingId() {
432     if (blockType != BlockType.ENCODED_DATA) {
433       throw new IllegalArgumentException("Querying encoder ID of a block " +
434           "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
435     }
436     return buf.getShort(headerSize());
437   }
438
439   /**
440    * @return the on-disk size of header + data part + checksum.
441    */
442   public int getOnDiskSizeWithHeader() {
443     return onDiskSizeWithoutHeader + headerSize();
444   }
445
446   /**
447    * @return the on-disk size of the data part + checksum (header excluded).
448    */
449   int getOnDiskSizeWithoutHeader() {
450     return onDiskSizeWithoutHeader;
451   }
452
453   /**
454    * @return the uncompressed size of data part (header and checksum excluded).
455    */
456    int getUncompressedSizeWithoutHeader() {
457     return uncompressedSizeWithoutHeader;
458   }
459
460   /**
461    * @return the offset of the previous block of the same type in the file, or
462    *         -1 if unknown
463    */
464   long getPrevBlockOffset() {
465     return prevBlockOffset;
466   }
467
468   /**
469    * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position
470    * is modified as side-effect.
471    */
472   private void overwriteHeader() {
473     buf.rewind();
474     blockType.write(buf);
475     buf.putInt(onDiskSizeWithoutHeader);
476     buf.putInt(uncompressedSizeWithoutHeader);
477     buf.putLong(prevBlockOffset);
478     if (this.fileContext.isUseHBaseChecksum()) {
479       buf.put(fileContext.getChecksumType().getCode());
480       buf.putInt(fileContext.getBytesPerChecksum());
481       buf.putInt(onDiskDataSizeWithHeader);
482     }
483   }
484
485   /**
486    * Returns a buffer that does not include the header or checksum.
487    *
488    * @return the buffer with header skipped and checksum omitted.
489    */
490   public ByteBuff getBufferWithoutHeader() {
491     ByteBuff dup = getBufferReadOnly();
492     // Now set it up so the buffer spans the content only -- no header, no checksums.
493     return dup.position(headerSize()).limit(buf.limit() - totalChecksumBytes()).slice();
494   }
495
496   /**
497    * Returns a read-only duplicate of the buffer this block stores internally ready to be read.
498    * Clients must not modify the buffer object though they may set position and limit on the
499    * returned buffer since we pass back a duplicate. This method has to be public because it is used
500    * in {@link CompoundBloomFilter} to avoid object creation on every Bloom
501    * filter lookup, but has to be used with caution. Buffer holds header, block content,
502    * and any follow-on checksums if present.
503    *
504    * @return the buffer of this block for read-only operations
505    */
506   public ByteBuff getBufferReadOnly() {
507     // TODO: ByteBuf does not support asReadOnlyBuffer(). Fix.
508     ByteBuff dup = this.buf.duplicate();
509     assert dup.position() == 0;
510     return dup;
511   }
512
513   private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
514       String fieldName) throws IOException {
515     if (valueFromBuf != valueFromField) {
516       throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
517           + ") is different from that in the field (" + valueFromField + ")");
518     }
519   }
520
521   private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
522       throws IOException {
523     if (valueFromBuf != valueFromField) {
524       throw new IOException("Block type stored in the buffer: " +
525         valueFromBuf + ", block type field: " + valueFromField);
526     }
527   }
528
529   /**
530    * Checks if the block is internally consistent, i.e. the first
531    * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a
532    * valid header consistent with the fields. Assumes a packed block structure.
533    * This function is primarily for testing and debugging, and is not
534    * thread-safe, because it alters the internal buffer pointer.
535    * Used by tests only.
536    */
537   @VisibleForTesting
538   void sanityCheck() throws IOException {
539     // Duplicate so no side-effects
540     ByteBuff dup = this.buf.duplicate().rewind();
541     sanityCheckAssertion(BlockType.read(dup), blockType);
542
543     sanityCheckAssertion(dup.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader");
544
545     sanityCheckAssertion(dup.getInt(), uncompressedSizeWithoutHeader,
546         "uncompressedSizeWithoutHeader");
547
548     sanityCheckAssertion(dup.getLong(), prevBlockOffset, "prevBlockOffset");
549     if (this.fileContext.isUseHBaseChecksum()) {
550       sanityCheckAssertion(dup.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
551       sanityCheckAssertion(dup.getInt(), this.fileContext.getBytesPerChecksum(),
552           "bytesPerChecksum");
553       sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
554     }
555
556     int cksumBytes = totalChecksumBytes();
557     int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
558     if (dup.limit() != expectedBufLimit) {
559       throw new AssertionError("Expected limit " + expectedBufLimit + ", got " + dup.limit());
560     }
561
562     // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
563     // block's header, so there are two sensible values for buffer capacity.
564     int hdrSize = headerSize();
565     if (dup.capacity() != expectedBufLimit && dup.capacity() != expectedBufLimit + hdrSize) {
566       throw new AssertionError("Invalid buffer capacity: " + dup.capacity() +
567           ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
568     }
569   }
570
571   @Override
572   public String toString() {
573     StringBuilder sb = new StringBuilder()
574       .append("[")
575       .append("blockType=").append(blockType)
576       .append(", fileOffset=").append(offset)
577       .append(", headerSize=").append(headerSize())
578       .append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
579       .append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
580       .append(", prevBlockOffset=").append(prevBlockOffset)
581       .append(", isUseHBaseChecksum=").append(fileContext.isUseHBaseChecksum());
582     if (fileContext.isUseHBaseChecksum()) {
583       sb.append(", checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
584         .append(", bytesPerChecksum=").append(this.buf.getInt(24 + 1))
585         .append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
586     } else {
587       sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
588         .append("(").append(onDiskSizeWithoutHeader)
589         .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
590     }
591     String dataBegin = null;
592     if (buf.hasArray()) {
593       dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
594           Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
595     } else {
596       ByteBuff bufWithoutHeader = getBufferWithoutHeader();
597       byte[] dataBeginBytes = new byte[Math.min(32,
598           bufWithoutHeader.limit() - bufWithoutHeader.position())];
599       bufWithoutHeader.get(dataBeginBytes);
600       dataBegin = Bytes.toStringBinary(dataBeginBytes);
601     }
602     sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader())
603       .append(", totalChecksumBytes=").append(totalChecksumBytes())
604       .append(", isUnpacked=").append(isUnpacked())
605       .append(", buf=[").append(buf).append("]")
606       .append(", dataBeginsWith=").append(dataBegin)
607       .append(", fileContext=").append(fileContext)
608       .append("]");
609     return sb.toString();
610   }
611
612   /**
613    * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
614    * encoded structure. Internal structures are shared between instances where applicable.
615    */
616   HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
617     if (!fileContext.isCompressedOrEncrypted()) {
618       // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
619       // which is used for block serialization to L2 cache, does not preserve encoding and
620       // encryption details.
621       return this;
622     }
623
624     HFileBlock unpacked = new HFileBlock(this);
625     unpacked.allocateBuffer(); // allocates space for the decompressed block
626
627     HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
628       reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
629
630     ByteBuff dup = this.buf.duplicate();
631     dup.position(this.headerSize());
632     dup = dup.slice();
633     ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
634       unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
635       dup);
636     return unpacked;
637   }
638
639   /**
640    * Always allocates a new buffer of the correct size. Copies header bytes
641    * from the existing buffer. Does not change header fields.
642    * Reserves room to keep checksum bytes too.
643    */
644   private void allocateBuffer() {
645     int cksumBytes = totalChecksumBytes();
646     int headerSize = headerSize();
647     int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
648
649     // TODO: do we need to consider allocating offheap here?
650     ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
651
652     // Copy header bytes into newBuf.
653     // newBuf is HBB so no issue in calling array()
654     buf.position(0);
655     buf.get(newBuf.array(), newBuf.arrayOffset(), headerSize);
656
657     buf = new SingleByteBuff(newBuf);
658     // set limit to exclude next block's header
659     buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
660   }
661
662   /**
663    * Return true when this block's buffer has been unpacked, false otherwise. Note this is a
664    * calculated heuristic, not a tracked attribute of the block.
665    */
666   public boolean isUnpacked() {
667     final int cksumBytes = totalChecksumBytes();
668     final int headerSize = headerSize();
669     final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
670     final int bufCapacity = buf.capacity();
671     return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
672   }
673
674   /** An additional sanity-check in case no compression or encryption is being used. */
675   public void sanityCheckUncompressedSize() throws IOException {
676     if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) {
677       throw new IOException("Using no compression but "
678           + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
679           + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
680           + ", numChecksumbytes=" + totalChecksumBytes());
681     }
682   }
683
684   /**
685    * Cannot be {@link #UNSET}. Must be a legitimate value. Used when re-making the {@link CacheKey}
686    * as the block is returned to the cache.
687    * @return the offset of this block in the file it was read from
688    */
689   long getOffset() {
690     if (offset < 0) {
691       throw new IllegalStateException("HFile block offset not initialized properly");
692     }
693     return offset;
694   }
695
696   /**
697    * @return a byte stream reading the data + checksum of this block
698    */
699   DataInputStream getByteStream() {
700     ByteBuff dup = this.buf.duplicate();
701     dup.position(this.headerSize());
702     return new DataInputStream(new ByteBuffInputStream(dup));
703   }
704 
705   @Override
706   public long heapSize() {
707     long size = ClassSize.align(
708         ClassSize.OBJECT +
709         // Block type, multi byte buffer, MemoryType and meta references
710         4 * ClassSize.REFERENCE +
711         // On-disk size, uncompressed size, next block's on-disk size,
712         // and onDiskDataSizeWithHeader
713         4 * Bytes.SIZEOF_INT +
714         // This and previous block offset
715         2 * Bytes.SIZEOF_LONG +
716         // Heap size of the meta object. meta will be always not null.
717         fileContext.heapSize()
718     );
719
720     if (buf != null) {
721       // Deep overhead of the byte buffer. Needs to be aligned separately.
722       size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE);
723     }
724
725     return ClassSize.align(size);
726   }
727
728   /**
729    * Read from an input stream at least <code>necessaryLen</code> and if possible,
730    * <code>extraLen</code> also if available. Analogous to
731    * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a
732    * number of "extra" bytes to also optionally read.
733    *
734    * @param in the input stream to read from
735    * @param buf the buffer to read into
736    * @param bufOffset the destination offset in the buffer
737    * @param necessaryLen the number of bytes that are absolutely necessary to read
738    * @param extraLen the number of extra bytes that would be nice to read
739    * @return true if succeeded reading the extra bytes
740    * @throws IOException if failed to read the necessary bytes
741    */
742   static boolean readWithExtra(InputStream in, byte[] buf,
743       int bufOffset, int necessaryLen, int extraLen) throws IOException {
744     int bytesRemaining = necessaryLen + extraLen;
745     while (bytesRemaining > 0) {
746       int ret = in.read(buf, bufOffset, bytesRemaining);
747       if (ret == -1 && bytesRemaining <= extraLen) {
748         // We could not read the "extra data", but that is OK.
749         break;
750       }
751       if (ret < 0) {
752         throw new IOException("Premature EOF from inputStream (read "
753             + "returned " + ret + ", was trying to read " + necessaryLen
754             + " necessary bytes and " + extraLen + " extra bytes, "
755             + "successfully read "
756             + (necessaryLen + extraLen - bytesRemaining));
757       }
758       bufOffset += ret;
759       bytesRemaining -= ret;
760     }
761     return bytesRemaining <= 0;
762   }
763
764   /**
765    * Read from an input stream at least <code>necessaryLen</code> and if possible,
766    * <code>extraLen</code> also if available. Analogous to
767    * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses
768    * positional read and specifies a number of "extra" bytes that would be
769    * desirable but not absolutely necessary to read.
770    *
771    * @param in the input stream to read from
772    * @param position the position within the stream from which to start reading
773    * @param buf the buffer to read into
774    * @param bufOffset the destination offset in the buffer
775    * @param necessaryLen the number of bytes that are absolutely necessary to
776    *     read
777    * @param extraLen the number of extra bytes that would be nice to read
778    * @return true if and only if extraLen is > 0 and reading those extra bytes
779    *     was successful
780    * @throws IOException if failed to read the necessary bytes
781    */
782   @VisibleForTesting
783   static boolean positionalReadWithExtra(FSDataInputStream in,
784       long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen)
785       throws IOException {
786     int bytesRemaining = necessaryLen + extraLen;
787     int bytesRead = 0;
788     while (bytesRead < necessaryLen) {
789       int ret = in.read(position, buf, bufOffset, bytesRemaining);
790       if (ret < 0) {
791         throw new IOException("Premature EOF from inputStream (positional read "
792             + "returned " + ret + ", was trying to read " + necessaryLen
793             + " necessary bytes and " + extraLen + " extra bytes, "
794             + "successfully read " + bytesRead);
795       }
796       position += ret;
797       bufOffset += ret;
798       bytesRemaining -= ret;
799       bytesRead += ret;
800     }
801     return bytesRead != necessaryLen && bytesRemaining <= 0;
802   }
803 
804   /**
805    * Unified version 2 {@link HFile} block writer. The intended usage pattern
806    * is as follows:
807    * <ol>
808    * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
809    * <li>Call {@link Writer#startWriting} and get a data stream to write to.
810    * <li>Write your data into the stream.
811    * <li>Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to.
812    * store the serialized block into an external stream.
813    * <li>Repeat to write more blocks.
814    * </ol>
815    * <p>
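       * A minimal sketch of the pattern above (the encoder, context, and stream variables here are
       * hypothetical, for illustration only):
       * <pre>
       * HFileBlock.Writer writer = new HFileBlock.Writer(dataBlockEncoder, fileContext);
       * DataOutputStream dos = writer.startWriting(BlockType.DATA);
       * // write Cells via writer.write(cell), or write other payloads straight into dos
       * writer.writeHeaderAndData(fsDataOutputStream); // finishes the block and writes it out
       * </pre>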
816    */
817   static class Writer {
818     private enum State {
819       INIT,
820       WRITING,
821       BLOCK_READY
822     };
823
824     /** Writer state. Used to ensure the correct usage protocol. */
825     private State state = State.INIT;
826
827     /** Data block encoder used for data blocks */
828     private final HFileDataBlockEncoder dataBlockEncoder;
829 
830     private HFileBlockEncodingContext dataBlockEncodingCtx;
831
832     /** block encoding context for non-data blocks*/
833     private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
834
835     /**
836      * The stream we use to accumulate data into a block in an uncompressed format.
837      * We reset this stream at the end of each block and reuse it. The
838      * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this
839      * stream.
840      */
841     private ByteArrayOutputStream baosInMemory;
842
843     /**
844      * Current block type. Set in {@link #startWriting(BlockType)}. Could be
845      * changed in {@link #finishBlock()} from {@link BlockType#DATA}
846      * to {@link BlockType#ENCODED_DATA}.
847      */
848     private BlockType blockType;
849
850     /**
851      * A stream that we write uncompressed bytes to, which compresses them and
852      * writes them to {@link #baosInMemory}.
853      */
854     private DataOutputStream userDataStream;
855
856     // Size of actual data being written. Not considering the block encoding/compression. This
857     // includes the header size also.
858     private int unencodedDataSizeWritten;
859
860     /**
861      * Bytes to be written to the file system, including the header. Compressed
862      * if compression is turned on. It also includes the checksum data that
863      * immediately follows the block data. (header + data + checksums)
864      */
865     private byte[] onDiskBlockBytesWithHeader;
866
867     /**
868      * The size of the checksum data on disk. It is used only if data is
869      * not compressed. If data is compressed, then the checksums are already
870      * part of onDiskBlockBytesWithHeader. If data is uncompressed, then this
871      * variable stores the checksum data for this block.
872      */
873     private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;
874
875     /**
876      * Valid in the BLOCK_READY state. Contains the header and the uncompressed (but
877      * potentially encoded, if this is a data block) bytes, so the length is
878      * {@link #uncompressedSizeWithoutHeader} +
879      * {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}.
880      * Does not store checksums.
881      */
882     private byte[] uncompressedBlockBytesWithHeader;
883
884     /**
885      * Current block's start offset in the {@link HFile}. Set in
886      * {@link #writeHeaderAndData(FSDataOutputStream)}.
887      */
888     private long startOffset;
889
890     /**
891      * Offset of previous block by block type. Updated when the next block is
892      * started.
893      */
894     private long[] prevOffsetByType;
895
896     /** The offset of the previous block of the same type */
897     private long prevOffset;
898     /** Meta data that holds information about the hfileblock. */
899     private HFileContext fileContext;
900
901     /**
902      * @param dataBlockEncoder data block encoding algorithm to use
903      */
904     public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
905       if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
906         throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
907             " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
908             fileContext.getBytesPerChecksum());
909       }
910       this.dataBlockEncoder = dataBlockEncoder != null?
911           dataBlockEncoder: NoOpDataBlockEncoder.INSTANCE;
912       this.dataBlockEncodingCtx = this.dataBlockEncoder.
913           newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
914       // TODO: This should be lazily instantiated since we usually do NOT need this default encoder
915       this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
916           HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
917       // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum
918       baosInMemory = new ByteArrayOutputStream();
919       prevOffsetByType = new long[BlockType.values().length];
920       for (int i = 0; i < prevOffsetByType.length; ++i) {
921         prevOffsetByType[i] = UNSET;
922       }
923       // TODO: Why fileContext saved away when we have dataBlockEncoder and/or
924       // defaultDataBlockEncoder?
925       this.fileContext = fileContext;
926     }
927 
928     /**
929      * Starts writing into the block. The previous block's data is discarded.
930      *
931      * @return the stream the user can write their data into
932      * @throws IOException
933      */
934     DataOutputStream startWriting(BlockType newBlockType)
935         throws IOException {
936       if (state == State.BLOCK_READY && startOffset != -1) {
937         // We had a previous block that was written to a stream at a specific
938         // offset. Save that offset as the last offset of a block of that type.
939         prevOffsetByType[blockType.getId()] = startOffset;
940       }
941
942       startOffset = -1;
943       blockType = newBlockType;
944
945       baosInMemory.reset();
946       baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
947
948       state = State.WRITING;
949 
950       // We will compress it later in finishBlock()
951       userDataStream = new ByteBufferSupportDataOutputStream(baosInMemory);
952       if (newBlockType == BlockType.DATA) {
953         this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
954       }
955       this.unencodedDataSizeWritten = 0;
956       return userDataStream;
957     }
958
959     /**
960      * Writes the Cell to this block
961      * @param cell
962      * @throws IOException
963      */
964     void write(Cell cell) throws IOException{
965       expectState(State.WRITING);
966       this.unencodedDataSizeWritten +=
967           this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream);
968     }
969 
970     /**
971      * Returns the stream for the user to write to. The block writer takes care
972      * of handling compression and buffering for caching on write. Can only be
973      * called in the "writing" state.
974      *
975      * @return the data output stream for the user to write to
976      */
977     DataOutputStream getUserDataStream() {
978       expectState(State.WRITING);
979       return userDataStream;
980     }
981
982     /**
983      * Transitions the block writer from the "writing" state to the "block
984      * ready" state.  Does nothing if a block is already finished.
985      */
986     void ensureBlockReady() throws IOException {
987       Preconditions.checkState(state != State.INIT,
988           "Unexpected state: " + state);
989
990       if (state == State.BLOCK_READY) {
991         return;
992       }
993
994       // This will set state to BLOCK_READY.
995       finishBlock();
996     }
997
998     /**
999      * Finish up writing of the block.
1000      * Flushes the compressing stream (if using compression), fills out the header,
1001      * does any compression/encryption of bytes to flush out to disk, and manages
1002      * the cache on write content, if applicable. Sets block write state to "block ready".
1003      */
1004     private void finishBlock() throws IOException {
1005       if (blockType == BlockType.DATA) {
1006         this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
1007             baosInMemory.getBuffer(), blockType);
1008         blockType = dataBlockEncodingCtx.getBlockType();
1009       }
1010       userDataStream.flush();
1011       // This does an array copy, so it is safe to cache this byte array when cache-on-write.
1012       // Header is still the empty, 'dummy' header that is yet to be filled out.
1013       uncompressedBlockBytesWithHeader = baosInMemory.toByteArray();
1014       prevOffset = prevOffsetByType[blockType.getId()];
1015
1016       // We need to set state before we can package the block up for cache-on-write. In a way, the
1017       // block is ready, but not yet encoded or compressed.
1018       state = State.BLOCK_READY;
1019       if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
1020         onDiskBlockBytesWithHeader = dataBlockEncodingCtx.
1021             compressAndEncrypt(uncompressedBlockBytesWithHeader);
1022       } else {
1023         onDiskBlockBytesWithHeader = defaultBlockEncodingCtx.
1024             compressAndEncrypt(uncompressedBlockBytesWithHeader);
1025       }
1026       // Calculate how many bytes we need for checksum on the tail of the block.
1027       int numBytes = (int) ChecksumUtil.numBytes(
1028           onDiskBlockBytesWithHeader.length,
1029           fileContext.getBytesPerChecksum());
1030
1031       // Put the header for the on disk bytes; header currently is unfilled-out
1032       putHeader(onDiskBlockBytesWithHeader, 0,
1033           onDiskBlockBytesWithHeader.length + numBytes,
1034           uncompressedBlockBytesWithHeader.length, onDiskBlockBytesWithHeader.length);
1035       // Set the header for the uncompressed bytes (for cache-on-write) -- IFF different from
1036       // onDiskBlockBytesWithHeader array.
1037       if (onDiskBlockBytesWithHeader != uncompressedBlockBytesWithHeader) {
1038         putHeader(uncompressedBlockBytesWithHeader, 0,
1039           onDiskBlockBytesWithHeader.length + numBytes,
1040           uncompressedBlockBytesWithHeader.length, onDiskBlockBytesWithHeader.length);
1041       }
1042       if (onDiskChecksum.length != numBytes) {
1043         onDiskChecksum = new byte[numBytes];
1044       }
1045       ChecksumUtil.generateChecksums(
1046           onDiskBlockBytesWithHeader, 0, onDiskBlockBytesWithHeader.length,
1047           onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
1048     }
1049
1050     /**
1051      * Put the header into the given byte array at the given offset.
1052      * @param onDiskSize size of the block on disk (header + data + checksum)
1053      * @param uncompressedSize size of the block after decompression (but
1054      *          before optional data block decoding) including header
1055      * @param onDiskDataSize size of the block on disk with header
1056      *        and data but not including the checksums
1057      */
1058     private void putHeader(byte[] dest, int offset, int onDiskSize,
1059         int uncompressedSize, int onDiskDataSize) {
1060       offset = blockType.put(dest, offset);
1061       offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1062       offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1063       offset = Bytes.putLong(dest, offset, prevOffset);
1064       offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
1065       offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
1066       Bytes.putInt(dest, offset, onDiskDataSize);
1067     }
1068
1069     /**
1070      * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records
1071      * the offset of this block so that it can be referenced in the next block
1072      * of the same type.
1073      *
1074      * @param out
1075      * @throws IOException
1076      */
1077     void writeHeaderAndData(FSDataOutputStream out) throws IOException {
1078       long offset = out.getPos();
1079       if (startOffset != UNSET && offset != startOffset) {
1080         throw new IOException("A " + blockType + " block written to a "
1081             + "stream twice, first at offset " + startOffset + ", then at "
1082             + offset);
1083       }
1084       startOffset = offset;
1085
1086       finishBlockAndWriteHeaderAndData((DataOutputStream) out);
1087     }
1088
1089     /**
1090      * Writes the header and the compressed data of this block (or uncompressed
1091      * data when not using compression) into the given stream. Can be called in
1092      * the "writing" state or in the "block ready" state. If called in the
1093      * "writing" state, transitions the writer to the "block ready" state.
1094      *
1095      * @param out the output stream to write the block to
1096      * @throws IOException
1097      */
1098     protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
1099       throws IOException {
1100       ensureBlockReady();
1101       out.write(onDiskBlockBytesWithHeader);
1102       out.write(onDiskChecksum);
1103     }
1104
1105     /**
1106      * Returns the header followed by the compressed data (or uncompressed data when not
1107      * using compression) as a byte array. Can be called in the "writing" state
1108      * or in the "block ready" state. If called in the "writing" state,
1109      * transitions the writer to the "block ready" state. This returns
1110      * the header + data + checksums stored on disk.
1111      *
1112      * @return header and data as they would be stored on disk in a byte array
1113      * @throws IOException
1114      */
1115     byte[] getHeaderAndDataForTest() throws IOException {
1116       ensureBlockReady();
1117       // This is not very optimal, because we are doing an extra copy.
1118       // But this method is used only by unit tests.
1119       byte[] output =
1120           new byte[onDiskBlockBytesWithHeader.length
1121               + onDiskChecksum.length];
1122       System.arraycopy(onDiskBlockBytesWithHeader, 0, output, 0,
1123           onDiskBlockBytesWithHeader.length);
1124       System.arraycopy(onDiskChecksum, 0, output,
1125           onDiskBlockBytesWithHeader.length, onDiskChecksum.length);
1126       return output;
1127     }
1128
1129     /**
1130      * Releases resources used by this writer.
1131      */
1132     void release() {
1133       if (dataBlockEncodingCtx != null) {
1134         dataBlockEncodingCtx.close();
1135         dataBlockEncodingCtx = null;
1136       }
1137       if (defaultBlockEncodingCtx != null) {
1138         defaultBlockEncodingCtx.close();
1139         defaultBlockEncodingCtx = null;
1140       }
1141     }
1142
1143     /**
1144      * Returns the on-disk size of the data portion of the block. This is the
1145      * compressed size if compression is enabled. Can only be called in the
1146      * "block ready" state. Header is not compressed, and its size is not
1147      * included in the return value.
1148      *
1149      * @return the on-disk size of the block, not including the header.
1150      */
1151     int getOnDiskSizeWithoutHeader() {
1152       expectState(State.BLOCK_READY);
1153       return onDiskBlockBytesWithHeader.length +
1154           onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE;
1155     }
1156
1157     /**
1158      * Returns the on-disk size of the block. Can only be called in the
1159      * "block ready" state.
1160      *
1161      * @return the on-disk size of the block ready to be written, including the
1162      *         header size, the data and the checksum data.
1163      */
1164     int getOnDiskSizeWithHeader() {
1165       expectState(State.BLOCK_READY);
1166       return onDiskBlockBytesWithHeader.length + onDiskChecksum.length;
1167     }
1168
1169     /**
1170      * The uncompressed size of the block data. Does not include header size.
1171      */
1172     int getUncompressedSizeWithoutHeader() {
1173       expectState(State.BLOCK_READY);
1174       return uncompressedBlockBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE;
1175     }
1176
1177     /**
1178      * The uncompressed size of the block data, including header size.
1179      */
1180     int getUncompressedSizeWithHeader() {
1181       expectState(State.BLOCK_READY);
1182       return uncompressedBlockBytesWithHeader.length;
1183     }
1184
1185     /** @return true if a block is being written  */
1186     boolean isWriting() {
1187       return state == State.WRITING;
1188     }
1189
1190     /**
1191      * Returns the number of bytes written into the current block so far, or
1192      * zero if not writing the block at the moment. Note that this will return
1193      * zero in the "block ready" state as well.
1194      *
1195      * @return the number of bytes written
1196      */
1197     int blockSizeWritten() {
1198       if (state != State.WRITING) return 0;
1199       return this.unencodedDataSizeWritten;
1200     }
1201
1202     /**
1203      * Returns the header followed by the uncompressed data, even if using
1204      * compression. This is needed for storing uncompressed blocks in the block
1205      * cache. Can be called in the "writing" state or the "block ready" state.
1206      * Returns only the header and data, does not include checksum data.
1207      *
1208      * @return uncompressed block bytes for caching on write
1209      */
1210     ByteBuffer getUncompressedBufferWithHeader() {
1211       expectState(State.BLOCK_READY);
1212       return ByteBuffer.wrap(uncompressedBlockBytesWithHeader);
1213     }
1214
1215     /**
1216      * Returns the header followed by the on-disk (compressed/encoded/encrypted) data. This is
1217      * needed for storing packed blocks in the block cache. Expects calling semantics identical to
1218      * {@link #getUncompressedBufferWithHeader()}. Returns only the header and data,
1219      * Does not include checksum data.
1220      *
1221      * @return packed block bytes for caching on write
1222      */
1223     ByteBuffer getOnDiskBufferWithHeader() {
1224       expectState(State.BLOCK_READY);
1225       return ByteBuffer.wrap(onDiskBlockBytesWithHeader);
1226     }
1227
1228     private void expectState(State expectedState) {
1229       if (state != expectedState) {
1230         throw new IllegalStateException("Expected state: " + expectedState +
1231             ", actual state: " + state);
1232       }
1233     }
1234
1235     /**
1236      * Takes the given {@link BlockWritable} instance, creates a new block of
1237      * its appropriate type, writes the writable into this block, and flushes
1238      * the block into the output stream. The writer is instructed not to buffer
1239      * uncompressed bytes for cache-on-write.
1240      *
1241      * @param bw the block-writable object to write as a block
1242      * @param out the file system output stream
1243      * @throws IOException
1244      */
1245     void writeBlock(BlockWritable bw, FSDataOutputStream out)
1246         throws IOException {
1247       bw.writeToBlock(startWriting(bw.getBlockType()));
1248       writeHeaderAndData(out);
1249     }
1250
1251     /**
1252      * Creates a new HFileBlock. Checksums have already been validated, so
1253      * the byte buffer passed into the constructor of this newly created
1254      * block does not have checksum data even though the header minor
1255      * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
1256      * 0 value in bytesPerChecksum.
1257      *
1258      * <p>TODO: Should there be an option where a cache can ask that hbase preserve block
1259      * checksums for checking after a block comes out of the cache? Otherwise, the cache is responsible
1260      * for blocks being wholesome (ECC memory or if file-backed, it does checksumming).
1261      */
1262     HFileBlock getBlockForCaching(CacheConfig cacheConf) {
1263       HFileContext newContext = new HFileContextBuilder()
1264                                 .withBlockSize(fileContext.getBlocksize())
1265                                 .withBytesPerCheckSum(0)
1266                                 .withChecksumType(ChecksumType.NULL) // no checksums in cached data
1267                                 .withCompression(fileContext.getCompression())
1268                                 .withDataBlockEncoding(fileContext.getDataBlockEncoding())
1269                                 .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
1270                                 .withCompressTags(fileContext.isCompressTags())
1271                                 .withIncludesMvcc(fileContext.isIncludesMvcc())
1272                                 .withIncludesTags(fileContext.isIncludesTags())
1273                                 .build();
1274        return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
1275           getUncompressedSizeWithoutHeader(), prevOffset,
1276           cacheConf.shouldCacheCompressed(blockType.getCategory())?
1277             getOnDiskBufferWithHeader() :
1278             getUncompressedBufferWithHeader(),
1279           FILL_HEADER, startOffset, UNSET,
1280           onDiskBlockBytesWithHeader.length + onDiskChecksum.length, newContext);
1281     }
1282   }
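
  /**
   * An illustrative sketch (not part of the production write path) of how a caller drives the
   * Writer above: start a block, write a payload into the stream handed back, flush the block
   * to the file, then obtain a copy suitable for cache-on-write. The method name and the META
   * payload are hypothetical; the Writer calls are the ones used elsewhere in this file.
   */
  private static HFileBlock exampleWriteOneBlock(Writer writer, FSDataOutputStream out,
      CacheConfig cacheConf) throws IOException {
    DataOutput dos = writer.startWriting(BlockType.META); // enters the WRITING state
    dos.writeUTF("example payload");                      // block body; header/checksums are added later
    writer.writeHeaderAndData(out);                       // finish the block; state becomes BLOCK_READY
    return writer.getBlockForCaching(cacheConf);          // uncompressed or packed copy for the cache
  }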
1283
1284   /** Something that can be written into a block. */
1285   interface BlockWritable {
1286
1287     /** The type of block this data should use. */
1288     BlockType getBlockType();
1289
1290     /**
1291      * Writes the block to the provided stream. Must not write any magic
1292      * records.
1293      *
1294      * @param out a stream to write uncompressed data into
1295      */
1296     void writeToBlock(DataOutput out) throws IOException;
1297   }
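
  /**
   * A minimal, illustrative BlockWritable (hypothetical; real implementations live in the index
   * and bloom-filter writers): it names its block type and writes only its payload. The Writer
   * wraps it with the header and checksums, e.g. via {@code writer.writeBlock(EXAMPLE_WRITABLE, out)}.
   */
  static final BlockWritable EXAMPLE_WRITABLE = new BlockWritable() {
    @Override
    public BlockType getBlockType() {
      return BlockType.META;
    }

    @Override
    public void writeToBlock(DataOutput out) throws IOException {
      out.writeUTF("example payload"); // payload only; no magic record here
    }
  };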
1298
1299   // Block readers and writers
1300
1301   /** An interface allowing to iterate {@link HFileBlock}s. */
1302   interface BlockIterator {
1303
1304     /**
1305      * Get the next block, or null if there are no more blocks to iterate.
1306      */
1307     HFileBlock nextBlock() throws IOException;
1308
1309     /**
1310      * Similar to {@link #nextBlock()} but checks block type, throws an
1311      * exception if incorrect, and returns the HFile block
1312      */
1313     HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1314   }
1315
1316   /** A full-fledged reader with iteration ability. */
1317   interface FSReader {
1318
1319     /**
1320      * Reads the block at the given offset in the file with the given on-disk size.
1321      *
1322      * @param offset the offset in the file to read at
1323      * @param onDiskSize the on-disk size of the entire block, including all
1324      *          applicable headers, or -1 if unknown
1325      * @param pread whether to use a positional read
1326      * @return the newly read block
1327      */
1328     HFileBlock readBlockData(long offset, long onDiskSize, boolean pread) throws IOException;
1329
1330     /**
1331      * Creates a block iterator over the given portion of the {@link HFile}.
1332      * The iterator returns blocks whose offsets satisfy startOffset &lt;=
1333      * offset &lt; endOffset. Returned blocks are always unpacked.
1334      *
1335      * @param startOffset the offset of the block to start iteration with
1336      * @param endOffset the offset to end iteration at (exclusive)
1337      * @return an iterator of blocks between the two given offsets
1338      */
1339     BlockIterator blockRange(long startOffset, long endOffset);
1340
1341     /** Closes the backing streams */
1342     void closeStreams() throws IOException;
1343
1344     /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
1345     HFileBlockDecodingContext getBlockDecodingContext();
1346
1347     /** Get the default decoder for blocks from this file. */
1348     HFileBlockDecodingContext getDefaultBlockDecodingContext();
1349
1350     void setIncludesMemstoreTS(boolean includesMemstoreTS);
1351     void setDataBlockEncoder(HFileDataBlockEncoder encoder);
1352   }
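
  /**
   * A short, illustrative use of the FSReader/BlockIterator contract above (hypothetical caller
   * code): walk every block in a region of the file. blockRange() hands back unpacked blocks, so
   * the caller does not need to call unpack() itself.
   */
  private static int exampleCountBlocks(FSReader reader, long startOffset, long endOffset)
      throws IOException {
    BlockIterator it = reader.blockRange(startOffset, endOffset);
    int count = 0;
    for (HFileBlock b = it.nextBlock(); b != null; b = it.nextBlock()) {
      count++; // b is already unpacked; b.getBlockType(), b.getOnDiskSizeWithHeader(), etc. are usable
    }
    return count;
  }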
1353
1354   /**
1355    * We always prefetch the header of the next block, so that we know its
1356    * on-disk size in advance and can read it in one operation.
1357    */
1358   private static class PrefetchedHeader {
1359     long offset = -1;
1360     byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1361     final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
1362     @Override
1363     public String toString() {
1364       return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header);
1365     }
1366   }
1367
1368   /**
1369    * Reads version 2 blocks from the filesystem.
1370    */
1371   static class FSReaderImpl implements FSReader {
1372     /** The file system stream of the underlying {@link HFile}; it may or may not
1373      * do checksum validation in the filesystem */
1374     protected FSDataInputStreamWrapper streamWrapper;
1375
1376     private HFileBlockDecodingContext encodedBlockDecodingCtx;
1377
1378     /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
1379     private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
1380
1381     /**
1382      * When we read a block, we also read the next block's header and save it here. When moving
1383      * serially through the file, this cached header lets us avoid an explicit seek just to learn
1384      * the next block's length.
1385      */
1386     private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
1387         new ThreadLocal<PrefetchedHeader>() {
1388       @Override
1389       public PrefetchedHeader initialValue() {
1390         return new PrefetchedHeader();
1391       }
1392     };
1393
1396     /** The size of the file we are reading from, or -1 if unknown. */
1397     protected long fileSize;
1398
1399     /** The size of the header */
1400     protected final int hdrSize;
1401
1402     /** The filesystem used to access data */
1403     protected HFileSystem hfs;
1404
1405     private final Lock streamLock = new ReentrantLock();
1406
1407     /** The default buffer size for our buffered streams */
1408     public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
1409
1410     protected HFileContext fileContext;
1411     // Cache the fileName
1412     protected String pathName;
1413
1414     FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
1415         HFileContext fileContext) throws IOException {
1416       this.fileSize = fileSize;
1417       this.hfs = hfs;
1418       if (path != null) {
1419         this.pathName = path.toString();
1420       }
1421       this.fileContext = fileContext;
1422       this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1423
1424       this.streamWrapper = stream;
1425       // Older versions of HBase didn't support checksum.
1426       this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1427       defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
1428       encodedBlockDecodingCtx = defaultDecodingCtx;
1429     }
1430
1431     /**
1432      * A constructor that reads files with the latest minor version.
1433      * This is used by unit tests only.
1434      */
1435     FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
1436     throws IOException {
1437       this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
1438     }
1439
1440     public BlockIterator blockRange(final long startOffset, final long endOffset) {
1441       final FSReader owner = this; // handle for inner class
1442       return new BlockIterator() {
1443         private long offset = startOffset;
1444
1445         @Override
1446         public HFileBlock nextBlock() throws IOException {
1447           if (offset >= endOffset)
1448             return null;
1449           HFileBlock b = readBlockData(offset, -1, false);
1450           offset += b.getOnDiskSizeWithHeader();
1451           return b.unpack(fileContext, owner);
1452         }
1453
1454         @Override
1455         public HFileBlock nextBlockWithBlockType(BlockType blockType)
1456             throws IOException {
1457           HFileBlock blk = nextBlock();
1458           if (blk.getBlockType() != blockType) {
1459             throw new IOException("Expected block of type " + blockType
1460                 + " but found " + blk.getBlockType());
1461           }
1462           return blk;
1463         }
1464       };
1465     }
1466
1467     /**
1468      * Does a positional read or a seek and read into the given buffer. Returns
1469      * the on-disk size of the next block, or -1 if it could not be read/determined; e.g. EOF.
1470      *
1471      * @param dest destination buffer
1472      * @param destOffset offset into the destination buffer at which to put the bytes we read
1473      * @param size number of bytes to read
1474      * @param peekIntoNextBlock whether to read the next block's on-disk size
1475      * @param fileOffset position in the stream to read at
1476      * @param pread whether we should do a positional read
1477      * @param istream The input source of data
1478      * @return the on-disk size of the next block with header size included, or
1479      *         -1 if it could not be determined; if not -1, the <code>dest</code> INCLUDES the
1480      *         next header
1481      * @throws IOException
1482      */
1483     protected int readAtOffset(FSDataInputStream istream, byte [] dest, int destOffset, int size,
1484         boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
1485       if (peekIntoNextBlock && destOffset + size + hdrSize > dest.length) {
1486         // We are asked to read the next block's header as well, but there is
1487         // not enough room in the array.
1488         throw new IOException("Attempted to read " + size + " bytes and " +
1489             hdrSize + " bytes of next header into a " + dest.length +
1490             "-byte array at offset " + destOffset);
1491       }
1492
1493       if (!pread && streamLock.tryLock()) {
1494         // Seek + read. Better for scanning.
1495         try {
1496           istream.seek(fileOffset);
1497
1498           long realOffset = istream.getPos();
1499           if (realOffset != fileOffset) {
1500             throw new IOException("Tried to seek to " + fileOffset + " to "
1501                 + "read " + size + " bytes, but pos=" + realOffset
1502                 + " after seek");
1503           }
1504
1505           if (!peekIntoNextBlock) {
1506             IOUtils.readFully(istream, dest, destOffset, size);
1507             return -1;
1508           }
1509
1510           // Try to read the next block header.
1511           if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) {
1512             return -1;
1513           }
1514         } finally {
1515           streamLock.unlock();
1516         }
1517       } else {
1518         // Positional read. Better for random reads; or when the streamLock is already locked.
1519         int extraSize = peekIntoNextBlock ? hdrSize : 0;
1520         if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset, size, extraSize)) {
1521           return -1;
1522         }
1523       }
1524
1525       assert peekIntoNextBlock;
1526       return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
1527     }
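
    /**
     * Illustrative arithmetic behind the return value of readAtOffset() above: the peeked header
     * starts at destOffset + size, its 8-byte magic record comes first, and the 4-byte
     * onDiskSizeWithoutHeader field follows, so the next block's total on-disk size is that int
     * plus the header size. A sketch only; the real computation is the return statement above.
     */
    private static int exampleNextBlockOnDiskSize(byte[] dest, int destOffset, int size,
        int hdrSize) {
      int nextHeaderStart = destOffset + size;                        // where the peeked header begins
      int sizeFieldOffset = nextHeaderStart + BlockType.MAGIC_LENGTH; // skip the magic record
      return Bytes.toInt(dest, sizeFieldOffset) + hdrSize;            // body + checksums, plus header
    }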
1528
1529     /**
1530      * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
1531      * little memory allocation as possible, using the provided on-disk size.
1532      *
1533      * @param offset the offset in the stream to read at
1534      * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1535      *          the header, or -1 if unknown
1536      * @param pread whether to use a positional read
1537      */
1538     @Override
1539     public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread)
1540     throws IOException {
1541       // Get a copy of the current state of whether to validate
1542       // hbase checksums or not for this read call. This is not
1543       // thread-safe but the one constraint is that if we decide
1544       // to skip hbase checksum verification then we are
1545       // guaranteed to use hdfs checksum verification.
1546       boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1547       FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1548
1549       HFileBlock blk = readBlockDataInternal(is, offset,
1550                          onDiskSizeWithHeaderL, pread,
1551                          doVerificationThruHBaseChecksum);
1552       if (blk == null) {
1553         HFile.LOG.warn("HBase checksum verification failed for file " +
1554                        pathName + " at offset " +
1555                        offset + " filesize " + fileSize +
1556                        ". Retrying read with HDFS checksums turned on...");
1557
1558         if (!doVerificationThruHBaseChecksum) {
1559           String msg = "HBase checksum verification failed for file " +
1560                        pathName + " at offset " +
1561                        offset + " filesize " + fileSize +
1562                        " but this cannot happen because doVerify is " +
1563                        doVerificationThruHBaseChecksum;
1564           HFile.LOG.warn(msg);
1565           throw new IOException(msg); // cannot happen case here
1566         }
1567         HFile.CHECKSUM_FAILURES.increment(); // update metrics
1568
1569         // If we have a checksum failure, we fall back into a mode where
1570         // the next few reads use HDFS level checksums. We aim to make the
1571         // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
1572         // hbase checksum verification, but since this value is set without
1573         // holding any locks, it can so happen that we might actually do
1574         // a few more than precisely this number.
1575         is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1576         doVerificationThruHBaseChecksum = false;
1577         blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
1578                                     doVerificationThruHBaseChecksum);
1579         if (blk != null) {
1580           HFile.LOG.warn("HDFS checksum verification suceeded for file " +
1581                          pathName + " at offset " +
1582                          offset + " filesize " + fileSize);
1583         }
1584       }
1585       if (blk == null && !doVerificationThruHBaseChecksum) {
1586         String msg = "readBlockData failed, possibly due to " +
1587                      "checksum verification failed for file " + pathName +
1588                      " at offset " + offset + " filesize " + fileSize;
1589         HFile.LOG.warn(msg);
1590         throw new IOException(msg);
1591       }
1592
1593       // If there was a checksum mismatch earlier, then we retried with
1594       // HBase checksums switched off and used HDFS checksum verification.
1595       // This triggers HDFS to detect and fix corrupt replicas. The
1596       // next checksumOffCount read requests will use HDFS checksums.
1597       // The decrementing of this.checksumOffCount is not thread-safe,
1598       // but it is harmless because eventually checksumOffCount will be
1599       // a negative number.
1600       streamWrapper.checksumOk();
1601       return blk;
1602     }
1603
1604     /**
1605      * @return <code>onDiskSizeWithHeaderL</code> as an int, after checking that it is a sane block size
1606      * @throws IOException
1607      */
1608     private static int checkAndGetSizeAsInt(final long onDiskSizeWithHeaderL, final int hdrSize)
1609     throws IOException {
1610       if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
1611           || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) {
1612         throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL
1613             + ": expected to be at least " + hdrSize
1614             + " and at most " + Integer.MAX_VALUE + ", or -1");
1615       }
1616       return (int)onDiskSizeWithHeaderL;
1617     }
1618
1619     /**
1620      * Check threadlocal cache for this block's header; we usually read it on the tail of reading
1621      * the previous block to save a seek. Otherwise, we have to do a seek to read the header before
1622      * we can pull in the block.
1623      * @return The cached block header or null if not found.
1624      * @see #cacheNextBlockHeader(long, byte[], int, int)
1625      */
1626     private ByteBuffer getCachedHeader(final long offset) {
1627       PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
1629       return prefetchedHeader != null && prefetchedHeader.offset == offset?
1630           prefetchedHeader.buf: null;
1631     }
1632
1633     /**
1634      * Save away the next block's header in the thread local.
1635      * @see #getCachedHeader(long)
1636      */
1637     private void cacheNextBlockHeader(final long nextBlockOffset,
1638         final byte [] header, final int headerOffset, final int headerLength) {
1639       PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
1640       prefetchedHeader.offset = nextBlockOffset;
1641       System.arraycopy(header, headerOffset, prefetchedHeader.header, 0, headerLength);
1642     }
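
    /**
     * A sketch of the prefetched-header handshake used by readBlockDataInternal() below: when a
     * read also pulls in the following block's header, it is stashed in the thread-local, and the
     * next sequential read finds it via getCachedHeader() and skips a header-only seek. The
     * method itself is illustrative; only the two calls it makes exist in the real read path.
     */
    private ByteBuffer exampleHeaderHandshake(long nextBlockOffset, byte[] onDiskBlock,
        int onDiskSizeWithHeader) {
      // After reading block N, its scratch buffer holds block N+1's header in the trailing bytes.
      cacheNextBlockHeader(nextBlockOffset, onDiskBlock, onDiskSizeWithHeader, hdrSize);
      // A later read starting at nextBlockOffset gets that header without touching the stream.
      return getCachedHeader(nextBlockOffset);
    }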
1643
1644     /**
1645      * Verify that the passed-in onDiskSizeWithHeader matches what is in the header; otherwise
1646      * something is not right.
1647      * @throws IOException
1648      */
1649     private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuffer headerBuf,
1650         final long offset)
1651     throws IOException {
1652       // Assert size provided aligns with what is in the header
1653       int fromHeader = getOnDiskSizeWithHeader(headerBuf);
1654       if (passedIn != fromHeader) {
1655         throw new IOException("Passed in onDiskSizeWithHeader=" + passedIn + " != " + fromHeader +
1656             ", offset=" + offset + ", fileContext=" + this.fileContext);
1657       }
1658     }
1659
1660     /**
1661      * Reads a version 2 block.
1662      *
1663      * @param offset the offset in the stream to read at
1664      * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1665      *          the header and checksums if present or -1 if unknown
1666      * @param pread whether to use a positional read
1667      * @param verifyChecksum Whether to use HBase checksums.
1668      *        If HBase checksum is switched off, then use HDFS checksum.
1669      * @return the HFileBlock or null if there is an HBase checksum mismatch
1670      */
1671     protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
1672         long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum) throws IOException {
1673       if (offset < 0) {
1674         throw new IOException("Invalid offset=" + offset + " trying to read "
1675             + "block (onDiskSize=" + onDiskSizeWithHeaderL + ")");
1676       }
1677       int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize);
1678       ByteBuffer headerBuf = getCachedHeader(offset);
1679       if (LOG.isTraceEnabled()) {
1680         LOG.trace("Reading " + this.fileContext.getHFileName() + " at offset=" + offset +
1681           ", pread=" + pread + ", verifyChecksum=" + verifyChecksum + ", cachedHeader=" +
1682           headerBuf + ", onDiskSizeWithHeader=" + onDiskSizeWithHeader);
1683       }
1684       if (onDiskSizeWithHeader <= 0) {
1685         // We were not passed the block size. Need to get it from the header. If header was not in
1686         // cache, need to seek to pull it in. This latter might happen when we are doing the first
1687         // read in a series of reads or a random read, and we don't have access to the block index.
1688         // This is costly and should happen very rarely.
1689         if (headerBuf == null) {
1690           headerBuf = ByteBuffer.allocate(hdrSize);
1691           readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize, false,
1692               offset, pread);
1693         }
1694         onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf);
1695       }
1696       int preReadHeaderSize = headerBuf == null? 0 : hdrSize;
1697       // Allocate enough space to fit the next block's header too; saves a seek next time through.
1698       // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header;
1699       // onDiskSizeWithHeader is header, body, and any checksums if present.
1700       // TODO: Make this ByteBuffer-based. Will make it easier to go to HDFS with BBPool (offheap).
1701       byte[] onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
1702       int nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, preReadHeaderSize,
1703           onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread);
1704       if (headerBuf != null) {
1705         // The header has been read when reading the previous block OR in a distinct header-only
1706         // read. Copy to this block's header.
1707         System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1708       } else {
1709         headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
1710       }
1711       // Do a few checks before we go instantiate HFileBlock.
1712       assert onDiskSizeWithHeader > this.hdrSize;
1713       verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset);
1714       ByteBuffer onDiskBlockByteBuffer = ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader);
1715       // Verify checksum of the data before using it for building HFileBlock.
1716       if (verifyChecksum &&
1717           !validateChecksum(offset, onDiskBlockByteBuffer, hdrSize)) {
1718         return null;
1719       }
1720       // The onDiskBlock will become the headerAndDataBuffer for this block.
1721       // If nextBlockOnDiskSize is not -1, the onDiskBlock already
1722       // contains the header of the next block, so no need to set the next block's header in it.
1723       HFileBlock hFileBlock =
1724           new HFileBlock(new SingleByteBuff(onDiskBlockByteBuffer),
1725               this.fileContext.isUseHBaseChecksum(), MemoryType.EXCLUSIVE, offset,
1726               nextBlockOnDiskSize, fileContext);
1727       // Run check on uncompressed sizings.
1728       if (!fileContext.isCompressedOrEncrypted()) {
1729         hFileBlock.sanityCheckUncompressed();
1730       }
1731       if (LOG.isTraceEnabled()) {
1732         LOG.trace("Read " + hFileBlock);
1733       }
1734       // Cache the next block's header, if we read it, for the next time through here.
1735       if (nextBlockOnDiskSize != -1) {
1736         cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(),
1737             onDiskBlock, onDiskSizeWithHeader, hdrSize);
1738       }
1739       return hFileBlock;
1740     }
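
    /**
     * Sizing sketch for the scratch array allocated in readBlockDataInternal() above: it holds
     * this block's entire on-disk form (header, data, and any checksums) plus one extra header's
     * worth of bytes so the same IO can peek at the next block's header for caching.
     */
    private int exampleScratchArraySize(int onDiskSizeWithHeader) {
      // The trailing hdrSize bytes are what cacheNextBlockHeader() copies into the thread-local.
      return onDiskSizeWithHeader + hdrSize;
    }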
1741
1742     @Override
1743     public void setIncludesMemstoreTS(boolean includesMemstoreTS) {
1744       this.fileContext.setIncludesMvcc(includesMemstoreTS);
1745     }
1746
1747     @Override
1748     public void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
1749       encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
1750     }
1751
1752     @Override
1753     public HFileBlockDecodingContext getBlockDecodingContext() {
1754       return this.encodedBlockDecodingCtx;
1755     }
1756
1757     @Override
1758     public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
1759       return this.defaultDecodingCtx;
1760     }
1761
1762     /**
1763      * Generates the checksum for the header as well as the data and then validates it.
1764      * If the block doesn't use checksums, returns false.
1765      * @return True if checksum matches, else false.
1766      */
1767     protected boolean validateChecksum(long offset, ByteBuffer data, int hdrSize)
1768         throws IOException {
1769       // If this is an older version of the block that does not have checksums, then return false
1770       // indicating that checksum verification did not succeed. Actually, this method should never
1771       // be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen
1772       // case. Since this is a cannot-happen case, it is better to return false to indicate a
1773       // checksum validation failure.
1774       if (!fileContext.isUseHBaseChecksum()) {
1775         return false;
1776       }
1777       return ChecksumUtil.validateChecksum(data, pathName, offset, hdrSize);
1778     }
1779
1780     @Override
1781     public void closeStreams() throws IOException {
1782       streamWrapper.close();
1783     }
1784
1785     @Override
1786     public String toString() {
1787       return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext;
1788     }
1789   }
1790
1791   /** An additional sanity-check in case no compression or encryption is being used. */
1792   void sanityCheckUncompressed() throws IOException {
1793     if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader +
1794         totalChecksumBytes()) {
1795       throw new IOException("Using no compression but "
1796           + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
1797           + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
1798           + ", numChecksumbytes=" + totalChecksumBytes());
1799     }
1800   }
1801
1802   // Cacheable implementation
1803   @Override
1804   public int getSerializedLength() {
1805     if (buf != null) {
1806       // Include extra bytes for block metadata.
1807       return this.buf.limit() + BLOCK_METADATA_SPACE;
1808     }
1809     return 0;
1810   }
1811
1812   // Cacheable implementation
1813   @Override
1814   public void serialize(ByteBuffer destination) {
1815     // BE CAREFUL!! There is a custom version of this serialization over in BucketCache#doDrain.
1816     // Make sure any changes in here are reflected over there.
1817     this.buf.get(destination, 0, getSerializedLength() - BLOCK_METADATA_SPACE);
1818     destination = addMetaData(destination);
1819
1820     // Make it ready for reading: flip sets position to zero and limit to the current
1821     // position, i.e. to exactly what was just written (the block bytes, any checksums,
1822     // and the metadata).
1823     destination.flip();
1824   }
1825
1826   /**
1827    * For use by bucketcache. This exposes internals.
1828    */
1829   public ByteBuffer getMetaData() {
1830     ByteBuffer bb = ByteBuffer.allocate(BLOCK_METADATA_SPACE);
1831     bb = addMetaData(bb);
1832     bb.flip();
1833     return bb;
1834   }
1835
1836   /**
1837    * Adds metadata at current position (position is moved forward). Does not flip or reset.
1838    * @return The passed <code>destination</code> with metadata added.
1839    */
1840   private ByteBuffer addMetaData(final ByteBuffer destination) {
1841     destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1842     destination.putLong(this.offset);
1843     destination.putInt(this.nextBlockOnDiskSize);
1844     return destination;
1845   }
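
  /**
   * A sketch of the metadata appended by addMetaData() above: one byte for the checksum flag,
   * eight for the block's offset, and four for the next block's on-disk size, i.e. 13 bytes,
   * which is the space getSerializedLength() reserves as BLOCK_METADATA_SPACE.
   */
  private static int exampleMetaDataSize() {
    return Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT; // 1 + 8 + 4 = 13
  }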
1846
1847   // Cacheable implementation
1848   @Override
1849   public CacheableDeserializer<Cacheable> getDeserializer() {
1850     return HFileBlock.BLOCK_DESERIALIZER;
1851   }
1852
1853   @Override
1854   public int hashCode() {
1855     int result = 1;
1856     result = result * 31 + blockType.hashCode();
1857     result = result * 31 + nextBlockOnDiskSize;
1858     result = result * 31 + (int) (offset ^ (offset >>> 32));
1859     result = result * 31 + onDiskSizeWithoutHeader;
1860     result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32));
1861     result = result * 31 + uncompressedSizeWithoutHeader;
1862     result = result * 31 + buf.hashCode();
1863     return result;
1864   }
1865
1866   @Override
1867   public boolean equals(Object comparison) {
1868     if (this == comparison) {
1869       return true;
1870     }
1871     if (comparison == null) {
1872       return false;
1873     }
1874     if (comparison.getClass() != this.getClass()) {
1875       return false;
1876     }
1877
1878     HFileBlock castedComparison = (HFileBlock) comparison;
1879
1880     if (castedComparison.blockType != this.blockType) {
1881       return false;
1882     }
1883     if (castedComparison.nextBlockOnDiskSize != this.nextBlockOnDiskSize) {
1884       return false;
1885     }
1886     // Offset is important. Needed when we have to remake cachekey when block is returned to cache.
1887     if (castedComparison.offset != this.offset) {
1888       return false;
1889     }
1890     if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1891       return false;
1892     }
1893     if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1894       return false;
1895     }
1896     if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1897       return false;
1898     }
1899     if (ByteBuff.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
1900         castedComparison.buf.limit()) != 0) {
1901       return false;
1902     }
1903     return true;
1904   }
1905
1906   public DataBlockEncoding getDataBlockEncoding() {
1907     if (blockType == BlockType.ENCODED_DATA) {
1908       return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
1909     }
1910     return DataBlockEncoding.NONE;
1911   }
1912
1913   byte getChecksumType() {
1914     return this.fileContext.getChecksumType().getCode();
1915   }
1916
1917   int getBytesPerChecksum() {
1918     return this.fileContext.getBytesPerChecksum();
1919   }
1920
1921   /** @return the size of data on disk + header. Excludes checksum. */
1922   int getOnDiskDataSizeWithHeader() {
1923     return this.onDiskDataSizeWithHeader;
1924   }
1925
1926   /**
1927    * Calculate the number of bytes required to store all the checksums
1928    * for this block. Each checksum value is a 4 byte integer.
1929    */
1930   int totalChecksumBytes() {
1931     // If the hfile block has minorVersion 0, then there are no checksum
1932     // data to validate. Similarly, a zero value in this.bytesPerChecksum
1933     // indicates that cached blocks do not have checksum data because
1934     // checksums were already validated when the block was read from disk.
1935     if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
1936       return 0;
1937     }
1938     return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
1939         this.fileContext.getBytesPerChecksum());
1940   }
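
  /**
   * Worked example for totalChecksumBytes() above, a sketch assuming the 4-byte-per-chunk
   * checksums described in its javadoc: with bytesPerChecksum = 16384 and
   * onDiskDataSizeWithHeader = 65570, there are ceil(65570 / 16384) = 5 chunks, so 5 * 4 = 20
   * checksum bytes trail the block on disk. ChecksumUtil.numBytes() is the authoritative version.
   */
  private static long exampleChecksumBytes(long onDiskDataSizeWithHeader, int bytesPerChecksum) {
    long chunks = (onDiskDataSizeWithHeader + bytesPerChecksum - 1) / bytesPerChecksum;
    return chunks * 4L; // one 4-byte checksum per bytesPerChecksum chunk
  }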
1941
1942   /**
1943    * Returns the size of this block header.
1944    */
1945   public int headerSize() {
1946     return headerSize(this.fileContext.isUseHBaseChecksum());
1947   }
1948
1949   /**
1950    * Returns the header size, depending on whether HBase checksums are in use.
1951    */
1952   public static int headerSize(boolean usesHBaseChecksum) {
1953     return usesHBaseChecksum?
1954         HConstants.HFILEBLOCK_HEADER_SIZE: HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
1955   }
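
  /**
   * Worked sizes for headerSize() above, a sketch derived from the field dump in toStringHeader()
   * at the bottom of this class: without HBase checksums the header is 8 (magic) + 4 + 4 + 8 = 24
   * bytes; the checksum fields add 1 + 4 + 4 = 9 more, for 33 bytes.
   */
  private static int exampleHeaderSize(boolean usesHBaseChecksum) {
    int base = 8 + 4 + 4 + 8;          // magic, on-disk size, uncompressed size, prev offset
    int checksumFields = 1 + 4 + 4;    // checksum type, bytesPerChecksum, onDiskDataSizeWithHeader
    return usesHBaseChecksum ? base + checksumFields : base;
  }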
1956
1957   /**
1958    * Return the appropriate DUMMY_HEADER for the minor version
1959    */
1960   byte[] getDummyHeaderForVersion() {
1961     return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
1962   }
1963
1964   /**
1965    * Return the appropriate DUMMY_HEADER for the minor version
1966    */
1967   static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
1968     return usesHBaseChecksum? HConstants.HFILEBLOCK_DUMMY_HEADER: DUMMY_HEADER_NO_CHECKSUM;
1969   }
1970
1971   /**
1972    * @return This HFileBlock's fileContext, which will be a derivative of the
1973    * fileContext for the file from which this block's data was originally read.
1974    */
1975   HFileContext getHFileContext() {
1976     return this.fileContext;
1977   }
1978
1979   @Override
1980   public MemoryType getMemoryType() {
1981     return this.memType;
1982   }
1983
1984   /**
1985    * @return true if this block is backed by a shared memory area (such as that of a BucketCache).
1986    */
1987   boolean usesSharedMemory() {
1988     return this.memType == MemoryType.SHARED;
1989   }
1990
1991   /**
1992    * Convert the contents of the block header into a human readable string.
1993    * This is mostly helpful for debugging. This assumes that the block
1994    * has minor version > 0.
1995    */
1996   @VisibleForTesting
1997   static String toStringHeader(ByteBuff buf) throws IOException {
1998     byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
1999     buf.get(magicBuf);
2000     BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
2001     int compressedBlockSizeNoHeader = buf.getInt();
2002     int uncompressedBlockSizeNoHeader = buf.getInt();
2003     long prevBlockOffset = buf.getLong();
2004     byte cksumtype = buf.get();
2005     long bytesPerChecksum = buf.getInt();
2006     long onDiskDataSizeWithHeader = buf.getInt();
2007     return " Header dump: magic: " + Bytes.toString(magicBuf) +
2008                    " blockType " + bt +
2009                    " compressedBlockSizeNoHeader " +
2010                    compressedBlockSizeNoHeader +
2011                    " uncompressedBlockSizeNoHeader " +
2012                    uncompressedBlockSizeNoHeader +
2013                    " prevBlockOffset " + prevBlockOffset +
2014                    " checksumType " + ChecksumType.codeToType(cksumtype) +
2015                    " bytesPerChecksum " + bytesPerChecksum +
2016                    " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
2017   }
2018 }