001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.encoding;
019
020import java.io.DataInputStream;
021import java.io.DataOutputStream;
022import java.io.IOException;
023import java.io.OutputStream;
024import java.nio.ByteBuffer;
025import org.apache.hadoop.hbase.ByteBufferExtendedCell;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.CellComparator;
028import org.apache.hadoop.hbase.CellUtil;
029import org.apache.hadoop.hbase.ExtendedCell;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.KeyValue;
032import org.apache.hadoop.hbase.KeyValueUtil;
033import org.apache.hadoop.hbase.PrivateCellUtil;
034import org.apache.hadoop.hbase.io.TagCompressionContext;
035import org.apache.hadoop.hbase.io.util.LRUDictionary;
036import org.apache.hadoop.hbase.io.util.StreamUtils;
037import org.apache.hadoop.hbase.nio.ByteBuff;
038import org.apache.hadoop.hbase.util.ByteBufferUtils;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.ClassSize;
041import org.apache.hadoop.hbase.util.ObjectIntPair;
042import org.apache.hadoop.io.WritableUtils;
043import org.apache.yetus.audience.InterfaceAudience;
044
045/**
046 * Base class for all data block encoders that use a buffer.
047 */
048@InterfaceAudience.Private
049abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
050  /**
051   * TODO: This datablockencoder is dealing in internals of hfileblocks. Purge reference to HFBs
052   */
053  private static int INITIAL_KEY_BUFFER_SIZE = 512;
054
055  @Override
056  public ByteBuffer decodeKeyValues(DataInputStream source,
057    HFileBlockDecodingContext blkDecodingCtx) throws IOException {
058    if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
059      throw new IOException(this.getClass().getName() + " only accepts "
060        + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
061    }
062
063    HFileBlockDefaultDecodingContext decodingCtx =
064      (HFileBlockDefaultDecodingContext) blkDecodingCtx;
065    if (
066      decodingCtx.getHFileContext().isIncludesTags()
067        && decodingCtx.getHFileContext().isCompressTags()
068    ) {
069      if (decodingCtx.getTagCompressionContext() != null) {
070        // It will be overhead to create the TagCompressionContext again and again for every block
071        // decoding.
072        decodingCtx.getTagCompressionContext().clear();
073      } else {
074        try {
075          TagCompressionContext tagCompressionContext =
076            new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
077          decodingCtx.setTagCompressionContext(tagCompressionContext);
078        } catch (Exception e) {
079          throw new IOException("Failed to initialize TagCompressionContext", e);
080        }
081      }
082    }
083    return internalDecodeKeyValues(source, 0, 0, decodingCtx);
084  }
085
086  /********************* common prefixes *************************/
087  // Having this as static is fine but if META is having DBE then we should
088  // change this.
089  public static int compareCommonRowPrefix(Cell left, Cell right, int rowCommonPrefix) {
090    if (left instanceof ByteBufferExtendedCell) {
091      ByteBufferExtendedCell bbLeft = (ByteBufferExtendedCell) left;
092      if (right instanceof ByteBufferExtendedCell) {
093        ByteBufferExtendedCell bbRight = (ByteBufferExtendedCell) right;
094        return ByteBufferUtils.compareTo(bbLeft.getRowByteBuffer(),
095          bbLeft.getRowPosition() + rowCommonPrefix, left.getRowLength() - rowCommonPrefix,
096          bbRight.getRowByteBuffer(), bbRight.getRowPosition() + rowCommonPrefix,
097          right.getRowLength() - rowCommonPrefix);
098      } else {
099        return ByteBufferUtils.compareTo(bbLeft.getRowByteBuffer(),
100          bbLeft.getRowPosition() + rowCommonPrefix, left.getRowLength() - rowCommonPrefix,
101          right.getRowArray(), right.getRowOffset() + rowCommonPrefix,
102          right.getRowLength() - rowCommonPrefix);
103      }
104    } else {
105      if (right instanceof ByteBufferExtendedCell) {
106        ByteBufferExtendedCell bbRight = (ByteBufferExtendedCell) right;
107        return ByteBufferUtils.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix,
108          left.getRowLength() - rowCommonPrefix, bbRight.getRowByteBuffer(),
109          bbRight.getRowPosition() + rowCommonPrefix, right.getRowLength() - rowCommonPrefix);
110      } else {
111        return Bytes.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix,
112          left.getRowLength() - rowCommonPrefix, right.getRowArray(),
113          right.getRowOffset() + rowCommonPrefix, right.getRowLength() - rowCommonPrefix);
114      }
115    }
116  }
117
118  public static int compareCommonFamilyPrefix(Cell left, Cell right, int familyCommonPrefix) {
119    if (left instanceof ByteBufferExtendedCell) {
120      ByteBufferExtendedCell bbLeft = (ByteBufferExtendedCell) left;
121      if (right instanceof ByteBufferExtendedCell) {
122        ByteBufferExtendedCell bbRight = (ByteBufferExtendedCell) right;
123        return ByteBufferUtils.compareTo(bbLeft.getFamilyByteBuffer(),
124          bbLeft.getFamilyPosition() + familyCommonPrefix,
125          left.getFamilyLength() - familyCommonPrefix, bbRight.getFamilyByteBuffer(),
126          bbRight.getFamilyPosition() + familyCommonPrefix,
127          right.getFamilyLength() - familyCommonPrefix);
128      } else {
129        return ByteBufferUtils.compareTo(bbLeft.getFamilyByteBuffer(),
130          bbLeft.getFamilyPosition() + familyCommonPrefix,
131          left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
132          right.getFamilyOffset() + familyCommonPrefix,
133          right.getFamilyLength() - familyCommonPrefix);
134      }
135    } else {
136      if (right instanceof ByteBufferExtendedCell) {
137        ByteBufferExtendedCell bbRight = (ByteBufferExtendedCell) right;
138        return ByteBufferUtils.compareTo(left.getFamilyArray(),
139          left.getFamilyOffset() + familyCommonPrefix, left.getFamilyLength() - familyCommonPrefix,
140          bbRight.getFamilyByteBuffer(), bbRight.getFamilyPosition() + familyCommonPrefix,
141          right.getFamilyLength() - familyCommonPrefix);
142      } else {
143        return Bytes.compareTo(left.getFamilyArray(), left.getFamilyOffset() + familyCommonPrefix,
144          left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
145          right.getFamilyOffset() + familyCommonPrefix,
146          right.getFamilyLength() - familyCommonPrefix);
147      }
148    }
149  }
150
151  public static int compareCommonQualifierPrefix(Cell left, Cell right, int qualCommonPrefix) {
152    if (left instanceof ByteBufferExtendedCell) {
153      ByteBufferExtendedCell bbLeft = (ByteBufferExtendedCell) left;
154      if (right instanceof ByteBufferExtendedCell) {
155        ByteBufferExtendedCell bbRight = (ByteBufferExtendedCell) right;
156        return ByteBufferUtils.compareTo(bbLeft.getQualifierByteBuffer(),
157          bbLeft.getQualifierPosition() + qualCommonPrefix,
158          left.getQualifierLength() - qualCommonPrefix, bbRight.getQualifierByteBuffer(),
159          bbRight.getQualifierPosition() + qualCommonPrefix,
160          right.getQualifierLength() - qualCommonPrefix);
161      } else {
162        return ByteBufferUtils.compareTo(bbLeft.getQualifierByteBuffer(),
163          bbLeft.getQualifierPosition() + qualCommonPrefix,
164          left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(),
165          right.getQualifierOffset() + qualCommonPrefix,
166          right.getQualifierLength() - qualCommonPrefix);
167      }
168    } else {
169      if (right instanceof ByteBufferExtendedCell) {
170        ByteBufferExtendedCell bbRight = (ByteBufferExtendedCell) right;
171        return ByteBufferUtils.compareTo(left.getQualifierArray(),
172          left.getQualifierOffset() + qualCommonPrefix,
173          left.getQualifierLength() - qualCommonPrefix, bbRight.getQualifierByteBuffer(),
174          bbRight.getQualifierPosition() + qualCommonPrefix,
175          right.getQualifierLength() - qualCommonPrefix);
176      } else {
177        return Bytes.compareTo(left.getQualifierArray(),
178          left.getQualifierOffset() + qualCommonPrefix,
179          left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(),
180          right.getQualifierOffset() + qualCommonPrefix,
181          right.getQualifierLength() - qualCommonPrefix);
182      }
183    }
184  }
185
186  protected static class SeekerState {
187    protected ByteBuff currentBuffer;
188    protected TagCompressionContext tagCompressionContext;
189    protected int valueOffset = -1;
190    protected int keyLength;
191    protected int valueLength;
192    protected int lastCommonPrefix;
193    protected int tagsLength = 0;
194    protected int tagsOffset = -1;
195    protected int tagsCompressedLength = 0;
196    protected boolean uncompressTags = true;
197
198    /** We need to store a copy of the key. */
199    protected byte[] keyBuffer = HConstants.EMPTY_BYTE_ARRAY;
200    protected byte[] tagsBuffer = HConstants.EMPTY_BYTE_ARRAY;
201
202    protected long memstoreTS;
203    protected int nextKvOffset;
204    protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
205    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
206    // many object creations.
207    private final ObjectIntPair<ByteBuffer> tmpPair;
208    private final boolean includeTags;
209
210    public SeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) {
211      this.tmpPair = tmpPair;
212      this.includeTags = includeTags;
213    }
214
215    protected boolean isValid() {
216      return valueOffset != -1;
217    }
218
219    protected void invalidate() {
220      valueOffset = -1;
221      tagsCompressedLength = 0;
222      currentKey.clear();
223      uncompressTags = true;
224      currentBuffer = null;
225    }
226
227    protected void ensureSpaceForKey() {
228      if (keyLength > keyBuffer.length) {
229        int newKeyBufferLength =
230          Integer.highestOneBit(Math.max(INITIAL_KEY_BUFFER_SIZE, keyLength) - 1) << 1;
231        byte[] newKeyBuffer = new byte[newKeyBufferLength];
232        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
233        keyBuffer = newKeyBuffer;
234      }
235    }
236
237    protected void ensureSpaceForTags() {
238      if (tagsLength > tagsBuffer.length) {
239        int newTagsBufferLength =
240          Integer.highestOneBit(Math.max(INITIAL_KEY_BUFFER_SIZE, tagsLength) - 1) << 1;
241        byte[] newTagsBuffer = new byte[newTagsBufferLength];
242        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
243        tagsBuffer = newTagsBuffer;
244      }
245    }
246
247    protected void setKey(byte[] keyBuffer, long memTS) {
248      currentKey.setKey(keyBuffer, 0, keyLength);
249      memstoreTS = memTS;
250    }
251
252    /**
253     * Copy the state from the next one into this instance (the previous state placeholder). Used to
254     * save the previous state when we are advancing the seeker to the next key/value.
255     */
256    protected void copyFromNext(SeekerState nextState) {
257      if (keyBuffer.length != nextState.keyBuffer.length) {
258        keyBuffer = nextState.keyBuffer.clone();
259      } else if (!isValid()) {
260        // Note: we can only call isValid before we override our state, so this
261        // comes before all the assignments at the end of this method.
262        System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0, nextState.keyLength);
263      } else {
264        // don't copy the common prefix between this key and the previous one
265        System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix, keyBuffer,
266          nextState.lastCommonPrefix, nextState.keyLength - nextState.lastCommonPrefix);
267      }
268      currentKey.set(nextState.currentKey);
269
270      valueOffset = nextState.valueOffset;
271      keyLength = nextState.keyLength;
272      valueLength = nextState.valueLength;
273      lastCommonPrefix = nextState.lastCommonPrefix;
274      nextKvOffset = nextState.nextKvOffset;
275      memstoreTS = nextState.memstoreTS;
276      currentBuffer = nextState.currentBuffer;
277      tagsOffset = nextState.tagsOffset;
278      tagsLength = nextState.tagsLength;
279      if (nextState.tagCompressionContext != null) {
280        tagCompressionContext = nextState.tagCompressionContext;
281      }
282    }
283
284    public Cell toCell() {
285      // Buffer backing the value and tags part from the HFileBlock's buffer
286      // When tag compression in use, this will be only the value bytes area.
287      ByteBuffer valAndTagsBuffer;
288      int vOffset;
289      int valAndTagsLength = this.valueLength;
290      int tagsLenSerializationSize = 0;
291      if (this.includeTags && this.tagCompressionContext == null) {
292        // Include the tags part also. This will be the tags bytes + 2 bytes of for storing tags
293        // length
294        tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + this.valueLength);
295        valAndTagsLength += tagsLenSerializationSize + this.tagsLength;
296      }
297      this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair);
298      valAndTagsBuffer = this.tmpPair.getFirst();
299      vOffset = this.tmpPair.getSecond();// This is the offset to value part in the BB
300      if (valAndTagsBuffer.hasArray()) {
301        return toOnheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
302      } else {
303        return toOffheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
304      }
305    }
306
307    private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
308      int tagsLenSerializationSize) {
309      byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY;
310      int tOffset = 0;
311      if (this.includeTags) {
312        if (this.tagCompressionContext == null) {
313          tagsArray = valAndTagsBuffer.array();
314          tOffset =
315            valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength + tagsLenSerializationSize;
316        } else {
317          tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
318          tOffset = 0;
319        }
320      }
321      return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
322        currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
323        currentKey.getQualifierOffset(), currentKey.getQualifierLength(), currentKey.getTimestamp(),
324        currentKey.getTypeByte(), valAndTagsBuffer.array(),
325        valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength, memstoreTS, tagsArray, tOffset,
326        this.tagsLength);
327    }
328
329    private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
330      int tagsLenSerializationSize) {
331      ByteBuffer tagsBuf = HConstants.EMPTY_BYTE_BUFFER;
332      int tOffset = 0;
333      if (this.includeTags) {
334        if (this.tagCompressionContext == null) {
335          tagsBuf = valAndTagsBuffer;
336          tOffset = vOffset + this.valueLength + tagsLenSerializationSize;
337        } else {
338          tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength));
339          tOffset = 0;
340        }
341      }
342      return new OffheapDecodedExtendedCell(
343        ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)), currentKey.getRowLength(),
344        currentKey.getFamilyOffset(), currentKey.getFamilyLength(), currentKey.getQualifierOffset(),
345        currentKey.getQualifierLength(), currentKey.getTimestamp(), currentKey.getTypeByte(),
346        valAndTagsBuffer, vOffset, this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
347    }
348  }
349
350  /**
351   * Copies only the key part of the keybuffer by doing a deep copy and passes the seeker state
352   * members for taking a clone. Note that the value byte[] part is still pointing to the
353   * currentBuffer and represented by the valueOffset and valueLength
354   */
355  // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
356  // there. So this has to be an instance of ExtendedCell.
357  protected static class OnheapDecodedCell implements ExtendedCell {
358    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
359      + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
360      + Bytes.SIZEOF_SHORT + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY));
361    private byte[] keyOnlyBuffer;
362    private short rowLength;
363    private int familyOffset;
364    private byte familyLength;
365    private int qualifierOffset;
366    private int qualifierLength;
367    private long timeStamp;
368    private byte typeByte;
369    private byte[] valueBuffer;
370    private int valueOffset;
371    private int valueLength;
372    private byte[] tagsBuffer;
373    private int tagsOffset;
374    private int tagsLength;
375    private long seqId;
376
377    protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset,
378      byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
379      byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] tagsBuffer,
380      int tagsOffset, int tagsLength) {
381      this.keyOnlyBuffer = keyBuffer;
382      this.rowLength = rowLength;
383      this.familyOffset = familyOffset;
384      this.familyLength = familyLength;
385      this.qualifierOffset = qualOffset;
386      this.qualifierLength = qualLength;
387      this.timeStamp = timeStamp;
388      this.typeByte = typeByte;
389      this.valueBuffer = valueBuffer;
390      this.valueOffset = valueOffset;
391      this.valueLength = valueLen;
392      this.tagsBuffer = tagsBuffer;
393      this.tagsOffset = tagsOffset;
394      this.tagsLength = tagsLength;
395      setSequenceId(seqId);
396    }
397
398    @Override
399    public byte[] getRowArray() {
400      return keyOnlyBuffer;
401    }
402
403    @Override
404    public byte[] getFamilyArray() {
405      return keyOnlyBuffer;
406    }
407
408    @Override
409    public byte[] getQualifierArray() {
410      return keyOnlyBuffer;
411    }
412
413    @Override
414    public int getRowOffset() {
415      return Bytes.SIZEOF_SHORT;
416    }
417
418    @Override
419    public short getRowLength() {
420      return rowLength;
421    }
422
423    @Override
424    public int getFamilyOffset() {
425      return familyOffset;
426    }
427
428    @Override
429    public byte getFamilyLength() {
430      return familyLength;
431    }
432
433    @Override
434    public int getQualifierOffset() {
435      return qualifierOffset;
436    }
437
438    @Override
439    public int getQualifierLength() {
440      return qualifierLength;
441    }
442
443    @Override
444    public long getTimestamp() {
445      return timeStamp;
446    }
447
448    @Override
449    public byte getTypeByte() {
450      return typeByte;
451    }
452
453    @Override
454    public long getSequenceId() {
455      return seqId;
456    }
457
458    @Override
459    public byte[] getValueArray() {
460      return this.valueBuffer;
461    }
462
463    @Override
464    public int getValueOffset() {
465      return valueOffset;
466    }
467
468    @Override
469    public int getValueLength() {
470      return valueLength;
471    }
472
473    @Override
474    public byte[] getTagsArray() {
475      return this.tagsBuffer;
476    }
477
478    @Override
479    public int getTagsOffset() {
480      return this.tagsOffset;
481    }
482
483    @Override
484    public int getTagsLength() {
485      return tagsLength;
486    }
487
488    @Override
489    public String toString() {
490      return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
491        + getValueLength() + "/seqid=" + seqId;
492    }
493
494    @Override
495    public void setSequenceId(long seqId) {
496      this.seqId = seqId;
497    }
498
499    @Override
500    public long heapSize() {
501      return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
502    }
503
504    @Override
505    public int write(OutputStream out, boolean withTags) throws IOException {
506      int lenToWrite = getSerializedSize(withTags);
507      ByteBufferUtils.putInt(out, keyOnlyBuffer.length);
508      ByteBufferUtils.putInt(out, valueLength);
509      // Write key
510      out.write(keyOnlyBuffer);
511      // Write value
512      out.write(this.valueBuffer, this.valueOffset, this.valueLength);
513      if (withTags && this.tagsLength > 0) {
514        // 2 bytes tags length followed by tags bytes
515        // tags length is serialized with 2 bytes only(short way) even if the type is int.
516        // As this is non -ve numbers, we save the sign bit. See HBASE-11437
517        out.write((byte) (0xff & (this.tagsLength >> 8)));
518        out.write((byte) (0xff & this.tagsLength));
519        out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
520      }
521      return lenToWrite;
522    }
523
524    @Override
525    public int getSerializedSize(boolean withTags) {
526      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
527        withTags);
528    }
529
530    @Override
531    public void write(ByteBuffer buf, int offset) {
532      // This is not used in actual flow. Throwing UnsupportedOperationException
533      throw new UnsupportedOperationException();
534    }
535
536    @Override
537    public void setTimestamp(long ts) throws IOException {
538      // This is not used in actual flow. Throwing UnsupportedOperationException
539      throw new UnsupportedOperationException();
540    }
541
542    @Override
543    public void setTimestamp(byte[] ts) throws IOException {
544      // This is not used in actual flow. Throwing UnsupportedOperationException
545      throw new UnsupportedOperationException();
546    }
547
548    @Override
549    public ExtendedCell deepClone() {
550      // This is not used in actual flow. Throwing UnsupportedOperationException
551      throw new UnsupportedOperationException();
552    }
553  }
554
555  protected static class OffheapDecodedExtendedCell extends ByteBufferExtendedCell {
556    private static final long FIXED_OVERHEAD =
557      (long) ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)
558        + (7 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE)
559        + (3 * ClassSize.BYTE_BUFFER);
560    private ByteBuffer keyBuffer;
561    private short rowLength;
562    private int familyOffset;
563    private byte familyLength;
564    private int qualifierOffset;
565    private int qualifierLength;
566    private long timeStamp;
567    private byte typeByte;
568    private ByteBuffer valueBuffer;
569    private int valueOffset;
570    private int valueLength;
571    private ByteBuffer tagsBuffer;
572    private int tagsOffset;
573    private int tagsLength;
574    private long seqId;
575
576    protected OffheapDecodedExtendedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset,
577      byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
578      ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer,
579      int tagsOffset, int tagsLength) {
580      // The keyBuffer is always onheap
581      assert keyBuffer.hasArray();
582      assert keyBuffer.arrayOffset() == 0;
583      this.keyBuffer = keyBuffer;
584      this.rowLength = rowLength;
585      this.familyOffset = familyOffset;
586      this.familyLength = familyLength;
587      this.qualifierOffset = qualOffset;
588      this.qualifierLength = qualLength;
589      this.timeStamp = timeStamp;
590      this.typeByte = typeByte;
591      this.valueBuffer = valueBuffer;
592      this.valueOffset = valueOffset;
593      this.valueLength = valueLen;
594      this.tagsBuffer = tagsBuffer;
595      this.tagsOffset = tagsOffset;
596      this.tagsLength = tagsLength;
597      setSequenceId(seqId);
598    }
599
600    @Override
601    @SuppressWarnings("ByteBufferBackingArray")
602    public byte[] getRowArray() {
603      return this.keyBuffer.array();
604    }
605
606    @Override
607    public int getRowOffset() {
608      return getRowPosition();
609    }
610
611    @Override
612    public short getRowLength() {
613      return this.rowLength;
614    }
615
616    @Override
617    @SuppressWarnings("ByteBufferBackingArray")
618    public byte[] getFamilyArray() {
619      return this.keyBuffer.array();
620    }
621
622    @Override
623    public int getFamilyOffset() {
624      return getFamilyPosition();
625    }
626
627    @Override
628    public byte getFamilyLength() {
629      return this.familyLength;
630    }
631
632    @Override
633    @SuppressWarnings("ByteBufferBackingArray")
634    public byte[] getQualifierArray() {
635      return this.keyBuffer.array();
636    }
637
638    @Override
639    public int getQualifierOffset() {
640      return getQualifierPosition();
641    }
642
643    @Override
644    public int getQualifierLength() {
645      return this.qualifierLength;
646    }
647
648    @Override
649    public long getTimestamp() {
650      return this.timeStamp;
651    }
652
653    @Override
654    public byte getTypeByte() {
655      return this.typeByte;
656    }
657
658    @Override
659    public long getSequenceId() {
660      return this.seqId;
661    }
662
663    @Override
664    public byte[] getValueArray() {
665      return CellUtil.cloneValue(this);
666    }
667
668    @Override
669    public int getValueOffset() {
670      return 0;
671    }
672
673    @Override
674    public int getValueLength() {
675      return this.valueLength;
676    }
677
678    @Override
679    public byte[] getTagsArray() {
680      return PrivateCellUtil.cloneTags(this);
681    }
682
683    @Override
684    public int getTagsOffset() {
685      return 0;
686    }
687
688    @Override
689    public int getTagsLength() {
690      return this.tagsLength;
691    }
692
693    @Override
694    public ByteBuffer getRowByteBuffer() {
695      return this.keyBuffer;
696    }
697
698    @Override
699    public int getRowPosition() {
700      return Bytes.SIZEOF_SHORT;
701    }
702
703    @Override
704    public ByteBuffer getFamilyByteBuffer() {
705      return this.keyBuffer;
706    }
707
708    @Override
709    public int getFamilyPosition() {
710      return this.familyOffset;
711    }
712
713    @Override
714    public ByteBuffer getQualifierByteBuffer() {
715      return this.keyBuffer;
716    }
717
718    @Override
719    public int getQualifierPosition() {
720      return this.qualifierOffset;
721    }
722
723    @Override
724    public ByteBuffer getValueByteBuffer() {
725      return this.valueBuffer;
726    }
727
728    @Override
729    public int getValuePosition() {
730      return this.valueOffset;
731    }
732
733    @Override
734    public ByteBuffer getTagsByteBuffer() {
735      return this.tagsBuffer;
736    }
737
738    @Override
739    public int getTagsPosition() {
740      return this.tagsOffset;
741    }
742
743    @Override
744    public long heapSize() {
745      return FIXED_OVERHEAD;
746    }
747
748    @Override
749    public void setSequenceId(long seqId) {
750      this.seqId = seqId;
751    }
752
753    @Override
754    public int write(OutputStream out, boolean withTags) throws IOException {
755      int lenToWrite = getSerializedSize(withTags);
756      ByteBufferUtils.putInt(out, keyBuffer.remaining());
757      ByteBufferUtils.putInt(out, valueLength);
758      // Write key
759      out.write(keyBuffer.array(), keyBuffer.arrayOffset(), keyBuffer.remaining());
760      // Write value
761      ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset, this.valueLength);
762      if (withTags && this.tagsLength > 0) {
763        // 2 bytes tags length followed by tags bytes
764        // tags length is serialized with 2 bytes only(short way) even if the type is int.
765        // As this is non -ve numbers, we save the sign bit. See HBASE-11437
766        out.write((byte) (0xff & (this.tagsLength >> 8)));
767        out.write((byte) (0xff & this.tagsLength));
768        ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
769      }
770      return lenToWrite;
771    }
772
773    @Override
774    public int getSerializedSize(boolean withTags) {
775      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
776        withTags);
777    }
778
779    @Override
780    public void setTimestamp(long ts) throws IOException {
781      // This is not used in actual flow. Throwing UnsupportedOperationException
782      throw new UnsupportedOperationException();
783    }
784
785    @Override
786    public void setTimestamp(byte[] ts) throws IOException {
787      // This is not used in actual flow. Throwing UnsupportedOperationException
788      throw new UnsupportedOperationException();
789    }
790
791    @Override
792    public void write(ByteBuffer buf, int offset) {
793      // This is not used in actual flow. Throwing UnsupportedOperationException
794      throw new UnsupportedOperationException();
795    }
796
797    @Override
798    public ExtendedCell deepClone() {
799      // This is not used in actual flow. Throwing UnsupportedOperationException
800      throw new UnsupportedOperationException();
801    }
802  }
803
804  protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
805    extends AbstractEncodedSeeker {
806    protected ByteBuff currentBuffer;
807    protected TagCompressionContext tagCompressionContext = null;
808    protected KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
809    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
810    // many object creations.
811    protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>();
812    protected STATE current, previous;
813
814    public BufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
815      super(decodingCtx);
816      if (decodingCtx.getHFileContext().isCompressTags()) {
817        try {
818          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
819        } catch (Exception e) {
820          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
821        }
822      }
823      current = createSeekerState(); // always valid
824      previous = createSeekerState(); // may not be valid
825    }
826
827    @Override
828    public int compareKey(CellComparator comparator, Cell key) {
829      keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
830      return PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, keyOnlyKV);
831    }
832
833    @Override
834    public void setCurrentBuffer(ByteBuff buffer) {
835      if (this.tagCompressionContext != null) {
836        this.tagCompressionContext.clear();
837
838        // Prior seekToKeyInBlock may have reset this to false if we fell back to previous
839        // seeker state. This is an optimization so we don't have to uncompress tags again when
840        // reading last state.
841        // In seekBefore flow, if block change happens then rewind is not called and
842        // setCurrentBuffer is called, so need to uncompress any tags we see.
843        current.uncompressTags = true;
844      }
845      currentBuffer = buffer;
846      current.currentBuffer = currentBuffer;
847      if (tagCompressionContext != null) {
848        current.tagCompressionContext = tagCompressionContext;
849      }
850      decodeFirst();
851      current.setKey(current.keyBuffer, current.memstoreTS);
852      previous.invalidate();
853    }
854
855    @Override
856    public Cell getKey() {
857      byte[] key = new byte[current.keyLength];
858      System.arraycopy(current.keyBuffer, 0, key, 0, current.keyLength);
859      return new KeyValue.KeyOnlyKeyValue(key);
860    }
861
862    @Override
863    public ByteBuffer getValueShallowCopy() {
864      currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
865      ByteBuffer dup = tmpPair.getFirst().duplicate();
866      dup.position(tmpPair.getSecond());
867      dup.limit(tmpPair.getSecond() + current.valueLength);
868      return dup.slice();
869    }
870
871    @Override
872    public Cell getCell() {
873      return current.toCell();
874    }
875
876    @Override
877    public void rewind() {
878      currentBuffer.rewind();
879      if (tagCompressionContext != null) {
880        tagCompressionContext.clear();
881        // Prior seekToKeyInBlock may have reset this to false if we fell back to previous
882        // seeker state. This is an optimization so we don't have to uncompress tags again when
883        // reading last state.
884        // In case of rewind, we are starting from the beginning of the buffer, so we need
885        // to uncompress any tags we see.
886        current.uncompressTags = true;
887      }
888      decodeFirst();
889      current.setKey(current.keyBuffer, current.memstoreTS);
890      previous.invalidate();
891    }
892
893    @Override
894    public boolean next() {
895      if (!currentBuffer.hasRemaining()) {
896        return false;
897      }
898      decodeNext();
899      current.setKey(current.keyBuffer, current.memstoreTS);
900      previous.invalidate();
901      return true;
902    }
903
904    protected void decodeTags() {
905      current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
906      if (tagCompressionContext != null) {
907        if (current.uncompressTags) {
908          // Tag compression is been used. uncompress it into tagsBuffer
909          current.ensureSpaceForTags();
910          try {
911            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
912              current.tagsBuffer, 0, current.tagsLength);
913          } catch (Exception e) {
914            throw new RuntimeException("Exception while uncompressing tags", e);
915          }
916        } else {
917          currentBuffer.skip(current.tagsCompressedLength);
918          current.uncompressTags = true;// Reset this.
919        }
920        current.tagsOffset = -1;
921      } else {
922        // When tag compress is not used, let us not do copying of tags bytes into tagsBuffer.
923        // Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer()
924        current.tagsOffset = currentBuffer.position();
925        currentBuffer.skip(current.tagsLength);
926      }
927    }
928
929    @Override
930    public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
931      int rowCommonPrefix = 0;
932      int familyCommonPrefix = 0;
933      int qualCommonPrefix = 0;
934      previous.invalidate();
935      do {
936        int comp;
937        keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
938        if (current.lastCommonPrefix != 0) {
939          // The KV format has row key length also in the byte array. The
940          // common prefix
941          // includes it. So we need to subtract to find out the common prefix
942          // in the
943          // row part alone
944          rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
945        }
946        if (current.lastCommonPrefix <= 2) {
947          rowCommonPrefix = 0;
948        }
949        rowCommonPrefix += findCommonPrefixInRowPart(seekCell, keyOnlyKV, rowCommonPrefix);
950        comp = compareCommonRowPrefix(seekCell, keyOnlyKV, rowCommonPrefix);
951        if (comp == 0) {
952          comp = compareTypeBytes(seekCell, keyOnlyKV);
953          if (comp == 0) {
954            // Subtract the fixed row key length and the family key fixed length
955            familyCommonPrefix = Math.max(0, Math.min(familyCommonPrefix,
956              current.lastCommonPrefix - (3 + keyOnlyKV.getRowLength())));
957            familyCommonPrefix +=
958              findCommonPrefixInFamilyPart(seekCell, keyOnlyKV, familyCommonPrefix);
959            comp = compareCommonFamilyPrefix(seekCell, keyOnlyKV, familyCommonPrefix);
960            if (comp == 0) {
961              // subtract the rowkey fixed length and the family key fixed
962              // length
963              qualCommonPrefix = Math.max(0, Math.min(qualCommonPrefix, current.lastCommonPrefix
964                - (3 + keyOnlyKV.getRowLength() + keyOnlyKV.getFamilyLength())));
965              qualCommonPrefix +=
966                findCommonPrefixInQualifierPart(seekCell, keyOnlyKV, qualCommonPrefix);
967              comp = compareCommonQualifierPrefix(seekCell, keyOnlyKV, qualCommonPrefix);
968              if (comp == 0) {
969                comp = CellComparator.getInstance().compareTimestamps(seekCell, keyOnlyKV);
970                if (comp == 0) {
971                  // Compare types. Let the delete types sort ahead of puts;
972                  // i.e. types
973                  // of higher numbers sort before those of lesser numbers.
974                  // Maximum
975                  // (255)
976                  // appears ahead of everything, and minimum (0) appears
977                  // after
978                  // everything.
979                  comp = (0xff & keyOnlyKV.getTypeByte()) - (0xff & seekCell.getTypeByte());
980                }
981              }
982            }
983          }
984        }
985        if (comp == 0) { // exact match
986          if (seekBefore) {
987            if (!previous.isValid()) {
988              // The caller (seekBefore) has to ensure that we are not at the
989              // first key in the block.
990              throw new IllegalStateException(
991                "Cannot seekBefore if " + "positioned at the first key in the block: key="
992                  + Bytes.toStringBinary(seekCell.getRowArray()));
993            }
994            moveToPrevious();
995            return 1;
996          }
997          return 0;
998        }
999
1000        if (comp < 0) { // already too large, check previous
1001          if (previous.isValid()) {
1002            moveToPrevious();
1003          } else {
1004            return HConstants.INDEX_KEY_MAGIC; // using optimized index key
1005          }
1006          return 1;
1007        }
1008
1009        // move to next, if more data is available
1010        if (currentBuffer.hasRemaining()) {
1011          previous.copyFromNext(current);
1012          decodeNext();
1013          current.setKey(current.keyBuffer, current.memstoreTS);
1014        } else {
1015          break;
1016        }
1017      } while (true);
1018
1019      // we hit the end of the block, not an exact match
1020      return 1;
1021    }
1022
1023    private int compareTypeBytes(Cell key, Cell right) {
1024      if (
1025        key.getFamilyLength() + key.getQualifierLength() == 0
1026          && key.getTypeByte() == KeyValue.Type.Minimum.getCode()
1027      ) {
1028        // left is "bigger", i.e. it appears later in the sorted order
1029        return 1;
1030      }
1031      if (
1032        right.getFamilyLength() + right.getQualifierLength() == 0
1033          && right.getTypeByte() == KeyValue.Type.Minimum.getCode()
1034      ) {
1035        return -1;
1036      }
1037      return 0;
1038    }
1039
1040    // These findCommonPrefix* methods rely on the fact that keyOnlyKv is the "right" cell argument
1041    // and always on-heap
1042
1043    private static int findCommonPrefixInRowPart(Cell left, KeyValue.KeyOnlyKeyValue right,
1044      int rowCommonPrefix) {
1045      if (left instanceof ByteBufferExtendedCell) {
1046        ByteBufferExtendedCell bbLeft = (ByteBufferExtendedCell) left;
1047        return ByteBufferUtils.findCommonPrefix(bbLeft.getRowByteBuffer(),
1048          bbLeft.getRowPosition() + rowCommonPrefix, left.getRowLength() - rowCommonPrefix,
1049          right.getRowArray(), right.getRowOffset() + rowCommonPrefix,
1050          right.getRowLength() - rowCommonPrefix);
1051      } else {
1052        return Bytes.findCommonPrefix(left.getRowArray(), right.getRowArray(),
1053          left.getRowLength() - rowCommonPrefix, right.getRowLength() - rowCommonPrefix,
1054          left.getRowOffset() + rowCommonPrefix, right.getRowOffset() + rowCommonPrefix);
1055      }
1056    }
1057
1058    private static int findCommonPrefixInFamilyPart(Cell left, KeyValue.KeyOnlyKeyValue right,
1059      int familyCommonPrefix) {
1060      if (left instanceof ByteBufferExtendedCell) {
1061        ByteBufferExtendedCell bbLeft = (ByteBufferExtendedCell) left;
1062        return ByteBufferUtils.findCommonPrefix(bbLeft.getFamilyByteBuffer(),
1063          bbLeft.getFamilyPosition() + familyCommonPrefix,
1064          left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
1065          right.getFamilyOffset() + familyCommonPrefix,
1066          right.getFamilyLength() - familyCommonPrefix);
1067      } else {
1068        return Bytes.findCommonPrefix(left.getFamilyArray(), right.getFamilyArray(),
1069          left.getFamilyLength() - familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix,
1070          left.getFamilyOffset() + familyCommonPrefix,
1071          right.getFamilyOffset() + familyCommonPrefix);
1072      }
1073    }
1074
1075    private static int findCommonPrefixInQualifierPart(Cell left, KeyValue.KeyOnlyKeyValue right,
1076      int qualifierCommonPrefix) {
1077      if (left instanceof ByteBufferExtendedCell) {
1078        ByteBufferExtendedCell bbLeft = (ByteBufferExtendedCell) left;
1079        return ByteBufferUtils.findCommonPrefix(bbLeft.getQualifierByteBuffer(),
1080          bbLeft.getQualifierPosition() + qualifierCommonPrefix,
1081          left.getQualifierLength() - qualifierCommonPrefix, right.getQualifierArray(),
1082          right.getQualifierOffset() + qualifierCommonPrefix,
1083          right.getQualifierLength() - qualifierCommonPrefix);
1084      } else {
1085        return Bytes.findCommonPrefix(left.getQualifierArray(), right.getQualifierArray(),
1086          left.getQualifierLength() - qualifierCommonPrefix,
1087          right.getQualifierLength() - qualifierCommonPrefix,
1088          left.getQualifierOffset() + qualifierCommonPrefix,
1089          right.getQualifierOffset() + qualifierCommonPrefix);
1090      }
1091    }
1092
1093    private void moveToPrevious() {
1094      if (!previous.isValid()) {
1095        throw new IllegalStateException(
1096          "Can move back only once and not in first key in the block.");
1097      }
1098
1099      STATE tmp = previous;
1100      previous = current;
1101      current = tmp;
1102
1103      // move after last key value
1104      currentBuffer.position(current.nextKvOffset);
1105      // Already decoded the tag bytes. We cache this tags into current state and also the total
1106      // compressed length of the tags bytes. For the next time decodeNext() we don't need to decode
1107      // the tags again. This might pollute the Data Dictionary what we use for the compression.
1108      // When current.uncompressTags is false, we will just reuse the current.tagsBuffer and skip
1109      // 'tagsCompressedLength' bytes of source stream.
1110      // See in decodeTags()
1111      current.tagsBuffer = previous.tagsBuffer;
1112      current.tagsCompressedLength = previous.tagsCompressedLength;
1113      current.uncompressTags = false;
1114      // The current key has to be reset with the previous Cell
1115      current.setKey(current.keyBuffer, current.memstoreTS);
1116      previous.invalidate();
1117    }
1118
1119    @SuppressWarnings("unchecked")
1120    protected STATE createSeekerState() {
1121      // This will fail for non-default seeker state if the subclass does not
1122      // override this method.
1123      return (STATE) new SeekerState(this.tmpPair, this.includesTags());
1124    }
1125
1126    abstract protected void decodeFirst();
1127
1128    abstract protected void decodeNext();
1129  }
1130
1131  /** Returns unencoded size added */
1132  protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
1133    HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
1134    int size = 0;
1135    if (encodingCtx.getHFileContext().isIncludesTags()) {
1136      int tagsLength = cell.getTagsLength();
1137      ByteBufferUtils.putCompressedInt(out, tagsLength);
1138      // There are some tags to be written
1139      if (tagsLength > 0) {
1140        TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
1141        // When tag compression is enabled, tagCompressionContext will have a not null value. Write
1142        // the tags using Dictionary compression in such a case
1143        if (tagCompressionContext != null) {
1144          // Not passing tagsLength considering that parsing of the tagsLength is not costly
1145          PrivateCellUtil.compressTags(out, cell, tagCompressionContext);
1146        } else {
1147          PrivateCellUtil.writeTags(out, cell, tagsLength);
1148        }
1149      }
1150      size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
1151    }
1152    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
1153      // Copy memstore timestamp from the byte buffer to the output stream.
1154      long memstoreTS = cell.getSequenceId();
1155      WritableUtils.writeVLong(out, memstoreTS);
1156      // TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be
1157      // avoided.
1158      size += WritableUtils.getVIntSize(memstoreTS);
1159    }
1160    return size;
1161  }
1162
1163  protected final void afterDecodingKeyValue(DataInputStream source, ByteBuffer dest,
1164    HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
1165    if (decodingCtx.getHFileContext().isIncludesTags()) {
1166      int tagsLength = ByteBufferUtils.readCompressedInt(source);
1167      // Put as unsigned short
1168      dest.put((byte) ((tagsLength >> 8) & 0xff));
1169      dest.put((byte) (tagsLength & 0xff));
1170      if (tagsLength > 0) {
1171        TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
1172        // When tag compression is been used in this file, tagCompressionContext will have a not
1173        // null value passed.
1174        if (tagCompressionContext != null) {
1175          tagCompressionContext.uncompressTags(source, dest, tagsLength);
1176        } else {
1177          ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
1178        }
1179      }
1180    }
1181    if (decodingCtx.getHFileContext().isIncludesMvcc()) {
1182      long memstoreTS = -1;
1183      try {
1184        // Copy memstore timestamp from the data input stream to the byte
1185        // buffer.
1186        memstoreTS = WritableUtils.readVLong(source);
1187        ByteBufferUtils.writeVLong(dest, memstoreTS);
1188      } catch (IOException ex) {
1189        throw new RuntimeException(
1190          "Unable to copy memstore timestamp " + memstoreTS + " after decoding a key/value");
1191      }
1192    }
1193  }
1194
1195  protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
1196    int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
1197    throws IOException;
1198
1199  /**
1200   * Asserts that there is at least the given amount of unfilled space remaining in the given
1201   * buffer.
1202   * @param out    typically, the buffer we are writing to
1203   * @param length the required space in the buffer
1204   * @throws EncoderBufferTooSmallException If there are no enough bytes.
1205   */
1206  protected static void ensureSpace(ByteBuffer out, int length)
1207    throws EncoderBufferTooSmallException {
1208    if (out.position() + length > out.limit()) {
1209      throw new EncoderBufferTooSmallException("Buffer position=" + out.position()
1210        + ", buffer limit=" + out.limit() + ", length to be written=" + length);
1211    }
1212  }
1213
1214  @Override
1215  public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
1216    throws IOException {
1217    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
1218      throw new IOException(this.getClass().getName() + " only accepts "
1219        + HFileBlockDefaultEncodingContext.class.getName() + " as the " + "encoding context.");
1220    }
1221
1222    HFileBlockDefaultEncodingContext encodingCtx =
1223      (HFileBlockDefaultEncodingContext) blkEncodingCtx;
1224    encodingCtx.prepareEncoding(out);
1225    if (
1226      encodingCtx.getHFileContext().isIncludesTags()
1227        && encodingCtx.getHFileContext().isCompressTags()
1228    ) {
1229      if (encodingCtx.getTagCompressionContext() != null) {
1230        // It will be overhead to create the TagCompressionContext again and again for every block
1231        // encoding.
1232        encodingCtx.getTagCompressionContext().clear();
1233      } else {
1234        try {
1235          TagCompressionContext tagCompressionContext =
1236            new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
1237          encodingCtx.setTagCompressionContext(tagCompressionContext);
1238        } catch (Exception e) {
1239          throw new IOException("Failed to initialize TagCompressionContext", e);
1240        }
1241      }
1242    }
1243    StreamUtils.writeInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
1244    blkEncodingCtx.setEncodingState(new EncodingState());
1245  }
1246
1247  @Override
1248  public void encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
1249    throws IOException {
1250    EncodingState state = encodingCtx.getEncodingState();
1251    int posBeforeEncode = out.size();
1252    int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
1253    state.postCellEncode(encodedKvSize, out.size() - posBeforeEncode);
1254  }
1255
1256  public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
1257    DataOutputStream out) throws IOException;
1258
1259  @Override
1260  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
1261    byte[] uncompressedBytesWithHeader) throws IOException {
1262    EncodingState state = encodingCtx.getEncodingState();
1263    // Write the unencodedDataSizeWritten (with header size)
1264    Bytes.putInt(uncompressedBytesWithHeader,
1265      HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE,
1266      state.getUnencodedDataSizeWritten());
1267    postEncoding(encodingCtx);
1268  }
1269
1270}