/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;

/**
 * Base class for all data block encoders that use a buffer.
 */
@InterfaceAudience.Private
abstract class BufferedDataBlockEncoder implements DataBlockEncoder {

  private static int INITIAL_KEY_BUFFER_SIZE = 512;

  @Override
  public ByteBuffer decodeKeyValues(DataInputStream source,
      HFileBlockDecodingContext blkDecodingCtx) throws IOException {
    if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
      throw new IOException(this.getClass().getName() + " only accepts "
          + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
    }

    HFileBlockDefaultDecodingContext decodingCtx =
        (HFileBlockDefaultDecodingContext) blkDecodingCtx;
    if (decodingCtx.getHFileContext().isIncludesTags()
        && decodingCtx.getHFileContext().isCompressTags()) {
      if (decodingCtx.getTagCompressionContext() != null) {
        // Avoid the overhead of creating a new TagCompressionContext for every block decoding;
        // reuse and clear the existing one.
        decodingCtx.getTagCompressionContext().clear();
      } else {
        try {
          TagCompressionContext tagCompressionContext = new TagCompressionContext(
              LRUDictionary.class, Byte.MAX_VALUE);
          decodingCtx.setTagCompressionContext(tagCompressionContext);
        } catch (Exception e) {
          throw new IOException("Failed to initialize TagCompressionContext", e);
        }
      }
    }
    return internalDecodeKeyValues(source, 0, 0, decodingCtx);
  }

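  /**
   * Mutable state kept by {@link BufferedEncodedSeeker} for the cell it is currently positioned
   * on: the decoded key (copied into {@code keyBuffer}), the value and tags offsets into the
   * encoded block buffer, and the memstore timestamp.
   */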
  protected static class SeekerState implements Cell {
    protected ByteBuffer currentBuffer;
    protected TagCompressionContext tagCompressionContext;
    protected int valueOffset = -1;
    protected int keyLength;
    protected int valueLength;
    protected int lastCommonPrefix;
    protected int tagsLength = 0;
    protected int tagsOffset = -1;
    protected int tagsCompressedLength = 0;
    protected boolean uncompressTags = true;

    /** We need to store a copy of the key. */
    protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
    protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];

    protected long memstoreTS;
    protected int nextKvOffset;
    protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();

    protected boolean isValid() {
      return valueOffset != -1;
    }

    protected void invalidate() {
      valueOffset = -1;
      tagsCompressedLength = 0;
      currentKey = new KeyValue.KeyOnlyKeyValue();
      uncompressTags = true;
      currentBuffer = null;
    }

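    /** Grows {@code keyBuffer} (doubling its size) until it can hold {@code keyLength} bytes. */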
    protected void ensureSpaceForKey() {
      if (keyLength > keyBuffer.length) {
        // rare case, but we need to handle arbitrarily long keys
        int newKeyBufferLength = Math.max(keyBuffer.length, 1) * 2;
        while (keyLength > newKeyBufferLength) {
          newKeyBufferLength *= 2;
        }
        byte[] newKeyBuffer = new byte[newKeyBufferLength];
        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
        keyBuffer = newKeyBuffer;
      }
    }

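    /** Grows {@code tagsBuffer} (doubling its size) until it can hold {@code tagsLength} bytes. */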
    protected void ensureSpaceForTags() {
      if (tagsLength > tagsBuffer.length) {
        // rare case, but we need to handle arbitrarily long tags
        int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2;
        while (tagsLength > newTagsBufferLength) {
          newTagsBufferLength *= 2;
        }
        byte[] newTagsBuffer = new byte[newTagsBufferLength];
        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
        tagsBuffer = newTagsBuffer;
      }
    }

    protected void createKeyOnlyKeyValue(byte[] keyBuffer, long memTS) {
      currentKey.setKey(keyBuffer, 0, keyLength);
      memstoreTS = memTS;
    }

    /**
     * Copy the state from the next one into this instance (the previous state
     * placeholder). Used to save the previous state when we are advancing the
     * seeker to the next key/value.
     */
    protected void copyFromNext(SeekerState nextState) {
      if (keyBuffer.length != nextState.keyBuffer.length) {
        keyBuffer = nextState.keyBuffer.clone();
      } else if (!isValid()) {
        // Note: we can only call isValid before we override our state, so this
        // comes before all the assignments at the end of this method.
        System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
            nextState.keyLength);
      } else {
        // don't copy the common prefix between this key and the previous one
        System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
            keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
                - nextState.lastCommonPrefix);
      }
      currentKey = nextState.currentKey;

      valueOffset = nextState.valueOffset;
      keyLength = nextState.keyLength;
      valueLength = nextState.valueLength;
      lastCommonPrefix = nextState.lastCommonPrefix;
      nextKvOffset = nextState.nextKvOffset;
      memstoreTS = nextState.memstoreTS;
      currentBuffer = nextState.currentBuffer;
      if (nextState.tagCompressionContext != null) {
        tagCompressionContext = nextState.tagCompressionContext;
      }
    }

    @Override
    public byte[] getRowArray() {
      return currentKey.getRowArray();
    }

    @Override
    public int getRowOffset() {
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public short getRowLength() {
      return currentKey.getRowLength();
    }

    @Override
    public byte[] getFamilyArray() {
      return currentKey.getFamilyArray();
    }

    @Override
    public int getFamilyOffset() {
      return currentKey.getFamilyOffset();
    }

    @Override
    public byte getFamilyLength() {
      return currentKey.getFamilyLength();
    }

    @Override
    public byte[] getQualifierArray() {
      return currentKey.getQualifierArray();
    }

    @Override
    public int getQualifierOffset() {
      return currentKey.getQualifierOffset();
    }

    @Override
    public int getQualifierLength() {
      return currentKey.getQualifierLength();
    }

    @Override
    public long getTimestamp() {
      return currentKey.getTimestamp();
    }

    @Override
    public byte getTypeByte() {
      return currentKey.getTypeByte();
    }

    @Override
    public long getMvccVersion() {
      return memstoreTS;
    }

    @Override
    public byte[] getValueArray() {
      return currentBuffer.array();
    }

    @Override
    public int getValueOffset() {
      return currentBuffer.arrayOffset() + valueOffset;
    }

    @Override
    public int getValueLength() {
      return valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      if (tagCompressionContext != null) {
        return tagsBuffer;
      }
      return currentBuffer.array();
    }

    @Override
    public int getTagsOffset() {
      if (tagCompressionContext != null) {
        return 0;
      }
      return currentBuffer.arrayOffset() + tagsOffset;
    }

    @Override
    public short getTagsLength() {
      return (short) tagsLength;
    }

    @Override
    @Deprecated
    public byte[] getValue() {
      throw new UnsupportedOperationException("getValue() not supported");
    }

    @Override
    @Deprecated
    public byte[] getFamily() {
      throw new UnsupportedOperationException("getFamily() not supported");
    }

    @Override
    @Deprecated
    public byte[] getQualifier() {
      throw new UnsupportedOperationException("getQualifier() not supported");
    }

    @Override
    @Deprecated
    public byte[] getRow() {
      throw new UnsupportedOperationException("getRow() not supported");
    }

    @Override
    public String toString() {
      KeyValue kv = KeyValueUtil.copyToNewKeyValue(this);
      if (kv == null) {
        return "null";
      }
      return kv.toString();
    }

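    /**
     * Returns a {@link ClonedSeekerState} that deep-copies the key bytes but keeps pointing at
     * the current block buffer for the value (and for the tags when tag compression is off).
     */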
    public Cell shallowCopy() {
      return new ClonedSeekerState(currentBuffer, keyBuffer, currentKey.getRowLength(),
          currentKey.getFamilyOffset(), currentKey.getFamilyLength(), keyLength,
          currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
          currentKey.getTimestamp(), currentKey.getTypeByte(), valueLength, valueOffset,
          memstoreTS, tagsOffset, tagsLength, tagCompressionContext, tagsBuffer);
    }

  }

  /**
   * Cell implementation that deep-copies only the key part of the key buffer and takes the other
   * seeker state members needed for a clone. Note that the value byte[] still points into
   * currentBuffer and is represented by valueOffset and valueLength.
   */
  protected static class ClonedSeekerState implements Cell {
    private byte[] keyOnlyBuffer;
    private ByteBuffer currentBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timestamp;
    private byte typeByte;
    private int valueOffset;
    private int valueLength;
    private int tagsLength;
    private int tagsOffset;
    private byte[] cloneTagsBuffer;
    private long memstoreTS;
    private TagCompressionContext tagCompressionContext;

    protected ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, short rowLength,
        int familyOffset, byte familyLength, int keyLength, int qualOffset, int qualLength,
        long timeStamp, byte typeByte, int valueLen, int valueOffset, long memStoreTS,
        int tagsOffset, int tagsLength, TagCompressionContext tagCompressionContext,
        byte[] tagsBuffer) {
      this.currentBuffer = currentBuffer;
      keyOnlyBuffer = new byte[keyLength];
      this.tagCompressionContext = tagCompressionContext;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timestamp = timeStamp;
      this.typeByte = typeByte;
      this.valueLength = valueLen;
      this.valueOffset = valueOffset;
      this.memstoreTS = memStoreTS;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      System.arraycopy(keyBuffer, 0, keyOnlyBuffer, 0, keyLength);
      if (tagCompressionContext != null) {
        this.cloneTagsBuffer = new byte[tagsLength];
        System.arraycopy(tagsBuffer, 0, this.cloneTagsBuffer, 0, tagsLength);
      }
    }

    @Override
    public byte[] getRowArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getFamilyArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getQualifierArray() {
      return keyOnlyBuffer;
    }

    @Override
    public int getRowOffset() {
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public short getRowLength() {
      return rowLength;
    }

    @Override
    public int getFamilyOffset() {
      return familyOffset;
    }

    @Override
    public byte getFamilyLength() {
      return familyLength;
    }

    @Override
    public int getQualifierOffset() {
      return qualifierOffset;
    }

    @Override
    public int getQualifierLength() {
      return qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return timestamp;
    }

    @Override
    public byte getTypeByte() {
      return typeByte;
    }

    @Override
    public long getMvccVersion() {
      return memstoreTS;
    }

    @Override
    public byte[] getValueArray() {
      return currentBuffer.array();
    }

    @Override
    public int getValueOffset() {
      return currentBuffer.arrayOffset() + valueOffset;
    }

    @Override
    public int getValueLength() {
      return valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      if (tagCompressionContext != null) {
        return cloneTagsBuffer;
      }
      return currentBuffer.array();
    }

    @Override
    public int getTagsOffset() {
      if (tagCompressionContext != null) {
        return 0;
      }
      return currentBuffer.arrayOffset() + tagsOffset;
    }

    @Override
    public short getTagsLength() {
      return (short) tagsLength;
    }

    @Override
    @Deprecated
    public byte[] getValue() {
      throw new UnsupportedOperationException("getValue() not supported");
    }

    @Override
    @Deprecated
    public byte[] getFamily() {
      throw new UnsupportedOperationException("getFamily() not supported");
    }

    @Override
    @Deprecated
    public byte[] getQualifier() {
      throw new UnsupportedOperationException("getQualifier() not supported");
    }

    @Override
    @Deprecated
    public byte[] getRow() {
      throw new UnsupportedOperationException("getRow() not supported");
    }

    @Override
    public String toString() {
      KeyValue kv = KeyValueUtil.copyToNewKeyValue(this);
      if (kv == null) {
        return "null";
      }
      return kv.toString();
    }
  }

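  /**
   * Base {@link EncodedSeeker} implementation. Subclasses provide {@link #decodeFirst()} and
   * {@link #decodeNext()}; this class keeps the current and previous {@link SeekerState} and
   * implements the generic seek logic on top of them.
   */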
  protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
      implements EncodedSeeker {
    protected HFileBlockDecodingContext decodingCtx;
    protected final KVComparator comparator;
    protected final SamePrefixComparator<byte[]> samePrefixComparator;
    protected ByteBuffer currentBuffer;
    protected STATE current = createSeekerState(); // always valid
    protected STATE previous = createSeekerState(); // may not be valid
    protected TagCompressionContext tagCompressionContext = null;

    public BufferedEncodedSeeker(KVComparator comparator,
        HFileBlockDecodingContext decodingCtx) {
      this.comparator = comparator;
      this.samePrefixComparator = comparator;
      this.decodingCtx = decodingCtx;
      if (decodingCtx.getHFileContext().isCompressTags()) {
        try {
          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
        } catch (Exception e) {
          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
        }
      }
    }

    protected boolean includesMvcc() {
      return this.decodingCtx.getHFileContext().isIncludesMvcc();
    }

    protected boolean includesTags() {
      return this.decodingCtx.getHFileContext().isIncludesTags();
    }

    @Override
    public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
      return comparator.compareFlatKey(key, offset, length,
          current.keyBuffer, 0, current.keyLength);
    }

    @Override
    public int compareKey(KVComparator comparator, Cell key) {
      return comparator.compareOnlyKeyPortion(key,
          new KeyValue.KeyOnlyKeyValue(current.keyBuffer, 0, current.keyLength));
    }

    @Override
    public void setCurrentBuffer(ByteBuffer buffer) {
      if (this.tagCompressionContext != null) {
        this.tagCompressionContext.clear();
      }
      currentBuffer = buffer;
      current.currentBuffer = currentBuffer;
      if (tagCompressionContext != null) {
        current.tagCompressionContext = tagCompressionContext;
      }
      decodeFirst();
      current.createKeyOnlyKeyValue(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public ByteBuffer getKeyDeepCopy() {
      ByteBuffer keyBuffer = ByteBuffer.allocate(current.keyLength);
      keyBuffer.put(current.keyBuffer, 0, current.keyLength);
      return keyBuffer;
    }

    @Override
    public ByteBuffer getValueShallowCopy() {
      return ByteBuffer.wrap(currentBuffer.array(),
          currentBuffer.arrayOffset() + current.valueOffset,
          current.valueLength);
    }

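    /**
     * Materializes the current cell into a freshly allocated buffer in the standard KeyValue
     * layout: key length, value length, key bytes, value bytes and, if present, the tags.
     */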
    @Override
    public ByteBuffer getKeyValueBuffer() {
      ByteBuffer kvBuffer = createKVBuffer();
      kvBuffer.putInt(current.keyLength);
      kvBuffer.putInt(current.valueLength);
      kvBuffer.put(current.keyBuffer, 0, current.keyLength);
      kvBuffer.put(currentBuffer.array(),
          currentBuffer.arrayOffset() + current.valueOffset,
          current.valueLength);
      if (current.tagsLength > 0) {
        kvBuffer.putShort((short) current.tagsLength);
        if (current.tagsOffset != -1) {
          // The offset of the tags bytes in the underlying buffer is marked, so the temp
          // buffer, tagsBuffer, has not been used.
          kvBuffer.put(currentBuffer.array(), currentBuffer.arrayOffset() + current.tagsOffset,
              current.tagsLength);
        } else {
          // When tagsOffset is marked as -1, tag compression was in use and the tags were
          // uncompressed into the temp buffer, tagsBuffer. Copy them from there.
          kvBuffer.put(current.tagsBuffer, 0, current.tagsLength);
        }
      }
      return kvBuffer;
    }

    protected ByteBuffer createKVBuffer() {
      int kvBufSize = (int) KeyValue.getKeyValueDataStructureSize(current.keyLength,
          current.valueLength, current.tagsLength);
      ByteBuffer kvBuffer = ByteBuffer.allocate(kvBufSize);
      return kvBuffer;
    }

    @Override
    public Cell getKeyValue() {
      return current.shallowCopy();
    }

    @Override
    public void rewind() {
      currentBuffer.rewind();
      if (tagCompressionContext != null) {
        tagCompressionContext.clear();
      }
      decodeFirst();
      current.createKeyOnlyKeyValue(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public boolean next() {
      if (!currentBuffer.hasRemaining()) {
        return false;
      }
      decodeNext();
      current.createKeyOnlyKeyValue(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
      return true;
    }

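    /**
     * Reads the tags portion of the current cell. With tag compression the tags are uncompressed
     * into {@code current.tagsBuffer}; otherwise only {@code current.tagsOffset} is recorded and
     * the bytes are left in place in the block buffer.
     */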
    protected void decodeTags() {
      current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
      if (tagCompressionContext != null) {
        if (current.uncompressTags) {
          // Tag compression is being used. Uncompress the tags into tagsBuffer.
          current.ensureSpaceForTags();
          try {
            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
                current.tagsBuffer, 0, current.tagsLength);
          } catch (IOException e) {
            throw new RuntimeException("Exception while uncompressing tags", e);
          }
        } else {
          ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
          current.uncompressTags = true; // Reset this.
        }
        current.tagsOffset = -1;
      } else {
        // When tag compression is not used, do not copy the tags bytes into tagsBuffer.
        // Just mark the tags offset so the KV buffer can be created later in getKeyValueBuffer().
        current.tagsOffset = currentBuffer.position();
        ByteBufferUtils.skip(currentBuffer, current.tagsLength);
      }
    }

    @Override
    public int seekToKeyInBlock(byte[] key, int offset, int length, boolean seekBefore) {
      return seekToKeyInBlock(new KeyValue.KeyOnlyKeyValue(key, offset, length), seekBefore);
    }

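    /**
     * Seeks within the current block. Returns 0 on an exact match (with {@code seekBefore}
     * false), {@link HConstants#INDEX_KEY_MAGIC} when the seek key sorts before the first key of
     * the block, and 1 otherwise, leaving the seeker on the last cell that sorts before the seek
     * key (or on the last cell of the block).
     */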
    @Override
    public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
      int rowCommonPrefix = 0;
      int familyCommonPrefix = 0;
      int qualCommonPrefix = 0;
      previous.invalidate();
      KeyValue.KeyOnlyKeyValue currentCell = new KeyValue.KeyOnlyKeyValue();
      do {
        int comp;
        if (samePrefixComparator != null) {
          currentCell.setKey(current.keyBuffer, 0, current.keyLength);
          if (current.lastCommonPrefix != 0) {
            // The KV format has the row key length also in the byte array. The common prefix
            // includes it. So we need to subtract to find out the common prefix in the row part
            // alone.
            rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
          }
          if (current.lastCommonPrefix <= 2) {
            rowCommonPrefix = 0;
          }
          rowCommonPrefix += CellComparator.findCommonPrefixInRowPart(seekCell, currentCell,
              rowCommonPrefix);
          comp = CellComparator.compareCommonRowPrefix(seekCell, currentCell, rowCommonPrefix);
          if (comp == 0) {
            comp = compareTypeBytes(seekCell, currentCell);
            if (comp == 0) {
              // Subtract the fixed row key length and the family key fixed length
              familyCommonPrefix = Math.max(
                  0,
                  Math.min(familyCommonPrefix,
                      current.lastCommonPrefix - (3 + currentCell.getRowLength())));
              familyCommonPrefix += CellComparator.findCommonPrefixInFamilyPart(seekCell,
                  currentCell, familyCommonPrefix);
              comp = CellComparator.compareCommonFamilyPrefix(seekCell, currentCell,
                  familyCommonPrefix);
              if (comp == 0) {
                // Subtract the rowkey fixed length and the family key fixed length
                qualCommonPrefix = Math.max(
                    0,
                    Math.min(
                        qualCommonPrefix,
                        current.lastCommonPrefix
                            - (3 + currentCell.getRowLength() + currentCell.getFamilyLength())));
                qualCommonPrefix += CellComparator.findCommonPrefixInQualifierPart(seekCell,
                    currentCell, qualCommonPrefix);
                comp = CellComparator.compareCommonQualifierPrefix(seekCell, currentCell,
                    qualCommonPrefix);
                if (comp == 0) {
                  comp = CellComparator.compareTimestamps(seekCell, currentCell);
                  if (comp == 0) {
                    // Compare types. Let the delete types sort ahead of puts; i.e. types of
                    // higher numbers sort before those of lesser numbers. Maximum (255) appears
                    // ahead of everything, and minimum (0) appears after everything.
                    comp = (0xff & currentCell.getTypeByte()) - (0xff & seekCell.getTypeByte());
                  }
                }
              }
            }
          }
        } else {
          Cell r = new KeyValue.KeyOnlyKeyValue(current.keyBuffer, 0, current.keyLength);
          comp = comparator.compareOnlyKeyPortion(seekCell, r);
        }
        if (comp == 0) { // exact match
          if (seekBefore) {
            if (!previous.isValid()) {
              // The caller (seekBefore) has to ensure that we are not at the
              // first key in the block.
              throw new IllegalStateException("Cannot seekBefore if "
                  + "positioned at the first key in the block: key="
                  + Bytes.toStringBinary(seekCell.getRowArray()));
            }
            moveToPrevious();
            return 1;
          }
          return 0;
        }

        if (comp < 0) { // already too large, check previous
          if (previous.isValid()) {
            moveToPrevious();
          } else {
            return HConstants.INDEX_KEY_MAGIC; // using optimized index key
          }
          return 1;
        }

        // move to next, if more data is available
        if (currentBuffer.hasRemaining()) {
          previous.copyFromNext(current);
          decodeNext();
          current.createKeyOnlyKeyValue(current.keyBuffer, current.memstoreTS);
        } else {
          break;
        }
      } while (true);

      // we hit the end of the block, not an exact match
      return 1;
    }

    private int compareTypeBytes(Cell key, Cell right) {
      if (key.getFamilyLength() + key.getQualifierLength() == 0
          && key.getTypeByte() == Type.Minimum.getCode()) {
        // left is "bigger", i.e. it appears later in the sorted order
        return 1;
      }
      if (right.getFamilyLength() + right.getQualifierLength() == 0
          && right.getTypeByte() == Type.Minimum.getCode()) {
        return -1;
      }
      return 0;
    }

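    /**
     * Swaps {@code current} and {@code previous} so the seeker steps back one cell, then
     * repositions the block buffer just past the restored cell. Can only step back once.
     */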
    private void moveToPrevious() {
      if (!previous.isValid()) {
        throw new IllegalStateException(
            "Can move back only once, and cannot move back from the first key in the block.");
      }

      STATE tmp = previous;
      previous = current;
      current = tmp;

      // move after last key value
      currentBuffer.position(current.nextKvOffset);
      // The tag bytes of this cell were already decoded. We cache those tags in the current
      // state together with the total compressed length of the tag bytes, so the next
      // decodeNext() does not need to decode them again (which could also pollute the data
      // dictionary used for compression). When current.uncompressTags is false, decodeTags()
      // will simply reuse current.tagsBuffer and skip 'tagsCompressedLength' bytes of the
      // source stream. See decodeTags().
      current.tagsBuffer = previous.tagsBuffer;
      current.tagsCompressedLength = previous.tagsCompressedLength;
      current.uncompressTags = false;
      previous.invalidate();
    }

    @SuppressWarnings("unchecked")
    protected STATE createSeekerState() {
      // This will fail for non-default seeker state if the subclass does not
      // override this method.
      return (STATE) new SeekerState();
    }

    protected abstract void decodeFirst();
    protected abstract void decodeNext();
  }

  /**
   * Writes the data that trails each key/value: the (optionally compressed) tags, when the file
   * includes tags, and the memstore timestamp, when MVCC is included.
   * @param kv the KeyValue that was just encoded
   * @param out the output stream to write the trailing data to
   * @param encodingCtx the encoding context, carrying the HFile settings and the optional
   *          TagCompressionContext
   * @return the unencoded size of the data that was added
   * @throws IOException if writing to the stream fails
   */
  protected final int afterEncodingKeyValue(KeyValue kv, DataOutputStream out,
      HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
    int size = 0;
    if (encodingCtx.getHFileContext().isIncludesTags()) {
      short tagsLength = kv.getTagsLength();
      ByteBufferUtils.putCompressedInt(out, tagsLength);
      // There are some tags to be written
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
        // When tag compression is enabled, tagCompressionContext will be non-null. In that case
        // write the tags using dictionary compression.
        if (tagCompressionContext != null) {
          tagCompressionContext
              .compressTags(out, kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
        } else {
          out.write(kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
        }
      }
      size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
    }
    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
      // Copy memstore timestamp from the byte buffer to the output stream.
      long memstoreTS = kv.getMvccVersion();
      WritableUtils.writeVLong(out, memstoreTS);
      // TODO use a writeVLong which returns the #bytes written so that parsing the value twice
      // can be avoided.
      size += WritableUtils.getVIntSize(memstoreTS);
    }
    return size;
  }

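  /**
   * Reads the trailing tags (if the file includes them) and the memstore timestamp (if MVCC is
   * included) from {@code source} and appends them to {@code dest}, mirroring
   * {@link #afterEncodingKeyValue}.
   */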
  protected final void afterDecodingKeyValue(DataInputStream source,
      ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    if (decodingCtx.getHFileContext().isIncludesTags()) {
      short tagsLength = (short) ByteBufferUtils.readCompressedInt(source);
      dest.putShort(tagsLength);
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
        // When tag compression is used in this file, tagCompressionContext will be non-null.
        if (tagCompressionContext != null) {
          tagCompressionContext.uncompressTags(source, dest, tagsLength);
        } else {
          ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
        }
      }
    }
    if (decodingCtx.getHFileContext().isIncludesMvcc()) {
      long memstoreTS = -1;
      try {
        // Copy the memstore timestamp from the data input stream to the byte buffer.
        memstoreTS = WritableUtils.readVLong(source);
        ByteBufferUtils.writeVLong(dest, memstoreTS);
      } catch (IOException ex) {
        throw new RuntimeException("Unable to copy memstore timestamp " +
            memstoreTS + " after decoding a key/value", ex);
      }
    }
  }

  @Override
  public HFileBlockEncodingContext newDataBlockEncodingContext(DataBlockEncoding encoding,
      byte[] header, HFileContext meta) {
    return new HFileBlockDefaultEncodingContext(encoding, header, meta);
  }

  @Override
  public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
    return new HFileBlockDefaultDecodingContext(meta);
  }

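  /**
   * Decodes all key/values from {@code source} into a newly allocated buffer, reserving
   * {@code allocateHeaderLength} bytes at the start of the buffer and leaving the last
   * {@code skipLastBytes} bytes of the stream unread.
   */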
  protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
      int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
      throws IOException;

  /**
   * Asserts that there is at least the given amount of unfilled space
   * remaining in the given buffer.
   * @param out the buffer we are writing to
   * @param length the required space in the buffer
   * @throws EncoderBufferTooSmallException if there is not enough space in the buffer
   */
  protected static void ensureSpace(ByteBuffer out, int length)
      throws EncoderBufferTooSmallException {
    if (out.position() + length > out.limit()) {
      throw new EncoderBufferTooSmallException(
          "Buffer position=" + out.position() +
          ", buffer limit=" + out.limit() +
          ", length to be written=" + length);
    }
  }

  @Override
  public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
      throws IOException {
    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
      throw new IOException(this.getClass().getName() + " only accepts "
          + HFileBlockDefaultEncodingContext.class.getName() + " as the encoding context.");
    }

    HFileBlockDefaultEncodingContext encodingCtx =
        (HFileBlockDefaultEncodingContext) blkEncodingCtx;
    encodingCtx.prepareEncoding(out);
    if (encodingCtx.getHFileContext().isIncludesTags()
        && encodingCtx.getHFileContext().isCompressTags()) {
      if (encodingCtx.getTagCompressionContext() != null) {
        // Avoid the overhead of creating a new TagCompressionContext for every block encoding;
        // reuse and clear the existing one.
        encodingCtx.getTagCompressionContext().clear();
      } else {
        try {
          TagCompressionContext tagCompressionContext = new TagCompressionContext(
              LRUDictionary.class, Byte.MAX_VALUE);
          encodingCtx.setTagCompressionContext(tagCompressionContext);
        } catch (Exception e) {
          throw new IOException("Failed to initialize TagCompressionContext", e);
        }
      }
    }
    ByteBufferUtils.putInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
    blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
  }

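  /** Per-block encoding state: tracks the total unencoded size of the cells written so far. */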
  private static class BufferedDataBlockEncodingState extends EncodingState {
    int unencodedDataSizeWritten = 0;
  }

  @Override
  public int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
      throws IOException {
    BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
        .getEncodingState();
    int encodedKvSize = internalEncode(kv, (HFileBlockDefaultEncodingContext) encodingCtx, out);
    state.unencodedDataSizeWritten += encodedKvSize;
    return encodedKvSize;
  }

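  /**
   * Encodes a single KeyValue into {@code out} in the concrete encoder's format.
   * @return the unencoded size of the KeyValue that was written; this is what gets accumulated
   *         into the block's unencoded data size
   */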
  public abstract int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingCtx,
      DataOutputStream out) throws IOException;

  @Override
  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
      byte[] uncompressedBytesWithHeader) throws IOException {
    BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
        .getEncodingState();
    // Write the unencoded data size, placed just after the block header and the encoding id.
    Bytes.putInt(uncompressedBytesWithHeader,
        HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE,
        state.unencodedDataSizeWritten);
    if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
      encodingCtx.postEncoding(BlockType.ENCODED_DATA);
    } else {
      encodingCtx.postEncoding(BlockType.DATA);
    }
  }
}