/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Compress using:
 * - store the size of the common prefix
 * - save the column family once, it is the same within an HFile
 * - use integer compression for key, value and prefix lengths (7-bit encoding)
 * - use flag bits to avoid repeating key length, value length
 *   and type if they are the same as in the previous cell
 * - store the length of the timestamp field in 3 bits
 * - allow a diff from the previous timestamp instead of the actual value
 *
 * Format:
 * - 1 byte:    flag
 * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
 * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
 * - 1-5 bytes: prefix length
 * - ... bytes: rest of the row (if prefix length is small enough)
 * - ... bytes: qualifier (or suffix depending on prefix length)
 * - 1-8 bytes: timestamp or diff
 * - 1 byte:    type (only if FLAG_SAME_TYPE is not set in the flag)
 * - ... bytes: value
 *
 * The first cell of a block is additionally preceded by the column family
 * (one length byte followed by the family bytes).
 */
@InterfaceAudience.Private
public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
  /** Flag bit: key length equals the previous cell's key length and is not stored. */
  static final int FLAG_SAME_KEY_LENGTH = 1;
  /** Flag bit: value length equals the previous cell's value length and is not stored. */
  static final int FLAG_SAME_VALUE_LENGTH = 1 << 1;
  /** Flag bit: type byte equals the previous cell's type and is not stored. */
  static final int FLAG_SAME_TYPE = 1 << 2;
  /** Flag bit: the stored number is (previous timestamp - timestamp), not the timestamp. */
  static final int FLAG_TIMESTAMP_IS_DIFF = 1 << 3;
  /** Three flag bits holding (number of stored timestamp bytes - 1). */
  static final int MASK_TIMESTAMP_LENGTH = (1 << 4) | (1 << 5) | (1 << 6);
  /** Right shift that moves the {@link #MASK_TIMESTAMP_LENGTH} bits to bit 0. */
  static final int SHIFT_TIMESTAMP_LENGTH = 4;
  /** Flag bit: the stored timestamp (or diff) magnitude must be negated after reading. */
  static final int FLAG_TIMESTAMP_SIGN = 1 << 7;

  /**
   * Decoder state carried from one cell to the next while a whole block is
   * being uncompressed by {@link #uncompressSingleKeyValue}.
   */
  protected static class DiffCompressionState extends CompressionState {
    /** Timestamp of the most recently decoded cell; base for timestamp diffs. */
    long timestamp;
    /** Family length byte followed by the family bytes, read once per block. */
    byte[] familyNameWithSize;

    @Override
    protected void readTimestamp(ByteBuffer in) {
      timestamp = in.getLong();
    }

    @Override
    void copyFrom(CompressionState state) {
      super.copyFrom(state);
      DiffCompressionState state2 = (DiffCompressionState) state;
      timestamp = state2.timestamp;
      // NOTE(review): familyNameWithSize is not copied here - confirm that no
      // caller decodes with a copied state.
    }
  }

  /**
   * Decodes one cell from {@code source} and appends it to {@code buffer} as
   * key length, value length, key bytes and value bytes.
   *
   * @param source stream positioned at the start of an encoded cell (or at the
   *          family bytes if this is the first cell of the block)
   * @param buffer destination; the previously decoded cell at
   *          {@code state.prevOffset} is read back from it to copy the common prefix
   * @param state values remembered from the previous cell, updated on return
   * @throws IOException if reading from {@code source} fails or it ends early
   * @throws EncoderBufferTooSmallException declared by this method; presumably raised by
   *           {@code ensureSpace} when {@code buffer} cannot hold the cell
   */
  private void uncompressSingleKeyValue(DataInputStream source,
      ByteBuffer buffer,
      DiffCompressionState state)
          throws IOException, EncoderBufferTooSmallException {
    // read the column family at the beginning
    if (state.isFirst()) {
      state.familyLength = source.readByte();
      // Treat the length byte as unsigned, the same way for sizing and reading.
      int familyLength = state.familyLength & 0xff;
      state.familyNameWithSize = new byte[familyLength + KeyValue.FAMILY_LENGTH_SIZE];
      state.familyNameWithSize[0] = state.familyLength;
      // readFully blocks until every family byte is available (or throws
      // EOFException); a plain read() may legally return fewer bytes, which
      // would silently desynchronise the decoder when assertions are disabled.
      source.readFully(state.familyNameWithSize, KeyValue.FAMILY_LENGTH_SIZE, familyLength);
    }

    // read flag
    byte flag = source.readByte();

    // read key/value/common lengths
    int keyLength;
    int valueLength;
    if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
      keyLength = state.keyLength;
    } else {
      keyLength = ByteBufferUtils.readCompressedInt(source);
    }
    if ((flag & FLAG_SAME_VALUE_LENGTH) != 0) {
      valueLength = state.valueLength;
    } else {
      valueLength = ByteBufferUtils.readCompressedInt(source);
    }
    int commonPrefix = ByteBufferUtils.readCompressedInt(source);

    // create KeyValue buffer and fill it with the prefix
    int keyOffset = buffer.position();
    ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);
    buffer.putInt(keyLength);
    buffer.putInt(valueLength);

    // copy common from previous key
    if (commonPrefix > 0) {
      ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, state.prevOffset
          + KeyValue.ROW_OFFSET, commonPrefix);
    }

    // copy the rest of the key from the buffer
    int keyRestLength;
    if (state.isFirst() || commonPrefix <
        state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
      // omit the family part of the key, it is always the same
      short rowLength;
      int rowRestLength;

      // check length of row
      if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) {
        // not yet copied, do it now
        ByteBufferUtils.copyFromStreamToBuffer(buffer, source,
            KeyValue.ROW_LENGTH_SIZE - commonPrefix);
        // step back over the row length just completed so it can be read
        ByteBufferUtils.skip(buffer, -KeyValue.ROW_LENGTH_SIZE);
        rowLength = buffer.getShort();
        rowRestLength = rowLength;
      } else {
        // already in buffer, just read it
        rowLength = buffer.getShort(keyOffset + KeyValue.ROW_OFFSET);
        rowRestLength = rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
      }

      // copy the rest of row
      ByteBufferUtils.copyFromStreamToBuffer(buffer, source, rowRestLength);
      state.rowLength = rowLength;

      // copy the column family
      buffer.put(state.familyNameWithSize);

      keyRestLength = keyLength - rowLength -
          state.familyNameWithSize.length -
          (KeyValue.ROW_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
    } else {
      // prevRowWithSizeLength is the same as on previous row
      keyRestLength = keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE;
    }
    // copy the rest of the key, after column family -> column qualifier
    ByteBufferUtils.copyFromStreamToBuffer(buffer, source, keyRestLength);

    // handle timestamp
    int timestampFitsInBytes =
        ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
    long timestamp = ByteBufferUtils.readLong(source, timestampFitsInBytes);
    if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
      timestamp = -timestamp;
    }
    if ((flag & FLAG_TIMESTAMP_IS_DIFF) != 0) {
      // the encoder stored (previous - current), so current = previous - diff
      timestamp = state.timestamp - timestamp;
    }
    buffer.putLong(timestamp);

    // copy the type field
    byte type;
    if ((flag & FLAG_SAME_TYPE) != 0) {
      type = state.type;
    } else {
      type = source.readByte();
    }
    buffer.put(type);

    // copy value part
    ByteBufferUtils.copyFromStreamToBuffer(buffer, source, valueLength);

    state.keyLength = keyLength;
    state.valueLength = valueLength;
    state.prevOffset = keyOffset;
    state.timestamp = timestamp;
    state.type = type;
    // state.qualifier is unused
  }

  /**
   * Encodes {@code cell} relative to the previously encoded cell held in the
   * encoding state, then records {@code cell} as the new previous cell.
   *
   * @return the size reported by {@code compressSingleKeyValue} plus the size
   *         reported by {@code afterEncodingKeyValue}
   */
  @Override
  public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
      DataOutputStream out) throws IOException {
    EncodingState state = encodingContext.getEncodingState();
    int size = compressSingleKeyValue(out, cell, state.prevCell);
    size += afterEncodingKeyValue(cell, out, encodingContext);
    state.prevCell = cell;
    return size;
  }

  /**
   * Writes one cell to {@code out} in the format described on this class.
   *
   * @param out destination stream
   * @param cell the cell to encode
   * @param prevCell the previously encoded cell, or {@code null} if {@code cell}
   *          is the first one; in that case the column family is written first
   * @return {@code keyLength + valueLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE},
   *         i.e. a size derived from the cell's lengths, not the bytes written
   * @throws IOException if writing to {@code out} fails
   */
  private int compressSingleKeyValue(DataOutputStream out, Cell cell, Cell prevCell)
      throws IOException {
    int flag = 0; // Do not use more bits than can fit into a byte
    int kLength = KeyValueUtil.keyLength(cell);
    int vLength = cell.getValueLength();

    long timestamp;
    long diffTimestamp = 0;
    int diffTimestampFitsInBytes = 0;
    int timestampFitsInBytes;
    int commonPrefix = 0;

    if (prevCell == null) {
      timestamp = cell.getTimestamp();
      if (timestamp < 0) {
        flag |= FLAG_TIMESTAMP_SIGN;
        timestamp = -timestamp;
      }
      timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
      flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
      // put column family
      byte familyLength = cell.getFamilyLength();
      out.write(familyLength);
      PrivateCellUtil.writeFamily(out, cell, familyLength);
    } else {
      // Finding common prefix
      int preKeyLength = KeyValueUtil.keyLength(prevCell);
      commonPrefix = PrivateCellUtil.findCommonPrefixInFlatKey(cell, prevCell, true, false);
      if (kLength == preKeyLength) {
        flag |= FLAG_SAME_KEY_LENGTH;
      }
      if (vLength == prevCell.getValueLength()) {
        flag |= FLAG_SAME_VALUE_LENGTH;
      }
      if (cell.getTypeByte() == prevCell.getTypeByte()) {
        flag |= FLAG_SAME_TYPE;
      }
      // Timestamp and type are not covered by the common prefix; the timestamp
      // is encoded separately below.
      timestamp = cell.getTimestamp();
      diffTimestamp = prevCell.getTimestamp() - timestamp;
      boolean negativeTimestamp = timestamp < 0;
      if (negativeTimestamp) {
        timestamp = -timestamp;
      }
      timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
      boolean minusDiffTimestamp = diffTimestamp < 0;
      if (minusDiffTimestamp) {
        diffTimestamp = -diffTimestamp;
      }
      diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
      // store whichever of (diff, absolute timestamp) needs fewer bytes
      if (diffTimestampFitsInBytes < timestampFitsInBytes) {
        flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
        flag |= FLAG_TIMESTAMP_IS_DIFF;
        if (minusDiffTimestamp) {
          flag |= FLAG_TIMESTAMP_SIGN;
        }
      } else {
        flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
        if (negativeTimestamp) {
          flag |= FLAG_TIMESTAMP_SIGN;
        }
      }
    }
    out.write(flag);
    if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
      ByteBufferUtils.putCompressedInt(out, kLength);
    }
    if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
      ByteBufferUtils.putCompressedInt(out, vLength);
    }
    ByteBufferUtils.putCompressedInt(out, commonPrefix);
    short rLen = cell.getRowLength();
    if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
      // Previous and current rows are different. Copy the differing part of
      // the row, skip the column family, and copy the qualifier.
      PrivateCellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
      PrivateCellUtil.writeQualifier(out, cell, cell.getQualifierLength());
    } else {
      // The common part includes the whole row. As the column family is the
      // same across the whole file, it will automatically be included in the
      // common prefix, so we need not special-case it here.
      // What we write here is the non common part of the qualifier
      int commonQualPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
          - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
      PrivateCellUtil.writeQualifierSkippingBytes(out, cell, cell.getQualifierLength(),
        commonQualPrefix);
    }
    if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
      ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
    } else {
      ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
    }

    if ((flag & FLAG_SAME_TYPE) == 0) {
      out.write(cell.getTypeByte());
    }
    PrivateCellUtil.writeValue(out, cell, vLength);
    return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
  }

  /**
   * Rebuilds the flat key of the first cell in an encoded block without
   * decoding the rest of the block.
   *
   * The block's position is marked on entry and reset before returning.
   *
   * @param block the encoded block; reading starts at offset
   *          {@code Bytes.SIZEOF_INT}, where the family length byte is expected
   * @return a key-only cell backed by a freshly allocated array
   */
  @Override
  public Cell getFirstKeyCellInBlock(ByteBuff block) {
    block.mark();
    block.position(Bytes.SIZEOF_INT);
    byte familyLength = block.get();
    block.skip(familyLength);
    byte flag = block.get();
    int keyLength = ByteBuff.readCompressedInt(block);
    // TODO : See if we can avoid these reads as the read values are not getting used
    ByteBuff.readCompressedInt(block); // valueLength
    ByteBuff.readCompressedInt(block); // commonLength
    ByteBuffer result = ByteBuffer.allocate(keyLength);

    // copy row
    assert !(result.isDirect());
    int pos = result.arrayOffset();
    block.get(result.array(), pos, Bytes.SIZEOF_SHORT);
    pos += Bytes.SIZEOF_SHORT;
    short rowLength = result.getShort();
    block.get(result.array(), pos, rowLength);
    pos += rowLength;

    // copy family: jump back to the family length byte near the block start
    int savePosition = block.position();
    block.position(Bytes.SIZEOF_INT);
    block.get(result.array(), pos, familyLength + Bytes.SIZEOF_BYTE);
    pos += familyLength + Bytes.SIZEOF_BYTE;

    // copy qualifier
    block.position(savePosition);
    int qualifierLength =
        keyLength - pos + result.arrayOffset() - KeyValue.TIMESTAMP_TYPE_SIZE;
    block.get(result.array(), pos, qualifierLength);
    pos += qualifierLength;

    // copy the timestamp and type
    int timestampFitInBytes =
        ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
    long timestamp = ByteBuff.readLong(block, timestampFitInBytes);
    if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
      timestamp = -timestamp;
    }
    result.putLong(pos, timestamp);
    pos += Bytes.SIZEOF_LONG;
    block.get(result.array(), pos, Bytes.SIZEOF_BYTE);

    block.reset();
    // The result is already a BB. So always we will create a KeyOnlyKv.
    return new KeyValue.KeyOnlyKeyValue(result.array(), 0, keyLength);
  }

  /** Returns the simple class name of this encoder. */
  @Override
  public String toString() {
    return DiffKeyDeltaEncoder.class.getSimpleName();
  }

  /**
   * Seeker state extended with the two values the diff decoder needs from the
   * previous cell: the row length (including its 2-byte size prefix) and the
   * timestamp.
   */
  protected static class DiffSeekerState extends SeekerState {

    /** Row length plus {@code Bytes.SIZEOF_SHORT} for the current key. */
    private int rowLengthWithSize;
    /** Timestamp of the current key; base for the next timestamp diff. */
    private long timestamp;

    public DiffSeekerState(ObjectIntPair<ByteBuffer> tmpPair,
        boolean includeTags) {
      super(tmpPair, includeTags);
    }

    @Override
    protected void copyFromNext(SeekerState that) {
      super.copyFromNext(that);
      DiffSeekerState other = (DiffSeekerState) that;
      rowLengthWithSize = other.rowLengthWithSize;
      timestamp = other.timestamp;
    }
  }

  /**
   * Creates a seeker that decodes cells of this format one at a time, directly
   * into the seeker state's key buffer.
   */
  @Override
  public EncodedSeeker createSeeker(CellComparator comparator,
      HFileBlockDecodingContext decodingCtx) {
    return new BufferedEncodedSeeker<DiffSeekerState>(comparator, decodingCtx) {
      /** Family length byte followed by family bytes, read in decodeFirst(). */
      private byte[] familyNameWithSize;
      private static final int TIMESTAMP_WITH_TYPE_LENGTH =
          Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;

      /**
       * Decodes the cell at the current buffer position into {@code current}.
       * Bytes shared with the previous key are already in
       * {@code current.keyBuffer} and are left in place.
       *
       * @param isFirst true for the first cell of the block, which has no
       *          previous key to take the type byte from
       */
      private void decode(boolean isFirst) {
        byte flag = currentBuffer.get();
        byte type = 0;
        if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
          if (!isFirst) {
            // The key length is about to change, so the previous type byte
            // will sit at a different offset: remember it now in case
            // FLAG_SAME_TYPE is set.
            type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE];
          }
          current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
        }
        if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
          current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
        }
        current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);

        current.ensureSpaceForKey();

        if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
          // length of row is different, copy everything except family

          // copy the row size
          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
              Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
          current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
              Bytes.SIZEOF_SHORT;

          // copy the rest of row
          currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
              current.rowLengthWithSize - Bytes.SIZEOF_SHORT);

          // copy the column family
          System.arraycopy(familyNameWithSize, 0, current.keyBuffer,
              current.rowLengthWithSize, familyNameWithSize.length);

          // copy the qualifier
          currentBuffer.get(current.keyBuffer,
              current.rowLengthWithSize + familyNameWithSize.length,
              current.keyLength - current.rowLengthWithSize -
              familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
        } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
          // we have to copy part of row and qualifier,
          // but column family is in right place

          // before column family (rest of row)
          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
              current.rowLengthWithSize - current.lastCommonPrefix);

          // after column family (qualifier)
          currentBuffer.get(current.keyBuffer,
              current.rowLengthWithSize + familyNameWithSize.length,
              current.keyLength - current.rowLengthWithSize -
              familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
        } else {
          // copy just the ending
          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
              current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH -
              current.lastCommonPrefix);
        }

        // timestamp
        int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH;
        int timestampFitInBytes = 1 +
            ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH);
        long timestampOrDiff = ByteBuff.readLong(currentBuffer, timestampFitInBytes);
        if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
          timestampOrDiff = -timestampOrDiff;
        }
        if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) { // it is timestamp
          current.timestamp = timestampOrDiff;
        } else { // it is diff
          current.timestamp = current.timestamp - timestampOrDiff;
        }
        Bytes.putLong(current.keyBuffer, pos, current.timestamp);
        pos += Bytes.SIZEOF_LONG;

        // type
        if ((flag & FLAG_SAME_TYPE) == 0) {
          currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
        } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
          // same type but the key length changed: write the remembered type
          // byte at its new offset
          current.keyBuffer[pos] = type;
        }

        current.valueOffset = currentBuffer.position();
        currentBuffer.skip(current.valueLength);

        if (includesTags()) {
          decodeTags();
        }
        if (includesMvcc()) {
          current.memstoreTS = ByteBuff.readVLong(currentBuffer);
        } else {
          current.memstoreTS = 0;
        }
        current.nextKvOffset = currentBuffer.position();
      }

      /**
       * Skips the leading int of the block, reads the column family that
       * precedes the first cell, then decodes that cell.
       */
      @Override
      protected void decodeFirst() {
        currentBuffer.skip(Bytes.SIZEOF_INT);

        // read column family
        byte familyNameLength = currentBuffer.get();
        familyNameWithSize = new byte[familyNameLength + Bytes.SIZEOF_BYTE];
        familyNameWithSize[0] = familyNameLength;
        currentBuffer.get(familyNameWithSize, Bytes.SIZEOF_BYTE,
            familyNameLength);
        decode(true);
      }

      @Override
      protected void decodeNext() {
        decode(false);
      }

      @Override
      protected DiffSeekerState createSeekerState() {
        return new DiffSeekerState(this.tmpPair, this.includesTags());
      }
    };
  }

  /**
   * Decodes every cell from {@code source} into a newly allocated buffer.
   *
   * @param source stream starting with the decompressed size as an int,
   *          followed by the encoded cells
   * @param allocateHeaderLength number of bytes left unused at the start of
   *          the returned buffer
   * @param skipLastBytes number of trailing bytes of {@code source} that must
   *          be left unread
   * @param decodingCtx decoding context passed to {@code afterDecodingKeyValue}
   * @return the buffer holding the decoded cells after the reserved header
   * @throws IOException if reading from {@code source} fails
   * @throws IllegalStateException if decoding consumed some of the last
   *           {@code skipLastBytes} bytes
   */
  @Override
  protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
      int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    int decompressedSize = source.readInt();
    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
        allocateHeaderLength);
    buffer.position(allocateHeaderLength);
    DiffCompressionState state = new DiffCompressionState();
    while (source.available() > skipLastBytes) {
      uncompressSingleKeyValue(source, buffer, state);
      afterDecodingKeyValue(source, buffer, decodingCtx);
    }

    if (source.available() != skipLastBytes) {
      throw new IllegalStateException("Read too much bytes.");
    }

    return buffer;
  }
}