View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.apache.hadoop.hbase.classification.InterfaceAudience;
25  import org.apache.hadoop.hbase.Cell;
26  import org.apache.hadoop.hbase.CellUtil;
27  import org.apache.hadoop.hbase.KeyValue;
28  import org.apache.hadoop.hbase.KeyValue.KVComparator;
29  import org.apache.hadoop.hbase.KeyValueUtil;
30  import org.apache.hadoop.hbase.util.ByteBufferUtils;
31  import org.apache.hadoop.hbase.util.Bytes;
32  
33  /**
34   * Compress key by storing size of common prefix with previous KeyValue
35   * and storing raw size of rest.
36   *
37   * Format:
38   * 1-5 bytes: compressed key length minus prefix (7-bit encoding)
39   * 1-5 bytes: compressed value length (7-bit encoding)
40   * 1-3 bytes: compressed length of common key prefix
41   * ... bytes: rest of key (including timestamp)
42   * ... bytes: value
43   *
44   * In a worst case compressed KeyValue will be three bytes longer than original.
45   *
46   */
47  @InterfaceAudience.Private
48  public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
49  
50    @Override
51    public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
52        DataOutputStream out) throws IOException {
53      int klength = KeyValueUtil.keyLength(cell);
54      int vlength = cell.getValueLength();
55      EncodingState state = encodingContext.getEncodingState();
56      if (state.prevCell == null) {
57        // copy the key, there is no common prefix with none
58        ByteBufferUtils.putCompressedInt(out, klength);
59        ByteBufferUtils.putCompressedInt(out, vlength);
60        ByteBufferUtils.putCompressedInt(out, 0);
61        CellUtil.writeFlatKey(cell, out);
62      } else {
63        // find a common prefix and skip it
64        int common = CellUtil.findCommonPrefixInFlatKey(cell, state.prevCell, true, true);
65        ByteBufferUtils.putCompressedInt(out, klength - common);
66        ByteBufferUtils.putCompressedInt(out, vlength);
67        ByteBufferUtils.putCompressedInt(out, common);
68        writeKeyExcludingCommon(cell, common, out);
69      }
70      // Write the value part
71      out.write(cell.getValueArray(), cell.getValueOffset(), vlength);
72      int size = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
73      size += afterEncodingKeyValue(cell, out, encodingContext);
74      state.prevCell = cell;
75      return size;
76    }
77  
78    private void writeKeyExcludingCommon(Cell cell, int commonPrefix, DataOutputStream out)
79        throws IOException {
80      short rLen = cell.getRowLength();
81      if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
82        // Previous and current rows are different. Need to write the differing part followed by
83        // cf,q,ts and type
84        CellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
85        byte fLen = cell.getFamilyLength();
86        out.writeByte(fLen);
87        out.write(cell.getFamilyArray(), cell.getFamilyOffset(), fLen);
88        out.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
89        out.writeLong(cell.getTimestamp());
90        out.writeByte(cell.getTypeByte());
91      } else {
92        // The full row key part is common. CF part will be common for sure as we deal with Cells in
93        // same family. Just need write the differing part in q, ts and type
94        commonPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
95            - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
96        int qLen = cell.getQualifierLength();
97        int commonQualPrefix = Math.min(commonPrefix, qLen);
98        int qualPartLenToWrite = qLen - commonQualPrefix;
99        if (qualPartLenToWrite > 0) {
100         out.write(cell.getQualifierArray(), cell.getQualifierOffset() + commonQualPrefix,
101             qualPartLenToWrite);
102       }
103       commonPrefix -= commonQualPrefix;
104       // Common part in TS also?
105       if (commonPrefix > 0) {
106         int commonTimestampPrefix = Math.min(commonPrefix, KeyValue.TIMESTAMP_SIZE);
107         if (commonTimestampPrefix < KeyValue.TIMESTAMP_SIZE) {
108           byte[] curTsBuf = Bytes.toBytes(cell.getTimestamp());
109           out.write(curTsBuf, commonTimestampPrefix, KeyValue.TIMESTAMP_SIZE
110               - commonTimestampPrefix);
111         }
112         commonPrefix -= commonTimestampPrefix;
113         if (commonPrefix == 0) {
114           out.writeByte(cell.getTypeByte());
115         }
116       } else {
117         out.writeLong(cell.getTimestamp());
118         out.writeByte(cell.getTypeByte());
119       }
120     }
121   }
122 
123   @Override
124   protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
125       int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
126     int decompressedSize = source.readInt();
127     ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
128         allocateHeaderLength);
129     buffer.position(allocateHeaderLength);
130     int prevKeyOffset = 0;
131 
132     while (source.available() > skipLastBytes) {
133       prevKeyOffset = decodeKeyValue(source, buffer, prevKeyOffset);
134       afterDecodingKeyValue(source, buffer, decodingCtx);
135     }
136 
137     if (source.available() != skipLastBytes) {
138       throw new IllegalStateException("Read too many bytes.");
139     }
140 
141     buffer.limit(buffer.position());
142     return buffer;
143   }
144 
145   private int decodeKeyValue(DataInputStream source, ByteBuffer buffer,
146       int prevKeyOffset)
147           throws IOException, EncoderBufferTooSmallException {
148     int keyLength = ByteBufferUtils.readCompressedInt(source);
149     int valueLength = ByteBufferUtils.readCompressedInt(source);
150     int commonLength = ByteBufferUtils.readCompressedInt(source);
151     int keyOffset;
152     keyLength += commonLength;
153 
154     ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);
155 
156     buffer.putInt(keyLength);
157     buffer.putInt(valueLength);
158 
159     // copy the prefix
160     if (commonLength > 0) {
161       keyOffset = buffer.position();
162       ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, prevKeyOffset,
163           commonLength);
164     } else {
165       keyOffset = buffer.position();
166     }
167 
168     // copy rest of the key and value
169     int len = keyLength - commonLength + valueLength;
170     ByteBufferUtils.copyFromStreamToBuffer(buffer, source, len);
171     return keyOffset;
172   }
173 
174   @Override
175   public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
176     block.mark();
177     block.position(Bytes.SIZEOF_INT);
178     int keyLength = ByteBufferUtils.readCompressedInt(block);
179     ByteBufferUtils.readCompressedInt(block);
180     int commonLength = ByteBufferUtils.readCompressedInt(block);
181     if (commonLength != 0) {
182       throw new AssertionError("Nonzero common length in the first key in "
183           + "block: " + commonLength);
184     }
185     int pos = block.position();
186     block.reset();
187     ByteBuffer dup = block.duplicate();
188     dup.position(pos);
189     dup.limit(pos + keyLength);
190     return dup.slice();
191   }
192 
193   @Override
194   public String toString() {
195     return PrefixKeyDeltaEncoder.class.getSimpleName();
196   }
197 
198   @Override
199   public EncodedSeeker createSeeker(KVComparator comparator,
200       final HFileBlockDecodingContext decodingCtx) {
201     return new BufferedEncodedSeeker<SeekerState>(comparator, decodingCtx) {
202       @Override
203       protected void decodeNext() {
204         current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
205         current.valueLength = ByteBufferUtils.readCompressedInt(currentBuffer);
206         current.lastCommonPrefix =
207             ByteBufferUtils.readCompressedInt(currentBuffer);
208         current.keyLength += current.lastCommonPrefix;
209         current.ensureSpaceForKey();
210         currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
211             current.keyLength - current.lastCommonPrefix);
212         current.valueOffset = currentBuffer.position();
213         ByteBufferUtils.skip(currentBuffer, current.valueLength);
214         if (includesTags()) {
215           decodeTags();
216         }
217         if (includesMvcc()) {
218           current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
219         } else {
220           current.memstoreTS = 0;
221         }
222         current.nextKvOffset = currentBuffer.position();
223       }
224 
225       @Override
226       protected void decodeFirst() {
227         ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
228         decodeNext();
229       }
230     };
231   }
232 }