/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Base class for all data block encoders that use a buffer.
 */
@InterfaceAudience.Private
abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
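  /*
   * High-level flow (an illustrative sketch, not a contract): a subclass's BufferedEncodedSeeker
   * is handed the encoded block body via setCurrentBuffer(), after which getCell()/next() iterate
   * the cells and seekToKeyInBlock() positions the seeker, while decodeKeyValues() below rebuilds
   * a plain KeyValue-format buffer from an encoded block. Roughly:
   *
   *   seeker.setCurrentBuffer(encodedBlockBody); // ByteBuff holding the encoded cells
   *   do {
   *     Cell c = seeker.getCell();
   *     // ... consume c ...
   *   } while (seeker.next());
   */
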
  /**
   * TODO: This datablockencoder is dealing in internals of hfileblocks. Purge reference to HFBs
   */
  private static int INITIAL_KEY_BUFFER_SIZE = 512;

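  /**
   * Decodes a whole encoded block read from {@code source} back into plain, unencoded KeyValue
   * format, (re)initializing the tag decompression dictionary first when tag compression is
   * enabled for this file.
   */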
  @Override
  public ByteBuffer decodeKeyValues(DataInputStream source,
      HFileBlockDecodingContext blkDecodingCtx) throws IOException {
    if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
      throw new IOException(this.getClass().getName() + " only accepts "
          + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
    }

    HFileBlockDefaultDecodingContext decodingCtx =
        (HFileBlockDefaultDecodingContext) blkDecodingCtx;
    if (decodingCtx.getHFileContext().isIncludesTags()
        && decodingCtx.getHFileContext().isCompressTags()) {
      if (decodingCtx.getTagCompressionContext() != null) {
        // Creating a new TagCompressionContext for every block decode would be needless overhead;
        // reuse the existing one after clearing it.
        decodingCtx.getTagCompressionContext().clear();
      } else {
        try {
          TagCompressionContext tagCompressionContext = new TagCompressionContext(
              LRUDictionary.class, Byte.MAX_VALUE);
          decodingCtx.setTagCompressionContext(tagCompressionContext);
        } catch (Exception e) {
          throw new IOException("Failed to initialize TagCompressionContext", e);
        }
      }
    }
    return internalDecodeKeyValues(source, 0, 0, decodingCtx);
  }

  /********************* common prefixes *************************/
  // Having this as static is fine, but if META ever uses DBE then we should change this.
  public static int compareCommonRowPrefix(Cell left, Cell right, int rowCommonPrefix) {
    return Bytes.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix,
        left.getRowLength() - rowCommonPrefix, right.getRowArray(), right.getRowOffset()
            + rowCommonPrefix, right.getRowLength() - rowCommonPrefix);
  }

  public static int compareCommonFamilyPrefix(Cell left, Cell right, int familyCommonPrefix) {
    return Bytes.compareTo(left.getFamilyArray(), left.getFamilyOffset() + familyCommonPrefix,
        left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
        right.getFamilyOffset() + familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix);
  }

  public static int compareCommonQualifierPrefix(Cell left, Cell right, int qualCommonPrefix) {
    return Bytes.compareTo(left.getQualifierArray(), left.getQualifierOffset() + qualCommonPrefix,
        left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(),
        right.getQualifierOffset() + qualCommonPrefix, right.getQualifierLength()
            - qualCommonPrefix);
  }

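  /**
   * Mutable per-position state of the seeker while walking an encoded block: the materialized
   * bytes of the current key, the offsets/lengths of the value and tags within
   * {@code currentBuffer}, the memstore timestamp, and the offset of the next cell. Instances are
   * reused to avoid per-cell allocations.
   */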
  protected static class SeekerState {
    protected ByteBuff currentBuffer;
    protected TagCompressionContext tagCompressionContext;
    protected int valueOffset = -1;
    protected int keyLength;
    protected int valueLength;
    protected int lastCommonPrefix;
    protected int tagsLength = 0;
    protected int tagsOffset = -1;
    protected int tagsCompressedLength = 0;
    protected boolean uncompressTags = true;

    /** We need to store a copy of the key. */
    protected byte[] keyBuffer = HConstants.EMPTY_BYTE_ARRAY;
    protected byte[] tagsBuffer = HConstants.EMPTY_BYTE_ARRAY;

    protected long memstoreTS;
    protected int nextKvOffset;
    protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
    private final ObjectIntPair<ByteBuffer> tmpPair;
    private final boolean includeTags;

    public SeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) {
      this.tmpPair = tmpPair;
      this.includeTags = includeTags;
    }

    protected boolean isValid() {
      return valueOffset != -1;
    }

    protected void invalidate() {
      valueOffset = -1;
      tagsCompressedLength = 0;
      currentKey.clear();
      uncompressTags = true;
      currentBuffer = null;
    }

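    /**
     * Grows {@code keyBuffer} to the smallest power of two (at least INITIAL_KEY_BUFFER_SIZE)
     * that can hold {@code keyLength} bytes, preserving the existing contents.
     */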
    protected void ensureSpaceForKey() {
      if (keyLength > keyBuffer.length) {
        int newKeyBufferLength = Integer.highestOneBit(Math.max(
            INITIAL_KEY_BUFFER_SIZE, keyLength) - 1) << 1;
        byte[] newKeyBuffer = new byte[newKeyBufferLength];
        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
        keyBuffer = newKeyBuffer;
      }
    }

    protected void ensureSpaceForTags() {
      if (tagsLength > tagsBuffer.length) {
        int newTagsBufferLength = Integer.highestOneBit(Math.max(
            INITIAL_KEY_BUFFER_SIZE, tagsLength) - 1) << 1;
        byte[] newTagsBuffer = new byte[newTagsBufferLength];
        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
        tagsBuffer = newTagsBuffer;
      }
    }

    protected void setKey(byte[] keyBuffer, long memTS) {
      currentKey.setKey(keyBuffer, 0, keyLength);
      memstoreTS = memTS;
    }

    /**
     * Copy the state from the next one into this instance (the previous state
     * placeholder). Used to save the previous state when we are advancing the
     * seeker to the next key/value.
     */
    protected void copyFromNext(SeekerState nextState) {
      if (keyBuffer.length != nextState.keyBuffer.length) {
        keyBuffer = nextState.keyBuffer.clone();
      } else if (!isValid()) {
        // Note: we can only call isValid before we override our state, so this
        // comes before all the assignments at the end of this method.
        System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
            nextState.keyLength);
      } else {
        // don't copy the common prefix between this key and the previous one
        System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
            keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
                - nextState.lastCommonPrefix);
      }
      currentKey.set(nextState.currentKey);

      valueOffset = nextState.valueOffset;
      keyLength = nextState.keyLength;
      valueLength = nextState.valueLength;
      lastCommonPrefix = nextState.lastCommonPrefix;
      nextKvOffset = nextState.nextKvOffset;
      memstoreTS = nextState.memstoreTS;
      currentBuffer = nextState.currentBuffer;
      tagsOffset = nextState.tagsOffset;
      tagsLength = nextState.tagsLength;
      if (nextState.tagCompressionContext != null) {
        tagCompressionContext = nextState.tagCompressionContext;
      }
    }

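    /**
     * Materializes the cell at the current position as an immutable Cell, backed on-heap or
     * off-heap depending on where the block's value/tags bytes live.
     */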
    public Cell toCell() {
      // Buffer backing the value and tags part from the HFileBlock's buffer.
      // When tag compression is in use, this will cover only the value bytes area.
      ByteBuffer valAndTagsBuffer;
      int vOffset;
      int valAndTagsLength = this.valueLength;
      int tagsLenSerializationSize = 0;
      if (this.includeTags && this.tagCompressionContext == null) {
        // Include the tags part also. This will be the tags bytes + 2 bytes for storing the tags
        // length
        tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + this.valueLength);
        valAndTagsLength += tagsLenSerializationSize + this.tagsLength;
      }
      this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair);
      valAndTagsBuffer = this.tmpPair.getFirst();
      vOffset = this.tmpPair.getSecond(); // This is the offset to the value part in the BB
      if (valAndTagsBuffer.hasArray()) {
        return toOnheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      } else {
        return toOffheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      }
    }

    private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
        int tagsLenSerializationSize) {
      byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          tagsArray = valAndTagsBuffer.array();
          tOffset = valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength
              + tagsLenSerializationSize;
        } else {
          tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
          tOffset = 0;
        }
      }
      return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
          currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
          currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
          currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer.array(),
          valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength, memstoreTS, tagsArray,
          tOffset, this.tagsLength);
    }

    private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
        int tagsLenSerializationSize) {
      ByteBuffer tagsBuf = HConstants.EMPTY_BYTE_BUFFER;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          tagsBuf = valAndTagsBuffer;
          tOffset = vOffset + this.valueLength + tagsLenSerializationSize;
        } else {
          tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength));
          tOffset = 0;
        }
      }
      return new OffheapDecodedExtendedCell(
          ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)), currentKey.getRowLength(),
          currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
          currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
          currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer, vOffset,
          this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
    }
  }

  /**
   * Copies only the key part of the key buffer (a deep copy) and carries over the remaining
   * seeker state members for the clone. Note that the value byte[] part still points into the
   * currentBuffer and is represented by valueOffset and valueLength.
   */
  // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
  // there. So this has to be an instance of ExtendedCell.
  protected static class OnheapDecodedCell implements ExtendedCell {
    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
        + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
        + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY));
    private byte[] keyOnlyBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timestamp;
    private byte typeByte;
    private byte[] valueBuffer;
    private int valueOffset;
    private int valueLength;
    private byte[] tagsBuffer;
    private int tagsOffset;
    private int tagsLength;
    private long seqId;

    protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset,
        byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
        byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] tagsBuffer,
        int tagsOffset, int tagsLength) {
      this.keyOnlyBuffer = keyBuffer;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timestamp = timeStamp;
      this.typeByte = typeByte;
      this.valueBuffer = valueBuffer;
      this.valueOffset = valueOffset;
      this.valueLength = valueLen;
      this.tagsBuffer = tagsBuffer;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      setSequenceId(seqId);
    }

    @Override
    public byte[] getRowArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getFamilyArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getQualifierArray() {
      return keyOnlyBuffer;
    }

    @Override
    public int getRowOffset() {
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public short getRowLength() {
      return rowLength;
    }

    @Override
    public int getFamilyOffset() {
      return familyOffset;
    }

    @Override
    public byte getFamilyLength() {
      return familyLength;
    }

    @Override
    public int getQualifierOffset() {
      return qualifierOffset;
    }

    @Override
    public int getQualifierLength() {
      return qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return timestamp;
    }

    @Override
    public byte getTypeByte() {
      return typeByte;
    }

    @Override
    public long getSequenceId() {
      return seqId;
    }

    @Override
    public byte[] getValueArray() {
      return this.valueBuffer;
    }

    @Override
    public int getValueOffset() {
      return valueOffset;
    }

    @Override
    public int getValueLength() {
      return valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      return this.tagsBuffer;
    }

    @Override
    public int getTagsOffset() {
      return this.tagsOffset;
    }

    @Override
    public int getTagsLength() {
      return tagsLength;
    }

    @Override
    public String toString() {
      return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
          + getValueLength() + "/seqid=" + seqId;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    @Override
    public long heapSize() {
      return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
    }

    @Override
    public int write(OutputStream out, boolean withTags) throws IOException {
      int lenToWrite = getSerializedSize(withTags);
      ByteBufferUtils.putInt(out, keyOnlyBuffer.length);
      ByteBufferUtils.putInt(out, valueLength);
      // Write key
      out.write(keyOnlyBuffer);
      // Write value
      out.write(this.valueBuffer, this.valueOffset, this.valueLength);
      if (withTags && this.tagsLength > 0) {
        // 2 bytes of tags length followed by the tags bytes. The tags length is serialized with
        // only 2 bytes (as a short) even though the field is an int; since it is a non-negative
        // number, we save the sign bit. See HBASE-11437.
        out.write((byte) (0xff & (this.tagsLength >> 8)));
        out.write((byte) (0xff & this.tagsLength));
        out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
      }
      return lenToWrite;
    }

    @Override
    public int getSerializedSize(boolean withTags) {
      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
          withTags);
    }

    @Override
    public void write(ByteBuffer buf, int offset) {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(long ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(byte[] ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public ExtendedCell deepClone() {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }
  }

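  /**
   * Like {@link OnheapDecodedCell} but the value (and possibly the tags) stay referenced in their
   * original, possibly off-heap, ByteBuffers; only the key bytes are copied on-heap.
   */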
  protected static class OffheapDecodedExtendedCell extends ByteBufferExtendedCell {
    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
        + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
        + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.BYTE_BUFFER));
    private ByteBuffer keyBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timestamp;
    private byte typeByte;
    private ByteBuffer valueBuffer;
    private int valueOffset;
    private int valueLength;
    private ByteBuffer tagsBuffer;
    private int tagsOffset;
    private int tagsLength;
    private long seqId;

    protected OffheapDecodedExtendedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset,
        byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
        ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer,
        int tagsOffset, int tagsLength) {
      // The keyBuffer is always onheap
      assert keyBuffer.hasArray();
      assert keyBuffer.arrayOffset() == 0;
      this.keyBuffer = keyBuffer;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timestamp = timeStamp;
      this.typeByte = typeByte;
      this.valueBuffer = valueBuffer;
      this.valueOffset = valueOffset;
      this.valueLength = valueLen;
      this.tagsBuffer = tagsBuffer;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      setSequenceId(seqId);
    }

    @Override
    public byte[] getRowArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getRowOffset() {
      return getRowPosition();
    }

    @Override
    public short getRowLength() {
      return this.rowLength;
    }

    @Override
    public byte[] getFamilyArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getFamilyOffset() {
      return getFamilyPosition();
    }

    @Override
    public byte getFamilyLength() {
      return this.familyLength;
    }

    @Override
    public byte[] getQualifierArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getQualifierOffset() {
      return getQualifierPosition();
    }

    @Override
    public int getQualifierLength() {
      return this.qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return this.timestamp;
    }

    @Override
    public byte getTypeByte() {
      return this.typeByte;
    }

    @Override
    public long getSequenceId() {
      return this.seqId;
    }

    @Override
    public byte[] getValueArray() {
      return CellUtil.cloneValue(this);
    }

    @Override
    public int getValueOffset() {
      return 0;
    }

    @Override
    public int getValueLength() {
      return this.valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      return CellUtil.cloneTags(this);
    }

    @Override
    public int getTagsOffset() {
      return 0;
    }

    @Override
    public int getTagsLength() {
      return this.tagsLength;
    }

    @Override
    public ByteBuffer getRowByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getRowPosition() {
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public ByteBuffer getFamilyByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getFamilyPosition() {
      return this.familyOffset;
    }

    @Override
    public ByteBuffer getQualifierByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getQualifierPosition() {
      return this.qualifierOffset;
    }

    @Override
    public ByteBuffer getValueByteBuffer() {
      return this.valueBuffer;
    }

    @Override
    public int getValuePosition() {
      return this.valueOffset;
    }

    @Override
    public ByteBuffer getTagsByteBuffer() {
      return this.tagsBuffer;
    }

    @Override
    public int getTagsPosition() {
      return this.tagsOffset;
    }

    @Override
    public long heapSize() {
      return FIXED_OVERHEAD;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    @Override
    public int write(OutputStream out, boolean withTags) throws IOException {
      int lenToWrite = getSerializedSize(withTags);
      ByteBufferUtils.putInt(out, keyBuffer.capacity());
      ByteBufferUtils.putInt(out, valueLength);
      // Write key
      out.write(keyBuffer.array());
      // Write value
      ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset, this.valueLength);
      if (withTags && this.tagsLength > 0) {
        // 2 bytes of tags length followed by the tags bytes. The tags length is serialized with
        // only 2 bytes (as a short) even though the field is an int; since it is a non-negative
        // number, we save the sign bit. See HBASE-11437.
        out.write((byte) (0xff & (this.tagsLength >> 8)));
        out.write((byte) (0xff & this.tagsLength));
        ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
      }
      return lenToWrite;
    }

    @Override
    public int getSerializedSize(boolean withTags) {
      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
          withTags);
    }

    @Override
    public void setTimestamp(long ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(byte[] ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void write(ByteBuffer buf, int offset) {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public ExtendedCell deepClone() {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }
  }

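  /**
   * Base seeker over an encoded block. It keeps two {@link SeekerState} instances, {@code current}
   * and {@code previous}, so that seek-before can step back exactly one cell. Subclasses supply
   * the per-encoding logic in {@link #decodeFirst()} and {@link #decodeNext()}.
   */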
  protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
      extends AbstractEncodedSeeker {
    protected ByteBuff currentBuffer;
    protected TagCompressionContext tagCompressionContext = null;
    protected KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
    protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>();
    protected STATE current, previous;

    public BufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
      super(decodingCtx);
      if (decodingCtx.getHFileContext().isCompressTags()) {
        try {
          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
        } catch (Exception e) {
          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
        }
      }
      current = createSeekerState(); // always valid
      previous = createSeekerState(); // may not be valid
    }

    @Override
    public int compareKey(CellComparator comparator, Cell key) {
      keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
      return PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, keyOnlyKV);
    }

    @Override
    public void setCurrentBuffer(ByteBuff buffer) {
      if (this.tagCompressionContext != null) {
        this.tagCompressionContext.clear();
      }
      currentBuffer = buffer;
      current.currentBuffer = currentBuffer;
      if (tagCompressionContext != null) {
        current.tagCompressionContext = tagCompressionContext;
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public Cell getKey() {
      byte[] key = new byte[current.keyLength];
      System.arraycopy(current.keyBuffer, 0, key, 0, current.keyLength);
      return new KeyValue.KeyOnlyKeyValue(key);
    }

    @Override
    public ByteBuffer getValueShallowCopy() {
      currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
      ByteBuffer dup = tmpPair.getFirst().duplicate();
      dup.position(tmpPair.getSecond());
      dup.limit(tmpPair.getSecond() + current.valueLength);
      return dup.slice();
    }

    @Override
    public Cell getCell() {
      return current.toCell();
    }

    @Override
    public void rewind() {
      currentBuffer.rewind();
      if (tagCompressionContext != null) {
        tagCompressionContext.clear();
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public boolean next() {
      if (!currentBuffer.hasRemaining()) {
        return false;
      }
      decodeNext();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
      return true;
    }

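    /**
     * Reads the tags section of the current cell. With tag compression enabled the tags are
     * uncompressed into {@code current.tagsBuffer}; otherwise only their offset and length within
     * {@code currentBuffer} are recorded.
     */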
    protected void decodeTags() {
      current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
      if (tagCompressionContext != null) {
        if (current.uncompressTags) {
          // Tag compression is being used. Uncompress the tags into tagsBuffer.
          current.ensureSpaceForTags();
          try {
            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
                current.tagsBuffer, 0, current.tagsLength);
          } catch (IOException e) {
            throw new RuntimeException("Exception while uncompressing tags", e);
          }
        } else {
          currentBuffer.skip(current.tagsCompressedLength);
          current.uncompressTags = true; // Reset this.
        }
        current.tagsOffset = -1;
      } else {
        // When tag compression is not used, do not copy the tag bytes into tagsBuffer.
        // Just record the tags offset so the KV buffer can be created later in getKeyValueBuffer().
        current.tagsOffset = currentBuffer.position();
        currentBuffer.skip(current.tagsLength);
      }
    }

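    /**
     * Scans forward from the current position for {@code seekCell}, carrying the running common
     * row/family/qualifier prefixes forward so bytes already known to match are not compared
     * again. Returns 0 on an exact match, 1 when the seeker ends up on a preceding cell, or
     * {@link HConstants#INDEX_KEY_MAGIC} when the seek key sorts before the first cell scanned
     * and there is no previous cell to move back to.
     */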
    @Override
    public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
      int rowCommonPrefix = 0;
      int familyCommonPrefix = 0;
      int qualCommonPrefix = 0;
      previous.invalidate();
      do {
        int comp;
        keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
        if (current.lastCommonPrefix != 0) {
          // The KV format has row key length also in the byte array. The common prefix
          // includes it. So we need to subtract to find out the common prefix in the
          // row part alone
          rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
        }
        if (current.lastCommonPrefix <= 2) {
          rowCommonPrefix = 0;
        }
        rowCommonPrefix += findCommonPrefixInRowPart(seekCell, keyOnlyKV, rowCommonPrefix);
        comp = compareCommonRowPrefix(seekCell, keyOnlyKV, rowCommonPrefix);
        if (comp == 0) {
          comp = compareTypeBytes(seekCell, keyOnlyKV);
          if (comp == 0) {
            // Subtract the fixed row key length and the family key fixed length
            familyCommonPrefix = Math.max(0, Math.min(familyCommonPrefix,
                current.lastCommonPrefix - (3 + keyOnlyKV.getRowLength())));
            familyCommonPrefix += findCommonPrefixInFamilyPart(seekCell, keyOnlyKV,
                familyCommonPrefix);
            comp = compareCommonFamilyPrefix(seekCell, keyOnlyKV, familyCommonPrefix);
            if (comp == 0) {
              // subtract the rowkey fixed length and the family key fixed length
              qualCommonPrefix = Math.max(0, Math.min(qualCommonPrefix,
                  current.lastCommonPrefix
                      - (3 + keyOnlyKV.getRowLength() + keyOnlyKV.getFamilyLength())));
              qualCommonPrefix += findCommonPrefixInQualifierPart(seekCell, keyOnlyKV,
                  qualCommonPrefix);
              comp = compareCommonQualifierPrefix(seekCell, keyOnlyKV, qualCommonPrefix);
              if (comp == 0) {
                comp = CellComparator.getInstance().compareTimestamps(seekCell, keyOnlyKV);
                if (comp == 0) {
                  // Compare types. Let the delete types sort ahead of puts; i.e. types of higher
                  // numbers sort before those of lesser numbers. Maximum (255) appears ahead of
                  // everything, and minimum (0) appears after everything.
                  comp = (0xff & keyOnlyKV.getTypeByte()) - (0xff & seekCell.getTypeByte());
                }
              }
            }
          }
        }
        if (comp == 0) { // exact match
          if (seekBefore) {
            if (!previous.isValid()) {
              // The caller (seekBefore) has to ensure that we are not at the
              // first key in the block.
              throw new IllegalStateException("Cannot seekBefore if "
                  + "positioned at the first key in the block: key="
                  + Bytes.toStringBinary(seekCell.getRowArray()));
            }
            moveToPrevious();
            return 1;
          }
          return 0;
        }

        if (comp < 0) { // already too large, check previous
          if (previous.isValid()) {
            moveToPrevious();
          } else {
            return HConstants.INDEX_KEY_MAGIC; // using optimized index key
          }
          return 1;
        }

        // move to next, if more data is available
        if (currentBuffer.hasRemaining()) {
          previous.copyFromNext(current);
          decodeNext();
          current.setKey(current.keyBuffer, current.memstoreTS);
        } else {
          break;
        }
      } while (true);

      // we hit the end of the block, not an exact match
      return 1;
    }

    private int compareTypeBytes(Cell key, Cell right) {
      if (key.getFamilyLength() + key.getQualifierLength() == 0
          && key.getTypeByte() == Type.Minimum.getCode()) {
        // left is "bigger", i.e. it appears later in the sorted order
        return 1;
      }
      if (right.getFamilyLength() + right.getQualifierLength() == 0
          && right.getTypeByte() == Type.Minimum.getCode()) {
        return -1;
      }
      return 0;
    }

    private static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
      return Bytes.findCommonPrefix(left.getRowArray(), right.getRowArray(), left.getRowLength()
          - rowCommonPrefix, right.getRowLength() - rowCommonPrefix, left.getRowOffset()
          + rowCommonPrefix, right.getRowOffset() + rowCommonPrefix);
    }

    private static int findCommonPrefixInFamilyPart(Cell left, Cell right, int familyCommonPrefix) {
      return Bytes
          .findCommonPrefix(left.getFamilyArray(), right.getFamilyArray(), left.getFamilyLength()
              - familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix,
              left.getFamilyOffset() + familyCommonPrefix, right.getFamilyOffset()
                  + familyCommonPrefix);
    }

    private static int findCommonPrefixInQualifierPart(Cell left, Cell right,
        int qualifierCommonPrefix) {
      return Bytes.findCommonPrefix(left.getQualifierArray(), right.getQualifierArray(),
          left.getQualifierLength() - qualifierCommonPrefix, right.getQualifierLength()
              - qualifierCommonPrefix, left.getQualifierOffset() + qualifierCommonPrefix,
          right.getQualifierOffset() + qualifierCommonPrefix);
    }

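    /**
     * Swaps {@code current} and {@code previous} so the seeker steps back exactly one cell.
     * Only a single step back is possible; {@code previous} is invalidated afterwards.
     */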
    private void moveToPrevious() {
      if (!previous.isValid()) {
        throw new IllegalStateException(
            "Can only move back once, and not when positioned at the first key in the block.");
      }

      STATE tmp = previous;
      previous = current;
      current = tmp;

      // move after last key value
      currentBuffer.position(current.nextKvOffset);
      // The tag bytes have already been decoded. Cache those tags, and the total compressed length
      // of the tag bytes, in the current state so the next decodeNext() does not decode the tags
      // again (re-decoding would also pollute the data dictionary used for the compression).
      // When current.uncompressTags is false, we just reuse current.tagsBuffer and skip
      // 'tagsCompressedLength' bytes of the source stream.
      // See decodeTags()
      current.tagsBuffer = previous.tagsBuffer;
      current.tagsCompressedLength = previous.tagsCompressedLength;
      current.uncompressTags = false;
      // The current key has to be reset with the previous Cell
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @SuppressWarnings("unchecked")
    protected STATE createSeekerState() {
      // This will fail for non-default seeker state if the subclass does not
      // override this method.
      return (STATE) new SeekerState(this.tmpPair, this.includesTags());
    }

    protected abstract void decodeFirst();
    protected abstract void decodeNext();
  }

  /**
   * Writes the tags (optionally dictionary-compressed) and, when mvcc is included, the memstore
   * timestamp that follow the encoded key/value of {@code cell}.
   * @return unencoded size added
   */
  protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
      HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
    int size = 0;
    if (encodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = cell.getTagsLength();
      ByteBufferUtils.putCompressedInt(out, tagsLength);
      // There are some tags to be written
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
        // When tag compression is enabled, tagCompressionContext will be non-null. Write the tags
        // using dictionary compression in that case.
        if (tagCompressionContext != null) {
          // Not passing tagsLength, considering that parsing the tagsLength is not costly
          PrivateCellUtil.compressTags(out, cell, tagCompressionContext);
        } else {
          PrivateCellUtil.writeTags(out, cell, tagsLength);
        }
      }
      size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
    }
    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
      // Copy memstore timestamp from the byte buffer to the output stream.
      long memstoreTS = cell.getSequenceId();
      WritableUtils.writeVLong(out, memstoreTS);
      // TODO use a writeVLong which returns the #bytes written so that parsing twice can be
      // avoided.
      size += WritableUtils.getVIntSize(memstoreTS);
    }
    return size;
  }

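  /**
   * Counterpart of {@link #afterEncodingKeyValue(Cell, DataOutputStream,
   * HFileBlockDefaultEncodingContext)}: reads back the tags (possibly dictionary-compressed) and
   * the memstore timestamp that follow each key/value and copies them into {@code dest}.
   */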
  protected final void afterDecodingKeyValue(DataInputStream source,
      ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    if (decodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = ByteBufferUtils.readCompressedInt(source);
      // Put as unsigned short
      dest.put((byte) ((tagsLength >> 8) & 0xff));
      dest.put((byte) (tagsLength & 0xff));
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
        // When tag compression is used in this file, tagCompressionContext will be non-null.
        if (tagCompressionContext != null) {
          tagCompressionContext.uncompressTags(source, dest, tagsLength);
        } else {
          ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
        }
      }
    }
    if (decodingCtx.getHFileContext().isIncludesMvcc()) {
      long memstoreTS = -1;
      try {
        // Copy memstore timestamp from the data input stream to the byte
        // buffer.
        memstoreTS = WritableUtils.readVLong(source);
        ByteBufferUtils.writeVLong(dest, memstoreTS);
      } catch (IOException ex) {
        throw new RuntimeException("Unable to copy memstore timestamp " +
            memstoreTS + " after decoding a key/value", ex);
      }
    }
  }

  protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
      int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
      throws IOException;

  /**
   * Asserts that there is at least the given amount of unfilled space
   * remaining in the given buffer.
   * @param out typically, the buffer we are writing to
   * @param length the required space in the buffer
   * @throws EncoderBufferTooSmallException If there is not enough space in the buffer.
   */
  protected static void ensureSpace(ByteBuffer out, int length)
      throws EncoderBufferTooSmallException {
    if (out.position() + length > out.limit()) {
      throw new EncoderBufferTooSmallException(
          "Buffer position=" + out.position() +
          ", buffer limit=" + out.limit() +
          ", length to be written=" + length);
    }
  }

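  /**
   * Prepares per-block encoding state: (re)initializes tag compression if enabled and writes a
   * dummy unencoded-length int that {@link #endBlockEncoding} patches once the block is complete.
   */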
  @Override
  public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
      throws IOException {
    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
      throw new IOException(this.getClass().getName() + " only accepts "
          + HFileBlockDefaultEncodingContext.class.getName() + " as the " +
          "encoding context.");
    }

    HFileBlockDefaultEncodingContext encodingCtx =
        (HFileBlockDefaultEncodingContext) blkEncodingCtx;
    encodingCtx.prepareEncoding(out);
    if (encodingCtx.getHFileContext().isIncludesTags()
        && encodingCtx.getHFileContext().isCompressTags()) {
      if (encodingCtx.getTagCompressionContext() != null) {
        // Creating a new TagCompressionContext for every block encode would be needless overhead;
        // reuse the existing one after clearing it.
        encodingCtx.getTagCompressionContext().clear();
      } else {
        try {
          TagCompressionContext tagCompressionContext = new TagCompressionContext(
              LRUDictionary.class, Byte.MAX_VALUE);
          encodingCtx.setTagCompressionContext(tagCompressionContext);
        } catch (Exception e) {
          throw new IOException("Failed to initialize TagCompressionContext", e);
        }
      }
    }
    StreamUtils.writeInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
    blkEncodingCtx.setEncodingState(new EncodingState());
  }

  @Override
  public void encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
      throws IOException {
    EncodingState state = encodingCtx.getEncodingState();
    int posBeforeEncode = out.size();
    int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
    state.postCellEncode(encodedKvSize, out.size() - posBeforeEncode);
  }

  public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
      DataOutputStream out) throws IOException;

  @Override
  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
      byte[] uncompressedBytesWithHeader) throws IOException {
    EncodingState state = encodingCtx.getEncodingState();
    // Write the unencodedDataSizeWritten (with header size)
    Bytes.putInt(uncompressedBytesWithHeader,
      HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE,
      state.getUnencodedDataSizeWritten());
    postEncoding(encodingCtx);
  }

}