1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.io.hfile;
20
21 import java.io.ByteArrayInputStream;
22 import java.io.DataInput;
23 import java.io.DataInputStream;
24 import java.io.IOException;
25 import java.nio.ByteBuffer;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.classification.InterfaceAudience;
30 import org.apache.hadoop.fs.FSDataInputStream;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.hbase.KeyValue;
33 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
34 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
35 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
36 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.io.IOUtils;
39 import org.apache.hadoop.io.RawComparator;
40
41 import com.google.common.base.Preconditions;
42
43
44
45
46
47
48 @InterfaceAudience.Private
49 public class HFileReaderV1 extends AbstractHFileReader {
50 private static final Log LOG = LogFactory.getLog(HFileReaderV1.class);
51
52 private volatile boolean fileInfoLoaded = false;
53
54
55
56
57
58
59
60
61
62
/**
 * Creates a reader for a version 1 HFile.
 *
 * <p>All arguments are forwarded to the superclass constructor. The trailer
 * is then asked to confirm major version 1 via
 * {@code trailer.expectMajorVersion(1)}, and a {@code HFileBlock.FSReaderV1}
 * is installed as the block reader.
 *
 * @param path path of the HFile, passed to the superclass
 * @param trailer trailer of the file; must describe a version 1 file
 * @param fsdis input stream the block reader reads from
 * @param size passed to the superclass (presumably the file length - confirm)
 * @param closeIStream passed to the superclass; {@link #close(boolean)} only
 *          closes the stream when the corresponding flag is set
 * @param cacheConf cache configuration, passed to the superclass
 * @throws IOException declared by the superclass constructor and the
 *           version check
 */
public HFileReaderV1(
    Path path,
    FixedFileTrailer trailer,
    final FSDataInputStream fsdis,
    final long size,
    final boolean closeIStream,
    final CacheConfig cacheConf) throws IOException {
  super(path, trailer, fsdis, size, closeIStream, cacheConf);

  trailer.expectMajorVersion(1);
  this.fsBlockReader =
      new HFileBlock.FSReaderV1(fsdis, compressAlgo, fileSize);
}
72
73 private byte[] readAllIndex(final FSDataInputStream in,
74 final long indexOffset, final int indexSize) throws IOException {
75 byte[] allIndex = new byte[indexSize];
76 in.seek(indexOffset);
77 IOUtils.readFully(in, allIndex, 0, allIndex.length);
78
79 return allIndex;
80 }
81
82
83
84
85
86
87
88
89 @Override
90 public FileInfo loadFileInfo() throws IOException {
91 if (fileInfoLoaded)
92 return fileInfo;
93
94
95 istream.seek(trailer.getFileInfoOffset());
96 fileInfo = new FileInfo();
97 fileInfo.read(istream);
98 lastKey = fileInfo.get(FileInfo.LASTKEY);
99 avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
100 avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
101
102
103 String clazzName = Bytes.toString(fileInfo.get(FileInfo.COMPARATOR));
104 comparator = getComparator(clazzName);
105
106 dataBlockIndexReader =
107 new HFileBlockIndex.BlockIndexReader(comparator, 1);
108 metaBlockIndexReader =
109 new HFileBlockIndex.BlockIndexReader(Bytes.BYTES_RAWCOMPARATOR, 1);
110
111 int sizeToLoadOnOpen = (int) (fileSize - trailer.getLoadOnOpenDataOffset() -
112 trailer.getTrailerSize());
113 byte[] dataAndMetaIndex = readAllIndex(istream,
114 trailer.getLoadOnOpenDataOffset(), sizeToLoadOnOpen);
115
116 ByteArrayInputStream bis = new ByteArrayInputStream(dataAndMetaIndex);
117 DataInputStream dis = new DataInputStream(bis);
118
119
120 if (trailer.getDataIndexCount() > 0)
121 BlockType.INDEX_V1.readAndCheck(dis);
122 dataBlockIndexReader.readRootIndex(dis, trailer.getDataIndexCount());
123
124
125 if (trailer.getMetaIndexCount() > 0)
126 BlockType.INDEX_V1.readAndCheck(dis);
127 metaBlockIndexReader.readRootIndex(dis, trailer.getMetaIndexCount());
128
129 fileInfoLoaded = true;
130 return fileInfo;
131 }
132
133
134
135
136
137
138
139
140 @SuppressWarnings("unchecked")
141 private RawComparator<byte[]> getComparator(final String clazzName)
142 throws IOException {
143 if (clazzName == null || clazzName.length() == 0) {
144 return null;
145 }
146 try {
147 return (RawComparator<byte[]>)Class.forName(clazzName).newInstance();
148 } catch (InstantiationException e) {
149 throw new IOException(e);
150 } catch (IllegalAccessException e) {
151 throw new IOException(e);
152 } catch (ClassNotFoundException e) {
153 throw new IOException(e);
154 }
155 }
156
157
158
159
160
161
162
163
164
165
166
167
168
169 @Override
170 public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
171 final boolean isCompaction) {
172 return new ScannerV1(this, cacheBlocks, pread, isCompaction);
173 }
174
175
176
177
178
179
180 protected int blockContainingKey(final byte[] key, int offset, int length) {
181 Preconditions.checkState(!dataBlockIndexReader.isEmpty(),
182 "Block index not loaded");
183 return dataBlockIndexReader.rootBlockContainingKey(key, offset, length);
184 }
185
186
187
188
189
190
191
192 @Override
193 public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
194 throws IOException {
195 if (trailer.getMetaIndexCount() == 0) {
196 return null;
197 }
198 if (metaBlockIndexReader == null) {
199 throw new IOException("Meta index not loaded");
200 }
201
202 byte[] nameBytes = Bytes.toBytes(metaBlockName);
203 int block = metaBlockIndexReader.rootBlockContainingKey(nameBytes, 0,
204 nameBytes.length);
205 if (block == -1)
206 return null;
207 long offset = metaBlockIndexReader.getRootBlockOffset(block);
208 long nextOffset;
209 if (block == metaBlockIndexReader.getRootBlockCount() - 1) {
210 nextOffset = trailer.getFileInfoOffset();
211 } else {
212 nextOffset = metaBlockIndexReader.getRootBlockOffset(block + 1);
213 }
214
215 long startTimeNs = System.nanoTime();
216
217 BlockCacheKey cacheKey = new BlockCacheKey(name, offset,
218 DataBlockEncoding.NONE, BlockType.META);
219
220 BlockCategory effectiveCategory = BlockCategory.META;
221 if (metaBlockName.equals(HFileWriterV1.BLOOM_FILTER_META_KEY) ||
222 metaBlockName.equals(HFileWriterV1.BLOOM_FILTER_DATA_KEY)) {
223 effectiveCategory = BlockCategory.BLOOM;
224 }
225
226
227 synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
228
229 if (cacheConf.isBlockCacheEnabled()) {
230 HFileBlock cachedBlock =
231 (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey,
232 cacheConf.shouldCacheBlockOnRead(effectiveCategory), false);
233 if (cachedBlock != null) {
234 return cachedBlock.getBufferWithoutHeader();
235 }
236
237 }
238
239 HFileBlock hfileBlock = fsBlockReader.readBlockData(offset,
240 nextOffset - offset, metaBlockIndexReader.getRootBlockDataSize(block),
241 true);
242 hfileBlock.expectType(BlockType.META);
243
244 final long delta = System.nanoTime() - startTimeNs;
245 HFile.offerReadLatency(delta, true);
246
247
248 if (cacheBlock && cacheConf.shouldCacheBlockOnRead(effectiveCategory)) {
249 cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock,
250 cacheConf.isInMemory());
251 }
252
253 return hfileBlock.getBufferWithoutHeader();
254 }
255 }
256
257
258
259
260
261
262
263
264
265
266 ByteBuffer readBlockBuffer(int block, boolean cacheBlock,
267 final boolean pread, final boolean isCompaction) throws IOException {
268 if (dataBlockIndexReader == null) {
269 throw new IOException("Block index not loaded");
270 }
271 if (block < 0 || block >= dataBlockIndexReader.getRootBlockCount()) {
272 throw new IOException("Requested block is out of range: " + block +
273 ", max: " + dataBlockIndexReader.getRootBlockCount());
274 }
275
276 long offset = dataBlockIndexReader.getRootBlockOffset(block);
277 BlockCacheKey cacheKey = new BlockCacheKey(name, offset);
278
279
280
281
282
283
284 synchronized (dataBlockIndexReader.getRootBlockKey(block)) {
285
286 if (cacheConf.isBlockCacheEnabled()) {
287 HFileBlock cachedBlock =
288 (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey,
289 cacheConf.shouldCacheDataOnRead(), false);
290 if (cachedBlock != null) {
291 return cachedBlock.getBufferWithoutHeader();
292 }
293
294 }
295
296
297 long startTimeNs = System.nanoTime();
298 long nextOffset;
299
300 if (block == dataBlockIndexReader.getRootBlockCount() - 1) {
301
302
303 nextOffset = (metaBlockIndexReader.getRootBlockCount() == 0) ?
304 this.trailer.getFileInfoOffset() :
305 metaBlockIndexReader.getRootBlockOffset(0);
306 } else {
307 nextOffset = dataBlockIndexReader.getRootBlockOffset(block + 1);
308 }
309
310 HFileBlock hfileBlock = fsBlockReader.readBlockData(offset, nextOffset
311 - offset, dataBlockIndexReader.getRootBlockDataSize(block), pread);
312 hfileBlock.expectType(BlockType.DATA);
313
314 final long delta = System.nanoTime() - startTimeNs;
315 HFile.offerReadLatency(delta, pread);
316
317
318 if (cacheBlock && cacheConf.shouldCacheBlockOnRead(
319 hfileBlock.getBlockType().getCategory())) {
320 cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock,
321 cacheConf.isInMemory());
322 }
323 return hfileBlock.getBufferWithoutHeader();
324 }
325 }
326
327
328
329
330
331
332 public byte[] getLastKey() {
333 if (!fileInfoLoaded) {
334 throw new RuntimeException("Load file info first");
335 }
336 return dataBlockIndexReader.isEmpty() ? null : lastKey;
337 }
338
339
340
341
342
343
344
345 @Override
346 public byte[] midkey() throws IOException {
347 Preconditions.checkState(isFileInfoLoaded(), "File info is not loaded");
348 Preconditions.checkState(!dataBlockIndexReader.isEmpty(),
349 "Data block index is not loaded or is empty");
350 return dataBlockIndexReader.midkey();
351 }
352
353 @Override
354 public void close() throws IOException {
355 close(cacheConf.shouldEvictOnClose());
356 }
357
358 @Override
359 public void close(boolean evictOnClose) throws IOException {
360 if (evictOnClose && cacheConf.isBlockCacheEnabled()) {
361 int numEvicted = 0;
362 for (int i = 0; i < dataBlockIndexReader.getRootBlockCount(); i++) {
363 if (cacheConf.getBlockCache().evictBlock(
364 new BlockCacheKey(name,
365 dataBlockIndexReader.getRootBlockOffset(i),
366 DataBlockEncoding.NONE, BlockType.DATA))) {
367 numEvicted++;
368 }
369 }
370 LOG.debug("On close of file " + name + " evicted " + numEvicted
371 + " block(s) of " + dataBlockIndexReader.getRootBlockCount()
372 + " total blocks");
373 }
374 if (this.closeIStream && this.istream != null) {
375 this.istream.close();
376 this.istream = null;
377 }
378 }
379
380 protected abstract static class AbstractScannerV1
381 extends AbstractHFileReader.Scanner {
382 protected int currBlock;
383
384
385
386
387
388 protected HFileReaderV1 reader;
389
390 public AbstractScannerV1(HFileReaderV1 reader, boolean cacheBlocks,
391 final boolean pread, final boolean isCompaction) {
392 super(reader, cacheBlocks, pread, isCompaction);
393 this.reader = (HFileReaderV1) reader;
394 }
395
396
397
398
399
400
401
402
403
404
405 protected abstract int blockSeek(byte[] key, int offset, int length,
406 boolean seekBefore);
407
408 protected abstract void loadBlock(int bloc, boolean rewind)
409 throws IOException;
410
411 @Override
412 public int seekTo(byte[] key, int offset, int length) throws IOException {
413 int b = reader.blockContainingKey(key, offset, length);
414 if (b < 0) return -1;
415
416 loadBlock(b, true);
417 return blockSeek(key, offset, length, false);
418 }
419
420 @Override
421 public int reseekTo(byte[] key, int offset, int length)
422 throws IOException {
423 if (blockBuffer != null && currKeyLen != 0) {
424 ByteBuffer bb = getKey();
425 int compared = reader.getComparator().compare(key, offset,
426 length, bb.array(), bb.arrayOffset(), bb.limit());
427 if (compared < 1) {
428
429
430 return compared;
431 }
432 }
433
434 int b = reader.blockContainingKey(key, offset, length);
435 if (b < 0) {
436 return -1;
437 }
438 loadBlock(b, false);
439 return blockSeek(key, offset, length, false);
440 }
441
442 @Override
443 public boolean seekBefore(byte[] key, int offset, int length)
444 throws IOException {
445 int b = reader.blockContainingKey(key, offset, length);
446 if (b < 0)
447 return false;
448
449
450 byte[] firstkKey = reader.getDataBlockIndexReader().getRootBlockKey(b);
451 if (reader.getComparator().compare(firstkKey, 0, firstkKey.length,
452 key, offset, length) == 0) {
453
454
455 if (b == 0) {
456
457 return false;
458 }
459 b--;
460
461
462 }
463 loadBlock(b, true);
464 blockSeek(key, offset, length, true);
465 return true;
466 }
467 }
468
469
470
471
472
473 protected static class ScannerV1 extends AbstractScannerV1 {
474 private HFileReaderV1 reader;
475
476 public ScannerV1(HFileReaderV1 reader, boolean cacheBlocks,
477 final boolean pread, final boolean isCompaction) {
478 super(reader, cacheBlocks, pread, isCompaction);
479 this.reader = reader;
480 }
481
482 @Override
483 public KeyValue getKeyValue() {
484 if (blockBuffer == null) {
485 return null;
486 }
487 return new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
488 + blockBuffer.position() - 8);
489 }
490
491 @Override
492 public ByteBuffer getKey() {
493 Preconditions.checkState(blockBuffer != null && currKeyLen > 0,
494 "you need to seekTo() before calling getKey()");
495
496 ByteBuffer keyBuff = blockBuffer.slice();
497 keyBuff.limit(currKeyLen);
498 keyBuff.rewind();
499
500 return keyBuff;
501 }
502
503 @Override
504 public ByteBuffer getValue() {
505 if (blockBuffer == null || currKeyLen == 0) {
506 throw new RuntimeException(
507 "you need to seekTo() before calling getValue()");
508 }
509
510
511 ByteBuffer valueBuff = blockBuffer.slice();
512 valueBuff.position(currKeyLen);
513 valueBuff = valueBuff.slice();
514 valueBuff.limit(currValueLen);
515 valueBuff.rewind();
516 return valueBuff;
517 }
518
519 @Override
520 public boolean next() throws IOException {
521 if (blockBuffer == null) {
522 throw new IOException("Next called on non-seeked scanner");
523 }
524
525 try {
526 blockBuffer.position(blockBuffer.position() + currKeyLen
527 + currValueLen);
528 } catch (IllegalArgumentException e) {
529 LOG.error("Current pos = " + blockBuffer.position() +
530 "; currKeyLen = " + currKeyLen +
531 "; currValLen = " + currValueLen +
532 "; block limit = " + blockBuffer.limit() +
533 "; HFile name = " + reader.getName() +
534 "; currBlock id = " + currBlock, e);
535 throw e;
536 }
537 if (blockBuffer.remaining() <= 0) {
538 currBlock++;
539 if (currBlock >= reader.getDataBlockIndexReader().getRootBlockCount()) {
540
541 currBlock = 0;
542 blockBuffer = null;
543 return false;
544 }
545 blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread,
546 isCompaction);
547 currKeyLen = blockBuffer.getInt();
548 currValueLen = blockBuffer.getInt();
549 blockFetches++;
550 return true;
551 }
552
553 currKeyLen = blockBuffer.getInt();
554 currValueLen = blockBuffer.getInt();
555 return true;
556 }
557
558 @Override
559 protected int blockSeek(byte[] key, int offset, int length,
560 boolean seekBefore) {
561 int klen, vlen;
562 int lastLen = 0;
563 do {
564 klen = blockBuffer.getInt();
565 vlen = blockBuffer.getInt();
566 int comp = reader.getComparator().compare(key, offset, length,
567 blockBuffer.array(),
568 blockBuffer.arrayOffset() + blockBuffer.position(), klen);
569 if (comp == 0) {
570 if (seekBefore) {
571 blockBuffer.position(blockBuffer.position() - lastLen - 16);
572 currKeyLen = blockBuffer.getInt();
573 currValueLen = blockBuffer.getInt();
574 return 1;
575 }
576 currKeyLen = klen;
577 currValueLen = vlen;
578 return 0;
579 }
580 if (comp < 0) {
581
582 blockBuffer.position(blockBuffer.position() - lastLen - 16);
583 currKeyLen = blockBuffer.getInt();
584 currValueLen = blockBuffer.getInt();
585 return 1;
586 }
587 blockBuffer.position(blockBuffer.position() + klen + vlen);
588 lastLen = klen + vlen;
589 } while (blockBuffer.remaining() > 0);
590
591
592
593
594 blockBuffer.position(blockBuffer.position() - lastLen - 8);
595 currKeyLen = blockBuffer.getInt();
596 currValueLen = blockBuffer.getInt();
597 return 1;
598 }
599
600 @Override
601 public String getKeyString() {
602 return Bytes.toStringBinary(blockBuffer.array(),
603 blockBuffer.arrayOffset() + blockBuffer.position(), currKeyLen);
604 }
605
606 @Override
607 public String getValueString() {
608 return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset() +
609 blockBuffer.position() + currKeyLen, currValueLen);
610 }
611
612 @Override
613 public boolean seekTo() throws IOException {
614 if (reader.getDataBlockIndexReader().isEmpty()) {
615 return false;
616 }
617 if (blockBuffer != null && currBlock == 0) {
618 blockBuffer.rewind();
619 currKeyLen = blockBuffer.getInt();
620 currValueLen = blockBuffer.getInt();
621 return true;
622 }
623 currBlock = 0;
624 blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread,
625 isCompaction);
626 currKeyLen = blockBuffer.getInt();
627 currValueLen = blockBuffer.getInt();
628 blockFetches++;
629 return true;
630 }
631
632 @Override
633 protected void loadBlock(int bloc, boolean rewind) throws IOException {
634 if (blockBuffer == null) {
635 blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread,
636 isCompaction);
637 currBlock = bloc;
638 blockFetches++;
639 } else {
640 if (bloc != currBlock) {
641 blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread,
642 isCompaction);
643 currBlock = bloc;
644 blockFetches++;
645 } else {
646
647 if (rewind) {
648 blockBuffer.rewind();
649 }
650 else {
651
652 blockBuffer.position(blockBuffer.position()-8);
653 }
654 }
655 }
656 }
657
658 }
659
660 @Override
661 public HFileBlock readBlock(long offset, long onDiskBlockSize,
662 boolean cacheBlock, boolean pread, boolean isCompaction,
663 BlockType expectedBlockType) {
664 throw new UnsupportedOperationException();
665 }
666
667 @Override
668 public DataInput getGeneralBloomFilterMetadata() throws IOException {
669
670
671 ByteBuffer buf = getMetaBlock(HFileWriterV1.BLOOM_FILTER_META_KEY, false);
672 if (buf == null)
673 return null;
674 ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(),
675 buf.arrayOffset(), buf.limit());
676 return new DataInputStream(bais);
677 }
678
679 @Override
680 public DataInput getDeleteBloomFilterMetadata() throws IOException {
681 return null;
682 }
683
684 @Override
685 public boolean isFileInfoLoaded() {
686 return fileInfoLoaded;
687 }
688
689 }