001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with this
004 * work for additional information regarding copyright ownership. The ASF
005 * licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014 * License for the specific language governing permissions and limitations
015 * under the License.
016 */
017package org.apache.hadoop.hbase.io.encoding;
018
019import java.io.DataInputStream;
020import java.io.DataOutput;
021import java.io.DataOutputStream;
022import java.io.IOException;
023import java.nio.ByteBuffer;
024import org.apache.hadoop.hbase.Cell;
025import org.apache.hadoop.hbase.KeyValue;
026import org.apache.hadoop.hbase.KeyValueUtil;
027import org.apache.hadoop.hbase.PrivateCellUtil;
028import org.apache.hadoop.hbase.nio.ByteBuff;
029import org.apache.hadoop.hbase.util.ByteBufferUtils;
030import org.apache.hadoop.hbase.util.Bytes;
031import org.apache.hadoop.hbase.util.ObjectIntPair;
032import org.apache.yetus.audience.InterfaceAudience;
033
/**
 * Encoder similar to {@link DiffKeyDeltaEncoder} but supposedly faster.
 *
 * Compress using:
 * - store size of common prefix
 * - save column family once in the first KeyValue
 * - use integer compression for key, value and prefix (7-bit encoding)
 * - use flag bits to avoid repeating the key length, value length
 *   and type if they are the same as in the previous KeyValue
 * - store in 3 bits the length of the timestamp prefix shared
 *    with the previous KeyValue's timestamp
 * - one bit which allows the value to be omitted if it is the same
 *
 * Format:
 * - 1 byte:    flag
 * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
 * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
 * - 1-5 bytes: prefix length
 * - ... bytes: rest of the row (if prefix length is small enough)
 * - ... bytes: qualifier (or suffix depending on prefix length)
 * - 1-8 bytes: timestamp suffix
 * - 1 byte:    type (only if FLAG_SAME_TYPE is not set in the flag)
 * - ... bytes: value (only if FLAG_SAME_VALUE is not set in the flag)
 *
 */
059@InterfaceAudience.Private
060public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
061  static final int MASK_TIMESTAMP_LENGTH = (1 << 0) | (1 << 1) | (1 << 2);
062  static final int SHIFT_TIMESTAMP_LENGTH = 0;
063  static final int FLAG_SAME_KEY_LENGTH = 1 << 3;
064  static final int FLAG_SAME_VALUE_LENGTH = 1 << 4;
065  static final int FLAG_SAME_TYPE = 1 << 5;
066  static final int FLAG_SAME_VALUE = 1 << 6;
067
068  private static class FastDiffCompressionState extends CompressionState {
069    byte[] timestamp = new byte[KeyValue.TIMESTAMP_SIZE];
070    int prevTimestampOffset;
071
072    @Override
073    protected void readTimestamp(ByteBuffer in) {
074      in.get(timestamp);
075    }
076
077    @Override
078    void copyFrom(CompressionState state) {
079      super.copyFrom(state);
080      FastDiffCompressionState state2 = (FastDiffCompressionState) state;
081      System.arraycopy(state2.timestamp, 0, timestamp, 0,
082          KeyValue.TIMESTAMP_SIZE);
083      prevTimestampOffset = state2.prevTimestampOffset;
084    }
085
086    /**
087     * Copies the first key/value from the given stream, and initializes
088     * decompression state based on it. Assumes that we have already read key
089     * and value lengths. Does not set {@link #qualifierLength} (not used by
090     * decompression) or {@link #prevOffset} (set by the calle afterwards).
091     */
092    private void decompressFirstKV(ByteBuffer out, DataInputStream in)
093        throws IOException {
094      int kvPos = out.position();
095      out.putInt(keyLength);
096      out.putInt(valueLength);
097      prevTimestampOffset = out.position() + keyLength -
098          KeyValue.TIMESTAMP_TYPE_SIZE;
099      ByteBufferUtils.copyFromStreamToBuffer(out, in, keyLength + valueLength);
100      rowLength = out.getShort(kvPos + KeyValue.ROW_OFFSET);
101      familyLength = out.get(kvPos + KeyValue.ROW_OFFSET +
102          KeyValue.ROW_LENGTH_SIZE + rowLength);
103      type = out.get(prevTimestampOffset + KeyValue.TIMESTAMP_SIZE);
104    }
105
106  }
107
108  private int findCommonTimestampPrefix(byte[] curTsBuf, byte[] prevTsBuf) {
109    int commonPrefix = 0;
110    while (commonPrefix < (KeyValue.TIMESTAMP_SIZE - 1)
111        && curTsBuf[commonPrefix] == prevTsBuf[commonPrefix]) {
112      commonPrefix++;
113    }
114    return commonPrefix; // has to be at most 7 bytes
115  }
116
  /**
   * Decodes one encoded key/value from {@code source} and appends it to {@code out} as
   * key length int, value length int, key bytes and value bytes. Anything the encoder
   * omitted (see the flag bits) is rebuilt from the previously decoded entry, which is
   * read back from {@code out} starting at {@code state.prevOffset}.
   *
   * @param source stream positioned at the flag byte of the next encoded entry
   * @param out buffer that receives the decoded entry and still holds the previous one
   * @param state lengths, type and offsets carried over from the previous entry; updated
   *          in place so that it describes the entry just decoded
   * @throws IOException if reading from {@code source} fails
   * @throws EncoderBufferTooSmallException declared on behalf of {@code ensureSpace};
   *           presumably raised when {@code out} cannot hold the entry (helper not shown here)
   */
  private void uncompressSingleKeyValue(DataInputStream source,
      ByteBuffer out, FastDiffCompressionState state)
          throws IOException, EncoderBufferTooSmallException {
    byte flag = source.readByte();
    // Remember the previous key length: it is needed below to locate the previous
    // value even if state.keyLength gets overwritten.
    int prevKeyLength = state.keyLength;

    // Lengths are only present in the stream when they differ from the previous entry.
    if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
      state.keyLength = ByteBufferUtils.readCompressedInt(source);
    }
    if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
      state.valueLength = ByteBufferUtils.readCompressedInt(source);
    }
    // Number of leading key bytes shared with the previous key; always present.
    int commonLength = ByteBufferUtils.readCompressedInt(source);

    ensureSpace(out, state.keyLength + state.valueLength + KeyValue.ROW_OFFSET);

    // Start of the entry being written; becomes state.prevOffset at the end.
    int kvPos = out.position();

    if (!state.isFirst()) {
      // copy the prefix
      int common;
      int prevOffset;

      // Three cases, chosen so that unchanged length ints are copied from the
      // previous entry together with the shared key prefix in a single copy.
      if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
        // Value length changed: write both ints, then copy only the key prefix,
        // which starts ROW_OFFSET bytes into the previous entry.
        out.putInt(state.keyLength);
        out.putInt(state.valueLength);
        prevOffset = state.prevOffset + KeyValue.ROW_OFFSET;
        common = commonLength;
      } else {
        if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
          // Both lengths unchanged: copy from the very start of the previous entry,
          // extended by ROW_OFFSET bytes so the two length ints come along.
          prevOffset = state.prevOffset;
          common = commonLength + KeyValue.ROW_OFFSET;
        } else {
          // Only the key length changed: write it, then copy the previous value
          // length int plus the key prefix (assumes the value length field has the
          // same size as KEY_LENGTH_SIZE).
          out.putInt(state.keyLength);
          prevOffset = state.prevOffset + KeyValue.KEY_LENGTH_SIZE;
          common = commonLength + KeyValue.KEY_LENGTH_SIZE;
        }
      }

      ByteBufferUtils.copyFromBufferToBuffer(out, out, prevOffset, common);

      // copy the rest of the key from the buffer
      int keyRestLength;
      // state.rowLength still holds the previous row length here: the shared prefix
      // ends inside the row-length/row section, so the row is different.
      if (commonLength < state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
        // omit the family part of the key, it is always the same
        int rowWithSizeLength;
        int rowRestLength;

        // check length of row
        if (commonLength < KeyValue.ROW_LENGTH_SIZE) {
          // not yet copied, do it now
          ByteBufferUtils.copyFromStreamToBuffer(out, source,
              KeyValue.ROW_LENGTH_SIZE - commonLength);

          // The row length field has just been completed; read it back from the
          // bytes immediately before the current position.
          rowWithSizeLength = out.getShort(out.position() -
              KeyValue.ROW_LENGTH_SIZE) + KeyValue.ROW_LENGTH_SIZE;
          rowRestLength = rowWithSizeLength - KeyValue.ROW_LENGTH_SIZE;
        } else {
          // already in kvBuffer, just read it
          rowWithSizeLength = out.getShort(kvPos + KeyValue.ROW_OFFSET) +
              KeyValue.ROW_LENGTH_SIZE;
          rowRestLength = rowWithSizeLength - commonLength;
        }

        // copy the rest of row
        ByteBufferUtils.copyFromStreamToBuffer(out, source, rowRestLength);

        // copy the column family
        // (family length byte plus family bytes, taken from the previous entry,
        // located using the previous row length)
        ByteBufferUtils.copyFromBufferToBuffer(out, out,
            state.prevOffset + KeyValue.ROW_OFFSET + KeyValue.ROW_LENGTH_SIZE
                + state.rowLength, state.familyLength
                + KeyValue.FAMILY_LENGTH_SIZE);
        // Only now switch the state over to the new row length.
        state.rowLength = (short) (rowWithSizeLength -
            KeyValue.ROW_LENGTH_SIZE);

        // What remains of the key before the timestamp and type.
        keyRestLength = state.keyLength - rowWithSizeLength -
            state.familyLength -
            (KeyValue.FAMILY_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
      } else {
        // prevRowWithSizeLength is the same as on previous row
        keyRestLength = state.keyLength - commonLength -
            KeyValue.TIMESTAMP_TYPE_SIZE;
      }
      // copy the rest of the key, after column family == column qualifier
      ByteBufferUtils.copyFromStreamToBuffer(out, source, keyRestLength);

      // copy timestamp
      // The low flag bits give how many leading timestamp bytes are shared with the
      // previous entry; those are copied from it, the rest come from the stream.
      int prefixTimestamp =
          (flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH;
      ByteBufferUtils.copyFromBufferToBuffer(out, out,
          state.prevTimestampOffset, prefixTimestamp);
      // The position is now just past the copied prefix, so subtracting the prefix
      // length yields the start of this entry's timestamp.
      state.prevTimestampOffset = out.position() - prefixTimestamp;
      ByteBufferUtils.copyFromStreamToBuffer(out, source,
          KeyValue.TIMESTAMP_SIZE - prefixTimestamp);

      // copy the type and value
      if ((flag & FLAG_SAME_TYPE) != 0) {
        // Type unchanged: reuse the remembered type byte.
        out.put(state.type);
        if ((flag & FLAG_SAME_VALUE) != 0) {
          // Value unchanged: copy it from the previous entry, whose value starts
          // after its two length ints and its key (hence prevKeyLength).
          ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset +
              KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
        } else {
          ByteBufferUtils.copyFromStreamToBuffer(out, source,
              state.valueLength);
        }
      } else {
        if ((flag & FLAG_SAME_VALUE) != 0) {
          // New type from the stream, value copied from the previous entry.
          ByteBufferUtils.copyFromStreamToBuffer(out, source,
              KeyValue.TYPE_SIZE);
          ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset +
              KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
        } else {
          // Type and value are adjacent in the stream; copy both at once.
          ByteBufferUtils.copyFromStreamToBuffer(out, source,
              state.valueLength + KeyValue.TYPE_SIZE);
        }
        // Remember the new type: it is the byte right after the timestamp just written.
        state.type = out.get(state.prevTimestampOffset +
            KeyValue.TIMESTAMP_SIZE);
      }
    } else { // this is the first element
      state.decompressFirstKV(out, source);
    }

    state.prevOffset = kvPos;
  }
241
242  @Override
243  public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
244      DataOutputStream out) throws IOException {
245    EncodingState state = encodingContext.getEncodingState();
246    int size = compressSingleKeyValue(out, cell, state.prevCell);
247    size += afterEncodingKeyValue(cell, out, encodingContext);
248    state.prevCell = cell;
249    return size;
250  }
251
252  private int compressSingleKeyValue(DataOutputStream out, Cell cell, Cell prevCell)
253      throws IOException {
254    int flag = 0; // Do not use more bits than will fit into a byte
255    int kLength = KeyValueUtil.keyLength(cell);
256    int vLength = cell.getValueLength();
257
258    if (prevCell == null) {
259      // copy the key, there is no common prefix with none
260      out.write(flag);
261      ByteBufferUtils.putCompressedInt(out, kLength);
262      ByteBufferUtils.putCompressedInt(out, vLength);
263      ByteBufferUtils.putCompressedInt(out, 0);
264      PrivateCellUtil.writeFlatKey(cell, (DataOutput)out);
265      // Write the value part
266      PrivateCellUtil.writeValue(out, cell, cell.getValueLength());
267    } else {
268      int preKeyLength = KeyValueUtil.keyLength(prevCell);
269      int preValLength = prevCell.getValueLength();
270      // find a common prefix and skip it
271      int commonPrefix = PrivateCellUtil.findCommonPrefixInFlatKey(cell, prevCell, true, false);
272
273      if (kLength == preKeyLength) {
274        flag |= FLAG_SAME_KEY_LENGTH;
275      }
276      if (vLength == prevCell.getValueLength()) {
277        flag |= FLAG_SAME_VALUE_LENGTH;
278      }
279      if (cell.getTypeByte() == prevCell.getTypeByte()) {
280        flag |= FLAG_SAME_TYPE;
281      }
282
283      byte[] curTsBuf = Bytes.toBytes(cell.getTimestamp());
284      int commonTimestampPrefix = findCommonTimestampPrefix(curTsBuf,
285          Bytes.toBytes(prevCell.getTimestamp()));
286
287      flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH;
288
289      // Check if current and previous values are the same. Compare value
290      // length first as an optimization.
291      if (vLength == preValLength
292          && PrivateCellUtil.matchingValue(cell, prevCell, vLength, preValLength)) {
293        flag |= FLAG_SAME_VALUE;
294      }
295
296      out.write(flag);
297      if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
298        ByteBufferUtils.putCompressedInt(out, kLength);
299      }
300      if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
301        ByteBufferUtils.putCompressedInt(out, vLength);
302      }
303      ByteBufferUtils.putCompressedInt(out, commonPrefix);
304      short rLen = cell.getRowLength();
305      if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
306        // Previous and current rows are different. Copy the differing part of
307        // the row, skip the column family, and copy the qualifier.
308        PrivateCellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
309        PrivateCellUtil.writeQualifier(out, cell, cell.getQualifierLength());
310      } else {
311        // The common part includes the whole row. As the column family is the
312        // same across the whole file, it will automatically be included in the
313        // common prefix, so we need not special-case it here.
314        // What we write here is the non common part of the qualifier
315        int commonQualPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
316            - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
317        PrivateCellUtil.writeQualifierSkippingBytes(out, cell, cell.getQualifierLength(),
318          commonQualPrefix);
319      }
320      // Write non common ts part
321      out.write(curTsBuf, commonTimestampPrefix, KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix);
322
323      // Write the type if it is not the same as before.
324      if ((flag & FLAG_SAME_TYPE) == 0) {
325        out.write(cell.getTypeByte());
326      }
327
328      // Write the value if it is not the same as before.
329      if ((flag & FLAG_SAME_VALUE) == 0) {
330        PrivateCellUtil.writeValue(out, cell, vLength);
331      }
332    }
333    return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
334  }
335
336  @Override
337  protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
338      int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
339    int decompressedSize = source.readInt();
340    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
341        allocateHeaderLength);
342    buffer.position(allocateHeaderLength);
343    FastDiffCompressionState state = new FastDiffCompressionState();
344    while (source.available() > skipLastBytes) {
345      uncompressSingleKeyValue(source, buffer, state);
346      afterDecodingKeyValue(source, buffer, decodingCtx);
347    }
348
349    if (source.available() != skipLastBytes) {
350      throw new IllegalStateException("Read too much bytes.");
351    }
352
353    return buffer;
354  }
355
356  @Override
357  public Cell getFirstKeyCellInBlock(ByteBuff block) {
358    block.mark();
359    block.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE);
360    int keyLength = ByteBuff.readCompressedInt(block);
361    // TODO : See if we can avoid these reads as the read values are not getting used
362    ByteBuff.readCompressedInt(block); // valueLength
363    ByteBuff.readCompressedInt(block); // commonLength
364    ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
365    block.reset();
366    return createFirstKeyCell(key, keyLength);
367  }
368
369  @Override
370  public String toString() {
371    return FastDiffDeltaEncoder.class.getSimpleName();
372  }
373
374  protected static class FastDiffSeekerState extends SeekerState {
375    private byte[] prevTimestampAndType =
376        new byte[KeyValue.TIMESTAMP_TYPE_SIZE];
377    private int rowLengthWithSize;
378    private int familyLengthWithSize;
379
380    public FastDiffSeekerState(ObjectIntPair<ByteBuffer> tmpPair,
381        boolean includeTags) {
382      super(tmpPair, includeTags);
383    }
384
385    @Override
386    protected void copyFromNext(SeekerState that) {
387      super.copyFromNext(that);
388      FastDiffSeekerState other = (FastDiffSeekerState) that;
389      System.arraycopy(other.prevTimestampAndType, 0,
390          prevTimestampAndType, 0,
391          KeyValue.TIMESTAMP_TYPE_SIZE);
392      rowLengthWithSize = other.rowLengthWithSize;
393      familyLengthWithSize = other.familyLengthWithSize;
394    }
395  }
396
397  @Override
398  public EncodedSeeker createSeeker(final HFileBlockDecodingContext decodingCtx) {
399    return new FastDiffSeekerStateBufferedEncodedSeeker(decodingCtx);
400  }
401
  /**
   * Seeker over fast-diff encoded blocks. Each entry is decoded in place into
   * {@code current.keyBuffer}, reusing the bytes of the previous key wherever the encoder
   * marked them as shared. {@code current} and {@code currentBuffer} come from the
   * superclass (not shown here).
   */
  private static class FastDiffSeekerStateBufferedEncodedSeeker
      extends BufferedEncodedSeeker<FastDiffSeekerState> {

    private FastDiffSeekerStateBufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
      super(decodingCtx);
    }

    /**
     * Decodes the entry at the current position of {@code currentBuffer} into
     * {@code current}, overwriting only the parts of the key that changed.
     *
     * @param isFirst {@code true} for the first entry of a block, which has no previous
     *          key to reuse
     */
    private void decode(boolean isFirst) {
      byte flag = currentBuffer.get();
      if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
        if (!isFirst) {
          // The key length is about to change, which moves the timestamp/type to a
          // different position in keyBuffer. Save the old bytes (located via the old
          // key length) so their shared part can be restored further down.
          System.arraycopy(current.keyBuffer,
              current.keyLength - current.prevTimestampAndType.length,
              current.prevTimestampAndType, 0,
              current.prevTimestampAndType.length);
        }
        current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
      }
      if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
        current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
      }
      // Number of leading key bytes shared with the previous key.
      current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);

      current.ensureSpaceForKey();

      if (isFirst) {
        // copy everything
        // (everything except the trailing timestamp and type, which are read below;
        // the encoder writes a common prefix of 0 for the first cell)
        currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
            current.keyLength - current.prevTimestampAndType.length);
        // The key starts with the row length short; the family length byte follows
        // directly after the row.
        current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
            Bytes.SIZEOF_SHORT;
        current.familyLengthWithSize =
            current.keyBuffer[current.rowLengthWithSize] + Bytes.SIZEOF_BYTE;
      } else if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
        // length of row is different, copy everything except family

        // copy the row size
        int oldRowLengthWithSize = current.rowLengthWithSize;
        currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
            Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
        current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
            Bytes.SIZEOF_SHORT;

        // move the column family
        // (from behind the old row to behind the new row; this happens before the
        // new row bytes are read into keyBuffer below)
        System.arraycopy(current.keyBuffer, oldRowLengthWithSize,
            current.keyBuffer, current.rowLengthWithSize,
            current.familyLengthWithSize);

        // copy the rest of row
        currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
            current.rowLengthWithSize - Bytes.SIZEOF_SHORT);

        // copy the qualifier
        currentBuffer.get(current.keyBuffer, current.rowLengthWithSize
            + current.familyLengthWithSize, current.keyLength
            - current.rowLengthWithSize - current.familyLengthWithSize
            - current.prevTimestampAndType.length);
      } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
        // We have to copy part of row and qualifier, but the column family
        // is in the right place.

        // before column family (rest of row)
        currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
            current.rowLengthWithSize - current.lastCommonPrefix);

        // after column family (qualifier)
        currentBuffer.get(current.keyBuffer, current.rowLengthWithSize
            + current.familyLengthWithSize, current.keyLength
            - current.rowLengthWithSize - current.familyLengthWithSize
            - current.prevTimestampAndType.length);
      } else {
        // copy just the ending
        // (the shared prefix covers at least the whole row; read what remains of the
        // key up to the timestamp)
        currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
            current.keyLength - current.prevTimestampAndType.length
                - current.lastCommonPrefix);
      }

      // timestamp
      int pos = current.keyLength - current.prevTimestampAndType.length;
      int commonTimestampPrefix = (flag & MASK_TIMESTAMP_LENGTH) >>>
        SHIFT_TIMESTAMP_LENGTH;
      if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
        // The key length changed, so the shared timestamp bytes are not at pos yet;
        // restore them from the copy saved at the top. With an unchanged key length
        // the previous timestamp already sits at pos.
        System.arraycopy(current.prevTimestampAndType, 0, current.keyBuffer,
            pos, commonTimestampPrefix);
      }
      pos += commonTimestampPrefix;
      // Read the timestamp bytes that differ from the previous entry.
      currentBuffer.get(current.keyBuffer, pos,
          Bytes.SIZEOF_LONG - commonTimestampPrefix);
      pos += Bytes.SIZEOF_LONG - commonTimestampPrefix;

      // type
      if ((flag & FLAG_SAME_TYPE) == 0) {
        currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
      } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
        // Same type but shifted key: restore the saved type byte, which follows the
        // timestamp in prevTimestampAndType.
        current.keyBuffer[pos] =
            current.prevTimestampAndType[Bytes.SIZEOF_LONG];
      }

      // handle value
      // When the value is flagged as unchanged nothing is read and valueOffset is
      // left as it was, i.e. still referring to the previously recorded value.
      if ((flag & FLAG_SAME_VALUE) == 0) {
        current.valueOffset = currentBuffer.position();
        currentBuffer.skip(current.valueLength);
      }

      if (includesTags()) {
        decodeTags();
      }
      if (includesMvcc()) {
        current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
      } else {
        current.memstoreTS = 0;
      }
      // Whatever follows in the buffer belongs to the next entry.
      current.nextKvOffset = currentBuffer.position();
    }

    /**
     * Decodes the first entry of the block, after skipping the int at the start of the
     * buffer.
     */
    @Override
    protected void decodeFirst() {
      currentBuffer.skip(Bytes.SIZEOF_INT);
      decode(true);
    }

    /** Decodes the next entry relative to the one currently held in {@code current}. */
    @Override
    protected void decodeNext() {
      decode(false);
    }

    /** Creates the fast-diff specific state object used by this seeker. */
    @Override
    protected FastDiffSeekerState createSeekerState() {
      return new FastDiffSeekerState(this.tmpPair, this.includesTags());
    }
  }
533}