001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.encoding;
019
020import java.io.DataInputStream;
021import java.io.DataOutputStream;
022import java.io.IOException;
023import java.io.OutputStream;
024import java.nio.ByteBuffer;
025import org.apache.hadoop.hbase.ByteBufferExtendedCell;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.CellComparator;
028import org.apache.hadoop.hbase.CellUtil;
029import org.apache.hadoop.hbase.ExtendedCell;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.KeyValue;
032import org.apache.hadoop.hbase.KeyValueUtil;
033import org.apache.hadoop.hbase.PrivateCellUtil;
034import org.apache.hadoop.hbase.io.TagCompressionContext;
035import org.apache.hadoop.hbase.io.util.LRUDictionary;
036import org.apache.hadoop.hbase.io.util.StreamUtils;
037import org.apache.hadoop.hbase.nio.ByteBuff;
038import org.apache.hadoop.hbase.util.ByteBufferUtils;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.ClassSize;
041import org.apache.hadoop.hbase.util.ObjectIntPair;
042import org.apache.hadoop.io.WritableUtils;
043import org.apache.yetus.audience.InterfaceAudience;
044
045/**
046 * Base class for all data block encoders that use a buffer.
047 */
048@InterfaceAudience.Private
049abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
  /**
   * Lower bound, in bytes, for the array that {@link SeekerState} allocates when its key or tags
   * buffer has to grow.
   * <p>
   * TODO: This datablockencoder is dealing in internals of hfileblocks. Purge reference to HFBs
   */
  private static int INITIAL_KEY_BUFFER_SIZE = 512;
054
055  @Override
056  public ByteBuffer decodeKeyValues(DataInputStream source,
057    HFileBlockDecodingContext blkDecodingCtx) throws IOException {
058    if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
059      throw new IOException(this.getClass().getName() + " only accepts "
060        + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
061    }
062
063    HFileBlockDefaultDecodingContext decodingCtx =
064      (HFileBlockDefaultDecodingContext) blkDecodingCtx;
065    if (
066      decodingCtx.getHFileContext().isIncludesTags()
067        && decodingCtx.getHFileContext().isCompressTags()
068    ) {
069      if (decodingCtx.getTagCompressionContext() != null) {
070        // It will be overhead to create the TagCompressionContext again and again for every block
071        // decoding.
072        decodingCtx.getTagCompressionContext().clear();
073      } else {
074        try {
075          TagCompressionContext tagCompressionContext =
076            new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
077          decodingCtx.setTagCompressionContext(tagCompressionContext);
078        } catch (Exception e) {
079          throw new IOException("Failed to initialize TagCompressionContext", e);
080        }
081      }
082    }
083    return internalDecodeKeyValues(source, 0, 0, decodingCtx);
084  }
085
086  /********************* common prefixes *************************/
087  // Having this as static is fine but if META is having DBE then we should
088  // change this.
089  public static int compareCommonRowPrefix(Cell left, Cell right, int rowCommonPrefix) {
090    if (left instanceof ByteBufferExtendedCell) {
091      ByteBufferExtendedCell bbLeft = (ByteBufferExtendedCell) left;
092      if (right instanceof ByteBufferExtendedCell) {
093        ByteBufferExtendedCell bbRight = (ByteBufferExtendedCell) right;
094        return ByteBufferUtils.compareTo(bbLeft.getRowByteBuffer(),
095          bbLeft.getRowPosition() + rowCommonPrefix, left.getRowLength() - rowCommonPrefix,
096          bbRight.getRowByteBuffer(), bbRight.getRowPosition() + rowCommonPrefix,
097          right.getRowLength() - rowCommonPrefix);
098      } else {
099        return ByteBufferUtils.compareTo(bbLeft.getRowByteBuffer(),
100          bbLeft.getRowPosition() + rowCommonPrefix, left.getRowLength() - rowCommonPrefix,
101          right.getRowArray(), right.getRowOffset() + rowCommonPrefix,
102          right.getRowLength() - rowCommonPrefix);
103      }
104    } else {
105      if (right instanceof ByteBufferExtendedCell) {
106        ByteBufferExtendedCell bbRight = (ByteBufferExtendedCell) right;
107        return ByteBufferUtils.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix,
108          left.getRowLength() - rowCommonPrefix, bbRight.getRowByteBuffer(),
109          bbRight.getRowPosition() + rowCommonPrefix, right.getRowLength() - rowCommonPrefix);
110      } else {
111        return Bytes.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix,
112          left.getRowLength() - rowCommonPrefix, right.getRowArray(),
113          right.getRowOffset() + rowCommonPrefix, right.getRowLength() - rowCommonPrefix);
114      }
115    }
116  }
117
118  public static int compareCommonFamilyPrefix(Cell left, Cell right, int familyCommonPrefix) {
119    if (left instanceof ByteBufferExtendedCell) {
120      ByteBufferExtendedCell bbLeft = (ByteBufferExtendedCell) left;
121      if (right instanceof ByteBufferExtendedCell) {
122        ByteBufferExtendedCell bbRight = (ByteBufferExtendedCell) right;
123        return ByteBufferUtils.compareTo(bbLeft.getFamilyByteBuffer(),
124          bbLeft.getFamilyPosition() + familyCommonPrefix,
125          left.getFamilyLength() - familyCommonPrefix, bbRight.getFamilyByteBuffer(),
126          bbRight.getFamilyPosition() + familyCommonPrefix,
127          right.getFamilyLength() - familyCommonPrefix);
128      } else {
129        return ByteBufferUtils.compareTo(bbLeft.getFamilyByteBuffer(),
130          bbLeft.getFamilyPosition() + familyCommonPrefix,
131          left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
132          right.getFamilyOffset() + familyCommonPrefix,
133          right.getFamilyLength() - familyCommonPrefix);
134      }
135    } else {
136      if (right instanceof ByteBufferExtendedCell) {
137        ByteBufferExtendedCell bbRight = (ByteBufferExtendedCell) right;
138        return ByteBufferUtils.compareTo(left.getFamilyArray(),
139          left.getFamilyOffset() + familyCommonPrefix, left.getFamilyLength() - familyCommonPrefix,
140          bbRight.getFamilyByteBuffer(), bbRight.getFamilyPosition() + familyCommonPrefix,
141          right.getFamilyLength() - familyCommonPrefix);
142      } else {
143        return Bytes.compareTo(left.getFamilyArray(), left.getFamilyOffset() + familyCommonPrefix,
144          left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
145          right.getFamilyOffset() + familyCommonPrefix,
146          right.getFamilyLength() - familyCommonPrefix);
147      }
148    }
149  }
150
151  public static int compareCommonQualifierPrefix(Cell left, Cell right, int qualCommonPrefix) {
152    if (left instanceof ByteBufferExtendedCell) {
153      ByteBufferExtendedCell bbLeft = (ByteBufferExtendedCell) left;
154      if (right instanceof ByteBufferExtendedCell) {
155        ByteBufferExtendedCell bbRight = (ByteBufferExtendedCell) right;
156        return ByteBufferUtils.compareTo(bbLeft.getQualifierByteBuffer(),
157          bbLeft.getQualifierPosition() + qualCommonPrefix,
158          left.getQualifierLength() - qualCommonPrefix, bbRight.getQualifierByteBuffer(),
159          bbRight.getQualifierPosition() + qualCommonPrefix,
160          right.getQualifierLength() - qualCommonPrefix);
161      } else {
162        return ByteBufferUtils.compareTo(bbLeft.getQualifierByteBuffer(),
163          bbLeft.getQualifierPosition() + qualCommonPrefix,
164          left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(),
165          right.getQualifierOffset() + qualCommonPrefix,
166          right.getQualifierLength() - qualCommonPrefix);
167      }
168    } else {
169      if (right instanceof ByteBufferExtendedCell) {
170        ByteBufferExtendedCell bbRight = (ByteBufferExtendedCell) right;
171        return ByteBufferUtils.compareTo(left.getQualifierArray(),
172          left.getQualifierOffset() + qualCommonPrefix,
173          left.getQualifierLength() - qualCommonPrefix, bbRight.getQualifierByteBuffer(),
174          bbRight.getQualifierPosition() + qualCommonPrefix,
175          right.getQualifierLength() - qualCommonPrefix);
176      } else {
177        return Bytes.compareTo(left.getQualifierArray(),
178          left.getQualifierOffset() + qualCommonPrefix,
179          left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(),
180          right.getQualifierOffset() + qualCommonPrefix,
181          right.getQualifierLength() - qualCommonPrefix);
182      }
183    }
184  }
185
186  protected static class SeekerState {
187    protected ByteBuff currentBuffer;
188    protected TagCompressionContext tagCompressionContext;
189    protected int valueOffset = -1;
190    protected int keyLength;
191    protected int valueLength;
192    protected int lastCommonPrefix;
193    protected int tagsLength = 0;
194    protected int tagsOffset = -1;
195    protected int tagsCompressedLength = 0;
196    protected boolean uncompressTags = true;
197
198    /** We need to store a copy of the key. */
199    protected byte[] keyBuffer = HConstants.EMPTY_BYTE_ARRAY;
200    protected byte[] tagsBuffer = HConstants.EMPTY_BYTE_ARRAY;
201
202    protected long memstoreTS;
203    protected int nextKvOffset;
204    protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
205    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
206    // many object creations.
207    private final ObjectIntPair<ByteBuffer> tmpPair;
208    private final boolean includeTags;
209
210    public SeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) {
211      this.tmpPair = tmpPair;
212      this.includeTags = includeTags;
213    }
214
215    protected boolean isValid() {
216      return valueOffset != -1;
217    }
218
219    protected void invalidate() {
220      valueOffset = -1;
221      tagsCompressedLength = 0;
222      currentKey.clear();
223      uncompressTags = true;
224      currentBuffer = null;
225    }
226
227    protected void ensureSpaceForKey() {
228      if (keyLength > keyBuffer.length) {
229        int newKeyBufferLength =
230          Integer.highestOneBit(Math.max(INITIAL_KEY_BUFFER_SIZE, keyLength) - 1) << 1;
231        byte[] newKeyBuffer = new byte[newKeyBufferLength];
232        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
233        keyBuffer = newKeyBuffer;
234      }
235    }
236
237    protected void ensureSpaceForTags() {
238      if (tagsLength > tagsBuffer.length) {
239        int newTagsBufferLength =
240          Integer.highestOneBit(Math.max(INITIAL_KEY_BUFFER_SIZE, tagsLength) - 1) << 1;
241        byte[] newTagsBuffer = new byte[newTagsBufferLength];
242        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
243        tagsBuffer = newTagsBuffer;
244      }
245    }
246
247    protected void setKey(byte[] keyBuffer, long memTS) {
248      currentKey.setKey(keyBuffer, 0, keyLength);
249      memstoreTS = memTS;
250    }
251
252    /**
253     * Copy the state from the next one into this instance (the previous state placeholder). Used to
254     * save the previous state when we are advancing the seeker to the next key/value.
255     */
256    protected void copyFromNext(SeekerState nextState) {
257      if (keyBuffer.length != nextState.keyBuffer.length) {
258        keyBuffer = nextState.keyBuffer.clone();
259      } else if (!isValid()) {
260        // Note: we can only call isValid before we override our state, so this
261        // comes before all the assignments at the end of this method.
262        System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0, nextState.keyLength);
263      } else {
264        // don't copy the common prefix between this key and the previous one
265        System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix, keyBuffer,
266          nextState.lastCommonPrefix, nextState.keyLength - nextState.lastCommonPrefix);
267      }
268      currentKey.set(nextState.currentKey);
269
270      valueOffset = nextState.valueOffset;
271      keyLength = nextState.keyLength;
272      valueLength = nextState.valueLength;
273      lastCommonPrefix = nextState.lastCommonPrefix;
274      nextKvOffset = nextState.nextKvOffset;
275      memstoreTS = nextState.memstoreTS;
276      currentBuffer = nextState.currentBuffer;
277      tagsOffset = nextState.tagsOffset;
278      tagsLength = nextState.tagsLength;
279      if (nextState.tagCompressionContext != null) {
280        tagCompressionContext = nextState.tagCompressionContext;
281      }
282    }
283
284    public Cell toCell() {
285      // Buffer backing the value and tags part from the HFileBlock's buffer
286      // When tag compression in use, this will be only the value bytes area.
287      ByteBuffer valAndTagsBuffer;
288      int vOffset;
289      int valAndTagsLength = this.valueLength;
290      int tagsLenSerializationSize = 0;
291      if (this.includeTags && this.tagCompressionContext == null) {
292        // Include the tags part also. This will be the tags bytes + 2 bytes of for storing tags
293        // length
294        tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + this.valueLength);
295        valAndTagsLength += tagsLenSerializationSize + this.tagsLength;
296      }
297      this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair);
298      valAndTagsBuffer = this.tmpPair.getFirst();
299      vOffset = this.tmpPair.getSecond();// This is the offset to value part in the BB
300      if (valAndTagsBuffer.hasArray()) {
301        return toOnheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
302      } else {
303        return toOffheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
304      }
305    }
306
307    private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
308      int tagsLenSerializationSize) {
309      byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY;
310      int tOffset = 0;
311      if (this.includeTags) {
312        if (this.tagCompressionContext == null) {
313          tagsArray = valAndTagsBuffer.array();
314          tOffset =
315            valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength + tagsLenSerializationSize;
316        } else {
317          tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
318          tOffset = 0;
319        }
320      }
321      return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
322        currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
323        currentKey.getQualifierOffset(), currentKey.getQualifierLength(), currentKey.getTimestamp(),
324        currentKey.getTypeByte(), valAndTagsBuffer.array(),
325        valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength, memstoreTS, tagsArray, tOffset,
326        this.tagsLength);
327    }
328
329    private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
330      int tagsLenSerializationSize) {
331      ByteBuffer tagsBuf = HConstants.EMPTY_BYTE_BUFFER;
332      int tOffset = 0;
333      if (this.includeTags) {
334        if (this.tagCompressionContext == null) {
335          tagsBuf = valAndTagsBuffer;
336          tOffset = vOffset + this.valueLength + tagsLenSerializationSize;
337        } else {
338          tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength));
339          tOffset = 0;
340        }
341      }
342      return new OffheapDecodedExtendedCell(
343        ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)), currentKey.getRowLength(),
344        currentKey.getFamilyOffset(), currentKey.getFamilyLength(), currentKey.getQualifierOffset(),
345        currentKey.getQualifierLength(), currentKey.getTimestamp(), currentKey.getTypeByte(),
346        valAndTagsBuffer, vOffset, this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
347    }
348  }
349
350  /**
351   * Copies only the key part of the keybuffer by doing a deep copy and passes the seeker state
352   * members for taking a clone. Note that the value byte[] part is still pointing to the
353   * currentBuffer and represented by the valueOffset and valueLength
354   */
355  // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
356  // there. So this has to be an instance of ExtendedCell.
357  protected static class OnheapDecodedCell implements ExtendedCell {
358    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
359      + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
360      + Bytes.SIZEOF_SHORT + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY));
361    private byte[] keyOnlyBuffer;
362    private short rowLength;
363    private int familyOffset;
364    private byte familyLength;
365    private int qualifierOffset;
366    private int qualifierLength;
367    private long timeStamp;
368    private byte typeByte;
369    private byte[] valueBuffer;
370    private int valueOffset;
371    private int valueLength;
372    private byte[] tagsBuffer;
373    private int tagsOffset;
374    private int tagsLength;
375    private long seqId;
376
377    protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset,
378      byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
379      byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] tagsBuffer,
380      int tagsOffset, int tagsLength) {
381      this.keyOnlyBuffer = keyBuffer;
382      this.rowLength = rowLength;
383      this.familyOffset = familyOffset;
384      this.familyLength = familyLength;
385      this.qualifierOffset = qualOffset;
386      this.qualifierLength = qualLength;
387      this.timeStamp = timeStamp;
388      this.typeByte = typeByte;
389      this.valueBuffer = valueBuffer;
390      this.valueOffset = valueOffset;
391      this.valueLength = valueLen;
392      this.tagsBuffer = tagsBuffer;
393      this.tagsOffset = tagsOffset;
394      this.tagsLength = tagsLength;
395      setSequenceId(seqId);
396    }
397
398    @Override
399    public byte[] getRowArray() {
400      return keyOnlyBuffer;
401    }
402
403    @Override
404    public byte[] getFamilyArray() {
405      return keyOnlyBuffer;
406    }
407
408    @Override
409    public byte[] getQualifierArray() {
410      return keyOnlyBuffer;
411    }
412
413    @Override
414    public int getRowOffset() {
415      return Bytes.SIZEOF_SHORT;
416    }
417
418    @Override
419    public short getRowLength() {
420      return rowLength;
421    }
422
423    @Override
424    public int getFamilyOffset() {
425      return familyOffset;
426    }
427
428    @Override
429    public byte getFamilyLength() {
430      return familyLength;
431    }
432
433    @Override
434    public int getQualifierOffset() {
435      return qualifierOffset;
436    }
437
438    @Override
439    public int getQualifierLength() {
440      return qualifierLength;
441    }
442
443    @Override
444    public long getTimestamp() {
445      return timeStamp;
446    }
447
448    @Override
449    public byte getTypeByte() {
450      return typeByte;
451    }
452
453    @Override
454    public long getSequenceId() {
455      return seqId;
456    }
457
458    @Override
459    public byte[] getValueArray() {
460      return this.valueBuffer;
461    }
462
463    @Override
464    public int getValueOffset() {
465      return valueOffset;
466    }
467
468    @Override
469    public int getValueLength() {
470      return valueLength;
471    }
472
473    @Override
474    public byte[] getTagsArray() {
475      return this.tagsBuffer;
476    }
477
478    @Override
479    public int getTagsOffset() {
480      return this.tagsOffset;
481    }
482
483    @Override
484    public int getTagsLength() {
485      return tagsLength;
486    }
487
488    @Override
489    public String toString() {
490      return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
491        + getValueLength() + "/seqid=" + seqId;
492    }
493
494    @Override
495    public void setSequenceId(long seqId) {
496      this.seqId = seqId;
497    }
498
499    @Override
500    public long heapSize() {
501      return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
502    }
503
504    @Override
505    public int write(OutputStream out, boolean withTags) throws IOException {
506      int lenToWrite = getSerializedSize(withTags);
507      ByteBufferUtils.putInt(out, keyOnlyBuffer.length);
508      ByteBufferUtils.putInt(out, valueLength);
509      // Write key
510      out.write(keyOnlyBuffer);
511      // Write value
512      out.write(this.valueBuffer, this.valueOffset, this.valueLength);
513      if (withTags && this.tagsLength > 0) {
514        // 2 bytes tags length followed by tags bytes
515        // tags length is serialized with 2 bytes only(short way) even if the type is int.
516        // As this is non -ve numbers, we save the sign bit. See HBASE-11437
517        out.write((byte) (0xff & (this.tagsLength >> 8)));
518        out.write((byte) (0xff & this.tagsLength));
519        out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
520      }
521      return lenToWrite;
522    }
523
524    @Override
525    public int getSerializedSize(boolean withTags) {
526      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
527        withTags);
528    }
529
530    @Override
531    public void write(ByteBuffer buf, int offset) {
532      // This is not used in actual flow. Throwing UnsupportedOperationException
533      throw new UnsupportedOperationException();
534    }
535
536    @Override
537    public void setTimestamp(long ts) throws IOException {
538      // This is not used in actual flow. Throwing UnsupportedOperationException
539      throw new UnsupportedOperationException();
540    }
541
542    @Override
543    public void setTimestamp(byte[] ts) throws IOException {
544      // This is not used in actual flow. Throwing UnsupportedOperationException
545      throw new UnsupportedOperationException();
546    }
547
548    @Override
549    public ExtendedCell deepClone() {
550      // This is not used in actual flow. Throwing UnsupportedOperationException
551      throw new UnsupportedOperationException();
552    }
553  }
554
555  protected static class OffheapDecodedExtendedCell extends ByteBufferExtendedCell {
556    private static final long FIXED_OVERHEAD =
557      (long) ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)
558        + (7 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE)
559        + (3 * ClassSize.BYTE_BUFFER);
560    private ByteBuffer keyBuffer;
561    private short rowLength;
562    private int familyOffset;
563    private byte familyLength;
564    private int qualifierOffset;
565    private int qualifierLength;
566    private long timeStamp;
567    private byte typeByte;
568    private ByteBuffer valueBuffer;
569    private int valueOffset;
570    private int valueLength;
571    private ByteBuffer tagsBuffer;
572    private int tagsOffset;
573    private int tagsLength;
574    private long seqId;
575
576    protected OffheapDecodedExtendedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset,
577      byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
578      ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer,
579      int tagsOffset, int tagsLength) {
580      // The keyBuffer is always onheap
581      assert keyBuffer.hasArray();
582      assert keyBuffer.arrayOffset() == 0;
583      this.keyBuffer = keyBuffer;
584      this.rowLength = rowLength;
585      this.familyOffset = familyOffset;
586      this.familyLength = familyLength;
587      this.qualifierOffset = qualOffset;
588      this.qualifierLength = qualLength;
589      this.timeStamp = timeStamp;
590      this.typeByte = typeByte;
591      this.valueBuffer = valueBuffer;
592      this.valueOffset = valueOffset;
593      this.valueLength = valueLen;
594      this.tagsBuffer = tagsBuffer;
595      this.tagsOffset = tagsOffset;
596      this.tagsLength = tagsLength;
597      setSequenceId(seqId);
598    }
599
600    @Override
601    @SuppressWarnings("ByteBufferBackingArray")
602    public byte[] getRowArray() {
603      return this.keyBuffer.array();
604    }
605
606    @Override
607    public int getRowOffset() {
608      return getRowPosition();
609    }
610
611    @Override
612    public short getRowLength() {
613      return this.rowLength;
614    }
615
616    @Override
617    @SuppressWarnings("ByteBufferBackingArray")
618    public byte[] getFamilyArray() {
619      return this.keyBuffer.array();
620    }
621
622    @Override
623    public int getFamilyOffset() {
624      return getFamilyPosition();
625    }
626
627    @Override
628    public byte getFamilyLength() {
629      return this.familyLength;
630    }
631
632    @Override
633    @SuppressWarnings("ByteBufferBackingArray")
634    public byte[] getQualifierArray() {
635      return this.keyBuffer.array();
636    }
637
638    @Override
639    public int getQualifierOffset() {
640      return getQualifierPosition();
641    }
642
643    @Override
644    public int getQualifierLength() {
645      return this.qualifierLength;
646    }
647
648    @Override
649    public long getTimestamp() {
650      return this.timeStamp;
651    }
652
653    @Override
654    public byte getTypeByte() {
655      return this.typeByte;
656    }
657
658    @Override
659    public long getSequenceId() {
660      return this.seqId;
661    }
662
663    @Override
664    public byte[] getValueArray() {
665      return CellUtil.cloneValue(this);
666    }
667
668    @Override
669    public int getValueOffset() {
670      return 0;
671    }
672
673    @Override
674    public int getValueLength() {
675      return this.valueLength;
676    }
677
678    @Override
679    public byte[] getTagsArray() {
680      return PrivateCellUtil.cloneTags(this);
681    }
682
683    @Override
684    public int getTagsOffset() {
685      return 0;
686    }
687
688    @Override
689    public int getTagsLength() {
690      return this.tagsLength;
691    }
692
693    @Override
694    public ByteBuffer getRowByteBuffer() {
695      return this.keyBuffer;
696    }
697
698    @Override
699    public int getRowPosition() {
700      return Bytes.SIZEOF_SHORT;
701    }
702
703    @Override
704    public ByteBuffer getFamilyByteBuffer() {
705      return this.keyBuffer;
706    }
707
708    @Override
709    public int getFamilyPosition() {
710      return this.familyOffset;
711    }
712
713    @Override
714    public ByteBuffer getQualifierByteBuffer() {
715      return this.keyBuffer;
716    }
717
718    @Override
719    public int getQualifierPosition() {
720      return this.qualifierOffset;
721    }
722
723    @Override
724    public ByteBuffer getValueByteBuffer() {
725      return this.valueBuffer;
726    }
727
728    @Override
729    public int getValuePosition() {
730      return this.valueOffset;
731    }
732
733    @Override
734    public ByteBuffer getTagsByteBuffer() {
735      return this.tagsBuffer;
736    }
737
738    @Override
739    public int getTagsPosition() {
740      return this.tagsOffset;
741    }
742
743    @Override
744    public long heapSize() {
745      return FIXED_OVERHEAD;
746    }
747
748    @Override
749    public void setSequenceId(long seqId) {
750      this.seqId = seqId;
751    }
752
753    @Override
754    public int write(OutputStream out, boolean withTags) throws IOException {
755      int lenToWrite = getSerializedSize(withTags);
756      ByteBufferUtils.putInt(out, keyBuffer.remaining());
757      ByteBufferUtils.putInt(out, valueLength);
758      // Write key
759      out.write(keyBuffer.array(), keyBuffer.arrayOffset(), keyBuffer.remaining());
760      // Write value
761      ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset, this.valueLength);
762      if (withTags && this.tagsLength > 0) {
763        // 2 bytes tags length followed by tags bytes
764        // tags length is serialized with 2 bytes only(short way) even if the type is int.
765        // As this is non -ve numbers, we save the sign bit. See HBASE-11437
766        out.write((byte) (0xff & (this.tagsLength >> 8)));
767        out.write((byte) (0xff & this.tagsLength));
768        ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
769      }
770      return lenToWrite;
771    }
772
773    @Override
774    public int getSerializedSize(boolean withTags) {
775      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
776        withTags);
777    }
778
779    @Override
780    public void setTimestamp(long ts) throws IOException {
781      // This is not used in actual flow. Throwing UnsupportedOperationException
782      throw new UnsupportedOperationException();
783    }
784
785    @Override
786    public void setTimestamp(byte[] ts) throws IOException {
787      // This is not used in actual flow. Throwing UnsupportedOperationException
788      throw new UnsupportedOperationException();
789    }
790
791    @Override
792    public void write(ByteBuffer buf, int offset) {
793      // This is not used in actual flow. Throwing UnsupportedOperationException
794      throw new UnsupportedOperationException();
795    }
796
797    @Override
798    public ExtendedCell deepClone() {
799      // This is not used in actual flow. Throwing UnsupportedOperationException
800      throw new UnsupportedOperationException();
801    }
802  }
803
804  protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
805    extends AbstractEncodedSeeker {
806    protected ByteBuff currentBuffer;
807    protected TagCompressionContext tagCompressionContext = null;
808    protected KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
809    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
810    // many object creations.
811    protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>();
812    protected STATE current, previous;
813
814    public BufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
815      super(decodingCtx);
816      if (decodingCtx.getHFileContext().isCompressTags()) {
817        try {
818          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
819        } catch (Exception e) {
820          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
821        }
822      }
823      current = createSeekerState(); // always valid
824      previous = createSeekerState(); // may not be valid
825    }
826
827    @Override
828    public int compareKey(CellComparator comparator, Cell key) {
829      keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
830      return PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, keyOnlyKV);
831    }
832
833    @Override
834    public void setCurrentBuffer(ByteBuff buffer) {
835      if (this.tagCompressionContext != null) {
836        this.tagCompressionContext.clear();
837      }
838      currentBuffer = buffer;
839      current.currentBuffer = currentBuffer;
840      if (tagCompressionContext != null) {
841        current.tagCompressionContext = tagCompressionContext;
842      }
843      decodeFirst();
844      current.setKey(current.keyBuffer, current.memstoreTS);
845      previous.invalidate();
846    }
847
848    @Override
849    public Cell getKey() {
850      byte[] key = new byte[current.keyLength];
851      System.arraycopy(current.keyBuffer, 0, key, 0, current.keyLength);
852      return new KeyValue.KeyOnlyKeyValue(key);
853    }
854
855    @Override
856    public ByteBuffer getValueShallowCopy() {
857      currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
858      ByteBuffer dup = tmpPair.getFirst().duplicate();
859      dup.position(tmpPair.getSecond());
860      dup.limit(tmpPair.getSecond() + current.valueLength);
861      return dup.slice();
862    }
863
864    @Override
865    public Cell getCell() {
866      return current.toCell();
867    }
868
869    @Override
870    public void rewind() {
871      currentBuffer.rewind();
872      if (tagCompressionContext != null) {
873        tagCompressionContext.clear();
874        // Prior seekToKeyInBlock may have reset this to false if we fell back to previous
875        // seeker state. This is an optimization so we don't have to uncompress tags again when
876        // reading last state.
877        // In case of rewind, we are starting from the beginning of the buffer, so we need
878        // to uncompress any tags we see.
879        // It may make sense to reset this in setCurrentBuffer as well, but we seem to only call
880        // setCurrentBuffer after StoreFileScanner.seekAtOrAfter which calls next to consume the
881        // seeker state. Rewind is called by seekBefore, which doesn't and leaves us in this state.
882        current.uncompressTags = true;
883      }
884      decodeFirst();
885      current.setKey(current.keyBuffer, current.memstoreTS);
886      previous.invalidate();
887    }
888
889    @Override
890    public boolean next() {
891      if (!currentBuffer.hasRemaining()) {
892        return false;
893      }
894      decodeNext();
895      current.setKey(current.keyBuffer, current.memstoreTS);
896      previous.invalidate();
897      return true;
898    }
899
900    protected void decodeTags() {
901      current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
902      if (tagCompressionContext != null) {
903        if (current.uncompressTags) {
904          // Tag compression is been used. uncompress it into tagsBuffer
905          current.ensureSpaceForTags();
906          try {
907            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
908              current.tagsBuffer, 0, current.tagsLength);
909          } catch (Exception e) {
910            throw new RuntimeException("Exception while uncompressing tags", e);
911          }
912        } else {
913          currentBuffer.skip(current.tagsCompressedLength);
914          current.uncompressTags = true;// Reset this.
915        }
916        current.tagsOffset = -1;
917      } else {
918        // When tag compress is not used, let us not do copying of tags bytes into tagsBuffer.
919        // Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer()
920        current.tagsOffset = currentBuffer.position();
921        currentBuffer.skip(current.tagsLength);
922      }
923    }
924
925    @Override
926    public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
927      int rowCommonPrefix = 0;
928      int familyCommonPrefix = 0;
929      int qualCommonPrefix = 0;
930      previous.invalidate();
931      do {
932        int comp;
933        keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
934        if (current.lastCommonPrefix != 0) {
935          // The KV format has row key length also in the byte array. The
936          // common prefix
937          // includes it. So we need to subtract to find out the common prefix
938          // in the
939          // row part alone
940          rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
941        }
942        if (current.lastCommonPrefix <= 2) {
943          rowCommonPrefix = 0;
944        }
945        rowCommonPrefix += findCommonPrefixInRowPart(seekCell, keyOnlyKV, rowCommonPrefix);
946        comp = compareCommonRowPrefix(seekCell, keyOnlyKV, rowCommonPrefix);
947        if (comp == 0) {
948          comp = compareTypeBytes(seekCell, keyOnlyKV);
949          if (comp == 0) {
950            // Subtract the fixed row key length and the family key fixed length
951            familyCommonPrefix = Math.max(0, Math.min(familyCommonPrefix,
952              current.lastCommonPrefix - (3 + keyOnlyKV.getRowLength())));
953            familyCommonPrefix +=
954              findCommonPrefixInFamilyPart(seekCell, keyOnlyKV, familyCommonPrefix);
955            comp = compareCommonFamilyPrefix(seekCell, keyOnlyKV, familyCommonPrefix);
956            if (comp == 0) {
957              // subtract the rowkey fixed length and the family key fixed
958              // length
959              qualCommonPrefix = Math.max(0, Math.min(qualCommonPrefix, current.lastCommonPrefix
960                - (3 + keyOnlyKV.getRowLength() + keyOnlyKV.getFamilyLength())));
961              qualCommonPrefix +=
962                findCommonPrefixInQualifierPart(seekCell, keyOnlyKV, qualCommonPrefix);
963              comp = compareCommonQualifierPrefix(seekCell, keyOnlyKV, qualCommonPrefix);
964              if (comp == 0) {
965                comp = CellComparator.getInstance().compareTimestamps(seekCell, keyOnlyKV);
966                if (comp == 0) {
967                  // Compare types. Let the delete types sort ahead of puts;
968                  // i.e. types
969                  // of higher numbers sort before those of lesser numbers.
970                  // Maximum
971                  // (255)
972                  // appears ahead of everything, and minimum (0) appears
973                  // after
974                  // everything.
975                  comp = (0xff & keyOnlyKV.getTypeByte()) - (0xff & seekCell.getTypeByte());
976                }
977              }
978            }
979          }
980        }
981        if (comp == 0) { // exact match
982          if (seekBefore) {
983            if (!previous.isValid()) {
984              // The caller (seekBefore) has to ensure that we are not at the
985              // first key in the block.
986              throw new IllegalStateException(
987                "Cannot seekBefore if " + "positioned at the first key in the block: key="
988                  + Bytes.toStringBinary(seekCell.getRowArray()));
989            }
990            moveToPrevious();
991            return 1;
992          }
993          return 0;
994        }
995
996        if (comp < 0) { // already too large, check previous
997          if (previous.isValid()) {
998            moveToPrevious();
999          } else {
1000            return HConstants.INDEX_KEY_MAGIC; // using optimized index key
1001          }
1002          return 1;
1003        }
1004
1005        // move to next, if more data is available
1006        if (currentBuffer.hasRemaining()) {
1007          previous.copyFromNext(current);
1008          decodeNext();
1009          current.setKey(current.keyBuffer, current.memstoreTS);
1010        } else {
1011          break;
1012        }
1013      } while (true);
1014
1015      // we hit the end of the block, not an exact match
1016      return 1;
1017    }
1018
1019    private int compareTypeBytes(Cell key, Cell right) {
1020      if (
1021        key.getFamilyLength() + key.getQualifierLength() == 0
1022          && key.getTypeByte() == KeyValue.Type.Minimum.getCode()
1023      ) {
1024        // left is "bigger", i.e. it appears later in the sorted order
1025        return 1;
1026      }
1027      if (
1028        right.getFamilyLength() + right.getQualifierLength() == 0
1029          && right.getTypeByte() == KeyValue.Type.Minimum.getCode()
1030      ) {
1031        return -1;
1032      }
1033      return 0;
1034    }
1035
1036    // These findCommonPrefix* methods rely on the fact that keyOnlyKv is the "right" cell argument
1037    // and always on-heap
1038
1039    private static int findCommonPrefixInRowPart(Cell left, KeyValue.KeyOnlyKeyValue right,
1040      int rowCommonPrefix) {
1041      if (left instanceof ByteBufferExtendedCell) {
1042        ByteBufferExtendedCell bbLeft = (ByteBufferExtendedCell) left;
1043        return ByteBufferUtils.findCommonPrefix(bbLeft.getRowByteBuffer(),
1044          bbLeft.getRowPosition() + rowCommonPrefix, left.getRowLength() - rowCommonPrefix,
1045          right.getRowArray(), right.getRowOffset() + rowCommonPrefix,
1046          right.getRowLength() - rowCommonPrefix);
1047      } else {
1048        return Bytes.findCommonPrefix(left.getRowArray(), right.getRowArray(),
1049          left.getRowLength() - rowCommonPrefix, right.getRowLength() - rowCommonPrefix,
1050          left.getRowOffset() + rowCommonPrefix, right.getRowOffset() + rowCommonPrefix);
1051      }
1052    }
1053
1054    private static int findCommonPrefixInFamilyPart(Cell left, KeyValue.KeyOnlyKeyValue right,
1055      int familyCommonPrefix) {
1056      if (left instanceof ByteBufferExtendedCell) {
1057        ByteBufferExtendedCell bbLeft = (ByteBufferExtendedCell) left;
1058        return ByteBufferUtils.findCommonPrefix(bbLeft.getFamilyByteBuffer(),
1059          bbLeft.getFamilyPosition() + familyCommonPrefix,
1060          left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
1061          right.getFamilyOffset() + familyCommonPrefix,
1062          right.getFamilyLength() - familyCommonPrefix);
1063      } else {
1064        return Bytes.findCommonPrefix(left.getFamilyArray(), right.getFamilyArray(),
1065          left.getFamilyLength() - familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix,
1066          left.getFamilyOffset() + familyCommonPrefix,
1067          right.getFamilyOffset() + familyCommonPrefix);
1068      }
1069    }
1070
1071    private static int findCommonPrefixInQualifierPart(Cell left, KeyValue.KeyOnlyKeyValue right,
1072      int qualifierCommonPrefix) {
1073      if (left instanceof ByteBufferExtendedCell) {
1074        ByteBufferExtendedCell bbLeft = (ByteBufferExtendedCell) left;
1075        return ByteBufferUtils.findCommonPrefix(bbLeft.getQualifierByteBuffer(),
1076          bbLeft.getQualifierPosition() + qualifierCommonPrefix,
1077          left.getQualifierLength() - qualifierCommonPrefix, right.getQualifierArray(),
1078          right.getQualifierOffset() + qualifierCommonPrefix,
1079          right.getQualifierLength() - qualifierCommonPrefix);
1080      } else {
1081        return Bytes.findCommonPrefix(left.getQualifierArray(), right.getQualifierArray(),
1082          left.getQualifierLength() - qualifierCommonPrefix,
1083          right.getQualifierLength() - qualifierCommonPrefix,
1084          left.getQualifierOffset() + qualifierCommonPrefix,
1085          right.getQualifierOffset() + qualifierCommonPrefix);
1086      }
1087    }
1088
1089    private void moveToPrevious() {
1090      if (!previous.isValid()) {
1091        throw new IllegalStateException(
1092          "Can move back only once and not in first key in the block.");
1093      }
1094
1095      STATE tmp = previous;
1096      previous = current;
1097      current = tmp;
1098
1099      // move after last key value
1100      currentBuffer.position(current.nextKvOffset);
1101      // Already decoded the tag bytes. We cache this tags into current state and also the total
1102      // compressed length of the tags bytes. For the next time decodeNext() we don't need to decode
1103      // the tags again. This might pollute the Data Dictionary what we use for the compression.
1104      // When current.uncompressTags is false, we will just reuse the current.tagsBuffer and skip
1105      // 'tagsCompressedLength' bytes of source stream.
1106      // See in decodeTags()
1107      current.tagsBuffer = previous.tagsBuffer;
1108      current.tagsCompressedLength = previous.tagsCompressedLength;
1109      current.uncompressTags = false;
1110      // The current key has to be reset with the previous Cell
1111      current.setKey(current.keyBuffer, current.memstoreTS);
1112      previous.invalidate();
1113    }
1114
1115    @SuppressWarnings("unchecked")
1116    protected STATE createSeekerState() {
1117      // This will fail for non-default seeker state if the subclass does not
1118      // override this method.
1119      return (STATE) new SeekerState(this.tmpPair, this.includesTags());
1120    }
1121
1122    abstract protected void decodeFirst();
1123
1124    abstract protected void decodeNext();
1125  }
1126
1127  /** Returns unencoded size added */
1128  protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
1129    HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
1130    int size = 0;
1131    if (encodingCtx.getHFileContext().isIncludesTags()) {
1132      int tagsLength = cell.getTagsLength();
1133      ByteBufferUtils.putCompressedInt(out, tagsLength);
1134      // There are some tags to be written
1135      if (tagsLength > 0) {
1136        TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
1137        // When tag compression is enabled, tagCompressionContext will have a not null value. Write
1138        // the tags using Dictionary compression in such a case
1139        if (tagCompressionContext != null) {
1140          // Not passing tagsLength considering that parsing of the tagsLength is not costly
1141          PrivateCellUtil.compressTags(out, cell, tagCompressionContext);
1142        } else {
1143          PrivateCellUtil.writeTags(out, cell, tagsLength);
1144        }
1145      }
1146      size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
1147    }
1148    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
1149      // Copy memstore timestamp from the byte buffer to the output stream.
1150      long memstoreTS = cell.getSequenceId();
1151      WritableUtils.writeVLong(out, memstoreTS);
1152      // TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be
1153      // avoided.
1154      size += WritableUtils.getVIntSize(memstoreTS);
1155    }
1156    return size;
1157  }
1158
1159  protected final void afterDecodingKeyValue(DataInputStream source, ByteBuffer dest,
1160    HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
1161    if (decodingCtx.getHFileContext().isIncludesTags()) {
1162      int tagsLength = ByteBufferUtils.readCompressedInt(source);
1163      // Put as unsigned short
1164      dest.put((byte) ((tagsLength >> 8) & 0xff));
1165      dest.put((byte) (tagsLength & 0xff));
1166      if (tagsLength > 0) {
1167        TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
1168        // When tag compression is been used in this file, tagCompressionContext will have a not
1169        // null value passed.
1170        if (tagCompressionContext != null) {
1171          tagCompressionContext.uncompressTags(source, dest, tagsLength);
1172        } else {
1173          ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
1174        }
1175      }
1176    }
1177    if (decodingCtx.getHFileContext().isIncludesMvcc()) {
1178      long memstoreTS = -1;
1179      try {
1180        // Copy memstore timestamp from the data input stream to the byte
1181        // buffer.
1182        memstoreTS = WritableUtils.readVLong(source);
1183        ByteBufferUtils.writeVLong(dest, memstoreTS);
1184      } catch (IOException ex) {
1185        throw new RuntimeException(
1186          "Unable to copy memstore timestamp " + memstoreTS + " after decoding a key/value");
1187      }
1188    }
1189  }
1190
1191  protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
1192    int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
1193    throws IOException;
1194
1195  /**
1196   * Asserts that there is at least the given amount of unfilled space remaining in the given
1197   * buffer.
1198   * @param out    typically, the buffer we are writing to
1199   * @param length the required space in the buffer
1200   * @throws EncoderBufferTooSmallException If there are no enough bytes.
1201   */
1202  protected static void ensureSpace(ByteBuffer out, int length)
1203    throws EncoderBufferTooSmallException {
1204    if (out.position() + length > out.limit()) {
1205      throw new EncoderBufferTooSmallException("Buffer position=" + out.position()
1206        + ", buffer limit=" + out.limit() + ", length to be written=" + length);
1207    }
1208  }
1209
1210  @Override
1211  public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
1212    throws IOException {
1213    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
1214      throw new IOException(this.getClass().getName() + " only accepts "
1215        + HFileBlockDefaultEncodingContext.class.getName() + " as the " + "encoding context.");
1216    }
1217
1218    HFileBlockDefaultEncodingContext encodingCtx =
1219      (HFileBlockDefaultEncodingContext) blkEncodingCtx;
1220    encodingCtx.prepareEncoding(out);
1221    if (
1222      encodingCtx.getHFileContext().isIncludesTags()
1223        && encodingCtx.getHFileContext().isCompressTags()
1224    ) {
1225      if (encodingCtx.getTagCompressionContext() != null) {
1226        // It will be overhead to create the TagCompressionContext again and again for every block
1227        // encoding.
1228        encodingCtx.getTagCompressionContext().clear();
1229      } else {
1230        try {
1231          TagCompressionContext tagCompressionContext =
1232            new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
1233          encodingCtx.setTagCompressionContext(tagCompressionContext);
1234        } catch (Exception e) {
1235          throw new IOException("Failed to initialize TagCompressionContext", e);
1236        }
1237      }
1238    }
1239    StreamUtils.writeInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
1240    blkEncodingCtx.setEncodingState(new EncodingState());
1241  }
1242
1243  @Override
1244  public void encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
1245    throws IOException {
1246    EncodingState state = encodingCtx.getEncodingState();
1247    int posBeforeEncode = out.size();
1248    int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
1249    state.postCellEncode(encodedKvSize, out.size() - posBeforeEncode);
1250  }
1251
1252  public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
1253    DataOutputStream out) throws IOException;
1254
1255  @Override
1256  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
1257    byte[] uncompressedBytesWithHeader) throws IOException {
1258    EncodingState state = encodingCtx.getEncodingState();
1259    // Write the unencodedDataSizeWritten (with header size)
1260    Bytes.putInt(uncompressedBytesWithHeader,
1261      HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE,
1262      state.getUnencodedDataSizeWritten());
1263    postEncoding(encodingCtx);
1264  }
1265
1266}