View Javadoc

1   /*
2    * Copyright 2011 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.io.hfile;
21  
22  import java.io.DataInput;
23  import java.io.IOException;
24  import java.nio.ByteBuffer;
25  import java.util.ArrayList;
26  import java.util.List;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.fs.FSDataInputStream;
31  import org.apache.hadoop.fs.Path;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.KeyValue;
34  import org.apache.hadoop.hbase.fs.HFileSystem;
35  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
36  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
37  import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
38  import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
39  import org.apache.hadoop.hbase.util.Bytes;
40  import org.apache.hadoop.hbase.util.IdLock;
41  import org.apache.hadoop.io.RawComparator;
42  import org.apache.hadoop.io.WritableUtils;
43  
44  /**
45   * {@link HFile} reader for version 2.
46   */
47  public class HFileReaderV2 extends AbstractHFileReader {
48  
  /** Commons-logging logger for this reader class. */
  private static final Log LOG = LogFactory.getLog(HFileReaderV2.class);
50  
51    /**
52     * The size of a (key length, value length) tuple that prefixes each entry in
53     * a data block.
54     */
55    private static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
56  
  // True when each KeyValue in this file carries a trailing memstore
  // timestamp; set from the KEY_VALUE_VERSION file-info entry on open.
  private boolean includesMemstoreTS = false;
  // True when stored memstore timestamps should actually be decoded; stays
  // false when the file's MAX_MEMSTORE_TS_KEY is 0 (all timestamps zero).
  private boolean decodeMemstoreTS = false;
59  
60    private boolean shouldIncludeMemstoreTS() {
61      return includesMemstoreTS;
62    }
63  
  /**
   * A "sparse lock" implementation allowing to lock on a particular block
   * identified by offset. The purpose of this is to avoid two clients loading
   * the same block, and have all but one client wait to get the block from the
   * cache. Acquired and released in {@code readBlock} on the second pass of
   * its cache-probe loop.
   */
  private IdLock offsetLock = new IdLock();
71  
  /**
   * Blocks read from the load-on-open section, excluding data root index, meta
   * index, and file info. Populated at the end of the constructor after those
   * three well-known blocks have been consumed from the block iterator.
   */
  private List<HFileBlock> loadOnOpenBlocks = new ArrayList<HFileBlock>();
77  
  /** Minimum minor version supported by this HFile format */
  // Checked by validateMinorVersion() in the constructor.
  static final int MIN_MINOR_VERSION = 0;

  /** Maximum minor version supported by this HFile format */
  static final int MAX_MINOR_VERSION = 1;
83  
  /**
   * Opens a HFile. You must load the index before you can use it by calling
   * {@link #loadFileInfo()}.
   *
   * @param path Path to HFile.
   * @param trailer File trailer.
   * @param fsdis input stream. Caller is responsible for closing the passed
   *          stream.
   * @param fsdisNoFsChecksum input stream that bypasses filesystem-level
   *          checksum verification; handed through to the block reader.
   * @param size Length of the stream.
   * @param closeIStream Whether to close the stream.
   * @param cacheConf Cache configuration.
   * @param preferredEncodingInCache the encoding to use in cache in case we
   *          have a choice. If the file is already encoded on disk, we will
   *          still use its on-disk encoding in cache.
   * @param hfs the filesystem the file is read from.
   * @throws IOException if the trailer is from an unsupported version, or any
   *           load-on-open block cannot be read
   */
  public HFileReaderV2(Path path, FixedFileTrailer trailer,
      final FSDataInputStream fsdis, final FSDataInputStream fsdisNoFsChecksum,
      final long size,
      final boolean closeIStream, final CacheConfig cacheConf,
      DataBlockEncoding preferredEncodingInCache, final HFileSystem hfs)
      throws IOException {
    super(path, trailer, fsdis, fsdisNoFsChecksum, size,
          closeIStream, cacheConf, hfs);
    // Only major version 2 is handled here; the minor version must lie in
    // [MIN_MINOR_VERSION, MAX_MINOR_VERSION].
    trailer.expectMajorVersion(2);
    validateMinorVersion(path, trailer.getMinorVersion());
    HFileBlock.FSReaderV2 fsBlockReaderV2 = new HFileBlock.FSReaderV2(fsdis,
        fsdisNoFsChecksum,
        compressAlgo, fileSize, trailer.getMinorVersion(), hfs, path);
    this.fsBlockReader = fsBlockReaderV2; // upcast

    // Comparator class name is stored in the trailer in version 2.
    comparator = trailer.createComparator();
    dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
        trailer.getNumDataIndexLevels(), this);
    // The meta index is always single-level and compared as raw bytes.
    metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader(
        Bytes.BYTES_RAWCOMPARATOR, 1);

    // Parse load-on-open data. The order of reads below must match the order
    // the writer laid these blocks out: data index root, meta index, file info.

    HFileBlock.BlockIterator blockIter = fsBlockReaderV2.blockRange(
        trailer.getLoadOnOpenDataOffset(),
        fileSize - trailer.getTrailerSize());

    // Data index. We also read statistics about the block index written after
    // the root level.
    dataBlockIndexReader.readMultiLevelIndexRoot(
        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
        trailer.getDataIndexCount());

    // Meta index.
    metaBlockIndexReader.readRootIndex(
        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
        trailer.getMetaIndexCount());

    // File info: last key, average key/value lengths, KeyValue format version.
    fileInfo = new FileInfo();
    fileInfo.readFields(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
    lastKey = fileInfo.get(FileInfo.LASTKEY);
    avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
    avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
    // A format version equal to KEY_VALUE_VER_WITH_MEMSTORE means each entry
    // carries a trailing memstore timestamp.
    byte [] keyValueFormatVersion =
        fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
    includesMemstoreTS = keyValueFormatVersion != null &&
        Bytes.toInt(keyValueFormatVersion) ==
            HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE;
    fsBlockReaderV2.setIncludesMemstoreTS(includesMemstoreTS);
    if (includesMemstoreTS) {
      // Skip decoding entirely when the largest stored timestamp is 0.
      decodeMemstoreTS = Bytes.toLong(fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY)) > 0;
    }

    // Read data block encoding algorithm name from file info.
    dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo,
        preferredEncodingInCache);
    fsBlockReaderV2.setDataBlockEncoder(dataBlockEncoder);

    // Store all other load-on-open blocks for further consumption.
    HFileBlock b;
    while ((b = blockIter.nextBlock()) != null) {
      loadOnOpenBlocks.add(b);
    }
  }
165 
166   /**
167    * Create a Scanner on this file. No seeks or reads are done on creation. Call
168    * {@link HFileScanner#seekTo(byte[])} to position an start the read. There is
169    * nothing to clean up in a Scanner. Letting go of your references to the
170    * scanner is sufficient.
171    *
172    * @param cacheBlocks True if we should cache blocks read in by this scanner.
173    * @param pread Use positional read rather than seek+read if true (pread is
174    *          better for random reads, seek+read is better scanning).
175    * @param isCompaction is scanner being used for a compaction?
176    * @return Scanner on this file.
177    */
178    @Override
179    public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
180       final boolean isCompaction) {
181     // check if we want to use data block encoding in memory
182     if (dataBlockEncoder.useEncodedScanner(isCompaction)) {
183       return new EncodedScannerV2(this, cacheBlocks, pread, isCompaction,
184           includesMemstoreTS);
185     }
186 
187     return new ScannerV2(this, cacheBlocks, pread, isCompaction);
188   }
189 
  /**
   * Returns the payload of the named meta block, served from the block cache
   * when possible, otherwise read from disk (and optionally cached).
   *
   * @param metaBlockName name of the meta block to look up in the meta index
   * @param cacheBlock Add block to cache, if found
   * @return block wrapped in a ByteBuffer, with header skipped, or null when
   *         the file has no meta blocks or no block with this name
   * @throws IOException if the meta index was never loaded or the read fails
   */
  @Override
  public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
      throws IOException {
    if (trailer.getMetaIndexCount() == 0) {
      return null; // there are no meta blocks
    }
    if (metaBlockIndexReader == null) {
      throw new IOException("Meta index not loaded");
    }

    byte[] mbname = Bytes.toBytes(metaBlockName);
    int block = metaBlockIndexReader.rootBlockContainingKey(mbname, 0,
        mbname.length);
    if (block == -1)
      return null;
    long blockSize = metaBlockIndexReader.getRootBlockDataSize(block);
    long startTimeNs = System.nanoTime();

    // Per meta key from any given file, synchronize reads for said block. This
    // is OK to do for meta blocks because the meta block index is always
    // single-level.
    synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
      // Check cache for block. If found return.
      long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
      BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset,
          DataBlockEncoding.NONE, BlockType.META);

      // Only cache on read if the cache config allows caching data on read.
      cacheBlock &= cacheConf.shouldCacheDataOnRead();
      if (cacheConf.isBlockCacheEnabled()) {
        HFileBlock cachedBlock =
          (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock, false);
        if (cachedBlock != null) {
          // Return a distinct 'shallow copy' of the block,
          // so pos does not get messed by the scanner
          getSchemaMetrics().updateOnCacheHit(BlockCategory.META, false);
          return cachedBlock.getBufferWithoutHeader();
        }
        // Cache Miss, please load.
      }

      HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
          blockSize, -1, true);
      passSchemaMetricsTo(metaBlock);

      // Record read latency and the cache miss in the schema metrics.
      final long delta = System.nanoTime() - startTimeNs;
      HFile.offerReadLatency(delta, true);
      getSchemaMetrics().updateOnCacheMiss(BlockCategory.META, false, delta);

      // Cache the block
      if (cacheBlock) {
        cacheConf.getBlockCache().cacheBlock(cacheKey, metaBlock,
            cacheConf.isInMemory());
      }

      return metaBlock.getBufferWithoutHeader();
    }
  }
253 
  /**
   * Read in a file block. Probes the block cache first without locking; on a
   * miss it acquires the per-offset IdLock, re-checks the cache, and only then
   * reads from the filesystem, so concurrent readers of the same block do the
   * disk read once.
   *
   * @param dataBlockOffset offset to read.
   * @param onDiskBlockSize size of the block
   * @param cacheBlock whether to add the block to the cache after a disk read
   * @param pread Use positional read instead of seek+read (positional is
   *          better doing random reads whereas seek+read is better scanning).
   * @param isCompaction is this block being read as part of a compaction
   * @param expectedBlockType the block type we are expecting to read with this
   *          read operation, or null to read whatever block type is available
   *          and avoid checking (that might reduce caching efficiency of
   *          encoded data blocks)
   * @return Block wrapped in a ByteBuffer.
   * @throws IOException if the offset is out of range, the block type or the
   *           cached encoding does not match expectations, or the read fails
   */
  @Override
  public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
      final boolean cacheBlock, boolean pread, final boolean isCompaction,
      BlockType expectedBlockType)
      throws IOException {
    if (dataBlockIndexReader == null) {
      throw new IOException("Block index not loaded");
    }
    if (dataBlockOffset < 0
        || dataBlockOffset >= trailer.getLoadOnOpenDataOffset()) {
      throw new IOException("Requested block is out of range: "
          + dataBlockOffset + ", lastDataBlockOffset: "
          + trailer.getLastDataBlockOffset());
    }
    // For any given block from any given file, synchronize reads for said
    // block.
    // Without a cache, this synchronizing is needless overhead, but really
    // the other choice is to duplicate work (which the cache would prevent you
    // from doing).

    BlockCacheKey cacheKey =
        new BlockCacheKey(name, dataBlockOffset,
            dataBlockEncoder.getEffectiveEncodingInCache(isCompaction),
            expectedBlockType);

    // useLock=false on the first loop pass (optimistic, lock-free cache probe);
    // true on the second pass, after the offset lock has been acquired.
    boolean useLock = false;
    IdLock.Entry lockEntry = null;

    try {
      while (true) {

        if (useLock) {
          // Second pass: serialize loaders of this particular offset.
          lockEntry = offsetLock.getLockEntry(dataBlockOffset);
        }

        // Check cache for block. If found return.
        if (cacheConf.isBlockCacheEnabled()) {
          // Try and get the block from the block cache.  If the useLock variable is true then this
          // is the second time through the loop and it should not be counted as a block cache miss.
          HFileBlock cachedBlock = (HFileBlock)
              cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock, useLock);
          if (cachedBlock != null) {
            BlockCategory blockCategory =
                cachedBlock.getBlockType().getCategory();

            getSchemaMetrics().updateOnCacheHit(blockCategory, isCompaction);

            if (cachedBlock.getBlockType() == BlockType.DATA) {
              HFile.dataBlockReadCnt.incrementAndGet();
            }

            validateBlockType(cachedBlock, expectedBlockType);

            // Validate encoding type for encoded blocks. We include encoding
            // type in the cache key, and we expect it to match on a cache hit.
            if (cachedBlock.getBlockType() == BlockType.ENCODED_DATA &&
                cachedBlock.getDataBlockEncoding() !=
                    dataBlockEncoder.getEncodingInCache()) {
              throw new IOException("Cached block under key " + cacheKey + " " +
                  "has wrong encoding: " + cachedBlock.getDataBlockEncoding() +
                  " (expected: " + dataBlockEncoder.getEncodingInCache() + ")");
            }
            return cachedBlock;
          }
          // Carry on, please load.
        }
        if (!useLock) {
          // check cache again with lock
          useLock = true;
          continue;
        }

        // Load block from filesystem.
        long startTimeNs = System.nanoTime();
        HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset,
            onDiskBlockSize, -1, pread);
        // Convert the block to the format kept in the cache (may encode it).
        hfileBlock = dataBlockEncoder.diskToCacheFormat(hfileBlock,
            isCompaction);
        validateBlockType(hfileBlock, expectedBlockType);
        passSchemaMetricsTo(hfileBlock);
        BlockCategory blockCategory = hfileBlock.getBlockType().getCategory();

        // Record read latency and the cache miss in the schema metrics.
        final long delta = System.nanoTime() - startTimeNs;
        HFile.offerReadLatency(delta, pread);
        getSchemaMetrics().updateOnCacheMiss(blockCategory, isCompaction, delta);

        // Cache the block if necessary
        if (cacheBlock && cacheConf.shouldCacheBlockOnRead(
            hfileBlock.getBlockType().getCategory())) {
          cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock,
              cacheConf.isInMemory());
        }

        if (hfileBlock.getBlockType() == BlockType.DATA) {
          HFile.dataBlockReadCnt.incrementAndGet();
        }

        return hfileBlock;
      }
    } finally {
      // Always release the per-offset lock if it was taken.
      if (lockEntry != null) {
        offsetLock.releaseLockEntry(lockEntry);
      }
    }
  }
374 
375   @Override
376   public boolean hasMVCCInfo() {
377     return includesMemstoreTS && decodeMemstoreTS;
378   }
379 
380   /**
381    * Compares the actual type of a block retrieved from cache or disk with its
382    * expected type and throws an exception in case of a mismatch. Expected
383    * block type of {@link BlockType#DATA} is considered to match the actual
384    * block type [@link {@link BlockType#ENCODED_DATA} as well.
385    * @param block a block retrieved from cache or disk
386    * @param expectedBlockType the expected block type, or null to skip the
387    *          check
388    */
389   private void validateBlockType(HFileBlock block,
390       BlockType expectedBlockType) throws IOException {
391     if (expectedBlockType == null) {
392       return;
393     }
394     BlockType actualBlockType = block.getBlockType();
395     if (actualBlockType == BlockType.ENCODED_DATA &&
396         expectedBlockType == BlockType.DATA) {
397       // We consider DATA to match ENCODED_DATA for the purpose of this
398       // verification.
399       return;
400     }
401     if (actualBlockType != expectedBlockType) {
402       throw new IOException("Expected block type " + expectedBlockType + ", " +
403           "but got " + actualBlockType + ": " + block);
404     }
405   }
406 
407   /**
408    * @return Last key in the file. May be null if file has no entries. Note that
409    *         this is not the last row key, but rather the byte form of the last
410    *         KeyValue.
411    */
412   @Override
413   public byte[] getLastKey() {
414     return dataBlockIndexReader.isEmpty() ? null : lastKey;
415   }
416 
417   /**
418    * @return Midkey for this file. We work with block boundaries only so
419    *         returned midkey is an approximation only.
420    * @throws IOException
421    */
422   @Override
423   public byte[] midkey() throws IOException {
424     return dataBlockIndexReader.midkey();
425   }
426 
427   @Override
428   public void close() throws IOException {
429     close(cacheConf.shouldEvictOnClose());
430   }
431 
432   public void close(boolean evictOnClose) throws IOException {
433     if (evictOnClose && cacheConf.isBlockCacheEnabled()) {
434       int numEvicted = cacheConf.getBlockCache().evictBlocksByHfileName(name);
435       if (LOG.isTraceEnabled()) {
436         LOG.trace("On close, file=" + name + " evicted=" + numEvicted
437           + " block(s)");
438       }
439     }
440     if (closeIStream) {
441       if (istream != istreamNoFsChecksum && istreamNoFsChecksum != null) {
442         istreamNoFsChecksum.close();
443         istreamNoFsChecksum = null;
444       }
445       if (istream != null) {
446         istream.close();
447         istream = null;
448       }
449     }
450 
451     getSchemaMetrics().flushMetrics();
452   }
453 
  /**
   * Base class for the v2 scanners: tracks the current data block and the next
   * indexed key, and implements the seek/reseek/seekBefore logic shared by the
   * encoded and unencoded scanner implementations.
   */
  protected abstract static class AbstractScannerV2
      extends AbstractHFileReader.Scanner {
    // Data block the scanner is currently positioned in; null when not seeked.
    protected HFileBlock block;

    /**
     * The next indexed key is to keep track of the indexed key of the next data block.
     * If the nextIndexedKey is HConstants.NO_NEXT_INDEXED_KEY, it means that the
     * current data block is the last data block.
     *
     * If the nextIndexedKey is null, it means the nextIndexedKey has not been loaded yet.
     */
    protected byte[] nextIndexedKey;

    public AbstractScannerV2(HFileReaderV2 r, boolean cacheBlocks,
        final boolean pread, final boolean isCompaction) {
      super(r, cacheBlocks, pread, isCompaction);
    }

    /**
     * An internal API function. Seek to the given key, optionally rewinding to
     * the first key of the block before doing the seek.
     *
     * @param key key byte array
     * @param offset key offset in the key byte array
     * @param length key length
     * @param rewind whether to rewind to the first key of the block before
     *        doing the seek. If this is false, we are assuming we never go
     *        back, otherwise the result is undefined.
     * @return -1 if the key is earlier than the first key of the file,
     *         0 if we are at the given key, and 1 if we are past the given key
     * @throws IOException
     */
    protected int seekTo(byte[] key, int offset, int length, boolean rewind)
        throws IOException {
      HFileBlockIndex.BlockIndexReader indexReader =
          reader.getDataBlockIndexReader();
      BlockWithScanInfo blockWithScanInfo =
        indexReader.loadDataBlockWithScanInfo(key, offset, length, block,
            cacheBlocks, pread, isCompaction);
      if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) {
        // This happens if the key e.g. falls before the beginning of the file.
        return -1;
      }
      return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(),
          blockWithScanInfo.getNextIndexedKey(), rewind, key, offset, length, false);
    }

    /** @return the first key of the given data block. */
    protected abstract ByteBuffer getFirstKeyInBlock(HFileBlock curBlock);

    // Positions the scanner at (or, with seekBefore, just before) the given
    // key within the given block; implemented by the concrete scanner types.
    protected abstract int loadBlockAndSeekToKey(HFileBlock seekToBlock, byte[] nextIndexedKey,
        boolean rewind, byte[] key, int offset, int length, boolean seekBefore)
        throws IOException;

    @Override
    public int seekTo(byte[] key, int offset, int length) throws IOException {
      // Always rewind to the first key of the block, because the given key
      // might be before or after the current key.
      return seekTo(key, offset, length, true);
    }

    @Override
    public int reseekTo(byte[] key, int offset, int length) throws IOException {
      int compared;
      if (isSeeked()) {
        compared = compareKey(reader.getComparator(), key, offset, length);
        if (compared < 1) {
          // If the required key is less than or equal to current key, then
          // don't do anything.
          return compared;
        } else {
          if (this.nextIndexedKey != null &&
              (this.nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY ||
               reader.getComparator().compare(key, offset, length,
                   nextIndexedKey, 0, nextIndexedKey.length) < 0)) {
            // The reader shall continue to scan the current data block instead of querying the
            // block index as long as it knows the target key is strictly smaller than
            // the next indexed key or the current data block is the last data block.
            return loadBlockAndSeekToKey(this.block, this.nextIndexedKey,
                false, key, offset, length, false);
          }
        }
      }
      // Don't rewind on a reseek operation, because reseek implies that we are
      // always going forward in the file.
      return seekTo(key, offset, length, false);
    }

    @Override
    public boolean seekBefore(byte[] key, int offset, int length)
        throws IOException {
      HFileBlock seekToBlock =
          reader.getDataBlockIndexReader().seekToDataBlock(key, offset, length,
              block, cacheBlocks, pread, isCompaction);
      if (seekToBlock == null) {
        // The key falls before the first key of the file.
        return false;
      }
      ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);

      if (reader.getComparator().compare(firstKey.array(),
          firstKey.arrayOffset(), firstKey.limit(), key, offset, length) == 0)
      {
        long previousBlockOffset = seekToBlock.getPrevBlockOffset();
        // The key we are interested in
        if (previousBlockOffset == -1) {
          // we have a 'problem', the key we want is the first of the file.
          return false;
        }

        // It is important that we compute and pass onDiskSize to the block
        // reader so that it does not have to read the header separately to
        // figure out the size.
        seekToBlock = reader.readBlock(previousBlockOffset,
            seekToBlock.getOffset() - previousBlockOffset, cacheBlocks,
            pread, isCompaction, BlockType.DATA);
        // TODO shortcut: seek forward in this block to the last key of the
        // block.
      }
      byte[] firstKeyInCurrentBlock = Bytes.getBytes(firstKey);
      loadBlockAndSeekToKey(seekToBlock, firstKeyInCurrentBlock, true, key, offset, length, true);
      return true;
    }


    /**
     * Scans blocks in the "scanned" section of the {@link HFile} until the next
     * data block is found.
     *
     * @return the next block, or null if there are no more data blocks
     * @throws IOException
     */
    protected HFileBlock readNextDataBlock() throws IOException {
      long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
      if (block == null)
        return null;

      HFileBlock curBlock = block;

      do {
        if (curBlock.getOffset() >= lastDataBlockOffset)
          return null;

        if (curBlock.getOffset() < 0) {
          throw new IOException("Invalid block file offset: " + block);
        }

        // We are reading the next block without block type validation, because
        // it might turn out to be a non-data block.
        curBlock = reader.readBlock(curBlock.getOffset()
            + curBlock.getOnDiskSizeWithHeader(),
            curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
            isCompaction, null);
      } while (!(curBlock.getBlockType().equals(BlockType.DATA) ||
          curBlock.getBlockType().equals(BlockType.ENCODED_DATA)));

      return curBlock;
    }

    /**
     * Compare the given key against the current key
     * @param comparator comparator used for the raw byte comparison
     * @param key key byte array
     * @param offset key offset in the key byte array
     * @param length key length
     * @return -1 is the passed key is smaller than the current key, 0 if equal and 1 if greater
     */
    public abstract int compareKey(RawComparator<byte[]> comparator, byte[] key, int offset,
        int length);
  }
621 
622   /**
623    * Implementation of {@link HFileScanner} interface.
624    */
625   protected static class ScannerV2 extends AbstractScannerV2 {
    // Reader this scanner operates over, kept with the concrete v2 type
    // (narrows the superclass's reader reference used by AbstractScannerV2).
    private HFileReaderV2 reader;

    public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
        final boolean pread, final boolean isCompaction) {
      super(r, cacheBlocks, pread, isCompaction);
      this.reader = r;
    }
633 
634     @Override
635     public KeyValue getKeyValue() {
636       if (!isSeeked())
637         return null;
638 
639       KeyValue ret = new KeyValue(blockBuffer.array(),
640           blockBuffer.arrayOffset() + blockBuffer.position(),
641           KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen);
642       if (this.reader.shouldIncludeMemstoreTS()) {
643         ret.setMemstoreTS(currMemstoreTS);
644       }
645       return ret;
646     }
647 
648     @Override
649     public ByteBuffer getKey() {
650       assertSeeked();
651       return ByteBuffer.wrap(
652           blockBuffer.array(),
653           blockBuffer.arrayOffset() + blockBuffer.position()
654               + KEY_VALUE_LEN_SIZE, currKeyLen).slice();
655     }
656 
657     @Override
658     public int compareKey(RawComparator<byte []> comparator, byte[] key, int offset, int length) {
659       return comparator.compare(key, offset, length, blockBuffer.array(), blockBuffer.arrayOffset()
660           + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen);
661       }
662 
663     @Override
664     public ByteBuffer getValue() {
665       assertSeeked();
666       return ByteBuffer.wrap(
667           blockBuffer.array(),
668           blockBuffer.arrayOffset() + blockBuffer.position()
669               + KEY_VALUE_LEN_SIZE + currKeyLen, currValueLen).slice();
670     }
671 
672     private void setNonSeekedState() {
673       block = null;
674       blockBuffer = null;
675       currKeyLen = 0;
676       currValueLen = 0;
677       currMemstoreTS = 0;
678       currMemstoreTSLen = 0;
679     }
680 
    /**
     * Go to the next key/value in the block section. Loads the next block if
     * necessary. If successful, {@link #getKey()} and {@link #getValue()} can
     * be called.
     *
     * @return true if successfully navigated to the next key/value
     */
    @Override
    public boolean next() throws IOException {
      assertSeeked();

      try {
        // Skip past the whole current entry: length prefix, key, value, and
        // the (possibly zero-length) memstore timestamp.
        blockBuffer.position(blockBuffer.position() + KEY_VALUE_LEN_SIZE
            + currKeyLen + currValueLen + currMemstoreTSLen);
      } catch (IllegalArgumentException e) {
        // Log full positioning state before rethrowing to aid debugging of
        // corrupt length fields.
        LOG.error("Current pos = " + blockBuffer.position()
            + "; currKeyLen = " + currKeyLen + "; currValLen = "
            + currValueLen + "; block limit = " + blockBuffer.limit()
            + "; HFile name = " + reader.getName()
            + "; currBlock currBlockOffset = " + block.getOffset());
        throw e;
      }

      if (blockBuffer.remaining() <= 0) {
        // Exhausted the current block; advance to the next data block unless
        // this was already the last one in the file.
        long lastDataBlockOffset =
            reader.getTrailer().getLastDataBlockOffset();

        if (block.getOffset() >= lastDataBlockOffset) {
          setNonSeekedState();
          return false;
        }

        // read the next block
        HFileBlock nextBlock = readNextDataBlock();
        if (nextBlock == null) {
          setNonSeekedState();
          return false;
        }

        updateCurrBlock(nextBlock);
        return true;
      }

      // We are still in the same block.
      readKeyValueLen();
      return true;
    }
728 
    /**
     * Positions this scanner at the start of the file.
     *
     * @return false if empty file; i.e. a call to next would return false and
     *         the current key and value are undefined.
     * @throws IOException if the first data block cannot be read
     */
    @Override
    public boolean seekTo() throws IOException {
      if (reader == null) {
        return false;
      }

      if (reader.getTrailer().getEntryCount() == 0) {
        // No data blocks.
        return false;
      }

      long firstDataBlockOffset =
          reader.getTrailer().getFirstDataBlockOffset();
      if (block != null && block.getOffset() == firstDataBlockOffset) {
        // Already holding the first block; just rewind within it.
        blockBuffer.rewind();
        readKeyValueLen();
        return true;
      }

      block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
          isCompaction, BlockType.DATA);
      if (block.getOffset() < 0) {
        throw new IOException("Invalid block offset: " + block.getOffset());
      }
      updateCurrBlock(block);
      return true;
    }
763 
764     @Override
765     protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, byte[] nextIndexedKey,
766         boolean rewind, byte[] key, int offset, int length, boolean seekBefore)
767         throws IOException {
768       if (block == null || block.getOffset() != seekToBlock.getOffset()) {
769         updateCurrBlock(seekToBlock);
770       } else if (rewind) {
771         blockBuffer.rewind();
772       }
773 
774       // Update the nextIndexedKey
775       this.nextIndexedKey = nextIndexedKey;
776       return blockSeek(key, offset, length, seekBefore);
777     }
778 
779     /**
780      * Updates the current block to be the given {@link HFileBlock}. Seeks to
781      * the the first key/value pair.
782      *
783      * @param newBlock the block to make current
784      */
785     private void updateCurrBlock(HFileBlock newBlock) {
786       block = newBlock;
787 
788       // sanity check
789       if (block.getBlockType() != BlockType.DATA) {
790         throw new IllegalStateException("ScannerV2 works only on data " +
791             "blocks, got " + block.getBlockType() + "; " +
792             "fileName=" + reader.name + ", " +
793             "dataBlockEncoder=" + reader.dataBlockEncoder + ", " +
794             "isCompaction=" + isCompaction);
795       }
796 
797       blockBuffer = block.getBufferWithoutHeader();
798       readKeyValueLen();
799       blockFetches++;
800 
801       // Reset the next indexed key
802       this.nextIndexedKey = null;
803     }
804 
805     private final void readKeyValueLen() {
806       blockBuffer.mark();
807       currKeyLen = blockBuffer.getInt();
808       currValueLen = blockBuffer.getInt();
809       blockBuffer.reset();
810       if (this.reader.shouldIncludeMemstoreTS()) {
811         if (this.reader.decodeMemstoreTS) {
812           try {
813             int memstoreTSOffset = blockBuffer.arrayOffset()
814                 + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen
815                 + currValueLen;
816             currMemstoreTS = Bytes.readVLong(blockBuffer.array(),
817                 memstoreTSOffset);
818             currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS);
819           } catch (Exception e) {
820             throw new RuntimeException("Error reading memstore timestamp", e);
821           }
822         } else {
823           currMemstoreTS = 0;
824           currMemstoreTSLen = 1;
825         }
826       }
827 
828       if (currKeyLen < 0 || currValueLen < 0
829           || currKeyLen > blockBuffer.limit()
830           || currValueLen > blockBuffer.limit()) {
831         throw new IllegalStateException("Invalid currKeyLen " + currKeyLen
832             + " or currValueLen " + currValueLen + ". Block offset: "
833             + block.getOffset() + ", block length: " + blockBuffer.limit()
834             + ", position: " + blockBuffer.position() + " (without header).");
835       }
836     }
837 
838     /**
839      * Within a loaded block, seek looking for the last key that is smaller
840      * than (or equal to?) the key we are interested in.
841      *
842      * A note on the seekBefore: if you have seekBefore = true, AND the first
843      * key in the block = key, then you'll get thrown exceptions. The caller has
844      * to check for that case and load the previous block as appropriate.
845      *
846      * @param key the key to find
847      * @param seekBefore find the key before the given key in case of exact
848      *          match.
849      * @return 0 in case of an exact key match, 1 in case of an inexact match
850      */
851     private int blockSeek(byte[] key, int offset, int length,
852         boolean seekBefore) {
853       int klen, vlen;
854       long memstoreTS = 0;
855       int memstoreTSLen = 0;
856       int lastKeyValueSize = -1;
857       do {
858         blockBuffer.mark();
859         klen = blockBuffer.getInt();
860         vlen = blockBuffer.getInt();
861         blockBuffer.reset();
862         if (this.reader.shouldIncludeMemstoreTS()) {
863           if (this.reader.decodeMemstoreTS) {
864             try {
865               int memstoreTSOffset = blockBuffer.arrayOffset()
866                   + blockBuffer.position() + KEY_VALUE_LEN_SIZE + klen + vlen;
867               memstoreTS = Bytes.readVLong(blockBuffer.array(),
868                   memstoreTSOffset);
869               memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
870             } catch (Exception e) {
871               throw new RuntimeException("Error reading memstore timestamp", e);
872             }
873           } else {
874             memstoreTS = 0;
875             memstoreTSLen = 1;
876           }
877         }
878 
879         int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position()
880             + KEY_VALUE_LEN_SIZE;
881         int comp = reader.getComparator().compare(key, offset, length,
882             blockBuffer.array(), keyOffset, klen);
883 
884         if (comp == 0) {
885           if (seekBefore) {
886             if (lastKeyValueSize < 0) {
887               throw new IllegalStateException("blockSeek with seekBefore "
888                   + "at the first key of the block: key="
889                   + Bytes.toStringBinary(key) + ", blockOffset="
890                   + block.getOffset() + ", onDiskSize="
891                   + block.getOnDiskSizeWithHeader());
892             }
893             blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
894             readKeyValueLen();
895             return 1; // non exact match.
896           }
897           currKeyLen = klen;
898           currValueLen = vlen;
899           if (this.reader.shouldIncludeMemstoreTS()) {
900             currMemstoreTS = memstoreTS;
901             currMemstoreTSLen = memstoreTSLen;
902           }
903           return 0; // indicate exact match
904         }
905 
906         if (comp < 0) {
907           if (lastKeyValueSize > 0)
908             blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
909           readKeyValueLen();
910           return 1;
911         }
912 
913         // The size of this key/value tuple, including key/value length fields.
914         lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE;
915         blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
916       } while (blockBuffer.remaining() > 0);
917 
918       // Seek to the last key we successfully read. This will happen if this is
919       // the last key/value pair in the file, in which case the following call
920       // to next() has to return false.
921       blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
922       readKeyValueLen();
923       return 1; // didn't exactly find it.
924     }
925 
926     @Override
927     protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
928       ByteBuffer buffer = curBlock.getBufferWithoutHeader();
929       // It is safe to manipulate this buffer because we own the buffer object.
930       buffer.rewind();
931       int klen = buffer.getInt();
932       buffer.getInt();
933       ByteBuffer keyBuff = buffer.slice();
934       keyBuff.limit(klen);
935       keyBuff.rewind();
936       return keyBuff;
937     }
938 
939     @Override
940     public String getKeyString() {
941       return Bytes.toStringBinary(blockBuffer.array(),
942           blockBuffer.arrayOffset() + blockBuffer.position()
943               + KEY_VALUE_LEN_SIZE, currKeyLen);
944     }
945 
946     @Override
947     public String getValueString() {
948       return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
949           + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
950           currValueLen);
951     }
952   }
953 
954   /**
955    * ScannerV2 that operates on encoded data blocks.
956    */
957   protected static class EncodedScannerV2 extends AbstractScannerV2 {
958     private DataBlockEncoder.EncodedSeeker seeker = null;
959     private DataBlockEncoder dataBlockEncoder = null;
960     private final boolean includesMemstoreTS;
961 
962     public EncodedScannerV2(HFileReaderV2 reader, boolean cacheBlocks,
963         boolean pread, boolean isCompaction, boolean includesMemstoreTS) {
964       super(reader, cacheBlocks, pread, isCompaction);
965       this.includesMemstoreTS = includesMemstoreTS;
966     }
967 
968     private void setDataBlockEncoder(DataBlockEncoder dataBlockEncoder) {
969       this.dataBlockEncoder = dataBlockEncoder;
970       seeker = dataBlockEncoder.createSeeker(reader.getComparator(),
971           includesMemstoreTS);
972     }
973 
974     @Override
975     public boolean isSeeked(){
976       return this.block != null;
977     }
978 
979     /**
980      * Updates the current block to be the given {@link HFileBlock}. Seeks to
981      * the the first key/value pair.
982      *
983      * @param newBlock the block to make current
984      */
985     private void updateCurrentBlock(HFileBlock newBlock) {
986       block = newBlock;
987 
988       // sanity checks
989       if (block.getBlockType() != BlockType.ENCODED_DATA) {
990         throw new IllegalStateException(
991             "EncodedScannerV2 works only on encoded data blocks");
992       }
993 
994       short dataBlockEncoderId = block.getDataBlockEncodingId();
995       if (dataBlockEncoder == null ||
996           !DataBlockEncoding.isCorrectEncoder(dataBlockEncoder,
997               dataBlockEncoderId)) {
998         DataBlockEncoder encoder =
999             DataBlockEncoding.getDataBlockEncoderById(dataBlockEncoderId);
1000         setDataBlockEncoder(encoder);
1001       }
1002 
1003       seeker.setCurrentBuffer(getEncodedBuffer(newBlock));
1004       blockFetches++;
1005 
1006       // Reset the next indexed key
1007       this.nextIndexedKey = null;
1008     }
1009 
1010     private ByteBuffer getEncodedBuffer(HFileBlock newBlock) {
1011       ByteBuffer origBlock = newBlock.getBufferReadOnly();
1012       ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(),
1013           origBlock.arrayOffset() + newBlock.headerSize() +
1014           DataBlockEncoding.ID_SIZE,
1015           newBlock.getUncompressedSizeWithoutHeader() -
1016           DataBlockEncoding.ID_SIZE).slice();
1017       return encodedBlock;
1018     }
1019 
1020     @Override
1021     public boolean seekTo() throws IOException {
1022       if (reader == null) {
1023         return false;
1024       }
1025 
1026       if (reader.getTrailer().getEntryCount() == 0) {
1027         // No data blocks.
1028         return false;
1029       }
1030 
1031       long firstDataBlockOffset =
1032           reader.getTrailer().getFirstDataBlockOffset();
1033       if (block != null && block.getOffset() == firstDataBlockOffset) {
1034         seeker.rewind();
1035         return true;
1036       }
1037 
1038       block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
1039           isCompaction, BlockType.DATA);
1040       if (block.getOffset() < 0) {
1041         throw new IOException("Invalid block offset: " + block.getOffset());
1042       }
1043       updateCurrentBlock(block);
1044       return true;
1045     }
1046 
1047     @Override
1048     public boolean next() throws IOException {
1049       boolean isValid = seeker.next();
1050       if (!isValid) {
1051         block = readNextDataBlock();
1052         isValid = block != null;
1053         if (isValid) {
1054           updateCurrentBlock(block);
1055         }
1056       }
1057       return isValid;
1058     }
1059 
1060     @Override
1061     public ByteBuffer getKey() {
1062       assertValidSeek();
1063       return seeker.getKeyDeepCopy();
1064     }
1065 
1066     @Override
1067     public int compareKey(RawComparator<byte []> comparator, byte[] key, int offset, int length) {
1068       return seeker.compareKey(comparator, key, offset, length);
1069     }
1070 
1071     @Override
1072     public ByteBuffer getValue() {
1073       assertValidSeek();
1074       return seeker.getValueShallowCopy();
1075     }
1076 
1077     @Override
1078     public KeyValue getKeyValue() {
1079       if (block == null) {
1080         return null;
1081       }
1082       return seeker.getKeyValue();
1083     }
1084 
1085     @Override
1086     public String getKeyString() {
1087       ByteBuffer keyBuffer = getKey();
1088       return Bytes.toStringBinary(keyBuffer.array(),
1089           keyBuffer.arrayOffset(), keyBuffer.limit());
1090     }
1091 
1092     @Override
1093     public String getValueString() {
1094       ByteBuffer valueBuffer = getValue();
1095       return Bytes.toStringBinary(valueBuffer.array(),
1096           valueBuffer.arrayOffset(), valueBuffer.limit());
1097     }
1098 
1099     private void assertValidSeek() {
1100       if (block == null) {
1101         throw new NotSeekedException();
1102       }
1103     }
1104 
1105     @Override
1106     protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
1107       return dataBlockEncoder.getFirstKeyInBlock(getEncodedBuffer(curBlock));
1108     }
1109 
1110     @Override
1111     protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, byte[] nextIndexedKey,
1112         boolean rewind, byte[] key, int offset, int length, boolean seekBefore)
1113         throws IOException  {
1114       if (block == null || block.getOffset() != seekToBlock.getOffset()) {
1115         updateCurrentBlock(seekToBlock);
1116       } else if (rewind) {
1117         seeker.rewind();
1118       }
1119       this.nextIndexedKey = nextIndexedKey;
1120       return seeker.seekToKeyInBlock(key, offset, length, seekBefore);
1121     }
1122   }
1123 
1124   /**
1125    * Returns a buffer with the Bloom filter metadata. The caller takes
1126    * ownership of the buffer.
1127    */
1128   @Override
1129   public DataInput getGeneralBloomFilterMetadata() throws IOException {
1130     return this.getBloomFilterMetadata(BlockType.GENERAL_BLOOM_META);
1131   }
1132 
1133   @Override
1134   public DataInput getDeleteBloomFilterMetadata() throws IOException {
1135     return this.getBloomFilterMetadata(BlockType.DELETE_FAMILY_BLOOM_META);
1136   }
1137 
1138   private DataInput getBloomFilterMetadata(BlockType blockType)
1139   throws IOException {
1140     if (blockType != BlockType.GENERAL_BLOOM_META &&
1141         blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
1142       throw new RuntimeException("Block Type: " + blockType.toString() +
1143           " is not supported") ;
1144     }
1145 
1146     for (HFileBlock b : loadOnOpenBlocks)
1147       if (b.getBlockType() == blockType)
1148         return b.getByteStream();
1149     return null;
1150   }
1151 
1152   @Override
1153   public boolean isFileInfoLoaded() {
1154     return true; // We load file info in constructor in version 2.
1155   }
1156 
1157   /**
1158    * Validates that the minor version is within acceptable limits.
1159    * Otherwise throws an Runtime exception
1160    */
1161   private void validateMinorVersion(Path path, int minorVersion) {
1162     if (minorVersion < MIN_MINOR_VERSION ||
1163         minorVersion > MAX_MINOR_VERSION) {
1164       String msg = "Minor version for path " + path + 
1165                    " is expected to be between " +
1166                    MIN_MINOR_VERSION + " and " + MAX_MINOR_VERSION +
1167                    " but is found to be " + minorVersion;
1168       LOG.error(msg);
1169       throw new RuntimeException(msg);
1170     }
1171   }
1172 }