1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.apache.hadoop.hbase.Cell;
25  import org.apache.hadoop.hbase.CellComparator;
26  import org.apache.hadoop.hbase.CellUtil;
27  import org.apache.hadoop.hbase.HConstants;
28  import org.apache.hadoop.hbase.KeyValue;
29  import org.apache.hadoop.hbase.KeyValue.KVComparator;
30  import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
31  import org.apache.hadoop.hbase.KeyValue.Type;
32  import org.apache.hadoop.hbase.KeyValueUtil;
33  import org.apache.hadoop.hbase.SettableSequenceId;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.io.HeapSize;
36  import org.apache.hadoop.hbase.io.TagCompressionContext;
37  import org.apache.hadoop.hbase.io.hfile.BlockType;
38  import org.apache.hadoop.hbase.io.hfile.HFileContext;
39  import org.apache.hadoop.hbase.io.util.LRUDictionary;
40  import org.apache.hadoop.hbase.util.ByteBufferUtils;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.hbase.util.ClassSize;
43  import org.apache.hadoop.io.WritableUtils;
44  
45  /**
46   * Base class for all data block encoders that use a buffer.
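 * <p>
 * Concrete encodings (prefix, diff, fast-diff) subclass this and supply the actual encode/decode
 * logic; this class provides the shared seeker state, tag and MVCC handling, and block-level
 * bookkeeping. A rough read-path sketch is shown below; it assumes a concrete encoding such as
 * PREFIX, and the identifiers {@code hfileContext} and {@code encodedBlockNoHeader} are
 * placeholders rather than fields of this class:
 * <pre>
 * DataBlockEncoder encoder = DataBlockEncoding.PREFIX.getEncoder();
 * HFileBlockDecodingContext ctx = encoder.newDataBlockDecodingContext(hfileContext);
 * DataBlockEncoder.EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR, ctx);
 * seeker.setCurrentBuffer(encodedBlockNoHeader); // encoded bytes with the block header stripped
 * do {
 *   Cell c = seeker.getKeyValue();
 *   // use c ...
 * } while (seeker.next());
 * </pre>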
47   */
48  @InterfaceAudience.Private
49  abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
50  
51    private static int INITIAL_KEY_BUFFER_SIZE = 512;
52  
53    @Override
54    public ByteBuffer decodeKeyValues(DataInputStream source,
55        HFileBlockDecodingContext blkDecodingCtx) throws IOException {
56      if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
57        throw new IOException(this.getClass().getName() + " only accepts "
58            + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
59      }
60  
61      HFileBlockDefaultDecodingContext decodingCtx =
62          (HFileBlockDefaultDecodingContext) blkDecodingCtx;
63      if (decodingCtx.getHFileContext().isIncludesTags()
64          && decodingCtx.getHFileContext().isCompressTags()) {
65        if (decodingCtx.getTagCompressionContext() != null) {
66          // Reuse the existing TagCompressionContext rather than creating a new one for
67          // every block decoding, which would add unnecessary overhead.
68          decodingCtx.getTagCompressionContext().clear();
69        } else {
70          try {
71            TagCompressionContext tagCompressionContext = new TagCompressionContext(
72                LRUDictionary.class, Byte.MAX_VALUE);
73            decodingCtx.setTagCompressionContext(tagCompressionContext);
74          } catch (Exception e) {
75            throw new IOException("Failed to initialize TagCompressionContext", e);
76          }
77        }
78      }
79      return internalDecodeKeyValues(source, 0, 0, decodingCtx);
80    }
81  
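  /**
   * Holds the seeker's state at one position in the block: a copy of the current key in
   * {@code keyBuffer}, the value and tags offsets/lengths within {@code currentBuffer}, and the
   * memstore timestamp. The seeker keeps two instances ({@code current} and {@code previous})
   * and swaps them as it advances.
   */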
82    protected static class SeekerState implements Cell {
83      protected ByteBuffer currentBuffer;
84      protected TagCompressionContext tagCompressionContext;
85      protected int valueOffset = -1;
86      protected int keyLength;
87      protected int valueLength;
88      protected int lastCommonPrefix;
89      protected int tagsLength = 0;
90      protected int tagsOffset = -1;
91      protected int tagsCompressedLength = 0;
92      protected boolean uncompressTags = true;
93  
94      /** We need to store a copy of the key. */
95      protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
96      protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
97  
98      protected long memstoreTS;
99      protected int nextKvOffset;
100     protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
101 
102     protected boolean isValid() {
103       return valueOffset != -1;
104     }
105 
106     protected void invalidate() {
107       valueOffset = -1;
108       tagsCompressedLength = 0;
109       currentKey = new KeyValue.KeyOnlyKeyValue();
110       uncompressTags = true;
111       currentBuffer = null;
112     }
113 
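    /**
     * Grows {@code keyBuffer} by doubling until it can hold {@code keyLength} bytes. The existing
     * contents are preserved, since the common prefix with the previous key may be reused.
     */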
114     protected void ensureSpaceForKey() {
115       if (keyLength > keyBuffer.length) {
116         // rare case, but we need to handle arbitrary length of key
117         int newKeyBufferLength = Math.max(keyBuffer.length, 1) * 2;
118         while (keyLength > newKeyBufferLength) {
119           newKeyBufferLength *= 2;
120         }
121         byte[] newKeyBuffer = new byte[newKeyBufferLength];
122         System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
123         keyBuffer = newKeyBuffer;
124       }
125     }
126 
127     protected void ensureSpaceForTags() {
128       if (tagsLength > tagsBuffer.length) {
129         // rare case, but we need to handle arbitrary length of tags
130         int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2;
131         while (tagsLength > newTagsBufferLength) {
132           newTagsBufferLength *= 2;
133         }
134         byte[] newTagsBuffer = new byte[newTagsBufferLength];
135         System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
136         tagsBuffer = newTagsBuffer;
137       }
138     }
139 
140     protected void setKey(byte[] keyBuffer, long memTS) {
141       currentKey.setKey(keyBuffer, 0, keyLength);
142       memstoreTS = memTS;
143     }
144 
145     /**
146      * Copy the state from the next one into this instance (the previous state
147      * placeholder). Used to save the previous state when we are advancing the
148      * seeker to the next key/value.
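     * When this instance already holds a valid key (and the key buffers are the same size), only
     * the bytes after the common prefix with the previous key are copied.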
149      */
150     protected void copyFromNext(SeekerState nextState) {
151       if (keyBuffer.length != nextState.keyBuffer.length) {
152         keyBuffer = nextState.keyBuffer.clone();
153       } else if (!isValid()) {
154         // Note: we can only call isValid before we override our state, so this
155         // comes before all the assignments at the end of this method.
156         System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
157              nextState.keyLength);
158       } else {
159         // don't copy the common prefix between this key and the previous one
160         System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
161             keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
162                 - nextState.lastCommonPrefix);
163       }
164       currentKey = nextState.currentKey;
165 
166       valueOffset = nextState.valueOffset;
167       keyLength = nextState.keyLength;
168       valueLength = nextState.valueLength;
169       lastCommonPrefix = nextState.lastCommonPrefix;
170       nextKvOffset = nextState.nextKvOffset;
171       memstoreTS = nextState.memstoreTS;
172       currentBuffer = nextState.currentBuffer;
173       if (nextState.tagCompressionContext != null) {
174         tagCompressionContext = nextState.tagCompressionContext;
175       }
176     }
177 
178     @Override
179     public byte[] getRowArray() {
180       return currentKey.getRowArray();
181     }
182 
183     @Override
184     public int getRowOffset() {
185       return Bytes.SIZEOF_SHORT;
186     }
187 
188     @Override
189     public short getRowLength() {
190       return currentKey.getRowLength();
191     }
192 
193     @Override
194     public byte[] getFamilyArray() {
195       return currentKey.getFamilyArray();
196     }
197 
198     @Override
199     public int getFamilyOffset() {
200       return currentKey.getFamilyOffset();
201     }
202 
203     @Override
204     public byte getFamilyLength() {
205       return currentKey.getFamilyLength();
206     }
207 
208     @Override
209     public byte[] getQualifierArray() {
210       return currentKey.getQualifierArray();
211     }
212 
213     @Override
214     public int getQualifierOffset() {
215       return currentKey.getQualifierOffset();
216     }
217 
218     @Override
219     public int getQualifierLength() {
220       return currentKey.getQualifierLength();
221     }
222 
223     @Override
224     public long getTimestamp() {
225       return currentKey.getTimestamp();
226     }
227 
228     @Override
229     public byte getTypeByte() {
230       return currentKey.getTypeByte();
231     }
232 
233     @Override
234     public long getMvccVersion() {
235       return memstoreTS;
236     }
237 
238     @Override
239     public long getSequenceId() {
240       return memstoreTS;
241     }
242 
243     @Override
244     public byte[] getValueArray() {
245       return currentBuffer.array();
246     }
247 
248     @Override
249     public int getValueOffset() {
250       return currentBuffer.arrayOffset() + valueOffset;
251     }
252 
253     @Override
254     public int getValueLength() {
255       return valueLength;
256     }
257 
258     @Override
259     public byte[] getTagsArray() {
260       if (tagCompressionContext != null) {
261         return tagsBuffer;
262       }
263       return currentBuffer.array();
264     }
265 
266     @Override
267     public int getTagsOffset() {
268       if (tagCompressionContext != null) {
269         return 0;
270       }
271       return currentBuffer.arrayOffset() + tagsOffset;
272     }
273 
274     @Override
275     public int getTagsLength() {
276       return tagsLength;
277     }
278 
279     @Override
280     @Deprecated
281     public byte[] getValue() {
282       throw new UnsupportedOperationException("getValue() not supported");
283     }
284 
285     @Override
286     @Deprecated
287     public byte[] getFamily() {
288       throw new UnsupportedOperationException("getFamily() not supported");
289     }
290 
291     @Override
292     @Deprecated
293     public byte[] getQualifier() {
294       throw new UnsupportedOperationException("getQualifier() not supported");
295     }
296 
297     @Override
298     @Deprecated
299     public byte[] getRow() {
300       throw new UnsupportedOperationException("getRow() not supported");
301     }
302 
303     @Override
304     public String toString() {
305       return KeyValue.keyToString(this.keyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
306           + getValueLength() + "/seqid=" + memstoreTS;
307     }
308 
309     public Cell shallowCopy() {
310       return new ClonedSeekerState(currentBuffer, keyBuffer, currentKey.getRowLength(),
311           currentKey.getFamilyOffset(), currentKey.getFamilyLength(), keyLength, 
312           currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
313           currentKey.getTimestamp(), currentKey.getTypeByte(), valueLength, valueOffset,
314           memstoreTS, tagsOffset, tagsLength, tagCompressionContext, tagsBuffer);
315     }
316   }
317 
318   /**
319    * Deep-copies only the key part of the key buffer and takes the remaining seeker state
320    * members as-is to build a clone.
321    * Note that the value byte[] still points into currentBuffer and is represented by
322    * valueOffset and valueLength.
323    */
324   // We return this as a Cell to the upper layers of the read flow, which may set a new seqId
325   // on it. So this has to be an instance of SettableSequenceId. SeekerState need not be
326   // SettableSequenceId because we never return it to the top layers; when we have to, we make
327   // a ClonedSeekerState from it.
328   protected static class ClonedSeekerState implements Cell, HeapSize, SettableSequenceId {
329     private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
330         + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
331         + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (2 * ClassSize.ARRAY));
332     private byte[] keyOnlyBuffer;
333     private ByteBuffer currentBuffer;
334     private short rowLength;
335     private int familyOffset;
336     private byte familyLength;
337     private int qualifierOffset;
338     private int qualifierLength;
339     private long timestamp;
340     private byte typeByte;
341     private int valueOffset;
342     private int valueLength;
343     private int tagsLength;
344     private int tagsOffset;
345     private byte[] cloneTagsBuffer;
346     private long seqId;
347     private TagCompressionContext tagCompressionContext;
348     
349     protected ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, short rowLength,
350         int familyOffset, byte familyLength, int keyLength, int qualOffset, int qualLength,
351         long timeStamp, byte typeByte, int valueLen, int valueOffset, long seqId,
352         int tagsOffset, int tagsLength, TagCompressionContext tagCompressionContext,
353         byte[] tagsBuffer) {
354       this.currentBuffer = currentBuffer;
355       keyOnlyBuffer = new byte[keyLength];
356       this.tagCompressionContext = tagCompressionContext;
357       this.rowLength = rowLength;
358       this.familyOffset = familyOffset;
359       this.familyLength = familyLength;
360       this.qualifierOffset = qualOffset;
361       this.qualifierLength = qualLength;
362       this.timestamp = timeStamp;
363       this.typeByte = typeByte;
364       this.valueLength = valueLen;
365       this.valueOffset = valueOffset;
366       this.tagsOffset = tagsOffset;
367       this.tagsLength = tagsLength;
368       System.arraycopy(keyBuffer, 0, keyOnlyBuffer, 0, keyLength);
369       if (tagCompressionContext != null) {
370         this.cloneTagsBuffer = new byte[tagsLength];
371         System.arraycopy(tagsBuffer, 0, this.cloneTagsBuffer, 0, tagsLength);
372       }
373       setSequenceId(seqId);
374     }
375 
376     @Override
377     public byte[] getRowArray() {
378       return keyOnlyBuffer;
379     }
380 
381     @Override
382     public byte[] getFamilyArray() {
383       return keyOnlyBuffer;
384     }
385 
386     @Override
387     public byte[] getQualifierArray() {
388       return keyOnlyBuffer;
389     }
390 
391     @Override
392     public int getRowOffset() {
393       return Bytes.SIZEOF_SHORT;
394     }
395 
396     @Override
397     public short getRowLength() {
398       return rowLength;
399     }
400 
401     @Override
402     public int getFamilyOffset() {
403       return familyOffset;
404     }
405 
406     @Override
407     public byte getFamilyLength() {
408       return familyLength;
409     }
410 
411     @Override
412     public int getQualifierOffset() {
413       return qualifierOffset;
414     }
415 
416     @Override
417     public int getQualifierLength() {
418       return qualifierLength;
419     }
420 
421     @Override
422     public long getTimestamp() {
423       return timestamp;
424     }
425 
426     @Override
427     public byte getTypeByte() {
428       return typeByte;
429     }
430 
431     @Override
432     @Deprecated
433     public long getMvccVersion() {
434       return getSequenceId();
435     }
436 
437     @Override
438     public long getSequenceId() {
439       return seqId;
440     }
441 
442     @Override
443     public byte[] getValueArray() {
444       return currentBuffer.array();
445     }
446 
447     @Override
448     public int getValueOffset() {
449       return currentBuffer.arrayOffset() + valueOffset;
450     }
451 
452     @Override
453     public int getValueLength() {
454       return valueLength;
455     }
456 
457     @Override
458     public byte[] getTagsArray() {
459       if (tagCompressionContext != null) {
460         return cloneTagsBuffer;
461       }
462       return currentBuffer.array();
463     }
464 
465     @Override
466     public int getTagsOffset() {
467       if (tagCompressionContext != null) {
468         return 0;
469       }
470       return currentBuffer.arrayOffset() + tagsOffset;
471     }
472 
473     @Override
474     public int getTagsLength() {
475       return tagsLength;
476     }
477 
478     @Override
479     @Deprecated
480     public byte[] getValue() {
481       return CellUtil.cloneValue(this);
482     }
483 
484     @Override
485     @Deprecated
486     public byte[] getFamily() {
487       return CellUtil.cloneFamily(this);
488     }
489 
490     @Override
491     @Deprecated
492     public byte[] getQualifier() {
493       return CellUtil.cloneQualifier(this);
494     }
495 
496     @Override
497     @Deprecated
498     public byte[] getRow() {
499       return CellUtil.cloneRow(this);
500     }
501 
502     @Override
503     public String toString() {
504       return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
505           + getValueLength() + "/seqid=" + seqId;
506     }
507 
508     @Override
509     public void setSequenceId(long seqId) {
510       this.seqId = seqId;
511     }
512 
513     @Override
514     public long heapSize() {
515       return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
516     }
517   }
518 
519   protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
520       implements EncodedSeeker {
522     protected HFileBlockDecodingContext decodingCtx;
523     protected final KVComparator comparator;
524     protected final SamePrefixComparator<byte[]> samePrefixComparator;
525     protected ByteBuffer currentBuffer;
526     protected STATE current = createSeekerState(); // always valid
527     protected STATE previous = createSeekerState(); // may not be valid
528     protected TagCompressionContext tagCompressionContext = null;
529 
530     public BufferedEncodedSeeker(KVComparator comparator,
531         HFileBlockDecodingContext decodingCtx) {
532       this.comparator = comparator;
533       this.samePrefixComparator = comparator;
534       this.decodingCtx = decodingCtx;
535       if (decodingCtx.getHFileContext().isCompressTags()) {
536         try {
537           tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
538         } catch (Exception e) {
539           throw new RuntimeException("Failed to initialize TagCompressionContext", e);
540         }
541       }
542     }
543     
544     protected boolean includesMvcc() {
545       return this.decodingCtx.getHFileContext().isIncludesMvcc();
546     }
547 
548     protected boolean includesTags() {
549       return this.decodingCtx.getHFileContext().isIncludesTags();
550     }
551 
552     @Override
553     public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
554       return comparator.compareFlatKey(key, offset, length,
555           current.keyBuffer, 0, current.keyLength);
556     }
557 
558     @Override
559     public int compareKey(KVComparator comparator, Cell key) {
560       return comparator.compareOnlyKeyPortion(key,
561           new KeyValue.KeyOnlyKeyValue(current.keyBuffer, 0, current.keyLength));
562     }
563 
564     @Override
565     public void setCurrentBuffer(ByteBuffer buffer) {
566       if (this.tagCompressionContext != null) {
567         this.tagCompressionContext.clear();
568       }
569       currentBuffer = buffer;
570       current.currentBuffer = currentBuffer;
571       if (tagCompressionContext != null) {
572         current.tagCompressionContext = tagCompressionContext;
573       }
574       decodeFirst();
575       current.setKey(current.keyBuffer, current.memstoreTS);
576       previous.invalidate();
577     }
578 
579     @Override
580     public ByteBuffer getKeyDeepCopy() {
581       ByteBuffer keyBuffer = ByteBuffer.allocate(current.keyLength);
582       keyBuffer.put(current.keyBuffer, 0, current.keyLength);
583       return keyBuffer;
584     }
585 
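    /** The returned slice shares its content with {@code currentBuffer}; no value bytes are copied. */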
586     @Override
587     public ByteBuffer getValueShallowCopy() {
588       ByteBuffer dup = currentBuffer.duplicate();
589       dup.position(current.valueOffset);
590       dup.limit(current.valueOffset + current.valueLength);
591       return dup.slice();
592     }
593 
594     @Override
595     public ByteBuffer getKeyValueBuffer() {
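      // Rebuild the cell in KeyValue format: key length (int), value length (int), key bytes,
      // value bytes and, when tags are present, a 2-byte tags length followed by the tag bytes.
      // The memstore timestamp / sequence id is not written into this buffer.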
596       ByteBuffer kvBuffer = createKVBuffer();
597       kvBuffer.putInt(current.keyLength);
598       kvBuffer.putInt(current.valueLength);
599       kvBuffer.put(current.keyBuffer, 0, current.keyLength);
600       ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer, current.valueOffset,
601           current.valueLength);
602       if (current.tagsLength > 0) {
603         // Put short as unsigned
604         kvBuffer.put((byte) (current.tagsLength >> 8 & 0xff));
605         kvBuffer.put((byte) (current.tagsLength & 0xff));
606         if (current.tagsOffset != -1) {
607           // The offset of the tags bytes in the underlying buffer was recorded, so the temp
608           // buffer (tagsBuffer) was not used.
609           ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer, current.tagsOffset,
610               current.tagsLength);
611         } else {
612           // A tagsOffset of -1 means tag compression was in use, so the tags were
613           // uncompressed into the temp buffer, tagsBuffer. Copy them from there.
614           kvBuffer.put(current.tagsBuffer, 0, current.tagsLength);
615         }
616       }
617       return kvBuffer;
618     }
619 
620     protected ByteBuffer createKVBuffer() {
621       int kvBufSize = (int) KeyValue.getKeyValueDataStructureSize(current.keyLength,
622           current.valueLength, current.tagsLength);
623       ByteBuffer kvBuffer = ByteBuffer.allocate(kvBufSize);
624       return kvBuffer;
625     }
626 
627     @Override
628     public Cell getKeyValue() {
629       return current.shallowCopy();
630     }
631 
632     @Override
633     public void rewind() {
634       currentBuffer.rewind();
635       if (tagCompressionContext != null) {
636         tagCompressionContext.clear();
637       }
638       decodeFirst();
639       current.setKey(current.keyBuffer, current.memstoreTS);
640       previous.invalidate();
641     }
642 
643     @Override
644     public boolean next() {
645       if (!currentBuffer.hasRemaining()) {
646         return false;
647       }
648       decodeNext();
649       current.setKey(current.keyBuffer, current.memstoreTS);
650       previous.invalidate();
651       return true;
652     }
653 
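    /**
     * Reads the tags section of the current cell. With tag compression the tags are uncompressed
     * into {@code current.tagsBuffer} (or skipped when a previous pass already uncompressed them,
     * see {@code uncompressTags}); without compression only the tags offset and length within
     * {@code currentBuffer} are recorded.
     */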
654     protected void decodeTags() {
655       current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
656       if (tagCompressionContext != null) {
657         if (current.uncompressTags) {
658           // Tag compression is in use. Uncompress the tags into tagsBuffer.
659           current.ensureSpaceForTags();
660           try {
661             current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
662                 current.tagsBuffer, 0, current.tagsLength);
663           } catch (IOException e) {
664             throw new RuntimeException("Exception while uncompressing tags", e);
665           }
666         } else {
667           ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
668           current.uncompressTags = true;// Reset this.
669         }
670         current.tagsOffset = -1;
671       } else {
672         // When tag compression is not used, do not copy the tags bytes into tagsBuffer.
673         // Just record the tags offset so the KV buffer can be built later in getKeyValueBuffer().
674         current.tagsOffset = currentBuffer.position();
675         ByteBufferUtils.skip(currentBuffer, current.tagsLength);
676       }
677     }
678 
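    /**
     * Based on the logic below: returns 0 on an exact match (unless {@code seekBefore} is set, in
     * which case the seeker moves back one cell and 1 is returned), returns
     * {@code HConstants.INDEX_KEY_MAGIC} when the seek key sorts before the first key of the
     * block, and returns 1 otherwise, leaving the seeker positioned on the closest cell that
     * precedes the seek key.
     */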
679     @Override
680     public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
681       int rowCommonPrefix = 0;
682       int familyCommonPrefix = 0;
683       int qualCommonPrefix = 0;
684       previous.invalidate();
685       KeyValue.KeyOnlyKeyValue currentCell = new KeyValue.KeyOnlyKeyValue();
686       do {
687         int comp;
688         if (samePrefixComparator != null) {
689           currentCell.setKey(current.keyBuffer, 0, current.keyLength);
690           if (current.lastCommonPrefix != 0) {
691             // The KV format has row key length also in the byte array. The common prefix
692             // includes it. So we need to subtract to find out the common prefix in the
693             // row part alone.
696             rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
697           }
698           if (current.lastCommonPrefix <= 2) {
699             rowCommonPrefix = 0;
700           }
701           rowCommonPrefix += CellComparator.findCommonPrefixInRowPart(seekCell, currentCell,
702               rowCommonPrefix);
703           comp = CellComparator.compareCommonRowPrefix(seekCell, currentCell, rowCommonPrefix);
704           if (comp == 0) {
705             comp = compareTypeBytes(seekCell, currentCell);
706             if (comp == 0) {
707               // Subtract the fixed row key length and the family key fixed length
708               familyCommonPrefix = Math.max(
709                   0,
710                   Math.min(familyCommonPrefix,
711                       current.lastCommonPrefix - (3 + currentCell.getRowLength())));
712               familyCommonPrefix += CellComparator.findCommonPrefixInFamilyPart(seekCell,
713                   currentCell, familyCommonPrefix);
714               comp = CellComparator.compareCommonFamilyPrefix(seekCell, currentCell,
715                   familyCommonPrefix);
716               if (comp == 0) {
717                 // subtract the rowkey fixed length and the family key fixed length
719                 qualCommonPrefix = Math.max(
720                     0,
721                     Math.min(
722                         qualCommonPrefix,
723                         current.lastCommonPrefix
724                             - (3 + currentCell.getRowLength() + currentCell.getFamilyLength())));
725                 qualCommonPrefix += CellComparator.findCommonPrefixInQualifierPart(seekCell,
726                     currentCell, qualCommonPrefix);
727                 comp = CellComparator.compareCommonQualifierPrefix(seekCell, currentCell,
728                     qualCommonPrefix);
729                 if (comp == 0) {
730                   comp = CellComparator.compareTimestamps(seekCell, currentCell);
731                   if (comp == 0) {
732                     // Compare types. Let the delete types sort ahead of puts; i.e. types
733                     // of higher numbers sort before those of lesser numbers. Maximum (255)
734                     // appears ahead of everything, and minimum (0) appears after everything.
740                     comp = (0xff & currentCell.getTypeByte()) - (0xff & seekCell.getTypeByte());
741                   }
742                 }
743               }
744             }
745           }
746         } else {
747           Cell r = new KeyValue.KeyOnlyKeyValue(current.keyBuffer, 0, current.keyLength);
748           comp = comparator.compareOnlyKeyPortion(seekCell, r);
749         }
750         if (comp == 0) { // exact match
751           if (seekBefore) {
752             if (!previous.isValid()) {
753               // The caller (seekBefore) has to ensure that we are not at the
754               // first key in the block.
755               throw new IllegalStateException("Cannot seekBefore if "
756                   + "positioned at the first key in the block: key="
757                   + Bytes.toStringBinary(seekCell.getRowArray()));
758             }
759             moveToPrevious();
760             return 1;
761           }
762           return 0;
763         }
764 
765         if (comp < 0) { // already too large, check previous
766           if (previous.isValid()) {
767             moveToPrevious();
768           } else {
769             return HConstants.INDEX_KEY_MAGIC; // using optimized index key
770           }
771           return 1;
772         }
773 
774         // move to next, if more data is available
775         if (currentBuffer.hasRemaining()) {
776           previous.copyFromNext(current);
777           decodeNext();
778           current.setKey(current.keyBuffer, current.memstoreTS);
779         } else {
780           break;
781         }
782       } while (true);
783 
784       // we hit the end of the block, not an exact match
785       return 1;
786     }
787 
788     private int compareTypeBytes(Cell key, Cell right) {
789       if (key.getFamilyLength() + key.getQualifierLength() == 0
790           && key.getTypeByte() == Type.Minimum.getCode()) {
791         // left is "bigger", i.e. it appears later in the sorted order
792         return 1;
793       }
794       if (right.getFamilyLength() + right.getQualifierLength() == 0
795           && right.getTypeByte() == Type.Minimum.getCode()) {
796         return -1;
797       }
798       return 0;
799     }
800 
801 
802     private void moveToPrevious() {
803       if (!previous.isValid()) {
804         throw new IllegalStateException(
805             "Can only move back once, and not from the first key in the block.");
806       }
807 
808       STATE tmp = previous;
809       previous = current;
810       current = tmp;
811 
812       // move after last key value
813       currentBuffer.position(current.nextKvOffset);
814       // The tag bytes were already decoded. We cache those tags in the current state along with
815       // the total compressed length of the tag bytes, so the next decodeNext() does not need to
816       // decode the tags again (doing so would pollute the data dictionary used for compression).
817       // When current.uncompressTags is false, we just reuse current.tagsBuffer and skip
818       // 'tagsCompressedLength' bytes of the source stream.
819       // See decodeTags().
820       current.tagsBuffer = previous.tagsBuffer;
821       current.tagsCompressedLength = previous.tagsCompressedLength;
822       current.uncompressTags = false;
823       previous.invalidate();
824     }
825 
826     @SuppressWarnings("unchecked")
827     protected STATE createSeekerState() {
828       // This will fail for non-default seeker state if the subclass does not
829       // override this method.
830       return (STATE) new SeekerState();
831     }
832 
833     abstract protected void decodeFirst();
834     abstract protected void decodeNext();
835   }
836 
837   /**
838    * Writes the tags and mvcc timestamp that follow an encoded key/value, when enabled.
839    * @param cell the cell that was just encoded
840    * @param out the output stream the encoded block is written to
841    * @param encodingCtx the encoding context of the current block
842    * @return the number of unencoded bytes added
843    */
844   protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
845       HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
846     int size = 0;
847     if (encodingCtx.getHFileContext().isIncludesTags()) {
848       int tagsLength = cell.getTagsLength();
849       ByteBufferUtils.putCompressedInt(out, tagsLength);
850       // There are some tags to be written
851       if (tagsLength > 0) {
852         TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
853         // When tag compression is enabled, tagCompressionContext will be non-null. In that case
854         // write the tags using dictionary compression.
855         if (tagCompressionContext != null) {
856           tagCompressionContext
857               .compressTags(out, cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
858         } else {
859           out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
860         }
861       }
862       size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
863     }
864     if (encodingCtx.getHFileContext().isIncludesMvcc()) {
865       // Copy memstore timestamp from the byte buffer to the output stream.
866       long memstoreTS = cell.getSequenceId();
867       WritableUtils.writeVLong(out, memstoreTS);
868       // TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be
869       // avoided.
870       size += WritableUtils.getVIntSize(memstoreTS);
871     }
872     return size;
873   }
874 
875   protected final void afterDecodingKeyValue(DataInputStream source,
876       ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
877     if (decodingCtx.getHFileContext().isIncludesTags()) {
878       int tagsLength = ByteBufferUtils.readCompressedInt(source);
879       // Put as unsigned short
880       dest.put((byte) ((tagsLength >> 8) & 0xff));
881       dest.put((byte) (tagsLength & 0xff));
882       if (tagsLength > 0) {
883         TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
884         // When tag compression is in use for this file, the decoding context passes in a
885         // non-null tagCompressionContext.
886         if (tagCompressionContext != null) {
887           tagCompressionContext.uncompressTags(source, dest, tagsLength);
888         } else {
889           ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
890         }
891       }
892     }
893     if (decodingCtx.getHFileContext().isIncludesMvcc()) {
894       long memstoreTS = -1;
895       try {
896         // Copy memstore timestamp from the data input stream to the byte
897         // buffer.
898         memstoreTS = WritableUtils.readVLong(source);
899         ByteBufferUtils.writeVLong(dest, memstoreTS);
900       } catch (IOException ex) {
901         throw new RuntimeException("Unable to copy memstore timestamp " +
902             memstoreTS + " after decoding a key/value", ex);
903       }
904     }
905   }
906 
907   @Override
908   public HFileBlockEncodingContext newDataBlockEncodingContext(DataBlockEncoding encoding,
909       byte[] header, HFileContext meta) {
910     return new HFileBlockDefaultEncodingContext(encoding, header, meta);
911   }
912 
913   @Override
914   public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
915     return new HFileBlockDefaultDecodingContext(meta);
916   }
917 
918   protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
919       int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
920       throws IOException;
921 
922   /**
923    * Asserts that there is at least the given amount of unfilled space
924    * remaining in the given buffer.
925    * @param out typically, the buffer we are writing to
926    * @param length the required space in the buffer
927    * @throws EncoderBufferTooSmallException If there is not enough space left in the buffer.
928    */
929   protected static void ensureSpace(ByteBuffer out, int length)
930       throws EncoderBufferTooSmallException {
931     if (out.position() + length > out.limit()) {
932       throw new EncoderBufferTooSmallException(
933           "Buffer position=" + out.position() +
934           ", buffer limit=" + out.limit() +
935           ", length to be written=" + length);
936     }
937   }
938 
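  // Block-encoding lifecycle, as reflected by the methods below: startBlockEncoding() is called
  // once per block (it reserves a dummy length field), encode() once per cell, and
  // endBlockEncoding() patches the accumulated unencoded data size back into the block header
  // area and finishes the block.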
939   @Override
940   public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
941       throws IOException {
942     if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
943       throw new IOException(this.getClass().getName() + " only accepts "
944           + HFileBlockDefaultEncodingContext.class.getName()
945           + " as the encoding context.");
946     }
947 
948     HFileBlockDefaultEncodingContext encodingCtx =
949         (HFileBlockDefaultEncodingContext) blkEncodingCtx;
950     encodingCtx.prepareEncoding(out);
951     if (encodingCtx.getHFileContext().isIncludesTags()
952         && encodingCtx.getHFileContext().isCompressTags()) {
953       if (encodingCtx.getTagCompressionContext() != null) {
954         // Reuse the existing TagCompressionContext rather than creating a new one for
955         // every block encoding, which would add unnecessary overhead.
956         encodingCtx.getTagCompressionContext().clear();
957       } else {
958         try {
959           TagCompressionContext tagCompressionContext = new TagCompressionContext(
960               LRUDictionary.class, Byte.MAX_VALUE);
961           encodingCtx.setTagCompressionContext(tagCompressionContext);
962         } catch (Exception e) {
963           throw new IOException("Failed to initialize TagCompressionContext", e);
964         }
965       }
966     }
967     ByteBufferUtils.putInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
968     blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
969   }
970 
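  /**
   * Per-block encoding state: tracks how many unencoded (plain KeyValue-format) bytes have been
   * written so far; endBlockEncoding() writes this count into the block's header area.
   */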
971   private static class BufferedDataBlockEncodingState extends EncodingState {
972     int unencodedDataSizeWritten = 0;
973   }
974 
975   @Override
976   public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
977       throws IOException {
978     BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
979         .getEncodingState();
980     int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
981     state.unencodedDataSizeWritten += encodedKvSize;
982     return encodedKvSize;
983   }
984 
985   public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
986       DataOutputStream out) throws IOException;
987 
988   @Override
989   public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
990       byte[] uncompressedBytesWithHeader) throws IOException {
991     BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
992         .getEncodingState();
993     // Write unencodedDataSizeWritten just past the block header and the encoding id
994     Bytes.putInt(uncompressedBytesWithHeader,
995         HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE,
996         state.unencodedDataSizeWritten);
997     if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
998       encodingCtx.postEncoding(BlockType.ENCODED_DATA);
999     } else {
1000       encodingCtx.postEncoding(BlockType.DATA);
1001     }
1002   }
1003 }