1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.io.hfile;
20
21 import java.io.DataInput;
22 import java.io.IOException;
23 import java.nio.ByteBuffer;
24 import java.util.ArrayList;
25 import java.util.List;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.classification.InterfaceAudience;
30 import org.apache.hadoop.fs.FSDataInputStream;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.hbase.HConstants;
33 import org.apache.hadoop.hbase.KeyValue;
34 import org.apache.hadoop.hbase.fs.HFileSystem;
35 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
36 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
37 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
38 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
39 import org.apache.hadoop.hbase.util.Bytes;
40 import org.apache.hadoop.hbase.util.IdLock;
41 import org.apache.hadoop.io.WritableUtils;
42
43
44
45
46 @InterfaceAudience.Private
47 public class HFileReaderV2 extends AbstractHFileReader {
48
49 private static final Log LOG = LogFactory.getLog(HFileReaderV2.class);
50
51
52
53
54
55 private static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
56
57 private boolean includesMemstoreTS = false;
58 private boolean decodeMemstoreTS = false;
59
60 private boolean shouldIncludeMemstoreTS() {
61 return includesMemstoreTS;
62 }
63
64
65 private HFileBlock.FSReader fsBlockReader;
66
67
68
69
70
71
72
73 private IdLock offsetLock = new IdLock();
74
75
76
77
78
79 private List<HFileBlock> loadOnOpenBlocks = new ArrayList<HFileBlock>();
80
81
82 static final int MIN_MINOR_VERSION = 0;
83
84
85
86
87 static final int MAX_MINOR_VERSION = 3;
88
89
90 static final int MINOR_VERSION_WITH_FAKED_KEY = 3;
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105 public HFileReaderV2(Path path, FixedFileTrailer trailer,
106 final FSDataInputStreamWrapper fsdis, final long size, final CacheConfig cacheConf,
107 DataBlockEncoding preferredEncodingInCache, final HFileSystem hfs)
108 throws IOException {
109 super(path, trailer, size, cacheConf, hfs);
110 trailer.expectMajorVersion(2);
111 validateMinorVersion(path, trailer.getMinorVersion());
112 HFileBlock.FSReaderV2 fsBlockReaderV2 = new HFileBlock.FSReaderV2(fsdis,
113 compressAlgo, fileSize, trailer.getMinorVersion(), hfs, path);
114 this.fsBlockReader = fsBlockReaderV2;
115
116
117 comparator = trailer.createComparator();
118 dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
119 trailer.getNumDataIndexLevels(), this);
120 metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader(
121 Bytes.BYTES_RAWCOMPARATOR, 1);
122
123
124
125 HFileBlock.BlockIterator blockIter = fsBlockReaderV2.blockRange(
126 trailer.getLoadOnOpenDataOffset(),
127 fileSize - trailer.getTrailerSize());
128
129
130
131 dataBlockIndexReader.readMultiLevelIndexRoot(
132 blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
133 trailer.getDataIndexCount());
134
135
136 metaBlockIndexReader.readRootIndex(
137 blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
138 trailer.getMetaIndexCount());
139
140
141 fileInfo = new FileInfo();
142 fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
143 lastKey = fileInfo.get(FileInfo.LASTKEY);
144 avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
145 avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
146 byte [] keyValueFormatVersion =
147 fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
148 includesMemstoreTS = keyValueFormatVersion != null &&
149 Bytes.toInt(keyValueFormatVersion) ==
150 HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE;
151 fsBlockReaderV2.setIncludesMemstoreTS(includesMemstoreTS);
152 if (includesMemstoreTS) {
153 decodeMemstoreTS = Bytes.toLong(fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY)) > 0;
154 }
155
156
157 dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo,
158 preferredEncodingInCache);
159 fsBlockReaderV2.setDataBlockEncoder(dataBlockEncoder);
160
161
162 HFileBlock b;
163 while ((b = blockIter.nextBlock()) != null) {
164 loadOnOpenBlocks.add(b);
165 }
166 }
167
168
169
170
171
172
173
174
175
176
177
178
179
180 @Override
181 public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
182 final boolean isCompaction) {
183
184 if (dataBlockEncoder.useEncodedScanner(isCompaction)) {
185 return new EncodedScannerV2(this, cacheBlocks, pread, isCompaction,
186 includesMemstoreTS);
187 }
188
189 return new ScannerV2(this, cacheBlocks, pread, isCompaction);
190 }
191
192
193
194
195
196
197
198 @Override
199 public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
200 throws IOException {
201 if (trailer.getMetaIndexCount() == 0) {
202 return null;
203 }
204 if (metaBlockIndexReader == null) {
205 throw new IOException("Meta index not loaded");
206 }
207
208 byte[] mbname = Bytes.toBytes(metaBlockName);
209 int block = metaBlockIndexReader.rootBlockContainingKey(mbname, 0,
210 mbname.length);
211 if (block == -1)
212 return null;
213 long blockSize = metaBlockIndexReader.getRootBlockDataSize(block);
214 long startTimeNs = System.nanoTime();
215
216
217
218
219 synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
220
221 long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
222 BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset,
223 DataBlockEncoding.NONE, BlockType.META);
224
225 cacheBlock &= cacheConf.shouldCacheDataOnRead();
226 if (cacheConf.isBlockCacheEnabled()) {
227 HFileBlock cachedBlock =
228 (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock, false);
229 if (cachedBlock != null) {
230
231
232 return cachedBlock.getBufferWithoutHeader();
233 }
234
235 }
236
237 HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
238 blockSize, -1, true);
239
240 final long delta = System.nanoTime() - startTimeNs;
241 HFile.offerReadLatency(delta, true);
242
243
244 if (cacheBlock) {
245 cacheConf.getBlockCache().cacheBlock(cacheKey, metaBlock,
246 cacheConf.isInMemory());
247 }
248
249 return metaBlock.getBufferWithoutHeader();
250 }
251 }
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268 @Override
269 public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
270 final boolean cacheBlock, boolean pread, final boolean isCompaction,
271 BlockType expectedBlockType)
272 throws IOException {
273 if (dataBlockIndexReader == null) {
274 throw new IOException("Block index not loaded");
275 }
276 if (dataBlockOffset < 0
277 || dataBlockOffset >= trailer.getLoadOnOpenDataOffset()) {
278 throw new IOException("Requested block is out of range: "
279 + dataBlockOffset + ", lastDataBlockOffset: "
280 + trailer.getLastDataBlockOffset());
281 }
282
283
284
285
286
287
288 BlockCacheKey cacheKey =
289 new BlockCacheKey(name, dataBlockOffset,
290 dataBlockEncoder.getEffectiveEncodingInCache(isCompaction),
291 expectedBlockType);
292
293 boolean useLock = false;
294 IdLock.Entry lockEntry = null;
295 try {
296 while (true) {
297
298 if (useLock) {
299 lockEntry = offsetLock.getLockEntry(dataBlockOffset);
300 }
301
302
303 if (cacheConf.isBlockCacheEnabled()) {
304
305
306 HFileBlock cachedBlock = (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey,
307 cacheBlock, useLock);
308 if (cachedBlock != null) {
309 if (cachedBlock.getBlockType() == BlockType.DATA) {
310 HFile.dataBlockReadCnt.incrementAndGet();
311 }
312
313 validateBlockType(cachedBlock, expectedBlockType);
314
315
316
317 if (cachedBlock.getBlockType() == BlockType.ENCODED_DATA
318 && cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getEncodingInCache()) {
319 throw new IOException("Cached block under key " + cacheKey + " "
320 + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
321 + dataBlockEncoder.getEncodingInCache() + ")");
322 }
323 return cachedBlock;
324 }
325
326 }
327 if (!useLock) {
328
329 useLock = true;
330 continue;
331 }
332
333
334 long startTimeNs = System.nanoTime();
335 HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1,
336 pread);
337 hfileBlock = dataBlockEncoder.diskToCacheFormat(hfileBlock, isCompaction);
338 validateBlockType(hfileBlock, expectedBlockType);
339
340 final long delta = System.nanoTime() - startTimeNs;
341 HFile.offerReadLatency(delta, pread);
342
343
344 if (cacheBlock && cacheConf.shouldCacheBlockOnRead(hfileBlock.getBlockType().getCategory())) {
345 cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
346 }
347
348 if (hfileBlock.getBlockType() == BlockType.DATA) {
349 HFile.dataBlockReadCnt.incrementAndGet();
350 }
351
352 return hfileBlock;
353 }
354 } finally {
355 if (lockEntry != null) {
356 offsetLock.releaseLockEntry(lockEntry);
357 }
358 }
359 }
360
361
362
363
364
365
366
367
368
369
370 private void validateBlockType(HFileBlock block,
371 BlockType expectedBlockType) throws IOException {
372 if (expectedBlockType == null) {
373 return;
374 }
375 BlockType actualBlockType = block.getBlockType();
376 if (actualBlockType == BlockType.ENCODED_DATA &&
377 expectedBlockType == BlockType.DATA) {
378
379
380 return;
381 }
382 if (actualBlockType != expectedBlockType) {
383 throw new IOException("Expected block type " + expectedBlockType + ", " +
384 "but got " + actualBlockType + ": " + block);
385 }
386 }
387
388
389
390
391
392
393 @Override
394 public byte[] getLastKey() {
395 return dataBlockIndexReader.isEmpty() ? null : lastKey;
396 }
397
398
399
400
401
402
403 @Override
404 public byte[] midkey() throws IOException {
405 return dataBlockIndexReader.midkey();
406 }
407
408 @Override
409 public void close() throws IOException {
410 close(cacheConf.shouldEvictOnClose());
411 }
412
413 public void close(boolean evictOnClose) throws IOException {
414 if (evictOnClose && cacheConf.isBlockCacheEnabled()) {
415 int numEvicted = cacheConf.getBlockCache().evictBlocksByHfileName(name);
416 if (LOG.isTraceEnabled()) {
417 LOG.trace("On close, file=" + name + " evicted=" + numEvicted
418 + " block(s)");
419 }
420 }
421 fsBlockReader.closeStreams();
422 }
423
424
425 @Override
426 HFileBlock.FSReader getUncachedBlockReader() {
427 return fsBlockReader;
428 }
429
430
431 protected abstract static class AbstractScannerV2
432 extends AbstractHFileReader.Scanner {
433 protected HFileBlock block;
434
435
436
437
438
439
440
441
442 protected byte[] nextIndexedKey;
443
444 public AbstractScannerV2(HFileReaderV2 r, boolean cacheBlocks,
445 final boolean pread, final boolean isCompaction) {
446 super(r, cacheBlocks, pread, isCompaction);
447 }
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465 protected int seekTo(byte[] key, int offset, int length, boolean rewind)
466 throws IOException {
467 HFileBlockIndex.BlockIndexReader indexReader =
468 reader.getDataBlockIndexReader();
469 BlockWithScanInfo blockWithScanInfo =
470 indexReader.loadDataBlockWithScanInfo(key, offset, length, block,
471 cacheBlocks, pread, isCompaction);
472 if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) {
473
474 return -1;
475 }
476 return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(),
477 blockWithScanInfo.getNextIndexedKey(), rewind, key, offset, length, false);
478 }
479
480 protected abstract ByteBuffer getFirstKeyInBlock(HFileBlock curBlock);
481
482 protected abstract int loadBlockAndSeekToKey(HFileBlock seekToBlock, byte[] nextIndexedKey,
483 boolean rewind, byte[] key, int offset, int length, boolean seekBefore)
484 throws IOException;
485
486 @Override
487 public int seekTo(byte[] key, int offset, int length) throws IOException {
488
489
490 return seekTo(key, offset, length, true);
491 }
492
493 @Override
494 public int reseekTo(byte[] key, int offset, int length) throws IOException {
495 int compared;
496 if (isSeeked()) {
497 ByteBuffer bb = getKey();
498 compared = reader.getComparator().compare(key, offset,
499 length, bb.array(), bb.arrayOffset(), bb.limit());
500 if (compared < 1) {
501
502
503 return compared;
504 } else {
505 if (this.nextIndexedKey != null &&
506 (this.nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY ||
507 reader.getComparator().compare(key, offset, length,
508 nextIndexedKey, 0, nextIndexedKey.length) < 0)) {
509
510
511
512 return loadBlockAndSeekToKey(this.block, this.nextIndexedKey,
513 false, key, offset, length, false);
514 }
515 }
516 }
517
518
519 return seekTo(key, offset, length, false);
520 }
521
522 @Override
523 public boolean seekBefore(byte[] key, int offset, int length)
524 throws IOException {
525 HFileBlock seekToBlock =
526 reader.getDataBlockIndexReader().seekToDataBlock(key, offset, length,
527 block, cacheBlocks, pread, isCompaction);
528 if (seekToBlock == null) {
529 return false;
530 }
531 ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
532
533 if (reader.getComparator().compare(firstKey.array(),
534 firstKey.arrayOffset(), firstKey.limit(), key, offset, length) == 0)
535 {
536 long previousBlockOffset = seekToBlock.getPrevBlockOffset();
537
538 if (previousBlockOffset == -1) {
539
540 return false;
541 }
542
543
544
545
546 seekToBlock = reader.readBlock(previousBlockOffset,
547 seekToBlock.getOffset() - previousBlockOffset, cacheBlocks,
548 pread, isCompaction, BlockType.DATA);
549
550
551 }
552 byte[] firstKeyInCurrentBlock = Bytes.getBytes(firstKey);
553 loadBlockAndSeekToKey(seekToBlock, firstKeyInCurrentBlock, true, key, offset, length, true);
554 return true;
555 }
556
557
558
559
560
561
562
563
564
565 protected HFileBlock readNextDataBlock() throws IOException {
566 long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
567 if (block == null)
568 return null;
569
570 HFileBlock curBlock = block;
571
572 do {
573 if (curBlock.getOffset() >= lastDataBlockOffset)
574 return null;
575
576 if (curBlock.getOffset() < 0) {
577 throw new IOException("Invalid block file offset: " + block);
578 }
579
580
581
582 curBlock = reader.readBlock(curBlock.getOffset()
583 + curBlock.getOnDiskSizeWithHeader(),
584 curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
585 isCompaction, null);
586 } while (!(curBlock.getBlockType().equals(BlockType.DATA) ||
587 curBlock.getBlockType().equals(BlockType.ENCODED_DATA)));
588
589 return curBlock;
590 }
591 }
592
593
594
595
596 protected static class ScannerV2 extends AbstractScannerV2 {
597 private HFileReaderV2 reader;
598
599 public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
600 final boolean pread, final boolean isCompaction) {
601 super(r, cacheBlocks, pread, isCompaction);
602 this.reader = r;
603 }
604
605 @Override
606 public KeyValue getKeyValue() {
607 if (!isSeeked())
608 return null;
609
610 KeyValue ret = new KeyValue(blockBuffer.array(),
611 blockBuffer.arrayOffset() + blockBuffer.position(),
612 KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen,
613 currKeyLen);
614 if (this.reader.shouldIncludeMemstoreTS()) {
615 ret.setMemstoreTS(currMemstoreTS);
616 }
617 return ret;
618 }
619
620 @Override
621 public ByteBuffer getKey() {
622 assertSeeked();
623 return ByteBuffer.wrap(
624 blockBuffer.array(),
625 blockBuffer.arrayOffset() + blockBuffer.position()
626 + KEY_VALUE_LEN_SIZE, currKeyLen).slice();
627 }
628
629 @Override
630 public ByteBuffer getValue() {
631 assertSeeked();
632 return ByteBuffer.wrap(
633 blockBuffer.array(),
634 blockBuffer.arrayOffset() + blockBuffer.position()
635 + KEY_VALUE_LEN_SIZE + currKeyLen, currValueLen).slice();
636 }
637
638 private void setNonSeekedState() {
639 block = null;
640 blockBuffer = null;
641 currKeyLen = 0;
642 currValueLen = 0;
643 currMemstoreTS = 0;
644 currMemstoreTSLen = 0;
645 }
646
647
648
649
650
651
652
653
654 @Override
655 public boolean next() throws IOException {
656 assertSeeked();
657
658 try {
659 blockBuffer.position(blockBuffer.position() + KEY_VALUE_LEN_SIZE
660 + currKeyLen + currValueLen + currMemstoreTSLen);
661 } catch (IllegalArgumentException e) {
662 LOG.error("Current pos = " + blockBuffer.position()
663 + "; currKeyLen = " + currKeyLen + "; currValLen = "
664 + currValueLen + "; block limit = " + blockBuffer.limit()
665 + "; HFile name = " + reader.getName()
666 + "; currBlock currBlockOffset = " + block.getOffset());
667 throw e;
668 }
669
670 if (blockBuffer.remaining() <= 0) {
671 long lastDataBlockOffset =
672 reader.getTrailer().getLastDataBlockOffset();
673
674 if (block.getOffset() >= lastDataBlockOffset) {
675 setNonSeekedState();
676 return false;
677 }
678
679
680 HFileBlock nextBlock = readNextDataBlock();
681 if (nextBlock == null) {
682 setNonSeekedState();
683 return false;
684 }
685
686 updateCurrBlock(nextBlock);
687 return true;
688 }
689
690
691 readKeyValueLen();
692 return true;
693 }
694
695
696
697
698
699
700
701
702 @Override
703 public boolean seekTo() throws IOException {
704 if (reader == null) {
705 return false;
706 }
707
708 if (reader.getTrailer().getEntryCount() == 0) {
709
710 return false;
711 }
712
713 long firstDataBlockOffset =
714 reader.getTrailer().getFirstDataBlockOffset();
715 if (block != null && block.getOffset() == firstDataBlockOffset) {
716 blockBuffer.rewind();
717 readKeyValueLen();
718 return true;
719 }
720
721 block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
722 isCompaction, BlockType.DATA);
723 if (block.getOffset() < 0) {
724 throw new IOException("Invalid block offset: " + block.getOffset());
725 }
726 updateCurrBlock(block);
727 return true;
728 }
729
730 @Override
731 protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, byte[] nextIndexedKey,
732 boolean rewind, byte[] key, int offset, int length, boolean seekBefore)
733 throws IOException {
734 if (block == null || block.getOffset() != seekToBlock.getOffset()) {
735 updateCurrBlock(seekToBlock);
736 } else if (rewind) {
737 blockBuffer.rewind();
738 }
739
740
741 this.nextIndexedKey = nextIndexedKey;
742 return blockSeek(key, offset, length, seekBefore);
743 }
744
745
746
747
748
749
750
751 private void updateCurrBlock(HFileBlock newBlock) {
752 block = newBlock;
753
754
755 if (block.getBlockType() != BlockType.DATA) {
756 throw new IllegalStateException("ScannerV2 works only on data " +
757 "blocks, got " + block.getBlockType() + "; " +
758 "fileName=" + reader.name + ", " +
759 "dataBlockEncoder=" + reader.dataBlockEncoder + ", " +
760 "isCompaction=" + isCompaction);
761 }
762
763 blockBuffer = block.getBufferWithoutHeader();
764 readKeyValueLen();
765 blockFetches++;
766
767
768 this.nextIndexedKey = null;
769 }
770
771 private final void readKeyValueLen() {
772 blockBuffer.mark();
773 currKeyLen = blockBuffer.getInt();
774 currValueLen = blockBuffer.getInt();
775 blockBuffer.reset();
776 if (this.reader.shouldIncludeMemstoreTS()) {
777 if (this.reader.decodeMemstoreTS) {
778 try {
779 int memstoreTSOffset = blockBuffer.arrayOffset()
780 + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen
781 + currValueLen;
782 currMemstoreTS = Bytes.readVLong(blockBuffer.array(),
783 memstoreTSOffset);
784 currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS);
785 } catch (Exception e) {
786 throw new RuntimeException("Error reading memstore timestamp", e);
787 }
788 } else {
789 currMemstoreTS = 0;
790 currMemstoreTSLen = 1;
791 }
792 }
793
794 if (currKeyLen < 0 || currValueLen < 0
795 || currKeyLen > blockBuffer.limit()
796 || currValueLen > blockBuffer.limit()) {
797 throw new IllegalStateException("Invalid currKeyLen " + currKeyLen
798 + " or currValueLen " + currValueLen + ". Block offset: "
799 + block.getOffset() + ", block length: " + blockBuffer.limit()
800 + ", position: " + blockBuffer.position() + " (without header).");
801 }
802 }
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819 private int blockSeek(byte[] key, int offset, int length,
820 boolean seekBefore) {
821 int klen, vlen;
822 long memstoreTS = 0;
823 int memstoreTSLen = 0;
824 int lastKeyValueSize = -1;
825 do {
826 blockBuffer.mark();
827 klen = blockBuffer.getInt();
828 vlen = blockBuffer.getInt();
829 blockBuffer.reset();
830 if (this.reader.shouldIncludeMemstoreTS()) {
831 if (this.reader.decodeMemstoreTS) {
832 try {
833 int memstoreTSOffset = blockBuffer.arrayOffset()
834 + blockBuffer.position() + KEY_VALUE_LEN_SIZE + klen + vlen;
835 memstoreTS = Bytes.readVLong(blockBuffer.array(),
836 memstoreTSOffset);
837 memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
838 } catch (Exception e) {
839 throw new RuntimeException("Error reading memstore timestamp", e);
840 }
841 } else {
842 memstoreTS = 0;
843 memstoreTSLen = 1;
844 }
845 }
846
847 int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position()
848 + KEY_VALUE_LEN_SIZE;
849 int comp = reader.getComparator().compare(key, offset, length,
850 blockBuffer.array(), keyOffset, klen);
851
852 if (comp == 0) {
853 if (seekBefore) {
854 if (lastKeyValueSize < 0) {
855 throw new IllegalStateException("blockSeek with seekBefore "
856 + "at the first key of the block: key="
857 + Bytes.toStringBinary(key) + ", blockOffset="
858 + block.getOffset() + ", onDiskSize="
859 + block.getOnDiskSizeWithHeader());
860 }
861 blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
862 readKeyValueLen();
863 return 1;
864 }
865 currKeyLen = klen;
866 currValueLen = vlen;
867 if (this.reader.shouldIncludeMemstoreTS()) {
868 currMemstoreTS = memstoreTS;
869 currMemstoreTSLen = memstoreTSLen;
870 }
871 return 0;
872 } else if (comp < 0) {
873 if (lastKeyValueSize > 0)
874 blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
875 readKeyValueLen();
876 if (lastKeyValueSize == -1 && blockBuffer.position() == 0
877 && this.reader.trailer.getMinorVersion() >= MINOR_VERSION_WITH_FAKED_KEY) {
878 return HConstants.INDEX_KEY_MAGIC;
879 }
880 return 1;
881 }
882
883
884 lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE;
885 blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
886 } while (blockBuffer.remaining() > 0);
887
888
889
890
891 blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
892 readKeyValueLen();
893 return 1;
894 }
895
896 @Override
897 protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
898 ByteBuffer buffer = curBlock.getBufferWithoutHeader();
899
900 buffer.rewind();
901 int klen = buffer.getInt();
902 buffer.getInt();
903 ByteBuffer keyBuff = buffer.slice();
904 keyBuff.limit(klen);
905 keyBuff.rewind();
906 return keyBuff;
907 }
908
909 @Override
910 public String getKeyString() {
911 return Bytes.toStringBinary(blockBuffer.array(),
912 blockBuffer.arrayOffset() + blockBuffer.position()
913 + KEY_VALUE_LEN_SIZE, currKeyLen);
914 }
915
916 @Override
917 public String getValueString() {
918 return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
919 + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
920 currValueLen);
921 }
922 }
923
924
925
926
927 protected static class EncodedScannerV2 extends AbstractScannerV2 {
928 private DataBlockEncoder.EncodedSeeker seeker = null;
929 private DataBlockEncoder dataBlockEncoder = null;
930 private final boolean includesMemstoreTS;
931
932 public EncodedScannerV2(HFileReaderV2 reader, boolean cacheBlocks,
933 boolean pread, boolean isCompaction, boolean includesMemstoreTS) {
934 super(reader, cacheBlocks, pread, isCompaction);
935 this.includesMemstoreTS = includesMemstoreTS;
936 }
937
938 private void setDataBlockEncoder(DataBlockEncoder dataBlockEncoder) {
939 this.dataBlockEncoder = dataBlockEncoder;
940 seeker = dataBlockEncoder.createSeeker(reader.getComparator(),
941 includesMemstoreTS);
942 }
943
944
945
946
947
948
949
950 private void updateCurrentBlock(HFileBlock newBlock) {
951 block = newBlock;
952
953
954 if (block.getBlockType() != BlockType.ENCODED_DATA) {
955 throw new IllegalStateException(
956 "EncodedScannerV2 works only on encoded data blocks");
957 }
958
959 short dataBlockEncoderId = block.getDataBlockEncodingId();
960 if (dataBlockEncoder == null ||
961 !DataBlockEncoding.isCorrectEncoder(dataBlockEncoder,
962 dataBlockEncoderId)) {
963 DataBlockEncoder encoder =
964 DataBlockEncoding.getDataBlockEncoderById(dataBlockEncoderId);
965 setDataBlockEncoder(encoder);
966 }
967
968 seeker.setCurrentBuffer(getEncodedBuffer(newBlock));
969 blockFetches++;
970 }
971
972 private ByteBuffer getEncodedBuffer(HFileBlock newBlock) {
973 ByteBuffer origBlock = newBlock.getBufferReadOnly();
974 ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(),
975 origBlock.arrayOffset() + newBlock.headerSize() +
976 DataBlockEncoding.ID_SIZE,
977 newBlock.getUncompressedSizeWithoutHeader() -
978 DataBlockEncoding.ID_SIZE).slice();
979 return encodedBlock;
980 }
981
982 @Override
983 public boolean seekTo() throws IOException {
984 if (reader == null) {
985 return false;
986 }
987
988 if (reader.getTrailer().getEntryCount() == 0) {
989
990 return false;
991 }
992
993 long firstDataBlockOffset =
994 reader.getTrailer().getFirstDataBlockOffset();
995 if (block != null && block.getOffset() == firstDataBlockOffset) {
996 seeker.rewind();
997 return true;
998 }
999
1000 block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
1001 isCompaction, BlockType.DATA);
1002 if (block.getOffset() < 0) {
1003 throw new IOException("Invalid block offset: " + block.getOffset());
1004 }
1005 updateCurrentBlock(block);
1006 return true;
1007 }
1008
1009 @Override
1010 public boolean next() throws IOException {
1011 boolean isValid = seeker.next();
1012 if (!isValid) {
1013 block = readNextDataBlock();
1014 isValid = block != null;
1015 if (isValid) {
1016 updateCurrentBlock(block);
1017 }
1018 }
1019 return isValid;
1020 }
1021
1022 @Override
1023 public ByteBuffer getKey() {
1024 assertValidSeek();
1025 return seeker.getKeyDeepCopy();
1026 }
1027
1028 @Override
1029 public ByteBuffer getValue() {
1030 assertValidSeek();
1031 return seeker.getValueShallowCopy();
1032 }
1033
1034 @Override
1035 public KeyValue getKeyValue() {
1036 if (block == null) {
1037 return null;
1038 }
1039 return seeker.getKeyValue();
1040 }
1041
1042 @Override
1043 public String getKeyString() {
1044 ByteBuffer keyBuffer = getKey();
1045 return Bytes.toStringBinary(keyBuffer.array(),
1046 keyBuffer.arrayOffset(), keyBuffer.limit());
1047 }
1048
1049 @Override
1050 public String getValueString() {
1051 ByteBuffer valueBuffer = getValue();
1052 return Bytes.toStringBinary(valueBuffer.array(),
1053 valueBuffer.arrayOffset(), valueBuffer.limit());
1054 }
1055
1056 private void assertValidSeek() {
1057 if (block == null) {
1058 throw new NotSeekedException();
1059 }
1060 }
1061
1062 @Override
1063 protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
1064 return dataBlockEncoder.getFirstKeyInBlock(getEncodedBuffer(curBlock));
1065 }
1066
1067 @Override
1068 protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, byte[] nextIndexedKey,
1069 boolean rewind, byte[] key, int offset, int length, boolean seekBefore)
1070 throws IOException {
1071 if (block == null || block.getOffset() != seekToBlock.getOffset()) {
1072 updateCurrentBlock(seekToBlock);
1073 } else if (rewind) {
1074 seeker.rewind();
1075 }
1076 this.nextIndexedKey = nextIndexedKey;
1077 return seeker.seekToKeyInBlock(key, offset, length, seekBefore);
1078 }
1079 }
1080
1081
1082
1083
1084
1085 @Override
1086 public DataInput getGeneralBloomFilterMetadata() throws IOException {
1087 return this.getBloomFilterMetadata(BlockType.GENERAL_BLOOM_META);
1088 }
1089
1090 @Override
1091 public DataInput getDeleteBloomFilterMetadata() throws IOException {
1092 return this.getBloomFilterMetadata(BlockType.DELETE_FAMILY_BLOOM_META);
1093 }
1094
1095 private DataInput getBloomFilterMetadata(BlockType blockType)
1096 throws IOException {
1097 if (blockType != BlockType.GENERAL_BLOOM_META &&
1098 blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
1099 throw new RuntimeException("Block Type: " + blockType.toString() +
1100 " is not supported") ;
1101 }
1102
1103 for (HFileBlock b : loadOnOpenBlocks)
1104 if (b.getBlockType() == blockType)
1105 return b.getByteStream();
1106 return null;
1107 }
1108
1109 @Override
1110 public boolean isFileInfoLoaded() {
1111 return true;
1112 }
1113
1114
1115
1116
1117
1118 private void validateMinorVersion(Path path, int minorVersion) {
1119 if (minorVersion < MIN_MINOR_VERSION ||
1120 minorVersion > MAX_MINOR_VERSION) {
1121 String msg = "Minor version for path " + path +
1122 " is expected to be between " +
1123 MIN_MINOR_VERSION + " and " + MAX_MINOR_VERSION +
1124 " but is found to be " + minorVersion;
1125 LOG.error(msg);
1126 throw new RuntimeException(msg);
1127 }
1128 }
1129 }