View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.io.OutputStream;
23  import java.nio.ByteBuffer;
24  
25  import org.apache.hadoop.hbase.ByteBufferedCell;
26  import org.apache.hadoop.hbase.Cell;
27  import org.apache.hadoop.hbase.CellComparator;
28  import org.apache.hadoop.hbase.CellUtil;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.KeyValue;
31  import org.apache.hadoop.hbase.KeyValue.Type;
32  import org.apache.hadoop.hbase.KeyValueUtil;
33  import org.apache.hadoop.hbase.SettableSequenceId;
34  import org.apache.hadoop.hbase.Streamable;
35  import org.apache.hadoop.hbase.classification.InterfaceAudience;
36  import org.apache.hadoop.hbase.io.HeapSize;
37  import org.apache.hadoop.hbase.io.TagCompressionContext;
38  import org.apache.hadoop.hbase.io.util.LRUDictionary;
39  import org.apache.hadoop.hbase.io.util.StreamUtils;
40  import org.apache.hadoop.hbase.nio.ByteBuff;
41  import org.apache.hadoop.hbase.util.ByteBufferUtils;
42  import org.apache.hadoop.hbase.util.Bytes;
43  import org.apache.hadoop.hbase.util.ClassSize;
44  import org.apache.hadoop.hbase.util.ObjectIntPair;
45  import org.apache.hadoop.io.WritableUtils;
46
47  /**
48   * Base class for all data block encoders that use a buffer.
49   */
50  @InterfaceAudience.Private
51  abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
52    /**
53     * TODO: This datablockencoder is dealing in internals of hfileblocks. Purge reference to HFBs
54     */
55    private static int INITIAL_KEY_BUFFER_SIZE = 512;
56
57    @Override
58    public ByteBuffer decodeKeyValues(DataInputStream source,
59        HFileBlockDecodingContext blkDecodingCtx) throws IOException {
60      if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
61        throw new IOException(this.getClass().getName() + " only accepts "
62            + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
63      }
64
65      HFileBlockDefaultDecodingContext decodingCtx =
66          (HFileBlockDefaultDecodingContext) blkDecodingCtx;
67      if (decodingCtx.getHFileContext().isIncludesTags()
68          && decodingCtx.getHFileContext().isCompressTags()) {
69        if (decodingCtx.getTagCompressionContext() != null) {
70          // It will be overhead to create the TagCompressionContext again and again for every block
71          // decoding.
72          decodingCtx.getTagCompressionContext().clear();
73        } else {
74          try {
75            TagCompressionContext tagCompressionContext = new TagCompressionContext(
76                LRUDictionary.class, Byte.MAX_VALUE);
77            decodingCtx.setTagCompressionContext(tagCompressionContext);
78          } catch (Exception e) {
79            throw new IOException("Failed to initialize TagCompressionContext", e);
80          }
81        }
82      }
83      return internalDecodeKeyValues(source, 0, 0, decodingCtx);
84    }
85
86    /********************* common prefixes *************************/
87    // Having this as static is fine but if META is having DBE then we should
88    // change this.
89    public static int compareCommonRowPrefix(Cell left, Cell right, int rowCommonPrefix) {
90      return Bytes.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix,
91          left.getRowLength() - rowCommonPrefix, right.getRowArray(), right.getRowOffset()
92              + rowCommonPrefix, right.getRowLength() - rowCommonPrefix);
93    }
94
95    public static int compareCommonFamilyPrefix(Cell left, Cell right, int familyCommonPrefix) {
96      return Bytes.compareTo(left.getFamilyArray(), left.getFamilyOffset() + familyCommonPrefix,
97          left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
98          right.getFamilyOffset() + familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix);
99    }
100
101   public static int compareCommonQualifierPrefix(Cell left, Cell right, int qualCommonPrefix) {
102     return Bytes.compareTo(left.getQualifierArray(), left.getQualifierOffset() + qualCommonPrefix,
103         left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(),
104         right.getQualifierOffset() + qualCommonPrefix, right.getQualifierLength()
105             - qualCommonPrefix);
106   }
107
108   protected static class SeekerState {
109     protected ByteBuff currentBuffer;
110     protected TagCompressionContext tagCompressionContext;
111     protected int valueOffset = -1;
112     protected int keyLength;
113     protected int valueLength;
114     protected int lastCommonPrefix;
115     protected int tagsLength = 0;
116     protected int tagsOffset = -1;
117     protected int tagsCompressedLength = 0;
118     protected boolean uncompressTags = true;
119
120     /** We need to store a copy of the key. */
121     protected byte[] keyBuffer = HConstants.EMPTY_BYTE_ARRAY;
122     protected byte[] tagsBuffer = HConstants.EMPTY_BYTE_ARRAY;
123
124     protected long memstoreTS;
125     protected int nextKvOffset;
126     protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
127     // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
128     // many object creations.
129     private final ObjectIntPair<ByteBuffer> tmpPair;
130     private final boolean includeTags;
131
132     public SeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) {
133       this.tmpPair = tmpPair;
134       this.includeTags = includeTags;
135     }
136
137     protected boolean isValid() {
138       return valueOffset != -1;
139     }
140
141     protected void invalidate() {
142       valueOffset = -1;
143       tagsCompressedLength = 0;
144       currentKey = new KeyValue.KeyOnlyKeyValue();
145       uncompressTags = true;
146       currentBuffer = null;
147     }
148
149     protected void ensureSpaceForKey() {
150       if (keyLength > keyBuffer.length) {
151         int newKeyBufferLength = Integer.highestOneBit(Math.max(
152             INITIAL_KEY_BUFFER_SIZE, keyLength) - 1) << 1;
153         byte[] newKeyBuffer = new byte[newKeyBufferLength];
154         System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
155         keyBuffer = newKeyBuffer;
156       }
157     }
158
159     protected void ensureSpaceForTags() {
160       if (tagsLength > tagsBuffer.length) {
161         int newTagsBufferLength = Integer.highestOneBit(Math.max(
162             INITIAL_KEY_BUFFER_SIZE, tagsLength) - 1) << 1;
163         byte[] newTagsBuffer = new byte[newTagsBufferLength];
164         System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
165         tagsBuffer = newTagsBuffer;
166       }
167     }
168
169     protected void setKey(byte[] keyBuffer, long memTS) {
170       currentKey.setKey(keyBuffer, 0, keyLength);
171       memstoreTS = memTS;
172     }
173
174     /**
175      * Copy the state from the next one into this instance (the previous state
176      * placeholder). Used to save the previous state when we are advancing the
177      * seeker to the next key/value.
178      */
179     protected void copyFromNext(SeekerState nextState) {
180       if (keyBuffer.length != nextState.keyBuffer.length) {
181         keyBuffer = nextState.keyBuffer.clone();
182       } else if (!isValid()) {
183         // Note: we can only call isValid before we override our state, so this
184         // comes before all the assignments at the end of this method.
185         System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
186              nextState.keyLength);
187       } else {
188         // don't copy the common prefix between this key and the previous one
189         System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
190             keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
191                 - nextState.lastCommonPrefix);
192       }
193       currentKey = nextState.currentKey;
194
195       valueOffset = nextState.valueOffset;
196       keyLength = nextState.keyLength;
197       valueLength = nextState.valueLength;
198       lastCommonPrefix = nextState.lastCommonPrefix;
199       nextKvOffset = nextState.nextKvOffset;
200       memstoreTS = nextState.memstoreTS;
201       currentBuffer = nextState.currentBuffer;
202       tagsOffset = nextState.tagsOffset;
203       tagsLength = nextState.tagsLength;
204       if (nextState.tagCompressionContext != null) {
205         tagCompressionContext = nextState.tagCompressionContext;
206       }
207     }
208
209     public Cell toCell() {
210       // Buffer backing the value and tags part from the HFileBlock's buffer
211       // When tag compression in use, this will be only the value bytes area.
212       ByteBuffer valAndTagsBuffer;
213       int vOffset;
214       int valAndTagsLength = this.valueLength;
215       int tagsLenSerializationSize = 0;
216       if (this.includeTags && this.tagCompressionContext == null) {
217         // Include the tags part also. This will be the tags bytes + 2 bytes of for storing tags
218         // length
219         tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + this.valueLength);
220         valAndTagsLength += tagsLenSerializationSize + this.tagsLength;
221       }
222       this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair);
223       valAndTagsBuffer = this.tmpPair.getFirst();
224       vOffset = this.tmpPair.getSecond();// This is the offset to value part in the BB
225       if (valAndTagsBuffer.hasArray()) {
226         return toOnheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
227       } else {
228         return toOffheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
229       }
230     }
231
232     private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
233         int tagsLenSerializationSize) {
234       byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY;
235       int tOffset = 0;
236       if (this.includeTags) {
237         if (this.tagCompressionContext == null) {
238           tagsArray = valAndTagsBuffer.array();
239           tOffset = valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength
240               + tagsLenSerializationSize;
241         } else {
242           tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
243           tOffset = 0;
244         }
245       }
246       return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
247           currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
248           currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
249           currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer.array(),
250           valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength, memstoreTS, tagsArray,
251           tOffset, this.tagsLength);
252     }
253
254     private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
255         int tagsLenSerializationSize) {
256       ByteBuffer tagsBuf =  HConstants.EMPTY_BYTE_BUFFER;
257       int tOffset = 0;
258       if (this.includeTags) {
259         if (this.tagCompressionContext == null) {
260           tagsBuf = valAndTagsBuffer;
261           tOffset = vOffset + this.valueLength + tagsLenSerializationSize;
262         } else {
263           tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength));
264           tOffset = 0;
265         }
266       }
267       return new OffheapDecodedCell(ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)),
268           currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
269           currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
270           currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer, vOffset,
271           this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
272     }
273   }
274
275   /**
276    * Copies only the key part of the keybuffer by doing a deep copy and passes the
277    * seeker state members for taking a clone.
278    * Note that the value byte[] part is still pointing to the currentBuffer and
279    * represented by the valueOffset and valueLength
280    */
281   // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
282   // there. So this has to be an instance of SettableSequenceId.
283   protected static class OnheapDecodedCell implements Cell, HeapSize, SettableSequenceId,
284       Streamable {
285     private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
286         + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
287         + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY));
288     private byte[] keyOnlyBuffer;
289     private short rowLength;
290     private int familyOffset;
291     private byte familyLength;
292     private int qualifierOffset;
293     private int qualifierLength;
294     private long timestamp;
295     private byte typeByte;
296     private byte[] valueBuffer;
297     private int valueOffset;
298     private int valueLength;
299     private byte[] tagsBuffer;
300     private int tagsOffset;
301     private int tagsLength;
302     private long seqId;
303
304     protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset,
305         byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
306         byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] tagsBuffer,
307         int tagsOffset, int tagsLength) {
308       this.keyOnlyBuffer = keyBuffer;
309       this.rowLength = rowLength;
310       this.familyOffset = familyOffset;
311       this.familyLength = familyLength;
312       this.qualifierOffset = qualOffset;
313       this.qualifierLength = qualLength;
314       this.timestamp = timeStamp;
315       this.typeByte = typeByte;
316       this.valueBuffer = valueBuffer;
317       this.valueOffset = valueOffset;
318       this.valueLength = valueLen;
319       this.tagsBuffer = tagsBuffer;
320       this.tagsOffset = tagsOffset;
321       this.tagsLength = tagsLength;
322       setSequenceId(seqId);
323     }
324
325     @Override
326     public byte[] getRowArray() {
327       return keyOnlyBuffer;
328     }
329
330     @Override
331     public byte[] getFamilyArray() {
332       return keyOnlyBuffer;
333     }
334
335     @Override
336     public byte[] getQualifierArray() {
337       return keyOnlyBuffer;
338     }
339
340     @Override
341     public int getRowOffset() {
342       return Bytes.SIZEOF_SHORT;
343     }
344
345     @Override
346     public short getRowLength() {
347       return rowLength;
348     }
349
350     @Override
351     public int getFamilyOffset() {
352       return familyOffset;
353     }
354
355     @Override
356     public byte getFamilyLength() {
357       return familyLength;
358     }
359
360     @Override
361     public int getQualifierOffset() {
362       return qualifierOffset;
363     }
364
365     @Override
366     public int getQualifierLength() {
367       return qualifierLength;
368     }
369
370     @Override
371     public long getTimestamp() {
372       return timestamp;
373     }
374
375     @Override
376     public byte getTypeByte() {
377       return typeByte;
378     }
379
380     @Override
381     public long getSequenceId() {
382       return seqId;
383     }
384
385     @Override
386     public byte[] getValueArray() {
387       return this.valueBuffer;
388     }
389
390     @Override
391     public int getValueOffset() {
392       return valueOffset;
393     }
394
395     @Override
396     public int getValueLength() {
397       return valueLength;
398     }
399
400     @Override
401     public byte[] getTagsArray() {
402       return this.tagsBuffer;
403     }
404
405     @Override
406     public int getTagsOffset() {
407       return this.tagsOffset;
408     }
409
410     @Override
411     public int getTagsLength() {
412       return tagsLength;
413     }
414
415     @Override
416     public String toString() {
417       return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
418           + getValueLength() + "/seqid=" + seqId;
419     }
420
421     @Override
422     public void setSequenceId(long seqId) {
423       this.seqId = seqId;
424     }
425
426     @Override
427     public long heapSize() {
428       return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
429     }
430
431     @Override
432     public int write(OutputStream out) throws IOException {
433       return write(out, true);
434     }
435
436     @Override
437     public int write(OutputStream out, boolean withTags) throws IOException {
438       int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength,
439           tagsLength, withTags);
440       ByteBufferUtils.putInt(out, lenToWrite);
441       ByteBufferUtils.putInt(out, keyOnlyBuffer.length);
442       ByteBufferUtils.putInt(out, valueLength);
443       // Write key
444       out.write(keyOnlyBuffer);
445       // Write value
446       out.write(this.valueBuffer, this.valueOffset, this.valueLength);
447       if (withTags) {
448         // 2 bytes tags length followed by tags bytes
449         // tags length is serialized with 2 bytes only(short way) even if the type is int.
450         // As this is non -ve numbers, we save the sign bit. See HBASE-11437
451         out.write((byte) (0xff & (this.tagsLength >> 8)));
452         out.write((byte) (0xff & this.tagsLength));
453         out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
454       }
455       return lenToWrite + Bytes.SIZEOF_INT;
456     }
457   }
458
459   protected static class OffheapDecodedCell extends ByteBufferedCell implements HeapSize,
460       SettableSequenceId, Streamable {
461     private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
462         + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
463         + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.BYTE_BUFFER));
464     private ByteBuffer keyBuffer;
465     private short rowLength;
466     private int familyOffset;
467     private byte familyLength;
468     private int qualifierOffset;
469     private int qualifierLength;
470     private long timestamp;
471     private byte typeByte;
472     private ByteBuffer valueBuffer;
473     private int valueOffset;
474     private int valueLength;
475     private ByteBuffer tagsBuffer;
476     private int tagsOffset;
477     private int tagsLength;
478     private long seqId;
479
480     protected OffheapDecodedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset,
481         byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
482         ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer,
483         int tagsOffset, int tagsLength) {
484       // The keyBuffer is always onheap
485       assert keyBuffer.hasArray();
486       assert keyBuffer.arrayOffset() == 0;
487       this.keyBuffer = keyBuffer;
488       this.rowLength = rowLength;
489       this.familyOffset = familyOffset;
490       this.familyLength = familyLength;
491       this.qualifierOffset = qualOffset;
492       this.qualifierLength = qualLength;
493       this.timestamp = timeStamp;
494       this.typeByte = typeByte;
495       this.valueBuffer = valueBuffer;
496       this.valueOffset = valueOffset;
497       this.valueLength = valueLen;
498       this.tagsBuffer = tagsBuffer;
499       this.tagsOffset = tagsOffset;
500       this.tagsLength = tagsLength;
501       setSequenceId(seqId);
502     }
503
504     @Override
505     public byte[] getRowArray() {
506       return this.keyBuffer.array();
507     }
508
509     @Override
510     public int getRowOffset() {
511       return getRowPosition();
512     }
513
514     @Override
515     public short getRowLength() {
516       return this.rowLength;
517     }
518
519     @Override
520     public byte[] getFamilyArray() {
521       return this.keyBuffer.array();
522     }
523
524     @Override
525     public int getFamilyOffset() {
526       return getFamilyPosition();
527     }
528
529     @Override
530     public byte getFamilyLength() {
531       return this.familyLength;
532     }
533
534     @Override
535     public byte[] getQualifierArray() {
536       return this.keyBuffer.array();
537     }
538
539     @Override
540     public int getQualifierOffset() {
541       return getQualifierPosition();
542     }
543
544     @Override
545     public int getQualifierLength() {
546       return this.qualifierLength;
547     }
548
549     @Override
550     public long getTimestamp() {
551       return this.timestamp;
552     }
553
554     @Override
555     public byte getTypeByte() {
556       return this.typeByte;
557     }
558
559     @Override
560     public long getSequenceId() {
561       return this.seqId;
562     }
563
564     @Override
565     public byte[] getValueArray() {
566       return CellUtil.cloneValue(this);
567     }
568
569     @Override
570     public int getValueOffset() {
571       return 0;
572     }
573
574     @Override
575     public int getValueLength() {
576       return this.valueLength;
577     }
578
579     @Override
580     public byte[] getTagsArray() {
581       return CellUtil.cloneTags(this);
582     }
583
584     @Override
585     public int getTagsOffset() {
586       return 0;
587     }
588
589     @Override
590     public int getTagsLength() {
591       return this.tagsLength;
592     }
593
594     @Override
595     public ByteBuffer getRowByteBuffer() {
596       return this.keyBuffer;
597     }
598
599     @Override
600     public int getRowPosition() {
601       return Bytes.SIZEOF_SHORT;
602     }
603
604     @Override
605     public ByteBuffer getFamilyByteBuffer() {
606       return this.keyBuffer;
607     }
608
609     @Override
610     public int getFamilyPosition() {
611       return this.familyOffset;
612     }
613
614     @Override
615     public ByteBuffer getQualifierByteBuffer() {
616       return this.keyBuffer;
617     }
618
619     @Override
620     public int getQualifierPosition() {
621       return this.qualifierOffset;
622     }
623
624     @Override
625     public ByteBuffer getValueByteBuffer() {
626       return this.valueBuffer;
627     }
628
629     @Override
630     public int getValuePosition() {
631       return this.valueOffset;
632     }
633
634     @Override
635     public ByteBuffer getTagsByteBuffer() {
636       return this.tagsBuffer;
637     }
638
639     @Override
640     public int getTagsPosition() {
641       return this.tagsOffset;
642     }
643
644     @Override
645     public long heapSize() {
646       return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
647     }
648
649     @Override
650     public void setSequenceId(long seqId) {
651       this.seqId = seqId;
652     }
653
654     @Override
655     public int write(OutputStream out) throws IOException {
656       return write(out, true);
657     }
658
659     @Override
660     public int write(OutputStream out, boolean withTags) throws IOException {
661       int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength,
662           tagsLength, withTags);
663       ByteBufferUtils.putInt(out, lenToWrite);
664       ByteBufferUtils.putInt(out, keyBuffer.capacity());
665       ByteBufferUtils.putInt(out, valueLength);
666       // Write key
667       out.write(keyBuffer.array());
668       // Write value
669       ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset, this.valueLength);
670       if (withTags) {
671         // 2 bytes tags length followed by tags bytes
672         // tags length is serialized with 2 bytes only(short way) even if the type is int.
673         // As this is non -ve numbers, we save the sign bit. See HBASE-11437
674         out.write((byte) (0xff & (this.tagsLength >> 8)));
675         out.write((byte) (0xff & this.tagsLength));
676         ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
677       }
678       return lenToWrite + Bytes.SIZEOF_INT;
679     }
680   }
681
682   protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
683       extends AbstractEncodedSeeker {
protected ByteBuff currentBuffer;
/** Tag dictionary for this seeker, or {@code null} when the file does not compress tags. */
protected TagCompressionContext tagCompressionContext = null;
/** Reusable key-only view, re-pointed at the current key whenever a comparison is needed. */
protected KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
// A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
// many object creations.
protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<ByteBuffer>();
/** State of the cell the seeker is positioned on (always valid). */
protected STATE current;
/** State of the preceding cell (may not be valid). */
protected STATE previous;

/**
 * Creates a seeker that compares keys with {@code comparator}. A tag dictionary is set up only
 * when the decoding context's file compresses tags.
 *
 * @param comparator key comparator handed to the parent seeker
 * @param decodingCtx block decoding context
 * @throws RuntimeException if the tag compression context cannot be created
 */
public BufferedEncodedSeeker(CellComparator comparator,
    HFileBlockDecodingContext decodingCtx) {
  super(comparator, decodingCtx);
  if (decodingCtx.getHFileContext().isCompressTags()) {
    tagCompressionContext = newTagCompressionContext();
  }
  current = createSeekerState(); // always valid
  previous = createSeekerState(); // may not be valid
}

/** Builds an empty LRU-dictionary tag context, rethrowing any failure unchecked. */
private static TagCompressionContext newTagCompressionContext() {
  try {
    return new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
  } catch (Exception e) {
    throw new RuntimeException("Failed to initialize TagCompressionContext", e);
  }
}
705
706     @Override
707     public int compareKey(CellComparator comparator, Cell key) {
708       keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
709       return comparator.compareKeyIgnoresMvcc(key, keyOnlyKV);
710     }
711
712     @Override
713     public void setCurrentBuffer(ByteBuff buffer) {
714       if (this.tagCompressionContext != null) {
715         this.tagCompressionContext.clear();
716       }
717       currentBuffer = buffer;
718       current.currentBuffer = currentBuffer;
719       if(tagCompressionContext != null) {
720         current.tagCompressionContext = tagCompressionContext;
721       }
722       decodeFirst();
723       current.setKey(current.keyBuffer, current.memstoreTS);
724       previous.invalidate();
725     }
726
727     @Override
728     public Cell getKey() {
729       byte[] key = new byte[current.keyLength];
730       System.arraycopy(current.keyBuffer, 0, key, 0, current.keyLength);
731       return new KeyValue.KeyOnlyKeyValue(key);
732     }
733
734     @Override
735     public ByteBuffer getValueShallowCopy() {
736       currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
737       ByteBuffer dup = tmpPair.getFirst().duplicate();
738       dup.position(tmpPair.getSecond());
739       dup.limit(tmpPair.getSecond() + current.valueLength);
740       return dup.slice();
741     }
742
743     @Override
744     public Cell getCell() {
745       return current.toCell();
746     }
747
748     @Override
749     public void rewind() {
750       currentBuffer.rewind();
751       if (tagCompressionContext != null) {
752         tagCompressionContext.clear();
753       }
754       decodeFirst();
755       current.setKey(current.keyBuffer, current.memstoreTS);
756       previous.invalidate();
757     }
758
759     @Override
760     public boolean next() {
761       if (!currentBuffer.hasRemaining()) {
762         return false;
763       }
764       decodeNext();
765       current.setKey(current.keyBuffer, current.memstoreTS);
766       previous.invalidate();
767       return true;
768     }
769
770     protected void decodeTags() {
771       current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
772       if (tagCompressionContext != null) {
773         if (current.uncompressTags) {
774           // Tag compression is been used. uncompress it into tagsBuffer
775           current.ensureSpaceForTags();
776           try {
777             current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
778                 current.tagsBuffer, 0, current.tagsLength);
779           } catch (IOException e) {
780             throw new RuntimeException("Exception while uncompressing tags", e);
781           }
782         } else {
783           currentBuffer.skip(current.tagsCompressedLength);
784           current.uncompressTags = true;// Reset this.
785         }
786         current.tagsOffset = -1;
787       } else {
788         // When tag compress is not used, let us not do copying of tags bytes into tagsBuffer.
789         // Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer()
790         current.tagsOffset = currentBuffer.position();
791         currentBuffer.skip(current.tagsLength);
792       }
793     }
794
795     @Override
796     public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
797       int rowCommonPrefix = 0;
798       int familyCommonPrefix = 0;
799       int qualCommonPrefix = 0;
800       previous.invalidate();
801       do {
802         int comp;
803         keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
804         if (current.lastCommonPrefix != 0) {
805           // The KV format has row key length also in the byte array. The
806           // common prefix
807           // includes it. So we need to subtract to find out the common prefix
808           // in the
809           // row part alone
810           rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
811         }
812         if (current.lastCommonPrefix <= 2) {
813           rowCommonPrefix = 0;
814         }
815         rowCommonPrefix += findCommonPrefixInRowPart(seekCell, keyOnlyKV, rowCommonPrefix);
816         comp = compareCommonRowPrefix(seekCell, keyOnlyKV, rowCommonPrefix);
817         if (comp == 0) {
818           comp = compareTypeBytes(seekCell, keyOnlyKV);
819           if (comp == 0) {
820             // Subtract the fixed row key length and the family key fixed length
821             familyCommonPrefix = Math.max(
822                 0,
823                 Math.min(familyCommonPrefix,
824                     current.lastCommonPrefix - (3 + keyOnlyKV.getRowLength())));
825             familyCommonPrefix += findCommonPrefixInFamilyPart(seekCell, keyOnlyKV,
826                 familyCommonPrefix);
827             comp = compareCommonFamilyPrefix(seekCell, keyOnlyKV, familyCommonPrefix);
828             if (comp == 0) {
829               // subtract the rowkey fixed length and the family key fixed
830               // length
831               qualCommonPrefix = Math.max(
832                   0,
833                   Math.min(
834                       qualCommonPrefix,
835                       current.lastCommonPrefix
836                           - (3 + keyOnlyKV.getRowLength() + keyOnlyKV.getFamilyLength())));
837               qualCommonPrefix += findCommonPrefixInQualifierPart(seekCell, keyOnlyKV,
838                   qualCommonPrefix);
839               comp = compareCommonQualifierPrefix(seekCell, keyOnlyKV, qualCommonPrefix);
840               if (comp == 0) {
841                 comp = CellComparator.compareTimestamps(seekCell, keyOnlyKV);
842                 if (comp == 0) {
843                   // Compare types. Let the delete types sort ahead of puts;
844                   // i.e. types
845                   // of higher numbers sort before those of lesser numbers.
846                   // Maximum
847                   // (255)
848                   // appears ahead of everything, and minimum (0) appears
849                   // after
850                   // everything.
851                   comp = (0xff & keyOnlyKV.getTypeByte()) - (0xff & seekCell.getTypeByte());
852                 }
853               }
854             }
855           }
856         }
857         if (comp == 0) { // exact match
858           if (seekBefore) {
859             if (!previous.isValid()) {
860               // The caller (seekBefore) has to ensure that we are not at the
861               // first key in the block.
862               throw new IllegalStateException("Cannot seekBefore if "
863                   + "positioned at the first key in the block: key="
864                   + Bytes.toStringBinary(seekCell.getRowArray()));
865             }
866             moveToPrevious();
867             return 1;
868           }
869           return 0;
870         }
871
872         if (comp < 0) { // already too large, check previous
873           if (previous.isValid()) {
874             moveToPrevious();
875           } else {
876             return HConstants.INDEX_KEY_MAGIC; // using optimized index key
877           }
878           return 1;
879         }
880
881         // move to next, if more data is available
882         if (currentBuffer.hasRemaining()) {
883           previous.copyFromNext(current);
884           decodeNext();
885           current.setKey(current.keyBuffer, current.memstoreTS);
886         } else {
887           break;
888         }
889       } while (true);
890
891       // we hit the end of the block, not an exact match
892       return 1;
893     }
894
895     private int compareTypeBytes(Cell key, Cell right) {
896       if (key.getFamilyLength() + key.getQualifierLength() == 0
897           && key.getTypeByte() == Type.Minimum.getCode()) {
898         // left is "bigger", i.e. it appears later in the sorted order
899         return 1;
900       }
901       if (right.getFamilyLength() + right.getQualifierLength() == 0
902           && right.getTypeByte() == Type.Minimum.getCode()) {
903         return -1;
904       }
905       return 0;
906     }
907
908     private static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
909       return Bytes.findCommonPrefix(left.getRowArray(), right.getRowArray(), left.getRowLength()
910           - rowCommonPrefix, right.getRowLength() - rowCommonPrefix, left.getRowOffset()
911           + rowCommonPrefix, right.getRowOffset() + rowCommonPrefix);
912     }
913
914     private static int findCommonPrefixInFamilyPart(Cell left, Cell right, int familyCommonPrefix) {
915       return Bytes
916           .findCommonPrefix(left.getFamilyArray(), right.getFamilyArray(), left.getFamilyLength()
917               - familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix,
918               left.getFamilyOffset() + familyCommonPrefix, right.getFamilyOffset()
919                   + familyCommonPrefix);
920     }
921
922     private static int findCommonPrefixInQualifierPart(Cell left, Cell right,
923         int qualifierCommonPrefix) {
924       return Bytes.findCommonPrefix(left.getQualifierArray(), right.getQualifierArray(),
925           left.getQualifierLength() - qualifierCommonPrefix, right.getQualifierLength()
926               - qualifierCommonPrefix, left.getQualifierOffset() + qualifierCommonPrefix,
927           right.getQualifierOffset() + qualifierCommonPrefix);
928     }
929
    /**
     * Steps this seeker back by exactly one cell by swapping the {@code previous} and
     * {@code current} state objects. {@code previous} is invalidated afterwards, so a second
     * consecutive step back is rejected.
     *
     * @throws IllegalStateException if {@code previous} is not valid, e.g. when positioned at the
     *         first key in the block
     */
    private void moveToPrevious() {
      if (!previous.isValid()) {
        throw new IllegalStateException(
            "Can move back only once and not in first key in the block.");
      }

      // Swap the two state objects (references) rather than copying their contents.
      STATE tmp = previous;
      previous = current;
      current = tmp;

      // move after last key value: reposition the block buffer at the restored state's
      // nextKvOffset (presumably the start of the cell we just stepped back from), so that the
      // next decodeNext() decodes that cell again.
      currentBuffer.position(current.nextKvOffset);
      // Already decoded the tag bytes. We cache this tags into current state and also the total
      // compressed length of the tags bytes. For the next time decodeNext() we don't need to decode
      // the tags again. This might pollute the Data Dictionary what we use for the compression.
      // When current.uncompressTags is false, we will just reuse the current.tagsBuffer and skip
      // 'tagsCompressedLength' bytes of source stream.
      // See in decodeTags()
      // NOTE(review): after this assignment current and previous reference the same tagsBuffer
      // object; confirm nothing reads the restored cell's own tags from current.tagsBuffer before
      // the next decodeNext().
      current.tagsBuffer = previous.tagsBuffer;
      current.tagsCompressedLength = previous.tagsCompressedLength;
      current.uncompressTags = false;
      // The current key has to be reset with the previous Cell
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }
955
956     @SuppressWarnings("unchecked")
957     protected STATE createSeekerState() {
958       // This will fail for non-default seeker state if the subclass does not
959       // override this method.
960       return (STATE) new SeekerState(this.tmpPair, this.includesTags());
961     }
962
963     abstract protected void decodeFirst();
964     abstract protected void decodeNext();
965   }
966
967   /**
968    * @param cell
969    * @param out
970    * @param encodingCtx
971    * @return unencoded size added
972    * @throws IOException
973    */
974   protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
975       HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
976     int size = 0;
977     if (encodingCtx.getHFileContext().isIncludesTags()) {
978       int tagsLength = cell.getTagsLength();
979       ByteBufferUtils.putCompressedInt(out, tagsLength);
980       // There are some tags to be written
981       if (tagsLength > 0) {
982         TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
983         // When tag compression is enabled, tagCompressionContext will have a not null value. Write
984         // the tags using Dictionary compression in such a case
985         if (tagCompressionContext != null) {
986           // Not passing tagsLength considering that parsing of the tagsLength is not costly
987           CellUtil.compressTags(out, cell, tagCompressionContext);
988         } else {
989           CellUtil.writeTags(out, cell, tagsLength);
990         }
991       }
992       size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
993     }
994     if (encodingCtx.getHFileContext().isIncludesMvcc()) {
995       // Copy memstore timestamp from the byte buffer to the output stream.
996       long memstoreTS = cell.getSequenceId();
997       WritableUtils.writeVLong(out, memstoreTS);
998       // TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be
999       // avoided.
1000       size += WritableUtils.getVIntSize(memstoreTS);
1001     }
1002     return size;
1003   }
1004
1005   protected final void afterDecodingKeyValue(DataInputStream source,
1006       ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
1007     if (decodingCtx.getHFileContext().isIncludesTags()) {
1008       int tagsLength = ByteBufferUtils.readCompressedInt(source);
1009       // Put as unsigned short
1010       dest.put((byte) ((tagsLength >> 8) & 0xff));
1011       dest.put((byte) (tagsLength & 0xff));
1012       if (tagsLength > 0) {
1013         TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
1014         // When tag compression is been used in this file, tagCompressionContext will have a not
1015         // null value passed.
1016         if (tagCompressionContext != null) {
1017           tagCompressionContext.uncompressTags(source, dest, tagsLength);
1018         } else {
1019           ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
1020         }
1021       }
1022     }
1023     if (decodingCtx.getHFileContext().isIncludesMvcc()) {
1024       long memstoreTS = -1;
1025       try {
1026         // Copy memstore timestamp from the data input stream to the byte
1027         // buffer.
1028         memstoreTS = WritableUtils.readVLong(source);
1029         ByteBufferUtils.writeVLong(dest, memstoreTS);
1030       } catch (IOException ex) {
1031         throw new RuntimeException("Unable to copy memstore timestamp " +
1032             memstoreTS + " after decoding a key/value");
1033       }
1034     }
1035   }
1036
1037   protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
1038       int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
1039       throws IOException;
1040
1041   /**
1042    * Asserts that there is at least the given amount of unfilled space
1043    * remaining in the given buffer.
1044    * @param out typically, the buffer we are writing to
1045    * @param length the required space in the buffer
1046    * @throws EncoderBufferTooSmallException If there are no enough bytes.
1047    */
1048   protected static void ensureSpace(ByteBuffer out, int length)
1049       throws EncoderBufferTooSmallException {
1050     if (out.position() + length > out.limit()) {
1051       throw new EncoderBufferTooSmallException(
1052           "Buffer position=" + out.position() +
1053           ", buffer limit=" + out.limit() +
1054           ", length to be written=" + length);
1055     }
1056   }
1057
1058   @Override
1059   public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
1060       throws IOException {
1061     if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
1062       throw new IOException (this.getClass().getName() + " only accepts "
1063           + HFileBlockDefaultEncodingContext.class.getName() + " as the " +
1064           "encoding context.");
1065     }
1066
1067     HFileBlockDefaultEncodingContext encodingCtx =
1068         (HFileBlockDefaultEncodingContext) blkEncodingCtx;
1069     encodingCtx.prepareEncoding(out);
1070     if (encodingCtx.getHFileContext().isIncludesTags()
1071         && encodingCtx.getHFileContext().isCompressTags()) {
1072       if (encodingCtx.getTagCompressionContext() != null) {
1073         // It will be overhead to create the TagCompressionContext again and again for every block
1074         // encoding.
1075         encodingCtx.getTagCompressionContext().clear();
1076       } else {
1077         try {
1078           TagCompressionContext tagCompressionContext = new TagCompressionContext(
1079               LRUDictionary.class, Byte.MAX_VALUE);
1080           encodingCtx.setTagCompressionContext(tagCompressionContext);
1081         } catch (Exception e) {
1082           throw new IOException("Failed to initialize TagCompressionContext", e);
1083         }
1084       }
1085     }
1086     StreamUtils.writeInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
1087     blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
1088   }
1089
1090   private static class BufferedDataBlockEncodingState extends EncodingState {
1091     int unencodedDataSizeWritten = 0;
1092   }
1093
1094   @Override
1095   public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
1096       throws IOException {
1097     BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
1098         .getEncodingState();
1099     int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
1100     state.unencodedDataSizeWritten += encodedKvSize;
1101     return encodedKvSize;
1102   }
1103
1104   public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
1105       DataOutputStream out) throws IOException;
1106
1107   @Override
1108   public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
1109       byte[] uncompressedBytesWithHeader) throws IOException {
1110     BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
1111         .getEncodingState();
1112     // Write the unencodedDataSizeWritten (with header size)
1113     Bytes.putInt(uncompressedBytesWithHeader,
1114       HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten
1115         );
1116     postEncoding(encodingCtx);
1117   }
1118
1119 }