View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.io.OutputStream;
23  import java.nio.ByteBuffer;
24  
25  import org.apache.hadoop.hbase.KeyValue;
26  import org.apache.hadoop.hbase.util.ByteBufferUtils;
27  import org.apache.hadoop.hbase.util.Bytes;
28  import org.apache.hadoop.io.RawComparator;
29  
30  import org.apache.hadoop.classification.InterfaceAudience;
31  
32  /**
33   * Encoder similar to {@link DiffKeyDeltaEncoder} but supposedly faster.
34   *
35   * Compress using:
36   * - store size of common prefix
37   * - save column family once in the first KeyValue
38   * - use integer compression for key, value and prefix (7-bit encoding)
39   * - use bits to avoid duplication key length, value length
40   *   and type if it same as previous
41   * - store in 3 bits length of prefix timestamp
42   *    with previous KeyValue's timestamp
43   * - one bit which allow to omit value if it is the same
44   *
45   * Format:
46   * - 1 byte:    flag
47   * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
48   * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
49   * - 1-5 bytes: prefix length
50   * - ... bytes: rest of the row (if prefix length is small enough)
51   * - ... bytes: qualifier (or suffix depending on prefix length)
52   * - 1-8 bytes: timestamp suffix
53   * - 1 byte:    type (only if FLAG_SAME_TYPE is not set in the flag)
54   * - ... bytes: value (only if FLAG_SAME_VALUE is not set in the flag)
55   *
56   */
57  @InterfaceAudience.Private
58  public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
59    final int MASK_TIMESTAMP_LENGTH = (1 << 0) | (1 << 1) | (1 << 2);
60    final int SHIFT_TIMESTAMP_LENGTH = 0;
61    final int FLAG_SAME_KEY_LENGTH = 1 << 3;
62    final int FLAG_SAME_VALUE_LENGTH = 1 << 4;
63    final int FLAG_SAME_TYPE = 1 << 5;
64    final int FLAG_SAME_VALUE = 1 << 6;
65  
66    private static class FastDiffCompressionState extends CompressionState {
67      byte[] timestamp = new byte[KeyValue.TIMESTAMP_SIZE];
68      int prevTimestampOffset;
69  
70      @Override
71      protected void readTimestamp(ByteBuffer in) {
72        in.get(timestamp);
73      }
74  
75      @Override
76      void copyFrom(CompressionState state) {
77        super.copyFrom(state);
78        FastDiffCompressionState state2 = (FastDiffCompressionState) state;
79        System.arraycopy(state2.timestamp, 0, timestamp, 0,
80            KeyValue.TIMESTAMP_SIZE);
81        prevTimestampOffset = state2.prevTimestampOffset;
82      }
83  
84      /**
85       * Copies the first key/value from the given stream, and initializes
86       * decompression state based on it. Assumes that we have already read key
87       * and value lengths. Does not set {@link #qualifierLength} (not used by
88       * decompression) or {@link #prevOffset} (set by the calle afterwards).
89       */
90      private void decompressFirstKV(ByteBuffer out, DataInputStream in)
91          throws IOException {
92        int kvPos = out.position();
93        out.putInt(keyLength);
94        out.putInt(valueLength);
95        prevTimestampOffset = out.position() + keyLength -
96            KeyValue.TIMESTAMP_TYPE_SIZE;
97        ByteBufferUtils.copyFromStreamToBuffer(out, in, keyLength + valueLength);
98        rowLength = out.getShort(kvPos + KeyValue.ROW_OFFSET);
99        familyLength = out.get(kvPos + KeyValue.ROW_OFFSET +
100           KeyValue.ROW_LENGTH_SIZE + rowLength);
101       type = out.get(prevTimestampOffset + KeyValue.TIMESTAMP_SIZE);
102     }
103 
104   }
105 
106   private void compressSingleKeyValue(
107         FastDiffCompressionState previousState,
108         FastDiffCompressionState currentState,
109         OutputStream out, ByteBuffer in) throws IOException {
110     currentState.prevOffset = in.position();
111     int keyLength = in.getInt();
112     int valueOffset =
113         currentState.prevOffset + keyLength + KeyValue.ROW_OFFSET;
114     int valueLength = in.getInt();
115     byte flag = 0;
116 
117     if (previousState.isFirst()) {
118       // copy the key, there is no common prefix with none
119       out.write(flag);
120       ByteBufferUtils.putCompressedInt(out, keyLength);
121       ByteBufferUtils.putCompressedInt(out, valueLength);
122       ByteBufferUtils.putCompressedInt(out, 0);
123 
124       currentState.readKey(in, keyLength, valueLength);
125 
126       ByteBufferUtils.moveBufferToStream(out, in, keyLength + valueLength);
127     } else {
128       // find a common prefix and skip it
129       int commonPrefix = ByteBufferUtils.findCommonPrefix(in, in.position(),
130           previousState.prevOffset + KeyValue.ROW_OFFSET,
131           Math.min(keyLength, previousState.keyLength) -
132           KeyValue.TIMESTAMP_TYPE_SIZE);
133 
134       currentState.readKey(in, keyLength, valueLength,
135           commonPrefix, previousState);
136 
137       if (keyLength == previousState.keyLength) {
138         flag |= FLAG_SAME_KEY_LENGTH;
139       }
140       if (valueLength == previousState.valueLength) {
141         flag |= FLAG_SAME_VALUE_LENGTH;
142       }
143       if (currentState.type == previousState.type) {
144         flag |= FLAG_SAME_TYPE;
145       }
146 
147       int commonTimestampPrefix = findCommonTimestampPrefix(
148           currentState, previousState);
149       flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH;
150 
151       // Check if current and previous values are the same. Compare value
152       // length first as an optimization.
153       if (valueLength == previousState.valueLength) {
154         int previousValueOffset = previousState.prevOffset
155             + previousState.keyLength + KeyValue.ROW_OFFSET;
156         if (ByteBufferUtils.arePartsEqual(in,
157                 previousValueOffset, previousState.valueLength,
158                 valueOffset, valueLength)) {
159           flag |= FLAG_SAME_VALUE;
160         }
161       }
162 
163       out.write(flag);
164       if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
165         ByteBufferUtils.putCompressedInt(out, keyLength);
166       }
167       if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
168         ByteBufferUtils.putCompressedInt(out, valueLength);
169       }
170       ByteBufferUtils.putCompressedInt(out, commonPrefix);
171 
172       ByteBufferUtils.skip(in, commonPrefix);
173       if (commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
174         // Previous and current rows are different. Copy the differing part of
175         // the row, skip the column family, and copy the qualifier.
176         ByteBufferUtils.moveBufferToStream(out, in,
177             currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix);
178         ByteBufferUtils.skip(in, currentState.familyLength +
179             KeyValue.FAMILY_LENGTH_SIZE);
180         ByteBufferUtils.moveBufferToStream(out, in,
181             currentState.qualifierLength);
182       } else {
183         // The common part includes the whole row. As the column family is the
184         // same across the whole file, it will automatically be included in the
185         // common prefix, so we need not special-case it here.
186         int restKeyLength = keyLength - commonPrefix -
187             KeyValue.TIMESTAMP_TYPE_SIZE;
188         ByteBufferUtils.moveBufferToStream(out, in, restKeyLength);
189       }
190       ByteBufferUtils.skip(in, commonTimestampPrefix);
191       ByteBufferUtils.moveBufferToStream(out, in,
192           KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix);
193 
194       // Write the type if it is not the same as before.
195       if ((flag & FLAG_SAME_TYPE) == 0) {
196         out.write(currentState.type);
197       }
198 
199       // Write the value if it is not the same as before.
200       if ((flag & FLAG_SAME_VALUE) == 0) {
201         ByteBufferUtils.copyBufferToStream(out, in, valueOffset, valueLength);
202       }
203 
204       // Skip key type and value in the input buffer.
205       ByteBufferUtils.skip(in, KeyValue.TYPE_SIZE + currentState.valueLength);
206     }
207   }
208 
209   private int findCommonTimestampPrefix(FastDiffCompressionState left,
210       FastDiffCompressionState right) {
211     int prefixTimestamp = 0;
212     while (prefixTimestamp < (KeyValue.TIMESTAMP_SIZE - 1) &&
213         left.timestamp[prefixTimestamp]
214             == right.timestamp[prefixTimestamp]) {
215       prefixTimestamp++;
216     }
217     return prefixTimestamp; // has to be at most 7 bytes
218   }
219 
220   private void uncompressSingleKeyValue(DataInputStream source,
221       ByteBuffer out, FastDiffCompressionState state)
222           throws IOException, EncoderBufferTooSmallException {
223     byte flag = source.readByte();
224     int prevKeyLength = state.keyLength;
225 
226     if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
227       state.keyLength = ByteBufferUtils.readCompressedInt(source);
228     }
229     if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
230       state.valueLength = ByteBufferUtils.readCompressedInt(source);
231     }
232     int commonLength = ByteBufferUtils.readCompressedInt(source);
233 
234     ByteBufferUtils.ensureSpace(out, state.keyLength + state.valueLength +
235         KeyValue.ROW_OFFSET);
236 
237     int kvPos = out.position();
238 
239     if (!state.isFirst()) {
240       // copy the prefix
241       int common;
242       int prevOffset;
243 
244       if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
245         out.putInt(state.keyLength);
246         out.putInt(state.valueLength);
247         prevOffset = state.prevOffset + KeyValue.ROW_OFFSET;
248         common = commonLength;
249       } else {
250         if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
251           prevOffset = state.prevOffset;
252           common = commonLength + KeyValue.ROW_OFFSET;
253         } else {
254           out.putInt(state.keyLength);
255           prevOffset = state.prevOffset + KeyValue.KEY_LENGTH_SIZE;
256           common = commonLength + KeyValue.KEY_LENGTH_SIZE;
257         }
258       }
259 
260       ByteBufferUtils.copyFromBufferToBuffer(out, out, prevOffset, common);
261 
262       // copy the rest of the key from the buffer
263       int keyRestLength;
264       if (commonLength < state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
265         // omit the family part of the key, it is always the same
266         int rowWithSizeLength;
267         int rowRestLength;
268 
269         // check length of row
270         if (commonLength < KeyValue.ROW_LENGTH_SIZE) {
271           // not yet copied, do it now
272           ByteBufferUtils.copyFromStreamToBuffer(out, source,
273               KeyValue.ROW_LENGTH_SIZE - commonLength);
274 
275           rowWithSizeLength = out.getShort(out.position() -
276               KeyValue.ROW_LENGTH_SIZE) + KeyValue.ROW_LENGTH_SIZE;
277           rowRestLength = rowWithSizeLength - KeyValue.ROW_LENGTH_SIZE;
278         } else {
279           // already in kvBuffer, just read it
280           rowWithSizeLength = out.getShort(kvPos + KeyValue.ROW_OFFSET) +
281               KeyValue.ROW_LENGTH_SIZE;
282           rowRestLength = rowWithSizeLength - commonLength;
283         }
284 
285         // copy the rest of row
286         ByteBufferUtils.copyFromStreamToBuffer(out, source, rowRestLength);
287 
288         // copy the column family
289         ByteBufferUtils.copyFromBufferToBuffer(out, out,
290             state.prevOffset + KeyValue.ROW_OFFSET + KeyValue.ROW_LENGTH_SIZE
291                 + state.rowLength, state.familyLength
292                 + KeyValue.FAMILY_LENGTH_SIZE);
293         state.rowLength = (short) (rowWithSizeLength -
294             KeyValue.ROW_LENGTH_SIZE);
295 
296         keyRestLength = state.keyLength - rowWithSizeLength -
297             state.familyLength -
298             (KeyValue.FAMILY_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
299       } else {
300         // prevRowWithSizeLength is the same as on previous row
301         keyRestLength = state.keyLength - commonLength -
302             KeyValue.TIMESTAMP_TYPE_SIZE;
303       }
304       // copy the rest of the key, after column family == column qualifier
305       ByteBufferUtils.copyFromStreamToBuffer(out, source, keyRestLength);
306 
307       // copy timestamp
308       int prefixTimestamp =
309           (flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH;
310       ByteBufferUtils.copyFromBufferToBuffer(out, out,
311           state.prevTimestampOffset, prefixTimestamp);
312       state.prevTimestampOffset = out.position() - prefixTimestamp;
313       ByteBufferUtils.copyFromStreamToBuffer(out, source,
314           KeyValue.TIMESTAMP_SIZE - prefixTimestamp);
315 
316       // copy the type and value
317       if ((flag & FLAG_SAME_TYPE) != 0) {
318         out.put(state.type);
319         if ((flag & FLAG_SAME_VALUE) != 0) {
320           ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset +
321               KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
322         } else {
323           ByteBufferUtils.copyFromStreamToBuffer(out, source,
324               state.valueLength);
325         }
326       } else {
327         if ((flag & FLAG_SAME_VALUE) != 0) {
328           ByteBufferUtils.copyFromStreamToBuffer(out, source,
329               KeyValue.TYPE_SIZE);
330           ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset +
331               KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
332         } else {
333           ByteBufferUtils.copyFromStreamToBuffer(out, source,
334               state.valueLength + KeyValue.TYPE_SIZE);
335         }
336         state.type = out.get(state.prevTimestampOffset +
337             KeyValue.TIMESTAMP_SIZE);
338       }
339     } else { // this is the first element
340       state.decompressFirstKV(out, source);
341     }
342 
343     state.prevOffset = kvPos;
344   }
345 
346   @Override
347   public void compressKeyValues(DataOutputStream out,
348       ByteBuffer in, boolean includesMemstoreTS) throws IOException {
349     in.rewind();
350     ByteBufferUtils.putInt(out, in.limit());
351     FastDiffCompressionState previousState = new FastDiffCompressionState();
352     FastDiffCompressionState currentState = new FastDiffCompressionState();
353     while (in.hasRemaining()) {
354       compressSingleKeyValue(previousState, currentState,
355           out, in);
356       afterEncodingKeyValue(in, out, includesMemstoreTS);
357 
358       // swap previousState <-> currentState
359       FastDiffCompressionState tmp = previousState;
360       previousState = currentState;
361       currentState = tmp;
362     }
363   }
364 
365   @Override
366   public ByteBuffer uncompressKeyValues(DataInputStream source,
367       int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
368           throws IOException {
369     int decompressedSize = source.readInt();
370     ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
371         allocHeaderLength);
372     buffer.position(allocHeaderLength);
373     FastDiffCompressionState state = new FastDiffCompressionState();
374     while (source.available() > skipLastBytes) {
375       uncompressSingleKeyValue(source, buffer, state);
376       afterDecodingKeyValue(source, buffer, includesMemstoreTS);
377     }
378 
379     if (source.available() != skipLastBytes) {
380       throw new IllegalStateException("Read too much bytes.");
381     }
382 
383     return buffer;
384   }
385 
386   @Override
387   public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
388     block.mark();
389     block.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE);
390     int keyLength = ByteBufferUtils.readCompressedInt(block);
391     ByteBufferUtils.readCompressedInt(block); // valueLength
392     ByteBufferUtils.readCompressedInt(block); // commonLength
393     int pos = block.position();
394     block.reset();
395     return ByteBuffer.wrap(block.array(), block.arrayOffset() + pos, keyLength).slice();
396   }
397 
398   @Override
399   public String toString() {
400     return FastDiffDeltaEncoder.class.getSimpleName();
401   }
402 
403   protected static class FastDiffSeekerState extends SeekerState {
404     private byte[] prevTimestampAndType =
405         new byte[KeyValue.TIMESTAMP_TYPE_SIZE];
406     private int rowLengthWithSize;
407     private int familyLengthWithSize;
408 
409     @Override
410     protected void copyFromNext(SeekerState that) {
411       super.copyFromNext(that);
412       FastDiffSeekerState other = (FastDiffSeekerState) that;
413       System.arraycopy(other.prevTimestampAndType, 0,
414           prevTimestampAndType, 0,
415           KeyValue.TIMESTAMP_TYPE_SIZE);
416       rowLengthWithSize = other.rowLengthWithSize;
417       familyLengthWithSize = other.familyLengthWithSize;
418     }
419   }
420 
421   @Override
422   public EncodedSeeker createSeeker(RawComparator<byte[]> comparator,
423       final boolean includesMemstoreTS) {
424     return new BufferedEncodedSeeker<FastDiffSeekerState>(comparator) {
425       private void decode(boolean isFirst) {
426         byte flag = currentBuffer.get();
427         if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
428           if (!isFirst) {
429             System.arraycopy(current.keyBuffer,
430                 current.keyLength - current.prevTimestampAndType.length,
431                 current.prevTimestampAndType, 0,
432                 current.prevTimestampAndType.length);
433           }
434           current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
435         }
436         if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
437           current.valueLength =
438               ByteBufferUtils.readCompressedInt(currentBuffer);
439         }
440         current.lastCommonPrefix =
441             ByteBufferUtils.readCompressedInt(currentBuffer);
442 
443         current.ensureSpaceForKey();
444 
445         if (isFirst) {
446           // copy everything
447           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
448               current.keyLength - current.prevTimestampAndType.length);
449           current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
450               Bytes.SIZEOF_SHORT;
451           current.familyLengthWithSize =
452               current.keyBuffer[current.rowLengthWithSize] + Bytes.SIZEOF_BYTE;
453         } else if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
454           // length of row is different, copy everything except family
455 
456           // copy the row size
457           int oldRowLengthWithSize = current.rowLengthWithSize;
458           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
459               Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
460           current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
461               Bytes.SIZEOF_SHORT;
462 
463           // move the column family
464           System.arraycopy(current.keyBuffer, oldRowLengthWithSize,
465               current.keyBuffer, current.rowLengthWithSize,
466               current.familyLengthWithSize);
467 
468           // copy the rest of row
469           currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
470               current.rowLengthWithSize - Bytes.SIZEOF_SHORT);
471 
472           // copy the qualifier
473           currentBuffer.get(current.keyBuffer, current.rowLengthWithSize
474               + current.familyLengthWithSize, current.keyLength
475               - current.rowLengthWithSize - current.familyLengthWithSize
476               - current.prevTimestampAndType.length);
477         } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
478           // We have to copy part of row and qualifier, but the column family
479           // is in the right place.
480 
481           // before column family (rest of row)
482           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
483               current.rowLengthWithSize - current.lastCommonPrefix);
484 
485           // after column family (qualifier)
486           currentBuffer.get(current.keyBuffer, current.rowLengthWithSize
487               + current.familyLengthWithSize, current.keyLength
488               - current.rowLengthWithSize - current.familyLengthWithSize
489               - current.prevTimestampAndType.length);
490         } else {
491           // copy just the ending
492           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
493               current.keyLength - current.prevTimestampAndType.length
494                   - current.lastCommonPrefix);
495         }
496 
497         // timestamp
498         int pos = current.keyLength - current.prevTimestampAndType.length;
499         int commonTimestampPrefix = (flag & MASK_TIMESTAMP_LENGTH) >>>
500           SHIFT_TIMESTAMP_LENGTH;
501         if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
502           System.arraycopy(current.prevTimestampAndType, 0, current.keyBuffer,
503               pos, commonTimestampPrefix);
504         }
505         pos += commonTimestampPrefix;
506         currentBuffer.get(current.keyBuffer, pos,
507             Bytes.SIZEOF_LONG - commonTimestampPrefix);
508         pos += Bytes.SIZEOF_LONG - commonTimestampPrefix;
509 
510         // type
511         if ((flag & FLAG_SAME_TYPE) == 0) {
512           currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
513         } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
514           current.keyBuffer[pos] =
515               current.prevTimestampAndType[Bytes.SIZEOF_LONG];
516         }
517 
518         // handle value
519         if ((flag & FLAG_SAME_VALUE) == 0) {
520           current.valueOffset = currentBuffer.position();
521           ByteBufferUtils.skip(currentBuffer, current.valueLength);
522         }
523 
524         if (includesMemstoreTS) {
525           current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
526         } else {
527           current.memstoreTS = 0;
528         }
529         current.nextKvOffset = currentBuffer.position();
530       }
531 
532       @Override
533       protected void decodeFirst() {
534         ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
535         decode(true);
536       }
537 
538       @Override
539       protected void decodeNext() {
540         decode(false);
541       }
542 
543       @Override
544       protected FastDiffSeekerState createSeekerState() {
545         return new FastDiffSeekerState();
546       }
547     };
548   }
549 }