View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.apache.hadoop.hbase.Cell;
25  import org.apache.hadoop.hbase.CellComparator;
26  import org.apache.hadoop.hbase.CellUtil;
27  import org.apache.hadoop.hbase.HConstants;
28  import org.apache.hadoop.hbase.KeyValue;
29  import org.apache.hadoop.hbase.KeyValue.KVComparator;
30  import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
31  import org.apache.hadoop.hbase.KeyValue.Type;
32  import org.apache.hadoop.hbase.KeyValueUtil;
33  import org.apache.hadoop.hbase.SettableSequenceId;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.io.HeapSize;
36  import org.apache.hadoop.hbase.io.TagCompressionContext;
37  import org.apache.hadoop.hbase.io.hfile.BlockType;
38  import org.apache.hadoop.hbase.io.hfile.HFileContext;
39  import org.apache.hadoop.hbase.io.util.LRUDictionary;
40  import org.apache.hadoop.hbase.util.ByteBufferUtils;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.hbase.util.ClassSize;
43  import org.apache.hadoop.io.WritableUtils;
44  
45  /**
46   * Base class for all data block encoders that use a buffer.
47   */
48  @InterfaceAudience.Private
49  abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
50  
51    private static int INITIAL_KEY_BUFFER_SIZE = 512;
52  
53    @Override
54    public ByteBuffer decodeKeyValues(DataInputStream source,
55        HFileBlockDecodingContext blkDecodingCtx) throws IOException {
56      if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
57        throw new IOException(this.getClass().getName() + " only accepts "
58            + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
59      }
60  
61      HFileBlockDefaultDecodingContext decodingCtx =
62          (HFileBlockDefaultDecodingContext) blkDecodingCtx;
63      if (decodingCtx.getHFileContext().isIncludesTags()
64          && decodingCtx.getHFileContext().isCompressTags()) {
65        if (decodingCtx.getTagCompressionContext() != null) {
66          // It will be overhead to create the TagCompressionContext again and again for every block
67          // decoding.
68          decodingCtx.getTagCompressionContext().clear();
69        } else {
70          try {
71            TagCompressionContext tagCompressionContext = new TagCompressionContext(
72                LRUDictionary.class, Byte.MAX_VALUE);
73            decodingCtx.setTagCompressionContext(tagCompressionContext);
74          } catch (Exception e) {
75            throw new IOException("Failed to initialize TagCompressionContext", e);
76          }
77        }
78      }
79      return internalDecodeKeyValues(source, 0, 0, decodingCtx);
80    }
81  
  /**
   * Mutable state of an {@link EncodedSeeker} positioned at one cell of an
   * encoded block. The key bytes are kept in a private copy
   * ({@link #keyBuffer}) because prefix-encoded keys cannot be addressed
   * directly in the block; value and (uncompressed) tags are addressed by
   * offset/length into {@link #currentBuffer}. Implements {@link Cell} so the
   * current position can be read through the standard cell accessors.
   */
  protected static class SeekerState implements Cell {
    // Backing buffer of the encoded block currently being scanned.
    protected ByteBuffer currentBuffer;
    // Non-null only when tag compression is in use for this block.
    protected TagCompressionContext tagCompressionContext;
    // -1 marks this state as invalid (see isValid()/invalidate()).
    protected int valueOffset = -1;
    protected int keyLength;
    protected int valueLength;
    // Number of leading key bytes shared with the previously decoded key.
    protected int lastCommonPrefix;
    protected int tagsLength = 0;
    // Offset of tags in currentBuffer, or -1 when tags were uncompressed into
    // tagsBuffer (i.e. when tag compression is in use).
    protected int tagsOffset = -1;
    // Length of the compressed tags in the source, cached so a re-decode after
    // moveToPrevious() can skip them without polluting the tag dictionary.
    protected int tagsCompressedLength = 0;
    // When false, decodeTags() skips tagsCompressedLength bytes and reuses
    // tagsBuffer instead of uncompressing again.
    protected boolean uncompressTags = true;

    /** We need to store a copy of the key. */
    protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
    protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];

    protected long memstoreTS;
    // Position in currentBuffer of the cell following this one.
    protected int nextKvOffset;
    protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();

    protected boolean isValid() {
      return valueOffset != -1;
    }

    /** Resets this state to "no current cell"; buffers are kept for reuse. */
    protected void invalidate() {
      valueOffset = -1;
      tagsCompressedLength = 0;
      currentKey = new KeyValue.KeyOnlyKeyValue();
      uncompressTags = true;
      currentBuffer = null;
    }

    /** Grows {@link #keyBuffer} (doubling) until it can hold keyLength bytes. */
    protected void ensureSpaceForKey() {
      if (keyLength > keyBuffer.length) {
        // rare case, but we need to handle arbitrary length of key
        int newKeyBufferLength = Math.max(keyBuffer.length, 1) * 2;
        while (keyLength > newKeyBufferLength) {
          newKeyBufferLength *= 2;
        }
        byte[] newKeyBuffer = new byte[newKeyBufferLength];
        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
        keyBuffer = newKeyBuffer;
      }
    }

    /** Grows {@link #tagsBuffer} (doubling) until it can hold tagsLength bytes. */
    protected void ensureSpaceForTags() {
      if (tagsLength > tagsBuffer.length) {
        // rare case, but we need to handle arbitrary length of tags
        int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2;
        while (tagsLength > newTagsBufferLength) {
          newTagsBufferLength *= 2;
        }
        byte[] newTagsBuffer = new byte[newTagsBufferLength];
        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
        tagsBuffer = newTagsBuffer;
      }
    }

    /**
     * Points {@link #currentKey} at the first keyLength bytes of the given
     * buffer and records the memstore timestamp of the current cell.
     */
    protected void createKeyOnlyKeyValue(byte[] keyBuffer, long memTS) {
      currentKey.setKey(keyBuffer, 0, keyLength);
      memstoreTS = memTS;
    }

    /**
     * Copy the state from the next one into this instance (the previous state
     * placeholder). Used to save the previous state when we are advancing the
     * seeker to the next key/value.
     */
    protected void copyFromNext(SeekerState nextState) {
      if (keyBuffer.length != nextState.keyBuffer.length) {
        keyBuffer = nextState.keyBuffer.clone();
      } else if (!isValid()) {
        // Note: we can only call isValid before we override our state, so this
        // comes before all the assignments at the end of this method.
        System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
             nextState.keyLength);
      } else {
        // don't copy the common prefix between this key and the previous one
        System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
            keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
                - nextState.lastCommonPrefix);
      }
      currentKey = nextState.currentKey;

      valueOffset = nextState.valueOffset;
      keyLength = nextState.keyLength;
      valueLength = nextState.valueLength;
      lastCommonPrefix = nextState.lastCommonPrefix;
      nextKvOffset = nextState.nextKvOffset;
      memstoreTS = nextState.memstoreTS;
      currentBuffer = nextState.currentBuffer;
      if (nextState.tagCompressionContext != null) {
        tagCompressionContext = nextState.tagCompressionContext;
      }
    }

    // Key-portion Cell accessors delegate to the KeyOnlyKeyValue wrapper over
    // keyBuffer; value/tags accessors address currentBuffer directly.

    @Override
    public byte[] getRowArray() {
      return currentKey.getRowArray();
    }

    @Override
    public int getRowOffset() {
      // The row bytes start right after the 2-byte row length prefix.
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public short getRowLength() {
      return currentKey.getRowLength();
    }

    @Override
    public byte[] getFamilyArray() {
      return currentKey.getFamilyArray();
    }

    @Override
    public int getFamilyOffset() {
      return currentKey.getFamilyOffset();
    }

    @Override
    public byte getFamilyLength() {
      return currentKey.getFamilyLength();
    }

    @Override
    public byte[] getQualifierArray() {
      return currentKey.getQualifierArray();
    }

    @Override
    public int getQualifierOffset() {
      return currentKey.getQualifierOffset();
    }

    @Override
    public int getQualifierLength() {
      return currentKey.getQualifierLength();
    }

    @Override
    public long getTimestamp() {
      return currentKey.getTimestamp();
    }

    @Override
    public byte getTypeByte() {
      return currentKey.getTypeByte();
    }

    @Override
    public long getMvccVersion() {
      return memstoreTS;
    }

    @Override
    public long getSequenceId() {
      return memstoreTS;
    }

    @Override
    public byte[] getValueArray() {
      return currentBuffer.array();
    }

    @Override
    public int getValueOffset() {
      return currentBuffer.arrayOffset() + valueOffset;
    }

    @Override
    public int getValueLength() {
      return valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      // With tag compression, tags were uncompressed into tagsBuffer;
      // otherwise they still sit inside the block buffer.
      if (tagCompressionContext != null) {
        return tagsBuffer;
      }
      return currentBuffer.array();
    }

    @Override
    public int getTagsOffset() {
      if (tagCompressionContext != null) {
        return 0;
      }
      return currentBuffer.arrayOffset() + tagsOffset;
    }

    @Override
    public int getTagsLength() {
      return tagsLength;
    }

    @Override
    @Deprecated
    public byte[] getValue() {
      throw new UnsupportedOperationException("getValue() not supported");
    }

    @Override
    @Deprecated
    public byte[] getFamily() {
      throw new UnsupportedOperationException("getFamily() not supported");
    }

    @Override
    @Deprecated
    public byte[] getQualifier() {
      throw new UnsupportedOperationException("getQualifier() not supported");
    }

    @Override
    @Deprecated
    public byte[] getRow() {
      throw new UnsupportedOperationException("getRow() not supported");
    }

    @Override
    public String toString() {
      KeyValue kv = KeyValueUtil.copyToNewKeyValue(this);
      if (kv == null) {
        return "null";
      }
      return kv.toString();
    }

    /**
     * Snapshots the current position into a {@link ClonedSeekerState}: the key
     * bytes are deep-copied while the value (and uncompressed tags, if tag
     * compression is off) continue to reference {@link #currentBuffer}.
     */
    public Cell shallowCopy() {
      return new ClonedSeekerState(currentBuffer, keyBuffer, currentKey.getRowLength(),
          currentKey.getFamilyOffset(), currentKey.getFamilyLength(), keyLength,
          currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
          currentKey.getTimestamp(), currentKey.getTypeByte(), valueLength, valueOffset,
          memstoreTS, tagsOffset, tagsLength, tagCompressionContext, tagsBuffer);
    }
  }
320 
  /**
   * Copies only the key part of the keybuffer by doing a deep copy and passes the
   * seeker state members for taking a clone.
   * Note that the value byte[] part is still pointing to the currentBuffer and is
   * represented by the valueOffset and valueLength.
   */
  // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
  // there. So this has to be an instance of SettableSequenceId. SeekerState need not be
  // SettableSequenceId as we never return that to top layers. When we have to, we make
  // ClonedSeekerState from it.
  protected static class ClonedSeekerState implements Cell, HeapSize, SettableSequenceId {
    // Shallow heap cost of one instance, excluding the variable-length
    // key/value/tags payloads which heapSize() adds on top.
    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
        + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
        + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (2 * ClassSize.ARRAY));
    // Deep copy of the key bytes taken at construction time.
    private byte[] keyOnlyBuffer;
    // Block buffer still referenced for the value (and tags when uncompressed).
    private ByteBuffer currentBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timestamp;
    private byte typeByte;
    private int valueOffset;
    private int valueLength;
    private int tagsLength;
    private int tagsOffset;
    // Deep copy of uncompressed tags; only populated under tag compression.
    private byte[] cloneTagsBuffer;
    private long seqId;
    private TagCompressionContext tagCompressionContext;

    /**
     * Builds a snapshot cell from the seeker's state. {@code keyBuffer} (and
     * {@code tagsBuffer} when tag compression is in use) are deep-copied;
     * {@code currentBuffer} is retained by reference.
     */
    protected ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, short rowLength,
        int familyOffset, byte familyLength, int keyLength, int qualOffset, int qualLength,
        long timeStamp, byte typeByte, int valueLen, int valueOffset, long seqId,
        int tagsOffset, int tagsLength, TagCompressionContext tagCompressionContext,
        byte[] tagsBuffer) {
      this.currentBuffer = currentBuffer;
      keyOnlyBuffer = new byte[keyLength];
      this.tagCompressionContext = tagCompressionContext;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timestamp = timeStamp;
      this.typeByte = typeByte;
      this.valueLength = valueLen;
      this.valueOffset = valueOffset;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      System.arraycopy(keyBuffer, 0, keyOnlyBuffer, 0, keyLength);
      if (tagCompressionContext != null) {
        // Tags were uncompressed into the seeker's temp buffer, which will be
        // reused for the next cell; take our own copy.
        this.cloneTagsBuffer = new byte[tagsLength];
        System.arraycopy(tagsBuffer, 0, this.cloneTagsBuffer, 0, tagsLength);
      }
      setSequenceId(seqId);
    }

    @Override
    public byte[] getRowArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getFamilyArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getQualifierArray() {
      return keyOnlyBuffer;
    }

    @Override
    public int getRowOffset() {
      // Row bytes start right after the 2-byte row length prefix.
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public short getRowLength() {
      return rowLength;
    }

    @Override
    public int getFamilyOffset() {
      return familyOffset;
    }

    @Override
    public byte getFamilyLength() {
      return familyLength;
    }

    @Override
    public int getQualifierOffset() {
      return qualifierOffset;
    }

    @Override
    public int getQualifierLength() {
      return qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return timestamp;
    }

    @Override
    public byte getTypeByte() {
      return typeByte;
    }

    @Override
    @Deprecated
    public long getMvccVersion() {
      return getSequenceId();
    }

    @Override
    public long getSequenceId() {
      return seqId;
    }

    @Override
    public byte[] getValueArray() {
      return currentBuffer.array();
    }

    @Override
    public int getValueOffset() {
      return currentBuffer.arrayOffset() + valueOffset;
    }

    @Override
    public int getValueLength() {
      return valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      // Under tag compression the tags live in our private copy; otherwise
      // they are still addressed inside the block buffer.
      if (tagCompressionContext != null) {
        return cloneTagsBuffer;
      }
      return currentBuffer.array();
    }

    @Override
    public int getTagsOffset() {
      if (tagCompressionContext != null) {
        return 0;
      }
      return currentBuffer.arrayOffset() + tagsOffset;
    }

    @Override
    public int getTagsLength() {
      return tagsLength;
    }

    @Override
    @Deprecated
    public byte[] getValue() {
      return CellUtil.cloneValue(this);
    }

    @Override
    @Deprecated
    public byte[] getFamily() {
      return CellUtil.cloneFamily(this);
    }

    @Override
    @Deprecated
    public byte[] getQualifier() {
      return CellUtil.cloneQualifier(this);
    }

    @Override
    @Deprecated
    public byte[] getRow() {
      return CellUtil.cloneRow(this);
    }

    @Override
    public String toString() {
      return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
          + getValueLength() + "/seqid=" + seqId;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    @Override
    public long heapSize() {
      return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
    }
  }
521 
  /**
   * Base {@link EncodedSeeker} over a decoded-in-memory block buffer.
   * Maintains two {@link SeekerState}s — {@code current} (always valid) and
   * {@code previous} (valid only right after advancing) — so that
   * {@code seekBefore} can step back one cell without re-decoding from the
   * block start. Subclasses supply the encoding-specific
   * {@link #decodeFirst()} / {@link #decodeNext()}.
   */
  protected abstract static class
      BufferedEncodedSeeker<STATE extends SeekerState>
      implements EncodedSeeker {
    protected HFileBlockDecodingContext decodingCtx;
    protected final KVComparator comparator;
    protected final SamePrefixComparator<byte[]> samePrefixComparator;
    protected ByteBuffer currentBuffer;
    protected STATE current = createSeekerState(); // always valid
    protected STATE previous = createSeekerState(); // may not be valid
    // Shared across cells of one block; non-null only when tags are compressed.
    protected TagCompressionContext tagCompressionContext = null;

    public BufferedEncodedSeeker(KVComparator comparator,
        HFileBlockDecodingContext decodingCtx) {
      this.comparator = comparator;
      this.samePrefixComparator = comparator;
      this.decodingCtx = decodingCtx;
      if (decodingCtx.getHFileContext().isCompressTags()) {
        try {
          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
        } catch (Exception e) {
          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
        }
      }
    }

    protected boolean includesMvcc() {
      return this.decodingCtx.getHFileContext().isIncludesMvcc();
    }

    protected boolean includesTags() {
      return this.decodingCtx.getHFileContext().isIncludesTags();
    }

    /** Compares the given flat key against the current position's key. */
    @Override
    public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
      return comparator.compareFlatKey(key, offset, length,
          current.keyBuffer, 0, current.keyLength);
    }

    @Override
    public int compareKey(KVComparator comparator, Cell key) {
      return comparator.compareOnlyKeyPortion(key,
          new KeyValue.KeyOnlyKeyValue(current.keyBuffer, 0, current.keyLength));
    }

    /**
     * Rebinds this seeker to a new block buffer, resets the tag dictionary
     * (dictionary entries are per-block) and positions on the first cell.
     */
    @Override
    public void setCurrentBuffer(ByteBuffer buffer) {
      if (this.tagCompressionContext != null) {
        this.tagCompressionContext.clear();
      }
      currentBuffer = buffer;
      current.currentBuffer = currentBuffer;
      if(tagCompressionContext != null) {
        current.tagCompressionContext = tagCompressionContext;
      }
      decodeFirst();
      current.createKeyOnlyKeyValue(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public ByteBuffer getKeyDeepCopy() {
      ByteBuffer keyBuffer = ByteBuffer.allocate(current.keyLength);
      keyBuffer.put(current.keyBuffer, 0, current.keyLength);
      return keyBuffer;
    }

    @Override
    public ByteBuffer getValueShallowCopy() {
      return ByteBuffer.wrap(currentBuffer.array(),
          currentBuffer.arrayOffset() + current.valueOffset,
          current.valueLength);
    }

    /**
     * Materializes the current cell in plain KeyValue layout:
     * keyLen, valueLen, key, value, then (optionally) tagsLen + tags.
     */
    @Override
    public ByteBuffer getKeyValueBuffer() {
      ByteBuffer kvBuffer = createKVBuffer();
      kvBuffer.putInt(current.keyLength);
      kvBuffer.putInt(current.valueLength);
      kvBuffer.put(current.keyBuffer, 0, current.keyLength);
      kvBuffer.put(currentBuffer.array(),
          currentBuffer.arrayOffset() + current.valueOffset,
          current.valueLength);
      if (current.tagsLength > 0) {
        // Put short as unsigned
        kvBuffer.put((byte) (current.tagsLength >> 8 & 0xff));
        kvBuffer.put((byte) (current.tagsLength & 0xff));
        if (current.tagsOffset != -1) {
          // the offset of the tags bytes in the underlying buffer is marked. So the temp
          // buffer,tagsBuffer was not been used.
          kvBuffer.put(currentBuffer.array(), currentBuffer.arrayOffset() + current.tagsOffset,
              current.tagsLength);
        } else {
          // When tagsOffset is marked as -1, tag compression was present and so the tags were
          // uncompressed into temp buffer, tagsBuffer. Let us copy it from there
          kvBuffer.put(current.tagsBuffer, 0, current.tagsLength);
        }
      }
      return kvBuffer;
    }

    /** Allocates a buffer sized for the current cell in KeyValue layout. */
    protected ByteBuffer createKVBuffer() {
      int kvBufSize = (int) KeyValue.getKeyValueDataStructureSize(current.keyLength,
          current.valueLength, current.tagsLength);
      ByteBuffer kvBuffer = ByteBuffer.allocate(kvBufSize);
      return kvBuffer;
    }

    @Override
    public Cell getKeyValue() {
      return current.shallowCopy();
    }

    /** Repositions on the first cell of the block. */
    @Override
    public void rewind() {
      currentBuffer.rewind();
      if (tagCompressionContext != null) {
        tagCompressionContext.clear();
      }
      decodeFirst();
      current.createKeyOnlyKeyValue(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    /**
     * Advances to the next cell.
     * @return false when the end of the block has been reached
     */
    @Override
    public boolean next() {
      if (!currentBuffer.hasRemaining()) {
        return false;
      }
      decodeNext();
      current.createKeyOnlyKeyValue(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
      return true;
    }

    /**
     * Reads the tags of the current cell from the block buffer, uncompressing
     * into {@code current.tagsBuffer} when tag compression is in use (unless a
     * moveToPrevious() already cached them — see current.uncompressTags).
     */
    protected void decodeTags() {
      current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
      if (tagCompressionContext != null) {
        if (current.uncompressTags) {
          // Tag compression is been used. uncompress it into tagsBuffer
          current.ensureSpaceForTags();
          try {
            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
                current.tagsBuffer, 0, current.tagsLength);
          } catch (IOException e) {
            throw new RuntimeException("Exception while uncompressing tags", e);
          }
        } else {
          ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
          current.uncompressTags = true;// Reset this.
        }
        current.tagsOffset = -1;
      } else {
        // When tag compress is not used, let us not do temp copying of tags bytes into tagsBuffer.
        // Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer()
        current.tagsOffset = currentBuffer.position();
        ByteBufferUtils.skip(currentBuffer, current.tagsLength);
      }
    }

    @Override
    public int seekToKeyInBlock(byte[] key, int offset, int length, boolean seekBefore) {
      return seekToKeyInBlock(new KeyValue.KeyOnlyKeyValue(key, offset, length), seekBefore);
    }

    /**
     * Linear seek within the block. Uses running common-prefix lengths for
     * row/family/qualifier to avoid re-comparing bytes already known equal
     * when a SamePrefixComparator is available.
     * @return 0 on exact match, 1 when positioned before/at the seek key
     *         (non-exact), or HConstants.INDEX_KEY_MAGIC when the seek key
     *         sorts before the first key of the block (optimized index key)
     */
    @Override
    public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
      int rowCommonPrefix = 0;
      int familyCommonPrefix = 0;
      int qualCommonPrefix = 0;
      previous.invalidate();
      KeyValue.KeyOnlyKeyValue currentCell = new KeyValue.KeyOnlyKeyValue();
      do {
        int comp;
        if (samePrefixComparator != null) {
          currentCell.setKey(current.keyBuffer, 0, current.keyLength);
          if (current.lastCommonPrefix != 0) {
            // The KV format has row key length also in the byte array. The
            // common prefix
            // includes it. So we need to subtract to find out the common prefix
            // in the
            // row part alone
            rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
          }
          if (current.lastCommonPrefix <= 2) {
            rowCommonPrefix = 0;
          }
          rowCommonPrefix += CellComparator.findCommonPrefixInRowPart(seekCell, currentCell,
              rowCommonPrefix);
          comp = CellComparator.compareCommonRowPrefix(seekCell, currentCell, rowCommonPrefix);
          if (comp == 0) {
            comp = compareTypeBytes(seekCell, currentCell);
            if (comp == 0) {
              // Subtract the fixed row key length and the family key fixed length
              familyCommonPrefix = Math.max(
                  0,
                  Math.min(familyCommonPrefix,
                      current.lastCommonPrefix - (3 + currentCell.getRowLength())));
              familyCommonPrefix += CellComparator.findCommonPrefixInFamilyPart(seekCell,
                  currentCell, familyCommonPrefix);
              comp = CellComparator.compareCommonFamilyPrefix(seekCell, currentCell,
                  familyCommonPrefix);
              if (comp == 0) {
                // subtract the rowkey fixed length and the family key fixed
                // length
                qualCommonPrefix = Math.max(
                    0,
                    Math.min(
                        qualCommonPrefix,
                        current.lastCommonPrefix
                            - (3 + currentCell.getRowLength() + currentCell.getFamilyLength())));
                qualCommonPrefix += CellComparator.findCommonPrefixInQualifierPart(seekCell,
                    currentCell, qualCommonPrefix);
                comp = CellComparator.compareCommonQualifierPrefix(seekCell, currentCell,
                    qualCommonPrefix);
                if (comp == 0) {
                  comp = CellComparator.compareTimestamps(seekCell, currentCell);
                  if (comp == 0) {
                    // Compare types. Let the delete types sort ahead of puts;
                    // i.e. types
                    // of higher numbers sort before those of lesser numbers.
                    // Maximum
                    // (255)
                    // appears ahead of everything, and minimum (0) appears
                    // after
                    // everything.
                    comp = (0xff & currentCell.getTypeByte()) - (0xff & seekCell.getTypeByte());
                  }
                }
              }
            }
          }
        } else {
          Cell r = new KeyValue.KeyOnlyKeyValue(current.keyBuffer, 0, current.keyLength);
          comp = comparator.compareOnlyKeyPortion(seekCell, r);
        }
        if (comp == 0) { // exact match
          if (seekBefore) {
            if (!previous.isValid()) {
              // The caller (seekBefore) has to ensure that we are not at the
              // first key in the block.
              throw new IllegalStateException("Cannot seekBefore if "
                  + "positioned at the first key in the block: key="
                  + Bytes.toStringBinary(seekCell.getRowArray()));
            }
            moveToPrevious();
            return 1;
          }
          return 0;
        }

        if (comp < 0) { // already too large, check previous
          if (previous.isValid()) {
            moveToPrevious();
          } else {
            return HConstants.INDEX_KEY_MAGIC; // using optimized index key
          }
          return 1;
        }

        // move to next, if more data is available
        if (currentBuffer.hasRemaining()) {
          previous.copyFromNext(current);
          decodeNext();
          current.createKeyOnlyKeyValue(current.keyBuffer, current.memstoreTS);
        } else {
          break;
        }
      } while (true);

      // we hit the end of the block, not an exact match
      return 1;
    }

    /**
     * Orders "fake" minimum-type keys (empty family+qualifier, Type.Minimum)
     * ahead of real keys; returns 0 when neither side is such a key.
     */
    private int compareTypeBytes(Cell key, Cell right) {
      if (key.getFamilyLength() + key.getQualifierLength() == 0
          && key.getTypeByte() == Type.Minimum.getCode()) {
        // left is "bigger", i.e. it appears later in the sorted order
        return 1;
      }
      if (right.getFamilyLength() + right.getQualifierLength() == 0
          && right.getTypeByte() == Type.Minimum.getCode()) {
        return -1;
      }
      return 0;
    }


    /**
     * Swaps current/previous so the seeker steps back one cell, and repositions
     * the buffer just after that cell. Valid only once after an advance.
     */
    private void moveToPrevious() {
      if (!previous.isValid()) {
        throw new IllegalStateException(
            "Can move back only once and not in first key in the block.");
      }

      STATE tmp = previous;
      previous = current;
      current = tmp;

      // move after last key value
      currentBuffer.position(current.nextKvOffset);
      // Already decoded the tag bytes. We cache this tags into current state and also the total
      // compressed length of the tags bytes. For the next time decodeNext() we don't need to decode
      // the tags again. This might pollute the Data Dictionary what we use for the compression.
      // When current.uncompressTags is false, we will just reuse the current.tagsBuffer and skip
      // 'tagsCompressedLength' bytes of source stream.
      // See in decodeTags()
      current.tagsBuffer = previous.tagsBuffer;
      current.tagsCompressedLength = previous.tagsCompressedLength;
      current.uncompressTags = false;
      previous.invalidate();
    }

    @SuppressWarnings("unchecked")
    protected STATE createSeekerState() {
      // This will fail for non-default seeker state if the subclass does not
      // override this method.
      return (STATE) new SeekerState();
    }

    // Encoding-specific: position on / advance past one encoded cell,
    // filling the fields of 'current'.
    abstract protected void decodeFirst();
    abstract protected void decodeNext();
  }
844 
  /**
   * Writes the per-cell trailer that follows an encoded key/value: the tags
   * (length always written, bytes possibly dictionary-compressed) when the
   * file includes tags, and the memstore timestamp as a vlong when the file
   * includes mvcc information.
   *
   * @param cell the cell whose tags and sequence id are appended
   * @param out stream positioned just after the encoded key/value
   * @param encodingCtx supplies the HFile settings and, when tag compression
   *          is enabled, the shared TagCompressionContext
   * @return unencoded size added
   * @throws IOException if writing to the stream fails
   */
  protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
      HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
    int size = 0;
    if (encodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = cell.getTagsLength();
      ByteBufferUtils.putCompressedInt(out, tagsLength);
      // There are some tags to be written
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
        // When tag compression is enabled, tagCompressionContext will have a not null value. Write
        // the tags using Dictionary compression in such a case
        if (tagCompressionContext != null) {
          tagCompressionContext
              .compressTags(out, cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
        } else {
          out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
        }
      }
      // Note: the returned size counts the uncompressed tags length, not the
      // on-stream (possibly compressed) length.
      size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
    }
    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
      // Copy memstore timestamp from the byte buffer to the output stream.
      long memstoreTS = cell.getSequenceId();
      WritableUtils.writeVLong(out, memstoreTS);
      // TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be
      // avoided.
      size += WritableUtils.getVIntSize(memstoreTS);
    }
    return size;
  }
882 
883   protected final void afterDecodingKeyValue(DataInputStream source,
884       ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
885     if (decodingCtx.getHFileContext().isIncludesTags()) {
886       int tagsLength = ByteBufferUtils.readCompressedInt(source);
887       // Put as unsigned short
888       dest.put((byte) ((tagsLength >> 8) & 0xff));
889       dest.put((byte) (tagsLength & 0xff));
890       if (tagsLength > 0) {
891         TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
892         // When tag compression is been used in this file, tagCompressionContext will have a not
893         // null value passed.
894         if (tagCompressionContext != null) {
895           tagCompressionContext.uncompressTags(source, dest, tagsLength);
896         } else {
897           ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
898         }
899       }
900     }
901     if (decodingCtx.getHFileContext().isIncludesMvcc()) {
902       long memstoreTS = -1;
903       try {
904         // Copy memstore timestamp from the data input stream to the byte
905         // buffer.
906         memstoreTS = WritableUtils.readVLong(source);
907         ByteBufferUtils.writeVLong(dest, memstoreTS);
908       } catch (IOException ex) {
909         throw new RuntimeException("Unable to copy memstore timestamp " +
910             memstoreTS + " after decoding a key/value");
911       }
912     }
913   }
914 
915   @Override
916   public HFileBlockEncodingContext newDataBlockEncodingContext(DataBlockEncoding encoding,
917       byte[] header, HFileContext meta) {
918     return new HFileBlockDefaultEncodingContext(encoding, header, meta);
919   }
920 
921   @Override
922   public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
923     return new HFileBlockDefaultDecodingContext(meta);
924   }
925 
  /**
   * Decodes key/values from the given source stream into a newly allocated
   * buffer.
   * @param source encoded input stream
   * @param allocateHeaderLength number of bytes to reserve at the head of the
   *          returned buffer — presumably for the block header; confirm
   *          against concrete subclasses
   * @param skipLastBytes number of trailing bytes of the stream to leave
   *          undecoded
   * @param decodingCtx context carrying the tags/mvcc flags for this file
   * @return buffer containing the decoded key/values
   * @throws IOException if the stream cannot be read or is malformed
   */
  protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
      int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
      throws IOException;
929 
930   /**
931    * Asserts that there is at least the given amount of unfilled space
932    * remaining in the given buffer.
933    * @param out typically, the buffer we are writing to
934    * @param length the required space in the buffer
935    * @throws EncoderBufferTooSmallException If there are no enough bytes.
936    */
937   protected static void ensureSpace(ByteBuffer out, int length)
938       throws EncoderBufferTooSmallException {
939     if (out.position() + length > out.limit()) {
940       throw new EncoderBufferTooSmallException(
941           "Buffer position=" + out.position() +
942           ", buffer limit=" + out.limit() +
943           ", length to be written=" + length);
944     }
945   }
946 
947   @Override
948   public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
949       throws IOException {
950     if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
951       throw new IOException (this.getClass().getName() + " only accepts "
952           + HFileBlockDefaultEncodingContext.class.getName() + " as the " +
953           "encoding context.");
954     }
955 
956     HFileBlockDefaultEncodingContext encodingCtx =
957         (HFileBlockDefaultEncodingContext) blkEncodingCtx;
958     encodingCtx.prepareEncoding(out);
959     if (encodingCtx.getHFileContext().isIncludesTags()
960         && encodingCtx.getHFileContext().isCompressTags()) {
961       if (encodingCtx.getTagCompressionContext() != null) {
962         // It will be overhead to create the TagCompressionContext again and again for every block
963         // encoding.
964         encodingCtx.getTagCompressionContext().clear();
965       } else {
966         try {
967           TagCompressionContext tagCompressionContext = new TagCompressionContext(
968               LRUDictionary.class, Byte.MAX_VALUE);
969           encodingCtx.setTagCompressionContext(tagCompressionContext);
970         } catch (Exception e) {
971           throw new IOException("Failed to initialize TagCompressionContext", e);
972         }
973       }
974     }
975     ByteBufferUtils.putInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
976     blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
977   }
978 
  /** Per-block encoding state installed by startBlockEncoding(). */
  private static class BufferedDataBlockEncodingState extends EncodingState {
    // Running total of the unencoded sizes returned by internalEncode() for
    // the current block; written into the block by endBlockEncoding().
    int unencodedDataSizeWritten = 0;
  }
982 
983   @Override
984   public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
985       throws IOException {
986     BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
987         .getEncodingState();
988     int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
989     state.unencodedDataSizeWritten += encodedKvSize;
990     return encodedKvSize;
991   }
992 
  /**
   * Encodes a single cell into the output stream.
   * @param cell the cell to encode
   * @param encodingCtx encoding context of the current block
   * @param out destination stream for the encoded bytes
   * @return the unencoded size of the cell (accumulated by encode() into the
   *         block's unencodedDataSizeWritten)
   * @throws IOException if writing to the stream fails
   */
  public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
      DataOutputStream out) throws IOException;
995 
996   @Override
997   public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
998       byte[] uncompressedBytesWithHeader) throws IOException {
999     BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
1000         .getEncodingState();
1001     // Write the unencodedDataSizeWritten (with header size)
1002     Bytes.putInt(uncompressedBytesWithHeader, HConstants.HFILEBLOCK_HEADER_SIZE
1003         + DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten
1004         );
1005     if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
1006       encodingCtx.postEncoding(BlockType.ENCODED_DATA);
1007     } else {
1008       encodingCtx.postEncoding(BlockType.DATA);
1009     }
1010   }
1011 }