/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Base class for all data block encoders that use a buffer.
 */
@InterfaceAudience.Private
abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
  /**
   * TODO: This datablockencoder is dealing in internals of hfileblocks. Purge reference to HFBs
   */
  private static int INITIAL_KEY_BUFFER_SIZE = 512;

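  /**
   * Decodes a whole encoded block back into plain, unencoded key/value bytes. When tags are
   * stored compressed, a reusable {@link TagCompressionContext} is created on the decoding
   * context (or the existing one is cleared) before delegating to
   * {@link #internalDecodeKeyValues}.
   */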
  @Override
  public ByteBuffer decodeKeyValues(DataInputStream source,
    HFileBlockDecodingContext blkDecodingCtx) throws IOException {
    if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
      throw new IOException(this.getClass().getName() + " only accepts "
        + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
    }

    HFileBlockDefaultDecodingContext decodingCtx =
      (HFileBlockDefaultDecodingContext) blkDecodingCtx;
    if (
      decodingCtx.getHFileContext().isIncludesTags()
        && decodingCtx.getHFileContext().isCompressTags()
    ) {
      if (decodingCtx.getTagCompressionContext() != null) {
        // Reuse the existing TagCompressionContext rather than paying the overhead of creating a
        // new one for every block decoding.
        decodingCtx.getTagCompressionContext().clear();
      } else {
        try {
          TagCompressionContext tagCompressionContext =
            new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
          decodingCtx.setTagCompressionContext(tagCompressionContext);
        } catch (Exception e) {
          throw new IOException("Failed to initialize TagCompressionContext", e);
        }
      }
    }
    return internalDecodeKeyValues(source, 0, 0, decodingCtx);
  }

  /********************* common prefixes *************************/
  // Having this as static is fine, but if META ever uses data block encoding (DBE) then we should
  // change this.
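  /**
   * Compares the row parts of two cells, skipping the first {@code rowCommonPrefix} bytes that
   * are already known to be equal.
   */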
  public static int compareCommonRowPrefix(Cell left, Cell right, int rowCommonPrefix) {
    return Bytes.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix,
      left.getRowLength() - rowCommonPrefix, right.getRowArray(),
      right.getRowOffset() + rowCommonPrefix, right.getRowLength() - rowCommonPrefix);
  }

  public static int compareCommonFamilyPrefix(Cell left, Cell right, int familyCommonPrefix) {
    return Bytes.compareTo(left.getFamilyArray(), left.getFamilyOffset() + familyCommonPrefix,
      left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
      right.getFamilyOffset() + familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix);
  }

  public static int compareCommonQualifierPrefix(Cell left, Cell right, int qualCommonPrefix) {
    return Bytes.compareTo(left.getQualifierArray(), left.getQualifierOffset() + qualCommonPrefix,
      left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(),
      right.getQualifierOffset() + qualCommonPrefix, right.getQualifierLength() - qualCommonPrefix);
  }

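  /**
   * Holds the mutable decoding state for one position within an encoded block: the materialized
   * key bytes, the offsets/lengths of the value and tags inside the block buffer, the memstore
   * timestamp and the offset of the next key/value.
   */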
  protected static class SeekerState {
    protected ByteBuff currentBuffer;
    protected TagCompressionContext tagCompressionContext;
    protected int valueOffset = -1;
    protected int keyLength;
    protected int valueLength;
    protected int lastCommonPrefix;
    protected int tagsLength = 0;
    protected int tagsOffset = -1;
    protected int tagsCompressedLength = 0;
    protected boolean uncompressTags = true;

    /** We need to store a copy of the key. */
    protected byte[] keyBuffer = HConstants.EMPTY_BYTE_ARRAY;
    protected byte[] tagsBuffer = HConstants.EMPTY_BYTE_ARRAY;

    protected long memstoreTS;
    protected int nextKvOffset;
    protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
    private final ObjectIntPair<ByteBuffer> tmpPair;
    private final boolean includeTags;

    public SeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) {
      this.tmpPair = tmpPair;
      this.includeTags = includeTags;
    }

    protected boolean isValid() {
      return valueOffset != -1;
    }

    protected void invalidate() {
      valueOffset = -1;
      tagsCompressedLength = 0;
      currentKey.clear();
      uncompressTags = true;
      currentBuffer = null;
    }

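    /**
     * Ensures {@link #keyBuffer} can hold the current key, growing it to a power-of-two size when
     * necessary and preserving the bytes already copied in.
     */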
    protected void ensureSpaceForKey() {
      if (keyLength > keyBuffer.length) {
        int newKeyBufferLength =
          Integer.highestOneBit(Math.max(INITIAL_KEY_BUFFER_SIZE, keyLength) - 1) << 1;
        byte[] newKeyBuffer = new byte[newKeyBufferLength];
        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
        keyBuffer = newKeyBuffer;
      }
    }

    protected void ensureSpaceForTags() {
      if (tagsLength > tagsBuffer.length) {
        int newTagsBufferLength =
          Integer.highestOneBit(Math.max(INITIAL_KEY_BUFFER_SIZE, tagsLength) - 1) << 1;
        byte[] newTagsBuffer = new byte[newTagsBufferLength];
        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
        tagsBuffer = newTagsBuffer;
      }
    }

    protected void setKey(byte[] keyBuffer, long memTS) {
      currentKey.setKey(keyBuffer, 0, keyLength);
      memstoreTS = memTS;
    }

    /**
     * Copy the state from the next one into this instance (the previous state placeholder). Used
     * to save the previous state when we are advancing the seeker to the next key/value.
     */
    protected void copyFromNext(SeekerState nextState) {
      if (keyBuffer.length != nextState.keyBuffer.length) {
        keyBuffer = nextState.keyBuffer.clone();
      } else if (!isValid()) {
        // Note: we can only call isValid before we override our state, so this
        // comes before all the assignments at the end of this method.
        System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0, nextState.keyLength);
      } else {
        // don't copy the common prefix between this key and the previous one
        System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix, keyBuffer,
          nextState.lastCommonPrefix, nextState.keyLength - nextState.lastCommonPrefix);
      }
      currentKey.set(nextState.currentKey);

      valueOffset = nextState.valueOffset;
      keyLength = nextState.keyLength;
      valueLength = nextState.valueLength;
      lastCommonPrefix = nextState.lastCommonPrefix;
      nextKvOffset = nextState.nextKvOffset;
      memstoreTS = nextState.memstoreTS;
      currentBuffer = nextState.currentBuffer;
      tagsOffset = nextState.tagsOffset;
      tagsLength = nextState.tagsLength;
      if (nextState.tagCompressionContext != null) {
        tagCompressionContext = nextState.tagCompressionContext;
      }
    }

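    /**
     * Materializes the current position as a standalone Cell. The key is copied out of
     * {@link #keyBuffer} and the value (plus tags, when present) is copied out of the block
     * buffer; the result is an {@link OnheapDecodedCell} when the block buffer is array backed,
     * otherwise an {@link OffheapDecodedExtendedCell}.
     */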
    public Cell toCell() {
      // Buffer backing the value and tags part from the HFileBlock's buffer
      // When tag compression is in use, this will be only the value bytes area.
      ByteBuffer valAndTagsBuffer;
      int vOffset;
      int valAndTagsLength = this.valueLength;
      int tagsLenSerializationSize = 0;
      if (this.includeTags && this.tagCompressionContext == null) {
        // Include the tags part also. This will be the tags bytes plus the 2 bytes used for
        // storing the tags length.
        tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + this.valueLength);
        valAndTagsLength += tagsLenSerializationSize + this.tagsLength;
      }
      this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair);
      valAndTagsBuffer = this.tmpPair.getFirst();
      vOffset = this.tmpPair.getSecond();// This is the offset to value part in the BB
      if (valAndTagsBuffer.hasArray()) {
        return toOnheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      } else {
        return toOffheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      }
    }

    private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
      int tagsLenSerializationSize) {
      byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          tOffset =
            valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength + tagsLenSerializationSize;
          tagsArray = Bytes.copy(valAndTagsBuffer.array(), tOffset, this.tagsLength);
          tOffset = 0;
        } else {
          tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
          tOffset = 0;
        }
      }
      return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
        currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
        currentKey.getQualifierOffset(), currentKey.getQualifierLength(), currentKey.getTimestamp(),
        currentKey.getTypeByte(), Bytes.copy(valAndTagsBuffer.array(),
          valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength),
        0, this.valueLength, memstoreTS, tagsArray, tOffset, this.tagsLength);
    }

    private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
      int tagsLenSerializationSize) {
      ByteBuffer tagsBuf = HConstants.EMPTY_BYTE_BUFFER;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          tOffset = vOffset + this.valueLength + tagsLenSerializationSize;
          byte[] output = new byte[this.tagsLength];
          ByteBufferUtils.copyFromBufferToArray(output, valAndTagsBuffer, tOffset, 0,
            this.tagsLength);
          tagsBuf = ByteBuffer.wrap(output);
          tOffset = 0;
        } else {
          tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength));
          tOffset = 0;
        }
      }

      if (this.valueLength > 0) {
        byte[] output = new byte[this.valueLength];
        ByteBufferUtils.copyFromBufferToArray(output, valAndTagsBuffer, vOffset, 0,
          this.valueLength);
        valAndTagsBuffer = ByteBuffer.wrap(output);
        vOffset = 0;
      }

      return new OffheapDecodedExtendedCell(
        ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)), currentKey.getRowLength(),
        currentKey.getFamilyOffset(), currentKey.getFamilyLength(), currentKey.getQualifierOffset(),
        currentKey.getQualifierLength(), currentKey.getTimestamp(), currentKey.getTypeByte(),
        valAndTagsBuffer, vOffset, this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
    }
  }

  /**
   * A Cell built from a fully decoded seeker state. The key, value and tags bytes are deep copies
   * taken from the seeker's key buffer and the block buffer (see SeekerState#toCell), so this
   * cell does not keep a reference to the block's currentBuffer.
   */
  // We return this as a Cell to the upper layers of the read flow, which might set a new SeqId
  // on it. So this has to be an instance of ExtendedCell.
  protected static class OnheapDecodedCell implements ExtendedCell {
    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
      + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
      + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY));
    private byte[] keyOnlyBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timestamp;
    private byte typeByte;
    private byte[] valueBuffer;
    private int valueOffset;
    private int valueLength;
    private byte[] tagsBuffer;
    private int tagsOffset;
    private int tagsLength;
    private long seqId;

    protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset,
      byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
      byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] tagsBuffer,
      int tagsOffset, int tagsLength) {
      this.keyOnlyBuffer = keyBuffer;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timestamp = timeStamp;
      this.typeByte = typeByte;
      this.valueBuffer = valueBuffer;
      this.valueOffset = valueOffset;
      this.valueLength = valueLen;
      this.tagsBuffer = tagsBuffer;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      setSequenceId(seqId);
    }

    @Override
    public byte[] getRowArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getFamilyArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getQualifierArray() {
      return keyOnlyBuffer;
    }

    @Override
    public int getRowOffset() {
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public short getRowLength() {
      return rowLength;
    }

    @Override
    public int getFamilyOffset() {
      return familyOffset;
    }

    @Override
    public byte getFamilyLength() {
      return familyLength;
    }

    @Override
    public int getQualifierOffset() {
      return qualifierOffset;
    }

    @Override
    public int getQualifierLength() {
      return qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return timestamp;
    }

    @Override
    public byte getTypeByte() {
      return typeByte;
    }

    @Override
    public long getSequenceId() {
      return seqId;
    }

    @Override
    public byte[] getValueArray() {
      return this.valueBuffer;
    }

    @Override
    public int getValueOffset() {
      return valueOffset;
    }

    @Override
    public int getValueLength() {
      return valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      return this.tagsBuffer;
    }

    @Override
    public int getTagsOffset() {
      return this.tagsOffset;
    }

    @Override
    public int getTagsLength() {
      return tagsLength;
    }

    @Override
    public String toString() {
      return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
        + getValueLength() + "/seqid=" + seqId;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    @Override
    public long heapSize() {
      return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
    }

    @Override
    public int write(OutputStream out, boolean withTags) throws IOException {
      int lenToWrite = getSerializedSize(withTags);
      ByteBufferUtils.putInt(out, keyOnlyBuffer.length);
      ByteBufferUtils.putInt(out, valueLength);
      // Write key
      out.write(keyOnlyBuffer);
      // Write value
      out.write(this.valueBuffer, this.valueOffset, this.valueLength);
      if (withTags && this.tagsLength > 0) {
        // 2 bytes of tags length followed by the tag bytes. The tags length is serialized in
        // 2 bytes (as a short) even though the field is an int; since it is never negative, the
        // sign bit is not needed. See HBASE-11437.
        out.write((byte) (0xff & (this.tagsLength >> 8)));
        out.write((byte) (0xff & this.tagsLength));
        out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
      }
      return lenToWrite;
    }

    @Override
    public int getSerializedSize(boolean withTags) {
      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
        withTags);
    }

    @Override
    public void write(ByteBuffer buf, int offset) {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(long ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(byte[] ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public ExtendedCell deepClone() {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }
  }

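  /**
   * Counterpart of {@link OnheapDecodedCell} built when the backing block buffer is not array
   * backed. The key always lives in an on-heap ByteBuffer, while value and tags are exposed
   * through the {@link ByteBufferExtendedCell} ByteBuffer/position accessors.
   */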
  protected static class OffheapDecodedExtendedCell extends ByteBufferExtendedCell {
    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
      + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
      + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.BYTE_BUFFER));
    private ByteBuffer keyBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timestamp;
    private byte typeByte;
    private ByteBuffer valueBuffer;
    private int valueOffset;
    private int valueLength;
    private ByteBuffer tagsBuffer;
    private int tagsOffset;
    private int tagsLength;
    private long seqId;

    protected OffheapDecodedExtendedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset,
      byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
      ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer,
      int tagsOffset, int tagsLength) {
      // The keyBuffer is always onheap
      assert keyBuffer.hasArray();
      assert keyBuffer.arrayOffset() == 0;
      this.keyBuffer = keyBuffer;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timestamp = timeStamp;
      this.typeByte = typeByte;
      this.valueBuffer = valueBuffer;
      this.valueOffset = valueOffset;
      this.valueLength = valueLen;
      this.tagsBuffer = tagsBuffer;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      setSequenceId(seqId);
    }

    @Override
    public byte[] getRowArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getRowOffset() {
      return getRowPosition();
    }

    @Override
    public short getRowLength() {
      return this.rowLength;
    }

    @Override
    public byte[] getFamilyArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getFamilyOffset() {
      return getFamilyPosition();
    }

    @Override
    public byte getFamilyLength() {
      return this.familyLength;
    }

    @Override
    public byte[] getQualifierArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getQualifierOffset() {
      return getQualifierPosition();
    }

    @Override
    public int getQualifierLength() {
      return this.qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return this.timestamp;
    }

    @Override
    public byte getTypeByte() {
      return this.typeByte;
    }

    @Override
    public long getSequenceId() {
      return this.seqId;
    }

    @Override
    public byte[] getValueArray() {
      return CellUtil.cloneValue(this);
    }

    @Override
    public int getValueOffset() {
      return 0;
    }

    @Override
    public int getValueLength() {
      return this.valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      return PrivateCellUtil.cloneTags(this);
    }

    @Override
    public int getTagsOffset() {
      return 0;
    }

    @Override
    public int getTagsLength() {
      return this.tagsLength;
    }

    @Override
    public ByteBuffer getRowByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getRowPosition() {
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public ByteBuffer getFamilyByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getFamilyPosition() {
      return this.familyOffset;
    }

    @Override
    public ByteBuffer getQualifierByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getQualifierPosition() {
      return this.qualifierOffset;
    }

    @Override
    public ByteBuffer getValueByteBuffer() {
      return this.valueBuffer;
    }

    @Override
    public int getValuePosition() {
      return this.valueOffset;
    }

    @Override
    public ByteBuffer getTagsByteBuffer() {
      return this.tagsBuffer;
    }

    @Override
    public int getTagsPosition() {
      return this.tagsOffset;
    }

    @Override
    public long heapSize() {
      return FIXED_OVERHEAD;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    @Override
    public int write(OutputStream out, boolean withTags) throws IOException {
      int lenToWrite = getSerializedSize(withTags);
      ByteBufferUtils.putInt(out, keyBuffer.capacity());
      ByteBufferUtils.putInt(out, valueLength);
      // Write key
      out.write(keyBuffer.array());
      // Write value
      ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset, this.valueLength);
      if (withTags && this.tagsLength > 0) {
        // 2 bytes of tags length followed by the tag bytes. The tags length is serialized in
        // 2 bytes (as a short) even though the field is an int; since it is never negative, the
        // sign bit is not needed. See HBASE-11437.
        out.write((byte) (0xff & (this.tagsLength >> 8)));
        out.write((byte) (0xff & this.tagsLength));
        ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
      }
      return lenToWrite;
    }

    @Override
    public int getSerializedSize(boolean withTags) {
      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
        withTags);
    }

    @Override
    public void setTimestamp(long ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(byte[] ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void write(ByteBuffer buf, int offset) {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public ExtendedCell deepClone() {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }
  }

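  /**
   * Seeker over a single encoded block. It keeps two {@link SeekerState} instances, current and
   * previous, so that {@link #seekToKeyInBlock(Cell, boolean)} can step back one key/value when
   * seekBefore is requested.
   */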
  protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
    extends AbstractEncodedSeeker {
    protected ByteBuff currentBuffer;
    protected TagCompressionContext tagCompressionContext = null;
    protected KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
    protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>();
    protected STATE current, previous;

    public BufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
      super(decodingCtx);
      if (decodingCtx.getHFileContext().isCompressTags()) {
        try {
          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
        } catch (Exception e) {
          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
        }
      }
      current = createSeekerState(); // always valid
      previous = createSeekerState(); // may not be valid
    }

    @Override
    public int compareKey(CellComparator comparator, Cell key) {
      keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
      return PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, keyOnlyKV);
    }

    @Override
    public void setCurrentBuffer(ByteBuff buffer) {
      if (this.tagCompressionContext != null) {
        this.tagCompressionContext.clear();
      }
      currentBuffer = buffer;
      current.currentBuffer = currentBuffer;
      if (tagCompressionContext != null) {
        current.tagCompressionContext = tagCompressionContext;
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public Cell getKey() {
      byte[] key = new byte[current.keyLength];
      System.arraycopy(current.keyBuffer, 0, key, 0, current.keyLength);
      return new KeyValue.KeyOnlyKeyValue(key);
    }

    @Override
    public ByteBuffer getValueShallowCopy() {
      currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
      ByteBuffer dup = tmpPair.getFirst().duplicate();
      dup.position(tmpPair.getSecond());
      dup.limit(tmpPair.getSecond() + current.valueLength);
      return dup.slice();
    }

    @Override
    public Cell getCell() {
      return current.toCell();
    }

    @Override
    public void rewind() {
      currentBuffer.rewind();
      if (tagCompressionContext != null) {
        tagCompressionContext.clear();
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public boolean next() {
      if (!currentBuffer.hasRemaining()) {
        return false;
      }
      decodeNext();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
      return true;
    }

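    /**
     * Reads the tags portion of the current key/value: the compressed-int tags length followed by
     * either dictionary-compressed tags (uncompressed into the current state's tags buffer) or
     * plain tag bytes, which are skipped over after recording their offset.
     */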
    protected void decodeTags() {
      current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
      if (tagCompressionContext != null) {
        if (current.uncompressTags) {
          // Tag compression is in use; uncompress the tags into tagsBuffer.
          current.ensureSpaceForTags();
          try {
            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
              current.tagsBuffer, 0, current.tagsLength);
          } catch (IOException e) {
            throw new RuntimeException("Exception while uncompressing tags", e);
          }
        } else {
          currentBuffer.skip(current.tagsCompressedLength);
          current.uncompressTags = true;// Reset this.
        }
        current.tagsOffset = -1;
      } else {
        // When tag compression is not used, avoid copying the tag bytes into tagsBuffer. Just
        // record the tags offset so the KV buffer can be created later in getKeyValueBuffer().
        current.tagsOffset = currentBuffer.position();
        currentBuffer.skip(current.tagsLength);
      }
    }

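    /**
     * Seeks within the current block by walking forward and comparing {@code seekCell} with each
     * decoded key, reusing the common-prefix lengths from the previous comparison to skip bytes
     * already known to match. Returns 0 on an exact match, 1 when positioned on the last key that
     * sorts before {@code seekCell} (or one key earlier when {@code seekBefore} is set), and
     * {@link HConstants#INDEX_KEY_MAGIC} when the seek key sorts before the first key of the
     * block.
     */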
    @Override
    public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
      int rowCommonPrefix = 0;
      int familyCommonPrefix = 0;
      int qualCommonPrefix = 0;
      previous.invalidate();
      do {
        int comp;
        keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
        if (current.lastCommonPrefix != 0) {
          // The KV format also stores the row key length in the byte array, and the common prefix
          // includes it. Subtract it to get the common prefix of the row part alone.
          rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
        }
        if (current.lastCommonPrefix <= 2) {
          rowCommonPrefix = 0;
        }
        rowCommonPrefix += findCommonPrefixInRowPart(seekCell, keyOnlyKV, rowCommonPrefix);
        comp = compareCommonRowPrefix(seekCell, keyOnlyKV, rowCommonPrefix);
        if (comp == 0) {
          comp = compareTypeBytes(seekCell, keyOnlyKV);
          if (comp == 0) {
            // Subtract the fixed row key length and the family key fixed length
            familyCommonPrefix = Math.max(0, Math.min(familyCommonPrefix,
              current.lastCommonPrefix - (3 + keyOnlyKV.getRowLength())));
            familyCommonPrefix +=
              findCommonPrefixInFamilyPart(seekCell, keyOnlyKV, familyCommonPrefix);
            comp = compareCommonFamilyPrefix(seekCell, keyOnlyKV, familyCommonPrefix);
            if (comp == 0) {
              // Subtract the rowkey fixed length and the family key fixed length
              qualCommonPrefix = Math.max(0, Math.min(qualCommonPrefix, current.lastCommonPrefix
                - (3 + keyOnlyKV.getRowLength() + keyOnlyKV.getFamilyLength())));
              qualCommonPrefix +=
                findCommonPrefixInQualifierPart(seekCell, keyOnlyKV, qualCommonPrefix);
              comp = compareCommonQualifierPrefix(seekCell, keyOnlyKV, qualCommonPrefix);
              if (comp == 0) {
                comp = CellComparator.getInstance().compareTimestamps(seekCell, keyOnlyKV);
                if (comp == 0) {
                  // Compare types. Let the delete types sort ahead of puts; i.e. types of higher
                  // numbers sort before those of lesser numbers. Maximum (255) appears ahead of
                  // everything, and minimum (0) appears after everything.
                  comp = (0xff & keyOnlyKV.getTypeByte()) - (0xff & seekCell.getTypeByte());
                }
              }
            }
          }
        }
        if (comp == 0) { // exact match
          if (seekBefore) {
            if (!previous.isValid()) {
              // The caller (seekBefore) has to ensure that we are not at the
              // first key in the block.
              throw new IllegalStateException(
                "Cannot seekBefore if positioned at the first key in the block: key="
                  + Bytes.toStringBinary(seekCell.getRowArray()));
            }
            moveToPrevious();
            return 1;
          }
          return 0;
        }

        if (comp < 0) { // already too large, check previous
          if (previous.isValid()) {
            moveToPrevious();
          } else {
            return HConstants.INDEX_KEY_MAGIC; // using optimized index key
          }
          return 1;
        }

        // move to next, if more data is available
        if (currentBuffer.hasRemaining()) {
          previous.copyFromNext(current);
          decodeNext();
          current.setKey(current.keyBuffer, current.memstoreTS);
        } else {
          break;
        }
      } while (true);

      // we hit the end of the block, not an exact match
      return 1;
    }

    private int compareTypeBytes(Cell key, Cell right) {
      if (
        key.getFamilyLength() + key.getQualifierLength() == 0
          && key.getTypeByte() == Type.Minimum.getCode()
      ) {
        // left is "bigger", i.e. it appears later in the sorted order
        return 1;
      }
      if (
        right.getFamilyLength() + right.getQualifierLength() == 0
          && right.getTypeByte() == Type.Minimum.getCode()
      ) {
        return -1;
      }
      return 0;
    }

    private static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
      return Bytes.findCommonPrefix(left.getRowArray(), right.getRowArray(),
        left.getRowLength() - rowCommonPrefix, right.getRowLength() - rowCommonPrefix,
        left.getRowOffset() + rowCommonPrefix, right.getRowOffset() + rowCommonPrefix);
    }

    private static int findCommonPrefixInFamilyPart(Cell left, Cell right, int familyCommonPrefix) {
      return Bytes.findCommonPrefix(left.getFamilyArray(), right.getFamilyArray(),
        left.getFamilyLength() - familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix,
        left.getFamilyOffset() + familyCommonPrefix, right.getFamilyOffset() + familyCommonPrefix);
    }

    private static int findCommonPrefixInQualifierPart(Cell left, Cell right,
      int qualifierCommonPrefix) {
      return Bytes.findCommonPrefix(left.getQualifierArray(), right.getQualifierArray(),
        left.getQualifierLength() - qualifierCommonPrefix,
        right.getQualifierLength() - qualifierCommonPrefix,
        left.getQualifierOffset() + qualifierCommonPrefix,
        right.getQualifierOffset() + qualifierCommonPrefix);
    }

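    /**
     * Swaps {@code current} and {@code previous} so the seeker is positioned on the prior
     * key/value, then repositions the block buffer just after that entry. Only a single step back
     * is supported; {@code previous} is invalidated afterwards.
     */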
    private void moveToPrevious() {
      if (!previous.isValid()) {
        throw new IllegalStateException(
          "Can only move back once, and not from the first key in the block.");
      }

      STATE tmp = previous;
      previous = current;
      current = tmp;

      // move after last key value
      currentBuffer.position(current.nextKvOffset);
      // The tag bytes were already decoded. Cache those tags in the current state along with the
      // total compressed length of the tag bytes, so the next decodeNext() does not need to
      // decode them again (which could also pollute the data dictionary used for the compression).
      // When current.uncompressTags is false, we will just reuse current.tagsBuffer and skip
      // 'tagsCompressedLength' bytes of the source stream.
      // See decodeTags().
      current.tagsBuffer = previous.tagsBuffer;
      current.tagsCompressedLength = previous.tagsCompressedLength;
      current.uncompressTags = false;
      // The current key has to be reset with the previous Cell
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @SuppressWarnings("unchecked")
    protected STATE createSeekerState() {
      // This will fail for non-default seeker state if the subclass does not
      // override this method.
      return (STATE) new SeekerState(this.tmpPair, this.includesTags());
    }

    protected abstract void decodeFirst();

    protected abstract void decodeNext();
  }

  /**
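   * Writes the per-cell trailer that follows each encoded key/value: the tags (dictionary
   * compressed when a {@link TagCompressionContext} is set on the encoding context) and, when
   * MVCC is included, the memstore timestamp as a vlong.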
   * @return unencoded size added
   */
  protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
    HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
    int size = 0;
    if (encodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = cell.getTagsLength();
      ByteBufferUtils.putCompressedInt(out, tagsLength);
      // There are some tags to be written
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
        // When tag compression is enabled, tagCompressionContext will be non-null. Write the tags
        // using dictionary compression in that case.
        if (tagCompressionContext != null) {
          // Not passing tagsLength, since parsing the tags length again is not costly
          PrivateCellUtil.compressTags(out, cell, tagCompressionContext);
        } else {
          PrivateCellUtil.writeTags(out, cell, tagsLength);
        }
      }
      size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
    }
    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
      // Copy memstore timestamp from the byte buffer to the output stream.
      long memstoreTS = cell.getSequenceId();
      WritableUtils.writeVLong(out, memstoreTS);
      // TODO: use a writeVLong that returns the number of bytes written, so the value does not
      // have to be parsed twice.
      size += WritableUtils.getVIntSize(memstoreTS);
    }
    return size;
  }

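  /**
   * Decode-side mirror of
   * {@link #afterEncodingKeyValue(Cell, DataOutputStream, HFileBlockDefaultEncodingContext)}:
   * reads the tags length and tag bytes (uncompressing them when a {@link TagCompressionContext}
   * is present) and the memstore timestamp from {@code source}, writing them in plain form into
   * {@code dest}.
   */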
  protected final void afterDecodingKeyValue(DataInputStream source, ByteBuffer dest,
    HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    if (decodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = ByteBufferUtils.readCompressedInt(source);
      // Put as unsigned short
      dest.put((byte) ((tagsLength >> 8) & 0xff));
      dest.put((byte) (tagsLength & 0xff));
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
        // When tag compression is used in this file, tagCompressionContext will be non-null.
        if (tagCompressionContext != null) {
          tagCompressionContext.uncompressTags(source, dest, tagsLength);
        } else {
          ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
        }
      }
    }
    if (decodingCtx.getHFileContext().isIncludesMvcc()) {
      long memstoreTS = -1;
      try {
        // Copy memstore timestamp from the data input stream to the byte buffer.
        memstoreTS = WritableUtils.readVLong(source);
        ByteBufferUtils.writeVLong(dest, memstoreTS);
      } catch (IOException ex) {
        throw new RuntimeException(
          "Unable to copy memstore timestamp " + memstoreTS + " after decoding a key/value");
      }
    }
  }

  protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
    int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
    throws IOException;

  /**
   * Asserts that there is at least the given amount of unfilled space remaining in the given
   * buffer.
   * @param out    typically, the buffer we are writing to
   * @param length the required space in the buffer
   * @throws EncoderBufferTooSmallException if there is not enough space in the buffer.
   */
  protected static void ensureSpace(ByteBuffer out, int length)
    throws EncoderBufferTooSmallException {
    if (out.position() + length > out.limit()) {
      throw new EncoderBufferTooSmallException("Buffer position=" + out.position()
        + ", buffer limit=" + out.limit() + ", length to be written=" + length);
    }
  }

  @Override
  public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
    throws IOException {
    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
      throw new IOException(this.getClass().getName() + " only accepts "
        + HFileBlockDefaultEncodingContext.class.getName() + " as the encoding context.");
    }

    HFileBlockDefaultEncodingContext encodingCtx =
      (HFileBlockDefaultEncodingContext) blkEncodingCtx;
    encodingCtx.prepareEncoding(out);
    if (
      encodingCtx.getHFileContext().isIncludesTags()
        && encodingCtx.getHFileContext().isCompressTags()
    ) {
      if (encodingCtx.getTagCompressionContext() != null) {
        // Reuse the existing TagCompressionContext rather than paying the overhead of creating a
        // new one for every block encoding.
        encodingCtx.getTagCompressionContext().clear();
      } else {
        try {
          TagCompressionContext tagCompressionContext =
            new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
          encodingCtx.setTagCompressionContext(tagCompressionContext);
        } catch (Exception e) {
          throw new IOException("Failed to initialize TagCompressionContext", e);
        }
      }
    }
    StreamUtils.writeInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
    blkEncodingCtx.setEncodingState(new EncodingState());
  }

  @Override
  public void encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
    throws IOException {
    EncodingState state = encodingCtx.getEncodingState();
    int posBeforeEncode = out.size();
    int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
    state.postCellEncode(encodedKvSize, out.size() - posBeforeEncode);
  }

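  /**
   * Performs the actual encoding of a single cell into {@code out}; implemented by each concrete
   * encoder. Returns the unencoded size of the cell written, which {@link #encode} feeds into the
   * block's unencoded-size accounting.
   */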
  public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
    DataOutputStream out) throws IOException;

  @Override
  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
    byte[] uncompressedBytesWithHeader) throws IOException {
    EncodingState state = encodingCtx.getEncodingState();
    // Write the unencodedDataSizeWritten (with header size)
    Bytes.putInt(uncompressedBytesWithHeader,
      HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE,
      state.getUnencodedDataSizeWritten());
    postEncoding(encodingCtx);
  }

}