View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.apache.hadoop.hbase.Cell;
25  import org.apache.hadoop.hbase.CellUtil;
26  import org.apache.hadoop.hbase.KeyValue;
27  import org.apache.hadoop.hbase.KeyValue.KVComparator;
28  import org.apache.hadoop.hbase.KeyValueUtil;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.util.ByteBufferUtils;
31  import org.apache.hadoop.hbase.util.Bytes;
32  
33  /**
 * Compress using:
 * - store the size of the prefix shared with the previous key
 * - save the column family once, since it is the same within an HFile
 * - use integer compression for key, value and prefix lengths (7-bit encoding)
 * - use flag bits to avoid repeating the key length, value length
 *   and type when they are the same as in the previous entry
 * - store the length of the timestamp field in 3 bits
 * - allow storing a diff from the previous timestamp instead of the actual value
42   *
43   * Format:
44   * - 1 byte:    flag
45   * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
46   * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
47   * - 1-5 bytes: prefix length
48   * - ... bytes: rest of the row (if prefix length is small enough)
49   * - ... bytes: qualifier (or suffix depending on prefix length)
50   * - 1-8 bytes: timestamp or diff
51   * - 1 byte:    type (only if FLAG_SAME_TYPE is not set in the flag)
52   * - ... bytes: value
53   */
54  @InterfaceAudience.Private
55  public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
56    static final int FLAG_SAME_KEY_LENGTH = 1;
57    static final int FLAG_SAME_VALUE_LENGTH = 1 << 1;
58    static final int FLAG_SAME_TYPE = 1 << 2;
59    static final int FLAG_TIMESTAMP_IS_DIFF = 1 << 3;
60    static final int MASK_TIMESTAMP_LENGTH = (1 << 4) | (1 << 5) | (1 << 6);
61    static final int SHIFT_TIMESTAMP_LENGTH = 4;
62    static final int FLAG_TIMESTAMP_SIGN = 1 << 7;
63  
64    protected static class DiffCompressionState extends CompressionState {
65      long timestamp;
66      byte[] familyNameWithSize;
67  
68      @Override
69      protected void readTimestamp(ByteBuffer in) {
70        timestamp = in.getLong();
71      }
72  
73      @Override
74      void copyFrom(CompressionState state) {
75        super.copyFrom(state);
76        DiffCompressionState state2 = (DiffCompressionState) state;
77        timestamp = state2.timestamp;
78      }
79    }
80  
81    private void uncompressSingleKeyValue(DataInputStream source,
82        ByteBuffer buffer,
83        DiffCompressionState state)
84            throws IOException, EncoderBufferTooSmallException {
85      // read the column family at the beginning
86      if (state.isFirst()) {
87        state.familyLength = source.readByte();
88        state.familyNameWithSize =
89            new byte[(state.familyLength & 0xff) + KeyValue.FAMILY_LENGTH_SIZE];
90        state.familyNameWithSize[0] = state.familyLength;
91        int read = source.read(state.familyNameWithSize, KeyValue.FAMILY_LENGTH_SIZE,
92            state.familyLength);
93        assert read == state.familyLength;
94      }
95  
96      // read flag
97      byte flag = source.readByte();
98  
99      // read key/value/common lengths
100     int keyLength;
101     int valueLength;
102     if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
103       keyLength = state.keyLength;
104     } else {
105       keyLength = ByteBufferUtils.readCompressedInt(source);
106     }
107     if ((flag & FLAG_SAME_VALUE_LENGTH) != 0) {
108       valueLength = state.valueLength;
109     } else {
110       valueLength = ByteBufferUtils.readCompressedInt(source);
111     }
112     int commonPrefix = ByteBufferUtils.readCompressedInt(source);
113 
114     // create KeyValue buffer and fill it prefix
115     int keyOffset = buffer.position();
116     ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);
117     buffer.putInt(keyLength);
118     buffer.putInt(valueLength);
119 
120     // copy common from previous key
121     if (commonPrefix > 0) {
122       ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, state.prevOffset
123           + KeyValue.ROW_OFFSET, commonPrefix);
124     }
125 
126     // copy the rest of the key from the buffer
127     int keyRestLength;
128     if (state.isFirst() || commonPrefix <
129         state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
130       // omit the family part of the key, it is always the same
131       short rowLength;
132       int rowRestLength;
133 
134       // check length of row
135       if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) {
136         // not yet copied, do it now
137         ByteBufferUtils.copyFromStreamToBuffer(buffer, source,
138             KeyValue.ROW_LENGTH_SIZE - commonPrefix);
139         ByteBufferUtils.skip(buffer, -KeyValue.ROW_LENGTH_SIZE);
140         rowLength = buffer.getShort();
141         rowRestLength = rowLength;
142       } else {
143         // already in buffer, just read it
144         rowLength = buffer.getShort(keyOffset + KeyValue.ROW_OFFSET);
145         rowRestLength = rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
146       }
147 
148       // copy the rest of row
149       ByteBufferUtils.copyFromStreamToBuffer(buffer, source, rowRestLength);
150       state.rowLength = rowLength;
151 
152       // copy the column family
153       buffer.put(state.familyNameWithSize);
154 
155       keyRestLength = keyLength - rowLength -
156           state.familyNameWithSize.length -
157           (KeyValue.ROW_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
158     } else {
159       // prevRowWithSizeLength is the same as on previous row
160       keyRestLength = keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE;
161     }
162     // copy the rest of the key, after column family -> column qualifier
163     ByteBufferUtils.copyFromStreamToBuffer(buffer, source, keyRestLength);
164 
165     // handle timestamp
166     int timestampFitsInBytes =
167         ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
168     long timestamp = ByteBufferUtils.readLong(source, timestampFitsInBytes);
169     if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
170       timestamp = -timestamp;
171     }
172     if ((flag & FLAG_TIMESTAMP_IS_DIFF) != 0) {
173       timestamp = state.timestamp - timestamp;
174     }
175     buffer.putLong(timestamp);
176 
177     // copy the type field
178     byte type;
179     if ((flag & FLAG_SAME_TYPE) != 0) {
180       type = state.type;
181     } else {
182       type = source.readByte();
183     }
184     buffer.put(type);
185 
186     // copy value part
187     ByteBufferUtils.copyFromStreamToBuffer(buffer, source, valueLength);
188 
189     state.keyLength = keyLength;
190     state.valueLength = valueLength;
191     state.prevOffset = keyOffset;
192     state.timestamp = timestamp;
193     state.type = type;
194     // state.qualifier is unused
195   }
196 
197   @Override
198   public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
199       DataOutputStream out) throws IOException {
200     EncodingState state = encodingContext.getEncodingState();
201     int size = compressSingleKeyValue(out, cell, state.prevCell);
202     size += afterEncodingKeyValue(cell, out, encodingContext);
203     state.prevCell = cell;
204     return size;
205   }
206 
207   private int compressSingleKeyValue(DataOutputStream out, Cell cell, Cell prevCell)
208       throws IOException {
209     byte flag = 0;
210     int kLength = KeyValueUtil.keyLength(cell);
211     int vLength = cell.getValueLength();
212 
213     long timestamp;
214     long diffTimestamp = 0;
215     int diffTimestampFitsInBytes = 0;
216     int timestampFitsInBytes;
217     int commonPrefix = 0;
218 
219     if (prevCell == null) {
220       timestamp = cell.getTimestamp();
221       if (timestamp < 0) {
222         flag |= FLAG_TIMESTAMP_SIGN;
223         timestamp = -timestamp;
224       }
225       timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
226       flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
227       // put column family
228       byte familyLength = cell.getFamilyLength();
229       out.write(familyLength);
230       out.write(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
231     } else {
232       // Finding common prefix
233       int preKeyLength = KeyValueUtil.keyLength(prevCell);
234       commonPrefix = CellUtil.findCommonPrefixInFlatKey(cell, prevCell, true, false);
235       if (kLength == preKeyLength) {
236         flag |= FLAG_SAME_KEY_LENGTH;
237       }
238       if (vLength == prevCell.getValueLength()) {
239         flag |= FLAG_SAME_VALUE_LENGTH;
240       }
241       if (cell.getTypeByte() == prevCell.getTypeByte()) {
242         flag |= FLAG_SAME_TYPE;
243       }
244       // don't compress timestamp and type using prefix encode timestamp
245       timestamp = cell.getTimestamp();
246       diffTimestamp = prevCell.getTimestamp() - timestamp;
247       boolean negativeTimestamp = timestamp < 0;
248       if (negativeTimestamp) {
249         timestamp = -timestamp;
250       }
251       timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
252       boolean minusDiffTimestamp = diffTimestamp < 0;
253       if (minusDiffTimestamp) {
254         diffTimestamp = -diffTimestamp;
255       }
256       diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
257       if (diffTimestampFitsInBytes < timestampFitsInBytes) {
258         flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
259         flag |= FLAG_TIMESTAMP_IS_DIFF;
260         if (minusDiffTimestamp) {
261           flag |= FLAG_TIMESTAMP_SIGN;
262         }
263       } else {
264         flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
265         if (negativeTimestamp) {
266           flag |= FLAG_TIMESTAMP_SIGN;
267         }
268       }
269     }
270     out.write(flag);
271     if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
272       ByteBufferUtils.putCompressedInt(out, kLength);
273     }
274     if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
275       ByteBufferUtils.putCompressedInt(out, vLength);
276     }
277     ByteBufferUtils.putCompressedInt(out, commonPrefix);
278     short rLen = cell.getRowLength();
279     if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
280       // Previous and current rows are different. Copy the differing part of
281       // the row, skip the column family, and copy the qualifier.
282       CellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
283       out.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
284     } else {
285       // The common part includes the whole row. As the column family is the
286       // same across the whole file, it will automatically be included in the
287       // common prefix, so we need not special-case it here.
288       // What we write here is the non common part of the qualifier
289       int commonQualPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
290           - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
291       out.write(cell.getQualifierArray(), cell.getQualifierOffset() + commonQualPrefix,
292           cell.getQualifierLength() - commonQualPrefix);
293     }
294     if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
295       ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
296     } else {
297       ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
298     }
299 
300     if ((flag & FLAG_SAME_TYPE) == 0) {
301       out.write(cell.getTypeByte());
302     }
303     out.write(cell.getValueArray(), cell.getValueOffset(), vLength);
304     return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
305   }
306 
307   @Override
308   public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
309     block.mark();
310     block.position(Bytes.SIZEOF_INT);
311     byte familyLength = block.get();
312     ByteBufferUtils.skip(block, familyLength);
313     byte flag = block.get();
314     int keyLength = ByteBufferUtils.readCompressedInt(block);
315     ByteBufferUtils.readCompressedInt(block); // valueLength
316     ByteBufferUtils.readCompressedInt(block); // commonLength
317     ByteBuffer result = ByteBuffer.allocate(keyLength);
318 
319     // copy row
320     int pos = result.arrayOffset();
321     block.get(result.array(), pos, Bytes.SIZEOF_SHORT);
322     pos += Bytes.SIZEOF_SHORT;
323     short rowLength = result.getShort();
324     block.get(result.array(), pos, rowLength);
325     pos += rowLength;
326 
327     // copy family
328     int savePosition = block.position();
329     block.position(Bytes.SIZEOF_INT);
330     block.get(result.array(), pos, familyLength + Bytes.SIZEOF_BYTE);
331     pos += familyLength + Bytes.SIZEOF_BYTE;
332 
333     // copy qualifier
334     block.position(savePosition);
335     int qualifierLength =
336         keyLength - pos + result.arrayOffset() - KeyValue.TIMESTAMP_TYPE_SIZE;
337     block.get(result.array(), pos, qualifierLength);
338     pos += qualifierLength;
339 
340     // copy the timestamp and type
341     int timestampFitInBytes =
342         ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
343     long timestamp = ByteBufferUtils.readLong(block, timestampFitInBytes);
344     if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
345       timestamp = -timestamp;
346     }
347     result.putLong(pos, timestamp);
348     pos += Bytes.SIZEOF_LONG;
349     block.get(result.array(), pos, Bytes.SIZEOF_BYTE);
350 
351     block.reset();
352     return result;
353   }
354 
355   @Override
356   public String toString() {
357     return DiffKeyDeltaEncoder.class.getSimpleName();
358   }
359 
360   protected static class DiffSeekerState extends SeekerState {
361     private int rowLengthWithSize;
362     private long timestamp;
363 
364     @Override
365     protected void copyFromNext(SeekerState that) {
366       super.copyFromNext(that);
367       DiffSeekerState other = (DiffSeekerState) that;
368       rowLengthWithSize = other.rowLengthWithSize;
369       timestamp = other.timestamp;
370     }
371   }
372 
373   @Override
374   public EncodedSeeker createSeeker(KVComparator comparator,
375       HFileBlockDecodingContext decodingCtx) {
376     return new BufferedEncodedSeeker<DiffSeekerState>(comparator, decodingCtx) {
377       private byte[] familyNameWithSize;
378       private static final int TIMESTAMP_WITH_TYPE_LENGTH =
379           Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;
380 
381       private void decode(boolean isFirst) {
382         byte flag = currentBuffer.get();
383         byte type = 0;
384         if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
385           if (!isFirst) {
386             type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE];
387           }
388           current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
389         }
390         if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
391           current.valueLength =
392               ByteBufferUtils.readCompressedInt(currentBuffer);
393         }
394         current.lastCommonPrefix =
395             ByteBufferUtils.readCompressedInt(currentBuffer);
396 
397         current.ensureSpaceForKey();
398 
399         if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
400           // length of row is different, copy everything except family
401 
402           // copy the row size
403           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
404               Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
405           current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
406               Bytes.SIZEOF_SHORT;
407 
408           // copy the rest of row
409           currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
410               current.rowLengthWithSize - Bytes.SIZEOF_SHORT);
411 
412           // copy the column family
413           System.arraycopy(familyNameWithSize, 0, current.keyBuffer,
414               current.rowLengthWithSize, familyNameWithSize.length);
415 
416           // copy the qualifier
417           currentBuffer.get(current.keyBuffer,
418               current.rowLengthWithSize + familyNameWithSize.length,
419               current.keyLength - current.rowLengthWithSize -
420               familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
421         } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
422           // we have to copy part of row and qualifier,
423           // but column family is in right place
424 
425           // before column family (rest of row)
426           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
427               current.rowLengthWithSize - current.lastCommonPrefix);
428 
429           // after column family (qualifier)
430           currentBuffer.get(current.keyBuffer,
431               current.rowLengthWithSize + familyNameWithSize.length,
432               current.keyLength - current.rowLengthWithSize -
433               familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
434         } else {
435           // copy just the ending
436           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
437               current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH -
438               current.lastCommonPrefix);
439         }
440 
441         // timestamp
442         int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH;
443         int timestampFitInBytes = 1 +
444             ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH);
445         long timestampOrDiff =
446             ByteBufferUtils.readLong(currentBuffer, timestampFitInBytes);
447         if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
448           timestampOrDiff = -timestampOrDiff;
449         }
450         if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) { // it is timestamp
451           current.timestamp = timestampOrDiff;
452         } else { // it is diff
453           current.timestamp = current.timestamp - timestampOrDiff;
454         }
455         Bytes.putLong(current.keyBuffer, pos, current.timestamp);
456         pos += Bytes.SIZEOF_LONG;
457 
458         // type
459         if ((flag & FLAG_SAME_TYPE) == 0) {
460           currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
461         } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
462           current.keyBuffer[pos] = type;
463         }
464 
465         current.valueOffset = currentBuffer.position();
466         ByteBufferUtils.skip(currentBuffer, current.valueLength);
467 
468         if (includesTags()) {
469           decodeTags();
470         }
471         if (includesMvcc()) {
472           current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
473         } else {
474           current.memstoreTS = 0;
475         }
476         current.nextKvOffset = currentBuffer.position();
477       }
478 
479       @Override
480       protected void decodeFirst() {
481         ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
482 
483         // read column family
484         byte familyNameLength = currentBuffer.get();
485         familyNameWithSize = new byte[familyNameLength + Bytes.SIZEOF_BYTE];
486         familyNameWithSize[0] = familyNameLength;
487         currentBuffer.get(familyNameWithSize, Bytes.SIZEOF_BYTE,
488             familyNameLength);
489         decode(true);
490       }
491 
492       @Override
493       protected void decodeNext() {
494         decode(false);
495       }
496 
497       @Override
498       protected DiffSeekerState createSeekerState() {
499         return new DiffSeekerState();
500       }
501     };
502   }
503 
504   @Override
505   protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
506       int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
507     int decompressedSize = source.readInt();
508     ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
509         allocateHeaderLength);
510     buffer.position(allocateHeaderLength);
511     DiffCompressionState state = new DiffCompressionState();
512     while (source.available() > skipLastBytes) {
513       uncompressSingleKeyValue(source, buffer, state);
514       afterDecodingKeyValue(source, buffer, decodingCtx);
515     }
516 
517     if (source.available() != skipLastBytes) {
518       throw new IllegalStateException("Read too much bytes.");
519     }
520 
521     return buffer;
522   }
523 }