/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Compresses each key by storing the length of the prefix it shares with the previous
 * KeyValue and then writing only the remaining, non-shared part of the key in raw form.
 *
 * Format:
 * 1-5 bytes: compressed key length minus prefix (7-bit encoding)
 * 1-5 bytes: compressed value length (7-bit encoding)
 * 1-3 bytes: compressed length of common key prefix
 * ... bytes: rest of key (including timestamp)
 * ... bytes: value
 *
 * In the worst case a compressed KeyValue is three bytes longer than the original.
 *
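 * For example (a purely illustrative case): if the previous flat key is row1/fam:qual1/ts/Put
 * and the current flat key is row1/fam:qual2/ts/Put, the common prefix covers the row, the
 * family and the leading "qual" bytes of the qualifier, so only the three compressed lengths,
 * the key suffix ("2" plus timestamp and type) and the value are written for the second cell.
 *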
 */
@InterfaceAudience.Private
public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {

  @Override
  public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
      DataOutputStream out) throws IOException {
    int klength = KeyValueUtil.keyLength(cell);
    int vlength = cell.getValueLength();
    EncodingState state = encodingContext.getEncodingState();
    if (state.prevCell == null) {
      // write the full key; there is no previous cell to share a prefix with
      ByteBufferUtils.putCompressedInt(out, klength);
      ByteBufferUtils.putCompressedInt(out, vlength);
      ByteBufferUtils.putCompressedInt(out, 0);
      CellUtil.writeFlatKey(cell, out);
    } else {
      // find a common prefix and skip it
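      // The flat key layout is: row length (2 bytes), row, family length (1 byte), family,
      // qualifier, timestamp (8 bytes), type (1 byte); the common prefix is measured over
      // this serialized form.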
      int common = CellUtil.findCommonPrefixInFlatKey(cell, state.prevCell, true, true);
      ByteBufferUtils.putCompressedInt(out, klength - common);
      ByteBufferUtils.putCompressedInt(out, vlength);
      ByteBufferUtils.putCompressedInt(out, common);
      writeKeyExcludingCommon(cell, common, out);
    }
    // Write the value part
    out.write(cell.getValueArray(), cell.getValueOffset(), vlength);
    int size = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
    size += afterEncodingKeyValue(cell, out, encodingContext);
    state.prevCell = cell;
    return size;
  }

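  /*
   * Writes only the part of the flat key that is not covered by the common prefix, branching
   * on whether the prefix already spans the whole row (including its length field) or not.
   */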
  private void writeKeyExcludingCommon(Cell cell, int commonPrefix, DataOutputStream out)
      throws IOException {
    short rLen = cell.getRowLength();
    if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
      // Previous and current rows are different. Need to write the differing part followed by
      // cf, q, ts and type
      CellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
      byte fLen = cell.getFamilyLength();
      out.writeByte(fLen);
      out.write(cell.getFamilyArray(), cell.getFamilyOffset(), fLen);
      out.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
      out.writeLong(cell.getTimestamp());
      out.writeByte(cell.getTypeByte());
    } else {
      // The full row key part is common. The CF part will be common too, as we deal with Cells
      // of the same family. Just need to write the differing part of q, ts and type.
      commonPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
          - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
      int qLen = cell.getQualifierLength();
      int commonQualPrefix = Math.min(commonPrefix, qLen);
      int qualPartLenToWrite = qLen - commonQualPrefix;
      if (qualPartLenToWrite > 0) {
        out.write(cell.getQualifierArray(), cell.getQualifierOffset() + commonQualPrefix,
            qualPartLenToWrite);
      }
      commonPrefix -= commonQualPrefix;
      // Common part in TS also?
      if (commonPrefix > 0) {
        int commonTimestampPrefix = Math.min(commonPrefix, KeyValue.TIMESTAMP_SIZE);
        if (commonTimestampPrefix < KeyValue.TIMESTAMP_SIZE) {
          byte[] curTsBuf = Bytes.toBytes(cell.getTimestamp());
          out.write(curTsBuf, commonTimestampPrefix, KeyValue.TIMESTAMP_SIZE
              - commonTimestampPrefix);
        }
        commonPrefix -= commonTimestampPrefix;
        if (commonPrefix == 0) {
          out.writeByte(cell.getTypeByte());
        }
      } else {
        out.writeLong(cell.getTimestamp());
        out.writeByte(cell.getTypeByte());
      }
    }
  }

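  /*
   * Decodes a whole block of prefix-encoded cells back into an uncompressed KeyValue buffer;
   * the leading int read from the stream is the total size of the unencoded data.
   */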
  @Override
  protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
      int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    int decompressedSize = source.readInt();
    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
        allocateHeaderLength);
    buffer.position(allocateHeaderLength);
    int prevKeyOffset = 0;

    while (source.available() > skipLastBytes) {
      prevKeyOffset = decodeKeyValue(source, buffer, prevKeyOffset);
      afterDecodingKeyValue(source, buffer, decodingCtx);
    }

    if (source.available() != skipLastBytes) {
      throw new IllegalStateException("Read too many bytes.");
    }

    buffer.limit(buffer.position());
    return buffer;
  }

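  /*
   * Decodes a single cell. The shared prefix is copied from the previously decoded key that
   * already sits in the destination buffer (at prevKeyOffset); only the key suffix and the
   * value are read from the stream.
   */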
  private int decodeKeyValue(DataInputStream source, ByteBuffer buffer,
      int prevKeyOffset)
          throws IOException, EncoderBufferTooSmallException {
    int keyLength = ByteBufferUtils.readCompressedInt(source);
    int valueLength = ByteBufferUtils.readCompressedInt(source);
    int commonLength = ByteBufferUtils.readCompressedInt(source);
    int keyOffset;
    keyLength += commonLength;

    ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);

    buffer.putInt(keyLength);
    buffer.putInt(valueLength);

    // copy the prefix
    if (commonLength > 0) {
      keyOffset = buffer.position();
      ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, prevKeyOffset,
          commonLength);
    } else {
      keyOffset = buffer.position();
    }

    // copy rest of the key and value
    int len = keyLength - commonLength + valueLength;
    ByteBufferUtils.copyFromStreamToBuffer(buffer, source, len);
    return keyOffset;
  }

  @Override
  public Cell getFirstKeyCellInBlock(ByteBuff block) {
    block.mark();
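    // Skip the leading int holding the size of the unencoded data (see internalDecodeKeyValues).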
    block.position(Bytes.SIZEOF_INT);
    int keyLength = ByteBuff.readCompressedInt(block);
    // TODO: See if we can avoid these reads, as the read values are not used
    ByteBuff.readCompressedInt(block);
    int commonLength = ByteBuff.readCompressedInt(block);
    if (commonLength != 0) {
      throw new AssertionError("Nonzero common length in the first key in "
          + "block: " + commonLength);
    }
    ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
    block.reset();
    return createFirstKeyCell(key, keyLength);
  }

  @Override
  public String toString() {
    return PrefixKeyDeltaEncoder.class.getSimpleName();
  }

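  /*
   * The returned seeker rebuilds each full key in current.keyBuffer: the first
   * lastCommonPrefix bytes are left over from the previous key, and only the key suffix is
   * read from the block buffer.
   */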
  @Override
  public EncodedSeeker createSeeker(CellComparator comparator,
      final HFileBlockDecodingContext decodingCtx) {
    return new BufferedEncodedSeeker<SeekerState>(comparator, decodingCtx) {
      @Override
      protected void decodeNext() {
        current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
        current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
        current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
        current.keyLength += current.lastCommonPrefix;
        current.ensureSpaceForKey();
        currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
            current.keyLength - current.lastCommonPrefix);
        current.valueOffset = currentBuffer.position();
        currentBuffer.skip(current.valueLength);
        if (includesTags()) {
          decodeTags();
        }
        if (includesMvcc()) {
          current.memstoreTS = ByteBuff.readVLong(currentBuffer);
        } else {
          current.memstoreTS = 0;
        }
        current.nextKvOffset = currentBuffer.position();
      }

      @Override
      protected void decodeFirst() {
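        // The first 4 bytes of the block hold the unencoded data size; skip them before
        // decoding the first cell.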
        currentBuffer.skip(Bytes.SIZEOF_INT);
        decodeNext();
      }
    };
  }
}