
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.io.hfile;
19  
20  import static org.apache.hadoop.hbase.io.hfile.BlockType.MAGIC_LENGTH;
21  import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.NONE;
22  
23  import java.io.BufferedInputStream;
24  import java.io.ByteArrayInputStream;
25  import java.io.ByteArrayOutputStream;
26  import java.io.DataInputStream;
27  import java.io.DataOutput;
28  import java.io.DataOutputStream;
29  import java.io.IOException;
30  import java.io.InputStream;
31  import java.nio.ByteBuffer;
32  import java.util.concurrent.locks.Lock;
33  import java.util.concurrent.locks.ReentrantLock;
34  
35  import org.apache.hadoop.fs.FSDataInputStream;
36  import org.apache.hadoop.fs.FSDataOutputStream;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.fs.HFileSystem;
40  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
41  import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
42  import org.apache.hadoop.hbase.regionserver.MemStore;
43  import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.hbase.util.ChecksumType;
46  import org.apache.hadoop.hbase.util.ClassSize;
47  import org.apache.hadoop.hbase.util.CompoundBloomFilter;
48  import org.apache.hadoop.hbase.util.Pair;
49  import org.apache.hadoop.hbase.util.Writables;
50  import org.apache.hadoop.io.IOUtils;
51  import org.apache.hadoop.io.Writable;
52  import org.apache.hadoop.io.compress.CompressionOutputStream;
53  import org.apache.hadoop.io.compress.Compressor;
54  import org.apache.hadoop.io.compress.Decompressor;
55  
56  import com.google.common.base.Preconditions;
57  
58  /**
59   * Reading {@link HFile} version 1 and 2 blocks, and writing version 2 blocks.
60   * <ul>
61   * <li>In version 1 all blocks are always compressed or uncompressed, as
62   * specified by the {@link HFile}'s compression algorithm, with a type-specific
63   * magic record stored in the beginning of the compressed data (i.e. one needs
64   * to uncompress the compressed block to determine the block type). There is
65   * only a single compression algorithm setting for all blocks. Offset and size
66   * information from the block index are required to read a block.
67   * <li>In version 2 a block is structured as follows:
68   * <ul>
69   * <li>Magic record identifying the block type (8 bytes)
70   * <li>Compressed block size, header not included (4 bytes)
71   * <li>Uncompressed block size, header not included (4 bytes)
72   * <li>The offset of the previous block of the same type (8 bytes). This is
73   * used to be able to navigate to the previous block without going to the
74   * block index.
75   * <li>For minorVersions >=1, there is an additional 4 byte field
76   * bytesPerChecksum that records the number of bytes in a checksum chunk.
77   * <li>For minorVersions >=1, there is a 4 byte value to store the size of
78   * data on disk (excluding the checksums)
79   * <li>For minorVersions >=1, a series of 4 byte checksums, one each for
80   * the number of bytes specified by bytesPerChecksum.
81   * <li>Compressed data (or uncompressed data if compression is disabled). The
82   * compression algorithm is the same for all the blocks in the {@link HFile},
83   * similarly to what was done in version 1.
84   * </ul>
85   * </ul>
86   * The version 2 block representation in the block cache is the same as above,
87   * except that the data section is always uncompressed in the cache.
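     * <p>
     * A minimal sketch (illustrative only) of how the version 2 header fields
     * described above could be decoded from a ByteBuffer positioned at the
     * start of a block:
     * <pre>
     *   BlockType type = BlockType.read(buf);            // 8 byte magic record
     *   int onDiskSizeWithoutHeader = buf.getInt();
     *   int uncompressedSizeWithoutHeader = buf.getInt();
     *   long prevBlockOffset = buf.getLong();
     *   // only present for minor versions >= 1:
     *   byte checksumType = buf.get();
     *   int bytesPerChecksum = buf.getInt();
     *   int onDiskDataSizeWithHeader = buf.getInt();
     * </pre>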
88   */
89  public class HFileBlock extends SchemaConfigured implements Cacheable {
90  
91    /** Minor versions starting with this number have hbase checksums */
92    static final int MINOR_VERSION_WITH_CHECKSUM = 1;
93  
94    /** minor version that does not support checksums */
95    static final int MINOR_VERSION_NO_CHECKSUM = 0;
96  
97    /**
98     * On a checksum failure on a Reader, this many succeeding read
99     * requests switch back to using hdfs checksums before auto-reenabling
100    * hbase checksum verification.
101    */
102   static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
103 
104   /** The size of the header for blocks of minor version 0 (no checksums) */
105   static final int HEADER_SIZE_NO_CHECKSUM = MAGIC_LENGTH + 2 * Bytes.SIZEOF_INT
106       + Bytes.SIZEOF_LONG;
107 
108   public static final boolean FILL_HEADER = true;
109   public static final boolean DONT_FILL_HEADER = false;
110 
111   /** The size of a version 2 {@link HFile} block header, minor version 1.
112    * There is a 1 byte checksum type, followed by a 4 byte bytesPerChecksum
113    * followed by another 4 byte value to store sizeofDataOnDisk.
114    */
115   static final int HEADER_SIZE_WITH_CHECKSUMS = HEADER_SIZE_NO_CHECKSUM + Bytes.SIZEOF_BYTE +
116                                  2 * Bytes.SIZEOF_INT;
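  // With the 8 byte magic record this works out to 8 + 4 + 4 + 8 = 24 bytes
  // for the minor version 0 header, and 24 + 1 + 4 + 4 = 33 bytes once the
  // checksum fields are added (illustrative arithmetic, not from the source).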
117 
118   /**
119    * The size of block header when blockType is {@link BlockType#ENCODED_DATA}.
120    * This extends the normal header by adding the id of the encoder.
121    */
122   public static final int ENCODED_HEADER_SIZE = HEADER_SIZE_WITH_CHECKSUMS
123       + DataBlockEncoding.ID_SIZE;
124 
125   /** Just an array of bytes of the right size. */
126   static final byte[] DUMMY_HEADER_WITH_CHECKSUM = new byte[HEADER_SIZE_WITH_CHECKSUMS];
127   static final byte[] DUMMY_HEADER_NO_CHECKSUM = 
128      new byte[HEADER_SIZE_NO_CHECKSUM];
129 
130   public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
131       ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
132 
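  /**
   * Number of extra bytes that follow a block's own bytes in its serialized
   * form: the block's file offset (8 bytes) and the on-disk size of the next
   * block, header included (4 bytes). See {@link #blockDeserializer}.
   */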
133   static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_LONG +
134       Bytes.SIZEOF_INT;
135 
136   /**
137    * Each checksum value is an integer that can be stored in 4 bytes.
138    */
139   static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
140 
141   private static final CacheableDeserializer<Cacheable> blockDeserializer =
142       new CacheableDeserializer<Cacheable>() {
143         public HFileBlock deserialize(ByteBuffer buf) throws IOException {
144           ByteBuffer newByteBuffer = ByteBuffer.allocate(buf.limit()
145               - HFileBlock.EXTRA_SERIALIZATION_SPACE);
146           buf.limit(buf.limit()
147               - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
148           newByteBuffer.put(buf);
149           HFileBlock ourBuffer = new HFileBlock(newByteBuffer, 
150                                    MINOR_VERSION_NO_CHECKSUM);
151 
152           buf.position(buf.limit());
153           buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
154           ourBuffer.offset = buf.getLong();
155           ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt();
156           return ourBuffer;
157         }
158       };
159 
160   private BlockType blockType;
161 
162   /** Size on disk without the header. It includes checksum data too. */
163   private int onDiskSizeWithoutHeader;
164 
165   /** Size of pure data. Does not include header or checksums */
166   private final int uncompressedSizeWithoutHeader;
167 
168   /** The offset of the previous block on disk */
169   private final long prevBlockOffset;
170 
171   /** The type of checksum; we store its byte code rather than a ChecksumType object */
172   private final byte checksumType;
173 
174   /** The number of bytes for which a checksum is computed */
175   private final int bytesPerChecksum;
176 
177   /** Size on disk of header and data. Does not include checksum data */
178   private final int onDiskDataSizeWithHeader;
179 
180   /** The minor version of the hfile. */
181   private final int minorVersion;
182 
183   /** The in-memory representation of the hfile block */
184   private ByteBuffer buf;
185 
186   /** Whether there is a memstore timestamp after every key/value */
187   private boolean includesMemstoreTS;
188 
189   /**
190    * The offset of this block in the file. Populated by the reader for
191    * convenience of access. This offset is not part of the block header.
192    */
193   private long offset = -1;
194 
195   /**
196    * The on-disk size of the next block, including the header, obtained by
197    * peeking into the first {@link HFileBlock#headerSize(int)} bytes of the next block's
198    * header, or -1 if unknown.
199    */
200   private int nextBlockOnDiskSizeWithHeader = -1;
201 
202   /**
203    * Creates a new {@link HFile} block from the given fields. This constructor
204    * is mostly used when the block data has already been read and uncompressed,
205    * and is sitting in a byte buffer. 
206    *
207    * @param blockType the type of this block, see {@link BlockType}
208    * @param onDiskSizeWithoutHeader compressed size of the block if compression
209    *          is used, otherwise uncompressed size, header size not included
210    * @param uncompressedSizeWithoutHeader uncompressed size of the block,
211    *          header size not included. Equals onDiskSizeWithoutHeader if
212    *          compression is disabled.
213    * @param prevBlockOffset the offset of the previous block in the
214    *          {@link HFile}
215    * @param buf block header ({@link HFileBlock#headerSize(int)} bytes) followed by
216    *          uncompressed data
217    * @param fillHeader true to fill in the first {@link HFileBlock#headerSize(int)} bytes of
218    *          the buffer based on the header fields provided
219    * @param offset the file offset the block was read from
220    * @param minorVersion the minor version of this block
221    * @param bytesPerChecksum the number of bytes per checksum chunk
222    * @param checksumType the checksum algorithm to use
223    * @param onDiskDataSizeWithHeader size of header and data on disk not
224    *        including checksum data
225    */
226   HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
227       int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer buf,
228       boolean fillHeader, long offset, boolean includesMemstoreTS, 
229       int minorVersion, int bytesPerChecksum, byte checksumType,
230       int onDiskDataSizeWithHeader) {
231     this.blockType = blockType;
232     this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
233     this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
234     this.prevBlockOffset = prevBlockOffset;
235     this.buf = buf;
236     if (fillHeader)
237       overwriteHeader();
238     this.offset = offset;
239     this.includesMemstoreTS = includesMemstoreTS;
240     this.minorVersion = minorVersion;
241     this.bytesPerChecksum = bytesPerChecksum;
242     this.checksumType = checksumType;
243     this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
244   }
245 
246   /**
247    * Creates a block from an existing buffer starting with a header. Rewinds
248    * and takes ownership of the buffer. By definition of rewind, ignores the
249    * buffer position, but if you slice the buffer beforehand, it will rewind
250    * to that point. The reason this takes a minor version and not a major version
251    * is that major versions indicate the format of an HFile whereas minor versions
252    * indicate the format inside an HFileBlock.
253    */
254   HFileBlock(ByteBuffer b, int minorVersion) throws IOException {
255     b.rewind();
256     blockType = BlockType.read(b);
257     onDiskSizeWithoutHeader = b.getInt();
258     uncompressedSizeWithoutHeader = b.getInt();
259     prevBlockOffset = b.getLong();
260     this.minorVersion = minorVersion;
261     if (minorVersion >= MINOR_VERSION_WITH_CHECKSUM) {
262       this.checksumType = b.get();
263       this.bytesPerChecksum = b.getInt();
264       this.onDiskDataSizeWithHeader = b.getInt();
265     } else {
266       this.checksumType = ChecksumType.NULL.getCode();
267       this.bytesPerChecksum = 0;
268       this.onDiskDataSizeWithHeader = onDiskSizeWithoutHeader +
269                                        HEADER_SIZE_NO_CHECKSUM;
270     }
271     buf = b;
272     buf.rewind();
273   }
274 
275   public BlockType getBlockType() {
276     return blockType;
277   }
278 
279   /** @return the data block encoding id that was used to encode this block */
280   public short getDataBlockEncodingId() {
281     if (blockType != BlockType.ENCODED_DATA) {
282       throw new IllegalArgumentException("Querying encoder ID of a block " +
283           "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
284     }
285     return buf.getShort(headerSize());
286   }
287 
288   /**
289    * @return the on-disk size of the block with header size included. This
290    * includes the header, the data and the checksum data.
291    */
292   public int getOnDiskSizeWithHeader() {
293     return onDiskSizeWithoutHeader + headerSize();
294   }
295 
296   /**
297    * Returns the size of the compressed part of the block in case compression
298    * is used, or the uncompressed size of the data part otherwise. Header size
299    * and checksum data size are not included.
300    *
301    * @return the on-disk size of the data part of the block, header and
302    *         checksum not included. 
303    */
304   int getOnDiskSizeWithoutHeader() {
305     return onDiskSizeWithoutHeader;
306   }
307 
308   /**
309    * @return the uncompressed size of the data part of the block, header not
310    *         included
311    */
312    public int getUncompressedSizeWithoutHeader() {
313     return uncompressedSizeWithoutHeader;
314   }
315 
316   /**
317    * @return the offset of the previous block of the same type in the file, or
318    *         -1 if unknown
319    */
320   public long getPrevBlockOffset() {
321     return prevBlockOffset;
322   }
323 
324   /**
325    * Writes header fields into the first {@link #HEADER_SIZE_WITH_CHECKSUMS} bytes of the
326    * buffer. Resets the buffer position to the end of header as side effect.
327    */
328   private void overwriteHeader() {
329     buf.rewind();
330     blockType.write(buf);
331     buf.putInt(onDiskSizeWithoutHeader);
332     buf.putInt(uncompressedSizeWithoutHeader);
333     buf.putLong(prevBlockOffset);
334   }
335 
336   /**
337    * Returns a buffer that does not include the header. The array offset points
338    * to the start of the block data right after the header. The underlying data
339    * array is not copied. Checksum data is not included in the returned buffer.
340    *
341    * @return the buffer with header skipped
342    */
343   ByteBuffer getBufferWithoutHeader() {
344     return ByteBuffer.wrap(buf.array(), buf.arrayOffset() + headerSize(),
345         buf.limit() - headerSize() - totalChecksumBytes()).slice();
346   }
347 
348   /**
349    * Returns the buffer this block stores internally. The clients must not
350    * modify the buffer object. This method has to be public because it is
351    * used in {@link CompoundBloomFilter} to avoid object creation on every
352    * Bloom filter lookup, but has to be used with caution. Checksum data
353    * is not included in the returned buffer.
354    *
355    * @return the buffer of this block for read-only operations
356    */
357   public ByteBuffer getBufferReadOnly() {
358     return ByteBuffer.wrap(buf.array(), buf.arrayOffset(),
359         buf.limit() - totalChecksumBytes()).slice();
360   }
361 
362   /**
363    * Returns a byte buffer of this block, including header data, positioned at
364    * the beginning of header. The underlying data array is not copied.
365    *
366    * @return the byte buffer with header included
367    */
368   ByteBuffer getBufferWithHeader() {
369     ByteBuffer dupBuf = buf.duplicate();
370     dupBuf.rewind();
371     return dupBuf;
372   }
373 
374   /**
375    * Deserializes fields of the given writable using the data portion of this
376    * block. Does not check that all the block data has been read.
377    */
378   void readInto(Writable w) throws IOException {
379     Preconditions.checkNotNull(w);
380 
381     if (Writables.getWritable(buf.array(), buf.arrayOffset() + headerSize(),
382         buf.limit() - headerSize(), w) == null) {
383       throw new IOException("Failed to deserialize block " + this + " into a "
384           + w.getClass().getSimpleName());
385     }
386   }
387 
388   private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
389       String fieldName) throws IOException {
390     if (valueFromBuf != valueFromField) {
391       throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
392           + ") is different from that in the field (" + valueFromField + ")");
393     }
394   }
395 
396   /**
397    * Checks if the block is internally consistent, i.e. the first
398    * {@link HFileBlock#headerSize(int)} bytes of the buffer contain a valid header consistent
399    * with the fields. This function is primarily for testing and debugging, and
400    * is not thread-safe, because it alters the internal buffer pointer.
401    */
402   void sanityCheck() throws IOException {
403     buf.rewind();
404 
405     {
406       BlockType blockTypeFromBuf = BlockType.read(buf);
407       if (blockTypeFromBuf != blockType) {
408         throw new IOException("Block type stored in the buffer: " +
409             blockTypeFromBuf + ", block type field: " + blockType);
410       }
411     }
412 
413     sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader,
414         "onDiskSizeWithoutHeader");
415 
416     sanityCheckAssertion(buf.getInt(), uncompressedSizeWithoutHeader,
417         "uncompressedSizeWithoutHeader");
418 
419     sanityCheckAssertion(buf.getLong(), prevBlockOffset, "prevBlockOffset");
420     if (minorVersion >= MINOR_VERSION_WITH_CHECKSUM) {
421       sanityCheckAssertion(buf.get(), checksumType, "checksumType");
422       sanityCheckAssertion(buf.getInt(), bytesPerChecksum, "bytesPerChecksum");
423       sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, 
424                            "onDiskDataSizeWithHeader");
425     }
426 
427     int cksumBytes = totalChecksumBytes();
428     int hdrSize = headerSize();
429     int expectedBufLimit = uncompressedSizeWithoutHeader + headerSize() +
430                            cksumBytes;
431     if (buf.limit() != expectedBufLimit) {
432       throw new AssertionError("Expected buffer limit " + expectedBufLimit
433           + ", got " + buf.limit());
434     }
435 
436     // We might optionally allocate HEADER_SIZE_WITH_CHECKSUMS more bytes to read the next
437     // block's header, so there are two sensible values for buffer capacity.
438     int size = uncompressedSizeWithoutHeader + hdrSize + cksumBytes;
439     if (buf.capacity() != size &&
440         buf.capacity() != size + hdrSize) {
441       throw new AssertionError("Invalid buffer capacity: " + buf.capacity() +
442           ", expected " + size + " or " + (size + hdrSize));
443     }
444   }
445 
446   @Override
447   public String toString() {
448     return "blockType="
449         + blockType
450         + ", onDiskSizeWithoutHeader="
451         + onDiskSizeWithoutHeader
452         + ", uncompressedSizeWithoutHeader="
453         + uncompressedSizeWithoutHeader
454         + ", prevBlockOffset="
455         + prevBlockOffset
456         + ", dataBeginsWith="
457         + Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
458             Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()))
459         + ", fileOffset=" + offset;
460   }
461 
462   private void validateOnDiskSizeWithoutHeader(
463       int expectedOnDiskSizeWithoutHeader) throws IOException {
464     if (onDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) {
465       String blockInfoMsg =
466         "Block offset: " + offset + ", data starts with: "
467           + Bytes.toStringBinary(buf.array(), buf.arrayOffset(),
468               buf.arrayOffset() + Math.min(32, buf.limit()));
469       throw new IOException("On-disk size without header provided is "
470           + expectedOnDiskSizeWithoutHeader + ", but block "
471           + "header contains " + onDiskSizeWithoutHeader + ". " +
472           blockInfoMsg);
473     }
474   }
475 
476   /**
477    * Always allocates a new buffer of the correct size. Copies header bytes
478    * from the existing buffer. Does not change header fields. 
479    * Reserves room for checksum bytes too.
480    *
481    * @param extraBytes whether to reserve room in the buffer to read the next
482    *          block's header
483    */
484   private void allocateBuffer(boolean extraBytes) {
485     int cksumBytes = totalChecksumBytes();
486     int capacityNeeded = headerSize() + uncompressedSizeWithoutHeader +
487         cksumBytes +
488         (extraBytes ? headerSize() : 0);
489 
490     ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
491 
492     // Copy header bytes.
493     System.arraycopy(buf.array(), buf.arrayOffset(), newBuf.array(),
494         newBuf.arrayOffset(), headerSize());
495 
496     buf = newBuf;
497     buf.limit(headerSize() + uncompressedSizeWithoutHeader + cksumBytes);
498   }
499 
500   /** An additional sanity-check in case no compression is being used. */
501   public void assumeUncompressed() throws IOException {
502     if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + 
503         totalChecksumBytes()) {
504       throw new IOException("Using no compression but "
505           + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
506           + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
507           + ", numChecksumbytes=" + totalChecksumBytes());
508     }
509   }
510 
511   /**
512    * @param expectedType the expected type of this block
513    * @throws IOException if this block's type is different than expected
514    */
515   public void expectType(BlockType expectedType) throws IOException {
516     if (blockType != expectedType) {
517       throw new IOException("Invalid block type: expected=" + expectedType
518           + ", actual=" + blockType);
519     }
520   }
521 
522   /** @return the offset of this block in the file it was read from */
523   public long getOffset() {
524     if (offset < 0) {
525       throw new IllegalStateException(
526           "HFile block offset not initialized properly");
527     }
528     return offset;
529   }
530 
531   /**
532    * @return a byte stream reading the data section of this block
533    */
534   public DataInputStream getByteStream() {
535     return new DataInputStream(new ByteArrayInputStream(buf.array(),
536         buf.arrayOffset() + headerSize(), buf.limit() - headerSize()));
537   }
538 
539   @Override
540   public long heapSize() {
541     long size = ClassSize.align(
542         // Base class size, including object overhead.
543         SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
544         // Block type and byte buffer references
545         2 * ClassSize.REFERENCE +
546        // On-disk size, uncompressed size, next block's on-disk size,
547        // bytesPerChecksum, onDiskDataSizeWithHeader and minorVersion
548         6 * Bytes.SIZEOF_INT +
549         // Checksum type
550         1 * Bytes.SIZEOF_BYTE +
551         // This and previous block offset
552         2 * Bytes.SIZEOF_LONG +
553         // "Include memstore timestamp" flag
554         Bytes.SIZEOF_BOOLEAN
555     );
556 
557     if (buf != null) {
558       // Deep overhead of the byte buffer. Needs to be aligned separately.
559       size += ClassSize.align(buf.capacity() + BYTE_BUFFER_HEAP_SIZE);
560     }
561 
562     return ClassSize.align(size);
563   }
564 
565   /**
566    * Read from an input stream. Analogous to
567    * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a
568    * number of "extra" bytes that would be desirable but not absolutely
569    * necessary to read.
570    *
571    * @param in the input stream to read from
572    * @param buf the buffer to read into
573    * @param bufOffset the destination offset in the buffer
574    * @param necessaryLen the number of bytes that are absolutely necessary to
575    *          read
576    * @param extraLen the number of extra bytes that would be nice to read
577    * @return true if succeeded reading the extra bytes
578    * @throws IOException if failed to read the necessary bytes
579    */
580   public static boolean readWithExtra(InputStream in, byte buf[],
581       int bufOffset, int necessaryLen, int extraLen) throws IOException {
582     int bytesRemaining = necessaryLen + extraLen;
583     while (bytesRemaining > 0) {
584       int ret = in.read(buf, bufOffset, bytesRemaining);
585       if (ret == -1 && bytesRemaining <= extraLen) {
586         // We could not read the "extra data", but that is OK.
587         break;
588       }
589 
590       if (ret < 0) {
591         throw new IOException("Premature EOF from inputStream (read "
592             + "returned " + ret + ", was trying to read " + necessaryLen
593             + " necessary bytes and " + extraLen + " extra bytes, "
594             + "successfully read "
595             + (necessaryLen + extraLen - bytesRemaining));
596       }
597       bufOffset += ret;
598       bytesRemaining -= ret;
599     }
600     return bytesRemaining <= 0;
601   }
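  // Illustrative use (not from the original source): read a block of
  // 'onDiskSizeWithHeader' bytes and, when possible, also the next block's
  // header of 'hdrSize' bytes, without failing if those extra bytes are not
  // available at the end of the stream:
  //
  //   byte[] dest = new byte[onDiskSizeWithHeader + hdrSize];
  //   boolean nextHeaderRead =
  //       readWithExtra(in, dest, 0, onDiskSizeWithHeader, hdrSize);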
602 
603   /**
604    * @return the on-disk size of the next block (including the header size)
605    *         that was read by peeking into the next block's header
606    */
607   public int getNextBlockOnDiskSizeWithHeader() {
608     return nextBlockOnDiskSizeWithHeader;
609   }
610 
611 
612   /**
613    * Unified version 2 {@link HFile} block writer. The intended usage pattern
614    * is as follows:
615    * <ul>
616    * <li>Construct an {@link HFileBlock.Writer}, providing a compression
617    * algorithm
618    * <li>Call {@link Writer#startWriting(BlockType)} and get a data stream to
619    * write to
620    * <li>Write your data into the stream
621    * <li>Call {@link Writer#writeHeaderAndData(FSDataOutputStream)} as many times as you need to
622    * store the serialized block into an external stream, or call
623    * {@link Writer#getHeaderAndDataForTest()} to get it as a byte array.
624    * <li>Repeat to write more blocks
625    * </ul>
626    * <p>
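    * A minimal usage sketch (illustrative only; assumes an already open
    * {@code FSDataOutputStream out} and settings matching the file being
    * written; the checksum type shown is an assumption):
    * <pre>
    *   HFileBlock.Writer w = new HFileBlock.Writer(Compression.Algorithm.NONE,
    *       NoOpDataBlockEncoder.INSTANCE, false, MINOR_VERSION_WITH_CHECKSUM,
    *       ChecksumType.CRC32, 16 * 1024);
    *   DataOutputStream dos = w.startWriting(BlockType.DATA);
    *   dos.write(myData);               // user-supplied bytes
    *   w.writeHeaderAndData(out);       // header + data (+ checksums) to disk
    * </pre>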
627    */
628   public static class Writer {
629 
630     private enum State {
631       INIT,
632       WRITING,
633       BLOCK_READY
634     };
635 
636     /** Writer state. Used to ensure the correct usage protocol. */
637     private State state = State.INIT;
638 
639     /** Compression algorithm for all blocks this instance writes. */
640     private final Compression.Algorithm compressAlgo;
641 
642     /** Data block encoder used for data blocks */
643     private final HFileDataBlockEncoder dataBlockEncoder;
644 
645     /**
646      * The stream we use to accumulate data in uncompressed format for each
647      * block. We reset this stream at the end of each block and reuse it. The
648      * header is written as the first {@link HFileBlock#headerSize(int)}  bytes into this
649      * stream.
650      */
651     private ByteArrayOutputStream baosInMemory;
652 
653     /** Compressor, which is also reused between consecutive blocks. */
654     private Compressor compressor;
655 
656     /** Compression output stream */
657     private CompressionOutputStream compressionStream;
658     
659     /** Underlying stream to write compressed bytes to */
660     private ByteArrayOutputStream compressedByteStream;
661 
662     /**
663      * Current block type. Set in {@link #startWriting(BlockType)}. Could be
664      * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA}
665      * to {@link BlockType#ENCODED_DATA}.
666      */
667     private BlockType blockType;
668 
669     /**
670      * A stream that we write uncompressed bytes to; they are buffered in
671      * {@link #baosInMemory} and compressed later, in finishBlock().
672      */
673     private DataOutputStream userDataStream;
674 
675     /**
676      * Bytes to be written to the file system, including the header. Compressed
677      * if compression is turned on. It also includes the checksum data that 
678      * immediately follows the block data. (header + data + checksums)
679      */
680     private byte[] onDiskBytesWithHeader;
681 
682     /**
683      * The size of the data on disk that does not include the checksums.
684      * (header + data)
685      */
686     private int onDiskDataSizeWithHeader;
687 
688     /**
689      * The size of the checksum data on disk. It is used only if data is
690      * not compressed. If data is compressed, then the checksums are already
691      * part of onDiskBytesWithHeader. If data is uncompressed, then this
692      * variable stores the checksum data for this block.
693      */
694     private byte[] onDiskChecksum;
695 
696     /**
697      * Valid in the BLOCK_READY state. Contains the header and the uncompressed (but
698      * potentially encoded, if this is a data block) bytes, so the length is
699      * {@link #uncompressedSizeWithoutHeader} + {@link HFileBlock#headerSize(int)}.
700      * Does not store checksums.
701      */
702     private byte[] uncompressedBytesWithHeader;
703 
704     /**
705      * Current block's start offset in the {@link HFile}. Set in
706      * {@link #writeHeaderAndData(FSDataOutputStream)}.
707      */
708     private long startOffset;
709 
710     /**
711      * Offset of previous block by block type. Updated when the next block is
712      * started.
713      */
714     private long[] prevOffsetByType;
715 
716     /** The offset of the previous block of the same type */
717     private long prevOffset;
718 
719     /** Whether we are including memstore timestamp after every key/value */
720     private boolean includesMemstoreTS;
721 
722     /** Checksum settings */
723     private ChecksumType checksumType;
724     private int bytesPerChecksum;
725 
726     private final int minorVersion;
727 
728     /**
729      * @param compressionAlgorithm compression algorithm to use
730      * @param dataBlockEncoder data block encoder to use, or null for no encoding
731      * @param checksumType type of checksum
732      * @param bytesPerChecksum number of bytes in each checksum chunk
733      */
734     public Writer(Compression.Algorithm compressionAlgorithm,
735           HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS,
736           int minorVersion,
737           ChecksumType checksumType, int bytesPerChecksum) {
738       this.minorVersion = minorVersion;
739       compressAlgo = compressionAlgorithm == null ? NONE : compressionAlgorithm;
740       this.dataBlockEncoder = dataBlockEncoder != null
741           ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
742 
743       baosInMemory = new ByteArrayOutputStream();
744       if (compressAlgo != NONE) {
745         compressor = compressionAlgorithm.getCompressor();
746         compressedByteStream = new ByteArrayOutputStream();
747         try {
748           compressionStream =
749               compressionAlgorithm.createPlainCompressionStream(
750                   compressedByteStream, compressor);
751         } catch (IOException e) {
752           throw new RuntimeException("Could not create compression stream " + 
753               "for algorithm " + compressionAlgorithm, e);
754         }
755       }
756       if (minorVersion > MINOR_VERSION_NO_CHECKSUM
757           && bytesPerChecksum < HEADER_SIZE_WITH_CHECKSUMS) {
758         throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
759             " Minimum is " + HEADER_SIZE_WITH_CHECKSUMS + " but the configured value is " +
760             bytesPerChecksum);
761       }
762       
763       prevOffsetByType = new long[BlockType.values().length];
764       for (int i = 0; i < prevOffsetByType.length; ++i)
765         prevOffsetByType[i] = -1;
766 
767       this.includesMemstoreTS = includesMemstoreTS;
768       this.checksumType = checksumType;
769       this.bytesPerChecksum = bytesPerChecksum;
770     }
771 
772     /**
773      * Starts writing into the block. The previous block's data is discarded.
774      *
775      * @return the stream the user can write their data into
776      * @throws IOException
777      */
778     public DataOutputStream startWriting(BlockType newBlockType)
779         throws IOException {
780       if (state == State.BLOCK_READY && startOffset != -1) {
781         // We had a previous block that was written to a stream at a specific
782         // offset. Save that offset as the last offset of a block of that type.
783         prevOffsetByType[blockType.getId()] = startOffset;
784       }
785 
786       startOffset = -1;
787       blockType = newBlockType;
788 
789       baosInMemory.reset();
790       baosInMemory.write(getDummyHeaderForVersion(this.minorVersion));
791 
792       state = State.WRITING;
793 
794       // We will compress it later in finishBlock()
795       userDataStream = new DataOutputStream(baosInMemory);
796       return userDataStream;
797     }
798 
799     /**
800      * Returns the stream for the user to write to. The block writer takes care
801      * of handling compression and buffering for caching on write. Can only be
802      * called in the "writing" state.
803      *
804      * @return the data output stream for the user to write to
805      */
806     DataOutputStream getUserDataStream() {
807       expectState(State.WRITING);
808       return userDataStream;
809     }
810 
811     /**
812      * Transitions the block writer from the "writing" state to the "block
813      * ready" state.  Does nothing if a block is already finished.
814      */
815     private void ensureBlockReady() throws IOException {
816       Preconditions.checkState(state != State.INIT,
817           "Unexpected state: " + state);
818 
819       if (state == State.BLOCK_READY)
820         return;
821 
822       // This will set state to BLOCK_READY.
823       finishBlock();
824     }
825 
826     /**
827      * An internal method that flushes the compressing stream (if using
828      * compression), serializes the header, and takes care of the separate
829      * uncompressed stream for caching on write, if applicable. Sets block
830      * write state to "block ready".
831      */
832     private void finishBlock() throws IOException {
833       userDataStream.flush();
834 
835       // This does an array copy, so it is safe to cache this byte array.
836       uncompressedBytesWithHeader = baosInMemory.toByteArray();
837       prevOffset = prevOffsetByType[blockType.getId()];
838 
839       // We need to set state before we can package the block up for
840       // cache-on-write. In a way, the block is ready, but not yet encoded or
841       // compressed.
842       state = State.BLOCK_READY;
843       encodeDataBlockForDisk();
844 
845       doCompressionAndChecksumming();
846     }
847 
848     /**
849      * Do compression if it is enabled, or re-use the uncompressed buffer if
850      * it is not. Fills in the compressed block's header if doing compression.
851      * Also computes the checksums. In the no-compression case, the checksums
852      * are written to a separate data structure, onDiskChecksum. When
853      * compression is enabled, the checksums are written into the compressed
854      * output byte stream instead.
855      */
856     private void doCompressionAndChecksumming() throws IOException {
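      // Both branches below leave the block bytes in onDiskBytesWithHeader.
      // For minor version >= 1: with compression on, the checksums become part
      // of onDiskBytesWithHeader and onDiskChecksum stays empty; with
      // compression off, the checksums go into the separate onDiskChecksum array.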
857       if (minorVersion <= MINOR_VERSION_NO_CHECKSUM) {
858         version20compression();
859       } else {
860         version21ChecksumAndCompression();
861       }
862     }
863 
864     private void version20compression() throws IOException {
865       onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;
866 
867       if (compressAlgo != NONE) {
868         compressedByteStream.reset();
869         compressedByteStream.write(DUMMY_HEADER_NO_CHECKSUM);
870 
871         compressionStream.resetState();
872 
873         compressionStream.write(uncompressedBytesWithHeader, headerSize(this.minorVersion),
874             uncompressedBytesWithHeader.length - headerSize(this.minorVersion));
875 
876 
877         compressionStream.flush();
878         compressionStream.finish();
879         onDiskDataSizeWithHeader = compressedByteStream.size(); // data size
880         onDiskBytesWithHeader = compressedByteStream.toByteArray();
881 
882         put20Header(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
883             uncompressedBytesWithHeader.length);
884 
885 
886         //set the header for the uncompressed bytes (for cache-on-write)
887         put20Header(uncompressedBytesWithHeader, 0,
888             onDiskBytesWithHeader.length + onDiskChecksum.length,
889             uncompressedBytesWithHeader.length);
890 
891       } else {
892         onDiskBytesWithHeader = uncompressedBytesWithHeader;
893 
894         onDiskDataSizeWithHeader = onDiskBytesWithHeader.length;
895 
896         //set the header for the uncompressed bytes
897         put20Header(uncompressedBytesWithHeader, 0,
898             onDiskBytesWithHeader.length,
899             uncompressedBytesWithHeader.length);
900       }
901     }
902 
903     private void version21ChecksumAndCompression() throws IOException {
904       // do the compression
905       if (compressAlgo != NONE) {
906         compressedByteStream.reset();
907         compressedByteStream.write(DUMMY_HEADER_WITH_CHECKSUM);
908 
909         compressionStream.resetState();
910 
911         compressionStream.write(uncompressedBytesWithHeader, headerSize(this.minorVersion),
912             uncompressedBytesWithHeader.length - headerSize(this.minorVersion));
913 
914         compressionStream.flush();
915         compressionStream.finish();
916 
917         // generate checksums
918         onDiskDataSizeWithHeader = compressedByteStream.size(); // data size
919 
920         // reserve space for checksums in the output byte stream
921         ChecksumUtil.reserveSpaceForChecksums(compressedByteStream, 
922           onDiskDataSizeWithHeader, bytesPerChecksum);
923 
924 
925         onDiskBytesWithHeader = compressedByteStream.toByteArray();
926         put21Header(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
927             uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);
928 
929        // generate checksums for header and data. The checksums are
930        // part of onDiskBytesWithHeader itself.
931        ChecksumUtil.generateChecksums(
932          onDiskBytesWithHeader, 0, onDiskDataSizeWithHeader,
933          onDiskBytesWithHeader, onDiskDataSizeWithHeader,
934          checksumType, bytesPerChecksum);
935 
936         // Checksums are already part of onDiskBytesWithHeader
937         onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;
938 
939         //set the header for the uncompressed bytes (for cache-on-write)
940         put21Header(uncompressedBytesWithHeader, 0,
941             onDiskBytesWithHeader.length + onDiskChecksum.length,
942             uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);
943 
944       } else {
945         // If we are not using any compression, then the
946         // checksums are written to its own array onDiskChecksum.
947         onDiskBytesWithHeader = uncompressedBytesWithHeader;
948 
949         onDiskDataSizeWithHeader = onDiskBytesWithHeader.length;
950         int numBytes = (int)ChecksumUtil.numBytes(
951                           uncompressedBytesWithHeader.length,
952                           bytesPerChecksum);
953         onDiskChecksum = new byte[numBytes];
954 
955         //set the header for the uncompressed bytes
956         put21Header(uncompressedBytesWithHeader, 0,
957             onDiskBytesWithHeader.length + onDiskChecksum.length,
958             uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);
959 
960         ChecksumUtil.generateChecksums(
961           uncompressedBytesWithHeader, 0, uncompressedBytesWithHeader.length,
962           onDiskChecksum, 0,
963           checksumType, bytesPerChecksum);
964       }
965     }
966 
967     /**
968      * Encodes this block if it is a data block and encoding is turned on in
969      * {@link #dataBlockEncoder}.
970      */
971     private void encodeDataBlockForDisk() throws IOException {
972       if (blockType != BlockType.DATA) {
973         return; // skip any non-data block
974       }
975 
976       // do data block encoding, if data block encoder is set
977       ByteBuffer rawKeyValues = ByteBuffer.wrap(uncompressedBytesWithHeader,
978           headerSize(this.minorVersion), uncompressedBytesWithHeader.length -
979           headerSize(this.minorVersion)).slice();
980       Pair<ByteBuffer, BlockType> encodingResult =
981           dataBlockEncoder.beforeWriteToDisk(rawKeyValues,
982               includesMemstoreTS, getDummyHeaderForVersion(this.minorVersion));
983 
984       BlockType encodedBlockType = encodingResult.getSecond();
985       if (encodedBlockType == BlockType.ENCODED_DATA) {
986         uncompressedBytesWithHeader = encodingResult.getFirst().array();
987         blockType = BlockType.ENCODED_DATA;
988       } else {
989         // There is no encoding configured. Do some extra sanity-checking.
990         if (encodedBlockType != BlockType.DATA) {
991           throw new IOException("Unexpected block type coming out of data " +
992               "block encoder: " + encodedBlockType);
993         }
994         if (userDataStream.size() !=
995             uncompressedBytesWithHeader.length - headerSize(this.minorVersion)) {
996           throw new IOException("Uncompressed size mismatch: "
997               + userDataStream.size() + " vs. "
998               + (uncompressedBytesWithHeader.length - headerSize(this.minorVersion)));
999         }
1000       }
1001     }
1002 
1003     /**
1004      * Put the header into the given byte array at the given offset.
1005      * @param onDiskSize size of the block on disk header + data + checksum
1006      * @param uncompressedSize size of the block after decompression (but
1007      *          before optional data block decoding) including header
1008      * @param onDiskDataSize size of the block on disk with header
1009      *        and data but not including the checksums
1010      */
1011     private void put21Header(byte[] dest, int offset, int onDiskSize,
1012                              int uncompressedSize, int onDiskDataSize) {
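      // Resulting header layout, assuming the 8 byte magic record (byte
      // offsets, illustrative): 0-7 magic, 8-11 onDiskSizeWithoutHeader,
      // 12-15 uncompressedSizeWithoutHeader, 16-23 prevBlockOffset,
      // 24 checksumType, 25-28 bytesPerChecksum, 29-32 onDiskDataSizeWithHeader.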
1013       offset = blockType.put(dest, offset);
1014       offset = Bytes.putInt(dest, offset, onDiskSize - HEADER_SIZE_WITH_CHECKSUMS);
1015       offset = Bytes.putInt(dest, offset, uncompressedSize - HEADER_SIZE_WITH_CHECKSUMS);
1016       offset = Bytes.putLong(dest, offset, prevOffset);
1017       offset = Bytes.putByte(dest, offset, checksumType.getCode());
1018       offset = Bytes.putInt(dest, offset, bytesPerChecksum);
1019       offset = Bytes.putInt(dest, offset, onDiskDataSizeWithHeader);
1020     }
1021 
1022 
1023     private void put20Header(byte[] dest, int offset, int onDiskSize,
1024                              int uncompressedSize) {
1025       offset = blockType.put(dest, offset);
1026       offset = Bytes.putInt(dest, offset, onDiskSize - HEADER_SIZE_NO_CHECKSUM);
1027       offset = Bytes.putInt(dest, offset, uncompressedSize - HEADER_SIZE_NO_CHECKSUM);
1028       Bytes.putLong(dest, offset, prevOffset);
1029     }

1030    /**
1031      * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records
1032      * the offset of this block so that it can be referenced in the next block
1033      * of the same type.
1034      *
1035      * @param out
1036      * @throws IOException
1037      */
1038     public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
1039       long offset = out.getPos();
1040       if (startOffset != -1 && offset != startOffset) {
1041         throw new IOException("A " + blockType + " block written to a "
1042             + "stream twice, first at offset " + startOffset + ", then at "
1043             + offset);
1044       }
1045       startOffset = offset;
1046 
1047       writeHeaderAndData((DataOutputStream) out);
1048     }
1049 
1050     /**
1051      * Writes the header and the compressed data of this block (or uncompressed
1052      * data when not using compression) into the given stream. Can be called in
1053      * the "writing" state or in the "block ready" state. If called in the
1054      * "writing" state, transitions the writer to the "block ready" state.
1055      *
1056     * @param out the output stream to write the header and data to
1057      * @throws IOException
1058      */
1059     private void writeHeaderAndData(DataOutputStream out) throws IOException {
1060       ensureBlockReady();
1061       out.write(onDiskBytesWithHeader);
1062       if (compressAlgo == NONE && minorVersion > MINOR_VERSION_NO_CHECKSUM) {
1063         if (onDiskChecksum == HConstants.EMPTY_BYTE_ARRAY) {
1064           throw new IOException("A " + blockType 
1065               + " without compression should have checksums " 
1066               + " stored separately.");
1067         }
1068         out.write(onDiskChecksum);
1069       }
1070     }
1071 
1072     /**
1073     * Returns the header and the compressed data (or uncompressed data when not
1074      * using compression) as a byte array. Can be called in the "writing" state
1075      * or in the "block ready" state. If called in the "writing" state,
1076      * transitions the writer to the "block ready" state. This returns
1077      * the header + data + checksums stored on disk.
1078      *
1079      * @return header and data as they would be stored on disk in a byte array
1080      * @throws IOException
1081      */
1082     byte[] getHeaderAndDataForTest() throws IOException {
1083       ensureBlockReady();
1084       if (compressAlgo == NONE) {
1085         if (onDiskChecksum == HConstants.EMPTY_BYTE_ARRAY) {
1086           throw new IOException("A " + blockType 
1087               + " without compression should have checksums " 
1088               + " stored separately.");
1089         }
1090         // This is not very optimal, because we are doing an extra copy.
1091         // But this method is used only by unit tests.
1092         byte[] output = new byte[onDiskBytesWithHeader.length +
1093                                  onDiskChecksum.length];
1094         System.arraycopy(onDiskBytesWithHeader, 0,
1095                          output, 0, onDiskBytesWithHeader.length);
1096         System.arraycopy(onDiskChecksum, 0,
1097                          output, onDiskBytesWithHeader.length,
1098                          onDiskChecksum.length);
1099         return output;
1100       }
1101       return onDiskBytesWithHeader;
1102     }
1103 
1104     /**
1105      * Releases the compressor this writer uses back into the compressor
1106      * pool. Needs to be called before the writer is discarded.
1107      */
1108     public void releaseCompressor() {
1109       if (compressor != null) {
1110         compressAlgo.returnCompressor(compressor);
1111         compressor = null;
1112       }
1113     }
1114 
1115     /**
1116      * Returns the on-disk size of the data portion of the block. This is the
1117      * compressed size if compression is enabled. Can only be called in the
1118      * "block ready" state. Header is not compressed, and its size is not
1119      * included in the return value.
1120      *
1121      * @return the on-disk size of the block, not including the header.
1122      */
1123     int getOnDiskSizeWithoutHeader() {
1124       expectState(State.BLOCK_READY);
1125       return onDiskBytesWithHeader.length + onDiskChecksum.length - headerSize(this.minorVersion);
1126     }
1127 
1128     /**
1129      * Returns the on-disk size of the block. Can only be called in the
1130      * "block ready" state.
1131      *
1132      * @return the on-disk size of the block ready to be written, including the
1133      *         header size, the data and the checksum data.
1134      */
1135     int getOnDiskSizeWithHeader() {
1136       expectState(State.BLOCK_READY);
1137       return onDiskBytesWithHeader.length + onDiskChecksum.length;
1138     }
1139 
1140     /**
1141      * The uncompressed size of the block data. Does not include header size.
1142      */
1143     int getUncompressedSizeWithoutHeader() {
1144       expectState(State.BLOCK_READY);
1145       return uncompressedBytesWithHeader.length - headerSize(this.minorVersion);
1146     }
1147 
1148     /**
1149      * The uncompressed size of the block data, including header size.
1150      */
1151     int getUncompressedSizeWithHeader() {
1152       expectState(State.BLOCK_READY);
1153       return uncompressedBytesWithHeader.length;
1154     }
1155 
1156     /** @return true if a block is being written  */
1157     public boolean isWriting() {
1158       return state == State.WRITING;
1159     }
1160 
1161     /**
1162      * Returns the number of bytes written into the current block so far, or
1163      * zero if not writing the block at the moment. Note that this will return
1164      * zero in the "block ready" state as well.
1165      *
1166      * @return the number of bytes written
1167      */
1168     public int blockSizeWritten() {
1169       if (state != State.WRITING)
1170         return 0;
1171       return userDataStream.size();
1172     }
1173 
1174     /**
1175      * Returns the header followed by the uncompressed data, even if using
1176      * compression. This is needed for storing uncompressed blocks in the block
1177      * cache. Can be called in the "writing" state or the "block ready" state.
1178      * Returns only the header and data, does not include checksum data.
1179      *
1180      * @return uncompressed block bytes for caching on write
1181      */
1182     ByteBuffer getUncompressedBufferWithHeader() {
1183       expectState(State.BLOCK_READY);
1184       return ByteBuffer.wrap(uncompressedBytesWithHeader);
1185     }
1186 
1187     private void expectState(State expectedState) {
1188       if (state != expectedState) {
1189         throw new IllegalStateException("Expected state: " + expectedState +
1190             ", actual state: " + state);
1191       }
1192     }
1193 
1194     /**
1195      * Takes the given {@link BlockWritable} instance, creates a new block of
1196      * its appropriate type, writes the writable into this block, and flushes
1197      * the block into the output stream. The writer is instructed not to buffer
1198      * uncompressed bytes for cache-on-write.
1199      *
1200      * @param bw the block-writable object to write as a block
1201      * @param out the file system output stream
1202      * @throws IOException
1203      */
1204     public void writeBlock(BlockWritable bw, FSDataOutputStream out)
1205         throws IOException {
1206       bw.writeToBlock(startWriting(bw.getBlockType()));
1207       writeHeaderAndData(out);
1208     }
1209 
1210     /**
1211      * Creates a new HFileBlock. Checksums have already been validated, so
1212      * the byte buffer passed into the constructor of this newly created
1213      * block does not have checksum data even though the header minor 
1214      * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
1215      * 0 value in bytesPerChecksum.
1216      */
1217     public HFileBlock getBlockForCaching() {
1218       return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
1219           getUncompressedSizeWithoutHeader(), prevOffset,
1220           getUncompressedBufferWithHeader(), DONT_FILL_HEADER, startOffset,
1221           includesMemstoreTS, this.minorVersion,
1222           0, ChecksumType.NULL.getCode(),  // no checksums in cached data
1223           onDiskBytesWithHeader.length + onDiskChecksum.length);
1224     }
1225   }
1226 
1227   /** Something that can be written into a block. */
1228   public interface BlockWritable {
1229 
1230     /** The type of block this data should use. */
1231     BlockType getBlockType();
1232 
1233     /**
1234      * Writes the block to the provided stream. Must not write any magic
1235      * records.
1236      *
1237      * @param out a stream to write uncompressed data into
1238      */
1239     void writeToBlock(DataOutput out) throws IOException;
1240   }
1241 
1242   // Block readers and writers
1243 
1244   /** An interface for iterating over {@link HFileBlock}s. */
1245   public interface BlockIterator {
1246 
1247     /**
1248      * Get the next block, or null if there are no more blocks to iterate.
1249      */
1250     HFileBlock nextBlock() throws IOException;
1251 
1252     /**
1253      * Similar to {@link #nextBlock()} but checks block type, throws an
1254      * exception if incorrect, and returns the HFile block
1255      */
1256     HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1257   }
1258 
1259   /** A full-fledged reader with iteration ability. */
1260   public interface FSReader {
1261 
1262     /**
1263      * Reads the block at the given offset in the file with the given on-disk
1264      * size and uncompressed size.
1265      *
1266     * @param offset the offset in the file at which the block starts
1267      * @param onDiskSize the on-disk size of the entire block, including all
1268      *          applicable headers, or -1 if unknown
1269      * @param uncompressedSize the uncompressed size of the compressed part of
1270      *          the block, or -1 if unknown
1271      * @return the newly read block
1272      */
1273     HFileBlock readBlockData(long offset, long onDiskSize,
1274         int uncompressedSize, boolean pread) throws IOException;
1275 
1276     /**
1277      * Creates a block iterator over the given portion of the {@link HFile}.
1278     * The iterator returns blocks whose offsets satisfy
1279     * startOffset <= offset < endOffset.
1280      *
1281      * @param startOffset the offset of the block to start iteration with
1282      * @param endOffset the offset to end iteration at (exclusive)
1283      * @return an iterator of blocks between the two given offsets
1284      */
1285     BlockIterator blockRange(long startOffset, long endOffset);
1286   }
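  // Illustrative use (not from the original source): iterate over all blocks
  // whose offsets fall in [startOffset, endOffset), given an FSReader 'reader'
  // for the file.
  //
  //   HFileBlock.BlockIterator it = reader.blockRange(startOffset, endOffset);
  //   for (HFileBlock b = it.nextBlock(); b != null; b = it.nextBlock()) {
  //     // ... process block b ...
  //   }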
1287 
1288   /**
1289    * A common implementation of some methods of {@link FSReader} and some
1290    * tools for implementing HFile format version-specific block readers.
1291    */
1292   private abstract static class AbstractFSReader implements FSReader {
1293 
1294     /** The file system stream of the underlying {@link HFile} that 
1295      * does checksum validations in the filesystem */
1296     protected final FSDataInputStream istream;
1297 
1298     /** The file system stream of the underlying {@link HFile} that
1299      * does not do checksum verification in the file system */
1300     protected final FSDataInputStream istreamNoFsChecksum;
1301 
1302     /** Compression algorithm used by the {@link HFile} */
1303     protected Compression.Algorithm compressAlgo;
1304 
1305     /** The size of the file we are reading from, or -1 if unknown. */
1306     protected long fileSize;
1307 
1308     /** The minor version of this reader */
1309     private int minorVersion;
1310 
1311     /** The size of the header */
1312     protected int hdrSize;
1313 
1314     /** The filesystem used to access data */
1315     protected HFileSystem hfs;
1316 
1317     /** The path (if any) where this data is coming from */
1318     protected Path path;
1319 
1320     private final Lock streamLock = new ReentrantLock();
1321 
1322     /** The default buffer size for our buffered streams */
1323     public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
1324 
1325     public AbstractFSReader(FSDataInputStream istream, 
1326         FSDataInputStream istreamNoFsChecksum,
1327         Algorithm compressAlgo,
1328         long fileSize, int minorVersion, HFileSystem hfs, Path path) 
1329         throws IOException {
1330       this.istream = istream;
1331       this.compressAlgo = compressAlgo;
1332       this.fileSize = fileSize;
1333       this.minorVersion = minorVersion;
1334       this.hfs = hfs;
1335       this.path = path;
1336       this.hdrSize = headerSize(minorVersion);
1337       this.istreamNoFsChecksum = istreamNoFsChecksum;
1338     }
1339 
1340     @Override
1341     public BlockIterator blockRange(final long startOffset,
1342         final long endOffset) {
1343       return new BlockIterator() {
1344         private long offset = startOffset;
1345 
1346         @Override
1347         public HFileBlock nextBlock() throws IOException {
1348           if (offset >= endOffset)
1349             return null;
1350           HFileBlock b = readBlockData(offset, -1, -1, false);
1351           offset += b.getOnDiskSizeWithHeader();
1352           return b;
1353         }
1354 
1355         @Override
1356         public HFileBlock nextBlockWithBlockType(BlockType blockType)
1357             throws IOException {
1358           HFileBlock blk = nextBlock();
1359           if (blk.getBlockType() != blockType) {
1360             throw new IOException("Expected block of type " + blockType
1361                 + " but found " + blk.getBlockType());
1362           }
1363           return blk;
1364         }
1365       };
1366     }
1367 
1368     /**
1369      * Does a positional read or a seek and read into the given buffer. Returns
1370      * the on-disk size of the next block, or -1 if it could not be determined.
1371      *
1372      * @param dest destination buffer
1373      * @param destOffset offset in the destination buffer
1374      * @param size size of the block to be read
1375      * @param peekIntoNextBlock whether to read the next block's on-disk size
1376      * @param fileOffset position in the stream to read at
1377      * @param pread whether we should do a positional read
1378      * @param istream The input source of data
1379      * @return the on-disk size of the next block with header size included, or
1380      *         -1 if it could not be determined
1381      * @throws IOException
1382      */
1383     protected int readAtOffset(FSDataInputStream istream,
1384         byte[] dest, int destOffset, int size,
1385         boolean peekIntoNextBlock, long fileOffset, boolean pread)
1386         throws IOException {
1387       if (peekIntoNextBlock &&
1388           destOffset + size + hdrSize > dest.length) {
1389         // We are asked to read the next block's header as well, but there is
1390         // not enough room in the array.
1391         throw new IOException("Attempted to read " + size + " bytes and " +
1392             hdrSize + " bytes of next header into a " + dest.length +
1393             "-byte array at offset " + destOffset);
1394       }
1395 
1396       if (!pread && streamLock.tryLock()) {
1397         // Seek + read. Better for scanning.
1398         try {
1399           istream.seek(fileOffset);
1400 
1401           long realOffset = istream.getPos();
1402           if (realOffset != fileOffset) {
1403             throw new IOException("Tried to seek to " + fileOffset + " to "
1404                 + "read " + size + " bytes, but pos=" + realOffset
1405                 + " after seek");
1406           }
1407 
1408           if (!peekIntoNextBlock) {
1409             IOUtils.readFully(istream, dest, destOffset, size);
1410             return -1;
1411           }
1412 
1413           // Try to read the next block header.
1414           if (!readWithExtra(istream, dest, destOffset, size, hdrSize))
1415             return -1;
1416         } finally {
1417           streamLock.unlock();
1418         }
1419       } else {
1420         // Positional read. Better for random reads, or when the streamLock is already held.
1421         int extraSize = peekIntoNextBlock ? hdrSize : 0;
1422 
1423         int ret = istream.read(fileOffset, dest, destOffset, size + extraSize);
1424         if (ret < size) {
1425           throw new IOException("Positional read of " + size + " bytes " +
1426               "failed at offset " + fileOffset + " (returned " + ret + ")");
1427         }
1428 
1429         if (ret == size || ret < size + extraSize) {
1430           // Could not read the next block's header, or did not try.
1431           return -1;
1432         }
1433       }
1434 
1435       assert peekIntoNextBlock;
1436       return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) +
1437           hdrSize;
1438     }
1439 
1440     /**
1441      * Decompresses data from the given stream using the configured compression
1442      * algorithm.
1443      * @param dest the destination buffer for the uncompressed data
1444      * @param destOffset the offset in the destination buffer to write at
1445      * @param bufferedBoundedStream
1446      *          a stream to read compressed data from, bounded to the exact
1447      *          amount of compressed data
1448      * @param uncompressedSize
1449      *          uncompressed data size, header not included
1450      * @throws IOException
1451      */
1452     protected void decompress(byte[] dest, int destOffset,
1453         InputStream bufferedBoundedStream,
1454         int uncompressedSize) throws IOException {
1455       Decompressor decompressor = null;
1456       try {
1457         decompressor = compressAlgo.getDecompressor();
1458         InputStream is = compressAlgo.createDecompressionStream(
1459             bufferedBoundedStream, decompressor, 0);
1460 
1461         IOUtils.readFully(is, dest, destOffset, uncompressedSize);
1462         is.close();
1463       } finally {
1464         if (decompressor != null) {
1465           compressAlgo.returnDecompressor(decompressor);
1466         }
1467       }
1468     }
1469 
1470     /**
1471      * Creates a buffered stream reading a certain slice of the file system
1472      * input stream. We need this because the decompression we use seems to
1473      * expect the input stream to be bounded.
1474      *
1475      * @param offset the starting file offset the bounded stream reads from
1476      * @param size the size of the segment of the file the stream should read
1477      * @param pread whether to use position reads
1478      * @return a stream restricted to the given portion of the file
1479      */
1480     protected InputStream createBufferedBoundedStream(long offset,
1481         int size, boolean pread) {
1482       return new BufferedInputStream(new BoundedRangeFileInputStream(istream,
1483           offset, size, pread), Math.min(DEFAULT_BUFFER_SIZE, size));
1484     }
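    /*
     * Illustrative sketch (an assumption, not part of the original class): how
     * the two helpers above can be combined to inflate a compressed block whose
     * on-disk payload of onDiskSize bytes starts at fileOffset. The method name
     * and parameters are hypothetical.
     */
    protected void exampleDecompressAt(byte[] dest, int destOffset,
        long fileOffset, int onDiskSize, int uncompressedSize, boolean pread)
        throws IOException {
      InputStream boundedStream =
          createBufferedBoundedStream(fileOffset, onDiskSize, pread);
      // decompress() borrows a decompressor from the pool and returns it when
      // done; the bounded stream keeps it from reading past this block.
      decompress(dest, destOffset, boundedStream, uncompressedSize);
    }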
1485 
1486     /**
1487      * @return The minorVersion of this HFile
1488      */
1489     protected int getMinorVersion() {
1490       return minorVersion;
1491     }
1492   }
1493 
1494   /**
1495    * Reads version 1 blocks from the file system. In version 1 blocks,
1496    * everything is compressed, including the magic record, if compression is
1497    * enabled; if compression is disabled, everything is stored uncompressed. This
1498    * reader returns blocks represented in the uniform version 2 format in
1499    * memory.
1500    */
1501   static class FSReaderV1 extends AbstractFSReader {
1502 
1503     /** Header size difference between version 1 and 2 */
1504     private static final int HEADER_DELTA = HEADER_SIZE_NO_CHECKSUM - 
1505                                             MAGIC_LENGTH;
1506 
1507     public FSReaderV1(FSDataInputStream istream, Algorithm compressAlgo,
1508         long fileSize) throws IOException {
1509       super(istream, istream, compressAlgo, fileSize, 0, null, null);
1510     }
1511 
1512     /**
1513      * Read a version 1 block. There is no uncompressed header, and the block
1514      * type (the magic record) is part of the compressed data. This
1515      * implementation assumes that the bounded range file input stream is
1516      * needed to stop the decompressor from reading into the next block, because
1517      * the decompressor just grabs a chunk of data without regard to whether it
1518      * has reached the end of the compressed section.
1519      *
1520      * The block returned is still a version 2 block, and in particular, its
1521      * first {@link #HEADER_SIZE_WITH_CHECKSUMS} bytes contain a valid version 2 header.
1522      *
1523      * @param offset the offset of the block to read in the file
1524      * @param onDiskSizeWithMagic the on-disk size of the version 1 block,
1525      *          including the magic record, which is the part of compressed
1526      *          data if using compression
1527      * @param uncompressedSizeWithMagic uncompressed size of the version 1
1528      *          block, including the magic record
1529      */
1530     @Override
1531     public HFileBlock readBlockData(long offset, long onDiskSizeWithMagic,
1532         int uncompressedSizeWithMagic, boolean pread) throws IOException {
1533       if (uncompressedSizeWithMagic <= 0) {
1534         throw new IOException("Invalid uncompressedSize="
1535             + uncompressedSizeWithMagic + " for a version 1 block");
1536       }
1537 
1538       if (onDiskSizeWithMagic <= 0 || onDiskSizeWithMagic >= Integer.MAX_VALUE)
1539       {
1540         throw new IOException("Invalid onDiskSize=" + onDiskSizeWithMagic
1541             + " (maximum allowed: " + Integer.MAX_VALUE + ")");
1542       }
1543 
1544       int onDiskSize = (int) onDiskSizeWithMagic;
1545 
1546       if (uncompressedSizeWithMagic < MAGIC_LENGTH) {
1547         throw new IOException("Uncompressed size for a version 1 block is "
1548             + uncompressedSizeWithMagic + " but must be at least "
1549             + MAGIC_LENGTH);
1550       }
1551 
1552       // The existing size already includes magic size, and we are inserting
1553       // a version 2 header.
1554       ByteBuffer buf = ByteBuffer.allocate(uncompressedSizeWithMagic
1555           + HEADER_DELTA);
1556 
1557       int onDiskSizeWithoutHeader;
1558       if (compressAlgo == Compression.Algorithm.NONE) {
1559         // A special case when there is no compression.
1560         if (onDiskSize != uncompressedSizeWithMagic) {
1561           throw new IOException("onDiskSize=" + onDiskSize
1562               + " and uncompressedSize=" + uncompressedSizeWithMagic
1563               + " must be equal for version 1 with no compression");
1564         }
1565 
1566         // The first MAGIC_LENGTH bytes of what this will read will be
1567         // overwritten.
1568         readAtOffset(istream, buf.array(), buf.arrayOffset() + HEADER_DELTA,
1569             onDiskSize, false, offset, pread);
1570 
1571         onDiskSizeWithoutHeader = uncompressedSizeWithMagic - MAGIC_LENGTH;
1572       } else {
1573         InputStream bufferedBoundedStream = createBufferedBoundedStream(
1574             offset, onDiskSize, pread);
1575         decompress(buf.array(), buf.arrayOffset() + HEADER_DELTA,
1576             bufferedBoundedStream, uncompressedSizeWithMagic);
1577 
1578         // We don't really have a good way to exclude the "magic record" size
1579         // from the compressed block's size, since it is compressed as well.
1580         onDiskSizeWithoutHeader = onDiskSize;
1581       }
1582 
1583       BlockType newBlockType = BlockType.parse(buf.array(), buf.arrayOffset()
1584           + HEADER_DELTA, MAGIC_LENGTH);
1585 
1586       // We set the uncompressed size of the new HFile block we are creating
1587       // to the size of the data portion of the block without the magic record,
1588       // since the magic record gets moved to the header.
1589       HFileBlock b = new HFileBlock(newBlockType, onDiskSizeWithoutHeader,
1590           uncompressedSizeWithMagic - MAGIC_LENGTH, -1L, buf, FILL_HEADER,
1591           offset, MemStore.NO_PERSISTENT_TS, 0, 0, ChecksumType.NULL.getCode(),
1592           onDiskSizeWithoutHeader + HEADER_SIZE_NO_CHECKSUM);
1593       return b;
1594     }
1595   }
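  /*
   * Worked example (an illustrative sketch, not original code): the buffer size
   * FSReaderV1 allocates when re-framing a version 1 block as version 2. The
   * version 1 uncompressed size already includes the magic record, and
   * HEADER_DELTA extra bytes make room for the rest of the synthesized
   * version 2 header; exampleV1BufferSize is a hypothetical helper name.
   */
  private static int exampleV1BufferSize(int uncompressedSizeWithMagic) {
    // HEADER_SIZE_NO_CHECKSUM - MAGIC_LENGTH == FSReaderV1.HEADER_DELTA
    return uncompressedSizeWithMagic + (HEADER_SIZE_NO_CHECKSUM - MAGIC_LENGTH);
  }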
1596 
1597   /**
1598    * We always prefetch the header of the next block, so that we know its
1599    * on-disk size in advance and can read it in one operation.
1600    */
1601   private static class PrefetchedHeader {
1602     long offset = -1;
1603     byte[] header = new byte[HEADER_SIZE_WITH_CHECKSUMS];
1604     ByteBuffer buf = ByteBuffer.wrap(header, 0, HEADER_SIZE_WITH_CHECKSUMS);
1605   }
1606 
1607   /** Reads version 2 blocks from the filesystem. */
1608   static class FSReaderV2 extends AbstractFSReader {
1609 
1610     // The configuration states that we should validate hbase checksums
1611     private final boolean useHBaseChecksumConfigured;
1612 
1613     // Record the current state of this reader with respect to
1614     // validating checksums in HBase. This is initially set to the same
1615     // value as useHBaseChecksumConfigured, but can change state as and when
1616     // we encounter checksum verification failures.
1617     private volatile boolean useHBaseChecksum;
1618 
1619     // In the case of a checksum failure, do this many subsequent
1620     // reads without hbase checksum verification.
1621     private volatile int checksumOffCount = -1;
1622 
1623     /** Whether we include memstore timestamp in data blocks */
1624     protected boolean includesMemstoreTS;
1625 
1626     /** Data block encoding used to read from file */
1627     protected HFileDataBlockEncoder dataBlockEncoder =
1628         NoOpDataBlockEncoder.INSTANCE;
1629 
1630     private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
1631         new ThreadLocal<PrefetchedHeader>() {
1632           @Override
1633           public PrefetchedHeader initialValue() {
1634             return new PrefetchedHeader();
1635           }
1636         };
1637 
1638     public FSReaderV2(FSDataInputStream istream, 
1639         FSDataInputStream istreamNoFsChecksum, Algorithm compressAlgo,
1640         long fileSize, int minorVersion, HFileSystem hfs, Path path) 
1641       throws IOException {
1642       super(istream, istreamNoFsChecksum, compressAlgo, fileSize, 
1643             minorVersion, hfs, path);
1644 
1645       if (hfs != null) {
1646         // Check the configuration to determine whether hbase-level
1647         // checksum verification is needed or not.
1648         useHBaseChecksum = hfs.useHBaseChecksum();
1649       } else {
1650         // The configuration does not specify anything about hbase checksum
1651         // validations. Set it to true here assuming that we will verify
1652         // hbase checksums for all reads. For older files that do not have 
1653         // stored checksums, this flag will be reset later.
1654         useHBaseChecksum = true;
1655       }
1656 
1657       // for older versions, hbase did not store checksums.
1658       if (getMinorVersion() < MINOR_VERSION_WITH_CHECKSUM) {
1659         useHBaseChecksum = false;
1660       }
1661       this.useHBaseChecksumConfigured = useHBaseChecksum;
1662     }
1663 
1664     /**
1665      * A constructor that reads files with the latest minor version.
1666      * This is used by unit tests only.
1667      */
1668     FSReaderV2(FSDataInputStream istream, Algorithm compressAlgo,
1669         long fileSize) throws IOException {
1670       this(istream, istream, compressAlgo, fileSize,
1671            HFileReaderV2.MAX_MINOR_VERSION, null, null);
1672     }
1673 
1674     /**
1675      * Reads a version 2 block. Tries to do as little memory allocation as
1676      * possible, using the provided on-disk size.
1677      *
1678      * @param offset the offset in the stream to read at
1679      * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1680      *          the header, or -1 if unknown
1681      * @param uncompressedSize the uncompressed size of the block. Always
1682      *          expected to be -1. This parameter is only used in version 1.
1683      * @param pread whether to use a positional read
1684      */
1685     @Override
1686     public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL,
1687         int uncompressedSize, boolean pread) throws IOException {
1688 
1689       // It is ok to get a reference to the stream here without any
1690       // locks because it is marked final.
1691       FSDataInputStream is = this.istreamNoFsChecksum;
1692 
1693       // get a copy of the current state of whether to validate
1694       // hbase checksums or not for this read call. This is not 
1695       // thread-safe but the one constaint is that if we decide 
1696       // to skip hbase checksum verification then we are 
1697       // guaranteed to use hdfs checksum verification.
1698       boolean doVerificationThruHBaseChecksum = this.useHBaseChecksum;
1699       if (!doVerificationThruHBaseChecksum) {
1700         is = this.istream;
1701       }
1702                      
1703       HFileBlock blk = readBlockDataInternal(is, offset, 
1704                          onDiskSizeWithHeaderL, 
1705                          uncompressedSize, pread,
1706                          doVerificationThruHBaseChecksum);
1707       if (blk == null) {
1708         HFile.LOG.warn("HBase checksum verification failed for file " +
1709                        path + " at offset " +
1710                        offset + " filesize " + fileSize +
1711                        ". Retrying read with HDFS checksums turned on...");
1712 
1713         if (!doVerificationThruHBaseChecksum) {
1714           String msg = "HBase checksum verification failed for file " +
1715                        path + " at offset " +
1716                        offset + " filesize " + fileSize + 
1717                        " but this cannot happen because doVerify is " +
1718                        doVerificationThruHBaseChecksum;
1719           HFile.LOG.warn(msg);
1720           throw new IOException(msg); // cannot happen case here
1721         }
1722         HFile.checksumFailures.incrementAndGet(); // update metrics
1723  
1724         // If we have a checksum failure, we fall back into a mode where
1725         // the next few reads use HDFS level checksums. We aim to make the
1726         // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
1727         // hbase checksum verification, but since this value is set without
1728         // holding any locks, it may happen that we actually do
1729         // a few more than precisely this number.
1730         this.checksumOffCount = CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD;
1731         this.useHBaseChecksum = false;
1732         doVerificationThruHBaseChecksum = false;
1733         is = this.istream;
1734         blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL,
1735                                     uncompressedSize, pread,
1736                                     doVerificationThruHBaseChecksum);
1737         if (blk != null) {
1738           HFile.LOG.warn("HDFS checksum verification suceeded for file " +
1739                          path + " at offset " +
1740                          offset + " filesize " + fileSize);
1741         }
1742       } 
1743       if (blk == null && !doVerificationThruHBaseChecksum) {
1744         String msg = "readBlockData failed, possibly due to a " +
1745                      "checksum verification failure for file " + path +
1746                      " at offset " + offset + " filesize " + fileSize;
1747         HFile.LOG.warn(msg);
1748         throw new IOException(msg);
1749       }
1750 
1751       // If there was a checksum mismatch earlier, we retried with
1752       // HBase checksums switched off and used HDFS checksum verification.
1753       // This triggers HDFS to detect and fix corrupt replicas. The
1754       // next checksumOffCount read requests will use HDFS checksums.
1755       // The decrementing of this.checksumOffCount is not thread-safe,
1756       // but it is harmless because eventually checksumOffCount will be
1757       // a negative number.
1758       if (!this.useHBaseChecksum && this.useHBaseChecksumConfigured) {
1759         if (this.checksumOffCount-- < 0) {
1760           this.useHBaseChecksum = true; // auto re-enable hbase checksums
1761         }
1762       }
1763       return blk;
1764     }
1765 
1766     /**
1767      * Reads a version 2 block. 
1768      *
1769      * @param offset the offset in the stream to read at
1770      * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1771      *          the header, or -1 if unknown
1772      * @param uncompressedSize the uncompressed size of the block. Always
1773      *          expected to be -1. This parameter is only used in version 1.
1774      * @param pread whether to use a positional read
1775      * @param verifyChecksum Whether to use HBase checksums. 
1776      *        If HBase checksum is switched off, then use HDFS checksum.
1777      * @return the HFileBlock or null if there is a HBase checksum mismatch
1778      */
1779     private HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, 
1780         long onDiskSizeWithHeaderL,
1781         int uncompressedSize, boolean pread, boolean verifyChecksum) 
1782         throws IOException {
1783       if (offset < 0) {
1784         throw new IOException("Invalid offset=" + offset + " trying to read "
1785             + "block (onDiskSize=" + onDiskSizeWithHeaderL
1786             + ", uncompressedSize=" + uncompressedSize + ")");
1787       }
1788       if (uncompressedSize != -1) {
1789         throw new IOException("Version 2 block reader API does not need " +
1790             "the uncompressed size parameter");
1791       }
1792 
1793       if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
1794           || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) {
1795         throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL
1796             + ": expected to be at least " + hdrSize
1797             + " and at most " + Integer.MAX_VALUE + ", or -1 (offset="
1798             + offset + ", uncompressedSize=" + uncompressedSize + ")");
1799       }
1800 
1801       int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL;
1802 
1803       HFileBlock b;
1804       if (onDiskSizeWithHeader > 0) {
1805         // We know the total on-disk size but not the uncompressed size. Read
1806         // the entire block into memory, then parse the header and decompress
1807         // from memory if using compression. This code path is used when
1808         // doing a random read operation relying on the block index, as well as
1809         // when the client knows the on-disk size from peeking into the next
1810         // block's header (i.e. this block's header) when reading the previous
1811         // block. This is the faster and more preferable case.
1812 
1813         int onDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize;
1814         assert onDiskSizeWithoutHeader >= 0;
1815 
1816         // See if we can avoid reading the header. This is desirable, because
1817         // we will not incur a seek operation to seek back if we have already
1818         // read this block's header as part of the previous read's look-ahead.
1819         PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
1820         byte[] header = prefetchedHeader.offset == offset
1821             ? prefetchedHeader.header : null;
1822 
1823         // Size that we have to skip in case we have already read the header.
1824         int preReadHeaderSize = header == null ? 0 : hdrSize;
1825 
1826         if (compressAlgo == Compression.Algorithm.NONE) {
1827           // Just read the whole thing. Allocate enough space to read the
1828           // next block's header too.
1829 
1830           ByteBuffer headerAndData = ByteBuffer.allocate(onDiskSizeWithHeader
1831               + hdrSize);
1832           headerAndData.limit(onDiskSizeWithHeader);
1833 
1834           if (header != null) {
1835             System.arraycopy(header, 0, headerAndData.array(), 0,
1836                 hdrSize);
1837           }
1838 
1839           int nextBlockOnDiskSizeWithHeader = readAtOffset(is,
1840               headerAndData.array(), headerAndData.arrayOffset()
1841                   + preReadHeaderSize, onDiskSizeWithHeader
1842                   - preReadHeaderSize, true, offset + preReadHeaderSize,
1843                   pread);
1844 
1845           b = new HFileBlock(headerAndData, getMinorVersion());
1846           b.assumeUncompressed();
1847           b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
1848           b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSizeWithHeader;
1849           if (verifyChecksum &&
1850               !validateBlockChecksum(b, headerAndData.array(), hdrSize)) {
1851             return null;             // checksum mismatch
1852           }
1853           if (b.nextBlockOnDiskSizeWithHeader > 0)
1854             setNextBlockHeader(offset, b);
1855         } else {
1856           // Allocate enough space to fit the next block's header too.
1857           byte[] onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
1858 
1859           int nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
1860               preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
1861               true, offset + preReadHeaderSize, pread);
1862 
1863           if (header == null)
1864             header = onDiskBlock;
1865 
1866           try {
1867             b = new HFileBlock(ByteBuffer.wrap(header, 0, hdrSize), 
1868                                getMinorVersion());
1869           } catch (IOException ex) {
1870             // Seen in load testing. Provide comprehensive debug info.
1871             throw new IOException("Failed to read compressed block at "
1872                 + offset + ", onDiskSizeWithoutHeader=" + onDiskSizeWithHeader
1873                 + ", preReadHeaderSize=" + preReadHeaderSize
1874                 + ", header.length=" + header.length + ", header bytes: "
1875                 + Bytes.toStringBinary(header, 0, hdrSize), ex);
1876           }
1877           b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
1878           b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
1879           if (verifyChecksum && 
1880               !validateBlockChecksum(b, onDiskBlock, hdrSize)) {
1881             return null;             // checksum mismatch
1882           }
1883 
1884           DataInputStream dis = new DataInputStream(new ByteArrayInputStream(
1885               onDiskBlock, hdrSize, onDiskSizeWithoutHeader));
1886 
1887           // This will allocate a new buffer but keep header bytes.
1888           b.allocateBuffer(b.nextBlockOnDiskSizeWithHeader > 0);
1889 
1890           decompress(b.buf.array(), b.buf.arrayOffset() + hdrSize, dis,
1891               b.uncompressedSizeWithoutHeader);
1892 
1893           // Copy next block's header bytes into the new block if we have them.
1894           if (nextBlockOnDiskSize > 0) {
1895             System.arraycopy(onDiskBlock, onDiskSizeWithHeader, b.buf.array(),
1896                 b.buf.arrayOffset() + hdrSize
1897                     + b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(), 
1898                 hdrSize);
1899 
1900             setNextBlockHeader(offset, b);
1901           }
1902         }
1903 
1904       } else {
1905         // We don't know the on-disk size. Read the header first, determine the
1906         // on-disk size from it, and read the remaining data, thereby incurring
1907         // two read operations. This might happen when we are doing the first
1908         // read in a series of reads or a random read, and we don't have access
1909         // to the block index. This is costly and should happen very rarely.
1910 
1911         // Check if we have read this block's header as part of reading the
1912         // previous block. If so, don't read the header again.
1913         PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
1914         ByteBuffer headerBuf = prefetchedHeader.offset == offset ?
1915             prefetchedHeader.buf : null;
1916 
1917         if (headerBuf == null) {
1918           // Unfortunately, we still have to do a separate read operation to
1919           // read the header.
1920           headerBuf = ByteBuffer.allocate(hdrSize);
1921           readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize,
1922               false, offset, pread);
1923         }
1924 
1925         b = new HFileBlock(headerBuf, getMinorVersion());
1926 
1927         // This will also allocate enough room for the next block's header.
1928         b.allocateBuffer(true);
1929 
1930         if (compressAlgo == Compression.Algorithm.NONE) {
1931 
1932           // Avoid creating bounded streams and using a "codec" that does
1933           // nothing.
1934           b.assumeUncompressed();
1935           b.nextBlockOnDiskSizeWithHeader = readAtOffset(is, b.buf.array(),
1936               b.buf.arrayOffset() + hdrSize,
1937               b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(), 
1938               true, offset + hdrSize,
1939               pread);
1940           if (verifyChecksum && 
1941               !validateBlockChecksum(b, b.buf.array(), hdrSize)) {
1942             return null;             // checksum mismatch
1943           }
1944 
1945           if (b.nextBlockOnDiskSizeWithHeader > 0) {
1946             setNextBlockHeader(offset, b);
1947           }
1948         } else {
1949           // Allocate enough space for the block's header and compressed data.
1950           byte[] compressedBytes = new byte[b.getOnDiskSizeWithHeader()
1951               + hdrSize];
1952 
1953           b.nextBlockOnDiskSizeWithHeader = readAtOffset(is, compressedBytes,
1954               hdrSize, b.onDiskSizeWithoutHeader, true, offset
1955                   + hdrSize, pread);
1956           if (verifyChecksum &&
1957               !validateBlockChecksum(b, compressedBytes, hdrSize)) {
1958             return null;             // checksum mismatch
1959           }
1960           DataInputStream dis = new DataInputStream(new ByteArrayInputStream(
1961               compressedBytes, hdrSize, b.onDiskSizeWithoutHeader));
1962 
1963           decompress(b.buf.array(), b.buf.arrayOffset() + hdrSize, dis,
1964               b.uncompressedSizeWithoutHeader);
1965 
1966           if (b.nextBlockOnDiskSizeWithHeader > 0) {
1967             // Copy the next block's header into the new block.
1968             int nextHeaderOffset = b.buf.arrayOffset() + hdrSize
1969                 + b.uncompressedSizeWithoutHeader + b.totalChecksumBytes();
1970             System.arraycopy(compressedBytes,
1971                 compressedBytes.length - hdrSize,
1972                 b.buf.array(),
1973                 nextHeaderOffset,
1974                 hdrSize);
1975 
1976             setNextBlockHeader(offset, b);
1977           }
1978         }
1979       }
1980 
1981       b.includesMemstoreTS = includesMemstoreTS;
1982       b.offset = offset;
1983       return b;
1984     }
1985 
1986     private void setNextBlockHeader(long offset, HFileBlock b) {
1987       PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
1988       prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
1989       int nextHeaderOffset = b.buf.arrayOffset() + hdrSize
1990           + b.uncompressedSizeWithoutHeader + b.totalChecksumBytes();
1991       System.arraycopy(b.buf.array(), nextHeaderOffset,
1992           prefetchedHeader.header, 0, hdrSize);
1993     }
1994 
1995     void setIncludesMemstoreTS(boolean enabled) {
1996       includesMemstoreTS = enabled;
1997     }
1998 
1999     void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
2000       this.dataBlockEncoder = encoder;
2001     }
2002 
2003     /**
2004      * Generates the checksum for the header as well as the data and
2005      * then validates that it matches the value stored in the header.
2006      * If there is a checksum mismatch, then return false. Otherwise
2007      * return true.
2008      */
2009     protected boolean validateBlockChecksum(HFileBlock block, 
2010       byte[] data, int hdrSize) throws IOException {
2011       return ChecksumUtil.validateBlockChecksum(path, block,
2012                                                 data, hdrSize);
2013     }
2014   }
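  /*
   * Illustrative usage sketch (an assumption, not part of the original source):
   * reading a single version 2 block at a known offset with a known on-disk
   * size (e.g. taken from the block index), via the FSReaderV2 convenience
   * constructor documented above as used by unit tests. The stream, algorithm
   * and sizes are placeholders supplied by the caller; exampleReadV2Block is a
   * hypothetical helper name.
   */
  static HFileBlock exampleReadV2Block(FSDataInputStream in, Algorithm algo,
      long fileSize, long blockOffset, long onDiskSizeWithHeader)
      throws IOException {
    FSReader reader = new FSReaderV2(in, algo, fileSize);
    // uncompressedSize must be -1 for the version 2 API; use a positional read.
    return reader.readBlockData(blockOffset, onDiskSizeWithHeader, -1, true);
  }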
2015 
2016   @Override
2017   public int getSerializedLength() {
2018     if (buf != null) {
2019       return this.buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
2020     }
2021     return 0;
2022   }
2023 
2024   @Override
2025   public void serialize(ByteBuffer destination) {
2026     destination.put(this.buf.duplicate());
2027     destination.putLong(this.offset);
2028     destination.putInt(this.nextBlockOnDiskSizeWithHeader);
2029     destination.rewind();
2030   }
2031 
2032   @Override
2033   public CacheableDeserializer<Cacheable> getDeserializer() {
2034     return HFileBlock.blockDeserializer;
2035   }
2036 
2037   @Override
2038   public boolean equals(Object comparison) {
2039     if (this == comparison) {
2040       return true;
2041     }
2042     if (comparison == null) {
2043       return false;
2044     }
2045     if (comparison.getClass() != this.getClass()) {
2046       return false;
2047     }
2048 
2049     HFileBlock castedComparison = (HFileBlock) comparison;
2050 
2051     if (castedComparison.blockType != this.blockType) {
2052       return false;
2053     }
2054     if (castedComparison.nextBlockOnDiskSizeWithHeader != this.nextBlockOnDiskSizeWithHeader) {
2055       return false;
2056     }
2057     if (castedComparison.offset != this.offset) {
2058       return false;
2059     }
2060     if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
2061       return false;
2062     }
2063     if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
2064       return false;
2065     }
2066     if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
2067       return false;
2068     }
2069     if (this.buf.compareTo(castedComparison.buf) != 0) {
2070       return false;
2071     }
2072     if (this.buf.position() != castedComparison.buf.position()){
2073       return false;
2074     }
2075     if (this.buf.limit() != castedComparison.buf.limit()){
2076       return false;
2077     }
2078     return true;
2079   }
2080 
2081   public boolean doesIncludeMemstoreTS() {
2082     return includesMemstoreTS;
2083   }
2084 
2085   public DataBlockEncoding getDataBlockEncoding() {
2086     if (blockType == BlockType.ENCODED_DATA) {
2087       return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
2088     }
2089     return DataBlockEncoding.NONE;
2090   }
2091 
2092   byte getChecksumType() {
2093     return this.checksumType;
2094   }
2095 
2096   int getBytesPerChecksum() {
2097     return this.bytesPerChecksum;
2098   }
2099 
2100   int getOnDiskDataSizeWithHeader() {
2101     return this.onDiskDataSizeWithHeader;
2102   }
2103 
2104   int getMinorVersion() {
2105     return this.minorVersion;
2106   }
2107 
2108   /** 
2109    * Calculate the number of bytes required to store all the checksums
2110    * for this block. Each checksum value is a 4 byte integer.
2111    */
2112   int totalChecksumBytes() {
2113     // If the hfile block has minorVersion 0, then there are no checksum
2114     // data to validate. Similarly, a zero value in this.bytesPerChecksum
2115     // indicates that cached blocks do not have checksum data because
2116     // checksums were already validated when the block was read from disk.
2117     if (minorVersion < MINOR_VERSION_WITH_CHECKSUM || this.bytesPerChecksum == 0) {
2118       return 0;
2119     }
2120     return (int)ChecksumUtil.numBytes(onDiskDataSizeWithHeader, bytesPerChecksum);
2121   }
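  /*
   * Worked example (an illustrative sketch of the arithmetic assumed to be
   * performed by ChecksumUtil.numBytes): the block's on-disk data is covered in
   * chunks of bytesPerChecksum bytes, each protected by a 4-byte checksum. For
   * instance, 33 KB of on-disk data with 16 KB chunks needs ceil(33/16) = 3
   * chunks, i.e. 12 checksum bytes. exampleChecksumBytes is a hypothetical
   * helper name.
   */
  private static int exampleChecksumBytes(int onDiskDataSizeWithHeader,
      int bytesPerChecksum) {
    int chunks =
        (onDiskDataSizeWithHeader + bytesPerChecksum - 1) / bytesPerChecksum;
    return chunks * Bytes.SIZEOF_INT; // 4 bytes of checksum per chunk
  }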
2122 
2123   /**
2124    * Returns the size of this block header.
2125    */
2126   public int headerSize() {
2127     return headerSize(this.minorVersion);
2128   }
2129 
2130   /**
2131    * Maps a minor version to the size of the header.
2132    */
2133   static private int headerSize(int minorVersion) {
2134     if (minorVersion < MINOR_VERSION_WITH_CHECKSUM) {
2135       return HEADER_SIZE_NO_CHECKSUM;
2136     }
2137     return HEADER_SIZE_WITH_CHECKSUMS;
2138   }
2139 
2140   /**
2141    * Return the appropriate dummy header for this block's minor version.
2142    */
2143   public byte[] getDummyHeaderForVersion() {
2144     return getDummyHeaderForVersion(minorVersion);
2145   }
2146 
2147   /**
2148    * Return the appropriate dummy header for the given minor version.
2149    */
2150   static private byte[] getDummyHeaderForVersion(int minorVersion) {
2151     if (minorVersion < MINOR_VERSION_WITH_CHECKSUM) {
2152       return DUMMY_HEADER_NO_CHECKSUM;
2153     }
2154     return DUMMY_HEADER_WITH_CHECKSUM;
2155   }
2156 
2157   /**
2158    * Convert the contents of the block header into a human readable string.
2159    * This is mostly helpful for debugging. This assumes that the block
2160    * has minor version > 0.
2161    */
2162   static String toStringHeader(ByteBuffer buf) throws IOException {
2163     int offset = buf.arrayOffset();
2164     byte[] b = buf.array();
2165     long magic = Bytes.toLong(b, offset); 
2166     BlockType bt = BlockType.read(buf);
2167     offset += Bytes.SIZEOF_LONG;
2168     int compressedBlockSizeNoHeader = Bytes.toInt(b, offset);
2169     offset += Bytes.SIZEOF_INT;
2170     int uncompressedBlockSizeNoHeader = Bytes.toInt(b, offset);
2171     offset += Bytes.SIZEOF_INT;
2172     long prevBlockOffset = Bytes.toLong(b, offset); 
2173     offset += Bytes.SIZEOF_LONG;
2174     byte cksumtype = b[offset];
2175     offset += Bytes.SIZEOF_BYTE;
2176     long bytesPerChecksum = Bytes.toInt(b, offset); 
2177     offset += Bytes.SIZEOF_INT;
2178     long onDiskDataSizeWithHeader = Bytes.toInt(b, offset); 
2179     offset += Bytes.SIZEOF_INT;
2180     return " Header dump: magic: " + magic +
2181                    " blockType " + bt +
2182                    " compressedBlockSizeNoHeader " + 
2183                    compressedBlockSizeNoHeader +
2184                    " uncompressedBlockSizeNoHeader " + 
2185                    uncompressedBlockSizeNoHeader +
2186                    " prevBlockOffset " + prevBlockOffset +
2187                    " checksumType " + ChecksumType.codeToType(cksumtype) +
2188                    " bytesPerChecksum " + bytesPerChecksum +
2189                    " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
2190   }
2191 }
2192