View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.apache.hadoop.hbase.Cell;
25  import org.apache.hadoop.hbase.CellComparator;
26  import org.apache.hadoop.hbase.CellUtil;
27  import org.apache.hadoop.hbase.KeyValue;
28  import org.apache.hadoop.hbase.KeyValueUtil;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.nio.ByteBuff;
31  import org.apache.hadoop.hbase.util.ByteBufferUtils;
32  import org.apache.hadoop.hbase.util.Bytes;
33  import org.apache.hadoop.hbase.util.Pair;
34  
35  /**
36   * Encoder similar to {@link DiffKeyDeltaEncoder} but supposedly faster.
37   *
38   * Compress using:
39   * - store size of common prefix
40   * - save column family once in the first KeyValue
41   * - use integer compression for key, value and prefix (7-bit encoding)
 * - use flag bits to avoid repeating the key length, value length
 *   and type if they are the same as in the previous KeyValue
 * - store in 3 bits the length of the timestamp prefix shared
 *   with the previous KeyValue's timestamp
 * - use one bit which allows omitting the value if it is the same
47   *
48   * Format:
49   * - 1 byte:    flag
50   * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
51   * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
52   * - 1-5 bytes: prefix length
53   * - ... bytes: rest of the row (if prefix length is small enough)
54   * - ... bytes: qualifier (or suffix depending on prefix length)
55   * - 1-8 bytes: timestamp suffix
56   * - 1 byte:    type (only if FLAG_SAME_TYPE is not set in the flag)
57   * - ... bytes: value (only if FLAG_SAME_VALUE is not set in the flag)
58   *
59   */
60  @InterfaceAudience.Private
61  public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
62    final int MASK_TIMESTAMP_LENGTH = (1 << 0) | (1 << 1) | (1 << 2);
63    final int SHIFT_TIMESTAMP_LENGTH = 0;
64    final int FLAG_SAME_KEY_LENGTH = 1 << 3;
65    final int FLAG_SAME_VALUE_LENGTH = 1 << 4;
66    final int FLAG_SAME_TYPE = 1 << 5;
67    final int FLAG_SAME_VALUE = 1 << 6;
68  
69    private static class FastDiffCompressionState extends CompressionState {
70      byte[] timestamp = new byte[KeyValue.TIMESTAMP_SIZE];
71      int prevTimestampOffset;
72  
73      @Override
74      protected void readTimestamp(ByteBuffer in) {
75        in.get(timestamp);
76      }
77  
78      @Override
79      void copyFrom(CompressionState state) {
80        super.copyFrom(state);
81        FastDiffCompressionState state2 = (FastDiffCompressionState) state;
82        System.arraycopy(state2.timestamp, 0, timestamp, 0,
83            KeyValue.TIMESTAMP_SIZE);
84        prevTimestampOffset = state2.prevTimestampOffset;
85      }
86  
87      /**
88       * Copies the first key/value from the given stream, and initializes
89       * decompression state based on it. Assumes that we have already read key
90       * and value lengths. Does not set {@link #qualifierLength} (not used by
91       * decompression) or {@link #prevOffset} (set by the calle afterwards).
92       */
93      private void decompressFirstKV(ByteBuffer out, DataInputStream in)
94          throws IOException {
95        int kvPos = out.position();
96        out.putInt(keyLength);
97        out.putInt(valueLength);
98        prevTimestampOffset = out.position() + keyLength -
99            KeyValue.TIMESTAMP_TYPE_SIZE;
100       ByteBufferUtils.copyFromStreamToBuffer(out, in, keyLength + valueLength);
101       rowLength = out.getShort(kvPos + KeyValue.ROW_OFFSET);
102       familyLength = out.get(kvPos + KeyValue.ROW_OFFSET +
103           KeyValue.ROW_LENGTH_SIZE + rowLength);
104       type = out.get(prevTimestampOffset + KeyValue.TIMESTAMP_SIZE);
105     }
106 
107   }
108 
109   private int findCommonTimestampPrefix(byte[] curTsBuf, byte[] prevTsBuf) {
110     int commonPrefix = 0;
111     while (commonPrefix < (KeyValue.TIMESTAMP_SIZE - 1)
112         && curTsBuf[commonPrefix] == prevTsBuf[commonPrefix]) {
113       commonPrefix++;
114     }
115     return commonPrefix; // has to be at most 7 bytes
116   }
117 
118   private void uncompressSingleKeyValue(DataInputStream source,
119       ByteBuffer out, FastDiffCompressionState state)
120           throws IOException, EncoderBufferTooSmallException {
121     byte flag = source.readByte();
122     int prevKeyLength = state.keyLength;
123 
124     if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
125       state.keyLength = ByteBufferUtils.readCompressedInt(source);
126     }
127     if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
128       state.valueLength = ByteBufferUtils.readCompressedInt(source);
129     }
130     int commonLength = ByteBufferUtils.readCompressedInt(source);
131 
132     ensureSpace(out, state.keyLength + state.valueLength + KeyValue.ROW_OFFSET);
133 
134     int kvPos = out.position();
135 
136     if (!state.isFirst()) {
137       // copy the prefix
138       int common;
139       int prevOffset;
140 
141       if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
142         out.putInt(state.keyLength);
143         out.putInt(state.valueLength);
144         prevOffset = state.prevOffset + KeyValue.ROW_OFFSET;
145         common = commonLength;
146       } else {
147         if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
148           prevOffset = state.prevOffset;
149           common = commonLength + KeyValue.ROW_OFFSET;
150         } else {
151           out.putInt(state.keyLength);
152           prevOffset = state.prevOffset + KeyValue.KEY_LENGTH_SIZE;
153           common = commonLength + KeyValue.KEY_LENGTH_SIZE;
154         }
155       }
156 
157       ByteBufferUtils.copyFromBufferToBuffer(out, out, prevOffset, common);
158 
159       // copy the rest of the key from the buffer
160       int keyRestLength;
161       if (commonLength < state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
162         // omit the family part of the key, it is always the same
163         int rowWithSizeLength;
164         int rowRestLength;
165 
166         // check length of row
167         if (commonLength < KeyValue.ROW_LENGTH_SIZE) {
168           // not yet copied, do it now
169           ByteBufferUtils.copyFromStreamToBuffer(out, source,
170               KeyValue.ROW_LENGTH_SIZE - commonLength);
171 
172           rowWithSizeLength = out.getShort(out.position() -
173               KeyValue.ROW_LENGTH_SIZE) + KeyValue.ROW_LENGTH_SIZE;
174           rowRestLength = rowWithSizeLength - KeyValue.ROW_LENGTH_SIZE;
175         } else {
176           // already in kvBuffer, just read it
177           rowWithSizeLength = out.getShort(kvPos + KeyValue.ROW_OFFSET) +
178               KeyValue.ROW_LENGTH_SIZE;
179           rowRestLength = rowWithSizeLength - commonLength;
180         }
181 
182         // copy the rest of row
183         ByteBufferUtils.copyFromStreamToBuffer(out, source, rowRestLength);
184 
185         // copy the column family
186         ByteBufferUtils.copyFromBufferToBuffer(out, out,
187             state.prevOffset + KeyValue.ROW_OFFSET + KeyValue.ROW_LENGTH_SIZE
188                 + state.rowLength, state.familyLength
189                 + KeyValue.FAMILY_LENGTH_SIZE);
190         state.rowLength = (short) (rowWithSizeLength -
191             KeyValue.ROW_LENGTH_SIZE);
192 
193         keyRestLength = state.keyLength - rowWithSizeLength -
194             state.familyLength -
195             (KeyValue.FAMILY_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
196       } else {
197         // prevRowWithSizeLength is the same as on previous row
198         keyRestLength = state.keyLength - commonLength -
199             KeyValue.TIMESTAMP_TYPE_SIZE;
200       }
201       // copy the rest of the key, after column family == column qualifier
202       ByteBufferUtils.copyFromStreamToBuffer(out, source, keyRestLength);
203 
204       // copy timestamp
205       int prefixTimestamp =
206           (flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH;
207       ByteBufferUtils.copyFromBufferToBuffer(out, out,
208           state.prevTimestampOffset, prefixTimestamp);
209       state.prevTimestampOffset = out.position() - prefixTimestamp;
210       ByteBufferUtils.copyFromStreamToBuffer(out, source,
211           KeyValue.TIMESTAMP_SIZE - prefixTimestamp);
212 
213       // copy the type and value
214       if ((flag & FLAG_SAME_TYPE) != 0) {
215         out.put(state.type);
216         if ((flag & FLAG_SAME_VALUE) != 0) {
217           ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset +
218               KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
219         } else {
220           ByteBufferUtils.copyFromStreamToBuffer(out, source,
221               state.valueLength);
222         }
223       } else {
224         if ((flag & FLAG_SAME_VALUE) != 0) {
225           ByteBufferUtils.copyFromStreamToBuffer(out, source,
226               KeyValue.TYPE_SIZE);
227           ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset +
228               KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
229         } else {
230           ByteBufferUtils.copyFromStreamToBuffer(out, source,
231               state.valueLength + KeyValue.TYPE_SIZE);
232         }
233         state.type = out.get(state.prevTimestampOffset +
234             KeyValue.TIMESTAMP_SIZE);
235       }
236     } else { // this is the first element
237       state.decompressFirstKV(out, source);
238     }
239 
240     state.prevOffset = kvPos;
241   }
242 
243   @Override
244   public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
245       DataOutputStream out) throws IOException {
246     EncodingState state = encodingContext.getEncodingState();
247     int size = compressSingleKeyValue(out, cell, state.prevCell);
248     size += afterEncodingKeyValue(cell, out, encodingContext);
249     state.prevCell = cell;
250     return size;
251   }
252 
253   private int compressSingleKeyValue(DataOutputStream out, Cell cell, Cell prevCell)
254       throws IOException {
255     byte flag = 0;
256     int kLength = KeyValueUtil.keyLength(cell);
257     int vLength = cell.getValueLength();
258 
259     if (prevCell == null) {
260       // copy the key, there is no common prefix with none
261       out.write(flag);
262       ByteBufferUtils.putCompressedInt(out, kLength);
263       ByteBufferUtils.putCompressedInt(out, vLength);
264       ByteBufferUtils.putCompressedInt(out, 0);
265       CellUtil.writeFlatKey(cell, out);
266       // Write the value part
267       out.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
268     } else {
269       int preKeyLength = KeyValueUtil.keyLength(prevCell);
270       int preValLength = prevCell.getValueLength();
271       // find a common prefix and skip it
272       int commonPrefix = CellUtil.findCommonPrefixInFlatKey(cell, prevCell, true, false);
273 
274       if (kLength == preKeyLength) {
275         flag |= FLAG_SAME_KEY_LENGTH;
276       }
277       if (vLength == prevCell.getValueLength()) {
278         flag |= FLAG_SAME_VALUE_LENGTH;
279       }
280       if (cell.getTypeByte() == prevCell.getTypeByte()) {
281         flag |= FLAG_SAME_TYPE;
282       }
283 
284       byte[] curTsBuf = Bytes.toBytes(cell.getTimestamp());
285       int commonTimestampPrefix = findCommonTimestampPrefix(curTsBuf,
286           Bytes.toBytes(prevCell.getTimestamp()));
287 
288       flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH;
289 
290       // Check if current and previous values are the same. Compare value
291       // length first as an optimization.
292       if (vLength == preValLength
293           && Bytes.equals(cell.getValueArray(), cell.getValueOffset(), vLength,
294               prevCell.getValueArray(), prevCell.getValueOffset(), preValLength)) {
295         flag |= FLAG_SAME_VALUE;
296       }
297 
298       out.write(flag);
299       if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
300         ByteBufferUtils.putCompressedInt(out, kLength);
301       }
302       if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
303         ByteBufferUtils.putCompressedInt(out, vLength);
304       }
305       ByteBufferUtils.putCompressedInt(out, commonPrefix);
306       short rLen = cell.getRowLength();
307       if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
308         // Previous and current rows are different. Copy the differing part of
309         // the row, skip the column family, and copy the qualifier.
310         CellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
311         out.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
312       } else {
313         // The common part includes the whole row. As the column family is the
314         // same across the whole file, it will automatically be included in the
315         // common prefix, so we need not special-case it here.
316         // What we write here is the non common part of the qualifier
317         int commonQualPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
318             - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
319         out.write(cell.getQualifierArray(), cell.getQualifierOffset() + commonQualPrefix,
320             cell.getQualifierLength() - commonQualPrefix);
321       }
322       // Write non common ts part
323       out.write(curTsBuf, commonTimestampPrefix, KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix);
324 
325       // Write the type if it is not the same as before.
326       if ((flag & FLAG_SAME_TYPE) == 0) {
327         out.write(cell.getTypeByte());
328       }
329 
330       // Write the value if it is not the same as before.
331       if ((flag & FLAG_SAME_VALUE) == 0) {
332         out.write(cell.getValueArray(), cell.getValueOffset(), vLength);
333       }
334     }
335     return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
336   }
337 
338   @Override
339   protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
340       int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
341     int decompressedSize = source.readInt();
342     ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
343         allocateHeaderLength);
344     buffer.position(allocateHeaderLength);
345     FastDiffCompressionState state = new FastDiffCompressionState();
346     while (source.available() > skipLastBytes) {
347       uncompressSingleKeyValue(source, buffer, state);
348       afterDecodingKeyValue(source, buffer, decodingCtx);
349     }
350 
351     if (source.available() != skipLastBytes) {
352       throw new IllegalStateException("Read too much bytes.");
353     }
354 
355     return buffer;
356   }
357 
358   @Override
359   public Cell getFirstKeyCellInBlock(ByteBuff block) {
360     block.mark();
361     block.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE);
362     int keyLength = ByteBuff.readCompressedInt(block);
363     // TODO : See if we can avoid these reads as the read values are not getting used
364     ByteBuff.readCompressedInt(block); // valueLength
365     ByteBuff.readCompressedInt(block); // commonLength
366     ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
367     block.reset();
368     return createFirstKeyCell(key, keyLength);
369   }
370 
371   @Override
372   public String toString() {
373     return FastDiffDeltaEncoder.class.getSimpleName();
374   }
375 
376   protected static class FastDiffSeekerState extends SeekerState {
377     private byte[] prevTimestampAndType =
378         new byte[KeyValue.TIMESTAMP_TYPE_SIZE];
379     private int rowLengthWithSize;
380     private int familyLengthWithSize;
381 
382     public FastDiffSeekerState(Pair<ByteBuffer, Integer> tmpPair, boolean includeTags) {
383       super(tmpPair, includeTags);
384     }
385 
386     @Override
387     protected void copyFromNext(SeekerState that) {
388       super.copyFromNext(that);
389       FastDiffSeekerState other = (FastDiffSeekerState) that;
390       System.arraycopy(other.prevTimestampAndType, 0,
391           prevTimestampAndType, 0,
392           KeyValue.TIMESTAMP_TYPE_SIZE);
393       rowLengthWithSize = other.rowLengthWithSize;
394       familyLengthWithSize = other.familyLengthWithSize;
395     }
396   }
397 
398   @Override
399   public EncodedSeeker createSeeker(CellComparator comparator,
400       final HFileBlockDecodingContext decodingCtx) {
401     return new BufferedEncodedSeeker<FastDiffSeekerState>(comparator, decodingCtx) {
402       private void decode(boolean isFirst) {
403         byte flag = currentBuffer.get();
404         if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
405           if (!isFirst) {
406             System.arraycopy(current.keyBuffer,
407                 current.keyLength - current.prevTimestampAndType.length,
408                 current.prevTimestampAndType, 0,
409                 current.prevTimestampAndType.length);
410           }
411           current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
412         }
413         if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
414           current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
415         }
416         current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
417 
418         current.ensureSpaceForKey();
419 
420         if (isFirst) {
421           // copy everything
422           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
423               current.keyLength - current.prevTimestampAndType.length);
424           current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
425               Bytes.SIZEOF_SHORT;
426           current.familyLengthWithSize =
427               current.keyBuffer[current.rowLengthWithSize] + Bytes.SIZEOF_BYTE;
428         } else if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
429           // length of row is different, copy everything except family
430 
431           // copy the row size
432           int oldRowLengthWithSize = current.rowLengthWithSize;
433           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
434               Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
435           current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
436               Bytes.SIZEOF_SHORT;
437 
438           // move the column family
439           System.arraycopy(current.keyBuffer, oldRowLengthWithSize,
440               current.keyBuffer, current.rowLengthWithSize,
441               current.familyLengthWithSize);
442 
443           // copy the rest of row
444           currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
445               current.rowLengthWithSize - Bytes.SIZEOF_SHORT);
446 
447           // copy the qualifier
448           currentBuffer.get(current.keyBuffer, current.rowLengthWithSize
449               + current.familyLengthWithSize, current.keyLength
450               - current.rowLengthWithSize - current.familyLengthWithSize
451               - current.prevTimestampAndType.length);
452         } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
453           // We have to copy part of row and qualifier, but the column family
454           // is in the right place.
455 
456           // before column family (rest of row)
457           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
458               current.rowLengthWithSize - current.lastCommonPrefix);
459 
460           // after column family (qualifier)
461           currentBuffer.get(current.keyBuffer, current.rowLengthWithSize
462               + current.familyLengthWithSize, current.keyLength
463               - current.rowLengthWithSize - current.familyLengthWithSize
464               - current.prevTimestampAndType.length);
465         } else {
466           // copy just the ending
467           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
468               current.keyLength - current.prevTimestampAndType.length
469                   - current.lastCommonPrefix);
470         }
471 
472         // timestamp
473         int pos = current.keyLength - current.prevTimestampAndType.length;
474         int commonTimestampPrefix = (flag & MASK_TIMESTAMP_LENGTH) >>>
475           SHIFT_TIMESTAMP_LENGTH;
476         if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
477           System.arraycopy(current.prevTimestampAndType, 0, current.keyBuffer,
478               pos, commonTimestampPrefix);
479         }
480         pos += commonTimestampPrefix;
481         currentBuffer.get(current.keyBuffer, pos,
482             Bytes.SIZEOF_LONG - commonTimestampPrefix);
483         pos += Bytes.SIZEOF_LONG - commonTimestampPrefix;
484 
485         // type
486         if ((flag & FLAG_SAME_TYPE) == 0) {
487           currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
488         } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
489           current.keyBuffer[pos] =
490               current.prevTimestampAndType[Bytes.SIZEOF_LONG];
491         }
492 
493         // handle value
494         if ((flag & FLAG_SAME_VALUE) == 0) {
495           current.valueOffset = currentBuffer.position();
496           currentBuffer.skip(current.valueLength);
497         }
498 
499         if (includesTags()) {
500           decodeTags();
501         }
502         if (includesMvcc()) {
503           current.memstoreTS = ByteBuff.readVLong(currentBuffer);
504         } else {
505           current.memstoreTS = 0;
506         }
507         current.nextKvOffset = currentBuffer.position();
508       }
509 
510       @Override
511       protected void decodeFirst() {
512         currentBuffer.skip(Bytes.SIZEOF_INT);
513         decode(true);
514       }
515 
516       @Override
517       protected void decodeNext() {
518         decode(false);
519       }
520 
521       @Override
522       protected FastDiffSeekerState createSeekerState() {
523         return new FastDiffSeekerState(this.tmpPair, this.includesTags());
524       }
525     };
526   }
527 }