001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.encoding;
019
020import java.io.DataInputStream;
021import java.io.DataOutputStream;
022import java.io.IOException;
023import java.io.OutputStream;
024import java.nio.ByteBuffer;
025import org.apache.hadoop.hbase.ByteBufferExtendedCell;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.CellComparator;
028import org.apache.hadoop.hbase.CellUtil;
029import org.apache.hadoop.hbase.ExtendedCell;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.KeyValue;
032import org.apache.hadoop.hbase.KeyValueUtil;
033import org.apache.hadoop.hbase.PrivateCellUtil;
034import org.apache.hadoop.hbase.io.TagCompressionContext;
035import org.apache.hadoop.hbase.io.util.LRUDictionary;
036import org.apache.hadoop.hbase.io.util.StreamUtils;
037import org.apache.hadoop.hbase.nio.ByteBuff;
038import org.apache.hadoop.hbase.util.ByteBufferUtils;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.ClassSize;
041import org.apache.hadoop.hbase.util.ObjectIntPair;
042import org.apache.hadoop.io.WritableUtils;
043import org.apache.yetus.audience.InterfaceAudience;
044
045/**
046 * Base class for all data block encoders that use a buffer.
047 */
048@InterfaceAudience.Private
049abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
050  /**
051   * TODO: This datablockencoder is dealing in internals of hfileblocks. Purge reference to HFBs
052   */
053  private static int INITIAL_KEY_BUFFER_SIZE = 512;
054
055  @Override
056  public ByteBuffer decodeKeyValues(DataInputStream source,
057    HFileBlockDecodingContext blkDecodingCtx) throws IOException {
058    if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
059      throw new IOException(this.getClass().getName() + " only accepts "
060        + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
061    }
062
063    HFileBlockDefaultDecodingContext decodingCtx =
064      (HFileBlockDefaultDecodingContext) blkDecodingCtx;
065    if (
066      decodingCtx.getHFileContext().isIncludesTags()
067        && decodingCtx.getHFileContext().isCompressTags()
068    ) {
069      if (decodingCtx.getTagCompressionContext() != null) {
070        // It will be overhead to create the TagCompressionContext again and again for every block
071        // decoding.
072        decodingCtx.getTagCompressionContext().clear();
073      } else {
074        try {
075          TagCompressionContext tagCompressionContext =
076            new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
077          decodingCtx.setTagCompressionContext(tagCompressionContext);
078        } catch (Exception e) {
079          throw new IOException("Failed to initialize TagCompressionContext", e);
080        }
081      }
082    }
083    return internalDecodeKeyValues(source, 0, 0, decodingCtx);
084  }
085
086  /********************* common prefixes *************************/
  // Keeping these helpers static is fine for now, but if META ever uses a DBE then we should
  // change this.
089  public static int compareCommonRowPrefix(Cell left, Cell right, int rowCommonPrefix) {
090    return Bytes.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix,
091      left.getRowLength() - rowCommonPrefix, right.getRowArray(),
092      right.getRowOffset() + rowCommonPrefix, right.getRowLength() - rowCommonPrefix);
093  }
094
095  public static int compareCommonFamilyPrefix(Cell left, Cell right, int familyCommonPrefix) {
096    return Bytes.compareTo(left.getFamilyArray(), left.getFamilyOffset() + familyCommonPrefix,
097      left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
098      right.getFamilyOffset() + familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix);
099  }
100
101  public static int compareCommonQualifierPrefix(Cell left, Cell right, int qualCommonPrefix) {
102    return Bytes.compareTo(left.getQualifierArray(), left.getQualifierOffset() + qualCommonPrefix,
103      left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(),
104      right.getQualifierOffset() + qualCommonPrefix, right.getQualifierLength() - qualCommonPrefix);
105  }
106
107  protected static class SeekerState {
108    protected ByteBuff currentBuffer;
109    protected TagCompressionContext tagCompressionContext;
110    protected int valueOffset = -1;
111    protected int keyLength;
112    protected int valueLength;
113    protected int lastCommonPrefix;
114    protected int tagsLength = 0;
115    protected int tagsOffset = -1;
116    protected int tagsCompressedLength = 0;
117    protected boolean uncompressTags = true;
118
119    /** We need to store a copy of the key. */
120    protected byte[] keyBuffer = HConstants.EMPTY_BYTE_ARRAY;
121    protected byte[] tagsBuffer = HConstants.EMPTY_BYTE_ARRAY;
122
123    protected long memstoreTS;
124    protected int nextKvOffset;
125    protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
126    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
127    // many object creations.
128    private final ObjectIntPair<ByteBuffer> tmpPair;
129    private final boolean includeTags;
130
131    public SeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) {
132      this.tmpPair = tmpPair;
133      this.includeTags = includeTags;
134    }
135
136    protected boolean isValid() {
137      return valueOffset != -1;
138    }
139
140    protected void invalidate() {
141      valueOffset = -1;
142      tagsCompressedLength = 0;
143      currentKey.clear();
144      uncompressTags = true;
145      currentBuffer = null;
146    }
147
148    protected void ensureSpaceForKey() {
149      if (keyLength > keyBuffer.length) {
150        int newKeyBufferLength =
151          Integer.highestOneBit(Math.max(INITIAL_KEY_BUFFER_SIZE, keyLength) - 1) << 1;
152        byte[] newKeyBuffer = new byte[newKeyBufferLength];
153        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
154        keyBuffer = newKeyBuffer;
155      }
156    }
157
158    protected void ensureSpaceForTags() {
159      if (tagsLength > tagsBuffer.length) {
160        int newTagsBufferLength =
161          Integer.highestOneBit(Math.max(INITIAL_KEY_BUFFER_SIZE, tagsLength) - 1) << 1;
162        byte[] newTagsBuffer = new byte[newTagsBufferLength];
163        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
164        tagsBuffer = newTagsBuffer;
165      }
166    }
167
168    protected void setKey(byte[] keyBuffer, long memTS) {
169      currentKey.setKey(keyBuffer, 0, keyLength);
170      memstoreTS = memTS;
171    }
172
173    /**
174     * Copy the state from the next one into this instance (the previous state placeholder). Used to
175     * save the previous state when we are advancing the seeker to the next key/value.
176     */
177    protected void copyFromNext(SeekerState nextState) {
178      if (keyBuffer.length != nextState.keyBuffer.length) {
179        keyBuffer = nextState.keyBuffer.clone();
180      } else if (!isValid()) {
181        // Note: we can only call isValid before we override our state, so this
182        // comes before all the assignments at the end of this method.
183        System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0, nextState.keyLength);
184      } else {
185        // don't copy the common prefix between this key and the previous one
186        System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix, keyBuffer,
187          nextState.lastCommonPrefix, nextState.keyLength - nextState.lastCommonPrefix);
188      }
189      currentKey.set(nextState.currentKey);
190
191      valueOffset = nextState.valueOffset;
192      keyLength = nextState.keyLength;
193      valueLength = nextState.valueLength;
194      lastCommonPrefix = nextState.lastCommonPrefix;
195      nextKvOffset = nextState.nextKvOffset;
196      memstoreTS = nextState.memstoreTS;
197      currentBuffer = nextState.currentBuffer;
198      tagsOffset = nextState.tagsOffset;
199      tagsLength = nextState.tagsLength;
200      if (nextState.tagCompressionContext != null) {
201        tagCompressionContext = nextState.tagCompressionContext;
202      }
203    }
204
205    public Cell toCell() {
206      // Buffer backing the value and tags part from the HFileBlock's buffer
207      // When tag compression in use, this will be only the value bytes area.
208      ByteBuffer valAndTagsBuffer;
209      int vOffset;
210      int valAndTagsLength = this.valueLength;
211      int tagsLenSerializationSize = 0;
212      if (this.includeTags && this.tagCompressionContext == null) {
213        // Include the tags part also. This will be the tags bytes + 2 bytes of for storing tags
214        // length
215        tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + this.valueLength);
216        valAndTagsLength += tagsLenSerializationSize + this.tagsLength;
217      }
218      this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair);
219      valAndTagsBuffer = this.tmpPair.getFirst();
220      vOffset = this.tmpPair.getSecond();// This is the offset to value part in the BB
221      if (valAndTagsBuffer.hasArray()) {
222        return toOnheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
223      } else {
224        return toOffheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
225      }
226    }
227
228    private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
229      int tagsLenSerializationSize) {
230      byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY;
231      int tOffset = 0;
232      if (this.includeTags) {
233        if (this.tagCompressionContext == null) {
234          tagsArray = valAndTagsBuffer.array();
235          tOffset =
236            valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength + tagsLenSerializationSize;
237        } else {
238          tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
239          tOffset = 0;
240        }
241      }
242      return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
243        currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
244        currentKey.getQualifierOffset(), currentKey.getQualifierLength(), currentKey.getTimestamp(),
245        currentKey.getTypeByte(), valAndTagsBuffer.array(),
246        valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength, memstoreTS, tagsArray, tOffset,
247        this.tagsLength);
248    }
249
250    private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
251      int tagsLenSerializationSize) {
252      ByteBuffer tagsBuf = HConstants.EMPTY_BYTE_BUFFER;
253      int tOffset = 0;
254      if (this.includeTags) {
255        if (this.tagCompressionContext == null) {
256          tagsBuf = valAndTagsBuffer;
257          tOffset = vOffset + this.valueLength + tagsLenSerializationSize;
258        } else {
259          tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength));
260          tOffset = 0;
261        }
262      }
263      return new OffheapDecodedExtendedCell(
264        ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)), currentKey.getRowLength(),
265        currentKey.getFamilyOffset(), currentKey.getFamilyLength(), currentKey.getQualifierOffset(),
266        currentKey.getQualifierLength(), currentKey.getTimestamp(), currentKey.getTypeByte(),
267        valAndTagsBuffer, vOffset, this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
268    }
269  }
270
271  /**
272   * Copies only the key part of the keybuffer by doing a deep copy and passes the seeker state
273   * members for taking a clone. Note that the value byte[] part is still pointing to the
274   * currentBuffer and represented by the valueOffset and valueLength
275   */
276  // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
277  // there. So this has to be an instance of ExtendedCell.
278  protected static class OnheapDecodedCell implements ExtendedCell {
279    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
280      + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
281      + Bytes.SIZEOF_SHORT + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY));
282    private byte[] keyOnlyBuffer;
283    private short rowLength;
284    private int familyOffset;
285    private byte familyLength;
286    private int qualifierOffset;
287    private int qualifierLength;
288    private long timeStamp;
289    private byte typeByte;
290    private byte[] valueBuffer;
291    private int valueOffset;
292    private int valueLength;
293    private byte[] tagsBuffer;
294    private int tagsOffset;
295    private int tagsLength;
296    private long seqId;
297
298    protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset,
299      byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
300      byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] tagsBuffer,
301      int tagsOffset, int tagsLength) {
302      this.keyOnlyBuffer = keyBuffer;
303      this.rowLength = rowLength;
304      this.familyOffset = familyOffset;
305      this.familyLength = familyLength;
306      this.qualifierOffset = qualOffset;
307      this.qualifierLength = qualLength;
308      this.timeStamp = timeStamp;
309      this.typeByte = typeByte;
310      this.valueBuffer = valueBuffer;
311      this.valueOffset = valueOffset;
312      this.valueLength = valueLen;
313      this.tagsBuffer = tagsBuffer;
314      this.tagsOffset = tagsOffset;
315      this.tagsLength = tagsLength;
316      setSequenceId(seqId);
317    }
318
319    @Override
320    public byte[] getRowArray() {
321      return keyOnlyBuffer;
322    }
323
324    @Override
325    public byte[] getFamilyArray() {
326      return keyOnlyBuffer;
327    }
328
329    @Override
330    public byte[] getQualifierArray() {
331      return keyOnlyBuffer;
332    }
333
334    @Override
335    public int getRowOffset() {
336      return Bytes.SIZEOF_SHORT;
337    }
338
339    @Override
340    public short getRowLength() {
341      return rowLength;
342    }
343
344    @Override
345    public int getFamilyOffset() {
346      return familyOffset;
347    }
348
349    @Override
350    public byte getFamilyLength() {
351      return familyLength;
352    }
353
354    @Override
355    public int getQualifierOffset() {
356      return qualifierOffset;
357    }
358
359    @Override
360    public int getQualifierLength() {
361      return qualifierLength;
362    }
363
364    @Override
365    public long getTimestamp() {
366      return timeStamp;
367    }
368
369    @Override
370    public byte getTypeByte() {
371      return typeByte;
372    }
373
374    @Override
375    public long getSequenceId() {
376      return seqId;
377    }
378
379    @Override
380    public byte[] getValueArray() {
381      return this.valueBuffer;
382    }
383
384    @Override
385    public int getValueOffset() {
386      return valueOffset;
387    }
388
389    @Override
390    public int getValueLength() {
391      return valueLength;
392    }
393
394    @Override
395    public byte[] getTagsArray() {
396      return this.tagsBuffer;
397    }
398
399    @Override
400    public int getTagsOffset() {
401      return this.tagsOffset;
402    }
403
404    @Override
405    public int getTagsLength() {
406      return tagsLength;
407    }
408
409    @Override
410    public String toString() {
411      return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
412        + getValueLength() + "/seqid=" + seqId;
413    }
414
415    @Override
416    public void setSequenceId(long seqId) {
417      this.seqId = seqId;
418    }
419
420    @Override
421    public long heapSize() {
422      return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
423    }
424
425    @Override
426    public int write(OutputStream out, boolean withTags) throws IOException {
427      int lenToWrite = getSerializedSize(withTags);
428      ByteBufferUtils.putInt(out, keyOnlyBuffer.length);
429      ByteBufferUtils.putInt(out, valueLength);
430      // Write key
431      out.write(keyOnlyBuffer);
432      // Write value
433      out.write(this.valueBuffer, this.valueOffset, this.valueLength);
434      if (withTags && this.tagsLength > 0) {
435        // 2 bytes tags length followed by tags bytes
436        // tags length is serialized with 2 bytes only(short way) even if the type is int.
437        // As this is non -ve numbers, we save the sign bit. See HBASE-11437
438        out.write((byte) (0xff & (this.tagsLength >> 8)));
439        out.write((byte) (0xff & this.tagsLength));
440        out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
441      }
442      return lenToWrite;
443    }
444
445    @Override
446    public int getSerializedSize(boolean withTags) {
447      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
448        withTags);
449    }
450
451    @Override
452    public void write(ByteBuffer buf, int offset) {
453      // This is not used in actual flow. Throwing UnsupportedOperationException
454      throw new UnsupportedOperationException();
455    }
456
457    @Override
458    public void setTimestamp(long ts) throws IOException {
459      // This is not used in actual flow. Throwing UnsupportedOperationException
460      throw new UnsupportedOperationException();
461    }
462
463    @Override
464    public void setTimestamp(byte[] ts) throws IOException {
465      // This is not used in actual flow. Throwing UnsupportedOperationException
466      throw new UnsupportedOperationException();
467    }
468
469    @Override
470    public ExtendedCell deepClone() {
471      // This is not used in actual flow. Throwing UnsupportedOperationException
472      throw new UnsupportedOperationException();
473    }
474  }
475
476  protected static class OffheapDecodedExtendedCell extends ByteBufferExtendedCell {
477    private static final long FIXED_OVERHEAD =
478      (long) ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)
479        + (7 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE)
480        + (3 * ClassSize.BYTE_BUFFER);
481    private ByteBuffer keyBuffer;
482    private short rowLength;
483    private int familyOffset;
484    private byte familyLength;
485    private int qualifierOffset;
486    private int qualifierLength;
487    private long timeStamp;
488    private byte typeByte;
489    private ByteBuffer valueBuffer;
490    private int valueOffset;
491    private int valueLength;
492    private ByteBuffer tagsBuffer;
493    private int tagsOffset;
494    private int tagsLength;
495    private long seqId;
496
497    protected OffheapDecodedExtendedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset,
498      byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
499      ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer,
500      int tagsOffset, int tagsLength) {
501      // The keyBuffer is always onheap
502      assert keyBuffer.hasArray();
503      assert keyBuffer.arrayOffset() == 0;
504      this.keyBuffer = keyBuffer;
505      this.rowLength = rowLength;
506      this.familyOffset = familyOffset;
507      this.familyLength = familyLength;
508      this.qualifierOffset = qualOffset;
509      this.qualifierLength = qualLength;
510      this.timeStamp = timeStamp;
511      this.typeByte = typeByte;
512      this.valueBuffer = valueBuffer;
513      this.valueOffset = valueOffset;
514      this.valueLength = valueLen;
515      this.tagsBuffer = tagsBuffer;
516      this.tagsOffset = tagsOffset;
517      this.tagsLength = tagsLength;
518      setSequenceId(seqId);
519    }
520
521    @Override
522    @SuppressWarnings("ByteBufferBackingArray")
523    public byte[] getRowArray() {
524      return this.keyBuffer.array();
525    }
526
527    @Override
528    public int getRowOffset() {
529      return getRowPosition();
530    }
531
532    @Override
533    public short getRowLength() {
534      return this.rowLength;
535    }
536
537    @Override
538    @SuppressWarnings("ByteBufferBackingArray")
539    public byte[] getFamilyArray() {
540      return this.keyBuffer.array();
541    }
542
543    @Override
544    public int getFamilyOffset() {
545      return getFamilyPosition();
546    }
547
548    @Override
549    public byte getFamilyLength() {
550      return this.familyLength;
551    }
552
553    @Override
554    @SuppressWarnings("ByteBufferBackingArray")
555    public byte[] getQualifierArray() {
556      return this.keyBuffer.array();
557    }
558
559    @Override
560    public int getQualifierOffset() {
561      return getQualifierPosition();
562    }
563
564    @Override
565    public int getQualifierLength() {
566      return this.qualifierLength;
567    }
568
569    @Override
570    public long getTimestamp() {
571      return this.timeStamp;
572    }
573
574    @Override
575    public byte getTypeByte() {
576      return this.typeByte;
577    }
578
579    @Override
580    public long getSequenceId() {
581      return this.seqId;
582    }
583
584    @Override
585    public byte[] getValueArray() {
586      return CellUtil.cloneValue(this);
587    }
588
589    @Override
590    public int getValueOffset() {
591      return 0;
592    }
593
594    @Override
595    public int getValueLength() {
596      return this.valueLength;
597    }
598
599    @Override
600    public byte[] getTagsArray() {
601      return CellUtil.cloneTags(this);
602    }
603
604    @Override
605    public int getTagsOffset() {
606      return 0;
607    }
608
609    @Override
610    public int getTagsLength() {
611      return this.tagsLength;
612    }
613
614    @Override
615    public ByteBuffer getRowByteBuffer() {
616      return this.keyBuffer;
617    }
618
619    @Override
620    public int getRowPosition() {
621      return Bytes.SIZEOF_SHORT;
622    }
623
624    @Override
625    public ByteBuffer getFamilyByteBuffer() {
626      return this.keyBuffer;
627    }
628
629    @Override
630    public int getFamilyPosition() {
631      return this.familyOffset;
632    }
633
634    @Override
635    public ByteBuffer getQualifierByteBuffer() {
636      return this.keyBuffer;
637    }
638
639    @Override
640    public int getQualifierPosition() {
641      return this.qualifierOffset;
642    }
643
644    @Override
645    public ByteBuffer getValueByteBuffer() {
646      return this.valueBuffer;
647    }
648
649    @Override
650    public int getValuePosition() {
651      return this.valueOffset;
652    }
653
654    @Override
655    public ByteBuffer getTagsByteBuffer() {
656      return this.tagsBuffer;
657    }
658
659    @Override
660    public int getTagsPosition() {
661      return this.tagsOffset;
662    }
663
664    @Override
665    public long heapSize() {
666      return FIXED_OVERHEAD;
667    }
668
669    @Override
670    public void setSequenceId(long seqId) {
671      this.seqId = seqId;
672    }
673
674    @Override
675    public int write(OutputStream out, boolean withTags) throws IOException {
676      int lenToWrite = getSerializedSize(withTags);
677      ByteBufferUtils.putInt(out, keyBuffer.remaining());
678      ByteBufferUtils.putInt(out, valueLength);
679      // Write key
680      out.write(keyBuffer.array(), keyBuffer.arrayOffset(), keyBuffer.remaining());
681      // Write value
682      ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset, this.valueLength);
683      if (withTags && this.tagsLength > 0) {
684        // 2 bytes tags length followed by tags bytes
685        // tags length is serialized with 2 bytes only(short way) even if the type is int.
686        // As this is non -ve numbers, we save the sign bit. See HBASE-11437
687        out.write((byte) (0xff & (this.tagsLength >> 8)));
688        out.write((byte) (0xff & this.tagsLength));
689        ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
690      }
691      return lenToWrite;
692    }
693
694    @Override
695    public int getSerializedSize(boolean withTags) {
696      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
697        withTags);
698    }
699
700    @Override
701    public void setTimestamp(long ts) throws IOException {
702      // This is not used in actual flow. Throwing UnsupportedOperationException
703      throw new UnsupportedOperationException();
704    }
705
706    @Override
707    public void setTimestamp(byte[] ts) throws IOException {
708      // This is not used in actual flow. Throwing UnsupportedOperationException
709      throw new UnsupportedOperationException();
710    }
711
712    @Override
713    public void write(ByteBuffer buf, int offset) {
714      // This is not used in actual flow. Throwing UnsupportedOperationException
715      throw new UnsupportedOperationException();
716    }
717
718    @Override
719    public ExtendedCell deepClone() {
720      // This is not used in actual flow. Throwing UnsupportedOperationException
721      throw new UnsupportedOperationException();
722    }
723  }
724
725  protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
726    extends AbstractEncodedSeeker {
727    protected ByteBuff currentBuffer;
728    protected TagCompressionContext tagCompressionContext = null;
729    protected KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
730    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
731    // many object creations.
732    protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>();
733    protected STATE current, previous;
734
735    public BufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
736      super(decodingCtx);
737      if (decodingCtx.getHFileContext().isCompressTags()) {
738        try {
739          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
740        } catch (Exception e) {
741          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
742        }
743      }
744      current = createSeekerState(); // always valid
745      previous = createSeekerState(); // may not be valid
746    }
747
748    @Override
749    public int compareKey(CellComparator comparator, Cell key) {
750      keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
751      return PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, keyOnlyKV);
752    }
753
754    @Override
755    public void setCurrentBuffer(ByteBuff buffer) {
756      if (this.tagCompressionContext != null) {
757        this.tagCompressionContext.clear();
758      }
759      currentBuffer = buffer;
760      current.currentBuffer = currentBuffer;
761      if (tagCompressionContext != null) {
762        current.tagCompressionContext = tagCompressionContext;
763      }
764      decodeFirst();
765      current.setKey(current.keyBuffer, current.memstoreTS);
766      previous.invalidate();
767    }
768
769    @Override
770    public Cell getKey() {
771      byte[] key = new byte[current.keyLength];
772      System.arraycopy(current.keyBuffer, 0, key, 0, current.keyLength);
773      return new KeyValue.KeyOnlyKeyValue(key);
774    }
775
776    @Override
777    public ByteBuffer getValueShallowCopy() {
778      currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
779      ByteBuffer dup = tmpPair.getFirst().duplicate();
780      dup.position(tmpPair.getSecond());
781      dup.limit(tmpPair.getSecond() + current.valueLength);
782      return dup.slice();
783    }
784
785    @Override
786    public Cell getCell() {
787      return current.toCell();
788    }
789
790    @Override
791    public void rewind() {
792      currentBuffer.rewind();
793      if (tagCompressionContext != null) {
794        tagCompressionContext.clear();
795      }
796      decodeFirst();
797      current.setKey(current.keyBuffer, current.memstoreTS);
798      previous.invalidate();
799    }
800
801    @Override
802    public boolean next() {
803      if (!currentBuffer.hasRemaining()) {
804        return false;
805      }
806      decodeNext();
807      current.setKey(current.keyBuffer, current.memstoreTS);
808      previous.invalidate();
809      return true;
810    }
811
812    protected void decodeTags() {
813      current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
814      if (tagCompressionContext != null) {
815        if (current.uncompressTags) {
816          // Tag compression is been used. uncompress it into tagsBuffer
817          current.ensureSpaceForTags();
818          try {
819            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
820              current.tagsBuffer, 0, current.tagsLength);
821          } catch (IOException e) {
822            throw new RuntimeException("Exception while uncompressing tags", e);
823          }
824        } else {
825          currentBuffer.skip(current.tagsCompressedLength);
826          current.uncompressTags = true;// Reset this.
827        }
828        current.tagsOffset = -1;
829      } else {
830        // When tag compress is not used, let us not do copying of tags bytes into tagsBuffer.
831        // Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer()
832        current.tagsOffset = currentBuffer.position();
833        currentBuffer.skip(current.tagsLength);
834      }
835    }
836
837    @Override
838    public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
839      int rowCommonPrefix = 0;
840      int familyCommonPrefix = 0;
841      int qualCommonPrefix = 0;
842      previous.invalidate();
843      do {
844        int comp;
845        keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
846        if (current.lastCommonPrefix != 0) {
847          // The KV format has row key length also in the byte array. The
848          // common prefix
849          // includes it. So we need to subtract to find out the common prefix
850          // in the
851          // row part alone
852          rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
853        }
854        if (current.lastCommonPrefix <= 2) {
855          rowCommonPrefix = 0;
856        }
857        rowCommonPrefix += findCommonPrefixInRowPart(seekCell, keyOnlyKV, rowCommonPrefix);
858        comp = compareCommonRowPrefix(seekCell, keyOnlyKV, rowCommonPrefix);
859        if (comp == 0) {
860          comp = compareTypeBytes(seekCell, keyOnlyKV);
861          if (comp == 0) {
862            // Subtract the fixed row key length and the family key fixed length
863            familyCommonPrefix = Math.max(0, Math.min(familyCommonPrefix,
864              current.lastCommonPrefix - (3 + keyOnlyKV.getRowLength())));
865            familyCommonPrefix +=
866              findCommonPrefixInFamilyPart(seekCell, keyOnlyKV, familyCommonPrefix);
867            comp = compareCommonFamilyPrefix(seekCell, keyOnlyKV, familyCommonPrefix);
868            if (comp == 0) {
869              // subtract the rowkey fixed length and the family key fixed
870              // length
871              qualCommonPrefix = Math.max(0, Math.min(qualCommonPrefix, current.lastCommonPrefix
872                - (3 + keyOnlyKV.getRowLength() + keyOnlyKV.getFamilyLength())));
873              qualCommonPrefix +=
874                findCommonPrefixInQualifierPart(seekCell, keyOnlyKV, qualCommonPrefix);
875              comp = compareCommonQualifierPrefix(seekCell, keyOnlyKV, qualCommonPrefix);
876              if (comp == 0) {
877                comp = CellComparator.getInstance().compareTimestamps(seekCell, keyOnlyKV);
878                if (comp == 0) {
879                  // Compare types. Let the delete types sort ahead of puts;
880                  // i.e. types
881                  // of higher numbers sort before those of lesser numbers.
882                  // Maximum
883                  // (255)
884                  // appears ahead of everything, and minimum (0) appears
885                  // after
886                  // everything.
887                  comp = (0xff & keyOnlyKV.getTypeByte()) - (0xff & seekCell.getTypeByte());
888                }
889              }
890            }
891          }
892        }
893        if (comp == 0) { // exact match
894          if (seekBefore) {
895            if (!previous.isValid()) {
896              // The caller (seekBefore) has to ensure that we are not at the
897              // first key in the block.
898              throw new IllegalStateException(
899                "Cannot seekBefore if " + "positioned at the first key in the block: key="
900                  + Bytes.toStringBinary(seekCell.getRowArray()));
901            }
902            moveToPrevious();
903            return 1;
904          }
905          return 0;
906        }
907
908        if (comp < 0) { // already too large, check previous
909          if (previous.isValid()) {
910            moveToPrevious();
911          } else {
912            return HConstants.INDEX_KEY_MAGIC; // using optimized index key
913          }
914          return 1;
915        }
916
917        // move to next, if more data is available
918        if (currentBuffer.hasRemaining()) {
919          previous.copyFromNext(current);
920          decodeNext();
921          current.setKey(current.keyBuffer, current.memstoreTS);
922        } else {
923          break;
924        }
925      } while (true);
926
927      // we hit the end of the block, not an exact match
928      return 1;
929    }
930
931    private int compareTypeBytes(Cell key, Cell right) {
932      if (
933        key.getFamilyLength() + key.getQualifierLength() == 0
934          && key.getTypeByte() == KeyValue.Type.Minimum.getCode()
935      ) {
936        // left is "bigger", i.e. it appears later in the sorted order
937        return 1;
938      }
939      if (
940        right.getFamilyLength() + right.getQualifierLength() == 0
941          && right.getTypeByte() == KeyValue.Type.Minimum.getCode()
942      ) {
943        return -1;
944      }
945      return 0;
946    }
947
948    private static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
949      return Bytes.findCommonPrefix(left.getRowArray(), right.getRowArray(),
950        left.getRowLength() - rowCommonPrefix, right.getRowLength() - rowCommonPrefix,
951        left.getRowOffset() + rowCommonPrefix, right.getRowOffset() + rowCommonPrefix);
952    }
953
954    private static int findCommonPrefixInFamilyPart(Cell left, Cell right, int familyCommonPrefix) {
955      return Bytes.findCommonPrefix(left.getFamilyArray(), right.getFamilyArray(),
956        left.getFamilyLength() - familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix,
957        left.getFamilyOffset() + familyCommonPrefix, right.getFamilyOffset() + familyCommonPrefix);
958    }
959
960    private static int findCommonPrefixInQualifierPart(Cell left, Cell right,
961      int qualifierCommonPrefix) {
962      return Bytes.findCommonPrefix(left.getQualifierArray(), right.getQualifierArray(),
963        left.getQualifierLength() - qualifierCommonPrefix,
964        right.getQualifierLength() - qualifierCommonPrefix,
965        left.getQualifierOffset() + qualifierCommonPrefix,
966        right.getQualifierOffset() + qualifierCommonPrefix);
967    }
968
969    private void moveToPrevious() {
970      if (!previous.isValid()) {
971        throw new IllegalStateException(
972          "Can move back only once and not in first key in the block.");
973      }
974
975      STATE tmp = previous;
976      previous = current;
977      current = tmp;
978
979      // move after last key value
980      currentBuffer.position(current.nextKvOffset);
981      // Already decoded the tag bytes. We cache this tags into current state and also the total
982      // compressed length of the tags bytes. For the next time decodeNext() we don't need to decode
983      // the tags again. This might pollute the Data Dictionary what we use for the compression.
984      // When current.uncompressTags is false, we will just reuse the current.tagsBuffer and skip
985      // 'tagsCompressedLength' bytes of source stream.
986      // See in decodeTags()
987      current.tagsBuffer = previous.tagsBuffer;
988      current.tagsCompressedLength = previous.tagsCompressedLength;
989      current.uncompressTags = false;
990      // The current key has to be reset with the previous Cell
991      current.setKey(current.keyBuffer, current.memstoreTS);
992      previous.invalidate();
993    }
994
995    @SuppressWarnings("unchecked")
996    protected STATE createSeekerState() {
997      // This will fail for non-default seeker state if the subclass does not
998      // override this method.
999      return (STATE) new SeekerState(this.tmpPair, this.includesTags());
1000    }
1001
1002    abstract protected void decodeFirst();
1003
1004    abstract protected void decodeNext();
1005  }
1006
1007  /** Returns unencoded size added */
1008  protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
1009    HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
1010    int size = 0;
1011    if (encodingCtx.getHFileContext().isIncludesTags()) {
1012      int tagsLength = cell.getTagsLength();
1013      ByteBufferUtils.putCompressedInt(out, tagsLength);
1014      // There are some tags to be written
1015      if (tagsLength > 0) {
1016        TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
1017        // When tag compression is enabled, tagCompressionContext will have a not null value. Write
1018        // the tags using Dictionary compression in such a case
1019        if (tagCompressionContext != null) {
1020          // Not passing tagsLength considering that parsing of the tagsLength is not costly
1021          PrivateCellUtil.compressTags(out, cell, tagCompressionContext);
1022        } else {
1023          PrivateCellUtil.writeTags(out, cell, tagsLength);
1024        }
1025      }
1026      size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
1027    }
1028    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
1029      // Copy memstore timestamp from the byte buffer to the output stream.
1030      long memstoreTS = cell.getSequenceId();
1031      WritableUtils.writeVLong(out, memstoreTS);
1032      // TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be
1033      // avoided.
1034      size += WritableUtils.getVIntSize(memstoreTS);
1035    }
1036    return size;
1037  }
1038
1039  protected final void afterDecodingKeyValue(DataInputStream source, ByteBuffer dest,
1040    HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
1041    if (decodingCtx.getHFileContext().isIncludesTags()) {
1042      int tagsLength = ByteBufferUtils.readCompressedInt(source);
1043      // Put as unsigned short
1044      dest.put((byte) ((tagsLength >> 8) & 0xff));
1045      dest.put((byte) (tagsLength & 0xff));
1046      if (tagsLength > 0) {
1047        TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
1048        // When tag compression is been used in this file, tagCompressionContext will have a not
1049        // null value passed.
1050        if (tagCompressionContext != null) {
1051          tagCompressionContext.uncompressTags(source, dest, tagsLength);
1052        } else {
1053          ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
1054        }
1055      }
1056    }
1057    if (decodingCtx.getHFileContext().isIncludesMvcc()) {
1058      long memstoreTS = -1;
1059      try {
1060        // Copy memstore timestamp from the data input stream to the byte
1061        // buffer.
1062        memstoreTS = WritableUtils.readVLong(source);
1063        ByteBufferUtils.writeVLong(dest, memstoreTS);
1064      } catch (IOException ex) {
1065        throw new RuntimeException(
1066          "Unable to copy memstore timestamp " + memstoreTS + " after decoding a key/value");
1067      }
1068    }
1069  }
1070
1071  protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
1072    int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
1073    throws IOException;
1074
1075  /**
1076   * Asserts that there is at least the given amount of unfilled space remaining in the given
1077   * buffer.
1078   * @param out    typically, the buffer we are writing to
1079   * @param length the required space in the buffer
1080   * @throws EncoderBufferTooSmallException If there are no enough bytes.
1081   */
1082  protected static void ensureSpace(ByteBuffer out, int length)
1083    throws EncoderBufferTooSmallException {
1084    if (out.position() + length > out.limit()) {
1085      throw new EncoderBufferTooSmallException("Buffer position=" + out.position()
1086        + ", buffer limit=" + out.limit() + ", length to be written=" + length);
1087    }
1088  }
1089
1090  @Override
1091  public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
1092    throws IOException {
1093    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
1094      throw new IOException(this.getClass().getName() + " only accepts "
1095        + HFileBlockDefaultEncodingContext.class.getName() + " as the " + "encoding context.");
1096    }
1097
1098    HFileBlockDefaultEncodingContext encodingCtx =
1099      (HFileBlockDefaultEncodingContext) blkEncodingCtx;
1100    encodingCtx.prepareEncoding(out);
1101    if (
1102      encodingCtx.getHFileContext().isIncludesTags()
1103        && encodingCtx.getHFileContext().isCompressTags()
1104    ) {
1105      if (encodingCtx.getTagCompressionContext() != null) {
1106        // It will be overhead to create the TagCompressionContext again and again for every block
1107        // encoding.
1108        encodingCtx.getTagCompressionContext().clear();
1109      } else {
1110        try {
1111          TagCompressionContext tagCompressionContext =
1112            new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
1113          encodingCtx.setTagCompressionContext(tagCompressionContext);
1114        } catch (Exception e) {
1115          throw new IOException("Failed to initialize TagCompressionContext", e);
1116        }
1117      }
1118    }
1119    StreamUtils.writeInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
1120    blkEncodingCtx.setEncodingState(new EncodingState());
1121  }
1122
1123  @Override
1124  public void encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
1125    throws IOException {
1126    EncodingState state = encodingCtx.getEncodingState();
1127    int posBeforeEncode = out.size();
1128    int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
1129    state.postCellEncode(encodedKvSize, out.size() - posBeforeEncode);
1130  }
1131
1132  public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
1133    DataOutputStream out) throws IOException;
1134
1135  @Override
1136  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
1137    byte[] uncompressedBytesWithHeader) throws IOException {
1138    EncodingState state = encodingCtx.getEncodingState();
1139    // Write the unencodedDataSizeWritten (with header size)
1140    Bytes.putInt(uncompressedBytesWithHeader,
1141      HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE,
1142      state.getUnencodedDataSizeWritten());
1143    postEncoding(encodingCtx);
1144  }
1145
1146}