1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.io.hfile;
19  
20  import java.io.ByteArrayInputStream;
21  import java.io.ByteArrayOutputStream;
22  import java.io.DataInputStream;
23  import java.io.DataOutput;
24  import java.io.DataOutputStream;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.nio.ByteBuffer;
28  import java.util.concurrent.locks.Lock;
29  import java.util.concurrent.locks.ReentrantLock;
30  
31  import org.apache.hadoop.classification.InterfaceAudience;
32  import org.apache.hadoop.fs.FSDataInputStream;
33  import org.apache.hadoop.fs.FSDataOutputStream;
34  import org.apache.hadoop.fs.Path;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.KeyValue;
37  import org.apache.hadoop.hbase.fs.HFileSystem;
38  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
39  import org.apache.hadoop.hbase.io.compress.Compression;
40  import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
41  import org.apache.hadoop.hbase.io.crypto.Encryption;
42  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
43  import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
44  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
45  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
46  import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
47  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
48  import org.apache.hadoop.hbase.util.Bytes;
49  import org.apache.hadoop.hbase.util.ChecksumType;
50  import org.apache.hadoop.hbase.util.ClassSize;
51  import org.apache.hadoop.hbase.util.CompoundBloomFilter;
52  import org.apache.hadoop.io.IOUtils;
53  
54  import com.google.common.base.Preconditions;
55  
56  /**
57   * Reading {@link HFile} version 1 and 2 blocks, and writing version 2 blocks.
58   * <ul>
59   * <li>In version 1 all blocks are always compressed or uncompressed, as
60   * specified by the {@link HFile}'s compression algorithm, with a type-specific
61   * magic record stored in the beginning of the compressed data (i.e. one needs
62   * to uncompress the compressed block to determine the block type). There is
63   * only a single compression algorithm setting for all blocks. Offset and size
64   * information from the block index are required to read a block.
65   * <li>In version 2 a block is structured as follows:
66   * <ul>
67   * <li>Magic record identifying the block type (8 bytes)
68   * <li>Compressed block size, header not included (4 bytes)
69   * <li>Uncompressed block size, header not included (4 bytes)
70   * <li>The offset of the previous block of the same type (8 bytes). This is
71   * used to navigate to the previous block without going to the block index.
72   * <li>For minorVersions >=1, a 1 byte checksum type code followed by a
73   * 4 byte bytesPerChecksum field that records the number of bytes in a
74   * checksum chunk.
75   * <li>For minorVersions >=1, a 4 byte value that stores the size of the
76   * data on disk, excluding the checksums.
77   * <li>For minorVersions >=1, a series of 4 byte checksums appended after
78   * the data, one for each chunk of bytesPerChecksum bytes.
79   * <li>Compressed data (or uncompressed data if compression is disabled). The
80   * compression algorithm is the same for all the blocks in the {@link HFile},
81   * similarly to what was done in version 1.
82   * </ul>
83   * </ul>
84   * The version 2 block representation in the block cache is the same as above,
85   * except that the data section is always uncompressed in the cache.
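     * <p>For illustration, a minimal sketch of reading the fixed header fields of a
     * version 2 block with checksums (minorVersion >=1) in the documented order;
     * the variable names here are illustrative only:
     * <pre>{@code
     * ByteBuffer b = ByteBuffer.wrap(onDiskBlockBytes); // one block, header first
     * byte[] magic = new byte[8];
     * b.get(magic);                                     // block type magic record
     * int onDiskSizeWithoutHeader = b.getInt();
     * int uncompressedSizeWithoutHeader = b.getInt();
     * long prevBlockOffset = b.getLong();
     * byte checksumTypeCode = b.get();                  // minorVersion >=1 only
     * int bytesPerChecksum = b.getInt();                // minorVersion >=1 only
     * int onDiskDataSizeWithHeader = b.getInt();        // minorVersion >=1 only
     * }</pre>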
86   */
87  @InterfaceAudience.Private
88  public class HFileBlock implements Cacheable {
89  
90    /**
91    * On a checksum failure on a Reader, this many succeeding read
92     * requests switch back to using hdfs checksums before auto-reenabling
93     * hbase checksum verification.
94     */
95    static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
96  
97    public static final boolean FILL_HEADER = true;
98    public static final boolean DONT_FILL_HEADER = false;
99  
100   /**
101    * The size of the block header when blockType is {@link BlockType#ENCODED_DATA}.
102    * This extends the normal header by adding the id of the encoder.
103    */
104   public static final int ENCODED_HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE
105       + DataBlockEncoding.ID_SIZE;
106 
107   static final byte[] DUMMY_HEADER_NO_CHECKSUM =
108      new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
109 
110   public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
111       ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
112 
113   // meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader
114   public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE +  Bytes.SIZEOF_INT
115       + Bytes.SIZEOF_LONG;
116 
117   /**
118    * Each checksum value is an integer that can be stored in 4 bytes.
119    */
120   static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
121 
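        // When a block is serialized for the block cache, the block bytes are followed
        // by EXTRA_SERIALIZATION_SPACE trailer bytes, which the deserializer below reads
        // back in this order: a 1 byte usesHBaseChecksum flag, an 8 byte block offset,
        // and a 4 byte nextBlockOnDiskSizeWithHeader.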
122   private static final CacheableDeserializer<Cacheable> blockDeserializer =
123       new CacheableDeserializer<Cacheable>() {
124         public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{
125           buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
126           ByteBuffer newByteBuffer;
127           if (reuse) {
128             newByteBuffer = buf.slice();
129           } else {
130            newByteBuffer = ByteBuffer.allocate(buf.limit());
131            newByteBuffer.put(buf);
132           }
133           buf.position(buf.limit());
134           buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
135           boolean usesChecksum = buf.get() == (byte)1;
136           HFileBlock ourBuffer = new HFileBlock(newByteBuffer, usesChecksum);
137           ourBuffer.offset = buf.getLong();
138           ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt();
139           return ourBuffer;
140         }
141         
142         @Override
143         public int getDeserialiserIdentifier() {
144           return deserializerIdentifier;
145         }
146 
147         @Override
148         public HFileBlock deserialize(ByteBuffer b) throws IOException {
149           return deserialize(b, false);
150         }
151       };
152   private static final int deserializerIdentifier;
153   static {
154     deserializerIdentifier = CacheableDeserializerIdManager
155         .registerDeserializer(blockDeserializer);
156   }
157 
158   private BlockType blockType;
159 
160   /** Size on disk without the header. It includes checksum data too. */
161   private int onDiskSizeWithoutHeader;
162 
163   /** Size of pure data. Does not include header or checksums */
164   private final int uncompressedSizeWithoutHeader;
165 
166   /** The offset of the previous block on disk */
167   private final long prevBlockOffset;
168 
169   /** Size on disk of header and data. Does not include checksum data */
170   private final int onDiskDataSizeWithHeader;
171 
172   /** The in-memory representation of the hfile block */
173   private ByteBuffer buf;
174   /** Metadata that holds information about the HFile block **/
175   private HFileContext fileContext;
176 
177   /**
178    * The offset of this block in the file. Populated by the reader for
179    * convenience of access. This offset is not part of the block header.
180    */
181   private long offset = -1;
182 
183   /**
184    * The on-disk size of the next block, including the header, obtained by
185    * peeking into the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the next block's
186    * header, or -1 if unknown.
187    */
188   private int nextBlockOnDiskSizeWithHeader = -1;
189 
190   /**
191    * Creates a new {@link HFile} block from the given fields. This constructor
192    * is mostly used when the block data has already been read and uncompressed,
193    * and is sitting in a byte buffer. 
194    *
195    * @param blockType the type of this block, see {@link BlockType}
196    * @param onDiskSizeWithoutHeader compressed size of the block if compression
197    *          is used, otherwise uncompressed size, header size not included
198    * @param uncompressedSizeWithoutHeader uncompressed size of the block,
199    *          header size not included. Equals onDiskSizeWithoutHeader if
200    *          compression is disabled.
201    * @param prevBlockOffset the offset of the previous block in the
202    *          {@link HFile}
203    * @param buf block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by
204    *          uncompressed data
205    * @param fillHeader true to fill in the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of
206    *          the buffer based on the header fields provided
207    * @param offset the file offset the block was read from
210    * @param onDiskDataSizeWithHeader size of header and data on disk not
211    *        including checksum data
212    * @param fileContext HFile meta data
213    */
214   HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
215       int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer buf,
216       boolean fillHeader, long offset,
217       int onDiskDataSizeWithHeader, HFileContext fileContext) {
218     this.blockType = blockType;
219     this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
220     this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
221     this.prevBlockOffset = prevBlockOffset;
222     this.buf = buf;
223     if (fillHeader)
224       overwriteHeader();
225     this.offset = offset;
226     this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
227     this.fileContext = fileContext;
228   }
229 
230   /**
231    * Creates a block from an existing buffer starting with a header. Rewinds
232    * and takes ownership of the buffer. By definition of rewind, ignores the
233    * buffer position, but if you slice the buffer beforehand, it will rewind
234    * to that point. The usesHBaseChecksum flag stands in for the minor version here:
235    * major versions indicate the format of an HFile, whereas minor versions
236    * indicate the format inside an HFileBlock.
237    */
238   HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException {
239     b.rewind();
240     blockType = BlockType.read(b);
241     onDiskSizeWithoutHeader = b.getInt();
242     uncompressedSizeWithoutHeader = b.getInt();
243     prevBlockOffset = b.getLong();
244     HFileContextBuilder contextBuilder = new HFileContextBuilder();
245     contextBuilder.withHBaseCheckSum(usesHBaseChecksum);
246     if (usesHBaseChecksum) {
247       contextBuilder.withChecksumType(ChecksumType.codeToType(b.get()));
248       contextBuilder.withBytesPerCheckSum(b.getInt());
249       this.onDiskDataSizeWithHeader = b.getInt();
250     } else {
251       contextBuilder.withChecksumType(ChecksumType.NULL);
252       contextBuilder.withBytesPerCheckSum(0);
253       this.onDiskDataSizeWithHeader = onDiskSizeWithoutHeader +
254                                        HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
255     }
256     this.fileContext = contextBuilder.build();
257     buf = b;
258     buf.rewind();
259   }
260 
261   public BlockType getBlockType() {
262     return blockType;
263   }
264 
265   /** @return the data block encoding id that was used to encode this block */
266   public short getDataBlockEncodingId() {
267     if (blockType != BlockType.ENCODED_DATA) {
268       throw new IllegalArgumentException("Querying encoder ID of a block " +
269           "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
270     }
271     return buf.getShort(headerSize());
272   }
273 
274   /**
275    * @return the on-disk size of the block with header size included. This
276    * includes the header, the data and the checksum data.
277    */
278   public int getOnDiskSizeWithHeader() {
279     return onDiskSizeWithoutHeader + headerSize();
280   }
281 
282   /**
283    * Returns the size of the compressed part of the block in case compression
284    * is used, or the uncompressed size of the data part otherwise. Header size
285    * and checksum data size is not included.
286    *
287    * @return the on-disk size of the data part of the block, header and
288    *         checksum not included. 
289    */
290   public int getOnDiskSizeWithoutHeader() {
291     return onDiskSizeWithoutHeader;
292   }
293 
294   /**
295    * @return the uncompressed size of the data part of the block, header not
296    *         included
297    */
298    public int getUncompressedSizeWithoutHeader() {
299     return uncompressedSizeWithoutHeader;
300   }
301 
302   /**
303    * @return the offset of the previous block of the same type in the file, or
304    *         -1 if unknown
305    */
306   public long getPrevBlockOffset() {
307     return prevBlockOffset;
308   }
309 
310   /**
311    * Writes header fields into the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the
312    * buffer. Resets the buffer position to the end of header as side effect.
313    */
314   private void overwriteHeader() {
315     buf.rewind();
316     blockType.write(buf);
317     buf.putInt(onDiskSizeWithoutHeader);
318     buf.putInt(uncompressedSizeWithoutHeader);
319     buf.putLong(prevBlockOffset);
320   }
321 
322   /**
323    * Returns a buffer that does not include the header. The array offset points
324    * to the start of the block data right after the header. The underlying data
325    * array is not copied. Checksum data is not included in the returned buffer.
326    *
327    * @return the buffer with header skipped
328    */
329   public ByteBuffer getBufferWithoutHeader() {
330     return ByteBuffer.wrap(buf.array(), buf.arrayOffset() + headerSize(),
331         buf.limit() - headerSize() - totalChecksumBytes()).slice();
332   }
333 
334   /**
335    * Returns the buffer this block stores internally. The clients must not
336    * modify the buffer object. This method has to be public because it is
337    * used in {@link CompoundBloomFilter} to avoid object creation on every
338    * Bloom filter lookup, but has to be used with caution. Checksum data
339    * is not included in the returned buffer.
340    *
341    * @return the buffer of this block for read-only operations
342    */
343   public ByteBuffer getBufferReadOnly() {
344     return ByteBuffer.wrap(buf.array(), buf.arrayOffset(),
345         buf.limit() - totalChecksumBytes()).slice();
346   }
347 
348   /**
349    * Returns the buffer of this block, including header data. The clients must
350    * not modify the buffer object. This method has to be public because it is
351    * used in {@link BucketCache} to avoid buffer copy.
352    * 
353    * @return the byte buffer with header included for read-only operations
354    */
355   public ByteBuffer getBufferReadOnlyWithHeader() {
356     return ByteBuffer.wrap(buf.array(), buf.arrayOffset(), buf.limit()).slice();
357   }
358 
359   /**
360    * Returns a byte buffer of this block, including header data, positioned at
361    * the beginning of header. The underlying data array is not copied.
362    *
363    * @return the byte buffer with header included
364    */
365   ByteBuffer getBufferWithHeader() {
366     ByteBuffer dupBuf = buf.duplicate();
367     dupBuf.rewind();
368     return dupBuf;
369   }
370 
371   private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
372       String fieldName) throws IOException {
373     if (valueFromBuf != valueFromField) {
374       throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
375           + ") is different from that in the field (" + valueFromField + ")");
376     }
377   }
378 
379   /**
380    * Checks if the block is internally consistent, i.e. the first
381    * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a valid header consistent
382    * with the fields. This function is primarily for testing and debugging, and
383    * is not thread-safe, because it alters the internal buffer pointer.
384    */
385   void sanityCheck() throws IOException {
386     buf.rewind();
387 
388     {
389       BlockType blockTypeFromBuf = BlockType.read(buf);
390       if (blockTypeFromBuf != blockType) {
391         throw new IOException("Block type stored in the buffer: " +
392             blockTypeFromBuf + ", block type field: " + blockType);
393       }
394     }
395 
396     sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader,
397         "onDiskSizeWithoutHeader");
398 
399     sanityCheckAssertion(buf.getInt(), uncompressedSizeWithoutHeader,
400         "uncompressedSizeWithoutHeader");
401 
402     sanityCheckAssertion(buf.getLong(), prevBlockOffset, "prevBlockOffset");
403     if (this.fileContext.isUseHBaseChecksum()) {
404       sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
405       sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), "bytesPerChecksum");
406       sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, 
407                            "onDiskDataSizeWithHeader");
408     }
409 
410     int cksumBytes = totalChecksumBytes();
411     int hdrSize = headerSize();
412     int expectedBufLimit = uncompressedSizeWithoutHeader + headerSize() +
413                            cksumBytes;
414     if (buf.limit() != expectedBufLimit) {
415       throw new AssertionError("Expected buffer limit " + expectedBufLimit
416           + ", got " + buf.limit());
417     }
418 
419     // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
420     // block's header, so there are two sensible values for buffer capacity.
421     int size = uncompressedSizeWithoutHeader + hdrSize + cksumBytes;
422     if (buf.capacity() != size &&
423         buf.capacity() != size + hdrSize) {
424       throw new AssertionError("Invalid buffer capacity: " + buf.capacity() +
425           ", expected " + size + " or " + (size + hdrSize));
426     }
427   }
428 
429   @Override
430   public String toString() {
431     return "blockType="
432         + blockType
433         + ", onDiskSizeWithoutHeader="
434         + onDiskSizeWithoutHeader
435         + ", uncompressedSizeWithoutHeader="
436         + uncompressedSizeWithoutHeader
437         + ", prevBlockOffset="
438         + prevBlockOffset
439         + ", dataBeginsWith="
440         + Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
441             Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()))
442         + ", fileOffset=" + offset;
443   }
444 
445   private void validateOnDiskSizeWithoutHeader(
446       int expectedOnDiskSizeWithoutHeader) throws IOException {
447     if (onDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) {
448       String blockInfoMsg =
449         "Block offset: " + offset + ", data starts with: "
450           + Bytes.toStringBinary(buf.array(), buf.arrayOffset(),
451               buf.arrayOffset() + Math.min(32, buf.limit()));
452       throw new IOException("On-disk size without header provided is "
453           + expectedOnDiskSizeWithoutHeader + ", but block "
454           + "header contains " + onDiskSizeWithoutHeader + ". " +
455           blockInfoMsg);
456     }
457   }
458 
459   /**
460    * Always allocates a new buffer of the correct size. Copies header bytes
461    * from the existing buffer. Does not change header fields. 
462    * Reserve room to keep checksum bytes too.
463    *
464    * @param extraBytes whether to reserve room in the buffer to read the next
465    *          block's header
466    */
467   private void allocateBuffer(boolean extraBytes) {
468     int cksumBytes = totalChecksumBytes();
469     int capacityNeeded = headerSize() + uncompressedSizeWithoutHeader +
470         cksumBytes +
471         (extraBytes ? headerSize() : 0);
472 
473     ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
474 
475     // Copy header bytes.
476     System.arraycopy(buf.array(), buf.arrayOffset(), newBuf.array(),
477         newBuf.arrayOffset(), headerSize());
478 
479     buf = newBuf;
480     buf.limit(headerSize() + uncompressedSizeWithoutHeader + cksumBytes);
481   }
482 
483   /** An additional sanity-check in case no compression is being used. */
484   public void assumeUncompressed() throws IOException {
485     if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + 
486         totalChecksumBytes()) {
487       throw new IOException("Using no compression but "
488           + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
489           + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
490           + ", numChecksumbytes=" + totalChecksumBytes());
491     }
492   }
493 
494   /**
495    * @param expectedType the expected type of this block
496    * @throws IOException if this block's type is different than expected
497    */
498   public void expectType(BlockType expectedType) throws IOException {
499     if (blockType != expectedType) {
500       throw new IOException("Invalid block type: expected=" + expectedType
501           + ", actual=" + blockType);
502     }
503   }
504 
505   /** @return the offset of this block in the file it was read from */
506   public long getOffset() {
507     if (offset < 0) {
508       throw new IllegalStateException(
509           "HFile block offset not initialized properly");
510     }
511     return offset;
512   }
513 
514   /**
515    * @return a byte stream reading the data section of this block
516    */
517   public DataInputStream getByteStream() {
518     return new DataInputStream(new ByteArrayInputStream(buf.array(),
519         buf.arrayOffset() + headerSize(), buf.limit() - headerSize()));
520   }
521 
522   @Override
523   public long heapSize() {
524     long size = ClassSize.align(
525         ClassSize.OBJECT +
526         // Block type, byte buffer and meta references
527         3 * ClassSize.REFERENCE +
528         // On-disk size, uncompressed size, and next block's on-disk size
529         // bytePerChecksum and onDiskDataSize
530         4 * Bytes.SIZEOF_INT +
531         // This and previous block offset
532         2 * Bytes.SIZEOF_LONG +
533         // Heap size of the meta object. meta will be always not null.
534         fileContext.heapSize()
535     );
536 
537     if (buf != null) {
538       // Deep overhead of the byte buffer. Needs to be aligned separately.
539       size += ClassSize.align(buf.capacity() + BYTE_BUFFER_HEAP_SIZE);
540     }
541 
542     return ClassSize.align(size);
543   }
544 
545   /**
546    * Read from an input stream. Analogous to
547    * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a
548    * number of "extra" bytes that would be desirable but not absolutely
549    * necessary to read.
550    *
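       * <p>A minimal call sketch (the {@code blockSize} and {@code hdrSize} values are
       * illustrative): read one block of {@code blockSize} bytes and, if available,
       * the next block's {@code hdrSize}-byte header in the same call:
       * <pre>{@code
       * byte[] dest = new byte[blockSize + hdrSize];
       * boolean nextHeaderRead = readWithExtra(in, dest, 0, blockSize, hdrSize);
       * }</pre>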
551    * @param in the input stream to read from
552    * @param buf the buffer to read into
553    * @param bufOffset the destination offset in the buffer
554    * @param necessaryLen the number of bytes that are absolutely necessary to
555    *          read
556    * @param extraLen the number of extra bytes that would be nice to read
557    * @return true if succeeded reading the extra bytes
558    * @throws IOException if failed to read the necessary bytes
559    */
560   public static boolean readWithExtra(InputStream in, byte buf[],
561       int bufOffset, int necessaryLen, int extraLen) throws IOException {
562     int bytesRemaining = necessaryLen + extraLen;
563     while (bytesRemaining > 0) {
564       int ret = in.read(buf, bufOffset, bytesRemaining);
565       if (ret == -1 && bytesRemaining <= extraLen) {
566         // We could not read the "extra data", but that is OK.
567         break;
568       }
569 
570       if (ret < 0) {
571         throw new IOException("Premature EOF from inputStream (read "
572             + "returned " + ret + ", was trying to read " + necessaryLen
573             + " necessary bytes and " + extraLen + " extra bytes, "
574             + "successfully read "
575             + (necessaryLen + extraLen - bytesRemaining));
576       }
577       bufOffset += ret;
578       bytesRemaining -= ret;
579     }
580     return bytesRemaining <= 0;
581   }
582 
583   /**
584    * @return the on-disk size of the next block (including the header size)
585    *         that was read by peeking into the next block's header
586    */
587   public int getNextBlockOnDiskSizeWithHeader() {
588     return nextBlockOnDiskSizeWithHeader;
589   }
590 
591 
592   /**
593    * Unified version 2 {@link HFile} block writer. The intended usage pattern
594    * is as follows:
595    * <ol>
596    * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
597    * <li>Call {@link Writer#startWriting} and get a data stream to write to.
598    * <li>Write your data into the stream.
599    * <li>Call {@link Writer#writeHeaderAndData(FSDataOutputStream)} as many times as you need to
600    * store the serialized block into an external stream.
601    * <li>Repeat to write more blocks.
602    * </ol>
603    * <p>
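        * A minimal usage sketch of the above steps; the data block encoder, file
        * context, KeyValues and the output stream are assumed to be set up elsewhere:
        * <pre>{@code
        * HFileBlock.Writer writer = new HFileBlock.Writer(dataBlockEncoder, fileContext);
        * writer.startWriting(BlockType.DATA);
        * writer.write(kv);                  // append KeyValues while in the "writing" state
        * writer.writeHeaderAndData(out);    // out is the file's FSDataOutputStream
        * // repeat startWriting()/write()/writeHeaderAndData() for subsequent blocks
        * }</pre>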
604    */
605   public static class Writer {
606 
607     private enum State {
608       INIT,
609       WRITING,
610       BLOCK_READY
611     };
612 
613     /** Writer state. Used to ensure the correct usage protocol. */
614     private State state = State.INIT;
615 
616     /** Data block encoder used for data blocks */
617     private final HFileDataBlockEncoder dataBlockEncoder;
618 
619     private HFileBlockEncodingContext dataBlockEncodingCtx;
620 
621     /** block encoding context for non-data blocks */
622     private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
623 
624     /**
625      * The stream we use to accumulate data in uncompressed format for each
626      * block. We reset this stream at the end of each block and reuse it. The
627      * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this
628      * stream.
629      */
630     private ByteArrayOutputStream baosInMemory;
631 
632     /**
633      * Current block type. Set in {@link #startWriting(BlockType)}. Could be
634      * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA}
635      * to {@link BlockType#ENCODED_DATA}.
636      */
637     private BlockType blockType;
638 
639     /**
640      * A stream that we write uncompressed bytes to, which accumulates them in
641      * {@link #baosInMemory}; compression, if any, is applied later in finishBlock().
642      */
643     private DataOutputStream userDataStream;
644 
645     // Size of actual data being written. Not considering the block encoding/compression. This
646     // includes the header size also.
647     private int unencodedDataSizeWritten;
648 
649     /**
650      * Bytes to be written to the file system, including the header. Compressed
651      * if compression is turned on. The checksum data for the block is kept
652      * separately in {@link #onDiskChecksum} and is written out immediately after these bytes.
653      */
654     private byte[] onDiskBytesWithHeader;
655 
656     /**
657      * The checksum data for this block, computed over
658      * {@link #onDiskBytesWithHeader} (i.e. over the header and the possibly
659      * compressed data) and written to the output stream immediately after
660      * those bytes.
661      */
662     private byte[] onDiskChecksum;
663 
664     /**
665      * Valid in the BLOCK_READY state. Contains the header and the uncompressed (but
666      * potentially encoded, if this is a data block) bytes, so the length is
667      * {@link #uncompressedSizeWithoutHeader} + {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}.
668      * Does not store checksums.
669      */
670     private byte[] uncompressedBytesWithHeader;
671 
672     /**
673      * Current block's start offset in the {@link HFile}. Set in
674      * {@link #writeHeaderAndData(FSDataOutputStream)}.
675      */
676     private long startOffset;
677 
678     /**
679      * Offset of previous block by block type. Updated when the next block is
680      * started.
681      */
682     private long[] prevOffsetByType;
683 
684     /** The offset of the previous block of the same type */
685     private long prevOffset;
686     /** Meta data that holds information about the hfileblock**/
687     private HFileContext fileContext;
688 
689     /**
690      * @param dataBlockEncoder data block encoding algorithm to use
691      */
692     public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
693       this.dataBlockEncoder = dataBlockEncoder != null
694           ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
695       defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
696           HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
697       dataBlockEncodingCtx = this.dataBlockEncoder
698           .newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
699 
700       if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
701         throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
702             " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
703             fileContext.getBytesPerChecksum());
704       }
705 
706       baosInMemory = new ByteArrayOutputStream();
707       
708       prevOffsetByType = new long[BlockType.values().length];
709       for (int i = 0; i < prevOffsetByType.length; ++i)
710         prevOffsetByType[i] = -1;
711 
712       this.fileContext = fileContext;
713     }
714 
715     /**
716      * Starts writing into the block. The previous block's data is discarded.
717      *
718      * @return the stream the user can write their data into
719      * @throws IOException
720      */
721     public DataOutputStream startWriting(BlockType newBlockType)
722         throws IOException {
723       if (state == State.BLOCK_READY && startOffset != -1) {
724         // We had a previous block that was written to a stream at a specific
725         // offset. Save that offset as the last offset of a block of that type.
726         prevOffsetByType[blockType.getId()] = startOffset;
727       }
728 
729       startOffset = -1;
730       blockType = newBlockType;
731 
732       baosInMemory.reset();
733       baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
734 
735       state = State.WRITING;
736 
737       // We will compress it later in finishBlock()
738       userDataStream = new DataOutputStream(baosInMemory);
739       if (newBlockType == BlockType.DATA) {
740         this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
741       }
742       this.unencodedDataSizeWritten = 0;
743       return userDataStream;
744     }
745 
746     /**
747      * Writes the kv to this block
748      * @param kv the KeyValue to append to this block
749      * @throws IOException
750      */
751     public void write(KeyValue kv) throws IOException{
752       expectState(State.WRITING);
753       this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(kv, dataBlockEncodingCtx,
754           this.userDataStream);
755     }
756 
757     /**
758      * Returns the stream for the user to write to. The block writer takes care
759      * of handling compression and buffering for caching on write. Can only be
760      * called in the "writing" state.
761      *
762      * @return the data output stream for the user to write to
763      */
764     DataOutputStream getUserDataStream() {
765       expectState(State.WRITING);
766       return userDataStream;
767     }
768 
769     /**
770      * Transitions the block writer from the "writing" state to the "block
771      * ready" state.  Does nothing if a block is already finished.
772      */
773     void ensureBlockReady() throws IOException {
774       Preconditions.checkState(state != State.INIT,
775           "Unexpected state: " + state);
776 
777       if (state == State.BLOCK_READY)
778         return;
779 
780       // This will set state to BLOCK_READY.
781       finishBlock();
782     }
783 
784     /**
785      * An internal method that flushes the compressing stream (if using
786      * compression), serializes the header, and takes care of the separate
787      * uncompressed stream for caching on write, if applicable. Sets block
788      * write state to "block ready".
789      */
790     private void finishBlock() throws IOException {
791       if (blockType == BlockType.DATA) {
792         BufferGrabbingByteArrayOutputStream baosInMemoryCopy = 
793             new BufferGrabbingByteArrayOutputStream();
794         baosInMemory.writeTo(baosInMemoryCopy);
795         this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
796             baosInMemoryCopy.buf, blockType);
797         blockType = dataBlockEncodingCtx.getBlockType();
798       }
799       userDataStream.flush();
800       // This does an array copy, so it is safe to cache this byte array.
801       uncompressedBytesWithHeader = baosInMemory.toByteArray();
802       prevOffset = prevOffsetByType[blockType.getId()];
803 
804       // We need to set state before we can package the block up for
805       // cache-on-write. In a way, the block is ready, but not yet encoded or
806       // compressed.
807       state = State.BLOCK_READY;
808       if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
809         onDiskBytesWithHeader = dataBlockEncodingCtx
810             .compressAndEncrypt(uncompressedBytesWithHeader);
811       } else {
812         onDiskBytesWithHeader = defaultBlockEncodingCtx
813             .compressAndEncrypt(uncompressedBytesWithHeader);
814       }
815       int numBytes = (int) ChecksumUtil.numBytes(
816           onDiskBytesWithHeader.length,
817           fileContext.getBytesPerChecksum());
818 
819       // put the header for on disk bytes
820       putHeader(onDiskBytesWithHeader, 0,
821           onDiskBytesWithHeader.length + numBytes,
822           uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
823       // set the header for the uncompressed bytes (for cache-on-write)
824       putHeader(uncompressedBytesWithHeader, 0,
825           onDiskBytesWithHeader.length + numBytes,
826           uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
827 
828       onDiskChecksum = new byte[numBytes];
829       ChecksumUtil.generateChecksums(
830           onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
831           onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
832     }
833 
834     public static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
835       private byte[] buf;
836 
837       @Override
838       public void write(byte[] b, int off, int len) {
839         this.buf = b;
840       }
841 
842       public byte[] getBuffer() {
843         return this.buf;
844       }
845     }
846 
847     /**
848      * Put the header into the given byte array at the given offset.
849      * @param onDiskSize size of the block on disk header + data + checksum
850      * @param uncompressedSize size of the block after decompression (but
851      *          before optional data block decoding) including header
852      * @param onDiskDataSize size of the block on disk with header
853      *        and data but not including the checksums
854      */
855     private void putHeader(byte[] dest, int offset, int onDiskSize,
856         int uncompressedSize, int onDiskDataSize) {
857       offset = blockType.put(dest, offset);
858       offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
859       offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
860       offset = Bytes.putLong(dest, offset, prevOffset);
861       offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
862       offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
863       Bytes.putInt(dest, offset, onDiskDataSize);
864     }
865 
866     /**
867      * Similar to {@link #finishBlockAndWriteHeaderAndData(DataOutputStream)}, but records
868      * the offset of this block so that it can be referenced in the next block
869      * of the same type.
870      *
871      * @param out
872      * @throws IOException
873      */
874     public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
875       long offset = out.getPos();
876       if (startOffset != -1 && offset != startOffset) {
877         throw new IOException("A " + blockType + " block written to a "
878             + "stream twice, first at offset " + startOffset + ", then at "
879             + offset);
880       }
881       startOffset = offset;
882 
883       finishBlockAndWriteHeaderAndData((DataOutputStream) out);
884     }
885 
886     /**
887      * Writes the header and the compressed data of this block (or uncompressed
888      * data when not using compression) into the given stream. Can be called in
889      * the "writing" state or in the "block ready" state. If called in the
890      * "writing" state, transitions the writer to the "block ready" state.
891      *
892      * @param out the output stream to write the block to
893      * @throws IOException
894      */
895     protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
896       throws IOException {
897       ensureBlockReady();
898       out.write(onDiskBytesWithHeader);
899       out.write(onDiskChecksum);
900     }
901 
902     /**
903      * Returns the header or the compressed data (or uncompressed data when not
904      * using compression) as a byte array. Can be called in the "writing" state
905      * or in the "block ready" state. If called in the "writing" state,
906      * transitions the writer to the "block ready" state. This returns
907      * the header + data + checksums stored on disk.
908      *
909      * @return header and data as they would be stored on disk in a byte array
910      * @throws IOException
911      */
912     byte[] getHeaderAndDataForTest() throws IOException {
913       ensureBlockReady();
914       // This is not very optimal, because we are doing an extra copy.
915       // But this method is used only by unit tests.
916       byte[] output =
917           new byte[onDiskBytesWithHeader.length
918               + onDiskChecksum.length];
919       System.arraycopy(onDiskBytesWithHeader, 0, output, 0,
920           onDiskBytesWithHeader.length);
921       System.arraycopy(onDiskChecksum, 0, output,
922           onDiskBytesWithHeader.length, onDiskChecksum.length);
923       return output;
924     }
925 
926     /**
927      * Releases resources used by this writer.
928      */
929     public void release() {
930       if (dataBlockEncodingCtx != null) {
931         dataBlockEncodingCtx.close();
932         dataBlockEncodingCtx = null;
933       }
934       if (defaultBlockEncodingCtx != null) {
935         defaultBlockEncodingCtx.close();
936         defaultBlockEncodingCtx = null;
937       }
938     }
939 
940     /**
941      * Returns the on-disk size of the data portion of the block. This is the
942      * compressed size if compression is enabled. Can only be called in the
943      * "block ready" state. Header is not compressed, and its size is not
944      * included in the return value.
945      *
946      * @return the on-disk size of the block, not including the header.
947      */
948     int getOnDiskSizeWithoutHeader() {
949       expectState(State.BLOCK_READY);
950       return onDiskBytesWithHeader.length + onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE;
951     }
952 
953     /**
954      * Returns the on-disk size of the block. Can only be called in the
955      * "block ready" state.
956      *
957      * @return the on-disk size of the block ready to be written, including the
958      *         header size, the data and the checksum data.
959      */
960     int getOnDiskSizeWithHeader() {
961       expectState(State.BLOCK_READY);
962       return onDiskBytesWithHeader.length + onDiskChecksum.length;
963     }
964 
965     /**
966      * The uncompressed size of the block data. Does not include header size.
967      */
968     int getUncompressedSizeWithoutHeader() {
969       expectState(State.BLOCK_READY);
970       return uncompressedBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE;
971     }
972 
973     /**
974      * The uncompressed size of the block data, including header size.
975      */
976     int getUncompressedSizeWithHeader() {
977       expectState(State.BLOCK_READY);
978       return uncompressedBytesWithHeader.length;
979     }
980 
981     /** @return true if a block is being written  */
982     public boolean isWriting() {
983       return state == State.WRITING;
984     }
985 
986     /**
987      * Returns the number of bytes written into the current block so far, or
988      * zero if not writing the block at the moment. Note that this will return
989      * zero in the "block ready" state as well.
990      *
991      * @return the number of bytes written
992      */
993     public int blockSizeWritten() {
994       if (state != State.WRITING) return 0;
995       return this.unencodedDataSizeWritten;
996     }
997 
998     /**
999      * Returns the header followed by the uncompressed data, even if using
1000      * compression. This is needed for storing uncompressed blocks in the block
1001      * cache. Can be called in the "writing" state or the "block ready" state.
1002      * Returns only the header and data, does not include checksum data.
1003      *
1004      * @return uncompressed block bytes for caching on write
1005      */
1006     ByteBuffer getUncompressedBufferWithHeader() {
1007       expectState(State.BLOCK_READY);
1008       return ByteBuffer.wrap(uncompressedBytesWithHeader);
1009     }
1010 
1011     private void expectState(State expectedState) {
1012       if (state != expectedState) {
1013         throw new IllegalStateException("Expected state: " + expectedState +
1014             ", actual state: " + state);
1015       }
1016     }
1017 
1018     /**
1019      * Takes the given {@link BlockWritable} instance, creates a new block of
1020      * its appropriate type, writes the writable into this block, and flushes
1021      * the block into the output stream. The writer is instructed not to buffer
1022      * uncompressed bytes for cache-on-write.
1023      *
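          * <p>A minimal sketch (the writer and {@code fsOut} are assumed to exist; the
          * anonymous {@link BlockWritable} is purely illustrative):
          * <pre>{@code
          * writer.writeBlock(new HFileBlock.BlockWritable() {
          *   public BlockType getBlockType() { return BlockType.META; }
          *   public void writeToBlock(DataOutput out) throws IOException {
          *     out.writeUTF("example payload");
          *   }
          * }, fsOut);
          * }</pre>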
1024      * @param bw the block-writable object to write as a block
1025      * @param out the file system output stream
1026      * @throws IOException
1027      */
1028     public void writeBlock(BlockWritable bw, FSDataOutputStream out)
1029         throws IOException {
1030       bw.writeToBlock(startWriting(bw.getBlockType()));
1031       writeHeaderAndData(out);
1032     }
1033 
1034     /**
1035      * Creates a new HFileBlock. Checksums have already been validated, so
1036      * the byte buffer passed into the constructor of this newly created
1037      * block does not have checksum data even though the header minor 
1038      * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
1039      * 0 value in bytesPerChecksum.
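          * <p>Typically called right after the block has been written out when
          * cache-on-write is enabled, e.g. (a sketch; how the returned block is handed
          * to the cache is illustrative):
          * <pre>{@code
          * writer.writeHeaderAndData(out);
          * HFileBlock blockForCache = writer.getBlockForCaching();
          * // pass blockForCache to the block cache, keyed by its offset in the file
          * }</pre>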
1040      */
1041     public HFileBlock getBlockForCaching() {
1042       HFileContext newContext = new HFileContextBuilder()
1043                                 .withBlockSize(fileContext.getBlocksize())
1044                                 .withBytesPerCheckSum(0)
1045                                 .withChecksumType(ChecksumType.NULL) // no checksums in cached data
1046                                 .withCompression(fileContext.getCompression())
1047                                 .withDataBlockEncoding(fileContext.getDataBlockEncoding())
1048                                 .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
1049                                 .withCompressTags(fileContext.isCompressTags())
1050                                 .withIncludesMvcc(fileContext.isIncludesMvcc())
1051                                 .withIncludesTags(fileContext.isIncludesTags())
1052                                 .build();
1053       return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
1054           getUncompressedSizeWithoutHeader(), prevOffset, getUncompressedBufferWithHeader(),
1055           DONT_FILL_HEADER, startOffset,
1056           onDiskBytesWithHeader.length + onDiskChecksum.length, newContext);
1057     }
1058   }
1059 
1060   /** Something that can be written into a block. */
1061   public interface BlockWritable {
1062 
1063     /** The type of block this data should use. */
1064     BlockType getBlockType();
1065 
1066     /**
1067      * Writes the block to the provided stream. Must not write any magic
1068      * records.
1069      *
1070      * @param out a stream to write uncompressed data into
1071      */
1072     void writeToBlock(DataOutput out) throws IOException;
1073   }
1074 
1075   // Block readers and writers
1076 
1077   /** An interface allowing to iterate {@link HFileBlock}s. */
1078   public interface BlockIterator {
1079 
1080     /**
1081      * Get the next block, or null if there are no more blocks to iterate.
1082      */
1083     HFileBlock nextBlock() throws IOException;
1084 
1085     /**
1086      * Similar to {@link #nextBlock()} but checks block type, throws an
1087      * exception if incorrect, and returns the HFile block
1088      */
1089     HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1090   }
1091 
1092   /** A full-fledged reader with iteration ability. */
1093   public interface FSReader {
1094 
1095     /**
1096      * Reads the block at the given offset in the file with the given on-disk
1097      * size and uncompressed size.
1098      *
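          * <p>For example (a sketch; in practice the offset and on-disk size come from
          * the block index or from the previously read block's header):
          * <pre>{@code
          * HFileBlock block = reader.readBlockData(offset, onDiskSizeWithHeader, -1, true);
          * }</pre>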
1099      * @param offset the offset in the file at which the block starts
1100      * @param onDiskSize the on-disk size of the entire block, including all
1101      *          applicable headers, or -1 if unknown
1102      * @param uncompressedSize the uncompressed size of the compressed part of
1103      *          the block, or -1 if unknown
1104      * @return the newly read block
1105      */
1106     HFileBlock readBlockData(long offset, long onDiskSize,
1107         int uncompressedSize, boolean pread) throws IOException;
1108 
1109     /**
1110      * Creates a block iterator over the given portion of the {@link HFile}.
1111      * The iterator returns blocks whose offset is such that
1112      * startOffset <= offset < endOffset.
1113      *
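          * <p>A typical iteration sketch (the reader and the start/end offsets, with
          * endOffset exclusive, are assumed to be obtained elsewhere, e.g. from the
          * HFile trailer):
          * <pre>{@code
          * HFileBlock.BlockIterator it = reader.blockRange(startOffset, endOffset);
          * HFileBlock b;
          * while ((b = it.nextBlock()) != null) {
          *   // inspect b, e.g. b.getBlockType() or b.getBufferWithoutHeader()
          * }
          * }</pre>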
1114      * @param startOffset the offset of the block to start iteration with
1115      * @param endOffset the offset to end iteration at (exclusive)
1116      * @return an iterator of blocks between the two given offsets
1117      */
1118     BlockIterator blockRange(long startOffset, long endOffset);
1119 
1120     /** Closes the backing streams */
1121     void closeStreams() throws IOException;
1122   }
1123 
1124   /**
1125    * A common implementation of some methods of {@link FSReader} and some
1126    * tools for implementing HFile format version-specific block readers.
1127    */
1128   private abstract static class AbstractFSReader implements FSReader {
1129     /** Compression algorithm used by the {@link HFile} */
1130 
1131     /** The size of the file we are reading from, or -1 if unknown. */
1132     protected long fileSize;
1133 
1134     /** The size of the header */
1135     protected final int hdrSize;
1136 
1137     /** The filesystem used to access data */
1138     protected HFileSystem hfs;
1139 
1140     /** The path (if any) where this data is coming from */
1141     protected Path path;
1142 
1143     private final Lock streamLock = new ReentrantLock();
1144 
1145     /** The default buffer size for our buffered streams */
1146     public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
1147 
1148     protected HFileContext fileContext;
1149 
1150     public AbstractFSReader(long fileSize, HFileSystem hfs, Path path, HFileContext fileContext)
1151         throws IOException {
1152       this.fileSize = fileSize;
1153       this.hfs = hfs;
1154       this.path = path;
1155       this.fileContext = fileContext;
1156       this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1157     }
1158 
1159     @Override
1160     public BlockIterator blockRange(final long startOffset,
1161         final long endOffset) {
1162       return new BlockIterator() {
1163         private long offset = startOffset;
1164 
1165         @Override
1166         public HFileBlock nextBlock() throws IOException {
1167           if (offset >= endOffset)
1168             return null;
1169           HFileBlock b = readBlockData(offset, -1, -1, false);
1170           offset += b.getOnDiskSizeWithHeader();
1171           return b;
1172         }
1173 
1174         @Override
1175         public HFileBlock nextBlockWithBlockType(BlockType blockType)
1176             throws IOException {
1177           HFileBlock blk = nextBlock();
1178           if (blk.getBlockType() != blockType) {
1179             throw new IOException("Expected block of type " + blockType
1180                 + " but found " + blk.getBlockType());
1181           }
1182           return blk;
1183         }
1184       };
1185     }
1186 
1187     /**
1188      * Does a positional read or a seek and read into the given buffer. Returns
1189      * the on-disk size of the next block, or -1 if it could not be determined.
1190      *
1191      * @param dest destination buffer
1192      * @param destOffset offset in the destination buffer
1193      * @param size size of the block to be read
1194      * @param peekIntoNextBlock whether to read the next block's on-disk size
1195      * @param fileOffset position in the stream to read at
1196      * @param pread whether we should do a positional read
1197      * @param istream The input source of data
1198      * @return the on-disk size of the next block with header size included, or
1199      *         -1 if it could not be determined
1200      * @throws IOException
1201      */
1202     protected int readAtOffset(FSDataInputStream istream,
1203         byte[] dest, int destOffset, int size,
1204         boolean peekIntoNextBlock, long fileOffset, boolean pread)
1205         throws IOException {
1206       if (peekIntoNextBlock &&
1207           destOffset + size + hdrSize > dest.length) {
1208         // We are asked to read the next block's header as well, but there is
1209         // not enough room in the array.
1210         throw new IOException("Attempted to read " + size + " bytes and " +
1211             hdrSize + " bytes of next header into a " + dest.length +
1212             "-byte array at offset " + destOffset);
1213       }
1214 
1215       if (!pread && streamLock.tryLock()) {
1216         // Seek + read. Better for scanning.
1217         try {
1218           istream.seek(fileOffset);
1219 
1220           long realOffset = istream.getPos();
1221           if (realOffset != fileOffset) {
1222             throw new IOException("Tried to seek to " + fileOffset + " to "
1223                 + "read " + size + " bytes, but pos=" + realOffset
1224                 + " after seek");
1225           }
1226 
1227           if (!peekIntoNextBlock) {
1228             IOUtils.readFully(istream, dest, destOffset, size);
1229             return -1;
1230           }
1231 
1232           // Try to read the next block header.
1233           if (!readWithExtra(istream, dest, destOffset, size, hdrSize))
1234             return -1;
1235         } finally {
1236           streamLock.unlock();
1237         }
1238       } else {
1239         // Positional read. Better for random reads; or when the streamLock is already locked.
1240         int extraSize = peekIntoNextBlock ? hdrSize : 0;
1241 
1242         int ret = istream.read(fileOffset, dest, destOffset, size + extraSize);
1243         if (ret < size) {
1244           throw new IOException("Positional read of " + size + " bytes " +
1245               "failed at offset " + fileOffset + " (returned " + ret + ")");
1246         }
1247 
1248         if (ret == size || ret < size + extraSize) {
1249           // Could not read the next block's header, or did not try.
1250           return -1;
1251         }
1252       }
1253 
1254       assert peekIntoNextBlock;
1255       return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) +
1256           hdrSize;
1257     }
1258 
1259   }
1260 
1261   /**
1262    * We always prefetch the header of the next block, so that we know its
1263    * on-disk size in advance and can read it in one operation.
1264    */
1265   private static class PrefetchedHeader {
1266     long offset = -1;
1267     byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1268     ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
1269   }
1270 
1271   /** Reads version 2 blocks from the filesystem. */
1272   static class FSReaderV2 extends AbstractFSReader {
1273     /** The file system stream of the underlying {@link HFile}, which
1274      * may or may not perform checksum validation in the filesystem */
1275     protected FSDataInputStreamWrapper streamWrapper;
1276 
1277     private HFileBlockDecodingContext encodedBlockDecodingCtx;
1278 
1279     private HFileBlockDefaultDecodingContext defaultDecodingCtx;
1280 
1281     private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
1282         new ThreadLocal<PrefetchedHeader>() {
1283           @Override
1284           public PrefetchedHeader initialValue() {
1285             return new PrefetchedHeader();
1286           }
1287         };
1288 
1289     public FSReaderV2(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
1290         HFileContext fileContext) throws IOException {
1291       super(fileSize, hfs, path, fileContext);
1292       this.streamWrapper = stream;
1293       // Older versions of HBase didn't support checksum.
1294       this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1295       defaultDecodingCtx =
1296         new HFileBlockDefaultDecodingContext(fileContext);
1297       encodedBlockDecodingCtx =
1298           new HFileBlockDefaultDecodingContext(fileContext);
1299     }
1300 
1301     /**
1302      * A constructor that reads files with the latest minor version.
1303      * This is used by unit tests only.
1304      */
1305     FSReaderV2(FSDataInputStream istream, long fileSize, HFileContext fileContext) throws IOException {
1306       this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
1307     }
1308 
1309     /**
1310      * Reads a version 2 block. Tries to do as little memory allocation as
1311      * possible, using the provided on-disk size.
1312      *
1313      * @param offset the offset in the stream to read at
1314      * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1315      *          the header, or -1 if unknown
1316      * @param uncompressedSize the uncompressed size of the block. Always
1317      *          expected to be -1. This parameter is only used in version 1.
1318      * @param pread whether to use a positional read
1319      */
1320     @Override
1321     public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL,
1322         int uncompressedSize, boolean pread) throws IOException {
1323 
1324       // Get a copy of the current state of whether to validate
1325       // HBase checksums or not for this read call. This is not
1326       // thread-safe, but the one constraint is that if we decide
1327       // to skip HBase checksum verification then we are
1328       // guaranteed to use HDFS checksum verification.
1329       boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1330       FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1331 
1332       HFileBlock blk = readBlockDataInternal(is, offset, 
1333                          onDiskSizeWithHeaderL, 
1334                          uncompressedSize, pread,
1335                          doVerificationThruHBaseChecksum);
1336       if (blk == null) {
1337         HFile.LOG.warn("HBase checksum verification failed for file " +
1338                        path + " at offset " +
1339                        offset + " filesize " + fileSize +
1340                        ". Retrying read with HDFS checksums turned on...");
1341 
1342         if (!doVerificationThruHBaseChecksum) {
1343           String msg = "HBase checksum verification failed for file " +
1344                        path + " at offset " +
1345                        offset + " filesize " + fileSize + 
1346                        " but this cannot happen because doVerify is " +
1347                        doVerificationThruHBaseChecksum;
1348           HFile.LOG.warn(msg);
1349           throw new IOException(msg); // cannot happen case here
1350         }
1351         HFile.checksumFailures.incrementAndGet(); // update metrics
1352 
1353         // If we have a checksum failure, we fall back into a mode where
1354         // the next few reads use HDFS-level checksums. We aim to make the
1355         // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads skip
1356         // HBase checksum verification, but since this value is set without
1357         // holding any locks, we may actually do a few more reads than
1358         // precisely this number.
1359         is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1360         doVerificationThruHBaseChecksum = false;
1361         blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL,
1362                                     uncompressedSize, pread,
1363                                     doVerificationThruHBaseChecksum);
1364         if (blk != null) {
1365           HFile.LOG.warn("HDFS checksum verification suceeded for file " +
1366                          path + " at offset " +
1367                          offset + " filesize " + fileSize);
1368         }
1369       } 
1370       if (blk == null && !doVerificationThruHBaseChecksum) {
1371         String msg = "readBlockData failed, possibly due to " +
1372                      "checksum verification failed for file " + path +
1373                      " at offset " + offset + " filesize " + fileSize;
1374         HFile.LOG.warn(msg);
1375         throw new IOException(msg);
1376       }
1377 
1378       // If there was a checksum mismatch earlier, we retried with
1379       // HBase checksums switched off and used HDFS checksum verification.
1380       // This triggers HDFS to detect and fix corrupt replicas. The
1381       // next checksumOffCount read requests will use HDFS checksums.
1382       // The decrementing of this.checksumOffCount is not thread-safe,
1383       // but it is harmless because checksumOffCount will eventually
1384       // become a negative number.
1385       streamWrapper.checksumOk();
1386       return blk;
1387     }
1388 
1389     /**
1390      * Reads a version 2 block. 
1391      *
1392      * @param offset the offset in the stream to read at
1393      * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1394      *          the header, or -1 if unknown
1395      * @param uncompressedSize the uncompressed size of the block. Always
1396      *          expected to be -1. This parameter is only used in version 1.
1397      * @param pread whether to use a positional read
1398      * @param verifyChecksum Whether to use HBase checksums. 
1399      *        If HBase checksum is switched off, then use HDFS checksum.
1400      * @return the HFileBlock or null if there is an HBase checksum mismatch
1401      */
1402     private HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, 
1403         long onDiskSizeWithHeaderL, int uncompressedSize, boolean pread,
1404         boolean verifyChecksum) throws IOException {
1405       if (offset < 0) {
1406         throw new IOException("Invalid offset=" + offset + " trying to read "
1407             + "block (onDiskSize=" + onDiskSizeWithHeaderL
1408             + ", uncompressedSize=" + uncompressedSize + ")");
1409       }
1410       if (uncompressedSize != -1) {
1411         throw new IOException("Version 2 block reader API does not need " +
1412             "the uncompressed size parameter");
1413       }
1414 
1415       if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
1416           || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) {
1417         throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL
1418             + ": expected to be at least " + hdrSize
1419             + " and at most " + Integer.MAX_VALUE + ", or -1 (offset="
1420             + offset + ", uncompressedSize=" + uncompressedSize + ")");
1421       }
1422 
1423       int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL;
1424       // See if we can avoid reading the header. This is desirable, because
1425       // we will not incur a backward seek operation if we have already
1426       // read this block's header as part of the previous read's look-ahead.
1427       // And we also want to skip reading the header again if it has already
1428       // been read.
1429       PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
1430       ByteBuffer headerBuf = prefetchedHeader.offset == offset ?
1431           prefetchedHeader.buf : null;
1432 
1433       int nextBlockOnDiskSize = 0;
1434       // Allocate enough space to fit the next block's header too.
1435       byte[] onDiskBlock = null;
1436 
1437       HFileBlock b = null;
1438       if (onDiskSizeWithHeader > 0) {
1439         // We know the total on-disk size but not the uncompressed size. Read
1440         // the entire block into memory, then parse the header and decompress
1441         // from memory if using compression. This code path is used when
1442         // doing a random read operation relying on the block index, as well as
1443         // when the client knows the on-disk size from peeking into the next
1444         // block's header (e.g. this block's header) when reading the previous
1445         // block. This is the faster and more preferable case.
1446 
1447         // Size that we have to skip in case we have already read the header.
1448         int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
1449         onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
1450         nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
1451             preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
1452             true, offset + preReadHeaderSize, pread);
1453         if (headerBuf != null) {
1454           // the header has been read when reading the previous block, copy
1455           // to this block's header
1456           System.arraycopy(headerBuf.array(),
1457               headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1458         } else {
1459           headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
1460         }
1461         // At this point the block's header has already been read, either as
1462         // part of the previous block's look-ahead (prefetched header) or by
1463         // the single read above; parse it now and decompress from memory
1464         // later if compression is in use.
1465         try {
1466           b = new HFileBlock(headerBuf, this.fileContext.isUseHBaseChecksum());
1467         } catch (IOException ex) {
1468           // Seen in load testing. Provide comprehensive debug info.
1469           throw new IOException("Failed to read compressed block at "
1470               + offset
1471               + ", onDiskSizeWithoutHeader="
1472               + onDiskSizeWithHeader
1473               + ", preReadHeaderSize="
1474               + hdrSize
1475               + ", header.length="
1476               + prefetchedHeader.header.length
1477               + ", header bytes: "
1478               + Bytes.toStringBinary(prefetchedHeader.header, 0,
1479                   hdrSize), ex);
1480         }
1481         // If the caller specified an onDiskSizeWithHeader, validate it.
1482         int onDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize;
1483         assert onDiskSizeWithoutHeader >= 0;
1484         b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
1485       } else {
1486         // Check headerBuf to see if we have already read this block's header
1487         // as part of reading the previous block. This is an optimization of
1488         // peeking into the next block's header (i.e. this block's header) when
1489         // reading the previous block, which is the faster and more preferable
1490         // case. If the header is already there, don't read it again.
1491 
1492         // Unfortunately, we still have to do a separate read operation to
1493         // read the header.
1494         if (headerBuf == null) {
1495           // From the header, determine the on-disk size of the given hfile
1496           // block, and read the remaining data, thereby incurring two read
1497           // operations. This might happen when we are doing the first read
1498           // in a series of reads or a random read, and we don't have access
1499           // to the block index. This is costly and should happen very rarely.
1500           headerBuf = ByteBuffer.allocate(hdrSize);
1501           readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(),
1502               hdrSize, false, offset, pread);
1503         }
1504         b = new HFileBlock(headerBuf, this.fileContext.isUseHBaseChecksum());
1505         onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize];
1506         System.arraycopy(headerBuf.array(),
1507               headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1508         nextBlockOnDiskSize =
1509           readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader()
1510               - hdrSize, true, offset + hdrSize, pread);
1511         onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize;
1512       }
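           // Whichever branch was taken, onDiskBlock now holds this block's header,
           // data and any checksum bytes (plus, possibly, the next block's header at
           // the end), headerBuf points at this block's header, and
           // onDiskSizeWithHeader is known.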
1513 
1514       Algorithm compressAlgo = fileContext.getCompression();
1515       boolean isCompressed =
1516         compressAlgo != null
1517             && compressAlgo != Compression.Algorithm.NONE;
1518 
1519       Encryption.Context cryptoContext = fileContext.getEncryptionContext();
1520       boolean isEncrypted = cryptoContext != null
1521           && cryptoContext != Encryption.Context.NONE;
1522 
1523       if (!isCompressed && !isEncrypted) {
1524         b.assumeUncompressed();
1525       }
1526 
1527       if (verifyChecksum &&
1528           !validateBlockChecksum(b, onDiskBlock, hdrSize)) {
1529         return null;             // checksum mismatch
1530       }
1531 
1532       if (isCompressed || isEncrypted) {
1533         // This will allocate a new buffer but keep header bytes.
1534         b.allocateBuffer(nextBlockOnDiskSize > 0);
1535         if (b.blockType == BlockType.ENCODED_DATA) {
1536           encodedBlockDecodingCtx.prepareDecoding(b.getOnDiskSizeWithoutHeader(),
1537               b.getUncompressedSizeWithoutHeader(), b.getBufferWithoutHeader(), onDiskBlock,
1538               hdrSize);
1539         } else {
1540           defaultDecodingCtx.prepareDecoding(b.getOnDiskSizeWithoutHeader(),
1541               b.getUncompressedSizeWithoutHeader(), b.getBufferWithoutHeader(), onDiskBlock,
1542               hdrSize);
1543         }
1544         if (nextBlockOnDiskSize > 0) {
1545           // Copy next block's header bytes into the new block if we have them.
1546           System.arraycopy(onDiskBlock, onDiskSizeWithHeader, b.buf.array(),
1547               b.buf.arrayOffset() + hdrSize
1548               + b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(),
1549               hdrSize);
1550         }
1551       } else {
1552         // The onDiskBlock will become the headerAndDataBuffer for this block.
1553         // If nextBlockOnDiskSizeWithHeader is not zero, onDiskBlock already
1554         // contains the header of the next block, so there is no need to set
1555         // the next block's header in it.
1556         b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0,
1557                 onDiskSizeWithHeader), this.fileContext.isUseHBaseChecksum());
1558       }
1559 
1560       b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
1561 
1562       // Set prefetched header
1563       if (b.nextBlockOnDiskSizeWithHeader > 0) {
1564         prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
1565         System.arraycopy(onDiskBlock, onDiskSizeWithHeader,
1566             prefetchedHeader.header, 0, hdrSize);
1567       }
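           // The prefetched header is thread-local, so it only helps when the same
           // thread issues the next read at exactly this offset.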
1568 
1569       b.offset = offset;
1570       b.fileContext.setIncludesTags(this.fileContext.isIncludesTags());
1571       b.fileContext.setIncludesMvcc(this.fileContext.isIncludesMvcc());
1572       return b;
1573     }
1574 
1575     void setIncludesMemstoreTS(boolean includesMemstoreTS) {
1576       this.fileContext.setIncludesMvcc(includesMemstoreTS);
1577     }
1578 
1579     void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
1580       encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
1581     }
1582 
1583     /**
1584      * Generates the checksum for the header as well as the data and
1585      * then validates that it matches the value stored in the header.
1586      * If there is a checksum mismatch, then return false. Otherwise
1587      * return true.
1588      */
1589     protected boolean validateBlockChecksum(HFileBlock block, 
1590       byte[] data, int hdrSize) throws IOException {
1591       return ChecksumUtil.validateBlockChecksum(path, block,
1592                                                 data, hdrSize);
1593     }
1594 
1595     @Override
1596     public void closeStreams() throws IOException {
1597       streamWrapper.close();
1598     }
1599   }
1600 
1601   @Override
1602   public int getSerializedLength() {
1603     if (buf != null) {
1604       return this.buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
1605     }
1606     return 0;
1607   }
1608 
1609   @Override
1610   public void serialize(ByteBuffer destination) {
1611     ByteBuffer dupBuf = this.buf.duplicate();
1612     dupBuf.rewind();
1613     destination.put(dupBuf);
1614     serializeExtraInfo(destination);
1615   }
1616 
1617   public void serializeExtraInfo(ByteBuffer destination) {
1618     destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1619     destination.putLong(this.offset);
1620     destination.putInt(this.nextBlockOnDiskSizeWithHeader);
1621     destination.rewind();
1622   }
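       // For reference, the extra info written above is one byte for the
       // HBase-checksum flag, eight bytes for the block's file offset and four
       // bytes for the next block's on-disk size; EXTRA_SERIALIZATION_SPACE is
       // expected to cover the same amount in getSerializedLength().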
1623 
1624   @Override
1625   public CacheableDeserializer<Cacheable> getDeserializer() {
1626     return HFileBlock.blockDeserializer;
1627   }
1628 
1629   @Override
1630   public boolean equals(Object comparison) {
1631     if (this == comparison) {
1632       return true;
1633     }
1634     if (comparison == null) {
1635       return false;
1636     }
1637     if (comparison.getClass() != this.getClass()) {
1638       return false;
1639     }
1640 
1641     HFileBlock castedComparison = (HFileBlock) comparison;
1642 
1643     if (castedComparison.blockType != this.blockType) {
1644       return false;
1645     }
1646     if (castedComparison.nextBlockOnDiskSizeWithHeader != this.nextBlockOnDiskSizeWithHeader) {
1647       return false;
1648     }
1649     if (castedComparison.offset != this.offset) {
1650       return false;
1651     }
1652     if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1653       return false;
1654     }
1655     if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1656       return false;
1657     }
1658     if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1659       return false;
1660     }
1661     if (this.buf.compareTo(castedComparison.buf) != 0) {
1662       return false;
1663     }
1664     if (this.buf.position() != castedComparison.buf.position()){
1665       return false;
1666     }
1667     if (this.buf.limit() != castedComparison.buf.limit()){
1668       return false;
1669     }
1670     return true;
1671   }
1672 
1673   public DataBlockEncoding getDataBlockEncoding() {
1674     if (blockType == BlockType.ENCODED_DATA) {
1675       return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
1676     }
1677     return DataBlockEncoding.NONE;
1678   }
1679 
1680   byte getChecksumType() {
1681     return this.fileContext.getChecksumType().getCode();
1682   }
1683 
1684   int getBytesPerChecksum() {
1685     return this.fileContext.getBytesPerChecksum();
1686   }
1687 
1688   int getOnDiskDataSizeWithHeader() {
1689     return this.onDiskDataSizeWithHeader;
1690   }
1691 
1692   /** 
1693    * Calculate the number of bytes required to store all the checksums
1694    * for this block. Each checksum value is a 4-byte integer.
1695    */
1696   int totalChecksumBytes() {
1697     // If the hfile block has minorVersion 0, then there is no checksum
1698     // data to validate. Similarly, a zero value in this.bytesPerChecksum
1699     // indicates that cached blocks do not have checksum data because
1700     // checksums were already validated when the block was read from disk.
1701     if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
1702       return 0;
1703     }
1704     return (int)ChecksumUtil.numBytes(onDiskDataSizeWithHeader, this.fileContext.getBytesPerChecksum());
1705   }
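       // Rough worked example (assuming ChecksumUtil.numBytes yields one 4-byte
       // checksum per bytesPerChecksum chunk, rounded up): with a 16 KB chunk size,
       // a block whose onDiskDataSizeWithHeader is 66000 bytes needs
       // ceil(66000 / 16384) = 5 chunks, i.e. 20 checksum bytes.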
1706 
1707   /**
1708    * Returns the size of this block header.
1709    */
1710   public int headerSize() {
1711     return headerSize(this.fileContext.isUseHBaseChecksum());
1712   }
1713 
1714   /**
1715    * Maps a minor version (i.e. whether HBase checksums are in use) to the size of the header.
1716    */
1717   public static int headerSize(boolean usesHBaseChecksum) {
1718     if (usesHBaseChecksum) {
1719       return HConstants.HFILEBLOCK_HEADER_SIZE;
1720     }
1721     return HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
1722   }
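       // As laid out in toStringHeader() below: the no-checksum header is the 8-byte
       // magic, two 4-byte sizes and the 8-byte previous-block offset (24 bytes); the
       // checksum variant additionally carries a 1-byte checksum type, a 4-byte
       // bytes-per-checksum and a 4-byte on-disk-data-size field. The exact constants
       // live in HConstants.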
1723 
1724   /**
1725    * Return the appropriate DUMMY_HEADER for the minor version
1726    */
1727   public byte[] getDummyHeaderForVersion() {
1728     return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
1729   }
1730 
1731   /**
1732    * Return the appropriate DUMMY_HEADER for the minor version
1733    */
1734   static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
1735     if (usesHBaseChecksum) {
1736       return HConstants.HFILEBLOCK_DUMMY_HEADER;
1737     }
1738     return DUMMY_HEADER_NO_CHECKSUM;
1739   }
1740 
1741   public HFileContext getHFileContext() {
1742     return this.fileContext;
1743   }
1744 
1745   /**
1746    * Convert the contents of the block header into a human readable string.
1747    * This is mostly helpful for debugging. This assumes that the block
1748    * has minor version > 0.
1749    */
1750   static String toStringHeader(ByteBuffer buf) throws IOException {
1751     int offset = buf.arrayOffset();
1752     byte[] b = buf.array();
1753     long magic = Bytes.toLong(b, offset); 
1754     BlockType bt = BlockType.read(buf);
1755     offset += Bytes.SIZEOF_LONG;
1756     int compressedBlockSizeNoHeader = Bytes.toInt(b, offset);
1757     offset += Bytes.SIZEOF_INT;
1758     int uncompressedBlockSizeNoHeader = Bytes.toInt(b, offset);
1759     offset += Bytes.SIZEOF_INT;
1760     long prevBlockOffset = Bytes.toLong(b, offset); 
1761     offset += Bytes.SIZEOF_LONG;
1762     byte cksumtype = b[offset];
1763     offset += Bytes.SIZEOF_BYTE;
1764     long bytesPerChecksum = Bytes.toInt(b, offset); 
1765     offset += Bytes.SIZEOF_INT;
1766     long onDiskDataSizeWithHeader = Bytes.toInt(b, offset); 
1767     offset += Bytes.SIZEOF_INT;
1768     return " Header dump: magic: " + magic +
1769                    " blockType " + bt +
1770                    " compressedBlockSizeNoHeader " + 
1771                    compressedBlockSizeNoHeader +
1772                    " uncompressedBlockSizeNoHeader " + 
1773                    uncompressedBlockSizeNoHeader +
1774                    " prevBlockOffset " + prevBlockOffset +
1775                    " checksumType " + ChecksumType.codeToType(cksumtype) +
1776                    " bytesPerChecksum " + bytesPerChecksum +
1777                    " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
1778   }
1779 }
1780