
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.io.hfile;
19  
20  import java.io.ByteArrayInputStream;
21  import java.io.ByteArrayOutputStream;
22  import java.io.DataInputStream;
23  import java.io.DataOutput;
24  import java.io.DataOutputStream;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.nio.ByteBuffer;
28  import java.util.concurrent.locks.Lock;
29  import java.util.concurrent.locks.ReentrantLock;
30  
31  import org.apache.hadoop.classification.InterfaceAudience;
32  import org.apache.hadoop.fs.FSDataInputStream;
33  import org.apache.hadoop.fs.FSDataOutputStream;
34  import org.apache.hadoop.fs.Path;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.fs.HFileSystem;
37  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
38  import org.apache.hadoop.hbase.io.compress.Compression;
39  import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
40  import org.apache.hadoop.hbase.io.crypto.Encryption;
41  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
42  import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
43  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
44  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
45  import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
46  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
47  import org.apache.hadoop.hbase.util.Bytes;
48  import org.apache.hadoop.hbase.util.ChecksumType;
49  import org.apache.hadoop.hbase.util.ClassSize;
50  import org.apache.hadoop.hbase.util.CompoundBloomFilter;
51  import org.apache.hadoop.io.IOUtils;
52  
53  import com.google.common.base.Preconditions;
54  
55  /**
56   * Reading {@link HFile} version 1 and 2 blocks, and writing version 2 blocks.
57   * <ul>
58   * <li>In version 1 all blocks are always compressed or uncompressed, as
59   * specified by the {@link HFile}'s compression algorithm, with a type-specific
60   * magic record stored in the beginning of the compressed data (i.e. one needs
61   * to uncompress the compressed block to determine the block type). There is
62   * only a single compression algorithm setting for all blocks. Offset and size
63   * information from the block index are required to read a block.
64   * <li>In version 2 a block is structured as follows:
65   * <ul>
66   * <li>Magic record identifying the block type (8 bytes)
67   * <li>Compressed block size, header not included (4 bytes)
68   * <li>Uncompressed block size, header not included (4 bytes)
69   * <li>The offset of the previous block of the same type (8 bytes). This is
70   * used to be able to navigate to the previous block without going to the block
71   * index.
72   * <li>For minorVersions >=1, there is an additional 4 byte field
73   * bytesPerChecksum that records the number of bytes in a checksum chunk.
74   * <li>For minorVersions >=1, there is a 4 byte value to store the size of
75   * data on disk (excluding the checksums)
76   * <li>For minorVersions >=1, a series of 4 byte checksums, one each for
77   * the number of bytes specified by bytesPerChecksum.
78   * <li>Compressed data (or uncompressed data if compression is disabled). The
79   * compression algorithm is the same for all the blocks in the {@link HFile},
80   * similarly to what was done in version 1.
81   * </ul>
82   * </ul>
83   * The version 2 block representation in the block cache is the same as above,
84   * except that the data section is always uncompressed in the cache.
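   * <p>
   * As an informal sketch only (not an API of this class), the version 2 header fields
   * can be read back from a ByteBuffer {@code buf} positioned at the start of a block
   * roughly as follows, assuming HBase checksums (minor version >= 1) are in use:
   * <pre>{@code
   * byte[] magic = new byte[8];
   * buf.get(magic);                               // block type magic record
   * int onDiskSizeWithoutHeader = buf.getInt();
   * int uncompressedSizeWithoutHeader = buf.getInt();
   * long prevBlockOffset = buf.getLong();
   * byte checksumTypeCode = buf.get();            // minor version >= 1 only
   * int bytesPerChecksum = buf.getInt();          // minor version >= 1 only
   * int onDiskDataSizeWithHeader = buf.getInt();  // minor version >= 1 only
   * }</pre>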
85   */
86  @InterfaceAudience.Private
87  public class HFileBlock implements Cacheable {
88  
89    /**
90     * On a checksum failure on a Reader, this many succeeding read
91     * requests switch back to using HDFS checksums before auto-reenabling
92     * HBase checksum verification.
93     */
94    static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
95  
96    public static final boolean FILL_HEADER = true;
97    public static final boolean DONT_FILL_HEADER = false;
98  
99    /**
100    * The size of the block header when blockType is {@link BlockType#ENCODED_DATA}.
101    * This extends the normal header by adding the id of the encoder.
102    */
103   public static final int ENCODED_HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE
104       + DataBlockEncoding.ID_SIZE;
105 
106   static final byte[] DUMMY_HEADER_NO_CHECKSUM =
107      new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
108 
109   public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
110       ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
111 
112   // meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader
113   public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE +  Bytes.SIZEOF_INT
114       + Bytes.SIZEOF_LONG;
115 
116   /**
117    * Each checksum value is an integer that can be stored in 4 bytes.
118    */
119   static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
120 
121   private static final CacheableDeserializer<Cacheable> blockDeserializer =
122       new CacheableDeserializer<Cacheable>() {
123         public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{
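          // Serialized layout consumed here: the block's own bytes, followed by
          // EXTRA_SERIALIZATION_SPACE trailing bytes read below: a usesHBaseChecksum
          // flag (1 byte), the block's file offset (8 bytes), and
          // nextBlockOnDiskSizeWithHeader (4 bytes).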
124           buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
125           ByteBuffer newByteBuffer;
126           if (reuse) {
127             newByteBuffer = buf.slice();
128           } else {
129            newByteBuffer = ByteBuffer.allocate(buf.limit());
130            newByteBuffer.put(buf);
131           }
132           buf.position(buf.limit());
133           buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
134           boolean usesChecksum = buf.get() == (byte)1;
135           HFileBlock ourBuffer = new HFileBlock(newByteBuffer, usesChecksum);
136           ourBuffer.offset = buf.getLong();
137           ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt();
138           return ourBuffer;
139         }
140         
141         @Override
142         public int getDeserialiserIdentifier() {
143           return deserializerIdentifier;
144         }
145 
146         @Override
147         public HFileBlock deserialize(ByteBuffer b) throws IOException {
148           return deserialize(b, false);
149         }
150       };
151   private static final int deserializerIdentifier;
152   static {
153     deserializerIdentifier = CacheableDeserializerIdManager
154         .registerDeserializer(blockDeserializer);
155   }
156 
157   private BlockType blockType;
158 
159   /** Size on disk without the header. It includes checksum data too. */
160   private int onDiskSizeWithoutHeader;
161 
162   /** Size of pure data. Does not include header or checksums */
163   private final int uncompressedSizeWithoutHeader;
164 
165   /** The offset of the previous block on disk */
166   private final long prevBlockOffset;
167 
168   /** Size on disk of header and data. Does not include checksum data */
169   private final int onDiskDataSizeWithHeader;
170 
171   /** The in-memory representation of the hfile block */
172   private ByteBuffer buf;
173   /** Metadata that holds information about the HFile block. */
174   private HFileContext fileContext;
175 
176   /**
177    * The offset of this block in the file. Populated by the reader for
178    * convenience of access. This offset is not part of the block header.
179    */
180   private long offset = -1;
181 
182   /**
183    * The on-disk size of the next block, including the header, obtained by
184    * peeking into the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the next block's
185    * header, or -1 if unknown.
186    */
187   private int nextBlockOnDiskSizeWithHeader = -1;
188 
189   /**
190    * Creates a new {@link HFile} block from the given fields. This constructor
191    * is mostly used when the block data has already been read and uncompressed,
192    * and is sitting in a byte buffer. 
193    *
194    * @param blockType the type of this block, see {@link BlockType}
195    * @param onDiskSizeWithoutHeader compressed size of the block if compression
196    *          is used, otherwise uncompressed size, header size not included
197    * @param uncompressedSizeWithoutHeader uncompressed size of the block,
198    *          header size not included. Equals onDiskSizeWithoutHeader if
199    *          compression is disabled.
200    * @param prevBlockOffset the offset of the previous block in the
201    *          {@link HFile}
202    * @param buf block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by
203    *          uncompressed data
204    * @param fillHeader true to fill in the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of
205    *          the buffer based on the header fields provided
206    * @param offset the file offset the block was read from
209    * @param onDiskDataSizeWithHeader size of header and data on disk not
210    *        including checksum data
211    * @param fileContext HFile meta data
212    */
213   HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
214       int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer buf,
215       boolean fillHeader, long offset,
216       int onDiskDataSizeWithHeader, HFileContext fileContext) {
217     this.blockType = blockType;
218     this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
219     this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
220     this.prevBlockOffset = prevBlockOffset;
221     this.buf = buf;
222     if (fillHeader)
223       overwriteHeader();
224     this.offset = offset;
225     this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
226     this.fileContext = fileContext;
227   }
228 
229   /**
230    * Creates a block from an existing buffer starting with a header. Rewinds
231    * and takes ownership of the buffer. By definition of rewind, ignores the
232    * buffer position, but if you slice the buffer beforehand, it will rewind
233   * to that point. The reason this is keyed on the minor version and not the major
234   * version is that major versions indicate the format of an HFile whereas minor
235   * versions indicate the format inside an HFileBlock.
236    */
237   HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException {
238     b.rewind();
239     blockType = BlockType.read(b);
240     onDiskSizeWithoutHeader = b.getInt();
241     uncompressedSizeWithoutHeader = b.getInt();
242     prevBlockOffset = b.getLong();
243     HFileContextBuilder contextBuilder = new HFileContextBuilder();
244     contextBuilder.withHBaseCheckSum(usesHBaseChecksum);
245     if (usesHBaseChecksum) {
246       contextBuilder.withChecksumType(ChecksumType.codeToType(b.get()));
247       contextBuilder.withBytesPerCheckSum(b.getInt());
248       this.onDiskDataSizeWithHeader = b.getInt();
249     } else {
250       contextBuilder.withChecksumType(ChecksumType.NULL);
251       contextBuilder.withBytesPerCheckSum(0);
252       this.onDiskDataSizeWithHeader = onDiskSizeWithoutHeader +
253                                        HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
254     }
255     this.fileContext = contextBuilder.build();
256     buf = b;
257     buf.rewind();
258   }
259 
260   public BlockType getBlockType() {
261     return blockType;
262   }
263 
264   /** @return the data block encoding id that was used to encode this block */
265   public short getDataBlockEncodingId() {
266     if (blockType != BlockType.ENCODED_DATA) {
267       throw new IllegalArgumentException("Querying encoder ID of a block " +
268           "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
269     }
270     return buf.getShort(headerSize());
271   }
272 
273   /**
274    * @return the on-disk size of the block with header size included. This
275    * includes the header, the data and the checksum data.
276    */
277   public int getOnDiskSizeWithHeader() {
278     return onDiskSizeWithoutHeader + headerSize();
279   }
280 
281   /**
282    * Returns the size of the compressed part of the block in case compression
283    * is used, or the uncompressed size of the data part otherwise. Header size
284    * and checksum data size is not included.
285    *
286    * @return the on-disk size of the data part of the block, header and
287    *         checksum not included. 
288    */
289   public int getOnDiskSizeWithoutHeader() {
290     return onDiskSizeWithoutHeader;
291   }
292 
293   /**
294    * @return the uncompressed size of the data part of the block, header not
295    *         included
296    */
297    public int getUncompressedSizeWithoutHeader() {
298     return uncompressedSizeWithoutHeader;
299   }
300 
301   /**
302    * @return the offset of the previous block of the same type in the file, or
303    *         -1 if unknown
304    */
305   public long getPrevBlockOffset() {
306     return prevBlockOffset;
307   }
308 
309   /**
310    * Writes header fields into the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the
311    * buffer. Resets the buffer position to the end of header as side effect.
312    */
313   private void overwriteHeader() {
314     buf.rewind();
315     blockType.write(buf);
316     buf.putInt(onDiskSizeWithoutHeader);
317     buf.putInt(uncompressedSizeWithoutHeader);
318     buf.putLong(prevBlockOffset);
319   }
320 
321   /**
322    * Returns a buffer that does not include the header. The array offset points
323    * to the start of the block data right after the header. The underlying data
324    * array is not copied. Checksum data is not included in the returned buffer.
325    *
326    * @return the buffer with header skipped
327    */
328   public ByteBuffer getBufferWithoutHeader() {
329     return ByteBuffer.wrap(buf.array(), buf.arrayOffset() + headerSize(),
330         buf.limit() - headerSize() - totalChecksumBytes()).slice();
331   }
332 
333   /**
334    * Returns the buffer this block stores internally. The clients must not
335    * modify the buffer object. This method has to be public because it is
336    * used in {@link CompoundBloomFilter} to avoid object creation on every
337    * Bloom filter lookup, but has to be used with caution. Checksum data
338    * is not included in the returned buffer.
339    *
340    * @return the buffer of this block for read-only operations
341    */
342   public ByteBuffer getBufferReadOnly() {
343     return ByteBuffer.wrap(buf.array(), buf.arrayOffset(),
344         buf.limit() - totalChecksumBytes()).slice();
345   }
346 
347   /**
348    * Returns the buffer of this block, including header data. The clients must
349    * not modify the buffer object. This method has to be public because it is
350    * used in {@link BucketCache} to avoid buffer copy.
351    * 
352    * @return the byte buffer with header included for read-only operations
353    */
354   public ByteBuffer getBufferReadOnlyWithHeader() {
355     return ByteBuffer.wrap(buf.array(), buf.arrayOffset(), buf.limit()).slice();
356   }
357 
358   /**
359    * Returns a byte buffer of this block, including header data, positioned at
360    * the beginning of header. The underlying data array is not copied.
361    *
362    * @return the byte buffer with header included
363    */
364   ByteBuffer getBufferWithHeader() {
365     ByteBuffer dupBuf = buf.duplicate();
366     dupBuf.rewind();
367     return dupBuf;
368   }
369 
370   private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
371       String fieldName) throws IOException {
372     if (valueFromBuf != valueFromField) {
373       throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
374           + ") is different from that in the field (" + valueFromField + ")");
375     }
376   }
377 
378   /**
379    * Checks if the block is internally consistent, i.e. the first
380    * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a valid header consistent
381    * with the fields. This function is primarily for testing and debugging, and
382    * is not thread-safe, because it alters the internal buffer pointer.
383    */
384   void sanityCheck() throws IOException {
385     buf.rewind();
386 
387     {
388       BlockType blockTypeFromBuf = BlockType.read(buf);
389       if (blockTypeFromBuf != blockType) {
390         throw new IOException("Block type stored in the buffer: " +
391             blockTypeFromBuf + ", block type field: " + blockType);
392       }
393     }
394 
395     sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader,
396         "onDiskSizeWithoutHeader");
397 
398     sanityCheckAssertion(buf.getInt(), uncompressedSizeWithoutHeader,
399         "uncompressedSizeWithoutHeader");
400 
401     sanityCheckAssertion(buf.getLong(), prevBlockOffset, "prevBlockOffset");
402     if (this.fileContext.isUseHBaseChecksum()) {
403       sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
404       sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), "bytesPerChecksum");
405       sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, 
406                            "onDiskDataSizeWithHeader");
407     }
408 
409     int cksumBytes = totalChecksumBytes();
410     int hdrSize = headerSize();
411     int expectedBufLimit = uncompressedSizeWithoutHeader + headerSize() +
412                            cksumBytes;
413     if (buf.limit() != expectedBufLimit) {
414       throw new AssertionError("Expected buffer limit " + expectedBufLimit
415           + ", got " + buf.limit());
416     }
417 
418     // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
419     // block's header, so there are two sensible values for buffer capacity.
420     int size = uncompressedSizeWithoutHeader + hdrSize + cksumBytes;
421     if (buf.capacity() != size &&
422         buf.capacity() != size + hdrSize) {
423       throw new AssertionError("Invalid buffer capacity: " + buf.capacity() +
424           ", expected " + size + " or " + (size + hdrSize));
425     }
426   }
427 
428   @Override
429   public String toString() {
430     return "blockType="
431         + blockType
432         + ", onDiskSizeWithoutHeader="
433         + onDiskSizeWithoutHeader
434         + ", uncompressedSizeWithoutHeader="
435         + uncompressedSizeWithoutHeader
436         + ", prevBlockOffset="
437         + prevBlockOffset
438         + ", dataBeginsWith="
439         + Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
440             Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()))
441         + ", fileOffset=" + offset;
442   }
443 
444   private void validateOnDiskSizeWithoutHeader(
445       int expectedOnDiskSizeWithoutHeader) throws IOException {
446     if (onDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) {
447       String blockInfoMsg =
448         "Block offset: " + offset + ", data starts with: "
449           + Bytes.toStringBinary(buf.array(), buf.arrayOffset(),
450               Math.min(32, buf.limit()));
451       throw new IOException("On-disk size without header provided is "
452           + expectedOnDiskSizeWithoutHeader + ", but block "
453           + "header contains " + onDiskSizeWithoutHeader + ". " +
454           blockInfoMsg);
455     }
456   }
457 
458   /**
459    * Always allocates a new buffer of the correct size. Copies header bytes
460    * from the existing buffer. Does not change header fields.
461    * Reserves room for checksum bytes as well.
462    *
463    * @param extraBytes whether to reserve room in the buffer to read the next
464    *          block's header
465    */
466   private void allocateBuffer(boolean extraBytes) {
467     int cksumBytes = totalChecksumBytes();
468     int capacityNeeded = headerSize() + uncompressedSizeWithoutHeader +
469         cksumBytes +
470         (extraBytes ? headerSize() : 0);
471 
472     ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
473 
474     // Copy header bytes.
475     System.arraycopy(buf.array(), buf.arrayOffset(), newBuf.array(),
476         newBuf.arrayOffset(), headerSize());
477 
478     buf = newBuf;
479     buf.limit(headerSize() + uncompressedSizeWithoutHeader + cksumBytes);
480   }
481 
482   /** An additional sanity-check in case no compression is being used. */
483   public void assumeUncompressed() throws IOException {
484     if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + 
485         totalChecksumBytes()) {
486       throw new IOException("Using no compression but "
487           + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
488           + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
489           + ", numChecksumbytes=" + totalChecksumBytes());
490     }
491   }
492 
493   /**
494    * @param expectedType the expected type of this block
495    * @throws IOException if this block's type is different than expected
496    */
497   public void expectType(BlockType expectedType) throws IOException {
498     if (blockType != expectedType) {
499       throw new IOException("Invalid block type: expected=" + expectedType
500           + ", actual=" + blockType);
501     }
502   }
503 
504   /** @return the offset of this block in the file it was read from */
505   public long getOffset() {
506     if (offset < 0) {
507       throw new IllegalStateException(
508           "HFile block offset not initialized properly");
509     }
510     return offset;
511   }
512 
513   /**
514    * @return a byte stream reading the data section of this block
515    */
516   public DataInputStream getByteStream() {
517     return new DataInputStream(new ByteArrayInputStream(buf.array(),
518         buf.arrayOffset() + headerSize(), buf.limit() - headerSize()));
519   }
520 
521   @Override
522   public long heapSize() {
523     long size = ClassSize.align(
524         ClassSize.OBJECT +
525         // Block type, byte buffer and meta references
526         3 * ClassSize.REFERENCE +
527         // On-disk size, uncompressed size, next block's on-disk size,
528         // and onDiskDataSizeWithHeader
529         4 * Bytes.SIZEOF_INT +
530         // This and previous block offset
531         2 * Bytes.SIZEOF_LONG +
532         // Heap size of the file context object; fileContext is never null.
533         fileContext.heapSize()
534     );
535 
536     if (buf != null) {
537       // Deep overhead of the byte buffer. Needs to be aligned separately.
538       size += ClassSize.align(buf.capacity() + BYTE_BUFFER_HEAP_SIZE);
539     }
540 
541     return ClassSize.align(size);
542   }
543 
544   /**
545    * Read from an input stream. Analogous to
546    * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a
547    * number of "extra" bytes that would be desirable but not absolutely
548    * necessary to read.
549    *
550    * @param in the input stream to read from
551    * @param buf the buffer to read into
552    * @param bufOffset the destination offset in the buffer
553    * @param necessaryLen the number of bytes that are absolutely necessary to
554    *          read
555    * @param extraLen the number of extra bytes that would be nice to read
556    * @return true if succeeded reading the extra bytes
557    * @throws IOException if failed to read the necessary bytes
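   *
   * <p>An illustrative call (variable names here are hypothetical): read {@code blockSize}
   * bytes that must be present and, opportunistically, the next block's header:
   * <pre>{@code
   * byte[] dest = new byte[blockSize + hdrSize];
   * boolean nextHeaderAlsoRead = readWithExtra(in, dest, 0, blockSize, hdrSize);
   * }</pre>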
558    */
559   public static boolean readWithExtra(InputStream in, byte buf[],
560       int bufOffset, int necessaryLen, int extraLen) throws IOException {
561     int bytesRemaining = necessaryLen + extraLen;
562     while (bytesRemaining > 0) {
563       int ret = in.read(buf, bufOffset, bytesRemaining);
564       if (ret == -1 && bytesRemaining <= extraLen) {
565         // We could not read the "extra data", but that is OK.
566         break;
567       }
568 
569       if (ret < 0) {
570         throw new IOException("Premature EOF from inputStream (read "
571             + "returned " + ret + ", was trying to read " + necessaryLen
572             + " necessary bytes and " + extraLen + " extra bytes, "
573             + "successfully read "
574             + (necessaryLen + extraLen - bytesRemaining));
575       }
576       bufOffset += ret;
577       bytesRemaining -= ret;
578     }
579     return bytesRemaining <= 0;
580   }
581 
582   /**
583    * @return the on-disk size of the next block (including the header size)
584    *         that was read by peeking into the next block's header
585    */
586   public int getNextBlockOnDiskSizeWithHeader() {
587     return nextBlockOnDiskSizeWithHeader;
588   }
589 
590 
591   /**
592    * Unified version 2 {@link HFile} block writer. The intended usage pattern
593    * is as follows:
594    * <ol>
595    * <li>Construct an {@link HFileBlock.Writer}, providing a data block encoder and an {@link HFileContext}.
596    * <li>Call {@link Writer#startWriting} and get a data stream to write to.
597    * <li>Write your data into the stream.
598    * <li>Call {@link Writer#writeHeaderAndData(FSDataOutputStream)} as many times as you need to
599    * store the serialized block into an external stream.
600    * <li>Repeat to write more blocks.
601    * </ol>
602    * <p>
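   * A minimal usage sketch (illustrative only), assuming an existing
   * {@link FSDataOutputStream} {@code out}, an {@link HFileContext} {@code ctx} and a
   * byte array {@code cellBytes} to store:
   * <pre>{@code
   * HFileBlock.Writer writer = new HFileBlock.Writer(NoOpDataBlockEncoder.INSTANCE, ctx);
   * DataOutputStream dos = writer.startWriting(BlockType.DATA);
   * dos.write(cellBytes);              // user data for this block
   * writer.writeHeaderAndData(out);    // finishes the block and appends it to the stream
   * }</pre>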
603    */
604   public static class Writer {
605 
606     private enum State {
607       INIT,
608       WRITING,
609       BLOCK_READY
610     };
611 
612     /** Writer state. Used to ensure the correct usage protocol. */
613     private State state = State.INIT;
614 
615     /** Data block encoder used for data blocks */
616     private final HFileDataBlockEncoder dataBlockEncoder;
617 
618     private HFileBlockEncodingContext dataBlockEncodingCtx;
619 
620     /** block encoding context for non-data blocks */
621     private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
622 
623     /**
624      * The stream we use to accumulate data in uncompressed format for each
625      * block. We reset this stream at the end of each block and reuse it. The
626      * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this
627      * stream.
628      */
629     private ByteArrayOutputStream baosInMemory;
630 
631     /**
632      * Current block type. Set in {@link #startWriting(BlockType)}. Could be
633      * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA}
634      * to {@link BlockType#ENCODED_DATA}.
635      */
636     private BlockType blockType;
637 
638     /**
639      * A stream that we write uncompressed bytes to, which compresses them and
640      * writes them to {@link #baosInMemory}.
641      */
642     private DataOutputStream userDataStream;
643 
644     /**
645      * Bytes to be written to the file system, including the header. Compressed
646      * if compression is turned on. Checksum data is kept separately in
647      * {@link #onDiskChecksum} and is written to disk immediately after these bytes.
648      */
649     private byte[] onDiskBytesWithHeader;
650 
651     /**
652      * The checksum data for this block, computed over {@link #onDiskBytesWithHeader}
653      * in {@link #finishBlock()} and written to disk immediately after those bytes.
656      */
657     private byte[] onDiskChecksum;
658 
659     /**
660      * Valid in the BLOCK_READY state. Contains the header and the uncompressed (but
661      * potentially encoded, if this is a data block) bytes, so the length is
662      * {@link #uncompressedSizeWithoutHeader} + {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}.
663      * Does not store checksums.
664      */
665     private byte[] uncompressedBytesWithHeader;
666 
667     /**
668      * Current block's start offset in the {@link HFile}. Set in
669      * {@link #writeHeaderAndData(FSDataOutputStream)}.
670      */
671     private long startOffset;
672 
673     /**
674      * Offset of previous block by block type. Updated when the next block is
675      * started.
676      */
677     private long[] prevOffsetByType;
678 
679     /** The offset of the previous block of the same type */
680     private long prevOffset;
681     /** Metadata that holds information about the HFile block. */
682     private HFileContext fileContext;
683 
684     /**
685      * @param dataBlockEncoder data block encoding algorithm to use
     * @param fileContext HFile meta data
686      */
687     public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
688       this.dataBlockEncoder = dataBlockEncoder != null
689           ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
690       defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
691           HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
692       dataBlockEncodingCtx = this.dataBlockEncoder
693           .newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
694 
695       if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
696         throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
697             " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
698             fileContext.getBytesPerChecksum());
699       }
700 
701       baosInMemory = new ByteArrayOutputStream();
702       
703       prevOffsetByType = new long[BlockType.values().length];
704       for (int i = 0; i < prevOffsetByType.length; ++i)
705         prevOffsetByType[i] = -1;
706 
707       this.fileContext = fileContext;
708     }
709 
710     /**
711      * Starts writing into the block. The previous block's data is discarded.
712      *
713      * @return the stream the user can write their data into
714      * @throws IOException
715      */
716     public DataOutputStream startWriting(BlockType newBlockType)
717         throws IOException {
718       if (state == State.BLOCK_READY && startOffset != -1) {
719         // We had a previous block that was written to a stream at a specific
720         // offset. Save that offset as the last offset of a block of that type.
721         prevOffsetByType[blockType.getId()] = startOffset;
722       }
723 
724       startOffset = -1;
725       blockType = newBlockType;
726 
727       baosInMemory.reset();
728       baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
729 
730       state = State.WRITING;
731 
732       // We will compress it later in finishBlock()
733       userDataStream = new DataOutputStream(baosInMemory);
734       return userDataStream;
735     }
736 
737     /**
738      * Returns the stream for the user to write to. The block writer takes care
739      * of handling compression and buffering for caching on write. Can only be
740      * called in the "writing" state.
741      *
742      * @return the data output stream for the user to write to
743      */
744     DataOutputStream getUserDataStream() {
745       expectState(State.WRITING);
746       return userDataStream;
747     }
748 
749     /**
750      * Transitions the block writer from the "writing" state to the "block
751      * ready" state.  Does nothing if a block is already finished.
752      */
753     private void ensureBlockReady() throws IOException {
754       Preconditions.checkState(state != State.INIT,
755           "Unexpected state: " + state);
756 
757       if (state == State.BLOCK_READY)
758         return;
759 
760       // This will set state to BLOCK_READY.
761       finishBlock();
762     }
763 
764     /**
765      * An internal method that flushes the compressing stream (if using
766      * compression), serializes the header, and takes care of the separate
767      * uncompressed stream for caching on write, if applicable. Sets block
768      * write state to "block ready".
769      */
770     private void finishBlock() throws IOException {
771       userDataStream.flush();
772       // This does an array copy, so it is safe to cache this byte array.
773       uncompressedBytesWithHeader = baosInMemory.toByteArray();
774       prevOffset = prevOffsetByType[blockType.getId()];
775 
776       // We need to set state before we can package the block up for
777       // cache-on-write. In a way, the block is ready, but not yet encoded or
778       // compressed.
779       state = State.BLOCK_READY;
780       if (blockType == BlockType.DATA) {
781         encodeDataBlockForDisk();
782       } else {
783         defaultBlockEncodingCtx.compressAfterEncodingWithBlockType(
784             uncompressedBytesWithHeader, blockType);
785         onDiskBytesWithHeader =
786           defaultBlockEncodingCtx.getOnDiskBytesWithHeader();
787       }
788 
789       int numBytes = (int) ChecksumUtil.numBytes(
790           onDiskBytesWithHeader.length,
791           fileContext.getBytesPerChecksum());
792 
793       // put the header for on disk bytes
794       putHeader(onDiskBytesWithHeader, 0,
795           onDiskBytesWithHeader.length + numBytes,
796           uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
797       // set the header for the uncompressed bytes (for cache-on-write)
798       putHeader(uncompressedBytesWithHeader, 0,
799           onDiskBytesWithHeader.length + numBytes,
800           uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
801 
802       onDiskChecksum = new byte[numBytes];
803       ChecksumUtil.generateChecksums(
804           onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
805           onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
806     }
807 
808     /**
809      * Encodes this block if it is a data block and encoding is turned on in
810      * {@link #dataBlockEncoder}.
811      */
812     private void encodeDataBlockForDisk() throws IOException {
813       // do data block encoding, if data block encoder is set
814       ByteBuffer rawKeyValues =
815           ByteBuffer.wrap(uncompressedBytesWithHeader, HConstants.HFILEBLOCK_HEADER_SIZE,
816               uncompressedBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE).slice();
817 
818       // do the encoding
819       dataBlockEncoder.beforeWriteToDisk(rawKeyValues, dataBlockEncodingCtx, blockType);
820 
821       uncompressedBytesWithHeader =
822           dataBlockEncodingCtx.getUncompressedBytesWithHeader();
823       onDiskBytesWithHeader =
824           dataBlockEncodingCtx.getOnDiskBytesWithHeader();
825       blockType = dataBlockEncodingCtx.getBlockType();
826     }
827 
828     /**
829      * Put the header into the given byte array at the given offset.
830      * @param onDiskSize size of the block on disk header + data + checksum
831      * @param uncompressedSize size of the block after decompression (but
832      *          before optional data block decoding) including header
833      * @param onDiskDataSize size of the block on disk with header
834      *        and data but not including the checksums
835      */
836     private void putHeader(byte[] dest, int offset, int onDiskSize,
837         int uncompressedSize, int onDiskDataSize) {
838       offset = blockType.put(dest, offset);
839       offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
840       offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
841       offset = Bytes.putLong(dest, offset, prevOffset);
842       offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
843       offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
844       Bytes.putInt(dest, offset, onDiskDataSize);
845     }
846 
847     /**
848      * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records
849      * the offset of this block so that it can be referenced in the next block
850      * of the same type.
851      *
852      * @param out
853      * @throws IOException
854      */
855     public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
856       long offset = out.getPos();
857       if (startOffset != -1 && offset != startOffset) {
858         throw new IOException("A " + blockType + " block written to a "
859             + "stream twice, first at offset " + startOffset + ", then at "
860             + offset);
861       }
862       startOffset = offset;
863 
864       finishBlockAndWriteHeaderAndData((DataOutputStream) out);
865     }
866 
867     /**
868      * Writes the header and the compressed data of this block (or uncompressed
869      * data when not using compression) into the given stream. Can be called in
870      * the "writing" state or in the "block ready" state. If called in the
871      * "writing" state, transitions the writer to the "block ready" state.
872      *
873      * @param out the output stream to write the block to
874      * @throws IOException
875      */
876     private void finishBlockAndWriteHeaderAndData(DataOutputStream out)
877       throws IOException {
878       ensureBlockReady();
879       out.write(onDiskBytesWithHeader);
880       out.write(onDiskChecksum);
881     }
882 
883     /**
884      * Returns the header followed by the compressed data (or uncompressed data when not
885      * using compression) as a byte array. Can be called in the "writing" state
886      * or in the "block ready" state. If called in the "writing" state,
887      * transitions the writer to the "block ready" state. This returns
888      * the header + data + checksums stored on disk.
889      *
890      * @return header and data as they would be stored on disk in a byte array
891      * @throws IOException
892      */
893     byte[] getHeaderAndDataForTest() throws IOException {
894       ensureBlockReady();
895       // This is not very optimal, because we are doing an extra copy.
896       // But this method is used only by unit tests.
897       byte[] output =
898           new byte[onDiskBytesWithHeader.length
899               + onDiskChecksum.length];
900       System.arraycopy(onDiskBytesWithHeader, 0, output, 0,
901           onDiskBytesWithHeader.length);
902       System.arraycopy(onDiskChecksum, 0, output,
903           onDiskBytesWithHeader.length, onDiskChecksum.length);
904       return output;
905     }
906 
907     /**
908      * Releases resources used by this writer.
909      */
910     public void release() {
911       if (dataBlockEncodingCtx != null) {
912         dataBlockEncodingCtx.close();
913         dataBlockEncodingCtx = null;
914       }
915       if (defaultBlockEncodingCtx != null) {
916         defaultBlockEncodingCtx.close();
917         defaultBlockEncodingCtx = null;
918       }
919     }
920 
921     /**
922      * Returns the on-disk size of the data portion of the block. This is the
923      * compressed size if compression is enabled. Can only be called in the
924      * "block ready" state. Header is not compressed, and its size is not
925      * included in the return value.
926      *
927      * @return the on-disk size of the block, not including the header.
928      */
929     int getOnDiskSizeWithoutHeader() {
930       expectState(State.BLOCK_READY);
931       return onDiskBytesWithHeader.length + onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE;
932     }
933 
934     /**
935      * Returns the on-disk size of the block. Can only be called in the
936      * "block ready" state.
937      *
938      * @return the on-disk size of the block ready to be written, including the
939      *         header size, the data and the checksum data.
940      */
941     int getOnDiskSizeWithHeader() {
942       expectState(State.BLOCK_READY);
943       return onDiskBytesWithHeader.length + onDiskChecksum.length;
944     }
945 
946     /**
947      * The uncompressed size of the block data. Does not include header size.
948      */
949     int getUncompressedSizeWithoutHeader() {
950       expectState(State.BLOCK_READY);
951       return uncompressedBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE;
952     }
953 
954     /**
955      * The uncompressed size of the block data, including header size.
956      */
957     int getUncompressedSizeWithHeader() {
958       expectState(State.BLOCK_READY);
959       return uncompressedBytesWithHeader.length;
960     }
961 
962     /** @return true if a block is being written  */
963     public boolean isWriting() {
964       return state == State.WRITING;
965     }
966 
967     /**
968      * Returns the number of bytes written into the current block so far, or
969      * zero if not writing the block at the moment. Note that this will return
970      * zero in the "block ready" state as well.
971      *
972      * @return the number of bytes written
973      */
974     public int blockSizeWritten() {
975       if (state != State.WRITING)
976         return 0;
977       return userDataStream.size();
978     }
979 
980     /**
981      * Returns the header followed by the uncompressed data, even if using
982      * compression. This is needed for storing uncompressed blocks in the block
983      * cache. Can be called in the "writing" state or the "block ready" state.
984      * Returns only the header and data, does not include checksum data.
985      *
986      * @return uncompressed block bytes for caching on write
987      */
988     ByteBuffer getUncompressedBufferWithHeader() {
989       expectState(State.BLOCK_READY);
990       return ByteBuffer.wrap(uncompressedBytesWithHeader);
991     }
992 
993     private void expectState(State expectedState) {
994       if (state != expectedState) {
995         throw new IllegalStateException("Expected state: " + expectedState +
996             ", actual state: " + state);
997       }
998     }
999 
1000     /**
1001      * Takes the given {@link BlockWritable} instance, creates a new block of
1002      * its appropriate type, writes the writable into this block, and flushes
1003      * the block into the output stream. The writer is instructed not to buffer
1004      * uncompressed bytes for cache-on-write.
1005      *
1006      * @param bw the block-writable object to write as a block
1007      * @param out the file system output stream
1008      * @throws IOException
1009      */
1010     public void writeBlock(BlockWritable bw, FSDataOutputStream out)
1011         throws IOException {
1012       bw.writeToBlock(startWriting(bw.getBlockType()));
1013       writeHeaderAndData(out);
1014     }
1015 
1016     /**
1017      * Creates a new HFileBlock. Checksums have already been validated, so
1018      * the byte buffer passed into the constructor of this newly created
1019      * block does not have checksum data even though the header minor 
1020      * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
1021      * 0 value in bytesPerChecksum.
1022      */
1023     public HFileBlock getBlockForCaching() {
1024       HFileContext newContext = new HFileContextBuilder()
1025                                 .withBlockSize(fileContext.getBlocksize())
1026                                 .withBytesPerCheckSum(0)
1027                                 .withChecksumType(ChecksumType.NULL) // no checksums in cached data
1028                                 .withCompression(fileContext.getCompression())
1029                                 .withDataBlockEncoding(fileContext.getDataBlockEncoding())
1030                                 .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
1031                                 .withCompressTags(fileContext.isCompressTags())
1032                                 .withIncludesMvcc(fileContext.isIncludesMvcc())
1033                                 .withIncludesTags(fileContext.isIncludesTags())
1034                                 .build();
1035       return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
1036           getUncompressedSizeWithoutHeader(), prevOffset, getUncompressedBufferWithHeader(),
1037           DONT_FILL_HEADER, startOffset,
1038           onDiskBytesWithHeader.length + onDiskChecksum.length, newContext);
1039     }
1040   }
1041 
1042   /** Something that can be written into a block. */
1043   public interface BlockWritable {
1044 
1045     /** The type of block this data should use. */
1046     BlockType getBlockType();
1047 
1048     /**
1049      * Writes the block to the provided stream. Must not write any magic
1050      * records.
1051      *
1052      * @param out a stream to write uncompressed data into
1053      */
1054     void writeToBlock(DataOutput out) throws IOException;
1055   }
1056 
1057   // Block readers and writers
1058 
1059   /** An interface for iterating over {@link HFileBlock}s. */
1060   public interface BlockIterator {
1061 
1062     /**
1063      * Get the next block, or null if there are no more blocks to iterate.
1064      */
1065     HFileBlock nextBlock() throws IOException;
1066 
1067     /**
1068      * Similar to {@link #nextBlock()} but checks block type, throws an
1069      * exception if incorrect, and returns the HFile block
1070      */
1071     HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1072   }
1073 
1074   /** A full-fledged reader with iteration ability. */
1075   public interface FSReader {
1076 
1077     /**
1078      * Reads the block at the given offset in the file with the given on-disk
1079      * size and uncompressed size.
1080      *
1081      * @param offset the offset in the stream to read at
1082      * @param onDiskSize the on-disk size of the entire block, including all
1083      *          applicable headers, or -1 if unknown
1084      * @param uncompressedSize the uncompressed size of the compressed part of
1085      *          the block, or -1 if unknown
1086      * @return the newly read block
1087      */
1088     HFileBlock readBlockData(long offset, long onDiskSize,
1089         int uncompressedSize, boolean pread) throws IOException;
1090 
1091     /**
1092      * Creates a block iterator over the given portion of the {@link HFile}.
1093      * The iterator returns blocks starting at offsets such that startOffset <=
1094      * offset < endOffset.
1095      *
1096      * @param startOffset the offset of the block to start iteration with
1097      * @param endOffset the offset to end iteration at (exclusive)
1098      * @return an iterator of blocks between the two given offsets
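     *
     * <p>Illustrative iteration over such a range ({@code reader} is a hypothetical
     * {@link FSReader} instance):
     * <pre>{@code
     * BlockIterator it = reader.blockRange(startOffset, endOffset);
     * for (HFileBlock b = it.nextBlock(); b != null; b = it.nextBlock()) {
     *   // process block b
     * }
     * }</pre>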
1099      */
1100     BlockIterator blockRange(long startOffset, long endOffset);
1101 
1102     /** Closes the backing streams */
1103     void closeStreams() throws IOException;
1104   }
1105 
1106   /**
1107    * A common implementation of some methods of {@link FSReader} and some
1108    * tools for implementing HFile format version-specific block readers.
1109    */
1110   private abstract static class AbstractFSReader implements FSReader {
1112 
1113     /** The size of the file we are reading from, or -1 if unknown. */
1114     protected long fileSize;
1115 
1116     /** The size of the header */
1117     protected final int hdrSize;
1118 
1119     /** The filesystem used to access data */
1120     protected HFileSystem hfs;
1121 
1122     /** The path (if any) where this data is coming from */
1123     protected Path path;
1124 
1125     private final Lock streamLock = new ReentrantLock();
1126 
1127     /** The default buffer size for our buffered streams */
1128     public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
1129 
1130     protected HFileContext fileContext;
1131 
1132     public AbstractFSReader(long fileSize, HFileSystem hfs, Path path, HFileContext fileContext)
1133         throws IOException {
1134       this.fileSize = fileSize;
1135       this.hfs = hfs;
1136       this.path = path;
1137       this.fileContext = fileContext;
1138       this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1139     }
1140 
1141     @Override
1142     public BlockIterator blockRange(final long startOffset,
1143         final long endOffset) {
1144       return new BlockIterator() {
1145         private long offset = startOffset;
1146 
1147         @Override
1148         public HFileBlock nextBlock() throws IOException {
1149           if (offset >= endOffset)
1150             return null;
1151           HFileBlock b = readBlockData(offset, -1, -1, false);
1152           offset += b.getOnDiskSizeWithHeader();
1153           return b;
1154         }
1155 
1156         @Override
1157         public HFileBlock nextBlockWithBlockType(BlockType blockType)
1158             throws IOException {
1159           HFileBlock blk = nextBlock();
1160           if (blk.getBlockType() != blockType) {
1161             throw new IOException("Expected block of type " + blockType
1162                 + " but found " + blk.getBlockType());
1163           }
1164           return blk;
1165         }
1166       };
1167     }
1168 
1169     /**
1170      * Does a positional read or a seek and read into the given buffer. Returns
1171      * the on-disk size of the next block, or -1 if it could not be determined.
1172      *
1173      * @param dest destination buffer
1174      * @param destOffset offset in the destination buffer
1175      * @param size size of the block to be read
1176      * @param peekIntoNextBlock whether to read the next block's on-disk size
1177      * @param fileOffset position in the stream to read at
1178      * @param pread whether we should do a positional read
1179      * @param istream The input source of data
1180      * @return the on-disk size of the next block with header size included, or
1181      *         -1 if it could not be determined
1182      * @throws IOException
1183      */
1184     protected int readAtOffset(FSDataInputStream istream,
1185         byte[] dest, int destOffset, int size,
1186         boolean peekIntoNextBlock, long fileOffset, boolean pread)
1187         throws IOException {
1188       if (peekIntoNextBlock &&
1189           destOffset + size + hdrSize > dest.length) {
1190         // We are asked to read the next block's header as well, but there is
1191         // not enough room in the array.
1192         throw new IOException("Attempted to read " + size + " bytes and " +
1193             hdrSize + " bytes of next header into a " + dest.length +
1194             "-byte array at offset " + destOffset);
1195       }
1196 
1197       if (!pread && streamLock.tryLock()) {
1198         // Seek + read. Better for scanning.
1199         try {
1200           istream.seek(fileOffset);
1201 
1202           long realOffset = istream.getPos();
1203           if (realOffset != fileOffset) {
1204             throw new IOException("Tried to seek to " + fileOffset + " to "
1205                 + "read " + size + " bytes, but pos=" + realOffset
1206                 + " after seek");
1207           }
1208 
1209           if (!peekIntoNextBlock) {
1210             IOUtils.readFully(istream, dest, destOffset, size);
1211             return -1;
1212           }
1213 
1214           // Try to read the next block header.
1215           if (!readWithExtra(istream, dest, destOffset, size, hdrSize))
1216             return -1;
1217         } finally {
1218           streamLock.unlock();
1219         }
1220       } else {
1221         // Positional read. Better for random reads; or when the streamLock is already locked.
1222         int extraSize = peekIntoNextBlock ? hdrSize : 0;
1223 
1224         int ret = istream.read(fileOffset, dest, destOffset, size + extraSize);
1225         if (ret < size) {
1226           throw new IOException("Positional read of " + size + " bytes " +
1227               "failed at offset " + fileOffset + " (returned " + ret + ")");
1228         }
1229 
1230         if (ret == size || ret < size + extraSize) {
1231           // Could not read the next block's header, or did not try.
1232           return -1;
1233         }
1234       }
1235 
1236       assert peekIntoNextBlock;
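      // The peeked next header starts at destOffset + size; its onDiskSizeWithoutHeader
      // field sits immediately after the magic record, so adding hdrSize yields the next
      // block's on-disk size including the header.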
1237       return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) +
1238           hdrSize;
1239     }
1240 
1241   }
1242 
1243   /**
1244    * We always prefetch the header of the next block, so that we know its
1245    * on-disk size in advance and can read it in one operation.
1246    */
1247   private static class PrefetchedHeader {
1248     long offset = -1;
1249     byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1250     ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
1251   }
1252 
1253   /** Reads version 2 blocks from the filesystem. */
1254   static class FSReaderV2 extends AbstractFSReader {
1255     /** The file system stream of the underlying {@link HFile}, which may or
1256      * may not perform checksum validation in the filesystem */
1257     protected FSDataInputStreamWrapper streamWrapper;
1258 
1259     private HFileBlockDecodingContext encodedBlockDecodingCtx;
1260 
1261     private HFileBlockDefaultDecodingContext defaultDecodingCtx;
1262 
1263     private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
1264         new ThreadLocal<PrefetchedHeader>() {
1265           @Override
1266           public PrefetchedHeader initialValue() {
1267             return new PrefetchedHeader();
1268           }
1269         };
1270 
1271     public FSReaderV2(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
1272         HFileContext fileContext) throws IOException {
1273       super(fileSize, hfs, path, fileContext);
1274       this.streamWrapper = stream;
1275       // Older versions of HBase didn't support checksum.
1276       this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1277       defaultDecodingCtx =
1278         new HFileBlockDefaultDecodingContext(fileContext);
1279       encodedBlockDecodingCtx =
1280           new HFileBlockDefaultDecodingContext(fileContext);
1281     }
1282 
1283     /**
1284      * A constructor that reads files with the latest minor version.
1285      * This is used by unit tests only.
1286      */
1287     FSReaderV2(FSDataInputStream istream, long fileSize, HFileContext fileContext) throws IOException {
1288       this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
1289     }
1290 
1291     /**
1292      * Reads a version 2 block. Tries to do as little memory allocation as
1293      * possible, using the provided on-disk size.
1294      *
1295      * @param offset the offset in the stream to read at
1296      * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1297      *          the header, or -1 if unknown
1298      * @param uncompressedSize the uncompressed size of the block. Always
1299      *          expected to be -1. This parameter is only used in version 1.
1300      * @param pread whether to use a positional read
1301      */
1302     @Override
1303     public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL,
1304         int uncompressedSize, boolean pread) throws IOException {
1305 
1306       // get a copy of the current state of whether to validate
1307       // hbase checksums or not for this read call. This is not 
1308       // thread-safe but the one constraint is that if we decide
1309       // to skip hbase checksum verification then we are 
1310       // guaranteed to use hdfs checksum verification.
1311       boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1312       FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1313 
1314       HFileBlock blk = readBlockDataInternal(is, offset, 
1315                          onDiskSizeWithHeaderL, 
1316                          uncompressedSize, pread,
1317                          doVerificationThruHBaseChecksum);
1318       if (blk == null) {
1319         HFile.LOG.warn("HBase checksum verification failed for file " +
1320                        path + " at offset " +
1321                        offset + " filesize " + fileSize +
1322                        ". Retrying read with HDFS checksums turned on...");
1323 
1324         if (!doVerificationThruHBaseChecksum) {
1325           String msg = "HBase checksum verification failed for file " +
1326                        path + " at offset " +
1327                        offset + " filesize " + fileSize + 
1328                        " but this cannot happen because doVerify is " +
1329                        doVerificationThruHBaseChecksum;
1330           HFile.LOG.warn(msg);
1331           throw new IOException(msg); // cannot happen case here
1332         }
1333         HFile.checksumFailures.incrementAndGet(); // update metrics
1334 
1335         // If we have a checksum failure, we fall back into a mode where
1336         // the next few reads use HDFS level checksums. We aim to make the
1337         // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
1338         // hbase checksum verification, but since this value is set without
1339         // holding any locks, it can so happen that we might actually do
1340         // a few more than precisely this number.
1341         is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1342         doVerificationThruHBaseChecksum = false;
1343         blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL,
1344                                     uncompressedSize, pread,
1345                                     doVerificationThruHBaseChecksum);
1346         if (blk != null) {
1347           HFile.LOG.warn("HDFS checksum verification succeeded for file " +
1348                          path + " at offset " +
1349                          offset + " filesize " + fileSize);
1350         }
1351       } 
1352       if (blk == null && !doVerificationThruHBaseChecksum) {
1353         String msg = "readBlockData failed, possibly due to a failed " +
1354                      "checksum verification for file " + path +
1355                      " at offset " + offset + " filesize " + fileSize;
1356         HFile.LOG.warn(msg);
1357         throw new IOException(msg);
1358       }
1359 
1360       // If there is a checksum mismatch earlier, then retry with 
1361       // HBase checksums switched off and use HDFS checksum verification.
1362       // This triggers HDFS to detect and fix corrupt replicas. The
1363       // next checksumOffCount read requests will use HDFS checksums.
1364       // The decrementing of this.checksumOffCount is not thread-safe,
1365       // but it is harmless because eventually checksumOffCount will be
1366       // a negative number.
1367       streamWrapper.checksumOk();
1368       return blk;
1369     }
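         // Illustrative usage sketch (not part of this class): a caller that
         // already knows a block's offset and on-disk size, e.g. from the block
         // index, can read it with a single positional read:
         //
         //   HFileBlock block = reader.readBlockData(blockOffset,
         //       onDiskSizeWithHeader, -1 /* uncompressedSize, must be -1 */, true /* pread */);
         //
         // Passing -1 as the on-disk size instead forces a separate header read to
         // discover the block's size first. "reader" and "blockOffset" are placeholders.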
1370 
1371     /**
1372      * Reads a version 2 block. 
1373      *
1374      * @param offset the offset in the stream to read at
1375      * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1376      *          the header, or -1 if unknown
1377      * @param uncompressedSize the uncompressed size of the block. Always
1378      *          expected to be -1. This parameter is only used in version 1.
1379      * @param pread whether to use a positional read
1380      * @param verifyChecksum Whether to use HBase checksums. 
1381      *        If HBase checksum is switched off, then use HDFS checksum.
1382      * @return the HFileBlock or null if there is an HBase checksum mismatch
1383      */
1384     private HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, 
1385         long onDiskSizeWithHeaderL, int uncompressedSize, boolean pread,
1386         boolean verifyChecksum) throws IOException {
1387       if (offset < 0) {
1388         throw new IOException("Invalid offset=" + offset + " trying to read "
1389             + "block (onDiskSize=" + onDiskSizeWithHeaderL
1390             + ", uncompressedSize=" + uncompressedSize + ")");
1391       }
1392       if (uncompressedSize != -1) {
1393         throw new IOException("Version 2 block reader API does not need " +
1394             "the uncompressed size parameter");
1395       }
1396 
1397       if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
1398           || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) {
1399         throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL
1400             + ": expected to be at least " + hdrSize
1401             + " and at most " + Integer.MAX_VALUE + ", or -1 (offset="
1402             + offset + ", uncompressedSize=" + uncompressedSize + ")");
1403       }
1404 
1405       int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL;
1406       // See if we can avoid reading the header. This is desirable, because
1407       // we will not incur a backward seek operation if we have already
1408       // read this block's header as part of the previous read's look-ahead.
1409       // And we also want to skip reading the header again if it has already
1410       // been read.
1411       PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
1412       ByteBuffer headerBuf = prefetchedHeader.offset == offset ?
1413           prefetchedHeader.buf : null;
1414 
1415       int nextBlockOnDiskSize = 0;
1416       // Allocate enough space to fit the next block's header too.
1417       byte[] onDiskBlock = null;
1418 
1419       HFileBlock b = null;
1420       if (onDiskSizeWithHeader > 0) {
1421         // We know the total on-disk size but not the uncompressed size. Read
1422         // the entire block into memory, then parse the header and decompress
1423         // from memory if using compression. This code path is used when
1424         // doing a random read operation relying on the block index, as well as
1425         // when the client knows the on-disk size from peeking into the next
1426         // block's header (e.g. this block's header) when reading the previous
1427         // block. This is the faster and preferable case.
1428 
1429         // Size that we have to skip in case we have already read the header.
1430         int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
1431         onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
1432         nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
1433             preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
1434             true, offset + preReadHeaderSize, pread);
1435         if (headerBuf != null) {
1436           // the header has been read when reading the previous block, copy
1437           // to this block's header
1438           System.arraycopy(headerBuf.array(),
1439               headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1440         } else {
1441           headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
1442         }
1443         // At this point the entire on-disk block, header included, is in
1444         // onDiskBlock and the header bytes are in headerBuf (either carried
1445         // over from the previous read's look-ahead or just read above).
1446         // Parse the header to construct the block before any decompression.
1447         try {
1448           b = new HFileBlock(headerBuf, this.fileContext.isUseHBaseChecksum());
1449         } catch (IOException ex) {
1450           // Seen in load testing. Provide comprehensive debug info.
1451           throw new IOException("Failed to read compressed block at "
1452               + offset
1453               + ", onDiskSizeWithHeader="
1454               + onDiskSizeWithHeader
1455               + ", headerSize="
1456               + hdrSize
1457               + ", header.length="
1458               + prefetchedHeader.header.length
1459               + ", header bytes: "
1460               + Bytes.toStringBinary(prefetchedHeader.header, 0,
1461                   hdrSize), ex);
1462         }
1463         // if the caller specifies a onDiskSizeWithHeader, validate it.
1464         int onDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize;
1465         assert onDiskSizeWithoutHeader >= 0;
1466         b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
1467       } else {
1468         // Check headerBuf to see if we have read this block's header as part of
1469         // reading the previous block. This is an optimization of peeking into
1470         // the next block's header (e.g. this block's header) when reading the
1471         // previous block. This is the faster and preferable case. If the
1472         // header is already there, don't read the header again.
1473 
1474         // Unfortunately, we still have to do a separate read operation to
1475         // read the header.
1476         if (headerBuf == null) {
1477           // From the header, determine the on-disk size of the given hfile
1478           // block, and read the remaining data, thereby incurring two read
1479           // operations. This might happen when we are doing the first read
1480           // in a series of reads or a random read, and we don't have access
1481           // to the block index. This is costly and should happen very rarely.
1482           headerBuf = ByteBuffer.allocate(hdrSize);
1483           readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(),
1484               hdrSize, false, offset, pread);
1485         }
1486         b = new HFileBlock(headerBuf, this.fileContext.isUseHBaseChecksum());
1487         onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize];
1488         System.arraycopy(headerBuf.array(),
1489               headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1490         nextBlockOnDiskSize =
1491           readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader()
1492               - hdrSize, true, offset + hdrSize, pread);
1493         onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize;
1494       }
1495 
1496       Algorithm compressAlgo = fileContext.getCompression();
1497       boolean isCompressed =
1498         compressAlgo != null
1499             && compressAlgo != Compression.Algorithm.NONE;
1500 
1501       Encryption.Context cryptoContext = fileContext.getEncryptionContext();
1502       boolean isEncrypted = cryptoContext != null
1503           && cryptoContext != Encryption.Context.NONE;
1504 
1505       if (!isCompressed && !isEncrypted) {
1506         b.assumeUncompressed();
1507       }
1508 
1509       if (verifyChecksum &&
1510           !validateBlockChecksum(b, onDiskBlock, hdrSize)) {
1511         return null;             // checksum mismatch
1512       }
1513 
1514       if (isCompressed || isEncrypted) {
1515         // This will allocate a new buffer but keep header bytes.
1516         b.allocateBuffer(nextBlockOnDiskSize > 0);
1517         if (b.blockType == BlockType.ENCODED_DATA) {
1518           encodedBlockDecodingCtx.prepareDecoding(b.getOnDiskSizeWithoutHeader(),
1519               b.getUncompressedSizeWithoutHeader(), b.getBufferWithoutHeader(), onDiskBlock,
1520               hdrSize);
1521         } else {
1522           defaultDecodingCtx.prepareDecoding(b.getOnDiskSizeWithoutHeader(),
1523               b.getUncompressedSizeWithoutHeader(), b.getBufferWithoutHeader(), onDiskBlock,
1524               hdrSize);
1525         }
1526         if (nextBlockOnDiskSize > 0) {
1527           // Copy next block's header bytes into the new block if we have them.
1528           System.arraycopy(onDiskBlock, onDiskSizeWithHeader, b.buf.array(),
1529               b.buf.arrayOffset() + hdrSize
1530               + b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(),
1531               hdrSize);
1532         }
1533       } else {
1534         // The onDiskBlock will become the headerAndDataBuffer for this block.
1535         // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
1536         // contains the header of the next block, so there is no need to copy
1537         // the next block's header into it.
1538         b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0,
1539                 onDiskSizeWithHeader), this.fileContext.isUseHBaseChecksum());
1540       }
1541 
1542       b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
1543 
1544       // Cache the next block's header (read as look-ahead) so the next read can skip it.
1545       if (b.nextBlockOnDiskSizeWithHeader > 0) {
1546         prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
1547         System.arraycopy(onDiskBlock, onDiskSizeWithHeader,
1548             prefetchedHeader.header, 0, hdrSize);
1549       }
1550 
1551       b.offset = offset;
1552       b.fileContext.setIncludesTags(this.fileContext.isIncludesTags());
1553       b.fileContext.setIncludesMvcc(this.fileContext.isIncludesMvcc());
1554       return b;
1555     }
1556 
1557     void setIncludesMemstoreTS(boolean includesMemstoreTS) {
1558       this.fileContext.setIncludesMvcc(includesMemstoreTS);
1559     }
1560 
1561     void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
1562       encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
1563     }
1564 
1565     /**
1566      * Generates the checksum for the header as well as the data and
1567      * then validates it against the checksum values stored on disk with
1568      * the block. If there is a checksum mismatch, then return false. Otherwise
1569      * return true.
1570      */
1571     protected boolean validateBlockChecksum(HFileBlock block, 
1572       byte[] data, int hdrSize) throws IOException {
1573       return ChecksumUtil.validateBlockChecksum(path, block,
1574                                                 data, hdrSize);
1575     }
1576 
1577     @Override
1578     public void closeStreams() throws IOException {
1579       streamWrapper.close();
1580     }
1581   }
1582 
1583   @Override
1584   public int getSerializedLength() {
1585     if (buf != null) {
1586       return this.buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
1587     }
1588     return 0;
1589   }
1590 
1591   @Override
1592   public void serialize(ByteBuffer destination) {
1593     ByteBuffer dupBuf = this.buf.duplicate();
1594     dupBuf.rewind();
1595     destination.put(dupBuf);
1596     serializeExtraInfo(destination);
1597   }
1598 
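       // Note: the extra fields appended below (a 1-byte checksum flag, the 8-byte
       // block offset and the 4-byte next-block on-disk size) are what
       // EXTRA_SERIALIZATION_SPACE in getSerializedLength() is expected to cover.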
1599   public void serializeExtraInfo(ByteBuffer destination) {
1600     destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1601     destination.putLong(this.offset);
1602     destination.putInt(this.nextBlockOnDiskSizeWithHeader);
1603     destination.rewind();
1604   }
1605 
1606   @Override
1607   public CacheableDeserializer<Cacheable> getDeserializer() {
1608     return HFileBlock.blockDeserializer;
1609   }
1610 
1611   @Override
1612   public boolean equals(Object comparison) {
1613     if (this == comparison) {
1614       return true;
1615     }
1616     if (comparison == null) {
1617       return false;
1618     }
1619     if (comparison.getClass() != this.getClass()) {
1620       return false;
1621     }
1622 
1623     HFileBlock castedComparison = (HFileBlock) comparison;
1624 
1625     if (castedComparison.blockType != this.blockType) {
1626       return false;
1627     }
1628     if (castedComparison.nextBlockOnDiskSizeWithHeader != this.nextBlockOnDiskSizeWithHeader) {
1629       return false;
1630     }
1631     if (castedComparison.offset != this.offset) {
1632       return false;
1633     }
1634     if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1635       return false;
1636     }
1637     if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1638       return false;
1639     }
1640     if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1641       return false;
1642     }
1643     if (this.buf.compareTo(castedComparison.buf) != 0) {
1644       return false;
1645     }
1646     if (this.buf.position() != castedComparison.buf.position()){
1647       return false;
1648     }
1649     if (this.buf.limit() != castedComparison.buf.limit()){
1650       return false;
1651     }
1652     return true;
1653   }
1654 
1655   public DataBlockEncoding getDataBlockEncoding() {
1656     if (blockType == BlockType.ENCODED_DATA) {
1657       return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
1658     }
1659     return DataBlockEncoding.NONE;
1660   }
1661 
1662   byte getChecksumType() {
1663     return this.fileContext.getChecksumType().getCode();
1664   }
1665 
1666   int getBytesPerChecksum() {
1667     return this.fileContext.getBytesPerChecksum();
1668   }
1669 
1670   int getOnDiskDataSizeWithHeader() {
1671     return this.onDiskDataSizeWithHeader;
1672   }
1673 
1674   /** 
1675    * Calculate the number of bytes required to store all the checksums
1676    * for this block. Each checksum value is a 4 byte integer.
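        * For example (illustrative numbers only): assuming one checksum per
        * bytesPerChecksum chunk, a block with onDiskDataSizeWithHeader = 66000
        * and bytesPerChecksum = 16384 spans ceil(66000 / 16384) = 5 chunks,
        * so 5 * 4 = 20 checksum bytes are stored.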
1677    */
1678   int totalChecksumBytes() {
1679     // If the hfile block has minorVersion 0, then there are no checksum
1680     // data to validate. Similarly, a zero value in this.bytesPerChecksum
1681     // indicates that cached blocks do not have checksum data because
1682     // checksums were already validated when the block was read from disk.
1683     if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
1684       return 0;
1685     }
1686     return (int)ChecksumUtil.numBytes(onDiskDataSizeWithHeader, this.fileContext.getBytesPerChecksum());
1687   }
1688 
1689   /**
1690    * Returns the size of this block header.
1691    */
1692   public int headerSize() {
1693     return headerSize(this.fileContext.isUseHBaseChecksum());
1694   }
1695 
1696   /**
1697    * Maps a minor version to the size of the header.
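        * With HBase checksums enabled the header additionally carries the
        * checksum type, bytesPerChecksum and onDiskDataSizeWithHeader fields
        * (see {@link #toStringHeader}), hence the larger size.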
1698    */
1699   public static int headerSize(boolean usesHBaseChecksum) {
1700     if (usesHBaseChecksum) {
1701       return HConstants.HFILEBLOCK_HEADER_SIZE;
1702     }
1703     return HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
1704   }
1705 
1706   /**
1707    * Return the appropriate DUMMY_HEADER for the minor version
1708    */
1709   public byte[] getDummyHeaderForVersion() {
1710     return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
1711   }
1712 
1713   /**
1714    * Return the appropriate DUMMY_HEADER for the minor version
1715    */
1716   private static byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
1717     if (usesHBaseChecksum) {
1718       return HConstants.HFILEBLOCK_DUMMY_HEADER;
1719     }
1720     return DUMMY_HEADER_NO_CHECKSUM;
1721   }
1722 
1723   public HFileContext getHFileContext() {
1724     return this.fileContext;
1725   }
1726 
1727   /**
1728    * Convert the contents of the block header into a human readable string.
1729    * This is mostly helpful for debugging. This assumes that the block
1730    * has minor version > 0.
1731    */
1732   static String toStringHeader(ByteBuffer buf) throws IOException {
1733     int offset = buf.arrayOffset();
1734     byte[] b = buf.array();
1735     long magic = Bytes.toLong(b, offset); 
1736     BlockType bt = BlockType.read(buf);
1737     offset += Bytes.SIZEOF_LONG;
1738     int compressedBlockSizeNoHeader = Bytes.toInt(b, offset);
1739     offset += Bytes.SIZEOF_INT;
1740     int uncompressedBlockSizeNoHeader = Bytes.toInt(b, offset);
1741     offset += Bytes.SIZEOF_INT;
1742     long prevBlockOffset = Bytes.toLong(b, offset); 
1743     offset += Bytes.SIZEOF_LONG;
1744     byte cksumtype = b[offset];
1745     offset += Bytes.SIZEOF_BYTE;
1746     long bytesPerChecksum = Bytes.toInt(b, offset); 
1747     offset += Bytes.SIZEOF_INT;
1748     long onDiskDataSizeWithHeader = Bytes.toInt(b, offset); 
1749     offset += Bytes.SIZEOF_INT;
1750     return " Header dump: magic: " + magic +
1751                    " blockType " + bt +
1752                    " compressedBlockSizeNoHeader " + 
1753                    compressedBlockSizeNoHeader +
1754                    " uncompressedBlockSizeNoHeader " + 
1755                    uncompressedBlockSizeNoHeader +
1756                    " prevBlockOffset " + prevBlockOffset +
1757                    " checksumType " + ChecksumType.codeToType(cksumtype) +
1758                    " bytesPerChecksum " + bytesPerChecksum +
1759                    " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
1760   }
1761 }
1762