View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.apache.hadoop.classification.InterfaceAudience;
25  import org.apache.hadoop.hbase.KeyValue;
26  import org.apache.hadoop.hbase.KeyValue.KVComparator;
27  import org.apache.hadoop.hbase.util.ByteBufferUtils;
28  import org.apache.hadoop.hbase.util.Bytes;
29  
30  /**
31   * Compress using:
32   * - store size of common prefix
33   * - save column family once, it is same within HFile
34   * - use integer compression for key, value and prefix (7-bit encoding)
 * - use flag bits to avoid repeating the key length, value length
 *   and type when they are the same as in the previous KeyValue
37   * - store in 3 bits length of timestamp field
38   * - allow diff in timestamp instead of actual value
39   *
40   * Format:
41   * - 1 byte:    flag
42   * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
43   * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
44   * - 1-5 bytes: prefix length
45   * - ... bytes: rest of the row (if prefix length is small enough)
46   * - ... bytes: qualifier (or suffix depending on prefix length)
47   * - 1-8 bytes: timestamp or diff
48   * - 1 byte:    type (only if FLAG_SAME_TYPE is not set in the flag)
49   * - ... bytes: value
50   */
51  @InterfaceAudience.Private
52  public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
  /** Flag bit: key length equals the previous KeyValue's, so it is not written again. */
  static final int FLAG_SAME_KEY_LENGTH = 1;
  /** Flag bit: value length equals the previous KeyValue's, so it is not written again. */
  static final int FLAG_SAME_VALUE_LENGTH = 1 << 1;
  /** Flag bit: type byte equals the previous KeyValue's, so it is not written again. */
  static final int FLAG_SAME_TYPE = 1 << 2;
  /** Flag bit: the timestamp field holds (previous timestamp - current timestamp). */
  static final int FLAG_TIMESTAMP_IS_DIFF = 1 << 3;
  /** Mask of the three flag bits holding (byte width of the timestamp field - 1). */
  static final int MASK_TIMESTAMP_LENGTH = (1 << 4) | (1 << 5) | (1 << 6);
  /** Shift that moves the MASK_TIMESTAMP_LENGTH bits down to the low end of the flag. */
  static final int SHIFT_TIMESTAMP_LENGTH = 4;
  /** Flag bit: the stored timestamp (or diff) is a magnitude that must be negated. */
  static final int FLAG_TIMESTAMP_SIGN = 1 << 7;
60  
61    protected static class DiffCompressionState extends CompressionState {
62      long timestamp;
63      byte[] familyNameWithSize;
64  
65      @Override
66      protected void readTimestamp(ByteBuffer in) {
67        timestamp = in.getLong();
68      }
69  
70      @Override
71      void copyFrom(CompressionState state) {
72        super.copyFrom(state);
73        DiffCompressionState state2 = (DiffCompressionState) state;
74        timestamp = state2.timestamp;
75      }
76    }
77  
78    private void uncompressSingleKeyValue(DataInputStream source,
79        ByteBuffer buffer,
80        DiffCompressionState state)
81            throws IOException, EncoderBufferTooSmallException {
82      // read the column family at the beginning
83      if (state.isFirst()) {
84        state.familyLength = source.readByte();
85        state.familyNameWithSize =
86            new byte[(state.familyLength & 0xff) + KeyValue.FAMILY_LENGTH_SIZE];
87        state.familyNameWithSize[0] = state.familyLength;
88        int read = source.read(state.familyNameWithSize, KeyValue.FAMILY_LENGTH_SIZE,
89            state.familyLength);
90        assert read == state.familyLength;
91      }
92  
93      // read flag
94      byte flag = source.readByte();
95  
96      // read key/value/common lengths
97      int keyLength;
98      int valueLength;
99      if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
100       keyLength = state.keyLength;
101     } else {
102       keyLength = ByteBufferUtils.readCompressedInt(source);
103     }
104     if ((flag & FLAG_SAME_VALUE_LENGTH) != 0) {
105       valueLength = state.valueLength;
106     } else {
107       valueLength = ByteBufferUtils.readCompressedInt(source);
108     }
109     int commonPrefix = ByteBufferUtils.readCompressedInt(source);
110 
111     // create KeyValue buffer and fill it prefix
112     int keyOffset = buffer.position();
113     ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);
114     buffer.putInt(keyLength);
115     buffer.putInt(valueLength);
116 
117     // copy common from previous key
118     if (commonPrefix > 0) {
119       ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, state.prevOffset
120           + KeyValue.ROW_OFFSET, commonPrefix);
121     }
122 
123     // copy the rest of the key from the buffer
124     int keyRestLength;
125     if (state.isFirst() || commonPrefix <
126         state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
127       // omit the family part of the key, it is always the same
128       short rowLength;
129       int rowRestLength;
130 
131       // check length of row
132       if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) {
133         // not yet copied, do it now
134         ByteBufferUtils.copyFromStreamToBuffer(buffer, source,
135             KeyValue.ROW_LENGTH_SIZE - commonPrefix);
136         ByteBufferUtils.skip(buffer, -KeyValue.ROW_LENGTH_SIZE);
137         rowLength = buffer.getShort();
138         rowRestLength = rowLength;
139       } else {
140         // already in buffer, just read it
141         rowLength = buffer.getShort(keyOffset + KeyValue.ROW_OFFSET);
142         rowRestLength = rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
143       }
144 
145       // copy the rest of row
146       ByteBufferUtils.copyFromStreamToBuffer(buffer, source, rowRestLength);
147       state.rowLength = rowLength;
148 
149       // copy the column family
150       buffer.put(state.familyNameWithSize);
151 
152       keyRestLength = keyLength - rowLength -
153           state.familyNameWithSize.length -
154           (KeyValue.ROW_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
155     } else {
156       // prevRowWithSizeLength is the same as on previous row
157       keyRestLength = keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE;
158     }
159     // copy the rest of the key, after column family -> column qualifier
160     ByteBufferUtils.copyFromStreamToBuffer(buffer, source, keyRestLength);
161 
162     // handle timestamp
163     int timestampFitsInBytes =
164         ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
165     long timestamp = ByteBufferUtils.readLong(source, timestampFitsInBytes);
166     if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
167       timestamp = -timestamp;
168     }
169     if ((flag & FLAG_TIMESTAMP_IS_DIFF) != 0) {
170       timestamp = state.timestamp - timestamp;
171     }
172     buffer.putLong(timestamp);
173 
174     // copy the type field
175     byte type;
176     if ((flag & FLAG_SAME_TYPE) != 0) {
177       type = state.type;
178     } else {
179       type = source.readByte();
180     }
181     buffer.put(type);
182 
183     // copy value part
184     ByteBufferUtils.copyFromStreamToBuffer(buffer, source, valueLength);
185 
186     state.keyLength = keyLength;
187     state.valueLength = valueLength;
188     state.prevOffset = keyOffset;
189     state.timestamp = timestamp;
190     state.type = type;
191     // state.qualifier is unused
192   }
193 
194   @Override
195   public int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingContext,
196       DataOutputStream out) throws IOException {
197     EncodingState state = encodingContext.getEncodingState();
198     int size = compressSingleKeyValue(out, kv, state.prevKv);
199     size += afterEncodingKeyValue(kv, out, encodingContext);
200     state.prevKv = kv;
201     return size;
202   }
203 
204   private int compressSingleKeyValue(DataOutputStream out, KeyValue kv, KeyValue prevKv)
205       throws IOException {
206     byte flag = 0;
207     int kLength = kv.getKeyLength();
208     int vLength = kv.getValueLength();
209 
210     long timestamp;
211     long diffTimestamp = 0;
212     int diffTimestampFitsInBytes = 0;
213     int timestampFitsInBytes;
214     int commonPrefix;
215     byte[] curKvBuf = kv.getBuffer();
216 
217     if (prevKv == null) {
218       timestamp = kv.getTimestamp();
219       if (timestamp < 0) {
220         flag |= FLAG_TIMESTAMP_SIGN;
221         timestamp = -timestamp;
222       }
223       timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
224       flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
225       commonPrefix = 0;
226       // put column family
227       byte familyLength = kv.getFamilyLength();
228       out.write(familyLength);
229       out.write(kv.getFamilyArray(), kv.getFamilyOffset(), familyLength);
230     } else {
231       // Finding common prefix
232       int preKeyLength = prevKv.getKeyLength();
233       commonPrefix = ByteBufferUtils.findCommonPrefix(curKvBuf, kv.getKeyOffset(), kLength
234           - KeyValue.TIMESTAMP_TYPE_SIZE, prevKv.getBuffer(), prevKv.getKeyOffset(), preKeyLength
235           - KeyValue.TIMESTAMP_TYPE_SIZE);
236       if (kLength == preKeyLength) {
237         flag |= FLAG_SAME_KEY_LENGTH;
238       }
239       if (vLength == prevKv.getValueLength()) {
240         flag |= FLAG_SAME_VALUE_LENGTH;
241       }
242       if (kv.getTypeByte() == prevKv.getTypeByte()) {
243         flag |= FLAG_SAME_TYPE;
244       }
245       // don't compress timestamp and type using prefix encode timestamp
246       timestamp = kv.getTimestamp();
247       diffTimestamp = prevKv.getTimestamp() - timestamp;
248       boolean negativeTimestamp = timestamp < 0;
249       if (negativeTimestamp) {
250         timestamp = -timestamp;
251       }
252       timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
253       boolean minusDiffTimestamp = diffTimestamp < 0;
254       if (minusDiffTimestamp) {
255         diffTimestamp = -diffTimestamp;
256       }
257       diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
258       if (diffTimestampFitsInBytes < timestampFitsInBytes) {
259         flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
260         flag |= FLAG_TIMESTAMP_IS_DIFF;
261         if (minusDiffTimestamp) {
262           flag |= FLAG_TIMESTAMP_SIGN;
263         }
264       } else {
265         flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
266         if (negativeTimestamp) {
267           flag |= FLAG_TIMESTAMP_SIGN;
268         }
269       }
270     }
271     out.write(flag);
272     if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
273       ByteBufferUtils.putCompressedInt(out, kLength);
274     }
275     if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
276       ByteBufferUtils.putCompressedInt(out, vLength);
277     }
278     ByteBufferUtils.putCompressedInt(out, commonPrefix);
279     if (prevKv == null || commonPrefix < kv.getRowLength() + KeyValue.ROW_LENGTH_SIZE) {
280       int restRowLength = kv.getRowLength() + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
281       out.write(curKvBuf, kv.getKeyOffset() + commonPrefix, restRowLength);
282       out.write(curKvBuf, kv.getQualifierOffset(), kv.getQualifierLength());
283     } else {
284       out.write(curKvBuf, kv.getKeyOffset() + commonPrefix, kLength - commonPrefix
285           - KeyValue.TIMESTAMP_TYPE_SIZE);
286     }
287     if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
288       ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
289     } else {
290       ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
291     }
292 
293     if ((flag & FLAG_SAME_TYPE) == 0) {
294       out.write(kv.getTypeByte());
295     }
296     out.write(kv.getValueArray(), kv.getValueOffset(), vLength);
297     return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
298   }
299 
300   @Override
301   public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
302     block.mark();
303     block.position(Bytes.SIZEOF_INT);
304     byte familyLength = block.get();
305     ByteBufferUtils.skip(block, familyLength);
306     byte flag = block.get();
307     int keyLength = ByteBufferUtils.readCompressedInt(block);
308     ByteBufferUtils.readCompressedInt(block); // valueLength
309     ByteBufferUtils.readCompressedInt(block); // commonLength
310     ByteBuffer result = ByteBuffer.allocate(keyLength);
311 
312     // copy row
313     int pos = result.arrayOffset();
314     block.get(result.array(), pos, Bytes.SIZEOF_SHORT);
315     pos += Bytes.SIZEOF_SHORT;
316     short rowLength = result.getShort();
317     block.get(result.array(), pos, rowLength);
318     pos += rowLength;
319 
320     // copy family
321     int savePosition = block.position();
322     block.position(Bytes.SIZEOF_INT);
323     block.get(result.array(), pos, familyLength + Bytes.SIZEOF_BYTE);
324     pos += familyLength + Bytes.SIZEOF_BYTE;
325 
326     // copy qualifier
327     block.position(savePosition);
328     int qualifierLength =
329         keyLength - pos + result.arrayOffset() - KeyValue.TIMESTAMP_TYPE_SIZE;
330     block.get(result.array(), pos, qualifierLength);
331     pos += qualifierLength;
332 
333     // copy the timestamp and type
334     int timestampFitInBytes =
335         ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
336     long timestamp = ByteBufferUtils.readLong(block, timestampFitInBytes);
337     if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
338       timestamp = -timestamp;
339     }
340     result.putLong(pos, timestamp);
341     pos += Bytes.SIZEOF_LONG;
342     block.get(result.array(), pos, Bytes.SIZEOF_BYTE);
343 
344     block.reset();
345     return result;
346   }
347 
348   @Override
349   public String toString() {
350     return DiffKeyDeltaEncoder.class.getSimpleName();
351   }
352 
353   protected static class DiffSeekerState extends SeekerState {
354     private int rowLengthWithSize;
355     private long timestamp;
356 
357     @Override
358     protected void copyFromNext(SeekerState that) {
359       super.copyFromNext(that);
360       DiffSeekerState other = (DiffSeekerState) that;
361       rowLengthWithSize = other.rowLengthWithSize;
362       timestamp = other.timestamp;
363     }
364   }
365 
  /**
   * Creates a seeker that decodes this encoder's format one KeyValue at a
   * time, rebuilding each key in place inside the seeker state's key buffer.
   *
   * @param comparator comparator handed to the seeker
   * @param decodingCtx decoding context handed to the seeker
   * @return a new seeker working on {@link DiffSeekerState}
   */
  @Override
  public EncodedSeeker createSeeker(KVComparator comparator,
      HFileBlockDecodingContext decodingCtx) {
    return new BufferedEncodedSeeker<DiffSeekerState>(comparator, decodingCtx) {
      // family length byte followed by the family name; set by decodeFirst()
      private byte[] familyNameWithSize;
      // bytes taken by the timestamp and type at the end of every key
      private static final int TIMESTAMP_WITH_TYPE_LENGTH =
          Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;

      /**
       * Decodes the KeyValue at the current position of currentBuffer into
       * current. The key buffer still holds the previous key, so only the
       * bytes that differ are overwritten.
       *
       * @param isFirst true for the first KeyValue of the block, when there is
       *          no previous key to take the type from
       */
      private void decode(boolean isFirst) {
        byte flag = currentBuffer.get();
        byte type = 0;
        if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
          if (!isFirst) {
            // the type is the last byte of the previous key; save it before the
            // key length changes, in case FLAG_SAME_TYPE needs it moved below
            type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE];
          }
          current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
        }
        if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
          current.valueLength =
              ByteBufferUtils.readCompressedInt(currentBuffer);
        }
        current.lastCommonPrefix =
            ByteBufferUtils.readCompressedInt(currentBuffer);

        current.ensureSpaceForKey();

        if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
          // length of row is different, copy everything except family

          // copy the row size
          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
              Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
          current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
              Bytes.SIZEOF_SHORT;

          // copy the rest of row
          currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
              current.rowLengthWithSize - Bytes.SIZEOF_SHORT);

          // copy the column family
          System.arraycopy(familyNameWithSize, 0, current.keyBuffer,
              current.rowLengthWithSize, familyNameWithSize.length);

          // copy the qualifier
          currentBuffer.get(current.keyBuffer,
              current.rowLengthWithSize + familyNameWithSize.length,
              current.keyLength - current.rowLengthWithSize -
              familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
        } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
          // we have to copy part of row and qualifier,
          // but column family is in right place

          // before column family (rest of row)
          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
              current.rowLengthWithSize - current.lastCommonPrefix);

          // after column family (qualifier)
          currentBuffer.get(current.keyBuffer,
              current.rowLengthWithSize + familyNameWithSize.length,
              current.keyLength - current.rowLengthWithSize -
              familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
        } else {
          // copy just the ending
          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
              current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH -
              current.lastCommonPrefix);
        }

        // timestamp
        int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH;
        int timestampFitInBytes = 1 +
            ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH);
        long timestampOrDiff =
            ByteBufferUtils.readLong(currentBuffer, timestampFitInBytes);
        if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
          timestampOrDiff = -timestampOrDiff;
        }
        if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) { // it is timestamp
          current.timestamp = timestampOrDiff;
        } else { // it is diff
          // the encoder stored (previous - current), so current = previous - diff
          current.timestamp = current.timestamp - timestampOrDiff;
        }
        Bytes.putLong(current.keyBuffer, pos, current.timestamp);
        pos += Bytes.SIZEOF_LONG;

        // type
        if ((flag & FLAG_SAME_TYPE) == 0) {
          currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
        } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
          // same type but the key length changed: the type byte saved above
          // has to be written at its new position
          current.keyBuffer[pos] = type;
        }

        // the value is not copied; only its offset in currentBuffer is recorded
        current.valueOffset = currentBuffer.position();
        ByteBufferUtils.skip(currentBuffer, current.valueLength);

        if (includesTags()) {
          decodeTags();
        }
        if (includesMvcc()) {
          current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
        } else {
          current.memstoreTS = 0;
        }
        current.nextKvOffset = currentBuffer.position();
      }

      /**
       * Decodes the first KeyValue of the block: skips the leading int, reads
       * the column family stored once per block, then decodes the KeyValue.
       */
      @Override
      protected void decodeFirst() {
        // NOTE(review): presumably the unencoded-size int that
        // internalDecodeKeyValues reads with readInt() -- confirm
        ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);

        // read column family
        byte familyNameLength = currentBuffer.get();
        familyNameWithSize = new byte[familyNameLength + Bytes.SIZEOF_BYTE];
        familyNameWithSize[0] = familyNameLength;
        currentBuffer.get(familyNameWithSize, Bytes.SIZEOF_BYTE,
            familyNameLength);
        decode(true);
      }

      /** Decodes the next KeyValue relative to the one currently held. */
      @Override
      protected void decodeNext() {
        decode(false);
      }

      /** Supplies the state type that carries the diff-specific fields. */
      @Override
      protected DiffSeekerState createSeekerState() {
        return new DiffSeekerState();
      }
    };
  }
496 
497   @Override
498   protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
499       int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
500     int decompressedSize = source.readInt();
501     ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
502         allocateHeaderLength);
503     buffer.position(allocateHeaderLength);
504     DiffCompressionState state = new DiffCompressionState();
505     while (source.available() > skipLastBytes) {
506       uncompressSingleKeyValue(source, buffer, state);
507       afterDecodingKeyValue(source, buffer, decodingCtx);
508     }
509 
510     if (source.available() != skipLastBytes) {
511       throw new IllegalStateException("Read too much bytes.");
512     }
513 
514     return buffer;
515   }
516 }