001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.encoding; 019 020import java.io.DataInputStream; 021import java.io.DataOutput; 022import java.io.DataOutputStream; 023import java.io.IOException; 024import java.nio.ByteBuffer; 025import org.apache.hadoop.hbase.Cell; 026import org.apache.hadoop.hbase.KeyValue; 027import org.apache.hadoop.hbase.KeyValueUtil; 028import org.apache.hadoop.hbase.PrivateCellUtil; 029import org.apache.hadoop.hbase.nio.ByteBuff; 030import org.apache.hadoop.hbase.util.ByteBufferUtils; 031import org.apache.hadoop.hbase.util.Bytes; 032import org.apache.hadoop.hbase.util.ObjectIntPair; 033import org.apache.yetus.audience.InterfaceAudience; 034 035/** 036 * Encoder similar to {@link DiffKeyDeltaEncoder} but supposedly faster. Compress using: - store 037 * size of common prefix - save column family once in the first KeyValue - use integer compression 038 * for key, value and prefix (7-bit encoding) - use bits to avoid duplication key length, value 039 * length and type if it same as previous - store in 3 bits length of prefix timestamp with previous 040 * KeyValue's timestamp - one bit which allow to omit value if it is the same Format: - 1 byte: flag 041 * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag) - 1-5 bytes: value 042 * length (only if FLAG_SAME_VALUE_LENGTH is not set in flag) - 1-5 bytes: prefix length - ... 043 * bytes: rest of the row (if prefix length is small enough) - ... bytes: qualifier (or suffix 044 * depending on prefix length) - 1-8 bytes: timestamp suffix - 1 byte: type (only if FLAG_SAME_TYPE 045 * is not set in the flag) - ... bytes: value (only if FLAG_SAME_VALUE is not set in the flag) 046 */ 047@InterfaceAudience.Private 048public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder { 049 static final int MASK_TIMESTAMP_LENGTH = (1 << 0) | (1 << 1) | (1 << 2); 050 static final int SHIFT_TIMESTAMP_LENGTH = 0; 051 static final int FLAG_SAME_KEY_LENGTH = 1 << 3; 052 static final int FLAG_SAME_VALUE_LENGTH = 1 << 4; 053 static final int FLAG_SAME_TYPE = 1 << 5; 054 static final int FLAG_SAME_VALUE = 1 << 6; 055 056 private static class FastDiffCompressionState extends CompressionState { 057 byte[] timestamp = new byte[KeyValue.TIMESTAMP_SIZE]; 058 int prevTimestampOffset; 059 060 @Override 061 protected void readTimestamp(ByteBuffer in) { 062 in.get(timestamp); 063 } 064 065 @Override 066 void copyFrom(CompressionState state) { 067 super.copyFrom(state); 068 FastDiffCompressionState state2 = (FastDiffCompressionState) state; 069 System.arraycopy(state2.timestamp, 0, timestamp, 0, KeyValue.TIMESTAMP_SIZE); 070 prevTimestampOffset = state2.prevTimestampOffset; 071 } 072 073 /** 074 * Copies the first key/value from the given stream, and initializes decompression state based 075 * on it. Assumes that we have already read key and value lengths. Does not set 076 * {@link #qualifierLength} (not used by decompression) or {@link #prevOffset} (set by the calle 077 * afterwards). 078 */ 079 private void decompressFirstKV(ByteBuffer out, DataInputStream in) throws IOException { 080 int kvPos = out.position(); 081 out.putInt(keyLength); 082 out.putInt(valueLength); 083 prevTimestampOffset = out.position() + keyLength - KeyValue.TIMESTAMP_TYPE_SIZE; 084 ByteBufferUtils.copyFromStreamToBuffer(out, in, keyLength + valueLength); 085 rowLength = out.getShort(kvPos + KeyValue.ROW_OFFSET); 086 familyLength = out.get(kvPos + KeyValue.ROW_OFFSET + KeyValue.ROW_LENGTH_SIZE + rowLength); 087 type = out.get(prevTimestampOffset + KeyValue.TIMESTAMP_SIZE); 088 } 089 090 } 091 092 private int findCommonTimestampPrefix(byte[] curTsBuf, byte[] prevTsBuf) { 093 int commonPrefix = 0; 094 while ( 095 commonPrefix < (KeyValue.TIMESTAMP_SIZE - 1) 096 && curTsBuf[commonPrefix] == prevTsBuf[commonPrefix] 097 ) { 098 commonPrefix++; 099 } 100 return commonPrefix; // has to be at most 7 bytes 101 } 102 103 private void uncompressSingleKeyValue(DataInputStream source, ByteBuffer out, 104 FastDiffCompressionState state) throws IOException, EncoderBufferTooSmallException { 105 byte flag = source.readByte(); 106 int prevKeyLength = state.keyLength; 107 108 if ((flag & FLAG_SAME_KEY_LENGTH) == 0) { 109 state.keyLength = ByteBufferUtils.readCompressedInt(source); 110 } 111 if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) { 112 state.valueLength = ByteBufferUtils.readCompressedInt(source); 113 } 114 int commonLength = ByteBufferUtils.readCompressedInt(source); 115 116 ensureSpace(out, state.keyLength + state.valueLength + KeyValue.ROW_OFFSET); 117 118 int kvPos = out.position(); 119 120 if (!state.isFirst()) { 121 // copy the prefix 122 int common; 123 int prevOffset; 124 125 if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) { 126 out.putInt(state.keyLength); 127 out.putInt(state.valueLength); 128 prevOffset = state.prevOffset + KeyValue.ROW_OFFSET; 129 common = commonLength; 130 } else { 131 if ((flag & FLAG_SAME_KEY_LENGTH) != 0) { 132 prevOffset = state.prevOffset; 133 common = commonLength + KeyValue.ROW_OFFSET; 134 } else { 135 out.putInt(state.keyLength); 136 prevOffset = state.prevOffset + KeyValue.KEY_LENGTH_SIZE; 137 common = commonLength + KeyValue.KEY_LENGTH_SIZE; 138 } 139 } 140 141 ByteBufferUtils.copyFromBufferToBuffer(out, out, prevOffset, common); 142 143 // copy the rest of the key from the buffer 144 int keyRestLength; 145 if (commonLength < state.rowLength + KeyValue.ROW_LENGTH_SIZE) { 146 // omit the family part of the key, it is always the same 147 int rowWithSizeLength; 148 int rowRestLength; 149 150 // check length of row 151 if (commonLength < KeyValue.ROW_LENGTH_SIZE) { 152 // not yet copied, do it now 153 ByteBufferUtils.copyFromStreamToBuffer(out, source, 154 KeyValue.ROW_LENGTH_SIZE - commonLength); 155 156 rowWithSizeLength = 157 out.getShort(out.position() - KeyValue.ROW_LENGTH_SIZE) + KeyValue.ROW_LENGTH_SIZE; 158 rowRestLength = rowWithSizeLength - KeyValue.ROW_LENGTH_SIZE; 159 } else { 160 // already in kvBuffer, just read it 161 rowWithSizeLength = out.getShort(kvPos + KeyValue.ROW_OFFSET) + KeyValue.ROW_LENGTH_SIZE; 162 rowRestLength = rowWithSizeLength - commonLength; 163 } 164 165 // copy the rest of row 166 ByteBufferUtils.copyFromStreamToBuffer(out, source, rowRestLength); 167 168 // copy the column family 169 ByteBufferUtils.copyFromBufferToBuffer(out, out, 170 state.prevOffset + KeyValue.ROW_OFFSET + KeyValue.ROW_LENGTH_SIZE + state.rowLength, 171 state.familyLength + KeyValue.FAMILY_LENGTH_SIZE); 172 state.rowLength = (short) (rowWithSizeLength - KeyValue.ROW_LENGTH_SIZE); 173 174 keyRestLength = state.keyLength - rowWithSizeLength - state.familyLength 175 - (KeyValue.FAMILY_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE); 176 } else { 177 // prevRowWithSizeLength is the same as on previous row 178 keyRestLength = state.keyLength - commonLength - KeyValue.TIMESTAMP_TYPE_SIZE; 179 } 180 // copy the rest of the key, after column family == column qualifier 181 ByteBufferUtils.copyFromStreamToBuffer(out, source, keyRestLength); 182 183 // copy timestamp 184 int prefixTimestamp = (flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH; 185 ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevTimestampOffset, prefixTimestamp); 186 state.prevTimestampOffset = out.position() - prefixTimestamp; 187 ByteBufferUtils.copyFromStreamToBuffer(out, source, 188 KeyValue.TIMESTAMP_SIZE - prefixTimestamp); 189 190 // copy the type and value 191 if ((flag & FLAG_SAME_TYPE) != 0) { 192 out.put(state.type); 193 if ((flag & FLAG_SAME_VALUE) != 0) { 194 ByteBufferUtils.copyFromBufferToBuffer(out, out, 195 state.prevOffset + KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength); 196 } else { 197 ByteBufferUtils.copyFromStreamToBuffer(out, source, state.valueLength); 198 } 199 } else { 200 if ((flag & FLAG_SAME_VALUE) != 0) { 201 ByteBufferUtils.copyFromStreamToBuffer(out, source, KeyValue.TYPE_SIZE); 202 ByteBufferUtils.copyFromBufferToBuffer(out, out, 203 state.prevOffset + KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength); 204 } else { 205 ByteBufferUtils.copyFromStreamToBuffer(out, source, 206 state.valueLength + KeyValue.TYPE_SIZE); 207 } 208 state.type = out.get(state.prevTimestampOffset + KeyValue.TIMESTAMP_SIZE); 209 } 210 } else { // this is the first element 211 state.decompressFirstKV(out, source); 212 } 213 214 state.prevOffset = kvPos; 215 } 216 217 @Override 218 public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext, 219 DataOutputStream out) throws IOException { 220 EncodingState state = encodingContext.getEncodingState(); 221 int size = compressSingleKeyValue(out, cell, state.prevCell); 222 size += afterEncodingKeyValue(cell, out, encodingContext); 223 state.prevCell = cell; 224 return size; 225 } 226 227 private int compressSingleKeyValue(DataOutputStream out, Cell cell, Cell prevCell) 228 throws IOException { 229 int flag = 0; // Do not use more bits than will fit into a byte 230 int kLength = KeyValueUtil.keyLength(cell); 231 int vLength = cell.getValueLength(); 232 233 if (prevCell == null) { 234 // copy the key, there is no common prefix with none 235 out.write(flag); 236 ByteBufferUtils.putCompressedInt(out, kLength); 237 ByteBufferUtils.putCompressedInt(out, vLength); 238 ByteBufferUtils.putCompressedInt(out, 0); 239 PrivateCellUtil.writeFlatKey(cell, (DataOutput) out); 240 // Write the value part 241 PrivateCellUtil.writeValue(out, cell, cell.getValueLength()); 242 } else { 243 int preKeyLength = KeyValueUtil.keyLength(prevCell); 244 int preValLength = prevCell.getValueLength(); 245 // find a common prefix and skip it 246 int commonPrefix = PrivateCellUtil.findCommonPrefixInFlatKey(cell, prevCell, true, false); 247 248 if (kLength == preKeyLength) { 249 flag |= FLAG_SAME_KEY_LENGTH; 250 } 251 if (vLength == prevCell.getValueLength()) { 252 flag |= FLAG_SAME_VALUE_LENGTH; 253 } 254 if (cell.getTypeByte() == prevCell.getTypeByte()) { 255 flag |= FLAG_SAME_TYPE; 256 } 257 258 byte[] curTsBuf = Bytes.toBytes(cell.getTimestamp()); 259 int commonTimestampPrefix = 260 findCommonTimestampPrefix(curTsBuf, Bytes.toBytes(prevCell.getTimestamp())); 261 262 flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH; 263 264 // Check if current and previous values are the same. Compare value 265 // length first as an optimization. 266 if ( 267 vLength == preValLength 268 && PrivateCellUtil.matchingValue(cell, prevCell, vLength, preValLength) 269 ) { 270 flag |= FLAG_SAME_VALUE; 271 } 272 273 out.write(flag); 274 if ((flag & FLAG_SAME_KEY_LENGTH) == 0) { 275 ByteBufferUtils.putCompressedInt(out, kLength); 276 } 277 if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) { 278 ByteBufferUtils.putCompressedInt(out, vLength); 279 } 280 ByteBufferUtils.putCompressedInt(out, commonPrefix); 281 short rLen = cell.getRowLength(); 282 if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) { 283 // Previous and current rows are different. Copy the differing part of 284 // the row, skip the column family, and copy the qualifier. 285 PrivateCellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out); 286 PrivateCellUtil.writeQualifier(out, cell, cell.getQualifierLength()); 287 } else { 288 // The common part includes the whole row. As the column family is the 289 // same across the whole file, it will automatically be included in the 290 // common prefix, so we need not special-case it here. 291 // What we write here is the non common part of the qualifier 292 int commonQualPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE) 293 - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE); 294 PrivateCellUtil.writeQualifierSkippingBytes(out, cell, cell.getQualifierLength(), 295 commonQualPrefix); 296 } 297 // Write non common ts part 298 out.write(curTsBuf, commonTimestampPrefix, KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix); 299 300 // Write the type if it is not the same as before. 301 if ((flag & FLAG_SAME_TYPE) == 0) { 302 out.write(cell.getTypeByte()); 303 } 304 305 // Write the value if it is not the same as before. 306 if ((flag & FLAG_SAME_VALUE) == 0) { 307 PrivateCellUtil.writeValue(out, cell, vLength); 308 } 309 } 310 return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE; 311 } 312 313 @Override 314 protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength, 315 int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException { 316 int decompressedSize = source.readInt(); 317 ByteBuffer buffer = ByteBuffer.allocate(decompressedSize + allocateHeaderLength); 318 buffer.position(allocateHeaderLength); 319 FastDiffCompressionState state = new FastDiffCompressionState(); 320 while (source.available() > skipLastBytes) { 321 uncompressSingleKeyValue(source, buffer, state); 322 afterDecodingKeyValue(source, buffer, decodingCtx); 323 } 324 325 if (source.available() != skipLastBytes) { 326 throw new IllegalStateException("Read too much bytes."); 327 } 328 329 return buffer; 330 } 331 332 @Override 333 public Cell getFirstKeyCellInBlock(ByteBuff block) { 334 block.mark(); 335 block.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE); 336 int keyLength = ByteBuff.readCompressedInt(block); 337 // TODO : See if we can avoid these reads as the read values are not getting used 338 ByteBuff.readCompressedInt(block); // valueLength 339 ByteBuff.readCompressedInt(block); // commonLength 340 ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate(); 341 block.reset(); 342 return createFirstKeyCell(key, keyLength); 343 } 344 345 @Override 346 public String toString() { 347 return FastDiffDeltaEncoder.class.getSimpleName(); 348 } 349 350 protected static class FastDiffSeekerState extends SeekerState { 351 private byte[] prevTimestampAndType = new byte[KeyValue.TIMESTAMP_TYPE_SIZE]; 352 private int rowLengthWithSize; 353 private int familyLengthWithSize; 354 355 public FastDiffSeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) { 356 super(tmpPair, includeTags); 357 } 358 359 @Override 360 protected void copyFromNext(SeekerState that) { 361 super.copyFromNext(that); 362 FastDiffSeekerState other = (FastDiffSeekerState) that; 363 System.arraycopy(other.prevTimestampAndType, 0, prevTimestampAndType, 0, 364 KeyValue.TIMESTAMP_TYPE_SIZE); 365 rowLengthWithSize = other.rowLengthWithSize; 366 familyLengthWithSize = other.familyLengthWithSize; 367 } 368 } 369 370 @Override 371 public EncodedSeeker createSeeker(final HFileBlockDecodingContext decodingCtx) { 372 return new FastDiffSeekerStateBufferedEncodedSeeker(decodingCtx); 373 } 374 375 private static class FastDiffSeekerStateBufferedEncodedSeeker 376 extends BufferedEncodedSeeker<FastDiffSeekerState> { 377 378 private FastDiffSeekerStateBufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) { 379 super(decodingCtx); 380 } 381 382 private void decode(boolean isFirst) { 383 byte flag = currentBuffer.get(); 384 if ((flag & FLAG_SAME_KEY_LENGTH) == 0) { 385 if (!isFirst) { 386 System.arraycopy(current.keyBuffer, 387 current.keyLength - current.prevTimestampAndType.length, current.prevTimestampAndType, 388 0, current.prevTimestampAndType.length); 389 } 390 current.keyLength = ByteBuff.readCompressedInt(currentBuffer); 391 } 392 if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) { 393 current.valueLength = ByteBuff.readCompressedInt(currentBuffer); 394 } 395 current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer); 396 397 current.ensureSpaceForKey(); 398 399 if (isFirst) { 400 // copy everything 401 currentBuffer.get(current.keyBuffer, current.lastCommonPrefix, 402 current.keyLength - current.prevTimestampAndType.length); 403 current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) + Bytes.SIZEOF_SHORT; 404 current.familyLengthWithSize = 405 current.keyBuffer[current.rowLengthWithSize] + Bytes.SIZEOF_BYTE; 406 } else if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) { 407 // length of row is different, copy everything except family 408 409 // copy the row size 410 int oldRowLengthWithSize = current.rowLengthWithSize; 411 currentBuffer.get(current.keyBuffer, current.lastCommonPrefix, 412 Bytes.SIZEOF_SHORT - current.lastCommonPrefix); 413 current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) + Bytes.SIZEOF_SHORT; 414 415 // move the column family 416 System.arraycopy(current.keyBuffer, oldRowLengthWithSize, current.keyBuffer, 417 current.rowLengthWithSize, current.familyLengthWithSize); 418 419 // copy the rest of row 420 currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT, 421 current.rowLengthWithSize - Bytes.SIZEOF_SHORT); 422 423 // copy the qualifier 424 currentBuffer.get(current.keyBuffer, 425 current.rowLengthWithSize + current.familyLengthWithSize, 426 current.keyLength - current.rowLengthWithSize - current.familyLengthWithSize 427 - current.prevTimestampAndType.length); 428 } else if (current.lastCommonPrefix < current.rowLengthWithSize) { 429 // We have to copy part of row and qualifier, but the column family 430 // is in the right place. 431 432 // before column family (rest of row) 433 currentBuffer.get(current.keyBuffer, current.lastCommonPrefix, 434 current.rowLengthWithSize - current.lastCommonPrefix); 435 436 // after column family (qualifier) 437 currentBuffer.get(current.keyBuffer, 438 current.rowLengthWithSize + current.familyLengthWithSize, 439 current.keyLength - current.rowLengthWithSize - current.familyLengthWithSize 440 - current.prevTimestampAndType.length); 441 } else { 442 // copy just the ending 443 currentBuffer.get(current.keyBuffer, current.lastCommonPrefix, 444 current.keyLength - current.prevTimestampAndType.length - current.lastCommonPrefix); 445 } 446 447 // timestamp 448 int pos = current.keyLength - current.prevTimestampAndType.length; 449 int commonTimestampPrefix = (flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH; 450 if ((flag & FLAG_SAME_KEY_LENGTH) == 0) { 451 System.arraycopy(current.prevTimestampAndType, 0, current.keyBuffer, pos, 452 commonTimestampPrefix); 453 } 454 pos += commonTimestampPrefix; 455 currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_LONG - commonTimestampPrefix); 456 pos += Bytes.SIZEOF_LONG - commonTimestampPrefix; 457 458 // type 459 if ((flag & FLAG_SAME_TYPE) == 0) { 460 currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE); 461 } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) { 462 current.keyBuffer[pos] = current.prevTimestampAndType[Bytes.SIZEOF_LONG]; 463 } 464 465 // handle value 466 if ((flag & FLAG_SAME_VALUE) == 0) { 467 current.valueOffset = currentBuffer.position(); 468 currentBuffer.skip(current.valueLength); 469 } 470 471 if (includesTags()) { 472 decodeTags(); 473 } 474 if (includesMvcc()) { 475 current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer); 476 } else { 477 current.memstoreTS = 0; 478 } 479 current.nextKvOffset = currentBuffer.position(); 480 } 481 482 @Override 483 protected void decodeFirst() { 484 currentBuffer.skip(Bytes.SIZEOF_INT); 485 decode(true); 486 } 487 488 @Override 489 protected void decodeNext() { 490 decode(false); 491 } 492 493 @Override 494 protected FastDiffSeekerState createSeekerState() { 495 return new FastDiffSeekerState(this.tmpPair, this.includesTags()); 496 } 497 } 498}