/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Base class for all data block encoders that use a buffer.
 */
@InterfaceAudience.Private
abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
  /**
   * TODO: This data block encoder deals in internals of HFileBlocks. Purge references to HFBs.
   */
  private static int INITIAL_KEY_BUFFER_SIZE = 512;

  @Override
  public ByteBuffer decodeKeyValues(DataInputStream source,
      HFileBlockDecodingContext blkDecodingCtx) throws IOException {
    if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
      throw new IOException(this.getClass().getName() + " only accepts "
          + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
    }

    HFileBlockDefaultDecodingContext decodingCtx =
        (HFileBlockDefaultDecodingContext) blkDecodingCtx;
    if (decodingCtx.getHFileContext().isIncludesTags()
        && decodingCtx.getHFileContext().isCompressTags()) {
      if (decodingCtx.getTagCompressionContext() != null) {
        // It will be overhead to create the TagCompressionContext again and again for every block
        // decoding.
        decodingCtx.getTagCompressionContext().clear();
      } else {
        try {
          TagCompressionContext tagCompressionContext = new TagCompressionContext(
              LRUDictionary.class, Byte.MAX_VALUE);
          decodingCtx.setTagCompressionContext(tagCompressionContext);
        } catch (Exception e) {
          throw new IOException("Failed to initialize TagCompressionContext", e);
        }
      }
    }
    return internalDecodeKeyValues(source, 0, 0, decodingCtx);
  }

  /********************* common prefixes *************************/
  // Having these methods as static is fine, but if META starts using a DBE then we should
  // change this.
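  // These helpers compare only the bytes remaining after an already-known common prefix. For
  // example, with rowCommonPrefix = 4, rows "row123" and "row129" are compared only on their
  // remaining suffixes "23" and "29".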
  public static int compareCommonRowPrefix(Cell left, Cell right, int rowCommonPrefix) {
    return Bytes.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix,
        left.getRowLength() - rowCommonPrefix, right.getRowArray(), right.getRowOffset()
            + rowCommonPrefix, right.getRowLength() - rowCommonPrefix);
  }

  public static int compareCommonFamilyPrefix(Cell left, Cell right, int familyCommonPrefix) {
    return Bytes.compareTo(left.getFamilyArray(), left.getFamilyOffset() + familyCommonPrefix,
        left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
        right.getFamilyOffset() + familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix);
  }

  public static int compareCommonQualifierPrefix(Cell left, Cell right, int qualCommonPrefix) {
    return Bytes.compareTo(left.getQualifierArray(), left.getQualifierOffset() + qualCommonPrefix,
        left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(),
        right.getQualifierOffset() + qualCommonPrefix, right.getQualifierLength()
            - qualCommonPrefix);
  }

  protected static class SeekerState {
    protected ByteBuff currentBuffer;
    protected TagCompressionContext tagCompressionContext;
    protected int valueOffset = -1;
    protected int keyLength;
    protected int valueLength;
    protected int lastCommonPrefix;
    protected int tagsLength = 0;
    protected int tagsOffset = -1;
    protected int tagsCompressedLength = 0;
    protected boolean uncompressTags = true;

    /** We need to store a copy of the key. */
    protected byte[] keyBuffer = HConstants.EMPTY_BYTE_ARRAY;
    protected byte[] tagsBuffer = HConstants.EMPTY_BYTE_ARRAY;

    protected long memstoreTS;
    protected int nextKvOffset;
    protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
    private final ObjectIntPair<ByteBuffer> tmpPair;
    private final boolean includeTags;

    public SeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) {
      this.tmpPair = tmpPair;
      this.includeTags = includeTags;
    }

    protected boolean isValid() {
      return valueOffset != -1;
    }

    protected void invalidate() {
      valueOffset = -1;
      tagsCompressedLength = 0;
      currentKey.clear();
      uncompressTags = true;
      currentBuffer = null;
    }

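    // The two ensureSpaceFor* methods below grow their backing byte[] to the next power of two
    // that is at least INITIAL_KEY_BUFFER_SIZE and can hold the requested length. For example,
    // keyLength = 700 gives Integer.highestOneBit(700 - 1) << 1 = 512 << 1 = 1024.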
    protected void ensureSpaceForKey() {
      if (keyLength > keyBuffer.length) {
        int newKeyBufferLength = Integer.highestOneBit(Math.max(
            INITIAL_KEY_BUFFER_SIZE, keyLength) - 1) << 1;
        byte[] newKeyBuffer = new byte[newKeyBufferLength];
        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
        keyBuffer = newKeyBuffer;
      }
    }

    protected void ensureSpaceForTags() {
      if (tagsLength > tagsBuffer.length) {
        int newTagsBufferLength = Integer.highestOneBit(Math.max(
            INITIAL_KEY_BUFFER_SIZE, tagsLength) - 1) << 1;
        byte[] newTagsBuffer = new byte[newTagsBufferLength];
        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
        tagsBuffer = newTagsBuffer;
      }
    }

    protected void setKey(byte[] keyBuffer, long memTS) {
      currentKey.setKey(keyBuffer, 0, keyLength);
      memstoreTS = memTS;
    }

    /**
     * Copy the state from the next one into this instance (the previous state
     * placeholder). Used to save the previous state when we are advancing the
     * seeker to the next key/value.
     */
    protected void copyFromNext(SeekerState nextState) {
      if (keyBuffer.length != nextState.keyBuffer.length) {
        keyBuffer = nextState.keyBuffer.clone();
      } else if (!isValid()) {
        // Note: we can only call isValid before we override our state, so this
        // comes before all the assignments at the end of this method.
        System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
            nextState.keyLength);
      } else {
        // don't copy the common prefix between this key and the previous one
        System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
            keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
                - nextState.lastCommonPrefix);
      }
      currentKey.set(nextState.currentKey);

      valueOffset = nextState.valueOffset;
      keyLength = nextState.keyLength;
      valueLength = nextState.valueLength;
      lastCommonPrefix = nextState.lastCommonPrefix;
      nextKvOffset = nextState.nextKvOffset;
      memstoreTS = nextState.memstoreTS;
      currentBuffer = nextState.currentBuffer;
      tagsOffset = nextState.tagsOffset;
      tagsLength = nextState.tagsLength;
      if (nextState.tagCompressionContext != null) {
        tagCompressionContext = nextState.tagCompressionContext;
      }
    }

    public Cell toCell() {
      // Buffer backing the value and tags part from the HFileBlock's buffer.
      // When tag compression is in use, this will be only the value bytes area.
      ByteBuffer valAndTagsBuffer;
      int vOffset;
      int valAndTagsLength = this.valueLength;
      int tagsLenSerializationSize = 0;
      if (this.includeTags && this.tagCompressionContext == null) {
        // Include the tags part also. This will be the tags bytes plus 2 bytes for storing the
        // tags length.
        tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + this.valueLength);
        valAndTagsLength += tagsLenSerializationSize + this.tagsLength;
      }
      this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair);
      valAndTagsBuffer = this.tmpPair.getFirst();
      vOffset = this.tmpPair.getSecond(); // This is the offset to the value part in the BB
      if (valAndTagsBuffer.hasArray()) {
        return toOnheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      } else {
        return toOffheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      }
    }

    private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
        int tagsLenSerializationSize) {
      byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          tagsArray = valAndTagsBuffer.array();
          tOffset = valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength
              + tagsLenSerializationSize;
        } else {
          tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
          tOffset = 0;
        }
      }
      return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
          currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
          currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
          currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer.array(),
          valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength, memstoreTS, tagsArray,
          tOffset, this.tagsLength);
    }

    private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
        int tagsLenSerializationSize) {
      ByteBuffer tagsBuf = HConstants.EMPTY_BYTE_BUFFER;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          tagsBuf = valAndTagsBuffer;
          tOffset = vOffset + this.valueLength + tagsLenSerializationSize;
        } else {
          tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength));
          tOffset = 0;
        }
      }
      return new OffheapDecodedExtendedCell(
          ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)), currentKey.getRowLength(),
          currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
          currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
          currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer, vOffset,
          this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
    }
  }

  /**
   * Copies only the key part of the keybuffer by doing a deep copy and passes the
   * seeker state members for taking a clone.
   * Note that the value byte[] part still points into the currentBuffer and is
   * represented by the valueOffset and valueLength.
   */
  // We return this as a Cell to the upper layers of the read flow and they might try setting a new
  // SeqId there. So this has to be an instance of ExtendedCell.
  protected static class OnheapDecodedCell implements ExtendedCell {
    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
        + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
        + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY));
    private byte[] keyOnlyBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timestamp;
    private byte typeByte;
    private byte[] valueBuffer;
    private int valueOffset;
    private int valueLength;
    private byte[] tagsBuffer;
    private int tagsOffset;
    private int tagsLength;
    private long seqId;

    protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset,
        byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
        byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] tagsBuffer,
        int tagsOffset, int tagsLength) {
      this.keyOnlyBuffer = keyBuffer;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timestamp = timeStamp;
      this.typeByte = typeByte;
      this.valueBuffer = valueBuffer;
      this.valueOffset = valueOffset;
      this.valueLength = valueLen;
      this.tagsBuffer = tagsBuffer;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      setSequenceId(seqId);
    }

    @Override
    public byte[] getRowArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getFamilyArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getQualifierArray() {
      return keyOnlyBuffer;
    }

    @Override
    public int getRowOffset() {
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public short getRowLength() {
      return rowLength;
    }

    @Override
    public int getFamilyOffset() {
      return familyOffset;
    }

    @Override
    public byte getFamilyLength() {
      return familyLength;
    }

    @Override
    public int getQualifierOffset() {
      return qualifierOffset;
    }

    @Override
    public int getQualifierLength() {
      return qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return timestamp;
    }

    @Override
    public byte getTypeByte() {
      return typeByte;
    }

    @Override
    public long getSequenceId() {
      return seqId;
    }

    @Override
    public byte[] getValueArray() {
      return this.valueBuffer;
    }

    @Override
    public int getValueOffset() {
      return valueOffset;
    }

    @Override
    public int getValueLength() {
      return valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      return this.tagsBuffer;
    }

    @Override
    public int getTagsOffset() {
      return this.tagsOffset;
    }

    @Override
    public int getTagsLength() {
      return tagsLength;
    }

    @Override
    public String toString() {
      return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
          + getValueLength() + "/seqid=" + seqId;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    @Override
    public long heapSize() {
      return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
    }

    @Override
    public int write(OutputStream out, boolean withTags) throws IOException {
      int lenToWrite = getSerializedSize(withTags);
      ByteBufferUtils.putInt(out, keyOnlyBuffer.length);
      ByteBufferUtils.putInt(out, valueLength);
      // Write key
      out.write(keyOnlyBuffer);
      // Write value
      out.write(this.valueBuffer, this.valueOffset, this.valueLength);
      if (withTags && this.tagsLength > 0) {
        // 2 bytes of tags length followed by the tags bytes.
        // The tags length is serialized with only 2 bytes (the short way) even though the type is
        // int. As these are non-negative numbers, we save the sign bit. See HBASE-11437.
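        // For example, tagsLength = 300 (0x012C) is written as the two bytes 0x01 and 0x2C.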
        out.write((byte) (0xff & (this.tagsLength >> 8)));
        out.write((byte) (0xff & this.tagsLength));
        out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
      }
      return lenToWrite;
    }

    @Override
    public int getSerializedSize(boolean withTags) {
      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
          withTags);
    }

    @Override
    public void write(ByteBuffer buf, int offset) {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(long ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(byte[] ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public ExtendedCell deepClone() {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }
  }

  protected static class OffheapDecodedExtendedCell extends ByteBufferExtendedCell {
    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
        + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
        + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.BYTE_BUFFER));
    private ByteBuffer keyBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timestamp;
    private byte typeByte;
    private ByteBuffer valueBuffer;
    private int valueOffset;
    private int valueLength;
    private ByteBuffer tagsBuffer;
    private int tagsOffset;
    private int tagsLength;
    private long seqId;

    protected OffheapDecodedExtendedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset,
        byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
        ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer,
        int tagsOffset, int tagsLength) {
      // The keyBuffer is always onheap
      assert keyBuffer.hasArray();
      assert keyBuffer.arrayOffset() == 0;
      this.keyBuffer = keyBuffer;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timestamp = timeStamp;
      this.typeByte = typeByte;
      this.valueBuffer = valueBuffer;
      this.valueOffset = valueOffset;
      this.valueLength = valueLen;
      this.tagsBuffer = tagsBuffer;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      setSequenceId(seqId);
    }

    @Override
    public byte[] getRowArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getRowOffset() {
      return getRowPosition();
    }

    @Override
    public short getRowLength() {
      return this.rowLength;
    }

    @Override
    public byte[] getFamilyArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getFamilyOffset() {
      return getFamilyPosition();
    }

    @Override
    public byte getFamilyLength() {
      return this.familyLength;
    }

    @Override
    public byte[] getQualifierArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getQualifierOffset() {
      return getQualifierPosition();
    }

    @Override
    public int getQualifierLength() {
      return this.qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return this.timestamp;
    }

    @Override
    public byte getTypeByte() {
      return this.typeByte;
    }

    @Override
    public long getSequenceId() {
      return this.seqId;
    }

    @Override
    public byte[] getValueArray() {
      return CellUtil.cloneValue(this);
    }

    @Override
    public int getValueOffset() {
      return 0;
    }

    @Override
    public int getValueLength() {
      return this.valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      return CellUtil.cloneTags(this);
    }

    @Override
    public int getTagsOffset() {
      return 0;
    }

    @Override
    public int getTagsLength() {
      return this.tagsLength;
    }

    @Override
    public ByteBuffer getRowByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getRowPosition() {
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public ByteBuffer getFamilyByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getFamilyPosition() {
      return this.familyOffset;
    }

    @Override
    public ByteBuffer getQualifierByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getQualifierPosition() {
      return this.qualifierOffset;
    }

    @Override
    public ByteBuffer getValueByteBuffer() {
      return this.valueBuffer;
    }

    @Override
    public int getValuePosition() {
      return this.valueOffset;
    }

    @Override
    public ByteBuffer getTagsByteBuffer() {
      return this.tagsBuffer;
    }

    @Override
    public int getTagsPosition() {
      return this.tagsOffset;
    }

    @Override
    public long heapSize() {
      return FIXED_OVERHEAD;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    @Override
    public int write(OutputStream out, boolean withTags) throws IOException {
      int lenToWrite = getSerializedSize(withTags);
      ByteBufferUtils.putInt(out, keyBuffer.capacity());
      ByteBufferUtils.putInt(out, valueLength);
      // Write key
      out.write(keyBuffer.array());
      // Write value
      ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset, this.valueLength);
      if (withTags && this.tagsLength > 0) {
        // 2 bytes of tags length followed by the tags bytes.
        // The tags length is serialized with only 2 bytes (the short way) even though the type is
        // int. As these are non-negative numbers, we save the sign bit. See HBASE-11437.
        out.write((byte) (0xff & (this.tagsLength >> 8)));
        out.write((byte) (0xff & this.tagsLength));
        ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
      }
      return lenToWrite;
    }

    @Override
    public int getSerializedSize(boolean withTags) {
      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
          withTags);
    }

    @Override
    public void setTimestamp(long ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(byte[] ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void write(ByteBuffer buf, int offset) {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public ExtendedCell deepClone() {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }
  }

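  // The seeker below keeps two SeekerState instances: 'current' always holds the cell just
  // decoded, while 'previous' remembers the prior position so seekToKeyInBlock() can step back
  // exactly one cell via moveToPrevious() when it overshoots or when seekBefore is requested.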
  protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
      extends AbstractEncodedSeeker {
    protected ByteBuff currentBuffer;
    protected TagCompressionContext tagCompressionContext = null;
    protected KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
    protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>();
    protected STATE current, previous;

    public BufferedEncodedSeeker(CellComparator comparator,
        HFileBlockDecodingContext decodingCtx) {
      super(comparator, decodingCtx);
      if (decodingCtx.getHFileContext().isCompressTags()) {
        try {
          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
        } catch (Exception e) {
          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
        }
      }
      current = createSeekerState(); // always valid
      previous = createSeekerState(); // may not be valid
    }

    @Override
    public int compareKey(CellComparator comparator, Cell key) {
      keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
      return PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, keyOnlyKV);
    }

    @Override
    public void setCurrentBuffer(ByteBuff buffer) {
      if (this.tagCompressionContext != null) {
        this.tagCompressionContext.clear();
      }
      currentBuffer = buffer;
      current.currentBuffer = currentBuffer;
      if (tagCompressionContext != null) {
        current.tagCompressionContext = tagCompressionContext;
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public Cell getKey() {
      byte[] key = new byte[current.keyLength];
      System.arraycopy(current.keyBuffer, 0, key, 0, current.keyLength);
      return new KeyValue.KeyOnlyKeyValue(key);
    }

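    // getValueShallowCopy() below does not copy any value bytes: the returned ByteBuffer is a
    // duplicated, sliced view over the block's underlying buffer.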
    @Override
    public ByteBuffer getValueShallowCopy() {
      currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
      ByteBuffer dup = tmpPair.getFirst().duplicate();
      dup.position(tmpPair.getSecond());
      dup.limit(tmpPair.getSecond() + current.valueLength);
      return dup.slice();
    }

    @Override
    public Cell getCell() {
      return current.toCell();
    }

    @Override
    public void rewind() {
      currentBuffer.rewind();
      if (tagCompressionContext != null) {
        tagCompressionContext.clear();
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public boolean next() {
      if (!currentBuffer.hasRemaining()) {
        return false;
      }
      decodeNext();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
      return true;
    }

    protected void decodeTags() {
      current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
      if (tagCompressionContext != null) {
        if (current.uncompressTags) {
          // Tag compression is being used. Uncompress the tags into tagsBuffer.
          current.ensureSpaceForTags();
          try {
            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
                current.tagsBuffer, 0, current.tagsLength);
          } catch (IOException e) {
            throw new RuntimeException("Exception while uncompressing tags", e);
          }
        } else {
          currentBuffer.skip(current.tagsCompressedLength);
          current.uncompressTags = true; // Reset this.
        }
        current.tagsOffset = -1;
      } else {
        // When tag compression is not used, do not copy the tags bytes into tagsBuffer.
        // Just mark the tags offset so the KV buffer can be created later in getKeyValueBuffer().
        current.tagsOffset = currentBuffer.position();
        currentBuffer.skip(current.tagsLength);
      }
    }

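    // seekToKeyInBlock() below returns 0 on an exact match, 1 when it ends up positioned on the
    // last cell at or before the seek key (including the seekBefore case), and
    // HConstants.INDEX_KEY_MAGIC when even the first cell of the block sorts after the seek key,
    // in which case the caller falls back to the optimized index key (see the comp < 0 branch).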
    @Override
    public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
      int rowCommonPrefix = 0;
      int familyCommonPrefix = 0;
      int qualCommonPrefix = 0;
      previous.invalidate();
      do {
        int comp;
        keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
        if (current.lastCommonPrefix != 0) {
          // The KV format has the row key length also in the byte array. The common prefix
          // includes it. So we need to subtract to find out the common prefix in the row part
          // alone.
          rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
        }
        if (current.lastCommonPrefix <= 2) {
          rowCommonPrefix = 0;
        }
        rowCommonPrefix += findCommonPrefixInRowPart(seekCell, keyOnlyKV, rowCommonPrefix);
        comp = compareCommonRowPrefix(seekCell, keyOnlyKV, rowCommonPrefix);
        if (comp == 0) {
          comp = compareTypeBytes(seekCell, keyOnlyKV);
          if (comp == 0) {
            // Subtract the fixed row key length and the family key fixed length
            familyCommonPrefix = Math.max(
                0,
                Math.min(familyCommonPrefix,
                    current.lastCommonPrefix - (3 + keyOnlyKV.getRowLength())));
            familyCommonPrefix += findCommonPrefixInFamilyPart(seekCell, keyOnlyKV,
                familyCommonPrefix);
            comp = compareCommonFamilyPrefix(seekCell, keyOnlyKV, familyCommonPrefix);
            if (comp == 0) {
              // Subtract the row key fixed length and the family key fixed length
              qualCommonPrefix = Math.max(
                  0,
                  Math.min(
                      qualCommonPrefix,
                      current.lastCommonPrefix
                          - (3 + keyOnlyKV.getRowLength() + keyOnlyKV.getFamilyLength())));
              qualCommonPrefix += findCommonPrefixInQualifierPart(seekCell, keyOnlyKV,
                  qualCommonPrefix);
              comp = compareCommonQualifierPrefix(seekCell, keyOnlyKV, qualCommonPrefix);
              if (comp == 0) {
                comp = CellComparator.getInstance().compareTimestamps(seekCell, keyOnlyKV);
                if (comp == 0) {
                  // Compare types. Let the delete types sort ahead of puts; i.e. types of higher
                  // numbers sort before those of lesser numbers. Maximum (255) appears ahead of
                  // everything, and minimum (0) appears after everything.
                  comp = (0xff & keyOnlyKV.getTypeByte()) - (0xff & seekCell.getTypeByte());
                }
              }
            }
          }
        }
        if (comp == 0) { // exact match
          if (seekBefore) {
            if (!previous.isValid()) {
              // The caller (seekBefore) has to ensure that we are not at the
              // first key in the block.
              throw new IllegalStateException("Cannot seekBefore if "
                  + "positioned at the first key in the block: key="
                  + Bytes.toStringBinary(seekCell.getRowArray()));
            }
            moveToPrevious();
            return 1;
          }
          return 0;
        }

        if (comp < 0) { // already too large, check previous
          if (previous.isValid()) {
            moveToPrevious();
          } else {
            return HConstants.INDEX_KEY_MAGIC; // using optimized index key
          }
          return 1;
        }

        // move to next, if more data is available
        if (currentBuffer.hasRemaining()) {
          previous.copyFromNext(current);
          decodeNext();
          current.setKey(current.keyBuffer, current.memstoreTS);
        } else {
          break;
        }
      } while (true);

      // we hit the end of the block, not an exact match
      return 1;
    }

    private int compareTypeBytes(Cell key, Cell right) {
      if (key.getFamilyLength() + key.getQualifierLength() == 0
          && key.getTypeByte() == Type.Minimum.getCode()) {
        // left is "bigger", i.e. it appears later in the sorted order
        return 1;
      }
      if (right.getFamilyLength() + right.getQualifierLength() == 0
          && right.getTypeByte() == Type.Minimum.getCode()) {
        return -1;
      }
      return 0;
    }

    private static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
      return Bytes.findCommonPrefix(left.getRowArray(), right.getRowArray(), left.getRowLength()
          - rowCommonPrefix, right.getRowLength() - rowCommonPrefix, left.getRowOffset()
          + rowCommonPrefix, right.getRowOffset() + rowCommonPrefix);
    }

    private static int findCommonPrefixInFamilyPart(Cell left, Cell right, int familyCommonPrefix) {
      return Bytes
          .findCommonPrefix(left.getFamilyArray(), right.getFamilyArray(), left.getFamilyLength()
              - familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix,
              left.getFamilyOffset() + familyCommonPrefix, right.getFamilyOffset()
                  + familyCommonPrefix);
    }

    private static int findCommonPrefixInQualifierPart(Cell left, Cell right,
        int qualifierCommonPrefix) {
      return Bytes.findCommonPrefix(left.getQualifierArray(), right.getQualifierArray(),
          left.getQualifierLength() - qualifierCommonPrefix, right.getQualifierLength()
              - qualifierCommonPrefix, left.getQualifierOffset() + qualifierCommonPrefix,
          right.getQualifierOffset() + qualifierCommonPrefix);
    }

    private void moveToPrevious() {
      if (!previous.isValid()) {
        throw new IllegalStateException(
            "Can move back only once and not in first key in the block.");
      }

      STATE tmp = previous;
      previous = current;
      current = tmp;

      // move after last key value
      currentBuffer.position(current.nextKvOffset);
      // The tag bytes were already decoded. We cache those tags in the current state along with
      // the total compressed length of the tags bytes, so the next decodeNext() call does not
      // need to decode the tags again, which would pollute the Data Dictionary we use for
      // compression. When current.uncompressTags is false, we just reuse current.tagsBuffer and
      // skip 'tagsCompressedLength' bytes of the source stream. See decodeTags().
      current.tagsBuffer = previous.tagsBuffer;
      current.tagsCompressedLength = previous.tagsCompressedLength;
      current.uncompressTags = false;
      // The current key has to be reset with the previous Cell
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @SuppressWarnings("unchecked")
    protected STATE createSeekerState() {
      // This will fail for non-default seeker state if the subclass does not
      // override this method.
      return (STATE) new SeekerState(this.tmpPair, this.includesTags());
    }

    protected abstract void decodeFirst();
    protected abstract void decodeNext();
  }

  /**
   * Writes the tags (optionally dictionary-compressed) and the mvcc/sequence id that follow the
   * just-encoded key/value, as dictated by the HFileContext flags.
   * @param cell the cell that has just been encoded
   * @param out the output stream the encoded block is being written to
   * @param encodingCtx the encoding context holding the tag compression state
   * @return unencoded size added
   * @throws IOException if writing to the stream fails
   */
  protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
      HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
    int size = 0;
    if (encodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = cell.getTagsLength();
      ByteBufferUtils.putCompressedInt(out, tagsLength);
      // There are some tags to be written
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
        // When tag compression is enabled, tagCompressionContext will be non-null. Write the
        // tags using Dictionary compression in such a case.
        if (tagCompressionContext != null) {
          // Not passing tagsLength, considering that parsing the tagsLength is not costly
          PrivateCellUtil.compressTags(out, cell, tagCompressionContext);
        } else {
          PrivateCellUtil.writeTags(out, cell, tagsLength);
        }
      }
      size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
    }
    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
      // Copy memstore timestamp from the byte buffer to the output stream.
      long memstoreTS = cell.getSequenceId();
      WritableUtils.writeVLong(out, memstoreTS);
      // TODO use a writeVLong which returns the #bytes written so that parsing twice can be
      // avoided.
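      // Note: WritableUtils uses a variable-length encoding here, so small sequence ids
      // (roughly -112 to 127) take a single byte, while larger values take 2 to 9 bytes.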
      size += WritableUtils.getVIntSize(memstoreTS);
    }
    return size;
  }

  protected final void afterDecodingKeyValue(DataInputStream source,
      ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    if (decodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = ByteBufferUtils.readCompressedInt(source);
      // Put as unsigned short
      dest.put((byte) ((tagsLength >> 8) & 0xff));
      dest.put((byte) (tagsLength & 0xff));
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
        // When tag compression has been used in this file, tagCompressionContext will be
        // non-null.
        if (tagCompressionContext != null) {
          tagCompressionContext.uncompressTags(source, dest, tagsLength);
        } else {
          ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
        }
      }
    }
    if (decodingCtx.getHFileContext().isIncludesMvcc()) {
      long memstoreTS = -1;
      try {
        // Copy memstore timestamp from the data input stream to the byte buffer.
        memstoreTS = WritableUtils.readVLong(source);
        ByteBufferUtils.writeVLong(dest, memstoreTS);
      } catch (IOException ex) {
        throw new RuntimeException("Unable to copy memstore timestamp " +
            memstoreTS + " after decoding a key/value", ex);
      }
    }
  }

  protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
      int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
      throws IOException;

  /**
   * Asserts that there is at least the given amount of unfilled space
   * remaining in the given buffer.
   * @param out typically, the buffer we are writing to
   * @param length the required space in the buffer
   * @throws EncoderBufferTooSmallException if there is not enough space in the buffer
   */
  protected static void ensureSpace(ByteBuffer out, int length)
      throws EncoderBufferTooSmallException {
    if (out.position() + length > out.limit()) {
      throw new EncoderBufferTooSmallException(
          "Buffer position=" + out.position() +
          ", buffer limit=" + out.limit() +
          ", length to be written=" + length);
    }
  }

  @Override
  public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
      throws IOException {
    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
      throw new IOException(this.getClass().getName() + " only accepts "
          + HFileBlockDefaultEncodingContext.class.getName() + " as the " +
          "encoding context.");
    }

    HFileBlockDefaultEncodingContext encodingCtx =
        (HFileBlockDefaultEncodingContext) blkEncodingCtx;
    encodingCtx.prepareEncoding(out);
    if (encodingCtx.getHFileContext().isIncludesTags()
        && encodingCtx.getHFileContext().isCompressTags()) {
      if (encodingCtx.getTagCompressionContext() != null) {
        // It will be overhead to create the TagCompressionContext again and again for every block
        // encoding.
        encodingCtx.getTagCompressionContext().clear();
      } else {
        try {
          TagCompressionContext tagCompressionContext = new TagCompressionContext(
              LRUDictionary.class, Byte.MAX_VALUE);
          encodingCtx.setTagCompressionContext(tagCompressionContext);
        } catch (Exception e) {
          throw new IOException("Failed to initialize TagCompressionContext", e);
        }
      }
    }
    StreamUtils.writeInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
    blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
  }

  private static class BufferedDataBlockEncodingState extends EncodingState {
    int unencodedDataSizeWritten = 0;
  }

  @Override
  public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
      throws IOException {
    BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
        .getEncodingState();
    int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
    state.unencodedDataSizeWritten += encodedKvSize;
    return encodedKvSize;
  }

  public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
      DataOutputStream out) throws IOException;

  @Override
  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
      byte[] uncompressedBytesWithHeader) throws IOException {
    BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
        .getEncodingState();
    // Write the unencodedDataSizeWritten (with header size)
    Bytes.putInt(uncompressedBytesWithHeader,
      HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE,
      state.unencodedDataSizeWritten);
    postEncoding(encodingCtx);
  }

}