View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.apache.hadoop.classification.InterfaceAudience;
25  import org.apache.hadoop.hbase.KeyValue;
26  import org.apache.hadoop.hbase.KeyValue.KVComparator;
27  import org.apache.hadoop.hbase.util.ByteBufferUtils;
28  import org.apache.hadoop.hbase.util.Bytes;
29  
30  /**
31   * Compress using:
32   * - store size of common prefix
33   * - save column family once, it is same within HFile
34   * - use integer compression for key, value and prefix (7-bit encoding)
 * - use flag bits to avoid duplicating key length, value length
 *   and type if they are the same as in the previous key-value
37   * - store in 3 bits length of timestamp field
38   * - allow diff in timestamp instead of actual value
39   *
40   * Format:
41   * - 1 byte:    flag
42   * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
43   * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
44   * - 1-5 bytes: prefix length
45   * - ... bytes: rest of the row (if prefix length is small enough)
46   * - ... bytes: qualifier (or suffix depending on prefix length)
47   * - 1-8 bytes: timestamp or diff
48   * - 1 byte:    type (only if FLAG_SAME_TYPE is not set in the flag)
49   * - ... bytes: value
50   */
51  @InterfaceAudience.Private
52  public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
53    static final int FLAG_SAME_KEY_LENGTH = 1;
54    static final int FLAG_SAME_VALUE_LENGTH = 1 << 1;
55    static final int FLAG_SAME_TYPE = 1 << 2;
56    static final int FLAG_TIMESTAMP_IS_DIFF = 1 << 3;
57    static final int MASK_TIMESTAMP_LENGTH = (1 << 4) | (1 << 5) | (1 << 6);
58    static final int SHIFT_TIMESTAMP_LENGTH = 4;
59    static final int FLAG_TIMESTAMP_SIGN = 1 << 7;
60  
61    protected static class DiffCompressionState extends CompressionState {
62      long timestamp;
63      byte[] familyNameWithSize;
64  
65      @Override
66      protected void readTimestamp(ByteBuffer in) {
67        timestamp = in.getLong();
68      }
69  
70      @Override
71      void copyFrom(CompressionState state) {
72        super.copyFrom(state);
73        DiffCompressionState state2 = (DiffCompressionState) state;
74        timestamp = state2.timestamp;
75      }
76    }
77  
78    private void compressSingleKeyValue(DiffCompressionState previousState,
79        DiffCompressionState currentState, DataOutputStream out,
80        ByteBuffer in) throws IOException {
81      byte flag = 0;
82      int kvPos = in.position();
83      int keyLength = in.getInt();
84      int valueLength = in.getInt();
85  
86      long timestamp;
87      long diffTimestamp = 0;
88      int diffTimestampFitsInBytes = 0;
89  
90      int commonPrefix;
91  
92      int timestampFitsInBytes;
93  
94      if (previousState.isFirst()) {
95        currentState.readKey(in, keyLength, valueLength);
96        currentState.prevOffset = kvPos;
97        timestamp = currentState.timestamp;
98        if (timestamp < 0) {
99          flag |= FLAG_TIMESTAMP_SIGN;
100         timestamp = -timestamp;
101       }
102       timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
103 
104       flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
105       commonPrefix = 0;
106 
107       // put column family
108       in.mark();
109       ByteBufferUtils.skip(in, currentState.rowLength
110           + KeyValue.ROW_LENGTH_SIZE);
111       ByteBufferUtils.moveBufferToStream(out, in, currentState.familyLength
112           + KeyValue.FAMILY_LENGTH_SIZE);
113       in.reset();
114     } else {
115       // find a common prefix and skip it
116       commonPrefix =
117           ByteBufferUtils.findCommonPrefix(in, in.position(),
118               previousState.prevOffset + KeyValue.ROW_OFFSET, keyLength
119                   - KeyValue.TIMESTAMP_TYPE_SIZE);
120       // don't compress timestamp and type using prefix
121 
122       currentState.readKey(in, keyLength, valueLength,
123           commonPrefix, previousState);
124       currentState.prevOffset = kvPos;
125       timestamp = currentState.timestamp;
126       boolean negativeTimestamp = timestamp < 0;
127       if (negativeTimestamp) {
128         timestamp = -timestamp;
129       }
130       timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
131 
132       if (keyLength == previousState.keyLength) {
133         flag |= FLAG_SAME_KEY_LENGTH;
134       }
135       if (valueLength == previousState.valueLength) {
136         flag |= FLAG_SAME_VALUE_LENGTH;
137       }
138       if (currentState.type == previousState.type) {
139         flag |= FLAG_SAME_TYPE;
140       }
141 
142       // encode timestamp
143       diffTimestamp = previousState.timestamp - currentState.timestamp;
144       boolean minusDiffTimestamp = diffTimestamp < 0;
145       if (minusDiffTimestamp) {
146         diffTimestamp = -diffTimestamp;
147       }
148       diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
149       if (diffTimestampFitsInBytes < timestampFitsInBytes) {
150         flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
151         flag |= FLAG_TIMESTAMP_IS_DIFF;
152         if (minusDiffTimestamp) {
153           flag |= FLAG_TIMESTAMP_SIGN;
154         }
155       } else {
156         flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
157         if (negativeTimestamp) {
158           flag |= FLAG_TIMESTAMP_SIGN;
159         }
160       }
161     }
162 
163     out.write(flag);
164 
165     if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
166       ByteBufferUtils.putCompressedInt(out, keyLength);
167     }
168     if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
169       ByteBufferUtils.putCompressedInt(out, valueLength);
170     }
171 
172     ByteBufferUtils.putCompressedInt(out, commonPrefix);
173     ByteBufferUtils.skip(in, commonPrefix);
174 
175     if (previousState.isFirst() ||
176         commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
177       int restRowLength =
178           currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
179       ByteBufferUtils.moveBufferToStream(out, in, restRowLength);
180       ByteBufferUtils.skip(in, currentState.familyLength +
181           KeyValue.FAMILY_LENGTH_SIZE);
182       ByteBufferUtils.moveBufferToStream(out, in, currentState.qualifierLength);
183     } else {
184       ByteBufferUtils.moveBufferToStream(out, in,
185           keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE);
186     }
187 
188     if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
189       ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
190     } else {
191       ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
192     }
193 
194     if ((flag & FLAG_SAME_TYPE) == 0) {
195       out.write(currentState.type);
196     }
197     ByteBufferUtils.skip(in, KeyValue.TIMESTAMP_TYPE_SIZE);
198 
199     ByteBufferUtils.moveBufferToStream(out, in, valueLength);
200   }
201 
202   private void uncompressSingleKeyValue(DataInputStream source,
203       ByteBuffer buffer,
204       DiffCompressionState state)
205           throws IOException, EncoderBufferTooSmallException {
206     // read the column family at the beginning
207     if (state.isFirst()) {
208       state.familyLength = source.readByte();
209       state.familyNameWithSize =
210           new byte[(state.familyLength & 0xff) + KeyValue.FAMILY_LENGTH_SIZE];
211       state.familyNameWithSize[0] = state.familyLength;
212       int read = source.read(state.familyNameWithSize, KeyValue.FAMILY_LENGTH_SIZE,
213           state.familyLength);
214       assert read == state.familyLength;
215     }
216 
217     // read flag
218     byte flag = source.readByte();
219 
220     // read key/value/common lengths
221     int keyLength;
222     int valueLength;
223     if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
224       keyLength = state.keyLength;
225     } else {
226       keyLength = ByteBufferUtils.readCompressedInt(source);
227     }
228     if ((flag & FLAG_SAME_VALUE_LENGTH) != 0) {
229       valueLength = state.valueLength;
230     } else {
231       valueLength = ByteBufferUtils.readCompressedInt(source);
232     }
233     int commonPrefix = ByteBufferUtils.readCompressedInt(source);
234 
235     // create KeyValue buffer and fill it prefix
236     int keyOffset = buffer.position();
237     ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);
238     buffer.putInt(keyLength);
239     buffer.putInt(valueLength);
240 
241     // copy common from previous key
242     if (commonPrefix > 0) {
243       ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, state.prevOffset
244           + KeyValue.ROW_OFFSET, commonPrefix);
245     }
246 
247     // copy the rest of the key from the buffer
248     int keyRestLength;
249     if (state.isFirst() || commonPrefix <
250         state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
251       // omit the family part of the key, it is always the same
252       short rowLength;
253       int rowRestLength;
254 
255       // check length of row
256       if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) {
257         // not yet copied, do it now
258         ByteBufferUtils.copyFromStreamToBuffer(buffer, source,
259             KeyValue.ROW_LENGTH_SIZE - commonPrefix);
260         ByteBufferUtils.skip(buffer, -KeyValue.ROW_LENGTH_SIZE);
261         rowLength = buffer.getShort();
262         rowRestLength = rowLength;
263       } else {
264         // already in buffer, just read it
265         rowLength = buffer.getShort(keyOffset + KeyValue.ROW_OFFSET);
266         rowRestLength = rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
267       }
268 
269       // copy the rest of row
270       ByteBufferUtils.copyFromStreamToBuffer(buffer, source, rowRestLength);
271       state.rowLength = rowLength;
272 
273       // copy the column family
274       buffer.put(state.familyNameWithSize);
275 
276       keyRestLength = keyLength - rowLength -
277           state.familyNameWithSize.length -
278           (KeyValue.ROW_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
279     } else {
280       // prevRowWithSizeLength is the same as on previous row
281       keyRestLength = keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE;
282     }
283     // copy the rest of the key, after column family -> column qualifier
284     ByteBufferUtils.copyFromStreamToBuffer(buffer, source, keyRestLength);
285 
286     // handle timestamp
287     int timestampFitsInBytes =
288         ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
289     long timestamp = ByteBufferUtils.readLong(source, timestampFitsInBytes);
290     if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
291       timestamp = -timestamp;
292     }
293     if ((flag & FLAG_TIMESTAMP_IS_DIFF) != 0) {
294       timestamp = state.timestamp - timestamp;
295     }
296     buffer.putLong(timestamp);
297 
298     // copy the type field
299     byte type;
300     if ((flag & FLAG_SAME_TYPE) != 0) {
301       type = state.type;
302     } else {
303       type = source.readByte();
304     }
305     buffer.put(type);
306 
307     // copy value part
308     ByteBufferUtils.copyFromStreamToBuffer(buffer, source, valueLength);
309 
310     state.keyLength = keyLength;
311     state.valueLength = valueLength;
312     state.prevOffset = keyOffset;
313     state.timestamp = timestamp;
314     state.type = type;
315     // state.qualifier is unused
316   }
317 
318   @Override
319   public void internalEncodeKeyValues(DataOutputStream out,
320       ByteBuffer in, HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
321     in.rewind();
322     ByteBufferUtils.putInt(out, in.limit());
323     DiffCompressionState previousState = new DiffCompressionState();
324     DiffCompressionState currentState = new DiffCompressionState();
325     while (in.hasRemaining()) {
326       compressSingleKeyValue(previousState, currentState,
327           out, in);
328       afterEncodingKeyValue(in, out, encodingCtx);
329 
330       // swap previousState <-> currentState
331       DiffCompressionState tmp = previousState;
332       previousState = currentState;
333       currentState = tmp;
334     }
335   }
336 
337 
338   @Override
339   public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
340     block.mark();
341     block.position(Bytes.SIZEOF_INT);
342     byte familyLength = block.get();
343     ByteBufferUtils.skip(block, familyLength);
344     byte flag = block.get();
345     int keyLength = ByteBufferUtils.readCompressedInt(block);
346     ByteBufferUtils.readCompressedInt(block); // valueLength
347     ByteBufferUtils.readCompressedInt(block); // commonLength
348     ByteBuffer result = ByteBuffer.allocate(keyLength);
349 
350     // copy row
351     int pos = result.arrayOffset();
352     block.get(result.array(), pos, Bytes.SIZEOF_SHORT);
353     pos += Bytes.SIZEOF_SHORT;
354     short rowLength = result.getShort();
355     block.get(result.array(), pos, rowLength);
356     pos += rowLength;
357 
358     // copy family
359     int savePosition = block.position();
360     block.position(Bytes.SIZEOF_INT);
361     block.get(result.array(), pos, familyLength + Bytes.SIZEOF_BYTE);
362     pos += familyLength + Bytes.SIZEOF_BYTE;
363 
364     // copy qualifier
365     block.position(savePosition);
366     int qualifierLength =
367         keyLength - pos + result.arrayOffset() - KeyValue.TIMESTAMP_TYPE_SIZE;
368     block.get(result.array(), pos, qualifierLength);
369     pos += qualifierLength;
370 
371     // copy the timestamp and type
372     int timestampFitInBytes =
373         ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
374     long timestamp = ByteBufferUtils.readLong(block, timestampFitInBytes);
375     if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
376       timestamp = -timestamp;
377     }
378     result.putLong(pos, timestamp);
379     pos += Bytes.SIZEOF_LONG;
380     block.get(result.array(), pos, Bytes.SIZEOF_BYTE);
381 
382     block.reset();
383     return result;
384   }
385 
386   @Override
387   public String toString() {
388     return DiffKeyDeltaEncoder.class.getSimpleName();
389   }
390 
391   protected static class DiffSeekerState extends SeekerState {
392     private int rowLengthWithSize;
393     private long timestamp;
394 
395     @Override
396     protected void copyFromNext(SeekerState that) {
397       super.copyFromNext(that);
398       DiffSeekerState other = (DiffSeekerState) that;
399       rowLengthWithSize = other.rowLengthWithSize;
400       timestamp = other.timestamp;
401     }
402   }
403 
404   @Override
405   public EncodedSeeker createSeeker(KVComparator comparator,
406       HFileBlockDecodingContext decodingCtx) {
407     return new BufferedEncodedSeeker<DiffSeekerState>(comparator, decodingCtx) {
408       private byte[] familyNameWithSize;
409       private static final int TIMESTAMP_WITH_TYPE_LENGTH =
410           Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;
411 
412       private void decode(boolean isFirst) {
413         byte flag = currentBuffer.get();
414         byte type = 0;
415         if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
416           if (!isFirst) {
417             type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE];
418           }
419           current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
420         }
421         if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
422           current.valueLength =
423               ByteBufferUtils.readCompressedInt(currentBuffer);
424         }
425         current.lastCommonPrefix =
426             ByteBufferUtils.readCompressedInt(currentBuffer);
427 
428         current.ensureSpaceForKey();
429 
430         if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
431           // length of row is different, copy everything except family
432 
433           // copy the row size
434           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
435               Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
436           current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
437               Bytes.SIZEOF_SHORT;
438 
439           // copy the rest of row
440           currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
441               current.rowLengthWithSize - Bytes.SIZEOF_SHORT);
442 
443           // copy the column family
444           System.arraycopy(familyNameWithSize, 0, current.keyBuffer,
445               current.rowLengthWithSize, familyNameWithSize.length);
446 
447           // copy the qualifier
448           currentBuffer.get(current.keyBuffer,
449               current.rowLengthWithSize + familyNameWithSize.length,
450               current.keyLength - current.rowLengthWithSize -
451               familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
452         } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
453           // we have to copy part of row and qualifier,
454           // but column family is in right place
455 
456           // before column family (rest of row)
457           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
458               current.rowLengthWithSize - current.lastCommonPrefix);
459 
460           // after column family (qualifier)
461           currentBuffer.get(current.keyBuffer,
462               current.rowLengthWithSize + familyNameWithSize.length,
463               current.keyLength - current.rowLengthWithSize -
464               familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
465         } else {
466           // copy just the ending
467           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
468               current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH -
469               current.lastCommonPrefix);
470         }
471 
472         // timestamp
473         int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH;
474         int timestampFitInBytes = 1 +
475             ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH);
476         long timestampOrDiff =
477             ByteBufferUtils.readLong(currentBuffer, timestampFitInBytes);
478         if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
479           timestampOrDiff = -timestampOrDiff;
480         }
481         if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) { // it is timestamp
482           current.timestamp = timestampOrDiff;
483         } else { // it is diff
484           current.timestamp = current.timestamp - timestampOrDiff;
485         }
486         Bytes.putLong(current.keyBuffer, pos, current.timestamp);
487         pos += Bytes.SIZEOF_LONG;
488 
489         // type
490         if ((flag & FLAG_SAME_TYPE) == 0) {
491           currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
492         } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
493           current.keyBuffer[pos] = type;
494         }
495 
496         current.valueOffset = currentBuffer.position();
497         ByteBufferUtils.skip(currentBuffer, current.valueLength);
498 
499         if (includesTags()) {
500           decodeTags();
501         }
502         if (includesMvcc()) {
503           current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
504         } else {
505           current.memstoreTS = 0;
506         }
507         current.nextKvOffset = currentBuffer.position();
508       }
509 
510       @Override
511       protected void decodeFirst() {
512         ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
513 
514         // read column family
515         byte familyNameLength = currentBuffer.get();
516         familyNameWithSize = new byte[familyNameLength + Bytes.SIZEOF_BYTE];
517         familyNameWithSize[0] = familyNameLength;
518         currentBuffer.get(familyNameWithSize, Bytes.SIZEOF_BYTE,
519             familyNameLength);
520         decode(true);
521       }
522 
523       @Override
524       protected void decodeNext() {
525         decode(false);
526       }
527 
528       @Override
529       protected DiffSeekerState createSeekerState() {
530         return new DiffSeekerState();
531       }
532     };
533   }
534 
535   @Override
536   protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
537       int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
538     int decompressedSize = source.readInt();
539     ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
540         allocateHeaderLength);
541     buffer.position(allocateHeaderLength);
542     DiffCompressionState state = new DiffCompressionState();
543     while (source.available() > skipLastBytes) {
544       uncompressSingleKeyValue(source, buffer, state);
545       afterDecodingKeyValue(source, buffer, decodingCtx);
546     }
547 
548     if (source.available() != skipLastBytes) {
549       throw new IllegalStateException("Read too much bytes.");
550     }
551 
552     return buffer;
553   }
554 }