001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with this 004 * work for additional information regarding copyright ownership. The ASF 005 * licenses this file to you under the Apache License, Version 2.0 (the 006 * "License"); you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 014 * License for the specific language governing permissions and limitations 015 * under the License. 016 */ 017package org.apache.hadoop.hbase.io.encoding; 018 019import java.io.DataInputStream; 020import java.io.DataOutputStream; 021import java.io.IOException; 022import java.nio.ByteBuffer; 023import org.apache.hadoop.hbase.Cell; 024import org.apache.hadoop.hbase.KeyValue; 025import org.apache.hadoop.hbase.KeyValueUtil; 026import org.apache.hadoop.hbase.PrivateCellUtil; 027import org.apache.hadoop.hbase.nio.ByteBuff; 028import org.apache.hadoop.hbase.util.ByteBufferUtils; 029import org.apache.hadoop.hbase.util.Bytes; 030import org.apache.hadoop.hbase.util.ObjectIntPair; 031import org.apache.yetus.audience.InterfaceAudience; 032 033/** 034 * Compress using: 035 * - store size of common prefix 036 * - save column family once, it is same within HFile 037 * - use integer compression for key, value and prefix (7-bit encoding) 038 * - use bits to avoid duplication key length, value length 039 * and type if it same as previous 040 * - store in 3 bits length of timestamp field 041 * - allow diff in timestamp instead of actual value 042 * 043 * Format: 044 * - 1 byte: flag 045 * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag) 046 * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag) 047 * - 1-5 bytes: prefix length 048 * - ... bytes: rest of the row (if prefix length is small enough) 049 * - ... bytes: qualifier (or suffix depending on prefix length) 050 * - 1-8 bytes: timestamp or diff 051 * - 1 byte: type (only if FLAG_SAME_TYPE is not set in the flag) 052 * - ... bytes: value 053 */ 054@InterfaceAudience.Private 055public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder { 056 static final int FLAG_SAME_KEY_LENGTH = 1; 057 static final int FLAG_SAME_VALUE_LENGTH = 1 << 1; 058 static final int FLAG_SAME_TYPE = 1 << 2; 059 static final int FLAG_TIMESTAMP_IS_DIFF = 1 << 3; 060 static final int MASK_TIMESTAMP_LENGTH = (1 << 4) | (1 << 5) | (1 << 6); 061 static final int SHIFT_TIMESTAMP_LENGTH = 4; 062 static final int FLAG_TIMESTAMP_SIGN = 1 << 7; 063 064 protected static class DiffCompressionState extends CompressionState { 065 long timestamp; 066 byte[] familyNameWithSize; 067 068 @Override 069 protected void readTimestamp(ByteBuffer in) { 070 timestamp = in.getLong(); 071 } 072 073 @Override 074 void copyFrom(CompressionState state) { 075 super.copyFrom(state); 076 DiffCompressionState state2 = (DiffCompressionState) state; 077 timestamp = state2.timestamp; 078 } 079 } 080 081 private void uncompressSingleKeyValue(DataInputStream source, 082 ByteBuffer buffer, 083 DiffCompressionState state) 084 throws IOException, EncoderBufferTooSmallException { 085 // read the column family at the beginning 086 if (state.isFirst()) { 087 state.familyLength = source.readByte(); 088 state.familyNameWithSize = 089 new byte[(state.familyLength & 0xff) + KeyValue.FAMILY_LENGTH_SIZE]; 090 state.familyNameWithSize[0] = state.familyLength; 091 int read = source.read(state.familyNameWithSize, KeyValue.FAMILY_LENGTH_SIZE, 092 state.familyLength); 093 assert read == state.familyLength; 094 } 095 096 // read flag 097 byte flag = source.readByte(); 098 099 // read key/value/common lengths 100 int keyLength; 101 int valueLength; 102 if ((flag & FLAG_SAME_KEY_LENGTH) != 0) { 103 keyLength = state.keyLength; 104 } else { 105 keyLength = ByteBufferUtils.readCompressedInt(source); 106 } 107 if ((flag & FLAG_SAME_VALUE_LENGTH) != 0) { 108 valueLength = state.valueLength; 109 } else { 110 valueLength = ByteBufferUtils.readCompressedInt(source); 111 } 112 int commonPrefix = ByteBufferUtils.readCompressedInt(source); 113 114 // create KeyValue buffer and fill it prefix 115 int keyOffset = buffer.position(); 116 ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET); 117 buffer.putInt(keyLength); 118 buffer.putInt(valueLength); 119 120 // copy common from previous key 121 if (commonPrefix > 0) { 122 ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, state.prevOffset 123 + KeyValue.ROW_OFFSET, commonPrefix); 124 } 125 126 // copy the rest of the key from the buffer 127 int keyRestLength; 128 if (state.isFirst() || commonPrefix < 129 state.rowLength + KeyValue.ROW_LENGTH_SIZE) { 130 // omit the family part of the key, it is always the same 131 short rowLength; 132 int rowRestLength; 133 134 // check length of row 135 if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) { 136 // not yet copied, do it now 137 ByteBufferUtils.copyFromStreamToBuffer(buffer, source, 138 KeyValue.ROW_LENGTH_SIZE - commonPrefix); 139 ByteBufferUtils.skip(buffer, -KeyValue.ROW_LENGTH_SIZE); 140 rowLength = buffer.getShort(); 141 rowRestLength = rowLength; 142 } else { 143 // already in buffer, just read it 144 rowLength = buffer.getShort(keyOffset + KeyValue.ROW_OFFSET); 145 rowRestLength = rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix; 146 } 147 148 // copy the rest of row 149 ByteBufferUtils.copyFromStreamToBuffer(buffer, source, rowRestLength); 150 state.rowLength = rowLength; 151 152 // copy the column family 153 buffer.put(state.familyNameWithSize); 154 155 keyRestLength = keyLength - rowLength - 156 state.familyNameWithSize.length - 157 (KeyValue.ROW_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE); 158 } else { 159 // prevRowWithSizeLength is the same as on previous row 160 keyRestLength = keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE; 161 } 162 // copy the rest of the key, after column family -> column qualifier 163 ByteBufferUtils.copyFromStreamToBuffer(buffer, source, keyRestLength); 164 165 // handle timestamp 166 int timestampFitsInBytes = 167 ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1; 168 long timestamp = ByteBufferUtils.readLong(source, timestampFitsInBytes); 169 if ((flag & FLAG_TIMESTAMP_SIGN) != 0) { 170 timestamp = -timestamp; 171 } 172 if ((flag & FLAG_TIMESTAMP_IS_DIFF) != 0) { 173 timestamp = state.timestamp - timestamp; 174 } 175 buffer.putLong(timestamp); 176 177 // copy the type field 178 byte type; 179 if ((flag & FLAG_SAME_TYPE) != 0) { 180 type = state.type; 181 } else { 182 type = source.readByte(); 183 } 184 buffer.put(type); 185 186 // copy value part 187 ByteBufferUtils.copyFromStreamToBuffer(buffer, source, valueLength); 188 189 state.keyLength = keyLength; 190 state.valueLength = valueLength; 191 state.prevOffset = keyOffset; 192 state.timestamp = timestamp; 193 state.type = type; 194 // state.qualifier is unused 195 } 196 197 @Override 198 public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext, 199 DataOutputStream out) throws IOException { 200 EncodingState state = encodingContext.getEncodingState(); 201 int size = compressSingleKeyValue(out, cell, state.prevCell); 202 size += afterEncodingKeyValue(cell, out, encodingContext); 203 state.prevCell = cell; 204 return size; 205 } 206 207 private int compressSingleKeyValue(DataOutputStream out, Cell cell, Cell prevCell) 208 throws IOException { 209 int flag = 0; // Do not use more bits that can fit into a byte 210 int kLength = KeyValueUtil.keyLength(cell); 211 int vLength = cell.getValueLength(); 212 213 long timestamp; 214 long diffTimestamp = 0; 215 int diffTimestampFitsInBytes = 0; 216 int timestampFitsInBytes; 217 int commonPrefix = 0; 218 219 if (prevCell == null) { 220 timestamp = cell.getTimestamp(); 221 if (timestamp < 0) { 222 flag |= FLAG_TIMESTAMP_SIGN; 223 timestamp = -timestamp; 224 } 225 timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp); 226 flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH; 227 // put column family 228 byte familyLength = cell.getFamilyLength(); 229 out.write(familyLength); 230 PrivateCellUtil.writeFamily(out, cell, familyLength); 231 } else { 232 // Finding common prefix 233 int preKeyLength = KeyValueUtil.keyLength(prevCell); 234 commonPrefix = PrivateCellUtil.findCommonPrefixInFlatKey(cell, prevCell, true, false); 235 if (kLength == preKeyLength) { 236 flag |= FLAG_SAME_KEY_LENGTH; 237 } 238 if (vLength == prevCell.getValueLength()) { 239 flag |= FLAG_SAME_VALUE_LENGTH; 240 } 241 if (cell.getTypeByte() == prevCell.getTypeByte()) { 242 flag |= FLAG_SAME_TYPE; 243 } 244 // don't compress timestamp and type using prefix encode timestamp 245 timestamp = cell.getTimestamp(); 246 diffTimestamp = prevCell.getTimestamp() - timestamp; 247 boolean negativeTimestamp = timestamp < 0; 248 if (negativeTimestamp) { 249 timestamp = -timestamp; 250 } 251 timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp); 252 boolean minusDiffTimestamp = diffTimestamp < 0; 253 if (minusDiffTimestamp) { 254 diffTimestamp = -diffTimestamp; 255 } 256 diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp); 257 if (diffTimestampFitsInBytes < timestampFitsInBytes) { 258 flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH; 259 flag |= FLAG_TIMESTAMP_IS_DIFF; 260 if (minusDiffTimestamp) { 261 flag |= FLAG_TIMESTAMP_SIGN; 262 } 263 } else { 264 flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH; 265 if (negativeTimestamp) { 266 flag |= FLAG_TIMESTAMP_SIGN; 267 } 268 } 269 } 270 out.write(flag); 271 if ((flag & FLAG_SAME_KEY_LENGTH) == 0) { 272 ByteBufferUtils.putCompressedInt(out, kLength); 273 } 274 if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) { 275 ByteBufferUtils.putCompressedInt(out, vLength); 276 } 277 ByteBufferUtils.putCompressedInt(out, commonPrefix); 278 short rLen = cell.getRowLength(); 279 if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) { 280 // Previous and current rows are different. Copy the differing part of 281 // the row, skip the column family, and copy the qualifier. 282 PrivateCellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out); 283 PrivateCellUtil.writeQualifier(out, cell, cell.getQualifierLength()); 284 } else { 285 // The common part includes the whole row. As the column family is the 286 // same across the whole file, it will automatically be included in the 287 // common prefix, so we need not special-case it here. 288 // What we write here is the non common part of the qualifier 289 int commonQualPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE) 290 - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE); 291 PrivateCellUtil.writeQualifierSkippingBytes(out, cell, cell.getQualifierLength(), 292 commonQualPrefix); 293 } 294 if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) { 295 ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes); 296 } else { 297 ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes); 298 } 299 300 if ((flag & FLAG_SAME_TYPE) == 0) { 301 out.write(cell.getTypeByte()); 302 } 303 PrivateCellUtil.writeValue(out, cell, vLength); 304 return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE; 305 } 306 307 @Override 308 public Cell getFirstKeyCellInBlock(ByteBuff block) { 309 block.mark(); 310 block.position(Bytes.SIZEOF_INT); 311 byte familyLength = block.get(); 312 block.skip(familyLength); 313 byte flag = block.get(); 314 int keyLength = ByteBuff.readCompressedInt(block); 315 // TODO : See if we can avoid these reads as the read values are not getting used 316 ByteBuff.readCompressedInt(block); // valueLength 317 ByteBuff.readCompressedInt(block); // commonLength 318 ByteBuffer result = ByteBuffer.allocate(keyLength); 319 320 // copy row 321 assert !(result.isDirect()); 322 int pos = result.arrayOffset(); 323 block.get(result.array(), pos, Bytes.SIZEOF_SHORT); 324 pos += Bytes.SIZEOF_SHORT; 325 short rowLength = result.getShort(); 326 block.get(result.array(), pos, rowLength); 327 pos += rowLength; 328 329 // copy family 330 int savePosition = block.position(); 331 block.position(Bytes.SIZEOF_INT); 332 block.get(result.array(), pos, familyLength + Bytes.SIZEOF_BYTE); 333 pos += familyLength + Bytes.SIZEOF_BYTE; 334 335 // copy qualifier 336 block.position(savePosition); 337 int qualifierLength = 338 keyLength - pos + result.arrayOffset() - KeyValue.TIMESTAMP_TYPE_SIZE; 339 block.get(result.array(), pos, qualifierLength); 340 pos += qualifierLength; 341 342 // copy the timestamp and type 343 int timestampFitInBytes = 344 ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1; 345 long timestamp = ByteBuff.readLong(block, timestampFitInBytes); 346 if ((flag & FLAG_TIMESTAMP_SIGN) != 0) { 347 timestamp = -timestamp; 348 } 349 result.putLong(pos, timestamp); 350 pos += Bytes.SIZEOF_LONG; 351 block.get(result.array(), pos, Bytes.SIZEOF_BYTE); 352 353 block.reset(); 354 // The result is already a BB. So always we will create a KeyOnlyKv. 355 return new KeyValue.KeyOnlyKeyValue(result.array(), 0, keyLength); 356 } 357 358 @Override 359 public String toString() { 360 return DiffKeyDeltaEncoder.class.getSimpleName(); 361 } 362 363 protected static class DiffSeekerState extends SeekerState { 364 365 private int rowLengthWithSize; 366 private long timestamp; 367 368 public DiffSeekerState(ObjectIntPair<ByteBuffer> tmpPair, 369 boolean includeTags) { 370 super(tmpPair, includeTags); 371 } 372 373 @Override 374 protected void copyFromNext(SeekerState that) { 375 super.copyFromNext(that); 376 DiffSeekerState other = (DiffSeekerState) that; 377 rowLengthWithSize = other.rowLengthWithSize; 378 timestamp = other.timestamp; 379 } 380 } 381 382 @Override 383 public EncodedSeeker createSeeker(HFileBlockDecodingContext decodingCtx) { 384 return new DiffSeekerStateBufferedEncodedSeeker(decodingCtx); 385 } 386 387 @Override 388 protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength, 389 int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException { 390 int decompressedSize = source.readInt(); 391 ByteBuffer buffer = ByteBuffer.allocate(decompressedSize + 392 allocateHeaderLength); 393 buffer.position(allocateHeaderLength); 394 DiffCompressionState state = new DiffCompressionState(); 395 while (source.available() > skipLastBytes) { 396 uncompressSingleKeyValue(source, buffer, state); 397 afterDecodingKeyValue(source, buffer, decodingCtx); 398 } 399 400 if (source.available() != skipLastBytes) { 401 throw new IllegalStateException("Read too much bytes."); 402 } 403 404 return buffer; 405 } 406 407 private static class DiffSeekerStateBufferedEncodedSeeker 408 extends BufferedEncodedSeeker<DiffSeekerState> { 409 private byte[] familyNameWithSize; 410 private static final int TIMESTAMP_WITH_TYPE_LENGTH = 411 Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE; 412 413 private DiffSeekerStateBufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) { 414 super(decodingCtx); 415 } 416 417 private void decode(boolean isFirst) { 418 byte flag = currentBuffer.get(); 419 byte type = 0; 420 if ((flag & FLAG_SAME_KEY_LENGTH) == 0) { 421 if (!isFirst) { 422 type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE]; 423 } 424 current.keyLength = ByteBuff.readCompressedInt(currentBuffer); 425 } 426 if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) { 427 current.valueLength = ByteBuff.readCompressedInt(currentBuffer); 428 } 429 current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer); 430 431 current.ensureSpaceForKey(); 432 433 if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) { 434 // length of row is different, copy everything except family 435 436 // copy the row size 437 currentBuffer.get(current.keyBuffer, current.lastCommonPrefix, 438 Bytes.SIZEOF_SHORT - current.lastCommonPrefix); 439 current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) + 440 Bytes.SIZEOF_SHORT; 441 442 // copy the rest of row 443 currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT, 444 current.rowLengthWithSize - Bytes.SIZEOF_SHORT); 445 446 // copy the column family 447 System.arraycopy(familyNameWithSize, 0, current.keyBuffer, 448 current.rowLengthWithSize, familyNameWithSize.length); 449 450 // copy the qualifier 451 currentBuffer.get(current.keyBuffer, 452 current.rowLengthWithSize + familyNameWithSize.length, 453 current.keyLength - current.rowLengthWithSize - 454 familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH); 455 } else if (current.lastCommonPrefix < current.rowLengthWithSize) { 456 // we have to copy part of row and qualifier, 457 // but column family is in right place 458 459 // before column family (rest of row) 460 currentBuffer.get(current.keyBuffer, current.lastCommonPrefix, 461 current.rowLengthWithSize - current.lastCommonPrefix); 462 463 // after column family (qualifier) 464 currentBuffer.get(current.keyBuffer, 465 current.rowLengthWithSize + familyNameWithSize.length, 466 current.keyLength - current.rowLengthWithSize - 467 familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH); 468 } else { 469 // copy just the ending 470 currentBuffer.get(current.keyBuffer, current.lastCommonPrefix, 471 current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH - 472 current.lastCommonPrefix); 473 } 474 475 // timestamp 476 int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH; 477 int timestampFitInBytes = 1 + 478 ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH); 479 long timestampOrDiff = ByteBuff.readLong(currentBuffer, timestampFitInBytes); 480 if ((flag & FLAG_TIMESTAMP_SIGN) != 0) { 481 timestampOrDiff = -timestampOrDiff; 482 } 483 if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) { // it is timestamp 484 current.timestamp = timestampOrDiff; 485 } else { // it is diff 486 current.timestamp = current.timestamp - timestampOrDiff; 487 } 488 Bytes.putLong(current.keyBuffer, pos, current.timestamp); 489 pos += Bytes.SIZEOF_LONG; 490 491 // type 492 if ((flag & FLAG_SAME_TYPE) == 0) { 493 currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE); 494 } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) { 495 current.keyBuffer[pos] = type; 496 } 497 498 current.valueOffset = currentBuffer.position(); 499 currentBuffer.skip(current.valueLength); 500 501 if (includesTags()) { 502 decodeTags(); 503 } 504 if (includesMvcc()) { 505 current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer); 506 } else { 507 current.memstoreTS = 0; 508 } 509 current.nextKvOffset = currentBuffer.position(); 510 } 511 512 @Override 513 protected void decodeFirst() { 514 currentBuffer.skip(Bytes.SIZEOF_INT); 515 516 // read column family 517 byte familyNameLength = currentBuffer.get(); 518 familyNameWithSize = new byte[familyNameLength + Bytes.SIZEOF_BYTE]; 519 familyNameWithSize[0] = familyNameLength; 520 currentBuffer.get(familyNameWithSize, Bytes.SIZEOF_BYTE, 521 familyNameLength); 522 decode(true); 523 } 524 525 @Override 526 protected void decodeNext() { 527 decode(false); 528 } 529 530 @Override 531 protected DiffSeekerState createSeekerState() { 532 return new DiffSeekerState(this.tmpPair, this.includesTags()); 533 } 534 } 535}