View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.apache.hadoop.hbase.Cell;
25  import org.apache.hadoop.hbase.CellComparator;
26  import org.apache.hadoop.hbase.CellUtil;
27  import org.apache.hadoop.hbase.HConstants;
28  import org.apache.hadoop.hbase.KeyValue;
29  import org.apache.hadoop.hbase.KeyValue.KVComparator;
30  import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
31  import org.apache.hadoop.hbase.KeyValue.Type;
32  import org.apache.hadoop.hbase.KeyValueUtil;
33  import org.apache.hadoop.hbase.SettableSequenceId;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.io.HeapSize;
36  import org.apache.hadoop.hbase.io.TagCompressionContext;
37  import org.apache.hadoop.hbase.io.hfile.BlockType;
38  import org.apache.hadoop.hbase.io.hfile.HFileContext;
39  import org.apache.hadoop.hbase.io.util.LRUDictionary;
40  import org.apache.hadoop.hbase.util.ByteBufferUtils;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.hbase.util.ClassSize;
43  import org.apache.hadoop.io.WritableUtils;
44  
45  /**
46   * Base class for all data block encoders that use a buffer.
47   */
48  @InterfaceAudience.Private
49  abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
50  
51    private static int INITIAL_KEY_BUFFER_SIZE = 512;
52  
53    @Override
54    public ByteBuffer decodeKeyValues(DataInputStream source,
55        HFileBlockDecodingContext blkDecodingCtx) throws IOException {
56      if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
57        throw new IOException(this.getClass().getName() + " only accepts "
58            + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
59      }
60  
61      HFileBlockDefaultDecodingContext decodingCtx =
62          (HFileBlockDefaultDecodingContext) blkDecodingCtx;
63      if (decodingCtx.getHFileContext().isIncludesTags()
64          && decodingCtx.getHFileContext().isCompressTags()) {
65        if (decodingCtx.getTagCompressionContext() != null) {
66          // It will be overhead to create the TagCompressionContext again and again for every block
67          // decoding.
68          decodingCtx.getTagCompressionContext().clear();
69        } else {
70          try {
71            TagCompressionContext tagCompressionContext = new TagCompressionContext(
72                LRUDictionary.class, Byte.MAX_VALUE);
73            decodingCtx.setTagCompressionContext(tagCompressionContext);
74          } catch (Exception e) {
75            throw new IOException("Failed to initialize TagCompressionContext", e);
76          }
77        }
78      }
79      return internalDecodeKeyValues(source, 0, 0, decodingCtx);
80    }
81  
  /**
   * Mutable cursor state for a {@link BufferedEncodedSeeker}. The decoded key of the current
   * position is copied into {@link #keyBuffer}, while the value and (when uncompressed) the
   * tags are addressed by offset/length into the underlying block buffer. Implements
   * {@link Cell} so the current position can be compared against seek keys without
   * materializing a KeyValue.
   */
  protected static class SeekerState implements Cell {
    // The block buffer that valueOffset/tagsOffset/nextKvOffset refer into.
    protected ByteBuffer currentBuffer;
    protected TagCompressionContext tagCompressionContext;
    // Offset of the value in currentBuffer; -1 marks this state as invalid (see isValid()).
    protected int valueOffset = -1;
    protected int keyLength;
    protected int valueLength;
    // Number of leading key bytes the current key shares with the previous key in the block.
    protected int lastCommonPrefix;
    protected int tagsLength = 0;
    // Offset of the tags in currentBuffer; -1 when tags were uncompressed into tagsBuffer
    // instead (tag compression in use). Mirrored by getTagsArray()/getTagsOffset().
    protected int tagsOffset = -1;
    // Compressed size of the current tags in the source; cached so a re-decode can skip
    // them without polluting the tag dictionary (see BufferedEncodedSeeker#decodeTags()).
    protected int tagsCompressedLength = 0;
    // When false, the next decodeTags() reuses tagsBuffer and skips tagsCompressedLength
    // bytes instead of uncompressing again. Set by BufferedEncodedSeeker#moveToPrevious().
    protected boolean uncompressTags = true;

    /** We need to store a copy of the key. */
    protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
    protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];

    protected long memstoreTS;
    // Position in currentBuffer just past this key/value, i.e. where the next KV starts.
    protected int nextKvOffset;
    // Key-only Cell view over keyBuffer, used to answer the Cell accessors below.
    protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();

    /** @return true when this state points at a decoded key/value (valueOffset set). */
    protected boolean isValid() {
      return valueOffset != -1;
    }

    /** Resets this state so isValid() returns false and cached tag state is discarded. */
    protected void invalidate() {
      valueOffset = -1;
      tagsCompressedLength = 0;
      currentKey = new KeyValue.KeyOnlyKeyValue();
      uncompressTags = true;
      currentBuffer = null;
    }

    /** Grows keyBuffer by doubling until it can hold keyLength bytes; existing bytes kept. */
    protected void ensureSpaceForKey() {
      if (keyLength > keyBuffer.length) {
        // rare case, but we need to handle arbitrary length of key
        int newKeyBufferLength = Math.max(keyBuffer.length, 1) * 2;
        while (keyLength > newKeyBufferLength) {
          newKeyBufferLength *= 2;
        }
        byte[] newKeyBuffer = new byte[newKeyBufferLength];
        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
        keyBuffer = newKeyBuffer;
      }
    }

    /** Grows tagsBuffer by doubling until it can hold tagsLength bytes; existing bytes kept. */
    protected void ensureSpaceForTags() {
      if (tagsLength > tagsBuffer.length) {
        // rare case, but we need to handle arbitrary length of tags
        int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2;
        while (tagsLength > newTagsBufferLength) {
          newTagsBufferLength *= 2;
        }
        byte[] newTagsBuffer = new byte[newTagsBufferLength];
        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
        tagsBuffer = newTagsBuffer;
      }
    }

    /** Re-points the key-only Cell view at keyBuffer and records the memstore timestamp. */
    protected void setKey(byte[] keyBuffer, long memTS) {
      currentKey.setKey(keyBuffer, 0, keyLength);
      memstoreTS = memTS;
    }

    /**
     * Copy the state from the next one into this instance (the previous state
     * placeholder). Used to save the previous state when we are advancing the
     * seeker to the next key/value.
     */
    protected void copyFromNext(SeekerState nextState) {
      if (keyBuffer.length != nextState.keyBuffer.length) {
        keyBuffer = nextState.keyBuffer.clone();
      } else if (!isValid()) {
        // Note: we can only call isValid before we override our state, so this
        // comes before all the assignments at the end of this method.
        System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
             nextState.keyLength);
      } else {
        // don't copy the common prefix between this key and the previous one
        System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
            keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
                - nextState.lastCommonPrefix);
      }
      currentKey = nextState.currentKey;

      valueOffset = nextState.valueOffset;
      keyLength = nextState.keyLength;
      valueLength = nextState.valueLength;
      lastCommonPrefix = nextState.lastCommonPrefix;
      nextKvOffset = nextState.nextKvOffset;
      memstoreTS = nextState.memstoreTS;
      currentBuffer = nextState.currentBuffer;
      if (nextState.tagCompressionContext != null) {
        tagCompressionContext = nextState.tagCompressionContext;
      }
    }

    @Override
    public byte[] getRowArray() {
      return currentKey.getRowArray();
    }

    @Override
    public int getRowOffset() {
      // The row always starts right after the 2-byte row-length field in the key bytes.
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public short getRowLength() {
      return currentKey.getRowLength();
    }

    @Override
    public byte[] getFamilyArray() {
      return currentKey.getFamilyArray();
    }

    @Override
    public int getFamilyOffset() {
      return currentKey.getFamilyOffset();
    }

    @Override
    public byte getFamilyLength() {
      return currentKey.getFamilyLength();
    }

    @Override
    public byte[] getQualifierArray() {
      return currentKey.getQualifierArray();
    }

    @Override
    public int getQualifierOffset() {
      return currentKey.getQualifierOffset();
    }

    @Override
    public int getQualifierLength() {
      return currentKey.getQualifierLength();
    }

    @Override
    public long getTimestamp() {
      return currentKey.getTimestamp();
    }

    @Override
    public byte getTypeByte() {
      return currentKey.getTypeByte();
    }

    @Override
    public long getMvccVersion() {
      return memstoreTS;
    }

    @Override
    public long getSequenceId() {
      return memstoreTS;
    }

    @Override
    public byte[] getValueArray() {
      // Value bytes live in the (array-backed) block buffer, not in a private copy.
      return currentBuffer.array();
    }

    @Override
    public int getValueOffset() {
      return currentBuffer.arrayOffset() + valueOffset;
    }

    @Override
    public int getValueLength() {
      return valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      // With tag compression the tags were uncompressed into tagsBuffer; otherwise they
      // are still in the block buffer at tagsOffset.
      if (tagCompressionContext != null) {
        return tagsBuffer;
      }
      return currentBuffer.array();
    }

    @Override
    public int getTagsOffset() {
      if (tagCompressionContext != null) {
        return 0;
      }
      return currentBuffer.arrayOffset() + tagsOffset;
    }

    @Override
    public int getTagsLength() {
      return tagsLength;
    }

    @Override
    @Deprecated
    public byte[] getValue() {
      throw new UnsupportedOperationException("getValue() not supported");
    }

    @Override
    @Deprecated
    public byte[] getFamily() {
      throw new UnsupportedOperationException("getFamily() not supported");
    }

    @Override
    @Deprecated
    public byte[] getQualifier() {
      throw new UnsupportedOperationException("getQualifier() not supported");
    }

    @Override
    @Deprecated
    public byte[] getRow() {
      throw new UnsupportedOperationException("getRow() not supported");
    }

    @Override
    public String toString() {
      return KeyValue.keyToString(this.keyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
          + getValueLength() + "/seqid=" + memstoreTS;
    }

    /**
     * @return a {@link ClonedSeekerState} snapshot of this position: the key bytes are
     *         deep-copied, while the value (and uncompressed tags) still reference the
     *         current block buffer.
     */
    public Cell shallowCopy() {
      return new ClonedSeekerState(currentBuffer, keyBuffer, currentKey.getRowLength(),
          currentKey.getFamilyOffset(), currentKey.getFamilyLength(), keyLength,
          currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
          currentKey.getTimestamp(), currentKey.getTypeByte(), valueLength, valueOffset,
          memstoreTS, tagsOffset, tagsLength, tagCompressionContext, tagsBuffer);
    }
  }
317 
  /**
   * Copies only the key part of the keybuffer by doing a deep copy and passes the
   * seeker state members for taking a clone.
   * Note that the value byte[] part still points into currentBuffer and is
   * represented by valueOffset and valueLength.
   */
  // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
  // there. So this has to be an instance of SettableSequenceId. SeekerState need not be
  // SettableSequenceId as we never return that to top layers. When we have to, we make
  // ClonedSeekerState from it.
  protected static class ClonedSeekerState implements Cell, HeapSize, SettableSequenceId {
    // Shallow heap footprint of one instance (object header, references, primitive fields,
    // two array headers); variable-length payloads are added in heapSize().
    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
        + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
        + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (2 * ClassSize.ARRAY));
    // Deep copy of the key bytes; row/family/qualifier accessors all index into this.
    private byte[] keyOnlyBuffer;
    // The block buffer the value (and possibly tags) still point into — NOT copied.
    private ByteBuffer currentBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timestamp;
    private byte typeByte;
    private int valueOffset;
    private int valueLength;
    private int tagsLength;
    private int tagsOffset;
    // Deep copy of the uncompressed tags; only populated when tag compression was in use.
    private byte[] cloneTagsBuffer;
    private long seqId;
    private TagCompressionContext tagCompressionContext;

    /**
     * Builds a snapshot of a SeekerState position. The key bytes (and, when tag
     * compression is in use, the tags bytes) are deep-copied; the value remains a
     * reference into {@code currentBuffer}.
     */
    protected ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, short rowLength,
        int familyOffset, byte familyLength, int keyLength, int qualOffset, int qualLength,
        long timeStamp, byte typeByte, int valueLen, int valueOffset, long seqId,
        int tagsOffset, int tagsLength, TagCompressionContext tagCompressionContext,
        byte[] tagsBuffer) {
      this.currentBuffer = currentBuffer;
      keyOnlyBuffer = new byte[keyLength];
      this.tagCompressionContext = tagCompressionContext;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timestamp = timeStamp;
      this.typeByte = typeByte;
      this.valueLength = valueLen;
      this.valueOffset = valueOffset;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      System.arraycopy(keyBuffer, 0, keyOnlyBuffer, 0, keyLength);
      if (tagCompressionContext != null) {
        // Tags were uncompressed into the seeker's scratch buffer; snapshot them too.
        this.cloneTagsBuffer = new byte[tagsLength];
        System.arraycopy(tagsBuffer, 0, this.cloneTagsBuffer, 0, tagsLength);
      }
      setSequenceId(seqId);
    }

    @Override
    public byte[] getRowArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getFamilyArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getQualifierArray() {
      return keyOnlyBuffer;
    }

    @Override
    public int getRowOffset() {
      // The row starts right after the 2-byte row-length field in the copied key bytes.
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public short getRowLength() {
      return rowLength;
    }

    @Override
    public int getFamilyOffset() {
      return familyOffset;
    }

    @Override
    public byte getFamilyLength() {
      return familyLength;
    }

    @Override
    public int getQualifierOffset() {
      return qualifierOffset;
    }

    @Override
    public int getQualifierLength() {
      return qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return timestamp;
    }

    @Override
    public byte getTypeByte() {
      return typeByte;
    }

    @Override
    @Deprecated
    public long getMvccVersion() {
      return getSequenceId();
    }

    @Override
    public long getSequenceId() {
      return seqId;
    }

    @Override
    public byte[] getValueArray() {
      // Value bytes are still in the (array-backed) block buffer.
      return currentBuffer.array();
    }

    @Override
    public int getValueOffset() {
      return currentBuffer.arrayOffset() + valueOffset;
    }

    @Override
    public int getValueLength() {
      return valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      // With tag compression the tags were snapshotted into cloneTagsBuffer; otherwise
      // they are still in the block buffer at tagsOffset.
      if (tagCompressionContext != null) {
        return cloneTagsBuffer;
      }
      return currentBuffer.array();
    }

    @Override
    public int getTagsOffset() {
      if (tagCompressionContext != null) {
        return 0;
      }
      return currentBuffer.arrayOffset() + tagsOffset;
    }

    @Override
    public int getTagsLength() {
      return tagsLength;
    }

    @Override
    @Deprecated
    public byte[] getValue() {
      return CellUtil.cloneValue(this);
    }

    @Override
    @Deprecated
    public byte[] getFamily() {
      return CellUtil.cloneFamily(this);
    }

    @Override
    @Deprecated
    public byte[] getQualifier() {
      return CellUtil.cloneQualifier(this);
    }

    @Override
    @Deprecated
    public byte[] getRow() {
      return CellUtil.cloneRow(this);
    }

    @Override
    public String toString() {
      return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
          + getValueLength() + "/seqid=" + seqId;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    @Override
    public long heapSize() {
      // Fixed shallow size plus the variable-length component lengths.
      return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
    }
  }
518 
  /**
   * Base {@link EncodedSeeker} for buffer-backed encodings. Keeps two {@link SeekerState}
   * instances — {@code current} (always valid once positioned) and {@code previous} (valid
   * only after advancing) — so seeks can step back exactly one key/value. Subclasses supply
   * the encoding-specific {@link #decodeFirst()} and {@link #decodeNext()}.
   */
  protected abstract static class
      BufferedEncodedSeeker<STATE extends SeekerState>
      implements EncodedSeeker {
    protected HFileBlockDecodingContext decodingCtx;
    protected final KVComparator comparator;
    // Same object as 'comparator'; non-null enables the prefix-tracking seek fast path.
    protected final SamePrefixComparator<byte[]> samePrefixComparator;
    protected ByteBuffer currentBuffer;
    protected STATE current = createSeekerState(); // always valid
    protected STATE previous = createSeekerState(); // may not be valid
    // Non-null only when the HFile meta says tags are dictionary-compressed.
    protected TagCompressionContext tagCompressionContext = null;

    public BufferedEncodedSeeker(KVComparator comparator,
        HFileBlockDecodingContext decodingCtx) {
      this.comparator = comparator;
      this.samePrefixComparator = comparator;
      this.decodingCtx = decodingCtx;
      if (decodingCtx.getHFileContext().isCompressTags()) {
        try {
          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
        } catch (Exception e) {
          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
        }
      }
    }

    /** @return true when the blocks carry a memstore timestamp (mvcc) after each KV. */
    protected boolean includesMvcc() {
      return this.decodingCtx.getHFileContext().isIncludesMvcc();
    }

    /** @return true when the blocks carry tags after each KV. */
    protected boolean includesTags() {
      return this.decodingCtx.getHFileContext().isIncludesTags();
    }

    @Override
    public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
      return comparator.compareFlatKey(key, offset, length,
          current.keyBuffer, 0, current.keyLength);
    }

    @Override
    public int compareKey(KVComparator comparator, Cell key) {
      return comparator.compareOnlyKeyPortion(key,
          new KeyValue.KeyOnlyKeyValue(current.keyBuffer, 0, current.keyLength));
    }

    @Override
    public void setCurrentBuffer(ByteBuffer buffer) {
      if (this.tagCompressionContext != null) {
        // New block: reset the tag dictionary before decoding.
        this.tagCompressionContext.clear();
      }
      currentBuffer = buffer;
      current.currentBuffer = currentBuffer;
      if (tagCompressionContext != null) {
        current.tagCompressionContext = tagCompressionContext;
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public ByteBuffer getKeyDeepCopy() {
      ByteBuffer keyBuffer = ByteBuffer.allocate(current.keyLength);
      keyBuffer.put(current.keyBuffer, 0, current.keyLength);
      return keyBuffer;
    }

    @Override
    public ByteBuffer getValueShallowCopy() {
      // Slice a window over the value bytes without copying them.
      ByteBuffer dup = currentBuffer.duplicate();
      dup.position(current.valueOffset);
      dup.limit(current.valueOffset + current.valueLength);
      return dup.slice();
    }

    @Override
    public ByteBuffer getKeyValueBuffer() {
      // Rebuild a full KV (klen, vlen, key, value[, tags]) from the current position.
      ByteBuffer kvBuffer = createKVBuffer();
      kvBuffer.putInt(current.keyLength);
      kvBuffer.putInt(current.valueLength);
      kvBuffer.put(current.keyBuffer, 0, current.keyLength);
      ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer, current.valueOffset,
          current.valueLength);
      if (current.tagsLength > 0) {
        // Put short as unsigned
        kvBuffer.put((byte) (current.tagsLength >> 8 & 0xff));
        kvBuffer.put((byte) (current.tagsLength & 0xff));
        if (current.tagsOffset != -1) {
          // the offset of the tags bytes in the underlying buffer is marked. So the temp
          // buffer, tagsBuffer, has not been used.
          ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer, current.tagsOffset,
              current.tagsLength);
        } else {
          // When tagsOffset is marked as -1, tag compression was present and so the tags were
          // uncompressed into the temp buffer, tagsBuffer. Let us copy it from there
          kvBuffer.put(current.tagsBuffer, 0, current.tagsLength);
        }
      }
      return kvBuffer;
    }

    /** Allocates a buffer sized for the current KV (key + value + tags + length fields). */
    protected ByteBuffer createKVBuffer() {
      int kvBufSize = (int) KeyValue.getKeyValueDataStructureSize(current.keyLength,
          current.valueLength, current.tagsLength);
      ByteBuffer kvBuffer = ByteBuffer.allocate(kvBufSize);
      return kvBuffer;
    }

    @Override
    public Cell getKeyValue() {
      return current.shallowCopy();
    }

    @Override
    public void rewind() {
      currentBuffer.rewind();
      if (tagCompressionContext != null) {
        tagCompressionContext.clear();
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public boolean next() {
      if (!currentBuffer.hasRemaining()) {
        return false;
      }
      decodeNext();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
      return true;
    }

    /**
     * Reads the tags portion of the current KV from {@code currentBuffer}, either
     * uncompressing into {@code current.tagsBuffer} (tag compression on) or just recording
     * the offset (compression off). See {@link #moveToPrevious()} for the skip path.
     */
    protected void decodeTags() {
      current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
      if (tagCompressionContext != null) {
        if (current.uncompressTags) {
          // Tag compression is being used. Uncompress it into tagsBuffer
          current.ensureSpaceForTags();
          try {
            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
                current.tagsBuffer, 0, current.tagsLength);
          } catch (IOException e) {
            throw new RuntimeException("Exception while uncompressing tags", e);
          }
        } else {
          // Re-decoding after moveToPrevious(): the uncompressed tags are already cached in
          // tagsBuffer, so just skip the compressed bytes in the source.
          ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
          current.uncompressTags = true;// Reset this.
        }
        current.tagsOffset = -1;
      } else {
        // When tag compress is not used, let us not do copying of tags bytes into tagsBuffer.
        // Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer()
        current.tagsOffset = currentBuffer.position();
        ByteBufferUtils.skip(currentBuffer, current.tagsLength);
      }
    }

    @Override
    public int seekToKeyInBlock(byte[] key, int offset, int length, boolean seekBefore) {
      return seekToKeyInBlock(new KeyValue.KeyOnlyKeyValue(key, offset, length), seekBefore);
    }

    /**
     * Linear-scans the block for {@code seekCell}. Tracks the common prefix already matched
     * in the row/family/qualifier parts across iterations (when a SamePrefixComparator is
     * available) so each comparison restarts only past the shared prefix.
     * Returns 0 on exact match, 1 when positioned before/after (per {@code seekBefore}),
     * or {@code HConstants.INDEX_KEY_MAGIC} when the seek key sorts before the first key.
     */
    @Override
    public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
      int rowCommonPrefix = 0;
      int familyCommonPrefix = 0;
      int qualCommonPrefix = 0;
      previous.invalidate();
      KeyValue.KeyOnlyKeyValue currentCell = new KeyValue.KeyOnlyKeyValue();
      do {
        int comp;
        if (samePrefixComparator != null) {
          currentCell.setKey(current.keyBuffer, 0, current.keyLength);
          if (current.lastCommonPrefix != 0) {
            // The KV format has row key length also in the byte array. The
            // common prefix
            // includes it. So we need to subtract to find out the common prefix
            // in the
            // row part alone
            rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
          }
          if (current.lastCommonPrefix <= 2) {
            rowCommonPrefix = 0;
          }
          rowCommonPrefix += CellComparator.findCommonPrefixInRowPart(seekCell, currentCell,
              rowCommonPrefix);
          comp = CellComparator.compareCommonRowPrefix(seekCell, currentCell, rowCommonPrefix);
          if (comp == 0) {
            comp = compareTypeBytes(seekCell, currentCell);
            if (comp == 0) {
              // Subtract the fixed row key length and the family key fixed length
              familyCommonPrefix = Math.max(
                  0,
                  Math.min(familyCommonPrefix,
                      current.lastCommonPrefix - (3 + currentCell.getRowLength())));
              familyCommonPrefix += CellComparator.findCommonPrefixInFamilyPart(seekCell,
                  currentCell, familyCommonPrefix);
              comp = CellComparator.compareCommonFamilyPrefix(seekCell, currentCell,
                  familyCommonPrefix);
              if (comp == 0) {
                // subtract the rowkey fixed length and the family key fixed
                // length
                qualCommonPrefix = Math.max(
                    0,
                    Math.min(
                        qualCommonPrefix,
                        current.lastCommonPrefix
                            - (3 + currentCell.getRowLength() + currentCell.getFamilyLength())));
                qualCommonPrefix += CellComparator.findCommonPrefixInQualifierPart(seekCell,
                    currentCell, qualCommonPrefix);
                comp = CellComparator.compareCommonQualifierPrefix(seekCell, currentCell,
                    qualCommonPrefix);
                if (comp == 0) {
                  comp = CellComparator.compareTimestamps(seekCell, currentCell);
                  if (comp == 0) {
                    // Compare types. Let the delete types sort ahead of puts;
                    // i.e. types
                    // of higher numbers sort before those of lesser numbers.
                    // Maximum
                    // (255)
                    // appears ahead of everything, and minimum (0) appears
                    // after
                    // everything.
                    comp = (0xff & currentCell.getTypeByte()) - (0xff & seekCell.getTypeByte());
                  }
                }
              }
            }
          }
        } else {
          // No prefix tracking available: fall back to a full key comparison.
          Cell r = new KeyValue.KeyOnlyKeyValue(current.keyBuffer, 0, current.keyLength);
          comp = comparator.compareOnlyKeyPortion(seekCell, r);
        }
        if (comp == 0) { // exact match
          if (seekBefore) {
            if (!previous.isValid()) {
              // The caller (seekBefore) has to ensure that we are not at the
              // first key in the block.
              throw new IllegalStateException("Cannot seekBefore if "
                  + "positioned at the first key in the block: key="
                  + Bytes.toStringBinary(seekCell.getRowArray()));
            }
            moveToPrevious();
            return 1;
          }
          return 0;
        }

        if (comp < 0) { // already too large, check previous
          if (previous.isValid()) {
            moveToPrevious();
          } else {
            return HConstants.INDEX_KEY_MAGIC; // using optimized index key
          }
          return 1;
        }

        // move to next, if more data is available
        if (currentBuffer.hasRemaining()) {
          previous.copyFromNext(current);
          decodeNext();
          current.setKey(current.keyBuffer, current.memstoreTS);
        } else {
          break;
        }
      } while (true);

      // we hit the end of the block, not an exact match
      return 1;
    }

    /**
     * Orders the "minimum" fake key (empty family+qualifier, Type.Minimum) after every real
     * key on the same row; returns 0 when neither side is such a key.
     */
    private int compareTypeBytes(Cell key, Cell right) {
      if (key.getFamilyLength() + key.getQualifierLength() == 0
          && key.getTypeByte() == Type.Minimum.getCode()) {
        // left is "bigger", i.e. it appears later in the sorted order
        return 1;
      }
      if (right.getFamilyLength() + right.getQualifierLength() == 0
          && right.getTypeByte() == Type.Minimum.getCode()) {
        return -1;
      }
      return 0;
    }

    /** Swaps current/previous to step back exactly one KV; previous becomes invalid. */
    private void moveToPrevious() {
      if (!previous.isValid()) {
        throw new IllegalStateException(
            "Can move back only once and not in first key in the block.");
      }

      STATE tmp = previous;
      previous = current;
      current = tmp;

      // move after last key value
      currentBuffer.position(current.nextKvOffset);
      // Already decoded the tag bytes. We cache these tags in the current state and also the
      // total compressed length of the tags bytes. For the next decodeNext() we don't need to
      // decode the tags again. That would pollute the Data Dictionary we use for the
      // compression. When current.uncompressTags is false, we will just reuse
      // current.tagsBuffer and skip 'tagsCompressedLength' bytes of the source stream.
      // See decodeTags()
      current.tagsBuffer = previous.tagsBuffer;
      current.tagsCompressedLength = previous.tagsCompressedLength;
      current.uncompressTags = false;
      previous.invalidate();
    }

    @SuppressWarnings("unchecked")
    protected STATE createSeekerState() {
      // This will fail for non-default seeker state if the subclass does not
      // override this method.
      return (STATE) new SeekerState();
    }

    /** Decodes the first key/value of the block into {@code current}. */
    abstract protected void decodeFirst();
    /** Decodes the next key/value (at the buffer position) into {@code current}. */
    abstract protected void decodeNext();
  }
841 
842   /**
843    * @param cell
844    * @param out
845    * @param encodingCtx
846    * @return unencoded size added
847    * @throws IOException
848    */
849   protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
850       HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
851     int size = 0;
852     if (encodingCtx.getHFileContext().isIncludesTags()) {
853       int tagsLength = cell.getTagsLength();
854       ByteBufferUtils.putCompressedInt(out, tagsLength);
855       // There are some tags to be written
856       if (tagsLength > 0) {
857         TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
858         // When tag compression is enabled, tagCompressionContext will have a not null value. Write
859         // the tags using Dictionary compression in such a case
860         if (tagCompressionContext != null) {
861           tagCompressionContext
862               .compressTags(out, cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
863         } else {
864           out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
865         }
866       }
867       size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
868     }
869     if (encodingCtx.getHFileContext().isIncludesMvcc()) {
870       // Copy memstore timestamp from the byte buffer to the output stream.
871       long memstoreTS = cell.getSequenceId();
872       WritableUtils.writeVLong(out, memstoreTS);
873       // TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be
874       // avoided.
875       size += WritableUtils.getVIntSize(memstoreTS);
876     }
877     return size;
878   }
879 
880   protected final void afterDecodingKeyValue(DataInputStream source,
881       ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
882     if (decodingCtx.getHFileContext().isIncludesTags()) {
883       int tagsLength = ByteBufferUtils.readCompressedInt(source);
884       // Put as unsigned short
885       dest.put((byte) ((tagsLength >> 8) & 0xff));
886       dest.put((byte) (tagsLength & 0xff));
887       if (tagsLength > 0) {
888         TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
889         // When tag compression is been used in this file, tagCompressionContext will have a not
890         // null value passed.
891         if (tagCompressionContext != null) {
892           tagCompressionContext.uncompressTags(source, dest, tagsLength);
893         } else {
894           ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
895         }
896       }
897     }
898     if (decodingCtx.getHFileContext().isIncludesMvcc()) {
899       long memstoreTS = -1;
900       try {
901         // Copy memstore timestamp from the data input stream to the byte
902         // buffer.
903         memstoreTS = WritableUtils.readVLong(source);
904         ByteBufferUtils.writeVLong(dest, memstoreTS);
905       } catch (IOException ex) {
906         throw new RuntimeException("Unable to copy memstore timestamp " +
907             memstoreTS + " after decoding a key/value");
908       }
909     }
910   }
911 
912   @Override
913   public HFileBlockEncodingContext newDataBlockEncodingContext(DataBlockEncoding encoding,
914       byte[] header, HFileContext meta) {
915     return new HFileBlockDefaultEncodingContext(encoding, header, meta);
916   }
917 
918   @Override
919   public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
920     return new HFileBlockDefaultDecodingContext(meta);
921   }
922 
  /**
   * Subclass hook that decodes the encoded key/values from {@code source} into
   * a newly allocated buffer.
   * @param source stream of encoded key/values
   * @param allocateHeaderLength bytes to reserve at the head of the returned buffer
   * @param skipLastBytes trailing bytes of the source to leave unread
   * @param decodingCtx context carrying HFile settings and tag compression state
   * @return buffer containing the decoded key/values
   * @throws IOException if reading from the source fails
   */
  protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
      int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
      throws IOException;
926 
927   /**
928    * Asserts that there is at least the given amount of unfilled space
929    * remaining in the given buffer.
930    * @param out typically, the buffer we are writing to
931    * @param length the required space in the buffer
932    * @throws EncoderBufferTooSmallException If there are no enough bytes.
933    */
934   protected static void ensureSpace(ByteBuffer out, int length)
935       throws EncoderBufferTooSmallException {
936     if (out.position() + length > out.limit()) {
937       throw new EncoderBufferTooSmallException(
938           "Buffer position=" + out.position() +
939           ", buffer limit=" + out.limit() +
940           ", length to be written=" + length);
941     }
942   }
943 
944   @Override
945   public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
946       throws IOException {
947     if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
948       throw new IOException (this.getClass().getName() + " only accepts "
949           + HFileBlockDefaultEncodingContext.class.getName() + " as the " +
950           "encoding context.");
951     }
952 
953     HFileBlockDefaultEncodingContext encodingCtx =
954         (HFileBlockDefaultEncodingContext) blkEncodingCtx;
955     encodingCtx.prepareEncoding(out);
956     if (encodingCtx.getHFileContext().isIncludesTags()
957         && encodingCtx.getHFileContext().isCompressTags()) {
958       if (encodingCtx.getTagCompressionContext() != null) {
959         // It will be overhead to create the TagCompressionContext again and again for every block
960         // encoding.
961         encodingCtx.getTagCompressionContext().clear();
962       } else {
963         try {
964           TagCompressionContext tagCompressionContext = new TagCompressionContext(
965               LRUDictionary.class, Byte.MAX_VALUE);
966           encodingCtx.setTagCompressionContext(tagCompressionContext);
967         } catch (Exception e) {
968           throw new IOException("Failed to initialize TagCompressionContext", e);
969         }
970       }
971     }
972     ByteBufferUtils.putInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
973     blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
974   }
975 
  /** Per-block encoding state tracking the total unencoded bytes written so far. */
  private static class BufferedDataBlockEncodingState extends EncodingState {
    // Accumulated by encode(); written into the block header by endBlockEncoding().
    int unencodedDataSizeWritten = 0;
  }
979 
980   @Override
981   public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
982       throws IOException {
983     BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
984         .getEncodingState();
985     int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
986     state.unencodedDataSizeWritten += encodedKvSize;
987     return encodedKvSize;
988   }
989 
  /**
   * Subclass hook that encodes a single cell into the output stream.
   * @param cell the cell to encode
   * @param encodingCtx context carrying HFile settings and tag compression state
   * @param out stream the encoded form is written to
   * @return size accounted for this cell; accumulated into
   *         unencodedDataSizeWritten by encode() — presumably the unencoded
   *         (raw) size, per the header written in endBlockEncoding()
   * @throws IOException if writing to the stream fails
   */
  public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
      DataOutputStream out) throws IOException;
992 
993   @Override
994   public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
995       byte[] uncompressedBytesWithHeader) throws IOException {
996     BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
997         .getEncodingState();
998     // Write the unencodedDataSizeWritten (with header size)
999     Bytes.putInt(uncompressedBytesWithHeader, HConstants.HFILEBLOCK_HEADER_SIZE
1000         + DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten
1001         );
1002     if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
1003       encodingCtx.postEncoding(BlockType.ENCODED_DATA);
1004     } else {
1005       encodingCtx.postEncoding(BlockType.DATA);
1006     }
1007   }
1008 }