001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with this
004 * work for additional information regarding copyright ownership. The ASF
005 * licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014 * License for the specific language governing permissions and limitations
015 * under the License.
016 */
017package org.apache.hadoop.hbase.io.encoding;
018
019import java.io.DataInputStream;
020import java.io.DataOutputStream;
021import java.io.IOException;
022import java.nio.ByteBuffer;
023
024import org.apache.hadoop.hbase.Cell;
025import org.apache.hadoop.hbase.CellComparator;
026import org.apache.hadoop.hbase.KeyValue;
027import org.apache.hadoop.hbase.KeyValueUtil;
028import org.apache.hadoop.hbase.PrivateCellUtil;
029import org.apache.hadoop.hbase.nio.ByteBuff;
030import org.apache.hadoop.hbase.util.ByteBufferUtils;
031import org.apache.hadoop.hbase.util.Bytes;
032import org.apache.hadoop.hbase.util.ObjectIntPair;
033import org.apache.yetus.audience.InterfaceAudience;
034
035/**
036 * Compress using:
037 * - store size of common prefix
038 * - save column family once, it is same within HFile
039 * - use integer compression for key, value and prefix (7-bit encoding)
 * - use flag bits to avoid duplicating the key length, value length
 *   and type when they are the same as in the previous cell
042 * - store in 3 bits length of timestamp field
043 * - allow diff in timestamp instead of actual value
044 *
045 * Format:
046 * - 1 byte:    flag
047 * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
048 * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
049 * - 1-5 bytes: prefix length
050 * - ... bytes: rest of the row (if prefix length is small enough)
051 * - ... bytes: qualifier (or suffix depending on prefix length)
052 * - 1-8 bytes: timestamp or diff
053 * - 1 byte:    type (only if FLAG_SAME_TYPE is not set in the flag)
054 * - ... bytes: value
055 */
056@InterfaceAudience.Private
057public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
058  static final int FLAG_SAME_KEY_LENGTH = 1;
059  static final int FLAG_SAME_VALUE_LENGTH = 1 << 1;
060  static final int FLAG_SAME_TYPE = 1 << 2;
061  static final int FLAG_TIMESTAMP_IS_DIFF = 1 << 3;
062  static final int MASK_TIMESTAMP_LENGTH = (1 << 4) | (1 << 5) | (1 << 6);
063  static final int SHIFT_TIMESTAMP_LENGTH = 4;
064  static final int FLAG_TIMESTAMP_SIGN = 1 << 7;
065
066  protected static class DiffCompressionState extends CompressionState {
067    long timestamp;
068    byte[] familyNameWithSize;
069
070    @Override
071    protected void readTimestamp(ByteBuffer in) {
072      timestamp = in.getLong();
073    }
074
075    @Override
076    void copyFrom(CompressionState state) {
077      super.copyFrom(state);
078      DiffCompressionState state2 = (DiffCompressionState) state;
079      timestamp = state2.timestamp;
080    }
081  }
082
083  private void uncompressSingleKeyValue(DataInputStream source,
084      ByteBuffer buffer,
085      DiffCompressionState state)
086          throws IOException, EncoderBufferTooSmallException {
087    // read the column family at the beginning
088    if (state.isFirst()) {
089      state.familyLength = source.readByte();
090      state.familyNameWithSize =
091          new byte[(state.familyLength & 0xff) + KeyValue.FAMILY_LENGTH_SIZE];
092      state.familyNameWithSize[0] = state.familyLength;
093      int read = source.read(state.familyNameWithSize, KeyValue.FAMILY_LENGTH_SIZE,
094          state.familyLength);
095      assert read == state.familyLength;
096    }
097
098    // read flag
099    byte flag = source.readByte();
100
101    // read key/value/common lengths
102    int keyLength;
103    int valueLength;
104    if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
105      keyLength = state.keyLength;
106    } else {
107      keyLength = ByteBufferUtils.readCompressedInt(source);
108    }
109    if ((flag & FLAG_SAME_VALUE_LENGTH) != 0) {
110      valueLength = state.valueLength;
111    } else {
112      valueLength = ByteBufferUtils.readCompressedInt(source);
113    }
114    int commonPrefix = ByteBufferUtils.readCompressedInt(source);
115
116    // create KeyValue buffer and fill it prefix
117    int keyOffset = buffer.position();
118    ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);
119    buffer.putInt(keyLength);
120    buffer.putInt(valueLength);
121
122    // copy common from previous key
123    if (commonPrefix > 0) {
124      ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, state.prevOffset
125          + KeyValue.ROW_OFFSET, commonPrefix);
126    }
127
128    // copy the rest of the key from the buffer
129    int keyRestLength;
130    if (state.isFirst() || commonPrefix <
131        state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
132      // omit the family part of the key, it is always the same
133      short rowLength;
134      int rowRestLength;
135
136      // check length of row
137      if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) {
138        // not yet copied, do it now
139        ByteBufferUtils.copyFromStreamToBuffer(buffer, source,
140            KeyValue.ROW_LENGTH_SIZE - commonPrefix);
141        ByteBufferUtils.skip(buffer, -KeyValue.ROW_LENGTH_SIZE);
142        rowLength = buffer.getShort();
143        rowRestLength = rowLength;
144      } else {
145        // already in buffer, just read it
146        rowLength = buffer.getShort(keyOffset + KeyValue.ROW_OFFSET);
147        rowRestLength = rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
148      }
149
150      // copy the rest of row
151      ByteBufferUtils.copyFromStreamToBuffer(buffer, source, rowRestLength);
152      state.rowLength = rowLength;
153
154      // copy the column family
155      buffer.put(state.familyNameWithSize);
156
157      keyRestLength = keyLength - rowLength -
158          state.familyNameWithSize.length -
159          (KeyValue.ROW_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
160    } else {
161      // prevRowWithSizeLength is the same as on previous row
162      keyRestLength = keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE;
163    }
164    // copy the rest of the key, after column family -> column qualifier
165    ByteBufferUtils.copyFromStreamToBuffer(buffer, source, keyRestLength);
166
167    // handle timestamp
168    int timestampFitsInBytes =
169        ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
170    long timestamp = ByteBufferUtils.readLong(source, timestampFitsInBytes);
171    if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
172      timestamp = -timestamp;
173    }
174    if ((flag & FLAG_TIMESTAMP_IS_DIFF) != 0) {
175      timestamp = state.timestamp - timestamp;
176    }
177    buffer.putLong(timestamp);
178
179    // copy the type field
180    byte type;
181    if ((flag & FLAG_SAME_TYPE) != 0) {
182      type = state.type;
183    } else {
184      type = source.readByte();
185    }
186    buffer.put(type);
187
188    // copy value part
189    ByteBufferUtils.copyFromStreamToBuffer(buffer, source, valueLength);
190
191    state.keyLength = keyLength;
192    state.valueLength = valueLength;
193    state.prevOffset = keyOffset;
194    state.timestamp = timestamp;
195    state.type = type;
196    // state.qualifier is unused
197  }
198
199  @Override
200  public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
201      DataOutputStream out) throws IOException {
202    EncodingState state = encodingContext.getEncodingState();
203    int size = compressSingleKeyValue(out, cell, state.prevCell);
204    size += afterEncodingKeyValue(cell, out, encodingContext);
205    state.prevCell = cell;
206    return size;
207  }
208
209  private int compressSingleKeyValue(DataOutputStream out, Cell cell, Cell prevCell)
210      throws IOException {
211    int flag = 0; // Do not use more bits that can fit into a byte
212    int kLength = KeyValueUtil.keyLength(cell);
213    int vLength = cell.getValueLength();
214
215    long timestamp;
216    long diffTimestamp = 0;
217    int diffTimestampFitsInBytes = 0;
218    int timestampFitsInBytes;
219    int commonPrefix = 0;
220
221    if (prevCell == null) {
222      timestamp = cell.getTimestamp();
223      if (timestamp < 0) {
224        flag |= FLAG_TIMESTAMP_SIGN;
225        timestamp = -timestamp;
226      }
227      timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
228      flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
229      // put column family
230      byte familyLength = cell.getFamilyLength();
231      out.write(familyLength);
232      PrivateCellUtil.writeFamily(out, cell, familyLength);
233    } else {
234      // Finding common prefix
235      int preKeyLength = KeyValueUtil.keyLength(prevCell);
236      commonPrefix = PrivateCellUtil.findCommonPrefixInFlatKey(cell, prevCell, true, false);
237      if (kLength == preKeyLength) {
238        flag |= FLAG_SAME_KEY_LENGTH;
239      }
240      if (vLength == prevCell.getValueLength()) {
241        flag |= FLAG_SAME_VALUE_LENGTH;
242      }
243      if (cell.getTypeByte() == prevCell.getTypeByte()) {
244        flag |= FLAG_SAME_TYPE;
245      }
246      // don't compress timestamp and type using prefix encode timestamp
247      timestamp = cell.getTimestamp();
248      diffTimestamp = prevCell.getTimestamp() - timestamp;
249      boolean negativeTimestamp = timestamp < 0;
250      if (negativeTimestamp) {
251        timestamp = -timestamp;
252      }
253      timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
254      boolean minusDiffTimestamp = diffTimestamp < 0;
255      if (minusDiffTimestamp) {
256        diffTimestamp = -diffTimestamp;
257      }
258      diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
259      if (diffTimestampFitsInBytes < timestampFitsInBytes) {
260        flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
261        flag |= FLAG_TIMESTAMP_IS_DIFF;
262        if (minusDiffTimestamp) {
263          flag |= FLAG_TIMESTAMP_SIGN;
264        }
265      } else {
266        flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
267        if (negativeTimestamp) {
268          flag |= FLAG_TIMESTAMP_SIGN;
269        }
270      }
271    }
272    out.write(flag);
273    if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
274      ByteBufferUtils.putCompressedInt(out, kLength);
275    }
276    if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
277      ByteBufferUtils.putCompressedInt(out, vLength);
278    }
279    ByteBufferUtils.putCompressedInt(out, commonPrefix);
280    short rLen = cell.getRowLength();
281    if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
282      // Previous and current rows are different. Copy the differing part of
283      // the row, skip the column family, and copy the qualifier.
284      PrivateCellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
285      PrivateCellUtil.writeQualifier(out, cell, cell.getQualifierLength());
286    } else {
287      // The common part includes the whole row. As the column family is the
288      // same across the whole file, it will automatically be included in the
289      // common prefix, so we need not special-case it here.
290      // What we write here is the non common part of the qualifier
291      int commonQualPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
292          - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
293      PrivateCellUtil.writeQualifierSkippingBytes(out, cell, cell.getQualifierLength(),
294        commonQualPrefix);
295    }
296    if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
297      ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
298    } else {
299      ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
300    }
301
302    if ((flag & FLAG_SAME_TYPE) == 0) {
303      out.write(cell.getTypeByte());
304    }
305    PrivateCellUtil.writeValue(out, cell, vLength);
306    return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
307  }
308
309  @Override
310  public Cell getFirstKeyCellInBlock(ByteBuff block) {
311    block.mark();
312    block.position(Bytes.SIZEOF_INT);
313    byte familyLength = block.get();
314    block.skip(familyLength);
315    byte flag = block.get();
316    int keyLength  = ByteBuff.readCompressedInt(block);
317    // TODO : See if we can avoid these reads as the read values are not getting used
318    ByteBuff.readCompressedInt(block); // valueLength
319    ByteBuff.readCompressedInt(block); // commonLength
320    ByteBuffer result = ByteBuffer.allocate(keyLength);
321
322    // copy row
323    assert !(result.isDirect());
324    int pos = result.arrayOffset();
325    block.get(result.array(), pos, Bytes.SIZEOF_SHORT);
326    pos += Bytes.SIZEOF_SHORT;
327    short rowLength = result.getShort();
328    block.get(result.array(), pos, rowLength);
329    pos += rowLength;
330
331    // copy family
332    int savePosition = block.position();
333    block.position(Bytes.SIZEOF_INT);
334    block.get(result.array(), pos, familyLength + Bytes.SIZEOF_BYTE);
335    pos += familyLength + Bytes.SIZEOF_BYTE;
336
337    // copy qualifier
338    block.position(savePosition);
339    int qualifierLength =
340        keyLength - pos + result.arrayOffset() - KeyValue.TIMESTAMP_TYPE_SIZE;
341    block.get(result.array(), pos, qualifierLength);
342    pos += qualifierLength;
343
344    // copy the timestamp and type
345    int timestampFitInBytes =
346        ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
347    long timestamp = ByteBuff.readLong(block, timestampFitInBytes);
348    if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
349      timestamp = -timestamp;
350    }
351    result.putLong(pos, timestamp);
352    pos += Bytes.SIZEOF_LONG;
353    block.get(result.array(), pos, Bytes.SIZEOF_BYTE);
354
355    block.reset();
356    // The result is already a BB. So always we will create a KeyOnlyKv.
357    return new KeyValue.KeyOnlyKeyValue(result.array(), 0, keyLength);
358  }
359
360  @Override
361  public String toString() {
362    return DiffKeyDeltaEncoder.class.getSimpleName();
363  }
364
365  protected static class DiffSeekerState extends SeekerState {
366
367    private int rowLengthWithSize;
368    private long timestamp;
369
370    public DiffSeekerState(ObjectIntPair<ByteBuffer> tmpPair,
371        boolean includeTags) {
372      super(tmpPair, includeTags);
373    }
374
375    @Override
376    protected void copyFromNext(SeekerState that) {
377      super.copyFromNext(that);
378      DiffSeekerState other = (DiffSeekerState) that;
379      rowLengthWithSize = other.rowLengthWithSize;
380      timestamp = other.timestamp;
381    }
382  }
383
384  @Override
385  public EncodedSeeker createSeeker(CellComparator comparator,
386      HFileBlockDecodingContext decodingCtx) {
387    return new BufferedEncodedSeeker<DiffSeekerState>(comparator, decodingCtx) {
388      private byte[] familyNameWithSize;
389      private static final int TIMESTAMP_WITH_TYPE_LENGTH =
390          Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;
391
392      private void decode(boolean isFirst) {
393        byte flag = currentBuffer.get();
394        byte type = 0;
395        if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
396          if (!isFirst) {
397            type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE];
398          }
399          current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
400        }
401        if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
402          current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
403        }
404        current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
405
406        current.ensureSpaceForKey();
407
408        if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
409          // length of row is different, copy everything except family
410
411          // copy the row size
412          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
413              Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
414          current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
415              Bytes.SIZEOF_SHORT;
416
417          // copy the rest of row
418          currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
419              current.rowLengthWithSize - Bytes.SIZEOF_SHORT);
420
421          // copy the column family
422          System.arraycopy(familyNameWithSize, 0, current.keyBuffer,
423              current.rowLengthWithSize, familyNameWithSize.length);
424
425          // copy the qualifier
426          currentBuffer.get(current.keyBuffer,
427              current.rowLengthWithSize + familyNameWithSize.length,
428              current.keyLength - current.rowLengthWithSize -
429              familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
430        } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
431          // we have to copy part of row and qualifier,
432          // but column family is in right place
433
434          // before column family (rest of row)
435          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
436              current.rowLengthWithSize - current.lastCommonPrefix);
437
438          // after column family (qualifier)
439          currentBuffer.get(current.keyBuffer,
440              current.rowLengthWithSize + familyNameWithSize.length,
441              current.keyLength - current.rowLengthWithSize -
442              familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
443        } else {
444          // copy just the ending
445          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
446              current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH -
447              current.lastCommonPrefix);
448        }
449
450        // timestamp
451        int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH;
452        int timestampFitInBytes = 1 +
453            ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH);
454        long timestampOrDiff = ByteBuff.readLong(currentBuffer, timestampFitInBytes);
455        if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
456          timestampOrDiff = -timestampOrDiff;
457        }
458        if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) { // it is timestamp
459          current.timestamp = timestampOrDiff;
460        } else { // it is diff
461          current.timestamp = current.timestamp - timestampOrDiff;
462        }
463        Bytes.putLong(current.keyBuffer, pos, current.timestamp);
464        pos += Bytes.SIZEOF_LONG;
465
466        // type
467        if ((flag & FLAG_SAME_TYPE) == 0) {
468          currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
469        } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
470          current.keyBuffer[pos] = type;
471        }
472
473        current.valueOffset = currentBuffer.position();
474        currentBuffer.skip(current.valueLength);
475
476        if (includesTags()) {
477          decodeTags();
478        }
479        if (includesMvcc()) {
480          current.memstoreTS = ByteBuff.readVLong(currentBuffer);
481        } else {
482          current.memstoreTS = 0;
483        }
484        current.nextKvOffset = currentBuffer.position();
485      }
486
487      @Override
488      protected void decodeFirst() {
489        currentBuffer.skip(Bytes.SIZEOF_INT);
490
491        // read column family
492        byte familyNameLength = currentBuffer.get();
493        familyNameWithSize = new byte[familyNameLength + Bytes.SIZEOF_BYTE];
494        familyNameWithSize[0] = familyNameLength;
495        currentBuffer.get(familyNameWithSize, Bytes.SIZEOF_BYTE,
496            familyNameLength);
497        decode(true);
498      }
499
500      @Override
501      protected void decodeNext() {
502        decode(false);
503      }
504
505      @Override
506      protected DiffSeekerState createSeekerState() {
507        return new DiffSeekerState(this.tmpPair, this.includesTags());
508      }
509    };
510  }
511
512  @Override
513  protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
514      int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
515    int decompressedSize = source.readInt();
516    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
517        allocateHeaderLength);
518    buffer.position(allocateHeaderLength);
519    DiffCompressionState state = new DiffCompressionState();
520    while (source.available() > skipLastBytes) {
521      uncompressSingleKeyValue(source, buffer, state);
522      afterDecodingKeyValue(source, buffer, decodingCtx);
523    }
524
525    if (source.available() != skipLastBytes) {
526      throw new IllegalStateException("Read too much bytes.");
527    }
528
529    return buffer;
530  }
531}