View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.apache.hadoop.hbase.KeyValue;
25  import org.apache.hadoop.hbase.util.ByteBufferUtils;
26  import org.apache.hadoop.hbase.util.Bytes;
27  import org.apache.hadoop.io.RawComparator;
28  
29  import org.apache.hadoop.classification.InterfaceAudience;
30  
31  /**
32   * Compress using:
33   * - store size of common prefix
34   * - save column family once, it is same within HFile
35   * - use integer compression for key, value and prefix (7-bit encoding)
36   * - use bits to avoid duplication key length, value length
37   *   and type if it same as previous
38   * - store in 3 bits length of timestamp field
39   * - allow diff in timestamp instead of actual value
40   *
41   * Format:
42   * - 1 byte:    flag
43   * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
44   * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
45   * - 1-5 bytes: prefix length
46   * - ... bytes: rest of the row (if prefix length is small enough)
47   * - ... bytes: qualifier (or suffix depending on prefix length)
48   * - 1-8 bytes: timestamp or diff
49   * - 1 byte:    type (only if FLAG_SAME_TYPE is not set in the flag)
50   * - ... bytes: value
51   */
52  @InterfaceAudience.Private
53  public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
54    static final int FLAG_SAME_KEY_LENGTH = 1;
55    static final int FLAG_SAME_VALUE_LENGTH = 1 << 1;
56    static final int FLAG_SAME_TYPE = 1 << 2;
57    static final int FLAG_TIMESTAMP_IS_DIFF = 1 << 3;
58    static final int MASK_TIMESTAMP_LENGTH = (1 << 4) | (1 << 5) | (1 << 6);
59    static final int SHIFT_TIMESTAMP_LENGTH = 4;
60    static final int FLAG_TIMESTAMP_SIGN = 1 << 7;
61  
62    protected static class DiffCompressionState extends CompressionState {
63      long timestamp;
64      byte[] familyNameWithSize;
65  
66      @Override
67      protected void readTimestamp(ByteBuffer in) {
68        timestamp = in.getLong();
69      }
70  
71      @Override
72      void copyFrom(CompressionState state) {
73        super.copyFrom(state);
74        DiffCompressionState state2 = (DiffCompressionState) state;
75        timestamp = state2.timestamp;
76      }
77    }
78  
79    private void compressSingleKeyValue(DiffCompressionState previousState,
80        DiffCompressionState currentState, DataOutputStream out,
81        ByteBuffer in) throws IOException {
82      byte flag = 0;
83      int kvPos = in.position();
84      int keyLength = in.getInt();
85      int valueLength = in.getInt();
86  
87      long timestamp;
88      long diffTimestamp = 0;
89      int diffTimestampFitsInBytes = 0;
90  
91      int commonPrefix;
92  
93      int timestampFitsInBytes;
94  
95      if (previousState.isFirst()) {
96        currentState.readKey(in, keyLength, valueLength);
97        currentState.prevOffset = kvPos;
98        timestamp = currentState.timestamp;
99        if (timestamp < 0) {
100         flag |= FLAG_TIMESTAMP_SIGN;
101         timestamp = -timestamp;
102       }
103       timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
104 
105       flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
106       commonPrefix = 0;
107 
108       // put column family
109       in.mark();
110       ByteBufferUtils.skip(in, currentState.rowLength
111           + KeyValue.ROW_LENGTH_SIZE);
112       ByteBufferUtils.moveBufferToStream(out, in, currentState.familyLength
113           + KeyValue.FAMILY_LENGTH_SIZE);
114       in.reset();
115     } else {
116       // find a common prefix and skip it
117       commonPrefix =
118           ByteBufferUtils.findCommonPrefix(in, in.position(),
119               previousState.prevOffset + KeyValue.ROW_OFFSET, keyLength
120                   - KeyValue.TIMESTAMP_TYPE_SIZE);
121       // don't compress timestamp and type using prefix
122 
123       currentState.readKey(in, keyLength, valueLength,
124           commonPrefix, previousState);
125       currentState.prevOffset = kvPos;
126       timestamp = currentState.timestamp;
127       boolean negativeTimestamp = timestamp < 0;
128       if (negativeTimestamp) {
129         timestamp = -timestamp;
130       }
131       timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
132 
133       if (keyLength == previousState.keyLength) {
134         flag |= FLAG_SAME_KEY_LENGTH;
135       }
136       if (valueLength == previousState.valueLength) {
137         flag |= FLAG_SAME_VALUE_LENGTH;
138       }
139       if (currentState.type == previousState.type) {
140         flag |= FLAG_SAME_TYPE;
141       }
142 
143       // encode timestamp
144       diffTimestamp = previousState.timestamp - currentState.timestamp;
145       boolean minusDiffTimestamp = diffTimestamp < 0;
146       if (minusDiffTimestamp) {
147         diffTimestamp = -diffTimestamp;
148       }
149       diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
150       if (diffTimestampFitsInBytes < timestampFitsInBytes) {
151         flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
152         flag |= FLAG_TIMESTAMP_IS_DIFF;
153         if (minusDiffTimestamp) {
154           flag |= FLAG_TIMESTAMP_SIGN;
155         }
156       } else {
157         flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
158         if (negativeTimestamp) {
159           flag |= FLAG_TIMESTAMP_SIGN;
160         }
161       }
162     }
163 
164     out.write(flag);
165 
166     if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
167       ByteBufferUtils.putCompressedInt(out, keyLength);
168     }
169     if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
170       ByteBufferUtils.putCompressedInt(out, valueLength);
171     }
172 
173     ByteBufferUtils.putCompressedInt(out, commonPrefix);
174     ByteBufferUtils.skip(in, commonPrefix);
175 
176     if (previousState.isFirst() ||
177         commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
178       int restRowLength =
179           currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
180       ByteBufferUtils.moveBufferToStream(out, in, restRowLength);
181       ByteBufferUtils.skip(in, currentState.familyLength +
182           KeyValue.FAMILY_LENGTH_SIZE);
183       ByteBufferUtils.moveBufferToStream(out, in, currentState.qualifierLength);
184     } else {
185       ByteBufferUtils.moveBufferToStream(out, in,
186           keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE);
187     }
188 
189     if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
190       ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
191     } else {
192       ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
193     }
194 
195     if ((flag & FLAG_SAME_TYPE) == 0) {
196       out.write(currentState.type);
197     }
198     ByteBufferUtils.skip(in, KeyValue.TIMESTAMP_TYPE_SIZE);
199 
200     ByteBufferUtils.moveBufferToStream(out, in, valueLength);
201   }
202 
203   private void uncompressSingleKeyValue(DataInputStream source,
204       ByteBuffer buffer,
205       DiffCompressionState state)
206           throws IOException, EncoderBufferTooSmallException {
207     // read the column family at the beginning
208     if (state.isFirst()) {
209       state.familyLength = source.readByte();
210       state.familyNameWithSize =
211           new byte[(state.familyLength & 0xff) + KeyValue.FAMILY_LENGTH_SIZE];
212       state.familyNameWithSize[0] = state.familyLength;
213       source.read(state.familyNameWithSize, KeyValue.FAMILY_LENGTH_SIZE,
214           state.familyLength);
215     }
216 
217     // read flag
218     byte flag = source.readByte();
219 
220     // read key/value/common lengths
221     int keyLength;
222     int valueLength;
223     if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
224       keyLength = state.keyLength;
225     } else {
226       keyLength = ByteBufferUtils.readCompressedInt(source);
227     }
228     if ((flag & FLAG_SAME_VALUE_LENGTH) != 0) {
229       valueLength = state.valueLength;
230     } else {
231       valueLength = ByteBufferUtils.readCompressedInt(source);
232     }
233     int commonPrefix = ByteBufferUtils.readCompressedInt(source);
234 
235     // create KeyValue buffer and fill it prefix
236     int keyOffset = buffer.position();
237     ByteBufferUtils.ensureSpace(buffer, keyLength + valueLength
238         + KeyValue.ROW_OFFSET);
239     buffer.putInt(keyLength);
240     buffer.putInt(valueLength);
241 
242     // copy common from previous key
243     if (commonPrefix > 0) {
244       ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, state.prevOffset
245           + KeyValue.ROW_OFFSET, commonPrefix);
246     }
247 
248     // copy the rest of the key from the buffer
249     int keyRestLength;
250     if (state.isFirst() || commonPrefix <
251         state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
252       // omit the family part of the key, it is always the same
253       short rowLength;
254       int rowRestLength;
255 
256       // check length of row
257       if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) {
258         // not yet copied, do it now
259         ByteBufferUtils.copyFromStreamToBuffer(buffer, source,
260             KeyValue.ROW_LENGTH_SIZE - commonPrefix);
261         ByteBufferUtils.skip(buffer, -KeyValue.ROW_LENGTH_SIZE);
262         rowLength = buffer.getShort();
263         rowRestLength = rowLength;
264       } else {
265         // already in buffer, just read it
266         rowLength = buffer.getShort(keyOffset + KeyValue.ROW_OFFSET);
267         rowRestLength = rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
268       }
269 
270       // copy the rest of row
271       ByteBufferUtils.copyFromStreamToBuffer(buffer, source, rowRestLength);
272       state.rowLength = rowLength;
273 
274       // copy the column family
275       buffer.put(state.familyNameWithSize);
276 
277       keyRestLength = keyLength - rowLength -
278           state.familyNameWithSize.length -
279           (KeyValue.ROW_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
280     } else {
281       // prevRowWithSizeLength is the same as on previous row
282       keyRestLength = keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE;
283     }
284     // copy the rest of the key, after column family -> column qualifier
285     ByteBufferUtils.copyFromStreamToBuffer(buffer, source, keyRestLength);
286 
287     // handle timestamp
288     int timestampFitsInBytes =
289         ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
290     long timestamp = ByteBufferUtils.readLong(source, timestampFitsInBytes);
291     if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
292       timestamp = -timestamp;
293     }
294     if ((flag & FLAG_TIMESTAMP_IS_DIFF) != 0) {
295       timestamp = state.timestamp - timestamp;
296     }
297     buffer.putLong(timestamp);
298 
299     // copy the type field
300     byte type;
301     if ((flag & FLAG_SAME_TYPE) != 0) {
302       type = state.type;
303     } else {
304       type = source.readByte();
305     }
306     buffer.put(type);
307 
308     // copy value part
309     ByteBufferUtils.copyFromStreamToBuffer(buffer, source, valueLength);
310 
311     state.keyLength = keyLength;
312     state.valueLength = valueLength;
313     state.prevOffset = keyOffset;
314     state.timestamp = timestamp;
315     state.type = type;
316     // state.qualifier is unused
317   }
318 
319   @Override
320   public void compressKeyValues(DataOutputStream out,
321       ByteBuffer in, boolean includesMemstoreTS) throws IOException {
322     in.rewind();
323     ByteBufferUtils.putInt(out, in.limit());
324     DiffCompressionState previousState = new DiffCompressionState();
325     DiffCompressionState currentState = new DiffCompressionState();
326     while (in.hasRemaining()) {
327       compressSingleKeyValue(previousState, currentState,
328           out, in);
329       afterEncodingKeyValue(in, out, includesMemstoreTS);
330 
331       // swap previousState <-> currentState
332       DiffCompressionState tmp = previousState;
333       previousState = currentState;
334       currentState = tmp;
335     }
336   }
337 
338   @Override
339   public ByteBuffer uncompressKeyValues(DataInputStream source,
340       int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
341       throws IOException {
342     int decompressedSize = source.readInt();
343     ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
344         allocHeaderLength);
345     buffer.position(allocHeaderLength);
346     DiffCompressionState state = new DiffCompressionState();
347     while (source.available() > skipLastBytes) {
348       uncompressSingleKeyValue(source, buffer, state);
349       afterDecodingKeyValue(source, buffer, includesMemstoreTS);
350     }
351 
352     if (source.available() != skipLastBytes) {
353       throw new IllegalStateException("Read too much bytes.");
354     }
355 
356     return buffer;
357   }
358 
359   @Override
360   public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
361     block.mark();
362     block.position(Bytes.SIZEOF_INT);
363     byte familyLength = block.get();
364     ByteBufferUtils.skip(block, familyLength);
365     byte flag = block.get();
366     int keyLength = ByteBufferUtils.readCompressedInt(block);
367     ByteBufferUtils.readCompressedInt(block); // valueLength
368     ByteBufferUtils.readCompressedInt(block); // commonLength
369     ByteBuffer result = ByteBuffer.allocate(keyLength);
370 
371     // copy row
372     int pos = result.arrayOffset();
373     block.get(result.array(), pos, Bytes.SIZEOF_SHORT);
374     pos += Bytes.SIZEOF_SHORT;
375     short rowLength = result.getShort();
376     block.get(result.array(), pos, rowLength);
377     pos += rowLength;
378 
379     // copy family
380     int savePosition = block.position();
381     block.position(Bytes.SIZEOF_INT);
382     block.get(result.array(), pos, familyLength + Bytes.SIZEOF_BYTE);
383     pos += familyLength + Bytes.SIZEOF_BYTE;
384 
385     // copy qualifier
386     block.position(savePosition);
387     int qualifierLength =
388         keyLength - pos + result.arrayOffset() - KeyValue.TIMESTAMP_TYPE_SIZE;
389     block.get(result.array(), pos, qualifierLength);
390     pos += qualifierLength;
391 
392     // copy the timestamp and type
393     int timestampFitInBytes =
394         ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
395     long timestamp = ByteBufferUtils.readLong(block, timestampFitInBytes);
396     if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
397       timestamp = -timestamp;
398     }
399     result.putLong(pos, timestamp);
400     pos += Bytes.SIZEOF_LONG;
401     block.get(result.array(), pos, Bytes.SIZEOF_BYTE);
402 
403     block.reset();
404     return result;
405   }
406 
407   @Override
408   public String toString() {
409     return DiffKeyDeltaEncoder.class.getSimpleName();
410   }
411 
412   protected static class DiffSeekerState extends SeekerState {
413     private int rowLengthWithSize;
414     private long timestamp;
415 
416     @Override
417     protected void copyFromNext(SeekerState that) {
418       super.copyFromNext(that);
419       DiffSeekerState other = (DiffSeekerState) that;
420       rowLengthWithSize = other.rowLengthWithSize;
421       timestamp = other.timestamp;
422     }
423   }
424 
425   @Override
426   public EncodedSeeker createSeeker(RawComparator<byte[]> comparator,
427       final boolean includesMemstoreTS) {
428     return new BufferedEncodedSeeker<DiffSeekerState>(comparator) {
429       private byte[] familyNameWithSize;
430       private static final int TIMESTAMP_WITH_TYPE_LENGTH =
431           Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;
432 
433       private void decode(boolean isFirst) {
434         byte flag = currentBuffer.get();
435         byte type = 0;
436         if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
437           if (!isFirst) {
438             type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE];
439           }
440           current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
441         }
442         if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
443           current.valueLength =
444               ByteBufferUtils.readCompressedInt(currentBuffer);
445         }
446         current.lastCommonPrefix =
447             ByteBufferUtils.readCompressedInt(currentBuffer);
448 
449         current.ensureSpaceForKey();
450 
451         if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
452           // length of row is different, copy everything except family
453 
454           // copy the row size
455           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
456               Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
457           current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
458               Bytes.SIZEOF_SHORT;
459 
460           // copy the rest of row
461           currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
462               current.rowLengthWithSize - Bytes.SIZEOF_SHORT);
463 
464           // copy the column family
465           System.arraycopy(familyNameWithSize, 0, current.keyBuffer,
466               current.rowLengthWithSize, familyNameWithSize.length);
467 
468           // copy the qualifier
469           currentBuffer.get(current.keyBuffer,
470               current.rowLengthWithSize + familyNameWithSize.length,
471               current.keyLength - current.rowLengthWithSize -
472               familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
473         } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
474           // we have to copy part of row and qualifier,
475           // but column family is in right place
476 
477           // before column family (rest of row)
478           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
479               current.rowLengthWithSize - current.lastCommonPrefix);
480 
481           // after column family (qualifier)
482           currentBuffer.get(current.keyBuffer,
483               current.rowLengthWithSize + familyNameWithSize.length,
484               current.keyLength - current.rowLengthWithSize -
485               familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
486         } else {
487           // copy just the ending
488           currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
489               current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH -
490               current.lastCommonPrefix);
491         }
492 
493         // timestamp
494         int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH;
495         int timestampFitInBytes = 1 +
496             ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH);
497         long timestampOrDiff =
498             ByteBufferUtils.readLong(currentBuffer, timestampFitInBytes);
499         if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
500           timestampOrDiff = -timestampOrDiff;
501         }
502         if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) { // it is timestamp
503           current.timestamp = timestampOrDiff;
504         } else { // it is diff
505           current.timestamp = current.timestamp - timestampOrDiff;
506         }
507         Bytes.putLong(current.keyBuffer, pos, current.timestamp);
508         pos += Bytes.SIZEOF_LONG;
509 
510         // type
511         if ((flag & FLAG_SAME_TYPE) == 0) {
512           currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
513         } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
514           current.keyBuffer[pos] = type;
515         }
516 
517         current.valueOffset = currentBuffer.position();
518         ByteBufferUtils.skip(currentBuffer, current.valueLength);
519 
520         if (includesMemstoreTS) {
521           current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
522         } else {
523           current.memstoreTS = 0;
524         }
525         current.nextKvOffset = currentBuffer.position();
526       }
527 
528       @Override
529       protected void decodeFirst() {
530         ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
531 
532         // read column family
533         byte familyNameLength = currentBuffer.get();
534         familyNameWithSize = new byte[familyNameLength + Bytes.SIZEOF_BYTE];
535         familyNameWithSize[0] = familyNameLength;
536         currentBuffer.get(familyNameWithSize, Bytes.SIZEOF_BYTE,
537             familyNameLength);
538         decode(true);
539       }
540 
541       @Override
542       protected void decodeNext() {
543         decode(false);
544       }
545 
546       @Override
547       protected DiffSeekerState createSeekerState() {
548         return new DiffSeekerState();
549       }
550     };
551   }
552 }