1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.io.encoding;
18
19 import java.io.DataInputStream;
20 import java.io.DataOutputStream;
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23
24 import org.apache.hadoop.hbase.Cell;
25 import org.apache.hadoop.hbase.CellComparator;
26 import org.apache.hadoop.hbase.CellUtil;
27 import org.apache.hadoop.hbase.HConstants;
28 import org.apache.hadoop.hbase.KeyValue;
29 import org.apache.hadoop.hbase.KeyValue.KVComparator;
30 import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
31 import org.apache.hadoop.hbase.KeyValue.Type;
32 import org.apache.hadoop.hbase.KeyValueUtil;
33 import org.apache.hadoop.hbase.SettableSequenceId;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35 import org.apache.hadoop.hbase.io.HeapSize;
36 import org.apache.hadoop.hbase.io.TagCompressionContext;
37 import org.apache.hadoop.hbase.io.hfile.BlockType;
38 import org.apache.hadoop.hbase.io.hfile.HFileContext;
39 import org.apache.hadoop.hbase.io.util.LRUDictionary;
40 import org.apache.hadoop.hbase.util.ByteBufferUtils;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.hbase.util.ClassSize;
43 import org.apache.hadoop.io.WritableUtils;
44
45
46
47
48 @InterfaceAudience.Private
49 abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
50
51 private static int INITIAL_KEY_BUFFER_SIZE = 512;
52
53 @Override
54 public ByteBuffer decodeKeyValues(DataInputStream source,
55 HFileBlockDecodingContext blkDecodingCtx) throws IOException {
56 if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
57 throw new IOException(this.getClass().getName() + " only accepts "
58 + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
59 }
60
61 HFileBlockDefaultDecodingContext decodingCtx =
62 (HFileBlockDefaultDecodingContext) blkDecodingCtx;
63 if (decodingCtx.getHFileContext().isIncludesTags()
64 && decodingCtx.getHFileContext().isCompressTags()) {
65 if (decodingCtx.getTagCompressionContext() != null) {
66
67
68 decodingCtx.getTagCompressionContext().clear();
69 } else {
70 try {
71 TagCompressionContext tagCompressionContext = new TagCompressionContext(
72 LRUDictionary.class, Byte.MAX_VALUE);
73 decodingCtx.setTagCompressionContext(tagCompressionContext);
74 } catch (Exception e) {
75 throw new IOException("Failed to initialize TagCompressionContext", e);
76 }
77 }
78 }
79 return internalDecodeKeyValues(source, 0, 0, decodingCtx);
80 }
81
82 protected static class SeekerState implements Cell {
83 protected ByteBuffer currentBuffer;
84 protected TagCompressionContext tagCompressionContext;
85 protected int valueOffset = -1;
86 protected int keyLength;
87 protected int valueLength;
88 protected int lastCommonPrefix;
89 protected int tagsLength = 0;
90 protected int tagsOffset = -1;
91 protected int tagsCompressedLength = 0;
92 protected boolean uncompressTags = true;
93
94
95 protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
96 protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
97
98 protected long memstoreTS;
99 protected int nextKvOffset;
100 protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
101
102 protected boolean isValid() {
103 return valueOffset != -1;
104 }
105
106 protected void invalidate() {
107 valueOffset = -1;
108 tagsCompressedLength = 0;
109 currentKey = new KeyValue.KeyOnlyKeyValue();
110 uncompressTags = true;
111 currentBuffer = null;
112 }
113
114 protected void ensureSpaceForKey() {
115 if (keyLength > keyBuffer.length) {
116
117 int newKeyBufferLength = Math.max(keyBuffer.length, 1) * 2;
118 while (keyLength > newKeyBufferLength) {
119 newKeyBufferLength *= 2;
120 }
121 byte[] newKeyBuffer = new byte[newKeyBufferLength];
122 System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
123 keyBuffer = newKeyBuffer;
124 }
125 }
126
127 protected void ensureSpaceForTags() {
128 if (tagsLength > tagsBuffer.length) {
129
130 int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2;
131 while (tagsLength > newTagsBufferLength) {
132 newTagsBufferLength *= 2;
133 }
134 byte[] newTagsBuffer = new byte[newTagsBufferLength];
135 System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
136 tagsBuffer = newTagsBuffer;
137 }
138 }
139
140 protected void setKey(byte[] keyBuffer, long memTS) {
141 currentKey.setKey(keyBuffer, 0, keyLength);
142 memstoreTS = memTS;
143 }
144
145
146
147
148
149
150 protected void copyFromNext(SeekerState nextState) {
151 if (keyBuffer.length != nextState.keyBuffer.length) {
152 keyBuffer = nextState.keyBuffer.clone();
153 } else if (!isValid()) {
154
155
156 System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
157 nextState.keyLength);
158 } else {
159
160 System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
161 keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
162 - nextState.lastCommonPrefix);
163 }
164 currentKey = nextState.currentKey;
165
166 valueOffset = nextState.valueOffset;
167 keyLength = nextState.keyLength;
168 valueLength = nextState.valueLength;
169 lastCommonPrefix = nextState.lastCommonPrefix;
170 nextKvOffset = nextState.nextKvOffset;
171 memstoreTS = nextState.memstoreTS;
172 currentBuffer = nextState.currentBuffer;
173 tagsOffset = nextState.tagsOffset;
174 tagsLength = nextState.tagsLength;
175 if (nextState.tagCompressionContext != null) {
176 tagCompressionContext = nextState.tagCompressionContext;
177 }
178 }
179
180 @Override
181 public byte[] getRowArray() {
182 return currentKey.getRowArray();
183 }
184
185 @Override
186 public int getRowOffset() {
187 return Bytes.SIZEOF_SHORT;
188 }
189
190 @Override
191 public short getRowLength() {
192 return currentKey.getRowLength();
193 }
194
195 @Override
196 public byte[] getFamilyArray() {
197 return currentKey.getFamilyArray();
198 }
199
200 @Override
201 public int getFamilyOffset() {
202 return currentKey.getFamilyOffset();
203 }
204
205 @Override
206 public byte getFamilyLength() {
207 return currentKey.getFamilyLength();
208 }
209
210 @Override
211 public byte[] getQualifierArray() {
212 return currentKey.getQualifierArray();
213 }
214
215 @Override
216 public int getQualifierOffset() {
217 return currentKey.getQualifierOffset();
218 }
219
220 @Override
221 public int getQualifierLength() {
222 return currentKey.getQualifierLength();
223 }
224
225 @Override
226 public long getTimestamp() {
227 return currentKey.getTimestamp();
228 }
229
230 @Override
231 public byte getTypeByte() {
232 return currentKey.getTypeByte();
233 }
234
235 @Override
236 public long getMvccVersion() {
237 return memstoreTS;
238 }
239
240 @Override
241 public long getSequenceId() {
242 return memstoreTS;
243 }
244
245 @Override
246 public byte[] getValueArray() {
247 return currentBuffer.array();
248 }
249
250 @Override
251 public int getValueOffset() {
252 return currentBuffer.arrayOffset() + valueOffset;
253 }
254
255 @Override
256 public int getValueLength() {
257 return valueLength;
258 }
259
260 @Override
261 public byte[] getTagsArray() {
262 if (tagCompressionContext != null) {
263 return tagsBuffer;
264 }
265 return currentBuffer.array();
266 }
267
268 @Override
269 public int getTagsOffset() {
270 if (tagCompressionContext != null) {
271 return 0;
272 }
273 return currentBuffer.arrayOffset() + tagsOffset;
274 }
275
276 @Override
277 public int getTagsLength() {
278 return tagsLength;
279 }
280
281 @Override
282 @Deprecated
283 public byte[] getValue() {
284 throw new UnsupportedOperationException("getValue() not supported");
285 }
286
287 @Override
288 @Deprecated
289 public byte[] getFamily() {
290 throw new UnsupportedOperationException("getFamily() not supported");
291 }
292
293 @Override
294 @Deprecated
295 public byte[] getQualifier() {
296 throw new UnsupportedOperationException("getQualifier() not supported");
297 }
298
299 @Override
300 @Deprecated
301 public byte[] getRow() {
302 throw new UnsupportedOperationException("getRow() not supported");
303 }
304
305 @Override
306 public String toString() {
307 return KeyValue.keyToString(this.keyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
308 + getValueLength() + "/seqid=" + memstoreTS;
309 }
310
311 public Cell shallowCopy() {
312 return new ClonedSeekerState(currentBuffer, keyBuffer, currentKey.getRowLength(),
313 currentKey.getFamilyOffset(), currentKey.getFamilyLength(), keyLength,
314 currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
315 currentKey.getTimestamp(), currentKey.getTypeByte(), valueLength, valueOffset,
316 memstoreTS, tagsOffset, tagsLength, tagCompressionContext, tagsBuffer);
317 }
318 }
319
320
321
322
323
324
325
326
327
328
329
330 protected static class ClonedSeekerState implements Cell, HeapSize, SettableSequenceId {
331 private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
332 + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
333 + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (2 * ClassSize.ARRAY));
334 private byte[] keyOnlyBuffer;
335 private ByteBuffer currentBuffer;
336 private short rowLength;
337 private int familyOffset;
338 private byte familyLength;
339 private int qualifierOffset;
340 private int qualifierLength;
341 private long timestamp;
342 private byte typeByte;
343 private int valueOffset;
344 private int valueLength;
345 private int tagsLength;
346 private int tagsOffset;
347 private byte[] cloneTagsBuffer;
348 private long seqId;
349 private TagCompressionContext tagCompressionContext;
350
351 protected ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, short rowLength,
352 int familyOffset, byte familyLength, int keyLength, int qualOffset, int qualLength,
353 long timeStamp, byte typeByte, int valueLen, int valueOffset, long seqId,
354 int tagsOffset, int tagsLength, TagCompressionContext tagCompressionContext,
355 byte[] tagsBuffer) {
356 this.currentBuffer = currentBuffer;
357 keyOnlyBuffer = new byte[keyLength];
358 this.tagCompressionContext = tagCompressionContext;
359 this.rowLength = rowLength;
360 this.familyOffset = familyOffset;
361 this.familyLength = familyLength;
362 this.qualifierOffset = qualOffset;
363 this.qualifierLength = qualLength;
364 this.timestamp = timeStamp;
365 this.typeByte = typeByte;
366 this.valueLength = valueLen;
367 this.valueOffset = valueOffset;
368 this.tagsOffset = tagsOffset;
369 this.tagsLength = tagsLength;
370 System.arraycopy(keyBuffer, 0, keyOnlyBuffer, 0, keyLength);
371 if (tagCompressionContext != null) {
372 this.cloneTagsBuffer = new byte[tagsLength];
373 System.arraycopy(tagsBuffer, 0, this.cloneTagsBuffer, 0, tagsLength);
374 }
375 setSequenceId(seqId);
376 }
377
378 @Override
379 public byte[] getRowArray() {
380 return keyOnlyBuffer;
381 }
382
383 @Override
384 public byte[] getFamilyArray() {
385 return keyOnlyBuffer;
386 }
387
388 @Override
389 public byte[] getQualifierArray() {
390 return keyOnlyBuffer;
391 }
392
393 @Override
394 public int getRowOffset() {
395 return Bytes.SIZEOF_SHORT;
396 }
397
398 @Override
399 public short getRowLength() {
400 return rowLength;
401 }
402
403 @Override
404 public int getFamilyOffset() {
405 return familyOffset;
406 }
407
408 @Override
409 public byte getFamilyLength() {
410 return familyLength;
411 }
412
413 @Override
414 public int getQualifierOffset() {
415 return qualifierOffset;
416 }
417
418 @Override
419 public int getQualifierLength() {
420 return qualifierLength;
421 }
422
423 @Override
424 public long getTimestamp() {
425 return timestamp;
426 }
427
428 @Override
429 public byte getTypeByte() {
430 return typeByte;
431 }
432
433 @Override
434 @Deprecated
435 public long getMvccVersion() {
436 return getSequenceId();
437 }
438
439 @Override
440 public long getSequenceId() {
441 return seqId;
442 }
443
444 @Override
445 public byte[] getValueArray() {
446 return currentBuffer.array();
447 }
448
449 @Override
450 public int getValueOffset() {
451 return currentBuffer.arrayOffset() + valueOffset;
452 }
453
454 @Override
455 public int getValueLength() {
456 return valueLength;
457 }
458
459 @Override
460 public byte[] getTagsArray() {
461 if (tagCompressionContext != null) {
462 return cloneTagsBuffer;
463 }
464 return currentBuffer.array();
465 }
466
467 @Override
468 public int getTagsOffset() {
469 if (tagCompressionContext != null) {
470 return 0;
471 }
472 return currentBuffer.arrayOffset() + tagsOffset;
473 }
474
475 @Override
476 public int getTagsLength() {
477 return tagsLength;
478 }
479
480 @Override
481 @Deprecated
482 public byte[] getValue() {
483 return CellUtil.cloneValue(this);
484 }
485
486 @Override
487 @Deprecated
488 public byte[] getFamily() {
489 return CellUtil.cloneFamily(this);
490 }
491
492 @Override
493 @Deprecated
494 public byte[] getQualifier() {
495 return CellUtil.cloneQualifier(this);
496 }
497
498 @Override
499 @Deprecated
500 public byte[] getRow() {
501 return CellUtil.cloneRow(this);
502 }
503
504 @Override
505 public String toString() {
506 return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
507 + getValueLength() + "/seqid=" + seqId;
508 }
509
510 @Override
511 public void setSequenceId(long seqId) {
512 this.seqId = seqId;
513 }
514
515 @Override
516 public long heapSize() {
517 return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
518 }
519 }
520
521 protected abstract static class
522 BufferedEncodedSeeker<STATE extends SeekerState>
523 implements EncodedSeeker {
524 protected HFileBlockDecodingContext decodingCtx;
525 protected final KVComparator comparator;
526 protected final SamePrefixComparator<byte[]> samePrefixComparator;
527 protected ByteBuffer currentBuffer;
528 protected STATE current = createSeekerState();
529 protected STATE previous = createSeekerState();
530 protected TagCompressionContext tagCompressionContext = null;
531
532 public BufferedEncodedSeeker(KVComparator comparator,
533 HFileBlockDecodingContext decodingCtx) {
534 this.comparator = comparator;
535 this.samePrefixComparator = comparator;
536 this.decodingCtx = decodingCtx;
537 if (decodingCtx.getHFileContext().isCompressTags()) {
538 try {
539 tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
540 } catch (Exception e) {
541 throw new RuntimeException("Failed to initialize TagCompressionContext", e);
542 }
543 }
544 }
545
546 protected boolean includesMvcc() {
547 return this.decodingCtx.getHFileContext().isIncludesMvcc();
548 }
549
550 protected boolean includesTags() {
551 return this.decodingCtx.getHFileContext().isIncludesTags();
552 }
553
554 @Override
555 public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
556 return comparator.compareFlatKey(key, offset, length,
557 current.keyBuffer, 0, current.keyLength);
558 }
559
560 @Override
561 public int compareKey(KVComparator comparator, Cell key) {
562 return comparator.compareOnlyKeyPortion(key,
563 new KeyValue.KeyOnlyKeyValue(current.keyBuffer, 0, current.keyLength));
564 }
565
566 @Override
567 public void setCurrentBuffer(ByteBuffer buffer) {
568 if (this.tagCompressionContext != null) {
569 this.tagCompressionContext.clear();
570 }
571 currentBuffer = buffer;
572 current.currentBuffer = currentBuffer;
573 if(tagCompressionContext != null) {
574 current.tagCompressionContext = tagCompressionContext;
575 }
576 decodeFirst();
577 current.setKey(current.keyBuffer, current.memstoreTS);
578 previous.invalidate();
579 }
580
581 @Override
582 public ByteBuffer getKeyDeepCopy() {
583 ByteBuffer keyBuffer = ByteBuffer.allocate(current.keyLength);
584 keyBuffer.put(current.keyBuffer, 0, current.keyLength);
585 keyBuffer.rewind();
586 return keyBuffer;
587 }
588
589 @Override
590 public ByteBuffer getValueShallowCopy() {
591 ByteBuffer dup = currentBuffer.duplicate();
592 dup.position(current.valueOffset);
593 dup.limit(current.valueOffset + current.valueLength);
594 return dup.slice();
595 }
596
597 @Override
598 public Cell getKeyValue() {
599 return current.shallowCopy();
600 }
601
602 @Override
603 public void rewind() {
604 currentBuffer.rewind();
605 if (tagCompressionContext != null) {
606 tagCompressionContext.clear();
607 }
608 decodeFirst();
609 current.setKey(current.keyBuffer, current.memstoreTS);
610 previous.invalidate();
611 }
612
613 @Override
614 public boolean next() {
615 if (!currentBuffer.hasRemaining()) {
616 return false;
617 }
618 decodeNext();
619 current.setKey(current.keyBuffer, current.memstoreTS);
620 previous.invalidate();
621 return true;
622 }
623
624 protected void decodeTags() {
625 current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
626 if (tagCompressionContext != null) {
627 if (current.uncompressTags) {
628
629 current.ensureSpaceForTags();
630 try {
631 current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
632 current.tagsBuffer, 0, current.tagsLength);
633 } catch (IOException e) {
634 throw new RuntimeException("Exception while uncompressing tags", e);
635 }
636 } else {
637 ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
638 current.uncompressTags = true;
639 }
640 current.tagsOffset = -1;
641 } else {
642
643
644 current.tagsOffset = currentBuffer.position();
645 ByteBufferUtils.skip(currentBuffer, current.tagsLength);
646 }
647 }
648
649 @Override
650 public int seekToKeyInBlock(byte[] key, int offset, int length, boolean seekBefore) {
651 return seekToKeyInBlock(new KeyValue.KeyOnlyKeyValue(key, offset, length), seekBefore);
652 }
653
654 @Override
655 public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
656 int rowCommonPrefix = 0;
657 int familyCommonPrefix = 0;
658 int qualCommonPrefix = 0;
659 previous.invalidate();
660 KeyValue.KeyOnlyKeyValue currentCell = new KeyValue.KeyOnlyKeyValue();
661 do {
662 int comp;
663 if (samePrefixComparator != null) {
664 currentCell.setKey(current.keyBuffer, 0, current.keyLength);
665 if (current.lastCommonPrefix != 0) {
666
667
668
669
670
671 rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
672 }
673 if (current.lastCommonPrefix <= 2) {
674 rowCommonPrefix = 0;
675 }
676 rowCommonPrefix += CellComparator.findCommonPrefixInRowPart(seekCell, currentCell,
677 rowCommonPrefix);
678 comp = CellComparator.compareCommonRowPrefix(seekCell, currentCell, rowCommonPrefix);
679 if (comp == 0) {
680 comp = compareTypeBytes(seekCell, currentCell);
681 if (comp == 0) {
682
683 familyCommonPrefix = Math.max(
684 0,
685 Math.min(familyCommonPrefix,
686 current.lastCommonPrefix - (3 + currentCell.getRowLength())));
687 familyCommonPrefix += CellComparator.findCommonPrefixInFamilyPart(seekCell,
688 currentCell, familyCommonPrefix);
689 comp = CellComparator.compareCommonFamilyPrefix(seekCell, currentCell,
690 familyCommonPrefix);
691 if (comp == 0) {
692
693
694 qualCommonPrefix = Math.max(
695 0,
696 Math.min(
697 qualCommonPrefix,
698 current.lastCommonPrefix
699 - (3 + currentCell.getRowLength() + currentCell.getFamilyLength())));
700 qualCommonPrefix += CellComparator.findCommonPrefixInQualifierPart(seekCell,
701 currentCell, qualCommonPrefix);
702 comp = CellComparator.compareCommonQualifierPrefix(seekCell, currentCell,
703 qualCommonPrefix);
704 if (comp == 0) {
705 comp = CellComparator.compareTimestamps(seekCell, currentCell);
706 if (comp == 0) {
707
708
709
710
711
712
713
714
715 comp = (0xff & currentCell.getTypeByte()) - (0xff & seekCell.getTypeByte());
716 }
717 }
718 }
719 }
720 }
721 } else {
722 Cell r = new KeyValue.KeyOnlyKeyValue(current.keyBuffer, 0, current.keyLength);
723 comp = comparator.compareOnlyKeyPortion(seekCell, r);
724 }
725 if (comp == 0) {
726 if (seekBefore) {
727 if (!previous.isValid()) {
728
729
730 throw new IllegalStateException("Cannot seekBefore if "
731 + "positioned at the first key in the block: key="
732 + Bytes.toStringBinary(seekCell.getRowArray()));
733 }
734 moveToPrevious();
735 return 1;
736 }
737 return 0;
738 }
739
740 if (comp < 0) {
741 if (previous.isValid()) {
742 moveToPrevious();
743 } else {
744 return HConstants.INDEX_KEY_MAGIC;
745 }
746 return 1;
747 }
748
749
750 if (currentBuffer.hasRemaining()) {
751 previous.copyFromNext(current);
752 decodeNext();
753 current.setKey(current.keyBuffer, current.memstoreTS);
754 } else {
755 break;
756 }
757 } while (true);
758
759
760 return 1;
761 }
762
763 private int compareTypeBytes(Cell key, Cell right) {
764 if (key.getFamilyLength() + key.getQualifierLength() == 0
765 && key.getTypeByte() == Type.Minimum.getCode()) {
766
767 return 1;
768 }
769 if (right.getFamilyLength() + right.getQualifierLength() == 0
770 && right.getTypeByte() == Type.Minimum.getCode()) {
771 return -1;
772 }
773 return 0;
774 }
775
776
777 private void moveToPrevious() {
778 if (!previous.isValid()) {
779 throw new IllegalStateException(
780 "Can move back only once and not in first key in the block.");
781 }
782
783 STATE tmp = previous;
784 previous = current;
785 current = tmp;
786
787
788 currentBuffer.position(current.nextKvOffset);
789
790
791
792
793
794
795 current.tagsBuffer = previous.tagsBuffer;
796 current.tagsCompressedLength = previous.tagsCompressedLength;
797 current.uncompressTags = false;
798 current.setKey(current.keyBuffer, current.memstoreTS);
799 previous.invalidate();
800 }
801
802 @SuppressWarnings("unchecked")
803 protected STATE createSeekerState() {
804
805
806 return (STATE) new SeekerState();
807 }
808
809 abstract protected void decodeFirst();
810 abstract protected void decodeNext();
811 }
812
813
814
815
816
817
818
819
820 protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
821 HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
822 int size = 0;
823 if (encodingCtx.getHFileContext().isIncludesTags()) {
824 int tagsLength = cell.getTagsLength();
825 ByteBufferUtils.putCompressedInt(out, tagsLength);
826
827 if (tagsLength > 0) {
828 TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
829
830
831 if (tagCompressionContext != null) {
832 tagCompressionContext
833 .compressTags(out, cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
834 } else {
835 out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
836 }
837 }
838 size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
839 }
840 if (encodingCtx.getHFileContext().isIncludesMvcc()) {
841
842 long memstoreTS = cell.getSequenceId();
843 WritableUtils.writeVLong(out, memstoreTS);
844
845
846 size += WritableUtils.getVIntSize(memstoreTS);
847 }
848 return size;
849 }
850
851 protected final void afterDecodingKeyValue(DataInputStream source,
852 ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
853 if (decodingCtx.getHFileContext().isIncludesTags()) {
854 int tagsLength = ByteBufferUtils.readCompressedInt(source);
855
856 dest.put((byte) ((tagsLength >> 8) & 0xff));
857 dest.put((byte) (tagsLength & 0xff));
858 if (tagsLength > 0) {
859 TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
860
861
862 if (tagCompressionContext != null) {
863 tagCompressionContext.uncompressTags(source, dest, tagsLength);
864 } else {
865 ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
866 }
867 }
868 }
869 if (decodingCtx.getHFileContext().isIncludesMvcc()) {
870 long memstoreTS = -1;
871 try {
872
873
874 memstoreTS = WritableUtils.readVLong(source);
875 ByteBufferUtils.writeVLong(dest, memstoreTS);
876 } catch (IOException ex) {
877 throw new RuntimeException("Unable to copy memstore timestamp " +
878 memstoreTS + " after decoding a key/value");
879 }
880 }
881 }
882
/**
 * Creates the encoding context used by this encoder.
 *
 * @param encoding passed through to the {@link HFileBlockDefaultEncodingContext} constructor
 * @param header passed through to the {@link HFileBlockDefaultEncodingContext} constructor
 * @param meta passed through to the {@link HFileBlockDefaultEncodingContext} constructor
 * @return a new {@link HFileBlockDefaultEncodingContext}
 */
@Override
public HFileBlockEncodingContext newDataBlockEncodingContext(DataBlockEncoding encoding,
    byte[] header, HFileContext meta) {
  return new HFileBlockDefaultEncodingContext(encoding, header, meta);
}
888
/**
 * Creates the decoding context used by this encoder.
 *
 * @param meta passed through to the {@link HFileBlockDefaultDecodingContext} constructor
 * @return a new {@link HFileBlockDefaultDecodingContext}
 */
@Override
public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
  return new HFileBlockDefaultDecodingContext(meta);
}
893
/**
 * Encoder-specific decoding of a whole block; called by
 * {@link #decodeKeyValues(DataInputStream, HFileBlockDecodingContext)} with both integer
 * arguments set to zero.
 *
 * @param source the encoded input
 * @param allocateHeaderLength NOTE(review): presumably the number of bytes to reserve at
 *          the start of the returned buffer; confirm against the implementations
 * @param skipLastBytes NOTE(review): presumably the number of trailing bytes of the input
 *          to leave undecoded; confirm against the implementations
 * @param decodingCtx the decoding context, already checked to be the default implementation
 * @return the decoded buffer
 * @throws IOException declared for implementations to report decoding failures
 */
protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
    int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
    throws IOException;
897
898
899
900
901
902
903
904
905 protected static void ensureSpace(ByteBuffer out, int length)
906 throws EncoderBufferTooSmallException {
907 if (out.position() + length > out.limit()) {
908 throw new EncoderBufferTooSmallException(
909 "Buffer position=" + out.position() +
910 ", buffer limit=" + out.limit() +
911 ", length to be written=" + length);
912 }
913 }
914
915 @Override
916 public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
917 throws IOException {
918 if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
919 throw new IOException (this.getClass().getName() + " only accepts "
920 + HFileBlockDefaultEncodingContext.class.getName() + " as the " +
921 "encoding context.");
922 }
923
924 HFileBlockDefaultEncodingContext encodingCtx =
925 (HFileBlockDefaultEncodingContext) blkEncodingCtx;
926 encodingCtx.prepareEncoding(out);
927 if (encodingCtx.getHFileContext().isIncludesTags()
928 && encodingCtx.getHFileContext().isCompressTags()) {
929 if (encodingCtx.getTagCompressionContext() != null) {
930
931
932 encodingCtx.getTagCompressionContext().clear();
933 } else {
934 try {
935 TagCompressionContext tagCompressionContext = new TagCompressionContext(
936 LRUDictionary.class, Byte.MAX_VALUE);
937 encodingCtx.setTagCompressionContext(tagCompressionContext);
938 } catch (Exception e) {
939 throw new IOException("Failed to initialize TagCompressionContext", e);
940 }
941 }
942 }
943 ByteBufferUtils.putInt(out, 0);
944 blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
945 }
946
/**
 * Encoding state installed by {@code startBlockEncoding} for the block being written.
 */
private static class BufferedDataBlockEncodingState extends EncodingState {
  // Running sum of the sizes returned by internalEncode (accumulated in encode);
  // endBlockEncoding writes it into the finished block bytes.
  int unencodedDataSizeWritten = 0;
}
950
951 @Override
952 public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
953 throws IOException {
954 BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
955 .getEncodingState();
956 int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
957 state.unencodedDataSizeWritten += encodedKvSize;
958 return encodedKvSize;
959 }
960
/**
 * Encoder-specific encoding of a single cell; called by
 * {@link #encode(Cell, HFileBlockEncodingContext, DataOutputStream)}.
 *
 * @param cell the cell to encode
 * @param encodingCtx the encoding context, cast to the default implementation by the caller
 * @param out the stream the block is encoded to
 * @return the size to account for this cell; the caller adds it to the block's
 *         unencoded-data-size total
 * @throws IOException declared for implementations to report encoding failures
 */
public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
    DataOutputStream out) throws IOException;
963
964 @Override
965 public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
966 byte[] uncompressedBytesWithHeader) throws IOException {
967 BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
968 .getEncodingState();
969
970 Bytes.putInt(uncompressedBytesWithHeader, HConstants.HFILEBLOCK_HEADER_SIZE
971 + DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten
972 );
973 if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
974 encodingCtx.postEncoding(BlockType.ENCODED_DATA);
975 } else {
976 encodingCtx.postEncoding(BlockType.DATA);
977 }
978 }
979 }