1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.apache.hadoop.hbase.Cell;
25  import org.apache.hadoop.hbase.CellComparator;
26  import org.apache.hadoop.hbase.CellUtil;
27  import org.apache.hadoop.hbase.HConstants;
28  import org.apache.hadoop.hbase.KeyValue;
29  import org.apache.hadoop.hbase.KeyValue.KVComparator;
30  import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
31  import org.apache.hadoop.hbase.KeyValue.Type;
32  import org.apache.hadoop.hbase.KeyValueUtil;
33  import org.apache.hadoop.hbase.SettableSequenceId;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.io.HeapSize;
36  import org.apache.hadoop.hbase.io.TagCompressionContext;
37  import org.apache.hadoop.hbase.io.hfile.BlockType;
38  import org.apache.hadoop.hbase.io.hfile.HFileContext;
39  import org.apache.hadoop.hbase.io.util.LRUDictionary;
40  import org.apache.hadoop.hbase.util.ByteBufferUtils;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.hbase.util.ClassSize;
43  import org.apache.hadoop.io.WritableUtils;
44  
45  /**
46   * Base class for all data block encoders that use a buffer.
47   */
48  @InterfaceAudience.Private
49  abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
50  
51    private static final int INITIAL_KEY_BUFFER_SIZE = 512;
52  
53    @Override
54    public ByteBuffer decodeKeyValues(DataInputStream source,
55        HFileBlockDecodingContext blkDecodingCtx) throws IOException {
56      if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
57        throw new IOException(this.getClass().getName() + " only accepts "
58            + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
59      }
60  
61      HFileBlockDefaultDecodingContext decodingCtx =
62          (HFileBlockDefaultDecodingContext) blkDecodingCtx;
63      if (decodingCtx.getHFileContext().isIncludesTags()
64          && decodingCtx.getHFileContext().isCompressTags()) {
65        if (decodingCtx.getTagCompressionContext() != null) {
66          // Reuse the existing TagCompressionContext; creating it afresh for every block
67          // decoding would be needless overhead.
68          decodingCtx.getTagCompressionContext().clear();
69        } else {
70          try {
71            TagCompressionContext tagCompressionContext = new TagCompressionContext(
72                LRUDictionary.class, Byte.MAX_VALUE);
73            decodingCtx.setTagCompressionContext(tagCompressionContext);
74          } catch (Exception e) {
75            throw new IOException("Failed to initialize TagCompressionContext", e);
76          }
77        }
78      }
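        // Decode the entire block: reserve no header space in the destination buffer and skip no
        // trailing bytes in the source (hence the 0, 0 arguments).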
79      return internalDecodeKeyValues(source, 0, 0, decodingCtx);
80    }
81  
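      /**
       * Mutable state describing the seeker's current position within an encoded block: a
       * materialized copy of the current key (encoders may store only the suffix that differs
       * from the previous key, so the full key is rebuilt in {@code keyBuffer}), the value and
       * tags offsets into the block buffer, and the memstore timestamp of the current cell.
       */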
82    protected static class SeekerState implements Cell {
83      protected ByteBuffer currentBuffer;
84      protected TagCompressionContext tagCompressionContext;
85      protected int valueOffset = -1;
86      protected int keyLength;
87      protected int valueLength;
88      protected int lastCommonPrefix;
89      protected int tagsLength = 0;
90      protected int tagsOffset = -1;
91      protected int tagsCompressedLength = 0;
92      protected boolean uncompressTags = true;
93  
94      /** We need to store a copy of the key. */
95      protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
96      protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
97  
98      protected long memstoreTS;
99      protected int nextKvOffset;
100     protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
101 
102     protected boolean isValid() {
103       return valueOffset != -1;
104     }
105 
106     protected void invalidate() {
107       valueOffset = -1;
108       tagsCompressedLength = 0;
109       currentKey = new KeyValue.KeyOnlyKeyValue();
110       uncompressTags = true;
111       currentBuffer = null;
112     }
113 
114     protected void ensureSpaceForKey() {
115       if (keyLength > keyBuffer.length) {
116         // rare case, but we need to handle arbitrary length of key
117         int newKeyBufferLength = Math.max(keyBuffer.length, 1) * 2;
118         while (keyLength > newKeyBufferLength) {
119           newKeyBufferLength *= 2;
120         }
121         byte[] newKeyBuffer = new byte[newKeyBufferLength];
122         System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
123         keyBuffer = newKeyBuffer;
124       }
125     }
126 
127     protected void ensureSpaceForTags() {
128       if (tagsLength > tagsBuffer.length) {
129         // rare case, but we need to handle arbitrary length of tags
130         int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2;
131         while (tagsLength > newTagsBufferLength) {
132           newTagsBufferLength *= 2;
133         }
134         byte[] newTagsBuffer = new byte[newTagsBufferLength];
135         System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
136         tagsBuffer = newTagsBuffer;
137       }
138     }
139 
140     protected void setKey(byte[] keyBuffer, long memTS) {
141       currentKey.setKey(keyBuffer, 0, keyLength);
142       memstoreTS = memTS;
143     }
144 
145     /**
146      * Copy the state from the next one into this instance (the previous state
147      * placeholder). Used to save the previous state when we are advancing the
148      * seeker to the next key/value.
149      */
150     protected void copyFromNext(SeekerState nextState) {
151       if (keyBuffer.length != nextState.keyBuffer.length) {
152         keyBuffer = nextState.keyBuffer.clone();
153       } else if (!isValid()) {
154         // Note: we can only call isValid before we override our state, so this
155         // comes before all the assignments at the end of this method.
156         System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
157              nextState.keyLength);
158       } else {
159         // don't copy the common prefix between this key and the previous one
160         System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
161             keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
162                 - nextState.lastCommonPrefix);
163       }
164       currentKey = nextState.currentKey;
165 
166       valueOffset = nextState.valueOffset;
167       keyLength = nextState.keyLength;
168       valueLength = nextState.valueLength;
169       lastCommonPrefix = nextState.lastCommonPrefix;
170       nextKvOffset = nextState.nextKvOffset;
171       memstoreTS = nextState.memstoreTS;
172       currentBuffer = nextState.currentBuffer;
173       tagsOffset = nextState.tagsOffset;
174       tagsLength = nextState.tagsLength;
175       if (nextState.tagCompressionContext != null) {
176         tagCompressionContext = nextState.tagCompressionContext;
177       }
178     }
179 
180     @Override
181     public byte[] getRowArray() {
182       return currentKey.getRowArray();
183     }
184 
185     @Override
186     public int getRowOffset() {
187       return Bytes.SIZEOF_SHORT;
188     }
189 
190     @Override
191     public short getRowLength() {
192       return currentKey.getRowLength();
193     }
194 
195     @Override
196     public byte[] getFamilyArray() {
197       return currentKey.getFamilyArray();
198     }
199 
200     @Override
201     public int getFamilyOffset() {
202       return currentKey.getFamilyOffset();
203     }
204 
205     @Override
206     public byte getFamilyLength() {
207       return currentKey.getFamilyLength();
208     }
209 
210     @Override
211     public byte[] getQualifierArray() {
212       return currentKey.getQualifierArray();
213     }
214 
215     @Override
216     public int getQualifierOffset() {
217       return currentKey.getQualifierOffset();
218     }
219 
220     @Override
221     public int getQualifierLength() {
222       return currentKey.getQualifierLength();
223     }
224 
225     @Override
226     public long getTimestamp() {
227       return currentKey.getTimestamp();
228     }
229 
230     @Override
231     public byte getTypeByte() {
232       return currentKey.getTypeByte();
233     }
234 
235     @Override
236     public long getMvccVersion() {
237       return memstoreTS;
238     }
239 
240     @Override
241     public long getSequenceId() {
242       return memstoreTS;
243     }
244 
245     @Override
246     public byte[] getValueArray() {
247       return currentBuffer.array();
248     }
249 
250     @Override
251     public int getValueOffset() {
252       return currentBuffer.arrayOffset() + valueOffset;
253     }
254 
255     @Override
256     public int getValueLength() {
257       return valueLength;
258     }
259 
260     @Override
261     public byte[] getTagsArray() {
262       if (tagCompressionContext != null) {
263         return tagsBuffer;
264       }
265       return currentBuffer.array();
266     }
267 
268     @Override
269     public int getTagsOffset() {
270       if (tagCompressionContext != null) {
271         return 0;
272       }
273       return currentBuffer.arrayOffset() + tagsOffset;
274     }
275 
276     @Override
277     public int getTagsLength() {
278       return tagsLength;
279     }
280 
281     @Override
282     @Deprecated
283     public byte[] getValue() {
284       throw new UnsupportedOperationException("getValue() not supported");
285     }
286 
287     @Override
288     @Deprecated
289     public byte[] getFamily() {
290       throw new UnsupportedOperationException("getFamily() not supported");
291     }
292 
293     @Override
294     @Deprecated
295     public byte[] getQualifier() {
296       throw new UnsupportedOperationException("getQualifier() not supported");
297     }
298 
299     @Override
300     @Deprecated
301     public byte[] getRow() {
302       throw new UnsupportedOperationException("getRow() not supported");
303     }
304 
305     @Override
306     public String toString() {
307       return KeyValue.keyToString(this.keyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
308           + getValueLength() + "/seqid=" + memstoreTS;
309     }
310 
311     public Cell shallowCopy() {
312       return new ClonedSeekerState(currentBuffer, keyBuffer, currentKey.getRowLength(),
313           currentKey.getFamilyOffset(), currentKey.getFamilyLength(), keyLength, 
314           currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
315           currentKey.getTimestamp(), currentKey.getTypeByte(), valueLength, valueOffset,
316           memstoreTS, tagsOffset, tagsLength, tagCompressionContext, tagsBuffer);
317     }
318   }
319 
320   /**
321    * Makes a deep copy of only the key part of the key buffer and takes the remaining
322    * seeker state members for the clone.
323    * Note that the value byte[] part still points into the currentBuffer and is
324    * represented by the valueOffset and valueLength.
325    */
326   // We return this as a Cell to the upper layers of the read flow, and they might try setting a
327   // new SeqId there. So this has to be an instance of SettableSequenceId. SeekerState need not be
328   // SettableSequenceId as we never return it to the top layers; when we have to, we make a
329   // ClonedSeekerState from it.
330   protected static class ClonedSeekerState implements Cell, HeapSize, SettableSequenceId {
331     private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
332         + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
333         + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (2 * ClassSize.ARRAY));
334     private byte[] keyOnlyBuffer;
335     private ByteBuffer currentBuffer;
336     private short rowLength;
337     private int familyOffset;
338     private byte familyLength;
339     private int qualifierOffset;
340     private int qualifierLength;
341     private long timestamp;
342     private byte typeByte;
343     private int valueOffset;
344     private int valueLength;
345     private int tagsLength;
346     private int tagsOffset;
347     private byte[] cloneTagsBuffer;
348     private long seqId;
349     private TagCompressionContext tagCompressionContext;
350     
351     protected ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, short rowLength,
352         int familyOffset, byte familyLength, int keyLength, int qualOffset, int qualLength,
353         long timeStamp, byte typeByte, int valueLen, int valueOffset, long seqId,
354         int tagsOffset, int tagsLength, TagCompressionContext tagCompressionContext,
355         byte[] tagsBuffer) {
356       this.currentBuffer = currentBuffer;
357       keyOnlyBuffer = new byte[keyLength];
358       this.tagCompressionContext = tagCompressionContext;
359       this.rowLength = rowLength;
360       this.familyOffset = familyOffset;
361       this.familyLength = familyLength;
362       this.qualifierOffset = qualOffset;
363       this.qualifierLength = qualLength;
364       this.timestamp = timeStamp;
365       this.typeByte = typeByte;
366       this.valueLength = valueLen;
367       this.valueOffset = valueOffset;
368       this.tagsOffset = tagsOffset;
369       this.tagsLength = tagsLength;
370       System.arraycopy(keyBuffer, 0, keyOnlyBuffer, 0, keyLength);
371       if (tagCompressionContext != null) {
372         this.cloneTagsBuffer = new byte[tagsLength];
373         System.arraycopy(tagsBuffer, 0, this.cloneTagsBuffer, 0, tagsLength);
374       }
375       setSequenceId(seqId);
376     }
377 
378     @Override
379     public byte[] getRowArray() {
380       return keyOnlyBuffer;
381     }
382 
383     @Override
384     public byte[] getFamilyArray() {
385       return keyOnlyBuffer;
386     }
387 
388     @Override
389     public byte[] getQualifierArray() {
390       return keyOnlyBuffer;
391     }
392 
393     @Override
394     public int getRowOffset() {
395       return Bytes.SIZEOF_SHORT;
396     }
397 
398     @Override
399     public short getRowLength() {
400       return rowLength;
401     }
402 
403     @Override
404     public int getFamilyOffset() {
405       return familyOffset;
406     }
407 
408     @Override
409     public byte getFamilyLength() {
410       return familyLength;
411     }
412 
413     @Override
414     public int getQualifierOffset() {
415       return qualifierOffset;
416     }
417 
418     @Override
419     public int getQualifierLength() {
420       return qualifierLength;
421     }
422 
423     @Override
424     public long getTimestamp() {
425       return timestamp;
426     }
427 
428     @Override
429     public byte getTypeByte() {
430       return typeByte;
431     }
432 
433     @Override
434     @Deprecated
435     public long getMvccVersion() {
436       return getSequenceId();
437     }
438 
439     @Override
440     public long getSequenceId() {
441       return seqId;
442     }
443 
444     @Override
445     public byte[] getValueArray() {
446       return currentBuffer.array();
447     }
448 
449     @Override
450     public int getValueOffset() {
451       return currentBuffer.arrayOffset() + valueOffset;
452     }
453 
454     @Override
455     public int getValueLength() {
456       return valueLength;
457     }
458 
459     @Override
460     public byte[] getTagsArray() {
461       if (tagCompressionContext != null) {
462         return cloneTagsBuffer;
463       }
464       return currentBuffer.array();
465     }
466 
467     @Override
468     public int getTagsOffset() {
469       if (tagCompressionContext != null) {
470         return 0;
471       }
472       return currentBuffer.arrayOffset() + tagsOffset;
473     }
474 
475     @Override
476     public int getTagsLength() {
477       return tagsLength;
478     }
479 
480     @Override
481     @Deprecated
482     public byte[] getValue() {
483       return CellUtil.cloneValue(this);
484     }
485 
486     @Override
487     @Deprecated
488     public byte[] getFamily() {
489       return CellUtil.cloneFamily(this);
490     }
491 
492     @Override
493     @Deprecated
494     public byte[] getQualifier() {
495       return CellUtil.cloneQualifier(this);
496     }
497 
498     @Override
499     @Deprecated
500     public byte[] getRow() {
501       return CellUtil.cloneRow(this);
502     }
503 
504     @Override
505     public String toString() {
506       return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
507           + getValueLength() + "/seqid=" + seqId;
508     }
509 
510     @Override
511     public void setSequenceId(long seqId) {
512       this.seqId = seqId;
513     }
514 
515     @Override
516     public long heapSize() {
517       return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
518     }
519   }
520 
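      /**
       * Common seeker implementation for the buffered encoders. It walks an encoded block by
       * repeatedly calling the subclass's {@link #decodeNext()} and keeps two {@link SeekerState}
       * instances, the current position and the previous one, so that seekToKeyInBlock can step
       * back one cell when it overshoots the requested key or when seekBefore is requested.
       */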
521   protected abstract static class
522       BufferedEncodedSeeker<STATE extends SeekerState>
523       implements EncodedSeeker {
524     protected HFileBlockDecodingContext decodingCtx;
525     protected final KVComparator comparator;
526     protected final SamePrefixComparator<byte[]> samePrefixComparator;
527     protected ByteBuffer currentBuffer;
528     protected STATE current = createSeekerState(); // always valid
529     protected STATE previous = createSeekerState(); // may not be valid
530     protected TagCompressionContext tagCompressionContext = null;
531 
532     public BufferedEncodedSeeker(KVComparator comparator,
533         HFileBlockDecodingContext decodingCtx) {
534       this.comparator = comparator;
535       this.samePrefixComparator = comparator;
536       this.decodingCtx = decodingCtx;
537       if (decodingCtx.getHFileContext().isCompressTags()) {
538         try {
539           tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
540         } catch (Exception e) {
541           throw new RuntimeException("Failed to initialize TagCompressionContext", e);
542         }
543       }
544     }
545     
546     protected boolean includesMvcc() {
547       return this.decodingCtx.getHFileContext().isIncludesMvcc();
548     }
549 
550     protected boolean includesTags() {
551       return this.decodingCtx.getHFileContext().isIncludesTags();
552     }
553 
554     @Override
555     public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
556       return comparator.compareFlatKey(key, offset, length,
557           current.keyBuffer, 0, current.keyLength);
558     }
559 
560     @Override
561     public int compareKey(KVComparator comparator, Cell key) {
562       return comparator.compareOnlyKeyPortion(key,
563           new KeyValue.KeyOnlyKeyValue(current.keyBuffer, 0, current.keyLength));
564     }
565 
566     @Override
567     public void setCurrentBuffer(ByteBuffer buffer) {
568       if (this.tagCompressionContext != null) {
569         this.tagCompressionContext.clear();
570       }
571       currentBuffer = buffer;
572       current.currentBuffer = currentBuffer;
573       if(tagCompressionContext != null) {
574         current.tagCompressionContext = tagCompressionContext;
575       }
576       decodeFirst();
577       current.setKey(current.keyBuffer, current.memstoreTS);
578       previous.invalidate();
579     }
580 
581     @Override
582     public ByteBuffer getKeyDeepCopy() {
583       ByteBuffer keyBuffer = ByteBuffer.allocate(current.keyLength);
584       keyBuffer.put(current.keyBuffer, 0, current.keyLength);
585       keyBuffer.rewind();
586       return keyBuffer;
587     }
588 
589     @Override
590     public ByteBuffer getValueShallowCopy() {
591       ByteBuffer dup = currentBuffer.duplicate();
592       dup.position(current.valueOffset);
593       dup.limit(current.valueOffset + current.valueLength);
594       return dup.slice();
595     }
596 
597     @Override
598     public Cell getKeyValue() {
599       return current.shallowCopy();
600     }
601 
602     @Override
603     public void rewind() {
604       currentBuffer.rewind();
605       if (tagCompressionContext != null) {
606         tagCompressionContext.clear();
607       }
608       decodeFirst();
609       current.setKey(current.keyBuffer, current.memstoreTS);
610       previous.invalidate();
611     }
612 
613     @Override
614     public boolean next() {
615       if (!currentBuffer.hasRemaining()) {
616         return false;
617       }
618       decodeNext();
619       current.setKey(current.keyBuffer, current.memstoreTS);
620       previous.invalidate();
621       return true;
622     }
623 
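        /**
         * Reads the tags portion of the current cell. With tag compression enabled, the tags are
         * uncompressed into {@code current.tagsBuffer} (or skipped when they were already cached
         * by moveToPrevious()); otherwise only their offset and length within the block buffer
         * are recorded.
         */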
624     protected void decodeTags() {
625       current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
626       if (tagCompressionContext != null) {
627         if (current.uncompressTags) {
628           // Tag compression is being used. Uncompress the tags into tagsBuffer.
629           current.ensureSpaceForTags();
630           try {
631             current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
632                 current.tagsBuffer, 0, current.tagsLength);
633           } catch (IOException e) {
634             throw new RuntimeException("Exception while uncompressing tags", e);
635           }
636         } else {
637           ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
638           current.uncompressTags = true; // Reset this.
639         }
640         current.tagsOffset = -1;
641       } else {
642         // When tag compression is not used, avoid copying the tag bytes into tagsBuffer.
643         // Just mark the tags offset so the KV buffer can be created later in getKeyValueBuffer().
644         current.tagsOffset = currentBuffer.position();
645         ByteBufferUtils.skip(currentBuffer, current.tagsLength);
646       }
647     }
648 
649     @Override
650     public int seekToKeyInBlock(byte[] key, int offset, int length, boolean seekBefore) {
651       return seekToKeyInBlock(new KeyValue.KeyOnlyKeyValue(key, offset, length), seekBefore);
652     }
653 
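        /**
         * Positions the seeker at or before the given key within the current block. Returns 0 on
         * an exact match (or 1 if seekBefore forces a step back to the previous cell), 1 when the
         * seeker ends up on the last cell sorting before the seek key, and
         * {@link HConstants#INDEX_KEY_MAGIC} when the seek key sorts before the first key of the
         * block, so that the caller can fall back to the optimized index key.
         */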
654     @Override
655     public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
656       int rowCommonPrefix = 0;
657       int familyCommonPrefix = 0;
658       int qualCommonPrefix = 0;
659       previous.invalidate();
660       KeyValue.KeyOnlyKeyValue currentCell = new KeyValue.KeyOnlyKeyValue();
661       do {
662         int comp;
663         if (samePrefixComparator != null) {
664           currentCell.setKey(current.keyBuffer, 0, current.keyLength);
665           if (current.lastCommonPrefix != 0) {
666           // The KV format also stores the two-byte row key length in the byte
667           // array, and the common prefix includes it. So we subtract those two
668           // bytes to find the common prefix within the row part alone, which
669           // is why lastCommonPrefix - 2 is used for the row common prefix
670           // below.
671             rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
672           }
673           if (current.lastCommonPrefix <= 2) {
674             rowCommonPrefix = 0;
675           }
676           rowCommonPrefix += CellComparator.findCommonPrefixInRowPart(seekCell, currentCell,
677               rowCommonPrefix);
678           comp = CellComparator.compareCommonRowPrefix(seekCell, currentCell, rowCommonPrefix);
679           if (comp == 0) {
680             comp = compareTypeBytes(seekCell, currentCell);
681             if (comp == 0) {
682                 // Subtract the 2-byte row length field, the row itself and the 1-byte family length field
683               familyCommonPrefix = Math.max(
684                   0,
685                   Math.min(familyCommonPrefix,
686                       current.lastCommonPrefix - (3 + currentCell.getRowLength())));
687               familyCommonPrefix += CellComparator.findCommonPrefixInFamilyPart(seekCell,
688                   currentCell, familyCommonPrefix);
689               comp = CellComparator.compareCommonFamilyPrefix(seekCell, currentCell,
690                   familyCommonPrefix);
691               if (comp == 0) {
692               // Subtract the row length field, the row, the family length
693               // field and the family itself (3 + rowLength + familyLength).
694                 qualCommonPrefix = Math.max(
695                     0,
696                     Math.min(
697                         qualCommonPrefix,
698                         current.lastCommonPrefix
699                             - (3 + currentCell.getRowLength() + currentCell.getFamilyLength())));
700                 qualCommonPrefix += CellComparator.findCommonPrefixInQualifierPart(seekCell,
701                     currentCell, qualCommonPrefix);
702                 comp = CellComparator.compareCommonQualifierPrefix(seekCell, currentCell,
703                     qualCommonPrefix);
704                 if (comp == 0) {
705                   comp = CellComparator.compareTimestamps(seekCell, currentCell);
706                   if (comp == 0) {
707                     // Compare types. Let the delete types sort ahead of
708                     // puts; i.e. types of higher numbers sort before those
709                     // of lesser numbers. Maximum (255) appears ahead of
710                     // everything, and minimum (0) appears after everything.
711                     // Because a higher type byte sorts earlier, comp is
712                     // computed as (current type) - (seek type) so that a
713                     // negative value still means the seek key sorts before
714                     // the current key.
715                     comp = (0xff & currentCell.getTypeByte()) - (0xff & seekCell.getTypeByte());
716                   }
717                 }
718               }
719             }
720           }
721         } else {
722           Cell r = new KeyValue.KeyOnlyKeyValue(current.keyBuffer, 0, current.keyLength);
723           comp = comparator.compareOnlyKeyPortion(seekCell, r);
724         }
725         if (comp == 0) { // exact match
726           if (seekBefore) {
727             if (!previous.isValid()) {
728               // The caller (seekBefore) has to ensure that we are not at the
729               // first key in the block.
730               throw new IllegalStateException("Cannot seekBefore if "
731                   + "positioned at the first key in the block: key="
732                   + Bytes.toStringBinary(seekCell.getRowArray()));
733             }
734             moveToPrevious();
735             return 1;
736           }
737           return 0;
738         }
739 
740         if (comp < 0) { // already too large, check previous
741           if (previous.isValid()) {
742             moveToPrevious();
743           } else {
744             return HConstants.INDEX_KEY_MAGIC; // using optimized index key
745           }
746           return 1;
747         }
748 
749         // move to next, if more data is available
750         if (currentBuffer.hasRemaining()) {
751           previous.copyFromNext(current);
752           decodeNext();
753           current.setKey(current.keyBuffer, current.memstoreTS);
754         } else {
755           break;
756         }
757       } while (true);
758 
759       // we hit the end of the block, not an exact match
760       return 1;
761     }
762 
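        /**
         * Special-cases keys with an empty family and qualifier and a {@link Type#Minimum} type
         * byte: such a key sorts after every real cell of its row, so it compares as larger
         * regardless of the remaining key components.
         */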
763     private int compareTypeBytes(Cell key, Cell right) {
764       if (key.getFamilyLength() + key.getQualifierLength() == 0
765           && key.getTypeByte() == Type.Minimum.getCode()) {
766         // left is "bigger", i.e. it appears later in the sorted order
767         return 1;
768       }
769       if (right.getFamilyLength() + right.getQualifierLength() == 0
770           && right.getTypeByte() == Type.Minimum.getCode()) {
771         return -1;
772       }
773       return 0;
774     }
775 
776 
777     private void moveToPrevious() {
778       if (!previous.isValid()) {
779         throw new IllegalStateException(
780             "Can only move back once, and not from the first key in the block.");
781       }
782 
783       STATE tmp = previous;
784       previous = current;
785       current = tmp;
786 
787       // move after last key value
788       currentBuffer.position(current.nextKvOffset);
789       // The tag bytes were already decoded. We cache these tags in the current state along with
790       // the total compressed length of the tag bytes, so the next decodeNext() does not need to
791       // decode them again; decoding them twice could also pollute the dictionary used for tag
792       // compression. When current.uncompressTags is false, we just reuse current.tagsBuffer and
793       // skip 'tagsCompressedLength' bytes of the source stream.
794       // See decodeTags().
795       current.tagsBuffer = previous.tagsBuffer;
796       current.tagsCompressedLength = previous.tagsCompressedLength;
797       current.uncompressTags = false;
798       current.setKey(current.keyBuffer, current.memstoreTS);
799       previous.invalidate();
800     }
801 
802     @SuppressWarnings("unchecked")
803     protected STATE createSeekerState() {
804       // This will fail for non-default seeker state if the subclass does not
805       // override this method.
806       return (STATE) new SeekerState();
807     }
808 
809     protected abstract void decodeFirst();
810     protected abstract void decodeNext();
811   }
812 
813   /**
814    * Writes the parts that follow the encoded key/value: the (possibly dictionary-compressed)
815    * tags when the file includes tags, and the memstore timestamp as a vlong when the file
816    * includes MVCC information.
817    * @return the unencoded size added for these parts
818    * @throws IOException if writing to the output stream fails
819    */
820   protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
821       HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
822     int size = 0;
823     if (encodingCtx.getHFileContext().isIncludesTags()) {
824       int tagsLength = cell.getTagsLength();
825       ByteBufferUtils.putCompressedInt(out, tagsLength);
826       // There are some tags to be written
827       if (tagsLength > 0) {
828         TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
829         // When tag compression is enabled, tagCompressionContext will be non-null. In that case
830         // write the tags using dictionary compression.
831         if (tagCompressionContext != null) {
832           tagCompressionContext
833               .compressTags(out, cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
834         } else {
835           out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
836         }
837       }
838       size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
839     }
840     if (encodingCtx.getHFileContext().isIncludesMvcc()) {
841       // Write the cell's memstore timestamp (sequence id) to the output stream as a vlong.
842       long memstoreTS = cell.getSequenceId();
843       WritableUtils.writeVLong(out, memstoreTS);
844       // TODO use a writeVLong which returns the number of bytes written, so that
845       // parsing the value twice can be avoided.
846       size += WritableUtils.getVIntSize(memstoreTS);
847     }
848     return size;
849   }
850 
851   protected final void afterDecodingKeyValue(DataInputStream source,
852       ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
853     if (decodingCtx.getHFileContext().isIncludesTags()) {
854       int tagsLength = ByteBufferUtils.readCompressedInt(source);
855       // Put as unsigned short
856       dest.put((byte) ((tagsLength >> 8) & 0xff));
857       dest.put((byte) (tagsLength & 0xff));
858       if (tagsLength > 0) {
859         TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
860         // When tag compression is being used in this file, tagCompressionContext will be
861         // non-null.
862         if (tagCompressionContext != null) {
863           tagCompressionContext.uncompressTags(source, dest, tagsLength);
864         } else {
865           ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
866         }
867       }
868     }
869     if (decodingCtx.getHFileContext().isIncludesMvcc()) {
870       long memstoreTS = -1;
871       try {
872         // Copy memstore timestamp from the data input stream to the byte
873         // buffer.
874         memstoreTS = WritableUtils.readVLong(source);
875         ByteBufferUtils.writeVLong(dest, memstoreTS);
876       } catch (IOException ex) {
877         throw new RuntimeException("Unable to copy memstore timestamp " +
878             memstoreTS + " after decoding a key/value", ex);
879       }
880     }
881   }
882 
883   @Override
884   public HFileBlockEncodingContext newDataBlockEncodingContext(DataBlockEncoding encoding,
885       byte[] header, HFileContext meta) {
886     return new HFileBlockDefaultEncodingContext(encoding, header, meta);
887   }
888 
889   @Override
890   public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
891     return new HFileBlockDefaultDecodingContext(meta);
892   }
893 
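      /**
       * Decodes all cells from the source stream into a newly allocated buffer, leaving
       * {@code allocateHeaderLength} bytes of free space at the start of the buffer and leaving
       * the last {@code skipLastBytes} bytes of the stream undecoded.
       */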
894   protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
895       int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
896       throws IOException;
897 
898   /**
899    * Asserts that there is at least the given amount of unfilled space
900    * remaining in the given buffer.
901    * @param out typically, the buffer we are writing to
902    * @param length the required space in the buffer
903    * @throws EncoderBufferTooSmallException If there are not enough bytes remaining.
904    */
905   protected static void ensureSpace(ByteBuffer out, int length)
906       throws EncoderBufferTooSmallException {
907     if (out.position() + length > out.limit()) {
908       throw new EncoderBufferTooSmallException(
909           "Buffer position=" + out.position() +
910           ", buffer limit=" + out.limit() +
911           ", length to be written=" + length);
912     }
913   }
914 
915   @Override
916   public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
917       throws IOException {
918     if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
919       throw new IOException(this.getClass().getName() + " only accepts "
920           + HFileBlockDefaultEncodingContext.class.getName() + " as the " +
921           "encoding context.");
922     }
923 
924     HFileBlockDefaultEncodingContext encodingCtx =
925         (HFileBlockDefaultEncodingContext) blkEncodingCtx;
926     encodingCtx.prepareEncoding(out);
927     if (encodingCtx.getHFileContext().isIncludesTags()
928         && encodingCtx.getHFileContext().isCompressTags()) {
929       if (encodingCtx.getTagCompressionContext() != null) {
930         // Reuse the existing TagCompressionContext; creating it afresh for every block
931         // encoding would be needless overhead.
932         encodingCtx.getTagCompressionContext().clear();
933       } else {
934         try {
935           TagCompressionContext tagCompressionContext = new TagCompressionContext(
936               LRUDictionary.class, Byte.MAX_VALUE);
937           encodingCtx.setTagCompressionContext(tagCompressionContext);
938         } catch (Exception e) {
939           throw new IOException("Failed to initialize TagCompressionContext", e);
940         }
941       }
942     }
943     ByteBufferUtils.putInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
944     blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
945   }
946 
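      /**
       * Per-block encoding state: the running total of the unencoded data size written so far,
       * which endBlockEncoding() stores into the block right after the encoding id.
       */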
947   private static class BufferedDataBlockEncodingState extends EncodingState {
948     int unencodedDataSizeWritten = 0;
949   }
950 
951   @Override
952   public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
953       throws IOException {
954     BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
955         .getEncodingState();
956     int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
957     state.unencodedDataSizeWritten += encodedKvSize;
958     return encodedKvSize;
959   }
960 
961   public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
962       DataOutputStream out) throws IOException;
963 
964   @Override
965   public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
966       byte[] uncompressedBytesWithHeader) throws IOException {
967     BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
968         .getEncodingState();
969     // Overwrite the dummy length written in startBlockEncoding() with the total unencoded
970     // data size; the offset skips the block header and the encoding id.
971     Bytes.putInt(uncompressedBytesWithHeader, HConstants.HFILEBLOCK_HEADER_SIZE
972         + DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten);
973     if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
974       encodingCtx.postEncoding(BlockType.ENCODED_DATA);
975     } else {
976       encodingCtx.postEncoding(BlockType.DATA);
977     }
978   }
979 }