View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.apache.hadoop.hbase.KeyValue;
25  import org.apache.hadoop.hbase.util.ByteBufferUtils;
26  import org.apache.hadoop.hbase.util.Bytes;
27  import org.apache.hadoop.io.RawComparator;
28  
29  import org.apache.hadoop.classification.InterfaceAudience;
30  
31  /**
32   * Compress key by storing size of common prefix with previous KeyValue
33   * and storing raw size of rest.
34   *
35   * Format:
36   * 1-5 bytes: compressed key length minus prefix (7-bit encoding)
37   * 1-5 bytes: compressed value length (7-bit encoding)
38   * 1-3 bytes: compressed length of common key prefix
39   * ... bytes: rest of key (including timestamp)
40   * ... bytes: value
41   *
42   * In a worst case compressed KeyValue will be three bytes longer than original.
43   *
44   */
45  @InterfaceAudience.Private
46  public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
47  
48    private int addKV(int prevKeyOffset, DataOutputStream out,
49        ByteBuffer in, int prevKeyLength) throws IOException {
50      int keyLength = in.getInt();
51      int valueLength = in.getInt();
52  
53      if (prevKeyOffset == -1) {
54        // copy the key, there is no common prefix with none
55        ByteBufferUtils.putCompressedInt(out, keyLength);
56        ByteBufferUtils.putCompressedInt(out, valueLength);
57        ByteBufferUtils.putCompressedInt(out, 0);
58        ByteBufferUtils.moveBufferToStream(out, in, keyLength + valueLength);
59      } else {
60        // find a common prefix and skip it
61        int common = ByteBufferUtils.findCommonPrefix(
62            in, prevKeyOffset + KeyValue.ROW_OFFSET,
63            in.position(),
64            Math.min(prevKeyLength, keyLength));
65  
66        ByteBufferUtils.putCompressedInt(out, keyLength - common);
67        ByteBufferUtils.putCompressedInt(out, valueLength);
68        ByteBufferUtils.putCompressedInt(out, common);
69  
70        ByteBufferUtils.skip(in, common);
71        ByteBufferUtils.moveBufferToStream(out, in, keyLength - common
72            + valueLength);
73      }
74  
75      return keyLength;
76    }
77  
78    @Override
79    public void compressKeyValues(DataOutputStream writeHere,
80        ByteBuffer in, boolean includesMemstoreTS) throws IOException {
81      in.rewind();
82      ByteBufferUtils.putInt(writeHere, in.limit());
83      int prevOffset = -1;
84      int offset = 0;
85      int keyLength = 0;
86      while (in.hasRemaining()) {
87        offset = in.position();
88        keyLength = addKV(prevOffset, writeHere, in, keyLength);
89        afterEncodingKeyValue(in, writeHere, includesMemstoreTS);
90        prevOffset = offset;
91      }
92    }
93  
94    @Override
95    public ByteBuffer uncompressKeyValues(DataInputStream source,
96        int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
97            throws IOException {
98      int decompressedSize = source.readInt();
99      ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
100         allocHeaderLength);
101     buffer.position(allocHeaderLength);
102     int prevKeyOffset = 0;
103 
104     while (source.available() > skipLastBytes) {
105       prevKeyOffset = uncompressKeyValue(source, buffer, prevKeyOffset);
106       afterDecodingKeyValue(source, buffer, includesMemstoreTS);
107     }
108 
109     if (source.available() != skipLastBytes) {
110       throw new IllegalStateException("Read too many bytes.");
111     }
112 
113     buffer.limit(buffer.position());
114     return buffer;
115   }
116 
117   private int uncompressKeyValue(DataInputStream source, ByteBuffer buffer,
118       int prevKeyOffset)
119           throws IOException, EncoderBufferTooSmallException {
120     int keyLength = ByteBufferUtils.readCompressedInt(source);
121     int valueLength = ByteBufferUtils.readCompressedInt(source);
122     int commonLength = ByteBufferUtils.readCompressedInt(source);
123     int keyOffset;
124     keyLength += commonLength;
125 
126     ByteBufferUtils.ensureSpace(buffer, keyLength + valueLength
127         + KeyValue.ROW_OFFSET);
128 
129     buffer.putInt(keyLength);
130     buffer.putInt(valueLength);
131 
132     // copy the prefix
133     if (commonLength > 0) {
134       keyOffset = buffer.position();
135       ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, prevKeyOffset,
136           commonLength);
137     } else {
138       keyOffset = buffer.position();
139     }
140 
141     // copy rest of the key and value
142     int len = keyLength - commonLength + valueLength;
143     ByteBufferUtils.copyFromStreamToBuffer(buffer, source, len);
144     return keyOffset;
145   }
146 
147   @Override
148   public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
149     block.mark();
150     block.position(Bytes.SIZEOF_INT);
151     int keyLength = ByteBufferUtils.readCompressedInt(block);
152     ByteBufferUtils.readCompressedInt(block);
153     int commonLength = ByteBufferUtils.readCompressedInt(block);
154     if (commonLength != 0) {
155       throw new AssertionError("Nonzero common length in the first key in "
156           + "block: " + commonLength);
157     }
158     int pos = block.position();
159     block.reset();
160     return ByteBuffer.wrap(block.array(), block.arrayOffset() + pos, keyLength).slice();
161   }
162 
163   @Override
164   public String toString() {
165     return PrefixKeyDeltaEncoder.class.getSimpleName();
166   }
167 
168   @Override
169   public EncodedSeeker createSeeker(RawComparator<byte[]> comparator,
170       final boolean includesMemstoreTS) {
171     return new BufferedEncodedSeeker<SeekerState>(comparator) {
172       @Override
173       protected void decodeNext() {
174         current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
175         current.valueLength = ByteBufferUtils.readCompressedInt(currentBuffer);
176         current.lastCommonPrefix =
177             ByteBufferUtils.readCompressedInt(currentBuffer);
178         current.keyLength += current.lastCommonPrefix;
179         current.ensureSpaceForKey();
180         currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
181             current.keyLength - current.lastCommonPrefix);
182         current.valueOffset = currentBuffer.position();
183         ByteBufferUtils.skip(currentBuffer, current.valueLength);
184         if (includesMemstoreTS) {
185           current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
186         } else {
187           current.memstoreTS = 0;
188         }
189         current.nextKvOffset = currentBuffer.position();
190       }
191 
192       @Override
193       protected void decodeFirst() {
194         ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
195         decodeNext();
196       }
197     };
198   }
199 }