View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.apache.hadoop.classification.InterfaceAudience;
25  import org.apache.hadoop.hbase.KeyValue;
26  import org.apache.hadoop.hbase.KeyValue.KVComparator;
27  import org.apache.hadoop.hbase.util.ByteBufferUtils;
28  import org.apache.hadoop.hbase.util.Bytes;
29  
30  /**
31   * Compress key by storing size of common prefix with previous KeyValue
32   * and storing raw size of rest.
33   *
34   * Format:
35   * 1-5 bytes: compressed key length minus prefix (7-bit encoding)
36   * 1-5 bytes: compressed value length (7-bit encoding)
37   * 1-3 bytes: compressed length of common key prefix
38   * ... bytes: rest of key (including timestamp)
39   * ... bytes: value
40   *
41   * In a worst case compressed KeyValue will be three bytes longer than original.
42   *
43   */
44  @InterfaceAudience.Private
45  public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
46  
47    @Override
48    public int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingContext,
49        DataOutputStream out) throws IOException {
50      byte[] kvBuf = kv.getBuffer();
51      int klength = kv.getKeyLength();
52      int vlength = kv.getValueLength();
53      EncodingState state = encodingContext.getEncodingState();
54      if (state.prevKv == null) {
55        // copy the key, there is no common prefix with none
56        ByteBufferUtils.putCompressedInt(out, klength);
57        ByteBufferUtils.putCompressedInt(out, vlength);
58        ByteBufferUtils.putCompressedInt(out, 0);
59        out.write(kvBuf, kv.getKeyOffset(), klength + vlength);
60      } else {
61        // find a common prefix and skip it
62        int common = ByteBufferUtils.findCommonPrefix(state.prevKv.getBuffer(),
63            state.prevKv.getKeyOffset(), state.prevKv.getKeyLength(), kvBuf, kv.getKeyOffset(),
64            kv.getKeyLength());
65        ByteBufferUtils.putCompressedInt(out, klength - common);
66        ByteBufferUtils.putCompressedInt(out, vlength);
67        ByteBufferUtils.putCompressedInt(out, common);
68        out.write(kvBuf, kv.getKeyOffset() + common, klength - common + vlength);
69      }
70      int size = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
71      size += afterEncodingKeyValue(kv, out, encodingContext);
72      state.prevKv = kv;
73      return size;
74    }
75  
76    @Override
77    protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
78        int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
79      int decompressedSize = source.readInt();
80      ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
81          allocateHeaderLength);
82      buffer.position(allocateHeaderLength);
83      int prevKeyOffset = 0;
84  
85      while (source.available() > skipLastBytes) {
86        prevKeyOffset = decodeKeyValue(source, buffer, prevKeyOffset);
87        afterDecodingKeyValue(source, buffer, decodingCtx);
88      }
89  
90      if (source.available() != skipLastBytes) {
91        throw new IllegalStateException("Read too many bytes.");
92      }
93  
94      buffer.limit(buffer.position());
95      return buffer;
96    }
97  
98    private int decodeKeyValue(DataInputStream source, ByteBuffer buffer,
99        int prevKeyOffset)
100           throws IOException, EncoderBufferTooSmallException {
101     int keyLength = ByteBufferUtils.readCompressedInt(source);
102     int valueLength = ByteBufferUtils.readCompressedInt(source);
103     int commonLength = ByteBufferUtils.readCompressedInt(source);
104     int keyOffset;
105     keyLength += commonLength;
106 
107     ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);
108 
109     buffer.putInt(keyLength);
110     buffer.putInt(valueLength);
111 
112     // copy the prefix
113     if (commonLength > 0) {
114       keyOffset = buffer.position();
115       ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, prevKeyOffset,
116           commonLength);
117     } else {
118       keyOffset = buffer.position();
119     }
120 
121     // copy rest of the key and value
122     int len = keyLength - commonLength + valueLength;
123     ByteBufferUtils.copyFromStreamToBuffer(buffer, source, len);
124     return keyOffset;
125   }
126 
127   @Override
128   public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
129     block.mark();
130     block.position(Bytes.SIZEOF_INT);
131     int keyLength = ByteBufferUtils.readCompressedInt(block);
132     ByteBufferUtils.readCompressedInt(block);
133     int commonLength = ByteBufferUtils.readCompressedInt(block);
134     if (commonLength != 0) {
135       throw new AssertionError("Nonzero common length in the first key in "
136           + "block: " + commonLength);
137     }
138     int pos = block.position();
139     block.reset();
140     return ByteBuffer.wrap(block.array(), block.arrayOffset() + pos, keyLength)
141         .slice();
142   }
143 
144   @Override
145   public String toString() {
146     return PrefixKeyDeltaEncoder.class.getSimpleName();
147   }
148 
149   @Override
150   public EncodedSeeker createSeeker(KVComparator comparator,
151       final HFileBlockDecodingContext decodingCtx) {
152     return new BufferedEncodedSeeker<SeekerState>(comparator, decodingCtx) {
153       @Override
154       protected void decodeNext() {
155         current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
156         current.valueLength = ByteBufferUtils.readCompressedInt(currentBuffer);
157         current.lastCommonPrefix =
158             ByteBufferUtils.readCompressedInt(currentBuffer);
159         current.keyLength += current.lastCommonPrefix;
160         current.ensureSpaceForKey();
161         currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
162             current.keyLength - current.lastCommonPrefix);
163         current.valueOffset = currentBuffer.position();
164         ByteBufferUtils.skip(currentBuffer, current.valueLength);
165         if (includesTags()) {
166           decodeTags();
167         }
168         if (includesMvcc()) {
169           current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
170         } else {
171           current.memstoreTS = 0;
172         }
173         current.nextKvOffset = currentBuffer.position();
174       }
175 
176       @Override
177       protected void decodeFirst() {
178         ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
179         decodeNext();
180       }
181     };
182   }
183 }