View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.apache.hadoop.hbase.Cell;
25  import org.apache.hadoop.hbase.CellComparator;
26  import org.apache.hadoop.hbase.CellUtil;
27  import org.apache.hadoop.hbase.KeyValue;
28  import org.apache.hadoop.hbase.KeyValueUtil;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.nio.ByteBuff;
31  import org.apache.hadoop.hbase.util.ByteBufferUtils;
32  import org.apache.hadoop.hbase.util.Bytes;
33  import org.apache.hadoop.hbase.util.ObjectIntPair;
34  
35  /**
36   * Compress using:
37   * - store size of common prefix
38   * - save column family once, it is same within HFile
39   * - use integer compression for key, value and prefix (7-bit encoding)
40   * - use flag bits to avoid duplicating the key length, value length
41   *   and type when they are the same as in the previous cell
42   * - store in 3 bits length of timestamp field
43   * - allow diff in timestamp instead of actual value
44   *
45   * Format:
46   * - 1 byte:    flag
47   * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
48   * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
49   * - 1-5 bytes: prefix length
50   * - ... bytes: rest of the row (if prefix length is small enough)
51   * - ... bytes: qualifier (or suffix depending on prefix length)
52   * - 1-8 bytes: timestamp or diff
53   * - 1 byte:    type (only if FLAG_SAME_TYPE is not set in the flag)
54   * - ... bytes: value
55   */
56  @InterfaceAudience.Private
57  public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
58    static final int FLAG_SAME_KEY_LENGTH = 1;
59    static final int FLAG_SAME_VALUE_LENGTH = 1 << 1;
60    static final int FLAG_SAME_TYPE = 1 << 2;
61    static final int FLAG_TIMESTAMP_IS_DIFF = 1 << 3;
62    static final int MASK_TIMESTAMP_LENGTH = (1 << 4) | (1 << 5) | (1 << 6);
63    static final int SHIFT_TIMESTAMP_LENGTH = 4;
64    static final int FLAG_TIMESTAMP_SIGN = 1 << 7;
65  
66    protected static class DiffCompressionState extends CompressionState {
67      long timestamp;
68      byte[] familyNameWithSize;
69  
70      @Override
71      protected void readTimestamp(ByteBuffer in) {
72        timestamp = in.getLong();
73      }
74  
75      @Override
76      void copyFrom(CompressionState state) {
77        super.copyFrom(state);
78        DiffCompressionState state2 = (DiffCompressionState) state;
79        timestamp = state2.timestamp;
80      }
81    }
82  
83    private void uncompressSingleKeyValue(DataInputStream source,
84        ByteBuffer buffer,
85        DiffCompressionState state)
86            throws IOException, EncoderBufferTooSmallException {
87      // read the column family at the beginning
88      if (state.isFirst()) {
89        state.familyLength = source.readByte();
90        state.familyNameWithSize =
91            new byte[(state.familyLength & 0xff) + KeyValue.FAMILY_LENGTH_SIZE];
92        state.familyNameWithSize[0] = state.familyLength;
93        int read = source.read(state.familyNameWithSize, KeyValue.FAMILY_LENGTH_SIZE,
94            state.familyLength);
95        assert read == state.familyLength;
96      }
97  
98      // read flag
99      byte flag = source.readByte();
100 
101     // read key/value/common lengths
102     int keyLength;
103     int valueLength;
104     if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
105       keyLength = state.keyLength;
106     } else {
107       keyLength = ByteBufferUtils.readCompressedInt(source);
108     }
109     if ((flag & FLAG_SAME_VALUE_LENGTH) != 0) {
110       valueLength = state.valueLength;
111     } else {
112       valueLength = ByteBufferUtils.readCompressedInt(source);
113     }
114     int commonPrefix = ByteBufferUtils.readCompressedInt(source);
115 
116     // create KeyValue buffer and fill it prefix
117     int keyOffset = buffer.position();
118     ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);
119     buffer.putInt(keyLength);
120     buffer.putInt(valueLength);
121 
122     // copy common from previous key
123     if (commonPrefix > 0) {
124       ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, state.prevOffset
125           + KeyValue.ROW_OFFSET, commonPrefix);
126     }
127 
128     // copy the rest of the key from the buffer
129     int keyRestLength;
130     if (state.isFirst() || commonPrefix <
131         state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
132       // omit the family part of the key, it is always the same
133       short rowLength;
134       int rowRestLength;
135 
136       // check length of row
137       if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) {
138         // not yet copied, do it now
139         ByteBufferUtils.copyFromStreamToBuffer(buffer, source,
140             KeyValue.ROW_LENGTH_SIZE - commonPrefix);
141         ByteBufferUtils.skip(buffer, -KeyValue.ROW_LENGTH_SIZE);
142         rowLength = buffer.getShort();
143         rowRestLength = rowLength;
144       } else {
145         // already in buffer, just read it
146         rowLength = buffer.getShort(keyOffset + KeyValue.ROW_OFFSET);
147         rowRestLength = rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
148       }
149 
150       // copy the rest of row
151       ByteBufferUtils.copyFromStreamToBuffer(buffer, source, rowRestLength);
152       state.rowLength = rowLength;
153 
154       // copy the column family
155       buffer.put(state.familyNameWithSize);
156 
157       keyRestLength = keyLength - rowLength -
158           state.familyNameWithSize.length -
159           (KeyValue.ROW_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
160     } else {
161       // prevRowWithSizeLength is the same as on previous row
162       keyRestLength = keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE;
163     }
164     // copy the rest of the key, after column family -> column qualifier
165     ByteBufferUtils.copyFromStreamToBuffer(buffer, source, keyRestLength);
166 
167     // handle timestamp
168     int timestampFitsInBytes =
169         ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
170     long timestamp = ByteBufferUtils.readLong(source, timestampFitsInBytes);
171     if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
172       timestamp = -timestamp;
173     }
174     if ((flag & FLAG_TIMESTAMP_IS_DIFF) != 0) {
175       timestamp = state.timestamp - timestamp;
176     }
177     buffer.putLong(timestamp);
178 
179     // copy the type field
180     byte type;
181     if ((flag & FLAG_SAME_TYPE) != 0) {
182       type = state.type;
183     } else {
184       type = source.readByte();
185     }
186     buffer.put(type);
187 
188     // copy value part
189     ByteBufferUtils.copyFromStreamToBuffer(buffer, source, valueLength);
190 
191     state.keyLength = keyLength;
192     state.valueLength = valueLength;
193     state.prevOffset = keyOffset;
194     state.timestamp = timestamp;
195     state.type = type;
196     // state.qualifier is unused
197   }
198 
199   @Override
200   public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
201       DataOutputStream out) throws IOException {
202     EncodingState state = encodingContext.getEncodingState();
203     int size = compressSingleKeyValue(out, cell, state.prevCell);
204     size += afterEncodingKeyValue(cell, out, encodingContext);
205     state.prevCell = cell;
206     return size;
207   }
208 
209   private int compressSingleKeyValue(DataOutputStream out, Cell cell, Cell prevCell)
210       throws IOException {
211     byte flag = 0;
212     int kLength = KeyValueUtil.keyLength(cell);
213     int vLength = cell.getValueLength();
214 
215     long timestamp;
216     long diffTimestamp = 0;
217     int diffTimestampFitsInBytes = 0;
218     int timestampFitsInBytes;
219     int commonPrefix = 0;
220 
221     if (prevCell == null) {
222       timestamp = cell.getTimestamp();
223       if (timestamp < 0) {
224         flag |= FLAG_TIMESTAMP_SIGN;
225         timestamp = -timestamp;
226       }
227       timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
228       flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
229       // put column family
230       byte familyLength = cell.getFamilyLength();
231       out.write(familyLength);
232       CellUtil.writeFamily(out, cell, familyLength);
233     } else {
234       // Finding common prefix
235       int preKeyLength = KeyValueUtil.keyLength(prevCell);
236       commonPrefix = CellUtil.findCommonPrefixInFlatKey(cell, prevCell, true, false);
237       if (kLength == preKeyLength) {
238         flag |= FLAG_SAME_KEY_LENGTH;
239       }
240       if (vLength == prevCell.getValueLength()) {
241         flag |= FLAG_SAME_VALUE_LENGTH;
242       }
243       if (cell.getTypeByte() == prevCell.getTypeByte()) {
244         flag |= FLAG_SAME_TYPE;
245       }
246       // don't compress timestamp and type using prefix encode timestamp
247       timestamp = cell.getTimestamp();
248       diffTimestamp = prevCell.getTimestamp() - timestamp;
249       boolean negativeTimestamp = timestamp < 0;
250       if (negativeTimestamp) {
251         timestamp = -timestamp;
252       }
253       timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
254       boolean minusDiffTimestamp = diffTimestamp < 0;
255       if (minusDiffTimestamp) {
256         diffTimestamp = -diffTimestamp;
257       }
258       diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
259       if (diffTimestampFitsInBytes < timestampFitsInBytes) {
260         flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
261         flag |= FLAG_TIMESTAMP_IS_DIFF;
262         if (minusDiffTimestamp) {
263           flag |= FLAG_TIMESTAMP_SIGN;
264         }
265       } else {
266         flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
267         if (negativeTimestamp) {
268           flag |= FLAG_TIMESTAMP_SIGN;
269         }
270       }
271     }
272     out.write(flag);
273     if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
274       ByteBufferUtils.putCompressedInt(out, kLength);
275     }
276     if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
277       ByteBufferUtils.putCompressedInt(out, vLength);
278     }
279     ByteBufferUtils.putCompressedInt(out, commonPrefix);
280     short rLen = cell.getRowLength();
281     if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
282       // Previous and current rows are different. Copy the differing part of
283       // the row, skip the column family, and copy the qualifier.
284       CellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
285       CellUtil.writeQualifier(out, cell, cell.getQualifierLength());
286     } else {
287       // The common part includes the whole row. As the column family is the
288       // same across the whole file, it will automatically be included in the
289       // common prefix, so we need not special-case it here.
290       // What we write here is the non common part of the qualifier
291       int commonQualPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
292           - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
293       CellUtil.writeQualifierSkippingBytes(out, cell, cell.getQualifierLength(),
294         commonQualPrefix);
295     }
296     if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
297       ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
298     } else {
299       ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
300     }
301 
302     if ((flag & FLAG_SAME_TYPE) == 0) {
303       out.write(cell.getTypeByte());
304     }
305     CellUtil.writeValue(out, cell, vLength);
306     return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
307   }
308 
309   @Override
310   public Cell getFirstKeyCellInBlock(ByteBuff block) {
311     block.mark();
312     block.position(Bytes.SIZEOF_INT);
313     byte familyLength = block.get();
314     block.skip(familyLength);
315     byte flag = block.get();
316     int keyLength  = ByteBuff.readCompressedInt(block);
317     // TODO : See if we can avoid these reads as the read values are not getting used
318     ByteBuff.readCompressedInt(block); // valueLength
319     ByteBuff.readCompressedInt(block); // commonLength
320     ByteBuffer result = ByteBuffer.allocate(keyLength);
321 
322     // copy row
323     assert !(result.isDirect());
324     int pos = result.arrayOffset();
325     block.get(result.array(), pos, Bytes.SIZEOF_SHORT);
326     pos += Bytes.SIZEOF_SHORT;
327     short rowLength = result.getShort();
328     block.get(result.array(), pos, rowLength);
329     pos += rowLength;
330 
331     // copy family
332     int savePosition = block.position();
333     block.position(Bytes.SIZEOF_INT);
334     block.get(result.array(), pos, familyLength + Bytes.SIZEOF_BYTE);
335     pos += familyLength + Bytes.SIZEOF_BYTE;
336 
337     // copy qualifier
338     block.position(savePosition);
339     int qualifierLength =
340         keyLength - pos + result.arrayOffset() - KeyValue.TIMESTAMP_TYPE_SIZE;
341     block.get(result.array(), pos, qualifierLength);
342     pos += qualifierLength;
343 
344     // copy the timestamp and type
345     int timestampFitInBytes =
346         ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
347     long timestamp = ByteBuff.readLong(block, timestampFitInBytes);
348     if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
349       timestamp = -timestamp;
350     }
351     result.putLong(pos, timestamp);
352     pos += Bytes.SIZEOF_LONG;
353     block.get(result.array(), pos, Bytes.SIZEOF_BYTE);
354 
355     block.reset();
356     // The result is already a BB. So always we will create a KeyOnlyKv.
357     return new KeyValue.KeyOnlyKeyValue(result.array(), 0, keyLength);
358   }
359 
360   @Override
361   public String toString() {
362     return DiffKeyDeltaEncoder.class.getSimpleName();
363   }
364 
365   protected static class DiffSeekerState extends SeekerState {
366 
367     private int rowLengthWithSize;
368     private long timestamp;
369 
370     public DiffSeekerState(ObjectIntPair<ByteBuffer> tmpPair,
371         boolean includeTags) {
372       super(tmpPair, includeTags);
373     }
374
375     @Override
376     protected void copyFromNext(SeekerState that) {
377       super.copyFromNext(that);
378       DiffSeekerState other = (DiffSeekerState) that;
379       rowLengthWithSize = other.rowLengthWithSize;
380       timestamp = other.timestamp;
381     }
382   }
383
384   @Override
385   public EncodedSeeker createSeeker(CellComparator comparator,
386       HFileBlockDecodingContext decodingCtx) {
387     return new BufferedEncodedSeeker<DiffSeekerState>(comparator, decodingCtx) {
388       private byte[] familyNameWithSize;
389       private static final int TIMESTAMP_WITH_TYPE_LENGTH =
390           Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;
391
392       private void decode(boolean isFirst) {
393         byte flag = currentBuffer.get();
394         byte type = 0;
395         if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
396           if (!isFirst) {
397             type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE];
398           }
399           current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
400         }
401         if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
402           current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
403         }
404         current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
405
406         current.ensureSpaceForKey();
407
408         if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
409           // length of row is different, copy everything except family
410
411           // copy the row size
412           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
413               Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
414           current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
415               Bytes.SIZEOF_SHORT;
416
417           // copy the rest of row
418           currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
419               current.rowLengthWithSize - Bytes.SIZEOF_SHORT);
420
421           // copy the column family
422           System.arraycopy(familyNameWithSize, 0, current.keyBuffer,
423               current.rowLengthWithSize, familyNameWithSize.length);
424
425           // copy the qualifier
426           currentBuffer.get(current.keyBuffer,
427               current.rowLengthWithSize + familyNameWithSize.length,
428               current.keyLength - current.rowLengthWithSize -
429               familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
430         } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
431           // we have to copy part of row and qualifier,
432           // but column family is in right place
433
434           // before column family (rest of row)
435           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
436               current.rowLengthWithSize - current.lastCommonPrefix);
437
438           // after column family (qualifier)
439           currentBuffer.get(current.keyBuffer,
440               current.rowLengthWithSize + familyNameWithSize.length,
441               current.keyLength - current.rowLengthWithSize -
442               familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
443         } else {
444           // copy just the ending
445           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
446               current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH -
447               current.lastCommonPrefix);
448         }
449
450         // timestamp
451         int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH;
452         int timestampFitInBytes = 1 +
453             ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH);
454         long timestampOrDiff = ByteBuff.readLong(currentBuffer, timestampFitInBytes);
455         if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
456           timestampOrDiff = -timestampOrDiff;
457         }
458         if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) { // it is timestamp
459           current.timestamp = timestampOrDiff;
460         } else { // it is diff
461           current.timestamp = current.timestamp - timestampOrDiff;
462         }
463         Bytes.putLong(current.keyBuffer, pos, current.timestamp);
464         pos += Bytes.SIZEOF_LONG;
465
466         // type
467         if ((flag & FLAG_SAME_TYPE) == 0) {
468           currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
469         } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
470           current.keyBuffer[pos] = type;
471         }
472
473         current.valueOffset = currentBuffer.position();
474         currentBuffer.skip(current.valueLength);
475
476         if (includesTags()) {
477           decodeTags();
478         }
479         if (includesMvcc()) {
480           current.memstoreTS = ByteBuff.readVLong(currentBuffer);
481         } else {
482           current.memstoreTS = 0;
483         }
484         current.nextKvOffset = currentBuffer.position();
485       }
486
487       @Override
488       protected void decodeFirst() {
489         currentBuffer.skip(Bytes.SIZEOF_INT);
490
491         // read column family
492         byte familyNameLength = currentBuffer.get();
493         familyNameWithSize = new byte[familyNameLength + Bytes.SIZEOF_BYTE];
494         familyNameWithSize[0] = familyNameLength;
495         currentBuffer.get(familyNameWithSize, Bytes.SIZEOF_BYTE,
496             familyNameLength);
497         decode(true);
498       }
499
500       @Override
501       protected void decodeNext() {
502         decode(false);
503       }
504
505       @Override
506       protected DiffSeekerState createSeekerState() {
507         return new DiffSeekerState(this.tmpPair, this.includesTags());
508       }
509     };
510   }
511
512   @Override
513   protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
514       int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
515     int decompressedSize = source.readInt();
516     ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
517         allocateHeaderLength);
518     buffer.position(allocateHeaderLength);
519     DiffCompressionState state = new DiffCompressionState();
520     while (source.available() > skipLastBytes) {
521       uncompressSingleKeyValue(source, buffer, state);
522       afterDecodingKeyValue(source, buffer, decodingCtx);
523     }
524
525     if (source.available() != skipLastBytes) {
526       throw new IllegalStateException("Read too much bytes.");
527     }
528
529     return buffer;
530   }
531 }