/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Compress the key by storing the size of the common prefix shared with the previous KeyValue
 * and writing the rest of the key raw.
 * <p>
 * Format:
 * <ul>
 * <li>1-5 bytes: compressed key length minus prefix (7-bit encoding)</li>
 * <li>1-5 bytes: compressed value length (7-bit encoding)</li>
 * <li>1-3 bytes: compressed length of common key prefix</li>
 * <li>... bytes: rest of key (including timestamp)</li>
 * <li>... bytes: value</li>
 * </ul>
 * In the worst case a compressed KeyValue is three bytes longer than the original.
 */
@InterfaceAudience.Private
public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
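
  /*
   * Worked example (key contents chosen for illustration): if the previous flat key is
   * "row1"/"cf":"qualA"/ts/Put and the current one is "row1"/"cf":"qualB"/ts/Put, the two keys
   * share the row length, row, family length, family and the first four qualifier bytes, a
   * 13-byte common prefix. For the second cell only the two compressed lengths, that prefix
   * length, the remaining key bytes ("B", timestamp, type byte) and the value are written.
   */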

  @Override
  public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
      DataOutputStream out) throws IOException {
    int klength = KeyValueUtil.keyLength(cell);
    int vlength = cell.getValueLength();
    EncodingState state = encodingContext.getEncodingState();
    if (state.prevCell == null) {
      // no previous cell, so there is no common prefix; write the whole key
      ByteBufferUtils.putCompressedInt(out, klength);
      ByteBufferUtils.putCompressedInt(out, vlength);
      ByteBufferUtils.putCompressedInt(out, 0);
      PrivateCellUtil.writeFlatKey(cell, (DataOutput)out);
    } else {
      // find a common prefix and skip it
      int common = PrivateCellUtil.findCommonPrefixInFlatKey(cell, state.prevCell, true, true);
      ByteBufferUtils.putCompressedInt(out, klength - common);
      ByteBufferUtils.putCompressedInt(out, vlength);
      ByteBufferUtils.putCompressedInt(out, common);
      writeKeyExcludingCommon(cell, common, out);
    }
    // Write the value part
    PrivateCellUtil.writeValue(out, cell, vlength);
    int size = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
    size += afterEncodingKeyValue(cell, out, encodingContext);
    state.prevCell = cell;
    return size;
  }

  private void writeKeyExcludingCommon(Cell cell, int commonPrefix, DataOutputStream out)
      throws IOException {
    short rLen = cell.getRowLength();
    if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
      // Previous and current rows are different. Need to write the differing part followed by
      // cf, q, ts and type
      PrivateCellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
      byte fLen = cell.getFamilyLength();
      out.writeByte(fLen);
      PrivateCellUtil.writeFamily(out, cell, fLen);
      PrivateCellUtil.writeQualifier(out, cell, cell.getQualifierLength());
      out.writeLong(cell.getTimestamp());
      out.writeByte(cell.getTypeByte());
    } else {
      // The full row key part is common. The CF part will be common for sure as we deal with
      // cells in the same family. Just need to write the differing part of q, ts and type.
      commonPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
          - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
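      // The remaining commonPrefix now counts only qualifier, timestamp and type bytes.
      // Illustrative arithmetic (assumed values): with commonPrefix = 25, rLen = 10 and a 2-byte
      // family, 25 - (10 + 2) - (2 + 1) = 10 bytes of the qualifier (and possibly ts/type) are
      // shared with the previous key.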
      int qLen = cell.getQualifierLength();
      int commonQualPrefix = Math.min(commonPrefix, qLen);
      int qualPartLenToWrite = qLen - commonQualPrefix;
      if (qualPartLenToWrite > 0) {
        PrivateCellUtil.writeQualifierSkippingBytes(out, cell, qLen, commonQualPrefix);
      }
      commonPrefix -= commonQualPrefix;
      // Common part in TS also?
      if (commonPrefix > 0) {
        int commonTimestampPrefix = Math.min(commonPrefix, KeyValue.TIMESTAMP_SIZE);
        if (commonTimestampPrefix < KeyValue.TIMESTAMP_SIZE) {
          byte[] curTsBuf = Bytes.toBytes(cell.getTimestamp());
          out.write(curTsBuf, commonTimestampPrefix, KeyValue.TIMESTAMP_SIZE
              - commonTimestampPrefix);
        }
        commonPrefix -= commonTimestampPrefix;
        // If nothing of the common prefix remains, the type byte differs and must be written
        if (commonPrefix == 0) {
          out.writeByte(cell.getTypeByte());
        }
      } else {
        out.writeLong(cell.getTimestamp());
        out.writeByte(cell.getTypeByte());
      }
    }
  }

  @Override
  protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
      int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    int decompressedSize = source.readInt();
    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize + allocateHeaderLength);
    buffer.position(allocateHeaderLength);
    int prevKeyOffset = 0;

    while (source.available() > skipLastBytes) {
      prevKeyOffset = decodeKeyValue(source, buffer, prevKeyOffset);
      afterDecodingKeyValue(source, buffer, decodingCtx);
    }

    if (source.available() != skipLastBytes) {
      throw new IllegalStateException("Read too many bytes.");
    }

    buffer.limit(buffer.position());
    return buffer;
  }

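  /**
   * Reads one prefix-compressed cell from the stream and writes it into {@code buffer} in the
   * standard KeyValue layout (4-byte key length, 4-byte value length, key bytes, value bytes),
   * rebuilding the full key by copying the shared prefix from the previously decoded key at
   * {@code prevKeyOffset} within the same buffer.
   * @return the offset of the reconstructed key inside {@code buffer}
   */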
  private int decodeKeyValue(DataInputStream source, ByteBuffer buffer, int prevKeyOffset)
      throws IOException, EncoderBufferTooSmallException {
    int keyLength = ByteBufferUtils.readCompressedInt(source);
    int valueLength = ByteBufferUtils.readCompressedInt(source);
    int commonLength = ByteBufferUtils.readCompressedInt(source);
    int keyOffset;
    keyLength += commonLength;

    ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);

    buffer.putInt(keyLength);
    buffer.putInt(valueLength);

    // copy the prefix
    if (commonLength > 0) {
      keyOffset = buffer.position();
      ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, prevKeyOffset, commonLength);
    } else {
      keyOffset = buffer.position();
    }

    // copy rest of the key and value
    int len = keyLength - commonLength + valueLength;
    ByteBufferUtils.copyFromStreamToBuffer(buffer, source, len);
    return keyOffset;
  }

  @Override
  public Cell getFirstKeyCellInBlock(ByteBuff block) {
    block.mark();
    block.position(Bytes.SIZEOF_INT);
    int keyLength = ByteBuff.readCompressedInt(block);
    // TODO: See if we can avoid these reads, as the values read are not used
    ByteBuff.readCompressedInt(block);
    int commonLength = ByteBuff.readCompressedInt(block);
    if (commonLength != 0) {
      throw new AssertionError("Nonzero common length in the first key in "
          + "block: " + commonLength);
    }
    ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
    block.reset();
    return createFirstKeyCell(key, keyLength);
  }

  @Override
  public String toString() {
    return PrefixKeyDeltaEncoder.class.getSimpleName();
  }

  @Override
  public EncodedSeeker createSeeker(final HFileBlockDecodingContext decodingCtx) {
    return new SeekerStateBufferedEncodedSeeker(decodingCtx);
  }

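  /**
   * Seeker for prefix-delta-encoded blocks. {@link #decodeNext()} reuses the common prefix that
   * is already present in {@code current.keyBuffer} from the previous key and reads only the key
   * suffix; the value is left in the block buffer and referenced by its offset.
   */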
  private static class SeekerStateBufferedEncodedSeeker
      extends BufferedEncodedSeeker<SeekerState> {

    private SeekerStateBufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
      super(decodingCtx);
    }

    @Override
    protected void decodeNext() {
      current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
      current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
      current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
      current.keyLength += current.lastCommonPrefix;
      current.ensureSpaceForKey();
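      // The first lastCommonPrefix bytes of keyBuffer still hold the previous key, so only the
      // differing key suffix has to be read from the block buffer.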
      currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
          current.keyLength - current.lastCommonPrefix);
      current.valueOffset = currentBuffer.position();
      currentBuffer.skip(current.valueLength);
      if (includesTags()) {
        decodeTags();
      }
      if (includesMvcc()) {
        current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
      } else {
        current.memstoreTS = 0;
      }
      current.nextKvOffset = currentBuffer.position();
    }

    @Override
    protected void decodeFirst() {
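      // Skip the 4-byte size of the unencoded data written at the start of the encoded block
      // (read as decompressedSize in internalDecodeKeyValues).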
      currentBuffer.skip(Bytes.SIZEOF_INT);
      decodeNext();
    }
  }
}