1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.apache.hadoop.classification.InterfaceAudience;
25  import org.apache.hadoop.hbase.Cell;
26  import org.apache.hadoop.hbase.CellComparator;
27  import org.apache.hadoop.hbase.HConstants;
28  import org.apache.hadoop.hbase.KeyValue;
29  import org.apache.hadoop.hbase.SettableSequenceId;
30  import org.apache.hadoop.hbase.KeyValue.KVComparator;
31  import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
32  import org.apache.hadoop.hbase.KeyValue.Type;
33  import org.apache.hadoop.hbase.KeyValueUtil;
34  import org.apache.hadoop.hbase.io.TagCompressionContext;
35  import org.apache.hadoop.hbase.io.hfile.BlockType;
36  import org.apache.hadoop.hbase.io.hfile.HFileContext;
37  import org.apache.hadoop.hbase.io.util.LRUDictionary;
38  import org.apache.hadoop.hbase.util.ByteBufferUtils;
39  import org.apache.hadoop.hbase.util.Bytes;
40  import org.apache.hadoop.io.WritableUtils;
41  
42  /**
43   * Base class for all data block encoders that use a buffer.
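     *
     * A subclass is typically driven through the block encoding lifecycle roughly as in
     * the sketch below (variable names are illustrative; the HFile block writer is the
     * usual caller):
     * <pre>
     *   encoder.startBlockEncoding(encodingCtx, out);
     *   for (KeyValue kv : cellsInBlock) {
     *     encoder.encode(kv, encodingCtx, out);
     *   }
     *   encoder.endBlockEncoding(encodingCtx, out, uncompressedBytesWithHeader);
     * </pre>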
44   */
45  @InterfaceAudience.Private
46  abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
47  
48    private static final int INITIAL_KEY_BUFFER_SIZE = 512;
49  
50    @Override
51    public ByteBuffer decodeKeyValues(DataInputStream source,
52        HFileBlockDecodingContext blkDecodingCtx) throws IOException {
53      if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
54        throw new IOException(this.getClass().getName() + " only accepts "
55            + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
56      }
57  
58      HFileBlockDefaultDecodingContext decodingCtx =
59          (HFileBlockDefaultDecodingContext) blkDecodingCtx;
60      if (decodingCtx.getHFileContext().isIncludesTags()
61          && decodingCtx.getHFileContext().isCompressTags()) {
62        if (decodingCtx.getTagCompressionContext() != null) {
63          // Creating a new TagCompressionContext for every block decoded would be needless
64          // overhead, so reuse the existing one after clearing it.
65          decodingCtx.getTagCompressionContext().clear();
66        } else {
67          try {
68            TagCompressionContext tagCompressionContext = new TagCompressionContext(
69                LRUDictionary.class, Byte.MAX_VALUE);
70            decodingCtx.setTagCompressionContext(tagCompressionContext);
71          } catch (Exception e) {
72            throw new IOException("Failed to initialize TagCompressionContext", e);
73          }
74        }
75      }
76      return internalDecodeKeyValues(source, 0, 0, decodingCtx);
77    }
78  
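      /**
       * Mutable per-position state kept while a seeker walks an encoded block: the current
       * key is materialized into keyBuffer, while the value (and, when tags are not
       * compressed, the tags) are only referenced by offset/length into currentBuffer.
       */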
79    protected static class SeekerState implements Cell {
80      protected ByteBuffer currentBuffer;
81      protected TagCompressionContext tagCompressionContext;
82      protected int valueOffset = -1;
83      protected int keyLength;
84      protected int valueLength;
85      protected int lastCommonPrefix;
86      protected int tagsLength = 0;
87      protected int tagsOffset = -1;
88      protected int tagsCompressedLength = 0;
89      protected boolean uncompressTags = true;
90  
91      /** We need to store a copy of the key. */
92      protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
93      protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
94  
95      protected long memstoreTS;
96      protected int nextKvOffset;
97      protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
98  
99      protected boolean isValid() {
100       return valueOffset != -1;
101     }
102 
103     protected void invalidate() {
104       valueOffset = -1;
105       tagsCompressedLength = 0;
106       currentKey = new KeyValue.KeyOnlyKeyValue();
107       uncompressTags = true;
108       currentBuffer = null;
109     }
110 
111     protected void ensureSpaceForKey() {
112       if (keyLength > keyBuffer.length) {
113         // rare case, but we need to handle arbitrary length of key
114         int newKeyBufferLength = Math.max(keyBuffer.length, 1) * 2;
115         while (keyLength > newKeyBufferLength) {
116           newKeyBufferLength *= 2;
117         }
118         byte[] newKeyBuffer = new byte[newKeyBufferLength];
119         System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
120         keyBuffer = newKeyBuffer;
121       }
122     }
123 
124     protected void ensureSpaceForTags() {
125       if (tagsLength > tagsBuffer.length) {
126         // rare case, but we need to handle arbitrary length of tags
127         int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2;
128         while (tagsLength > newTagsBufferLength) {
129           newTagsBufferLength *= 2;
130         }
131         byte[] newTagsBuffer = new byte[newTagsBufferLength];
132         System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
133         tagsBuffer = newTagsBuffer;
134       }
135     }
136 
137     protected void createKeyOnlyKeyValue(byte[] keyBuffer, long memTS) {
138       currentKey.setKey(keyBuffer, 0, keyLength);
139       memstoreTS = memTS;
140     }
141 
142     /**
143      * Copy the state from the next one into this instance (the previous state
144      * placeholder). Used to save the previous state when we are advancing the
145      * seeker to the next key/value.
146      */
147     protected void copyFromNext(SeekerState nextState) {
148       if (keyBuffer.length != nextState.keyBuffer.length) {
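            // The key buffers differ in size (the other state resized its buffer), so take a
            // full clone rather than copying into an array of the wrong length.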
149         keyBuffer = nextState.keyBuffer.clone();
150       } else if (!isValid()) {
151         // Note: we can only call isValid before we override our state, so this
152         // comes before all the assignments at the end of this method.
153         System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
154              nextState.keyLength);
155       } else {
156         // don't copy the common prefix between this key and the previous one
157         System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
158             keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
159                 - nextState.lastCommonPrefix);
160       }
161       currentKey = nextState.currentKey;
162 
163       valueOffset = nextState.valueOffset;
164       keyLength = nextState.keyLength;
165       valueLength = nextState.valueLength;
166       lastCommonPrefix = nextState.lastCommonPrefix;
167       nextKvOffset = nextState.nextKvOffset;
168       memstoreTS = nextState.memstoreTS;
169       currentBuffer = nextState.currentBuffer;
170       if (nextState.tagCompressionContext != null) {
171         tagCompressionContext = nextState.tagCompressionContext;
172       }
173     }
174 
175     @Override
176     public byte[] getRowArray() {
177       return currentKey.getRowArray();
178     }
179 
180     @Override
181     public int getRowOffset() {
182       return Bytes.SIZEOF_SHORT;
183     }
184 
185     @Override
186     public short getRowLength() {
187       return currentKey.getRowLength();
188     }
189 
190     @Override
191     public byte[] getFamilyArray() {
192       return currentKey.getFamilyArray();
193     }
194 
195     @Override
196     public int getFamilyOffset() {
197       return currentKey.getFamilyOffset();
198     }
199 
200     @Override
201     public byte getFamilyLength() {
202       return currentKey.getFamilyLength();
203     }
204 
205     @Override
206     public byte[] getQualifierArray() {
207       return currentKey.getQualifierArray();
208     }
209 
210     @Override
211     public int getQualifierOffset() {
212       return currentKey.getQualifierOffset();
213     }
214 
215     @Override
216     public int getQualifierLength() {
217       return currentKey.getQualifierLength();
218     }
219 
220     @Override
221     public long getTimestamp() {
222       return currentKey.getTimestamp();
223     }
224 
225     @Override
226     public byte getTypeByte() {
227       return currentKey.getTypeByte();
228     }
229 
230     @Override
231     public long getMvccVersion() {
232       return memstoreTS;
233     }
234 
235     @Override
236     public long getSequenceId() {
237       return memstoreTS;
238     }
239 
240     @Override
241     public byte[] getValueArray() {
242       return currentBuffer.array();
243     }
244 
245     @Override
246     public int getValueOffset() {
247       return currentBuffer.arrayOffset() + valueOffset;
248     }
249 
250     @Override
251     public int getValueLength() {
252       return valueLength;
253     }
254 
255     @Override
256     public byte[] getTagsArray() {
257       if (tagCompressionContext != null) {
258         return tagsBuffer;
259       }
260       return currentBuffer.array();
261     }
262 
263     @Override
264     public int getTagsOffset() {
265       if (tagCompressionContext != null) {
266         return 0;
267       }
268       return currentBuffer.arrayOffset() + tagsOffset;
269     }
270 
271     @Override
272     public int getTagsLength() {
273       return tagsLength;
274     }
275 
276     @Override
277     @Deprecated
278     public byte[] getValue() {
279       throw new UnsupportedOperationException("getValue() not supported");
280     }
281 
282     @Override
283     @Deprecated
284     public byte[] getFamily() {
285       throw new UnsupportedOperationException("getFamily() not supported");
286     }
287 
288     @Override
289     @Deprecated
290     public byte[] getQualifier() {
291       throw new UnsupportedOperationException("getQualifier() not supported");
292     }
293 
294     @Override
295     @Deprecated
296     public byte[] getRow() {
297       throw new UnsupportedOperationException("getRow() not supported");
298     }
299 
300     @Override
301     public String toString() {
302       KeyValue kv = KeyValueUtil.copyToNewKeyValue(this);
303       if (kv == null) {
304         return "null";
305       }
306       return kv.toString();
307     }
308 
309     public Cell shallowCopy() {
310       return new ClonedSeekerState(currentBuffer, keyBuffer, currentKey.getRowLength(),
311           currentKey.getFamilyOffset(), currentKey.getFamilyLength(), keyLength, 
312           currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
313           currentKey.getTimestamp(), currentKey.getTypeByte(), valueLength, valueOffset,
314           memstoreTS, tagsOffset, tagsLength, tagCompressionContext, tagsBuffer);
315     }
316   }
317 
318   /**
319    * Deep-copies only the key part of the key buffer and passes the remaining
320    * seeker state members along for taking a clone.
321    * Note that the value byte[] part still points into currentBuffer and is
322    * represented by valueOffset and valueLength.
323    */
324   // We return this as a Cell to the upper layers of the read flow, which may try to set a new
325   // SeqId on it. So this has to be an instance of SettableSequenceId. SeekerState need not be
326   // SettableSequenceId as we never return it to the top layers; when we have to, we make a
327   // ClonedSeekerState from it.
328   protected static class ClonedSeekerState implements Cell, SettableSequenceId {
329     private byte[] keyOnlyBuffer;
330     private ByteBuffer currentBuffer;
331     private short rowLength;
332     private int familyOffset;
333     private byte familyLength;
334     private int qualifierOffset;
335     private int qualifierLength;
336     private long timestamp;
337     private byte typeByte;
338     private int valueOffset;
339     private int valueLength;
340     private int tagsLength;
341     private int tagsOffset;
342     private byte[] cloneTagsBuffer;
343     private long seqId;
344     private TagCompressionContext tagCompressionContext;
345     
346     protected ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, short rowLength,
347         int familyOffset, byte familyLength, int keyLength, int qualOffset, int qualLength,
348         long timeStamp, byte typeByte, int valueLen, int valueOffset, long seqId,
349         int tagsOffset, int tagsLength, TagCompressionContext tagCompressionContext,
350         byte[] tagsBuffer) {
351       this.currentBuffer = currentBuffer;
352       keyOnlyBuffer = new byte[keyLength];
353       this.tagCompressionContext = tagCompressionContext;
354       this.rowLength = rowLength;
355       this.familyOffset = familyOffset;
356       this.familyLength = familyLength;
357       this.qualifierOffset = qualOffset;
358       this.qualifierLength = qualLength;
359       this.timestamp = timeStamp;
360       this.typeByte = typeByte;
361       this.valueLength = valueLen;
362       this.valueOffset = valueOffset;
363       this.tagsOffset = tagsOffset;
364       this.tagsLength = tagsLength;
365       System.arraycopy(keyBuffer, 0, keyOnlyBuffer, 0, keyLength);
366       if (tagCompressionContext != null) {
367         this.cloneTagsBuffer = new byte[tagsLength];
368         System.arraycopy(tagsBuffer, 0, this.cloneTagsBuffer, 0, tagsLength);
369       }
370       setSequenceId(seqId);
371     }
372 
373     @Override
374     public byte[] getRowArray() {
375       return keyOnlyBuffer;
376     }
377 
378     @Override
379     public byte[] getFamilyArray() {
380       return keyOnlyBuffer;
381     }
382 
383     @Override
384     public byte[] getQualifierArray() {
385       return keyOnlyBuffer;
386     }
387 
388     @Override
389     public int getRowOffset() {
390       return Bytes.SIZEOF_SHORT;
391     }
392 
393     @Override
394     public short getRowLength() {
395       return rowLength;
396     }
397 
398     @Override
399     public int getFamilyOffset() {
400       return familyOffset;
401     }
402 
403     @Override
404     public byte getFamilyLength() {
405       return familyLength;
406     }
407 
408     @Override
409     public int getQualifierOffset() {
410       return qualifierOffset;
411     }
412 
413     @Override
414     public int getQualifierLength() {
415       return qualifierLength;
416     }
417 
418     @Override
419     public long getTimestamp() {
420       return timestamp;
421     }
422 
423     @Override
424     public byte getTypeByte() {
425       return typeByte;
426     }
427 
428     @Override
429     @Deprecated
430     public long getMvccVersion() {
431       return getSequenceId();
432     }
433 
434     @Override
435     public long getSequenceId() {
436       return seqId;
437     }
438 
439     @Override
440     public byte[] getValueArray() {
441       return currentBuffer.array();
442     }
443 
444     @Override
445     public int getValueOffset() {
446       return currentBuffer.arrayOffset() + valueOffset;
447     }
448 
449     @Override
450     public int getValueLength() {
451       return valueLength;
452     }
453 
454     @Override
455     public byte[] getTagsArray() {
456       if (tagCompressionContext != null) {
457         return cloneTagsBuffer;
458       }
459       return currentBuffer.array();
460     }
461 
462     @Override
463     public int getTagsOffset() {
464       if (tagCompressionContext != null) {
465         return 0;
466       }
467       return currentBuffer.arrayOffset() + tagsOffset;
468     }
469 
470     @Override
471     public int getTagsLength() {
472       return tagsLength;
473     }
474 
475     @Override
476     @Deprecated
477     public byte[] getValue() {
478       throw new UnsupportedOperationException("getValue() not supported");
479     }
480 
481     @Override
482     @Deprecated
483     public byte[] getFamily() {
484       throw new UnsupportedOperationException("getFamily() not supported");
485     }
486 
487     @Override
488     @Deprecated
489     public byte[] getQualifier() {
490       throw new UnsupportedOperationException("getQualifier() not supported");
491     }
492 
493     @Override
494     @Deprecated
495     public byte[] getRow() {
496       throw new UnsupportedOperationException("getRow() not supported");
497     }
498 
499     @Override
500     public String toString() {
501       KeyValue kv = KeyValueUtil.copyToNewKeyValue(this);
502       if (kv == null) {
503         return "null";
504       }
505       return kv.toString();
506     }
507 
508     @Override
509     public void setSequenceId(long seqId) {
510       this.seqId = seqId;
511     }
512   }
513 
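      /**
       * Seeker over the cells of one encoded block. It keeps a "current" and a "previous"
       * decoded SeekerState so that seekToKeyInBlock with seekBefore can step back one cell
       * without re-decoding the whole block.
       */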
514   protected abstract static class
515       BufferedEncodedSeeker<STATE extends SeekerState>
516       implements EncodedSeeker {
517     protected HFileBlockDecodingContext decodingCtx;
518     protected final KVComparator comparator;
519     protected final SamePrefixComparator<byte[]> samePrefixComparator;
520     protected ByteBuffer currentBuffer;
521     protected STATE current = createSeekerState(); // always valid
522     protected STATE previous = createSeekerState(); // may not be valid
523     protected TagCompressionContext tagCompressionContext = null;
524 
525     public BufferedEncodedSeeker(KVComparator comparator,
526         HFileBlockDecodingContext decodingCtx) {
527       this.comparator = comparator;
528       this.samePrefixComparator = comparator;
529       this.decodingCtx = decodingCtx;
530       if (decodingCtx.getHFileContext().isCompressTags()) {
531         try {
532           tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
533         } catch (Exception e) {
534           throw new RuntimeException("Failed to initialize TagCompressionContext", e);
535         }
536       }
537     }
538     
539     protected boolean includesMvcc() {
540       return this.decodingCtx.getHFileContext().isIncludesMvcc();
541     }
542 
543     protected boolean includesTags() {
544       return this.decodingCtx.getHFileContext().isIncludesTags();
545     }
546 
547     @Override
548     public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
549       return comparator.compareFlatKey(key, offset, length,
550           current.keyBuffer, 0, current.keyLength);
551     }
552 
553     @Override
554     public int compareKey(KVComparator comparator, Cell key) {
555       return comparator.compareOnlyKeyPortion(key,
556           new KeyValue.KeyOnlyKeyValue(current.keyBuffer, 0, current.keyLength));
557     }
558 
559     @Override
560     public void setCurrentBuffer(ByteBuffer buffer) {
561       if (this.tagCompressionContext != null) {
562         this.tagCompressionContext.clear();
563       }
564       currentBuffer = buffer;
565       current.currentBuffer = currentBuffer;
566       if (tagCompressionContext != null) {
567         current.tagCompressionContext = tagCompressionContext;
568       }
569       decodeFirst();
570       current.createKeyOnlyKeyValue(current.keyBuffer, current.memstoreTS);
571       previous.invalidate();
572     }
573 
574     @Override
575     public ByteBuffer getKeyDeepCopy() {
576       ByteBuffer keyBuffer = ByteBuffer.allocate(current.keyLength);
577       keyBuffer.put(current.keyBuffer, 0, current.keyLength);
578       return keyBuffer;
579     }
580 
581     @Override
582     public ByteBuffer getValueShallowCopy() {
583       return ByteBuffer.wrap(currentBuffer.array(),
584           currentBuffer.arrayOffset() + current.valueOffset,
585           current.valueLength);
586     }
587 
588     @Override
589     public ByteBuffer getKeyValueBuffer() {
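          // Rebuild the flat KeyValue layout: key length (int), value length (int), key bytes,
          // value bytes and, when tags are present, a 2-byte tags length followed by the tag bytes.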
590       ByteBuffer kvBuffer = createKVBuffer();
591       kvBuffer.putInt(current.keyLength);
592       kvBuffer.putInt(current.valueLength);
593       kvBuffer.put(current.keyBuffer, 0, current.keyLength);
594       kvBuffer.put(currentBuffer.array(),
595           currentBuffer.arrayOffset() + current.valueOffset,
596           current.valueLength);
597       if (current.tagsLength > 0) {
598         // Put short as unsigned
599         kvBuffer.put((byte) (current.tagsLength >> 8 & 0xff));
600         kvBuffer.put((byte) (current.tagsLength & 0xff));
601         if (current.tagsOffset != -1) {
602           // The offset of the tag bytes in the underlying buffer was marked, so the temp
603           // buffer, tagsBuffer, has not been used.
604           kvBuffer.put(currentBuffer.array(), currentBuffer.arrayOffset() + current.tagsOffset,
605               current.tagsLength);
606         } else {
607           // When tagsOffset is marked as -1, tag compression was in use and the tags were
608           // uncompressed into the temp buffer, tagsBuffer. Copy them from there.
609           kvBuffer.put(current.tagsBuffer, 0, current.tagsLength);
610         }
611       }
612       return kvBuffer;
613     }
614 
615     protected ByteBuffer createKVBuffer() {
616       int kvBufSize = (int) KeyValue.getKeyValueDataStructureSize(current.keyLength,
617           current.valueLength, current.tagsLength);
618       ByteBuffer kvBuffer = ByteBuffer.allocate(kvBufSize);
619       return kvBuffer;
620     }
621 
622     @Override
623     public Cell getKeyValue() {
624       return current.shallowCopy();
625     }
626 
627     @Override
628     public void rewind() {
629       currentBuffer.rewind();
630       if (tagCompressionContext != null) {
631         tagCompressionContext.clear();
632       }
633       decodeFirst();
634       current.createKeyOnlyKeyValue(current.keyBuffer, current.memstoreTS);
635       previous.invalidate();
636     }
637 
638     @Override
639     public boolean next() {
640       if (!currentBuffer.hasRemaining()) {
641         return false;
642       }
643       decodeNext();
644       current.createKeyOnlyKeyValue(current.keyBuffer, current.memstoreTS);
645       previous.invalidate();
646       return true;
647     }
648 
649     protected void decodeTags() {
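          // Tags are stored as a compressed-int length followed by either dictionary-compressed
          // tag bytes (when tag compression is on) or the raw tag bytes.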
650       current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
651       if (tagCompressionContext != null) {
652         if (current.uncompressTags) {
653           // Tag compression is being used; uncompress the tags into tagsBuffer.
654           current.ensureSpaceForTags();
655           try {
656             current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
657                 current.tagsBuffer, 0, current.tagsLength);
658           } catch (IOException e) {
659             throw new RuntimeException("Exception while uncompressing tags", e);
660           }
661         } else {
662           ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
663           current.uncompressTags = true;// Reset this.
664         }
665         current.tagsOffset = -1;
666       } else {
667         // When tag compression is not used, avoid a temp copy of the tag bytes into tagsBuffer.
668         // Just mark the tags offset so the KV buffer can be created later in getKeyValueBuffer().
669         current.tagsOffset = currentBuffer.position();
670         ByteBufferUtils.skip(currentBuffer, current.tagsLength);
671       }
672     }
673 
674     @Override
675     public int seekToKeyInBlock(byte[] key, int offset, int length, boolean seekBefore) {
676       return seekToKeyInBlock(new KeyValue.KeyOnlyKeyValue(key, offset, length), seekBefore);
677     }
678 
679     @Override
680     public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
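          // Returns 0 when positioned exactly on the seek key, 1 when positioned on the cell just
          // before it (including the seekBefore case and hitting the end of the block), and
          // HConstants.INDEX_KEY_MAGIC when the seek key sorts before the first cell of the block.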
681       int rowCommonPrefix = 0;
682       int familyCommonPrefix = 0;
683       int qualCommonPrefix = 0;
684       previous.invalidate();
685       KeyValue.KeyOnlyKeyValue currentCell = new KeyValue.KeyOnlyKeyValue();
686       do {
687         int comp;
688         if (samePrefixComparator != null) {
689           currentCell.setKey(current.keyBuffer, 0, current.keyLength);
690           if (current.lastCommonPrefix != 0) {
691             // The flat key format stores the 2-byte row length before the row bytes,
692             // and lastCommonPrefix includes those two length bytes. Subtract them to
693             // find the common prefix of the row part alone; that is why 2 is
694             // subtracted below, and why a lastCommonPrefix of 2 or less (handled just
695             // after this block) means there are no common row bytes yet.
696             rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
697           }
698           if (current.lastCommonPrefix <= 2) {
699             rowCommonPrefix = 0;
700           }
701           rowCommonPrefix += CellComparator.findCommonPrefixInRowPart(seekCell, currentCell,
702               rowCommonPrefix);
703           comp = CellComparator.compareCommonRowPrefix(seekCell, currentCell, rowCommonPrefix);
704           if (comp == 0) {
705             comp = compareTypeBytes(seekCell, currentCell);
706             if (comp == 0) {
707               // Subtract the 2-byte row length field, the row bytes and the 1-byte family length field
708               familyCommonPrefix = Math.max(
709                   0,
710                   Math.min(familyCommonPrefix,
711                       current.lastCommonPrefix - (3 + currentCell.getRowLength())));
712               familyCommonPrefix += CellComparator.findCommonPrefixInFamilyPart(seekCell,
713                   currentCell, familyCommonPrefix);
714               comp = CellComparator.compareCommonFamilyPrefix(seekCell, currentCell,
715                   familyCommonPrefix);
716               if (comp == 0) {
717                 // Subtract the 2-byte row length field, the row bytes, the 1-byte
718                 // family length field and the family bytes
719                 qualCommonPrefix = Math.max(
720                     0,
721                     Math.min(
722                         qualCommonPrefix,
723                         current.lastCommonPrefix
724                             - (3 + currentCell.getRowLength() + currentCell.getFamilyLength())));
725                 qualCommonPrefix += CellComparator.findCommonPrefixInQualifierPart(seekCell,
726                     currentCell, qualCommonPrefix);
727                 comp = CellComparator.compareCommonQualifierPrefix(seekCell, currentCell,
728                     qualCommonPrefix);
729                 if (comp == 0) {
730                   comp = CellComparator.compareTimestamps(seekCell, currentCell);
731                   if (comp == 0) {
732                     // Compare types. Let the delete types sort ahead of puts; i.e. types
733                     // of higher numbers sort before those of lesser numbers. Maximum (255)
734                     // appears ahead of everything, and minimum (0) appears after everything.
740                     comp = (0xff & currentCell.getTypeByte()) - (0xff & seekCell.getTypeByte());
741                   }
742                 }
743               }
744             }
745           }
746         } else {
747           Cell r = new KeyValue.KeyOnlyKeyValue(current.keyBuffer, 0, current.keyLength);
748           comp = comparator.compareOnlyKeyPortion(seekCell, r);
749         }
750         if (comp == 0) { // exact match
751           if (seekBefore) {
752             if (!previous.isValid()) {
753               // The caller (seekBefore) has to ensure that we are not at the
754               // first key in the block.
755               throw new IllegalStateException("Cannot seekBefore if positioned at the first "
756                   + "key in the block: key=" + Bytes.toStringBinary(seekCell.getRowArray(),
757                   seekCell.getRowOffset(), seekCell.getRowLength()));
758             }
759             moveToPrevious();
760             return 1;
761           }
762           return 0;
763         }
764 
765         if (comp < 0) { // already too large, check previous
766           if (previous.isValid()) {
767             moveToPrevious();
768           } else {
769             return HConstants.INDEX_KEY_MAGIC; // using optimized index key
770           }
771           return 1;
772         }
773 
774         // move to next, if more data is available
775         if (currentBuffer.hasRemaining()) {
776           previous.copyFromNext(current);
777           decodeNext();
778           current.createKeyOnlyKeyValue(current.keyBuffer, current.memstoreTS);
779         } else {
780           break;
781         }
782       } while (true);
783 
784       // we hit the end of the block, not an exact match
785       return 1;
786     }
787 
788     private int compareTypeBytes(Cell key, Cell right) {
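          // A key with an empty column and a Minimum type byte is typically a "fake" marker key
          // (for example a last-on-row key used when seeking) and sorts after every real cell of
          // that row.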
789       if (key.getFamilyLength() + key.getQualifierLength() == 0
790           && key.getTypeByte() == Type.Minimum.getCode()) {
791         // left is "bigger", i.e. it appears later in the sorted order
792         return 1;
793       }
794       if (right.getFamilyLength() + right.getQualifierLength() == 0
795           && right.getTypeByte() == Type.Minimum.getCode()) {
796         return -1;
797       }
798       return 0;
799     }
800 
801 
802     private void moveToPrevious() {
803       if (!previous.isValid()) {
804         throw new IllegalStateException(
805             "Can only move back once, and not from the first key in the block.");
806       }
807 
808       STATE tmp = previous;
809       previous = current;
810       current = tmp;
811 
812       // move after last key value
813       currentBuffer.position(current.nextKvOffset);
814       // The tag bytes were already decoded. Cache the tags in the current state along with the
815       // total compressed length of the tag bytes, so that the next decodeNext() does not decode
816       // the tags again; doing so would pollute the Data Dictionary we use for the compression.
817       // When current.uncompressTags is false, we just reuse current.tagsBuffer and skip
818       // 'tagsCompressedLength' bytes of the source stream.
819       // See decodeTags().
820       current.tagsBuffer = previous.tagsBuffer;
821       current.tagsCompressedLength = previous.tagsCompressedLength;
822       current.uncompressTags = false;
823       previous.invalidate();
824     }
825 
826     @SuppressWarnings("unchecked")
827     protected STATE createSeekerState() {
828       // This will fail for non-default seeker state if the subclass does not
829       // override this method.
830       return (STATE) new SeekerState();
831     }
832 
833     protected abstract void decodeFirst();
834     protected abstract void decodeNext();
835   }
836 
837   /**
838    * @param kv the cell whose trailing tags and mvcc timestamp are written
839    * @param out the output stream the encoded data is written to
840    * @param encodingCtx the encoding context for this block
841    * @return the unencoded size (in bytes) added
842    * @throws IOException if writing to the stream fails
843    */
844   protected final int afterEncodingKeyValue(KeyValue kv, DataOutputStream out,
845       HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
846     int size = 0;
847     if (encodingCtx.getHFileContext().isIncludesTags()) {
848       int tagsLength = kv.getTagsLength();
849       ByteBufferUtils.putCompressedInt(out, tagsLength);
850       // There are some tags to be written
851       if (tagsLength > 0) {
852         TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
853         // When tag compression is enabled, tagCompressionContext will be non-null. In that case,
854         // write the tags using Dictionary compression.
855         if (tagCompressionContext != null) {
856           tagCompressionContext
857               .compressTags(out, kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
858         } else {
859           out.write(kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
860         }
861       }
862       size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
863     }
864     if (encodingCtx.getHFileContext().isIncludesMvcc()) {
865       // Copy the memstore timestamp (mvcc version) from the cell to the output stream.
866       long memstoreTS = kv.getMvccVersion();
867       WritableUtils.writeVLong(out, memstoreTS);
868       // TODO use a writeVLong variant that returns the number of bytes written, so the value
869       // does not have to be sized twice.
870       size += WritableUtils.getVIntSize(memstoreTS);
871     }
872     return size;
873   }
874 
875   protected final void afterDecodingKeyValue(DataInputStream source,
876       ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
877     if (decodingCtx.getHFileContext().isIncludesTags()) {
878       int tagsLength = ByteBufferUtils.readCompressedInt(source);
879       // Put as unsigned short
880       dest.put((byte) ((tagsLength >> 8) & 0xff));
881       dest.put((byte) (tagsLength & 0xff));
882       if (tagsLength > 0) {
883         TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
884         // When tag compression is used in this file, the decoding context will pass in a
885         // non-null tagCompressionContext.
886         if (tagCompressionContext != null) {
887           tagCompressionContext.uncompressTags(source, dest, tagsLength);
888         } else {
889           ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
890         }
891       }
892     }
893     if (decodingCtx.getHFileContext().isIncludesMvcc()) {
894       long memstoreTS = -1;
895       try {
896         // Copy memstore timestamp from the data input stream to the byte
897         // buffer.
898         memstoreTS = WritableUtils.readVLong(source);
899         ByteBufferUtils.writeVLong(dest, memstoreTS);
900       } catch (IOException ex) {
901         throw new RuntimeException("Unable to copy memstore timestamp " +
902             memstoreTS + " after decoding a key/value", ex);
903       }
904     }
905   }
906 
907   @Override
908   public HFileBlockEncodingContext newDataBlockEncodingContext(DataBlockEncoding encoding,
909       byte[] header, HFileContext meta) {
910     return new HFileBlockDefaultEncodingContext(encoding, header, meta);
911   }
912 
913   @Override
914   public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
915     return new HFileBlockDefaultDecodingContext(meta);
916   }
917 
918   protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
919       int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
920       throws IOException;
921 
922   /**
923    * Asserts that there is at least the given amount of unfilled space
924    * remaining in the given buffer.
925    * @param out typically, the buffer we are writing to
926    * @param length the required space in the buffer
927    * @throws EncoderBufferTooSmallException If there is not enough space left in the buffer.
928    */
929   protected static void ensureSpace(ByteBuffer out, int length)
930       throws EncoderBufferTooSmallException {
931     if (out.position() + length > out.limit()) {
932       throw new EncoderBufferTooSmallException(
933           "Buffer position=" + out.position() +
934           ", buffer limit=" + out.limit() +
935           ", length to be written=" + length);
936     }
937   }
938 
939   @Override
940   public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
941       throws IOException {
942     if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
943       throw new IOException(this.getClass().getName() + " only accepts "
944           + HFileBlockDefaultEncodingContext.class.getName() + " as the " +
945           "encoding context.");
946     }
947 
948     HFileBlockDefaultEncodingContext encodingCtx =
949         (HFileBlockDefaultEncodingContext) blkEncodingCtx;
950     encodingCtx.prepareEncoding(out);
951     if (encodingCtx.getHFileContext().isIncludesTags()
952         && encodingCtx.getHFileContext().isCompressTags()) {
953       if (encodingCtx.getTagCompressionContext() != null) {
954         // Creating a new TagCompressionContext for every block encoded would be needless
955         // overhead, so reuse the existing one after clearing it.
956         encodingCtx.getTagCompressionContext().clear();
957       } else {
958         try {
959           TagCompressionContext tagCompressionContext = new TagCompressionContext(
960               LRUDictionary.class, Byte.MAX_VALUE);
961           encodingCtx.setTagCompressionContext(tagCompressionContext);
962         } catch (Exception e) {
963           throw new IOException("Failed to initialize TagCompressionContext", e);
964         }
965       }
966     }
967     ByteBufferUtils.putInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
968     blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
969   }
970 
971   private static class BufferedDataBlockEncodingState extends EncodingState {
972     int unencodedDataSizeWritten = 0;
973   }
974 
975   @Override
976   public int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
977       throws IOException {
978     BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
979         .getEncodingState();
980     int encodedKvSize = internalEncode(kv, (HFileBlockDefaultEncodingContext) encodingCtx, out);
981     state.unencodedDataSizeWritten += encodedKvSize;
982     return encodedKvSize;
983   }
984 
985   public abstract int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingCtx,
986       DataOutputStream out) throws IOException;
987 
988   @Override
989   public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
990       byte[] uncompressedBytesWithHeader) throws IOException {
991     BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
992         .getEncodingState();
993     // Write unencodedDataSizeWritten at the offset just past the block header and the
994     // data block encoding id.
995     Bytes.putInt(uncompressedBytesWithHeader, HConstants.HFILEBLOCK_HEADER_SIZE
996         + DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten);
997     if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
998       encodingCtx.postEncoding(BlockType.ENCODED_DATA);
999     } else {
1000       encodingCtx.postEncoding(BlockType.DATA);
1001     }
1002   }
1003 }