001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with this
004 * work for additional information regarding copyright ownership. The ASF
005 * licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014 * License for the specific language governing permissions and limitations
015 * under the License.
016 */
017package org.apache.hadoop.hbase.io.encoding;
018
019import java.io.DataInputStream;
020import java.io.DataOutputStream;
021import java.io.IOException;
022import java.nio.ByteBuffer;
023import org.apache.hadoop.hbase.Cell;
024import org.apache.hadoop.hbase.KeyValue;
025import org.apache.hadoop.hbase.KeyValueUtil;
026import org.apache.hadoop.hbase.PrivateCellUtil;
027import org.apache.hadoop.hbase.nio.ByteBuff;
028import org.apache.hadoop.hbase.util.ByteBufferUtils;
029import org.apache.hadoop.hbase.util.Bytes;
030import org.apache.hadoop.hbase.util.ObjectIntPair;
031import org.apache.yetus.audience.InterfaceAudience;
032
033/**
034 * Compress using:
035 * - store size of common prefix
036 * - save column family once, it is same within HFile
037 * - use integer compression for key, value and prefix (7-bit encoding)
 * - use flag bits to avoid repeating the key length, value length
 *   and type when they are the same as in the previous cell
040 * - store in 3 bits length of timestamp field
041 * - allow diff in timestamp instead of actual value
042 *
043 * Format:
044 * - 1 byte:    flag
045 * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
046 * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
047 * - 1-5 bytes: prefix length
048 * - ... bytes: rest of the row (if prefix length is small enough)
049 * - ... bytes: qualifier (or suffix depending on prefix length)
050 * - 1-8 bytes: timestamp or diff
051 * - 1 byte:    type (only if FLAG_SAME_TYPE is not set in the flag)
052 * - ... bytes: value
053 */
054@InterfaceAudience.Private
055public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
056  static final int FLAG_SAME_KEY_LENGTH = 1;
057  static final int FLAG_SAME_VALUE_LENGTH = 1 << 1;
058  static final int FLAG_SAME_TYPE = 1 << 2;
059  static final int FLAG_TIMESTAMP_IS_DIFF = 1 << 3;
060  static final int MASK_TIMESTAMP_LENGTH = (1 << 4) | (1 << 5) | (1 << 6);
061  static final int SHIFT_TIMESTAMP_LENGTH = 4;
062  static final int FLAG_TIMESTAMP_SIGN = 1 << 7;
063
064  protected static class DiffCompressionState extends CompressionState {
065    long timestamp;
066    byte[] familyNameWithSize;
067
068    @Override
069    protected void readTimestamp(ByteBuffer in) {
070      timestamp = in.getLong();
071    }
072
073    @Override
074    void copyFrom(CompressionState state) {
075      super.copyFrom(state);
076      DiffCompressionState state2 = (DiffCompressionState) state;
077      timestamp = state2.timestamp;
078    }
079  }
080
081  private void uncompressSingleKeyValue(DataInputStream source,
082      ByteBuffer buffer,
083      DiffCompressionState state)
084          throws IOException, EncoderBufferTooSmallException {
085    // read the column family at the beginning
086    if (state.isFirst()) {
087      state.familyLength = source.readByte();
088      state.familyNameWithSize =
089          new byte[(state.familyLength & 0xff) + KeyValue.FAMILY_LENGTH_SIZE];
090      state.familyNameWithSize[0] = state.familyLength;
091      int read = source.read(state.familyNameWithSize, KeyValue.FAMILY_LENGTH_SIZE,
092          state.familyLength);
093      assert read == state.familyLength;
094    }
095
096    // read flag
097    byte flag = source.readByte();
098
099    // read key/value/common lengths
100    int keyLength;
101    int valueLength;
102    if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
103      keyLength = state.keyLength;
104    } else {
105      keyLength = ByteBufferUtils.readCompressedInt(source);
106    }
107    if ((flag & FLAG_SAME_VALUE_LENGTH) != 0) {
108      valueLength = state.valueLength;
109    } else {
110      valueLength = ByteBufferUtils.readCompressedInt(source);
111    }
112    int commonPrefix = ByteBufferUtils.readCompressedInt(source);
113
114    // create KeyValue buffer and fill it prefix
115    int keyOffset = buffer.position();
116    ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);
117    buffer.putInt(keyLength);
118    buffer.putInt(valueLength);
119
120    // copy common from previous key
121    if (commonPrefix > 0) {
122      ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, state.prevOffset
123          + KeyValue.ROW_OFFSET, commonPrefix);
124    }
125
126    // copy the rest of the key from the buffer
127    int keyRestLength;
128    if (state.isFirst() || commonPrefix <
129        state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
130      // omit the family part of the key, it is always the same
131      short rowLength;
132      int rowRestLength;
133
134      // check length of row
135      if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) {
136        // not yet copied, do it now
137        ByteBufferUtils.copyFromStreamToBuffer(buffer, source,
138            KeyValue.ROW_LENGTH_SIZE - commonPrefix);
139        ByteBufferUtils.skip(buffer, -KeyValue.ROW_LENGTH_SIZE);
140        rowLength = buffer.getShort();
141        rowRestLength = rowLength;
142      } else {
143        // already in buffer, just read it
144        rowLength = buffer.getShort(keyOffset + KeyValue.ROW_OFFSET);
145        rowRestLength = rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
146      }
147
148      // copy the rest of row
149      ByteBufferUtils.copyFromStreamToBuffer(buffer, source, rowRestLength);
150      state.rowLength = rowLength;
151
152      // copy the column family
153      buffer.put(state.familyNameWithSize);
154
155      keyRestLength = keyLength - rowLength -
156          state.familyNameWithSize.length -
157          (KeyValue.ROW_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
158    } else {
159      // prevRowWithSizeLength is the same as on previous row
160      keyRestLength = keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE;
161    }
162    // copy the rest of the key, after column family -> column qualifier
163    ByteBufferUtils.copyFromStreamToBuffer(buffer, source, keyRestLength);
164
165    // handle timestamp
166    int timestampFitsInBytes =
167        ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
168    long timestamp = ByteBufferUtils.readLong(source, timestampFitsInBytes);
169    if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
170      timestamp = -timestamp;
171    }
172    if ((flag & FLAG_TIMESTAMP_IS_DIFF) != 0) {
173      timestamp = state.timestamp - timestamp;
174    }
175    buffer.putLong(timestamp);
176
177    // copy the type field
178    byte type;
179    if ((flag & FLAG_SAME_TYPE) != 0) {
180      type = state.type;
181    } else {
182      type = source.readByte();
183    }
184    buffer.put(type);
185
186    // copy value part
187    ByteBufferUtils.copyFromStreamToBuffer(buffer, source, valueLength);
188
189    state.keyLength = keyLength;
190    state.valueLength = valueLength;
191    state.prevOffset = keyOffset;
192    state.timestamp = timestamp;
193    state.type = type;
194    // state.qualifier is unused
195  }
196
197  @Override
198  public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
199      DataOutputStream out) throws IOException {
200    EncodingState state = encodingContext.getEncodingState();
201    int size = compressSingleKeyValue(out, cell, state.prevCell);
202    size += afterEncodingKeyValue(cell, out, encodingContext);
203    state.prevCell = cell;
204    return size;
205  }
206
207  private int compressSingleKeyValue(DataOutputStream out, Cell cell, Cell prevCell)
208      throws IOException {
209    int flag = 0; // Do not use more bits that can fit into a byte
210    int kLength = KeyValueUtil.keyLength(cell);
211    int vLength = cell.getValueLength();
212
213    long timestamp;
214    long diffTimestamp = 0;
215    int diffTimestampFitsInBytes = 0;
216    int timestampFitsInBytes;
217    int commonPrefix = 0;
218
219    if (prevCell == null) {
220      timestamp = cell.getTimestamp();
221      if (timestamp < 0) {
222        flag |= FLAG_TIMESTAMP_SIGN;
223        timestamp = -timestamp;
224      }
225      timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
226      flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
227      // put column family
228      byte familyLength = cell.getFamilyLength();
229      out.write(familyLength);
230      PrivateCellUtil.writeFamily(out, cell, familyLength);
231    } else {
232      // Finding common prefix
233      int preKeyLength = KeyValueUtil.keyLength(prevCell);
234      commonPrefix = PrivateCellUtil.findCommonPrefixInFlatKey(cell, prevCell, true, false);
235      if (kLength == preKeyLength) {
236        flag |= FLAG_SAME_KEY_LENGTH;
237      }
238      if (vLength == prevCell.getValueLength()) {
239        flag |= FLAG_SAME_VALUE_LENGTH;
240      }
241      if (cell.getTypeByte() == prevCell.getTypeByte()) {
242        flag |= FLAG_SAME_TYPE;
243      }
244      // don't compress timestamp and type using prefix encode timestamp
245      timestamp = cell.getTimestamp();
246      diffTimestamp = prevCell.getTimestamp() - timestamp;
247      boolean negativeTimestamp = timestamp < 0;
248      if (negativeTimestamp) {
249        timestamp = -timestamp;
250      }
251      timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
252      boolean minusDiffTimestamp = diffTimestamp < 0;
253      if (minusDiffTimestamp) {
254        diffTimestamp = -diffTimestamp;
255      }
256      diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
257      if (diffTimestampFitsInBytes < timestampFitsInBytes) {
258        flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
259        flag |= FLAG_TIMESTAMP_IS_DIFF;
260        if (minusDiffTimestamp) {
261          flag |= FLAG_TIMESTAMP_SIGN;
262        }
263      } else {
264        flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
265        if (negativeTimestamp) {
266          flag |= FLAG_TIMESTAMP_SIGN;
267        }
268      }
269    }
270    out.write(flag);
271    if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
272      ByteBufferUtils.putCompressedInt(out, kLength);
273    }
274    if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
275      ByteBufferUtils.putCompressedInt(out, vLength);
276    }
277    ByteBufferUtils.putCompressedInt(out, commonPrefix);
278    short rLen = cell.getRowLength();
279    if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
280      // Previous and current rows are different. Copy the differing part of
281      // the row, skip the column family, and copy the qualifier.
282      PrivateCellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
283      PrivateCellUtil.writeQualifier(out, cell, cell.getQualifierLength());
284    } else {
285      // The common part includes the whole row. As the column family is the
286      // same across the whole file, it will automatically be included in the
287      // common prefix, so we need not special-case it here.
288      // What we write here is the non common part of the qualifier
289      int commonQualPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
290          - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
291      PrivateCellUtil.writeQualifierSkippingBytes(out, cell, cell.getQualifierLength(),
292        commonQualPrefix);
293    }
294    if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
295      ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
296    } else {
297      ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
298    }
299
300    if ((flag & FLAG_SAME_TYPE) == 0) {
301      out.write(cell.getTypeByte());
302    }
303    PrivateCellUtil.writeValue(out, cell, vLength);
304    return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
305  }
306
307  @Override
308  public Cell getFirstKeyCellInBlock(ByteBuff block) {
309    block.mark();
310    block.position(Bytes.SIZEOF_INT);
311    byte familyLength = block.get();
312    block.skip(familyLength);
313    byte flag = block.get();
314    int keyLength  = ByteBuff.readCompressedInt(block);
315    // TODO : See if we can avoid these reads as the read values are not getting used
316    ByteBuff.readCompressedInt(block); // valueLength
317    ByteBuff.readCompressedInt(block); // commonLength
318    ByteBuffer result = ByteBuffer.allocate(keyLength);
319
320    // copy row
321    assert !(result.isDirect());
322    int pos = result.arrayOffset();
323    block.get(result.array(), pos, Bytes.SIZEOF_SHORT);
324    pos += Bytes.SIZEOF_SHORT;
325    short rowLength = result.getShort();
326    block.get(result.array(), pos, rowLength);
327    pos += rowLength;
328
329    // copy family
330    int savePosition = block.position();
331    block.position(Bytes.SIZEOF_INT);
332    block.get(result.array(), pos, familyLength + Bytes.SIZEOF_BYTE);
333    pos += familyLength + Bytes.SIZEOF_BYTE;
334
335    // copy qualifier
336    block.position(savePosition);
337    int qualifierLength =
338        keyLength - pos + result.arrayOffset() - KeyValue.TIMESTAMP_TYPE_SIZE;
339    block.get(result.array(), pos, qualifierLength);
340    pos += qualifierLength;
341
342    // copy the timestamp and type
343    int timestampFitInBytes =
344        ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
345    long timestamp = ByteBuff.readLong(block, timestampFitInBytes);
346    if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
347      timestamp = -timestamp;
348    }
349    result.putLong(pos, timestamp);
350    pos += Bytes.SIZEOF_LONG;
351    block.get(result.array(), pos, Bytes.SIZEOF_BYTE);
352
353    block.reset();
354    // The result is already a BB. So always we will create a KeyOnlyKv.
355    return new KeyValue.KeyOnlyKeyValue(result.array(), 0, keyLength);
356  }
357
358  @Override
359  public String toString() {
360    return DiffKeyDeltaEncoder.class.getSimpleName();
361  }
362
363  protected static class DiffSeekerState extends SeekerState {
364
365    private int rowLengthWithSize;
366    private long timestamp;
367
368    public DiffSeekerState(ObjectIntPair<ByteBuffer> tmpPair,
369        boolean includeTags) {
370      super(tmpPair, includeTags);
371    }
372
373    @Override
374    protected void copyFromNext(SeekerState that) {
375      super.copyFromNext(that);
376      DiffSeekerState other = (DiffSeekerState) that;
377      rowLengthWithSize = other.rowLengthWithSize;
378      timestamp = other.timestamp;
379    }
380  }
381
382  @Override
383  public EncodedSeeker createSeeker(HFileBlockDecodingContext decodingCtx) {
384    return new DiffSeekerStateBufferedEncodedSeeker(decodingCtx);
385  }
386
387  @Override
388  protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
389      int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
390    int decompressedSize = source.readInt();
391    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
392        allocateHeaderLength);
393    buffer.position(allocateHeaderLength);
394    DiffCompressionState state = new DiffCompressionState();
395    while (source.available() > skipLastBytes) {
396      uncompressSingleKeyValue(source, buffer, state);
397      afterDecodingKeyValue(source, buffer, decodingCtx);
398    }
399
400    if (source.available() != skipLastBytes) {
401      throw new IllegalStateException("Read too much bytes.");
402    }
403
404    return buffer;
405  }
406
407  private static class DiffSeekerStateBufferedEncodedSeeker
408      extends BufferedEncodedSeeker<DiffSeekerState> {
409    private byte[] familyNameWithSize;
410    private static final int TIMESTAMP_WITH_TYPE_LENGTH =
411        Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;
412
413    private DiffSeekerStateBufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
414      super(decodingCtx);
415    }
416
417    private void decode(boolean isFirst) {
418      byte flag = currentBuffer.get();
419      byte type = 0;
420      if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
421        if (!isFirst) {
422          type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE];
423        }
424        current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
425      }
426      if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
427        current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
428      }
429      current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
430
431      current.ensureSpaceForKey();
432
433      if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
434        // length of row is different, copy everything except family
435
436        // copy the row size
437        currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
438            Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
439        current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
440            Bytes.SIZEOF_SHORT;
441
442        // copy the rest of row
443        currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
444            current.rowLengthWithSize - Bytes.SIZEOF_SHORT);
445
446        // copy the column family
447        System.arraycopy(familyNameWithSize, 0, current.keyBuffer,
448            current.rowLengthWithSize, familyNameWithSize.length);
449
450        // copy the qualifier
451        currentBuffer.get(current.keyBuffer,
452            current.rowLengthWithSize + familyNameWithSize.length,
453            current.keyLength - current.rowLengthWithSize -
454            familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
455      } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
456        // we have to copy part of row and qualifier,
457        // but column family is in right place
458
459        // before column family (rest of row)
460        currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
461            current.rowLengthWithSize - current.lastCommonPrefix);
462
463        // after column family (qualifier)
464        currentBuffer.get(current.keyBuffer,
465            current.rowLengthWithSize + familyNameWithSize.length,
466            current.keyLength - current.rowLengthWithSize -
467            familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
468      } else {
469        // copy just the ending
470        currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
471            current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH -
472            current.lastCommonPrefix);
473      }
474
475      // timestamp
476      int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH;
477      int timestampFitInBytes = 1 +
478          ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH);
479      long timestampOrDiff = ByteBuff.readLong(currentBuffer, timestampFitInBytes);
480      if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
481        timestampOrDiff = -timestampOrDiff;
482      }
483      if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) { // it is timestamp
484        current.timestamp = timestampOrDiff;
485      } else { // it is diff
486        current.timestamp = current.timestamp - timestampOrDiff;
487      }
488      Bytes.putLong(current.keyBuffer, pos, current.timestamp);
489      pos += Bytes.SIZEOF_LONG;
490
491      // type
492      if ((flag & FLAG_SAME_TYPE) == 0) {
493        currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
494      } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
495        current.keyBuffer[pos] = type;
496      }
497
498      current.valueOffset = currentBuffer.position();
499      currentBuffer.skip(current.valueLength);
500
501      if (includesTags()) {
502        decodeTags();
503      }
504      if (includesMvcc()) {
505        current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
506      } else {
507        current.memstoreTS = 0;
508      }
509      current.nextKvOffset = currentBuffer.position();
510    }
511
512    @Override
513    protected void decodeFirst() {
514      currentBuffer.skip(Bytes.SIZEOF_INT);
515
516      // read column family
517      byte familyNameLength = currentBuffer.get();
518      familyNameWithSize = new byte[familyNameLength + Bytes.SIZEOF_BYTE];
519      familyNameWithSize[0] = familyNameLength;
520      currentBuffer.get(familyNameWithSize, Bytes.SIZEOF_BYTE,
521          familyNameLength);
522      decode(true);
523    }
524
525    @Override
526    protected void decodeNext() {
527      decode(false);
528    }
529
530    @Override
531    protected DiffSeekerState createSeekerState() {
532      return new DiffSeekerState(this.tmpPair, this.includesTags());
533    }
534  }
535}