001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with this
004 * work for additional information regarding copyright ownership. The ASF
005 * licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014 * License for the specific language governing permissions and limitations
015 * under the License.
016 */
017package org.apache.hadoop.hbase.io.encoding;
018
019import java.io.DataInputStream;
020import java.io.DataOutput;
021import java.io.DataOutputStream;
022import java.io.IOException;
023import java.nio.ByteBuffer;
024
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.CellComparator;
027import org.apache.hadoop.hbase.KeyValue;
028import org.apache.hadoop.hbase.KeyValueUtil;
029import org.apache.hadoop.hbase.PrivateCellUtil;
030import org.apache.hadoop.hbase.nio.ByteBuff;
031import org.apache.hadoop.hbase.util.ByteBufferUtils;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.apache.hadoop.hbase.util.ObjectIntPair;
034import org.apache.yetus.audience.InterfaceAudience;
035
036/**
037 * Encoder similar to {@link DiffKeyDeltaEncoder} but supposedly faster.
038 *
039 * Compress using:
040 * - store size of common prefix
041 * - save column family once in the first KeyValue
042 * - use integer compression for key, value and prefix (7-bit encoding)
 * - use flag bits to avoid repeating the key length, value length
 *   and type when they are the same as in the previous KeyValue
 * - store in 3 bits the length of the timestamp prefix that is shared
 *    with the previous KeyValue's timestamp
 * - use one bit which allows omitting the value if it is the same as the previous one
048 *
049 * Format:
050 * - 1 byte:    flag
051 * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
052 * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
053 * - 1-5 bytes: prefix length
054 * - ... bytes: rest of the row (if prefix length is small enough)
055 * - ... bytes: qualifier (or suffix depending on prefix length)
056 * - 1-8 bytes: timestamp suffix
057 * - 1 byte:    type (only if FLAG_SAME_TYPE is not set in the flag)
058 * - ... bytes: value (only if FLAG_SAME_VALUE is not set in the flag)
059 *
060 */
061@InterfaceAudience.Private
062public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
063  static final int MASK_TIMESTAMP_LENGTH = (1 << 0) | (1 << 1) | (1 << 2);
064  static final int SHIFT_TIMESTAMP_LENGTH = 0;
065  static final int FLAG_SAME_KEY_LENGTH = 1 << 3;
066  static final int FLAG_SAME_VALUE_LENGTH = 1 << 4;
067  static final int FLAG_SAME_TYPE = 1 << 5;
068  static final int FLAG_SAME_VALUE = 1 << 6;
069
070  private static class FastDiffCompressionState extends CompressionState {
071    byte[] timestamp = new byte[KeyValue.TIMESTAMP_SIZE];
072    int prevTimestampOffset;
073
074    @Override
075    protected void readTimestamp(ByteBuffer in) {
076      in.get(timestamp);
077    }
078
079    @Override
080    void copyFrom(CompressionState state) {
081      super.copyFrom(state);
082      FastDiffCompressionState state2 = (FastDiffCompressionState) state;
083      System.arraycopy(state2.timestamp, 0, timestamp, 0,
084          KeyValue.TIMESTAMP_SIZE);
085      prevTimestampOffset = state2.prevTimestampOffset;
086    }
087
088    /**
089     * Copies the first key/value from the given stream, and initializes
090     * decompression state based on it. Assumes that we have already read key
091     * and value lengths. Does not set {@link #qualifierLength} (not used by
092     * decompression) or {@link #prevOffset} (set by the calle afterwards).
093     */
094    private void decompressFirstKV(ByteBuffer out, DataInputStream in)
095        throws IOException {
096      int kvPos = out.position();
097      out.putInt(keyLength);
098      out.putInt(valueLength);
099      prevTimestampOffset = out.position() + keyLength -
100          KeyValue.TIMESTAMP_TYPE_SIZE;
101      ByteBufferUtils.copyFromStreamToBuffer(out, in, keyLength + valueLength);
102      rowLength = out.getShort(kvPos + KeyValue.ROW_OFFSET);
103      familyLength = out.get(kvPos + KeyValue.ROW_OFFSET +
104          KeyValue.ROW_LENGTH_SIZE + rowLength);
105      type = out.get(prevTimestampOffset + KeyValue.TIMESTAMP_SIZE);
106    }
107
108  }
109
110  private int findCommonTimestampPrefix(byte[] curTsBuf, byte[] prevTsBuf) {
111    int commonPrefix = 0;
112    while (commonPrefix < (KeyValue.TIMESTAMP_SIZE - 1)
113        && curTsBuf[commonPrefix] == prevTsBuf[commonPrefix]) {
114      commonPrefix++;
115    }
116    return commonPrefix; // has to be at most 7 bytes
117  }
118
119  private void uncompressSingleKeyValue(DataInputStream source,
120      ByteBuffer out, FastDiffCompressionState state)
121          throws IOException, EncoderBufferTooSmallException {
122    byte flag = source.readByte();
123    int prevKeyLength = state.keyLength;
124
125    if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
126      state.keyLength = ByteBufferUtils.readCompressedInt(source);
127    }
128    if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
129      state.valueLength = ByteBufferUtils.readCompressedInt(source);
130    }
131    int commonLength = ByteBufferUtils.readCompressedInt(source);
132
133    ensureSpace(out, state.keyLength + state.valueLength + KeyValue.ROW_OFFSET);
134
135    int kvPos = out.position();
136
137    if (!state.isFirst()) {
138      // copy the prefix
139      int common;
140      int prevOffset;
141
142      if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
143        out.putInt(state.keyLength);
144        out.putInt(state.valueLength);
145        prevOffset = state.prevOffset + KeyValue.ROW_OFFSET;
146        common = commonLength;
147      } else {
148        if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
149          prevOffset = state.prevOffset;
150          common = commonLength + KeyValue.ROW_OFFSET;
151        } else {
152          out.putInt(state.keyLength);
153          prevOffset = state.prevOffset + KeyValue.KEY_LENGTH_SIZE;
154          common = commonLength + KeyValue.KEY_LENGTH_SIZE;
155        }
156      }
157
158      ByteBufferUtils.copyFromBufferToBuffer(out, out, prevOffset, common);
159
160      // copy the rest of the key from the buffer
161      int keyRestLength;
162      if (commonLength < state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
163        // omit the family part of the key, it is always the same
164        int rowWithSizeLength;
165        int rowRestLength;
166
167        // check length of row
168        if (commonLength < KeyValue.ROW_LENGTH_SIZE) {
169          // not yet copied, do it now
170          ByteBufferUtils.copyFromStreamToBuffer(out, source,
171              KeyValue.ROW_LENGTH_SIZE - commonLength);
172
173          rowWithSizeLength = out.getShort(out.position() -
174              KeyValue.ROW_LENGTH_SIZE) + KeyValue.ROW_LENGTH_SIZE;
175          rowRestLength = rowWithSizeLength - KeyValue.ROW_LENGTH_SIZE;
176        } else {
177          // already in kvBuffer, just read it
178          rowWithSizeLength = out.getShort(kvPos + KeyValue.ROW_OFFSET) +
179              KeyValue.ROW_LENGTH_SIZE;
180          rowRestLength = rowWithSizeLength - commonLength;
181        }
182
183        // copy the rest of row
184        ByteBufferUtils.copyFromStreamToBuffer(out, source, rowRestLength);
185
186        // copy the column family
187        ByteBufferUtils.copyFromBufferToBuffer(out, out,
188            state.prevOffset + KeyValue.ROW_OFFSET + KeyValue.ROW_LENGTH_SIZE
189                + state.rowLength, state.familyLength
190                + KeyValue.FAMILY_LENGTH_SIZE);
191        state.rowLength = (short) (rowWithSizeLength -
192            KeyValue.ROW_LENGTH_SIZE);
193
194        keyRestLength = state.keyLength - rowWithSizeLength -
195            state.familyLength -
196            (KeyValue.FAMILY_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
197      } else {
198        // prevRowWithSizeLength is the same as on previous row
199        keyRestLength = state.keyLength - commonLength -
200            KeyValue.TIMESTAMP_TYPE_SIZE;
201      }
202      // copy the rest of the key, after column family == column qualifier
203      ByteBufferUtils.copyFromStreamToBuffer(out, source, keyRestLength);
204
205      // copy timestamp
206      int prefixTimestamp =
207          (flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH;
208      ByteBufferUtils.copyFromBufferToBuffer(out, out,
209          state.prevTimestampOffset, prefixTimestamp);
210      state.prevTimestampOffset = out.position() - prefixTimestamp;
211      ByteBufferUtils.copyFromStreamToBuffer(out, source,
212          KeyValue.TIMESTAMP_SIZE - prefixTimestamp);
213
214      // copy the type and value
215      if ((flag & FLAG_SAME_TYPE) != 0) {
216        out.put(state.type);
217        if ((flag & FLAG_SAME_VALUE) != 0) {
218          ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset +
219              KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
220        } else {
221          ByteBufferUtils.copyFromStreamToBuffer(out, source,
222              state.valueLength);
223        }
224      } else {
225        if ((flag & FLAG_SAME_VALUE) != 0) {
226          ByteBufferUtils.copyFromStreamToBuffer(out, source,
227              KeyValue.TYPE_SIZE);
228          ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset +
229              KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
230        } else {
231          ByteBufferUtils.copyFromStreamToBuffer(out, source,
232              state.valueLength + KeyValue.TYPE_SIZE);
233        }
234        state.type = out.get(state.prevTimestampOffset +
235            KeyValue.TIMESTAMP_SIZE);
236      }
237    } else { // this is the first element
238      state.decompressFirstKV(out, source);
239    }
240
241    state.prevOffset = kvPos;
242  }
243
244  @Override
245  public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
246      DataOutputStream out) throws IOException {
247    EncodingState state = encodingContext.getEncodingState();
248    int size = compressSingleKeyValue(out, cell, state.prevCell);
249    size += afterEncodingKeyValue(cell, out, encodingContext);
250    state.prevCell = cell;
251    return size;
252  }
253
254  private int compressSingleKeyValue(DataOutputStream out, Cell cell, Cell prevCell)
255      throws IOException {
256    int flag = 0; // Do not use more bits than will fit into a byte
257    int kLength = KeyValueUtil.keyLength(cell);
258    int vLength = cell.getValueLength();
259
260    if (prevCell == null) {
261      // copy the key, there is no common prefix with none
262      out.write(flag);
263      ByteBufferUtils.putCompressedInt(out, kLength);
264      ByteBufferUtils.putCompressedInt(out, vLength);
265      ByteBufferUtils.putCompressedInt(out, 0);
266      PrivateCellUtil.writeFlatKey(cell, (DataOutput)out);
267      // Write the value part
268      PrivateCellUtil.writeValue(out, cell, cell.getValueLength());
269    } else {
270      int preKeyLength = KeyValueUtil.keyLength(prevCell);
271      int preValLength = prevCell.getValueLength();
272      // find a common prefix and skip it
273      int commonPrefix = PrivateCellUtil.findCommonPrefixInFlatKey(cell, prevCell, true, false);
274
275      if (kLength == preKeyLength) {
276        flag |= FLAG_SAME_KEY_LENGTH;
277      }
278      if (vLength == prevCell.getValueLength()) {
279        flag |= FLAG_SAME_VALUE_LENGTH;
280      }
281      if (cell.getTypeByte() == prevCell.getTypeByte()) {
282        flag |= FLAG_SAME_TYPE;
283      }
284
285      byte[] curTsBuf = Bytes.toBytes(cell.getTimestamp());
286      int commonTimestampPrefix = findCommonTimestampPrefix(curTsBuf,
287          Bytes.toBytes(prevCell.getTimestamp()));
288
289      flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH;
290
291      // Check if current and previous values are the same. Compare value
292      // length first as an optimization.
293      if (vLength == preValLength
294          && PrivateCellUtil.matchingValue(cell, prevCell, vLength, preValLength)) {
295        flag |= FLAG_SAME_VALUE;
296      }
297
298      out.write(flag);
299      if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
300        ByteBufferUtils.putCompressedInt(out, kLength);
301      }
302      if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
303        ByteBufferUtils.putCompressedInt(out, vLength);
304      }
305      ByteBufferUtils.putCompressedInt(out, commonPrefix);
306      short rLen = cell.getRowLength();
307      if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
308        // Previous and current rows are different. Copy the differing part of
309        // the row, skip the column family, and copy the qualifier.
310        PrivateCellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
311        PrivateCellUtil.writeQualifier(out, cell, cell.getQualifierLength());
312      } else {
313        // The common part includes the whole row. As the column family is the
314        // same across the whole file, it will automatically be included in the
315        // common prefix, so we need not special-case it here.
316        // What we write here is the non common part of the qualifier
317        int commonQualPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
318            - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
319        PrivateCellUtil.writeQualifierSkippingBytes(out, cell, cell.getQualifierLength(),
320          commonQualPrefix);
321      }
322      // Write non common ts part
323      out.write(curTsBuf, commonTimestampPrefix, KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix);
324
325      // Write the type if it is not the same as before.
326      if ((flag & FLAG_SAME_TYPE) == 0) {
327        out.write(cell.getTypeByte());
328      }
329
330      // Write the value if it is not the same as before.
331      if ((flag & FLAG_SAME_VALUE) == 0) {
332        PrivateCellUtil.writeValue(out, cell, vLength);
333      }
334    }
335    return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
336  }
337
338  @Override
339  protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
340      int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
341    int decompressedSize = source.readInt();
342    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
343        allocateHeaderLength);
344    buffer.position(allocateHeaderLength);
345    FastDiffCompressionState state = new FastDiffCompressionState();
346    while (source.available() > skipLastBytes) {
347      uncompressSingleKeyValue(source, buffer, state);
348      afterDecodingKeyValue(source, buffer, decodingCtx);
349    }
350
351    if (source.available() != skipLastBytes) {
352      throw new IllegalStateException("Read too much bytes.");
353    }
354
355    return buffer;
356  }
357
358  @Override
359  public Cell getFirstKeyCellInBlock(ByteBuff block) {
360    block.mark();
361    block.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE);
362    int keyLength = ByteBuff.readCompressedInt(block);
363    // TODO : See if we can avoid these reads as the read values are not getting used
364    ByteBuff.readCompressedInt(block); // valueLength
365    ByteBuff.readCompressedInt(block); // commonLength
366    ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
367    block.reset();
368    return createFirstKeyCell(key, keyLength);
369  }
370
371  @Override
372  public String toString() {
373    return FastDiffDeltaEncoder.class.getSimpleName();
374  }
375
376  protected static class FastDiffSeekerState extends SeekerState {
377    private byte[] prevTimestampAndType =
378        new byte[KeyValue.TIMESTAMP_TYPE_SIZE];
379    private int rowLengthWithSize;
380    private int familyLengthWithSize;
381
382    public FastDiffSeekerState(ObjectIntPair<ByteBuffer> tmpPair,
383        boolean includeTags) {
384      super(tmpPair, includeTags);
385    }
386
387    @Override
388    protected void copyFromNext(SeekerState that) {
389      super.copyFromNext(that);
390      FastDiffSeekerState other = (FastDiffSeekerState) that;
391      System.arraycopy(other.prevTimestampAndType, 0,
392          prevTimestampAndType, 0,
393          KeyValue.TIMESTAMP_TYPE_SIZE);
394      rowLengthWithSize = other.rowLengthWithSize;
395      familyLengthWithSize = other.familyLengthWithSize;
396    }
397  }
398
399  @Override
400  public EncodedSeeker createSeeker(CellComparator comparator,
401      final HFileBlockDecodingContext decodingCtx) {
402    return new BufferedEncodedSeeker<FastDiffSeekerState>(comparator, decodingCtx) {
403      private void decode(boolean isFirst) {
404        byte flag = currentBuffer.get();
405        if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
406          if (!isFirst) {
407            System.arraycopy(current.keyBuffer,
408                current.keyLength - current.prevTimestampAndType.length,
409                current.prevTimestampAndType, 0,
410                current.prevTimestampAndType.length);
411          }
412          current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
413        }
414        if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
415          current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
416        }
417        current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
418
419        current.ensureSpaceForKey();
420
421        if (isFirst) {
422          // copy everything
423          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
424              current.keyLength - current.prevTimestampAndType.length);
425          current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
426              Bytes.SIZEOF_SHORT;
427          current.familyLengthWithSize =
428              current.keyBuffer[current.rowLengthWithSize] + Bytes.SIZEOF_BYTE;
429        } else if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
430          // length of row is different, copy everything except family
431
432          // copy the row size
433          int oldRowLengthWithSize = current.rowLengthWithSize;
434          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
435              Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
436          current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
437              Bytes.SIZEOF_SHORT;
438
439          // move the column family
440          System.arraycopy(current.keyBuffer, oldRowLengthWithSize,
441              current.keyBuffer, current.rowLengthWithSize,
442              current.familyLengthWithSize);
443
444          // copy the rest of row
445          currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
446              current.rowLengthWithSize - Bytes.SIZEOF_SHORT);
447
448          // copy the qualifier
449          currentBuffer.get(current.keyBuffer, current.rowLengthWithSize
450              + current.familyLengthWithSize, current.keyLength
451              - current.rowLengthWithSize - current.familyLengthWithSize
452              - current.prevTimestampAndType.length);
453        } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
454          // We have to copy part of row and qualifier, but the column family
455          // is in the right place.
456
457          // before column family (rest of row)
458          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
459              current.rowLengthWithSize - current.lastCommonPrefix);
460
461          // after column family (qualifier)
462          currentBuffer.get(current.keyBuffer, current.rowLengthWithSize
463              + current.familyLengthWithSize, current.keyLength
464              - current.rowLengthWithSize - current.familyLengthWithSize
465              - current.prevTimestampAndType.length);
466        } else {
467          // copy just the ending
468          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
469              current.keyLength - current.prevTimestampAndType.length
470                  - current.lastCommonPrefix);
471        }
472
473        // timestamp
474        int pos = current.keyLength - current.prevTimestampAndType.length;
475        int commonTimestampPrefix = (flag & MASK_TIMESTAMP_LENGTH) >>>
476          SHIFT_TIMESTAMP_LENGTH;
477        if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
478          System.arraycopy(current.prevTimestampAndType, 0, current.keyBuffer,
479              pos, commonTimestampPrefix);
480        }
481        pos += commonTimestampPrefix;
482        currentBuffer.get(current.keyBuffer, pos,
483            Bytes.SIZEOF_LONG - commonTimestampPrefix);
484        pos += Bytes.SIZEOF_LONG - commonTimestampPrefix;
485
486        // type
487        if ((flag & FLAG_SAME_TYPE) == 0) {
488          currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
489        } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
490          current.keyBuffer[pos] =
491              current.prevTimestampAndType[Bytes.SIZEOF_LONG];
492        }
493
494        // handle value
495        if ((flag & FLAG_SAME_VALUE) == 0) {
496          current.valueOffset = currentBuffer.position();
497          currentBuffer.skip(current.valueLength);
498        }
499
500        if (includesTags()) {
501          decodeTags();
502        }
503        if (includesMvcc()) {
504          current.memstoreTS = ByteBuff.readVLong(currentBuffer);
505        } else {
506          current.memstoreTS = 0;
507        }
508        current.nextKvOffset = currentBuffer.position();
509      }
510
511      @Override
512      protected void decodeFirst() {
513        currentBuffer.skip(Bytes.SIZEOF_INT);
514        decode(true);
515      }
516
517      @Override
518      protected void decodeNext() {
519        decode(false);
520      }
521
522      @Override
523      protected FastDiffSeekerState createSeekerState() {
524        return new FastDiffSeekerState(this.tmpPair, this.includesTags());
525      }
526    };
527  }
528}