001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.encoding;
019
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.yetus.audience.InterfaceAudience;
044
045/**
046 * Base class for all data block encoders that use a buffer.
047 */
048@InterfaceAudience.Private
049abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
050  /**
051   * TODO: This datablockencoder is dealing in internals of hfileblocks. Purge reference to HFBs
052   */
053  private static int INITIAL_KEY_BUFFER_SIZE = 512;
054
055  @Override
056  public ByteBuffer decodeKeyValues(DataInputStream source,
057    HFileBlockDecodingContext blkDecodingCtx) throws IOException {
058    if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
059      throw new IOException(this.getClass().getName() + " only accepts "
060        + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
061    }
062
063    HFileBlockDefaultDecodingContext decodingCtx =
064      (HFileBlockDefaultDecodingContext) blkDecodingCtx;
065    if (
066      decodingCtx.getHFileContext().isIncludesTags()
067        && decodingCtx.getHFileContext().isCompressTags()
068    ) {
069      if (decodingCtx.getTagCompressionContext() != null) {
070        // It will be overhead to create the TagCompressionContext again and again for every block
071        // decoding.
072        decodingCtx.getTagCompressionContext().clear();
073      } else {
074        try {
075          TagCompressionContext tagCompressionContext =
076            new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
077          decodingCtx.setTagCompressionContext(tagCompressionContext);
078        } catch (Exception e) {
079          throw new IOException("Failed to initialize TagCompressionContext", e);
080        }
081      }
082    }
083    return internalDecodeKeyValues(source, 0, 0, decodingCtx);
084  }
085
086  /********************* common prefixes *************************/
  // Having this as static is fine, but if META uses DBE then we should
  // change this.
089  public static int compareCommonRowPrefix(Cell left, Cell right, int rowCommonPrefix) {
090    return Bytes.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix,
091      left.getRowLength() - rowCommonPrefix, right.getRowArray(),
092      right.getRowOffset() + rowCommonPrefix, right.getRowLength() - rowCommonPrefix);
093  }
094
095  public static int compareCommonFamilyPrefix(Cell left, Cell right, int familyCommonPrefix) {
096    return Bytes.compareTo(left.getFamilyArray(), left.getFamilyOffset() + familyCommonPrefix,
097      left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
098      right.getFamilyOffset() + familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix);
099  }
100
101  public static int compareCommonQualifierPrefix(Cell left, Cell right, int qualCommonPrefix) {
102    return Bytes.compareTo(left.getQualifierArray(), left.getQualifierOffset() + qualCommonPrefix,
103      left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(),
104      right.getQualifierOffset() + qualCommonPrefix, right.getQualifierLength() - qualCommonPrefix);
105  }
106
  /**
   * Mutable per-position state of a seeker over a prefix-encoded block: holds a private copy of
   * the current key plus offsets into the block buffer for the value and (optionally) tags.
   */
  protected static class SeekerState {
    protected ByteBuff currentBuffer;
    protected TagCompressionContext tagCompressionContext;
    // Offset of the value bytes inside currentBuffer; -1 marks this state as invalid.
    protected int valueOffset = -1;
    protected int keyLength;
    protected int valueLength;
    // Number of leading key bytes shared with the previous cell (prefix encoding).
    protected int lastCommonPrefix;
    protected int tagsLength = 0;
    // Offset of the tags bytes inside currentBuffer; -1 when tags were uncompressed into
    // tagsBuffer instead (see the tagCompressionContext handling in toCell/toOnheapCell).
    protected int tagsOffset = -1;
    // Size of the compressed tags section in the block, used to skip over it on re-reads.
    protected int tagsCompressedLength = 0;
    protected boolean uncompressTags = true;

    /** We need to store a copy of the key. */
    protected byte[] keyBuffer = HConstants.EMPTY_BYTE_ARRAY;
    protected byte[] tagsBuffer = HConstants.EMPTY_BYTE_ARRAY;

    protected long memstoreTS;
    protected int nextKvOffset;
    protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
    private final ObjectIntPair<ByteBuffer> tmpPair;
    private final boolean includeTags;

    public SeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) {
      this.tmpPair = tmpPair;
      this.includeTags = includeTags;
    }

    /** Returns true while this state holds a decoded cell (i.e. valueOffset has been set). */
    protected boolean isValid() {
      return valueOffset != -1;
    }

    /** Resets this state so isValid() is false and no stale buffer reference is retained. */
    protected void invalidate() {
      valueOffset = -1;
      tagsCompressedLength = 0;
      currentKey.clear();
      uncompressTags = true;
      currentBuffer = null;
    }

    /**
     * Grows keyBuffer, preserving its contents, so it can hold keyLength bytes. The new size is
     * the next power of two at or above max(INITIAL_KEY_BUFFER_SIZE, keyLength).
     */
    protected void ensureSpaceForKey() {
      if (keyLength > keyBuffer.length) {
        int newKeyBufferLength =
          Integer.highestOneBit(Math.max(INITIAL_KEY_BUFFER_SIZE, keyLength) - 1) << 1;
        byte[] newKeyBuffer = new byte[newKeyBufferLength];
        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
        keyBuffer = newKeyBuffer;
      }
    }

    /** Grows tagsBuffer (same power-of-two policy as ensureSpaceForKey) to hold tagsLength bytes. */
    protected void ensureSpaceForTags() {
      if (tagsLength > tagsBuffer.length) {
        int newTagsBufferLength =
          Integer.highestOneBit(Math.max(INITIAL_KEY_BUFFER_SIZE, tagsLength) - 1) << 1;
        byte[] newTagsBuffer = new byte[newTagsBufferLength];
        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
        tagsBuffer = newTagsBuffer;
      }
    }

    /** Re-points currentKey at the first keyLength bytes of keyBuffer and records the mvcc ts. */
    protected void setKey(byte[] keyBuffer, long memTS) {
      currentKey.setKey(keyBuffer, 0, keyLength);
      memstoreTS = memTS;
    }

    /**
     * Copy the state from the next one into this instance (the previous state placeholder). Used to
     * save the previous state when we are advancing the seeker to the next key/value.
     */
    protected void copyFromNext(SeekerState nextState) {
      if (keyBuffer.length != nextState.keyBuffer.length) {
        // Sizes differ: cheapest correct move is to clone the whole backing array.
        keyBuffer = nextState.keyBuffer.clone();
      } else if (!isValid()) {
        // Note: we can only call isValid before we override our state, so this
        // comes before all the assignments at the end of this method.
        System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0, nextState.keyLength);
      } else {
        // don't copy the common prefix between this key and the previous one
        System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix, keyBuffer,
          nextState.lastCommonPrefix, nextState.keyLength - nextState.lastCommonPrefix);
      }
      currentKey.set(nextState.currentKey);

      valueOffset = nextState.valueOffset;
      keyLength = nextState.keyLength;
      valueLength = nextState.valueLength;
      lastCommonPrefix = nextState.lastCommonPrefix;
      nextKvOffset = nextState.nextKvOffset;
      memstoreTS = nextState.memstoreTS;
      currentBuffer = nextState.currentBuffer;
      tagsOffset = nextState.tagsOffset;
      tagsLength = nextState.tagsLength;
      if (nextState.tagCompressionContext != null) {
        tagCompressionContext = nextState.tagCompressionContext;
      }
    }

    /**
     * Materializes this state as a Cell. The key is deep-copied; the value (and, when tags are
     * not compressed, the tags) still reference the block's buffer.
     */
    public Cell toCell() {
      // Buffer backing the value and tags part from the HFileBlock's buffer
      // When tag compression in use, this will be only the value bytes area.
      ByteBuffer valAndTagsBuffer;
      int vOffset;
      int valAndTagsLength = this.valueLength;
      int tagsLenSerializationSize = 0;
      if (this.includeTags && this.tagCompressionContext == null) {
        // Include the tags part also. This will be the tags bytes + 2 bytes for storing tags
        // length
        tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + this.valueLength);
        valAndTagsLength += tagsLenSerializationSize + this.tagsLength;
      }
      this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair);
      valAndTagsBuffer = this.tmpPair.getFirst();
      vOffset = this.tmpPair.getSecond();// This is the offset to value part in the BB
      if (valAndTagsBuffer.hasArray()) {
        return toOnheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      } else {
        return toOffheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      }
    }

    // Builds an array-backed cell; tags either point into the block's array or are copied out of
    // tagsBuffer when tag compression was in use.
    private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
      int tagsLenSerializationSize) {
      byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          tagsArray = valAndTagsBuffer.array();
          tOffset =
            valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength + tagsLenSerializationSize;
        } else {
          tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
          tOffset = 0;
        }
      }
      return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
        currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
        currentKey.getQualifierOffset(), currentKey.getQualifierLength(), currentKey.getTimestamp(),
        currentKey.getTypeByte(), valAndTagsBuffer.array(),
        valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength, memstoreTS, tagsArray, tOffset,
        this.tagsLength);
    }

    // Builds a ByteBuffer-backed cell for off-heap blocks; the key is always copied on-heap.
    private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
      int tagsLenSerializationSize) {
      ByteBuffer tagsBuf = HConstants.EMPTY_BYTE_BUFFER;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          tagsBuf = valAndTagsBuffer;
          tOffset = vOffset + this.valueLength + tagsLenSerializationSize;
        } else {
          tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength));
          tOffset = 0;
        }
      }
      return new OffheapDecodedExtendedCell(
        ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)), currentKey.getRowLength(),
        currentKey.getFamilyOffset(), currentKey.getFamilyLength(), currentKey.getQualifierOffset(),
        currentKey.getQualifierLength(), currentKey.getTimestamp(), currentKey.getTypeByte(),
        valAndTagsBuffer, vOffset, this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
    }
  }
270
271  /**
272   * Copies only the key part of the keybuffer by doing a deep copy and passes the seeker state
273   * members for taking a clone. Note that the value byte[] part is still pointing to the
274   * currentBuffer and represented by the valueOffset and valueLength
275   */
276  // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
277  // there. So this has to be an instance of ExtendedCell.
278  protected static class OnheapDecodedCell implements ExtendedCell {
279    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
280      + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
281      + Bytes.SIZEOF_SHORT + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY));
282    private byte[] keyOnlyBuffer;
283    private short rowLength;
284    private int familyOffset;
285    private byte familyLength;
286    private int qualifierOffset;
287    private int qualifierLength;
288    private long timeStamp;
289    private byte typeByte;
290    private byte[] valueBuffer;
291    private int valueOffset;
292    private int valueLength;
293    private byte[] tagsBuffer;
294    private int tagsOffset;
295    private int tagsLength;
296    private long seqId;
297
298    protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset,
299      byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
300      byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] tagsBuffer,
301      int tagsOffset, int tagsLength) {
302      this.keyOnlyBuffer = keyBuffer;
303      this.rowLength = rowLength;
304      this.familyOffset = familyOffset;
305      this.familyLength = familyLength;
306      this.qualifierOffset = qualOffset;
307      this.qualifierLength = qualLength;
308      this.timeStamp = timeStamp;
309      this.typeByte = typeByte;
310      this.valueBuffer = valueBuffer;
311      this.valueOffset = valueOffset;
312      this.valueLength = valueLen;
313      this.tagsBuffer = tagsBuffer;
314      this.tagsOffset = tagsOffset;
315      this.tagsLength = tagsLength;
316      setSequenceId(seqId);
317    }
318
319    @Override
320    public byte[] getRowArray() {
321      return keyOnlyBuffer;
322    }
323
324    @Override
325    public byte[] getFamilyArray() {
326      return keyOnlyBuffer;
327    }
328
329    @Override
330    public byte[] getQualifierArray() {
331      return keyOnlyBuffer;
332    }
333
334    @Override
335    public int getRowOffset() {
336      return Bytes.SIZEOF_SHORT;
337    }
338
339    @Override
340    public short getRowLength() {
341      return rowLength;
342    }
343
344    @Override
345    public int getFamilyOffset() {
346      return familyOffset;
347    }
348
349    @Override
350    public byte getFamilyLength() {
351      return familyLength;
352    }
353
354    @Override
355    public int getQualifierOffset() {
356      return qualifierOffset;
357    }
358
359    @Override
360    public int getQualifierLength() {
361      return qualifierLength;
362    }
363
364    @Override
365    public long getTimestamp() {
366      return timeStamp;
367    }
368
369    @Override
370    public byte getTypeByte() {
371      return typeByte;
372    }
373
374    @Override
375    public long getSequenceId() {
376      return seqId;
377    }
378
379    @Override
380    public byte[] getValueArray() {
381      return this.valueBuffer;
382    }
383
384    @Override
385    public int getValueOffset() {
386      return valueOffset;
387    }
388
389    @Override
390    public int getValueLength() {
391      return valueLength;
392    }
393
394    @Override
395    public byte[] getTagsArray() {
396      return this.tagsBuffer;
397    }
398
399    @Override
400    public int getTagsOffset() {
401      return this.tagsOffset;
402    }
403
404    @Override
405    public int getTagsLength() {
406      return tagsLength;
407    }
408
409    @Override
410    public String toString() {
411      return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
412        + getValueLength() + "/seqid=" + seqId;
413    }
414
415    @Override
416    public void setSequenceId(long seqId) {
417      this.seqId = seqId;
418    }
419
420    @Override
421    public long heapSize() {
422      return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
423    }
424
425    @Override
426    public int write(OutputStream out, boolean withTags) throws IOException {
427      int lenToWrite = getSerializedSize(withTags);
428      ByteBufferUtils.putInt(out, keyOnlyBuffer.length);
429      ByteBufferUtils.putInt(out, valueLength);
430      // Write key
431      out.write(keyOnlyBuffer);
432      // Write value
433      out.write(this.valueBuffer, this.valueOffset, this.valueLength);
434      if (withTags && this.tagsLength > 0) {
435        // 2 bytes tags length followed by tags bytes
436        // tags length is serialized with 2 bytes only(short way) even if the type is int.
437        // As this is non -ve numbers, we save the sign bit. See HBASE-11437
438        out.write((byte) (0xff & (this.tagsLength >> 8)));
439        out.write((byte) (0xff & this.tagsLength));
440        out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
441      }
442      return lenToWrite;
443    }
444
445    @Override
446    public int getSerializedSize(boolean withTags) {
447      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
448        withTags);
449    }
450
451    @Override
452    public void write(ByteBuffer buf, int offset) {
453      // This is not used in actual flow. Throwing UnsupportedOperationException
454      throw new UnsupportedOperationException();
455    }
456
457    @Override
458    public void setTimestamp(long ts) throws IOException {
459      // This is not used in actual flow. Throwing UnsupportedOperationException
460      throw new UnsupportedOperationException();
461    }
462
463    @Override
464    public void setTimestamp(byte[] ts) throws IOException {
465      // This is not used in actual flow. Throwing UnsupportedOperationException
466      throw new UnsupportedOperationException();
467    }
468
469    @Override
470    public ExtendedCell deepClone() {
471      // This is not used in actual flow. Throwing UnsupportedOperationException
472      throw new UnsupportedOperationException();
473    }
474  }
475
476  protected static class OffheapDecodedExtendedCell extends ByteBufferExtendedCell {
477    private static final long FIXED_OVERHEAD =
478      (long) ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)
479        + (7 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE)
480        + (3 * ClassSize.BYTE_BUFFER);
481    private ByteBuffer keyBuffer;
482    private short rowLength;
483    private int familyOffset;
484    private byte familyLength;
485    private int qualifierOffset;
486    private int qualifierLength;
487    private long timeStamp;
488    private byte typeByte;
489    private ByteBuffer valueBuffer;
490    private int valueOffset;
491    private int valueLength;
492    private ByteBuffer tagsBuffer;
493    private int tagsOffset;
494    private int tagsLength;
495    private long seqId;
496
497    protected OffheapDecodedExtendedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset,
498      byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
499      ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer,
500      int tagsOffset, int tagsLength) {
501      // The keyBuffer is always onheap
502      assert keyBuffer.hasArray();
503      assert keyBuffer.arrayOffset() == 0;
504      this.keyBuffer = keyBuffer;
505      this.rowLength = rowLength;
506      this.familyOffset = familyOffset;
507      this.familyLength = familyLength;
508      this.qualifierOffset = qualOffset;
509      this.qualifierLength = qualLength;
510      this.timeStamp = timeStamp;
511      this.typeByte = typeByte;
512      this.valueBuffer = valueBuffer;
513      this.valueOffset = valueOffset;
514      this.valueLength = valueLen;
515      this.tagsBuffer = tagsBuffer;
516      this.tagsOffset = tagsOffset;
517      this.tagsLength = tagsLength;
518      setSequenceId(seqId);
519    }
520
521    @Override
522    @SuppressWarnings("ByteBufferBackingArray")
523    public byte[] getRowArray() {
524      return this.keyBuffer.array();
525    }
526
527    @Override
528    public int getRowOffset() {
529      return getRowPosition();
530    }
531
532    @Override
533    public short getRowLength() {
534      return this.rowLength;
535    }
536
537    @Override
538    @SuppressWarnings("ByteBufferBackingArray")
539    public byte[] getFamilyArray() {
540      return this.keyBuffer.array();
541    }
542
543    @Override
544    public int getFamilyOffset() {
545      return getFamilyPosition();
546    }
547
548    @Override
549    public byte getFamilyLength() {
550      return this.familyLength;
551    }
552
553    @Override
554    @SuppressWarnings("ByteBufferBackingArray")
555    public byte[] getQualifierArray() {
556      return this.keyBuffer.array();
557    }
558
559    @Override
560    public int getQualifierOffset() {
561      return getQualifierPosition();
562    }
563
564    @Override
565    public int getQualifierLength() {
566      return this.qualifierLength;
567    }
568
569    @Override
570    public long getTimestamp() {
571      return this.timeStamp;
572    }
573
574    @Override
575    public byte getTypeByte() {
576      return this.typeByte;
577    }
578
579    @Override
580    public long getSequenceId() {
581      return this.seqId;
582    }
583
584    @Override
585    public byte[] getValueArray() {
586      return CellUtil.cloneValue(this);
587    }
588
589    @Override
590    public int getValueOffset() {
591      return 0;
592    }
593
594    @Override
595    public int getValueLength() {
596      return this.valueLength;
597    }
598
599    @Override
600    public byte[] getTagsArray() {
601      return PrivateCellUtil.cloneTags(this);
602    }
603
604    @Override
605    public int getTagsOffset() {
606      return 0;
607    }
608
609    @Override
610    public int getTagsLength() {
611      return this.tagsLength;
612    }
613
614    @Override
615    public ByteBuffer getRowByteBuffer() {
616      return this.keyBuffer;
617    }
618
619    @Override
620    public int getRowPosition() {
621      return Bytes.SIZEOF_SHORT;
622    }
623
624    @Override
625    public ByteBuffer getFamilyByteBuffer() {
626      return this.keyBuffer;
627    }
628
629    @Override
630    public int getFamilyPosition() {
631      return this.familyOffset;
632    }
633
634    @Override
635    public ByteBuffer getQualifierByteBuffer() {
636      return this.keyBuffer;
637    }
638
639    @Override
640    public int getQualifierPosition() {
641      return this.qualifierOffset;
642    }
643
644    @Override
645    public ByteBuffer getValueByteBuffer() {
646      return this.valueBuffer;
647    }
648
649    @Override
650    public int getValuePosition() {
651      return this.valueOffset;
652    }
653
654    @Override
655    public ByteBuffer getTagsByteBuffer() {
656      return this.tagsBuffer;
657    }
658
659    @Override
660    public int getTagsPosition() {
661      return this.tagsOffset;
662    }
663
664    @Override
665    public long heapSize() {
666      return FIXED_OVERHEAD;
667    }
668
669    @Override
670    public void setSequenceId(long seqId) {
671      this.seqId = seqId;
672    }
673
674    @Override
675    public int write(OutputStream out, boolean withTags) throws IOException {
676      int lenToWrite = getSerializedSize(withTags);
677      ByteBufferUtils.putInt(out, keyBuffer.remaining());
678      ByteBufferUtils.putInt(out, valueLength);
679      // Write key
680      out.write(keyBuffer.array(), keyBuffer.arrayOffset(), keyBuffer.remaining());
681      // Write value
682      ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset, this.valueLength);
683      if (withTags && this.tagsLength > 0) {
684        // 2 bytes tags length followed by tags bytes
685        // tags length is serialized with 2 bytes only(short way) even if the type is int.
686        // As this is non -ve numbers, we save the sign bit. See HBASE-11437
687        out.write((byte) (0xff & (this.tagsLength >> 8)));
688        out.write((byte) (0xff & this.tagsLength));
689        ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
690      }
691      return lenToWrite;
692    }
693
694    @Override
695    public int getSerializedSize(boolean withTags) {
696      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
697        withTags);
698    }
699
700    @Override
701    public void setTimestamp(long ts) throws IOException {
702      // This is not used in actual flow. Throwing UnsupportedOperationException
703      throw new UnsupportedOperationException();
704    }
705
706    @Override
707    public void setTimestamp(byte[] ts) throws IOException {
708      // This is not used in actual flow. Throwing UnsupportedOperationException
709      throw new UnsupportedOperationException();
710    }
711
712    @Override
713    public void write(ByteBuffer buf, int offset) {
714      // This is not used in actual flow. Throwing UnsupportedOperationException
715      throw new UnsupportedOperationException();
716    }
717
718    @Override
719    public ExtendedCell deepClone() {
720      // This is not used in actual flow. Throwing UnsupportedOperationException
721      throw new UnsupportedOperationException();
722    }
723  }
724
725  protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
726    extends AbstractEncodedSeeker {
    protected ByteBuff currentBuffer;
    protected TagCompressionContext tagCompressionContext = null;
    // Reusable key wrapper for comparisons; re-pointed at current.keyBuffer before each use.
    protected KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
    protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>();
    protected STATE current, previous;

    /**
     * Creates the two seeker states and, when the HFile context compresses tags, the shared
     * TagCompressionContext used to uncompress them while decoding.
     */
    public BufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
      super(decodingCtx);
      if (decodingCtx.getHFileContext().isCompressTags()) {
        try {
          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
        } catch (Exception e) {
          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
        }
      }
      current = createSeekerState(); // always valid
      previous = createSeekerState(); // may not be valid
    }
747
748    @Override
749    public int compareKey(CellComparator comparator, Cell key) {
750      keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
751      return PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, keyOnlyKV);
752    }
753
754    @Override
755    public void setCurrentBuffer(ByteBuff buffer) {
756      if (this.tagCompressionContext != null) {
757        this.tagCompressionContext.clear();
758      }
759      currentBuffer = buffer;
760      current.currentBuffer = currentBuffer;
761      if (tagCompressionContext != null) {
762        current.tagCompressionContext = tagCompressionContext;
763      }
764      decodeFirst();
765      current.setKey(current.keyBuffer, current.memstoreTS);
766      previous.invalidate();
767    }
768
769    @Override
770    public Cell getKey() {
771      byte[] key = new byte[current.keyLength];
772      System.arraycopy(current.keyBuffer, 0, key, 0, current.keyLength);
773      return new KeyValue.KeyOnlyKeyValue(key);
774    }
775
776    @Override
777    public ByteBuffer getValueShallowCopy() {
778      currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
779      ByteBuffer dup = tmpPair.getFirst().duplicate();
780      dup.position(tmpPair.getSecond());
781      dup.limit(tmpPair.getSecond() + current.valueLength);
782      return dup.slice();
783    }
784
785    @Override
786    public Cell getCell() {
787      return current.toCell();
788    }
789
790    @Override
791    public void rewind() {
792      currentBuffer.rewind();
793      if (tagCompressionContext != null) {
794        tagCompressionContext.clear();
795        // Prior seekToKeyInBlock may have reset this to false if we fell back to previous
796        // seeker state. This is an optimization so we don't have to uncompress tags again when
797        // reading last state.
798        // In case of rewind, we are starting from the beginning of the buffer, so we need
799        // to uncompress any tags we see.
800        // It may make sense to reset this in setCurrentBuffer as well, but we seem to only call
801        // setCurrentBuffer after StoreFileScanner.seekAtOrAfter which calls next to consume the
802        // seeker state. Rewind is called by seekBefore, which doesn't and leaves us in this state.
803        current.uncompressTags = true;
804      }
805      decodeFirst();
806      current.setKey(current.keyBuffer, current.memstoreTS);
807      previous.invalidate();
808    }
809
810    @Override
811    public boolean next() {
812      if (!currentBuffer.hasRemaining()) {
813        return false;
814      }
815      decodeNext();
816      current.setKey(current.keyBuffer, current.memstoreTS);
817      previous.invalidate();
818      return true;
819    }
820
    /**
     * Decodes the tags section of the current cell, advancing currentBuffer past it. With tag
     * compression, tags are uncompressed into current.tagsBuffer (or skipped if already done);
     * without it, only the offset into the block buffer is recorded.
     */
    protected void decodeTags() {
      // Uncompressed length of this cell's tags, stored as a compressed int.
      current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
      if (tagCompressionContext != null) {
        if (current.uncompressTags) {
          // Tag compression is being used; uncompress into tagsBuffer.
          current.ensureSpaceForTags();
          try {
            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
              current.tagsBuffer, 0, current.tagsLength);
          } catch (Exception e) {
            throw new RuntimeException("Exception while uncompressing tags", e);
          }
        } else {
          // Tags were already uncompressed for this cell earlier; just skip the compressed bytes
          // and re-arm the flag for the next cell.
          currentBuffer.skip(current.tagsCompressedLength);
          current.uncompressTags = true;// Reset this.
        }
        // Tags now live in current.tagsBuffer, not in the block buffer.
        current.tagsOffset = -1;
      } else {
        // When tag compress is not used, let us not do copying of tags bytes into tagsBuffer.
        // Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer()
        current.tagsOffset = currentBuffer.position();
        currentBuffer.skip(current.tagsLength);
      }
    }
845
    @Override
    public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
      // Common-prefix lengths between seekCell and the current key, carried across
      // loop iterations so each comparison skips bytes already known to match.
      int rowCommonPrefix = 0;
      int familyCommonPrefix = 0;
      int qualCommonPrefix = 0;
      previous.invalidate();
      do {
        int comp;
        keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
        if (current.lastCommonPrefix != 0) {
          // The KV key format carries the 2-byte row length first, and
          // lastCommonPrefix counts those bytes too; subtract 2 to get the
          // common prefix within the row part alone.
          rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
        }
        if (current.lastCommonPrefix <= 2) {
          rowCommonPrefix = 0;
        }
        rowCommonPrefix += findCommonPrefixInRowPart(seekCell, keyOnlyKV, rowCommonPrefix);
        comp = compareCommonRowPrefix(seekCell, keyOnlyKV, rowCommonPrefix);
        if (comp == 0) {
          comp = compareTypeBytes(seekCell, keyOnlyKV);
          if (comp == 0) {
            // Subtract the fixed row key length and the 1-byte family length field
            // to get the common prefix within the family part alone.
            familyCommonPrefix = Math.max(0, Math.min(familyCommonPrefix,
              current.lastCommonPrefix - (3 + keyOnlyKV.getRowLength())));
            familyCommonPrefix +=
              findCommonPrefixInFamilyPart(seekCell, keyOnlyKV, familyCommonPrefix);
            comp = compareCommonFamilyPrefix(seekCell, keyOnlyKV, familyCommonPrefix);
            if (comp == 0) {
              // Subtract the row key fixed length, the family length field and the
              // family bytes to get the common prefix within the qualifier alone.
              qualCommonPrefix = Math.max(0, Math.min(qualCommonPrefix, current.lastCommonPrefix
                - (3 + keyOnlyKV.getRowLength() + keyOnlyKV.getFamilyLength())));
              qualCommonPrefix +=
                findCommonPrefixInQualifierPart(seekCell, keyOnlyKV, qualCommonPrefix);
              comp = compareCommonQualifierPrefix(seekCell, keyOnlyKV, qualCommonPrefix);
              if (comp == 0) {
                comp = CellComparator.getInstance().compareTimestamps(seekCell, keyOnlyKV);
                if (comp == 0) {
                  // Compare types. Delete types sort ahead of puts: higher type
                  // byte values sort before lower ones, so Maximum (255) appears
                  // ahead of everything and Minimum (0) after everything.
                  comp = (0xff & keyOnlyKV.getTypeByte()) - (0xff & seekCell.getTypeByte());
                }
              }
            }
          }
        }
        if (comp == 0) { // exact match
          if (seekBefore) {
            if (!previous.isValid()) {
              // The caller (seekBefore) has to ensure that we are not at the
              // first key in the block.
              throw new IllegalStateException(
                "Cannot seekBefore if " + "positioned at the first key in the block: key="
                  + Bytes.toStringBinary(seekCell.getRowArray()));
            }
            moveToPrevious();
            return 1;
          }
          return 0;
        }

        if (comp < 0) { // already too large, check previous
          if (previous.isValid()) {
            moveToPrevious();
          } else {
            return HConstants.INDEX_KEY_MAGIC; // using optimized index key
          }
          return 1;
        }

        // Current key is still smaller than the seek key: move to the next
        // cell if more data is available, remembering this one as 'previous'.
        if (currentBuffer.hasRemaining()) {
          previous.copyFromNext(current);
          decodeNext();
          current.setKey(current.keyBuffer, current.memstoreTS);
        } else {
          break;
        }
      } while (true);

      // we hit the end of the block, not an exact match
      return 1;
    }
939
940    private int compareTypeBytes(Cell key, Cell right) {
941      if (
942        key.getFamilyLength() + key.getQualifierLength() == 0
943          && key.getTypeByte() == KeyValue.Type.Minimum.getCode()
944      ) {
945        // left is "bigger", i.e. it appears later in the sorted order
946        return 1;
947      }
948      if (
949        right.getFamilyLength() + right.getQualifierLength() == 0
950          && right.getTypeByte() == KeyValue.Type.Minimum.getCode()
951      ) {
952        return -1;
953      }
954      return 0;
955    }
956
957    private static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
958      return Bytes.findCommonPrefix(left.getRowArray(), right.getRowArray(),
959        left.getRowLength() - rowCommonPrefix, right.getRowLength() - rowCommonPrefix,
960        left.getRowOffset() + rowCommonPrefix, right.getRowOffset() + rowCommonPrefix);
961    }
962
963    private static int findCommonPrefixInFamilyPart(Cell left, Cell right, int familyCommonPrefix) {
964      return Bytes.findCommonPrefix(left.getFamilyArray(), right.getFamilyArray(),
965        left.getFamilyLength() - familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix,
966        left.getFamilyOffset() + familyCommonPrefix, right.getFamilyOffset() + familyCommonPrefix);
967    }
968
969    private static int findCommonPrefixInQualifierPart(Cell left, Cell right,
970      int qualifierCommonPrefix) {
971      return Bytes.findCommonPrefix(left.getQualifierArray(), right.getQualifierArray(),
972        left.getQualifierLength() - qualifierCommonPrefix,
973        right.getQualifierLength() - qualifierCommonPrefix,
974        left.getQualifierOffset() + qualifierCommonPrefix,
975        right.getQualifierOffset() + qualifierCommonPrefix);
976    }
977
    /**
     * Steps the seeker back to the single buffered previous cell by swapping the
     * {@code previous} and {@code current} states. Only one step back is possible;
     * after the swap {@code previous} is invalidated.
     */
    private void moveToPrevious() {
      if (!previous.isValid()) {
        throw new IllegalStateException(
          "Can move back only once and not in first key in the block.");
      }

      // Swap the two state objects rather than copying field by field.
      STATE tmp = previous;
      previous = current;
      current = tmp;

      // Reposition the buffer just past the (now current) key/value.
      currentBuffer.position(current.nextKvOffset);
      // The tags of this cell were already uncompressed into previous.tagsBuffer.
      // Cache that buffer plus the compressed tag length on 'current' so the next
      // decodeNext() does not uncompress them again — re-running the uncompression
      // would pollute the Data Dictionary used for tag compression. With
      // current.uncompressTags false, decodeTags() just reuses current.tagsBuffer
      // and skips 'tagsCompressedLength' bytes of the source stream.
      // See decodeTags().
      current.tagsBuffer = previous.tagsBuffer;
      current.tagsCompressedLength = previous.tagsCompressedLength;
      current.uncompressTags = false;
      // The current key has to be reset with the previous Cell
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }
1003
1004    @SuppressWarnings("unchecked")
1005    protected STATE createSeekerState() {
1006      // This will fail for non-default seeker state if the subclass does not
1007      // override this method.
1008      return (STATE) new SeekerState(this.tmpPair, this.includesTags());
1009    }
1010
1011    abstract protected void decodeFirst();
1012
1013    abstract protected void decodeNext();
1014  }
1015
1016  /** Returns unencoded size added */
1017  protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
1018    HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
1019    int size = 0;
1020    if (encodingCtx.getHFileContext().isIncludesTags()) {
1021      int tagsLength = cell.getTagsLength();
1022      ByteBufferUtils.putCompressedInt(out, tagsLength);
1023      // There are some tags to be written
1024      if (tagsLength > 0) {
1025        TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
1026        // When tag compression is enabled, tagCompressionContext will have a not null value. Write
1027        // the tags using Dictionary compression in such a case
1028        if (tagCompressionContext != null) {
1029          // Not passing tagsLength considering that parsing of the tagsLength is not costly
1030          PrivateCellUtil.compressTags(out, cell, tagCompressionContext);
1031        } else {
1032          PrivateCellUtil.writeTags(out, cell, tagsLength);
1033        }
1034      }
1035      size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
1036    }
1037    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
1038      // Copy memstore timestamp from the byte buffer to the output stream.
1039      long memstoreTS = cell.getSequenceId();
1040      WritableUtils.writeVLong(out, memstoreTS);
1041      // TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be
1042      // avoided.
1043      size += WritableUtils.getVIntSize(memstoreTS);
1044    }
1045    return size;
1046  }
1047
1048  protected final void afterDecodingKeyValue(DataInputStream source, ByteBuffer dest,
1049    HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
1050    if (decodingCtx.getHFileContext().isIncludesTags()) {
1051      int tagsLength = ByteBufferUtils.readCompressedInt(source);
1052      // Put as unsigned short
1053      dest.put((byte) ((tagsLength >> 8) & 0xff));
1054      dest.put((byte) (tagsLength & 0xff));
1055      if (tagsLength > 0) {
1056        TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
1057        // When tag compression is been used in this file, tagCompressionContext will have a not
1058        // null value passed.
1059        if (tagCompressionContext != null) {
1060          tagCompressionContext.uncompressTags(source, dest, tagsLength);
1061        } else {
1062          ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
1063        }
1064      }
1065    }
1066    if (decodingCtx.getHFileContext().isIncludesMvcc()) {
1067      long memstoreTS = -1;
1068      try {
1069        // Copy memstore timestamp from the data input stream to the byte
1070        // buffer.
1071        memstoreTS = WritableUtils.readVLong(source);
1072        ByteBufferUtils.writeVLong(dest, memstoreTS);
1073      } catch (IOException ex) {
1074        throw new RuntimeException(
1075          "Unable to copy memstore timestamp " + memstoreTS + " after decoding a key/value");
1076      }
1077    }
1078  }
1079
  /**
   * Decodes the encoded key/values from {@code source} into a newly produced buffer.
   * @param allocateHeaderLength presumably the number of header bytes to reserve at the start of
   *          the result buffer — TODO confirm against implementations
   * @param skipLastBytes presumably the number of trailing source bytes to leave undecoded — TODO
   *          confirm against implementations
   */
  protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
    int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
    throws IOException;
1083
1084  /**
1085   * Asserts that there is at least the given amount of unfilled space remaining in the given
1086   * buffer.
1087   * @param out    typically, the buffer we are writing to
1088   * @param length the required space in the buffer
1089   * @throws EncoderBufferTooSmallException If there are no enough bytes.
1090   */
1091  protected static void ensureSpace(ByteBuffer out, int length)
1092    throws EncoderBufferTooSmallException {
1093    if (out.position() + length > out.limit()) {
1094      throw new EncoderBufferTooSmallException("Buffer position=" + out.position()
1095        + ", buffer limit=" + out.limit() + ", length to be written=" + length);
1096    }
1097  }
1098
1099  @Override
1100  public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
1101    throws IOException {
1102    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
1103      throw new IOException(this.getClass().getName() + " only accepts "
1104        + HFileBlockDefaultEncodingContext.class.getName() + " as the " + "encoding context.");
1105    }
1106
1107    HFileBlockDefaultEncodingContext encodingCtx =
1108      (HFileBlockDefaultEncodingContext) blkEncodingCtx;
1109    encodingCtx.prepareEncoding(out);
1110    if (
1111      encodingCtx.getHFileContext().isIncludesTags()
1112        && encodingCtx.getHFileContext().isCompressTags()
1113    ) {
1114      if (encodingCtx.getTagCompressionContext() != null) {
1115        // It will be overhead to create the TagCompressionContext again and again for every block
1116        // encoding.
1117        encodingCtx.getTagCompressionContext().clear();
1118      } else {
1119        try {
1120          TagCompressionContext tagCompressionContext =
1121            new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
1122          encodingCtx.setTagCompressionContext(tagCompressionContext);
1123        } catch (Exception e) {
1124          throw new IOException("Failed to initialize TagCompressionContext", e);
1125        }
1126      }
1127    }
1128    StreamUtils.writeInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
1129    blkEncodingCtx.setEncodingState(new EncodingState());
1130  }
1131
1132  @Override
1133  public void encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
1134    throws IOException {
1135    EncodingState state = encodingCtx.getEncodingState();
1136    int posBeforeEncode = out.size();
1137    int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
1138    state.postCellEncode(encodedKvSize, out.size() - posBeforeEncode);
1139  }
1140
  /**
   * Performs the actual encoding of one cell to {@code out}; implemented per encoding.
   * @return the size accounted for this cell (passed on to
   *         {@code EncodingState#postCellEncode} by {@code encode})
   */
  public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
    DataOutputStream out) throws IOException;
1143
1144  @Override
1145  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
1146    byte[] uncompressedBytesWithHeader) throws IOException {
1147    EncodingState state = encodingCtx.getEncodingState();
1148    // Write the unencodedDataSizeWritten (with header size)
1149    Bytes.putInt(uncompressedBytesWithHeader,
1150      HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE,
1151      state.getUnencodedDataSizeWritten());
1152    postEncoding(encodingCtx);
1153  }
1154
1155}