001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with this
004 * work for additional information regarding copyright ownership. The ASF
005 * licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014 * License for the specific language governing permissions and limitations
015 * under the License.
016 */
017package org.apache.hadoop.hbase.io.encoding;
018
019import java.io.DataInputStream;
020import java.io.DataOutput;
021import java.io.DataOutputStream;
022import java.io.IOException;
023import java.nio.ByteBuffer;
024
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.CellComparator;
027import org.apache.hadoop.hbase.KeyValue;
028import org.apache.hadoop.hbase.KeyValueUtil;
029import org.apache.hadoop.hbase.PrivateCellUtil;
030import org.apache.hadoop.hbase.nio.ByteBuff;
031import org.apache.hadoop.hbase.util.ByteBufferUtils;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.apache.yetus.audience.InterfaceAudience;
034
035/**
036 * Compress key by storing size of common prefix with previous KeyValue
037 * and storing raw size of rest.
038 *
039 * Format:
040 * 1-5 bytes: compressed key length minus prefix (7-bit encoding)
041 * 1-5 bytes: compressed value length (7-bit encoding)
042 * 1-3 bytes: compressed length of common key prefix
043 * ... bytes: rest of key (including timestamp)
044 * ... bytes: value
045 *
046 * In a worst case compressed KeyValue will be three bytes longer than original.
047 *
048 */
049@InterfaceAudience.Private
050public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
051
052  @Override
053  public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
054      DataOutputStream out) throws IOException {
055    int klength = KeyValueUtil.keyLength(cell);
056    int vlength = cell.getValueLength();
057    EncodingState state = encodingContext.getEncodingState();
058    if (state.prevCell == null) {
059      // copy the key, there is no common prefix with none
060      ByteBufferUtils.putCompressedInt(out, klength);
061      ByteBufferUtils.putCompressedInt(out, vlength);
062      ByteBufferUtils.putCompressedInt(out, 0);
063      PrivateCellUtil.writeFlatKey(cell, (DataOutput)out);
064    } else {
065      // find a common prefix and skip it
066      int common = PrivateCellUtil.findCommonPrefixInFlatKey(cell, state.prevCell, true, true);
067      ByteBufferUtils.putCompressedInt(out, klength - common);
068      ByteBufferUtils.putCompressedInt(out, vlength);
069      ByteBufferUtils.putCompressedInt(out, common);
070      writeKeyExcludingCommon(cell, common, out);
071    }
072    // Write the value part
073    PrivateCellUtil.writeValue(out, cell, vlength);
074    int size = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
075    size += afterEncodingKeyValue(cell, out, encodingContext);
076    state.prevCell = cell;
077    return size;
078  }
079
080  private void writeKeyExcludingCommon(Cell cell, int commonPrefix, DataOutputStream out)
081      throws IOException {
082    short rLen = cell.getRowLength();
083    if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
084      // Previous and current rows are different. Need to write the differing part followed by
085      // cf,q,ts and type
086      PrivateCellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
087      byte fLen = cell.getFamilyLength();
088      out.writeByte(fLen);
089      PrivateCellUtil.writeFamily(out, cell, fLen);
090      PrivateCellUtil.writeQualifier(out, cell, cell.getQualifierLength());
091      out.writeLong(cell.getTimestamp());
092      out.writeByte(cell.getTypeByte());
093    } else {
094      // The full row key part is common. CF part will be common for sure as we deal with Cells in
095      // same family. Just need write the differing part in q, ts and type
096      commonPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
097          - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
098      int qLen = cell.getQualifierLength();
099      int commonQualPrefix = Math.min(commonPrefix, qLen);
100      int qualPartLenToWrite = qLen - commonQualPrefix;
101      if (qualPartLenToWrite > 0) {
102        PrivateCellUtil.writeQualifierSkippingBytes(out, cell, qLen, commonQualPrefix);
103      }
104      commonPrefix -= commonQualPrefix;
105      // Common part in TS also?
106      if (commonPrefix > 0) {
107        int commonTimestampPrefix = Math.min(commonPrefix, KeyValue.TIMESTAMP_SIZE);
108        if (commonTimestampPrefix < KeyValue.TIMESTAMP_SIZE) {
109          byte[] curTsBuf = Bytes.toBytes(cell.getTimestamp());
110          out.write(curTsBuf, commonTimestampPrefix, KeyValue.TIMESTAMP_SIZE
111              - commonTimestampPrefix);
112        }
113        commonPrefix -= commonTimestampPrefix;
114        if (commonPrefix == 0) {
115          out.writeByte(cell.getTypeByte());
116        }
117      } else {
118        out.writeLong(cell.getTimestamp());
119        out.writeByte(cell.getTypeByte());
120      }
121    }
122  }
123
124  @Override
125  protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
126      int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
127    int decompressedSize = source.readInt();
128    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
129        allocateHeaderLength);
130    buffer.position(allocateHeaderLength);
131    int prevKeyOffset = 0;
132
133    while (source.available() > skipLastBytes) {
134      prevKeyOffset = decodeKeyValue(source, buffer, prevKeyOffset);
135      afterDecodingKeyValue(source, buffer, decodingCtx);
136    }
137
138    if (source.available() != skipLastBytes) {
139      throw new IllegalStateException("Read too many bytes.");
140    }
141
142    buffer.limit(buffer.position());
143    return buffer;
144  }
145
146  private int decodeKeyValue(DataInputStream source, ByteBuffer buffer,
147      int prevKeyOffset)
148          throws IOException, EncoderBufferTooSmallException {
149    int keyLength = ByteBufferUtils.readCompressedInt(source);
150    int valueLength = ByteBufferUtils.readCompressedInt(source);
151    int commonLength = ByteBufferUtils.readCompressedInt(source);
152    int keyOffset;
153    keyLength += commonLength;
154
155    ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);
156
157    buffer.putInt(keyLength);
158    buffer.putInt(valueLength);
159
160    // copy the prefix
161    if (commonLength > 0) {
162      keyOffset = buffer.position();
163      ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, prevKeyOffset,
164          commonLength);
165    } else {
166      keyOffset = buffer.position();
167    }
168
169    // copy rest of the key and value
170    int len = keyLength - commonLength + valueLength;
171    ByteBufferUtils.copyFromStreamToBuffer(buffer, source, len);
172    return keyOffset;
173  }
174
175  @Override
176  public Cell getFirstKeyCellInBlock(ByteBuff block) {
177    block.mark();
178    block.position(Bytes.SIZEOF_INT);
179    int keyLength = ByteBuff.readCompressedInt(block);
180    // TODO : See if we can avoid these reads as the read values are not getting used
181    ByteBuff.readCompressedInt(block);
182    int commonLength = ByteBuff.readCompressedInt(block);
183    if (commonLength != 0) {
184      throw new AssertionError("Nonzero common length in the first key in "
185          + "block: " + commonLength);
186    }
187    ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
188    block.reset();
189    return createFirstKeyCell(key, keyLength);
190  }
191
192  @Override
193  public String toString() {
194    return PrefixKeyDeltaEncoder.class.getSimpleName();
195  }
196
197  @Override
198  public EncodedSeeker createSeeker(CellComparator comparator,
199      final HFileBlockDecodingContext decodingCtx) {
200    return new BufferedEncodedSeeker<SeekerState>(comparator, decodingCtx) {
201      @Override
202      protected void decodeNext() {
203        current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
204        current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
205        current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
206        current.keyLength += current.lastCommonPrefix;
207        current.ensureSpaceForKey();
208        currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
209            current.keyLength - current.lastCommonPrefix);
210        current.valueOffset = currentBuffer.position();
211        currentBuffer.skip(current.valueLength);
212        if (includesTags()) {
213          decodeTags();
214        }
215        if (includesMvcc()) {
216          current.memstoreTS = ByteBuff.readVLong(currentBuffer);
217        } else {
218          current.memstoreTS = 0;
219        }
220        current.nextKvOffset = currentBuffer.position();
221      }
222
223      @Override
224      protected void decodeFirst() {
225        currentBuffer.skip(Bytes.SIZEOF_INT);
226        decodeNext();
227      }
228    };
229  }
230}