View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.io.OutputStream;
23  import java.nio.ByteBuffer;
24  
25  import org.apache.hadoop.hbase.ByteBufferedCell;
26  import org.apache.hadoop.hbase.Cell;
27  import org.apache.hadoop.hbase.CellComparator;
28  import org.apache.hadoop.hbase.CellUtil;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.KeyValue;
31  import org.apache.hadoop.hbase.OffheapKeyOnlyKeyValue;
32  import org.apache.hadoop.hbase.KeyValue.Type;
33  import org.apache.hadoop.hbase.KeyValueUtil;
34  import org.apache.hadoop.hbase.SettableSequenceId;
35  import org.apache.hadoop.hbase.Streamable;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
38  import org.apache.hadoop.hbase.io.HeapSize;
39  import org.apache.hadoop.hbase.io.TagCompressionContext;
40  import org.apache.hadoop.hbase.io.hfile.BlockType;
41  import org.apache.hadoop.hbase.io.hfile.HFileContext;
42  import org.apache.hadoop.hbase.io.util.LRUDictionary;
43  import org.apache.hadoop.hbase.io.util.StreamUtils;
44  import org.apache.hadoop.hbase.nio.ByteBuff;
45  import org.apache.hadoop.hbase.util.ByteBufferUtils;
46  import org.apache.hadoop.hbase.util.Bytes;
47  import org.apache.hadoop.hbase.util.ClassSize;
48  import org.apache.hadoop.hbase.util.Pair;
49  import org.apache.hadoop.io.WritableUtils;
50  
51  /**
52   * Base class for all data block encoders that use a buffer.
53   */
54  @InterfaceAudience.Private
55  abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
56  
57    private static int INITIAL_KEY_BUFFER_SIZE = 512;
58  
59    @Override
60    public ByteBuffer decodeKeyValues(DataInputStream source,
61        HFileBlockDecodingContext blkDecodingCtx) throws IOException {
62      if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
63        throw new IOException(this.getClass().getName() + " only accepts "
64            + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
65      }
66  
67      HFileBlockDefaultDecodingContext decodingCtx =
68          (HFileBlockDefaultDecodingContext) blkDecodingCtx;
69      if (decodingCtx.getHFileContext().isIncludesTags()
70          && decodingCtx.getHFileContext().isCompressTags()) {
71        if (decodingCtx.getTagCompressionContext() != null) {
72          // It will be overhead to create the TagCompressionContext again and again for every block
73          // decoding.
74          decodingCtx.getTagCompressionContext().clear();
75        } else {
76          try {
77            TagCompressionContext tagCompressionContext = new TagCompressionContext(
78                LRUDictionary.class, Byte.MAX_VALUE);
79            decodingCtx.setTagCompressionContext(tagCompressionContext);
80          } catch (Exception e) {
81            throw new IOException("Failed to initialize TagCompressionContext", e);
82          }
83        }
84      }
85      return internalDecodeKeyValues(source, 0, 0, decodingCtx);
86    }
87  
88    /********************* common prefixes *************************/
89    // Having this as static is fine but if META is having DBE then we should
90    // change this.
91    public static int compareCommonRowPrefix(Cell left, Cell right, int rowCommonPrefix) {
92      return Bytes.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix,
93          left.getRowLength() - rowCommonPrefix, right.getRowArray(), right.getRowOffset()
94              + rowCommonPrefix, right.getRowLength() - rowCommonPrefix);
95    }
96  
97    public static int compareCommonFamilyPrefix(Cell left, Cell right, int familyCommonPrefix) {
98      return Bytes.compareTo(left.getFamilyArray(), left.getFamilyOffset() + familyCommonPrefix,
99          left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
100         right.getFamilyOffset() + familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix);
101   }
102 
103   public static int compareCommonQualifierPrefix(Cell left, Cell right, int qualCommonPrefix) {
104     return Bytes.compareTo(left.getQualifierArray(), left.getQualifierOffset() + qualCommonPrefix,
105         left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(),
106         right.getQualifierOffset() + qualCommonPrefix, right.getQualifierLength()
107             - qualCommonPrefix);
108   }
109 
  /**
   * Decoding state for one cell position of a {@link BufferedEncodedSeeker}. It holds a private
   * copy of the cell's key in {@code keyBuffer} plus the offsets/lengths of the value and tags
   * inside {@code currentBuffer}. The seeker keeps two instances ({@code current} and
   * {@code previous}); {@link #copyFromNext(SeekerState)} saves the current one into the previous
   * one while the seeker advances.
   */
  protected static class SeekerState {
    // Block buffer that valueOffset / tagsOffset index into; set to null by invalidate().
    protected ByteBuff currentBuffer;
    // Non-null when tags are compressed; uncompressed tags are then kept in tagsBuffer.
    protected TagCompressionContext tagCompressionContext;
    // Offset of the value in currentBuffer; -1 means "no cell decoded" (see isValid()).
    protected int valueOffset = -1;
    protected int keyLength;
    protected int valueLength;
    // Number of leading key bytes shared with the previously decoded key; copyFromNext() does not
    // copy these again.
    protected int lastCommonPrefix;
    protected int tagsLength = 0;
    // Offset of the raw tag bytes in currentBuffer, or -1 when tags were uncompressed into
    // tagsBuffer.
    protected int tagsOffset = -1;
    // Compressed tag bytes consumed by the last uncompress, so a later decode can skip them.
    protected int tagsCompressedLength = 0;
    // When false, the next tags decode skips tagsCompressedLength bytes instead of uncompressing.
    protected boolean uncompressTags = true;

    /** Private copy of the current key; only the first keyLength bytes are meaningful. */
    protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
    // Scratch space that uncompressed tags are written into when tag compression is in use.
    protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];

    // Memstore timestamp of the cell; passed as the sequence id of cells built by toCell().
    protected long memstoreTS;
    // Presumably the position of the following cell in currentBuffer, maintained by the concrete
    // decoder — NOTE(review): not set anywhere in this class, confirm in subclasses.
    protected int nextKvOffset;
    // Key-only view over the key bytes; repointed by setKey().
    protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
    private final Pair<ByteBuffer, Integer> tmpPair;
    // Whether cells carry tags; toCell() uses it to decide whether to expose a tags region.
    private final boolean includeTags;

    /**
     * @param tmpPair reusable holder for ByteBuff#asSubByteBuffer results (shared, not copied)
     * @param includeTags whether the cells being decoded carry tags
     */
    public SeekerState(Pair<ByteBuffer, Integer> tmpPair, boolean includeTags) {
      this.tmpPair = tmpPair;
      this.includeTags = includeTags;
    }

    /** @return true if this state holds a decoded cell, i.e. valueOffset is not -1. */
    protected boolean isValid() {
      return valueOffset != -1;
    }

    /**
     * Resets this state to "no cell". It clears valueOffset and the tag-skip bookkeeping, installs
     * a fresh key view and drops the reference to the block buffer.
     */
    protected void invalidate() {
      valueOffset = -1;
      tagsCompressedLength = 0;
      currentKey = new KeyValue.KeyOnlyKeyValue();
      uncompressTags = true;
      currentBuffer = null;
    }

    /**
     * Grows keyBuffer by repeated doubling until it can hold keyLength bytes, preserving the bytes
     * already stored. keyLength must already hold the size of the key about to be written.
     * NOTE(review): the doubling is int arithmetic and would overflow for keys near 1 GB;
     * presumably such keys cannot occur.
     */
    protected void ensureSpaceForKey() {
      if (keyLength > keyBuffer.length) {
        // rare case, but we need to handle arbitrary length of key
        int newKeyBufferLength = Math.max(keyBuffer.length, 1) * 2;
        while (keyLength > newKeyBufferLength) {
          newKeyBufferLength *= 2;
        }
        byte[] newKeyBuffer = new byte[newKeyBufferLength];
        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
        keyBuffer = newKeyBuffer;
      }
    }

    /**
     * Grows tagsBuffer by repeated doubling until it can hold tagsLength bytes, preserving the
     * bytes already stored. tagsLength must already hold the size of the tags about to be written.
     */
    protected void ensureSpaceForTags() {
      if (tagsLength > tagsBuffer.length) {
        // rare case, but we need to handle arbitrary length of tags
        int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2;
        while (tagsLength > newTagsBufferLength) {
          newTagsBufferLength *= 2;
        }
        byte[] newTagsBuffer = new byte[newTagsBufferLength];
        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
        tagsBuffer = newTagsBuffer;
      }
    }

    /**
     * Points currentKey at the first keyLength bytes of the given buffer and records the memstore
     * timestamp of the cell.
     *
     * @param keyBuffer buffer holding the key (shadows the field; callers visible in this file
     *          pass this state's own keyBuffer)
     * @param memTS memstore timestamp of the cell
     */
    protected void setKey(byte[] keyBuffer, long memTS) {
      currentKey.setKey(keyBuffer, 0, keyLength);
      memstoreTS = memTS;
    }

    /**
     * Copy the state from the next one into this instance (the previous state
     * placeholder). Used to save the previous state when we are advancing the
     * seeker to the next key/value.
     * <p>
     * NOTE(review): currentKey is taken by reference from nextState, so both states share one
     * KeyOnlyKeyValue afterwards and a later nextState.setKey(...) also repoints this state's
     * view. Code that makes this state current again presumably calls setKey() — confirm.
     */
    protected void copyFromNext(SeekerState nextState) {
      if (keyBuffer.length != nextState.keyBuffer.length) {
        // Buffer sizes differ (the next state grew its buffer): take a full copy.
        keyBuffer = nextState.keyBuffer.clone();
      } else if (!isValid()) {
        // Note: we can only call isValid before we override our state, so this
        // comes before all the assignments at the end of this method.
        System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
             nextState.keyLength);
      } else {
        // don't copy the common prefix between this key and the previous one
        System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
            keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
                - nextState.lastCommonPrefix);
      }
      currentKey = nextState.currentKey;

      valueOffset = nextState.valueOffset;
      keyLength = nextState.keyLength;
      valueLength = nextState.valueLength;
      lastCommonPrefix = nextState.lastCommonPrefix;
      nextKvOffset = nextState.nextKvOffset;
      memstoreTS = nextState.memstoreTS;
      currentBuffer = nextState.currentBuffer;
      tagsOffset = nextState.tagsOffset;
      tagsLength = nextState.tagsLength;
      if (nextState.tagCompressionContext != null) {
        tagCompressionContext = nextState.tagCompressionContext;
      }
    }

    /**
     * Materializes the current position as a Cell. The key is deep-copied. The value, and the tags
     * when they are not compressed, reference the block buffer. Returns an OnheapDecodedCell when
     * the backing ByteBuffer exposes an array, otherwise an OffheapDecodedCell.
     */
    public Cell toCell() {
      // Buffer backing the value and tags part from the HFileBlock's buffer
      // When tag compression in use, this will be only the value bytes area.
      ByteBuffer valAndTagsBuffer;
      int vOffset;
      int valAndTagsLength = this.valueLength;
      int tagsLenSerializationSize = 0;
      if (this.includeTags && this.tagCompressionContext == null) {
        // Include the tags part as well: the serialized tags length followed by the tag bytes.
        // The size of the length field is derived from the gap between the end of the value and
        // tagsOffset (decodeTags() reads it as a compressed int).
        tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + this.valueLength);
        valAndTagsLength += tagsLenSerializationSize + this.tagsLength;
      }
      this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair);
      valAndTagsBuffer = this.tmpPair.getFirst();
      vOffset = this.tmpPair.getSecond();// This is the offset to value part in the BB
      if (valAndTagsBuffer.hasArray()) {
        return toOnheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      } else {
        return toOffheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      }
    }

    /**
     * Builds an OnheapDecodedCell whose value (and uncompressed tags) reference the array behind
     * valAndTagsBuffer. Compressed tags are copied out of tagsBuffer instead.
     */
    private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
        int tagsLenSerializationSize) {
      byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          // Tags sit right after the value and its serialized tags length in the same array.
          tagsArray = valAndTagsBuffer.array();
          tOffset = valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength
              + tagsLenSerializationSize;
        } else {
          tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
          tOffset = 0;
        }
      }
      return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
          currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
          currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
          currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer.array(),
          valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength, memstoreTS, tagsArray,
          tOffset, this.tagsLength);
    }

    /**
     * Builds an OffheapDecodedCell whose value (and uncompressed tags) reference valAndTagsBuffer
     * by position. The key, and compressed tags, are copied onto the heap.
     */
    private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
        int tagsLenSerializationSize) {
      ByteBuffer tagsBuf =  HConstants.EMPTY_BYTE_BUFFER;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          tagsBuf = valAndTagsBuffer;
          tOffset = vOffset + this.valueLength + tagsLenSerializationSize;
        } else {
          tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength));
          tOffset = 0;
        }
      }
      return new OffheapDecodedCell(ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)),
          currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
          currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
          currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer, vOffset,
          this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
    }
  }
282 
283   /**
284    * Copies only the key part of the keybuffer by doing a deep copy and passes the
285    * seeker state members for taking a clone.
286    * Note that the value byte[] part is still pointing to the currentBuffer and
287    * represented by the valueOffset and valueLength
288    */
289   // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
290   // there. So this has to be an instance of SettableSequenceId.
291   protected static class OnheapDecodedCell implements Cell, HeapSize, SettableSequenceId,
292       Streamable {
293     private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
294         + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
295         + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY));
296     private byte[] keyOnlyBuffer;
297     private short rowLength;
298     private int familyOffset;
299     private byte familyLength;
300     private int qualifierOffset;
301     private int qualifierLength;
302     private long timestamp;
303     private byte typeByte;
304     private byte[] valueBuffer;
305     private int valueOffset;
306     private int valueLength;
307     private byte[] tagsBuffer;
308     private int tagsOffset;
309     private int tagsLength;
310     private long seqId;
311 
312     protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset,
313         byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
314         byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] tagsBuffer,
315         int tagsOffset, int tagsLength) {
316       this.keyOnlyBuffer = keyBuffer;
317       this.rowLength = rowLength;
318       this.familyOffset = familyOffset;
319       this.familyLength = familyLength;
320       this.qualifierOffset = qualOffset;
321       this.qualifierLength = qualLength;
322       this.timestamp = timeStamp;
323       this.typeByte = typeByte;
324       this.valueBuffer = valueBuffer;
325       this.valueOffset = valueOffset;
326       this.valueLength = valueLen;
327       this.tagsBuffer = tagsBuffer;
328       this.tagsOffset = tagsOffset;
329       this.tagsLength = tagsLength;
330       setSequenceId(seqId);
331     }
332 
333     @Override
334     public byte[] getRowArray() {
335       return keyOnlyBuffer;
336     }
337 
338     @Override
339     public byte[] getFamilyArray() {
340       return keyOnlyBuffer;
341     }
342 
343     @Override
344     public byte[] getQualifierArray() {
345       return keyOnlyBuffer;
346     }
347 
348     @Override
349     public int getRowOffset() {
350       return Bytes.SIZEOF_SHORT;
351     }
352 
353     @Override
354     public short getRowLength() {
355       return rowLength;
356     }
357 
358     @Override
359     public int getFamilyOffset() {
360       return familyOffset;
361     }
362 
363     @Override
364     public byte getFamilyLength() {
365       return familyLength;
366     }
367 
368     @Override
369     public int getQualifierOffset() {
370       return qualifierOffset;
371     }
372 
373     @Override
374     public int getQualifierLength() {
375       return qualifierLength;
376     }
377 
378     @Override
379     public long getTimestamp() {
380       return timestamp;
381     }
382 
383     @Override
384     public byte getTypeByte() {
385       return typeByte;
386     }
387 
388     @Override
389     public long getSequenceId() {
390       return seqId;
391     }
392 
393     @Override
394     public byte[] getValueArray() {
395       return this.valueBuffer;
396     }
397 
398     @Override
399     public int getValueOffset() {
400       return valueOffset;
401     }
402 
403     @Override
404     public int getValueLength() {
405       return valueLength;
406     }
407 
408     @Override
409     public byte[] getTagsArray() {
410       return this.tagsBuffer;
411     }
412 
413     @Override
414     public int getTagsOffset() {
415       return this.tagsOffset;
416     }
417 
418     @Override
419     public int getTagsLength() {
420       return tagsLength;
421     }
422 
423     @Override
424     public String toString() {
425       return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
426           + getValueLength() + "/seqid=" + seqId;
427     }
428 
429     @Override
430     public void setSequenceId(long seqId) {
431       this.seqId = seqId;
432     }
433 
434     @Override
435     public long heapSize() {
436       return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
437     }
438 
439     @Override
440     public int write(OutputStream out) throws IOException {
441       return write(out, true);
442     }
443 
444     @Override
445     public int write(OutputStream out, boolean withTags) throws IOException {
446       int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength,
447           tagsLength, withTags);
448       writeInt(out, lenToWrite);
449       writeInt(out, keyOnlyBuffer.length);
450       writeInt(out, valueLength);
451       // Write key
452       out.write(keyOnlyBuffer);
453       // Write value
454       out.write(this.valueBuffer, this.valueOffset, this.valueLength);
455       if (withTags) {
456         // 2 bytes tags length followed by tags bytes
457         // tags length is serialized with 2 bytes only(short way) even if the type is int.
458         // As this is non -ve numbers, we save the sign bit. See HBASE-11437
459         out.write((byte) (0xff & (this.tagsLength >> 8)));
460         out.write((byte) (0xff & this.tagsLength));
461         out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
462       }
463       return lenToWrite + Bytes.SIZEOF_INT;
464     }
465   }
466 
467   protected static class OffheapDecodedCell extends ByteBufferedCell implements HeapSize,
468       SettableSequenceId, Streamable {
469     private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
470         + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
471         + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.BYTE_BUFFER));
472     private ByteBuffer keyBuffer;
473     private short rowLength;
474     private int familyOffset;
475     private byte familyLength;
476     private int qualifierOffset;
477     private int qualifierLength;
478     private long timestamp;
479     private byte typeByte;
480     private ByteBuffer valueBuffer;
481     private int valueOffset;
482     private int valueLength;
483     private ByteBuffer tagsBuffer;
484     private int tagsOffset;
485     private int tagsLength;
486     private long seqId;
487 
488     protected OffheapDecodedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset,
489         byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
490         ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer,
491         int tagsOffset, int tagsLength) {
492       // The keyBuffer is always onheap
493       assert keyBuffer.hasArray();
494       assert keyBuffer.arrayOffset() == 0;
495       this.keyBuffer = keyBuffer;
496       this.rowLength = rowLength;
497       this.familyOffset = familyOffset;
498       this.familyLength = familyLength;
499       this.qualifierOffset = qualOffset;
500       this.qualifierLength = qualLength;
501       this.timestamp = timeStamp;
502       this.typeByte = typeByte;
503       this.valueBuffer = valueBuffer;
504       this.valueOffset = valueOffset;
505       this.valueLength = valueLen;
506       this.tagsBuffer = tagsBuffer;
507       this.tagsOffset = tagsOffset;
508       this.tagsLength = tagsLength;
509       setSequenceId(seqId);
510     }
511 
512     @Override
513     public byte[] getRowArray() {
514       return this.keyBuffer.array();
515     }
516 
517     @Override
518     public int getRowOffset() {
519       return getRowPositionInByteBuffer();
520     }
521 
522     @Override
523     public short getRowLength() {
524       return this.rowLength;
525     }
526 
527     @Override
528     public byte[] getFamilyArray() {
529       return this.keyBuffer.array();
530     }
531 
532     @Override
533     public int getFamilyOffset() {
534       return getFamilyPositionInByteBuffer();
535     }
536 
537     @Override
538     public byte getFamilyLength() {
539       return this.familyLength;
540     }
541 
542     @Override
543     public byte[] getQualifierArray() {
544       return this.keyBuffer.array();
545     }
546 
547     @Override
548     public int getQualifierOffset() {
549       return getQualifierPositionInByteBuffer();
550     }
551 
552     @Override
553     public int getQualifierLength() {
554       return this.qualifierLength;
555     }
556 
557     @Override
558     public long getTimestamp() {
559       return this.timestamp;
560     }
561 
562     @Override
563     public byte getTypeByte() {
564       return this.typeByte;
565     }
566 
567     @Override
568     public long getSequenceId() {
569       return this.seqId;
570     }
571 
572     @Override
573     public byte[] getValueArray() {
574       return CellUtil.cloneValue(this);
575     }
576 
577     @Override
578     public int getValueOffset() {
579       return 0;
580     }
581 
582     @Override
583     public int getValueLength() {
584       return this.valueLength;
585     }
586 
587     @Override
588     public byte[] getTagsArray() {
589       return CellUtil.cloneTags(this);
590     }
591 
592     @Override
593     public int getTagsOffset() {
594       return 0;
595     }
596 
597     @Override
598     public int getTagsLength() {
599       return this.tagsLength;
600     }
601 
602     @Override
603     public ByteBuffer getRowByteBuffer() {
604       return this.keyBuffer;
605     }
606 
607     @Override
608     public int getRowPositionInByteBuffer() {
609       return Bytes.SIZEOF_SHORT;
610     }
611 
612     @Override
613     public ByteBuffer getFamilyByteBuffer() {
614       return this.keyBuffer;
615     }
616 
617     @Override
618     public int getFamilyPositionInByteBuffer() {
619       return this.familyOffset;
620     }
621 
622     @Override
623     public ByteBuffer getQualifierByteBuffer() {
624       return this.keyBuffer;
625     }
626 
627     @Override
628     public int getQualifierPositionInByteBuffer() {
629       return this.qualifierOffset;
630     }
631 
632     @Override
633     public ByteBuffer getValueByteBuffer() {
634       return this.valueBuffer;
635     }
636 
637     @Override
638     public int getValuePositionInByteBuffer() {
639       return this.valueOffset;
640     }
641 
642     @Override
643     public ByteBuffer getTagsByteBuffer() {
644       return this.tagsBuffer;
645     }
646 
647     @Override
648     public int getTagsPositionInByteBuffer() {
649       return this.tagsOffset;
650     }
651 
652     @Override
653     public long heapSize() {
654       return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
655     }
656 
657     @Override
658     public void setSequenceId(long seqId) {
659       this.seqId = seqId;
660     }
661 
662     @Override
663     public int write(OutputStream out) throws IOException {
664       return write(out, true);
665     }
666 
667     @Override
668     public int write(OutputStream out, boolean withTags) throws IOException {
669       int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength,
670           tagsLength, withTags);
671       writeInt(out, lenToWrite);
672       writeInt(out, keyBuffer.capacity());
673       writeInt(out, valueLength);
674       // Write key
675       out.write(keyBuffer.array());
676       // Write value
677       writeByteBuffer(out, this.valueBuffer, this.valueOffset, this.valueLength);
678       if (withTags) {
679         // 2 bytes tags length followed by tags bytes
680         // tags length is serialized with 2 bytes only(short way) even if the type is int.
681         // As this is non -ve numbers, we save the sign bit. See HBASE-11437
682         out.write((byte) (0xff & (this.tagsLength >> 8)));
683         out.write((byte) (0xff & this.tagsLength));
684         writeByteBuffer(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
685       }
686       return lenToWrite + Bytes.SIZEOF_INT;
687     }
688   }
689 
690   private static void writeInt(OutputStream out, int v) throws IOException {
691     // We have writeInt in ByteBufferOutputStream so that it can directly write int to underlying
692     // ByteBuffer in one step.
693     if (out instanceof ByteBufferOutputStream) {
694       ((ByteBufferOutputStream) out).writeInt(v);
695     } else {
696       StreamUtils.writeInt(out, v);
697     }
698   }
699 
700   private static void writeByteBuffer(OutputStream out, ByteBuffer b, int offset, int length)
701       throws IOException {
702     // We have write which takes ByteBuffer in ByteBufferOutputStream so that it can directly write
703     // bytes from the src ByteBuffer to the destination ByteBuffer. This avoid need for temp array
704     // creation and copy
705     if (out instanceof ByteBufferOutputStream) {
706       ((ByteBufferOutputStream) out).write(b, offset, length);
707     } else {
708       ByteBufferUtils.copyBufferToStream(out, b, offset, length);
709     }
710   }
711 
712   protected abstract static class
713       BufferedEncodedSeeker<STATE extends SeekerState>
714       implements EncodedSeeker {
    // Decoding context of the block; includesMvcc()/includesTags() read its HFileContext.
    protected HFileBlockDecodingContext decodingCtx;
    protected final CellComparator comparator;
    // Encoded block currently being iterated; set by setCurrentBuffer().
    protected ByteBuff currentBuffer;
    // Created in the constructor when the file compresses tags; handed to the current state in
    // setCurrentBuffer().
    protected TagCompressionContext tagCompressionContext = null;
    // Reusable key-only view used for comparisons, so no KeyValue is allocated per compare.
    protected  KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
    protected final Pair<ByteBuffer, Integer> tmpPair = new Pair<ByteBuffer, Integer>();
    // current: the cell the seeker is positioned on; previous: saved state, may be invalid.
    protected STATE current, previous;
724 
725     public BufferedEncodedSeeker(CellComparator comparator,
726         HFileBlockDecodingContext decodingCtx) {
727       this.comparator = comparator;
728       this.decodingCtx = decodingCtx;
729       if (decodingCtx.getHFileContext().isCompressTags()) {
730         try {
731           tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
732         } catch (Exception e) {
733           throw new RuntimeException("Failed to initialize TagCompressionContext", e);
734         }
735       }
736       current = createSeekerState(); // always valid
737       previous = createSeekerState(); // may not be valid
738     }
739 
740     protected boolean includesMvcc() {
741       return this.decodingCtx.getHFileContext().isIncludesMvcc();
742     }
743 
744     protected boolean includesTags() {
745       return this.decodingCtx.getHFileContext().isIncludesTags();
746     }
747 
748     @Override
749     public int compareKey(CellComparator comparator, Cell key) {
750       keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
751       return comparator.compareKeyIgnoresMvcc(key, keyOnlyKV);
752     }
753 
754     @Override
755     public void setCurrentBuffer(ByteBuff buffer) {
756       if (this.tagCompressionContext != null) {
757         this.tagCompressionContext.clear();
758       }
759       currentBuffer = buffer;
760       current.currentBuffer = currentBuffer;
761       if(tagCompressionContext != null) {
762         current.tagCompressionContext = tagCompressionContext;
763       }
764       decodeFirst();
765       current.setKey(current.keyBuffer, current.memstoreTS);
766       previous.invalidate();
767     }
768 
769     @Override
770     public Cell getKey() {
771       byte[] key = new byte[current.keyLength];
772       System.arraycopy(current.keyBuffer, 0, key, 0, current.keyLength);
773       return new KeyValue.KeyOnlyKeyValue(key);
774     }
775 
776     @Override
777     public ByteBuffer getValueShallowCopy() {
778       currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
779       ByteBuffer dup = tmpPair.getFirst().duplicate();
780       dup.position(tmpPair.getSecond());
781       dup.limit(tmpPair.getSecond() + current.valueLength);
782       return dup.slice();
783     }
784 
785     @Override
786     public ByteBuffer getKeyValueBuffer() {
787       ByteBuffer kvBuffer = createKVBuffer();
788       kvBuffer.putInt(current.keyLength);
789       kvBuffer.putInt(current.valueLength);
790       kvBuffer.put(current.keyBuffer, 0, current.keyLength);
791       currentBuffer.get(kvBuffer, current.valueOffset, current.valueLength);
792       if (current.tagsLength > 0) {
793         // Put short as unsigned
794         kvBuffer.put((byte) (current.tagsLength >> 8 & 0xff));
795         kvBuffer.put((byte) (current.tagsLength & 0xff));
796         if (current.tagsOffset != -1) {
797           // the offset of the tags bytes in the underlying buffer is marked. So the temp
798           // buffer,tagsBuffer was not been used.
799           currentBuffer.get(kvBuffer, current.tagsOffset, current.tagsLength);
800         } else {
801           // When tagsOffset is marked as -1, tag compression was present and so the tags were
802           // uncompressed into temp buffer, tagsBuffer. Let us copy it from there
803           kvBuffer.put(current.tagsBuffer, 0, current.tagsLength);
804         }
805       }
806       kvBuffer.rewind();
807       return kvBuffer;
808     }
809 
810     protected ByteBuffer createKVBuffer() {
811       int kvBufSize = (int) KeyValue.getKeyValueDataStructureSize(current.keyLength,
812           current.valueLength, current.tagsLength);
813       ByteBuffer kvBuffer = ByteBuffer.allocate(kvBufSize);
814       return kvBuffer;
815     }
816 
817     @Override
818     public Cell getCell() {
819       return current.toCell();
820     }
821 
822     @Override
823     public void rewind() {
824       currentBuffer.rewind();
825       if (tagCompressionContext != null) {
826         tagCompressionContext.clear();
827       }
828       decodeFirst();
829       current.setKey(current.keyBuffer, current.memstoreTS);
830       previous.invalidate();
831     }
832 
833     @Override
834     public boolean next() {
835       if (!currentBuffer.hasRemaining()) {
836         return false;
837       }
838       decodeNext();
839       current.setKey(current.keyBuffer, current.memstoreTS);
840       previous.invalidate();
841       return true;
842     }
843 
844     protected void decodeTags() {
845       current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
846       if (tagCompressionContext != null) {
847         if (current.uncompressTags) {
848           // Tag compression is been used. uncompress it into tagsBuffer
849           current.ensureSpaceForTags();
850           try {
851             current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
852                 current.tagsBuffer, 0, current.tagsLength);
853           } catch (IOException e) {
854             throw new RuntimeException("Exception while uncompressing tags", e);
855           }
856         } else {
857           currentBuffer.skip(current.tagsCompressedLength);
858           current.uncompressTags = true;// Reset this.
859         }
860         current.tagsOffset = -1;
861       } else {
862         // When tag compress is not used, let us not do copying of tags bytes into tagsBuffer.
863         // Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer()
864         current.tagsOffset = currentBuffer.position();
865         currentBuffer.skip(current.tagsLength);
866       }
867     }
868 
869     @Override
870     public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
871       int rowCommonPrefix = 0;
872       int familyCommonPrefix = 0;
873       int qualCommonPrefix = 0;
874       previous.invalidate();
875       do {
876         int comp;
877         keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
878         if (current.lastCommonPrefix != 0) {
879           // The KV format has row key length also in the byte array. The
880           // common prefix
881           // includes it. So we need to subtract to find out the common prefix
882           // in the
883           // row part alone
884           rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
885         }
886         if (current.lastCommonPrefix <= 2) {
887           rowCommonPrefix = 0;
888         }
889         rowCommonPrefix += findCommonPrefixInRowPart(seekCell, keyOnlyKV, rowCommonPrefix);
890         comp = compareCommonRowPrefix(seekCell, keyOnlyKV, rowCommonPrefix);
891         if (comp == 0) {
892           comp = compareTypeBytes(seekCell, keyOnlyKV);
893           if (comp == 0) {
894             // Subtract the fixed row key length and the family key fixed length
895             familyCommonPrefix = Math.max(
896                 0,
897                 Math.min(familyCommonPrefix,
898                     current.lastCommonPrefix - (3 + keyOnlyKV.getRowLength())));
899             familyCommonPrefix += findCommonPrefixInFamilyPart(seekCell, keyOnlyKV,
900                 familyCommonPrefix);
901             comp = compareCommonFamilyPrefix(seekCell, keyOnlyKV, familyCommonPrefix);
902             if (comp == 0) {
903               // subtract the rowkey fixed length and the family key fixed
904               // length
905               qualCommonPrefix = Math.max(
906                   0,
907                   Math.min(
908                       qualCommonPrefix,
909                       current.lastCommonPrefix
910                           - (3 + keyOnlyKV.getRowLength() + keyOnlyKV.getFamilyLength())));
911               qualCommonPrefix += findCommonPrefixInQualifierPart(seekCell, keyOnlyKV,
912                   qualCommonPrefix);
913               comp = compareCommonQualifierPrefix(seekCell, keyOnlyKV, qualCommonPrefix);
914               if (comp == 0) {
915                 comp = CellComparator.compareTimestamps(seekCell, keyOnlyKV);
916                 if (comp == 0) {
917                   // Compare types. Let the delete types sort ahead of puts;
918                   // i.e. types
919                   // of higher numbers sort before those of lesser numbers.
920                   // Maximum
921                   // (255)
922                   // appears ahead of everything, and minimum (0) appears
923                   // after
924                   // everything.
925                   comp = (0xff & keyOnlyKV.getTypeByte()) - (0xff & seekCell.getTypeByte());
926                 }
927               }
928             }
929           }
930         }
931         if (comp == 0) { // exact match
932           if (seekBefore) {
933             if (!previous.isValid()) {
934               // The caller (seekBefore) has to ensure that we are not at the
935               // first key in the block.
936               throw new IllegalStateException("Cannot seekBefore if "
937                   + "positioned at the first key in the block: key="
938                   + Bytes.toStringBinary(seekCell.getRowArray()));
939             }
940             moveToPrevious();
941             return 1;
942           }
943           return 0;
944         }
945 
946         if (comp < 0) { // already too large, check previous
947           if (previous.isValid()) {
948             moveToPrevious();
949           } else {
950             return HConstants.INDEX_KEY_MAGIC; // using optimized index key
951           }
952           return 1;
953         }
954 
955         // move to next, if more data is available
956         if (currentBuffer.hasRemaining()) {
957           previous.copyFromNext(current);
958           decodeNext();
959           current.setKey(current.keyBuffer, current.memstoreTS);
960         } else {
961           break;
962         }
963       } while (true);
964 
965       // we hit the end of the block, not an exact match
966       return 1;
967     }
968 
969     private int compareTypeBytes(Cell key, Cell right) {
970       if (key.getFamilyLength() + key.getQualifierLength() == 0
971           && key.getTypeByte() == Type.Minimum.getCode()) {
972         // left is "bigger", i.e. it appears later in the sorted order
973         return 1;
974       }
975       if (right.getFamilyLength() + right.getQualifierLength() == 0
976           && right.getTypeByte() == Type.Minimum.getCode()) {
977         return -1;
978       }
979       return 0;
980     }
981 
982     private static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
983       return Bytes.findCommonPrefix(left.getRowArray(), right.getRowArray(), left.getRowLength()
984           - rowCommonPrefix, right.getRowLength() - rowCommonPrefix, left.getRowOffset()
985           + rowCommonPrefix, right.getRowOffset() + rowCommonPrefix);
986     }
987 
988     private static int findCommonPrefixInFamilyPart(Cell left, Cell right, int familyCommonPrefix) {
989       return Bytes
990           .findCommonPrefix(left.getFamilyArray(), right.getFamilyArray(), left.getFamilyLength()
991               - familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix,
992               left.getFamilyOffset() + familyCommonPrefix, right.getFamilyOffset()
993                   + familyCommonPrefix);
994     }
995 
996     private static int findCommonPrefixInQualifierPart(Cell left, Cell right,
997         int qualifierCommonPrefix) {
998       return Bytes.findCommonPrefix(left.getQualifierArray(), right.getQualifierArray(),
999           left.getQualifierLength() - qualifierCommonPrefix, right.getQualifierLength()
1000               - qualifierCommonPrefix, left.getQualifierOffset() + qualifierCommonPrefix,
1001           right.getQualifierOffset() + qualifierCommonPrefix);
1002     }
1003 
1004     private void moveToPrevious() {
1005       if (!previous.isValid()) {
1006         throw new IllegalStateException(
1007             "Can move back only once and not in first key in the block.");
1008       }
1009 
1010       STATE tmp = previous;
1011       previous = current;
1012       current = tmp;
1013 
1014       // move after last key value
1015       currentBuffer.position(current.nextKvOffset);
1016       // Already decoded the tag bytes. We cache this tags into current state and also the total
1017       // compressed length of the tags bytes. For the next time decodeNext() we don't need to decode
1018       // the tags again. This might pollute the Data Dictionary what we use for the compression.
1019       // When current.uncompressTags is false, we will just reuse the current.tagsBuffer and skip
1020       // 'tagsCompressedLength' bytes of source stream.
1021       // See in decodeTags()
1022       current.tagsBuffer = previous.tagsBuffer;
1023       current.tagsCompressedLength = previous.tagsCompressedLength;
1024       current.uncompressTags = false;
1025       previous.invalidate();
1026     }
1027 
1028     @SuppressWarnings("unchecked")
1029     protected STATE createSeekerState() {
1030       // This will fail for non-default seeker state if the subclass does not
1031       // override this method.
1032       return (STATE) new SeekerState(this.tmpPair, this.includesTags());
1033     }
1034 
1035     abstract protected void decodeFirst();
1036     abstract protected void decodeNext();
1037   }
1038 
1039   /**
1040    * @param cell
1041    * @param out
1042    * @param encodingCtx
1043    * @return unencoded size added
1044    * @throws IOException
1045    */
1046   protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
1047       HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
1048     int size = 0;
1049     if (encodingCtx.getHFileContext().isIncludesTags()) {
1050       int tagsLength = cell.getTagsLength();
1051       ByteBufferUtils.putCompressedInt(out, tagsLength);
1052       // There are some tags to be written
1053       if (tagsLength > 0) {
1054         TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
1055         // When tag compression is enabled, tagCompressionContext will have a not null value. Write
1056         // the tags using Dictionary compression in such a case
1057         if (tagCompressionContext != null) {
1058           tagCompressionContext
1059               .compressTags(out, cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
1060         } else {
1061           out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
1062         }
1063       }
1064       size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
1065     }
1066     if (encodingCtx.getHFileContext().isIncludesMvcc()) {
1067       // Copy memstore timestamp from the byte buffer to the output stream.
1068       long memstoreTS = cell.getSequenceId();
1069       WritableUtils.writeVLong(out, memstoreTS);
1070       // TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be
1071       // avoided.
1072       size += WritableUtils.getVIntSize(memstoreTS);
1073     }
1074     return size;
1075   }
1076 
1077   protected final void afterDecodingKeyValue(DataInputStream source,
1078       ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
1079     if (decodingCtx.getHFileContext().isIncludesTags()) {
1080       int tagsLength = ByteBufferUtils.readCompressedInt(source);
1081       // Put as unsigned short
1082       dest.put((byte) ((tagsLength >> 8) & 0xff));
1083       dest.put((byte) (tagsLength & 0xff));
1084       if (tagsLength > 0) {
1085         TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
1086         // When tag compression is been used in this file, tagCompressionContext will have a not
1087         // null value passed.
1088         if (tagCompressionContext != null) {
1089           tagCompressionContext.uncompressTags(source, dest, tagsLength);
1090         } else {
1091           ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
1092         }
1093       }
1094     }
1095     if (decodingCtx.getHFileContext().isIncludesMvcc()) {
1096       long memstoreTS = -1;
1097       try {
1098         // Copy memstore timestamp from the data input stream to the byte
1099         // buffer.
1100         memstoreTS = WritableUtils.readVLong(source);
1101         ByteBufferUtils.writeVLong(dest, memstoreTS);
1102       } catch (IOException ex) {
1103         throw new RuntimeException("Unable to copy memstore timestamp " +
1104             memstoreTS + " after decoding a key/value");
1105       }
1106     }
1107   }
1108 
1109   @Override
1110   public HFileBlockEncodingContext newDataBlockEncodingContext(DataBlockEncoding encoding,
1111       byte[] header, HFileContext meta) {
1112     return new HFileBlockDefaultEncodingContext(encoding, header, meta);
1113   }
1114 
1115   @Override
1116   public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
1117     return new HFileBlockDefaultDecodingContext(meta);
1118   }
1119 
1120   protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
1121       int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
1122       throws IOException;
1123 
1124   /**
1125    * Asserts that there is at least the given amount of unfilled space
1126    * remaining in the given buffer.
1127    * @param out typically, the buffer we are writing to
1128    * @param length the required space in the buffer
1129    * @throws EncoderBufferTooSmallException If there are no enough bytes.
1130    */
1131   protected static void ensureSpace(ByteBuffer out, int length)
1132       throws EncoderBufferTooSmallException {
1133     if (out.position() + length > out.limit()) {
1134       throw new EncoderBufferTooSmallException(
1135           "Buffer position=" + out.position() +
1136           ", buffer limit=" + out.limit() +
1137           ", length to be written=" + length);
1138     }
1139   }
1140 
1141   @Override
1142   public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
1143       throws IOException {
1144     if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
1145       throw new IOException (this.getClass().getName() + " only accepts "
1146           + HFileBlockDefaultEncodingContext.class.getName() + " as the " +
1147           "encoding context.");
1148     }
1149 
1150     HFileBlockDefaultEncodingContext encodingCtx =
1151         (HFileBlockDefaultEncodingContext) blkEncodingCtx;
1152     encodingCtx.prepareEncoding(out);
1153     if (encodingCtx.getHFileContext().isIncludesTags()
1154         && encodingCtx.getHFileContext().isCompressTags()) {
1155       if (encodingCtx.getTagCompressionContext() != null) {
1156         // It will be overhead to create the TagCompressionContext again and again for every block
1157         // encoding.
1158         encodingCtx.getTagCompressionContext().clear();
1159       } else {
1160         try {
1161           TagCompressionContext tagCompressionContext = new TagCompressionContext(
1162               LRUDictionary.class, Byte.MAX_VALUE);
1163           encodingCtx.setTagCompressionContext(tagCompressionContext);
1164         } catch (Exception e) {
1165           throw new IOException("Failed to initialize TagCompressionContext", e);
1166         }
1167       }
1168     }
1169     ByteBufferUtils.putInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
1170     blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
1171   }
1172 
1173   private static class BufferedDataBlockEncodingState extends EncodingState {
1174     int unencodedDataSizeWritten = 0;
1175   }
1176 
1177   @Override
1178   public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
1179       throws IOException {
1180     BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
1181         .getEncodingState();
1182     int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
1183     state.unencodedDataSizeWritten += encodedKvSize;
1184     return encodedKvSize;
1185   }
1186 
1187   public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
1188       DataOutputStream out) throws IOException;
1189 
1190   @Override
1191   public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
1192       byte[] uncompressedBytesWithHeader) throws IOException {
1193     BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
1194         .getEncodingState();
1195     // Write the unencodedDataSizeWritten (with header size)
1196     Bytes.putInt(uncompressedBytesWithHeader, HConstants.HFILEBLOCK_HEADER_SIZE
1197         + DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten
1198         );
1199     if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
1200       encodingCtx.postEncoding(BlockType.ENCODED_DATA);
1201     } else {
1202       encodingCtx.postEncoding(BlockType.DATA);
1203     }
1204   }
1205 
1206   protected Cell createFirstKeyCell(ByteBuffer key, int keyLength) {
1207     if (key.hasArray()) {
1208       return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(),
1209           keyLength);
1210     } else {
1211       return new OffheapKeyOnlyKeyValue(key, key.position(), keyLength);
1212     }
1213   }
1214 }