View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.apache.hadoop.hbase.Cell;
25  import org.apache.hadoop.hbase.CellComparator;
26  import org.apache.hadoop.hbase.CellUtil;
27  import org.apache.hadoop.hbase.KeyValue;
28  import org.apache.hadoop.hbase.KeyValueUtil;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.nio.ByteBuff;
31  import org.apache.hadoop.hbase.util.ByteBufferUtils;
32  import org.apache.hadoop.hbase.util.Bytes;
33  import org.apache.hadoop.hbase.util.Pair;
34  
/**
 * Compress using:
 * - store the size of the common prefix
 * - save the column family once, as it is the same within an HFile
 * - use integer compression for key, value and prefix lengths (7-bit encoding)
 * - use flag bits to avoid repeating the key length, value length
 *   and type if they are the same as in the previous cell
 * - store the length of the timestamp field in 3 bits
 * - allow storing the diff to the previous timestamp instead of the actual value
 *
 * Format:
 * - 1 byte:    flag
 * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
 * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
 * - 1-5 bytes: prefix length
 * - ... bytes: rest of the row (if prefix length is small enough)
 * - ... bytes: qualifier (or suffix depending on prefix length)
 * - 1-8 bytes: timestamp or diff
 * - 1 byte:    type (only if FLAG_SAME_TYPE is not set in the flag)
 * - ... bytes: value
 */
56  @InterfaceAudience.Private
57  public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
58    static final int FLAG_SAME_KEY_LENGTH = 1;
59    static final int FLAG_SAME_VALUE_LENGTH = 1 << 1;
60    static final int FLAG_SAME_TYPE = 1 << 2;
61    static final int FLAG_TIMESTAMP_IS_DIFF = 1 << 3;
62    static final int MASK_TIMESTAMP_LENGTH = (1 << 4) | (1 << 5) | (1 << 6);
63    static final int SHIFT_TIMESTAMP_LENGTH = 4;
64    static final int FLAG_TIMESTAMP_SIGN = 1 << 7;
65  
66    protected static class DiffCompressionState extends CompressionState {
67      long timestamp;
68      byte[] familyNameWithSize;
69  
70      @Override
71      protected void readTimestamp(ByteBuffer in) {
72        timestamp = in.getLong();
73      }
74  
75      @Override
76      void copyFrom(CompressionState state) {
77        super.copyFrom(state);
78        DiffCompressionState state2 = (DiffCompressionState) state;
79        timestamp = state2.timestamp;
80      }
81    }
82  
83    private void uncompressSingleKeyValue(DataInputStream source,
84        ByteBuffer buffer,
85        DiffCompressionState state)
86            throws IOException, EncoderBufferTooSmallException {
87      // read the column family at the beginning
88      if (state.isFirst()) {
89        state.familyLength = source.readByte();
90        state.familyNameWithSize =
91            new byte[(state.familyLength & 0xff) + KeyValue.FAMILY_LENGTH_SIZE];
92        state.familyNameWithSize[0] = state.familyLength;
93        int read = source.read(state.familyNameWithSize, KeyValue.FAMILY_LENGTH_SIZE,
94            state.familyLength);
95        assert read == state.familyLength;
96      }
97  
98      // read flag
99      byte flag = source.readByte();
100 
101     // read key/value/common lengths
102     int keyLength;
103     int valueLength;
104     if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
105       keyLength = state.keyLength;
106     } else {
107       keyLength = ByteBufferUtils.readCompressedInt(source);
108     }
109     if ((flag & FLAG_SAME_VALUE_LENGTH) != 0) {
110       valueLength = state.valueLength;
111     } else {
112       valueLength = ByteBufferUtils.readCompressedInt(source);
113     }
114     int commonPrefix = ByteBufferUtils.readCompressedInt(source);
115 
116     // create KeyValue buffer and fill it prefix
117     int keyOffset = buffer.position();
118     ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);
119     buffer.putInt(keyLength);
120     buffer.putInt(valueLength);
121 
122     // copy common from previous key
123     if (commonPrefix > 0) {
124       ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, state.prevOffset
125           + KeyValue.ROW_OFFSET, commonPrefix);
126     }
127 
128     // copy the rest of the key from the buffer
129     int keyRestLength;
130     if (state.isFirst() || commonPrefix <
131         state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
132       // omit the family part of the key, it is always the same
133       short rowLength;
134       int rowRestLength;
135 
136       // check length of row
137       if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) {
138         // not yet copied, do it now
139         ByteBufferUtils.copyFromStreamToBuffer(buffer, source,
140             KeyValue.ROW_LENGTH_SIZE - commonPrefix);
141         ByteBufferUtils.skip(buffer, -KeyValue.ROW_LENGTH_SIZE);
142         rowLength = buffer.getShort();
143         rowRestLength = rowLength;
144       } else {
145         // already in buffer, just read it
146         rowLength = buffer.getShort(keyOffset + KeyValue.ROW_OFFSET);
147         rowRestLength = rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
148       }
149 
150       // copy the rest of row
151       ByteBufferUtils.copyFromStreamToBuffer(buffer, source, rowRestLength);
152       state.rowLength = rowLength;
153 
154       // copy the column family
155       buffer.put(state.familyNameWithSize);
156 
157       keyRestLength = keyLength - rowLength -
158           state.familyNameWithSize.length -
159           (KeyValue.ROW_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
160     } else {
161       // prevRowWithSizeLength is the same as on previous row
162       keyRestLength = keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE;
163     }
164     // copy the rest of the key, after column family -> column qualifier
165     ByteBufferUtils.copyFromStreamToBuffer(buffer, source, keyRestLength);
166 
167     // handle timestamp
168     int timestampFitsInBytes =
169         ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
170     long timestamp = ByteBufferUtils.readLong(source, timestampFitsInBytes);
171     if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
172       timestamp = -timestamp;
173     }
174     if ((flag & FLAG_TIMESTAMP_IS_DIFF) != 0) {
175       timestamp = state.timestamp - timestamp;
176     }
177     buffer.putLong(timestamp);
178 
179     // copy the type field
180     byte type;
181     if ((flag & FLAG_SAME_TYPE) != 0) {
182       type = state.type;
183     } else {
184       type = source.readByte();
185     }
186     buffer.put(type);
187 
188     // copy value part
189     ByteBufferUtils.copyFromStreamToBuffer(buffer, source, valueLength);
190 
191     state.keyLength = keyLength;
192     state.valueLength = valueLength;
193     state.prevOffset = keyOffset;
194     state.timestamp = timestamp;
195     state.type = type;
196     // state.qualifier is unused
197   }
198 
199   @Override
200   public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
201       DataOutputStream out) throws IOException {
202     EncodingState state = encodingContext.getEncodingState();
203     int size = compressSingleKeyValue(out, cell, state.prevCell);
204     size += afterEncodingKeyValue(cell, out, encodingContext);
205     state.prevCell = cell;
206     return size;
207   }
208 
209   private int compressSingleKeyValue(DataOutputStream out, Cell cell, Cell prevCell)
210       throws IOException {
211     byte flag = 0;
212     int kLength = KeyValueUtil.keyLength(cell);
213     int vLength = cell.getValueLength();
214 
215     long timestamp;
216     long diffTimestamp = 0;
217     int diffTimestampFitsInBytes = 0;
218     int timestampFitsInBytes;
219     int commonPrefix = 0;
220 
221     if (prevCell == null) {
222       timestamp = cell.getTimestamp();
223       if (timestamp < 0) {
224         flag |= FLAG_TIMESTAMP_SIGN;
225         timestamp = -timestamp;
226       }
227       timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
228       flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
229       // put column family
230       byte familyLength = cell.getFamilyLength();
231       out.write(familyLength);
232       out.write(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
233     } else {
234       // Finding common prefix
235       int preKeyLength = KeyValueUtil.keyLength(prevCell);
236       commonPrefix = CellUtil.findCommonPrefixInFlatKey(cell, prevCell, true, false);
237       if (kLength == preKeyLength) {
238         flag |= FLAG_SAME_KEY_LENGTH;
239       }
240       if (vLength == prevCell.getValueLength()) {
241         flag |= FLAG_SAME_VALUE_LENGTH;
242       }
243       if (cell.getTypeByte() == prevCell.getTypeByte()) {
244         flag |= FLAG_SAME_TYPE;
245       }
246       // don't compress timestamp and type using prefix encode timestamp
247       timestamp = cell.getTimestamp();
248       diffTimestamp = prevCell.getTimestamp() - timestamp;
249       boolean negativeTimestamp = timestamp < 0;
250       if (negativeTimestamp) {
251         timestamp = -timestamp;
252       }
253       timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
254       boolean minusDiffTimestamp = diffTimestamp < 0;
255       if (minusDiffTimestamp) {
256         diffTimestamp = -diffTimestamp;
257       }
258       diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
259       if (diffTimestampFitsInBytes < timestampFitsInBytes) {
260         flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
261         flag |= FLAG_TIMESTAMP_IS_DIFF;
262         if (minusDiffTimestamp) {
263           flag |= FLAG_TIMESTAMP_SIGN;
264         }
265       } else {
266         flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
267         if (negativeTimestamp) {
268           flag |= FLAG_TIMESTAMP_SIGN;
269         }
270       }
271     }
272     out.write(flag);
273     if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
274       ByteBufferUtils.putCompressedInt(out, kLength);
275     }
276     if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
277       ByteBufferUtils.putCompressedInt(out, vLength);
278     }
279     ByteBufferUtils.putCompressedInt(out, commonPrefix);
280     short rLen = cell.getRowLength();
281     if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
282       // Previous and current rows are different. Copy the differing part of
283       // the row, skip the column family, and copy the qualifier.
284       CellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
285       out.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
286     } else {
287       // The common part includes the whole row. As the column family is the
288       // same across the whole file, it will automatically be included in the
289       // common prefix, so we need not special-case it here.
290       // What we write here is the non common part of the qualifier
291       int commonQualPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
292           - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
293       out.write(cell.getQualifierArray(), cell.getQualifierOffset() + commonQualPrefix,
294           cell.getQualifierLength() - commonQualPrefix);
295     }
296     if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
297       ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
298     } else {
299       ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
300     }
301 
302     if ((flag & FLAG_SAME_TYPE) == 0) {
303       out.write(cell.getTypeByte());
304     }
305     out.write(cell.getValueArray(), cell.getValueOffset(), vLength);
306     return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
307   }
308 
309   @Override
310   public Cell getFirstKeyCellInBlock(ByteBuff block) {
311     block.mark();
312     block.position(Bytes.SIZEOF_INT);
313     byte familyLength = block.get();
314     block.skip(familyLength);
315     byte flag = block.get();
316     int keyLength  = ByteBuff.readCompressedInt(block);
317     // TODO : See if we can avoid these reads as the read values are not getting used
318     ByteBuff.readCompressedInt(block); // valueLength
319     ByteBuff.readCompressedInt(block); // commonLength
320     ByteBuffer result = ByteBuffer.allocate(keyLength);
321 
322     // copy row
323     assert !(result.isDirect());
324     int pos = result.arrayOffset();
325     block.get(result.array(), pos, Bytes.SIZEOF_SHORT);
326     pos += Bytes.SIZEOF_SHORT;
327     short rowLength = result.getShort();
328     block.get(result.array(), pos, rowLength);
329     pos += rowLength;
330 
331     // copy family
332     int savePosition = block.position();
333     block.position(Bytes.SIZEOF_INT);
334     block.get(result.array(), pos, familyLength + Bytes.SIZEOF_BYTE);
335     pos += familyLength + Bytes.SIZEOF_BYTE;
336 
337     // copy qualifier
338     block.position(savePosition);
339     int qualifierLength =
340         keyLength - pos + result.arrayOffset() - KeyValue.TIMESTAMP_TYPE_SIZE;
341     block.get(result.array(), pos, qualifierLength);
342     pos += qualifierLength;
343 
344     // copy the timestamp and type
345     int timestampFitInBytes =
346         ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
347     long timestamp = ByteBuff.readLong(block, timestampFitInBytes);
348     if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
349       timestamp = -timestamp;
350     }
351     result.putLong(pos, timestamp);
352     pos += Bytes.SIZEOF_LONG;
353     block.get(result.array(), pos, Bytes.SIZEOF_BYTE);
354 
355     block.reset();
356     // The result is already a BB. So always we will create a KeyOnlyKv.
357     return new KeyValue.KeyOnlyKeyValue(result.array(), 0, keyLength);
358   }
359 
360   @Override
361   public String toString() {
362     return DiffKeyDeltaEncoder.class.getSimpleName();
363   }
364 
365   protected static class DiffSeekerState extends SeekerState {
366 
367     private int rowLengthWithSize;
368     private long timestamp;
369 
370     public DiffSeekerState(Pair<ByteBuffer, Integer> tmpPair, boolean includeTags) {
371       super(tmpPair, includeTags);
372     }
373 
374     @Override
375     protected void copyFromNext(SeekerState that) {
376       super.copyFromNext(that);
377       DiffSeekerState other = (DiffSeekerState) that;
378       rowLengthWithSize = other.rowLengthWithSize;
379       timestamp = other.timestamp;
380     }
381   }
382 
383   @Override
384   public EncodedSeeker createSeeker(CellComparator comparator,
385       HFileBlockDecodingContext decodingCtx) {
386     return new BufferedEncodedSeeker<DiffSeekerState>(comparator, decodingCtx) {
387       private byte[] familyNameWithSize;
388       private static final int TIMESTAMP_WITH_TYPE_LENGTH =
389           Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;
390 
391       private void decode(boolean isFirst) {
392         byte flag = currentBuffer.get();
393         byte type = 0;
394         if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
395           if (!isFirst) {
396             type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE];
397           }
398           current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
399         }
400         if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
401           current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
402         }
403         current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
404 
405         current.ensureSpaceForKey();
406 
407         if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
408           // length of row is different, copy everything except family
409 
410           // copy the row size
411           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
412               Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
413           current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
414               Bytes.SIZEOF_SHORT;
415 
416           // copy the rest of row
417           currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
418               current.rowLengthWithSize - Bytes.SIZEOF_SHORT);
419 
420           // copy the column family
421           System.arraycopy(familyNameWithSize, 0, current.keyBuffer,
422               current.rowLengthWithSize, familyNameWithSize.length);
423 
424           // copy the qualifier
425           currentBuffer.get(current.keyBuffer,
426               current.rowLengthWithSize + familyNameWithSize.length,
427               current.keyLength - current.rowLengthWithSize -
428               familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
429         } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
430           // we have to copy part of row and qualifier,
431           // but column family is in right place
432 
433           // before column family (rest of row)
434           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
435               current.rowLengthWithSize - current.lastCommonPrefix);
436 
437           // after column family (qualifier)
438           currentBuffer.get(current.keyBuffer,
439               current.rowLengthWithSize + familyNameWithSize.length,
440               current.keyLength - current.rowLengthWithSize -
441               familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
442         } else {
443           // copy just the ending
444           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
445               current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH -
446               current.lastCommonPrefix);
447         }
448 
449         // timestamp
450         int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH;
451         int timestampFitInBytes = 1 +
452             ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH);
453         long timestampOrDiff = ByteBuff.readLong(currentBuffer, timestampFitInBytes);
454         if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
455           timestampOrDiff = -timestampOrDiff;
456         }
457         if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) { // it is timestamp
458           current.timestamp = timestampOrDiff;
459         } else { // it is diff
460           current.timestamp = current.timestamp - timestampOrDiff;
461         }
462         Bytes.putLong(current.keyBuffer, pos, current.timestamp);
463         pos += Bytes.SIZEOF_LONG;
464 
465         // type
466         if ((flag & FLAG_SAME_TYPE) == 0) {
467           currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
468         } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
469           current.keyBuffer[pos] = type;
470         }
471 
472         current.valueOffset = currentBuffer.position();
473         currentBuffer.skip(current.valueLength);
474 
475         if (includesTags()) {
476           decodeTags();
477         }
478         if (includesMvcc()) {
479           current.memstoreTS = ByteBuff.readVLong(currentBuffer);
480         } else {
481           current.memstoreTS = 0;
482         }
483         current.nextKvOffset = currentBuffer.position();
484       }
485 
486       @Override
487       protected void decodeFirst() {
488         currentBuffer.skip(Bytes.SIZEOF_INT);
489 
490         // read column family
491         byte familyNameLength = currentBuffer.get();
492         familyNameWithSize = new byte[familyNameLength + Bytes.SIZEOF_BYTE];
493         familyNameWithSize[0] = familyNameLength;
494         currentBuffer.get(familyNameWithSize, Bytes.SIZEOF_BYTE,
495             familyNameLength);
496         decode(true);
497       }
498 
499       @Override
500       protected void decodeNext() {
501         decode(false);
502       }
503 
504       @Override
505       protected DiffSeekerState createSeekerState() {
506         return new DiffSeekerState(this.tmpPair, this.includesTags());
507       }
508     };
509   }
510 
511   @Override
512   protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
513       int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
514     int decompressedSize = source.readInt();
515     ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
516         allocateHeaderLength);
517     buffer.position(allocateHeaderLength);
518     DiffCompressionState state = new DiffCompressionState();
519     while (source.available() > skipLastBytes) {
520       uncompressSingleKeyValue(source, buffer, state);
521       afterDecodingKeyValue(source, buffer, decodingCtx);
522     }
523 
524     if (source.available() != skipLastBytes) {
525       throw new IllegalStateException("Read too much bytes.");
526     }
527 
528     return buffer;
529   }
530 }