/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
18 package org.apache.hadoop.hbase.io.hfile;
19
20 import java.io.ByteArrayOutputStream;
21 import java.io.DataInputStream;
22 import java.io.DataOutput;
23 import java.io.DataOutputStream;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.nio.ByteBuffer;
27 import java.util.concurrent.locks.Lock;
28 import java.util.concurrent.locks.ReentrantLock;
29
30 import org.apache.hadoop.fs.FSDataInputStream;
31 import org.apache.hadoop.fs.FSDataOutputStream;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.hbase.Cell;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.classification.InterfaceAudience;
36 import org.apache.hadoop.hbase.fs.HFileSystem;
37 import org.apache.hadoop.hbase.io.ByteBufferInputStream;
38 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
39 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
40 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
41 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
42 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
43 import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
44 import org.apache.hadoop.hbase.util.ByteBufferUtils;
45 import org.apache.hadoop.hbase.util.Bytes;
46 import org.apache.hadoop.hbase.util.ChecksumType;
47 import org.apache.hadoop.hbase.util.ClassSize;
48 import org.apache.hadoop.io.IOUtils;
49
50 import com.google.common.annotations.VisibleForTesting;
51 import com.google.common.base.Preconditions;
52
/**
 * Reads and writes version 2 HFile blocks. On disk a block is laid out as:
 * <ol>
 * <li>A header (see {@link Header}): the 8-byte block magic identifying the {@link BlockType},
 * the on-disk size without the header, the uncompressed size without the header, the offset of
 * the previous block of the same type, and, when HBase checksums are enabled, the checksum type,
 * the number of data bytes covered by each checksum value, and the on-disk size of header plus
 * data (excluding checksums).</li>
 * <li>The block data, possibly compressed and/or encrypted.</li>
 * <li>The checksum values for the header and data, when HBase checksums are enabled.</li>
 * </ol>
 * The same class represents blocks read from disk and blocks being written: the nested
 * {@link Writer} produces the on-disk representation and {@link FSReader} reads it back. Blocks
 * placed in the block cache are serialized together with a small amount of extra metadata (see
 * {@link #EXTRA_SERIALIZATION_SPACE}).
 */
85 @InterfaceAudience.Private
86 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="HE_EQUALS_USE_HASHCODE",
87 justification="Fix!!! Fine for now bug FIXXXXXXX!!!!")
88 public class HFileBlock implements Cacheable {
  /**
   * On an HBase checksum failure, verify checksums through the HDFS client for this many
   * subsequent reads before re-enabling HBase-level checksum verification.
   */
95 static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
96
97 public static final boolean FILL_HEADER = true;
98 public static final boolean DONT_FILL_HEADER = false;
99
  /**
   * The size of the header of an {@link BlockType#ENCODED_DATA} block: the regular block header
   * followed by the 2-byte data block encoding id.
   */
104 public static final int ENCODED_HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE
105 + DataBlockEncoding.ID_SIZE;
106
107 static final byte[] DUMMY_HEADER_NO_CHECKSUM =
108 new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
109
110 public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
111 ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
112
  /**
   * Extra bytes appended when a block is serialized into the block cache: a one-byte flag for
   * whether HBase checksums are in use, the block's offset in the file (long) and the on-disk
   * size of the next block (int). See {@link #serializeExtraInfo(ByteBuffer)}.
   */
114 public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT
115 + Bytes.SIZEOF_LONG;
116
  /** Each checksum value is stored as a 4-byte int. */
120 static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
121
122 static final CacheableDeserializer<Cacheable> blockDeserializer =
123 new CacheableDeserializer<Cacheable>() {
124 public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{
125 buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
126 ByteBuffer newByteBuffer;
127 if (reuse) {
128 newByteBuffer = buf.slice();
129 } else {
130 newByteBuffer = ByteBuffer.allocate(buf.limit());
131 newByteBuffer.put(buf);
132 }
133 buf.position(buf.limit());
134 buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
135 boolean usesChecksum = buf.get() == (byte)1;
136 HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum);
137 hFileBlock.offset = buf.getLong();
138 hFileBlock.nextBlockOnDiskSizeWithHeader = buf.getInt();
139 if (hFileBlock.hasNextBlockHeader()) {
140 hFileBlock.buf.limit(hFileBlock.buf.limit() - hFileBlock.headerSize());
141 }
142 return hFileBlock;
143 }
144
145 @Override
146 public int getDeserialiserIdentifier() {
147 return deserializerIdentifier;
148 }
149
150 @Override
151 public HFileBlock deserialize(ByteBuffer b) throws IOException {
152 return deserialize(b, false);
153 }
154 };
155 private static final int deserializerIdentifier;
156 static {
157 deserializerIdentifier = CacheableDeserializerIdManager
158 .registerDeserializer(blockDeserializer);
159 }
160
161
162 static class Header {
    // Byte indexes of the fields within the block header. With HBase checksums the header is
    // HConstants.HFILEBLOCK_HEADER_SIZE bytes long; without them it is
    // HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM bytes and the last three fields are absent.
172 static int BLOCK_MAGIC_INDEX = 0;
173 static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8;
174 static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12;
175 static int PREV_BLOCK_OFFSET_INDEX = 16;
176 static int CHECKSUM_TYPE_INDEX = 24;
177 static int BYTES_PER_CHECKSUM_INDEX = 25;
178 static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29;
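    // Resulting header layout, derived from the indexes above and CHECKSUM_SIZE
    // (sizes in bytes; the last three fields exist only when HBase checksums are in use):
    //   [0..7]   block magic / block type          8
    //   [8..11]  onDiskSizeWithoutHeader           4
    //   [12..15] uncompressedSizeWithoutHeader     4
    //   [16..23] prevBlockOffset                   8
    //   [24]     checksum type                     1
    //   [25..28] bytesPerChecksum                  4
    //   [29..32] onDiskDataSizeWithHeader          4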
179 }
180
181
182 private BlockType blockType;
183
184
185 private int onDiskSizeWithoutHeader;
186
187
188 private final int uncompressedSizeWithoutHeader;
189
190
191 private final long prevBlockOffset;
  /**
   * Size of the header plus the data, excluding the trailing checksum bytes. This is the region
   * the checksums are computed over, and the value written into the header's
   * on-disk-data-size-with-header field.
   */
197 private final int onDiskDataSizeWithHeader;
  /** Buffer backing this block: header, data, checksums, and possibly the next block's header. */
200 private ByteBuffer buf;
201
202
203 private HFileContext fileContext;
  /**
   * The offset of this block in the file. Set by the reader or the cache deserializer;
   * -1 means not yet initialized.
   */
209 private long offset = -1;
  /**
   * The on-disk size of the next block, including its header, read speculatively together with
   * this block; -1 if the next block's header was not read.
   */
216 private int nextBlockOnDiskSizeWithHeader = -1;
217
  /**
   * Creates a new {@link HFileBlock} from its parsed fields.
   *
   * @param blockType the type of this block
   * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
   * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
   * @param prevBlockOffset see {@link #prevBlockOffset}
   * @param buf the block buffer, starting with the header
   * @param fillHeader when {@link #FILL_HEADER}, write the field values back into the buffer's
   *          header; when {@link #DONT_FILL_HEADER}, leave the buffer's header bytes untouched
   * @param offset the offset of this block in the file
   * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
   * @param fileContext HFile-level settings, e.g. compression and checksum configuration
   */
234 HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
235 long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset,
236 int onDiskDataSizeWithHeader, HFileContext fileContext) {
237 this.blockType = blockType;
238 this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
239 this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
240 this.prevBlockOffset = prevBlockOffset;
241 this.buf = buf;
242 this.offset = offset;
243 this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
244 this.fileContext = fileContext;
245 if (fillHeader)
246 overwriteHeader();
247 this.buf.rewind();
248 }
  /**
   * Copy constructor. Creates a shallow copy that shares the underlying block data via
   * {@code that.buf.duplicate()}.
   */
253 HFileBlock(HFileBlock that) {
254 this.blockType = that.blockType;
255 this.onDiskSizeWithoutHeader = that.onDiskSizeWithoutHeader;
256 this.uncompressedSizeWithoutHeader = that.uncompressedSizeWithoutHeader;
257 this.prevBlockOffset = that.prevBlockOffset;
258 this.buf = that.buf.duplicate();
259 this.offset = that.offset;
260 this.onDiskDataSizeWithHeader = that.onDiskDataSizeWithHeader;
261 this.fileContext = that.fileContext;
262 this.nextBlockOnDiskSizeWithHeader = that.nextBlockOnDiskSizeWithHeader;
263 }
  /**
   * Creates a block from an existing buffer that starts with the block's header. The header
   * fields are parsed out of the buffer; the checksum-related fields are only present (and only
   * read) when {@code usesHBaseChecksum} is true.
   *
   * @param b the block buffer, positioned anywhere (it is rewound)
   * @param usesHBaseChecksum whether the header contains the checksum fields
   */
273 HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException {
274 b.rewind();
275 blockType = BlockType.read(b);
276 onDiskSizeWithoutHeader = b.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
277 uncompressedSizeWithoutHeader = b.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
278 prevBlockOffset = b.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
279 HFileContextBuilder contextBuilder = new HFileContextBuilder();
280 contextBuilder.withHBaseCheckSum(usesHBaseChecksum);
281 if (usesHBaseChecksum) {
282 contextBuilder.withChecksumType(ChecksumType.codeToType(b.get(Header.CHECKSUM_TYPE_INDEX)));
283 contextBuilder.withBytesPerCheckSum(b.getInt(Header.BYTES_PER_CHECKSUM_INDEX));
284 this.onDiskDataSizeWithHeader = b.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
285 } else {
286 contextBuilder.withChecksumType(ChecksumType.NULL);
287 contextBuilder.withBytesPerCheckSum(0);
288 this.onDiskDataSizeWithHeader = onDiskSizeWithoutHeader +
289 HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
290 }
291 this.fileContext = contextBuilder.build();
292 buf = b;
293 buf.rewind();
294 }
295
296 public BlockType getBlockType() {
297 return blockType;
298 }
299
300
301 public short getDataBlockEncodingId() {
302 if (blockType != BlockType.ENCODED_DATA) {
303 throw new IllegalArgumentException("Querying encoder ID of a block " +
304 "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
305 }
306 return buf.getShort(headerSize());
307 }
308
309
310
311
312 public int getOnDiskSizeWithHeader() {
313 return onDiskSizeWithoutHeader + headerSize();
314 }
315
316
317
318
319 public int getOnDiskSizeWithoutHeader() {
320 return onDiskSizeWithoutHeader;
321 }
322
323
324
325
326 public int getUncompressedSizeWithoutHeader() {
327 return uncompressedSizeWithoutHeader;
328 }
329
330
331
332
333
334 public long getPrevBlockOffset() {
335 return prevBlockOffset;
336 }
337
338
339
340
341
342 private void overwriteHeader() {
343 buf.rewind();
344 blockType.write(buf);
345 buf.putInt(onDiskSizeWithoutHeader);
346 buf.putInt(uncompressedSizeWithoutHeader);
347 buf.putLong(prevBlockOffset);
348 if (this.fileContext.isUseHBaseChecksum()) {
349 buf.put(fileContext.getChecksumType().getCode());
350 buf.putInt(fileContext.getBytesPerChecksum());
351 buf.putInt(onDiskDataSizeWithHeader);
352 }
353 }
354
  /**
   * Returns a buffer over the block's payload only: the header and the trailing checksum bytes
   * are excluded. The returned buffer shares the backing data with this block.
   */
360 public ByteBuffer getBufferWithoutHeader() {
361 ByteBuffer dup = this.buf.duplicate();
362 dup.position(headerSize());
363 dup.limit(buf.limit() - totalChecksumBytes());
364 return dup.slice();
365 }
366
  /**
   * Returns a buffer that includes the header but excludes the trailing checksum bytes. The
   * returned buffer shares the backing data with this block, so callers must not modify it.
   */
377 public ByteBuffer getBufferReadOnly() {
378 ByteBuffer dup = this.buf.duplicate();
379 dup.limit(buf.limit() - totalChecksumBytes());
380 return dup.slice();
381 }
382
383
384
385
386
387
388
389
390 public ByteBuffer getBufferReadOnlyWithHeader() {
391 ByteBuffer dup = this.buf.duplicate();
392 return dup.slice();
393 }
394
395
396
397
398
399
400
401 ByteBuffer getBufferWithHeader() {
402 ByteBuffer dupBuf = buf.duplicate();
403 dupBuf.rewind();
404 return dupBuf;
405 }
406
407 private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
408 String fieldName) throws IOException {
409 if (valueFromBuf != valueFromField) {
410 throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
411 + ") is different from that in the field (" + valueFromField + ")");
412 }
413 }
414
415 private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
416 throws IOException {
417 if (valueFromBuf != valueFromField) {
418 throw new IOException("Block type stored in the buffer: " +
419 valueFromBuf + ", block type field: " + valueFromField);
420 }
421 }
422
  /**
   * Checks that the field values parsed into this object match the values stored in the
   * buffer's header, and that the buffer's limit and capacity are consistent with them. Used
   * after deserialization and in tests.
   */
430 void sanityCheck() throws IOException {
431 buf.rewind();
432
433 sanityCheckAssertion(BlockType.read(buf), blockType);
434
435 sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader,
436 "onDiskSizeWithoutHeader");
437
438 sanityCheckAssertion(buf.getInt(), uncompressedSizeWithoutHeader,
439 "uncompressedSizeWithoutHeader");
440
    sanityCheckAssertion(buf.getLong(), prevBlockOffset, "prevBlockOffset");
442 if (this.fileContext.isUseHBaseChecksum()) {
443 sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
444 sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), "bytesPerChecksum");
445 sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
446 }
447
448 int cksumBytes = totalChecksumBytes();
449 int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
450 if (buf.limit() != expectedBufLimit) {
451 throw new AssertionError("Expected buffer limit " + expectedBufLimit
452 + ", got " + buf.limit());
453 }
454
455
456
457 int hdrSize = headerSize();
458 if (buf.capacity() != expectedBufLimit &&
459 buf.capacity() != expectedBufLimit + hdrSize) {
460 throw new AssertionError("Invalid buffer capacity: " + buf.capacity() +
461 ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
462 }
463 }
464
465 @Override
466 public String toString() {
467 StringBuilder sb = new StringBuilder()
468 .append("HFileBlock [")
469 .append(" fileOffset=").append(offset)
470 .append(" headerSize()=").append(headerSize())
471 .append(" blockType=").append(blockType)
472 .append(" onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
473 .append(" uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
474 .append(" prevBlockOffset=").append(prevBlockOffset)
475 .append(" isUseHBaseChecksum()=").append(fileContext.isUseHBaseChecksum());
476 if (fileContext.isUseHBaseChecksum()) {
      sb.append(" checksumType=").append(ChecksumType.codeToType(this.buf.get(Header.CHECKSUM_TYPE_INDEX)))
        .append(" bytesPerChecksum=").append(this.buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX))
479 .append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
480 } else {
481 sb.append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
482 .append("(").append(onDiskSizeWithoutHeader)
483 .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
484 }
485 String dataBegin = null;
486 if (buf.hasArray()) {
487 dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
488 Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
489 } else {
490 ByteBuffer bufWithoutHeader = getBufferWithoutHeader();
491 byte[] dataBeginBytes = new byte[Math.min(32,
492 bufWithoutHeader.limit() - bufWithoutHeader.position())];
493 bufWithoutHeader.get(dataBeginBytes);
494 dataBegin = Bytes.toStringBinary(dataBeginBytes);
495 }
496 sb.append(" getOnDiskSizeWithHeader()=").append(getOnDiskSizeWithHeader())
497 .append(" totalChecksumBytes()=").append(totalChecksumBytes())
498 .append(" isUnpacked()=").append(isUnpacked())
499 .append(" buf=[ ").append(buf).append(" ]")
500 .append(" dataBeginsWith=").append(dataBegin)
501 .append(" fileContext=").append(fileContext)
502 .append(" ]");
503 return sb.toString();
504 }
505
506
507
508
509 private static void validateOnDiskSizeWithoutHeader(int expectedOnDiskSizeWithoutHeader,
510 int actualOnDiskSizeWithoutHeader, ByteBuffer buf, long offset) throws IOException {
511 if (actualOnDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) {
512
513
514 ByteBuffer bufReadOnly = buf.asReadOnlyBuffer();
515 String dataBegin = null;
516 byte[] dataBeginBytes = new byte[Math.min(32, bufReadOnly.limit() - bufReadOnly.position())];
517 bufReadOnly.get(dataBeginBytes);
518 dataBegin = Bytes.toStringBinary(dataBeginBytes);
519 String blockInfoMsg =
520 "Block offset: " + offset + ", data starts with: " + dataBegin;
521 throw new IOException("On-disk size without header provided is "
522 + expectedOnDiskSizeWithoutHeader + ", but block "
523 + "header contains " + actualOnDiskSizeWithoutHeader + ". " +
524 blockInfoMsg);
525 }
526 }
527
  /**
   * Returns an unpacked (decompressed and decrypted) copy of this block, or this same block if
   * the file is neither compressed nor encrypted. The speculatively read header of the next
   * block, if present, is carried over into the copy.
   */
532 HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
533 if (!fileContext.isCompressedOrEncrypted()) {
534
535
536
537 return this;
538 }
539
540 HFileBlock unpacked = new HFileBlock(this);
541 unpacked.allocateBuffer();
542
543 HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
544 reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
545
546 ByteBuffer dup = this.buf.duplicate();
547 dup.position(this.headerSize());
548 dup = dup.slice();
549 ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
550 unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
551 dup);
552
553
554 if (unpacked.hasNextBlockHeader()) {
555
556
557
558
559 ByteBuffer inDup = this.buf.duplicate();
560 inDup.limit(inDup.limit() + headerSize());
561 ByteBuffer outDup = unpacked.buf.duplicate();
562 outDup.limit(outDup.limit() + unpacked.headerSize());
563 ByteBufferUtils.copyFromBufferToBuffer(
564 outDup,
565 inDup,
566 this.onDiskDataSizeWithHeader,
567 unpacked.headerSize() + unpacked.uncompressedSizeWithoutHeader
568 + unpacked.totalChecksumBytes(), unpacked.headerSize());
569 }
570 return unpacked;
571 }
  /** @return true when the header of the next block was read together with this block. */
576 private boolean hasNextBlockHeader() {
577 return nextBlockOnDiskSizeWithHeader > 0;
578 }
579
  /**
   * Allocates a new buffer large enough for the unpacked block (header, uncompressed data,
   * checksums, plus the next block's header if it was read) and copies the existing header into
   * it. Called before unpacking.
   */
585 private void allocateBuffer() {
586 int cksumBytes = totalChecksumBytes();
587 int headerSize = headerSize();
588 int capacityNeeded = headerSize + uncompressedSizeWithoutHeader +
589 cksumBytes + (hasNextBlockHeader() ? headerSize : 0);
590
591
592 ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
593
594
595
596 ByteBuffer dup = buf.duplicate();
597 dup.position(0);
598 dup.get(newBuf.array(), newBuf.arrayOffset(), headerSize);
599
600 buf = newBuf;
601
602 buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
603 }
604
  /**
   * @return true when the buffer capacity matches the fully unpacked block size; an unpacked
   *         block may or may not also carry the following block's header.
   */
609 public boolean isUnpacked() {
610 final int cksumBytes = totalChecksumBytes();
611 final int headerSize = headerSize();
612 final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
613 final int bufCapacity = buf.capacity();
614 return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
615 }
616
617
618 public static void verifyUncompressed(ByteBuffer buf, boolean useHBaseChecksum)
619 throws IOException {
620 int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
621 int uncompressedSizeWithoutHeader = buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
622 int onDiskDataSizeWithHeader;
623 int checksumBytes = 0;
624 if (useHBaseChecksum) {
625 onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
626 checksumBytes = (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
627 buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX));
628 }
629
630 if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + checksumBytes) {
631 throw new IOException("Using no compression but "
632 + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
633 + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
634 + ", numChecksumbytes=" + checksumBytes);
635 }
636 }
637
638
639
640
641
642 public void expectType(BlockType expectedType) throws IOException {
643 if (blockType != expectedType) {
644 throw new IOException("Invalid block type: expected=" + expectedType
645 + ", actual=" + blockType);
646 }
647 }
648
649
650 public long getOffset() {
651 if (offset < 0) {
652 throw new IllegalStateException(
653 "HFile block offset not initialized properly");
654 }
655 return offset;
656 }
657
  /** @return a stream over the block's payload, positioned just past the header. */
661 public DataInputStream getByteStream() {
662 ByteBuffer dup = this.buf.duplicate();
663 dup.position(this.headerSize());
664 return new DataInputStream(new ByteBufferInputStream(dup));
665 }
666
667 @Override
668 public long heapSize() {
669 long size = ClassSize.align(
670 ClassSize.OBJECT +
671
672 3 * ClassSize.REFERENCE +
673
674
675 4 * Bytes.SIZEOF_INT +
676
677 2 * Bytes.SIZEOF_LONG +
678
679 fileContext.heapSize()
680 );
681
682 if (buf != null) {
683
684 size += ClassSize.align(buf.capacity() + BYTE_BUFFER_HEAP_SIZE);
685 }
686
687 return ClassSize.align(size);
688 }
  /**
   * Reads from the input stream until {@code necessaryLen} bytes are available in {@code buf},
   * opportunistically trying to read up to {@code extraLen} additional bytes (typically the
   * next block's header). Hitting EOF while reading the extra bytes is not an error.
   *
   * @return true only if the necessary bytes and all of the extra bytes were read
   * @throws IOException if EOF is reached before the necessary bytes are read
   */
705 public static boolean readWithExtra(InputStream in, byte[] buf,
706 int bufOffset, int necessaryLen, int extraLen) throws IOException {
707 int bytesRemaining = necessaryLen + extraLen;
708 while (bytesRemaining > 0) {
709 int ret = in.read(buf, bufOffset, bytesRemaining);
710 if (ret == -1 && bytesRemaining <= extraLen) {
711
712 break;
713 }
714
715 if (ret < 0) {
716 throw new IOException("Premature EOF from inputStream (read "
717 + "returned " + ret + ", was trying to read " + necessaryLen
718 + " necessary bytes and " + extraLen + " extra bytes, "
719 + "successfully read "
720 + (necessaryLen + extraLen - bytesRemaining));
721 }
722 bufOffset += ret;
723 bytesRemaining -= ret;
724 }
725 return bytesRemaining <= 0;
726 }
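  // For example (illustrative numbers), calling readWithExtra above with necessaryLen = 1024 and
  // extraLen = 33 (a following block header) returns true only once all 1057 bytes have been
  // read; if the stream ends after the 1024 necessary bytes it returns false without throwing,
  // and only an EOF inside the first 1024 bytes raises an IOException.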
  /**
   * Positional-read counterpart of {@link #readWithExtra(InputStream, byte[], int, int, int)}:
   * reads at the given file position without moving the stream's current position,
   * opportunistically reading up to {@code extraLen} extra bytes.
   *
   * @return true only if {@code extraLen > 0} and the necessary plus all extra bytes were read
   * @throws IOException if EOF is reached before the necessary bytes are read
   */
745 @VisibleForTesting
746 static boolean positionalReadWithExtra(FSDataInputStream in,
747 long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen)
748 throws IOException {
749 int bytesRemaining = necessaryLen + extraLen;
750 int bytesRead = 0;
751 while (bytesRead < necessaryLen) {
752 int ret = in.read(position, buf, bufOffset, bytesRemaining);
753 if (ret < 0) {
754 throw new IOException("Premature EOF from inputStream (positional read "
755 + "returned " + ret + ", was trying to read " + necessaryLen
756 + " necessary bytes and " + extraLen + " extra bytes, "
757 + "successfully read " + bytesRead);
758 }
759 position += ret;
760 bufOffset += ret;
761 bytesRemaining -= ret;
762 bytesRead += ret;
763 }
764 return bytesRead != necessaryLen && bytesRemaining <= 0;
765 }
766
767
768
769
770
771 public int getNextBlockOnDiskSizeWithHeader() {
772 return nextBlockOnDiskSizeWithHeader;
773 }
774
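  /*
   * Illustrative usage sketch for the Writer below (the output stream and the cell collection
   * are assumptions, not part of this file):
   *
   *   HFileBlock.Writer writer = new HFileBlock.Writer(dataBlockEncoder, fileContext);
   *   DataOutputStream dos = writer.startWriting(BlockType.DATA);
   *   for (Cell cell : cells) {
   *     writer.write(cell);
   *   }
   *   writer.writeHeaderAndData(fsDataOutputStream); // finishes, compresses, checksums, writes
   */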
  /**
   * Unified block writer for version 2 HFiles. The caller starts a block with
   * {@link #startWriting(BlockType)}, writes the block payload, and then calls
   * {@link #writeHeaderAndData(FSDataOutputStream)}, at which point the block is encoded,
   * compressed/encrypted if configured, checksummed and written out. A single writer instance
   * is reused for consecutive blocks and is not thread-safe.
   */
788 public static class Writer {
789
790 private enum State {
791 INIT,
792 WRITING,
793 BLOCK_READY
794 };
795
796
797 private State state = State.INIT;
798
799
800 private final HFileDataBlockEncoder dataBlockEncoder;
801
802 private HFileBlockEncodingContext dataBlockEncodingCtx;
803
804
805 private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
806
807
808
809
810
811
812
813 private ByteArrayOutputStream baosInMemory;
814
815
816
817
818
819
820 private BlockType blockType;
821
822
823
824
825
826 private DataOutputStream userDataStream;
827
828
829
830 private int unencodedDataSizeWritten;
    /**
     * Bytes to be written to the file system: the header and the compressed/encrypted data,
     * excluding the checksums, which are kept separately in {@link #onDiskChecksum}.
     */
837 private byte[] onDiskBytesWithHeader;
    /**
     * The checksum values for {@link #onDiskBytesWithHeader}, written to the file system right
     * after it. Empty when checksums are disabled.
     */
845 private byte[] onDiskChecksum;
    /**
     * The uncompressed bytes of the current block, including a header with the real field
     * values filled in. This is the representation cached when uncompressed blocks are cached.
     */
854 private byte[] uncompressedBytesWithHeader;
855
856
857
858
859
860 private long startOffset;
    /**
     * Offset of the previous block of each {@link BlockType}, used to fill in the
     * prev-block-offset header field of the next block of the same type.
     */
866 private long[] prevOffsetByType;
867
868
869 private long prevOffset;
870
871 private HFileContext fileContext;
872
873
874
875
876 public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
877 this.dataBlockEncoder = dataBlockEncoder != null
878 ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
879 defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
880 HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
881 dataBlockEncodingCtx = this.dataBlockEncoder
882 .newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
883
884 if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
885 throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
886 " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
887 fileContext.getBytesPerChecksum());
888 }
889
890 baosInMemory = new ByteArrayOutputStream();
891
892 prevOffsetByType = new long[BlockType.values().length];
893 for (int i = 0; i < prevOffsetByType.length; ++i)
894 prevOffsetByType[i] = -1;
895
896 this.fileContext = fileContext;
897 }
898
    /**
     * Starts writing a new block of the given type: resets the in-memory stream, writes a dummy
     * header placeholder and returns the stream the caller should write the block payload into.
     */
905 public DataOutputStream startWriting(BlockType newBlockType)
906 throws IOException {
907 if (state == State.BLOCK_READY && startOffset != -1) {
908
909
910 prevOffsetByType[blockType.getId()] = startOffset;
911 }
912
913 startOffset = -1;
914 blockType = newBlockType;
915
916 baosInMemory.reset();
917 baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
918
919 state = State.WRITING;
920
921
922 userDataStream = new DataOutputStream(baosInMemory);
923 if (newBlockType == BlockType.DATA) {
924 this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
925 }
926 this.unencodedDataSizeWritten = 0;
927 return userDataStream;
928 }
929
930
931
932
933
934
935 public void write(Cell cell) throws IOException{
936 expectState(State.WRITING);
937 this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx,
938 this.userDataStream);
939 }
940
941
942
943
944
945
946
947
948 DataOutputStream getUserDataStream() {
949 expectState(State.WRITING);
950 return userDataStream;
951 }
952
953
954
955
956
957 void ensureBlockReady() throws IOException {
958 Preconditions.checkState(state != State.INIT,
959 "Unexpected state: " + state);
960
961 if (state == State.BLOCK_READY)
962 return;
963
964
965 finishBlock();
966 }
967
    /**
     * Finishes the block currently being written: lets the data block encoder finish the block,
     * snapshots the uncompressed bytes, compresses/encrypts them, writes the real header into
     * both representations and computes the checksums.
     */
974 private void finishBlock() throws IOException {
975 if (blockType == BlockType.DATA) {
976 BufferGrabbingByteArrayOutputStream baosInMemoryCopy =
977 new BufferGrabbingByteArrayOutputStream();
978 baosInMemory.writeTo(baosInMemoryCopy);
979 this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
980 baosInMemoryCopy.buf, blockType);
981 blockType = dataBlockEncodingCtx.getBlockType();
982 }
983 userDataStream.flush();
984
985 uncompressedBytesWithHeader = baosInMemory.toByteArray();
986 prevOffset = prevOffsetByType[blockType.getId()];
987
988
989
990
991 state = State.BLOCK_READY;
992 if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
993 onDiskBytesWithHeader = dataBlockEncodingCtx
994 .compressAndEncrypt(uncompressedBytesWithHeader);
995 } else {
996 onDiskBytesWithHeader = defaultBlockEncodingCtx
997 .compressAndEncrypt(uncompressedBytesWithHeader);
998 }
999 int numBytes = (int) ChecksumUtil.numBytes(
1000 onDiskBytesWithHeader.length,
1001 fileContext.getBytesPerChecksum());
1002
1003
1004 putHeader(onDiskBytesWithHeader, 0,
1005 onDiskBytesWithHeader.length + numBytes,
1006 uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
1007
1008 putHeader(uncompressedBytesWithHeader, 0,
1009 onDiskBytesWithHeader.length + numBytes,
1010 uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
1011
1012 onDiskChecksum = new byte[numBytes];
1013 ChecksumUtil.generateChecksums(
1014 onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
1015 onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
1016 }
1017
1018 public static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
1019 private byte[] buf;
1020
1021 @Override
1022 public void write(byte[] b, int off, int len) {
1023 this.buf = b;
1024 }
1025
1026 public byte[] getBuffer() {
1027 return this.buf;
1028 }
1029 }
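    // Note on the class above: ByteArrayOutputStream.writeTo(OutputStream) makes a single
    // write(buf, 0, count) call with its internal array, so this stream simply captures a
    // reference to baosInMemory's backing array instead of copying its contents.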
1030
    /**
     * Writes the block header into the given array: block magic, on-disk and uncompressed sizes
     * (both excluding the header), previous block offset, and the checksum type, bytes per
     * checksum and on-disk data size.
     */
1039 private void putHeader(byte[] dest, int offset, int onDiskSize,
1040 int uncompressedSize, int onDiskDataSize) {
1041 offset = blockType.put(dest, offset);
1042 offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1043 offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1044 offset = Bytes.putLong(dest, offset, prevOffset);
1045 offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
1046 offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
1047 Bytes.putInt(dest, offset, onDiskDataSize);
1048 }
1049
    /**
     * Finishes the current block if necessary and writes its header, data and checksums to the
     * given stream, remembering the block's start offset. The same block must not be written to
     * two different positions in the stream.
     */
1058 public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
1059 long offset = out.getPos();
1060 if (startOffset != -1 && offset != startOffset) {
1061 throw new IOException("A " + blockType + " block written to a "
1062 + "stream twice, first at offset " + startOffset + ", then at "
1063 + offset);
1064 }
1065 startOffset = offset;
1066
1067 finishBlockAndWriteHeaderAndData((DataOutputStream) out);
1068 }
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079 protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
1080 throws IOException {
1081 ensureBlockReady();
1082 out.write(onDiskBytesWithHeader);
1083 out.write(onDiskChecksum);
1084 }
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096 byte[] getHeaderAndDataForTest() throws IOException {
1097 ensureBlockReady();
1098
1099
1100 byte[] output =
1101 new byte[onDiskBytesWithHeader.length
1102 + onDiskChecksum.length];
1103 System.arraycopy(onDiskBytesWithHeader, 0, output, 0,
1104 onDiskBytesWithHeader.length);
1105 System.arraycopy(onDiskChecksum, 0, output,
1106 onDiskBytesWithHeader.length, onDiskChecksum.length);
1107 return output;
1108 }
1109
1110
1111
1112
1113 public void release() {
1114 if (dataBlockEncodingCtx != null) {
1115 dataBlockEncodingCtx.close();
1116 dataBlockEncodingCtx = null;
1117 }
1118 if (defaultBlockEncodingCtx != null) {
1119 defaultBlockEncodingCtx.close();
1120 defaultBlockEncodingCtx = null;
1121 }
1122 }
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132 int getOnDiskSizeWithoutHeader() {
1133 expectState(State.BLOCK_READY);
1134 return onDiskBytesWithHeader.length
1135 + onDiskChecksum.length
1136 - HConstants.HFILEBLOCK_HEADER_SIZE;
1137 }
1138
1139
1140
1141
1142
1143
1144
1145
1146 int getOnDiskSizeWithHeader() {
1147 expectState(State.BLOCK_READY);
1148 return onDiskBytesWithHeader.length + onDiskChecksum.length;
1149 }
1150
1151
1152
1153
1154 int getUncompressedSizeWithoutHeader() {
1155 expectState(State.BLOCK_READY);
1156 return uncompressedBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE;
1157 }
1158
1159
1160
1161
1162 int getUncompressedSizeWithHeader() {
1163 expectState(State.BLOCK_READY);
1164 return uncompressedBytesWithHeader.length;
1165 }
1166
1167
1168 public boolean isWriting() {
1169 return state == State.WRITING;
1170 }
1171
1172
1173
1174
1175
1176
1177
1178
1179 public int blockSizeWritten() {
1180 if (state != State.WRITING) return 0;
1181 return this.unencodedDataSizeWritten;
1182 }
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192 ByteBuffer getUncompressedBufferWithHeader() {
1193 expectState(State.BLOCK_READY);
1194 return ByteBuffer.wrap(uncompressedBytesWithHeader);
1195 }
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205 ByteBuffer getOnDiskBufferWithHeader() {
1206 expectState(State.BLOCK_READY);
1207 return ByteBuffer.wrap(onDiskBytesWithHeader);
1208 }
1209
1210 private void expectState(State expectedState) {
1211 if (state != expectedState) {
1212 throw new IllegalStateException("Expected state: " + expectedState +
1213 ", actual state: " + state);
1214 }
1215 }
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227 public void writeBlock(BlockWritable bw, FSDataOutputStream out)
1228 throws IOException {
1229 bw.writeToBlock(startWriting(bw.getBlockType()));
1230 writeHeaderAndData(out);
1231 }
1232
    /**
     * Builds an {@link HFileBlock} out of the block just written, suitable for placing in the
     * block cache. The cached copy is built without checksum data
     * ({@link ChecksumType#NULL}, zero bytes per checksum).
     */
1240 public HFileBlock getBlockForCaching(CacheConfig cacheConf) {
1241 HFileContext newContext = new HFileContextBuilder()
1242 .withBlockSize(fileContext.getBlocksize())
1243 .withBytesPerCheckSum(0)
1244 .withChecksumType(ChecksumType.NULL)
1245 .withCompression(fileContext.getCompression())
1246 .withDataBlockEncoding(fileContext.getDataBlockEncoding())
1247 .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
1248 .withCompressTags(fileContext.isCompressTags())
1249 .withIncludesMvcc(fileContext.isIncludesMvcc())
1250 .withIncludesTags(fileContext.isIncludesTags())
1251 .build();
1252 return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
1253 getUncompressedSizeWithoutHeader(), prevOffset,
1254 cacheConf.shouldCacheCompressed(blockType.getCategory()) ?
1255 getOnDiskBufferWithHeader() :
1256 getUncompressedBufferWithHeader(),
1257 FILL_HEADER, startOffset,
1258 onDiskBytesWithHeader.length + onDiskChecksum.length, newContext);
1259 }
1260 }
1261
1262
1263 public interface BlockWritable {
1264
1265
1266 BlockType getBlockType();
1267
1268
1269
1270
1271
1272
1273
1274 void writeToBlock(DataOutput out) throws IOException;
1275 }
1276
1277
1278
1279
1280 public interface BlockIterator {
1281
1282
1283
1284
1285 HFileBlock nextBlock() throws IOException;
1286
1287
1288
1289
1290
1291 HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1292 }
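  /*
   * Illustrative sketch of iterating blocks with FSReader/BlockIterator (how istream, fileSize
   * and the scan range are obtained is assumed here; real callers derive them from the HFile
   * and its trailer):
   *
   *   HFileBlock.FSReader reader = new HFileBlock.FSReaderImpl(istream, fileSize, fileContext);
   *   HFileBlock.BlockIterator it = reader.blockRange(0, fileSize);
   *   for (HFileBlock b = it.nextBlock(); b != null; b = it.nextBlock()) {
   *     // b has already been unpacked (decompressed/decrypted) by the iterator
   *   }
   *   reader.closeStreams();
   */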
1293
1294
1295 public interface FSReader {
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308 HFileBlock readBlockData(long offset, long onDiskSize,
1309 int uncompressedSize, boolean pread) throws IOException;
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320 BlockIterator blockRange(long startOffset, long endOffset);
1321
1322
1323 void closeStreams() throws IOException;
1324
1325
1326 HFileBlockDecodingContext getBlockDecodingContext();
1327
1328
1329 HFileBlockDecodingContext getDefaultBlockDecodingContext();
1330
1331
1332
1333
1334
1335 void unbufferStream();
1336 }
1337
1338
1339
1340
1341
1342 private abstract static class AbstractFSReader implements FSReader {
1343
1344
1345
1346 protected long fileSize;
1347
1348
1349 protected final int hdrSize;
1350
1351
1352 protected HFileSystem hfs;
1353
1354
1355 protected Path path;
1356
1357 protected final Lock streamLock = new ReentrantLock();
1358
1359
1360 public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
1361
1362 protected HFileContext fileContext;
1363
1364 public AbstractFSReader(long fileSize, HFileSystem hfs, Path path, HFileContext fileContext)
1365 throws IOException {
1366 this.fileSize = fileSize;
1367 this.hfs = hfs;
1368 this.path = path;
1369 this.fileContext = fileContext;
1370 this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1371 }
1372
1373 @Override
1374 public BlockIterator blockRange(final long startOffset,
1375 final long endOffset) {
1376 final FSReader owner = this;
1377 return new BlockIterator() {
1378 private long offset = startOffset;
1379
1380 @Override
1381 public HFileBlock nextBlock() throws IOException {
1382 if (offset >= endOffset)
1383 return null;
1384 HFileBlock b = readBlockData(offset, -1, -1, false);
1385 offset += b.getOnDiskSizeWithHeader();
1386 return b.unpack(fileContext, owner);
1387 }
1388
1389 @Override
1390 public HFileBlock nextBlockWithBlockType(BlockType blockType)
1391 throws IOException {
1392 HFileBlock blk = nextBlock();
1393 if (blk.getBlockType() != blockType) {
1394 throw new IOException("Expected block of type " + blockType
1395 + " but found " + blk.getBlockType());
1396 }
1397 return blk;
1398 }
1399 };
1400 }
1401
    /**
     * Reads {@code size} bytes starting at {@code fileOffset} into {@code dest} at
     * {@code destOffset}, optionally also reading the following block's header. Uses seek plus
     * sequential read while holding the stream lock when {@code pread} is false and the lock is
     * free; otherwise falls back to positional read.
     *
     * @return the on-disk size of the next block (header included) when peeking into it,
     *         otherwise -1
     */
1417 protected int readAtOffset(FSDataInputStream istream, byte[] dest, int destOffset, int size,
1418 boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
1419 if (peekIntoNextBlock &&
1420 destOffset + size + hdrSize > dest.length) {
1421
1422
1423 throw new IOException("Attempted to read " + size + " bytes and " +
1424 hdrSize + " bytes of next header into a " + dest.length +
1425 "-byte array at offset " + destOffset);
1426 }
1427
1428 if (!pread && streamLock.tryLock()) {
1429
1430 try {
1431 HFileUtil.seekOnMultipleSources(istream, fileOffset);
1432
1433 long realOffset = istream.getPos();
1434 if (realOffset != fileOffset) {
1435 throw new IOException("Tried to seek to " + fileOffset + " to "
1436 + "read " + size + " bytes, but pos=" + realOffset
1437 + " after seek");
1438 }
1439
1440 if (!peekIntoNextBlock) {
1441 IOUtils.readFully(istream, dest, destOffset, size);
1442 return -1;
1443 }
1444
1445
1446 if (!readWithExtra(istream, dest, destOffset, size, hdrSize))
1447 return -1;
1448 } finally {
1449 streamLock.unlock();
1450 }
1451 } else {
1452
1453 int extraSize = peekIntoNextBlock ? hdrSize : 0;
1454 if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset,
1455 size, extraSize)) {
1456 return -1;
1457 }
1458 }
1459
1460 assert peekIntoNextBlock;
1461 return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
1462 }
1463
1464 }
1465
  /**
   * A per-thread cache of the header of the block following the one just read, captured so that
   * reading the next block does not have to re-read its header from the file system.
   */
1470 private static class PrefetchedHeader {
1471 long offset = -1;
1472 byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1473 final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
1474 }
1475
1476
1477 static class FSReaderImpl extends AbstractFSReader {
1478
1479
1480 protected FSDataInputStreamWrapper streamWrapper;
1481
1482 private HFileBlockDecodingContext encodedBlockDecodingCtx;
1483
1484
1485 private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
1486
1487 private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
1488 new ThreadLocal<PrefetchedHeader>() {
1489 @Override
1490 public PrefetchedHeader initialValue() {
1491 return new PrefetchedHeader();
1492 }
1493 };
1494
1495 public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
1496 HFileContext fileContext) throws IOException {
1497 super(fileSize, hfs, path, fileContext);
1498 this.streamWrapper = stream;
1499
1500 this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1501 defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
1502 encodedBlockDecodingCtx = defaultDecodingCtx;
1503 }
1504
1505
1506
1507
1508
1509 FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
1510 throws IOException {
1511 this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
1512 }
1513
    /**
     * Reads the block at the given offset in the file. If HBase checksum verification fails,
     * the read is retried with verification through HDFS checksums; only if that also fails is
     * an exception thrown.
     *
     * @param offset offset of the block in the file
     * @param onDiskSizeWithHeaderL on-disk size of the block including the header, or -1 if
     *          unknown
     * @param uncompressedSize unused by the version 2 reader; must be -1
     * @param pread whether to use positional read
     */
1525 @Override
1526 public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL,
1527 int uncompressedSize, boolean pread)
1528 throws IOException {
1529
1530
1531
1532
1533
1534
1535 boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1536 FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1537
1538 HFileBlock blk = readBlockDataInternal(is, offset, (int) onDiskSizeWithHeaderL,
1539 uncompressedSize, pread, doVerificationThruHBaseChecksum);
1540 if (blk == null) {
1541 HFile.LOG.warn("HBase checksum verification failed for file " +
1542 path + " at offset " +
1543 offset + " filesize " + fileSize +
1544 ". Retrying read with HDFS checksums turned on...");
1545
1546 if (!doVerificationThruHBaseChecksum) {
1547 String msg = "HBase checksum verification failed for file " +
1548 path + " at offset " +
1549 offset + " filesize " + fileSize +
1550 " but this cannot happen because doVerify is " +
1551 doVerificationThruHBaseChecksum;
1552 HFile.LOG.warn(msg);
1553 throw new IOException(msg);
1554 }
1555 HFile.checksumFailures.incrementAndGet();
1556
1557
1558
1559
1560
1561
1562
1563 is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1564 doVerificationThruHBaseChecksum = false;
1565 blk = readBlockDataInternal(is, offset, (int) onDiskSizeWithHeaderL, uncompressedSize,
1566 pread, doVerificationThruHBaseChecksum);
1567 if (blk != null) {
          HFile.LOG.warn("HDFS checksum verification succeeded for file " +
1569 path + " at offset " +
1570 offset + " filesize " + fileSize);
1571 }
1572 }
1573 if (blk == null && !doVerificationThruHBaseChecksum) {
1574 String msg = "readBlockData failed, possibly due to " +
1575 "checksum verification failed for file " + path +
1576 " at offset " + offset + " filesize " + fileSize;
1577 HFile.LOG.warn(msg);
1578 throw new IOException(msg);
1579 }
1580
1581
1582
1583
1584
1585
1586
1587
1588 streamWrapper.checksumOk();
1589 return blk;
1590 }
1591
    /**
     * Does the actual block read. When {@code verifyChecksum} is true and HBase checksum
     * verification fails, returns null instead of throwing so that the caller can retry the
     * read with HDFS-level checksum verification.
     *
     * @param onDiskSizeWithHeader on-disk size of the block including the header, or -1 to read
     *          the header first and determine the size from it
     */
1605 protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
1606 int onDiskSizeWithHeader, int uncompressedSize, boolean pread,
1607 boolean verifyChecksum)
1608 throws IOException {
1609 if (offset < 0) {
1610 throw new IOException("Invalid offset=" + offset + " trying to read "
1611 + "block (onDiskSize=" + onDiskSizeWithHeader
1612 + ", uncompressedSize=" + uncompressedSize + ")");
1613 }
1614
1615 if (uncompressedSize != -1) {
1616 throw new IOException("Version 2 block reader API does not need " +
1617 "the uncompressed size parameter");
1618 }
1619
1620 if ((onDiskSizeWithHeader < hdrSize && onDiskSizeWithHeader != -1)
1621 || onDiskSizeWithHeader >= Integer.MAX_VALUE) {
1622 throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeader
1623 + ": expected to be at least " + hdrSize
1624 + " and at most " + Integer.MAX_VALUE + ", or -1 (offset="
1625 + offset + ", uncompressedSize=" + uncompressedSize + ")");
1626 }
1627
1628
1629
1630
1631
1632
1633
1634
1635 PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
1636 ByteBuffer headerBuf = prefetchedHeader.offset == offset? prefetchedHeader.buf: null;
1637
1638
1639 int nextBlockOnDiskSize = 0;
1640 byte[] onDiskBlock = null;
1641
1642 if (onDiskSizeWithHeader > 0) {
1643
1644
1645
1646
1647
1648
1649
1650
1651 int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
1652 onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
1653
1654 nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
1655 preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
1656 true, offset + preReadHeaderSize, pread);
1657 if (headerBuf != null) {
1658
1659
1660
1661 assert headerBuf.hasArray();
1662 System.arraycopy(headerBuf.array(),
1663 headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1664 } else {
1665 headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
1666 }
1667
1668 int expectedOnDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize;
1669 int actualOnDiskSizeWithoutHeader =
1670 headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
1671 validateOnDiskSizeWithoutHeader(expectedOnDiskSizeWithoutHeader,
1672 actualOnDiskSizeWithoutHeader, headerBuf, offset);
1673 } else {
1674
1675
1676
1677
1678
1679
1680
1681
1682 if (headerBuf == null) {
1683
1684
1685
1686
1687
1688 headerBuf = ByteBuffer.allocate(hdrSize);
1689
1690 readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(),
1691 hdrSize, false, offset, pread);
1692 }
1693 int onDiskSizeWithoutHeader = headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
1694 onDiskSizeWithHeader = onDiskSizeWithoutHeader + hdrSize;
1695 onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
1696 System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1697 nextBlockOnDiskSize =
1698 readAtOffset(is, onDiskBlock, hdrSize, onDiskSizeWithHeader - hdrSize, true,
1699 offset + hdrSize, pread);
1700 }
1701 ByteBuffer onDiskBlockByteBuffer = ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader);
1702
1703 if (!fileContext.isCompressedOrEncrypted()) {
1704 verifyUncompressed(headerBuf, fileContext.isUseHBaseChecksum());
1705 }
1706
1707 if (verifyChecksum && !validateChecksum(offset, onDiskBlockByteBuffer, hdrSize)) {
1708 return null;
1709 }
1710
1711
1712
1713
1714
1715 HFileBlock b = new HFileBlock(onDiskBlockByteBuffer, this.fileContext.isUseHBaseChecksum());
1716
1717 b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
1718
1719
1720 if (b.hasNextBlockHeader()) {
1721 prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
1722 System.arraycopy(onDiskBlock, onDiskSizeWithHeader, prefetchedHeader.header, 0, hdrSize);
1723 }
1724
1725 b.offset = offset;
1726 b.fileContext.setIncludesTags(this.fileContext.isIncludesTags());
1727 b.fileContext.setIncludesMvcc(this.fileContext.isIncludesMvcc());
1728 return b;
1729 }
1730
1731 void setIncludesMemstoreTS(boolean includesMemstoreTS) {
1732 this.fileContext.setIncludesMvcc(includesMemstoreTS);
1733 }
1734
1735 void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
1736 encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
1737 }
1738
1739 @Override
1740 public HFileBlockDecodingContext getBlockDecodingContext() {
1741 return this.encodedBlockDecodingCtx;
1742 }
1743
1744 @Override
1745 public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
1746 return this.defaultDecodingCtx;
1747 }
1748
    /**
     * Validates the checksums stored at the end of the block data. Returns false both when
     * validation fails and when HBase checksums are not in use for this file; the caller then
     * falls back to HDFS checksum verification.
     */
1754 protected boolean validateChecksum(long offset, ByteBuffer data, int hdrSize)
1755 throws IOException {
1756
1757
1758
1759
1760
1761 if (!fileContext.isUseHBaseChecksum()) {
1762 return false;
1763 }
1764 return ChecksumUtil.validateChecksum(data, path, offset, hdrSize);
1765 }
1766
1767 @Override
1768 public void closeStreams() throws IOException {
1769 streamWrapper.close();
1770 }
1771
1772 @Override
1773 public void unbufferStream() {
1774
1775
1776 if (streamLock.tryLock()) {
1777 try {
1778 this.streamWrapper.unbuffer();
1779 } finally {
1780 streamLock.unlock();
1781 }
1782 }
1783 }
1784
1785 @Override
1786 public String toString() {
1787 return "hfs=" + hfs + ", path=" + path + ", fileContext=" + fileContext;
1788 }
1789 }
1790
1791 @Override
1792 public int getSerializedLength() {
1793 if (buf != null) {
1794
1795 int extraSpace = hasNextBlockHeader() ? headerSize() : 0;
1796 return this.buf.limit() + extraSpace + HFileBlock.EXTRA_SERIALIZATION_SPACE;
1797 }
1798 return 0;
1799 }
1800
1801 @Override
1802 public void serialize(ByteBuffer destination) {
1803 ByteBufferUtils.copyFromBufferToBuffer(destination, this.buf, 0, getSerializedLength()
1804 - EXTRA_SERIALIZATION_SPACE);
1805 serializeExtraInfo(destination);
1806 }
1807
1808 public void serializeExtraInfo(ByteBuffer destination) {
1809 destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1810 destination.putLong(this.offset);
1811 destination.putInt(this.nextBlockOnDiskSizeWithHeader);
1812 destination.rewind();
1813 }
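  // Layout of a cache-serialized block, as produced by serialize()/serializeExtraInfo() above
  // and consumed by blockDeserializer: the raw block bytes (header + data [+ checksums]),
  // followed by EXTRA_SERIALIZATION_SPACE = 1-byte "uses HBase checksum" flag, 8-byte file
  // offset and 4-byte on-disk size of the next block.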
1814
1815 @Override
1816 public CacheableDeserializer<Cacheable> getDeserializer() {
1817 return HFileBlock.blockDeserializer;
1818 }
1819
1820 @Override
1821 public boolean equals(Object comparison) {
1822 if (this == comparison) {
1823 return true;
1824 }
1825 if (comparison == null) {
1826 return false;
1827 }
1828 if (comparison.getClass() != this.getClass()) {
1829 return false;
1830 }
1831
1832 HFileBlock castedComparison = (HFileBlock) comparison;
1833
1834 if (castedComparison.blockType != this.blockType) {
1835 return false;
1836 }
1837 if (castedComparison.nextBlockOnDiskSizeWithHeader != this.nextBlockOnDiskSizeWithHeader) {
1838 return false;
1839 }
1840 if (castedComparison.offset != this.offset) {
1841 return false;
1842 }
1843 if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1844 return false;
1845 }
1846 if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1847 return false;
1848 }
1849 if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1850 return false;
1851 }
1852 if (ByteBufferUtils.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
1853 castedComparison.buf.limit()) != 0) {
1854 return false;
1855 }
1856 return true;
1857 }
1858
1859 public DataBlockEncoding getDataBlockEncoding() {
1860 if (blockType == BlockType.ENCODED_DATA) {
1861 return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
1862 }
1863 return DataBlockEncoding.NONE;
1864 }
1865
1866 byte getChecksumType() {
1867 return this.fileContext.getChecksumType().getCode();
1868 }
1869
1870 int getBytesPerChecksum() {
1871 return this.fileContext.getBytesPerChecksum();
1872 }
1873
1874
1875 int getOnDiskDataSizeWithHeader() {
1876 return this.onDiskDataSizeWithHeader;
1877 }
1878
1879
1880
1881
1882
1883 int totalChecksumBytes() {
1884 return HFileBlock.totalChecksumBytes(this.fileContext, onDiskDataSizeWithHeader);
1885 }
1886
1887 private static int totalChecksumBytes(HFileContext fileContext, int onDiskDataSizeWithHeader) {
1888
1889
1890
1891
1892 if (!fileContext.isUseHBaseChecksum() || fileContext.getBytesPerChecksum() == 0) {
1893 return 0;
1894 }
1895 return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
1896 fileContext.getBytesPerChecksum());
1897 }
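  // Worked example for the method above (assuming ChecksumUtil.numBytes() rounds the data size
  // up to whole bytesPerChecksum chunks): onDiskDataSizeWithHeader = 65,569 and
  // bytesPerChecksum = 16,384 give ceil(65,569 / 16,384) = 5 chunks, i.e. 5 * CHECKSUM_SIZE
  // = 20 trailing checksum bytes.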
1898
1899
1900
1901
1902 public int headerSize() {
1903 return headerSize(this.fileContext.isUseHBaseChecksum());
1904 }
1905
1906
1907
1908
1909 public static int headerSize(boolean usesHBaseChecksum) {
1910 if (usesHBaseChecksum) {
1911 return HConstants.HFILEBLOCK_HEADER_SIZE;
1912 }
1913 return HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
1914 }
1915
1916
1917
1918
1919 public byte[] getDummyHeaderForVersion() {
1920 return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
1921 }
1922
1923
1924
1925
1926 static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
1927 if (usesHBaseChecksum) {
1928 return HConstants.HFILEBLOCK_DUMMY_HEADER;
1929 }
1930 return DUMMY_HEADER_NO_CHECKSUM;
1931 }
1932
1933
1934
1935
1936
1937 public HFileContext getHFileContext() {
1938 return this.fileContext;
1939 }
1940
1941
1942
1943
1944
1945
1946 static String toStringHeader(ByteBuffer buf) throws IOException {
1947 byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
1948 buf.get(magicBuf);
1949 BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
1950 int compressedBlockSizeNoHeader = buf.getInt();
1951 int uncompressedBlockSizeNoHeader = buf.getInt();
1952 long prevBlockOffset = buf.getLong();
1953 byte cksumtype = buf.get();
1954 long bytesPerChecksum = buf.getInt();
1955 long onDiskDataSizeWithHeader = buf.getInt();
1956 return " Header dump: magic: " + Bytes.toString(magicBuf) +
1957 " blockType " + bt +
1958 " compressedBlockSizeNoHeader " +
1959 compressedBlockSizeNoHeader +
1960 " uncompressedBlockSizeNoHeader " +
1961 uncompressedBlockSizeNoHeader +
1962 " prevBlockOffset " + prevBlockOffset +
1963 " checksumType " + ChecksumType.codeToType(cksumtype) +
1964 " bytesPerChecksum " + bytesPerChecksum +
1965 " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
1966 }
1967 }