001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.encoding;
019
020import java.io.DataInputStream;
021import java.io.DataOutputStream;
022import java.io.IOException;
023import java.nio.ByteBuffer;
024import org.apache.hadoop.hbase.Cell;
025import org.apache.hadoop.hbase.KeyValue;
026import org.apache.hadoop.hbase.KeyValueUtil;
027import org.apache.hadoop.hbase.PrivateCellUtil;
028import org.apache.hadoop.hbase.nio.ByteBuff;
029import org.apache.hadoop.hbase.util.ByteBufferUtils;
030import org.apache.hadoop.hbase.util.Bytes;
031import org.apache.hadoop.hbase.util.ObjectIntPair;
032import org.apache.yetus.audience.InterfaceAudience;
033
034/**
035 * Compress using: - store size of common prefix - save column family once, it is same within HFile
036 * - use integer compression for key, value and prefix (7-bit encoding) - use bits to avoid
037 * duplication key length, value length and type if it same as previous - store in 3 bits length of
038 * timestamp field - allow diff in timestamp instead of actual value Format: - 1 byte: flag - 1-5
039 * bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag) - 1-5 bytes: value length
040 * (only if FLAG_SAME_VALUE_LENGTH is not set in flag) - 1-5 bytes: prefix length - ... bytes: rest
041 * of the row (if prefix length is small enough) - ... bytes: qualifier (or suffix depending on
042 * prefix length) - 1-8 bytes: timestamp or diff - 1 byte: type (only if FLAG_SAME_TYPE is not set
043 * in the flag) - ... bytes: value
044 */
045@InterfaceAudience.Private
046public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
047  static final int FLAG_SAME_KEY_LENGTH = 1;
048  static final int FLAG_SAME_VALUE_LENGTH = 1 << 1;
049  static final int FLAG_SAME_TYPE = 1 << 2;
050  static final int FLAG_TIMESTAMP_IS_DIFF = 1 << 3;
051  static final int MASK_TIMESTAMP_LENGTH = (1 << 4) | (1 << 5) | (1 << 6);
052  static final int SHIFT_TIMESTAMP_LENGTH = 4;
053  static final int FLAG_TIMESTAMP_SIGN = 1 << 7;
054
055  protected static class DiffCompressionState extends CompressionState {
056    long timestamp;
057    byte[] familyNameWithSize;
058
059    @Override
060    protected void readTimestamp(ByteBuffer in) {
061      timestamp = in.getLong();
062    }
063
064    @Override
065    void copyFrom(CompressionState state) {
066      super.copyFrom(state);
067      DiffCompressionState state2 = (DiffCompressionState) state;
068      timestamp = state2.timestamp;
069    }
070  }
071
072  private void uncompressSingleKeyValue(DataInputStream source, ByteBuffer buffer,
073    DiffCompressionState state) throws IOException, EncoderBufferTooSmallException {
074    // read the column family at the beginning
075    if (state.isFirst()) {
076      state.familyLength = source.readByte();
077      state.familyNameWithSize =
078        new byte[(state.familyLength & 0xff) + KeyValue.FAMILY_LENGTH_SIZE];
079      state.familyNameWithSize[0] = state.familyLength;
080      int read =
081        source.read(state.familyNameWithSize, KeyValue.FAMILY_LENGTH_SIZE, state.familyLength);
082      assert read == state.familyLength;
083    }
084
085    // read flag
086    byte flag = source.readByte();
087
088    // read key/value/common lengths
089    int keyLength;
090    int valueLength;
091    if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
092      keyLength = state.keyLength;
093    } else {
094      keyLength = ByteBufferUtils.readCompressedInt(source);
095    }
096    if ((flag & FLAG_SAME_VALUE_LENGTH) != 0) {
097      valueLength = state.valueLength;
098    } else {
099      valueLength = ByteBufferUtils.readCompressedInt(source);
100    }
101    int commonPrefix = ByteBufferUtils.readCompressedInt(source);
102
103    // create KeyValue buffer and fill it prefix
104    int keyOffset = buffer.position();
105    ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);
106    buffer.putInt(keyLength);
107    buffer.putInt(valueLength);
108
109    // copy common from previous key
110    if (commonPrefix > 0) {
111      ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, state.prevOffset + KeyValue.ROW_OFFSET,
112        commonPrefix);
113    }
114
115    // copy the rest of the key from the buffer
116    int keyRestLength;
117    if (state.isFirst() || commonPrefix < state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
118      // omit the family part of the key, it is always the same
119      short rowLength;
120      int rowRestLength;
121
122      // check length of row
123      if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) {
124        // not yet copied, do it now
125        ByteBufferUtils.copyFromStreamToBuffer(buffer, source,
126          KeyValue.ROW_LENGTH_SIZE - commonPrefix);
127        ByteBufferUtils.skip(buffer, -KeyValue.ROW_LENGTH_SIZE);
128        rowLength = buffer.getShort();
129        rowRestLength = rowLength;
130      } else {
131        // already in buffer, just read it
132        rowLength = buffer.getShort(keyOffset + KeyValue.ROW_OFFSET);
133        rowRestLength = rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
134      }
135
136      // copy the rest of row
137      ByteBufferUtils.copyFromStreamToBuffer(buffer, source, rowRestLength);
138      state.rowLength = rowLength;
139
140      // copy the column family
141      buffer.put(state.familyNameWithSize);
142
143      keyRestLength = keyLength - rowLength - state.familyNameWithSize.length
144        - (KeyValue.ROW_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
145    } else {
146      // prevRowWithSizeLength is the same as on previous row
147      keyRestLength = keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE;
148    }
149    // copy the rest of the key, after column family -> column qualifier
150    ByteBufferUtils.copyFromStreamToBuffer(buffer, source, keyRestLength);
151
152    // handle timestamp
153    int timestampFitsInBytes = ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
154    long timestamp = ByteBufferUtils.readLong(source, timestampFitsInBytes);
155    if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
156      timestamp = -timestamp;
157    }
158    if ((flag & FLAG_TIMESTAMP_IS_DIFF) != 0) {
159      timestamp = state.timestamp - timestamp;
160    }
161    buffer.putLong(timestamp);
162
163    // copy the type field
164    byte type;
165    if ((flag & FLAG_SAME_TYPE) != 0) {
166      type = state.type;
167    } else {
168      type = source.readByte();
169    }
170    buffer.put(type);
171
172    // copy value part
173    ByteBufferUtils.copyFromStreamToBuffer(buffer, source, valueLength);
174
175    state.keyLength = keyLength;
176    state.valueLength = valueLength;
177    state.prevOffset = keyOffset;
178    state.timestamp = timestamp;
179    state.type = type;
180    // state.qualifier is unused
181  }
182
183  @Override
184  public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
185    DataOutputStream out) throws IOException {
186    EncodingState state = encodingContext.getEncodingState();
187    int size = compressSingleKeyValue(out, cell, state.prevCell);
188    size += afterEncodingKeyValue(cell, out, encodingContext);
189    state.prevCell = cell;
190    return size;
191  }
192
193  private int compressSingleKeyValue(DataOutputStream out, Cell cell, Cell prevCell)
194    throws IOException {
195    int flag = 0; // Do not use more bits that can fit into a byte
196    int kLength = KeyValueUtil.keyLength(cell);
197    int vLength = cell.getValueLength();
198
199    long timestamp;
200    long diffTimestamp = 0;
201    int diffTimestampFitsInBytes = 0;
202    int timestampFitsInBytes;
203    int commonPrefix = 0;
204
205    if (prevCell == null) {
206      timestamp = cell.getTimestamp();
207      if (timestamp < 0) {
208        flag |= FLAG_TIMESTAMP_SIGN;
209        timestamp = -timestamp;
210      }
211      timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
212      flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
213      // put column family
214      byte familyLength = cell.getFamilyLength();
215      out.write(familyLength);
216      PrivateCellUtil.writeFamily(out, cell, familyLength);
217    } else {
218      // Finding common prefix
219      int preKeyLength = KeyValueUtil.keyLength(prevCell);
220      commonPrefix = PrivateCellUtil.findCommonPrefixInFlatKey(cell, prevCell, true, false);
221      if (kLength == preKeyLength) {
222        flag |= FLAG_SAME_KEY_LENGTH;
223      }
224      if (vLength == prevCell.getValueLength()) {
225        flag |= FLAG_SAME_VALUE_LENGTH;
226      }
227      if (cell.getTypeByte() == prevCell.getTypeByte()) {
228        flag |= FLAG_SAME_TYPE;
229      }
230      // don't compress timestamp and type using prefix encode timestamp
231      timestamp = cell.getTimestamp();
232      diffTimestamp = prevCell.getTimestamp() - timestamp;
233      boolean negativeTimestamp = timestamp < 0;
234      if (negativeTimestamp) {
235        timestamp = -timestamp;
236      }
237      timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
238      boolean minusDiffTimestamp = diffTimestamp < 0;
239      if (minusDiffTimestamp) {
240        diffTimestamp = -diffTimestamp;
241      }
242      diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
243      if (diffTimestampFitsInBytes < timestampFitsInBytes) {
244        flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
245        flag |= FLAG_TIMESTAMP_IS_DIFF;
246        if (minusDiffTimestamp) {
247          flag |= FLAG_TIMESTAMP_SIGN;
248        }
249      } else {
250        flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
251        if (negativeTimestamp) {
252          flag |= FLAG_TIMESTAMP_SIGN;
253        }
254      }
255    }
256    out.write(flag);
257    if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
258      ByteBufferUtils.putCompressedInt(out, kLength);
259    }
260    if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
261      ByteBufferUtils.putCompressedInt(out, vLength);
262    }
263    ByteBufferUtils.putCompressedInt(out, commonPrefix);
264    short rLen = cell.getRowLength();
265    if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
266      // Previous and current rows are different. Copy the differing part of
267      // the row, skip the column family, and copy the qualifier.
268      PrivateCellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
269      PrivateCellUtil.writeQualifier(out, cell, cell.getQualifierLength());
270    } else {
271      // The common part includes the whole row. As the column family is the
272      // same across the whole file, it will automatically be included in the
273      // common prefix, so we need not special-case it here.
274      // What we write here is the non common part of the qualifier
275      int commonQualPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
276        - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
277      PrivateCellUtil.writeQualifierSkippingBytes(out, cell, cell.getQualifierLength(),
278        commonQualPrefix);
279    }
280    if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
281      ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
282    } else {
283      ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
284    }
285
286    if ((flag & FLAG_SAME_TYPE) == 0) {
287      out.write(cell.getTypeByte());
288    }
289    PrivateCellUtil.writeValue(out, cell, vLength);
290    return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
291  }
292
293  @Override
294  public Cell getFirstKeyCellInBlock(ByteBuff block) {
295    block.mark();
296    block.position(Bytes.SIZEOF_INT);
297    byte familyLength = block.get();
298    block.skip(familyLength);
299    byte flag = block.get();
300    int keyLength = ByteBuff.readCompressedInt(block);
301    // TODO : See if we can avoid these reads as the read values are not getting used
302    ByteBuff.readCompressedInt(block); // valueLength
303    ByteBuff.readCompressedInt(block); // commonLength
304    ByteBuffer result = ByteBuffer.allocate(keyLength);
305
306    // copy row
307    assert !result.isDirect();
308    int pos = result.arrayOffset();
309    block.get(result.array(), pos, Bytes.SIZEOF_SHORT);
310    pos += Bytes.SIZEOF_SHORT;
311    short rowLength = result.getShort();
312    block.get(result.array(), pos, rowLength);
313    pos += rowLength;
314
315    // copy family
316    int savePosition = block.position();
317    block.position(Bytes.SIZEOF_INT);
318    block.get(result.array(), pos, familyLength + Bytes.SIZEOF_BYTE);
319    pos += familyLength + Bytes.SIZEOF_BYTE;
320
321    // copy qualifier
322    block.position(savePosition);
323    int qualifierLength = keyLength - pos + result.arrayOffset() - KeyValue.TIMESTAMP_TYPE_SIZE;
324    block.get(result.array(), pos, qualifierLength);
325    pos += qualifierLength;
326
327    // copy the timestamp and type
328    int timestampFitInBytes = ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
329    long timestamp = ByteBuff.readLong(block, timestampFitInBytes);
330    if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
331      timestamp = -timestamp;
332    }
333    result.putLong(pos, timestamp);
334    pos += Bytes.SIZEOF_LONG;
335    block.get(result.array(), pos, Bytes.SIZEOF_BYTE);
336
337    block.reset();
338    // The result is already a BB. So always we will create a KeyOnlyKv.
339    return new KeyValue.KeyOnlyKeyValue(result.array(), 0, keyLength);
340  }
341
342  @Override
343  public String toString() {
344    return DiffKeyDeltaEncoder.class.getSimpleName();
345  }
346
347  protected static class DiffSeekerState extends SeekerState {
348
349    private int rowLengthWithSize;
350    private long timestamp;
351
352    public DiffSeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) {
353      super(tmpPair, includeTags);
354    }
355
356    @Override
357    protected void copyFromNext(SeekerState that) {
358      super.copyFromNext(that);
359      DiffSeekerState other = (DiffSeekerState) that;
360      rowLengthWithSize = other.rowLengthWithSize;
361      timestamp = other.timestamp;
362    }
363  }
364
365  @Override
366  public EncodedSeeker createSeeker(HFileBlockDecodingContext decodingCtx) {
367    return new DiffSeekerStateBufferedEncodedSeeker(decodingCtx);
368  }
369
370  @Override
371  protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
372    int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
373    int decompressedSize = source.readInt();
374    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize + allocateHeaderLength);
375    buffer.position(allocateHeaderLength);
376    DiffCompressionState state = new DiffCompressionState();
377    while (source.available() > skipLastBytes) {
378      uncompressSingleKeyValue(source, buffer, state);
379      afterDecodingKeyValue(source, buffer, decodingCtx);
380    }
381
382    if (source.available() != skipLastBytes) {
383      throw new IllegalStateException("Read too much bytes.");
384    }
385
386    return buffer;
387  }
388
389  private static class DiffSeekerStateBufferedEncodedSeeker
390    extends BufferedEncodedSeeker<DiffSeekerState> {
391    private byte[] familyNameWithSize;
392    private static final int TIMESTAMP_WITH_TYPE_LENGTH = Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;
393
394    private DiffSeekerStateBufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
395      super(decodingCtx);
396    }
397
398    private void decode(boolean isFirst) {
399      byte flag = currentBuffer.get();
400      byte type = 0;
401      if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
402        if (!isFirst) {
403          type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE];
404        }
405        current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
406      }
407      if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
408        current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
409      }
410      current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
411
412      current.ensureSpaceForKey();
413
414      if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
415        // length of row is different, copy everything except family
416
417        // copy the row size
418        currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
419          Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
420        current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) + Bytes.SIZEOF_SHORT;
421
422        // copy the rest of row
423        currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
424          current.rowLengthWithSize - Bytes.SIZEOF_SHORT);
425
426        // copy the column family
427        System.arraycopy(familyNameWithSize, 0, current.keyBuffer, current.rowLengthWithSize,
428          familyNameWithSize.length);
429
430        // copy the qualifier
431        currentBuffer.get(current.keyBuffer, current.rowLengthWithSize + familyNameWithSize.length,
432          current.keyLength - current.rowLengthWithSize - familyNameWithSize.length
433            - TIMESTAMP_WITH_TYPE_LENGTH);
434      } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
435        // we have to copy part of row and qualifier,
436        // but column family is in right place
437
438        // before column family (rest of row)
439        currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
440          current.rowLengthWithSize - current.lastCommonPrefix);
441
442        // after column family (qualifier)
443        currentBuffer.get(current.keyBuffer, current.rowLengthWithSize + familyNameWithSize.length,
444          current.keyLength - current.rowLengthWithSize - familyNameWithSize.length
445            - TIMESTAMP_WITH_TYPE_LENGTH);
446      } else {
447        // copy just the ending
448        currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
449          current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH - current.lastCommonPrefix);
450      }
451
452      // timestamp
453      int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH;
454      int timestampFitInBytes = 1 + ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH);
455      long timestampOrDiff = ByteBuff.readLong(currentBuffer, timestampFitInBytes);
456      if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
457        timestampOrDiff = -timestampOrDiff;
458      }
459      if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) { // it is timestamp
460        current.timestamp = timestampOrDiff;
461      } else { // it is diff
462        current.timestamp = current.timestamp - timestampOrDiff;
463      }
464      Bytes.putLong(current.keyBuffer, pos, current.timestamp);
465      pos += Bytes.SIZEOF_LONG;
466
467      // type
468      if ((flag & FLAG_SAME_TYPE) == 0) {
469        currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
470      } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
471        current.keyBuffer[pos] = type;
472      }
473
474      current.valueOffset = currentBuffer.position();
475      currentBuffer.skip(current.valueLength);
476
477      if (includesTags()) {
478        decodeTags();
479      }
480      if (includesMvcc()) {
481        current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
482      } else {
483        current.memstoreTS = 0;
484      }
485      current.nextKvOffset = currentBuffer.position();
486    }
487
488    @Override
489    protected void decodeFirst() {
490      currentBuffer.skip(Bytes.SIZEOF_INT);
491
492      // read column family
493      byte familyNameLength = currentBuffer.get();
494      familyNameWithSize = new byte[familyNameLength + Bytes.SIZEOF_BYTE];
495      familyNameWithSize[0] = familyNameLength;
496      currentBuffer.get(familyNameWithSize, Bytes.SIZEOF_BYTE, familyNameLength);
497      decode(true);
498    }
499
500    @Override
501    protected void decodeNext() {
502      decode(false);
503    }
504
505    @Override
506    protected DiffSeekerState createSeekerState() {
507      return new DiffSeekerState(this.tmpPair, this.includesTags());
508    }
509  }
510}