1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.io.OutputStream;
23  import java.nio.ByteBuffer;
24  
25  import org.apache.hadoop.hbase.Cell;
26  import org.apache.hadoop.hbase.CellComparator;
27  import org.apache.hadoop.hbase.CellUtil;
28  import org.apache.hadoop.hbase.HConstants;
29  import org.apache.hadoop.hbase.KeyValue;
30  import org.apache.hadoop.hbase.KeyValue.Type;
31  import org.apache.hadoop.hbase.KeyValueUtil;
32  import org.apache.hadoop.hbase.Streamable;
33  import org.apache.hadoop.hbase.SettableSequenceId;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
36  import org.apache.hadoop.hbase.io.HeapSize;
37  import org.apache.hadoop.hbase.io.TagCompressionContext;
38  import org.apache.hadoop.hbase.io.hfile.BlockType;
39  import org.apache.hadoop.hbase.io.hfile.HFileContext;
40  import org.apache.hadoop.hbase.io.util.LRUDictionary;
41  import org.apache.hadoop.hbase.io.util.StreamUtils;
42  import org.apache.hadoop.hbase.util.ByteBufferUtils;
43  import org.apache.hadoop.hbase.util.Bytes;
44  import org.apache.hadoop.hbase.util.ClassSize;
45  import org.apache.hadoop.io.WritableUtils;
46  
47  /**
48   * Base class for all data block encoders that use a buffer.
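     * <p>
     * Subclasses supply the per-cell logic via {@code internalEncode} and
     * {@code internalDecodeKeyValues}; this class provides the common block-level plumbing
     * (tag compression setup, memstore timestamps, encoding state). A rough sketch of how a
     * caller drives one block through the methods declared here (the surrounding writer code
     * and the {@code cellsForBlock} collection are assumed, not part of this file):
     * <pre>
     *   encoder.startBlockEncoding(encodingCtx, out);  // writes a dummy size placeholder
     *   for (Cell cell : cellsForBlock) {
     *     encoder.encode(cell, encodingCtx, out);      // delegates to internalEncode()
     *   }
     *   encoder.endBlockEncoding(encodingCtx, out, uncompressedBytesWithHeader);
     * </pre>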
49   */
50  @InterfaceAudience.Private
51  abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
52  
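      /** Initial size of the byte arrays used to hold a copy of the current key and tags. */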
53    private static final int INITIAL_KEY_BUFFER_SIZE = 512;
54  
55    @Override
56    public ByteBuffer decodeKeyValues(DataInputStream source,
57        HFileBlockDecodingContext blkDecodingCtx) throws IOException {
58      if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
59        throw new IOException(this.getClass().getName() + " only accepts "
60            + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
61      }
62  
63      HFileBlockDefaultDecodingContext decodingCtx =
64          (HFileBlockDefaultDecodingContext) blkDecodingCtx;
65      if (decodingCtx.getHFileContext().isIncludesTags()
66          && decodingCtx.getHFileContext().isCompressTags()) {
67        if (decodingCtx.getTagCompressionContext() != null) {
68          // Creating a new TagCompressionContext for every block decoded would be wasteful;
69          // reuse the existing one and just clear it.
70          decodingCtx.getTagCompressionContext().clear();
71        } else {
72          try {
73            TagCompressionContext tagCompressionContext = new TagCompressionContext(
74                LRUDictionary.class, Byte.MAX_VALUE);
75            decodingCtx.setTagCompressionContext(tagCompressionContext);
76          } catch (Exception e) {
77            throw new IOException("Failed to initialize TagCompressionContext", e);
78          }
79        }
80      }
81      return internalDecodeKeyValues(source, 0, 0, decodingCtx);
82    }
83  
84    /********************* common prefixes *************************/
85  // Having this as static is fine for now, but if META ever uses data block encoding (DBE)
86  // then we should change this.
87    public static int compareCommonRowPrefix(Cell left, Cell right, int rowCommonPrefix) {
88      return Bytes.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix,
89          left.getRowLength() - rowCommonPrefix, right.getRowArray(), right.getRowOffset()
90              + rowCommonPrefix, right.getRowLength() - rowCommonPrefix);
91    }
92  
93    public static int compareCommonFamilyPrefix(Cell left, Cell right, int familyCommonPrefix) {
94      return Bytes.compareTo(left.getFamilyArray(), left.getFamilyOffset() + familyCommonPrefix,
95          left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
96          right.getFamilyOffset() + familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix);
97    }
98  
99    public static int compareCommonQualifierPrefix(Cell left, Cell right, int qualCommonPrefix) {
100     return Bytes.compareTo(left.getQualifierArray(), left.getQualifierOffset() + qualCommonPrefix,
101         left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(),
102         right.getQualifierOffset() + qualCommonPrefix, right.getQualifierLength()
103             - qualCommonPrefix);
104   }
105 
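      /**
       * Holds the state of a seeker within the current block: a copy of the current key in
       * keyBuffer, the offsets and lengths of the value and tags inside currentBuffer, the length
       * of the prefix shared with the previous key, and the memstore timestamp (sequence id).
       */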
106   protected static class SeekerState implements Cell {
107     protected ByteBuffer currentBuffer;
108     protected TagCompressionContext tagCompressionContext;
109     protected int valueOffset = -1;
110     protected int keyLength;
111     protected int valueLength;
112     protected int lastCommonPrefix;
113     protected int tagsLength = 0;
114     protected int tagsOffset = -1;
115     protected int tagsCompressedLength = 0;
116     protected boolean uncompressTags = true;
117 
118     /** We need to store a copy of the key. */
119     protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
120     protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
121 
122     protected long memstoreTS;
123     protected int nextKvOffset;
124     protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
125 
126     protected boolean isValid() {
127       return valueOffset != -1;
128     }
129 
130     protected void invalidate() {
131       valueOffset = -1;
132       tagsCompressedLength = 0;
133       currentKey = new KeyValue.KeyOnlyKeyValue();
134       uncompressTags = true;
135       currentBuffer = null;
136     }
137 
138     protected void ensureSpaceForKey() {
139       if (keyLength > keyBuffer.length) {
140         // rare case, but we need to handle arbitrary length of key
141         int newKeyBufferLength = Math.max(keyBuffer.length, 1) * 2;
142         while (keyLength > newKeyBufferLength) {
143           newKeyBufferLength *= 2;
144         }
145         byte[] newKeyBuffer = new byte[newKeyBufferLength];
146         System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
147         keyBuffer = newKeyBuffer;
148       }
149     }
150 
151     protected void ensureSpaceForTags() {
152       if (tagsLength > tagsBuffer.length) {
153         // rare case, but we need to handle arbitrary length of tags
154         int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2;
155         while (tagsLength > newTagsBufferLength) {
156           newTagsBufferLength *= 2;
157         }
158         byte[] newTagsBuffer = new byte[newTagsBufferLength];
159         System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
160         tagsBuffer = newTagsBuffer;
161       }
162     }
163 
164     protected void setKey(byte[] keyBuffer, long memTS) {
165       currentKey.setKey(keyBuffer, 0, keyLength);
166       memstoreTS = memTS;
167     }
168 
169     /**
170      * Copy the state from the next one into this instance (the previous state
171      * placeholder). Used to save the previous state when we are advancing the
172      * seeker to the next key/value.
173      */
174     protected void copyFromNext(SeekerState nextState) {
175       if (keyBuffer.length != nextState.keyBuffer.length) {
176         keyBuffer = nextState.keyBuffer.clone();
177       } else if (!isValid()) {
178         // Note: we can only call isValid before we override our state, so this
179         // comes before all the assignments at the end of this method.
180         System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
181              nextState.keyLength);
182       } else {
183         // don't copy the common prefix between this key and the previous one
184         System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
185             keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
186                 - nextState.lastCommonPrefix);
187       }
188       currentKey = nextState.currentKey;
189 
190       valueOffset = nextState.valueOffset;
191       keyLength = nextState.keyLength;
192       valueLength = nextState.valueLength;
193       lastCommonPrefix = nextState.lastCommonPrefix;
194       nextKvOffset = nextState.nextKvOffset;
195       memstoreTS = nextState.memstoreTS;
196       currentBuffer = nextState.currentBuffer;
197       tagsOffset = nextState.tagsOffset;
198       tagsLength = nextState.tagsLength;
199       if (nextState.tagCompressionContext != null) {
200         tagCompressionContext = nextState.tagCompressionContext;
201       }
202     }
203 
204     @Override
205     public byte[] getRowArray() {
206       return currentKey.getRowArray();
207     }
208 
209     @Override
210     public int getRowOffset() {
211       return Bytes.SIZEOF_SHORT;
212     }
213 
214     @Override
215     public short getRowLength() {
216       return currentKey.getRowLength();
217     }
218 
219     @Override
220     public byte[] getFamilyArray() {
221       return currentKey.getFamilyArray();
222     }
223 
224     @Override
225     public int getFamilyOffset() {
226       return currentKey.getFamilyOffset();
227     }
228 
229     @Override
230     public byte getFamilyLength() {
231       return currentKey.getFamilyLength();
232     }
233 
234     @Override
235     public byte[] getQualifierArray() {
236       return currentKey.getQualifierArray();
237     }
238 
239     @Override
240     public int getQualifierOffset() {
241       return currentKey.getQualifierOffset();
242     }
243 
244     @Override
245     public int getQualifierLength() {
246       return currentKey.getQualifierLength();
247     }
248 
249     @Override
250     public long getTimestamp() {
251       return currentKey.getTimestamp();
252     }
253 
254     @Override
255     public byte getTypeByte() {
256       return currentKey.getTypeByte();
257     }
258 
259     @Override
260     public long getMvccVersion() {
261       return memstoreTS;
262     }
263 
264     @Override
265     public long getSequenceId() {
266       return memstoreTS;
267     }
268 
269     @Override
270     public byte[] getValueArray() {
271       return currentBuffer.array();
272     }
273 
274     @Override
275     public int getValueOffset() {
276       return currentBuffer.arrayOffset() + valueOffset;
277     }
278 
279     @Override
280     public int getValueLength() {
281       return valueLength;
282     }
283 
284     @Override
285     public byte[] getTagsArray() {
286       if (tagCompressionContext != null) {
287         return tagsBuffer;
288       }
289       return currentBuffer.array();
290     }
291 
292     @Override
293     public int getTagsOffset() {
294       if (tagCompressionContext != null) {
295         return 0;
296       }
297       return currentBuffer.arrayOffset() + tagsOffset;
298     }
299 
300     @Override
301     public int getTagsLength() {
302       return tagsLength;
303     }
304 
305     @Override
306     @Deprecated
307     public byte[] getValue() {
308       throw new UnsupportedOperationException("getValue() not supported");
309     }
310 
311     @Override
312     @Deprecated
313     public byte[] getFamily() {
314       throw new UnsupportedOperationException("getFamily() not supported");
315     }
316 
317     @Override
318     @Deprecated
319     public byte[] getQualifier() {
320       throw new UnsupportedOperationException("getQualifier() not supported");
321     }
322 
323     @Override
324     @Deprecated
325     public byte[] getRow() {
326       throw new UnsupportedOperationException("getRow() not supported");
327     }
328 
329     @Override
330     public String toString() {
331       return KeyValue.keyToString(this.keyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
332           + getValueLength() + "/seqid=" + memstoreTS;
333     }
334 
335     public Cell shallowCopy() {
336       return new ClonedSeekerState(currentBuffer, keyBuffer, currentKey.getRowLength(),
337           currentKey.getFamilyOffset(), currentKey.getFamilyLength(), keyLength, 
338           currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
339           currentKey.getTimestamp(), currentKey.getTypeByte(), valueLength, valueOffset,
340           memstoreTS, tagsOffset, tagsLength, tagCompressionContext, tagsBuffer);
341     }
342   }
343 
344   /**
345    * Makes a deep copy of only the key part of the key buffer and copies the remaining
346    * seeker state members needed to take a clone.
347    * Note that the value byte[] still points into currentBuffer and is represented by
348    * valueOffset and valueLength.
349    */
350   // We return this as a Cell to the upper layers of the read flow, and they might try to set a
351   // new SeqId on it, so this has to be an instance of SettableSequenceId. SeekerState need not
352   // be SettableSequenceId because we never return it to the top layers; when we have to, we
353   // make a ClonedSeekerState from it.
354   protected static class ClonedSeekerState implements Cell, HeapSize, SettableSequenceId,
355       Streamable {
356     private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
357         + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
358         + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (2 * ClassSize.ARRAY));
359     private byte[] keyOnlyBuffer;
360     private ByteBuffer currentBuffer;
361     private short rowLength;
362     private int familyOffset;
363     private byte familyLength;
364     private int qualifierOffset;
365     private int qualifierLength;
366     private long timestamp;
367     private byte typeByte;
368     private int valueOffset;
369     private int valueLength;
370     private int tagsLength;
371     private int tagsOffset;
372     private byte[] cloneTagsBuffer;
373     private long seqId;
374     private TagCompressionContext tagCompressionContext;
375     
376     protected ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, short rowLength,
377         int familyOffset, byte familyLength, int keyLength, int qualOffset, int qualLength,
378         long timeStamp, byte typeByte, int valueLen, int valueOffset, long seqId,
379         int tagsOffset, int tagsLength, TagCompressionContext tagCompressionContext,
380         byte[] tagsBuffer) {
381       this.currentBuffer = currentBuffer;
382       keyOnlyBuffer = new byte[keyLength];
383       this.tagCompressionContext = tagCompressionContext;
384       this.rowLength = rowLength;
385       this.familyOffset = familyOffset;
386       this.familyLength = familyLength;
387       this.qualifierOffset = qualOffset;
388       this.qualifierLength = qualLength;
389       this.timestamp = timeStamp;
390       this.typeByte = typeByte;
391       this.valueLength = valueLen;
392       this.valueOffset = valueOffset;
393       this.tagsOffset = tagsOffset;
394       this.tagsLength = tagsLength;
395       System.arraycopy(keyBuffer, 0, keyOnlyBuffer, 0, keyLength);
396       if (tagCompressionContext != null) {
397         this.cloneTagsBuffer = new byte[tagsLength];
398         System.arraycopy(tagsBuffer, 0, this.cloneTagsBuffer, 0, tagsLength);
399       }
400       setSequenceId(seqId);
401     }
402 
403     @Override
404     public byte[] getRowArray() {
405       return keyOnlyBuffer;
406     }
407 
408     @Override
409     public byte[] getFamilyArray() {
410       return keyOnlyBuffer;
411     }
412 
413     @Override
414     public byte[] getQualifierArray() {
415       return keyOnlyBuffer;
416     }
417 
418     @Override
419     public int getRowOffset() {
420       return Bytes.SIZEOF_SHORT;
421     }
422 
423     @Override
424     public short getRowLength() {
425       return rowLength;
426     }
427 
428     @Override
429     public int getFamilyOffset() {
430       return familyOffset;
431     }
432 
433     @Override
434     public byte getFamilyLength() {
435       return familyLength;
436     }
437 
438     @Override
439     public int getQualifierOffset() {
440       return qualifierOffset;
441     }
442 
443     @Override
444     public int getQualifierLength() {
445       return qualifierLength;
446     }
447 
448     @Override
449     public long getTimestamp() {
450       return timestamp;
451     }
452 
453     @Override
454     public byte getTypeByte() {
455       return typeByte;
456     }
457 
458     @Override
459     @Deprecated
460     public long getMvccVersion() {
461       return getSequenceId();
462     }
463 
464     @Override
465     public long getSequenceId() {
466       return seqId;
467     }
468 
469     @Override
470     public byte[] getValueArray() {
471       return currentBuffer.array();
472     }
473 
474     @Override
475     public int getValueOffset() {
476       return currentBuffer.arrayOffset() + valueOffset;
477     }
478 
479     @Override
480     public int getValueLength() {
481       return valueLength;
482     }
483 
484     @Override
485     public byte[] getTagsArray() {
486       if (tagCompressionContext != null) {
487         return cloneTagsBuffer;
488       }
489       return currentBuffer.array();
490     }
491 
492     @Override
493     public int getTagsOffset() {
494       if (tagCompressionContext != null) {
495         return 0;
496       }
497       return currentBuffer.arrayOffset() + tagsOffset;
498     }
499 
500     @Override
501     public int getTagsLength() {
502       return tagsLength;
503     }
504 
505     @Override
506     @Deprecated
507     public byte[] getValue() {
508       return CellUtil.cloneValue(this);
509     }
510 
511     @Override
512     @Deprecated
513     public byte[] getFamily() {
514       return CellUtil.cloneFamily(this);
515     }
516 
517     @Override
518     @Deprecated
519     public byte[] getQualifier() {
520       return CellUtil.cloneQualifier(this);
521     }
522 
523     @Override
524     @Deprecated
525     public byte[] getRow() {
526       return CellUtil.cloneRow(this);
527     }
528 
529     @Override
530     public String toString() {
531       return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
532           + getValueLength() + "/seqid=" + seqId;
533     }
534 
535     @Override
536     public void setSequenceId(long seqId) {
537       this.seqId = seqId;
538     }
539 
540     @Override
541     public long heapSize() {
542       return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
543     }
544 
545     @Override
546     public int write(OutputStream out) throws IOException {
547       return write(out, true);
548     }
549 
550     @Override
551     public int write(OutputStream out, boolean withTags) throws IOException {
552       int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength,
553           tagsLength, withTags);
554       writeInt(out, lenToWrite);
555       writeInt(out, keyOnlyBuffer.length);
556       writeInt(out, valueLength);
557       // Write key
558       out.write(keyOnlyBuffer);
559       // Write value
560       assert this.currentBuffer.hasArray();
561       out.write(this.currentBuffer.array(), this.currentBuffer.arrayOffset() + this.valueOffset,
562           this.valueLength);
563       if (withTags) {
564         // Write a 2 byte tags length followed by the tags bytes.
565         // The tags length is serialized in 2 bytes (as a short) even though its type is int;
566         // since it is a non-negative number, we can spare the sign bit. See HBASE-11437.
567         out.write((byte) (0xff & (this.tagsLength >> 8)));
568         out.write((byte) (0xff & this.tagsLength));
569         if (this.tagCompressionContext != null) {
570           out.write(cloneTagsBuffer);
571         } else {
572           out.write(this.currentBuffer.array(), this.currentBuffer.arrayOffset() + this.tagsOffset,
573               this.tagsLength);
574         }
575       }
576       return lenToWrite + Bytes.SIZEOF_INT;
577     }
578   }
579 
580   private static void writeInt(OutputStream out, int v) throws IOException {
581     // ByteBufferOutputStream has its own writeInt, which writes the int directly to the
582     // underlying ByteBuffer in one step.
583     if (out instanceof ByteBufferOutputStream) {
584       ((ByteBufferOutputStream) out).writeInt(v);
585     } else {
586       StreamUtils.writeInt(out, v);
587     }
588   }
589 
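      /**
       * Base {@link EncodedSeeker} that walks an encoded block held in a ByteBuffer. It keeps a
       * current and a previous {@link SeekerState} so that seekToKeyInBlock can step back one
       * key/value when needed (seekBefore).
       */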
590   protected abstract static class
591       BufferedEncodedSeeker<STATE extends SeekerState>
592       implements EncodedSeeker {
593     protected HFileBlockDecodingContext decodingCtx;
594     protected final CellComparator comparator;
595     protected ByteBuffer currentBuffer;
596     protected STATE current = createSeekerState(); // always valid
597     protected STATE previous = createSeekerState(); // may not be valid
598     protected TagCompressionContext tagCompressionContext = null;
599     protected  KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
600 
601     public BufferedEncodedSeeker(CellComparator comparator,
602         HFileBlockDecodingContext decodingCtx) {
603       this.comparator = comparator;
604       this.decodingCtx = decodingCtx;
605       if (decodingCtx.getHFileContext().isCompressTags()) {
606         try {
607           tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
608         } catch (Exception e) {
609           throw new RuntimeException("Failed to initialize TagCompressionContext", e);
610         }
611       }
612     }
613 
614     protected boolean includesMvcc() {
615       return this.decodingCtx.getHFileContext().isIncludesMvcc();
616     }
617 
618     protected boolean includesTags() {
619       return this.decodingCtx.getHFileContext().isIncludesTags();
620     }
621 
622     @Override
623     public int compareKey(CellComparator comparator, Cell key) {
624       keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
625       return comparator.compareKeyIgnoresMvcc(key, keyOnlyKV);
626     }
627 
628     @Override
629     public void setCurrentBuffer(ByteBuffer buffer) {
630       if (this.tagCompressionContext != null) {
631         this.tagCompressionContext.clear();
632       }
633       currentBuffer = buffer;
634       current.currentBuffer = currentBuffer;
635       if(tagCompressionContext != null) {
636         current.tagCompressionContext = tagCompressionContext;
637       }
638       decodeFirst();
639       current.setKey(current.keyBuffer, current.memstoreTS);
640       previous.invalidate();
641     }
642 
643     @Override
644     public ByteBuffer getKeyDeepCopy() {
645       ByteBuffer keyBuffer = ByteBuffer.allocate(current.keyLength);
646       keyBuffer.put(current.keyBuffer, 0, current.keyLength);
647       keyBuffer.rewind();
648       return keyBuffer;
649     }
650 
651     @Override
652     public ByteBuffer getValueShallowCopy() {
653       ByteBuffer dup = currentBuffer.duplicate();
654       dup.position(current.valueOffset);
655       dup.limit(current.valueOffset + current.valueLength);
656       return dup.slice();
657     }
658 
659     @Override
660     public ByteBuffer getKeyValueBuffer() {
661       ByteBuffer kvBuffer = createKVBuffer();
662       kvBuffer.putInt(current.keyLength);
663       kvBuffer.putInt(current.valueLength);
664       kvBuffer.put(current.keyBuffer, 0, current.keyLength);
665       ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer, current.valueOffset,
666           current.valueLength);
667       if (current.tagsLength > 0) {
668         // Put short as unsigned
669         kvBuffer.put((byte) (current.tagsLength >> 8 & 0xff));
670         kvBuffer.put((byte) (current.tagsLength & 0xff));
671         if (current.tagsOffset != -1) {
672           // The offset of the tags bytes in the underlying buffer is recorded, so the temp
673           // buffer (tagsBuffer) was not used.
674           ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer, current.tagsOffset,
675               current.tagsLength);
676         } else {
677           // When tagsOffset is -1, tag compression was in use and the tags were uncompressed
678           // into the temp buffer, tagsBuffer. Copy them from there.
679           kvBuffer.put(current.tagsBuffer, 0, current.tagsLength);
680         }
681       }
682       kvBuffer.rewind();
683       return kvBuffer;
684     }
685 
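        /**
         * Allocates a buffer large enough to hold the current key/value (and tags, if any) in
         * plain KeyValue serialized format.
         */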
686     protected ByteBuffer createKVBuffer() {
687       int kvBufSize = (int) KeyValue.getKeyValueDataStructureSize(current.keyLength,
688           current.valueLength, current.tagsLength);
689       ByteBuffer kvBuffer = ByteBuffer.allocate(kvBufSize);
690       return kvBuffer;
691     }
692 
693     @Override
694     public Cell getKeyValue() {
695       return current.shallowCopy();
696     }
697 
698     @Override
699     public void rewind() {
700       currentBuffer.rewind();
701       if (tagCompressionContext != null) {
702         tagCompressionContext.clear();
703       }
704       decodeFirst();
705       current.setKey(current.keyBuffer, current.memstoreTS);
706       previous.invalidate();
707     }
708 
709     @Override
710     public boolean next() {
711       if (!currentBuffer.hasRemaining()) {
712         return false;
713       }
714       decodeNext();
715       current.setKey(current.keyBuffer, current.memstoreTS);
716       previous.invalidate();
717       return true;
718     }
719 
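        /**
         * Reads the tags length for the current cell, then either uncompresses the tags into
         * current.tagsBuffer (when tag compression is in use; after a step back the already
         * uncompressed tags are reused and the compressed bytes are just skipped) or simply
         * records their offset in currentBuffer.
         */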
720     protected void decodeTags() {
721       current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
722       if (tagCompressionContext != null) {
723         if (current.uncompressTags) {
724           // Tag compression is in use; uncompress the tags into tagsBuffer.
725           current.ensureSpaceForTags();
726           try {
727             current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
728                 current.tagsBuffer, 0, current.tagsLength);
729           } catch (IOException e) {
730             throw new RuntimeException("Exception while uncompressing tags", e);
731           }
732         } else {
733           ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
734           current.uncompressTags = true;// Reset this.
735         }
736         current.tagsOffset = -1;
737       } else {
738         // When tag compression is not used, don't copy the tags bytes into tagsBuffer.
739         // Just record the tags offset so the KV buffer can be created later in getKeyValueBuffer().
740         current.tagsOffset = currentBuffer.position();
741         ByteBufferUtils.skip(currentBuffer, current.tagsLength);
742       }
743     }
744 
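        /**
         * Positions the seeker within the current block by scanning forward from the current
         * position. Returns 0 on an exact match, 1 when left at the last key/value sorting before
         * the seek key (or at the one before an exact match when seekBefore is set), and
         * HConstants.INDEX_KEY_MAGIC when the seek key sorts before the current key and there is
         * no previous key to step back to (e.g. it sorts before the first key of the block).
         */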
745     @Override
746     public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
747       int rowCommonPrefix = 0;
748       int familyCommonPrefix = 0;
749       int qualCommonPrefix = 0;
750       previous.invalidate();
751       do {
752         int comp;
753         keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
754         if (current.lastCommonPrefix != 0) {
755           // The KV format stores the serialized row length in the byte array as well, and
756           // the common prefix includes it. So we subtract those 2 bytes (hence the "- 2")
757           // to find the common prefix of the row part alone.
758           // If the common prefix did not even cover the row length field, the prefix is
759           // reset to 0 just below.
760           rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
761         }
762         if (current.lastCommonPrefix <= 2) {
763           rowCommonPrefix = 0;
764         }
765         rowCommonPrefix += findCommonPrefixInRowPart(seekCell, keyOnlyKV, rowCommonPrefix);
766         comp = compareCommonRowPrefix(seekCell, keyOnlyKV, rowCommonPrefix);
767         if (comp == 0) {
768           comp = compareTypeBytes(seekCell, keyOnlyKV);
769           if (comp == 0) {
770             // Subtract the row length field (2), the row itself and the family length field (1).
771             familyCommonPrefix = Math.max(
772                 0,
773                 Math.min(familyCommonPrefix,
774                     current.lastCommonPrefix - (3 + keyOnlyKV.getRowLength())));
775             familyCommonPrefix += findCommonPrefixInFamilyPart(seekCell, keyOnlyKV,
776                 familyCommonPrefix);
777             comp = compareCommonFamilyPrefix(seekCell, keyOnlyKV, familyCommonPrefix);
778             if (comp == 0) {
779               // Subtract the row length field (2), the row, the family length field (1)
780               // and the family itself.
781               qualCommonPrefix = Math.max(
782                   0,
783                   Math.min(
784                       qualCommonPrefix,
785                       current.lastCommonPrefix
786                           - (3 + keyOnlyKV.getRowLength() + keyOnlyKV.getFamilyLength())));
787               qualCommonPrefix += findCommonPrefixInQualifierPart(seekCell, keyOnlyKV,
788                   qualCommonPrefix);
789               comp = compareCommonQualifierPrefix(seekCell, keyOnlyKV, qualCommonPrefix);
790               if (comp == 0) {
791                 comp = CellComparator.compareTimestamps(seekCell, keyOnlyKV);
792                 if (comp == 0) {
793                   // Compare types. Let the delete types sort ahead of puts;
794                   // i.e. types of higher numbers sort before those of lesser
795                   // numbers. Maximum (255) appears ahead of everything, and
796                   // minimum (0) appears after everything.
801                   comp = (0xff & keyOnlyKV.getTypeByte()) - (0xff & seekCell.getTypeByte());
802                 }
803               }
804             }
805           }
806         }
807         if (comp == 0) { // exact match
808           if (seekBefore) {
809             if (!previous.isValid()) {
810               // The caller (seekBefore) has to ensure that we are not at the
811               // first key in the block.
812               throw new IllegalStateException("Cannot seekBefore if "
813                   + "positioned at the first key in the block: key="
814                   + Bytes.toStringBinary(seekCell.getRowArray()));
815             }
816             moveToPrevious();
817             return 1;
818           }
819           return 0;
820         }
821 
822         if (comp < 0) { // already too large, check previous
823           if (previous.isValid()) {
824             moveToPrevious();
825           } else {
826             return HConstants.INDEX_KEY_MAGIC; // using optimized index key
827           }
828           return 1;
829         }
830 
831         // move to next, if more data is available
832         if (currentBuffer.hasRemaining()) {
833           previous.copyFromNext(current);
834           decodeNext();
835           current.setKey(current.keyBuffer, current.memstoreTS);
836         } else {
837           break;
838         }
839       } while (true);
840 
841       // we hit the end of the block, not an exact match
842       return 1;
843     }
844 
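        /**
         * Handles the artificial "minimum" keys (empty column and Type.Minimum) used as search
         * keys: such a fake key is treated as greater than any real cell of the same row,
         * whichever side of the comparison it is on; otherwise 0 is returned and the comparison
         * continues on the remaining fields.
         */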
845     private int compareTypeBytes(Cell key, Cell right) {
846       if (key.getFamilyLength() + key.getQualifierLength() == 0
847           && key.getTypeByte() == Type.Minimum.getCode()) {
848         // left is "bigger", i.e. it appears later in the sorted order
849         return 1;
850       }
851       if (right.getFamilyLength() + right.getQualifierLength() == 0
852           && right.getTypeByte() == Type.Minimum.getCode()) {
853         return -1;
854       }
855       return 0;
856     }
857 
858     private static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
859       return Bytes.findCommonPrefix(left.getRowArray(), right.getRowArray(), left.getRowLength()
860           - rowCommonPrefix, right.getRowLength() - rowCommonPrefix, left.getRowOffset()
861           + rowCommonPrefix, right.getRowOffset() + rowCommonPrefix);
862     }
863 
864     private static int findCommonPrefixInFamilyPart(Cell left, Cell right, int familyCommonPrefix) {
865       return Bytes
866           .findCommonPrefix(left.getFamilyArray(), right.getFamilyArray(), left.getFamilyLength()
867               - familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix,
868               left.getFamilyOffset() + familyCommonPrefix, right.getFamilyOffset()
869                   + familyCommonPrefix);
870     }
871 
872     private static int findCommonPrefixInQualifierPart(Cell left, Cell right,
873         int qualifierCommonPrefix) {
874       return Bytes.findCommonPrefix(left.getQualifierArray(), right.getQualifierArray(),
875           left.getQualifierLength() - qualifierCommonPrefix, right.getQualifierLength()
876               - qualifierCommonPrefix, left.getQualifierOffset() + qualifierCommonPrefix,
877           right.getQualifierOffset() + qualifierCommonPrefix);
878     }
879 
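        /**
         * Swaps the current and previous states so the seeker steps back to the previously
         * decoded key/value, and repositions the buffer just after it. Only a single step back
         * is possible.
         */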
880     private void moveToPrevious() {
881       if (!previous.isValid()) {
882         throw new IllegalStateException(
883             "Can only move back once, and not when positioned at the first key in the block.");
884       }
885 
886       STATE tmp = previous;
887       previous = current;
888       current = tmp;
889 
890       // move after last key value
891       currentBuffer.position(current.nextKvOffset);
892       // The tag bytes have already been decoded. We cache these tags in the current state, along
893       // with the total compressed length of the tag bytes, so that the next decodeNext() does not
894       // need to decode the tags again (re-decoding them could pollute the data dictionary used
895       // for the compression). When current.uncompressTags is false, we just reuse
896       // current.tagsBuffer and skip 'tagsCompressedLength' bytes of the source stream.
897       // See decodeTags().
898       current.tagsBuffer = previous.tagsBuffer;
899       current.tagsCompressedLength = previous.tagsCompressedLength;
900       current.uncompressTags = false;
901       previous.invalidate();
902     }
903 
904     @SuppressWarnings("unchecked")
905     protected STATE createSeekerState() {
906       // This will fail for non-default seeker state if the subclass does not
907       // override this method.
908       return (STATE) new SeekerState();
909     }
910 
911     abstract protected void decodeFirst();
912     abstract protected void decodeNext();
913   }
914 
915   /**
916    * @param cell the cell that was just encoded
917    * @param out the output stream being written to
918    * @param encodingCtx the encoding context, which holds the tag compression state if enabled
919    * @return unencoded size added
920    * @throws IOException if writing to the output stream fails
921    */
922   protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
923       HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
924     int size = 0;
925     if (encodingCtx.getHFileContext().isIncludesTags()) {
926       int tagsLength = cell.getTagsLength();
927       ByteBufferUtils.putCompressedInt(out, tagsLength);
928       // There are some tags to be written
929       if (tagsLength > 0) {
930         TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
931         // When tag compression is enabled, tagCompressionContext is non-null. In that case,
932         // write the tags using dictionary compression.
933         if (tagCompressionContext != null) {
934           tagCompressionContext
935               .compressTags(out, cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
936         } else {
937           out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
938         }
939       }
940       size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
941     }
942     if (encodingCtx.getHFileContext().isIncludesMvcc()) {
943       // Copy the memstore timestamp (sequence id) from the cell to the output stream.
944       long memstoreTS = cell.getSequenceId();
945       WritableUtils.writeVLong(out, memstoreTS);
946       // TODO use a writeVLong which returns the number of bytes written, so that the value
947       // does not have to be parsed twice.
948       size += WritableUtils.getVIntSize(memstoreTS);
949     }
950     return size;
951   }
952 
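      /**
       * Counterpart of afterEncodingKeyValue(): reads back the tags (uncompressing them when tag
       * compression is in use) and the memstore timestamp, if the file context includes them, and
       * appends them to the destination buffer.
       */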
953   protected final void afterDecodingKeyValue(DataInputStream source,
954       ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
955     if (decodingCtx.getHFileContext().isIncludesTags()) {
956       int tagsLength = ByteBufferUtils.readCompressedInt(source);
957       // Put as unsigned short
958       dest.put((byte) ((tagsLength >> 8) & 0xff));
959       dest.put((byte) (tagsLength & 0xff));
960       if (tagsLength > 0) {
961         TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
962         // When tag compression is used in this file, a non-null tagCompressionContext is
963         // passed in.
964         if (tagCompressionContext != null) {
965           tagCompressionContext.uncompressTags(source, dest, tagsLength);
966         } else {
967           ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
968         }
969       }
970     }
971     if (decodingCtx.getHFileContext().isIncludesMvcc()) {
972       long memstoreTS = -1;
973       try {
974         // Copy memstore timestamp from the data input stream to the byte
975         // buffer.
976         memstoreTS = WritableUtils.readVLong(source);
977         ByteBufferUtils.writeVLong(dest, memstoreTS);
978       } catch (IOException ex) {
979         throw new RuntimeException("Unable to copy memstore timestamp " +
980             memstoreTS + " after decoding a key/value", ex);
981       }
982     }
983   }
984 
985   @Override
986   public HFileBlockEncodingContext newDataBlockEncodingContext(DataBlockEncoding encoding,
987       byte[] header, HFileContext meta) {
988     return new HFileBlockDefaultEncodingContext(encoding, header, meta);
989   }
990 
991   @Override
992   public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
993     return new HFileBlockDefaultDecodingContext(meta);
994   }
995 
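      /**
       * Encoding-specific decode loop: reads the key/values from the source stream and returns
       * them as a single buffer in plain KeyValue format. allocateHeaderLength reserves headroom
       * at the start of the returned buffer; skipLastBytes excludes trailing bytes from decoding.
       */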
996   protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
997       int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
998       throws IOException;
999 
1000   /**
1001    * Asserts that there is at least the given amount of unfilled space
1002    * remaining in the given buffer.
1003    * @param out typically, the buffer we are writing to
1004    * @param length the required space in the buffer
1005    * @throws EncoderBufferTooSmallException if there is not enough space left in the buffer.
1006    */
1007   protected static void ensureSpace(ByteBuffer out, int length)
1008       throws EncoderBufferTooSmallException {
1009     if (out.position() + length > out.limit()) {
1010       throw new EncoderBufferTooSmallException(
1011           "Buffer position=" + out.position() +
1012           ", buffer limit=" + out.limit() +
1013           ", length to be written=" + length);
1014     }
1015   }
1016 
1017   @Override
1018   public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
1019       throws IOException {
1020     if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
1021       throw new IOException (this.getClass().getName() + " only accepts "
1022           + HFileBlockDefaultEncodingContext.class.getName() + " as the " +
1023           "encoding context.");
1024     }
1025 
1026     HFileBlockDefaultEncodingContext encodingCtx =
1027         (HFileBlockDefaultEncodingContext) blkEncodingCtx;
1028     encodingCtx.prepareEncoding(out);
1029     if (encodingCtx.getHFileContext().isIncludesTags()
1030         && encodingCtx.getHFileContext().isCompressTags()) {
1031       if (encodingCtx.getTagCompressionContext() != null) {
1032         // Creating a new TagCompressionContext for every block encoded would be wasteful;
1033         // reuse the existing one and just clear it.
1034         encodingCtx.getTagCompressionContext().clear();
1035       } else {
1036         try {
1037           TagCompressionContext tagCompressionContext = new TagCompressionContext(
1038               LRUDictionary.class, Byte.MAX_VALUE);
1039           encodingCtx.setTagCompressionContext(tagCompressionContext);
1040         } catch (Exception e) {
1041           throw new IOException("Failed to initialize TagCompressionContext", e);
1042         }
1043       }
1044     }
1045     ByteBufferUtils.putInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
1046     blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
1047   }
1048 
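       /** Per-block encoding state: tracks how many unencoded data bytes have been written so far. */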
1049   private static class BufferedDataBlockEncodingState extends EncodingState {
1050     int unencodedDataSizeWritten = 0;
1051   }
1052 
1053   @Override
1054   public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
1055       throws IOException {
1056     BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
1057         .getEncodingState();
1058     int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
1059     state.unencodedDataSizeWritten += encodedKvSize;
1060     return encodedKvSize;
1061   }
1062 
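       /**
        * Encoding-specific per-cell encoder: writes one cell to the block being encoded and
        * returns the size that cell contributes to the block's unencoded data size.
        */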
1063   public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
1064       DataOutputStream out) throws IOException;
1065 
1066   @Override
1067   public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
1068       byte[] uncompressedBytesWithHeader) throws IOException {
1069     BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
1070         .getEncodingState();
1071     // Write the unencoded data size at the position just past the block header and the
1072     // encoding id.
1073     Bytes.putInt(uncompressedBytesWithHeader, HConstants.HFILEBLOCK_HEADER_SIZE
1074         + DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten);
1075     if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
1076       encodingCtx.postEncoding(BlockType.ENCODED_DATA);
1077     } else {
1078       encodingCtx.postEncoding(BlockType.DATA);
1079     }
1080   }
1081 }