1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.io.hfile;
19
20 import java.io.ByteArrayOutputStream;
21 import java.io.DataInputStream;
22 import java.io.DataOutput;
23 import java.io.DataOutputStream;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.nio.ByteBuffer;
27 import java.util.concurrent.locks.Lock;
28 import java.util.concurrent.locks.ReentrantLock;
29
30 import org.apache.hadoop.fs.FSDataInputStream;
31 import org.apache.hadoop.fs.FSDataOutputStream;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.hbase.Cell;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.classification.InterfaceAudience;
36 import org.apache.hadoop.hbase.fs.HFileSystem;
37 import org.apache.hadoop.hbase.io.ByteBufferInputStream;
38 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
39 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
40 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
41 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
42 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
43 import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
44 import org.apache.hadoop.hbase.util.ByteBufferUtils;
45 import org.apache.hadoop.hbase.util.Bytes;
46 import org.apache.hadoop.hbase.util.ChecksumType;
47 import org.apache.hadoop.hbase.util.ClassSize;
48 import org.apache.hadoop.io.IOUtils;
49
50 import com.google.common.annotations.VisibleForTesting;
51 import com.google.common.base.Preconditions;
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85 @InterfaceAudience.Private
86 public class HFileBlock implements Cacheable {
87
88
89
90
91
92
93 static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
94
95 public static final boolean FILL_HEADER = true;
96 public static final boolean DONT_FILL_HEADER = false;
97
98
99
100
101
102 public static final int ENCODED_HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE
103 + DataBlockEncoding.ID_SIZE;
104
105 static final byte[] DUMMY_HEADER_NO_CHECKSUM =
106 new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
107
108 public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
109 ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
110
111
112 public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT
113 + Bytes.SIZEOF_LONG;
114
115
116
117
118 static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
119
120 static final CacheableDeserializer<Cacheable> blockDeserializer =
121 new CacheableDeserializer<Cacheable>() {
122 public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{
123 buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
124 ByteBuffer newByteBuffer;
125 if (reuse) {
126 newByteBuffer = buf.slice();
127 } else {
128 newByteBuffer = ByteBuffer.allocate(buf.limit());
129 newByteBuffer.put(buf);
130 }
131 buf.position(buf.limit());
132 buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
133 boolean usesChecksum = buf.get() == (byte)1;
134 HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum);
135 hFileBlock.offset = buf.getLong();
136 hFileBlock.nextBlockOnDiskSizeWithHeader = buf.getInt();
137 if (hFileBlock.hasNextBlockHeader()) {
138 hFileBlock.buf.limit(hFileBlock.buf.limit() - hFileBlock.headerSize());
139 }
140 return hFileBlock;
141 }
142
143 @Override
144 public int getDeserialiserIdentifier() {
145 return deserializerIdentifier;
146 }
147
148 @Override
149 public HFileBlock deserialize(ByteBuffer b) throws IOException {
150 return deserialize(b, false);
151 }
152 };
153 private static final int deserializerIdentifier;
154 static {
155 deserializerIdentifier = CacheableDeserializerIdManager
156 .registerDeserializer(blockDeserializer);
157 }
158
159
160 static class Header {
161
162
163
164
165
166
167
168
169
170 static int BLOCK_MAGIC_INDEX = 0;
171 static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8;
172 static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12;
173 static int PREV_BLOCK_OFFSET_INDEX = 16;
174 static int CHECKSUM_TYPE_INDEX = 24;
175 static int BYTES_PER_CHECKSUM_INDEX = 25;
176 static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29;
177 }
178
179
180 private BlockType blockType;
181
182
183 private int onDiskSizeWithoutHeader;
184
185
186 private final int uncompressedSizeWithoutHeader;
187
188
189 private final long prevBlockOffset;
190
191
192
193
194
195 private final int onDiskDataSizeWithHeader;
196
197
198 private ByteBuffer buf;
199
200
201 private HFileContext fileContext;
202
203
204
205
206
207 private long offset = -1;
208
209
210
211
212
213
214 private int nextBlockOnDiskSizeWithHeader = -1;
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232 HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
233 long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset,
234 int onDiskDataSizeWithHeader, HFileContext fileContext) {
235 this.blockType = blockType;
236 this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
237 this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
238 this.prevBlockOffset = prevBlockOffset;
239 this.buf = buf;
240 this.offset = offset;
241 this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
242 this.fileContext = fileContext;
243 if (fillHeader)
244 overwriteHeader();
245 this.buf.rewind();
246 }
247
248
249
250
251 HFileBlock(HFileBlock that) {
252 this.blockType = that.blockType;
253 this.onDiskSizeWithoutHeader = that.onDiskSizeWithoutHeader;
254 this.uncompressedSizeWithoutHeader = that.uncompressedSizeWithoutHeader;
255 this.prevBlockOffset = that.prevBlockOffset;
256 this.buf = that.buf.duplicate();
257 this.offset = that.offset;
258 this.onDiskDataSizeWithHeader = that.onDiskDataSizeWithHeader;
259 this.fileContext = that.fileContext;
260 this.nextBlockOnDiskSizeWithHeader = that.nextBlockOnDiskSizeWithHeader;
261 }
262
263
264
265
266
267
268
269
270
271 HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException {
272 b.rewind();
273 blockType = BlockType.read(b);
274 onDiskSizeWithoutHeader = b.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
275 uncompressedSizeWithoutHeader = b.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
276 prevBlockOffset = b.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
277 HFileContextBuilder contextBuilder = new HFileContextBuilder();
278 contextBuilder.withHBaseCheckSum(usesHBaseChecksum);
279 if (usesHBaseChecksum) {
280 contextBuilder.withChecksumType(ChecksumType.codeToType(b.get(Header.CHECKSUM_TYPE_INDEX)));
281 contextBuilder.withBytesPerCheckSum(b.getInt(Header.BYTES_PER_CHECKSUM_INDEX));
282 this.onDiskDataSizeWithHeader = b.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
283 } else {
284 contextBuilder.withChecksumType(ChecksumType.NULL);
285 contextBuilder.withBytesPerCheckSum(0);
286 this.onDiskDataSizeWithHeader = onDiskSizeWithoutHeader +
287 HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
288 }
289 this.fileContext = contextBuilder.build();
290 buf = b;
291 buf.rewind();
292 }
293
294 public BlockType getBlockType() {
295 return blockType;
296 }
297
298
299 public short getDataBlockEncodingId() {
300 if (blockType != BlockType.ENCODED_DATA) {
301 throw new IllegalArgumentException("Querying encoder ID of a block " +
302 "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
303 }
304 return buf.getShort(headerSize());
305 }
306
307
308
309
310 public int getOnDiskSizeWithHeader() {
311 return onDiskSizeWithoutHeader + headerSize();
312 }
313
314
315
316
317 public int getOnDiskSizeWithoutHeader() {
318 return onDiskSizeWithoutHeader;
319 }
320
321
322
323
324 public int getUncompressedSizeWithoutHeader() {
325 return uncompressedSizeWithoutHeader;
326 }
327
328
329
330
331
332 public long getPrevBlockOffset() {
333 return prevBlockOffset;
334 }
335
336
337
338
339
340 private void overwriteHeader() {
341 buf.rewind();
342 blockType.write(buf);
343 buf.putInt(onDiskSizeWithoutHeader);
344 buf.putInt(uncompressedSizeWithoutHeader);
345 buf.putLong(prevBlockOffset);
346 if (this.fileContext.isUseHBaseChecksum()) {
347 buf.put(fileContext.getChecksumType().getCode());
348 buf.putInt(fileContext.getBytesPerChecksum());
349 buf.putInt(onDiskDataSizeWithHeader);
350 }
351 }
352
353
354
355
356
357
358 public ByteBuffer getBufferWithoutHeader() {
359 ByteBuffer dup = this.buf.duplicate();
360 dup.position(headerSize());
361 dup.limit(buf.limit() - totalChecksumBytes());
362 return dup.slice();
363 }
364
365
366
367
368
369
370
371
372
373
374
375 public ByteBuffer getBufferReadOnly() {
376 ByteBuffer dup = this.buf.duplicate();
377 dup.limit(buf.limit() - totalChecksumBytes());
378 return dup.slice();
379 }
380
381
382
383
384
385
386
387
388 public ByteBuffer getBufferReadOnlyWithHeader() {
389 ByteBuffer dup = this.buf.duplicate();
390 return dup.slice();
391 }
392
393
394
395
396
397
398
399 ByteBuffer getBufferWithHeader() {
400 ByteBuffer dupBuf = buf.duplicate();
401 dupBuf.rewind();
402 return dupBuf;
403 }
404
405 private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
406 String fieldName) throws IOException {
407 if (valueFromBuf != valueFromField) {
408 throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
409 + ") is different from that in the field (" + valueFromField + ")");
410 }
411 }
412
413 private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
414 throws IOException {
415 if (valueFromBuf != valueFromField) {
416 throw new IOException("Block type stored in the buffer: " +
417 valueFromBuf + ", block type field: " + valueFromField);
418 }
419 }
420
421
422
423
424
425
426
427
428 void sanityCheck() throws IOException {
429 buf.rewind();
430
431 sanityCheckAssertion(BlockType.read(buf), blockType);
432
433 sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader,
434 "onDiskSizeWithoutHeader");
435
436 sanityCheckAssertion(buf.getInt(), uncompressedSizeWithoutHeader,
437 "uncompressedSizeWithoutHeader");
438
439 sanityCheckAssertion(buf.getLong(), prevBlockOffset, "prevBlocKOffset");
440 if (this.fileContext.isUseHBaseChecksum()) {
441 sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
442 sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), "bytesPerChecksum");
443 sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
444 }
445
446 int cksumBytes = totalChecksumBytes();
447 int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
448 if (buf.limit() != expectedBufLimit) {
449 throw new AssertionError("Expected buffer limit " + expectedBufLimit
450 + ", got " + buf.limit());
451 }
452
453
454
455 int hdrSize = headerSize();
456 if (buf.capacity() != expectedBufLimit &&
457 buf.capacity() != expectedBufLimit + hdrSize) {
458 throw new AssertionError("Invalid buffer capacity: " + buf.capacity() +
459 ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
460 }
461 }
462
463 @Override
464 public String toString() {
465 StringBuilder sb = new StringBuilder()
466 .append("HFileBlock [")
467 .append(" fileOffset=").append(offset)
468 .append(" headerSize()=").append(headerSize())
469 .append(" blockType=").append(blockType)
470 .append(" onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
471 .append(" uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
472 .append(" prevBlockOffset=").append(prevBlockOffset)
473 .append(" isUseHBaseChecksum()=").append(fileContext.isUseHBaseChecksum());
474 if (fileContext.isUseHBaseChecksum()) {
475 sb.append(" checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
476 .append(" bytesPerChecksum=").append(this.buf.getInt(24 + 1))
477 .append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
478 } else {
479 sb.append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
480 .append("(").append(onDiskSizeWithoutHeader)
481 .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
482 }
483 String dataBegin = null;
484 if (buf.hasArray()) {
485 dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
486 Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
487 } else {
488 ByteBuffer bufWithoutHeader = getBufferWithoutHeader();
489 byte[] dataBeginBytes = new byte[Math.min(32,
490 bufWithoutHeader.limit() - bufWithoutHeader.position())];
491 bufWithoutHeader.get(dataBeginBytes);
492 dataBegin = Bytes.toStringBinary(dataBeginBytes);
493 }
494 sb.append(" getOnDiskSizeWithHeader()=").append(getOnDiskSizeWithHeader())
495 .append(" totalChecksumBytes()=").append(totalChecksumBytes())
496 .append(" isUnpacked()=").append(isUnpacked())
497 .append(" buf=[ ").append(buf).append(" ]")
498 .append(" dataBeginsWith=").append(dataBegin)
499 .append(" fileContext=").append(fileContext)
500 .append(" ]");
501 return sb.toString();
502 }
503
504
505
506
507 private static void validateOnDiskSizeWithoutHeader(int expectedOnDiskSizeWithoutHeader,
508 int actualOnDiskSizeWithoutHeader, ByteBuffer buf, long offset) throws IOException {
509 if (actualOnDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) {
510
511
512 ByteBuffer bufReadOnly = buf.asReadOnlyBuffer();
513 String dataBegin = null;
514 byte[] dataBeginBytes = new byte[Math.min(32, bufReadOnly.limit() - bufReadOnly.position())];
515 bufReadOnly.get(dataBeginBytes);
516 dataBegin = Bytes.toStringBinary(dataBeginBytes);
517 String blockInfoMsg =
518 "Block offset: " + offset + ", data starts with: " + dataBegin;
519 throw new IOException("On-disk size without header provided is "
520 + expectedOnDiskSizeWithoutHeader + ", but block "
521 + "header contains " + actualOnDiskSizeWithoutHeader + ". " +
522 blockInfoMsg);
523 }
524 }
525
526
527
528
529
530 HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
531 if (!fileContext.isCompressedOrEncrypted()) {
532
533
534
535 return this;
536 }
537
538 HFileBlock unpacked = new HFileBlock(this);
539 unpacked.allocateBuffer();
540
541 HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
542 reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
543
544 ByteBuffer dup = this.buf.duplicate();
545 dup.position(this.headerSize());
546 dup = dup.slice();
547 ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
548 unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
549 dup);
550
551
552 if (unpacked.hasNextBlockHeader()) {
553
554
555
556
557 ByteBuffer inDup = this.buf.duplicate();
558 inDup.limit(inDup.limit() + headerSize());
559 ByteBuffer outDup = unpacked.buf.duplicate();
560 outDup.limit(outDup.limit() + unpacked.headerSize());
561 ByteBufferUtils.copyFromBufferToBuffer(
562 outDup,
563 inDup,
564 this.onDiskDataSizeWithHeader,
565 unpacked.headerSize() + unpacked.uncompressedSizeWithoutHeader
566 + unpacked.totalChecksumBytes(), unpacked.headerSize());
567 }
568 return unpacked;
569 }
570
571
572
573
574 private boolean hasNextBlockHeader() {
575 return nextBlockOnDiskSizeWithHeader > 0;
576 }
577
578
579
580
581
582
583 private void allocateBuffer() {
584 int cksumBytes = totalChecksumBytes();
585 int headerSize = headerSize();
586 int capacityNeeded = headerSize + uncompressedSizeWithoutHeader +
587 cksumBytes + (hasNextBlockHeader() ? headerSize : 0);
588
589
590 ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
591
592
593
594 ByteBuffer dup = buf.duplicate();
595 dup.position(0);
596 dup.get(newBuf.array(), newBuf.arrayOffset(), headerSize);
597
598 buf = newBuf;
599
600 buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
601 }
602
603
604
605
606
607 public boolean isUnpacked() {
608 final int cksumBytes = totalChecksumBytes();
609 final int headerSize = headerSize();
610 final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
611 final int bufCapacity = buf.capacity();
612 return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
613 }
614
615
616 public static void verifyUncompressed(ByteBuffer buf, boolean useHBaseChecksum)
617 throws IOException {
618 int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
619 int uncompressedSizeWithoutHeader = buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
620 int onDiskDataSizeWithHeader;
621 int checksumBytes = 0;
622 if (useHBaseChecksum) {
623 onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
624 checksumBytes = (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
625 buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX));
626 }
627
628 if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + checksumBytes) {
629 throw new IOException("Using no compression but "
630 + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
631 + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
632 + ", numChecksumbytes=" + checksumBytes);
633 }
634 }
635
636
637
638
639
640 public void expectType(BlockType expectedType) throws IOException {
641 if (blockType != expectedType) {
642 throw new IOException("Invalid block type: expected=" + expectedType
643 + ", actual=" + blockType);
644 }
645 }
646
647
648 public long getOffset() {
649 if (offset < 0) {
650 throw new IllegalStateException(
651 "HFile block offset not initialized properly");
652 }
653 return offset;
654 }
655
656
657
658
659 public DataInputStream getByteStream() {
660 ByteBuffer dup = this.buf.duplicate();
661 dup.position(this.headerSize());
662 return new DataInputStream(new ByteBufferInputStream(dup));
663 }
664
665 @Override
666 public long heapSize() {
667 long size = ClassSize.align(
668 ClassSize.OBJECT +
669
670 3 * ClassSize.REFERENCE +
671
672
673 4 * Bytes.SIZEOF_INT +
674
675 2 * Bytes.SIZEOF_LONG +
676
677 fileContext.heapSize()
678 );
679
680 if (buf != null) {
681
682 size += ClassSize.align(buf.capacity() + BYTE_BUFFER_HEAP_SIZE);
683 }
684
685 return ClassSize.align(size);
686 }
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703 public static boolean readWithExtra(InputStream in, byte[] buf,
704 int bufOffset, int necessaryLen, int extraLen) throws IOException {
705 int bytesRemaining = necessaryLen + extraLen;
706 while (bytesRemaining > 0) {
707 int ret = in.read(buf, bufOffset, bytesRemaining);
708 if (ret == -1 && bytesRemaining <= extraLen) {
709
710 break;
711 }
712
713 if (ret < 0) {
714 throw new IOException("Premature EOF from inputStream (read "
715 + "returned " + ret + ", was trying to read " + necessaryLen
716 + " necessary bytes and " + extraLen + " extra bytes, "
717 + "successfully read "
718 + (necessaryLen + extraLen - bytesRemaining));
719 }
720 bufOffset += ret;
721 bytesRemaining -= ret;
722 }
723 return bytesRemaining <= 0;
724 }
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743 @VisibleForTesting
744 static boolean positionalReadWithExtra(FSDataInputStream in,
745 long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen)
746 throws IOException {
747 int bytesRemaining = necessaryLen + extraLen;
748 int bytesRead = 0;
749 while (bytesRead < necessaryLen) {
750 int ret = in.read(position, buf, bufOffset, bytesRemaining);
751 if (ret < 0) {
752 throw new IOException("Premature EOF from inputStream (positional read "
753 + "returned " + ret + ", was trying to read " + necessaryLen
754 + " necessary bytes and " + extraLen + " extra bytes, "
755 + "successfully read " + bytesRead);
756 }
757 position += ret;
758 bufOffset += ret;
759 bytesRemaining -= ret;
760 bytesRead += ret;
761 }
762 return bytesRead != necessaryLen && bytesRemaining <= 0;
763 }
764
765
766
767
768
769 public int getNextBlockOnDiskSizeWithHeader() {
770 return nextBlockOnDiskSizeWithHeader;
771 }
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786 public static class Writer {
787
788 private enum State {
789 INIT,
790 WRITING,
791 BLOCK_READY
792 };
793
794
795 private State state = State.INIT;
796
797
798 private final HFileDataBlockEncoder dataBlockEncoder;
799
800 private HFileBlockEncodingContext dataBlockEncodingCtx;
801
802
803 private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
804
805
806
807
808
809
810
811 private ByteArrayOutputStream baosInMemory;
812
813
814
815
816
817
818 private BlockType blockType;
819
820
821
822
823
824 private DataOutputStream userDataStream;
825
826
827
828 private int unencodedDataSizeWritten;
829
830
831
832
833
834
835 private byte[] onDiskBytesWithHeader;
836
837
838
839
840
841
842
843 private byte[] onDiskChecksum;
844
845
846
847
848
849
850
851
852 private byte[] uncompressedBytesWithHeader;
853
854
855
856
857
858 private long startOffset;
859
860
861
862
863
864 private long[] prevOffsetByType;
865
866
867 private long prevOffset;
868
869 private HFileContext fileContext;
870
871
872
873
874 public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
875 this.dataBlockEncoder = dataBlockEncoder != null
876 ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
877 defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
878 HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
879 dataBlockEncodingCtx = this.dataBlockEncoder
880 .newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
881
882 if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
883 throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
884 " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
885 fileContext.getBytesPerChecksum());
886 }
887
888 baosInMemory = new ByteArrayOutputStream();
889
890 prevOffsetByType = new long[BlockType.values().length];
891 for (int i = 0; i < prevOffsetByType.length; ++i)
892 prevOffsetByType[i] = -1;
893
894 this.fileContext = fileContext;
895 }
896
897
898
899
900
901
902
903 public DataOutputStream startWriting(BlockType newBlockType)
904 throws IOException {
905 if (state == State.BLOCK_READY && startOffset != -1) {
906
907
908 prevOffsetByType[blockType.getId()] = startOffset;
909 }
910
911 startOffset = -1;
912 blockType = newBlockType;
913
914 baosInMemory.reset();
915 baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
916
917 state = State.WRITING;
918
919
920 userDataStream = new DataOutputStream(baosInMemory);
921 if (newBlockType == BlockType.DATA) {
922 this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
923 }
924 this.unencodedDataSizeWritten = 0;
925 return userDataStream;
926 }
927
928
929
930
931
932
933 public void write(Cell cell) throws IOException{
934 expectState(State.WRITING);
935 this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx,
936 this.userDataStream);
937 }
938
939
940
941
942
943
944
945
946 DataOutputStream getUserDataStream() {
947 expectState(State.WRITING);
948 return userDataStream;
949 }
950
951
952
953
954
955 void ensureBlockReady() throws IOException {
956 Preconditions.checkState(state != State.INIT,
957 "Unexpected state: " + state);
958
959 if (state == State.BLOCK_READY)
960 return;
961
962
963 finishBlock();
964 }
965
966
967
968
969
970
971
972 private void finishBlock() throws IOException {
973 if (blockType == BlockType.DATA) {
974 BufferGrabbingByteArrayOutputStream baosInMemoryCopy =
975 new BufferGrabbingByteArrayOutputStream();
976 baosInMemory.writeTo(baosInMemoryCopy);
977 this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
978 baosInMemoryCopy.buf, blockType);
979 blockType = dataBlockEncodingCtx.getBlockType();
980 }
981 userDataStream.flush();
982
983 uncompressedBytesWithHeader = baosInMemory.toByteArray();
984 prevOffset = prevOffsetByType[blockType.getId()];
985
986
987
988
989 state = State.BLOCK_READY;
990 if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
991 onDiskBytesWithHeader = dataBlockEncodingCtx
992 .compressAndEncrypt(uncompressedBytesWithHeader);
993 } else {
994 onDiskBytesWithHeader = defaultBlockEncodingCtx
995 .compressAndEncrypt(uncompressedBytesWithHeader);
996 }
997 int numBytes = (int) ChecksumUtil.numBytes(
998 onDiskBytesWithHeader.length,
999 fileContext.getBytesPerChecksum());
1000
1001
1002 putHeader(onDiskBytesWithHeader, 0,
1003 onDiskBytesWithHeader.length + numBytes,
1004 uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
1005
1006 putHeader(uncompressedBytesWithHeader, 0,
1007 onDiskBytesWithHeader.length + numBytes,
1008 uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
1009
1010 onDiskChecksum = new byte[numBytes];
1011 ChecksumUtil.generateChecksums(
1012 onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
1013 onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
1014 }
1015
1016 public static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
1017 private byte[] buf;
1018
1019 @Override
1020 public void write(byte[] b, int off, int len) {
1021 this.buf = b;
1022 }
1023
1024 public byte[] getBuffer() {
1025 return this.buf;
1026 }
1027 }
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037 private void putHeader(byte[] dest, int offset, int onDiskSize,
1038 int uncompressedSize, int onDiskDataSize) {
1039 offset = blockType.put(dest, offset);
1040 offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1041 offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1042 offset = Bytes.putLong(dest, offset, prevOffset);
1043 offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
1044 offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
1045 Bytes.putInt(dest, offset, onDiskDataSize);
1046 }
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056 public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
1057 long offset = out.getPos();
1058 if (startOffset != -1 && offset != startOffset) {
1059 throw new IOException("A " + blockType + " block written to a "
1060 + "stream twice, first at offset " + startOffset + ", then at "
1061 + offset);
1062 }
1063 startOffset = offset;
1064
1065 finishBlockAndWriteHeaderAndData((DataOutputStream) out);
1066 }
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077 protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
1078 throws IOException {
1079 ensureBlockReady();
1080 out.write(onDiskBytesWithHeader);
1081 out.write(onDiskChecksum);
1082 }
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094 byte[] getHeaderAndDataForTest() throws IOException {
1095 ensureBlockReady();
1096
1097
1098 byte[] output =
1099 new byte[onDiskBytesWithHeader.length
1100 + onDiskChecksum.length];
1101 System.arraycopy(onDiskBytesWithHeader, 0, output, 0,
1102 onDiskBytesWithHeader.length);
1103 System.arraycopy(onDiskChecksum, 0, output,
1104 onDiskBytesWithHeader.length, onDiskChecksum.length);
1105 return output;
1106 }
1107
1108
1109
1110
1111 public void release() {
1112 if (dataBlockEncodingCtx != null) {
1113 dataBlockEncodingCtx.close();
1114 dataBlockEncodingCtx = null;
1115 }
1116 if (defaultBlockEncodingCtx != null) {
1117 defaultBlockEncodingCtx.close();
1118 defaultBlockEncodingCtx = null;
1119 }
1120 }
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130 int getOnDiskSizeWithoutHeader() {
1131 expectState(State.BLOCK_READY);
1132 return onDiskBytesWithHeader.length
1133 + onDiskChecksum.length
1134 - HConstants.HFILEBLOCK_HEADER_SIZE;
1135 }
1136
1137
1138
1139
1140
1141
1142
1143
1144 int getOnDiskSizeWithHeader() {
1145 expectState(State.BLOCK_READY);
1146 return onDiskBytesWithHeader.length + onDiskChecksum.length;
1147 }
1148
1149
1150
1151
1152 int getUncompressedSizeWithoutHeader() {
1153 expectState(State.BLOCK_READY);
1154 return uncompressedBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE;
1155 }
1156
1157
1158
1159
1160 int getUncompressedSizeWithHeader() {
1161 expectState(State.BLOCK_READY);
1162 return uncompressedBytesWithHeader.length;
1163 }
1164
1165
1166 public boolean isWriting() {
1167 return state == State.WRITING;
1168 }
1169
1170
1171
1172
1173
1174
1175
1176
1177 public int blockSizeWritten() {
1178 if (state != State.WRITING) return 0;
1179 return this.unencodedDataSizeWritten;
1180 }
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190 ByteBuffer getUncompressedBufferWithHeader() {
1191 expectState(State.BLOCK_READY);
1192 return ByteBuffer.wrap(uncompressedBytesWithHeader);
1193 }
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203 ByteBuffer getOnDiskBufferWithHeader() {
1204 expectState(State.BLOCK_READY);
1205 return ByteBuffer.wrap(onDiskBytesWithHeader);
1206 }
1207
1208 private void expectState(State expectedState) {
1209 if (state != expectedState) {
1210 throw new IllegalStateException("Expected state: " + expectedState +
1211 ", actual state: " + state);
1212 }
1213 }
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225 public void writeBlock(BlockWritable bw, FSDataOutputStream out)
1226 throws IOException {
1227 bw.writeToBlock(startWriting(bw.getBlockType()));
1228 writeHeaderAndData(out);
1229 }
1230
1231
1232
1233
1234
1235
1236
1237
1238 public HFileBlock getBlockForCaching(CacheConfig cacheConf) {
1239 HFileContext newContext = new HFileContextBuilder()
1240 .withBlockSize(fileContext.getBlocksize())
1241 .withBytesPerCheckSum(0)
1242 .withChecksumType(ChecksumType.NULL)
1243 .withCompression(fileContext.getCompression())
1244 .withDataBlockEncoding(fileContext.getDataBlockEncoding())
1245 .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
1246 .withCompressTags(fileContext.isCompressTags())
1247 .withIncludesMvcc(fileContext.isIncludesMvcc())
1248 .withIncludesTags(fileContext.isIncludesTags())
1249 .build();
1250 return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
1251 getUncompressedSizeWithoutHeader(), prevOffset,
1252 cacheConf.shouldCacheCompressed(blockType.getCategory()) ?
1253 getOnDiskBufferWithHeader() :
1254 getUncompressedBufferWithHeader(),
1255 FILL_HEADER, startOffset,
1256 onDiskBytesWithHeader.length + onDiskChecksum.length, newContext);
1257 }
1258 }
1259
1260
1261 public interface BlockWritable {
1262
1263
1264 BlockType getBlockType();
1265
1266
1267
1268
1269
1270
1271
1272 void writeToBlock(DataOutput out) throws IOException;
1273 }
1274
1275
1276
1277
1278 public interface BlockIterator {
1279
1280
1281
1282
1283 HFileBlock nextBlock() throws IOException;
1284
1285
1286
1287
1288
1289 HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1290 }
1291
1292
1293 public interface FSReader {
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306 HFileBlock readBlockData(long offset, long onDiskSize,
1307 int uncompressedSize, boolean pread) throws IOException;
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318 BlockIterator blockRange(long startOffset, long endOffset);
1319
1320
1321 void closeStreams() throws IOException;
1322
1323
1324 HFileBlockDecodingContext getBlockDecodingContext();
1325
1326
1327 HFileBlockDecodingContext getDefaultBlockDecodingContext();
1328 }
1329
1330
1331
1332
1333
1334 private abstract static class AbstractFSReader implements FSReader {
1335
1336
1337
1338 protected long fileSize;
1339
1340
1341 protected final int hdrSize;
1342
1343
1344 protected HFileSystem hfs;
1345
1346
1347 protected Path path;
1348
1349 private final Lock streamLock = new ReentrantLock();
1350
1351
1352 public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
1353
1354 protected HFileContext fileContext;
1355
1356 public AbstractFSReader(long fileSize, HFileSystem hfs, Path path, HFileContext fileContext)
1357 throws IOException {
1358 this.fileSize = fileSize;
1359 this.hfs = hfs;
1360 this.path = path;
1361 this.fileContext = fileContext;
1362 this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1363 }
1364
1365 @Override
1366 public BlockIterator blockRange(final long startOffset,
1367 final long endOffset) {
1368 final FSReader owner = this;
1369 return new BlockIterator() {
1370 private long offset = startOffset;
1371
1372 @Override
1373 public HFileBlock nextBlock() throws IOException {
1374 if (offset >= endOffset)
1375 return null;
1376 HFileBlock b = readBlockData(offset, -1, -1, false);
1377 offset += b.getOnDiskSizeWithHeader();
1378 return b.unpack(fileContext, owner);
1379 }
1380
1381 @Override
1382 public HFileBlock nextBlockWithBlockType(BlockType blockType)
1383 throws IOException {
1384 HFileBlock blk = nextBlock();
1385 if (blk.getBlockType() != blockType) {
1386 throw new IOException("Expected block of type " + blockType
1387 + " but found " + blk.getBlockType());
1388 }
1389 return blk;
1390 }
1391 };
1392 }
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409 protected int readAtOffset(FSDataInputStream istream, byte[] dest, int destOffset, int size,
1410 boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
1411 if (peekIntoNextBlock &&
1412 destOffset + size + hdrSize > dest.length) {
1413
1414
1415 throw new IOException("Attempted to read " + size + " bytes and " +
1416 hdrSize + " bytes of next header into a " + dest.length +
1417 "-byte array at offset " + destOffset);
1418 }
1419
1420 if (!pread && streamLock.tryLock()) {
1421
1422 try {
1423 istream.seek(fileOffset);
1424
1425 long realOffset = istream.getPos();
1426 if (realOffset != fileOffset) {
1427 throw new IOException("Tried to seek to " + fileOffset + " to "
1428 + "read " + size + " bytes, but pos=" + realOffset
1429 + " after seek");
1430 }
1431
1432 if (!peekIntoNextBlock) {
1433 IOUtils.readFully(istream, dest, destOffset, size);
1434 return -1;
1435 }
1436
1437
1438 if (!readWithExtra(istream, dest, destOffset, size, hdrSize))
1439 return -1;
1440 } finally {
1441 streamLock.unlock();
1442 }
1443 } else {
1444
1445 int extraSize = peekIntoNextBlock ? hdrSize : 0;
1446 if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset,
1447 size, extraSize)) {
1448 return -1;
1449 }
1450 }
1451
1452 assert peekIntoNextBlock;
1453 return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
1454 }
1455
1456 }
1457
1458
1459
1460
1461
1462 private static class PrefetchedHeader {
1463 long offset = -1;
1464 byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1465 final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
1466 }
1467
1468
1469 static class FSReaderImpl extends AbstractFSReader {
1470
1471
1472 protected FSDataInputStreamWrapper streamWrapper;
1473
1474 private HFileBlockDecodingContext encodedBlockDecodingCtx;
1475
1476
1477 private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
1478
1479 private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
1480 new ThreadLocal<PrefetchedHeader>() {
1481 @Override
1482 public PrefetchedHeader initialValue() {
1483 return new PrefetchedHeader();
1484 }
1485 };
1486
1487 public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
1488 HFileContext fileContext) throws IOException {
1489 super(fileSize, hfs, path, fileContext);
1490 this.streamWrapper = stream;
1491
1492 this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1493 defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
1494 encodedBlockDecodingCtx = defaultDecodingCtx;
1495 }
1496
1497
1498
1499
1500
1501 FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
1502 throws IOException {
1503 this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
1504 }
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517 @Override
1518 public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL,
1519 int uncompressedSize, boolean pread)
1520 throws IOException {
1521
1522
1523
1524
1525
1526
1527 boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1528 FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1529
1530 HFileBlock blk = readBlockDataInternal(is, offset, (int) onDiskSizeWithHeaderL,
1531 uncompressedSize, pread, doVerificationThruHBaseChecksum);
1532 if (blk == null) {
1533 HFile.LOG.warn("HBase checksum verification failed for file " +
1534 path + " at offset " +
1535 offset + " filesize " + fileSize +
1536 ". Retrying read with HDFS checksums turned on...");
1537
1538 if (!doVerificationThruHBaseChecksum) {
1539 String msg = "HBase checksum verification failed for file " +
1540 path + " at offset " +
1541 offset + " filesize " + fileSize +
1542 " but this cannot happen because doVerify is " +
1543 doVerificationThruHBaseChecksum;
1544 HFile.LOG.warn(msg);
1545 throw new IOException(msg);
1546 }
1547 HFile.checksumFailures.incrementAndGet();
1548
1549
1550
1551
1552
1553
1554
1555 is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1556 doVerificationThruHBaseChecksum = false;
1557 blk = readBlockDataInternal(is, offset, (int) onDiskSizeWithHeaderL, uncompressedSize,
1558 pread, doVerificationThruHBaseChecksum);
1559 if (blk != null) {
1560 HFile.LOG.warn("HDFS checksum verification suceeded for file " +
1561 path + " at offset " +
1562 offset + " filesize " + fileSize);
1563 }
1564 }
1565 if (blk == null && !doVerificationThruHBaseChecksum) {
1566 String msg = "readBlockData failed, possibly due to " +
1567 "checksum verification failed for file " + path +
1568 " at offset " + offset + " filesize " + fileSize;
1569 HFile.LOG.warn(msg);
1570 throw new IOException(msg);
1571 }
1572
1573
1574
1575
1576
1577
1578
1579
1580 streamWrapper.checksumOk();
1581 return blk;
1582 }
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597 protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
1598 int onDiskSizeWithHeader, int uncompressedSize, boolean pread,
1599 boolean verifyChecksum)
1600 throws IOException {
1601 if (offset < 0) {
1602 throw new IOException("Invalid offset=" + offset + " trying to read "
1603 + "block (onDiskSize=" + onDiskSizeWithHeader
1604 + ", uncompressedSize=" + uncompressedSize + ")");
1605 }
1606
1607 if (uncompressedSize != -1) {
1608 throw new IOException("Version 2 block reader API does not need " +
1609 "the uncompressed size parameter");
1610 }
1611
1612 if ((onDiskSizeWithHeader < hdrSize && onDiskSizeWithHeader != -1)
1613 || onDiskSizeWithHeader >= Integer.MAX_VALUE) {
1614 throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeader
1615 + ": expected to be at least " + hdrSize
1616 + " and at most " + Integer.MAX_VALUE + ", or -1 (offset="
1617 + offset + ", uncompressedSize=" + uncompressedSize + ")");
1618 }
1619
1620
1621
1622
1623
1624
1625
1626
1627 PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
1628 ByteBuffer headerBuf = prefetchedHeader.offset == offset? prefetchedHeader.buf: null;
1629
1630
1631 int nextBlockOnDiskSize = 0;
1632 byte[] onDiskBlock = null;
1633
1634 if (onDiskSizeWithHeader > 0) {
1635
1636
1637
1638
1639
1640
1641
1642
1643 int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
1644 onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
1645
1646 nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
1647 preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
1648 true, offset + preReadHeaderSize, pread);
1649 if (headerBuf != null) {
1650
1651
1652
1653 assert headerBuf.hasArray();
1654 System.arraycopy(headerBuf.array(),
1655 headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1656 } else {
1657 headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
1658 }
1659
1660 int expectedOnDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize;
1661 int actualOnDiskSizeWithoutHeader =
1662 headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
1663 validateOnDiskSizeWithoutHeader(expectedOnDiskSizeWithoutHeader,
1664 actualOnDiskSizeWithoutHeader, headerBuf, offset);
1665 } else {
1666
1667
1668
1669
1670
1671
1672
1673
1674 if (headerBuf == null) {
1675
1676
1677
1678
1679
1680 headerBuf = ByteBuffer.allocate(hdrSize);
1681
1682 readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(),
1683 hdrSize, false, offset, pread);
1684 }
1685 int onDiskSizeWithoutHeader = headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
1686 onDiskSizeWithHeader = onDiskSizeWithoutHeader + hdrSize;
1687 onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
1688 System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1689 nextBlockOnDiskSize =
1690 readAtOffset(is, onDiskBlock, hdrSize, onDiskSizeWithHeader - hdrSize, true,
1691 offset + hdrSize, pread);
1692 }
1693 ByteBuffer onDiskBlockByteBuffer = ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader);
1694
1695 if (!fileContext.isCompressedOrEncrypted()) {
1696 verifyUncompressed(headerBuf, fileContext.isUseHBaseChecksum());
1697 }
1698
1699 if (verifyChecksum && !validateChecksum(offset, onDiskBlockByteBuffer, hdrSize)) {
1700 return null;
1701 }
1702
1703
1704
1705
1706
1707 HFileBlock b = new HFileBlock(onDiskBlockByteBuffer, this.fileContext.isUseHBaseChecksum());
1708
1709 b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
1710
1711
1712 if (b.hasNextBlockHeader()) {
1713 prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
1714 System.arraycopy(onDiskBlock, onDiskSizeWithHeader, prefetchedHeader.header, 0, hdrSize);
1715 }
1716
1717 b.offset = offset;
1718 b.fileContext.setIncludesTags(this.fileContext.isIncludesTags());
1719 b.fileContext.setIncludesMvcc(this.fileContext.isIncludesMvcc());
1720 return b;
1721 }
1722
1723 void setIncludesMemstoreTS(boolean includesMemstoreTS) {
1724 this.fileContext.setIncludesMvcc(includesMemstoreTS);
1725 }
1726
1727 void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
1728 encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
1729 }
1730
1731 @Override
1732 public HFileBlockDecodingContext getBlockDecodingContext() {
1733 return this.encodedBlockDecodingCtx;
1734 }
1735
1736 @Override
1737 public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
1738 return this.defaultDecodingCtx;
1739 }
1740
1741
1742
1743
1744
1745
1746 protected boolean validateChecksum(long offset, ByteBuffer data, int hdrSize)
1747 throws IOException {
1748
1749
1750
1751
1752
1753 if (!fileContext.isUseHBaseChecksum()) {
1754 return false;
1755 }
1756 return ChecksumUtil.validateChecksum(data, path, offset, hdrSize);
1757 }
1758
1759 @Override
1760 public void closeStreams() throws IOException {
1761 streamWrapper.close();
1762 }
1763
1764 @Override
1765 public String toString() {
1766 return "hfs=" + hfs + ", path=" + path + ", fileContext=" + fileContext;
1767 }
1768 }
1769
1770 @Override
1771 public int getSerializedLength() {
1772 if (buf != null) {
1773
1774 int extraSpace = hasNextBlockHeader() ? headerSize() : 0;
1775 return this.buf.limit() + extraSpace + HFileBlock.EXTRA_SERIALIZATION_SPACE;
1776 }
1777 return 0;
1778 }
1779
1780 @Override
1781 public void serialize(ByteBuffer destination) {
1782 ByteBufferUtils.copyFromBufferToBuffer(destination, this.buf, 0, getSerializedLength()
1783 - EXTRA_SERIALIZATION_SPACE);
1784 serializeExtraInfo(destination);
1785 }
1786
1787 public void serializeExtraInfo(ByteBuffer destination) {
1788 destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1789 destination.putLong(this.offset);
1790 destination.putInt(this.nextBlockOnDiskSizeWithHeader);
1791 destination.rewind();
1792 }
1793
1794 @Override
1795 public CacheableDeserializer<Cacheable> getDeserializer() {
1796 return HFileBlock.blockDeserializer;
1797 }
1798
1799 @Override
1800 public boolean equals(Object comparison) {
1801 if (this == comparison) {
1802 return true;
1803 }
1804 if (comparison == null) {
1805 return false;
1806 }
1807 if (comparison.getClass() != this.getClass()) {
1808 return false;
1809 }
1810
1811 HFileBlock castedComparison = (HFileBlock) comparison;
1812
1813 if (castedComparison.blockType != this.blockType) {
1814 return false;
1815 }
1816 if (castedComparison.nextBlockOnDiskSizeWithHeader != this.nextBlockOnDiskSizeWithHeader) {
1817 return false;
1818 }
1819 if (castedComparison.offset != this.offset) {
1820 return false;
1821 }
1822 if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1823 return false;
1824 }
1825 if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1826 return false;
1827 }
1828 if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1829 return false;
1830 }
1831 if (ByteBufferUtils.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
1832 castedComparison.buf.limit()) != 0) {
1833 return false;
1834 }
1835 return true;
1836 }
1837
1838 public DataBlockEncoding getDataBlockEncoding() {
1839 if (blockType == BlockType.ENCODED_DATA) {
1840 return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
1841 }
1842 return DataBlockEncoding.NONE;
1843 }
1844
1845 byte getChecksumType() {
1846 return this.fileContext.getChecksumType().getCode();
1847 }
1848
1849 int getBytesPerChecksum() {
1850 return this.fileContext.getBytesPerChecksum();
1851 }
1852
1853
1854 int getOnDiskDataSizeWithHeader() {
1855 return this.onDiskDataSizeWithHeader;
1856 }
1857
1858
1859
1860
1861
1862 int totalChecksumBytes() {
1863 return HFileBlock.totalChecksumBytes(this.fileContext, onDiskDataSizeWithHeader);
1864 }
1865
1866 private static int totalChecksumBytes(HFileContext fileContext, int onDiskDataSizeWithHeader) {
1867
1868
1869
1870
1871 if (!fileContext.isUseHBaseChecksum() || fileContext.getBytesPerChecksum() == 0) {
1872 return 0;
1873 }
1874 return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
1875 fileContext.getBytesPerChecksum());
1876 }
1877
1878
1879
1880
1881 public int headerSize() {
1882 return headerSize(this.fileContext.isUseHBaseChecksum());
1883 }
1884
1885
1886
1887
1888 public static int headerSize(boolean usesHBaseChecksum) {
1889 if (usesHBaseChecksum) {
1890 return HConstants.HFILEBLOCK_HEADER_SIZE;
1891 }
1892 return HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
1893 }
1894
1895
1896
1897
1898 public byte[] getDummyHeaderForVersion() {
1899 return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
1900 }
1901
1902
1903
1904
1905 static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
1906 if (usesHBaseChecksum) {
1907 return HConstants.HFILEBLOCK_DUMMY_HEADER;
1908 }
1909 return DUMMY_HEADER_NO_CHECKSUM;
1910 }
1911
1912
1913
1914
1915
1916 public HFileContext getHFileContext() {
1917 return this.fileContext;
1918 }
1919
1920
1921
1922
1923
1924
1925 public static String toStringHeader(ByteBuffer buf) throws IOException {
1926 byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
1927 buf.get(magicBuf);
1928 int compressedBlockSizeNoHeader = buf.getInt();
1929 int uncompressedBlockSizeNoHeader = buf.getInt();
1930 long prevBlockOffset = buf.getLong();
1931 byte cksumtype = buf.get();
1932 long bytesPerChecksum = buf.getInt();
1933 long onDiskDataSizeWithHeader = buf.getInt();
1934 return " Header dump: magic: " + Bytes.toString(magicBuf) +
1935 " blockType " + magicBuf +
1936 " compressedBlockSizeNoHeader " +
1937 compressedBlockSizeNoHeader +
1938 " uncompressedBlockSizeNoHeader " +
1939 uncompressedBlockSizeNoHeader +
1940 " prevBlockOffset " + prevBlockOffset +
1941 " checksumType " + ChecksumType.codeToType(cksumtype) +
1942 " bytesPerChecksum " + bytesPerChecksum +
1943 " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
1944 }
1945 }