
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.io.OutputStream;
23  import java.nio.ByteBuffer;
24  
25  import org.apache.hadoop.hbase.ByteBufferedCell;
26  import org.apache.hadoop.hbase.ByteBufferedKeyOnlyKeyValue;
27  import org.apache.hadoop.hbase.Cell;
28  import org.apache.hadoop.hbase.CellComparator;
29  import org.apache.hadoop.hbase.CellUtil;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.KeyValue;
32  import org.apache.hadoop.hbase.KeyValue.Type;
33  import org.apache.hadoop.hbase.KeyValueUtil;
34  import org.apache.hadoop.hbase.SettableSequenceId;
35  import org.apache.hadoop.hbase.Streamable;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.hbase.io.HeapSize;
38  import org.apache.hadoop.hbase.io.TagCompressionContext;
39  import org.apache.hadoop.hbase.io.hfile.BlockType;
40  import org.apache.hadoop.hbase.io.hfile.HFileContext;
41  import org.apache.hadoop.hbase.io.util.LRUDictionary;
42  import org.apache.hadoop.hbase.io.util.StreamUtils;
43  import org.apache.hadoop.hbase.nio.ByteBuff;
44  import org.apache.hadoop.hbase.util.ByteBufferUtils;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.util.ClassSize;
47  import org.apache.hadoop.hbase.util.ObjectIntPair;
48  import org.apache.hadoop.io.WritableUtils;
49  
50  /**
51   * Base class for all data block encoders that use a buffer.
52   */
53  @InterfaceAudience.Private
54  abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
55    /**
56     * TODO: This data block encoder deals in internals of HFileBlocks. Purge references to HFBs.
57     */
58    private static int INITIAL_KEY_BUFFER_SIZE = 512;
59  
60    @Override
61    public ByteBuffer decodeKeyValues(DataInputStream source,
62        HFileBlockDecodingContext blkDecodingCtx) throws IOException {
63      if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
64        throw new IOException(this.getClass().getName() + " only accepts "
65            + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
66      }
67  
68      HFileBlockDefaultDecodingContext decodingCtx =
69          (HFileBlockDefaultDecodingContext) blkDecodingCtx;
70      if (decodingCtx.getHFileContext().isIncludesTags()
71          && decodingCtx.getHFileContext().isCompressTags()) {
72        if (decodingCtx.getTagCompressionContext() != null) {
73          // Avoid the overhead of creating a new TagCompressionContext for every block
74          // decoding; reuse and clear the existing one.
75          decodingCtx.getTagCompressionContext().clear();
76        } else {
77          try {
78            TagCompressionContext tagCompressionContext = new TagCompressionContext(
79                LRUDictionary.class, Byte.MAX_VALUE);
80            decodingCtx.setTagCompressionContext(tagCompressionContext);
81          } catch (Exception e) {
82            throw new IOException("Failed to initialize TagCompressionContext", e);
83          }
84        }
85      }
86      return internalDecodeKeyValues(source, 0, 0, decodingCtx);
87    }
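  // Illustrative sketch (not part of the original source): decoding a full block with this
  // encoder. Assumes 'encoder' is a concrete subclass instance, 'meta' is the file's HFileContext
  // and 'in' is a DataInputStream positioned at the encoded block payload.
  //
  //   HFileBlockDecodingContext ctx = encoder.newDataBlockDecodingContext(meta);
  //   ByteBuffer decodedKeyValues = encoder.decodeKeyValues(in, ctx);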
88  
89    /********************* common prefixes *************************/
90    // Having this as static is fine, but if the META table ever uses a DBE then we should
91    // change this.
92    public static int compareCommonRowPrefix(Cell left, Cell right, int rowCommonPrefix) {
93      return Bytes.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix,
94          left.getRowLength() - rowCommonPrefix, right.getRowArray(), right.getRowOffset()
95              + rowCommonPrefix, right.getRowLength() - rowCommonPrefix);
96    }
97  
98    public static int compareCommonFamilyPrefix(Cell left, Cell right, int familyCommonPrefix) {
99      return Bytes.compareTo(left.getFamilyArray(), left.getFamilyOffset() + familyCommonPrefix,
100         left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
101         right.getFamilyOffset() + familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix);
102   }
103 
104   public static int compareCommonQualifierPrefix(Cell left, Cell right, int qualCommonPrefix) {
105     return Bytes.compareTo(left.getQualifierArray(), left.getQualifierOffset() + qualCommonPrefix,
106         left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(),
107         right.getQualifierOffset() + qualCommonPrefix, right.getQualifierLength()
108             - qualCommonPrefix);
109   }
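  // Worked example (added for clarity, not in the original source): with rowCommonPrefix = 3,
  // compareCommonRowPrefix() on rows "abcde" and "abcxy" compares only the remaining suffixes
  // "de" and "xy", since the first 3 bytes are already known to be equal. The family and
  // qualifier variants above work the same way on their respective parts of the key.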
110 
111   protected static class SeekerState {
112     protected ByteBuff currentBuffer;
113     protected TagCompressionContext tagCompressionContext;
114     protected int valueOffset = -1;
115     protected int keyLength;
116     protected int valueLength;
117     protected int lastCommonPrefix;
118     protected int tagsLength = 0;
119     protected int tagsOffset = -1;
120     protected int tagsCompressedLength = 0;
121     protected boolean uncompressTags = true;
122 
123     /** We need to store a copy of the key. */
124     protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
125     protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
126 
127     protected long memstoreTS;
128     protected int nextKvOffset;
129     protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
130     // A temp pair object reused by ByteBuff#asSubByteBuffer calls. This avoids excessive
131     // object creation.
132     private final ObjectIntPair<ByteBuffer> tmpPair;
133     private final boolean includeTags;
134 
135     public SeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) {
136       this.tmpPair = tmpPair;
137       this.includeTags = includeTags;
138     }
139 
140     protected boolean isValid() {
141       return valueOffset != -1;
142     }
143 
144     protected void invalidate() {
145       valueOffset = -1;
146       tagsCompressedLength = 0;
147       currentKey = new KeyValue.KeyOnlyKeyValue();
148       uncompressTags = true;
149       currentBuffer = null;
150     }
151 
152     protected void ensureSpaceForKey() {
153       if (keyLength > keyBuffer.length) {
154         // rare case, but we need to handle arbitrary length of key
155         int newKeyBufferLength = Math.max(keyBuffer.length, 1) * 2;
156         while (keyLength > newKeyBufferLength) {
157           newKeyBufferLength *= 2;
158         }
159         byte[] newKeyBuffer = new byte[newKeyBufferLength];
160         System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
161         keyBuffer = newKeyBuffer;
162       }
163     }
164 
165     protected void ensureSpaceForTags() {
166       if (tagsLength > tagsBuffer.length) {
167         // rare case, but we need to handle arbitrary length of tags
168         int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2;
169         while (tagsLength > newTagsBufferLength) {
170           newTagsBufferLength *= 2;
171         }
172         byte[] newTagsBuffer = new byte[newTagsBufferLength];
173         System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
174         tagsBuffer = newTagsBuffer;
175       }
176     }
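    // Growth example (added for clarity, not in the original source): starting from the 512-byte
    // initial buffer, a keyLength of 1300 makes ensureSpaceForKey() double 512 -> 1024 -> 2048,
    // allocate a 2048-byte array and copy the old contents over; ensureSpaceForTags() grows
    // tagsBuffer in exactly the same way.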
177 
178     protected void setKey(byte[] keyBuffer, long memTS) {
179       currentKey.setKey(keyBuffer, 0, keyLength);
180       memstoreTS = memTS;
181     }
182 
183     /**
184      * Copy the state from the next one into this instance (the previous state
185      * placeholder). Used to save the previous state when we are advancing the
186      * seeker to the next key/value.
187      */
188     protected void copyFromNext(SeekerState nextState) {
189       if (keyBuffer.length != nextState.keyBuffer.length) {
190         keyBuffer = nextState.keyBuffer.clone();
191       } else if (!isValid()) {
192         // Note: we can only call isValid before we override our state, so this
193         // comes before all the assignments at the end of this method.
194         System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
195              nextState.keyLength);
196       } else {
197         // don't copy the common prefix between this key and the previous one
198         System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
199             keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
200                 - nextState.lastCommonPrefix);
201       }
202       currentKey = nextState.currentKey;
203 
204       valueOffset = nextState.valueOffset;
205       keyLength = nextState.keyLength;
206       valueLength = nextState.valueLength;
207       lastCommonPrefix = nextState.lastCommonPrefix;
208       nextKvOffset = nextState.nextKvOffset;
209       memstoreTS = nextState.memstoreTS;
210       currentBuffer = nextState.currentBuffer;
211       tagsOffset = nextState.tagsOffset;
212       tagsLength = nextState.tagsLength;
213       if (nextState.tagCompressionContext != null) {
214         tagCompressionContext = nextState.tagCompressionContext;
215       }
216     }
217 
218     public Cell toCell() {
219       // Buffer backing the value and tags part from the HFileBlock's buffer.
220       // When tag compression is in use, this will be only the value bytes area.
221       ByteBuffer valAndTagsBuffer;
222       int vOffset;
223       int valAndTagsLength = this.valueLength;
224       int tagsLenSerializationSize = 0;
225       if (this.includeTags && this.tagCompressionContext == null) {
226         // Include the tags part also. This will be the tags bytes plus 2 bytes for storing the
227         // tags length.
228         tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + this.valueLength);
229         valAndTagsLength += tagsLenSerializationSize + this.tagsLength;
230       }
231       this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair);
232       valAndTagsBuffer = this.tmpPair.getFirst();
233       vOffset = this.tmpPair.getSecond(); // This is the offset of the value part in the BB
234       if (valAndTagsBuffer.hasArray()) {
235         return toOnheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
236       } else {
237         return toOffheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
238       }
239     }
240 
241     private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
242         int tagsLenSerializationSize) {
243       byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY;
244       int tOffset = 0;
245       if (this.includeTags) {
246         if (this.tagCompressionContext == null) {
247           tagsArray = valAndTagsBuffer.array();
248           tOffset = valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength
249               + tagsLenSerializationSize;
250         } else {
251           tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
252           tOffset = 0;
253         }
254       }
255       return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
256           currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
257           currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
258           currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer.array(),
259           valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength, memstoreTS, tagsArray,
260           tOffset, this.tagsLength);
261     }
262 
263     private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
264         int tagsLenSerializationSize) {
265       ByteBuffer tagsBuf =  HConstants.EMPTY_BYTE_BUFFER;
266       int tOffset = 0;
267       if (this.includeTags) {
268         if (this.tagCompressionContext == null) {
269           tagsBuf = valAndTagsBuffer;
270           tOffset = vOffset + this.valueLength + tagsLenSerializationSize;
271         } else {
272           tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength));
273           tOffset = 0;
274         }
275       }
276       return new OffheapDecodedCell(ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)),
277           currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
278           currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
279           currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer, vOffset,
280           this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
281     }
282   }
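  // Note (added for clarity, not in the original source): SeekerState#toCell() materializes the
  // current position as a Cell. The key is always deep-copied into an onheap byte[], while the
  // value (and, when tag compression is off, the tags) remain referenced inside the block buffer;
  // the result is an OnheapDecodedCell when that buffer is array-backed, otherwise an
  // OffheapDecodedCell.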
283 
284   /**
285    * Copies only the key part of the key buffer as a deep copy and passes along the
286    * seeker state members for taking a clone.
287    * Note that the value byte[] part still points into the currentBuffer and is
288    * represented by valueOffset and valueLength.
289    */
290   // We return this as a Cell to the upper layers of the read flow, which might try to set a new
291   // SeqId on it. So this has to be an instance of SettableSequenceId.
292   protected static class OnheapDecodedCell implements Cell, HeapSize, SettableSequenceId,
293       Streamable {
294     private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
295         + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
296         + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY));
297     private byte[] keyOnlyBuffer;
298     private short rowLength;
299     private int familyOffset;
300     private byte familyLength;
301     private int qualifierOffset;
302     private int qualifierLength;
303     private long timestamp;
304     private byte typeByte;
305     private byte[] valueBuffer;
306     private int valueOffset;
307     private int valueLength;
308     private byte[] tagsBuffer;
309     private int tagsOffset;
310     private int tagsLength;
311     private long seqId;
312 
313     protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset,
314         byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
315         byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] tagsBuffer,
316         int tagsOffset, int tagsLength) {
317       this.keyOnlyBuffer = keyBuffer;
318       this.rowLength = rowLength;
319       this.familyOffset = familyOffset;
320       this.familyLength = familyLength;
321       this.qualifierOffset = qualOffset;
322       this.qualifierLength = qualLength;
323       this.timestamp = timeStamp;
324       this.typeByte = typeByte;
325       this.valueBuffer = valueBuffer;
326       this.valueOffset = valueOffset;
327       this.valueLength = valueLen;
328       this.tagsBuffer = tagsBuffer;
329       this.tagsOffset = tagsOffset;
330       this.tagsLength = tagsLength;
331       setSequenceId(seqId);
332     }
333 
334     @Override
335     public byte[] getRowArray() {
336       return keyOnlyBuffer;
337     }
338 
339     @Override
340     public byte[] getFamilyArray() {
341       return keyOnlyBuffer;
342     }
343 
344     @Override
345     public byte[] getQualifierArray() {
346       return keyOnlyBuffer;
347     }
348 
349     @Override
350     public int getRowOffset() {
351       return Bytes.SIZEOF_SHORT;
352     }
353 
354     @Override
355     public short getRowLength() {
356       return rowLength;
357     }
358 
359     @Override
360     public int getFamilyOffset() {
361       return familyOffset;
362     }
363 
364     @Override
365     public byte getFamilyLength() {
366       return familyLength;
367     }
368 
369     @Override
370     public int getQualifierOffset() {
371       return qualifierOffset;
372     }
373 
374     @Override
375     public int getQualifierLength() {
376       return qualifierLength;
377     }
378 
379     @Override
380     public long getTimestamp() {
381       return timestamp;
382     }
383 
384     @Override
385     public byte getTypeByte() {
386       return typeByte;
387     }
388 
389     @Override
390     public long getSequenceId() {
391       return seqId;
392     }
393 
394     @Override
395     public byte[] getValueArray() {
396       return this.valueBuffer;
397     }
398 
399     @Override
400     public int getValueOffset() {
401       return valueOffset;
402     }
403 
404     @Override
405     public int getValueLength() {
406       return valueLength;
407     }
408 
409     @Override
410     public byte[] getTagsArray() {
411       return this.tagsBuffer;
412     }
413 
414     @Override
415     public int getTagsOffset() {
416       return this.tagsOffset;
417     }
418 
419     @Override
420     public int getTagsLength() {
421       return tagsLength;
422     }
423 
424     @Override
425     public String toString() {
426       return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
427           + getValueLength() + "/seqid=" + seqId;
428     }
429 
430     @Override
431     public void setSequenceId(long seqId) {
432       this.seqId = seqId;
433     }
434 
435     @Override
436     public long heapSize() {
437       return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
438     }
439 
440     @Override
441     public int write(OutputStream out) throws IOException {
442       return write(out, true);
443     }
444 
445     @Override
446     public int write(OutputStream out, boolean withTags) throws IOException {
447       int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength,
448           tagsLength, withTags);
449       ByteBufferUtils.putInt(out, lenToWrite);
450       ByteBufferUtils.putInt(out, keyOnlyBuffer.length);
451       ByteBufferUtils.putInt(out, valueLength);
452       // Write key
453       out.write(keyOnlyBuffer);
454       // Write value
455       out.write(this.valueBuffer, this.valueOffset, this.valueLength);
456       if (withTags) {
457         // 2 bytes of tags length followed by the tags bytes.
458         // The tags length is serialized in only 2 bytes (as a short) even though the type is int.
459         // As it is a non-negative number, we save the sign bit. See HBASE-11437
460         out.write((byte) (0xff & (this.tagsLength >> 8)));
461         out.write((byte) (0xff & this.tagsLength));
462         out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
463       }
464       return lenToWrite + Bytes.SIZEOF_INT;
465     }
466   }
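  // Serialization sketch (added for clarity, not in the original source): write(out, true) above
  // emits a 4-byte total length, a 4-byte key length, a 4-byte value length, the key bytes,
  // the value bytes, and finally a 2-byte big-endian tags length followed by the tag bytes. For
  // example, tagsLength = 300 is written as the two bytes 0x01, 0x2C.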
467 
468   protected static class OffheapDecodedCell extends ByteBufferedCell implements HeapSize,
469       SettableSequenceId, Streamable {
470     private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
471         + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
472         + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.BYTE_BUFFER));
473     private ByteBuffer keyBuffer;
474     private short rowLength;
475     private int familyOffset;
476     private byte familyLength;
477     private int qualifierOffset;
478     private int qualifierLength;
479     private long timestamp;
480     private byte typeByte;
481     private ByteBuffer valueBuffer;
482     private int valueOffset;
483     private int valueLength;
484     private ByteBuffer tagsBuffer;
485     private int tagsOffset;
486     private int tagsLength;
487     private long seqId;
488 
489     protected OffheapDecodedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset,
490         byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
491         ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer,
492         int tagsOffset, int tagsLength) {
493       // The keyBuffer is always onheap
494       assert keyBuffer.hasArray();
495       assert keyBuffer.arrayOffset() == 0;
496       this.keyBuffer = keyBuffer;
497       this.rowLength = rowLength;
498       this.familyOffset = familyOffset;
499       this.familyLength = familyLength;
500       this.qualifierOffset = qualOffset;
501       this.qualifierLength = qualLength;
502       this.timestamp = timeStamp;
503       this.typeByte = typeByte;
504       this.valueBuffer = valueBuffer;
505       this.valueOffset = valueOffset;
506       this.valueLength = valueLen;
507       this.tagsBuffer = tagsBuffer;
508       this.tagsOffset = tagsOffset;
509       this.tagsLength = tagsLength;
510       setSequenceId(seqId);
511     }
512 
513     @Override
514     public byte[] getRowArray() {
515       return this.keyBuffer.array();
516     }
517 
518     @Override
519     public int getRowOffset() {
520       return getRowPosition();
521     }
522 
523     @Override
524     public short getRowLength() {
525       return this.rowLength;
526     }
527 
528     @Override
529     public byte[] getFamilyArray() {
530       return this.keyBuffer.array();
531     }
532 
533     @Override
534     public int getFamilyOffset() {
535       return getFamilyPosition();
536     }
537 
538     @Override
539     public byte getFamilyLength() {
540       return this.familyLength;
541     }
542 
543     @Override
544     public byte[] getQualifierArray() {
545       return this.keyBuffer.array();
546     }
547 
548     @Override
549     public int getQualifierOffset() {
550       return getQualifierPosition();
551     }
552 
553     @Override
554     public int getQualifierLength() {
555       return this.qualifierLength;
556     }
557 
558     @Override
559     public long getTimestamp() {
560       return this.timestamp;
561     }
562 
563     @Override
564     public byte getTypeByte() {
565       return this.typeByte;
566     }
567 
568     @Override
569     public long getSequenceId() {
570       return this.seqId;
571     }
572 
573     @Override
574     public byte[] getValueArray() {
575       return CellUtil.cloneValue(this);
576     }
577 
578     @Override
579     public int getValueOffset() {
580       return 0;
581     }
582 
583     @Override
584     public int getValueLength() {
585       return this.valueLength;
586     }
587 
588     @Override
589     public byte[] getTagsArray() {
590       return CellUtil.cloneTags(this);
591     }
592 
593     @Override
594     public int getTagsOffset() {
595       return 0;
596     }
597 
598     @Override
599     public int getTagsLength() {
600       return this.tagsLength;
601     }
602 
603     @Override
604     public ByteBuffer getRowByteBuffer() {
605       return this.keyBuffer;
606     }
607 
608     @Override
609     public int getRowPosition() {
610       return Bytes.SIZEOF_SHORT;
611     }
612 
613     @Override
614     public ByteBuffer getFamilyByteBuffer() {
615       return this.keyBuffer;
616     }
617 
618     @Override
619     public int getFamilyPosition() {
620       return this.familyOffset;
621     }
622 
623     @Override
624     public ByteBuffer getQualifierByteBuffer() {
625       return this.keyBuffer;
626     }
627 
628     @Override
629     public int getQualifierPosition() {
630       return this.qualifierOffset;
631     }
632 
633     @Override
634     public ByteBuffer getValueByteBuffer() {
635       return this.valueBuffer;
636     }
637 
638     @Override
639     public int getValuePosition() {
640       return this.valueOffset;
641     }
642 
643     @Override
644     public ByteBuffer getTagsByteBuffer() {
645       return this.tagsBuffer;
646     }
647 
648     @Override
649     public int getTagsPosition() {
650       return this.tagsOffset;
651     }
652 
653     @Override
654     public long heapSize() {
655       return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
656     }
657 
658     @Override
659     public void setSequenceId(long seqId) {
660       this.seqId = seqId;
661     }
662 
663     @Override
664     public int write(OutputStream out) throws IOException {
665       return write(out, true);
666     }
667 
668     @Override
669     public int write(OutputStream out, boolean withTags) throws IOException {
670       int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength,
671           tagsLength, withTags);
672       ByteBufferUtils.putInt(out, lenToWrite);
673       ByteBufferUtils.putInt(out, keyBuffer.capacity());
674       ByteBufferUtils.putInt(out, valueLength);
675       // Write key
676       out.write(keyBuffer.array());
677       // Write value
678       ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset, this.valueLength);
679       if (withTags) {
680         // 2 bytes of tags length followed by the tags bytes.
681         // The tags length is serialized in only 2 bytes (as a short) even though the type is int.
682         // As it is a non-negative number, we save the sign bit. See HBASE-11437
683         out.write((byte) (0xff & (this.tagsLength >> 8)));
684         out.write((byte) (0xff & this.tagsLength));
685         ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
686       }
687       return lenToWrite + Bytes.SIZEOF_INT;
688     }
689   }
690 
691   protected abstract static class
692       BufferedEncodedSeeker<STATE extends SeekerState>
693       implements EncodedSeeker {
694     protected HFileBlockDecodingContext decodingCtx;
695     protected final CellComparator comparator;
696     protected ByteBuff currentBuffer;
697     protected TagCompressionContext tagCompressionContext = null;
698     protected KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
699     // A temp pair object reused by ByteBuff#asSubByteBuffer calls. This avoids excessive
700     // object creation.
701     protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<ByteBuffer>();
702     protected STATE current, previous;
703 
704     public BufferedEncodedSeeker(CellComparator comparator,
705         HFileBlockDecodingContext decodingCtx) {
706       this.comparator = comparator;
707       this.decodingCtx = decodingCtx;
708       if (decodingCtx.getHFileContext().isCompressTags()) {
709         try {
710           tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
711         } catch (Exception e) {
712           throw new RuntimeException("Failed to initialize TagCompressionContext", e);
713         }
714       }
715       current = createSeekerState(); // always valid
716       previous = createSeekerState(); // may not be valid
717     }
718 
719     protected boolean includesMvcc() {
720       return this.decodingCtx.getHFileContext().isIncludesMvcc();
721     }
722 
723     protected boolean includesTags() {
724       return this.decodingCtx.getHFileContext().isIncludesTags();
725     }
726 
727     @Override
728     public int compareKey(CellComparator comparator, Cell key) {
729       keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
730       return comparator.compareKeyIgnoresMvcc(key, keyOnlyKV);
731     }
732 
733     @Override
734     public void setCurrentBuffer(ByteBuff buffer) {
735       if (this.tagCompressionContext != null) {
736         this.tagCompressionContext.clear();
737       }
738       currentBuffer = buffer;
739       current.currentBuffer = currentBuffer;
740       if (tagCompressionContext != null) {
741         current.tagCompressionContext = tagCompressionContext;
742       }
743       decodeFirst();
744       current.setKey(current.keyBuffer, current.memstoreTS);
745       previous.invalidate();
746     }
747 
748     @Override
749     public Cell getKey() {
750       byte[] key = new byte[current.keyLength];
751       System.arraycopy(current.keyBuffer, 0, key, 0, current.keyLength);
752       return new KeyValue.KeyOnlyKeyValue(key);
753     }
754 
755     @Override
756     public ByteBuffer getValueShallowCopy() {
757       currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
758       ByteBuffer dup = tmpPair.getFirst().duplicate();
759       dup.position(tmpPair.getSecond());
760       dup.limit(tmpPair.getSecond() + current.valueLength);
761       return dup.slice();
762     }
763 
764     @Override
765     public Cell getCell() {
766       return current.toCell();
767     }
768 
769     @Override
770     public void rewind() {
771       currentBuffer.rewind();
772       if (tagCompressionContext != null) {
773         tagCompressionContext.clear();
774       }
775       decodeFirst();
776       current.setKey(current.keyBuffer, current.memstoreTS);
777       previous.invalidate();
778     }
779 
780     @Override
781     public boolean next() {
782       if (!currentBuffer.hasRemaining()) {
783         return false;
784       }
785       decodeNext();
786       current.setKey(current.keyBuffer, current.memstoreTS);
787       previous.invalidate();
788       return true;
789     }
790 
791     protected void decodeTags() {
792       current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
793       if (tagCompressionContext != null) {
794         if (current.uncompressTags) {
795           // Tag compression is being used. Uncompress it into tagsBuffer.
796           current.ensureSpaceForTags();
797           try {
798             current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
799                 current.tagsBuffer, 0, current.tagsLength);
800           } catch (IOException e) {
801             throw new RuntimeException("Exception while uncompressing tags", e);
802           }
803         } else {
804           currentBuffer.skip(current.tagsCompressedLength);
805           current.uncompressTags = true;// Reset this.
806         }
807         current.tagsOffset = -1;
808       } else {
809         // When tag compression is not used, avoid copying the tags bytes into tagsBuffer.
810         // Just mark the tags offset so the KV buffer can be created later in getKeyValueBuffer()
811         current.tagsOffset = currentBuffer.position();
812         currentBuffer.skip(current.tagsLength);
813       }
814     }
815 
816     @Override
817     public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
818       int rowCommonPrefix = 0;
819       int familyCommonPrefix = 0;
820       int qualCommonPrefix = 0;
821       previous.invalidate();
822       do {
823         int comp;
824         keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
825         if (current.lastCommonPrefix != 0) {
826           // The KV format also stores the row key length in the byte array,
827           // and the common prefix includes it.
828           // So we need to subtract it to find out
829           // the common prefix
830           // in the row part alone.
831           rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
832         }
833         if (current.lastCommonPrefix <= 2) {
834           rowCommonPrefix = 0;
835         }
836         rowCommonPrefix += findCommonPrefixInRowPart(seekCell, keyOnlyKV, rowCommonPrefix);
837         comp = compareCommonRowPrefix(seekCell, keyOnlyKV, rowCommonPrefix);
838         if (comp == 0) {
839           comp = compareTypeBytes(seekCell, keyOnlyKV);
840           if (comp == 0) {
841             // Subtract the 2-byte row length field, the row bytes and the 1-byte family length field
842             familyCommonPrefix = Math.max(
843                 0,
844                 Math.min(familyCommonPrefix,
845                     current.lastCommonPrefix - (3 + keyOnlyKV.getRowLength())));
846             familyCommonPrefix += findCommonPrefixInFamilyPart(seekCell, keyOnlyKV,
847                 familyCommonPrefix);
848             comp = compareCommonFamilyPrefix(seekCell, keyOnlyKV, familyCommonPrefix);
849             if (comp == 0) {
850               // Subtract the 2-byte row length field, the row bytes, the 1-byte family length
851               // field and the family bytes
852               qualCommonPrefix = Math.max(
853                   0,
854                   Math.min(
855                       qualCommonPrefix,
856                       current.lastCommonPrefix
857                           - (3 + keyOnlyKV.getRowLength() + keyOnlyKV.getFamilyLength())));
858               qualCommonPrefix += findCommonPrefixInQualifierPart(seekCell, keyOnlyKV,
859                   qualCommonPrefix);
860               comp = compareCommonQualifierPrefix(seekCell, keyOnlyKV, qualCommonPrefix);
861               if (comp == 0) {
862                 comp = CellComparator.compareTimestamps(seekCell, keyOnlyKV);
863                 if (comp == 0) {
864                   // Compare types. Let the delete types sort ahead of puts;
865                   // i.e. types of higher numbers
866                   // sort before those of lesser numbers.
867                   // Maximum (255)
868                   // appears ahead of everything,
869                   // and minimum (0)
870                   // appears
871                   // after everything.
872                   comp = (0xff & keyOnlyKV.getTypeByte()) - (0xff & seekCell.getTypeByte());
873                 }
874               }
875             }
876           }
877         }
878         if (comp == 0) { // exact match
879           if (seekBefore) {
880             if (!previous.isValid()) {
881               // The caller (seekBefore) has to ensure that we are not at the
882               // first key in the block.
883               throw new IllegalStateException("Cannot seekBefore if "
884                   + "positioned at the first key in the block: key="
885                   + Bytes.toStringBinary(seekCell.getRowArray()));
886             }
887             moveToPrevious();
888             return 1;
889           }
890           return 0;
891         }
892 
893         if (comp < 0) { // already too large, check previous
894           if (previous.isValid()) {
895             moveToPrevious();
896           } else {
897             return HConstants.INDEX_KEY_MAGIC; // using optimized index key
898           }
899           return 1;
900         }
901 
902         // move to next, if more data is available
903         if (currentBuffer.hasRemaining()) {
904           previous.copyFromNext(current);
905           decodeNext();
906           current.setKey(current.keyBuffer, current.memstoreTS);
907         } else {
908           break;
909         }
910       } while (true);
911 
912       // we hit the end of the block, not an exact match
913       return 1;
914     }
915 
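    // Return-value summary (added for clarity, not in the original source): 0 means the seeker is
    // positioned exactly on seekCell; 1 means it is positioned on the last cell that sorts before
    // seekCell (or, for seekBefore on an exact match, on the preceding cell); and
    // HConstants.INDEX_KEY_MAGIC means seekCell sorts before the first key of this block, so the
    // caller may fall back to the optimized index key.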
916     private int compareTypeBytes(Cell key, Cell right) {
917       if (key.getFamilyLength() + key.getQualifierLength() == 0
918           && key.getTypeByte() == Type.Minimum.getCode()) {
919         // left is "bigger", i.e. it appears later in the sorted order
920         return 1;
921       }
922       if (right.getFamilyLength() + right.getQualifierLength() == 0
923           && right.getTypeByte() == Type.Minimum.getCode()) {
924         return -1;
925       }
926       return 0;
927     }
928 
929     private static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
930       return Bytes.findCommonPrefix(left.getRowArray(), right.getRowArray(), left.getRowLength()
931           - rowCommonPrefix, right.getRowLength() - rowCommonPrefix, left.getRowOffset()
932           + rowCommonPrefix, right.getRowOffset() + rowCommonPrefix);
933     }
934 
935     private static int findCommonPrefixInFamilyPart(Cell left, Cell right, int familyCommonPrefix) {
936       return Bytes
937           .findCommonPrefix(left.getFamilyArray(), right.getFamilyArray(), left.getFamilyLength()
938               - familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix,
939               left.getFamilyOffset() + familyCommonPrefix, right.getFamilyOffset()
940                   + familyCommonPrefix);
941     }
942 
943     private static int findCommonPrefixInQualifierPart(Cell left, Cell right,
944         int qualifierCommonPrefix) {
945       return Bytes.findCommonPrefix(left.getQualifierArray(), right.getQualifierArray(),
946           left.getQualifierLength() - qualifierCommonPrefix, right.getQualifierLength()
947               - qualifierCommonPrefix, left.getQualifierOffset() + qualifierCommonPrefix,
948           right.getQualifierOffset() + qualifierCommonPrefix);
949     }
950 
951     private void moveToPrevious() {
952       if (!previous.isValid()) {
953         throw new IllegalStateException(
954             "Can move back only once and not in first key in the block.");
955       }
956 
957       STATE tmp = previous;
958       previous = current;
959       current = tmp;
960 
961       // move after last key value
962       currentBuffer.position(current.nextKvOffset);
963       // The tag bytes were already decoded. We cache those tags in the current state, along with
964       // the total compressed length of the tag bytes, so that the next decodeNext() does not need
965       // to decode the tags again (which could pollute the data dictionary used for compression).
966       // When current.uncompressTags is false, we will just reuse current.tagsBuffer and skip
967       // 'tagsCompressedLength' bytes of the source stream.
968       // See decodeTags().
969       current.tagsBuffer = previous.tagsBuffer;
970       current.tagsCompressedLength = previous.tagsCompressedLength;
971       current.uncompressTags = false;
972       // The current key has to be reset with the previous Cell
973       current.setKey(current.keyBuffer, current.memstoreTS);
974       previous.invalidate();
975     }
976 
977     @SuppressWarnings("unchecked")
978     protected STATE createSeekerState() {
979       // This will fail for non-default seeker state if the subclass does not
980       // override this method.
981       return (STATE) new SeekerState(this.tmpPair, this.includesTags());
982     }
983 
984     abstract protected void decodeFirst();
985     abstract protected void decodeNext();
986   }
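  // Illustrative sketch (not part of the original source): iterating a block through a seeker
  // created by a concrete encoder. Assumes 'seeker' is an EncodedSeeker for this block's encoding
  // and 'blockBuf' is the ByteBuff holding the encoded block payload.
  //
  //   seeker.setCurrentBuffer(blockBuf);
  //   do {
  //     Cell c = seeker.getCell();
  //     // ... consume c ...
  //   } while (seeker.next());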
987 
988   /**
989    * @param cell the cell whose tags and memstore timestamp are written after the encoded key/value
990    * @param out the output stream the block is being written to
991    * @param encodingCtx the encoding context carrying the HFileContext and tag compression state
992    * @return unencoded size added
993    * @throws IOException if writing to the stream fails
994    */
995   protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
996       HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
997     int size = 0;
998     if (encodingCtx.getHFileContext().isIncludesTags()) {
999       int tagsLength = cell.getTagsLength();
1000       ByteBufferUtils.putCompressedInt(out, tagsLength);
1001       // There are some tags to be written
1002       if (tagsLength > 0) {
1003         TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
1004         // When tag compression is enabled, tagCompressionContext will have a non-null value.
1005         // Write the tags using dictionary compression in that case.
1006         if (tagCompressionContext != null) {
1007           // Not passing tagsLength along, since parsing the tagsLength again is not costly
1008           CellUtil.compressTags(out, cell, tagCompressionContext);
1009         } else {
1010           CellUtil.writeTags(out, cell, tagsLength);
1011         }
1012       }
1013       size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
1014     }
1015     if (encodingCtx.getHFileContext().isIncludesMvcc()) {
1016       // Copy memstore timestamp from the byte buffer to the output stream.
1017       long memstoreTS = cell.getSequenceId();
1018       WritableUtils.writeVLong(out, memstoreTS);
1019       // TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be
1020       // avoided.
1021       size += WritableUtils.getVIntSize(memstoreTS);
1022     }
1023     return size;
1024   }
1025 
1026   protected final void afterDecodingKeyValue(DataInputStream source,
1027       ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
1028     if (decodingCtx.getHFileContext().isIncludesTags()) {
1029       int tagsLength = ByteBufferUtils.readCompressedInt(source);
1030       // Put as unsigned short
1031       dest.put((byte) ((tagsLength >> 8) & 0xff));
1032       dest.put((byte) (tagsLength & 0xff));
1033       if (tagsLength > 0) {
1034         TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
1035         // When tag compression is being used in this file, tagCompressionContext will have a
1036         // non-null value.
1037         if (tagCompressionContext != null) {
1038           tagCompressionContext.uncompressTags(source, dest, tagsLength);
1039         } else {
1040           ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
1041         }
1042       }
1043     }
1044     if (decodingCtx.getHFileContext().isIncludesMvcc()) {
1045       long memstoreTS = -1;
1046       try {
1047         // Copy memstore timestamp from the data input stream to the byte
1048         // buffer.
1049         memstoreTS = WritableUtils.readVLong(source);
1050         ByteBufferUtils.writeVLong(dest, memstoreTS);
1051       } catch (IOException ex) {
1052         throw new RuntimeException("Unable to copy memstore timestamp " +
1053             memstoreTS + " after decoding a key/value");
1054       }
1055     }
1056   }
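  // Note (added for clarity, not in the original source): afterEncodingKeyValue() and
  // afterDecodingKeyValue() handle the per-cell suffix that follows the encoded key/value: the
  // optional tags section (compressed length plus dictionary-compressed or raw tag bytes) and,
  // when MVCC is included, the memstore timestamp written as a variable-length long.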
1057 
1058   @Override
1059   public HFileBlockEncodingContext newDataBlockEncodingContext(DataBlockEncoding encoding,
1060       byte[] header, HFileContext meta) {
1061     return new HFileBlockDefaultEncodingContext(encoding, header, meta);
1062   }
1063 
1064   @Override
1065   public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
1066     return new HFileBlockDefaultDecodingContext(meta);
1067   }
1068 
1069   protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
1070       int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
1071       throws IOException;
1072 
1073   /**
1074    * Asserts that there is at least the given amount of unfilled space
1075    * remaining in the given buffer.
1076    * @param out typically, the buffer we are writing to
1077    * @param length the required space in the buffer
1078    * @throws EncoderBufferTooSmallException if there is not enough space remaining.
1079    */
1080   protected static void ensureSpace(ByteBuffer out, int length)
1081       throws EncoderBufferTooSmallException {
1082     if (out.position() + length > out.limit()) {
1083       throw new EncoderBufferTooSmallException(
1084           "Buffer position=" + out.position() +
1085           ", buffer limit=" + out.limit() +
1086           ", length to be written=" + length);
1087     }
1088   }
1089 
1090   @Override
1091   public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
1092       throws IOException {
1093     if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
1094       throw new IOException (this.getClass().getName() + " only accepts "
1095           + HFileBlockDefaultEncodingContext.class.getName() + " as the " +
1096           "encoding context.");
1097     }
1098 
1099     HFileBlockDefaultEncodingContext encodingCtx =
1100         (HFileBlockDefaultEncodingContext) blkEncodingCtx;
1101     encodingCtx.prepareEncoding(out);
1102     if (encodingCtx.getHFileContext().isIncludesTags()
1103         && encodingCtx.getHFileContext().isCompressTags()) {
1104       if (encodingCtx.getTagCompressionContext() != null) {
1105         // Avoid the overhead of creating a new TagCompressionContext for every block
1106         // encoding; reuse and clear the existing one.
1107         encodingCtx.getTagCompressionContext().clear();
1108       } else {
1109         try {
1110           TagCompressionContext tagCompressionContext = new TagCompressionContext(
1111               LRUDictionary.class, Byte.MAX_VALUE);
1112           encodingCtx.setTagCompressionContext(tagCompressionContext);
1113         } catch (Exception e) {
1114           throw new IOException("Failed to initialize TagCompressionContext", e);
1115         }
1116       }
1117     }
1118     StreamUtils.writeInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
1119     blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
1120   }
1121 
1122   private static class BufferedDataBlockEncodingState extends EncodingState {
1123     int unencodedDataSizeWritten = 0;
1124   }
1125 
1126   @Override
1127   public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
1128       throws IOException {
1129     BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
1130         .getEncodingState();
1131     int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
1132     state.unencodedDataSizeWritten += encodedKvSize;
1133     return encodedKvSize;
1134   }
1135 
1136   public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
1137       DataOutputStream out) throws IOException;
1138 
1139   @Override
1140   public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
1141       byte[] uncompressedBytesWithHeader) throws IOException {
1142     BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
1143         .getEncodingState();
1144     // Write the unencodedDataSizeWritten (with header size)
1145     Bytes.putInt(uncompressedBytesWithHeader,
1146       HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE,
1147       state.unencodedDataSizeWritten);
1148     if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
1149       encodingCtx.postEncoding(BlockType.ENCODED_DATA);
1150     } else {
1151       encodingCtx.postEncoding(BlockType.DATA);
1152     }
1153   }
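  // Illustrative sketch (not part of the original source): the write-path lifecycle of this
  // encoder. Assumes 'encoder' is a concrete subclass, 'encoding' its DataBlockEncoding, 'header'
  // the block header bytes, 'meta' the HFileContext and 'out' the block's DataOutputStream.
  //
  //   HFileBlockEncodingContext ctx = encoder.newDataBlockEncodingContext(encoding, header, meta);
  //   encoder.startBlockEncoding(ctx, out);
  //   for (Cell cell : cells) {
  //     encoder.encode(cell, ctx, out);
  //   }
  //   encoder.endBlockEncoding(ctx, out, uncompressedBytesWithHeader);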
1154 
1155   protected Cell createFirstKeyCell(ByteBuffer key, int keyLength) {
1156     if (key.hasArray()) {
1157       return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(),
1158           keyLength);
1159     } else {
1160       return new ByteBufferedKeyOnlyKeyValue(key, key.position(), keyLength);
1161     }
1162   }
1163 }