View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.io.hfile;
19  
20  import java.io.DataInput;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.util.ArrayList;
24  import java.util.List;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.classification.InterfaceAudience;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.KeyValue;
33  import org.apache.hadoop.hbase.KeyValue.KVComparator;
34  import org.apache.hadoop.hbase.fs.HFileSystem;
35  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
36  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
37  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
38  import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
39  import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
40  import org.apache.hadoop.hbase.util.ByteBufferUtils;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.hbase.util.IdLock;
43  import org.apache.hadoop.io.WritableUtils;
44  import org.cloudera.htrace.Trace;
45  import org.cloudera.htrace.TraceScope;
46  
47  /**
48   * {@link HFile} reader for version 2.
49   */
50  @InterfaceAudience.Private
51  public class HFileReaderV2 extends AbstractHFileReader {
52  
53    private static final Log LOG = LogFactory.getLog(HFileReaderV2.class);
54  
55    /** Minor versions in HFile V2 starting with this number have hbase checksums */
56    public static final int MINOR_VERSION_WITH_CHECKSUM = 1;
57    /** In HFile V2 minor version that does not support checksums */
58    public static final int MINOR_VERSION_NO_CHECKSUM = 0;
59  
60    /** HFile minor version that introduced pbuf filetrailer */
61    public static final int PBUF_TRAILER_MINOR_VERSION = 2;
62  
63    /**
64     * The size of a (key length, value length) tuple that prefixes each entry in
65     * a data block.
66     */
67    public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
68  
69    protected boolean includesMemstoreTS = false;
70    protected boolean decodeMemstoreTS = false;
71    protected boolean shouldIncludeMemstoreTS() {
72      return includesMemstoreTS;
73    }
74  
75    /** Filesystem-level block reader. */
76    protected HFileBlock.FSReader fsBlockReader;
77  
78    /**
79     * A "sparse lock" implementation allowing to lock on a particular block
80     * identified by offset. The purpose of this is to avoid two clients loading
81     * the same block, and have all but one client wait to get the block from the
82     * cache.
83     */
84    private IdLock offsetLock = new IdLock();
85  
86    /**
87     * Blocks read from the load-on-open section, excluding data root index, meta
88     * index, and file info.
89     */
90    private List<HFileBlock> loadOnOpenBlocks = new ArrayList<HFileBlock>();
91  
92    /** Minimum minor version supported by this HFile format */
93    static final int MIN_MINOR_VERSION = 0;
94  
95    /** Maximum minor version supported by this HFile format */
96    // We went to version 2 when we moved to pb'ing fileinfo and the trailer on
97    // the file. This version can read Writables version 1.
98    static final int MAX_MINOR_VERSION = 3;
99  
100   /** Minor versions starting with this number have faked index key */
101   static final int MINOR_VERSION_WITH_FAKED_KEY = 3;
102 
103   protected HFileContext hfileContext;
104 
105   /**
106    * Opens a HFile. You must load the index before you can use it by calling
107    * {@link #loadFileInfo()}.
108    *
109    * @param path Path to HFile.
110    * @param trailer File trailer.
111    * @param fsdis input stream.
112    * @param size Length of the stream.
113    * @param cacheConf Cache configuration.
114    * @param hfs
115    * @param conf
116    */
117   public HFileReaderV2(Path path, FixedFileTrailer trailer,
118       final FSDataInputStreamWrapper fsdis, final long size, final CacheConfig cacheConf,
119       final HFileSystem hfs, final Configuration conf) throws IOException {
120     super(path, trailer, size, cacheConf, hfs, conf);
121     this.conf = conf;
122     trailer.expectMajorVersion(getMajorVersion());
123     validateMinorVersion(path, trailer.getMinorVersion());
124     this.hfileContext = createHFileContext(fsdis, fileSize, hfs, path, trailer);
125     HFileBlock.FSReaderV2 fsBlockReaderV2 = new HFileBlock.FSReaderV2(fsdis, fileSize, hfs, path,
126         hfileContext);
127     this.fsBlockReader = fsBlockReaderV2; // upcast
128 
129     // Comparator class name is stored in the trailer in version 2.
130     comparator = trailer.createComparator();
131     dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
132         trailer.getNumDataIndexLevels(), this);
133     metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader(
134         KeyValue.RAW_COMPARATOR, 1);
135 
136     // Parse load-on-open data.
137 
138     HFileBlock.BlockIterator blockIter = fsBlockReaderV2.blockRange(
139         trailer.getLoadOnOpenDataOffset(),
140         fileSize - trailer.getTrailerSize());
141 
142     // Data index. We also read statistics about the block index written after
143     // the root level.
144     dataBlockIndexReader.readMultiLevelIndexRoot(
145         blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
146         trailer.getDataIndexCount());
147 
148     // Meta index.
149     metaBlockIndexReader.readRootIndex(
150         blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
151         trailer.getMetaIndexCount());
152 
153     // File info
154     fileInfo = new FileInfo();
155     fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
156     lastKey = fileInfo.get(FileInfo.LASTKEY);
157     avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
158     avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
159     byte [] keyValueFormatVersion =
160         fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
161     includesMemstoreTS = keyValueFormatVersion != null &&
162         Bytes.toInt(keyValueFormatVersion) ==
163             HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE;
164     fsBlockReaderV2.setIncludesMemstoreTS(includesMemstoreTS);
165     if (includesMemstoreTS) {
166       decodeMemstoreTS = Bytes.toLong(fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY)) > 0;
167     }
168 
169     // Read data block encoding algorithm name from file info.
170     dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo);
171     fsBlockReaderV2.setDataBlockEncoder(dataBlockEncoder);
172 
173     // Store all other load-on-open blocks for further consumption.
174     HFileBlock b;
175     while ((b = blockIter.nextBlock()) != null) {
176       loadOnOpenBlocks.add(b);
177     }
178   }
179 
180   protected HFileContext createHFileContext(FSDataInputStreamWrapper fsdis, long fileSize,
181       HFileSystem hfs, Path path, FixedFileTrailer trailer) throws IOException {
182     return new HFileContextBuilder()
183       .withIncludesMvcc(this.includesMemstoreTS)
184       .withCompression(this.compressAlgo)
185       .withHBaseCheckSum(trailer.getMinorVersion() >= MINOR_VERSION_WITH_CHECKSUM)
186       .build();
187   }
188 
189   /**
190    * Create a Scanner on this file. No seeks or reads are done on creation. Call
191    * {@link HFileScanner#seekTo(byte[])} to position an start the read. There is
192    * nothing to clean up in a Scanner. Letting go of your references to the
193    * scanner is sufficient.
194    *
195    * @param cacheBlocks True if we should cache blocks read in by this scanner.
196    * @param pread Use positional read rather than seek+read if true (pread is
197    *          better for random reads, seek+read is better scanning).
198    * @param isCompaction is scanner being used for a compaction?
199    * @return Scanner on this file.
200    */
201    @Override
202    public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
203       final boolean isCompaction) {
204     if (dataBlockEncoder.useEncodedScanner()) {
205       return new EncodedScannerV2(this, cacheBlocks, pread, isCompaction,
206           hfileContext);
207     }
208 
209     return new ScannerV2(this, cacheBlocks, pread, isCompaction);
210   }
211 
212   /**
213    * @param metaBlockName
214    * @param cacheBlock Add block to cache, if found
215    * @return block wrapped in a ByteBuffer, with header skipped
216    * @throws IOException
217    */
218   @Override
219   public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
220       throws IOException {
221     if (trailer.getMetaIndexCount() == 0) {
222       return null; // there are no meta blocks
223     }
224     if (metaBlockIndexReader == null) {
225       throw new IOException("Meta index not loaded");
226     }
227 
228     byte[] mbname = Bytes.toBytes(metaBlockName);
229     int block = metaBlockIndexReader.rootBlockContainingKey(mbname, 0,
230         mbname.length);
231     if (block == -1)
232       return null;
233     long blockSize = metaBlockIndexReader.getRootBlockDataSize(block);
234     long startTimeNs = System.nanoTime();
235 
236     // Per meta key from any given file, synchronize reads for said block. This
237     // is OK to do for meta blocks because the meta block index is always
238     // single-level.
239     synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
240       // Check cache for block. If found return.
241       long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
242       BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset,
243           DataBlockEncoding.NONE, BlockType.META);
244 
245       cacheBlock &= cacheConf.shouldCacheDataOnRead();
246       if (cacheConf.isBlockCacheEnabled()) {
247         HFileBlock cachedBlock =
248           (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock, false);
249         if (cachedBlock != null) {
250           // Return a distinct 'shallow copy' of the block,
251           // so pos does not get messed by the scanner
252           return cachedBlock.getBufferWithoutHeader();
253         }
254         // Cache Miss, please load.
255       }
256 
257       HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
258           blockSize, -1, true);
259 
260       final long delta = System.nanoTime() - startTimeNs;
261       HFile.offerReadLatency(delta, true);
262 
263       // Cache the block
264       if (cacheBlock) {
265         cacheConf.getBlockCache().cacheBlock(cacheKey, metaBlock,
266             cacheConf.isInMemory());
267       }
268 
269       return metaBlock.getBufferWithoutHeader();
270     }
271   }
272 
273   /**
274    * Read in a file block.
275    * @param dataBlockOffset offset to read.
276    * @param onDiskBlockSize size of the block
277    * @param cacheBlock
278    * @param pread Use positional read instead of seek+read (positional is
279    *          better doing random reads whereas seek+read is better scanning).
280    * @param isCompaction is this block being read as part of a compaction
281    * @param expectedBlockType the block type we are expecting to read with this
282    *          read operation, or null to read whatever block type is available
283    *          and avoid checking (that might reduce caching efficiency of
284    *          encoded data blocks)
285    * @return Block wrapped in a ByteBuffer.
286    * @throws IOException
287    */
288   @Override
289   public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
290       final boolean cacheBlock, boolean pread, final boolean isCompaction,
291       BlockType expectedBlockType)
292       throws IOException {
293     if (dataBlockIndexReader == null) {
294       throw new IOException("Block index not loaded");
295     }
296     if (dataBlockOffset < 0
297         || dataBlockOffset >= trailer.getLoadOnOpenDataOffset()) {
298       throw new IOException("Requested block is out of range: "
299           + dataBlockOffset + ", lastDataBlockOffset: "
300           + trailer.getLastDataBlockOffset());
301     }
302     // For any given block from any given file, synchronize reads for said
303     // block.
304     // Without a cache, this synchronizing is needless overhead, but really
305     // the other choice is to duplicate work (which the cache would prevent you
306     // from doing).
307 
308     BlockCacheKey cacheKey =
309         new BlockCacheKey(name, dataBlockOffset,
310             dataBlockEncoder.getDataBlockEncoding(),
311             expectedBlockType);
312 
313     boolean useLock = false;
314     IdLock.Entry lockEntry = null;
315     TraceScope traceScope = Trace.startSpan("HFileReaderV2.readBlock");
316     try {
317       while (true) {
318         if (useLock) {
319           lockEntry = offsetLock.getLockEntry(dataBlockOffset);
320         }
321 
322         // Check cache for block. If found return.
323         if (cacheConf.isBlockCacheEnabled()) {
324           // Try and get the block from the block cache. If the useLock variable is true then this
325           // is the second time through the loop and it should not be counted as a block cache miss.
326           HFileBlock cachedBlock = (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey,
327               cacheBlock, useLock);
328           if (cachedBlock != null) {
329             validateBlockType(cachedBlock, expectedBlockType);
330             if (cachedBlock.getBlockType().isData()) {
331               HFile.dataBlockReadCnt.incrementAndGet();
332 
333               // Validate encoding type for data blocks. We include encoding
334               // type in the cache key, and we expect it to match on a cache hit.
335               if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) {
336                 throw new IOException("Cached block under key " + cacheKey + " "
337                   + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
338                   + dataBlockEncoder.getDataBlockEncoding() + ")");
339               }
340             }
341             return cachedBlock;
342           }
343           // Carry on, please load.
344         }
345         if (!useLock) {
346           // check cache again with lock
347           useLock = true;
348           continue;
349         }
350         if (Trace.isTracing()) {
351           traceScope.getSpan().addTimelineAnnotation("blockCacheMiss");
352         }
353         // Load block from filesystem.
354         long startTimeNs = System.nanoTime();
355         HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1,
356             pread);
357         validateBlockType(hfileBlock, expectedBlockType);
358 
359         final long delta = System.nanoTime() - startTimeNs;
360         HFile.offerReadLatency(delta, pread);
361 
362         // Cache the block if necessary
363         if (cacheBlock && cacheConf.shouldCacheBlockOnRead(hfileBlock.getBlockType().getCategory())) {
364           cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
365         }
366 
367         if (hfileBlock.getBlockType().isData()) {
368           HFile.dataBlockReadCnt.incrementAndGet();
369         }
370 
371         return hfileBlock;
372       }
373     } finally {
374       traceScope.close();
375       if (lockEntry != null) {
376         offsetLock.releaseLockEntry(lockEntry);
377       }
378     }
379   }
380 
381   @Override
382   public boolean hasMVCCInfo() {
383     return includesMemstoreTS && decodeMemstoreTS;
384   }
385 
386   /**
387    * Compares the actual type of a block retrieved from cache or disk with its
388    * expected type and throws an exception in case of a mismatch. Expected
389    * block type of {@link BlockType#DATA} is considered to match the actual
390    * block type [@link {@link BlockType#ENCODED_DATA} as well.
391    * @param block a block retrieved from cache or disk
392    * @param expectedBlockType the expected block type, or null to skip the
393    *          check
394    */
395   private void validateBlockType(HFileBlock block,
396       BlockType expectedBlockType) throws IOException {
397     if (expectedBlockType == null) {
398       return;
399     }
400     BlockType actualBlockType = block.getBlockType();
401     if (actualBlockType == BlockType.ENCODED_DATA &&
402         expectedBlockType == BlockType.DATA) {
403       // We consider DATA to match ENCODED_DATA for the purpose of this
404       // verification.
405       return;
406     }
407     if (actualBlockType != expectedBlockType) {
408       throw new IOException("Expected block type " + expectedBlockType + ", " +
409           "but got " + actualBlockType + ": " + block);
410     }
411   }
412 
413   /**
414    * @return Last key in the file. May be null if file has no entries. Note that
415    *         this is not the last row key, but rather the byte form of the last
416    *         KeyValue.
417    */
418   @Override
419   public byte[] getLastKey() {
420     return dataBlockIndexReader.isEmpty() ? null : lastKey;
421   }
422 
423   /**
424    * @return Midkey for this file. We work with block boundaries only so
425    *         returned midkey is an approximation only.
426    * @throws IOException
427    */
428   @Override
429   public byte[] midkey() throws IOException {
430     return dataBlockIndexReader.midkey();
431   }
432 
433   @Override
434   public void close() throws IOException {
435     close(cacheConf.shouldEvictOnClose());
436   }
437 
438   public void close(boolean evictOnClose) throws IOException {
439     if (evictOnClose && cacheConf.isBlockCacheEnabled()) {
440       int numEvicted = cacheConf.getBlockCache().evictBlocksByHfileName(name);
441       if (LOG.isTraceEnabled()) {
442         LOG.trace("On close, file=" + name + " evicted=" + numEvicted
443           + " block(s)");
444       }
445     }
446     fsBlockReader.closeStreams();
447   }
448 
  /** For testing: exposes the filesystem-level block reader directly. */
  @Override
  HFileBlock.FSReader getUncachedBlockReader() {
    return fsBlockReader;
  }
454 
455 
456   protected abstract static class AbstractScannerV2
457       extends AbstractHFileReader.Scanner {
458     protected HFileBlock block;
459 
460     /**
461      * The next indexed key is to keep track of the indexed key of the next data block.
462      * If the nextIndexedKey is HConstants.NO_NEXT_INDEXED_KEY, it means that the
463      * current data block is the last data block.
464      *
465      * If the nextIndexedKey is null, it means the nextIndexedKey has not been loaded yet.
466      */
467     protected byte[] nextIndexedKey;
468 
469     public AbstractScannerV2(HFileReaderV2 r, boolean cacheBlocks,
470         final boolean pread, final boolean isCompaction) {
471       super(r, cacheBlocks, pread, isCompaction);
472     }
473 
474     /**
475      * An internal API function. Seek to the given key, optionally rewinding to
476      * the first key of the block before doing the seek.
477      *
478      * @param key key byte array
479      * @param offset key offset in the key byte array
480      * @param length key length
481      * @param rewind whether to rewind to the first key of the block before
482      *        doing the seek. If this is false, we are assuming we never go
483      *        back, otherwise the result is undefined.
484      * @return -1 if the key is earlier than the first key of the file,
485      *         0 if we are at the given key, 1 if we are past the given key
486      *         -2 if the key is earlier than the first key of the file while
487      *         using a faked index key
488      * @throws IOException
489      */
490     protected int seekTo(byte[] key, int offset, int length, boolean rewind)
491         throws IOException {
492       HFileBlockIndex.BlockIndexReader indexReader =
493           reader.getDataBlockIndexReader();
494       BlockWithScanInfo blockWithScanInfo =
495         indexReader.loadDataBlockWithScanInfo(key, offset, length, block,
496             cacheBlocks, pread, isCompaction);
497       if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) {
498         // This happens if the key e.g. falls before the beginning of the file.
499         return -1;
500       }
501       return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(),
502           blockWithScanInfo.getNextIndexedKey(), rewind, key, offset, length, false);
503     }
504 
505     protected abstract ByteBuffer getFirstKeyInBlock(HFileBlock curBlock);
506 
507     protected abstract int loadBlockAndSeekToKey(HFileBlock seekToBlock, byte[] nextIndexedKey,
508         boolean rewind, byte[] key, int offset, int length, boolean seekBefore)
509         throws IOException;
510 
511     @Override
512     public int seekTo(byte[] key, int offset, int length) throws IOException {
513       // Always rewind to the first key of the block, because the given key
514       // might be before or after the current key.
515       return seekTo(key, offset, length, true);
516     }
517 
518     @Override
519     public int reseekTo(byte[] key, int offset, int length) throws IOException {
520       int compared;
521       if (isSeeked()) {
522         compared = compareKey(reader.getComparator(), key, offset, length);
523         if (compared < 1) {
524           // If the required key is less than or equal to current key, then
525           // don't do anything.
526           return compared;
527         } else {
528           if (this.nextIndexedKey != null &&
529               (this.nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY ||
530                reader.getComparator().compareFlatKey(key, offset, length,
531                    nextIndexedKey, 0, nextIndexedKey.length) < 0)) {
532             // The reader shall continue to scan the current data block instead of querying the
533             // block index as long as it knows the target key is strictly smaller than
534             // the next indexed key or the current data block is the last data block.
535             return loadBlockAndSeekToKey(this.block, this.nextIndexedKey,
536                 false, key, offset, length, false);
537           }
538         }
539       }
540       // Don't rewind on a reseek operation, because reseek implies that we are
541       // always going forward in the file.
542       return seekTo(key, offset, length, false);
543     }
544 
545     @Override
546     public boolean seekBefore(byte[] key, int offset, int length)
547         throws IOException {
548       HFileBlock seekToBlock =
549           reader.getDataBlockIndexReader().seekToDataBlock(key, offset, length,
550               block, cacheBlocks, pread, isCompaction);
551       if (seekToBlock == null) {
552         return false;
553       }
554       ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
555 
556       if (reader.getComparator().compareFlatKey(firstKey.array(),
557           firstKey.arrayOffset(), firstKey.limit(), key, offset, length) >= 0)
558       {
559         long previousBlockOffset = seekToBlock.getPrevBlockOffset();
560         // The key we are interested in
561         if (previousBlockOffset == -1) {
562           // we have a 'problem', the key we want is the first of the file.
563           return false;
564         }
565 
566         // It is important that we compute and pass onDiskSize to the block
567         // reader so that it does not have to read the header separately to
568         // figure out the size.
569         seekToBlock = reader.readBlock(previousBlockOffset,
570             seekToBlock.getOffset() - previousBlockOffset, cacheBlocks,
571             pread, isCompaction, BlockType.DATA);
572         // TODO shortcut: seek forward in this block to the last key of the
573         // block.
574       }
575       byte[] firstKeyInCurrentBlock = Bytes.getBytes(firstKey);
576       loadBlockAndSeekToKey(seekToBlock, firstKeyInCurrentBlock, true, key, offset, length, true);
577       return true;
578     }
579 
580 
581     /**
582      * Scans blocks in the "scanned" section of the {@link HFile} until the next
583      * data block is found.
584      *
585      * @return the next block, or null if there are no more data blocks
586      * @throws IOException
587      */
588     protected HFileBlock readNextDataBlock() throws IOException {
589       long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
590       if (block == null)
591         return null;
592 
593       HFileBlock curBlock = block;
594 
595       do {
596         if (curBlock.getOffset() >= lastDataBlockOffset)
597           return null;
598 
599         if (curBlock.getOffset() < 0) {
600           throw new IOException("Invalid block file offset: " + block);
601         }
602 
603         // We are reading the next block without block type validation, because
604         // it might turn out to be a non-data block.
605         curBlock = reader.readBlock(curBlock.getOffset()
606             + curBlock.getOnDiskSizeWithHeader(),
607             curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
608             isCompaction, null);
609       } while (!curBlock.getBlockType().isData());
610 
611       return curBlock;
612     }
613     /**
614      * Compare the given key against the current key
615      * @param comparator
616      * @param key
617      * @param offset
618      * @param length
619      * @return -1 is the passed key is smaller than the current key, 0 if equal and 1 if greater
620      */
621     public abstract int compareKey(KVComparator comparator, byte[] key, int offset,
622         int length);
623   }
624 
625   /**
626    * Implementation of {@link HFileScanner} interface.
627    */
628   protected static class ScannerV2 extends AbstractScannerV2 {
629     private HFileReaderV2 reader;
630 
    /**
     * @param r reader this scanner reads from; also kept in a typed field so
     *          version-2-specific methods are reachable without casting
     * @param cacheBlocks whether blocks read by this scanner should be cached
     * @param pread use positional read instead of seek+read
     * @param isCompaction whether this scanner serves a compaction
     */
    public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
        final boolean pread, final boolean isCompaction) {
      super(r, cacheBlocks, pread, isCompaction);
      this.reader = r;
    }
636 
    /**
     * @return the current cell as a KeyValue view over the block's backing
     *         array (no copy is made), or null if the scanner is not seeked
     */
    @Override
    public KeyValue getKeyValue() {
      if (!isSeeked())
        return null;

      KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
          + blockBuffer.position(), getCellBufSize());
      // Attach the MVCC version read alongside the cell, when the file has it.
      if (this.reader.shouldIncludeMemstoreTS()) {
        ret.setMvccVersion(currMemstoreTS);
      }
      return ret;
    }
649 
650     protected int getCellBufSize() {
651       return KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen;
652     }
653 
    /**
     * @return a sliced ByteBuffer over the current key bytes, positioned just
     *         past the (key length, value length) prefix; shares the block's
     *         backing array
     */
    @Override
    public ByteBuffer getKey() {
      assertSeeked();
      return ByteBuffer.wrap(
          blockBuffer.array(),
          blockBuffer.arrayOffset() + blockBuffer.position()
              + KEY_VALUE_LEN_SIZE, currKeyLen).slice();
    }
662 
663     @Override
664     public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
665       return comparator.compareFlatKey(key, offset, length, blockBuffer.array(),
666           blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen);
667     }
668 
    /**
     * @return a sliced ByteBuffer over the current value bytes, located after
     *         the length prefix and the key; shares the block's backing array
     */
    @Override
    public ByteBuffer getValue() {
      assertSeeked();
      return ByteBuffer.wrap(
          blockBuffer.array(),
          blockBuffer.arrayOffset() + blockBuffer.position()
              + KEY_VALUE_LEN_SIZE + currKeyLen, currValueLen).slice();
    }
677 
678     protected void setNonSeekedState() {
679       block = null;
680       blockBuffer = null;
681       currKeyLen = 0;
682       currValueLen = 0;
683       currMemstoreTS = 0;
684       currMemstoreTSLen = 0;
685     }
686 
687     /**
688      * Go to the next key/value in the block section. Loads the next block if
689      * necessary. If successful, {@link #getKey()} and {@link #getValue()} can
690      * be called.
691      *
692      * @return true if successfully navigated to the next key/value
693      */
694     @Override
695     public boolean next() throws IOException {
696       assertSeeked();
697 
698       try {
699         blockBuffer.position(getNextCellStartPosition());
700       } catch (IllegalArgumentException e) {
701         LOG.error("Current pos = " + blockBuffer.position()
702             + "; currKeyLen = " + currKeyLen + "; currValLen = "
703             + currValueLen + "; block limit = " + blockBuffer.limit()
704             + "; HFile name = " + reader.getName()
705             + "; currBlock currBlockOffset = " + block.getOffset());
706         throw e;
707       }
708 
709       if (blockBuffer.remaining() <= 0) {
710         long lastDataBlockOffset =
711             reader.getTrailer().getLastDataBlockOffset();
712 
713         if (block.getOffset() >= lastDataBlockOffset) {
714           setNonSeekedState();
715           return false;
716         }
717 
718         // read the next block
719         HFileBlock nextBlock = readNextDataBlock();
720         if (nextBlock == null) {
721           setNonSeekedState();
722           return false;
723         }
724 
725         updateCurrBlock(nextBlock);
726         return true;
727       }
728 
729       // We are still in the same block.
730       readKeyValueLen();
731       return true;
732     }
733 
734     protected int getNextCellStartPosition() {
735       return blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen
736           + currMemstoreTSLen;
737     }
738 
739     /**
740      * Positions this scanner at the start of the file.
741      *
742      * @return false if empty file; i.e. a call to next would return false and
743      *         the current key and value are undefined.
744      * @throws IOException
745      */
746     @Override
747     public boolean seekTo() throws IOException {
748       if (reader == null) {
749         return false;
750       }
751 
752       if (reader.getTrailer().getEntryCount() == 0) {
753         // No data blocks.
754         return false;
755       }
756 
757       long firstDataBlockOffset =
758           reader.getTrailer().getFirstDataBlockOffset();
759       if (block != null && block.getOffset() == firstDataBlockOffset) {
760         blockBuffer.rewind();
761         readKeyValueLen();
762         return true;
763       }
764 
765       block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
766           isCompaction, BlockType.DATA);
767       if (block.getOffset() < 0) {
768         throw new IOException("Invalid block offset: " + block.getOffset());
769       }
770       updateCurrBlock(block);
771       return true;
772     }
773 
774     @Override
775     protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, byte[] nextIndexedKey,
776         boolean rewind, byte[] key, int offset, int length, boolean seekBefore)
777         throws IOException {
778       if (block == null || block.getOffset() != seekToBlock.getOffset()) {
779         updateCurrBlock(seekToBlock);
780       } else if (rewind) {
781         blockBuffer.rewind();
782       }
783 
784       // Update the nextIndexedKey
785       this.nextIndexedKey = nextIndexedKey;
786       return blockSeek(key, offset, length, seekBefore);
787     }
788 
789     /**
790      * Updates the current block to be the given {@link HFileBlock}. Seeks to
791      * the the first key/value pair.
792      *
793      * @param newBlock the block to make current
794      */
795     protected void updateCurrBlock(HFileBlock newBlock) {
796       block = newBlock;
797 
798       // sanity check
799       if (block.getBlockType() != BlockType.DATA) {
800         throw new IllegalStateException("ScannerV2 works only on data " +
801             "blocks, got " + block.getBlockType() + "; " +
802             "fileName=" + reader.name + ", " +
803             "dataBlockEncoder=" + reader.dataBlockEncoder + ", " +
804             "isCompaction=" + isCompaction);
805       }
806 
807       blockBuffer = block.getBufferWithoutHeader();
808       readKeyValueLen();
809       blockFetches++;
810 
811       // Reset the next indexed key
812       this.nextIndexedKey = null;
813     }
814 
815     protected void readKeyValueLen() {
816       blockBuffer.mark();
817       currKeyLen = blockBuffer.getInt();
818       currValueLen = blockBuffer.getInt();
819       ByteBufferUtils.skip(blockBuffer, currKeyLen + currValueLen);
820       readMvccVersion();
821       if (currKeyLen < 0 || currValueLen < 0
822           || currKeyLen > blockBuffer.limit()
823           || currValueLen > blockBuffer.limit()) {
824         throw new IllegalStateException("Invalid currKeyLen " + currKeyLen
825             + " or currValueLen " + currValueLen + ". Block offset: "
826             + block.getOffset() + ", block length: " + blockBuffer.limit()
827             + ", position: " + blockBuffer.position() + " (without header).");
828       }
829       blockBuffer.reset();
830     }
831 
832     protected void readMvccVersion() {
833       if (this.reader.shouldIncludeMemstoreTS()) {
834         if (this.reader.decodeMemstoreTS) {
835           try {
836             currMemstoreTS = Bytes.readVLong(blockBuffer.array(), blockBuffer.arrayOffset()
837                 + blockBuffer.position());
838             currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS);
839           } catch (Exception e) {
840             throw new RuntimeException("Error reading memstore timestamp", e);
841           }
842         } else {
843           currMemstoreTS = 0;
844           currMemstoreTSLen = 1;
845         }
846       }
847     }
848 
849     /**
850      * Within a loaded block, seek looking for the last key that is smaller
851      * than (or equal to?) the key we are interested in.
852      *
853      * A note on the seekBefore: if you have seekBefore = true, AND the first
854      * key in the block = key, then you'll get thrown exceptions. The caller has
855      * to check for that case and load the previous block as appropriate.
856      *
857      * @param key the key to find
858      * @param seekBefore find the key before the given key in case of exact
859      *          match.
860      * @return 0 in case of an exact key match, 1 in case of an inexact match,
861      *         -2 in case of an inexact match and furthermore, the input key less
862      *         than the first key of current block(e.g. using a faked index key)
863      */
864     protected int blockSeek(byte[] key, int offset, int length,
865         boolean seekBefore) {
866       int klen, vlen;
867       long memstoreTS = 0;
868       int memstoreTSLen = 0;
869       int lastKeyValueSize = -1;
870       do {
871         blockBuffer.mark();
872         klen = blockBuffer.getInt();
873         vlen = blockBuffer.getInt();
874         blockBuffer.reset();
875         if (this.reader.shouldIncludeMemstoreTS()) {
876           if (this.reader.decodeMemstoreTS) {
877             try {
878               int memstoreTSOffset = blockBuffer.arrayOffset()
879                   + blockBuffer.position() + KEY_VALUE_LEN_SIZE + klen + vlen;
880               memstoreTS = Bytes.readVLong(blockBuffer.array(),
881                   memstoreTSOffset);
882               memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
883             } catch (Exception e) {
884               throw new RuntimeException("Error reading memstore timestamp", e);
885             }
886           } else {
887             memstoreTS = 0;
888             memstoreTSLen = 1;
889           }
890         }
891 
892         int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position()
893             + KEY_VALUE_LEN_SIZE;
894         int comp = reader.getComparator().compareFlatKey(key, offset, length,
895             blockBuffer.array(), keyOffset, klen);
896 
897         if (comp == 0) {
898           if (seekBefore) {
899             if (lastKeyValueSize < 0) {
900               throw new IllegalStateException("blockSeek with seekBefore "
901                   + "at the first key of the block: key="
902                   + Bytes.toStringBinary(key) + ", blockOffset="
903                   + block.getOffset() + ", onDiskSize="
904                   + block.getOnDiskSizeWithHeader());
905             }
906             blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
907             readKeyValueLen();
908             return 1; // non exact match.
909           }
910           currKeyLen = klen;
911           currValueLen = vlen;
912           if (this.reader.shouldIncludeMemstoreTS()) {
913             currMemstoreTS = memstoreTS;
914             currMemstoreTSLen = memstoreTSLen;
915           }
916           return 0; // indicate exact match
917         } else if (comp < 0) {
918           if (lastKeyValueSize > 0)
919             blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
920           readKeyValueLen();
921           if (lastKeyValueSize == -1 && blockBuffer.position() == 0
922               && this.reader.trailer.getMinorVersion() >= MINOR_VERSION_WITH_FAKED_KEY) {
923             return HConstants.INDEX_KEY_MAGIC;
924           }
925           return 1;
926         }
927 
928         // The size of this key/value tuple, including key/value length fields.
929         lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE;
930         blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
931       } while (blockBuffer.remaining() > 0);
932 
933       // Seek to the last key we successfully read. This will happen if this is
934       // the last key/value pair in the file, in which case the following call
935       // to next() has to return false.
936       blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
937       readKeyValueLen();
938       return 1; // didn't exactly find it.
939     }
940 
941     @Override
942     protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
943       ByteBuffer buffer = curBlock.getBufferWithoutHeader();
944       // It is safe to manipulate this buffer because we own the buffer object.
945       buffer.rewind();
946       int klen = buffer.getInt();
947       buffer.getInt();
948       ByteBuffer keyBuff = buffer.slice();
949       keyBuff.limit(klen);
950       keyBuff.rewind();
951       return keyBuff;
952     }
953 
954     @Override
955     public String getKeyString() {
956       return Bytes.toStringBinary(blockBuffer.array(),
957           blockBuffer.arrayOffset() + blockBuffer.position()
958               + KEY_VALUE_LEN_SIZE, currKeyLen);
959     }
960 
961     @Override
962     public String getValueString() {
963       return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
964           + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
965           currValueLen);
966     }
967   }
968 
969   /**
970    * ScannerV2 that operates on encoded data blocks.
971    */
972   protected static class EncodedScannerV2 extends AbstractScannerV2 {
973     private final HFileBlockDecodingContext decodingCtx;
974     private final DataBlockEncoder.EncodedSeeker seeker;
975     private final DataBlockEncoder dataBlockEncoder;
976     protected final HFileContext meta;
977 
978     public EncodedScannerV2(HFileReaderV2 reader, boolean cacheBlocks,
979         boolean pread, boolean isCompaction, HFileContext meta) {
980       super(reader, cacheBlocks, pread, isCompaction);
981       DataBlockEncoding encoding = reader.dataBlockEncoder.getDataBlockEncoding();
982       dataBlockEncoder = encoding.getEncoder();
983       decodingCtx = dataBlockEncoder.newDataBlockDecodingContext(meta);
984       seeker = dataBlockEncoder.createSeeker(
985         reader.getComparator(), decodingCtx);
986       this.meta = meta;
987     }
988 
989     @Override
990     public boolean isSeeked(){
991       return this.block != null;
992     }
993 
994     /**
995      * Updates the current block to be the given {@link HFileBlock}. Seeks to
996      * the the first key/value pair.
997      *
998      * @param newBlock the block to make current
999      * @throws CorruptHFileException
1000      */
1001     private void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException {
1002       block = newBlock;
1003 
1004       // sanity checks
1005       if (block.getBlockType() != BlockType.ENCODED_DATA) {
1006         throw new IllegalStateException(
1007             "EncodedScanner works only on encoded data blocks");
1008       }
1009       short dataBlockEncoderId = block.getDataBlockEncodingId();
1010       if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) {
1011         String encoderCls = dataBlockEncoder.getClass().getName();
1012         throw new CorruptHFileException("Encoder " + encoderCls
1013           + " doesn't support data block encoding "
1014           + DataBlockEncoding.getNameFromId(dataBlockEncoderId));
1015       }
1016 
1017       seeker.setCurrentBuffer(getEncodedBuffer(newBlock));
1018       blockFetches++;
1019 
1020       // Reset the next indexed key
1021       this.nextIndexedKey = null;
1022     }
1023 
1024     private ByteBuffer getEncodedBuffer(HFileBlock newBlock) {
1025       ByteBuffer origBlock = newBlock.getBufferReadOnly();
1026       ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(),
1027           origBlock.arrayOffset() + newBlock.headerSize() +
1028           DataBlockEncoding.ID_SIZE,
1029           newBlock.getUncompressedSizeWithoutHeader() -
1030           DataBlockEncoding.ID_SIZE).slice();
1031       return encodedBlock;
1032     }
1033 
1034     @Override
1035     public boolean seekTo() throws IOException {
1036       if (reader == null) {
1037         return false;
1038       }
1039 
1040       if (reader.getTrailer().getEntryCount() == 0) {
1041         // No data blocks.
1042         return false;
1043       }
1044 
1045       long firstDataBlockOffset =
1046           reader.getTrailer().getFirstDataBlockOffset();
1047       if (block != null && block.getOffset() == firstDataBlockOffset) {
1048         seeker.rewind();
1049         return true;
1050       }
1051 
1052       block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
1053           isCompaction, BlockType.DATA);
1054       if (block.getOffset() < 0) {
1055         throw new IOException("Invalid block offset: " + block.getOffset());
1056       }
1057       updateCurrentBlock(block);
1058       return true;
1059     }
1060 
1061     @Override
1062     public boolean next() throws IOException {
1063       boolean isValid = seeker.next();
1064       if (!isValid) {
1065         block = readNextDataBlock();
1066         isValid = block != null;
1067         if (isValid) {
1068           updateCurrentBlock(block);
1069         }
1070       }
1071       return isValid;
1072     }
1073 
1074     @Override
1075     public ByteBuffer getKey() {
1076       assertValidSeek();
1077       return seeker.getKeyDeepCopy();
1078     }
1079 
1080     @Override
1081     public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
1082       return seeker.compareKey(comparator, key, offset, length);
1083     }
1084 
1085     @Override
1086     public ByteBuffer getValue() {
1087       assertValidSeek();
1088       return seeker.getValueShallowCopy();
1089     }
1090 
1091     @Override
1092     public KeyValue getKeyValue() {
1093       if (block == null) {
1094         return null;
1095       }
1096       return seeker.getKeyValue();
1097     }
1098 
1099     @Override
1100     public String getKeyString() {
1101       ByteBuffer keyBuffer = getKey();
1102       return Bytes.toStringBinary(keyBuffer.array(),
1103           keyBuffer.arrayOffset(), keyBuffer.limit());
1104     }
1105 
1106     @Override
1107     public String getValueString() {
1108       ByteBuffer valueBuffer = getValue();
1109       return Bytes.toStringBinary(valueBuffer.array(),
1110           valueBuffer.arrayOffset(), valueBuffer.limit());
1111     }
1112 
1113     private void assertValidSeek() {
1114       if (block == null) {
1115         throw new NotSeekedException();
1116       }
1117     }
1118 
1119     @Override
1120     protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
1121       return dataBlockEncoder.getFirstKeyInBlock(getEncodedBuffer(curBlock));
1122     }
1123 
1124     @Override
1125     protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, byte[] nextIndexedKey,
1126         boolean rewind, byte[] key, int offset, int length, boolean seekBefore)
1127         throws IOException  {
1128       if (block == null || block.getOffset() != seekToBlock.getOffset()) {
1129         updateCurrentBlock(seekToBlock);
1130       } else if (rewind) {
1131         seeker.rewind();
1132       }
1133       this.nextIndexedKey = nextIndexedKey;
1134       return seeker.seekToKeyInBlock(key, offset, length, seekBefore);
1135     }
1136   }
1137 
1138   /**
1139    * Returns a buffer with the Bloom filter metadata. The caller takes
1140    * ownership of the buffer.
1141    */
1142   @Override
1143   public DataInput getGeneralBloomFilterMetadata() throws IOException {
1144     return this.getBloomFilterMetadata(BlockType.GENERAL_BLOOM_META);
1145   }
1146 
1147   @Override
1148   public DataInput getDeleteBloomFilterMetadata() throws IOException {
1149     return this.getBloomFilterMetadata(BlockType.DELETE_FAMILY_BLOOM_META);
1150   }
1151 
1152   private DataInput getBloomFilterMetadata(BlockType blockType)
1153   throws IOException {
1154     if (blockType != BlockType.GENERAL_BLOOM_META &&
1155         blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
1156       throw new RuntimeException("Block Type: " + blockType.toString() +
1157           " is not supported") ;
1158     }
1159 
1160     for (HFileBlock b : loadOnOpenBlocks)
1161       if (b.getBlockType() == blockType)
1162         return b.getByteStream();
1163     return null;
1164   }
1165 
1166   @Override
1167   public boolean isFileInfoLoaded() {
1168     return true; // We load file info in constructor in version 2.
1169   }
1170 
1171   /**
1172    * Validates that the minor version is within acceptable limits.
1173    * Otherwise throws an Runtime exception
1174    */
1175   private void validateMinorVersion(Path path, int minorVersion) {
1176     if (minorVersion < MIN_MINOR_VERSION ||
1177         minorVersion > MAX_MINOR_VERSION) {
1178       String msg = "Minor version for path " + path + 
1179                    " is expected to be between " +
1180                    MIN_MINOR_VERSION + " and " + MAX_MINOR_VERSION +
1181                    " but is found to be " + minorVersion;
1182       LOG.error(msg);
1183       throw new RuntimeException(msg);
1184     }
1185   }
1186 
1187   @Override
1188   public int getMajorVersion() {
1189     return 2;
1190   }
1191 
1192   @Override
1193   public HFileContext getFileContext() {
1194     return hfileContext;
1195   }
1196 }