/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Compress using:
 * - store the size of the common prefix
 * - save the column family once; it is the same within an HFile
 * - use integer compression for key, value and prefix lengths (7-bit encoding)
 * - use flag bits to avoid duplicating the key length, value length and type when they are the
 *   same as in the previous KeyValue
 * - store the length of the timestamp field in 3 bits
 * - allow storing a diff against the previous timestamp instead of the full value
 * Format:
 * - 1 byte:    flag
 * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
 * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
 * - 1-5 bytes: prefix length
 * - ... bytes: rest of the row (if prefix length is small enough)
 * - ... bytes: qualifier (or suffix depending on prefix length)
 * - 1-8 bytes: timestamp or diff
 * - 1 byte:    type (only if FLAG_SAME_TYPE is not set in the flag)
 * - ... bytes: value
 */
@InterfaceAudience.Private
public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
  static final int FLAG_SAME_KEY_LENGTH = 1;
  static final int FLAG_SAME_VALUE_LENGTH = 1 << 1;
  static final int FLAG_SAME_TYPE = 1 << 2;
  static final int FLAG_TIMESTAMP_IS_DIFF = 1 << 3;
  static final int MASK_TIMESTAMP_LENGTH = (1 << 4) | (1 << 5) | (1 << 6);
  static final int SHIFT_TIMESTAMP_LENGTH = 4;
  static final int FLAG_TIMESTAMP_SIGN = 1 << 7;
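
  /*
   * Taken together, the constants above describe the layout of the flag byte:
   *   bit 0    - key length is the same as in the previous KeyValue
   *   bit 1    - value length is the same as in the previous KeyValue
   *   bit 2    - type byte is the same as in the previous KeyValue
   *   bit 3    - the stored timestamp is a diff against the previous timestamp
   *   bits 4-6 - (number of bytes used to store the timestamp or diff) - 1
   *   bit 7    - the stored timestamp or diff is negative
   * For example, a flag of 0x17 (binary 0001 0111) means: same key length, same value length,
   * same type, and an absolute, non-negative timestamp stored in 2 bytes.
   */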

  protected static class DiffCompressionState extends CompressionState {
    long timestamp;
    byte[] familyNameWithSize;

    @Override
    protected void readTimestamp(ByteBuffer in) {
      timestamp = in.getLong();
    }

    @Override
    void copyFrom(CompressionState state) {
      super.copyFrom(state);
      DiffCompressionState state2 = (DiffCompressionState) state;
      timestamp = state2.timestamp;
    }
  }
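
  /**
   * Decodes a single diff-encoded KeyValue from {@code source} and materializes it, in regular
   * KeyValue format, at the current position of {@code buffer}. The passed {@code state} is used
   * to resolve fields shared with the previous KeyValue and is updated afterwards so that the next
   * call can be decoded against this one.
   */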
  private void uncompressSingleKeyValue(DataInputStream source, ByteBuffer buffer,
    DiffCompressionState state) throws IOException, EncoderBufferTooSmallException {
    // read the column family at the beginning
    if (state.isFirst()) {
      state.familyLength = source.readByte();
      state.familyNameWithSize =
        new byte[(state.familyLength & 0xff) + KeyValue.FAMILY_LENGTH_SIZE];
      state.familyNameWithSize[0] = state.familyLength;
      int read =
        source.read(state.familyNameWithSize, KeyValue.FAMILY_LENGTH_SIZE, state.familyLength);
      assert read == state.familyLength;
    }

    // read flag
    byte flag = source.readByte();

    // read key/value/common lengths
    int keyLength;
    int valueLength;
    if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
      keyLength = state.keyLength;
    } else {
      keyLength = ByteBufferUtils.readCompressedInt(source);
    }
    if ((flag & FLAG_SAME_VALUE_LENGTH) != 0) {
      valueLength = state.valueLength;
    } else {
      valueLength = ByteBufferUtils.readCompressedInt(source);
    }
    int commonPrefix = ByteBufferUtils.readCompressedInt(source);

    // create the KeyValue buffer and fill in the prefix
    int keyOffset = buffer.position();
    ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);
    buffer.putInt(keyLength);
    buffer.putInt(valueLength);

    // copy the common prefix from the previous key
    if (commonPrefix > 0) {
      ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, state.prevOffset + KeyValue.ROW_OFFSET,
        commonPrefix);
    }

    // copy the rest of the key from the stream
    int keyRestLength;
    if (state.isFirst() || commonPrefix < state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
      // omit the family part of the key, it is always the same
      short rowLength;
      int rowRestLength;

      // check length of row
      if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) {
        // not yet copied, do it now
        ByteBufferUtils.copyFromStreamToBuffer(buffer, source,
          KeyValue.ROW_LENGTH_SIZE - commonPrefix);
        ByteBufferUtils.skip(buffer, -KeyValue.ROW_LENGTH_SIZE);
        rowLength = buffer.getShort();
        rowRestLength = rowLength;
      } else {
        // already in buffer, just read it
        rowLength = buffer.getShort(keyOffset + KeyValue.ROW_OFFSET);
        rowRestLength = rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
      }

      // copy the rest of row
      ByteBufferUtils.copyFromStreamToBuffer(buffer, source, rowRestLength);
      state.rowLength = rowLength;

      // copy the column family
      buffer.put(state.familyNameWithSize);

      keyRestLength = keyLength - rowLength - state.familyNameWithSize.length
        - (KeyValue.ROW_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
    } else {
      // prevRowWithSizeLength is the same as on previous row
      keyRestLength = keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE;
    }
    // copy the rest of the key, after column family -> column qualifier
    ByteBufferUtils.copyFromStreamToBuffer(buffer, source, keyRestLength);

    // handle timestamp
    int timestampFitsInBytes = ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
    long timestamp = ByteBufferUtils.readLong(source, timestampFitsInBytes);
    if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
      timestamp = -timestamp;
    }
    if ((flag & FLAG_TIMESTAMP_IS_DIFF) != 0) {
      timestamp = state.timestamp - timestamp;
    }
    buffer.putLong(timestamp);

    // copy the type field
    byte type;
    if ((flag & FLAG_SAME_TYPE) != 0) {
      type = state.type;
    } else {
      type = source.readByte();
    }
    buffer.put(type);

    // copy value part
    ByteBufferUtils.copyFromStreamToBuffer(buffer, source, valueLength);

    state.keyLength = keyLength;
    state.valueLength = valueLength;
    state.prevOffset = keyOffset;
    state.timestamp = timestamp;
    state.type = type;
    // state.qualifier is unused
  }

  @Override
  public int internalEncode(ExtendedCell cell, HFileBlockDefaultEncodingContext encodingContext,
    DataOutputStream out) throws IOException {
    EncodingState state = encodingContext.getEncodingState();
    int size = compressSingleKeyValue(out, cell, state.prevCell);
    size += afterEncodingKeyValue(cell, out, encodingContext);
    state.prevCell = cell;
    return size;
  }
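
  /**
   * Writes one cell in diff-encoded form to {@code out}: a flag byte, the lengths that are not
   * shared with {@code prevCell}, the non-common part of the key, the (possibly diffed) timestamp
   * and, if different, the type byte, followed by the value. Returns the size the cell would
   * occupy in unencoded KeyValue format.
   */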
  private int compressSingleKeyValue(DataOutputStream out, ExtendedCell cell, ExtendedCell prevCell)
    throws IOException {
    int flag = 0; // Do not use more bits than can fit into a byte
    int kLength = KeyValueUtil.keyLength(cell);
    int vLength = cell.getValueLength();

    long timestamp;
    long diffTimestamp = 0;
    int diffTimestampFitsInBytes = 0;
    int timestampFitsInBytes;
    int commonPrefix = 0;

    if (prevCell == null) {
      timestamp = cell.getTimestamp();
      if (timestamp < 0) {
        flag |= FLAG_TIMESTAMP_SIGN;
        timestamp = -timestamp;
      }
      timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
      flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
      // put column family
      byte familyLength = cell.getFamilyLength();
      out.write(familyLength);
      PrivateCellUtil.writeFamily(out, cell, familyLength);
    } else {
      // Finding common prefix
      int preKeyLength = KeyValueUtil.keyLength(prevCell);
      commonPrefix = PrivateCellUtil.findCommonPrefixInFlatKey(cell, prevCell, true, false);
      if (kLength == preKeyLength) {
        flag |= FLAG_SAME_KEY_LENGTH;
      }
      if (vLength == prevCell.getValueLength()) {
        flag |= FLAG_SAME_VALUE_LENGTH;
      }
      if (cell.getTypeByte() == prevCell.getTypeByte()) {
        flag |= FLAG_SAME_TYPE;
      }
      // don't compress timestamp and type using prefix encoding; encode the timestamp
      timestamp = cell.getTimestamp();
      diffTimestamp = prevCell.getTimestamp() - timestamp;
      boolean negativeTimestamp = timestamp < 0;
      if (negativeTimestamp) {
        timestamp = -timestamp;
      }
      timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
      boolean minusDiffTimestamp = diffTimestamp < 0;
      if (minusDiffTimestamp) {
        diffTimestamp = -diffTimestamp;
      }
      diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
      if (diffTimestampFitsInBytes < timestampFitsInBytes) {
        flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
        flag |= FLAG_TIMESTAMP_IS_DIFF;
        if (minusDiffTimestamp) {
          flag |= FLAG_TIMESTAMP_SIGN;
        }
      } else {
        flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
        if (negativeTimestamp) {
          flag |= FLAG_TIMESTAMP_SIGN;
        }
      }
    }
    out.write(flag);
    if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
      ByteBufferUtils.putCompressedInt(out, kLength);
    }
    if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
      ByteBufferUtils.putCompressedInt(out, vLength);
    }
    ByteBufferUtils.putCompressedInt(out, commonPrefix);
    short rLen = cell.getRowLength();
    if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
      // Previous and current rows are different. Copy the differing part of
      // the row, skip the column family, and copy the qualifier.
      PrivateCellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
      PrivateCellUtil.writeQualifier(out, cell, cell.getQualifierLength());
    } else {
      // The common part includes the whole row. As the column family is the
      // same across the whole file, it will automatically be included in the
      // common prefix, so we need not special-case it here.
      // What we write here is the non-common part of the qualifier.
      int commonQualPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
        - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
      PrivateCellUtil.writeQualifierSkippingBytes(out, cell, cell.getQualifierLength(),
        commonQualPrefix);
    }
    if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
      ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
    } else {
      ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
    }

    if ((flag & FLAG_SAME_TYPE) == 0) {
      out.write(cell.getTypeByte());
    }
    PrivateCellUtil.writeValue(out, cell, vLength);
    return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
  }
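
  /**
   * Reconstructs only the first key of an encoded block. Since the first KeyValue in a block is
   * always written with the full row, family and qualifier (there is no previous cell to diff
   * against), the key can be rebuilt without decoding the rest of the block.
   */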
  @Override
  public ExtendedCell getFirstKeyCellInBlock(ByteBuff block) {
    block.mark();
    block.position(Bytes.SIZEOF_INT);
    byte familyLength = block.get();
    block.skip(familyLength);
    byte flag = block.get();
    int keyLength = ByteBuff.readCompressedInt(block);
    // TODO: See if we can avoid these reads as the read values are not getting used
    ByteBuff.readCompressedInt(block); // valueLength
    ByteBuff.readCompressedInt(block); // commonLength
    ByteBuffer result = ByteBuffer.allocate(keyLength);

    // copy row
    assert !result.isDirect();
    int pos = result.arrayOffset();
    block.get(result.array(), pos, Bytes.SIZEOF_SHORT);
    pos += Bytes.SIZEOF_SHORT;
    short rowLength = result.getShort();
    block.get(result.array(), pos, rowLength);
    pos += rowLength;

    // copy family
    int savePosition = block.position();
    block.position(Bytes.SIZEOF_INT);
    block.get(result.array(), pos, familyLength + Bytes.SIZEOF_BYTE);
    pos += familyLength + Bytes.SIZEOF_BYTE;

    // copy qualifier
    block.position(savePosition);
    int qualifierLength = keyLength - pos + result.arrayOffset() - KeyValue.TIMESTAMP_TYPE_SIZE;
    block.get(result.array(), pos, qualifierLength);
    pos += qualifierLength;

    // copy the timestamp and type
    int timestampFitInBytes = ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
    long timestamp = ByteBuff.readLong(block, timestampFitInBytes);
    if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
      timestamp = -timestamp;
    }
    result.putLong(pos, timestamp);
    pos += Bytes.SIZEOF_LONG;
    block.get(result.array(), pos, Bytes.SIZEOF_BYTE);

    block.reset();
    // The result is already a plain ByteBuffer, so we always create a KeyOnlyKeyValue.
    return new KeyValue.KeyOnlyKeyValue(result.array(), 0, keyLength);
  }

  @Override
  public String toString() {
    return DiffKeyDeltaEncoder.class.getSimpleName();
  }

  protected static class DiffSeekerState extends SeekerState {

    private int rowLengthWithSize;
    private long timestamp;

    public DiffSeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) {
      super(tmpPair, includeTags);
    }

    @Override
    protected void copyFromNext(SeekerState that) {
      super.copyFromNext(that);
      DiffSeekerState other = (DiffSeekerState) that;
      rowLengthWithSize = other.rowLengthWithSize;
      timestamp = other.timestamp;
    }
  }

  @Override
  public EncodedSeeker createSeeker(HFileBlockDecodingContext decodingCtx) {
    return new DiffSeekerStateBufferedEncodedSeeker(decodingCtx);
  }

  @Override
  protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
    int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    int decompressedSize = source.readInt();
    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize + allocateHeaderLength);
    buffer.position(allocateHeaderLength);
    DiffCompressionState state = new DiffCompressionState();
    while (source.available() > skipLastBytes) {
      uncompressSingleKeyValue(source, buffer, state);
      afterDecodingKeyValue(source, buffer, decodingCtx);
    }

    if (source.available() != skipLastBytes) {
      throw new IllegalStateException("Read too many bytes.");
    }

    return buffer;
  }
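
  /**
   * Seeker for DIFF-encoded blocks. The column family (read once per block in decodeFirst()) is
   * cached on the seeker, while the current row length and timestamp live in DiffSeekerState, so
   * decode() only needs to read the fields that differ from the previous cell when rebuilding the
   * key in current.keyBuffer.
   */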
  private static class DiffSeekerStateBufferedEncodedSeeker
    extends BufferedEncodedSeeker<DiffSeekerState> {
    private byte[] familyNameWithSize;
    private static final int TIMESTAMP_WITH_TYPE_LENGTH = Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;

    private DiffSeekerStateBufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
      super(decodingCtx);
    }

    private void decode(boolean isFirst) {
      byte flag = currentBuffer.get();
      byte type = 0;
      if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
        if (!isFirst) {
          type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE];
        }
        current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
      }
      if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
        current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
      }
      current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);

      current.ensureSpaceForKey();

      if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
        // length of row is different, copy everything except family

        // copy the row size
        currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
          Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
        current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) + Bytes.SIZEOF_SHORT;

        // copy the rest of row
        currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
          current.rowLengthWithSize - Bytes.SIZEOF_SHORT);

        // copy the column family
        System.arraycopy(familyNameWithSize, 0, current.keyBuffer, current.rowLengthWithSize,
          familyNameWithSize.length);

        // copy the qualifier
        currentBuffer.get(current.keyBuffer, current.rowLengthWithSize + familyNameWithSize.length,
          current.keyLength - current.rowLengthWithSize - familyNameWithSize.length
            - TIMESTAMP_WITH_TYPE_LENGTH);
      } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
        // we have to copy part of row and qualifier,
        // but column family is in right place

        // before column family (rest of row)
        currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
          current.rowLengthWithSize - current.lastCommonPrefix);

        // after column family (qualifier)
        currentBuffer.get(current.keyBuffer, current.rowLengthWithSize + familyNameWithSize.length,
          current.keyLength - current.rowLengthWithSize - familyNameWithSize.length
            - TIMESTAMP_WITH_TYPE_LENGTH);
      } else {
        // copy just the ending
        currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
          current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH - current.lastCommonPrefix);
      }

      // timestamp
      int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH;
      int timestampFitInBytes = 1 + ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH);
      long timestampOrDiff = ByteBuff.readLong(currentBuffer, timestampFitInBytes);
      if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
        timestampOrDiff = -timestampOrDiff;
      }
      if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) { // it is timestamp
        current.timestamp = timestampOrDiff;
      } else { // it is diff
        current.timestamp = current.timestamp - timestampOrDiff;
      }
      Bytes.putLong(current.keyBuffer, pos, current.timestamp);
      pos += Bytes.SIZEOF_LONG;

      // type
      if ((flag & FLAG_SAME_TYPE) == 0) {
        currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
      } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
        current.keyBuffer[pos] = type;
      }

      current.valueOffset = currentBuffer.position();
      currentBuffer.skip(current.valueLength);

      if (includesTags()) {
        decodeTags();
      }
      if (includesMvcc()) {
        current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
      } else {
        current.memstoreTS = 0;
      }
      current.nextKvOffset = currentBuffer.position();
    }

    @Override
    protected void decodeFirst() {
      currentBuffer.skip(Bytes.SIZEOF_INT);

      // read column family
      byte familyNameLength = currentBuffer.get();
      familyNameWithSize = new byte[familyNameLength + Bytes.SIZEOF_BYTE];
      familyNameWithSize[0] = familyNameLength;
      currentBuffer.get(familyNameWithSize, Bytes.SIZEOF_BYTE, familyNameLength);
      decode(true);
    }

    @Override
    protected void decodeNext() {
      decode(false);
    }

    @Override
    protected DiffSeekerState createSeekerState() {
      return new DiffSeekerState(this.tmpPair, this.includesTags());
    }
  }
}