/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.apache.hadoop.classification.InterfaceAudience;
25  import org.apache.hadoop.hbase.KeyValue;
26  import org.apache.hadoop.hbase.KeyValue.KVComparator;
27  import org.apache.hadoop.hbase.util.ByteBufferUtils;
28  import org.apache.hadoop.hbase.util.Bytes;
29  
30  /**
31   * Encoder similar to {@link DiffKeyDeltaEncoder} but supposedly faster.
32   *
33   * Compress using:
34   * - store size of common prefix
35   * - save column family once in the first KeyValue
36   * - use integer compression for key, value and prefix (7-bit encoding)
37   * - use bits to avoid duplication key length, value length
38   *   and type if it same as previous
39   * - store in 3 bits length of prefix timestamp
40   *    with previous KeyValue's timestamp
41   * - one bit which allow to omit value if it is the same
42   *
43   * Format:
44   * - 1 byte:    flag
45   * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
46   * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
47   * - 1-5 bytes: prefix length
48   * - ... bytes: rest of the row (if prefix length is small enough)
49   * - ... bytes: qualifier (or suffix depending on prefix length)
50   * - 1-8 bytes: timestamp suffix
51   * - 1 byte:    type (only if FLAG_SAME_TYPE is not set in the flag)
52   * - ... bytes: value (only if FLAG_SAME_VALUE is not set in the flag)
53   *
54   */
55  @InterfaceAudience.Private
56  public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
57    final int MASK_TIMESTAMP_LENGTH = (1 << 0) | (1 << 1) | (1 << 2);
58    final int SHIFT_TIMESTAMP_LENGTH = 0;
59    final int FLAG_SAME_KEY_LENGTH = 1 << 3;
60    final int FLAG_SAME_VALUE_LENGTH = 1 << 4;
61    final int FLAG_SAME_TYPE = 1 << 5;
62    final int FLAG_SAME_VALUE = 1 << 6;
63  
64    private static class FastDiffCompressionState extends CompressionState {
65      byte[] timestamp = new byte[KeyValue.TIMESTAMP_SIZE];
66      int prevTimestampOffset;
67  
68      @Override
69      protected void readTimestamp(ByteBuffer in) {
70        in.get(timestamp);
71      }
72  
73      @Override
74      void copyFrom(CompressionState state) {
75        super.copyFrom(state);
76        FastDiffCompressionState state2 = (FastDiffCompressionState) state;
77        System.arraycopy(state2.timestamp, 0, timestamp, 0,
78            KeyValue.TIMESTAMP_SIZE);
79        prevTimestampOffset = state2.prevTimestampOffset;
80      }
81  
82      /**
83       * Copies the first key/value from the given stream, and initializes
84       * decompression state based on it. Assumes that we have already read key
85       * and value lengths. Does not set {@link #qualifierLength} (not used by
86       * decompression) or {@link #prevOffset} (set by the calle afterwards).
87       */
88      private void decompressFirstKV(ByteBuffer out, DataInputStream in)
89          throws IOException {
90        int kvPos = out.position();
91        out.putInt(keyLength);
92        out.putInt(valueLength);
93        prevTimestampOffset = out.position() + keyLength -
94            KeyValue.TIMESTAMP_TYPE_SIZE;
95        ByteBufferUtils.copyFromStreamToBuffer(out, in, keyLength + valueLength);
96        rowLength = out.getShort(kvPos + KeyValue.ROW_OFFSET);
97        familyLength = out.get(kvPos + KeyValue.ROW_OFFSET +
98            KeyValue.ROW_LENGTH_SIZE + rowLength);
99        type = out.get(prevTimestampOffset + KeyValue.TIMESTAMP_SIZE);
100     }
101 
102   }
103 
104   private int findCommonTimestampPrefix(byte[] curKvBuf, int curKvTsOff, byte[] preKvBuf,
105       int preKvTsOff) {
106     int commonPrefix = 0;
107     while (commonPrefix < (KeyValue.TIMESTAMP_SIZE - 1)
108         && curKvBuf[curKvTsOff + commonPrefix] == preKvBuf[preKvTsOff + commonPrefix]) {
109       commonPrefix++;
110     }
111     return commonPrefix; // has to be at most 7 bytes
112   }
113 
114   private void uncompressSingleKeyValue(DataInputStream source,
115       ByteBuffer out, FastDiffCompressionState state)
116           throws IOException, EncoderBufferTooSmallException {
117     byte flag = source.readByte();
118     int prevKeyLength = state.keyLength;
119 
120     if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
121       state.keyLength = ByteBufferUtils.readCompressedInt(source);
122     }
123     if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
124       state.valueLength = ByteBufferUtils.readCompressedInt(source);
125     }
126     int commonLength = ByteBufferUtils.readCompressedInt(source);
127 
128     ensureSpace(out, state.keyLength + state.valueLength + KeyValue.ROW_OFFSET);
129 
130     int kvPos = out.position();
131 
132     if (!state.isFirst()) {
133       // copy the prefix
134       int common;
135       int prevOffset;
136 
137       if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
138         out.putInt(state.keyLength);
139         out.putInt(state.valueLength);
140         prevOffset = state.prevOffset + KeyValue.ROW_OFFSET;
141         common = commonLength;
142       } else {
143         if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
144           prevOffset = state.prevOffset;
145           common = commonLength + KeyValue.ROW_OFFSET;
146         } else {
147           out.putInt(state.keyLength);
148           prevOffset = state.prevOffset + KeyValue.KEY_LENGTH_SIZE;
149           common = commonLength + KeyValue.KEY_LENGTH_SIZE;
150         }
151       }
152 
153       ByteBufferUtils.copyFromBufferToBuffer(out, out, prevOffset, common);
154 
155       // copy the rest of the key from the buffer
156       int keyRestLength;
157       if (commonLength < state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
158         // omit the family part of the key, it is always the same
159         int rowWithSizeLength;
160         int rowRestLength;
161 
162         // check length of row
163         if (commonLength < KeyValue.ROW_LENGTH_SIZE) {
164           // not yet copied, do it now
165           ByteBufferUtils.copyFromStreamToBuffer(out, source,
166               KeyValue.ROW_LENGTH_SIZE - commonLength);
167 
168           rowWithSizeLength = out.getShort(out.position() -
169               KeyValue.ROW_LENGTH_SIZE) + KeyValue.ROW_LENGTH_SIZE;
170           rowRestLength = rowWithSizeLength - KeyValue.ROW_LENGTH_SIZE;
171         } else {
172           // already in kvBuffer, just read it
173           rowWithSizeLength = out.getShort(kvPos + KeyValue.ROW_OFFSET) +
174               KeyValue.ROW_LENGTH_SIZE;
175           rowRestLength = rowWithSizeLength - commonLength;
176         }
177 
178         // copy the rest of row
179         ByteBufferUtils.copyFromStreamToBuffer(out, source, rowRestLength);
180 
181         // copy the column family
182         ByteBufferUtils.copyFromBufferToBuffer(out, out,
183             state.prevOffset + KeyValue.ROW_OFFSET + KeyValue.ROW_LENGTH_SIZE
184                 + state.rowLength, state.familyLength
185                 + KeyValue.FAMILY_LENGTH_SIZE);
186         state.rowLength = (short) (rowWithSizeLength -
187             KeyValue.ROW_LENGTH_SIZE);
188 
189         keyRestLength = state.keyLength - rowWithSizeLength -
190             state.familyLength -
191             (KeyValue.FAMILY_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
192       } else {
193         // prevRowWithSizeLength is the same as on previous row
194         keyRestLength = state.keyLength - commonLength -
195             KeyValue.TIMESTAMP_TYPE_SIZE;
196       }
197       // copy the rest of the key, after column family == column qualifier
198       ByteBufferUtils.copyFromStreamToBuffer(out, source, keyRestLength);
199 
200       // copy timestamp
201       int prefixTimestamp =
202           (flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH;
203       ByteBufferUtils.copyFromBufferToBuffer(out, out,
204           state.prevTimestampOffset, prefixTimestamp);
205       state.prevTimestampOffset = out.position() - prefixTimestamp;
206       ByteBufferUtils.copyFromStreamToBuffer(out, source,
207           KeyValue.TIMESTAMP_SIZE - prefixTimestamp);
208 
209       // copy the type and value
210       if ((flag & FLAG_SAME_TYPE) != 0) {
211         out.put(state.type);
212         if ((flag & FLAG_SAME_VALUE) != 0) {
213           ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset +
214               KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
215         } else {
216           ByteBufferUtils.copyFromStreamToBuffer(out, source,
217               state.valueLength);
218         }
219       } else {
220         if ((flag & FLAG_SAME_VALUE) != 0) {
221           ByteBufferUtils.copyFromStreamToBuffer(out, source,
222               KeyValue.TYPE_SIZE);
223           ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset +
224               KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
225         } else {
226           ByteBufferUtils.copyFromStreamToBuffer(out, source,
227               state.valueLength + KeyValue.TYPE_SIZE);
228         }
229         state.type = out.get(state.prevTimestampOffset +
230             KeyValue.TIMESTAMP_SIZE);
231       }
232     } else { // this is the first element
233       state.decompressFirstKV(out, source);
234     }
235 
236     state.prevOffset = kvPos;
237   }
238 
239   @Override
240   public int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingContext,
241       DataOutputStream out) throws IOException {
242     EncodingState state = encodingContext.getEncodingState();
243     int size = compressSingleKeyValue(out, kv, state.prevKv);
244     size += afterEncodingKeyValue(kv, out, encodingContext);
245     state.prevKv = kv;
246     return size;
247   }
248 
249   private int compressSingleKeyValue(DataOutputStream out, KeyValue kv, KeyValue prevKv)
250       throws IOException {
251     byte flag = 0;
252     int kLength = kv.getKeyLength();
253     int vLength = kv.getValueLength();
254     byte[] curKvBuf = kv.getBuffer();
255 
256     if (prevKv == null) {
257       // copy the key, there is no common prefix with none
258       out.write(flag);
259       ByteBufferUtils.putCompressedInt(out, kLength);
260       ByteBufferUtils.putCompressedInt(out, vLength);
261       ByteBufferUtils.putCompressedInt(out, 0);
262       out.write(curKvBuf, kv.getKeyOffset(), kLength + vLength);
263     } else {
264       byte[] preKvBuf = prevKv.getBuffer();
265       int preKeyLength = prevKv.getKeyLength();
266       int preValLength = prevKv.getValueLength();
267       // find a common prefix and skip it
268       int commonPrefix = ByteBufferUtils.findCommonPrefix(curKvBuf, kv.getKeyOffset(), kLength
269           - KeyValue.TIMESTAMP_TYPE_SIZE, preKvBuf, prevKv.getKeyOffset(), preKeyLength
270           - KeyValue.TIMESTAMP_TYPE_SIZE);
271 
272       if (kLength == prevKv.getKeyLength()) {
273         flag |= FLAG_SAME_KEY_LENGTH;
274       }
275       if (vLength == prevKv.getValueLength()) {
276         flag |= FLAG_SAME_VALUE_LENGTH;
277       }
278       if (kv.getTypeByte() == prevKv.getTypeByte()) {
279         flag |= FLAG_SAME_TYPE;
280       }
281 
282       int commonTimestampPrefix = findCommonTimestampPrefix(curKvBuf, kv.getKeyOffset() + kLength
283           - KeyValue.TIMESTAMP_TYPE_SIZE, preKvBuf, prevKv.getKeyOffset() + preKeyLength
284           - KeyValue.TIMESTAMP_TYPE_SIZE);
285 
286       flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH;
287 
288       // Check if current and previous values are the same. Compare value
289       // length first as an optimization.
290       if (vLength == preValLength
291           && Bytes.equals(kv.getValueArray(), kv.getValueOffset(), vLength,
292               prevKv.getValueArray(), prevKv.getValueOffset(), preValLength)) {
293         flag |= FLAG_SAME_VALUE;
294       }
295 
296       out.write(flag);
297       if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
298         ByteBufferUtils.putCompressedInt(out, kLength);
299       }
300       if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
301         ByteBufferUtils.putCompressedInt(out, vLength);
302       }
303       ByteBufferUtils.putCompressedInt(out, commonPrefix);
304 
305       if (commonPrefix < kv.getRowLength() + KeyValue.ROW_LENGTH_SIZE) {
306         // Previous and current rows are different. Copy the differing part of
307         // the row, skip the column family, and copy the qualifier.
308         out.write(curKvBuf, kv.getKeyOffset() + commonPrefix, kv.getRowLength()
309             + KeyValue.ROW_LENGTH_SIZE - commonPrefix);
310         out.write(curKvBuf, kv.getQualifierOffset(), kv.getQualifierLength());
311       } else {
312         // The common part includes the whole row. As the column family is the
313         // same across the whole file, it will automatically be included in the
314         // common prefix, so we need not special-case it here.
315         int restKeyLength = kLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE;
316         out.write(curKvBuf, kv.getKeyOffset() + commonPrefix, restKeyLength);
317       }
318       out.write(curKvBuf, kv.getKeyOffset() + kLength - KeyValue.TIMESTAMP_TYPE_SIZE
319           + commonTimestampPrefix, KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix);
320 
321       // Write the type if it is not the same as before.
322       if ((flag & FLAG_SAME_TYPE) == 0) {
323         out.write(kv.getTypeByte());
324       }
325 
326       // Write the value if it is not the same as before.
327       if ((flag & FLAG_SAME_VALUE) == 0) {
328         out.write(kv.getValueArray(), kv.getValueOffset(), vLength);
329       }
330     }
331     return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
332   }
333 
334   @Override
335   protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
336       int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
337     int decompressedSize = source.readInt();
338     ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
339         allocateHeaderLength);
340     buffer.position(allocateHeaderLength);
341     FastDiffCompressionState state = new FastDiffCompressionState();
342     while (source.available() > skipLastBytes) {
343       uncompressSingleKeyValue(source, buffer, state);
344       afterDecodingKeyValue(source, buffer, decodingCtx);
345     }
346 
347     if (source.available() != skipLastBytes) {
348       throw new IllegalStateException("Read too much bytes.");
349     }
350 
351     return buffer;
352   }
353 
354   @Override
355   public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
356     block.mark();
357     block.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE);
358     int keyLength = ByteBufferUtils.readCompressedInt(block);
359     ByteBufferUtils.readCompressedInt(block); // valueLength
360     ByteBufferUtils.readCompressedInt(block); // commonLength
361     int pos = block.position();
362     block.reset();
363     return ByteBuffer.wrap(block.array(), block.arrayOffset() + pos, keyLength)
364         .slice();
365   }
366 
367   @Override
368   public String toString() {
369     return FastDiffDeltaEncoder.class.getSimpleName();
370   }
371 
372   protected static class FastDiffSeekerState extends SeekerState {
373     private byte[] prevTimestampAndType =
374         new byte[KeyValue.TIMESTAMP_TYPE_SIZE];
375     private int rowLengthWithSize;
376     private int familyLengthWithSize;
377 
378     @Override
379     protected void copyFromNext(SeekerState that) {
380       super.copyFromNext(that);
381       FastDiffSeekerState other = (FastDiffSeekerState) that;
382       System.arraycopy(other.prevTimestampAndType, 0,
383           prevTimestampAndType, 0,
384           KeyValue.TIMESTAMP_TYPE_SIZE);
385       rowLengthWithSize = other.rowLengthWithSize;
386       familyLengthWithSize = other.familyLengthWithSize;
387     }
388   }
389 
390   @Override
391   public EncodedSeeker createSeeker(KVComparator comparator,
392       final HFileBlockDecodingContext decodingCtx) {
393     return new BufferedEncodedSeeker<FastDiffSeekerState>(comparator, decodingCtx) {
394       private void decode(boolean isFirst) {
395         byte flag = currentBuffer.get();
396         if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
397           if (!isFirst) {
398             System.arraycopy(current.keyBuffer,
399                 current.keyLength - current.prevTimestampAndType.length,
400                 current.prevTimestampAndType, 0,
401                 current.prevTimestampAndType.length);
402           }
403           current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
404         }
405         if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
406           current.valueLength =
407               ByteBufferUtils.readCompressedInt(currentBuffer);
408         }
409         current.lastCommonPrefix =
410             ByteBufferUtils.readCompressedInt(currentBuffer);
411 
412         current.ensureSpaceForKey();
413 
414         if (isFirst) {
415           // copy everything
416           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
417               current.keyLength - current.prevTimestampAndType.length);
418           current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
419               Bytes.SIZEOF_SHORT;
420           current.familyLengthWithSize =
421               current.keyBuffer[current.rowLengthWithSize] + Bytes.SIZEOF_BYTE;
422         } else if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
423           // length of row is different, copy everything except family
424 
425           // copy the row size
426           int oldRowLengthWithSize = current.rowLengthWithSize;
427           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
428               Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
429           current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
430               Bytes.SIZEOF_SHORT;
431 
432           // move the column family
433           System.arraycopy(current.keyBuffer, oldRowLengthWithSize,
434               current.keyBuffer, current.rowLengthWithSize,
435               current.familyLengthWithSize);
436 
437           // copy the rest of row
438           currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
439               current.rowLengthWithSize - Bytes.SIZEOF_SHORT);
440 
441           // copy the qualifier
442           currentBuffer.get(current.keyBuffer, current.rowLengthWithSize
443               + current.familyLengthWithSize, current.keyLength
444               - current.rowLengthWithSize - current.familyLengthWithSize
445               - current.prevTimestampAndType.length);
446         } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
447           // We have to copy part of row and qualifier, but the column family
448           // is in the right place.
449 
450           // before column family (rest of row)
451           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
452               current.rowLengthWithSize - current.lastCommonPrefix);
453 
454           // after column family (qualifier)
455           currentBuffer.get(current.keyBuffer, current.rowLengthWithSize
456               + current.familyLengthWithSize, current.keyLength
457               - current.rowLengthWithSize - current.familyLengthWithSize
458               - current.prevTimestampAndType.length);
459         } else {
460           // copy just the ending
461           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
462               current.keyLength - current.prevTimestampAndType.length
463                   - current.lastCommonPrefix);
464         }
465 
466         // timestamp
467         int pos = current.keyLength - current.prevTimestampAndType.length;
468         int commonTimestampPrefix = (flag & MASK_TIMESTAMP_LENGTH) >>>
469           SHIFT_TIMESTAMP_LENGTH;
470         if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
471           System.arraycopy(current.prevTimestampAndType, 0, current.keyBuffer,
472               pos, commonTimestampPrefix);
473         }
474         pos += commonTimestampPrefix;
475         currentBuffer.get(current.keyBuffer, pos,
476             Bytes.SIZEOF_LONG - commonTimestampPrefix);
477         pos += Bytes.SIZEOF_LONG - commonTimestampPrefix;
478 
479         // type
480         if ((flag & FLAG_SAME_TYPE) == 0) {
481           currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
482         } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
483           current.keyBuffer[pos] =
484               current.prevTimestampAndType[Bytes.SIZEOF_LONG];
485         }
486 
487         // handle value
488         if ((flag & FLAG_SAME_VALUE) == 0) {
489           current.valueOffset = currentBuffer.position();
490           ByteBufferUtils.skip(currentBuffer, current.valueLength);
491         }
492 
493         if (includesTags()) {
494           decodeTags();
495         }
496         if (includesMvcc()) {
497           current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
498         } else {
499           current.memstoreTS = 0;
500         }
501         current.nextKvOffset = currentBuffer.position();
502       }
503 
504       @Override
505       protected void decodeFirst() {
506         ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
507         decode(true);
508       }
509 
510       @Override
511       protected void decodeNext() {
512         decode(false);
513       }
514 
515       @Override
516       protected FastDiffSeekerState createSeekerState() {
517         return new FastDiffSeekerState();
518       }
519     };
520   }
521 }