/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Base class for all data block encoders that use a buffer.
 */
@InterfaceAudience.Private
abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
  /**
   * TODO: This data block encoder deals in the internals of HFileBlocks. Purge references to HFBs.
   */
  private static int INITIAL_KEY_BUFFER_SIZE = 512;

  @Override
  public ByteBuffer decodeKeyValues(DataInputStream source,
    HFileBlockDecodingContext blkDecodingCtx) throws IOException {
    if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
      throw new IOException(this.getClass().getName() + " only accepts "
        + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
    }

    HFileBlockDefaultDecodingContext decodingCtx =
      (HFileBlockDefaultDecodingContext) blkDecodingCtx;
    if (
      decodingCtx.getHFileContext().isIncludesTags()
        && decodingCtx.getHFileContext().isCompressTags()
    ) {
      if (decodingCtx.getTagCompressionContext() != null) {
        // It would be wasteful to create a new TagCompressionContext for every block decoded,
        // so reuse the existing one.
        decodingCtx.getTagCompressionContext().clear();
      } else {
        try {
          TagCompressionContext tagCompressionContext =
            new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
          decodingCtx.setTagCompressionContext(tagCompressionContext);
        } catch (Exception e) {
          throw new IOException("Failed to initialize TagCompressionContext", e);
        }
      }
    }
    return internalDecodeKeyValues(source, 0, 0, decodingCtx);
  }

  /********************* common prefixes *************************/
  // Keeping this static is fine, but if META ever uses DBE (data block encoding) we should
  // change this.
  public static int compareCommonRowPrefix(Cell left, Cell right, int rowCommonPrefix) {
    return Bytes.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix,
      left.getRowLength() - rowCommonPrefix, right.getRowArray(),
      right.getRowOffset() + rowCommonPrefix, right.getRowLength() - rowCommonPrefix);
  }

  public static int compareCommonFamilyPrefix(Cell left, Cell right, int familyCommonPrefix) {
    return Bytes.compareTo(left.getFamilyArray(), left.getFamilyOffset() + familyCommonPrefix,
      left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
      right.getFamilyOffset() + familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix);
  }

  public static int compareCommonQualifierPrefix(Cell left, Cell right, int qualCommonPrefix) {
    return Bytes.compareTo(left.getQualifierArray(), left.getQualifierOffset() + qualCommonPrefix,
      left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(),
      right.getQualifierOffset() + qualCommonPrefix, right.getQualifierLength() - qualCommonPrefix);
  }
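  // Example for the three comparators above: with rows "row-123" and "row-129" and
  // rowCommonPrefix = 4, only the suffixes "123" and "129" are compared, letting callers skip
  // bytes already known to be equal.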

  protected static class SeekerState {
    protected ByteBuff currentBuffer;
    protected TagCompressionContext tagCompressionContext;
    protected int valueOffset = -1;
    protected int keyLength;
    protected int valueLength;
    protected int lastCommonPrefix;
    protected int tagsLength = 0;
    protected int tagsOffset = -1;
    protected int tagsCompressedLength = 0;
    protected boolean uncompressTags = true;

    /** We need to store a copy of the key. */
    protected byte[] keyBuffer = HConstants.EMPTY_BYTE_ARRAY;
    protected byte[] tagsBuffer = HConstants.EMPTY_BYTE_ARRAY;

    protected long memstoreTS;
    protected int nextKvOffset;
    protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
    private final ObjectIntPair<ByteBuffer> tmpPair;
    private final boolean includeTags;

    public SeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) {
      this.tmpPair = tmpPair;
      this.includeTags = includeTags;
    }

    protected boolean isValid() {
      return valueOffset != -1;
    }

    protected void invalidate() {
      valueOffset = -1;
      tagsCompressedLength = 0;
      currentKey.clear();
      uncompressTags = true;
      currentBuffer = null;
    }

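    // The two ensureSpaceFor* methods below grow their buffer to the smallest power of two that
    // is >= the required length (and at least INITIAL_KEY_BUFFER_SIZE);
    // Integer.highestOneBit(n - 1) << 1 rounds n up to the next power of two.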
    protected void ensureSpaceForKey() {
      if (keyLength > keyBuffer.length) {
        int newKeyBufferLength =
          Integer.highestOneBit(Math.max(INITIAL_KEY_BUFFER_SIZE, keyLength) - 1) << 1;
        byte[] newKeyBuffer = new byte[newKeyBufferLength];
        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
        keyBuffer = newKeyBuffer;
      }
    }

    protected void ensureSpaceForTags() {
      if (tagsLength > tagsBuffer.length) {
        int newTagsBufferLength =
          Integer.highestOneBit(Math.max(INITIAL_KEY_BUFFER_SIZE, tagsLength) - 1) << 1;
        byte[] newTagsBuffer = new byte[newTagsBufferLength];
        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
        tagsBuffer = newTagsBuffer;
      }
    }

    protected void setKey(byte[] keyBuffer, long memTS) {
      currentKey.setKey(keyBuffer, 0, keyLength);
      memstoreTS = memTS;
    }

    /**
     * Copy the state from the next one into this instance (the previous state placeholder). Used
     * to save the previous state when we are advancing the seeker to the next key/value.
     */
    protected void copyFromNext(SeekerState nextState) {
      if (keyBuffer.length != nextState.keyBuffer.length) {
        keyBuffer = nextState.keyBuffer.clone();
      } else if (!isValid()) {
        // Note: we can only call isValid before we override our state, so this
        // comes before all the assignments at the end of this method.
        System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0, nextState.keyLength);
      } else {
        // don't copy the common prefix between this key and the previous one
        System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix, keyBuffer,
          nextState.lastCommonPrefix, nextState.keyLength - nextState.lastCommonPrefix);
      }
      currentKey.set(nextState.currentKey);

      valueOffset = nextState.valueOffset;
      keyLength = nextState.keyLength;
      valueLength = nextState.valueLength;
      lastCommonPrefix = nextState.lastCommonPrefix;
      nextKvOffset = nextState.nextKvOffset;
      memstoreTS = nextState.memstoreTS;
      currentBuffer = nextState.currentBuffer;
      tagsOffset = nextState.tagsOffset;
      tagsLength = nextState.tagsLength;
      if (nextState.tagCompressionContext != null) {
        tagCompressionContext = nextState.tagCompressionContext;
      }
    }

    public Cell toCell() {
      // Buffer backing the value and tags part from the HFileBlock's buffer.
      // When tag compression is in use, this covers only the value bytes.
      ByteBuffer valAndTagsBuffer;
      int vOffset;
      int valAndTagsLength = this.valueLength;
      int tagsLenSerializationSize = 0;
      if (this.includeTags && this.tagCompressionContext == null) {
        // Include the tags part also. This will be the tags bytes plus the 2 bytes used to store
        // the tags length.
        tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + this.valueLength);
        valAndTagsLength += tagsLenSerializationSize + this.tagsLength;
      }
      this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair);
      valAndTagsBuffer = this.tmpPair.getFirst();
      vOffset = this.tmpPair.getSecond(); // This is the offset of the value part in the BB
      if (valAndTagsBuffer.hasArray()) {
        return toOnheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      } else {
        return toOffheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      }
    }

    private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
      int tagsLenSerializationSize) {
      byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          tagsArray = valAndTagsBuffer.array();
          tOffset =
            valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength + tagsLenSerializationSize;
        } else {
          tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
          tOffset = 0;
        }
      }
      return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
        currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
        currentKey.getQualifierOffset(), currentKey.getQualifierLength(), currentKey.getTimestamp(),
        currentKey.getTypeByte(), valAndTagsBuffer.array(),
        valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength, memstoreTS, tagsArray, tOffset,
        this.tagsLength);
    }

    private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
      int tagsLenSerializationSize) {
      ByteBuffer tagsBuf = HConstants.EMPTY_BYTE_BUFFER;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          tagsBuf = valAndTagsBuffer;
          tOffset = vOffset + this.valueLength + tagsLenSerializationSize;
        } else {
          tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength));
          tOffset = 0;
        }
      }
      return new OffheapDecodedExtendedCell(
        ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)), currentKey.getRowLength(),
        currentKey.getFamilyOffset(), currentKey.getFamilyLength(), currentKey.getQualifierOffset(),
        currentKey.getQualifierLength(), currentKey.getTimestamp(), currentKey.getTypeByte(),
        valAndTagsBuffer, vOffset, this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
    }
  }

  /**
   * Deep-copies only the key part of the key buffer and takes the relevant seeker state members
   * for building the clone. Note that the value byte[] still points into the currentBuffer and is
   * represented by valueOffset and valueLength.
   */
  // We return this as a Cell to the upper layers of the read flow, which might set a new seqId on
  // it. So this has to be an instance of ExtendedCell.
  protected static class OnheapDecodedCell implements ExtendedCell {
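    // Object header + 3 array references + 2 longs (timestamp, seqId) + 7 ints + 1 short
    // (rowLength) + 2 bytes (familyLength, typeByte) + the headers of the 3 backing arrays.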
    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
      + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
      + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY));
    private byte[] keyOnlyBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timestamp;
    private byte typeByte;
    private byte[] valueBuffer;
    private int valueOffset;
    private int valueLength;
    private byte[] tagsBuffer;
    private int tagsOffset;
    private int tagsLength;
    private long seqId;

    protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset,
      byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
      byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] tagsBuffer,
      int tagsOffset, int tagsLength) {
      this.keyOnlyBuffer = keyBuffer;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timestamp = timeStamp;
      this.typeByte = typeByte;
      this.valueBuffer = valueBuffer;
      this.valueOffset = valueOffset;
      this.valueLength = valueLen;
      this.tagsBuffer = tagsBuffer;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      setSequenceId(seqId);
    }

    @Override
    public byte[] getRowArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getFamilyArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getQualifierArray() {
      return keyOnlyBuffer;
    }

    @Override
    public int getRowOffset() {
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public short getRowLength() {
      return rowLength;
    }

    @Override
    public int getFamilyOffset() {
      return familyOffset;
    }

    @Override
    public byte getFamilyLength() {
      return familyLength;
    }

    @Override
    public int getQualifierOffset() {
      return qualifierOffset;
    }

    @Override
    public int getQualifierLength() {
      return qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return timestamp;
    }

    @Override
    public byte getTypeByte() {
      return typeByte;
    }

    @Override
    public long getSequenceId() {
      return seqId;
    }

    @Override
    public byte[] getValueArray() {
      return this.valueBuffer;
    }

    @Override
    public int getValueOffset() {
      return valueOffset;
    }

    @Override
    public int getValueLength() {
      return valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      return this.tagsBuffer;
    }

    @Override
    public int getTagsOffset() {
      return this.tagsOffset;
    }

    @Override
    public int getTagsLength() {
      return tagsLength;
    }

    @Override
    public String toString() {
      return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
        + getValueLength() + "/seqid=" + seqId;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    @Override
    public long heapSize() {
      return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
    }

    @Override
    public int write(OutputStream out, boolean withTags) throws IOException {
      int lenToWrite = getSerializedSize(withTags);
      ByteBufferUtils.putInt(out, keyOnlyBuffer.length);
      ByteBufferUtils.putInt(out, valueLength);
      // Write key
      out.write(keyOnlyBuffer);
      // Write value
      out.write(this.valueBuffer, this.valueOffset, this.valueLength);
      if (withTags && this.tagsLength > 0) {
        // Write the tags length as 2 bytes (in "short" form, even though the field is an int),
        // followed by the tags bytes. Since the length is never negative, no sign bit is needed.
        // See HBASE-11437.
        out.write((byte) (0xff & (this.tagsLength >> 8)));
        out.write((byte) (0xff & this.tagsLength));
        out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
      }
      return lenToWrite;
    }

    @Override
    public int getSerializedSize(boolean withTags) {
      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
        withTags);
    }

    @Override
    public void write(ByteBuffer buf, int offset) {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(long ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(byte[] ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public ExtendedCell deepClone() {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }
  }

  protected static class OffheapDecodedExtendedCell extends ByteBufferExtendedCell {
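    // Object header + 3 ByteBuffer references + 2 longs (timestamp, seqId) + 7 ints + 1 short
    // (rowLength) + 2 bytes (familyLength, typeByte) + the 3 referenced ByteBuffer objects.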
    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
      + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
      + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.BYTE_BUFFER));
    private ByteBuffer keyBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timestamp;
    private byte typeByte;
    private ByteBuffer valueBuffer;
    private int valueOffset;
    private int valueLength;
    private ByteBuffer tagsBuffer;
    private int tagsOffset;
    private int tagsLength;
    private long seqId;

    protected OffheapDecodedExtendedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset,
      byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
      ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer,
      int tagsOffset, int tagsLength) {
      // The keyBuffer is always onheap
      assert keyBuffer.hasArray();
      assert keyBuffer.arrayOffset() == 0;
      this.keyBuffer = keyBuffer;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timestamp = timeStamp;
      this.typeByte = typeByte;
      this.valueBuffer = valueBuffer;
      this.valueOffset = valueOffset;
      this.valueLength = valueLen;
      this.tagsBuffer = tagsBuffer;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      setSequenceId(seqId);
    }

    @Override
    public byte[] getRowArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getRowOffset() {
      return getRowPosition();
    }

    @Override
    public short getRowLength() {
      return this.rowLength;
    }

    @Override
    public byte[] getFamilyArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getFamilyOffset() {
      return getFamilyPosition();
    }

    @Override
    public byte getFamilyLength() {
      return this.familyLength;
    }

    @Override
    public byte[] getQualifierArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getQualifierOffset() {
      return getQualifierPosition();
    }

    @Override
    public int getQualifierLength() {
      return this.qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return this.timestamp;
    }

    @Override
    public byte getTypeByte() {
      return this.typeByte;
    }

    @Override
    public long getSequenceId() {
      return this.seqId;
    }

    @Override
    public byte[] getValueArray() {
      return CellUtil.cloneValue(this);
    }

    @Override
    public int getValueOffset() {
      return 0;
    }

    @Override
    public int getValueLength() {
      return this.valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      return PrivateCellUtil.cloneTags(this);
    }

    @Override
    public int getTagsOffset() {
      return 0;
    }

    @Override
    public int getTagsLength() {
      return this.tagsLength;
    }

    @Override
    public ByteBuffer getRowByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getRowPosition() {
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public ByteBuffer getFamilyByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getFamilyPosition() {
      return this.familyOffset;
    }

    @Override
    public ByteBuffer getQualifierByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getQualifierPosition() {
      return this.qualifierOffset;
    }

    @Override
    public ByteBuffer getValueByteBuffer() {
      return this.valueBuffer;
    }

    @Override
    public int getValuePosition() {
      return this.valueOffset;
    }

    @Override
    public ByteBuffer getTagsByteBuffer() {
      return this.tagsBuffer;
    }

    @Override
    public int getTagsPosition() {
      return this.tagsOffset;
    }

    @Override
    public long heapSize() {
      return FIXED_OVERHEAD;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    @Override
    public int write(OutputStream out, boolean withTags) throws IOException {
      int lenToWrite = getSerializedSize(withTags);
      ByteBufferUtils.putInt(out, keyBuffer.capacity());
      ByteBufferUtils.putInt(out, valueLength);
      // Write key
      out.write(keyBuffer.array());
      // Write value
      ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset, this.valueLength);
      if (withTags && this.tagsLength > 0) {
        // Write the tags length as 2 bytes (in "short" form, even though the field is an int),
        // followed by the tags bytes. Since the length is never negative, no sign bit is needed.
        // See HBASE-11437.
        out.write((byte) (0xff & (this.tagsLength >> 8)));
        out.write((byte) (0xff & this.tagsLength));
        ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
      }
      return lenToWrite;
    }

    @Override
    public int getSerializedSize(boolean withTags) {
      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
        withTags);
    }

    @Override
    public void setTimestamp(long ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(byte[] ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void write(ByteBuffer buf, int offset) {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public ExtendedCell deepClone() {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }
  }

  protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
    extends AbstractEncodedSeeker {
    protected ByteBuff currentBuffer;
    protected TagCompressionContext tagCompressionContext = null;
    protected KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
    protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>();
    protected STATE current, previous;

    public BufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
      super(decodingCtx);
      if (decodingCtx.getHFileContext().isCompressTags()) {
        try {
          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
        } catch (Exception e) {
          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
        }
      }
      current = createSeekerState(); // always valid
      previous = createSeekerState(); // may not be valid
    }

    @Override
    public int compareKey(CellComparator comparator, Cell key) {
      keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
      return PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, keyOnlyKV);
    }

    @Override
    public void setCurrentBuffer(ByteBuff buffer) {
      if (this.tagCompressionContext != null) {
        this.tagCompressionContext.clear();
      }
      currentBuffer = buffer;
      current.currentBuffer = currentBuffer;
      if (tagCompressionContext != null) {
        current.tagCompressionContext = tagCompressionContext;
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public Cell getKey() {
      byte[] key = new byte[current.keyLength];
      System.arraycopy(current.keyBuffer, 0, key, 0, current.keyLength);
      return new KeyValue.KeyOnlyKeyValue(key);
    }

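    // Creates a view over the current value bytes (a "shallow copy"); in the common case the
    // underlying block bytes are shared rather than copied.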
    @Override
    public ByteBuffer getValueShallowCopy() {
      currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
      ByteBuffer dup = tmpPair.getFirst().duplicate();
      dup.position(tmpPair.getSecond());
      dup.limit(tmpPair.getSecond() + current.valueLength);
      return dup.slice();
    }

    @Override
    public Cell getCell() {
      return current.toCell();
    }

    @Override
    public void rewind() {
      currentBuffer.rewind();
      if (tagCompressionContext != null) {
        tagCompressionContext.clear();
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public boolean next() {
      if (!currentBuffer.hasRemaining()) {
        return false;
      }
      decodeNext();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
      return true;
    }

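    // Reads the tags length (as a compressed int) and then either uncompresses the tags into
    // current.tagsBuffer (when tag compression is in use) or just records their offset in the
    // block buffer so the tags can be materialized later (e.g. in toCell()).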
    protected void decodeTags() {
      current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
      if (tagCompressionContext != null) {
        if (current.uncompressTags) {
          // Tag compression is in use. Uncompress the tags into tagsBuffer.
          current.ensureSpaceForTags();
          try {
            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
              current.tagsBuffer, 0, current.tagsLength);
          } catch (IOException e) {
            throw new RuntimeException("Exception while uncompressing tags", e);
          }
        } else {
          currentBuffer.skip(current.tagsCompressedLength);
          current.uncompressTags = true; // Reset this.
        }
        current.tagsOffset = -1;
      } else {
        // When tag compression is not in use, avoid copying the tags bytes into tagsBuffer.
        // Just record the tags offset so the KV buffer can be created later in
        // getKeyValueBuffer().
        current.tagsOffset = currentBuffer.position();
        currentBuffer.skip(current.tagsLength);
      }
    }

    @Override
    public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
      int rowCommonPrefix = 0;
      int familyCommonPrefix = 0;
      int qualCommonPrefix = 0;
      previous.invalidate();
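      // The serialized key starts with a 2-byte row length, then the row bytes, a 1-byte family
      // length, the family and qualifier bytes, an 8-byte timestamp and a 1-byte type.
      // lastCommonPrefix is measured over this serialized form, hence the "- 2" / "- 3"
      // adjustments below when converting it to row/family/qualifier common prefixes.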
      do {
        int comp;
        keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
        if (current.lastCommonPrefix != 0) {
          // The KV format has the row key length also in the byte array. The common prefix
          // includes it, so we need to subtract it to find the common prefix of the row part
          // alone.
          rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
        }
        if (current.lastCommonPrefix <= 2) {
          rowCommonPrefix = 0;
        }
        rowCommonPrefix += findCommonPrefixInRowPart(seekCell, keyOnlyKV, rowCommonPrefix);
        comp = compareCommonRowPrefix(seekCell, keyOnlyKV, rowCommonPrefix);
        if (comp == 0) {
          comp = compareTypeBytes(seekCell, keyOnlyKV);
          if (comp == 0) {
            // Subtract the fixed row key length and the family key fixed length
            familyCommonPrefix = Math.max(0, Math.min(familyCommonPrefix,
              current.lastCommonPrefix - (3 + keyOnlyKV.getRowLength())));
            familyCommonPrefix +=
              findCommonPrefixInFamilyPart(seekCell, keyOnlyKV, familyCommonPrefix);
            comp = compareCommonFamilyPrefix(seekCell, keyOnlyKV, familyCommonPrefix);
            if (comp == 0) {
              // subtract the rowkey fixed length and the family key fixed length
              qualCommonPrefix = Math.max(0, Math.min(qualCommonPrefix, current.lastCommonPrefix
                - (3 + keyOnlyKV.getRowLength() + keyOnlyKV.getFamilyLength())));
              qualCommonPrefix +=
                findCommonPrefixInQualifierPart(seekCell, keyOnlyKV, qualCommonPrefix);
              comp = compareCommonQualifierPrefix(seekCell, keyOnlyKV, qualCommonPrefix);
              if (comp == 0) {
                comp = CellComparator.getInstance().compareTimestamps(seekCell, keyOnlyKV);
                if (comp == 0) {
                  // Compare types. Let the delete types sort ahead of puts; i.e. types of higher
                  // numbers sort before those of lesser numbers. Maximum (255) appears ahead of
                  // everything, and minimum (0) appears after everything.
                  comp = (0xff & keyOnlyKV.getTypeByte()) - (0xff & seekCell.getTypeByte());
                }
              }
            }
          }
        }
        if (comp == 0) { // exact match
          if (seekBefore) {
            if (!previous.isValid()) {
              // The caller (seekBefore) has to ensure that we are not at the first key in the
              // block.
              throw new IllegalStateException(
                "Cannot seekBefore if positioned at the first key in the block: key="
                  + Bytes.toStringBinary(seekCell.getRowArray()));
            }
            moveToPrevious();
            return 1;
          }
          return 0;
        }

        if (comp < 0) { // already too large, check previous
          if (previous.isValid()) {
            moveToPrevious();
          } else {
            return HConstants.INDEX_KEY_MAGIC; // using optimized index key
          }
          return 1;
        }

        // move to next, if more data is available
        if (currentBuffer.hasRemaining()) {
          previous.copyFromNext(current);
          decodeNext();
          current.setKey(current.keyBuffer, current.memstoreTS);
        } else {
          break;
        }
      } while (true);

      // we hit the end of the block, not an exact match
      return 1;
    }

    private int compareTypeBytes(Cell key, Cell right) {
      if (
        key.getFamilyLength() + key.getQualifierLength() == 0
          && key.getTypeByte() == Type.Minimum.getCode()
      ) {
        // left is "bigger", i.e. it appears later in the sorted order
        return 1;
      }
      if (
        right.getFamilyLength() + right.getQualifierLength() == 0
          && right.getTypeByte() == Type.Minimum.getCode()
      ) {
        return -1;
      }
      return 0;
    }

    private static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
      return Bytes.findCommonPrefix(left.getRowArray(), right.getRowArray(),
        left.getRowLength() - rowCommonPrefix, right.getRowLength() - rowCommonPrefix,
        left.getRowOffset() + rowCommonPrefix, right.getRowOffset() + rowCommonPrefix);
    }

    private static int findCommonPrefixInFamilyPart(Cell left, Cell right, int familyCommonPrefix) {
      return Bytes.findCommonPrefix(left.getFamilyArray(), right.getFamilyArray(),
        left.getFamilyLength() - familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix,
        left.getFamilyOffset() + familyCommonPrefix, right.getFamilyOffset() + familyCommonPrefix);
    }

    private static int findCommonPrefixInQualifierPart(Cell left, Cell right,
      int qualifierCommonPrefix) {
      return Bytes.findCommonPrefix(left.getQualifierArray(), right.getQualifierArray(),
        left.getQualifierLength() - qualifierCommonPrefix,
        right.getQualifierLength() - qualifierCommonPrefix,
        left.getQualifierOffset() + qualifierCommonPrefix,
        right.getQualifierOffset() + qualifierCommonPrefix);
    }

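    // Swaps current and previous, re-positions the buffer after the now-current key/value, and
    // arranges for decodeTags() to skip re-decoding the already decoded tags; used when
    // seekToKeyInBlock() needs to step back by one key/value.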
    private void moveToPrevious() {
      if (!previous.isValid()) {
        throw new IllegalStateException(
          "Can only move back once, and not when positioned at the first key in the block.");
      }

      STATE tmp = previous;
      previous = current;
      current = tmp;

      // move after the last key/value
      currentBuffer.position(current.nextKvOffset);
      // The tag bytes were already decoded. Cache the decoded tags and their total compressed
      // length in the current state so that the next decodeNext() does not decode the same tags
      // again, which could pollute the data dictionary used for the compression. When
      // current.uncompressTags is false, decodeTags() just reuses current.tagsBuffer and skips
      // 'tagsCompressedLength' bytes of the source stream.
      current.tagsBuffer = previous.tagsBuffer;
      current.tagsCompressedLength = previous.tagsCompressedLength;
      current.uncompressTags = false;
      // The current key has to be reset with the previous Cell
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @SuppressWarnings("unchecked")
    protected STATE createSeekerState() {
      // This will fail for a non-default seeker state if the subclass does not override this
      // method.
      return (STATE) new SeekerState(this.tmpPair, this.includesTags());
    }

    protected abstract void decodeFirst();

    protected abstract void decodeNext();
  }

  /**
   * Writes the tags (optionally dictionary compressed) and the memstore timestamp that follow an
   * encoded key/value.
   * @return unencoded size added
   */
  protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
    HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
    int size = 0;
    if (encodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = cell.getTagsLength();
      ByteBufferUtils.putCompressedInt(out, tagsLength);
      // There are some tags to be written
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
        // When tag compression is enabled, tagCompressionContext will be non-null; in that case,
        // write the tags using dictionary compression.
        if (tagCompressionContext != null) {
          // Not passing tagsLength, considering that parsing the tagsLength is not costly
          PrivateCellUtil.compressTags(out, cell, tagCompressionContext);
        } else {
          PrivateCellUtil.writeTags(out, cell, tagsLength);
        }
      }
      size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
    }
    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
      // Copy the memstore timestamp from the cell to the output stream.
      long memstoreTS = cell.getSequenceId();
      WritableUtils.writeVLong(out, memstoreTS);
      // TODO: use a writeVLong which returns the number of bytes written so that parsing twice
      // can be avoided.
      size += WritableUtils.getVIntSize(memstoreTS);
    }
    return size;
  }

  protected final void afterDecodingKeyValue(DataInputStream source, ByteBuffer dest,
    HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    if (decodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = ByteBufferUtils.readCompressedInt(source);
      // Put as unsigned short
      dest.put((byte) ((tagsLength >> 8) & 0xff));
      dest.put((byte) (tagsLength & 0xff));
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
        // When tag compression is used in this file, tagCompressionContext will be non-null.
        if (tagCompressionContext != null) {
          tagCompressionContext.uncompressTags(source, dest, tagsLength);
        } else {
          ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
        }
      }
    }
    if (decodingCtx.getHFileContext().isIncludesMvcc()) {
      long memstoreTS = -1;
      try {
        // Copy the memstore timestamp from the data input stream to the byte buffer.
        memstoreTS = WritableUtils.readVLong(source);
        ByteBufferUtils.writeVLong(dest, memstoreTS);
      } catch (IOException ex) {
        throw new RuntimeException(
          "Unable to copy memstore timestamp " + memstoreTS + " after decoding a key/value", ex);
      }
    }
  }

  protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
    int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
    throws IOException;

  /**
   * Asserts that there is at least the given amount of unfilled space remaining in the given
   * buffer.
   * @param out    typically, the buffer we are writing to
   * @param length the required space in the buffer
   * @throws EncoderBufferTooSmallException If there is not enough space in the buffer.
   */
  protected static void ensureSpace(ByteBuffer out, int length)
    throws EncoderBufferTooSmallException {
    if (out.position() + length > out.limit()) {
      throw new EncoderBufferTooSmallException("Buffer position=" + out.position()
        + ", buffer limit=" + out.limit() + ", length to be written=" + length);
    }
  }

  @Override
  public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
    throws IOException {
    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
      throw new IOException(this.getClass().getName() + " only accepts "
        + HFileBlockDefaultEncodingContext.class.getName() + " as the encoding context.");
    }

    HFileBlockDefaultEncodingContext encodingCtx =
      (HFileBlockDefaultEncodingContext) blkEncodingCtx;
    encodingCtx.prepareEncoding(out);
    if (
      encodingCtx.getHFileContext().isIncludesTags()
        && encodingCtx.getHFileContext().isCompressTags()
    ) {
      if (encodingCtx.getTagCompressionContext() != null) {
        // It would be wasteful to create a new TagCompressionContext for every block encoded,
        // so reuse the existing one.
        encodingCtx.getTagCompressionContext().clear();
      } else {
        try {
          TagCompressionContext tagCompressionContext =
            new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
          encodingCtx.setTagCompressionContext(tagCompressionContext);
        } catch (Exception e) {
          throw new IOException("Failed to initialize TagCompressionContext", e);
        }
      }
    }
    StreamUtils.writeInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
    blkEncodingCtx.setEncodingState(new EncodingState());
  }

  @Override
  public void encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
    throws IOException {
    EncodingState state = encodingCtx.getEncodingState();
    int posBeforeEncode = out.size();
    int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
    state.postCellEncode(encodedKvSize, out.size() - posBeforeEncode);
  }

  public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
    DataOutputStream out) throws IOException;

  @Override
  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
    byte[] uncompressedBytesWithHeader) throws IOException {
    EncodingState state = encodingCtx.getEncodingState();
    // Write the unencodedDataSizeWritten (with header size)
    Bytes.putInt(uncompressedBytesWithHeader,
      HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE,
      state.getUnencodedDataSizeWritten());
    postEncoding(encodingCtx);
  }

}