View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.io.OutputStream;
23  import java.nio.ByteBuffer;
24  
25  import org.apache.hadoop.classification.InterfaceAudience;
26  import org.apache.hadoop.hbase.KeyValue;
27  import org.apache.hadoop.hbase.KeyValue.KVComparator;
28  import org.apache.hadoop.hbase.util.ByteBufferUtils;
29  import org.apache.hadoop.hbase.util.Bytes;
30  
31  /**
32   * Encoder similar to {@link DiffKeyDeltaEncoder} but supposedly faster.
33   *
34   * Compress using:
35   * - store size of common prefix
36   * - save column family once in the first KeyValue
37   * - use integer compression for key, value and prefix (7-bit encoding)
38   * - use bits to avoid duplication key length, value length
39   *   and type if it same as previous
40   * - store in 3 bits length of prefix timestamp
41   *    with previous KeyValue's timestamp
42   * - one bit which allow to omit value if it is the same
43   *
44   * Format:
45   * - 1 byte:    flag
46   * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
47   * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
48   * - 1-5 bytes: prefix length
49   * - ... bytes: rest of the row (if prefix length is small enough)
50   * - ... bytes: qualifier (or suffix depending on prefix length)
51   * - 1-8 bytes: timestamp suffix
52   * - 1 byte:    type (only if FLAG_SAME_TYPE is not set in the flag)
53   * - ... bytes: value (only if FLAG_SAME_VALUE is not set in the flag)
54   *
55   */
56  @InterfaceAudience.Private
57  public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
58    final int MASK_TIMESTAMP_LENGTH = (1 << 0) | (1 << 1) | (1 << 2);
59    final int SHIFT_TIMESTAMP_LENGTH = 0;
60    final int FLAG_SAME_KEY_LENGTH = 1 << 3;
61    final int FLAG_SAME_VALUE_LENGTH = 1 << 4;
62    final int FLAG_SAME_TYPE = 1 << 5;
63    final int FLAG_SAME_VALUE = 1 << 6;
64  
65    private static class FastDiffCompressionState extends CompressionState {
66      byte[] timestamp = new byte[KeyValue.TIMESTAMP_SIZE];
67      int prevTimestampOffset;
68  
69      @Override
70      protected void readTimestamp(ByteBuffer in) {
71        in.get(timestamp);
72      }
73  
74      @Override
75      void copyFrom(CompressionState state) {
76        super.copyFrom(state);
77        FastDiffCompressionState state2 = (FastDiffCompressionState) state;
78        System.arraycopy(state2.timestamp, 0, timestamp, 0,
79            KeyValue.TIMESTAMP_SIZE);
80        prevTimestampOffset = state2.prevTimestampOffset;
81      }
82  
83      /**
84       * Copies the first key/value from the given stream, and initializes
85       * decompression state based on it. Assumes that we have already read key
86       * and value lengths. Does not set {@link #qualifierLength} (not used by
87       * decompression) or {@link #prevOffset} (set by the calle afterwards).
88       */
89      private void decompressFirstKV(ByteBuffer out, DataInputStream in)
90          throws IOException {
91        int kvPos = out.position();
92        out.putInt(keyLength);
93        out.putInt(valueLength);
94        prevTimestampOffset = out.position() + keyLength -
95            KeyValue.TIMESTAMP_TYPE_SIZE;
96        ByteBufferUtils.copyFromStreamToBuffer(out, in, keyLength + valueLength);
97        rowLength = out.getShort(kvPos + KeyValue.ROW_OFFSET);
98        familyLength = out.get(kvPos + KeyValue.ROW_OFFSET +
99            KeyValue.ROW_LENGTH_SIZE + rowLength);
100       type = out.get(prevTimestampOffset + KeyValue.TIMESTAMP_SIZE);
101     }
102 
103   }
104 
105   private void compressSingleKeyValue(
106         FastDiffCompressionState previousState,
107         FastDiffCompressionState currentState,
108         OutputStream out, ByteBuffer in) throws IOException {
109     currentState.prevOffset = in.position();
110     int keyLength = in.getInt();
111     int valueOffset =
112         currentState.prevOffset + keyLength + KeyValue.ROW_OFFSET;
113     int valueLength = in.getInt();
114     byte flag = 0;
115 
116     if (previousState.isFirst()) {
117       // copy the key, there is no common prefix with none
118       out.write(flag);
119       ByteBufferUtils.putCompressedInt(out, keyLength);
120       ByteBufferUtils.putCompressedInt(out, valueLength);
121       ByteBufferUtils.putCompressedInt(out, 0);
122 
123       currentState.readKey(in, keyLength, valueLength);
124 
125       ByteBufferUtils.moveBufferToStream(out, in, keyLength + valueLength);
126     } else {
127       // find a common prefix and skip it
128       int commonPrefix = ByteBufferUtils.findCommonPrefix(in, in.position(),
129           previousState.prevOffset + KeyValue.ROW_OFFSET,
130           Math.min(keyLength, previousState.keyLength) -
131           KeyValue.TIMESTAMP_TYPE_SIZE);
132 
133       currentState.readKey(in, keyLength, valueLength,
134           commonPrefix, previousState);
135 
136       if (keyLength == previousState.keyLength) {
137         flag |= FLAG_SAME_KEY_LENGTH;
138       }
139       if (valueLength == previousState.valueLength) {
140         flag |= FLAG_SAME_VALUE_LENGTH;
141       }
142       if (currentState.type == previousState.type) {
143         flag |= FLAG_SAME_TYPE;
144       }
145 
146       int commonTimestampPrefix = findCommonTimestampPrefix(
147           currentState, previousState);
148       flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH;
149 
150       // Check if current and previous values are the same. Compare value
151       // length first as an optimization.
152       if (valueLength == previousState.valueLength) {
153         int previousValueOffset = previousState.prevOffset
154             + previousState.keyLength + KeyValue.ROW_OFFSET;
155         if (ByteBufferUtils.arePartsEqual(in,
156                 previousValueOffset, previousState.valueLength,
157                 valueOffset, valueLength)) {
158           flag |= FLAG_SAME_VALUE;
159         }
160       }
161 
162       out.write(flag);
163       if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
164         ByteBufferUtils.putCompressedInt(out, keyLength);
165       }
166       if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
167         ByteBufferUtils.putCompressedInt(out, valueLength);
168       }
169       ByteBufferUtils.putCompressedInt(out, commonPrefix);
170 
171       ByteBufferUtils.skip(in, commonPrefix);
172       if (commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
173         // Previous and current rows are different. Copy the differing part of
174         // the row, skip the column family, and copy the qualifier.
175         ByteBufferUtils.moveBufferToStream(out, in,
176             currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix);
177         ByteBufferUtils.skip(in, currentState.familyLength +
178             KeyValue.FAMILY_LENGTH_SIZE);
179         ByteBufferUtils.moveBufferToStream(out, in,
180             currentState.qualifierLength);
181       } else {
182         // The common part includes the whole row. As the column family is the
183         // same across the whole file, it will automatically be included in the
184         // common prefix, so we need not special-case it here.
185         int restKeyLength = keyLength - commonPrefix -
186             KeyValue.TIMESTAMP_TYPE_SIZE;
187         ByteBufferUtils.moveBufferToStream(out, in, restKeyLength);
188       }
189       ByteBufferUtils.skip(in, commonTimestampPrefix);
190       ByteBufferUtils.moveBufferToStream(out, in,
191           KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix);
192 
193       // Write the type if it is not the same as before.
194       if ((flag & FLAG_SAME_TYPE) == 0) {
195         out.write(currentState.type);
196       }
197 
198       // Write the value if it is not the same as before.
199       if ((flag & FLAG_SAME_VALUE) == 0) {
200         ByteBufferUtils.copyBufferToStream(out, in, valueOffset, valueLength);
201       }
202 
203       // Skip key type and value in the input buffer.
204       ByteBufferUtils.skip(in, KeyValue.TYPE_SIZE + currentState.valueLength);
205     }
206   }
207 
208   private int findCommonTimestampPrefix(FastDiffCompressionState left,
209       FastDiffCompressionState right) {
210     int prefixTimestamp = 0;
211     while (prefixTimestamp < (KeyValue.TIMESTAMP_SIZE - 1) &&
212         left.timestamp[prefixTimestamp]
213             == right.timestamp[prefixTimestamp]) {
214       prefixTimestamp++;
215     }
216     return prefixTimestamp; // has to be at most 7 bytes
217   }
218 
219   private void uncompressSingleKeyValue(DataInputStream source,
220       ByteBuffer out, FastDiffCompressionState state)
221           throws IOException, EncoderBufferTooSmallException {
222     byte flag = source.readByte();
223     int prevKeyLength = state.keyLength;
224 
225     if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
226       state.keyLength = ByteBufferUtils.readCompressedInt(source);
227     }
228     if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
229       state.valueLength = ByteBufferUtils.readCompressedInt(source);
230     }
231     int commonLength = ByteBufferUtils.readCompressedInt(source);
232 
233     ensureSpace(out, state.keyLength + state.valueLength + KeyValue.ROW_OFFSET);
234 
235     int kvPos = out.position();
236 
237     if (!state.isFirst()) {
238       // copy the prefix
239       int common;
240       int prevOffset;
241 
242       if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
243         out.putInt(state.keyLength);
244         out.putInt(state.valueLength);
245         prevOffset = state.prevOffset + KeyValue.ROW_OFFSET;
246         common = commonLength;
247       } else {
248         if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
249           prevOffset = state.prevOffset;
250           common = commonLength + KeyValue.ROW_OFFSET;
251         } else {
252           out.putInt(state.keyLength);
253           prevOffset = state.prevOffset + KeyValue.KEY_LENGTH_SIZE;
254           common = commonLength + KeyValue.KEY_LENGTH_SIZE;
255         }
256       }
257 
258       ByteBufferUtils.copyFromBufferToBuffer(out, out, prevOffset, common);
259 
260       // copy the rest of the key from the buffer
261       int keyRestLength;
262       if (commonLength < state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
263         // omit the family part of the key, it is always the same
264         int rowWithSizeLength;
265         int rowRestLength;
266 
267         // check length of row
268         if (commonLength < KeyValue.ROW_LENGTH_SIZE) {
269           // not yet copied, do it now
270           ByteBufferUtils.copyFromStreamToBuffer(out, source,
271               KeyValue.ROW_LENGTH_SIZE - commonLength);
272 
273           rowWithSizeLength = out.getShort(out.position() -
274               KeyValue.ROW_LENGTH_SIZE) + KeyValue.ROW_LENGTH_SIZE;
275           rowRestLength = rowWithSizeLength - KeyValue.ROW_LENGTH_SIZE;
276         } else {
277           // already in kvBuffer, just read it
278           rowWithSizeLength = out.getShort(kvPos + KeyValue.ROW_OFFSET) +
279               KeyValue.ROW_LENGTH_SIZE;
280           rowRestLength = rowWithSizeLength - commonLength;
281         }
282 
283         // copy the rest of row
284         ByteBufferUtils.copyFromStreamToBuffer(out, source, rowRestLength);
285 
286         // copy the column family
287         ByteBufferUtils.copyFromBufferToBuffer(out, out,
288             state.prevOffset + KeyValue.ROW_OFFSET + KeyValue.ROW_LENGTH_SIZE
289                 + state.rowLength, state.familyLength
290                 + KeyValue.FAMILY_LENGTH_SIZE);
291         state.rowLength = (short) (rowWithSizeLength -
292             KeyValue.ROW_LENGTH_SIZE);
293 
294         keyRestLength = state.keyLength - rowWithSizeLength -
295             state.familyLength -
296             (KeyValue.FAMILY_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
297       } else {
298         // prevRowWithSizeLength is the same as on previous row
299         keyRestLength = state.keyLength - commonLength -
300             KeyValue.TIMESTAMP_TYPE_SIZE;
301       }
302       // copy the rest of the key, after column family == column qualifier
303       ByteBufferUtils.copyFromStreamToBuffer(out, source, keyRestLength);
304 
305       // copy timestamp
306       int prefixTimestamp =
307           (flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH;
308       ByteBufferUtils.copyFromBufferToBuffer(out, out,
309           state.prevTimestampOffset, prefixTimestamp);
310       state.prevTimestampOffset = out.position() - prefixTimestamp;
311       ByteBufferUtils.copyFromStreamToBuffer(out, source,
312           KeyValue.TIMESTAMP_SIZE - prefixTimestamp);
313 
314       // copy the type and value
315       if ((flag & FLAG_SAME_TYPE) != 0) {
316         out.put(state.type);
317         if ((flag & FLAG_SAME_VALUE) != 0) {
318           ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset +
319               KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
320         } else {
321           ByteBufferUtils.copyFromStreamToBuffer(out, source,
322               state.valueLength);
323         }
324       } else {
325         if ((flag & FLAG_SAME_VALUE) != 0) {
326           ByteBufferUtils.copyFromStreamToBuffer(out, source,
327               KeyValue.TYPE_SIZE);
328           ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset +
329               KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
330         } else {
331           ByteBufferUtils.copyFromStreamToBuffer(out, source,
332               state.valueLength + KeyValue.TYPE_SIZE);
333         }
334         state.type = out.get(state.prevTimestampOffset +
335             KeyValue.TIMESTAMP_SIZE);
336       }
337     } else { // this is the first element
338       state.decompressFirstKV(out, source);
339     }
340 
341     state.prevOffset = kvPos;
342   }
343 
344   @Override
345   public void internalEncodeKeyValues(DataOutputStream out, ByteBuffer in,
346       HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
347     in.rewind();
348     ByteBufferUtils.putInt(out, in.limit());
349     FastDiffCompressionState previousState = new FastDiffCompressionState();
350     FastDiffCompressionState currentState = new FastDiffCompressionState();
351     while (in.hasRemaining()) {
352       compressSingleKeyValue(previousState, currentState,
353           out, in);
354       afterEncodingKeyValue(in, out, encodingCtx);
355 
356       // swap previousState <-> currentState
357       FastDiffCompressionState tmp = previousState;
358       previousState = currentState;
359       currentState = tmp;
360     }
361   }
362 
363   @Override
364   protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
365       int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
366     int decompressedSize = source.readInt();
367     ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
368         allocateHeaderLength);
369     buffer.position(allocateHeaderLength);
370     FastDiffCompressionState state = new FastDiffCompressionState();
371     while (source.available() > skipLastBytes) {
372       uncompressSingleKeyValue(source, buffer, state);
373       afterDecodingKeyValue(source, buffer, decodingCtx);
374     }
375 
376     if (source.available() != skipLastBytes) {
377       throw new IllegalStateException("Read too much bytes.");
378     }
379 
380     return buffer;
381   }
382 
383   @Override
384   public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
385     block.mark();
386     block.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE);
387     int keyLength = ByteBufferUtils.readCompressedInt(block);
388     ByteBufferUtils.readCompressedInt(block); // valueLength
389     ByteBufferUtils.readCompressedInt(block); // commonLength
390     int pos = block.position();
391     block.reset();
392     return ByteBuffer.wrap(block.array(), pos, keyLength).slice();
393   }
394 
395   @Override
396   public String toString() {
397     return FastDiffDeltaEncoder.class.getSimpleName();
398   }
399 
400   protected static class FastDiffSeekerState extends SeekerState {
401     private byte[] prevTimestampAndType =
402         new byte[KeyValue.TIMESTAMP_TYPE_SIZE];
403     private int rowLengthWithSize;
404     private int familyLengthWithSize;
405 
406     @Override
407     protected void copyFromNext(SeekerState that) {
408       super.copyFromNext(that);
409       FastDiffSeekerState other = (FastDiffSeekerState) that;
410       System.arraycopy(other.prevTimestampAndType, 0,
411           prevTimestampAndType, 0,
412           KeyValue.TIMESTAMP_TYPE_SIZE);
413       rowLengthWithSize = other.rowLengthWithSize;
414       familyLengthWithSize = other.familyLengthWithSize;
415     }
416   }
417 
418   @Override
419   public EncodedSeeker createSeeker(KVComparator comparator,
420       final HFileBlockDecodingContext decodingCtx) {
421     return new BufferedEncodedSeeker<FastDiffSeekerState>(comparator, decodingCtx) {
422       private void decode(boolean isFirst) {
423         byte flag = currentBuffer.get();
424         if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
425           if (!isFirst) {
426             System.arraycopy(current.keyBuffer,
427                 current.keyLength - current.prevTimestampAndType.length,
428                 current.prevTimestampAndType, 0,
429                 current.prevTimestampAndType.length);
430           }
431           current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
432         }
433         if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
434           current.valueLength =
435               ByteBufferUtils.readCompressedInt(currentBuffer);
436         }
437         current.lastCommonPrefix =
438             ByteBufferUtils.readCompressedInt(currentBuffer);
439 
440         current.ensureSpaceForKey();
441 
442         if (isFirst) {
443           // copy everything
444           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
445               current.keyLength - current.prevTimestampAndType.length);
446           current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
447               Bytes.SIZEOF_SHORT;
448           current.familyLengthWithSize =
449               current.keyBuffer[current.rowLengthWithSize] + Bytes.SIZEOF_BYTE;
450         } else if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
451           // length of row is different, copy everything except family
452 
453           // copy the row size
454           int oldRowLengthWithSize = current.rowLengthWithSize;
455           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
456               Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
457           current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
458               Bytes.SIZEOF_SHORT;
459 
460           // move the column family
461           System.arraycopy(current.keyBuffer, oldRowLengthWithSize,
462               current.keyBuffer, current.rowLengthWithSize,
463               current.familyLengthWithSize);
464 
465           // copy the rest of row
466           currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
467               current.rowLengthWithSize - Bytes.SIZEOF_SHORT);
468 
469           // copy the qualifier
470           currentBuffer.get(current.keyBuffer, current.rowLengthWithSize
471               + current.familyLengthWithSize, current.keyLength
472               - current.rowLengthWithSize - current.familyLengthWithSize
473               - current.prevTimestampAndType.length);
474         } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
475           // We have to copy part of row and qualifier, but the column family
476           // is in the right place.
477 
478           // before column family (rest of row)
479           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
480               current.rowLengthWithSize - current.lastCommonPrefix);
481 
482           // after column family (qualifier)
483           currentBuffer.get(current.keyBuffer, current.rowLengthWithSize
484               + current.familyLengthWithSize, current.keyLength
485               - current.rowLengthWithSize - current.familyLengthWithSize
486               - current.prevTimestampAndType.length);
487         } else {
488           // copy just the ending
489           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
490               current.keyLength - current.prevTimestampAndType.length
491                   - current.lastCommonPrefix);
492         }
493 
494         // timestamp
495         int pos = current.keyLength - current.prevTimestampAndType.length;
496         int commonTimestampPrefix = (flag & MASK_TIMESTAMP_LENGTH) >>>
497           SHIFT_TIMESTAMP_LENGTH;
498         if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
499           System.arraycopy(current.prevTimestampAndType, 0, current.keyBuffer,
500               pos, commonTimestampPrefix);
501         }
502         pos += commonTimestampPrefix;
503         currentBuffer.get(current.keyBuffer, pos,
504             Bytes.SIZEOF_LONG - commonTimestampPrefix);
505         pos += Bytes.SIZEOF_LONG - commonTimestampPrefix;
506 
507         // type
508         if ((flag & FLAG_SAME_TYPE) == 0) {
509           currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
510         } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
511           current.keyBuffer[pos] =
512               current.prevTimestampAndType[Bytes.SIZEOF_LONG];
513         }
514 
515         // handle value
516         if ((flag & FLAG_SAME_VALUE) == 0) {
517           current.valueOffset = currentBuffer.position();
518           ByteBufferUtils.skip(currentBuffer, current.valueLength);
519         }
520 
521         if (includesTags()) {
522           decodeTags();
523         }
524         if (includesMvcc()) {
525           current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
526         } else {
527           current.memstoreTS = 0;
528         }
529         current.nextKvOffset = currentBuffer.position();
530       }
531 
532       @Override
533       protected void decodeFirst() {
534         ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
535         decode(true);
536       }
537 
538       @Override
539       protected void decodeNext() {
540         decode(false);
541       }
542 
543       @Override
544       protected FastDiffSeekerState createSeekerState() {
545         return new FastDiffSeekerState();
546       }
547     };
548   }
549 }