1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.io.DataInput;
22  import java.io.IOException;
23  import java.nio.ByteBuffer;
24  import java.util.ArrayList;
25  import java.util.List;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.classification.InterfaceAudience;
30  import org.apache.hadoop.fs.FSDataInputStream;
31  import org.apache.hadoop.fs.Path;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.KeyValue;
34  import org.apache.hadoop.hbase.fs.HFileSystem;
35  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
36  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
37  import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
38  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
39  import org.apache.hadoop.hbase.util.Bytes;
40  import org.apache.hadoop.hbase.util.IdLock;
41  import org.apache.hadoop.io.WritableUtils;
42  
43  /**
44   * {@link HFile} reader for version 2.
45   */
46  @InterfaceAudience.Private
47  public class HFileReaderV2 extends AbstractHFileReader {
48  
49    private static final Log LOG = LogFactory.getLog(HFileReaderV2.class);
50  
51    /**
52     * The size of a (key length, value length) tuple that prefixes each entry in
53     * a data block.
54     */
55    private static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
56  
57    private boolean includesMemstoreTS = false;
58    private boolean decodeMemstoreTS = false;
59  
60    private boolean shouldIncludeMemstoreTS() {
61      return includesMemstoreTS;
62    }
63  
64    /** Filesystem-level block reader. */
65    private HFileBlock.FSReader fsBlockReader;
66  
67    /**
68     * A "sparse lock" implementation allowing to lock on a particular block
69     * identified by offset. The purpose of this is to avoid two clients loading
70     * the same block, and have all but one client wait to get the block from the
71     * cache.
72     */
73    private IdLock offsetLock = new IdLock();
74  
75    /**
76     * Blocks read from the load-on-open section, excluding data root index, meta
77     * index, and file info.
78     */
79    private List<HFileBlock> loadOnOpenBlocks = new ArrayList<HFileBlock>();
80  
81    /** Minimum minor version supported by this HFile format */
82    static final int MIN_MINOR_VERSION = 0;
83  
84    /** Maximum minor version supported by this HFile format */
85    // We went to version 2 when we moved to pb'ing fileinfo and the trailer on
86    // the file. This version can read Writables version 1.
87    static final int MAX_MINOR_VERSION = 3;
88  
89    /** Minor versions starting with this number have faked index key */
90    static final int MINOR_VERSION_WITH_FAKED_KEY = 3;
91  
92    /**
93     * Opens a HFile. You must load the index before you can use it by calling
94     * {@link #loadFileInfo()}.
95     *
96     * @param path Path to HFile.
97     * @param trailer File trailer.
98     * @param fsdis input stream.
99     * @param size Length of the stream.
100    * @param cacheConf Cache configuration.
101    * @param preferredEncodingInCache the encoding to use in cache in case we
102    *          have a choice. If the file is already encoded on disk, we will
103    *          still use its on-disk encoding in cache.
104    */
105   public HFileReaderV2(Path path, FixedFileTrailer trailer,
106       final FSDataInputStreamWrapper fsdis, final long size, final CacheConfig cacheConf,
107       DataBlockEncoding preferredEncodingInCache, final HFileSystem hfs)
108       throws IOException {
109     super(path, trailer, size, cacheConf, hfs);
110     trailer.expectMajorVersion(2);
111     validateMinorVersion(path, trailer.getMinorVersion());
112     HFileBlock.FSReaderV2 fsBlockReaderV2 = new HFileBlock.FSReaderV2(fsdis,
113         compressAlgo, fileSize, trailer.getMinorVersion(), hfs, path);
114     this.fsBlockReader = fsBlockReaderV2; // upcast
115 
116     // Comparator class name is stored in the trailer in version 2.
117     comparator = trailer.createComparator();
118     dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
119         trailer.getNumDataIndexLevels(), this);
120     metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader(
121         Bytes.BYTES_RAWCOMPARATOR, 1);
122 
123     // Parse load-on-open data.
124 
125     HFileBlock.BlockIterator blockIter = fsBlockReaderV2.blockRange(
126         trailer.getLoadOnOpenDataOffset(),
127         fileSize - trailer.getTrailerSize());
128 
129     // Data index. We also read statistics about the block index written after
130     // the root level.
131     dataBlockIndexReader.readMultiLevelIndexRoot(
132         blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
133         trailer.getDataIndexCount());
134 
135     // Meta index.
136     metaBlockIndexReader.readRootIndex(
137         blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
138         trailer.getMetaIndexCount());
139 
140     // File info
141     fileInfo = new FileInfo();
142     fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
143     lastKey = fileInfo.get(FileInfo.LASTKEY);
144     avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
145     avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
146     byte [] keyValueFormatVersion =
147         fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
148     includesMemstoreTS = keyValueFormatVersion != null &&
149         Bytes.toInt(keyValueFormatVersion) ==
150             HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE;
151     fsBlockReaderV2.setIncludesMemstoreTS(includesMemstoreTS);
152     if (includesMemstoreTS) {
153       decodeMemstoreTS = Bytes.toLong(fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY)) > 0;
154     }
155 
156     // Read data block encoding algorithm name from file info.
157     dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo,
158         preferredEncodingInCache);
159     fsBlockReaderV2.setDataBlockEncoder(dataBlockEncoder);
160 
161     // Store all other load-on-open blocks for further consumption.
162     HFileBlock b;
163     while ((b = blockIter.nextBlock()) != null) {
164       loadOnOpenBlocks.add(b);
165     }
166   }
167 
168   /**
169    * Create a Scanner on this file. No seeks or reads are done on creation. Call
170    * {@link HFileScanner#seekTo(byte[])} to position an start the read. There is
171    * nothing to clean up in a Scanner. Letting go of your references to the
172    * scanner is sufficient.
173    *
174    * @param cacheBlocks True if we should cache blocks read in by this scanner.
175    * @param pread Use positional read rather than seek+read if true (pread is
176    *          better for random reads, seek+read is better scanning).
177    * @param isCompaction is scanner being used for a compaction?
178    * @return Scanner on this file.
179    */
180    @Override
181    public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
182       final boolean isCompaction) {
183     // check if we want to use data block encoding in memory
184     if (dataBlockEncoder.useEncodedScanner(isCompaction)) {
185       return new EncodedScannerV2(this, cacheBlocks, pread, isCompaction,
186           includesMemstoreTS);
187     }
188 
189     return new ScannerV2(this, cacheBlocks, pread, isCompaction);
190   }
191 
192   /**
193    * @param metaBlockName
194    * @param cacheBlock Add block to cache, if found
195    * @return block wrapped in a ByteBuffer, with header skipped
196    * @throws IOException
197    */
198   @Override
199   public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
200       throws IOException {
201     if (trailer.getMetaIndexCount() == 0) {
202       return null; // there are no meta blocks
203     }
204     if (metaBlockIndexReader == null) {
205       throw new IOException("Meta index not loaded");
206     }
207 
208     byte[] mbname = Bytes.toBytes(metaBlockName);
209     int block = metaBlockIndexReader.rootBlockContainingKey(mbname, 0,
210         mbname.length);
211     if (block == -1)
212       return null;
213     long blockSize = metaBlockIndexReader.getRootBlockDataSize(block);
214     long startTimeNs = System.nanoTime();
215 
216     // Per meta key from any given file, synchronize reads for said block. This
217     // is OK to do for meta blocks because the meta block index is always
218     // single-level.
219     synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
220       // Check cache for block. If found return.
221       long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
222       BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset,
223           DataBlockEncoding.NONE, BlockType.META);
224 
225       cacheBlock &= cacheConf.shouldCacheDataOnRead();
226       if (cacheConf.isBlockCacheEnabled()) {
227         HFileBlock cachedBlock =
228           (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock, false);
229         if (cachedBlock != null) {
230           // Return a distinct 'shallow copy' of the block,
231           // so pos does not get messed by the scanner
232           return cachedBlock.getBufferWithoutHeader();
233         }
234         // Cache Miss, please load.
235       }
236 
237       HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
238           blockSize, -1, true);
239 
240       final long delta = System.nanoTime() - startTimeNs;
241       HFile.offerReadLatency(delta, true);
242 
243       // Cache the block
244       if (cacheBlock) {
245         cacheConf.getBlockCache().cacheBlock(cacheKey, metaBlock,
246             cacheConf.isInMemory());
247       }
248 
249       return metaBlock.getBufferWithoutHeader();
250     }
251   }
252 
253   /**
254    * Read in a file block.
255    * @param dataBlockOffset offset to read.
256    * @param onDiskBlockSize size of the block
257    * @param cacheBlock
258    * @param pread Use positional read instead of seek+read (positional is
259    *          better doing random reads whereas seek+read is better scanning).
260    * @param isCompaction is this block being read as part of a compaction
261    * @param expectedBlockType the block type we are expecting to read with this
262    *          read operation, or null to read whatever block type is available
263    *          and avoid checking (that might reduce caching efficiency of
264    *          encoded data blocks)
265    * @return Block wrapped in a ByteBuffer.
266    * @throws IOException
267    */
268   @Override
269   public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
270       final boolean cacheBlock, boolean pread, final boolean isCompaction,
271       BlockType expectedBlockType)
272       throws IOException {
273     if (dataBlockIndexReader == null) {
274       throw new IOException("Block index not loaded");
275     }
276     if (dataBlockOffset < 0
277         || dataBlockOffset >= trailer.getLoadOnOpenDataOffset()) {
278       throw new IOException("Requested block is out of range: "
279           + dataBlockOffset + ", lastDataBlockOffset: "
280           + trailer.getLastDataBlockOffset());
281     }
282     // For any given block from any given file, synchronize reads for said
283     // block.
284     // Without a cache, this synchronizing is needless overhead, but really
285     // the other choice is to duplicate work (which the cache would prevent you
286     // from doing).
287 
288     BlockCacheKey cacheKey =
289         new BlockCacheKey(name, dataBlockOffset,
290             dataBlockEncoder.getEffectiveEncodingInCache(isCompaction),
291             expectedBlockType);
292 
293     boolean useLock = false;
294     IdLock.Entry lockEntry = null;
295     try {
296       while (true) {
297 
298         if (useLock) {
299           lockEntry = offsetLock.getLockEntry(dataBlockOffset);
300         }
301 
302         // Check cache for block. If found return.
303         if (cacheConf.isBlockCacheEnabled()) {
304           // Try and get the block from the block cache. If the useLock variable is true then this
305           // is the second time through the loop and it should not be counted as a block cache miss.
306           HFileBlock cachedBlock = (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey,
307               cacheBlock, useLock);
308           if (cachedBlock != null) {
309             if (cachedBlock.getBlockType() == BlockType.DATA) {
310               HFile.dataBlockReadCnt.incrementAndGet();
311             }
312 
313             validateBlockType(cachedBlock, expectedBlockType);
314 
315             // Validate encoding type for encoded blocks. We include encoding
316             // type in the cache key, and we expect it to match on a cache hit.
317             if (cachedBlock.getBlockType() == BlockType.ENCODED_DATA
318                 && cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getEncodingInCache()) {
319               throw new IOException("Cached block under key " + cacheKey + " "
320                   + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
321                   + dataBlockEncoder.getEncodingInCache() + ")");
322             }
323             return cachedBlock;
324           }
325           // Carry on, please load.
326         }
327         if (!useLock) {
328           // check cache again with lock
329           useLock = true;
330           continue;
331         }
332 
333         // Load block from filesystem.
334         long startTimeNs = System.nanoTime();
335         HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1,
336             pread);
337         hfileBlock = dataBlockEncoder.diskToCacheFormat(hfileBlock, isCompaction);
338         validateBlockType(hfileBlock, expectedBlockType);
339 
340         final long delta = System.nanoTime() - startTimeNs;
341         HFile.offerReadLatency(delta, pread);
342 
343         // Cache the block if necessary
344         if (cacheBlock && cacheConf.shouldCacheBlockOnRead(hfileBlock.getBlockType().getCategory())) {
345           cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
346         }
347 
348         if (hfileBlock.getBlockType() == BlockType.DATA) {
349           HFile.dataBlockReadCnt.incrementAndGet();
350         }
351 
352         return hfileBlock;
353       }
354     } finally {
355       if (lockEntry != null) {
356         offsetLock.releaseLockEntry(lockEntry);
357       }
358     }
359   }
360 
361   /**
362    * Compares the actual type of a block retrieved from cache or disk with its
363    * expected type and throws an exception in case of a mismatch. Expected
364    * block type of {@link BlockType#DATA} is considered to match the actual
365    * block type [@link {@link BlockType#ENCODED_DATA} as well.
366    * @param block a block retrieved from cache or disk
367    * @param expectedBlockType the expected block type, or null to skip the
368    *          check
369    */
370   private void validateBlockType(HFileBlock block,
371       BlockType expectedBlockType) throws IOException {
372     if (expectedBlockType == null) {
373       return;
374     }
375     BlockType actualBlockType = block.getBlockType();
376     if (actualBlockType == BlockType.ENCODED_DATA &&
377         expectedBlockType == BlockType.DATA) {
378       // We consider DATA to match ENCODED_DATA for the purpose of this
379       // verification.
380       return;
381     }
382     if (actualBlockType != expectedBlockType) {
383       throw new IOException("Expected block type " + expectedBlockType + ", " +
384           "but got " + actualBlockType + ": " + block);
385     }
386   }
387 
388   /**
389    * @return Last key in the file. May be null if file has no entries. Note that
390    *         this is not the last row key, but rather the byte form of the last
391    *         KeyValue.
392    */
393   @Override
394   public byte[] getLastKey() {
395     return dataBlockIndexReader.isEmpty() ? null : lastKey;
396   }
397 
398   /**
399    * @return Midkey for this file. We work with block boundaries only so
400    *         returned midkey is an approximation only.
401    * @throws IOException
402    */
403   @Override
404   public byte[] midkey() throws IOException {
405     return dataBlockIndexReader.midkey();
406   }
407 
408   @Override
409   public void close() throws IOException {
410     close(cacheConf.shouldEvictOnClose());
411   }
412 
413   public void close(boolean evictOnClose) throws IOException {
414     if (evictOnClose && cacheConf.isBlockCacheEnabled()) {
415       int numEvicted = cacheConf.getBlockCache().evictBlocksByHfileName(name);
416       if (LOG.isTraceEnabled()) {
417         LOG.trace("On close, file=" + name + " evicted=" + numEvicted
418           + " block(s)");
419       }
420     }
421     fsBlockReader.closeStreams();
422   }
423 
424   /** For testing */
425   @Override
426   HFileBlock.FSReader getUncachedBlockReader() {
427     return fsBlockReader;
428   }
429 
430 
431   protected abstract static class AbstractScannerV2
432       extends AbstractHFileReader.Scanner {
433     protected HFileBlock block;
434 
435     /**
436      * The next indexed key is to keep track of the indexed key of the next data block.
437      * If the nextIndexedKey is HConstants.NO_NEXT_INDEXED_KEY, it means that the
438      * current data block is the last data block.
439      *
440      * If the nextIndexedKey is null, it means the nextIndexedKey has not been loaded yet.
441      */
442     protected byte[] nextIndexedKey;
443 
444     public AbstractScannerV2(HFileReaderV2 r, boolean cacheBlocks,
445         final boolean pread, final boolean isCompaction) {
446       super(r, cacheBlocks, pread, isCompaction);
447     }
448 
449     /**
450      * An internal API function. Seek to the given key, optionally rewinding to
451      * the first key of the block before doing the seek.
452      *
453      * @param key key byte array
454      * @param offset key offset in the key byte array
455      * @param length key length
456      * @param rewind whether to rewind to the first key of the block before
457      *        doing the seek. If this is false, we are assuming we never go
458      *        back, otherwise the result is undefined.
459      * @return -1 if the key is earlier than the first key of the file,
460      *         0 if we are at the given key, 1 if we are past the given key
461      *         -2 if the key is earlier than the first key of the file while
462      *         using a faked index key
463      * @throws IOException
464      */
465     protected int seekTo(byte[] key, int offset, int length, boolean rewind)
466         throws IOException {
467       HFileBlockIndex.BlockIndexReader indexReader =
468           reader.getDataBlockIndexReader();
469       BlockWithScanInfo blockWithScanInfo =
470         indexReader.loadDataBlockWithScanInfo(key, offset, length, block,
471             cacheBlocks, pread, isCompaction);
472       if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) {
473         // This happens if the key e.g. falls before the beginning of the file.
474         return -1;
475       }
476       return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(),
477           blockWithScanInfo.getNextIndexedKey(), rewind, key, offset, length, false);
478     }
479 
480     protected abstract ByteBuffer getFirstKeyInBlock(HFileBlock curBlock);
481 
482     protected abstract int loadBlockAndSeekToKey(HFileBlock seekToBlock, byte[] nextIndexedKey,
483         boolean rewind, byte[] key, int offset, int length, boolean seekBefore)
484         throws IOException;
485 
486     @Override
487     public int seekTo(byte[] key, int offset, int length) throws IOException {
488       // Always rewind to the first key of the block, because the given key
489       // might be before or after the current key.
490       return seekTo(key, offset, length, true);
491     }
492 
493     @Override
494     public int reseekTo(byte[] key, int offset, int length) throws IOException {
495       int compared;
496       if (isSeeked()) {
497         ByteBuffer bb = getKey();
498         compared = reader.getComparator().compare(key, offset,
499             length, bb.array(), bb.arrayOffset(), bb.limit());
500         if (compared < 1) {
501           // If the required key is less than or equal to current key, then
502           // don't do anything.
503           return compared;
504         } else {
505           if (this.nextIndexedKey != null &&
506               (this.nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY ||
507                reader.getComparator().compare(key, offset, length,
508                    nextIndexedKey, 0, nextIndexedKey.length) < 0)) {
509             // The reader shall continue to scan the current data block instead of querying the
510             // block index as long as it knows the target key is strictly smaller than
511             // the next indexed key or the current data block is the last data block.
512             return loadBlockAndSeekToKey(this.block, this.nextIndexedKey,
513                 false, key, offset, length, false);
514           }
515         }
516       }
517       // Don't rewind on a reseek operation, because reseek implies that we are
518       // always going forward in the file.
519       return seekTo(key, offset, length, false);
520     }
521 
522     @Override
523     public boolean seekBefore(byte[] key, int offset, int length)
524         throws IOException {
525       HFileBlock seekToBlock =
526           reader.getDataBlockIndexReader().seekToDataBlock(key, offset, length,
527               block, cacheBlocks, pread, isCompaction);
528       if (seekToBlock == null) {
529         return false;
530       }
531       ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
532 
533       if (reader.getComparator().compare(firstKey.array(),
534           firstKey.arrayOffset(), firstKey.limit(), key, offset, length) == 0)
535       {
536         long previousBlockOffset = seekToBlock.getPrevBlockOffset();
537         // The key we are interested in
538         if (previousBlockOffset == -1) {
539           // we have a 'problem', the key we want is the first of the file.
540           return false;
541         }
542 
543         // It is important that we compute and pass onDiskSize to the block
544         // reader so that it does not have to read the header separately to
545         // figure out the size.
546         seekToBlock = reader.readBlock(previousBlockOffset,
547             seekToBlock.getOffset() - previousBlockOffset, cacheBlocks,
548             pread, isCompaction, BlockType.DATA);
549         // TODO shortcut: seek forward in this block to the last key of the
550         // block.
551       }
552       byte[] firstKeyInCurrentBlock = Bytes.getBytes(firstKey);
553       loadBlockAndSeekToKey(seekToBlock, firstKeyInCurrentBlock, true, key, offset, length, true);
554       return true;
555     }
556 
557 
558     /**
559      * Scans blocks in the "scanned" section of the {@link HFile} until the next
560      * data block is found.
561      *
562      * @return the next block, or null if there are no more data blocks
563      * @throws IOException
564      */
565     protected HFileBlock readNextDataBlock() throws IOException {
566       long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
567       if (block == null)
568         return null;
569 
570       HFileBlock curBlock = block;
571 
572       do {
573         if (curBlock.getOffset() >= lastDataBlockOffset)
574           return null;
575 
576         if (curBlock.getOffset() < 0) {
577           throw new IOException("Invalid block file offset: " + block);
578         }
579 
580         // We are reading the next block without block type validation, because
581         // it might turn out to be a non-data block.
582         curBlock = reader.readBlock(curBlock.getOffset()
583             + curBlock.getOnDiskSizeWithHeader(),
584             curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
585             isCompaction, null);
586       } while (!(curBlock.getBlockType().equals(BlockType.DATA) ||
587           curBlock.getBlockType().equals(BlockType.ENCODED_DATA)));
588 
589       return curBlock;
590     }
591   }
592 
593   /**
594    * Implementation of {@link HFileScanner} interface.
595    */
596   protected static class ScannerV2 extends AbstractScannerV2 {
597     private HFileReaderV2 reader;
598 
599     public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
600         final boolean pread, final boolean isCompaction) {
601       super(r, cacheBlocks, pread, isCompaction);
602       this.reader = r;
603     }
604 
605     @Override
606     public KeyValue getKeyValue() {
607       if (!isSeeked())
608         return null;
609 
610       KeyValue ret = new KeyValue(blockBuffer.array(),
611           blockBuffer.arrayOffset() + blockBuffer.position(),
612           KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen,
613           currKeyLen);
614       if (this.reader.shouldIncludeMemstoreTS()) {
615         ret.setMemstoreTS(currMemstoreTS);
616       }
617       return ret;
618     }
619 
620     @Override
621     public ByteBuffer getKey() {
622       assertSeeked();
623       return ByteBuffer.wrap(
624           blockBuffer.array(),
625           blockBuffer.arrayOffset() + blockBuffer.position()
626               + KEY_VALUE_LEN_SIZE, currKeyLen).slice();
627     }
628 
629     @Override
630     public ByteBuffer getValue() {
631       assertSeeked();
632       return ByteBuffer.wrap(
633           blockBuffer.array(),
634           blockBuffer.arrayOffset() + blockBuffer.position()
635               + KEY_VALUE_LEN_SIZE + currKeyLen, currValueLen).slice();
636     }
637 
638     private void setNonSeekedState() {
639       block = null;
640       blockBuffer = null;
641       currKeyLen = 0;
642       currValueLen = 0;
643       currMemstoreTS = 0;
644       currMemstoreTSLen = 0;
645     }
646 
647     /**
648      * Go to the next key/value in the block section. Loads the next block if
649      * necessary. If successful, {@link #getKey()} and {@link #getValue()} can
650      * be called.
651      *
652      * @return true if successfully navigated to the next key/value
653      */
654     @Override
655     public boolean next() throws IOException {
656       assertSeeked();
657 
658       try {
659         blockBuffer.position(blockBuffer.position() + KEY_VALUE_LEN_SIZE
660             + currKeyLen + currValueLen + currMemstoreTSLen);
661       } catch (IllegalArgumentException e) {
662         LOG.error("Current pos = " + blockBuffer.position()
663             + "; currKeyLen = " + currKeyLen + "; currValLen = "
664             + currValueLen + "; block limit = " + blockBuffer.limit()
665             + "; HFile name = " + reader.getName()
666             + "; currBlock currBlockOffset = " + block.getOffset());
667         throw e;
668       }
669 
670       if (blockBuffer.remaining() <= 0) {
671         long lastDataBlockOffset =
672             reader.getTrailer().getLastDataBlockOffset();
673 
674         if (block.getOffset() >= lastDataBlockOffset) {
675           setNonSeekedState();
676           return false;
677         }
678 
679         // read the next block
680         HFileBlock nextBlock = readNextDataBlock();
681         if (nextBlock == null) {
682           setNonSeekedState();
683           return false;
684         }
685 
686         updateCurrBlock(nextBlock);
687         return true;
688       }
689 
690       // We are still in the same block.
691       readKeyValueLen();
692       return true;
693     }
694 
695     /**
696      * Positions this scanner at the start of the file.
697      *
698      * @return false if empty file; i.e. a call to next would return false and
699      *         the current key and value are undefined.
700      * @throws IOException
701      */
702     @Override
703     public boolean seekTo() throws IOException {
704       if (reader == null) {
705         return false;
706       }
707 
708       if (reader.getTrailer().getEntryCount() == 0) {
709         // No data blocks.
710         return false;
711       }
712 
713       long firstDataBlockOffset =
714           reader.getTrailer().getFirstDataBlockOffset();
715       if (block != null && block.getOffset() == firstDataBlockOffset) {
716         blockBuffer.rewind();
717         readKeyValueLen();
718         return true;
719       }
720 
721       block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
722           isCompaction, BlockType.DATA);
723       if (block.getOffset() < 0) {
724         throw new IOException("Invalid block offset: " + block.getOffset());
725       }
726       updateCurrBlock(block);
727       return true;
728     }
729 
730     @Override
731     protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, byte[] nextIndexedKey,
732         boolean rewind, byte[] key, int offset, int length, boolean seekBefore)
733         throws IOException {
734       if (block == null || block.getOffset() != seekToBlock.getOffset()) {
735         updateCurrBlock(seekToBlock);
736       } else if (rewind) {
737         blockBuffer.rewind();
738       }
739 
740       // Update the nextIndexedKey
741       this.nextIndexedKey = nextIndexedKey;
742       return blockSeek(key, offset, length, seekBefore);
743     }
744 
745     /**
746      * Updates the current block to be the given {@link HFileBlock}. Seeks to
747      * the the first key/value pair.
748      *
749      * @param newBlock the block to make current
750      */
751     private void updateCurrBlock(HFileBlock newBlock) {
752       block = newBlock;
753 
754       // sanity check
755       if (block.getBlockType() != BlockType.DATA) {
756         throw new IllegalStateException("ScannerV2 works only on data " +
757             "blocks, got " + block.getBlockType() + "; " +
758             "fileName=" + reader.name + ", " +
759             "dataBlockEncoder=" + reader.dataBlockEncoder + ", " +
760             "isCompaction=" + isCompaction);
761       }
762 
763       blockBuffer = block.getBufferWithoutHeader();
764       readKeyValueLen();
765       blockFetches++;
766 
767       // Reset the next indexed key
768       this.nextIndexedKey = null;
769     }
770 
771     private final void readKeyValueLen() {
772       blockBuffer.mark();
773       currKeyLen = blockBuffer.getInt();
774       currValueLen = blockBuffer.getInt();
775       blockBuffer.reset();
776       if (this.reader.shouldIncludeMemstoreTS()) {
777         if (this.reader.decodeMemstoreTS) {
778           try {
779             int memstoreTSOffset = blockBuffer.arrayOffset()
780                 + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen
781                 + currValueLen;
782             currMemstoreTS = Bytes.readVLong(blockBuffer.array(),
783                 memstoreTSOffset);
784             currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS);
785           } catch (Exception e) {
786             throw new RuntimeException("Error reading memstore timestamp", e);
787           }
788         } else {
789           currMemstoreTS = 0;
790           currMemstoreTSLen = 1;
791         }
792       }
793 
794       if (currKeyLen < 0 || currValueLen < 0
795           || currKeyLen > blockBuffer.limit()
796           || currValueLen > blockBuffer.limit()) {
797         throw new IllegalStateException("Invalid currKeyLen " + currKeyLen
798             + " or currValueLen " + currValueLen + ". Block offset: "
799             + block.getOffset() + ", block length: " + blockBuffer.limit()
800             + ", position: " + blockBuffer.position() + " (without header).");
801       }
802     }
803 
804     /**
805      * Within a loaded block, seek looking for the last key that is smaller
806      * than (or equal to?) the key we are interested in.
807      *
808      * A note on the seekBefore: if you have seekBefore = true, AND the first
809      * key in the block = key, then you'll get thrown exceptions. The caller has
810      * to check for that case and load the previous block as appropriate.
811      *
812      * @param key the key to find
813      * @param seekBefore find the key before the given key in case of exact
814      *          match.
815      * @return 0 in case of an exact key match, 1 in case of an inexact match,
816      *         -2 in case of an inexact match and furthermore, the input key less
817      *         than the first key of current block(e.g. using a faked index key)
818      */
819     private int blockSeek(byte[] key, int offset, int length,
820         boolean seekBefore) {
821       int klen, vlen;
822       long memstoreTS = 0;
823       int memstoreTSLen = 0;
824       int lastKeyValueSize = -1;
825       do {
826         blockBuffer.mark();
827         klen = blockBuffer.getInt();
828         vlen = blockBuffer.getInt();
829         blockBuffer.reset();
830         if (this.reader.shouldIncludeMemstoreTS()) {
831           if (this.reader.decodeMemstoreTS) {
832             try {
833               int memstoreTSOffset = blockBuffer.arrayOffset()
834                   + blockBuffer.position() + KEY_VALUE_LEN_SIZE + klen + vlen;
835               memstoreTS = Bytes.readVLong(blockBuffer.array(),
836                   memstoreTSOffset);
837               memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
838             } catch (Exception e) {
839               throw new RuntimeException("Error reading memstore timestamp", e);
840             }
841           } else {
842             memstoreTS = 0;
843             memstoreTSLen = 1;
844           }
845         }
846 
847         int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position()
848             + KEY_VALUE_LEN_SIZE;
849         int comp = reader.getComparator().compare(key, offset, length,
850             blockBuffer.array(), keyOffset, klen);
851 
852         if (comp == 0) {
853           if (seekBefore) {
854             if (lastKeyValueSize < 0) {
855               throw new IllegalStateException("blockSeek with seekBefore "
856                   + "at the first key of the block: key="
857                   + Bytes.toStringBinary(key) + ", blockOffset="
858                   + block.getOffset() + ", onDiskSize="
859                   + block.getOnDiskSizeWithHeader());
860             }
861             blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
862             readKeyValueLen();
863             return 1; // non exact match.
864           }
865           currKeyLen = klen;
866           currValueLen = vlen;
867           if (this.reader.shouldIncludeMemstoreTS()) {
868             currMemstoreTS = memstoreTS;
869             currMemstoreTSLen = memstoreTSLen;
870           }
871           return 0; // indicate exact match
872         } else if (comp < 0) {
873           if (lastKeyValueSize > 0)
874             blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
875           readKeyValueLen();
876           if (lastKeyValueSize == -1 && blockBuffer.position() == 0
877               && this.reader.trailer.getMinorVersion() >= MINOR_VERSION_WITH_FAKED_KEY) {
878             return HConstants.INDEX_KEY_MAGIC;
879           }
880           return 1;
881         }
882 
883         // The size of this key/value tuple, including key/value length fields.
884         lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE;
885         blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
886       } while (blockBuffer.remaining() > 0);
887 
888       // Seek to the last key we successfully read. This will happen if this is
889       // the last key/value pair in the file, in which case the following call
890       // to next() has to return false.
891       blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
892       readKeyValueLen();
893       return 1; // didn't exactly find it.
894     }
895 
896     @Override
897     protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
898       ByteBuffer buffer = curBlock.getBufferWithoutHeader();
899       // It is safe to manipulate this buffer because we own the buffer object.
900       buffer.rewind();
901       int klen = buffer.getInt();
902       buffer.getInt();
903       ByteBuffer keyBuff = buffer.slice();
904       keyBuff.limit(klen);
905       keyBuff.rewind();
906       return keyBuff;
907     }
908 
909     @Override
910     public String getKeyString() {
911       return Bytes.toStringBinary(blockBuffer.array(),
912           blockBuffer.arrayOffset() + blockBuffer.position()
913               + KEY_VALUE_LEN_SIZE, currKeyLen);
914     }
915 
916     @Override
917     public String getValueString() {
918       return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
919           + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
920           currValueLen);
921     }
922   }
923 
924   /**
925    * ScannerV2 that operates on encoded data blocks.
926    */
927   protected static class EncodedScannerV2 extends AbstractScannerV2 {
928     private DataBlockEncoder.EncodedSeeker seeker = null;
929     private DataBlockEncoder dataBlockEncoder = null;
930     private final boolean includesMemstoreTS;
931 
932     public EncodedScannerV2(HFileReaderV2 reader, boolean cacheBlocks,
933         boolean pread, boolean isCompaction, boolean includesMemstoreTS) {
934       super(reader, cacheBlocks, pread, isCompaction);
935       this.includesMemstoreTS = includesMemstoreTS;
936     }
937 
938     private void setDataBlockEncoder(DataBlockEncoder dataBlockEncoder) {
939       this.dataBlockEncoder = dataBlockEncoder;
940       seeker = dataBlockEncoder.createSeeker(reader.getComparator(),
941           includesMemstoreTS);
942     }
943 
944     /**
945      * Updates the current block to be the given {@link HFileBlock}. Seeks to
946      * the the first key/value pair.
947      *
948      * @param newBlock the block to make current
949      */
950     private void updateCurrentBlock(HFileBlock newBlock) {
951       block = newBlock;
952 
953       // sanity checks
954       if (block.getBlockType() != BlockType.ENCODED_DATA) {
955         throw new IllegalStateException(
956             "EncodedScannerV2 works only on encoded data blocks");
957       }
958 
959       short dataBlockEncoderId = block.getDataBlockEncodingId();
960       if (dataBlockEncoder == null ||
961           !DataBlockEncoding.isCorrectEncoder(dataBlockEncoder,
962               dataBlockEncoderId)) {
963         DataBlockEncoder encoder =
964             DataBlockEncoding.getDataBlockEncoderById(dataBlockEncoderId);
965         setDataBlockEncoder(encoder);
966       }
967 
968       seeker.setCurrentBuffer(getEncodedBuffer(newBlock));
969       blockFetches++;
970     }
971 
972     private ByteBuffer getEncodedBuffer(HFileBlock newBlock) {
973       ByteBuffer origBlock = newBlock.getBufferReadOnly();
974       ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(),
975           origBlock.arrayOffset() + newBlock.headerSize() +
976           DataBlockEncoding.ID_SIZE,
977           newBlock.getUncompressedSizeWithoutHeader() -
978           DataBlockEncoding.ID_SIZE).slice();
979       return encodedBlock;
980     }
981 
982     @Override
983     public boolean seekTo() throws IOException {
984       if (reader == null) {
985         return false;
986       }
987 
988       if (reader.getTrailer().getEntryCount() == 0) {
989         // No data blocks.
990         return false;
991       }
992 
993       long firstDataBlockOffset =
994           reader.getTrailer().getFirstDataBlockOffset();
995       if (block != null && block.getOffset() == firstDataBlockOffset) {
996         seeker.rewind();
997         return true;
998       }
999 
1000       block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
1001           isCompaction, BlockType.DATA);
1002       if (block.getOffset() < 0) {
1003         throw new IOException("Invalid block offset: " + block.getOffset());
1004       }
1005       updateCurrentBlock(block);
1006       return true;
1007     }
1008 
1009     @Override
1010     public boolean next() throws IOException {
1011       boolean isValid = seeker.next();
1012       if (!isValid) {
1013         block = readNextDataBlock();
1014         isValid = block != null;
1015         if (isValid) {
1016           updateCurrentBlock(block);
1017         }
1018       }
1019       return isValid;
1020     }
1021 
1022     @Override
1023     public ByteBuffer getKey() {
1024       assertValidSeek();
1025       return seeker.getKeyDeepCopy();
1026     }
1027 
1028     @Override
1029     public ByteBuffer getValue() {
1030       assertValidSeek();
1031       return seeker.getValueShallowCopy();
1032     }
1033 
1034     @Override
1035     public KeyValue getKeyValue() {
1036       if (block == null) {
1037         return null;
1038       }
1039       return seeker.getKeyValue();
1040     }
1041 
1042     @Override
1043     public String getKeyString() {
1044       ByteBuffer keyBuffer = getKey();
1045       return Bytes.toStringBinary(keyBuffer.array(),
1046           keyBuffer.arrayOffset(), keyBuffer.limit());
1047     }
1048 
1049     @Override
1050     public String getValueString() {
1051       ByteBuffer valueBuffer = getValue();
1052       return Bytes.toStringBinary(valueBuffer.array(),
1053           valueBuffer.arrayOffset(), valueBuffer.limit());
1054     }
1055 
1056     private void assertValidSeek() {
1057       if (block == null) {
1058         throw new NotSeekedException();
1059       }
1060     }
1061 
1062     @Override
1063     protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
1064       return dataBlockEncoder.getFirstKeyInBlock(getEncodedBuffer(curBlock));
1065     }
1066 
1067     @Override
1068     protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, byte[] nextIndexedKey,
1069         boolean rewind, byte[] key, int offset, int length, boolean seekBefore)
1070         throws IOException  {
1071       if (block == null || block.getOffset() != seekToBlock.getOffset()) {
1072         updateCurrentBlock(seekToBlock);
1073       } else if (rewind) {
1074         seeker.rewind();
1075       }
1076       this.nextIndexedKey = nextIndexedKey;
1077       return seeker.seekToKeyInBlock(key, offset, length, seekBefore);
1078     }
1079   }
1080 
1081   /**
1082    * Returns a buffer with the Bloom filter metadata. The caller takes
1083    * ownership of the buffer.
1084    */
1085   @Override
1086   public DataInput getGeneralBloomFilterMetadata() throws IOException {
1087     return this.getBloomFilterMetadata(BlockType.GENERAL_BLOOM_META);
1088   }
1089 
1090   @Override
1091   public DataInput getDeleteBloomFilterMetadata() throws IOException {
1092     return this.getBloomFilterMetadata(BlockType.DELETE_FAMILY_BLOOM_META);
1093   }
1094 
1095   private DataInput getBloomFilterMetadata(BlockType blockType)
1096   throws IOException {
1097     if (blockType != BlockType.GENERAL_BLOOM_META &&
1098         blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
1099       throw new RuntimeException("Block Type: " + blockType.toString() +
1100           " is not supported") ;
1101     }
1102 
1103     for (HFileBlock b : loadOnOpenBlocks)
1104       if (b.getBlockType() == blockType)
1105         return b.getByteStream();
1106     return null;
1107   }
1108 
1109   @Override
1110   public boolean isFileInfoLoaded() {
1111     return true; // We load file info in constructor in version 2.
1112   }
1113 
1114   /**
1115    * Validates that the minor version is within acceptable limits.
1116    * Otherwise throws an Runtime exception
1117    */
1118   private void validateMinorVersion(Path path, int minorVersion) {
1119     if (minorVersion < MIN_MINOR_VERSION ||
1120         minorVersion > MAX_MINOR_VERSION) {
1121       String msg = "Minor version for path " + path + 
1122                    " is expected to be between " +
1123                    MIN_MINOR_VERSION + " and " + MAX_MINOR_VERSION +
1124                    " but is found to be " + minorVersion;
1125       LOG.error(msg);
1126       throw new RuntimeException(msg);
1127     }
1128   }
1129 }