View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.apache.hadoop.hbase.classification.InterfaceAudience;
25  import org.apache.hadoop.hbase.Cell;
26  import org.apache.hadoop.hbase.CellUtil;
27  import org.apache.hadoop.hbase.KeyValue;
28  import org.apache.hadoop.hbase.KeyValueUtil;
29  import org.apache.hadoop.hbase.KeyValue.KVComparator;
30  import org.apache.hadoop.hbase.util.ByteBufferUtils;
31  import org.apache.hadoop.hbase.util.Bytes;
32  
33  /**
34   * Compress using:
35   * - store size of common prefix
36   * - save column family once, it is same within HFile
37   * - use integer compression for key, value and prefix (7-bit encoding)
38   * - use flag bits to avoid repeating the key length, value length
39   *   and type when they are the same as in the previous cell
40   * - store in 3 bits length of timestamp field
41   * - allow diff in timestamp instead of actual value
42   *
43   * Format:
44   * - 1 byte:    flag
45   * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
46   * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
47   * - 1-5 bytes: prefix length
48   * - ... bytes: rest of the row (if prefix length is small enough)
49   * - ... bytes: qualifier (or suffix depending on prefix length)
50   * - 1-8 bytes: timestamp or diff
51   * - 1 byte:    type (only if FLAG_SAME_TYPE is not set in the flag)
52   * - ... bytes: value
53   */
54  @InterfaceAudience.Private
55  public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
56    static final int FLAG_SAME_KEY_LENGTH = 1;
57    static final int FLAG_SAME_VALUE_LENGTH = 1 << 1;
58    static final int FLAG_SAME_TYPE = 1 << 2;
59    static final int FLAG_TIMESTAMP_IS_DIFF = 1 << 3;
60    static final int MASK_TIMESTAMP_LENGTH = (1 << 4) | (1 << 5) | (1 << 6);
61    static final int SHIFT_TIMESTAMP_LENGTH = 4;
62    static final int FLAG_TIMESTAMP_SIGN = 1 << 7;
63  
64    protected static class DiffCompressionState extends CompressionState {
65      long timestamp;
66      byte[] familyNameWithSize;
67  
68      @Override
69      protected void readTimestamp(ByteBuffer in) {
70        timestamp = in.getLong();
71      }
72  
73      @Override
74      void copyFrom(CompressionState state) {
75        super.copyFrom(state);
76        DiffCompressionState state2 = (DiffCompressionState) state;
77        timestamp = state2.timestamp;
78      }
79    }
80  
81    private void uncompressSingleKeyValue(DataInputStream source,
82        ByteBuffer buffer,
83        DiffCompressionState state)
84            throws IOException, EncoderBufferTooSmallException {
85      // read the column family at the beginning
86      if (state.isFirst()) {
87        state.familyLength = source.readByte();
88        state.familyNameWithSize =
89            new byte[(state.familyLength & 0xff) + KeyValue.FAMILY_LENGTH_SIZE];
90        state.familyNameWithSize[0] = state.familyLength;
91        int read = source.read(state.familyNameWithSize, KeyValue.FAMILY_LENGTH_SIZE,
92            state.familyLength);
93        assert read == state.familyLength;
94      }
95  
96      // read flag
97      byte flag = source.readByte();
98  
99      // read key/value/common lengths
100     int keyLength;
101     int valueLength;
102     if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
103       keyLength = state.keyLength;
104     } else {
105       keyLength = ByteBufferUtils.readCompressedInt(source);
106     }
107     if ((flag & FLAG_SAME_VALUE_LENGTH) != 0) {
108       valueLength = state.valueLength;
109     } else {
110       valueLength = ByteBufferUtils.readCompressedInt(source);
111     }
112     int commonPrefix = ByteBufferUtils.readCompressedInt(source);
113 
114     // create KeyValue buffer and fill it prefix
115     int keyOffset = buffer.position();
116     ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);
117     buffer.putInt(keyLength);
118     buffer.putInt(valueLength);
119 
120     // copy common from previous key
121     if (commonPrefix > 0) {
122       ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, state.prevOffset
123           + KeyValue.ROW_OFFSET, commonPrefix);
124     }
125 
126     // copy the rest of the key from the buffer
127     int keyRestLength;
128     if (state.isFirst() || commonPrefix <
129         state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
130       // omit the family part of the key, it is always the same
131       short rowLength;
132       int rowRestLength;
133 
134       // check length of row
135       if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) {
136         // not yet copied, do it now
137         ByteBufferUtils.copyFromStreamToBuffer(buffer, source,
138             KeyValue.ROW_LENGTH_SIZE - commonPrefix);
139         ByteBufferUtils.skip(buffer, -KeyValue.ROW_LENGTH_SIZE);
140         rowLength = buffer.getShort();
141         rowRestLength = rowLength;
142       } else {
143         // already in buffer, just read it
144         rowLength = buffer.getShort(keyOffset + KeyValue.ROW_OFFSET);
145         rowRestLength = rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
146       }
147 
148       // copy the rest of row
149       ByteBufferUtils.copyFromStreamToBuffer(buffer, source, rowRestLength);
150       state.rowLength = rowLength;
151 
152       // copy the column family
153       buffer.put(state.familyNameWithSize);
154 
155       keyRestLength = keyLength - rowLength -
156           state.familyNameWithSize.length -
157           (KeyValue.ROW_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
158     } else {
159       // prevRowWithSizeLength is the same as on previous row
160       keyRestLength = keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE;
161     }
162     // copy the rest of the key, after column family -> column qualifier
163     ByteBufferUtils.copyFromStreamToBuffer(buffer, source, keyRestLength);
164 
165     // handle timestamp
166     int timestampFitsInBytes =
167         ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
168     long timestamp = ByteBufferUtils.readLong(source, timestampFitsInBytes);
169     if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
170       timestamp = -timestamp;
171     }
172     if ((flag & FLAG_TIMESTAMP_IS_DIFF) != 0) {
173       timestamp = state.timestamp - timestamp;
174     }
175     buffer.putLong(timestamp);
176 
177     // copy the type field
178     byte type;
179     if ((flag & FLAG_SAME_TYPE) != 0) {
180       type = state.type;
181     } else {
182       type = source.readByte();
183     }
184     buffer.put(type);
185 
186     // copy value part
187     ByteBufferUtils.copyFromStreamToBuffer(buffer, source, valueLength);
188 
189     state.keyLength = keyLength;
190     state.valueLength = valueLength;
191     state.prevOffset = keyOffset;
192     state.timestamp = timestamp;
193     state.type = type;
194     // state.qualifier is unused
195   }
196 
197   @Override
198   public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
199       DataOutputStream out) throws IOException {
200     EncodingState state = encodingContext.getEncodingState();
201     int size = compressSingleKeyValue(out, cell, state.prevCell);
202     size += afterEncodingKeyValue(cell, out, encodingContext);
203     state.prevCell = cell;
204     return size;
205   }
206 
207   private int compressSingleKeyValue(DataOutputStream out, Cell cell, Cell prevCell)
208       throws IOException {
209     byte flag = 0;
210     int kLength = KeyValueUtil.keyLength(cell);
211     int vLength = cell.getValueLength();
212 
213     long timestamp;
214     long diffTimestamp = 0;
215     int diffTimestampFitsInBytes = 0;
216     int timestampFitsInBytes;
217     int commonPrefix = 0;
218 
219     if (prevCell == null) {
220       timestamp = cell.getTimestamp();
221       if (timestamp < 0) {
222         flag |= FLAG_TIMESTAMP_SIGN;
223         timestamp = -timestamp;
224       }
225       timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
226       flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
227       // put column family
228       byte familyLength = cell.getFamilyLength();
229       out.write(familyLength);
230       out.write(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
231     } else {
232       // Finding common prefix
233       int preKeyLength = KeyValueUtil.keyLength(prevCell);
234       commonPrefix = CellUtil.findCommonPrefixInFlatKey(cell, prevCell, true, false);
235       if (kLength == preKeyLength) {
236         flag |= FLAG_SAME_KEY_LENGTH;
237       }
238       if (vLength == prevCell.getValueLength()) {
239         flag |= FLAG_SAME_VALUE_LENGTH;
240       }
241       if (cell.getTypeByte() == prevCell.getTypeByte()) {
242         flag |= FLAG_SAME_TYPE;
243       }
244       // don't compress timestamp and type using prefix encode timestamp
245       timestamp = cell.getTimestamp();
246       diffTimestamp = prevCell.getTimestamp() - timestamp;
247       boolean negativeTimestamp = timestamp < 0;
248       if (negativeTimestamp) {
249         timestamp = -timestamp;
250       }
251       timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
252       boolean minusDiffTimestamp = diffTimestamp < 0;
253       if (minusDiffTimestamp) {
254         diffTimestamp = -diffTimestamp;
255       }
256       diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
257       if (diffTimestampFitsInBytes < timestampFitsInBytes) {
258         flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
259         flag |= FLAG_TIMESTAMP_IS_DIFF;
260         if (minusDiffTimestamp) {
261           flag |= FLAG_TIMESTAMP_SIGN;
262         }
263       } else {
264         flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
265         if (negativeTimestamp) {
266           flag |= FLAG_TIMESTAMP_SIGN;
267         }
268       }
269     }
270     out.write(flag);
271     if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
272       ByteBufferUtils.putCompressedInt(out, kLength);
273     }
274     if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
275       ByteBufferUtils.putCompressedInt(out, vLength);
276     }
277     ByteBufferUtils.putCompressedInt(out, commonPrefix);
278     short rLen = cell.getRowLength();
279     if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
280       // Previous and current rows are different. Copy the differing part of
281       // the row, skip the column family, and copy the qualifier.
282       CellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
283       out.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
284     } else {
285       // The common part includes the whole row. As the column family is the
286       // same across the whole file, it will automatically be included in the
287       // common prefix, so we need not special-case it here.
288       // What we write here is the non common part of the qualifier
289       int commonQualPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
290           - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
291       out.write(cell.getQualifierArray(), cell.getQualifierOffset() + commonQualPrefix,
292           cell.getQualifierLength() - commonQualPrefix);
293     }
294     if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
295       ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
296     } else {
297       ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
298     }
299 
300     if ((flag & FLAG_SAME_TYPE) == 0) {
301       out.write(cell.getTypeByte());
302     }
303     out.write(cell.getValueArray(), cell.getValueOffset(), vLength);
304     return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
305   }
306 
307   @Override
308   public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
309     block.mark();
310     block.position(Bytes.SIZEOF_INT);
311     byte familyLength = block.get();
312     ByteBufferUtils.skip(block, familyLength);
313     byte flag = block.get();
314     int keyLength = ByteBufferUtils.readCompressedInt(block);
315     ByteBufferUtils.readCompressedInt(block); // valueLength
316     ByteBufferUtils.readCompressedInt(block); // commonLength
317     ByteBuffer result = ByteBuffer.allocate(keyLength);
318 
319     // copy row
320     assert !(result.isDirect());
321     int pos = result.arrayOffset();
322     block.get(result.array(), pos, Bytes.SIZEOF_SHORT);
323     pos += Bytes.SIZEOF_SHORT;
324     short rowLength = result.getShort();
325     block.get(result.array(), pos, rowLength);
326     pos += rowLength;
327 
328     // copy family
329     int savePosition = block.position();
330     block.position(Bytes.SIZEOF_INT);
331     block.get(result.array(), pos, familyLength + Bytes.SIZEOF_BYTE);
332     pos += familyLength + Bytes.SIZEOF_BYTE;
333 
334     // copy qualifier
335     block.position(savePosition);
336     int qualifierLength =
337         keyLength - pos + result.arrayOffset() - KeyValue.TIMESTAMP_TYPE_SIZE;
338     block.get(result.array(), pos, qualifierLength);
339     pos += qualifierLength;
340 
341     // copy the timestamp and type
342     int timestampFitInBytes =
343         ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
344     long timestamp = ByteBufferUtils.readLong(block, timestampFitInBytes);
345     if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
346       timestamp = -timestamp;
347     }
348     result.putLong(pos, timestamp);
349     pos += Bytes.SIZEOF_LONG;
350     block.get(result.array(), pos, Bytes.SIZEOF_BYTE);
351 
352     block.reset();
353     return result;
354   }
355 
356   @Override
357   public String toString() {
358     return DiffKeyDeltaEncoder.class.getSimpleName();
359   }
360 
361   protected static class DiffSeekerState extends SeekerState {
362     private int rowLengthWithSize;
363     private long timestamp;
364 
365     @Override
366     protected void copyFromNext(SeekerState that) {
367       super.copyFromNext(that);
368       DiffSeekerState other = (DiffSeekerState) that;
369       rowLengthWithSize = other.rowLengthWithSize;
370       timestamp = other.timestamp;
371     }
372   }
373 
374   @Override
375   public EncodedSeeker createSeeker(KVComparator comparator,
376       HFileBlockDecodingContext decodingCtx) {
377     return new BufferedEncodedSeeker<DiffSeekerState>(comparator, decodingCtx) {
378       private byte[] familyNameWithSize;
379       private static final int TIMESTAMP_WITH_TYPE_LENGTH =
380           Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;
381 
382       private void decode(boolean isFirst) {
383         byte flag = currentBuffer.get();
384         byte type = 0;
385         if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
386           if (!isFirst) {
387             type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE];
388           }
389           current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
390         }
391         if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
392           current.valueLength =
393               ByteBufferUtils.readCompressedInt(currentBuffer);
394         }
395         current.lastCommonPrefix =
396             ByteBufferUtils.readCompressedInt(currentBuffer);
397 
398         current.ensureSpaceForKey();
399 
400         if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
401           // length of row is different, copy everything except family
402 
403           // copy the row size
404           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
405               Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
406           current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
407               Bytes.SIZEOF_SHORT;
408 
409           // copy the rest of row
410           currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
411               current.rowLengthWithSize - Bytes.SIZEOF_SHORT);
412 
413           // copy the column family
414           System.arraycopy(familyNameWithSize, 0, current.keyBuffer,
415               current.rowLengthWithSize, familyNameWithSize.length);
416 
417           // copy the qualifier
418           currentBuffer.get(current.keyBuffer,
419               current.rowLengthWithSize + familyNameWithSize.length,
420               current.keyLength - current.rowLengthWithSize -
421               familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
422         } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
423           // we have to copy part of row and qualifier,
424           // but column family is in right place
425 
426           // before column family (rest of row)
427           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
428               current.rowLengthWithSize - current.lastCommonPrefix);
429 
430           // after column family (qualifier)
431           currentBuffer.get(current.keyBuffer,
432               current.rowLengthWithSize + familyNameWithSize.length,
433               current.keyLength - current.rowLengthWithSize -
434               familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
435         } else {
436           // copy just the ending
437           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
438               current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH -
439               current.lastCommonPrefix);
440         }
441 
442         // timestamp
443         int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH;
444         int timestampFitInBytes = 1 +
445             ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH);
446         long timestampOrDiff =
447             ByteBufferUtils.readLong(currentBuffer, timestampFitInBytes);
448         if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
449           timestampOrDiff = -timestampOrDiff;
450         }
451         if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) { // it is timestamp
452           current.timestamp = timestampOrDiff;
453         } else { // it is diff
454           current.timestamp = current.timestamp - timestampOrDiff;
455         }
456         Bytes.putLong(current.keyBuffer, pos, current.timestamp);
457         pos += Bytes.SIZEOF_LONG;
458 
459         // type
460         if ((flag & FLAG_SAME_TYPE) == 0) {
461           currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
462         } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
463           current.keyBuffer[pos] = type;
464         }
465 
466         current.valueOffset = currentBuffer.position();
467         ByteBufferUtils.skip(currentBuffer, current.valueLength);
468 
469         if (includesTags()) {
470           decodeTags();
471         }
472         if (includesMvcc()) {
473           current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
474         } else {
475           current.memstoreTS = 0;
476         }
477         current.nextKvOffset = currentBuffer.position();
478       }
479 
480       @Override
481       protected void decodeFirst() {
482         ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
483 
484         // read column family
485         byte familyNameLength = currentBuffer.get();
486         familyNameWithSize = new byte[familyNameLength + Bytes.SIZEOF_BYTE];
487         familyNameWithSize[0] = familyNameLength;
488         currentBuffer.get(familyNameWithSize, Bytes.SIZEOF_BYTE,
489             familyNameLength);
490         decode(true);
491       }
492 
493       @Override
494       protected void decodeNext() {
495         decode(false);
496       }
497 
498       @Override
499       protected DiffSeekerState createSeekerState() {
500         return new DiffSeekerState();
501       }
502     };
503   }
504 
505   @Override
506   protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
507       int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
508     int decompressedSize = source.readInt();
509     ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
510         allocateHeaderLength);
511     buffer.position(allocateHeaderLength);
512     DiffCompressionState state = new DiffCompressionState();
513     while (source.available() > skipLastBytes) {
514       uncompressSingleKeyValue(source, buffer, state);
515       afterDecodingKeyValue(source, buffer, decodingCtx);
516     }
517 
518     if (source.available() != skipLastBytes) {
519       throw new IllegalStateException("Read too much bytes.");
520     }
521 
522     return buffer;
523   }
524 }