1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.io.hfile;
19
20 import java.io.DataInput;
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.ArrayList;
24 import java.util.List;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.hbase.Cell;
31 import org.apache.hadoop.hbase.CellUtil;
32 import org.apache.hadoop.hbase.HConstants;
33 import org.apache.hadoop.hbase.KeyValue;
34 import org.apache.hadoop.hbase.KeyValue.KVComparator;
35 import org.apache.hadoop.hbase.NoTagsKeyValue;
36 import org.apache.hadoop.hbase.classification.InterfaceAudience;
37 import org.apache.hadoop.hbase.fs.HFileSystem;
38 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
39 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
40 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
41 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
42 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
43 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
44 import org.apache.hadoop.hbase.util.Bytes;
45 import org.apache.hadoop.hbase.util.IdLock;
46 import org.apache.hadoop.io.WritableUtils;
47 import org.apache.htrace.Trace;
48 import org.apache.htrace.TraceScope;
49
50 import com.google.common.annotations.VisibleForTesting;
51
52
53
54
55 @InterfaceAudience.Private
56 public class HFileReaderV2 extends AbstractHFileReader {
57
/** Logger shared by the reader and its nested scanner classes. */
private static final Log LOG = LogFactory.getLog(HFileReaderV2.class);

/**
 * Minor version threshold for HBase checksums: createHFileContext enables HBase checksums
 * in the file context when the trailer's minor version is at least this value.
 */
public static final int MINOR_VERSION_WITH_CHECKSUM = 1;
/** Minor version below the checksum threshold. */
public static final int MINOR_VERSION_NO_CHECKSUM = 0;

/**
 * NOTE(review): the name suggests the minor version that introduced a protobuf trailer;
 * it is not referenced in the visible part of this class - confirm against FixedFileTrailer.
 */
public static final int PBUF_TRAILER_MINOR_VERSION = 2;

/**
 * Size in bytes of the two int fields (key length, value length) that precede the key bytes
 * of every cell in an unencoded data block.
 */
public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;

/**
 * True when the file info's key/value format version says cells carry a memstore timestamp.
 * Set by the constructor after the file info block has been read.
 */
private boolean includesMemstoreTS = false;
/**
 * True when memstore timestamps must actually be decoded; the constructor sets it only when
 * {@link #includesMemstoreTS} is true and the file's recorded max memstore TS is positive.
 */
protected boolean decodeMemstoreTS = false;
76
/**
 * @return whether cells in this file are followed by a memstore timestamp, as determined by
 *         the constructor from the file info block
 */
protected boolean shouldIncludeMemstoreTS() {
  return includesMemstoreTS;
}
80
81
/** Block reader used for every read that is not served from the block cache. */
private HFileBlock.FSReader fsBlockReader;

/**
 * Per-offset lock taken by readBlock on its second pass after a cache miss, keyed by the
 * data block offset, and released when readBlock returns.
 */
private IdLock offsetLock = new IdLock();

/**
 * Blocks of the load-on-open section that remain after the data index root, meta index root
 * and file info blocks have been consumed by the constructor.
 */
private List<HFileBlock> loadOnOpenBlocks = new ArrayList<HFileBlock>();

/** Lowest minor version; presumably the lower bound checked by validateMinorVersion - TODO confirm. */
static final int MIN_MINOR_VERSION = 0;

/** Highest minor version; presumably the upper bound checked by validateMinorVersion - TODO confirm. */
static final int MAX_MINOR_VERSION = 3;

/**
 * From this minor version on, ScannerV2.blockSeek returns HConstants.INDEX_KEY_MAGIC when the
 * sought key sorts before the first cell of the block.
 */
static final int MINOR_VERSION_WITH_FAKED_KEY = 3;

/** File context built in the constructor and passed to the block reader and block unpacking. */
HFileContext hfileContext;
110
111
112
113
114
115
116
117
118
119
120
121
122
/**
 * Opens an HFile for reading: validates the trailer's versions, creates the block reader,
 * parses the load-on-open section (data index root, meta index root, file info), and
 * optionally schedules a background prefetch of all data blocks.
 *
 * @param path path of the HFile
 * @param trailer the file's trailer
 * @param fsdis input stream wrapper handed to the block reader
 * @param size forwarded to the superclass constructor
 * @param cacheConf cache configuration; also decides whether prefetch-on-open runs
 * @param hfs file system wrapper handed to the block reader
 * @param conf configuration
 * @throws IOException if reading any load-on-open block fails (version validation is
 *         delegated to helpers not shown here)
 */
public HFileReaderV2(final Path path, final FixedFileTrailer trailer,
    final FSDataInputStreamWrapper fsdis, final long size, final CacheConfig cacheConf,
    final HFileSystem hfs, final Configuration conf) throws IOException {
  super(path, trailer, size, cacheConf, hfs, conf);
  this.conf = conf;
  trailer.expectMajorVersion(getMajorVersion());
  validateMinorVersion(path, trailer.getMinorVersion());
  // The context is created before file info is read, so in this class's createHFileContext
  // includesMemstoreTS is still false here; the block reader is told the real value below.
  this.hfileContext = createHFileContext(fsdis, fileSize, hfs, path, trailer);
  HFileBlock.FSReaderImpl fsBlockReaderV2 =
      new HFileBlock.FSReaderImpl(fsdis, fileSize, hfs, path, hfileContext);
  this.fsBlockReader = fsBlockReaderV2;

  // Comparator comes from the trailer; the meta index always uses the raw byte comparator
  // and a single index level.
  comparator = trailer.createComparator();
  dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
      trailer.getNumDataIndexLevels(), this);
  metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader(
      KeyValue.RAW_COMPARATOR, 1);

  // Iterate the load-on-open section: from its start offset up to where the trailer begins.
  HFileBlock.BlockIterator blockIter = fsBlockReaderV2.blockRange(
      trailer.getLoadOnOpenDataOffset(),
      fileSize - trailer.getTrailerSize());

  // First ROOT_INDEX block: root of the (possibly multi-level) data block index.
  dataBlockIndexReader.readMultiLevelIndexRoot(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
      trailer.getDataIndexCount());

  // Second ROOT_INDEX block: the meta block index.
  metaBlockIndexReader.readRootIndex(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
      trailer.getMetaIndexCount());

  // File info block: creation time, last key, average lengths and key/value format version.
  fileInfo = new FileInfo();
  fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
  byte[] creationTimeBytes = fileInfo.get(FileInfo.CREATE_TIME_TS);
  // A missing creation time is recorded as 0.
  this.hfileContext.setFileCreateTime(creationTimeBytes == null? 0:
      Bytes.toLong(creationTimeBytes));
  lastKey = fileInfo.get(FileInfo.LASTKEY);
  avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
  avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
  byte [] keyValueFormatVersion =
      fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
  includesMemstoreTS = keyValueFormatVersion != null &&
      Bytes.toInt(keyValueFormatVersion) ==
      HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE;
  fsBlockReaderV2.setIncludesMemstoreTS(includesMemstoreTS);
  if (includesMemstoreTS) {
    // Only decode per-cell memstore timestamps when the file recorded a positive maximum.
    decodeMemstoreTS = Bytes.toLong(fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY)) > 0;
  }

  // The data block encoder is derived from file info and shared with the block reader.
  dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo);
  fsBlockReaderV2.setDataBlockEncoder(dataBlockEncoder);

  // Keep whatever blocks remain in the load-on-open section.
  HFileBlock b;
  while ((b = blockIter.nextBlock()) != null) {
    loadOnOpenBlocks.add(b);
  }

  // Optional prefetch: walk every block from offset 0 up to the load-on-open section and
  // read it with caching enabled.
  if (cacheConf.shouldPrefetchOnOpen()) {
    PrefetchExecutor.request(path, new Runnable() {
      public void run() {
        long offset = 0;
        long end = 0;
        try {
          end = getTrailer().getLoadOnOpenDataOffset();
          HFileBlock prevBlock = null;
          if (LOG.isTraceEnabled()) {
            LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, end));
          }
          while (offset < end) {
            // Stop quietly when the task has been interrupted.
            if (Thread.interrupted()) {
              break;
            }
            // Reuse the size hint carried by the previous block when there is one;
            // -1 means "unknown".
            long onDiskSize = -1;
            if (prevBlock != null) {
              onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
            }
            // cacheBlock=true, pread=false, isCompaction=false, updateCacheMetrics=false,
            // and no expected block type or encoding.
            HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false,
                null, null);
            prevBlock = block;
            offset += block.getOnDiskSizeWithHeader();
          }
        } catch (IOException e) {
          // I/O failures during prefetch are only reported at trace level.
          if (LOG.isTraceEnabled()) {
            LOG.trace("Prefetch " + getPathOffsetEndStr(path, offset, end), e);
          }
        } catch (NullPointerException e) {
          LOG.warn("Stream moved/closed or prefetch cancelled?" +
              getPathOffsetEndStr(path, offset, end), e);
        } catch (Exception e) {
          // Anything else is logged as a warning.
          LOG.warn("Prefetch " + getPathOffsetEndStr(path, offset, end), e);
        } finally {
          PrefetchExecutor.complete(path);
        }
      }
    });
  }
}
231
232 protected HFileContext createHFileContext(FSDataInputStreamWrapper fsdis, long fileSize,
233 HFileSystem hfs, Path path, FixedFileTrailer trailer) throws IOException {
234 return new HFileContextBuilder()
235 .withIncludesMvcc(this.includesMemstoreTS)
236 .withCompression(this.compressAlgo)
237 .withHBaseCheckSum(trailer.getMinorVersion() >= MINOR_VERSION_WITH_CHECKSUM)
238 .build();
239 }
240
241 private static String getPathOffsetEndStr(final Path path, final long offset, final long end) {
242 return "path=" + path.toString() + ", offset=" + offset + ", end=" + end;
243 }
244
245
246
247
248
249
250
251
252
253
254
255
256
257 @Override
258 public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
259 final boolean isCompaction) {
260 if (dataBlockEncoder.useEncodedScanner()) {
261 return new EncodedScannerV2(this, cacheBlocks, pread, isCompaction,
262 hfileContext);
263 }
264
265 return new ScannerV2(this, cacheBlocks, pread, isCompaction);
266 }
267
268
269
270
271
272 private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock,
273 boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
274 DataBlockEncoding expectedDataBlockEncoding) throws IOException {
275
276 if (cacheConf.isBlockCacheEnabled()) {
277 BlockCache cache = cacheConf.getBlockCache();
278 HFileBlock cachedBlock = (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock,
279 updateCacheMetrics);
280 if (cachedBlock != null) {
281 if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) {
282 cachedBlock = cachedBlock.unpack(hfileContext, fsBlockReader);
283 }
284 validateBlockType(cachedBlock, expectedBlockType);
285
286 if (expectedDataBlockEncoding == null) {
287 return cachedBlock;
288 }
289 DataBlockEncoding actualDataBlockEncoding =
290 cachedBlock.getDataBlockEncoding();
291
292
293
294 if (cachedBlock.getBlockType().isData() &&
295 !actualDataBlockEncoding.equals(expectedDataBlockEncoding)) {
296
297
298
299
300
301
302
303
304
305
306 if (!expectedDataBlockEncoding.equals(DataBlockEncoding.NONE) &&
307 !actualDataBlockEncoding.equals(DataBlockEncoding.NONE)) {
308
309
310
311
312
313
314 LOG.info("Evicting cached block with key " + cacheKey +
315 " because of a data block encoding mismatch" +
316 "; expected: " + expectedDataBlockEncoding +
317 ", actual: " + actualDataBlockEncoding);
318 cache.evictBlock(cacheKey);
319 }
320 return null;
321 }
322 return cachedBlock;
323 }
324 }
325 return null;
326 }
327
328
329
330
331
332
333 @Override
334 public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
335 throws IOException {
336 if (trailer.getMetaIndexCount() == 0) {
337 return null;
338 }
339 if (metaBlockIndexReader == null) {
340 throw new IOException("Meta index not loaded");
341 }
342
343 byte[] mbname = Bytes.toBytes(metaBlockName);
344 int block = metaBlockIndexReader.rootBlockContainingKey(mbname,
345 0, mbname.length);
346 if (block == -1)
347 return null;
348 long blockSize = metaBlockIndexReader.getRootBlockDataSize(block);
349
350
351
352
353 synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
354
355 long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
356 BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset,
357 this.isPrimaryReplicaReader());
358
359 cacheBlock &= cacheConf.shouldCacheDataOnRead();
360 if (cacheConf.isBlockCacheEnabled()) {
361 HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, false, true, true,
362 BlockType.META, null);
363 if (cachedBlock != null) {
364 assert cachedBlock.isUnpacked() : "Packed block leak.";
365
366
367 return cachedBlock.getBufferWithoutHeader();
368 }
369
370 }
371
372 HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
373 blockSize, -1, true).unpack(hfileContext, fsBlockReader);
374
375
376 if (cacheBlock) {
377 cacheConf.getBlockCache().cacheBlock(cacheKey, metaBlock,
378 cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1());
379 }
380
381 return metaBlock.getBufferWithoutHeader();
382 }
383 }
384
/**
 * Reads a block at the given offset, going through the block cache when configured.
 *
 * The cache is first consulted without a lock. On a miss, when the block is to be cached and
 * the configuration asks for locking on cache miss, the lookup is repeated once while
 * holding the per-offset lock. If still absent, the block is read with the block reader,
 * validated, unpacked and optionally cached.
 *
 * @param dataBlockOffset offset of the block; must be non-negative and below the
 *        load-on-open section
 * @param onDiskBlockSize on-disk size hint passed to the block reader
 * @param cacheBlock whether the block may be put in the cache after a read
 * @param pread forwarded to the block reader
 * @param isCompaction forwarded to the cache lookup helper
 * @param updateCacheMetrics whether cache metrics and the data block read counter are updated
 * @param expectedBlockType expected type, or null to skip type validation
 * @param expectedDataBlockEncoding expected encoding for cached data blocks, or null
 * @return the unpacked block
 * @throws IOException if the index is not loaded, the offset is out of range, a cached data
 *         block has an encoding different from this reader's encoder, or the read fails
 */
@Override
public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
    final boolean cacheBlock, boolean pread, final boolean isCompaction,
    boolean updateCacheMetrics, BlockType expectedBlockType,
    DataBlockEncoding expectedDataBlockEncoding)
    throws IOException {
  if (dataBlockIndexReader == null) {
    throw new IOException("Block index not loaded");
  }
  // Blocks readable through this method live strictly before the load-on-open section.
  long trailerOffset = trailer.getLoadOnOpenDataOffset();
  if (dataBlockOffset < 0 || dataBlockOffset >= trailerOffset) {
    throw new IOException("Requested block is out of range: " + dataBlockOffset +
      ", lastDataBlockOffset: " + trailer.getLastDataBlockOffset() +
      ", trailer.getLoadOnOpenDataOffset: " + trailerOffset);
  }

  // The cache key is the file name plus block offset (and the primary-replica flag).
  BlockCacheKey cacheKey = new BlockCacheKey(name, dataBlockOffset,this.isPrimaryReplicaReader());
  boolean useLock = false;
  IdLock.Entry lockEntry = null;
  TraceScope traceScope = Trace.startSpan("HFileReaderV2.readBlock");
  try {
    // At most two iterations: an unlocked attempt, then (optionally) a locked retry.
    while (true) {
      // Check the cache first, when reads of this block type may be served from it.
      if (cacheConf.shouldReadBlockFromCache(expectedBlockType)) {
        if (useLock) {
          lockEntry = offsetLock.getLockEntry(dataBlockOffset);
        }
        // useLock is true only on the second pass through the loop.
        HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction,
          updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding);
        if (cachedBlock != null) {
          if (Trace.isTracing()) {
            traceScope.getSpan().addTimelineAnnotation("blockCacheHit");
          }
          assert cachedBlock.isUnpacked() : "Packed block leak.";
          if (cachedBlock.getBlockType().isData()) {
            if (updateCacheMetrics) {
              HFile.dataBlockReadCnt.incrementAndGet();
            }
            // A cached data block must carry the encoding this reader's encoder uses;
            // anything else is reported as an error.
            if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) {
              throw new IOException("Cached block under key " + cacheKey + " "
                + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
                + dataBlockEncoder.getDataBlockEncoding() + ")");
            }
          }
          // Cache hit: return the block.
          return cachedBlock;
        }
        if (!useLock && cacheBlock && cacheConf.shouldLockOnCacheMiss(expectedBlockType)) {
          // Retry the cache lookup once while holding the per-offset lock.
          useLock = true;
          continue;
        }
        // Otherwise fall through and load the block from the file.
      }

      if (Trace.isTracing()) {
        traceScope.getSpan().addTimelineAnnotation("blockCacheMiss");
      }
      // Load the block from the file system, validate its type, then unpack it.
      HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1,
          pread);
      validateBlockType(hfileBlock, expectedBlockType);
      HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
      BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();

      // Cache either the block as read or its unpacked form, depending on whether this
      // category is configured to be cached compressed.
      if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
        cacheConf.getBlockCache().cacheBlock(cacheKey,
          cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked,
          cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1());
      }

      if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
        HFile.dataBlockReadCnt.incrementAndGet();
      }

      return unpacked;
    }
  } finally {
    // Always close the trace span and release the offset lock if it was taken.
    traceScope.close();
    if (lockEntry != null) {
      offsetLock.releaseLockEntry(lockEntry);
    }
  }
}
478
/**
 * @return true only when cells carry a memstore timestamp and that timestamp needs decoding
 *         (both flags are set by the constructor from file info)
 */
@Override
public boolean hasMVCCInfo() {
  return includesMemstoreTS && decodeMemstoreTS;
}
483
484
485
486
487
488
489
490
491
492
493 private void validateBlockType(HFileBlock block,
494 BlockType expectedBlockType) throws IOException {
495 if (expectedBlockType == null) {
496 return;
497 }
498 BlockType actualBlockType = block.getBlockType();
499 if (expectedBlockType.isData() && actualBlockType.isData()) {
500
501
502 return;
503 }
504 if (actualBlockType != expectedBlockType) {
505 throw new IOException("Expected block type " + expectedBlockType + ", " +
506 "but got " + actualBlockType + ": " + block);
507 }
508 }
509
510
511
512
513
514
515 @Override
516 public byte[] getLastKey() {
517 return dataBlockIndexReader.isEmpty() ? null : lastKey;
518 }
519
520
521
522
523
524
/**
 * @return the middle key as computed by the data block index reader
 * @throws IOException if the index reader fails to compute it
 */
@Override
public byte[] midkey() throws IOException {
  return dataBlockIndexReader.midkey();
}
529
/**
 * Closes the reader, evicting its cached blocks when the cache configuration says to evict
 * on close.
 *
 * @throws IOException if closing the underlying streams fails
 */
@Override
public void close() throws IOException {
  close(cacheConf.shouldEvictOnClose());
}
534
535 public void close(boolean evictOnClose) throws IOException {
536 PrefetchExecutor.cancel(path);
537 if (evictOnClose && cacheConf.isBlockCacheEnabled()) {
538 int numEvicted = cacheConf.getBlockCache().evictBlocksByHfileName(name);
539 if (LOG.isTraceEnabled()) {
540 LOG.trace("On close, file=" + name + " evicted=" + numEvicted
541 + " block(s)");
542 }
543 }
544 fsBlockReader.closeStreams();
545 }
546
/**
 * @param isCompaction forwarded to the data block encoder
 * @return the encoding the data block encoder reports as effective in cache
 */
public DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction) {
  return dataBlockEncoder.getEffectiveEncodingInCache(isCompaction);
}
550
551
/** @return the file-system block reader this reader uses for reads that bypass the cache */
@Override
HFileBlock.FSReader getUncachedBlockReader() {
  return fsBlockReader;
}
556
557
558 protected abstract static class AbstractScannerV2
559 extends AbstractHFileReader.Scanner {
/** The block the scanner is currently positioned in; null when the scanner is not seeked. */
protected HFileBlock block;
561
/** @return the key recorded by the last seek as the next indexed key; may be null */
@Override
public Cell getNextIndexedKey() {
  return nextIndexedKey;
}
566
567
568
569
570
571
572
/**
 * Next indexed key as reported by the block index on the last seek. reseekTo uses it to
 * decide whether the target key can still be found in the current block; it is reset to
 * null whenever a scanner switches to a new block.
 */
protected Cell nextIndexedKey;
574
/**
 * Creates a scanner over the given reader; all arguments are passed to the superclass.
 *
 * @param r the reader to scan
 * @param cacheBlocks whether blocks read by this scanner should be cached
 * @param pread forwarded to block reads
 * @param isCompaction whether the scanner is used by a compaction
 */
public AbstractScannerV2(HFileReaderV2 r, boolean cacheBlocks,
    final boolean pread, final boolean isCompaction) {
  super(r, cacheBlocks, pread, isCompaction);
}
579
/**
 * @param curBlock the block to inspect
 * @return a buffer holding the first key of the given block
 */
protected abstract ByteBuffer getFirstKeyInBlock(HFileBlock curBlock);
581
/**
 * Makes the given block current (if it is not already) and seeks within it.
 *
 * @param seekToBlock block to position in
 * @param nextIndexedKey next indexed key to remember for later reseeks
 * @param rewind whether to restart from the beginning when the block is already current
 * @param key key to seek to
 * @param seekBefore whether to stop at the cell before an exact match
 * @return the result code of the in-block seek
 * @throws IOException on read failure
 */
protected abstract int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
    boolean rewind, Cell key, boolean seekBefore) throws IOException;
584
585 @Override
586 public int seekTo(byte[] key, int offset, int length) throws IOException {
587
588
589 return seekTo(new KeyValue.KeyOnlyKeyValue(key, offset, length));
590 }
591
592 @Override
593 public int reseekTo(byte[] key, int offset, int length) throws IOException {
594 return reseekTo(new KeyValue.KeyOnlyKeyValue(key, offset, length));
595 }
596
/**
 * Seeks to the given key, rewinding within the block if it is already the current one.
 *
 * @param key key to seek to
 * @return the result of {@link #seekTo(Cell, boolean)} with rewind enabled
 * @throws IOException on read failure
 */
@Override
public int seekTo(Cell key) throws IOException {
  return seekTo(key, true);
}
601
602 @Override
603 public int reseekTo(Cell key) throws IOException {
604 int compared;
605 if (isSeeked()) {
606 compared = compareKey(reader.getComparator(), key);
607 if (compared < 1) {
608
609
610 return compared;
611 } else {
612
613 if (this.nextIndexedKey != null &&
614 (this.nextIndexedKey == KeyValueScanner.NO_NEXT_INDEXED_KEY || reader
615 .getComparator()
616 .compareOnlyKeyPortion(key, nextIndexedKey) < 0)) {
617
618
619
620
621
622
623 return loadBlockAndSeekToKey(this.block, nextIndexedKey, false, key, false);
624 }
625 }
626 }
627
628
629 return seekTo(key, false);
630 }
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647 public int seekTo(Cell key, boolean rewind) throws IOException {
648 HFileBlockIndex.BlockIndexReader indexReader = reader.getDataBlockIndexReader();
649 BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, block,
650 cacheBlocks, pread, isCompaction, getEffectiveDataBlockEncoding());
651 if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) {
652
653 return -1;
654 }
655 return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(),
656 blockWithScanInfo.getNextIndexedKey(), rewind, key, false);
657 }
658
659 @Override
660 public boolean seekBefore(byte[] key, int offset, int length) throws IOException {
661 return seekBefore(new KeyValue.KeyOnlyKeyValue(key, offset, length));
662 }
663
664 @Override
665 public boolean seekBefore(Cell key) throws IOException {
666 HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, block,
667 cacheBlocks, pread, isCompaction,
668 ((HFileReaderV2) reader).getEffectiveEncodingInCache(isCompaction));
669 if (seekToBlock == null) {
670 return false;
671 }
672 ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
673
674 if (reader.getComparator()
675 .compareOnlyKeyPortion(
676 new KeyValue.KeyOnlyKeyValue(firstKey.array(), firstKey.arrayOffset(),
677 firstKey.limit()), key) >= 0) {
678 long previousBlockOffset = seekToBlock.getPrevBlockOffset();
679
680 if (previousBlockOffset == -1) {
681
682 return false;
683 }
684
685
686
687
688
689
690 int prevBlockSize = -1;
691 seekToBlock = reader.readBlock(previousBlockOffset,
692 prevBlockSize, cacheBlocks,
693 pread, isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
694
695
696 }
697 Cell firstKeyInCurrentBlock = new KeyValue.KeyOnlyKeyValue(Bytes.getBytes(firstKey));
698 loadBlockAndSeekToKey(seekToBlock, firstKeyInCurrentBlock, true, key, true);
699 return true;
700 }
701
702
703
704
705
706
707
708
709 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
710 justification="Yeah, unnecessary null check; could do w/ clean up")
711 protected HFileBlock readNextDataBlock() throws IOException {
712 long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
713 if (block == null)
714 return null;
715
716 HFileBlock curBlock = block;
717
718 do {
719 if (curBlock.getOffset() >= lastDataBlockOffset) {
720 return null;
721 }
722
723 if (curBlock.getOffset() < 0) {
724 throw new IOException("Invalid block file offset: " + block);
725 }
726
727
728
729 curBlock = reader.readBlock(curBlock.getOffset()
730 + curBlock.getOnDiskSizeWithHeader(),
731 curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
732 isCompaction, true, null, getEffectiveDataBlockEncoding());
733 } while (!curBlock.getBlockType().isData());
734
735 return curBlock;
736 }
737
/**
 * @return the reader's effective in-cache encoding for this scanner's compaction flag
 */
public DataBlockEncoding getEffectiveDataBlockEncoding() {
  return ((HFileReaderV2)reader).getEffectiveEncodingInCache(isCompaction);
}
741
742
743
744
745
746
747
748
/**
 * Compares a serialized key with the key at the scanner's current position.
 *
 * @param comparator comparator to use
 * @param key array holding the key to compare
 * @param offset start of the key within the array
 * @param length length of the key
 * @return the comparator's result for (given key, current key)
 */
public abstract int compareKey(KVComparator comparator, byte[] key, int offset,
    int length);
751
/**
 * Compares a cell's key with the key at the scanner's current position.
 *
 * @param comparator comparator to use
 * @param kv cell whose key is compared
 * @return the comparator's result for (given key, current key)
 */
public abstract int compareKey(KVComparator comparator, Cell kv);
753 }
754
755
756
757
758 protected static class ScannerV2 extends AbstractScannerV2 {
/** The V2 reader being scanned, kept with its concrete type so its V2 members can be accessed. */
private HFileReaderV2 reader;
760
/**
 * Creates a scanner for unencoded data blocks of the given reader.
 *
 * @param r the reader to scan
 * @param cacheBlocks whether blocks read by this scanner should be cached
 * @param pread forwarded to block reads
 * @param isCompaction whether the scanner is used by a compaction
 */
public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
    final boolean pread, final boolean isCompaction) {
  super(r, cacheBlocks, pread, isCompaction);
  this.reader = r;
}
766
767 @Override
768 public Cell getKeyValue() {
769 if (!isSeeked())
770 return null;
771
772 return formNoTagsKeyValue();
773 }
774
775 protected Cell formNoTagsKeyValue() {
776 NoTagsKeyValue ret = new NoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
777 + blockBuffer.position(), getCellBufSize());
778 if (this.reader.shouldIncludeMemstoreTS()) {
779 ret.setSequenceId(currMemstoreTS);
780 }
781 return ret;
782 }
783
/**
 * @return the number of bytes the current cell occupies: the two length ints plus the key
 *         and value bytes (the memstore timestamp is not included)
 */
protected int getCellBufSize() {
  return KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen;
}
787
788 @Override
789 public ByteBuffer getKey() {
790 assertSeeked();
791 return ByteBuffer.wrap(
792 blockBuffer.array(),
793 blockBuffer.arrayOffset() + blockBuffer.position()
794 + KEY_VALUE_LEN_SIZE, currKeyLen).slice();
795 }
796
797 @Override
798 public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
799 return comparator.compareFlatKey(key, offset, length, blockBuffer.array(),
800 blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen);
801 }
802
803 @Override
804 public ByteBuffer getValue() {
805 assertSeeked();
806 return ByteBuffer.wrap(
807 blockBuffer.array(),
808 blockBuffer.arrayOffset() + blockBuffer.position()
809 + KEY_VALUE_LEN_SIZE + currKeyLen, currValueLen).slice();
810 }
811
812 protected void setNonSeekedState() {
813 block = null;
814 blockBuffer = null;
815 currKeyLen = 0;
816 currValueLen = 0;
817 currMemstoreTS = 0;
818 currMemstoreTSLen = 0;
819 }
820
821
822
823
824 private void positionThisBlockBuffer() {
825 try {
826 blockBuffer.position(getNextCellStartPosition());
827 } catch (IllegalArgumentException e) {
828 LOG.error("Current pos = " + blockBuffer.position()
829 + "; currKeyLen = " + currKeyLen + "; currValLen = "
830 + currValueLen + "; block limit = " + blockBuffer.limit()
831 + "; HFile name = " + reader.getName()
832 + "; currBlock currBlockOffset = " + block.getOffset());
833 throw e;
834 }
835 }
836
837
838
839
840
841
842 private boolean positionForNextBlock() throws IOException {
843
844 long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
845 if (block.getOffset() >= lastDataBlockOffset) {
846 setNonSeekedState();
847 return false;
848 }
849 return isNextBlock();
850 }
851
852 private boolean isNextBlock() throws IOException {
853
854 HFileBlock nextBlock = readNextDataBlock();
855 if (nextBlock == null) {
856 setNonSeekedState();
857 return false;
858 }
859 updateCurrBlock(nextBlock);
860 return true;
861 }
862
863 private final boolean _next() throws IOException {
864
865 if (blockBuffer.remaining() <= 0) {
866 return positionForNextBlock();
867 }
868
869 readKeyValueLen();
870 return true;
871 }
872
873
874
875
876
877
878
879
/**
 * Advances to the next cell, moving on to the next data block when the current one is
 * exhausted. The scanner must already be seeked.
 *
 * @return true when positioned on a cell; false when there are no more cells
 * @throws IOException on read failure
 */
@Override
public boolean next() throws IOException {
  // Split into small steps: first skip over the current cell, then decide what comes next.
  assertSeeked();
  positionThisBlockBuffer();
  return _next();
}
888
889 protected int getNextCellStartPosition() {
890 return blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen
891 + currMemstoreTSLen;
892 }
893
894
895
896
897
898
899
900
901 @Override
902 public boolean seekTo() throws IOException {
903 if (reader == null) {
904 return false;
905 }
906
907 if (reader.getTrailer().getEntryCount() == 0) {
908
909 return false;
910 }
911
912 long firstDataBlockOffset =
913 reader.getTrailer().getFirstDataBlockOffset();
914 if (block != null && block.getOffset() == firstDataBlockOffset) {
915 blockBuffer.rewind();
916 readKeyValueLen();
917 return true;
918 }
919
920 block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
921 isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
922 if (block.getOffset() < 0) {
923 throw new IOException("Invalid block offset: " + block.getOffset());
924 }
925 updateCurrBlock(block);
926 return true;
927 }
928
929 @Override
930 protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
931 boolean rewind, Cell key, boolean seekBefore) throws IOException {
932 if (block == null || block.getOffset() != seekToBlock.getOffset()) {
933 updateCurrBlock(seekToBlock);
934 } else if (rewind) {
935 blockBuffer.rewind();
936 }
937
938
939 this.nextIndexedKey = nextIndexedKey;
940 return blockSeek(key, seekBefore);
941 }
942
943
944
945
946
947
948
949 protected void updateCurrBlock(HFileBlock newBlock) {
950 block = newBlock;
951
952
953 if (block.getBlockType() != BlockType.DATA) {
954 throw new IllegalStateException("ScannerV2 works only on data " +
955 "blocks, got " + block.getBlockType() + "; " +
956 "fileName=" + reader.name + ", " +
957 "dataBlockEncoder=" + reader.dataBlockEncoder + ", " +
958 "isCompaction=" + isCompaction);
959 }
960
961 blockBuffer = block.getBufferWithoutHeader();
962 readKeyValueLen();
963 blockFetches.incrementAndGet();
964
965
966 this.nextIndexedKey = null;
967 }
968
969
970
971
972
/**
 * Tests whether a length is out of range for the current block buffer.
 * Note the polarity: the result is true when the length is INVALID.
 *
 * @param v length to check
 * @return true if {@code v} is negative or larger than the block buffer's limit
 */
protected final boolean checkLen(final int v) {
  return v < 0 || v > this.blockBuffer.limit();
}
976
977
978
979
980 protected final void checkKeyValueLen() {
981 if (checkLen(this.currKeyLen) || checkLen(this.currValueLen)) {
982 throw new IllegalStateException("Invalid currKeyLen " + this.currKeyLen +
983 " or currValueLen " + this.currValueLen + ". Block offset: " + block.getOffset() +
984 ", block length: " + this.blockBuffer.limit() + ", position: " +
985 this.blockBuffer.position() + " (without header).");
986 }
987 }
988
/**
 * Reads the key and value lengths of the cell at the buffer's current position into
 * {@code currKeyLen} / {@code currValueLen}, validates them, and then reads the memstore
 * timestamp that follows the cell (when the file has one). The buffer position is not moved.
 *
 * @throws IllegalStateException if a decoded length is out of range for the block
 */
protected void readKeyValueLen() {
  // Absolute index of the current cell in the backing array.
  int p = blockBuffer.position() + blockBuffer.arrayOffset();
  // Both int lengths are fetched with a single 8-byte read.
  long ll = Bytes.toLong(blockBuffer.array(), p);
  // Upper 32 bits hold the key length.
  this.currKeyLen = (int)(ll >> Integer.SIZE);
  // The int cast keeps the lower 32 bits, which hold the value length.
  this.currValueLen = (int)(Bytes.MASK_FOR_LOWER_INT_IN_LONG ^ ll);
  checkKeyValueLen();
  // Skip the two lengths plus key and value to reach the memstore timestamp.
  p += (Bytes.SIZEOF_LONG + currKeyLen + currValueLen);
  readMvccVersion(p);
}
1007
1008
1009
1010
1011
1012 protected void readMvccVersion(final int position) {
1013
1014 if (!this.reader.shouldIncludeMemstoreTS()) return;
1015 if (!this.reader.decodeMemstoreTS) {
1016 currMemstoreTS = 0;
1017 currMemstoreTSLen = 1;
1018 return;
1019 }
1020 _readMvccVersion(position);
1021 }
1022
1023
1024
1025
1026
/**
 * Decodes the variable-length memstore timestamp at the given absolute array index and
 * stores both the value and its encoded length in the scanner's per-cell state.
 *
 * @param position absolute index in the block buffer's backing array
 */
private void _readMvccVersion(final int position) {
  // The first byte determines the total encoded length.
  byte firstByte = blockBuffer.array()[position];
  int len = WritableUtils.decodeVIntSize(firstByte);
  if (len == 1) {
    // Single-byte encoding: the byte is the value itself.
    this.currMemstoreTS = firstByte;
  } else {
    // Multi-byte encoding: the remaining len - 1 bytes are accumulated big-endian.
    long i = 0;
    for (int idx = 0; idx < len - 1; idx++) {
      byte b = blockBuffer.array()[position + 1 + idx];
      i = i << 8;
      i = i | (b & 0xFF);
    }
    // Negative values are stored bit-inverted.
    currMemstoreTS = (WritableUtils.isNegativeVInt(firstByte) ? ~i : i);
  }
  this.currMemstoreTSLen = len;
}
1046
/**
 * Reads the memstore timestamp located at the block buffer's current position.
 */
protected void readMvccVersion() {
  readMvccVersion(blockBuffer.arrayOffset() + blockBuffer.position());
}
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
/**
 * Scans forward in the current block, starting at the buffer's current position, looking
 * for the given key.
 *
 * @param key the key to find
 * @param seekBefore when true and an exact match is found, position on the cell before it
 * @return 0 when positioned on an exact match; 1 when positioned on the last cell whose key
 *         is smaller than {@code key} (or, in the seek-before case, on the cell preceding
 *         the match); {@code HConstants.INDEX_KEY_MAGIC} when the key sorts before the very
 *         first cell examined, the buffer is at position 0, and the file's minor version is
 *         at least {@code MINOR_VERSION_WITH_FAKED_KEY}
 * @throws IllegalStateException if {@code seekBefore} is set and the match is the first cell
 *         examined
 */
protected int blockSeek(Cell key, boolean seekBefore) {
  int klen, vlen;
  long memstoreTS = 0;
  int memstoreTSLen = 0;
  // Size of the previously visited cell; -1 until one cell has been skipped.
  int lastKeyValueSize = -1;
  // Reused wrapper so no cell object is allocated per comparison.
  KeyValue.KeyOnlyKeyValue keyOnlykv = new KeyValue.KeyOnlyKeyValue();
  do {
    // Peek at the two length ints without consuming them.
    blockBuffer.mark();
    klen = blockBuffer.getInt();
    vlen = blockBuffer.getInt();
    blockBuffer.reset();
    if (this.reader.shouldIncludeMemstoreTS()) {
      if (this.reader.decodeMemstoreTS) {
        // The timestamp follows the value bytes.
        int memstoreTSOffset = blockBuffer.arrayOffset() + blockBuffer.position()
            + KEY_VALUE_LEN_SIZE + klen + vlen;
        memstoreTS = Bytes.readAsVLong(blockBuffer.array(), memstoreTSOffset);
        memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
      } else {
        // Not decoded: treated as 0, occupying one byte.
        memstoreTS = 0;
        memstoreTSLen = 1;
      }
    }

    int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE;
    keyOnlykv.setKey(blockBuffer.array(), keyOffset, klen);
    int comp = reader.getComparator().compareOnlyKeyPortion(key, keyOnlykv);

    if (comp == 0) {
      if (seekBefore) {
        if (lastKeyValueSize < 0) {
          throw new IllegalStateException("blockSeek with seekBefore "
              + "at the first key of the block: key="
              + CellUtil.getCellKeyAsString(key)
              + ", blockOffset=" + block.getOffset() + ", onDiskSize="
              + block.getOnDiskSizeWithHeader());
        }
        // Step back to the previous cell and load its lengths.
        blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
        readKeyValueLen();
        return 1; // positioned before the match
      }
      // Exact match: adopt the values already decoded for this cell.
      currKeyLen = klen;
      currValueLen = vlen;
      if (this.reader.shouldIncludeMemstoreTS()) {
        currMemstoreTS = memstoreTS;
        currMemstoreTSLen = memstoreTSLen;
      }
      return 0; // exact match
    } else if (comp < 0) {
      // The sought key sorts before this cell: settle on the previous cell, if any.
      if (lastKeyValueSize > 0)
        blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
      readKeyValueLen();
      // No previous cell and at the very start of the buffer: signal the special case
      // for files of minor version MINOR_VERSION_WITH_FAKED_KEY and later.
      if (lastKeyValueSize == -1 && blockBuffer.position() == 0
          && this.reader.trailer.getMinorVersion() >= MINOR_VERSION_WITH_FAKED_KEY) {
        return HConstants.INDEX_KEY_MAGIC;
      }
      return 1;
    }

    // The sought key sorts after this cell: remember its size and skip over it.
    lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE;
    blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
  } while (blockBuffer.remaining() > 0);

  // Ran off the end of the block: the key is larger than every cell here, so position on
  // the last cell.
  blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
  readKeyValueLen();
  return 1; // key not found, positioned on the last cell
}
1139
1140 @Override
1141 protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
1142 ByteBuffer buffer = curBlock.getBufferWithoutHeader();
1143
1144 buffer.rewind();
1145 int klen = buffer.getInt();
1146 buffer.getInt();
1147 ByteBuffer keyBuff = buffer.slice();
1148 keyBuff.limit(klen);
1149 keyBuff.rewind();
1150 return keyBuff;
1151 }
1152
1153 @Override
1154 public String getKeyString() {
1155 return Bytes.toStringBinary(blockBuffer.array(),
1156 blockBuffer.arrayOffset() + blockBuffer.position()
1157 + KEY_VALUE_LEN_SIZE, currKeyLen);
1158 }
1159
1160 @Override
1161 public String getValueString() {
1162 return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
1163 + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
1164 currValueLen);
1165 }
1166
1167 @Override
1168 public int compareKey(KVComparator comparator, Cell key) {
1169 return comparator.compareOnlyKeyPortion(
1170 key,
1171 new KeyValue.KeyOnlyKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
1172 + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen));
1173 }
1174 }
1175
1176
1177
1178
1179 protected static class EncodedScannerV2 extends AbstractScannerV2 {
/** Decoding context created from the encoder for this scanner's file context. */
private final HFileBlockDecodingContext decodingCtx;
/** Seeker that walks the cells of the current encoded block. */
private final DataBlockEncoder.EncodedSeeker seeker;
/** Encoder matching the reader's data block encoding. */
private final DataBlockEncoder dataBlockEncoder;
/** File context supplied at construction. */
protected final HFileContext meta;
1184
/**
 * Creates a scanner for encoded data blocks. The encoder is taken from the reader's data
 * block encoding; a decoding context and a seeker are created from it.
 *
 * @param reader the reader to scan
 * @param cacheBlocks whether blocks read by this scanner should be cached
 * @param pread forwarded to block reads
 * @param isCompaction whether the scanner is used by a compaction
 * @param meta file context used to create the decoding context
 */
public EncodedScannerV2(HFileReaderV2 reader, boolean cacheBlocks,
    boolean pread, boolean isCompaction, HFileContext meta) {
  super(reader, cacheBlocks, pread, isCompaction);
  this.meta = meta;
  this.dataBlockEncoder = reader.dataBlockEncoder.getDataBlockEncoding().getEncoder();
  this.decodingCtx = this.dataBlockEncoder.newDataBlockDecodingContext(meta);
  this.seeker = this.dataBlockEncoder.createSeeker(reader.getComparator(), this.decodingCtx);
}
1195
/** @return true when the scanner has a current block */
@Override
public boolean isSeeked(){
  return this.block != null;
}
1200
1201
1202
1203
1204
1205
1206
1207
1208 private void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException {
1209 block = newBlock;
1210
1211
1212 if (block.getBlockType() != BlockType.ENCODED_DATA) {
1213 throw new IllegalStateException(
1214 "EncodedScanner works only on encoded data blocks");
1215 }
1216 short dataBlockEncoderId = block.getDataBlockEncodingId();
1217 if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) {
1218 String encoderCls = dataBlockEncoder.getClass().getName();
1219 throw new CorruptHFileException("Encoder " + encoderCls
1220 + " doesn't support data block encoding "
1221 + DataBlockEncoding.getNameFromId(dataBlockEncoderId));
1222 }
1223
1224 seeker.setCurrentBuffer(getEncodedBuffer(newBlock));
1225 blockFetches.incrementAndGet();
1226
1227
1228 this.nextIndexedKey = null;
1229 }
1230
1231 private ByteBuffer getEncodedBuffer(HFileBlock newBlock) {
1232 ByteBuffer origBlock = newBlock.getBufferReadOnly();
1233 ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(),
1234 origBlock.arrayOffset() + newBlock.headerSize() +
1235 DataBlockEncoding.ID_SIZE,
1236 newBlock.getUncompressedSizeWithoutHeader() -
1237 DataBlockEncoding.ID_SIZE).slice();
1238 return encodedBlock;
1239 }
1240
1241 @Override
1242 public boolean seekTo() throws IOException {
1243 if (reader == null) {
1244 return false;
1245 }
1246
1247 if (reader.getTrailer().getEntryCount() == 0) {
1248
1249 return false;
1250 }
1251
1252 long firstDataBlockOffset =
1253 reader.getTrailer().getFirstDataBlockOffset();
1254 if (block != null && block.getOffset() == firstDataBlockOffset) {
1255 seeker.rewind();
1256 return true;
1257 }
1258
1259 block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
1260 isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
1261 if (block.getOffset() < 0) {
1262 throw new IOException("Invalid block offset: " + block.getOffset());
1263 }
1264 updateCurrentBlock(block);
1265 return true;
1266 }
1267
1268 @Override
1269 public boolean next() throws IOException {
1270 boolean isValid = seeker.next();
1271 if (!isValid) {
1272 block = readNextDataBlock();
1273 isValid = block != null;
1274 if (isValid) {
1275 updateCurrentBlock(block);
1276 }
1277 }
1278 return isValid;
1279 }
1280
1281 @Override
1282 public ByteBuffer getKey() {
1283 assertValidSeek();
1284 return seeker.getKeyDeepCopy();
1285 }
1286
1287 @Override
1288 public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
1289 return seeker.compareKey(comparator, key, offset, length);
1290 }
1291
1292 @Override
1293 public ByteBuffer getValue() {
1294 assertValidSeek();
1295 return seeker.getValueShallowCopy();
1296 }
1297
1298 @Override
1299 public Cell getKeyValue() {
1300 if (block == null) {
1301 return null;
1302 }
1303 return seeker.getKeyValue();
1304 }
1305
1306 @Override
1307 public String getKeyString() {
1308 ByteBuffer keyBuffer = getKey();
1309 return Bytes.toStringBinary(keyBuffer.array(),
1310 keyBuffer.arrayOffset(), keyBuffer.limit());
1311 }
1312
1313 @Override
1314 public String getValueString() {
1315 ByteBuffer valueBuffer = getValue();
1316 return Bytes.toStringBinary(valueBuffer.array(),
1317 valueBuffer.arrayOffset(), valueBuffer.limit());
1318 }
1319
1320 private void assertValidSeek() {
1321 if (block == null) {
1322 throw new NotSeekedException();
1323 }
1324 }
1325
1326 @Override
1327 protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
1328 return dataBlockEncoder.getFirstKeyInBlock(getEncodedBuffer(curBlock));
1329 }
1330
1331 @Override
1332 protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
1333 boolean rewind, Cell key, boolean seekBefore) throws IOException {
1334 if (block == null || block.getOffset() != seekToBlock.getOffset()) {
1335 updateCurrentBlock(seekToBlock);
1336 } else if (rewind) {
1337 seeker.rewind();
1338 }
1339 this.nextIndexedKey = nextIndexedKey;
1340 return seeker.seekToKeyInBlock(key, seekBefore);
1341 }
1342
1343 @Override
1344 public int compareKey(KVComparator comparator, Cell key) {
1345 return seeker.compareKey(comparator, key);
1346 }
1347 }
1348
1349
1350
1351
1352
1353 @Override
1354 public DataInput getGeneralBloomFilterMetadata() throws IOException {
1355 return this.getBloomFilterMetadata(BlockType.GENERAL_BLOOM_META);
1356 }
1357
1358 @Override
1359 public DataInput getDeleteBloomFilterMetadata() throws IOException {
1360 return this.getBloomFilterMetadata(BlockType.DELETE_FAMILY_BLOOM_META);
1361 }
1362
1363 private DataInput getBloomFilterMetadata(BlockType blockType)
1364 throws IOException {
1365 if (blockType != BlockType.GENERAL_BLOOM_META &&
1366 blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
1367 throw new RuntimeException("Block Type: " + blockType.toString() +
1368 " is not supported") ;
1369 }
1370
1371 for (HFileBlock b : loadOnOpenBlocks)
1372 if (b.getBlockType() == blockType)
1373 return b.getByteStream();
1374 return null;
1375 }
1376
1377 @Override
1378 public boolean isFileInfoLoaded() {
1379 return true;
1380 }
1381
1382
1383
1384
1385
1386 private void validateMinorVersion(Path path, int minorVersion) {
1387 if (minorVersion < MIN_MINOR_VERSION ||
1388 minorVersion > MAX_MINOR_VERSION) {
1389 String msg = "Minor version for path " + path +
1390 " is expected to be between " +
1391 MIN_MINOR_VERSION + " and " + MAX_MINOR_VERSION +
1392 " but is found to be " + minorVersion;
1393 LOG.error(msg);
1394 throw new RuntimeException(msg);
1395 }
1396 }
1397
1398 @Override
1399 public int getMajorVersion() {
1400 return 2;
1401 }
1402
1403 @Override
1404 public HFileContext getFileContext() {
1405 return hfileContext;
1406 }
1407
1408
1409
1410
1411
1412 @VisibleForTesting
1413 boolean prefetchComplete() {
1414 return PrefetchExecutor.isCompleted(path);
1415 }
1416
1417 @Override
1418 public void unbufferStream() {
1419 fsBlockReader.unbufferStream();
1420 }
1421 }