View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.io.OutputStream;
23  import java.nio.ByteBuffer;
24  
25  import org.apache.hadoop.hbase.ByteBufferedCell;
26  import org.apache.hadoop.hbase.Cell;
27  import org.apache.hadoop.hbase.CellComparator;
28  import org.apache.hadoop.hbase.CellUtil;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.KeyValue;
31  import org.apache.hadoop.hbase.ByteBufferedKeyOnlyKeyValue;
32  import org.apache.hadoop.hbase.KeyValue.Type;
33  import org.apache.hadoop.hbase.KeyValueUtil;
34  import org.apache.hadoop.hbase.SettableSequenceId;
35  import org.apache.hadoop.hbase.Streamable;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.hbase.io.HeapSize;
38  import org.apache.hadoop.hbase.io.TagCompressionContext;
39  import org.apache.hadoop.hbase.io.hfile.BlockType;
40  import org.apache.hadoop.hbase.io.hfile.HFileContext;
41  import org.apache.hadoop.hbase.io.util.LRUDictionary;
42  import org.apache.hadoop.hbase.io.util.StreamUtils;
43  import org.apache.hadoop.hbase.nio.ByteBuff;
44  import org.apache.hadoop.hbase.util.ByteBufferUtils;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.util.ClassSize;
47  import org.apache.hadoop.hbase.util.ObjectIntPair;
48  import org.apache.hadoop.io.WritableUtils;
49  
50  /**
51   * Base class for all data block encoders that use a buffer.
52   */
53  @InterfaceAudience.Private
54  abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
55  
56    private static int INITIAL_KEY_BUFFER_SIZE = 512;
57  
58    @Override
59    public ByteBuffer decodeKeyValues(DataInputStream source,
60        HFileBlockDecodingContext blkDecodingCtx) throws IOException {
61      if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
62        throw new IOException(this.getClass().getName() + " only accepts "
63            + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
64      }
65  
66      HFileBlockDefaultDecodingContext decodingCtx =
67          (HFileBlockDefaultDecodingContext) blkDecodingCtx;
68      if (decodingCtx.getHFileContext().isIncludesTags()
69          && decodingCtx.getHFileContext().isCompressTags()) {
70        if (decodingCtx.getTagCompressionContext() != null) {
71          // It will be overhead to create the TagCompressionContext again and again for every block
72          // decoding.
73          decodingCtx.getTagCompressionContext().clear();
74        } else {
75          try {
76            TagCompressionContext tagCompressionContext = new TagCompressionContext(
77                LRUDictionary.class, Byte.MAX_VALUE);
78            decodingCtx.setTagCompressionContext(tagCompressionContext);
79          } catch (Exception e) {
80            throw new IOException("Failed to initialize TagCompressionContext", e);
81          }
82        }
83      }
84      return internalDecodeKeyValues(source, 0, 0, decodingCtx);
85    }
86  
87    /********************* common prefixes *************************/
88    // Having this as static is fine but if META is having DBE then we should
89    // change this.
90    public static int compareCommonRowPrefix(Cell left, Cell right, int rowCommonPrefix) {
91      return Bytes.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix,
92          left.getRowLength() - rowCommonPrefix, right.getRowArray(), right.getRowOffset()
93              + rowCommonPrefix, right.getRowLength() - rowCommonPrefix);
94    }
95  
96    public static int compareCommonFamilyPrefix(Cell left, Cell right, int familyCommonPrefix) {
97      return Bytes.compareTo(left.getFamilyArray(), left.getFamilyOffset() + familyCommonPrefix,
98          left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
99          right.getFamilyOffset() + familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix);
100   }
101 
102   public static int compareCommonQualifierPrefix(Cell left, Cell right, int qualCommonPrefix) {
103     return Bytes.compareTo(left.getQualifierArray(), left.getQualifierOffset() + qualCommonPrefix,
104         left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(),
105         right.getQualifierOffset() + qualCommonPrefix, right.getQualifierLength()
106             - qualCommonPrefix);
107   }
108 
109   protected static class SeekerState {
110     protected ByteBuff currentBuffer;
111     protected TagCompressionContext tagCompressionContext;
112     protected int valueOffset = -1;
113     protected int keyLength;
114     protected int valueLength;
115     protected int lastCommonPrefix;
116     protected int tagsLength = 0;
117     protected int tagsOffset = -1;
118     protected int tagsCompressedLength = 0;
119     protected boolean uncompressTags = true;
120 
121     /** We need to store a copy of the key. */
122     protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
123     protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
124 
125     protected long memstoreTS;
126     protected int nextKvOffset;
127     protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
128     // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
129     // many object creations.
130     private final ObjectIntPair<ByteBuffer> tmpPair;
131     private final boolean includeTags;
132 
133     public SeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) {
134       this.tmpPair = tmpPair;
135       this.includeTags = includeTags;
136     }
137 
138     protected boolean isValid() {
139       return valueOffset != -1;
140     }
141 
142     protected void invalidate() {
143       valueOffset = -1;
144       tagsCompressedLength = 0;
145       currentKey = new KeyValue.KeyOnlyKeyValue();
146       uncompressTags = true;
147       currentBuffer = null;
148     }
149 
150     protected void ensureSpaceForKey() {
151       if (keyLength > keyBuffer.length) {
152         // rare case, but we need to handle arbitrary length of key
153         int newKeyBufferLength = Math.max(keyBuffer.length, 1) * 2;
154         while (keyLength > newKeyBufferLength) {
155           newKeyBufferLength *= 2;
156         }
157         byte[] newKeyBuffer = new byte[newKeyBufferLength];
158         System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
159         keyBuffer = newKeyBuffer;
160       }
161     }
162 
163     protected void ensureSpaceForTags() {
164       if (tagsLength > tagsBuffer.length) {
165         // rare case, but we need to handle arbitrary length of tags
166         int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2;
167         while (tagsLength > newTagsBufferLength) {
168           newTagsBufferLength *= 2;
169         }
170         byte[] newTagsBuffer = new byte[newTagsBufferLength];
171         System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
172         tagsBuffer = newTagsBuffer;
173       }
174     }
175 
176     protected void setKey(byte[] keyBuffer, long memTS) {
177       currentKey.setKey(keyBuffer, 0, keyLength);
178       memstoreTS = memTS;
179     }
180 
181     /**
182      * Copy the state from the next one into this instance (the previous state
183      * placeholder). Used to save the previous state when we are advancing the
184      * seeker to the next key/value.
185      */
186     protected void copyFromNext(SeekerState nextState) {
187       if (keyBuffer.length != nextState.keyBuffer.length) {
188         keyBuffer = nextState.keyBuffer.clone();
189       } else if (!isValid()) {
190         // Note: we can only call isValid before we override our state, so this
191         // comes before all the assignments at the end of this method.
192         System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
193              nextState.keyLength);
194       } else {
195         // don't copy the common prefix between this key and the previous one
196         System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
197             keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
198                 - nextState.lastCommonPrefix);
199       }
200       currentKey = nextState.currentKey;
201 
202       valueOffset = nextState.valueOffset;
203       keyLength = nextState.keyLength;
204       valueLength = nextState.valueLength;
205       lastCommonPrefix = nextState.lastCommonPrefix;
206       nextKvOffset = nextState.nextKvOffset;
207       memstoreTS = nextState.memstoreTS;
208       currentBuffer = nextState.currentBuffer;
209       tagsOffset = nextState.tagsOffset;
210       tagsLength = nextState.tagsLength;
211       if (nextState.tagCompressionContext != null) {
212         tagCompressionContext = nextState.tagCompressionContext;
213       }
214     }
215 
216     public Cell toCell() {
217       // Buffer backing the value and tags part from the HFileBlock's buffer
218       // When tag compression in use, this will be only the value bytes area.
219       ByteBuffer valAndTagsBuffer;
220       int vOffset;
221       int valAndTagsLength = this.valueLength;
222       int tagsLenSerializationSize = 0;
223       if (this.includeTags && this.tagCompressionContext == null) {
224         // Include the tags part also. This will be the tags bytes + 2 bytes of for storing tags
225         // length
226         tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + this.valueLength);
227         valAndTagsLength += tagsLenSerializationSize + this.tagsLength;
228       }
229       this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair);
230       valAndTagsBuffer = this.tmpPair.getFirst();
231       vOffset = this.tmpPair.getSecond();// This is the offset to value part in the BB
232       if (valAndTagsBuffer.hasArray()) {
233         return toOnheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
234       } else {
235         return toOffheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
236       }
237     }
238 
239     private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
240         int tagsLenSerializationSize) {
241       byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY;
242       int tOffset = 0;
243       if (this.includeTags) {
244         if (this.tagCompressionContext == null) {
245           tagsArray = valAndTagsBuffer.array();
246           tOffset = valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength
247               + tagsLenSerializationSize;
248         } else {
249           tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
250           tOffset = 0;
251         }
252       }
253       return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
254           currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
255           currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
256           currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer.array(),
257           valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength, memstoreTS, tagsArray,
258           tOffset, this.tagsLength);
259     }
260 
261     private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
262         int tagsLenSerializationSize) {
263       ByteBuffer tagsBuf =  HConstants.EMPTY_BYTE_BUFFER;
264       int tOffset = 0;
265       if (this.includeTags) {
266         if (this.tagCompressionContext == null) {
267           tagsBuf = valAndTagsBuffer;
268           tOffset = vOffset + this.valueLength + tagsLenSerializationSize;
269         } else {
270           tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength));
271           tOffset = 0;
272         }
273       }
274       return new OffheapDecodedCell(ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)),
275           currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
276           currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
277           currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer, vOffset,
278           this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
279     }
280   }
281 
282   /**
283    * Copies only the key part of the keybuffer by doing a deep copy and passes the
284    * seeker state members for taking a clone.
285    * Note that the value byte[] part is still pointing to the currentBuffer and
286    * represented by the valueOffset and valueLength
287    */
288   // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
289   // there. So this has to be an instance of SettableSequenceId.
290   protected static class OnheapDecodedCell implements Cell, HeapSize, SettableSequenceId,
291       Streamable {
292     private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
293         + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
294         + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY));
295     private byte[] keyOnlyBuffer;
296     private short rowLength;
297     private int familyOffset;
298     private byte familyLength;
299     private int qualifierOffset;
300     private int qualifierLength;
301     private long timestamp;
302     private byte typeByte;
303     private byte[] valueBuffer;
304     private int valueOffset;
305     private int valueLength;
306     private byte[] tagsBuffer;
307     private int tagsOffset;
308     private int tagsLength;
309     private long seqId;
310 
311     protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset,
312         byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
313         byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] tagsBuffer,
314         int tagsOffset, int tagsLength) {
315       this.keyOnlyBuffer = keyBuffer;
316       this.rowLength = rowLength;
317       this.familyOffset = familyOffset;
318       this.familyLength = familyLength;
319       this.qualifierOffset = qualOffset;
320       this.qualifierLength = qualLength;
321       this.timestamp = timeStamp;
322       this.typeByte = typeByte;
323       this.valueBuffer = valueBuffer;
324       this.valueOffset = valueOffset;
325       this.valueLength = valueLen;
326       this.tagsBuffer = tagsBuffer;
327       this.tagsOffset = tagsOffset;
328       this.tagsLength = tagsLength;
329       setSequenceId(seqId);
330     }
331 
332     @Override
333     public byte[] getRowArray() {
334       return keyOnlyBuffer;
335     }
336 
337     @Override
338     public byte[] getFamilyArray() {
339       return keyOnlyBuffer;
340     }
341 
342     @Override
343     public byte[] getQualifierArray() {
344       return keyOnlyBuffer;
345     }
346 
347     @Override
348     public int getRowOffset() {
349       return Bytes.SIZEOF_SHORT;
350     }
351 
352     @Override
353     public short getRowLength() {
354       return rowLength;
355     }
356 
357     @Override
358     public int getFamilyOffset() {
359       return familyOffset;
360     }
361 
362     @Override
363     public byte getFamilyLength() {
364       return familyLength;
365     }
366 
367     @Override
368     public int getQualifierOffset() {
369       return qualifierOffset;
370     }
371 
372     @Override
373     public int getQualifierLength() {
374       return qualifierLength;
375     }
376 
377     @Override
378     public long getTimestamp() {
379       return timestamp;
380     }
381 
382     @Override
383     public byte getTypeByte() {
384       return typeByte;
385     }
386 
387     @Override
388     public long getSequenceId() {
389       return seqId;
390     }
391 
392     @Override
393     public byte[] getValueArray() {
394       return this.valueBuffer;
395     }
396 
397     @Override
398     public int getValueOffset() {
399       return valueOffset;
400     }
401 
402     @Override
403     public int getValueLength() {
404       return valueLength;
405     }
406 
407     @Override
408     public byte[] getTagsArray() {
409       return this.tagsBuffer;
410     }
411 
412     @Override
413     public int getTagsOffset() {
414       return this.tagsOffset;
415     }
416 
417     @Override
418     public int getTagsLength() {
419       return tagsLength;
420     }
421 
422     @Override
423     public String toString() {
424       return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
425           + getValueLength() + "/seqid=" + seqId;
426     }
427 
428     @Override
429     public void setSequenceId(long seqId) {
430       this.seqId = seqId;
431     }
432 
433     @Override
434     public long heapSize() {
435       return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
436     }
437 
438     @Override
439     public int write(OutputStream out) throws IOException {
440       return write(out, true);
441     }
442 
443     @Override
444     public int write(OutputStream out, boolean withTags) throws IOException {
445       int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength,
446           tagsLength, withTags);
447       ByteBufferUtils.putInt(out, lenToWrite);
448       ByteBufferUtils.putInt(out, keyOnlyBuffer.length);
449       ByteBufferUtils.putInt(out, valueLength);
450       // Write key
451       out.write(keyOnlyBuffer);
452       // Write value
453       out.write(this.valueBuffer, this.valueOffset, this.valueLength);
454       if (withTags) {
455         // 2 bytes tags length followed by tags bytes
456         // tags length is serialized with 2 bytes only(short way) even if the type is int.
457         // As this is non -ve numbers, we save the sign bit. See HBASE-11437
458         out.write((byte) (0xff & (this.tagsLength >> 8)));
459         out.write((byte) (0xff & this.tagsLength));
460         out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
461       }
462       return lenToWrite + Bytes.SIZEOF_INT;
463     }
464   }
465 
466   protected static class OffheapDecodedCell extends ByteBufferedCell implements HeapSize,
467       SettableSequenceId, Streamable {
468     private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
469         + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
470         + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.BYTE_BUFFER));
471     private ByteBuffer keyBuffer;
472     private short rowLength;
473     private int familyOffset;
474     private byte familyLength;
475     private int qualifierOffset;
476     private int qualifierLength;
477     private long timestamp;
478     private byte typeByte;
479     private ByteBuffer valueBuffer;
480     private int valueOffset;
481     private int valueLength;
482     private ByteBuffer tagsBuffer;
483     private int tagsOffset;
484     private int tagsLength;
485     private long seqId;
486 
487     protected OffheapDecodedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset,
488         byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
489         ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer,
490         int tagsOffset, int tagsLength) {
491       // The keyBuffer is always onheap
492       assert keyBuffer.hasArray();
493       assert keyBuffer.arrayOffset() == 0;
494       this.keyBuffer = keyBuffer;
495       this.rowLength = rowLength;
496       this.familyOffset = familyOffset;
497       this.familyLength = familyLength;
498       this.qualifierOffset = qualOffset;
499       this.qualifierLength = qualLength;
500       this.timestamp = timeStamp;
501       this.typeByte = typeByte;
502       this.valueBuffer = valueBuffer;
503       this.valueOffset = valueOffset;
504       this.valueLength = valueLen;
505       this.tagsBuffer = tagsBuffer;
506       this.tagsOffset = tagsOffset;
507       this.tagsLength = tagsLength;
508       setSequenceId(seqId);
509     }
510 
511     @Override
512     public byte[] getRowArray() {
513       return this.keyBuffer.array();
514     }
515 
516     @Override
517     public int getRowOffset() {
518       return getRowPosition();
519     }
520 
521     @Override
522     public short getRowLength() {
523       return this.rowLength;
524     }
525 
526     @Override
527     public byte[] getFamilyArray() {
528       return this.keyBuffer.array();
529     }
530 
531     @Override
532     public int getFamilyOffset() {
533       return getFamilyPosition();
534     }
535 
536     @Override
537     public byte getFamilyLength() {
538       return this.familyLength;
539     }
540 
541     @Override
542     public byte[] getQualifierArray() {
543       return this.keyBuffer.array();
544     }
545 
546     @Override
547     public int getQualifierOffset() {
548       return getQualifierPosition();
549     }
550 
551     @Override
552     public int getQualifierLength() {
553       return this.qualifierLength;
554     }
555 
556     @Override
557     public long getTimestamp() {
558       return this.timestamp;
559     }
560 
561     @Override
562     public byte getTypeByte() {
563       return this.typeByte;
564     }
565 
566     @Override
567     public long getSequenceId() {
568       return this.seqId;
569     }
570 
571     @Override
572     public byte[] getValueArray() {
573       return CellUtil.cloneValue(this);
574     }
575 
576     @Override
577     public int getValueOffset() {
578       return 0;
579     }
580 
581     @Override
582     public int getValueLength() {
583       return this.valueLength;
584     }
585 
586     @Override
587     public byte[] getTagsArray() {
588       return CellUtil.cloneTags(this);
589     }
590 
591     @Override
592     public int getTagsOffset() {
593       return 0;
594     }
595 
596     @Override
597     public int getTagsLength() {
598       return this.tagsLength;
599     }
600 
601     @Override
602     public ByteBuffer getRowByteBuffer() {
603       return this.keyBuffer;
604     }
605 
606     @Override
607     public int getRowPosition() {
608       return Bytes.SIZEOF_SHORT;
609     }
610 
611     @Override
612     public ByteBuffer getFamilyByteBuffer() {
613       return this.keyBuffer;
614     }
615 
616     @Override
617     public int getFamilyPosition() {
618       return this.familyOffset;
619     }
620 
621     @Override
622     public ByteBuffer getQualifierByteBuffer() {
623       return this.keyBuffer;
624     }
625 
626     @Override
627     public int getQualifierPosition() {
628       return this.qualifierOffset;
629     }
630 
631     @Override
632     public ByteBuffer getValueByteBuffer() {
633       return this.valueBuffer;
634     }
635 
636     @Override
637     public int getValuePosition() {
638       return this.valueOffset;
639     }
640 
641     @Override
642     public ByteBuffer getTagsByteBuffer() {
643       return this.tagsBuffer;
644     }
645 
646     @Override
647     public int getTagsPosition() {
648       return this.tagsOffset;
649     }
650 
651     @Override
652     public long heapSize() {
653       return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
654     }
655 
656     @Override
657     public void setSequenceId(long seqId) {
658       this.seqId = seqId;
659     }
660 
661     @Override
662     public int write(OutputStream out) throws IOException {
663       return write(out, true);
664     }
665 
666     @Override
667     public int write(OutputStream out, boolean withTags) throws IOException {
668       int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength,
669           tagsLength, withTags);
670       ByteBufferUtils.putInt(out, lenToWrite);
671       ByteBufferUtils.putInt(out, keyBuffer.capacity());
672       ByteBufferUtils.putInt(out, valueLength);
673       // Write key
674       out.write(keyBuffer.array());
675       // Write value
676       ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset, this.valueLength);
677       if (withTags) {
678         // 2 bytes tags length followed by tags bytes
679         // tags length is serialized with 2 bytes only(short way) even if the type is int.
680         // As this is non -ve numbers, we save the sign bit. See HBASE-11437
681         out.write((byte) (0xff & (this.tagsLength >> 8)));
682         out.write((byte) (0xff & this.tagsLength));
683         ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
684       }
685       return lenToWrite + Bytes.SIZEOF_INT;
686     }
687   }
688 
689   protected abstract static class
690       BufferedEncodedSeeker<STATE extends SeekerState>
691       implements EncodedSeeker {
692     protected HFileBlockDecodingContext decodingCtx;
693     protected final CellComparator comparator;
694     protected ByteBuff currentBuffer;
695     protected TagCompressionContext tagCompressionContext = null;
696     protected  KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
697     // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
698     // many object creations.
699     protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<ByteBuffer>();
700     protected STATE current, previous;
701 
702     public BufferedEncodedSeeker(CellComparator comparator,
703         HFileBlockDecodingContext decodingCtx) {
704       this.comparator = comparator;
705       this.decodingCtx = decodingCtx;
706       if (decodingCtx.getHFileContext().isCompressTags()) {
707         try {
708           tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
709         } catch (Exception e) {
710           throw new RuntimeException("Failed to initialize TagCompressionContext", e);
711         }
712       }
713       current = createSeekerState(); // always valid
714       previous = createSeekerState(); // may not be valid
715     }
716 
717     protected boolean includesMvcc() {
718       return this.decodingCtx.getHFileContext().isIncludesMvcc();
719     }
720 
721     protected boolean includesTags() {
722       return this.decodingCtx.getHFileContext().isIncludesTags();
723     }
724 
725     @Override
726     public int compareKey(CellComparator comparator, Cell key) {
727       keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
728       return comparator.compareKeyIgnoresMvcc(key, keyOnlyKV);
729     }
730 
731     @Override
732     public void setCurrentBuffer(ByteBuff buffer) {
733       if (this.tagCompressionContext != null) {
734         this.tagCompressionContext.clear();
735       }
736       currentBuffer = buffer;
737       current.currentBuffer = currentBuffer;
738       if(tagCompressionContext != null) {
739         current.tagCompressionContext = tagCompressionContext;
740       }
741       decodeFirst();
742       current.setKey(current.keyBuffer, current.memstoreTS);
743       previous.invalidate();
744     }
745 
746     @Override
747     public Cell getKey() {
748       byte[] key = new byte[current.keyLength];
749       System.arraycopy(current.keyBuffer, 0, key, 0, current.keyLength);
750       return new KeyValue.KeyOnlyKeyValue(key);
751     }
752 
753     @Override
754     public ByteBuffer getValueShallowCopy() {
755       currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
756       ByteBuffer dup = tmpPair.getFirst().duplicate();
757       dup.position(tmpPair.getSecond());
758       dup.limit(tmpPair.getSecond() + current.valueLength);
759       return dup.slice();
760     }
761 
762     @Override
763     public Cell getCell() {
764       return current.toCell();
765     }
766 
767     @Override
768     public void rewind() {
769       currentBuffer.rewind();
770       if (tagCompressionContext != null) {
771         tagCompressionContext.clear();
772       }
773       decodeFirst();
774       current.setKey(current.keyBuffer, current.memstoreTS);
775       previous.invalidate();
776     }
777 
778     @Override
779     public boolean next() {
780       if (!currentBuffer.hasRemaining()) {
781         return false;
782       }
783       decodeNext();
784       current.setKey(current.keyBuffer, current.memstoreTS);
785       previous.invalidate();
786       return true;
787     }
788 
789     protected void decodeTags() {
790       current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
791       if (tagCompressionContext != null) {
792         if (current.uncompressTags) {
793           // Tag compression is been used. uncompress it into tagsBuffer
794           current.ensureSpaceForTags();
795           try {
796             current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
797                 current.tagsBuffer, 0, current.tagsLength);
798           } catch (IOException e) {
799             throw new RuntimeException("Exception while uncompressing tags", e);
800           }
801         } else {
802           currentBuffer.skip(current.tagsCompressedLength);
803           current.uncompressTags = true;// Reset this.
804         }
805         current.tagsOffset = -1;
806       } else {
807         // When tag compress is not used, let us not do copying of tags bytes into tagsBuffer.
808         // Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer()
809         current.tagsOffset = currentBuffer.position();
810         currentBuffer.skip(current.tagsLength);
811       }
812     }
813 
814     @Override
815     public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
816       int rowCommonPrefix = 0;
817       int familyCommonPrefix = 0;
818       int qualCommonPrefix = 0;
819       previous.invalidate();
820       do {
821         int comp;
822         keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
823         if (current.lastCommonPrefix != 0) {
824           // The KV format has row key length also in the byte array. The
825           // common prefix
826           // includes it. So we need to subtract to find out the common prefix
827           // in the
828           // row part alone
829           rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
830         }
831         if (current.lastCommonPrefix <= 2) {
832           rowCommonPrefix = 0;
833         }
834         rowCommonPrefix += findCommonPrefixInRowPart(seekCell, keyOnlyKV, rowCommonPrefix);
835         comp = compareCommonRowPrefix(seekCell, keyOnlyKV, rowCommonPrefix);
836         if (comp == 0) {
837           comp = compareTypeBytes(seekCell, keyOnlyKV);
838           if (comp == 0) {
839             // Subtract the fixed row key length and the family key fixed length
840             familyCommonPrefix = Math.max(
841                 0,
842                 Math.min(familyCommonPrefix,
843                     current.lastCommonPrefix - (3 + keyOnlyKV.getRowLength())));
844             familyCommonPrefix += findCommonPrefixInFamilyPart(seekCell, keyOnlyKV,
845                 familyCommonPrefix);
846             comp = compareCommonFamilyPrefix(seekCell, keyOnlyKV, familyCommonPrefix);
847             if (comp == 0) {
848               // subtract the rowkey fixed length and the family key fixed
849               // length
850               qualCommonPrefix = Math.max(
851                   0,
852                   Math.min(
853                       qualCommonPrefix,
854                       current.lastCommonPrefix
855                           - (3 + keyOnlyKV.getRowLength() + keyOnlyKV.getFamilyLength())));
856               qualCommonPrefix += findCommonPrefixInQualifierPart(seekCell, keyOnlyKV,
857                   qualCommonPrefix);
858               comp = compareCommonQualifierPrefix(seekCell, keyOnlyKV, qualCommonPrefix);
859               if (comp == 0) {
860                 comp = CellComparator.compareTimestamps(seekCell, keyOnlyKV);
861                 if (comp == 0) {
862                   // Compare types. Let the delete types sort ahead of puts;
863                   // i.e. types
864                   // of higher numbers sort before those of lesser numbers.
865                   // Maximum
866                   // (255)
867                   // appears ahead of everything, and minimum (0) appears
868                   // after
869                   // everything.
870                   comp = (0xff & keyOnlyKV.getTypeByte()) - (0xff & seekCell.getTypeByte());
871                 }
872               }
873             }
874           }
875         }
876         if (comp == 0) { // exact match
877           if (seekBefore) {
878             if (!previous.isValid()) {
879               // The caller (seekBefore) has to ensure that we are not at the
880               // first key in the block.
881               throw new IllegalStateException("Cannot seekBefore if "
882                   + "positioned at the first key in the block: key="
883                   + Bytes.toStringBinary(seekCell.getRowArray()));
884             }
885             moveToPrevious();
886             return 1;
887           }
888           return 0;
889         }
890 
891         if (comp < 0) { // already too large, check previous
892           if (previous.isValid()) {
893             moveToPrevious();
894           } else {
895             return HConstants.INDEX_KEY_MAGIC; // using optimized index key
896           }
897           return 1;
898         }
899 
900         // move to next, if more data is available
901         if (currentBuffer.hasRemaining()) {
902           previous.copyFromNext(current);
903           decodeNext();
904           current.setKey(current.keyBuffer, current.memstoreTS);
905         } else {
906           break;
907         }
908       } while (true);
909 
910       // we hit the end of the block, not an exact match
911       return 1;
912     }
913 
914     private int compareTypeBytes(Cell key, Cell right) {
915       if (key.getFamilyLength() + key.getQualifierLength() == 0
916           && key.getTypeByte() == Type.Minimum.getCode()) {
917         // left is "bigger", i.e. it appears later in the sorted order
918         return 1;
919       }
920       if (right.getFamilyLength() + right.getQualifierLength() == 0
921           && right.getTypeByte() == Type.Minimum.getCode()) {
922         return -1;
923       }
924       return 0;
925     }
926 
927     private static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
928       return Bytes.findCommonPrefix(left.getRowArray(), right.getRowArray(), left.getRowLength()
929           - rowCommonPrefix, right.getRowLength() - rowCommonPrefix, left.getRowOffset()
930           + rowCommonPrefix, right.getRowOffset() + rowCommonPrefix);
931     }
932 
933     private static int findCommonPrefixInFamilyPart(Cell left, Cell right, int familyCommonPrefix) {
934       return Bytes
935           .findCommonPrefix(left.getFamilyArray(), right.getFamilyArray(), left.getFamilyLength()
936               - familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix,
937               left.getFamilyOffset() + familyCommonPrefix, right.getFamilyOffset()
938                   + familyCommonPrefix);
939     }
940 
941     private static int findCommonPrefixInQualifierPart(Cell left, Cell right,
942         int qualifierCommonPrefix) {
943       return Bytes.findCommonPrefix(left.getQualifierArray(), right.getQualifierArray(),
944           left.getQualifierLength() - qualifierCommonPrefix, right.getQualifierLength()
945               - qualifierCommonPrefix, left.getQualifierOffset() + qualifierCommonPrefix,
946           right.getQualifierOffset() + qualifierCommonPrefix);
947     }
948 
949     private void moveToPrevious() {
950       if (!previous.isValid()) {
951         throw new IllegalStateException(
952             "Can move back only once and not in first key in the block.");
953       }
954 
955       STATE tmp = previous;
956       previous = current;
957       current = tmp;
958 
959       // move after last key value
960       currentBuffer.position(current.nextKvOffset);
961       // Already decoded the tag bytes. We cache this tags into current state and also the total
962       // compressed length of the tags bytes. For the next time decodeNext() we don't need to decode
963       // the tags again. This might pollute the Data Dictionary what we use for the compression.
964       // When current.uncompressTags is false, we will just reuse the current.tagsBuffer and skip
965       // 'tagsCompressedLength' bytes of source stream.
966       // See in decodeTags()
967       current.tagsBuffer = previous.tagsBuffer;
968       current.tagsCompressedLength = previous.tagsCompressedLength;
969       current.uncompressTags = false;
970       // The current key has to be reset with the previous Cell
971       current.setKey(current.keyBuffer, current.memstoreTS);
972       previous.invalidate();
973     }
974 
975     @SuppressWarnings("unchecked")
976     protected STATE createSeekerState() {
977       // This will fail for non-default seeker state if the subclass does not
978       // override this method.
979       return (STATE) new SeekerState(this.tmpPair, this.includesTags());
980     }
981 
982     abstract protected void decodeFirst();
983     abstract protected void decodeNext();
984   }
985 
986   /**
987    * @param cell
988    * @param out
989    * @param encodingCtx
990    * @return unencoded size added
991    * @throws IOException
992    */
993   protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
994       HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
995     int size = 0;
996     if (encodingCtx.getHFileContext().isIncludesTags()) {
997       int tagsLength = cell.getTagsLength();
998       ByteBufferUtils.putCompressedInt(out, tagsLength);
999       // There are some tags to be written
1000       if (tagsLength > 0) {
1001         TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
1002         // When tag compression is enabled, tagCompressionContext will have a not null value. Write
1003         // the tags using Dictionary compression in such a case
1004         if (tagCompressionContext != null) {
1005           // Not passing tagsLength considering that parsing of the tagsLength is not costly
1006           CellUtil.compressTags(out, cell, tagCompressionContext);
1007         } else {
1008           CellUtil.writeTags(out, cell, tagsLength);
1009         }
1010       }
1011       size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
1012     }
1013     if (encodingCtx.getHFileContext().isIncludesMvcc()) {
1014       // Copy memstore timestamp from the byte buffer to the output stream.
1015       long memstoreTS = cell.getSequenceId();
1016       WritableUtils.writeVLong(out, memstoreTS);
1017       // TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be
1018       // avoided.
1019       size += WritableUtils.getVIntSize(memstoreTS);
1020     }
1021     return size;
1022   }
1023 
1024   protected final void afterDecodingKeyValue(DataInputStream source,
1025       ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
1026     if (decodingCtx.getHFileContext().isIncludesTags()) {
1027       int tagsLength = ByteBufferUtils.readCompressedInt(source);
1028       // Put as unsigned short
1029       dest.put((byte) ((tagsLength >> 8) & 0xff));
1030       dest.put((byte) (tagsLength & 0xff));
1031       if (tagsLength > 0) {
1032         TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
1033         // When tag compression is been used in this file, tagCompressionContext will have a not
1034         // null value passed.
1035         if (tagCompressionContext != null) {
1036           tagCompressionContext.uncompressTags(source, dest, tagsLength);
1037         } else {
1038           ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
1039         }
1040       }
1041     }
1042     if (decodingCtx.getHFileContext().isIncludesMvcc()) {
1043       long memstoreTS = -1;
1044       try {
1045         // Copy memstore timestamp from the data input stream to the byte
1046         // buffer.
1047         memstoreTS = WritableUtils.readVLong(source);
1048         ByteBufferUtils.writeVLong(dest, memstoreTS);
1049       } catch (IOException ex) {
1050         throw new RuntimeException("Unable to copy memstore timestamp " +
1051             memstoreTS + " after decoding a key/value");
1052       }
1053     }
1054   }
1055 
1056   @Override
1057   public HFileBlockEncodingContext newDataBlockEncodingContext(DataBlockEncoding encoding,
1058       byte[] header, HFileContext meta) {
1059     return new HFileBlockDefaultEncodingContext(encoding, header, meta);
1060   }
1061 
1062   @Override
1063   public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
1064     return new HFileBlockDefaultDecodingContext(meta);
1065   }
1066 
1067   protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
1068       int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
1069       throws IOException;
1070 
1071   /**
1072    * Asserts that there is at least the given amount of unfilled space
1073    * remaining in the given buffer.
1074    * @param out typically, the buffer we are writing to
1075    * @param length the required space in the buffer
1076    * @throws EncoderBufferTooSmallException If there are no enough bytes.
1077    */
1078   protected static void ensureSpace(ByteBuffer out, int length)
1079       throws EncoderBufferTooSmallException {
1080     if (out.position() + length > out.limit()) {
1081       throw new EncoderBufferTooSmallException(
1082           "Buffer position=" + out.position() +
1083           ", buffer limit=" + out.limit() +
1084           ", length to be written=" + length);
1085     }
1086   }
1087 
1088   @Override
1089   public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
1090       throws IOException {
1091     if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
1092       throw new IOException (this.getClass().getName() + " only accepts "
1093           + HFileBlockDefaultEncodingContext.class.getName() + " as the " +
1094           "encoding context.");
1095     }
1096 
1097     HFileBlockDefaultEncodingContext encodingCtx =
1098         (HFileBlockDefaultEncodingContext) blkEncodingCtx;
1099     encodingCtx.prepareEncoding(out);
1100     if (encodingCtx.getHFileContext().isIncludesTags()
1101         && encodingCtx.getHFileContext().isCompressTags()) {
1102       if (encodingCtx.getTagCompressionContext() != null) {
1103         // It will be overhead to create the TagCompressionContext again and again for every block
1104         // encoding.
1105         encodingCtx.getTagCompressionContext().clear();
1106       } else {
1107         try {
1108           TagCompressionContext tagCompressionContext = new TagCompressionContext(
1109               LRUDictionary.class, Byte.MAX_VALUE);
1110           encodingCtx.setTagCompressionContext(tagCompressionContext);
1111         } catch (Exception e) {
1112           throw new IOException("Failed to initialize TagCompressionContext", e);
1113         }
1114       }
1115     }
1116     StreamUtils.writeInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
1117     blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
1118   }
1119 
1120   private static class BufferedDataBlockEncodingState extends EncodingState {
1121     int unencodedDataSizeWritten = 0;
1122   }
1123 
1124   @Override
1125   public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
1126       throws IOException {
1127     BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
1128         .getEncodingState();
1129     int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
1130     state.unencodedDataSizeWritten += encodedKvSize;
1131     return encodedKvSize;
1132   }
1133 
1134   public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
1135       DataOutputStream out) throws IOException;
1136 
1137   @Override
1138   public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
1139       byte[] uncompressedBytesWithHeader) throws IOException {
1140     BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
1141         .getEncodingState();
1142     // Write the unencodedDataSizeWritten (with header size)
1143     Bytes.putInt(uncompressedBytesWithHeader, HConstants.HFILEBLOCK_HEADER_SIZE
1144         + DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten
1145         );
1146     if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
1147       encodingCtx.postEncoding(BlockType.ENCODED_DATA);
1148     } else {
1149       encodingCtx.postEncoding(BlockType.DATA);
1150     }
1151   }
1152 
1153   protected Cell createFirstKeyCell(ByteBuffer key, int keyLength) {
1154     if (key.hasArray()) {
1155       return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(),
1156           keyLength);
1157     } else {
1158       return new ByteBufferedKeyOnlyKeyValue(key, key.position(), keyLength);
1159     }
1160   }
1161 }