1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.io.hfile;
19
20 import java.io.DataInput;
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.ArrayList;
24 import java.util.List;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.hbase.Cell;
32 import org.apache.hadoop.hbase.CellUtil;
33 import org.apache.hadoop.hbase.HConstants;
34 import org.apache.hadoop.hbase.KeyValue;
35 import org.apache.hadoop.hbase.KeyValue.KVComparator;
36 import org.apache.hadoop.hbase.NoTagsKeyValue;
37 import org.apache.hadoop.hbase.fs.HFileSystem;
38 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
39 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
40 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
41 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
42 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.apache.hadoop.hbase.util.IdLock;
45 import org.apache.hadoop.io.WritableUtils;
46 import org.apache.htrace.Trace;
47 import org.apache.htrace.TraceScope;
48
49 import com.google.common.annotations.VisibleForTesting;
50
51
52
53
54 @InterfaceAudience.Private
55 public class HFileReaderV2 extends AbstractHFileReader {
56
/** Logger shared by the reader and its nested scanners. */
private static final Log LOG = LogFactory.getLog(HFileReaderV2.class);

/**
 * Lowest minor version for which {@code createHFileContext} turns HBase checksums on
 * (it tests {@code trailer.getMinorVersion() >= MINOR_VERSION_WITH_CHECKSUM}).
 */
public static final int MINOR_VERSION_WITH_CHECKSUM = 1;

/** Minor version below {@link #MINOR_VERSION_WITH_CHECKSUM}, i.e. without HBase checksums. */
public static final int MINOR_VERSION_NO_CHECKSUM = 0;

/**
 * NOTE(review): not referenced in the visible part of this file; presumably the first minor
 * version whose trailer is protobuf-encoded -- confirm against FixedFileTrailer.
 */
public static final int PBUF_TRAILER_MINOR_VERSION = 2;

/**
 * Number of bytes taken by the two ints (key length, then value length) that precede every
 * cell in an unencoded data block.
 */
public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;

/**
 * True when the file info's key/value format version equals
 * {@code HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE}; assigned in the constructor.
 */
private boolean includesMemstoreTS = false;
/**
 * True when {@link #includesMemstoreTS} is set and the file's
 * {@code HFileWriterV2.MAX_MEMSTORE_TS_KEY} value is greater than zero; assigned in the
 * constructor. When false the scanners report a memstore timestamp of 0 without decoding it.
 */
protected boolean decodeMemstoreTS = false;
75
76 protected boolean shouldIncludeMemstoreTS() {
77 return includesMemstoreTS;
78 }
79
80
/** Filesystem block reader; every block that is not served from the block cache is read here. */
private HFileBlock.FSReader fsBlockReader;

/**
 * Per-offset lock. {@code readBlock} takes the entry for a block offset on its second pass
 * through the cache lookup (after a miss) and releases it in its {@code finally} clause.
 */
private IdLock offsetLock = new IdLock();

/**
 * Blocks of the load-on-open section that remain after the data index root, the meta index
 * root and the file info block have been consumed by the constructor.
 */
private List<HFileBlock> loadOnOpenBlocks = new ArrayList<HFileBlock>();

/**
 * NOTE(review): presumably the lower bound enforced by validateMinorVersion, which is not
 * visible in this part of the file -- confirm.
 */
static final int MIN_MINOR_VERSION = 0;

/**
 * NOTE(review): presumably the upper bound enforced by validateMinorVersion, which is not
 * visible in this part of the file -- confirm.
 */
static final int MAX_MINOR_VERSION = 3;

/**
 * From this minor version on, {@code ScannerV2.blockSeek} returns
 * {@code HConstants.INDEX_KEY_MAGIC} when the sought key sorts before the very first cell of
 * the block.
 */
static final int MINOR_VERSION_WITH_FAKED_KEY = 3;

/**
 * Block context built once in the constructor by {@code createHFileContext}; handed to the
 * block reader, to {@code HFileBlock.unpack} and to encoded scanners.
 */
HFileContext hfileContext;
109
110
111
112
113
114
115
116
117
118
119
120
121
/**
 * Opens a version 2 HFile for reading. The constructor validates the trailer versions, builds
 * the block reader, and then walks the load-on-open section (from
 * {@code trailer.getLoadOnOpenDataOffset()} up to the start of the trailer) in a fixed order:
 * data index root, meta index root, file info, then any remaining blocks. If the cache
 * configuration asks for it, a background prefetch of all blocks is scheduled.
 *
 * @param path path of the HFile
 * @param trailer already-parsed file trailer
 * @param fsdis input stream wrapper handed to the block reader
 * @param size file size, passed to the superclass
 * @param cacheConf cache configuration
 * @param hfs filesystem handed to the block reader
 * @param conf configuration
 * @throws IOException if the versions are rejected or a load-on-open block cannot be read
 */
public HFileReaderV2(final Path path, final FixedFileTrailer trailer,
    final FSDataInputStreamWrapper fsdis, final long size, final CacheConfig cacheConf,
    final HFileSystem hfs, final Configuration conf) throws IOException {
  super(path, trailer, size, cacheConf, hfs, conf);
  this.conf = conf;
  trailer.expectMajorVersion(getMajorVersion());
  validateMinorVersion(path, trailer.getMinorVersion());
  // includesMemstoreTS still has its default (false) at this point; the value read from the
  // file info is pushed to the block reader further down via setIncludesMemstoreTS.
  this.hfileContext = createHFileContext(fsdis, fileSize, hfs, path, trailer);
  HFileBlock.FSReaderImpl fsBlockReaderV2 =
    new HFileBlock.FSReaderImpl(fsdis, fileSize, hfs, path, hfileContext);
  this.fsBlockReader = fsBlockReaderV2; // upcast; the local keeps the impl-only setters reachable

  // Comparator class name is stored in the trailer in version 2.
  comparator = trailer.createComparator();
  dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
      trailer.getNumDataIndexLevels(), this);
  metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader(
      KeyValue.RAW_COMPARATOR, 1);

  // Iterate over the load-on-open section: everything between its start offset and the
  // beginning of the trailer.
  HFileBlock.BlockIterator blockIter = fsBlockReaderV2.blockRange(
      trailer.getLoadOnOpenDataOffset(),
      fileSize - trailer.getTrailerSize());

  // First block: root of the (possibly multi-level) data block index.
  dataBlockIndexReader.readMultiLevelIndexRoot(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
      trailer.getDataIndexCount());

  // Second block: root of the meta block index (also typed ROOT_INDEX).
  metaBlockIndexReader.readRootIndex(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
      trailer.getMetaIndexCount());

  // Third block: file info key/value map.
  fileInfo = new FileInfo();
  fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
  byte[] creationTimeBytes = fileInfo.get(FileInfo.CREATE_TIME_TS);
  // A file without a recorded creation time gets 0.
  this.hfileContext.setFileCreateTime(creationTimeBytes == null? 0:
      Bytes.toLong(creationTimeBytes));
  lastKey = fileInfo.get(FileInfo.LASTKEY);
  avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
  avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
  byte [] keyValueFormatVersion =
      fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
  // Cells carry a memstore timestamp only when the format version says so.
  includesMemstoreTS = keyValueFormatVersion != null &&
      Bytes.toInt(keyValueFormatVersion) ==
          HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE;
  fsBlockReaderV2.setIncludesMemstoreTS(includesMemstoreTS);
  if (includesMemstoreTS) {
    // Decoding can be skipped entirely when the largest timestamp in the file is 0.
    decodeMemstoreTS = Bytes.toLong(fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY)) > 0;
  }

  // The data block encoder is chosen from the file info and shared with the block reader.
  dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo);
  fsBlockReaderV2.setDataBlockEncoder(dataBlockEncoder);

  // Keep whatever else is in the load-on-open section.
  HFileBlock b;
  while ((b = blockIter.nextBlock()) != null) {
    loadOnOpenBlocks.add(b);
  }

  // Optional background prefetch: read every block from offset 0 up to the trailer through
  // readBlock with cacheBlock=true so the blocks go through the caching path.
  if (cacheConf.shouldPrefetchOnOpen()) {
    PrefetchExecutor.request(path, new Runnable() {
      public void run() {
        try {
          long offset = 0;
          long end = fileSize - getTrailer().getTrailerSize();
          HFileBlock prevBlock = null;
          while (offset < end) {
            // Stop quietly when the prefetch thread has been interrupted.
            if (Thread.interrupted()) {
              break;
            }
            // The previous block tells us the size of the next one; -1 for the first block.
            long onDiskSize = -1;
            if (prevBlock != null) {
              onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
            }
            HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false,
              null, null);
            prevBlock = block;
            offset += block.getOnDiskSizeWithHeader();
          }
        } catch (IOException e) {
          // IOExceptions are only reported at trace level.
          if (LOG.isTraceEnabled()) {
            LOG.trace("Exception encountered while prefetching " + path + ":", e);
          }
        } catch (Exception e) {
          // Anything else is unexpected and worth a warning.
          LOG.warn("Exception encountered while prefetching " + path + ":", e);
        } finally {
          PrefetchExecutor.complete(path);
        }
      }
    });
  }
}
223
224 protected HFileContext createHFileContext(FSDataInputStreamWrapper fsdis, long fileSize,
225 HFileSystem hfs, Path path, FixedFileTrailer trailer) throws IOException {
226 return new HFileContextBuilder()
227 .withIncludesMvcc(this.includesMemstoreTS)
228 .withCompression(this.compressAlgo)
229 .withHBaseCheckSum(trailer.getMinorVersion() >= MINOR_VERSION_WITH_CHECKSUM)
230 .build();
231 }
232
233
234
235
236
237
238
239
240
241
242
243
244
245 @Override
246 public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
247 final boolean isCompaction) {
248 if (dataBlockEncoder.useEncodedScanner()) {
249 return new EncodedScannerV2(this, cacheBlocks, pread, isCompaction,
250 hfileContext);
251 }
252
253 return new ScannerV2(this, cacheBlocks, pread, isCompaction);
254 }
255
256
257
258
259
260 private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock,
261 boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
262 DataBlockEncoding expectedDataBlockEncoding) throws IOException {
263
264 if (cacheConf.isBlockCacheEnabled()) {
265 BlockCache cache = cacheConf.getBlockCache();
266 HFileBlock cachedBlock = (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock,
267 updateCacheMetrics);
268 if (cachedBlock != null) {
269 if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) {
270 cachedBlock = cachedBlock.unpack(hfileContext, fsBlockReader);
271 }
272 validateBlockType(cachedBlock, expectedBlockType);
273
274 if (expectedDataBlockEncoding == null) {
275 return cachedBlock;
276 }
277 DataBlockEncoding actualDataBlockEncoding =
278 cachedBlock.getDataBlockEncoding();
279
280
281
282 if (cachedBlock.getBlockType().isData() &&
283 !actualDataBlockEncoding.equals(expectedDataBlockEncoding)) {
284
285
286
287
288
289
290
291
292
293
294 if (!expectedDataBlockEncoding.equals(DataBlockEncoding.NONE) &&
295 !actualDataBlockEncoding.equals(DataBlockEncoding.NONE)) {
296
297
298
299
300
301
302 LOG.info("Evicting cached block with key " + cacheKey +
303 " because of a data block encoding mismatch" +
304 "; expected: " + expectedDataBlockEncoding +
305 ", actual: " + actualDataBlockEncoding);
306 cache.evictBlock(cacheKey);
307 }
308 return null;
309 }
310 return cachedBlock;
311 }
312 }
313 return null;
314 }
315
316
317
318
319
320
321 @Override
322 public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
323 throws IOException {
324 if (trailer.getMetaIndexCount() == 0) {
325 return null;
326 }
327 if (metaBlockIndexReader == null) {
328 throw new IOException("Meta index not loaded");
329 }
330
331 byte[] mbname = Bytes.toBytes(metaBlockName);
332 int block = metaBlockIndexReader.rootBlockContainingKey(mbname,
333 0, mbname.length);
334 if (block == -1)
335 return null;
336 long blockSize = metaBlockIndexReader.getRootBlockDataSize(block);
337
338
339
340
341 synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
342
343 long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
344 BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset);
345
346 cacheBlock &= cacheConf.shouldCacheDataOnRead();
347 if (cacheConf.isBlockCacheEnabled()) {
348 HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, false, true, true,
349 BlockType.META, null);
350 if (cachedBlock != null) {
351 assert cachedBlock.isUnpacked() : "Packed block leak.";
352
353
354 return cachedBlock.getBufferWithoutHeader();
355 }
356
357 }
358
359 HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
360 blockSize, -1, true).unpack(hfileContext, fsBlockReader);
361
362
363 if (cacheBlock) {
364 cacheConf.getBlockCache().cacheBlock(cacheKey, metaBlock,
365 cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1());
366 }
367
368 return metaBlock.getBufferWithoutHeader();
369 }
370 }
371
/**
 * Reads a block at the given offset, serving it from the block cache when possible and
 * otherwise reading it from the filesystem, unpacking it and optionally caching it.
 *
 * @param dataBlockOffset file offset of the block; must be non-negative and below the
 *          load-on-open section
 * @param onDiskBlockSize on-disk size handed to the block reader; callers in this file pass
 *          -1 when they do not know it
 * @param cacheBlock whether the block may be cached after a filesystem read (still subject to
 *          {@code cacheConf.shouldCacheBlockOnRead})
 * @param pread forwarded to the block reader
 * @param isCompaction forwarded to the cache lookup
 * @param updateCacheMetrics whether cache metrics and the data block read counter are updated
 * @param expectedBlockType expected type of the block, or {@code null} to skip the check
 * @param expectedDataBlockEncoding encoding a cached data block must have, or {@code null}
 * @return the unpacked block
 * @throws IOException if the index is not loaded, the offset is out of range, the block type
 *           or cached encoding is wrong, or the read fails
 */
@Override
public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
    final boolean cacheBlock, boolean pread, final boolean isCompaction,
    boolean updateCacheMetrics, BlockType expectedBlockType,
    DataBlockEncoding expectedDataBlockEncoding)
    throws IOException {
  if (dataBlockIndexReader == null) {
    throw new IOException("Block index not loaded");
  }
  if (dataBlockOffset < 0 || dataBlockOffset >= trailer.getLoadOnOpenDataOffset()) {
    throw new IOException("Requested block is out of range: " + dataBlockOffset +
      ", lastDataBlockOffset: " + trailer.getLastDataBlockOffset());
  }

  // The cache key is the file name plus the block offset.
  BlockCacheKey cacheKey = new BlockCacheKey(name, dataBlockOffset);
  boolean useLock = false;
  IdLock.Entry lockEntry = null;
  TraceScope traceScope = Trace.startSpan("HFileReaderV2.readBlock");
  try {
    // At most two passes: the first without the offset lock, the second (after a miss, when
    // the configuration asks for it) with the lock held.
    while (true) {
      // Check the cache first; return on a hit.
      if (cacheConf.shouldReadBlockFromCache(expectedBlockType)) {
        if (useLock) {
          lockEntry = offsetLock.getLockEntry(dataBlockOffset);
        }
        // useLock is also forwarded to the cache lookup; it is true only on the second pass.
        HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction,
          updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding);
        if (cachedBlock != null) {
          if (Trace.isTracing()) {
            traceScope.getSpan().addTimelineAnnotation("blockCacheHit");
          }
          assert cachedBlock.isUnpacked() : "Packed block leak.";
          if (cachedBlock.getBlockType().isData()) {
            if (updateCacheMetrics) {
              HFile.dataBlockReadCnt.incrementAndGet();
            }
            // A cached data block must have the encoding this reader's encoder uses.
            if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) {
              throw new IOException("Cached block under key " + cacheKey + " "
                + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
                + dataBlockEncoder.getDataBlockEncoding() + ")");
            }
          }
          // Cache hit.
          return cachedBlock;
        }
        if (!useLock && cacheBlock && cacheConf.shouldLockOnCacheMiss(expectedBlockType)) {
          // Check the cache once more, this time holding the offset lock.
          useLock = true;
          continue;
        }
        // Otherwise fall through and load the block.
      }

      if (Trace.isTracing()) {
        traceScope.getSpan().addTimelineAnnotation("blockCacheMiss");
      }
      // Load the block from the filesystem.
      HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1,
          pread);
      validateBlockType(hfileBlock, expectedBlockType);
      HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
      BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();

      // Cache either the block as read or the unpacked one, depending on whether this
      // category is cached compressed.
      if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
        cacheConf.getBlockCache().cacheBlock(cacheKey,
          cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked,
          cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1());
      }

      if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
        HFile.dataBlockReadCnt.incrementAndGet();
      }

      return unpacked;
    }
  } finally {
    traceScope.close();
    // lockEntry is non-null only when the second pass acquired the offset lock.
    if (lockEntry != null) {
      offsetLock.releaseLockEntry(lockEntry);
    }
  }
}
463
464 @Override
465 public boolean hasMVCCInfo() {
466 return includesMemstoreTS && decodeMemstoreTS;
467 }
468
469
470
471
472
473
474
475
476
477
478 private void validateBlockType(HFileBlock block,
479 BlockType expectedBlockType) throws IOException {
480 if (expectedBlockType == null) {
481 return;
482 }
483 BlockType actualBlockType = block.getBlockType();
484 if (expectedBlockType.isData() && actualBlockType.isData()) {
485
486
487 return;
488 }
489 if (actualBlockType != expectedBlockType) {
490 throw new IOException("Expected block type " + expectedBlockType + ", " +
491 "but got " + actualBlockType + ": " + block);
492 }
493 }
494
495
496
497
498
499
500 @Override
501 public byte[] getLastKey() {
502 return dataBlockIndexReader.isEmpty() ? null : lastKey;
503 }
504
505
506
507
508
509
510 @Override
511 public byte[] midkey() throws IOException {
512 return dataBlockIndexReader.midkey();
513 }
514
515 @Override
516 public void close() throws IOException {
517 close(cacheConf.shouldEvictOnClose());
518 }
519
520 public void close(boolean evictOnClose) throws IOException {
521 PrefetchExecutor.cancel(path);
522 if (evictOnClose && cacheConf.isBlockCacheEnabled()) {
523 int numEvicted = cacheConf.getBlockCache().evictBlocksByHfileName(name);
524 if (LOG.isTraceEnabled()) {
525 LOG.trace("On close, file=" + name + " evicted=" + numEvicted
526 + " block(s)");
527 }
528 }
529 fsBlockReader.closeStreams();
530 }
531
532 public DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction) {
533 return dataBlockEncoder.getEffectiveEncodingInCache(isCompaction);
534 }
535
536
537 @Override
538 HFileBlock.FSReader getUncachedBlockReader() {
539 return fsBlockReader;
540 }
541
542
543 protected abstract static class AbstractScannerV2
544 extends AbstractHFileReader.Scanner {
/**
 * The block the scanner is currently positioned in. {@code ScannerV2.setNonSeekedState}
 * resets it to {@code null}, and {@code EncodedScannerV2.isSeeked} treats non-null as seeked.
 */
protected HFileBlock block;
546
547 @Override
548 public Cell getNextIndexedKey() {
549 return nextIndexedKey;
550 }
551
552
553
554
555
556
557
/**
 * Index key handed to {@code loadBlockAndSeekToKey} together with the block (taken from the
 * block index lookup in {@code seekTo(Cell, boolean)}). The scanners reset it to {@code null}
 * whenever they switch to a new block, and {@code reseekTo} compares it with
 * {@code HConstants.NO_NEXT_INDEXED_KEY} to decide whether a reseek can stay in the current
 * block.
 */
protected Cell nextIndexedKey;
559
/**
 * Passes all arguments unchanged to the base scanner constructor.
 *
 * @param r the reader this scanner operates on
 * @param cacheBlocks forwarded to the base scanner
 * @param pread forwarded to the base scanner
 * @param isCompaction forwarded to the base scanner
 */
public AbstractScannerV2(HFileReaderV2 r, boolean cacheBlocks,
    final boolean pread, final boolean isCompaction) {
  super(r, cacheBlocks, pread, isCompaction);
  // No state of its own to initialise: block and nextIndexedKey start out null.
}
564
/**
 * Extracts the first key of the given block.
 *
 * @param curBlock the block to inspect
 * @return a buffer holding the first key; {@code seekBefore} reads it through
 *         {@code array()}, {@code arrayOffset()} and {@code limit()}
 */
protected abstract ByteBuffer getFirstKeyInBlock(HFileBlock curBlock);
566
/**
 * Makes {@code seekToBlock} the current block (if it is not already) and seeks to
 * {@code key} inside it.
 *
 * @param seekToBlock block to position in
 * @param nextIndexedKey index key to remember for later reseeks
 * @param rewind whether to restart from the beginning of the block when it is already current
 * @param key key to seek to
 * @param seekBefore whether to stop on the cell before an exact match
 * @return the seek result of the implementation; {@code seekTo} and {@code reseekTo} return
 *         it unchanged
 * @throws IOException if the block cannot be processed
 */
protected abstract int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
    boolean rewind, Cell key, boolean seekBefore) throws IOException;
569
570 @Override
571 public int seekTo(byte[] key, int offset, int length) throws IOException {
572
573
574 return seekTo(new KeyValue.KeyOnlyKeyValue(key, offset, length));
575 }
576
577 @Override
578 public int reseekTo(byte[] key, int offset, int length) throws IOException {
579 return reseekTo(new KeyValue.KeyOnlyKeyValue(key, offset, length));
580 }
581
582 @Override
583 public int seekTo(Cell key) throws IOException {
584 return seekTo(key, true);
585 }
586
587 @Override
588 public int reseekTo(Cell key) throws IOException {
589 int compared;
590 if (isSeeked()) {
591 compared = compareKey(reader.getComparator(), key);
592 if (compared < 1) {
593
594
595 return compared;
596 } else {
597
598 if (this.nextIndexedKey != null &&
599 (this.nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY || reader
600 .getComparator()
601 .compareOnlyKeyPortion(key, nextIndexedKey) < 0)) {
602
603
604
605
606
607
608 return loadBlockAndSeekToKey(this.block, nextIndexedKey, false, key, false);
609 }
610 }
611 }
612
613
614 return seekTo(key, false);
615 }
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632 public int seekTo(Cell key, boolean rewind) throws IOException {
633 HFileBlockIndex.BlockIndexReader indexReader = reader.getDataBlockIndexReader();
634 BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, block,
635 cacheBlocks, pread, isCompaction, getEffectiveDataBlockEncoding());
636 if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) {
637
638 return -1;
639 }
640 return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(),
641 blockWithScanInfo.getNextIndexedKey(), rewind, key, false);
642 }
643
644 @Override
645 public boolean seekBefore(byte[] key, int offset, int length) throws IOException {
646 return seekBefore(new KeyValue.KeyOnlyKeyValue(key, offset, length));
647 }
648
649 @Override
650 public boolean seekBefore(Cell key) throws IOException {
651 HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, block,
652 cacheBlocks, pread, isCompaction,
653 ((HFileReaderV2) reader).getEffectiveEncodingInCache(isCompaction));
654 if (seekToBlock == null) {
655 return false;
656 }
657 ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
658
659 if (reader.getComparator()
660 .compareOnlyKeyPortion(
661 new KeyValue.KeyOnlyKeyValue(firstKey.array(), firstKey.arrayOffset(),
662 firstKey.limit()), key) >= 0) {
663 long previousBlockOffset = seekToBlock.getPrevBlockOffset();
664
665 if (previousBlockOffset == -1) {
666
667 return false;
668 }
669
670
671
672
673
674
675 int prevBlockSize = -1;
676 seekToBlock = reader.readBlock(previousBlockOffset,
677 prevBlockSize, cacheBlocks,
678 pread, isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
679
680
681 }
682 Cell firstKeyInCurrentBlock = new KeyValue.KeyOnlyKeyValue(Bytes.getBytes(firstKey));
683 loadBlockAndSeekToKey(seekToBlock, firstKeyInCurrentBlock, true, key, true);
684 return true;
685 }
686
687
688
689
690
691
692
693
694 protected HFileBlock readNextDataBlock() throws IOException {
695 long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
696 if (block == null)
697 return null;
698
699 HFileBlock curBlock = block;
700
701 do {
702 if (curBlock.getOffset() >= lastDataBlockOffset)
703 return null;
704
705 if (curBlock.getOffset() < 0) {
706 throw new IOException("Invalid block file offset: " + block);
707 }
708
709
710
711 curBlock = reader.readBlock(curBlock.getOffset()
712 + curBlock.getOnDiskSizeWithHeader(),
713 curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
714 isCompaction, true, null, getEffectiveDataBlockEncoding());
715 } while (!curBlock.getBlockType().isData());
716
717 return curBlock;
718 }
719
720 public DataBlockEncoding getEffectiveDataBlockEncoding() {
721 return ((HFileReaderV2)reader).getEffectiveEncodingInCache(isCompaction);
722 }
723
724
725
726
727
728
729
730
/**
 * Compares a serialized key with the key at the scanner's current position.
 *
 * @param comparator comparator to use
 * @param key array holding the key to compare
 * @param offset start of the key in {@code key}
 * @param length length of the key
 * @return the comparator's result, with the supplied key as the left-hand side in the
 *         implementations visible in this file
 */
public abstract int compareKey(KVComparator comparator, byte[] key, int offset,
    int length);
733
/**
 * Compares a cell's key with the key at the scanner's current position. {@code reseekTo}
 * treats a result below 1 as "already at or past the requested key".
 *
 * @param comparator comparator to use
 * @param kv cell whose key is compared
 * @return the comparator's result
 */
public abstract int compareKey(KVComparator comparator, Cell kv);
735 }
736
737
738
739
740 protected static class ScannerV2 extends AbstractScannerV2 {
/**
 * The owning reader, kept with its concrete type so the scanner can reach V2-specific
 * members (for example {@code shouldIncludeMemstoreTS()} and {@code decodeMemstoreTS}).
 */
private HFileReaderV2 reader;
742
/**
 * Creates a scanner for unencoded data blocks.
 *
 * @param r the reader this scanner operates on
 * @param cacheBlocks forwarded to the base scanner
 * @param pread forwarded to the base scanner
 * @param isCompaction forwarded to the base scanner
 */
public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
    final boolean pread, final boolean isCompaction) {
  super(r, cacheBlocks, pread, isCompaction);
  // Keep a typed reference in addition to the one held by the base scanner.
  reader = r;
}
748
749 @Override
750 public Cell getKeyValue() {
751 if (!isSeeked())
752 return null;
753
754 return formNoTagsKeyValue();
755 }
756
757 protected Cell formNoTagsKeyValue() {
758 NoTagsKeyValue ret = new NoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
759 + blockBuffer.position(), getCellBufSize());
760 if (this.reader.shouldIncludeMemstoreTS()) {
761 ret.setSequenceId(currMemstoreTS);
762 }
763 return ret;
764 }
765
766 protected int getCellBufSize() {
767 return KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen;
768 }
769
770 @Override
771 public ByteBuffer getKey() {
772 assertSeeked();
773 return ByteBuffer.wrap(
774 blockBuffer.array(),
775 blockBuffer.arrayOffset() + blockBuffer.position()
776 + KEY_VALUE_LEN_SIZE, currKeyLen).slice();
777 }
778
779 @Override
780 public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
781 return comparator.compareFlatKey(key, offset, length, blockBuffer.array(),
782 blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen);
783 }
784
785 @Override
786 public ByteBuffer getValue() {
787 assertSeeked();
788 return ByteBuffer.wrap(
789 blockBuffer.array(),
790 blockBuffer.arrayOffset() + blockBuffer.position()
791 + KEY_VALUE_LEN_SIZE + currKeyLen, currValueLen).slice();
792 }
793
794 protected void setNonSeekedState() {
795 block = null;
796 blockBuffer = null;
797 currKeyLen = 0;
798 currValueLen = 0;
799 currMemstoreTS = 0;
800 currMemstoreTSLen = 0;
801 }
802
803
804
805
806 private void positionThisBlockBuffer() {
807 try {
808 blockBuffer.position(getNextCellStartPosition());
809 } catch (IllegalArgumentException e) {
810 LOG.error("Current pos = " + blockBuffer.position()
811 + "; currKeyLen = " + currKeyLen + "; currValLen = "
812 + currValueLen + "; block limit = " + blockBuffer.limit()
813 + "; HFile name = " + reader.getName()
814 + "; currBlock currBlockOffset = " + block.getOffset());
815 throw e;
816 }
817 }
818
819
820
821
822
823
824 private boolean positionForNextBlock() throws IOException {
825
826 long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
827 if (block.getOffset() >= lastDataBlockOffset) {
828 setNonSeekedState();
829 return false;
830 }
831 return isNextBlock();
832 }
833
834 private boolean isNextBlock() throws IOException {
835
836 HFileBlock nextBlock = readNextDataBlock();
837 if (nextBlock == null) {
838 setNonSeekedState();
839 return false;
840 }
841 updateCurrBlock(nextBlock);
842 return true;
843 }
844
845 private final boolean _next() throws IOException {
846
847 if (blockBuffer.remaining() <= 0) {
848 return positionForNextBlock();
849 }
850
851 readKeyValueLen();
852 return true;
853 }
854
855
856
857
858
859
860
861
862 @Override
863 public boolean next() throws IOException {
864
865
866 assertSeeked();
867 positionThisBlockBuffer();
868 return _next();
869 }
870
871 protected int getNextCellStartPosition() {
872 return blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen
873 + currMemstoreTSLen;
874 }
875
876
877
878
879
880
881
882
883 @Override
884 public boolean seekTo() throws IOException {
885 if (reader == null) {
886 return false;
887 }
888
889 if (reader.getTrailer().getEntryCount() == 0) {
890
891 return false;
892 }
893
894 long firstDataBlockOffset =
895 reader.getTrailer().getFirstDataBlockOffset();
896 if (block != null && block.getOffset() == firstDataBlockOffset) {
897 blockBuffer.rewind();
898 readKeyValueLen();
899 return true;
900 }
901
902 block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
903 isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
904 if (block.getOffset() < 0) {
905 throw new IOException("Invalid block offset: " + block.getOffset());
906 }
907 updateCurrBlock(block);
908 return true;
909 }
910
911 @Override
912 protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
913 boolean rewind, Cell key, boolean seekBefore) throws IOException {
914 if (block == null || block.getOffset() != seekToBlock.getOffset()) {
915 updateCurrBlock(seekToBlock);
916 } else if (rewind) {
917 blockBuffer.rewind();
918 }
919
920
921 this.nextIndexedKey = nextIndexedKey;
922 return blockSeek(key, seekBefore);
923 }
924
925
926
927
928
929
930
931 protected void updateCurrBlock(HFileBlock newBlock) {
932 block = newBlock;
933
934
935 if (block.getBlockType() != BlockType.DATA) {
936 throw new IllegalStateException("ScannerV2 works only on data " +
937 "blocks, got " + block.getBlockType() + "; " +
938 "fileName=" + reader.name + ", " +
939 "dataBlockEncoder=" + reader.dataBlockEncoder + ", " +
940 "isCompaction=" + isCompaction);
941 }
942
943 blockBuffer = block.getBufferWithoutHeader();
944 readKeyValueLen();
945 blockFetches++;
946
947
948 this.nextIndexedKey = null;
949 }
950
951
952
953
954
955 protected final boolean checkLen(final int v) {
956 return v < 0 || v > this.blockBuffer.limit();
957 }
958
959
960
961
962 protected final void checkKeyValueLen() {
963 if (checkLen(this.currKeyLen) || checkLen(this.currValueLen)) {
964 throw new IllegalStateException("Invalid currKeyLen " + this.currKeyLen +
965 " or currValueLen " + this.currValueLen + ". Block offset: " + block.getOffset() +
966 ", block length: " + this.blockBuffer.limit() + ", position: " +
967 this.blockBuffer.position() + " (without header).");
968 }
969 }
970
971 protected void readKeyValueLen() {
972
973
974
975
976
977 int p = blockBuffer.position() + blockBuffer.arrayOffset();
978
979
980 long ll = Bytes.toLong(blockBuffer.array(), p);
981
982 this.currKeyLen = (int)(ll >> Integer.SIZE);
983 this.currValueLen = (int)(Bytes.MASK_FOR_LOWER_INT_IN_LONG ^ ll);
984 checkKeyValueLen();
985
986 p += (Bytes.SIZEOF_LONG + currKeyLen + currValueLen);
987 readMvccVersion(p);
988 }
989
990
991
992
993
994 protected void readMvccVersion(final int position) {
995
996 if (!this.reader.shouldIncludeMemstoreTS()) return;
997 if (!this.reader.decodeMemstoreTS) {
998 currMemstoreTS = 0;
999 currMemstoreTSLen = 1;
1000 return;
1001 }
1002 _readMvccVersion(position);
1003 }
1004
1005
1006
1007
1008
1009 private void _readMvccVersion(final int position) {
1010
1011
1012
1013 byte firstByte = blockBuffer.array()[position];
1014 int len = WritableUtils.decodeVIntSize(firstByte);
1015 if (len == 1) {
1016 this.currMemstoreTS = firstByte;
1017 } else {
1018 long i = 0;
1019 for (int idx = 0; idx < len - 1; idx++) {
1020 byte b = blockBuffer.array()[position + 1 + idx];
1021 i = i << 8;
1022 i = i | (b & 0xFF);
1023 }
1024 currMemstoreTS = (WritableUtils.isNegativeVInt(firstByte) ? ~i : i);
1025 }
1026 this.currMemstoreTSLen = len;
1027 }
1028
1029 protected void readMvccVersion() {
1030
1031 readMvccVersion(blockBuffer.arrayOffset() + blockBuffer.position());
1032 }
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
/**
 * Scans forward from the buffer's current position within the current block, looking for
 * {@code key}.
 *
 * @param key the key to find
 * @param seekBefore when {@code true}, an exact match positions the scanner on the cell
 *          before the match instead of on the match
 * @return 0 on an exact match (only when {@code seekBefore} is false);
 *         {@code HConstants.INDEX_KEY_MAGIC} when the key sorts before the first examined
 *         cell, the buffer is at position 0 and the file's minor version is at least
 *         {@code MINOR_VERSION_WITH_FAKED_KEY}; 1 in every other case
 * @throws IllegalStateException if {@code seekBefore} is set and the very first examined cell
 *           matches the key exactly
 */
protected int blockSeek(Cell key, boolean seekBefore) {
  int klen, vlen;
  long memstoreTS = 0;
  int memstoreTSLen = 0;
  // Size of the previously visited cell; -1 while no cell has been stepped over yet.
  int lastKeyValueSize = -1;
  // One reusable key-only view, re-pointed at each cell's key.
  KeyValue.KeyOnlyKeyValue keyOnlykv = new KeyValue.KeyOnlyKeyValue();
  do {
    // Peek at the two length ints without moving the buffer.
    blockBuffer.mark();
    klen = blockBuffer.getInt();
    vlen = blockBuffer.getInt();
    blockBuffer.reset();
    if (this.reader.shouldIncludeMemstoreTS()) {
      if (this.reader.decodeMemstoreTS) {
        // The timestamp sits right after the value bytes.
        int memstoreTSOffset = blockBuffer.arrayOffset() + blockBuffer.position()
            + KEY_VALUE_LEN_SIZE + klen + vlen;
        memstoreTS = Bytes.readAsVLong(blockBuffer.array(), memstoreTSOffset);
        memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
      } else {
        // Not decoded: value 0, occupying one byte.
        memstoreTS = 0;
        memstoreTSLen = 1;
      }
    }

    int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE;
    keyOnlykv.setKey(blockBuffer.array(), keyOffset, klen);
    int comp = reader.getComparator().compareOnlyKeyPortion(key, keyOnlykv);

    if (comp == 0) {
      if (seekBefore) {
        if (lastKeyValueSize < 0) {
          // There is no earlier cell in this scan to step back to.
          throw new IllegalStateException("blockSeek with seekBefore "
              + "at the first key of the block: key="
              + CellUtil.getCellKeyAsString(key)
              + ", blockOffset=" + block.getOffset() + ", onDiskSize="
              + block.getOnDiskSizeWithHeader());
        }
        // Step back onto the previous cell and load its lengths.
        blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
        readKeyValueLen();
        return 1; // non-exact match
      }
      // Exact match: adopt the values already decoded for this cell.
      currKeyLen = klen;
      currValueLen = vlen;
      if (this.reader.shouldIncludeMemstoreTS()) {
        currMemstoreTS = memstoreTS;
        currMemstoreTSLen = memstoreTSLen;
      }
      return 0; // exact match
    } else if (comp < 0) {
      // The sought key sorts before this cell: the answer is the previous cell, if any.
      if (lastKeyValueSize > 0)
        blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
      readKeyValueLen();
      if (lastKeyValueSize == -1 && blockBuffer.position() == 0
          && this.reader.trailer.getMinorVersion() >= MINOR_VERSION_WITH_FAKED_KEY) {
        // The key is smaller than the first cell of the block.
        return HConstants.INDEX_KEY_MAGIC;
      }
      return 1;
    }

    // The sought key sorts after this cell: remember its size and move on.
    lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE;
    blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
  } while (blockBuffer.remaining() > 0);

  // Every remaining cell sorts before the key: settle on the last cell of the block.
  blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
  readKeyValueLen();
  return 1; // didn't exactly find it
}
1121
1122 @Override
1123 protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
1124 ByteBuffer buffer = curBlock.getBufferWithoutHeader();
1125
1126 buffer.rewind();
1127 int klen = buffer.getInt();
1128 buffer.getInt();
1129 ByteBuffer keyBuff = buffer.slice();
1130 keyBuff.limit(klen);
1131 keyBuff.rewind();
1132 return keyBuff;
1133 }
1134
1135 @Override
1136 public String getKeyString() {
1137 return Bytes.toStringBinary(blockBuffer.array(),
1138 blockBuffer.arrayOffset() + blockBuffer.position()
1139 + KEY_VALUE_LEN_SIZE, currKeyLen);
1140 }
1141
1142 @Override
1143 public String getValueString() {
1144 return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
1145 + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
1146 currValueLen);
1147 }
1148
1149 @Override
1150 public int compareKey(KVComparator comparator, Cell key) {
1151 return comparator.compareOnlyKeyPortion(
1152 key,
1153 new KeyValue.KeyOnlyKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
1154 + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen));
1155 }
1156 }
1157
1158
1159
1160
1161 protected static class EncodedScannerV2 extends AbstractScannerV2 {
/** Decoding context created from the file context; handed to the seeker on creation. */
private final HFileBlockDecodingContext decodingCtx;
/** Seeker that walks the encoded block; all positioning and cell access goes through it. */
private final DataBlockEncoder.EncodedSeeker seeker;
/** Encoder obtained from the reader's data block encoding; also used to vet block ids. */
private final DataBlockEncoder dataBlockEncoder;
/** The file context this scanner was created with. */
protected final HFileContext meta;
1166
/**
 * Creates a scanner for encoded data blocks, building the decoding context and seeker from
 * the encoder that matches the reader's data block encoding.
 *
 * @param reader the reader this scanner operates on
 * @param cacheBlocks forwarded to the base scanner
 * @param pread forwarded to the base scanner
 * @param isCompaction forwarded to the base scanner
 * @param meta file context used to create the decoding context
 */
public EncodedScannerV2(HFileReaderV2 reader, boolean cacheBlocks,
    boolean pread, boolean isCompaction, HFileContext meta) {
  super(reader, cacheBlocks, pread, isCompaction);
  this.meta = meta;
  this.dataBlockEncoder = reader.dataBlockEncoder.getDataBlockEncoding().getEncoder();
  this.decodingCtx = this.dataBlockEncoder.newDataBlockDecodingContext(meta);
  this.seeker = this.dataBlockEncoder.createSeeker(reader.getComparator(), this.decodingCtx);
}
1177
1178 @Override
1179 public boolean isSeeked(){
1180 return this.block != null;
1181 }
1182
1183
1184
1185
1186
1187
1188
1189
1190 private void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException {
1191 block = newBlock;
1192
1193
1194 if (block.getBlockType() != BlockType.ENCODED_DATA) {
1195 throw new IllegalStateException(
1196 "EncodedScanner works only on encoded data blocks");
1197 }
1198 short dataBlockEncoderId = block.getDataBlockEncodingId();
1199 if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) {
1200 String encoderCls = dataBlockEncoder.getClass().getName();
1201 throw new CorruptHFileException("Encoder " + encoderCls
1202 + " doesn't support data block encoding "
1203 + DataBlockEncoding.getNameFromId(dataBlockEncoderId));
1204 }
1205
1206 seeker.setCurrentBuffer(getEncodedBuffer(newBlock));
1207 blockFetches++;
1208
1209
1210 this.nextIndexedKey = null;
1211 }
1212
1213 private ByteBuffer getEncodedBuffer(HFileBlock newBlock) {
1214 ByteBuffer origBlock = newBlock.getBufferReadOnly();
1215 ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(),
1216 origBlock.arrayOffset() + newBlock.headerSize() +
1217 DataBlockEncoding.ID_SIZE,
1218 newBlock.getUncompressedSizeWithoutHeader() -
1219 DataBlockEncoding.ID_SIZE).slice();
1220 return encodedBlock;
1221 }
1222
1223 @Override
1224 public boolean seekTo() throws IOException {
1225 if (reader == null) {
1226 return false;
1227 }
1228
1229 if (reader.getTrailer().getEntryCount() == 0) {
1230
1231 return false;
1232 }
1233
1234 long firstDataBlockOffset =
1235 reader.getTrailer().getFirstDataBlockOffset();
1236 if (block != null && block.getOffset() == firstDataBlockOffset) {
1237 seeker.rewind();
1238 return true;
1239 }
1240
1241 block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
1242 isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
1243 if (block.getOffset() < 0) {
1244 throw new IOException("Invalid block offset: " + block.getOffset());
1245 }
1246 updateCurrentBlock(block);
1247 return true;
1248 }
1249
1250 @Override
1251 public boolean next() throws IOException {
1252 boolean isValid = seeker.next();
1253 if (!isValid) {
1254 block = readNextDataBlock();
1255 isValid = block != null;
1256 if (isValid) {
1257 updateCurrentBlock(block);
1258 }
1259 }
1260 return isValid;
1261 }
1262
1263 @Override
1264 public ByteBuffer getKey() {
1265 assertValidSeek();
1266 return seeker.getKeyDeepCopy();
1267 }
1268
1269 @Override
1270 public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
1271 return seeker.compareKey(comparator, key, offset, length);
1272 }
1273
1274 @Override
1275 public ByteBuffer getValue() {
1276 assertValidSeek();
1277 return seeker.getValueShallowCopy();
1278 }
1279
1280 @Override
1281 public Cell getKeyValue() {
1282 if (block == null) {
1283 return null;
1284 }
1285 return seeker.getKeyValue();
1286 }
1287
1288 @Override
1289 public String getKeyString() {
1290 ByteBuffer keyBuffer = getKey();
1291 return Bytes.toStringBinary(keyBuffer.array(),
1292 keyBuffer.arrayOffset(), keyBuffer.limit());
1293 }
1294
1295 @Override
1296 public String getValueString() {
1297 ByteBuffer valueBuffer = getValue();
1298 return Bytes.toStringBinary(valueBuffer.array(),
1299 valueBuffer.arrayOffset(), valueBuffer.limit());
1300 }
1301
1302 private void assertValidSeek() {
1303 if (block == null) {
1304 throw new NotSeekedException();
1305 }
1306 }
1307
1308 @Override
1309 protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
1310 return dataBlockEncoder.getFirstKeyInBlock(getEncodedBuffer(curBlock));
1311 }
1312
1313 @Override
1314 protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
1315 boolean rewind, Cell key, boolean seekBefore) throws IOException {
1316 if (block == null || block.getOffset() != seekToBlock.getOffset()) {
1317 updateCurrentBlock(seekToBlock);
1318 } else if (rewind) {
1319 seeker.rewind();
1320 }
1321 this.nextIndexedKey = nextIndexedKey;
1322 return seeker.seekToKeyInBlock(key, seekBefore);
1323 }
1324
1325 @Override
1326 public int compareKey(KVComparator comparator, Cell key) {
1327 return seeker.compareKey(comparator, key);
1328 }
1329 }
1330
1331
1332
1333
1334
1335 @Override
1336 public DataInput getGeneralBloomFilterMetadata() throws IOException {
1337 return this.getBloomFilterMetadata(BlockType.GENERAL_BLOOM_META);
1338 }
1339
1340 @Override
1341 public DataInput getDeleteBloomFilterMetadata() throws IOException {
1342 return this.getBloomFilterMetadata(BlockType.DELETE_FAMILY_BLOOM_META);
1343 }
1344
1345 private DataInput getBloomFilterMetadata(BlockType blockType)
1346 throws IOException {
1347 if (blockType != BlockType.GENERAL_BLOOM_META &&
1348 blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
1349 throw new RuntimeException("Block Type: " + blockType.toString() +
1350 " is not supported") ;
1351 }
1352
1353 for (HFileBlock b : loadOnOpenBlocks)
1354 if (b.getBlockType() == blockType)
1355 return b.getByteStream();
1356 return null;
1357 }
1358
1359 @Override
1360 public boolean isFileInfoLoaded() {
1361 return true;
1362 }
1363
1364
1365
1366
1367
1368 private void validateMinorVersion(Path path, int minorVersion) {
1369 if (minorVersion < MIN_MINOR_VERSION ||
1370 minorVersion > MAX_MINOR_VERSION) {
1371 String msg = "Minor version for path " + path +
1372 " is expected to be between " +
1373 MIN_MINOR_VERSION + " and " + MAX_MINOR_VERSION +
1374 " but is found to be " + minorVersion;
1375 LOG.error(msg);
1376 throw new RuntimeException(msg);
1377 }
1378 }
1379
1380 @Override
1381 public int getMajorVersion() {
1382 return 2;
1383 }
1384
1385 @Override
1386 public HFileContext getFileContext() {
1387 return hfileContext;
1388 }
1389
1390
1391
1392
1393
1394 @VisibleForTesting
1395 boolean prefetchComplete() {
1396 return PrefetchExecutor.isCompleted(path);
1397 }
1398 }