/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;

/**
 * Base class for all data block encoders that use a buffer.
 */
@InterfaceAudience.Private
abstract class BufferedDataBlockEncoder implements DataBlockEncoder {

  private static int INITIAL_KEY_BUFFER_SIZE = 512;

  @Override
  public ByteBuffer decodeKeyValues(DataInputStream source,
      HFileBlockDecodingContext blkDecodingCtx) throws IOException {
    if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
      throw new IOException(this.getClass().getName() + " only accepts "
          + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
    }

    HFileBlockDefaultDecodingContext decodingCtx =
        (HFileBlockDefaultDecodingContext) blkDecodingCtx;
    if (decodingCtx.getHFileContext().isIncludesTags()
        && decodingCtx.getHFileContext().isCompressTags()) {
      if (decodingCtx.getTagCompressionContext() != null) {
        // Reuse the existing TagCompressionContext; creating a new one for every block
        // decoding would be needless overhead.
        decodingCtx.getTagCompressionContext().clear();
      } else {
        try {
          TagCompressionContext tagCompressionContext = new TagCompressionContext(
              LRUDictionary.class, Byte.MAX_VALUE);
          decodingCtx.setTagCompressionContext(tagCompressionContext);
        } catch (Exception e) {
          throw new IOException("Failed to initialize TagCompressionContext", e);
        }
      }
    }
    return internalDecodeKeyValues(source, 0, 0, decodingCtx);
  }

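  /**
   * Mutable state kept while a seeker walks an encoded block: the current key is kept as a
   * decoded copy in {@code keyBuffer}, while the value and tag positions refer back into the
   * underlying block buffer.
   */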
  protected static class SeekerState implements Cell {
    protected ByteBuffer currentBuffer;
    protected TagCompressionContext tagCompressionContext;
    protected int valueOffset = -1;
    protected int keyLength;
    protected int valueLength;
    protected int lastCommonPrefix;
    protected int tagsLength = 0;
    protected int tagsOffset = -1;
    protected int tagsCompressedLength = 0;
    protected boolean uncompressTags = true;

    /** We need to store a copy of the key. */
    protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
    protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];

    protected long memstoreTS;
    protected int nextKvOffset;
    protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();

    protected boolean isValid() {
      return valueOffset != -1;
    }

    protected void invalidate() {
      valueOffset = -1;
      tagsCompressedLength = 0;
      currentKey = new KeyValue.KeyOnlyKeyValue();
      uncompressTags = true;
      currentBuffer = null;
    }

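    /** Grows {@code keyBuffer} (doubling it) until the current {@code keyLength} fits. */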
    protected void ensureSpaceForKey() {
      if (keyLength > keyBuffer.length) {
        // rare case, but we need to handle arbitrary length of key
        int newKeyBufferLength = Math.max(keyBuffer.length, 1) * 2;
        while (keyLength > newKeyBufferLength) {
          newKeyBufferLength *= 2;
        }
        byte[] newKeyBuffer = new byte[newKeyBufferLength];
        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
        keyBuffer = newKeyBuffer;
      }
    }

    protected void ensureSpaceForTags() {
      if (tagsLength > tagsBuffer.length) {
        // rare case, but we need to handle arbitrary length of tags
        int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2;
        while (tagsLength > newTagsBufferLength) {
          newTagsBufferLength *= 2;
        }
        byte[] newTagsBuffer = new byte[newTagsBufferLength];
        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
        tagsBuffer = newTagsBuffer;
      }
    }

    protected void createKeyOnlyKeyValue(byte[] keyBuffer, long memTS) {
      currentKey.setKey(keyBuffer, 0, keyLength);
      memstoreTS = memTS;
    }

    /**
     * Copy the state from the next one into this instance (the previous state
     * placeholder). Used to save the previous state when we are advancing the
     * seeker to the next key/value.
     */
    protected void copyFromNext(SeekerState nextState) {
      if (keyBuffer.length != nextState.keyBuffer.length) {
        keyBuffer = nextState.keyBuffer.clone();
      } else if (!isValid()) {
        // Note: we can only call isValid before we override our state, so this
        // comes before all the assignments at the end of this method.
        System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
            nextState.keyLength);
      } else {
        // don't copy the common prefix between this key and the previous one
        System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
            keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
                - nextState.lastCommonPrefix);
      }
      currentKey = nextState.currentKey;

      valueOffset = nextState.valueOffset;
      keyLength = nextState.keyLength;
      valueLength = nextState.valueLength;
      lastCommonPrefix = nextState.lastCommonPrefix;
      nextKvOffset = nextState.nextKvOffset;
      memstoreTS = nextState.memstoreTS;
      currentBuffer = nextState.currentBuffer;
      if (nextState.tagCompressionContext != null) {
        tagCompressionContext = nextState.tagCompressionContext;
      }
    }

    @Override
    public byte[] getRowArray() {
      return currentKey.getRowArray();
    }

    @Override
    public int getRowOffset() {
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public short getRowLength() {
      return currentKey.getRowLength();
    }

    @Override
    public byte[] getFamilyArray() {
      return currentKey.getFamilyArray();
    }

    @Override
    public int getFamilyOffset() {
      return currentKey.getFamilyOffset();
    }

    @Override
    public byte getFamilyLength() {
      return currentKey.getFamilyLength();
    }

    @Override
    public byte[] getQualifierArray() {
      return currentKey.getQualifierArray();
    }

    @Override
    public int getQualifierOffset() {
      return currentKey.getQualifierOffset();
    }

    @Override
    public int getQualifierLength() {
      return currentKey.getQualifierLength();
    }

    @Override
    public long getTimestamp() {
      return currentKey.getTimestamp();
    }

    @Override
    public byte getTypeByte() {
      return currentKey.getTypeByte();
    }

    @Override
    public long getMvccVersion() {
      return memstoreTS;
    }

    @Override
    public long getSequenceId() {
      return memstoreTS;
    }

    @Override
    public byte[] getValueArray() {
      return currentBuffer.array();
    }

    @Override
    public int getValueOffset() {
      return currentBuffer.arrayOffset() + valueOffset;
    }

    @Override
    public int getValueLength() {
      return valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      if (tagCompressionContext != null) {
        return tagsBuffer;
      }
      return currentBuffer.array();
    }

    @Override
    public int getTagsOffset() {
      if (tagCompressionContext != null) {
        return 0;
      }
      return currentBuffer.arrayOffset() + tagsOffset;
    }

    @Override
    public int getTagsLength() {
      return tagsLength;
    }

    @Override
    @Deprecated
    public byte[] getValue() {
      throw new UnsupportedOperationException("getValue() not supported");
    }

    @Override
    @Deprecated
    public byte[] getFamily() {
      throw new UnsupportedOperationException("getFamily() not supported");
    }

    @Override
    @Deprecated
    public byte[] getQualifier() {
      throw new UnsupportedOperationException("getQualifier() not supported");
    }

    @Override
    @Deprecated
    public byte[] getRow() {
      throw new UnsupportedOperationException("getRow() not supported");
    }

    @Override
    public String toString() {
      KeyValue kv = KeyValueUtil.copyToNewKeyValue(this);
      if (kv == null) {
        return "null";
      }
      return kv.toString();
    }

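    /**
     * Returns a {@link ClonedSeekerState} snapshot of this state: the key (and, when tag
     * compression is in use, the tags) are deep-copied, but the value bytes still point into the
     * current block buffer.
     */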
    public Cell shallowCopy() {
      return new ClonedSeekerState(currentBuffer, keyBuffer, currentKey.getRowLength(),
          currentKey.getFamilyOffset(), currentKey.getFamilyLength(), keyLength,
          currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
          currentKey.getTimestamp(), currentKey.getTypeByte(), valueLength, valueOffset,
          memstoreTS, tagsOffset, tagsLength, tagCompressionContext, tagsBuffer);
    }
  }

  /**
   * Cell implementation returned by {@link SeekerState#shallowCopy()}. It deep-copies only the
   * key part of the key buffer and takes the remaining seeker state members as-is. Note that the
   * value bytes still point into {@code currentBuffer} and are addressed by {@code valueOffset}
   * and {@code valueLength}.
   */
  protected static class ClonedSeekerState implements Cell {
    private byte[] keyOnlyBuffer;
    private ByteBuffer currentBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timestamp;
    private byte typeByte;
    private int valueOffset;
    private int valueLength;
    private int tagsLength;
    private int tagsOffset;
    private byte[] cloneTagsBuffer;
    private long memstoreTS;
    private TagCompressionContext tagCompressionContext;

    protected ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, short rowLength,
        int familyOffset, byte familyLength, int keyLength, int qualOffset, int qualLength,
        long timeStamp, byte typeByte, int valueLen, int valueOffset, long memStoreTS,
        int tagsOffset, int tagsLength, TagCompressionContext tagCompressionContext,
        byte[] tagsBuffer) {
      this.currentBuffer = currentBuffer;
      keyOnlyBuffer = new byte[keyLength];
      this.tagCompressionContext = tagCompressionContext;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timestamp = timeStamp;
      this.typeByte = typeByte;
      this.valueLength = valueLen;
      this.valueOffset = valueOffset;
      this.memstoreTS = memStoreTS;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      System.arraycopy(keyBuffer, 0, keyOnlyBuffer, 0, keyLength);
      if (tagCompressionContext != null) {
        this.cloneTagsBuffer = new byte[tagsLength];
        System.arraycopy(tagsBuffer, 0, this.cloneTagsBuffer, 0, tagsLength);
      }
    }

    @Override
    public byte[] getRowArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getFamilyArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getQualifierArray() {
      return keyOnlyBuffer;
    }

    @Override
    public int getRowOffset() {
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public short getRowLength() {
      return rowLength;
    }

    @Override
    public int getFamilyOffset() {
      return familyOffset;
    }

    @Override
    public byte getFamilyLength() {
      return familyLength;
    }

    @Override
    public int getQualifierOffset() {
      return qualifierOffset;
    }

    @Override
    public int getQualifierLength() {
      return qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return timestamp;
    }

    @Override
    public byte getTypeByte() {
      return typeByte;
    }

    @Override
    public long getMvccVersion() {
      return memstoreTS;
    }

    @Override
    public long getSequenceId() {
      return memstoreTS;
    }

    @Override
    public byte[] getValueArray() {
      return currentBuffer.array();
    }

    @Override
    public int getValueOffset() {
      return currentBuffer.arrayOffset() + valueOffset;
    }

    @Override
    public int getValueLength() {
      return valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      if (tagCompressionContext != null) {
        return cloneTagsBuffer;
      }
      return currentBuffer.array();
    }

    @Override
    public int getTagsOffset() {
      if (tagCompressionContext != null) {
        return 0;
      }
      return currentBuffer.arrayOffset() + tagsOffset;
    }

    @Override
    public int getTagsLength() {
      return tagsLength;
    }

    @Override
    @Deprecated
    public byte[] getValue() {
      throw new UnsupportedOperationException("getValue() not supported");
    }

    @Override
    @Deprecated
    public byte[] getFamily() {
      throw new UnsupportedOperationException("getFamily() not supported");
    }

    @Override
    @Deprecated
    public byte[] getQualifier() {
      throw new UnsupportedOperationException("getQualifier() not supported");
    }

    @Override
    @Deprecated
    public byte[] getRow() {
      throw new UnsupportedOperationException("getRow() not supported");
    }

    @Override
    public String toString() {
      KeyValue kv = KeyValueUtil.copyToNewKeyValue(this);
      if (kv == null) {
        return "null";
      }
      return kv.toString();
    }
  }

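  /**
   * EncodedSeeker that walks an encoded block sequentially, decoding one key/value at a time
   * into {@code current} and remembering the previously decoded entry in {@code previous} so
   * that seekBefore can step back a single position.
   */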
  protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
      implements EncodedSeeker {
    protected HFileBlockDecodingContext decodingCtx;
    protected final KVComparator comparator;
    protected final SamePrefixComparator<byte[]> samePrefixComparator;
    protected ByteBuffer currentBuffer;
    protected STATE current = createSeekerState(); // always valid
    protected STATE previous = createSeekerState(); // may not be valid
    protected TagCompressionContext tagCompressionContext = null;

    public BufferedEncodedSeeker(KVComparator comparator,
        HFileBlockDecodingContext decodingCtx) {
      this.comparator = comparator;
      this.samePrefixComparator = comparator;
      this.decodingCtx = decodingCtx;
      if (decodingCtx.getHFileContext().isCompressTags()) {
        try {
          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
        } catch (Exception e) {
          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
        }
      }
    }

    protected boolean includesMvcc() {
      return this.decodingCtx.getHFileContext().isIncludesMvcc();
    }

    protected boolean includesTags() {
      return this.decodingCtx.getHFileContext().isIncludesTags();
    }

    @Override
    public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
      return comparator.compareFlatKey(key, offset, length,
          current.keyBuffer, 0, current.keyLength);
    }

    @Override
    public int compareKey(KVComparator comparator, Cell key) {
      return comparator.compareOnlyKeyPortion(key,
          new KeyValue.KeyOnlyKeyValue(current.keyBuffer, 0, current.keyLength));
    }

    @Override
    public void setCurrentBuffer(ByteBuffer buffer) {
      if (this.tagCompressionContext != null) {
        this.tagCompressionContext.clear();
      }
      currentBuffer = buffer;
      current.currentBuffer = currentBuffer;
      if (tagCompressionContext != null) {
        current.tagCompressionContext = tagCompressionContext;
      }
      decodeFirst();
      current.createKeyOnlyKeyValue(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public ByteBuffer getKeyDeepCopy() {
      ByteBuffer keyBuffer = ByteBuffer.allocate(current.keyLength);
      keyBuffer.put(current.keyBuffer, 0, current.keyLength);
      return keyBuffer;
    }

    @Override
    public ByteBuffer getValueShallowCopy() {
      return ByteBuffer.wrap(currentBuffer.array(),
          currentBuffer.arrayOffset() + current.valueOffset,
          current.valueLength);
    }

    @Override
    public ByteBuffer getKeyValueBuffer() {
      ByteBuffer kvBuffer = createKVBuffer();
      kvBuffer.putInt(current.keyLength);
      kvBuffer.putInt(current.valueLength);
      kvBuffer.put(current.keyBuffer, 0, current.keyLength);
      kvBuffer.put(currentBuffer.array(),
          currentBuffer.arrayOffset() + current.valueOffset,
          current.valueLength);
      if (current.tagsLength > 0) {
        // Put short as unsigned
        kvBuffer.put((byte) (current.tagsLength >> 8 & 0xff));
        kvBuffer.put((byte) (current.tagsLength & 0xff));
        if (current.tagsOffset != -1) {
          // The offset of the tag bytes in the underlying buffer is marked, so the temp
          // buffer, tagsBuffer, has not been used.
          kvBuffer.put(currentBuffer.array(), currentBuffer.arrayOffset() + current.tagsOffset,
              current.tagsLength);
        } else {
          // When tagsOffset is marked as -1, tag compression was present and so the tags were
          // uncompressed into the temp buffer, tagsBuffer. Copy them from there.
          kvBuffer.put(current.tagsBuffer, 0, current.tagsLength);
        }
      }
      return kvBuffer;
    }

    protected ByteBuffer createKVBuffer() {
      int kvBufSize = (int) KeyValue.getKeyValueDataStructureSize(current.keyLength,
          current.valueLength, current.tagsLength);
      ByteBuffer kvBuffer = ByteBuffer.allocate(kvBufSize);
      return kvBuffer;
    }

    @Override
    public Cell getKeyValue() {
      return current.shallowCopy();
    }

    @Override
    public void rewind() {
      currentBuffer.rewind();
      if (tagCompressionContext != null) {
        tagCompressionContext.clear();
      }
      decodeFirst();
      current.createKeyOnlyKeyValue(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public boolean next() {
      if (!currentBuffer.hasRemaining()) {
        return false;
      }
      decodeNext();
      current.createKeyOnlyKeyValue(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
      return true;
    }

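    /**
     * Reads the tags length for the current cell and then either uncompresses the tags into
     * {@code current.tagsBuffer} (tag compression in use) or records the offset of the raw tag
     * bytes in the block buffer (no tag compression).
     */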
    protected void decodeTags() {
      current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
      if (tagCompressionContext != null) {
        if (current.uncompressTags) {
          // Tag compression is in use; uncompress the tags into tagsBuffer.
          current.ensureSpaceForTags();
          try {
            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
                current.tagsBuffer, 0, current.tagsLength);
          } catch (IOException e) {
            throw new RuntimeException("Exception while uncompressing tags", e);
          }
        } else {
          ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
          current.uncompressTags = true; // Reset this.
        }
        current.tagsOffset = -1;
      } else {
        // When tag compression is not used, avoid a temp copy of the tag bytes into tagsBuffer.
        // Just mark the tags offset so the KV buffer can be created later in getKeyValueBuffer().
        current.tagsOffset = currentBuffer.position();
        ByteBufferUtils.skip(currentBuffer, current.tagsLength);
      }
    }

    @Override
    public int seekToKeyInBlock(byte[] key, int offset, int length, boolean seekBefore) {
      return seekToKeyInBlock(new KeyValue.KeyOnlyKeyValue(key, offset, length), seekBefore);
    }

    @Override
    public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
      int rowCommonPrefix = 0;
      int familyCommonPrefix = 0;
      int qualCommonPrefix = 0;
      previous.invalidate();
      KeyValue.KeyOnlyKeyValue currentCell = new KeyValue.KeyOnlyKeyValue();
      do {
        int comp;
        if (samePrefixComparator != null) {
          currentCell.setKey(current.keyBuffer, 0, current.keyLength);
          if (current.lastCommonPrefix != 0) {
            // The KV format stores the row key length in the byte array as well, and the common
            // prefix includes it. Subtract it to find the common prefix in the row part alone.
            rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
          }
          if (current.lastCommonPrefix <= 2) {
            rowCommonPrefix = 0;
          }
          rowCommonPrefix += CellComparator.findCommonPrefixInRowPart(seekCell, currentCell,
              rowCommonPrefix);
          comp = CellComparator.compareCommonRowPrefix(seekCell, currentCell, rowCommonPrefix);
          if (comp == 0) {
            comp = compareTypeBytes(seekCell, currentCell);
            if (comp == 0) {
              // Subtract the fixed row key length and the family key fixed length
              familyCommonPrefix = Math.max(
                  0,
                  Math.min(familyCommonPrefix,
                      current.lastCommonPrefix - (3 + currentCell.getRowLength())));
              familyCommonPrefix += CellComparator.findCommonPrefixInFamilyPart(seekCell,
                  currentCell, familyCommonPrefix);
              comp = CellComparator.compareCommonFamilyPrefix(seekCell, currentCell,
                  familyCommonPrefix);
              if (comp == 0) {
                // Subtract the row key fixed length and the family key fixed length
                qualCommonPrefix = Math.max(
                    0,
                    Math.min(
                        qualCommonPrefix,
                        current.lastCommonPrefix
                            - (3 + currentCell.getRowLength() + currentCell.getFamilyLength())));
                qualCommonPrefix += CellComparator.findCommonPrefixInQualifierPart(seekCell,
                    currentCell, qualCommonPrefix);
                comp = CellComparator.compareCommonQualifierPrefix(seekCell, currentCell,
                    qualCommonPrefix);
                if (comp == 0) {
                  comp = CellComparator.compareTimestamps(seekCell, currentCell);
                  if (comp == 0) {
                    // Compare types. Let the delete types sort ahead of puts; i.e. types of
                    // higher numbers sort before those of lesser numbers. Maximum (255) appears
                    // ahead of everything, and minimum (0) appears after everything.
                    comp = (0xff & currentCell.getTypeByte()) - (0xff & seekCell.getTypeByte());
                  }
                }
              }
            }
          }
        } else {
          Cell r = new KeyValue.KeyOnlyKeyValue(current.keyBuffer, 0, current.keyLength);
          comp = comparator.compareOnlyKeyPortion(seekCell, r);
        }
        if (comp == 0) { // exact match
          if (seekBefore) {
            if (!previous.isValid()) {
              // The caller (seekBefore) has to ensure that we are not at the
              // first key in the block.
              throw new IllegalStateException("Cannot seekBefore if "
                  + "positioned at the first key in the block: key="
                  + Bytes.toStringBinary(seekCell.getRowArray()));
            }
            moveToPrevious();
            return 1;
          }
          return 0;
        }

        if (comp < 0) { // already too large, check previous
          if (previous.isValid()) {
            moveToPrevious();
          } else {
            return HConstants.INDEX_KEY_MAGIC; // using optimized index key
          }
          return 1;
        }

        // move to next, if more data is available
        if (currentBuffer.hasRemaining()) {
          previous.copyFromNext(current);
          decodeNext();
          current.createKeyOnlyKeyValue(current.keyBuffer, current.memstoreTS);
        } else {
          break;
        }
      } while (true);

      // we hit the end of the block, not an exact match
      return 1;
    }

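    /**
     * Treats a cell that has no family/qualifier and a Minimum type byte (typically a fake
     * row-only seek key) as larger than the other cell, short-circuiting the family/qualifier
     * comparison.
     */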
    private int compareTypeBytes(Cell key, Cell right) {
      if (key.getFamilyLength() + key.getQualifierLength() == 0
          && key.getTypeByte() == Type.Minimum.getCode()) {
        // left is "bigger", i.e. it appears later in the sorted order
        return 1;
      }
      if (right.getFamilyLength() + right.getQualifierLength() == 0
          && right.getTypeByte() == Type.Minimum.getCode()) {
        return -1;
      }
      return 0;
    }

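    /**
     * Swaps {@code current} and {@code previous} so the seeker is positioned on the previously
     * decoded key/value, then repositions the buffer just after that entry. May only be called
     * once between decodes, and never at the first key of the block.
     */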
    private void moveToPrevious() {
      if (!previous.isValid()) {
        throw new IllegalStateException(
            "Can only move back once, and not from the first key in the block.");
      }

      STATE tmp = previous;
      previous = current;
      current = tmp;

      // move after last key value
      currentBuffer.position(current.nextKvOffset);
      // The tag bytes have already been decoded. We cache those tags in the current state along
      // with the total compressed length of the tag bytes, so the next decodeNext() does not need
      // to decode the tags again (which could pollute the dictionary used for tag compression).
      // When current.uncompressTags is false, we will just reuse current.tagsBuffer and skip
      // 'tagsCompressedLength' bytes of the source stream.
      // See decodeTags().
      current.tagsBuffer = previous.tagsBuffer;
      current.tagsCompressedLength = previous.tagsCompressedLength;
      current.uncompressTags = false;
      previous.invalidate();
    }

    @SuppressWarnings("unchecked")
    protected STATE createSeekerState() {
      // This will fail for non-default seeker state if the subclass does not
      // override this method.
      return (STATE) new SeekerState();
    }

    protected abstract void decodeFirst();
    protected abstract void decodeNext();
  }

  /**
   * Writes the additional per-KeyValue data (tags and memstore timestamp) that follows the
   * encoded key/value, according to what the {@link HFileContext} says is present.
   * @param kv the KeyValue whose tags and memstore timestamp are to be written
   * @param out the output stream to write to
   * @param encodingCtx the encoding context carrying the HFile settings and, optionally, the
   *          tag compression context
   * @return unencoded size added
   * @throws IOException if writing to the output stream fails
   */
  protected final int afterEncodingKeyValue(KeyValue kv, DataOutputStream out,
      HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
    int size = 0;
    if (encodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = kv.getTagsLength();
      ByteBufferUtils.putCompressedInt(out, tagsLength);
      // There are some tags to be written
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
        // When tag compression is enabled, tagCompressionContext will be non-null. Write the
        // tags using dictionary compression in that case.
        if (tagCompressionContext != null) {
          tagCompressionContext
              .compressTags(out, kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
        } else {
          out.write(kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
        }
      }
      size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
    }
    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
      // Copy memstore timestamp from the byte buffer to the output stream.
      long memstoreTS = kv.getMvccVersion();
      WritableUtils.writeVLong(out, memstoreTS);
      // TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be
      // avoided.
      size += WritableUtils.getVIntSize(memstoreTS);
    }
    return size;
  }

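  /**
   * Decode-side counterpart of {@link #afterEncodingKeyValue}: copies the tags (uncompressing
   * them when tag compression is in use) and the memstore timestamp from the source stream into
   * the destination buffer.
   */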
  protected final void afterDecodingKeyValue(DataInputStream source,
      ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    if (decodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = ByteBufferUtils.readCompressedInt(source);
      // Put as unsigned short
      dest.put((byte) ((tagsLength >> 8) & 0xff));
      dest.put((byte) (tagsLength & 0xff));
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
        // When tag compression is used in this file, tagCompressionContext will be non-null.
        if (tagCompressionContext != null) {
          tagCompressionContext.uncompressTags(source, dest, tagsLength);
        } else {
          ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
        }
      }
    }
    if (decodingCtx.getHFileContext().isIncludesMvcc()) {
      long memstoreTS = -1;
      try {
        // Copy memstore timestamp from the data input stream to the byte
        // buffer.
        memstoreTS = WritableUtils.readVLong(source);
        ByteBufferUtils.writeVLong(dest, memstoreTS);
      } catch (IOException ex) {
        throw new RuntimeException("Unable to copy memstore timestamp " +
            memstoreTS + " after decoding a key/value", ex);
      }
    }
  }

  @Override
  public HFileBlockEncodingContext newDataBlockEncodingContext(DataBlockEncoding encoding,
      byte[] header, HFileContext meta) {
    return new HFileBlockDefaultEncodingContext(encoding, header, meta);
  }

  @Override
  public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
    return new HFileBlockDefaultDecodingContext(meta);
  }

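  /**
   * Performs the actual decoding of key/values from {@code source} into a newly allocated
   * ByteBuffer; called by {@link #decodeKeyValues(DataInputStream, HFileBlockDecodingContext)}.
   * Subclasses implement this for their specific encoding.
   */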
  protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
      int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
      throws IOException;

  /**
   * Asserts that there is at least the given amount of unfilled space
   * remaining in the given buffer.
   * @param out typically, the buffer we are writing to
   * @param length the required space in the buffer
   * @throws EncoderBufferTooSmallException if there is not enough space in the buffer
   */
  protected static void ensureSpace(ByteBuffer out, int length)
      throws EncoderBufferTooSmallException {
    if (out.position() + length > out.limit()) {
      throw new EncoderBufferTooSmallException(
          "Buffer position=" + out.position() +
          ", buffer limit=" + out.limit() +
          ", length to be written=" + length);
    }
  }

  @Override
  public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
      throws IOException {
    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
      throw new IOException(this.getClass().getName() + " only accepts "
          + HFileBlockDefaultEncodingContext.class.getName() + " as the "
          + "encoding context.");
    }

    HFileBlockDefaultEncodingContext encodingCtx =
        (HFileBlockDefaultEncodingContext) blkEncodingCtx;
    encodingCtx.prepareEncoding(out);
    if (encodingCtx.getHFileContext().isIncludesTags()
        && encodingCtx.getHFileContext().isCompressTags()) {
      if (encodingCtx.getTagCompressionContext() != null) {
        // Reuse the existing TagCompressionContext; creating a new one for every block
        // encoding would be needless overhead.
        encodingCtx.getTagCompressionContext().clear();
      } else {
        try {
          TagCompressionContext tagCompressionContext = new TagCompressionContext(
              LRUDictionary.class, Byte.MAX_VALUE);
          encodingCtx.setTagCompressionContext(tagCompressionContext);
        } catch (Exception e) {
          throw new IOException("Failed to initialize TagCompressionContext", e);
        }
      }
    }
    ByteBufferUtils.putInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
    blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
  }

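  /** Tracks how many unencoded (raw KeyValue) bytes have been written into the current block. */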
  private static class BufferedDataBlockEncodingState extends EncodingState {
    int unencodedDataSizeWritten = 0;
  }

  @Override
  public int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
      throws IOException {
    BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
        .getEncodingState();
    int encodedKvSize = internalEncode(kv, (HFileBlockDefaultEncodingContext) encodingCtx, out);
    state.unencodedDataSizeWritten += encodedKvSize;
    return encodedKvSize;
  }

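  /**
   * Encodes a single KeyValue into the output stream; implemented by each concrete encoder.
   * The returned size is accumulated into the block's unencoded data size
   * (see {@link BufferedDataBlockEncodingState}).
   */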
  public abstract int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingCtx,
      DataOutputStream out) throws IOException;

  @Override
  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
      byte[] uncompressedBytesWithHeader) throws IOException {
    BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
        .getEncodingState();
    // Write the unencodedDataSizeWritten (with header size)
    Bytes.putInt(uncompressedBytesWithHeader, HConstants.HFILEBLOCK_HEADER_SIZE
        + DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten);
    if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
      encodingCtx.postEncoding(BlockType.ENCODED_DATA);
    } else {
      encodingCtx.postEncoding(BlockType.DATA);
    }
  }
}
992 }