/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Compress the key by storing the size of the common prefix with the previous KeyValue and
 * storing the rest of the key raw. Format:
 * <ul>
 * <li>1-5 bytes: compressed key length minus prefix (7-bit encoding)</li>
 * <li>1-5 bytes: compressed value length (7-bit encoding)</li>
 * <li>1-3 bytes: compressed length of common key prefix</li>
 * <li>... bytes: rest of key (including timestamp)</li>
 * <li>... bytes: value</li>
 * </ul>
 * In the worst case a compressed KeyValue will be three bytes longer than the original.
 */
@InterfaceAudience.Private
public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {

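  /*
   * Illustrative walk-through of the format (example values chosen for clarity, not taken from
   * any test): consider two consecutive cells in a block, both with row "row1", family "f" and
   * 2-byte values, with qualifiers "q1" and "q2". Each flat key is 19 bytes: 2 (row length) + 4
   * (row) + 1 (family length) + 1 (family) + 2 (qualifier) + 8 (timestamp) + 1 (type). The first
   * cell has no previous cell, so it is written as: compressed int 19 (key length), compressed
   * int 2 (value length), compressed int 0 (common prefix), 19 key bytes, 2 value bytes. The
   * second cell shares its first 9 key bytes with the first (row length, row, family length,
   * family and the leading 'q' of the qualifier), so it is written as: compressed int 10 (key
   * length minus prefix), compressed int 2 (value length), compressed int 9 (common prefix), the
   * remaining 10 key bytes (rest of the qualifier, the timestamp and the type) and 2 value bytes.
   */
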
  @Override
  public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
    DataOutputStream out) throws IOException {
    int klength = KeyValueUtil.keyLength(cell);
    int vlength = cell.getValueLength();
    EncodingState state = encodingContext.getEncodingState();
    if (state.prevCell == null) {
      // write the whole key; with no previous cell there is no common prefix to strip
      ByteBufferUtils.putCompressedInt(out, klength);
      ByteBufferUtils.putCompressedInt(out, vlength);
      ByteBufferUtils.putCompressedInt(out, 0);
      PrivateCellUtil.writeFlatKey(cell, (DataOutput) out);
    } else {
      // find a common prefix and skip it
      int common = PrivateCellUtil.findCommonPrefixInFlatKey(cell, state.prevCell, true, true);
      ByteBufferUtils.putCompressedInt(out, klength - common);
      ByteBufferUtils.putCompressedInt(out, vlength);
      ByteBufferUtils.putCompressedInt(out, common);
      writeKeyExcludingCommon(cell, common, out);
    }
    // Write the value part
    PrivateCellUtil.writeValue(out, cell, vlength);
    int size = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
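    // afterEncodingKeyValue (in the superclass) appends per-cell metadata such as tags and the
    // MVCC/memstore timestamp when the encoding context includes them, and accounts for those
    // bytes in the returned size.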
    size += afterEncodingKeyValue(cell, out, encodingContext);
    state.prevCell = cell;
    return size;
  }

  private void writeKeyExcludingCommon(Cell cell, int commonPrefix, DataOutputStream out)
    throws IOException {
    short rLen = cell.getRowLength();
    if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
      // Previous and current rows are different. Need to write the differing part followed by
      // cf, q, ts and type
      PrivateCellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
      byte fLen = cell.getFamilyLength();
      out.writeByte(fLen);
      PrivateCellUtil.writeFamily(out, cell, fLen);
      PrivateCellUtil.writeQualifier(out, cell, cell.getQualifierLength());
      out.writeLong(cell.getTimestamp());
      out.writeByte(cell.getTypeByte());
    } else {
      // The full row key is common. The CF part will be common for sure as we deal with Cells in
      // the same family. Just need to write the differing part of q, ts and type
      commonPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
        - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
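      // commonPrefix now counts only the key bytes shared within the qualifier, timestamp and
      // type portion. Illustrative example: with a 4-byte row and a 1-byte family, a total common
      // prefix of 9 leaves 9 - (4 + 2) - (1 + 1) = 1 shared qualifier byte.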
      int qLen = cell.getQualifierLength();
      int commonQualPrefix = Math.min(commonPrefix, qLen);
      int qualPartLenToWrite = qLen - commonQualPrefix;
      if (qualPartLenToWrite > 0) {
        PrivateCellUtil.writeQualifierSkippingBytes(out, cell, qLen, commonQualPrefix);
      }
      commonPrefix -= commonQualPrefix;
      // Common part in TS also?
      if (commonPrefix > 0) {
        int commonTimestampPrefix = Math.min(commonPrefix, KeyValue.TIMESTAMP_SIZE);
        if (commonTimestampPrefix < KeyValue.TIMESTAMP_SIZE) {
          byte[] curTsBuf = Bytes.toBytes(cell.getTimestamp());
          out.write(curTsBuf, commonTimestampPrefix,
            KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix);
        }
        commonPrefix -= commonTimestampPrefix;
        if (commonPrefix == 0) {
          out.writeByte(cell.getTypeByte());
        }
      } else {
        out.writeLong(cell.getTimestamp());
        out.writeByte(cell.getTypeByte());
      }
    }
  }

  @Override
  protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
    int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    int decompressedSize = source.readInt();
    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize + allocateHeaderLength);
    buffer.position(allocateHeaderLength);
    int prevKeyOffset = 0;

    while (source.available() > skipLastBytes) {
      prevKeyOffset = decodeKeyValue(source, buffer, prevKeyOffset);
      afterDecodingKeyValue(source, buffer, decodingCtx);
    }

    if (source.available() != skipLastBytes) {
      throw new IllegalStateException("Read too many bytes.");
    }

    buffer.limit(buffer.position());
    return buffer;
  }

  private int decodeKeyValue(DataInputStream source, ByteBuffer buffer, int prevKeyOffset)
    throws IOException, EncoderBufferTooSmallException {
    int keyLength = ByteBufferUtils.readCompressedInt(source);
    int valueLength = ByteBufferUtils.readCompressedInt(source);
    int commonLength = ByteBufferUtils.readCompressedInt(source);
    int keyOffset;
    keyLength += commonLength;

    ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);

    buffer.putInt(keyLength);
    buffer.putInt(valueLength);
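    // From here the buffer follows the standard KeyValue serialization order: the two 4-byte
    // length ints just written (KeyValue.ROW_OFFSET bytes), then the key bytes, then the value.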

    // copy the prefix
    if (commonLength > 0) {
      keyOffset = buffer.position();
      ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, prevKeyOffset, commonLength);
    } else {
      keyOffset = buffer.position();
    }

    // copy rest of the key and value
    int len = keyLength - commonLength + valueLength;
    ByteBufferUtils.copyFromStreamToBuffer(buffer, source, len);
    return keyOffset;
  }

  @Override
  public Cell getFirstKeyCellInBlock(ByteBuff block) {
    block.mark();
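    // Skip the 4-byte unencoded-data-size int written at the head of the encoded block
    // (see internalDecodeKeyValues, which reads it as decompressedSize).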
    block.position(Bytes.SIZEOF_INT);
    int keyLength = ByteBuff.readCompressedInt(block);
    // TODO: See if we can avoid this read, as the value length read here is not used
    ByteBuff.readCompressedInt(block);
    int commonLength = ByteBuff.readCompressedInt(block);
    if (commonLength != 0) {
      throw new AssertionError(
        "Nonzero common length in the first key in block: " + commonLength);
    }
    ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
    block.reset();
    return createFirstKeyCell(key, keyLength);
  }

  @Override
  public String toString() {
    return PrefixKeyDeltaEncoder.class.getSimpleName();
  }

  @Override
  public EncodedSeeker createSeeker(final HFileBlockDecodingContext decodingCtx) {
    return new SeekerStateBufferedEncodedSeeker(decodingCtx);
  }

  private static class SeekerStateBufferedEncodedSeeker extends BufferedEncodedSeeker<SeekerState> {

    private SeekerStateBufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
      super(decodingCtx);
    }

    @Override
    protected void decodeNext() {
      current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
      current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
      current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
      current.keyLength += current.lastCommonPrefix;
      current.ensureSpaceForKey();
      currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
        current.keyLength - current.lastCommonPrefix);
      current.valueOffset = currentBuffer.position();
      currentBuffer.skip(current.valueLength);
      if (includesTags()) {
        decodeTags();
      }
      if (includesMvcc()) {
        current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
      } else {
        current.memstoreTS = 0;
      }
      current.nextKvOffset = currentBuffer.position();
    }

    @Override
    protected void decodeFirst() {
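      // The first 4 bytes of the encoded data hold the unencoded size; skip them before decoding
      // the first cell.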
      currentBuffer.skip(Bytes.SIZEOF_INT);
      decodeNext();
    }
  }
}