001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.encoding; 019 020import java.io.DataInputStream; 021import java.io.DataOutputStream; 022import java.io.IOException; 023import java.io.OutputStream; 024import java.nio.ByteBuffer; 025import org.apache.hadoop.hbase.ByteBufferExtendedCell; 026import org.apache.hadoop.hbase.Cell; 027import org.apache.hadoop.hbase.CellComparator; 028import org.apache.hadoop.hbase.CellUtil; 029import org.apache.hadoop.hbase.ExtendedCell; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.KeyValue; 032import org.apache.hadoop.hbase.KeyValue.Type; 033import org.apache.hadoop.hbase.KeyValueUtil; 034import org.apache.hadoop.hbase.PrivateCellUtil; 035import org.apache.hadoop.hbase.io.TagCompressionContext; 036import org.apache.hadoop.hbase.io.util.LRUDictionary; 037import org.apache.hadoop.hbase.io.util.StreamUtils; 038import org.apache.hadoop.hbase.nio.ByteBuff; 039import org.apache.hadoop.hbase.util.ByteBufferUtils; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.hbase.util.ClassSize; 042import org.apache.hadoop.hbase.util.ObjectIntPair; 043import 
org.apache.hadoop.io.WritableUtils; 044import org.apache.yetus.audience.InterfaceAudience; 045 046/** 047 * Base class for all data block encoders that use a buffer. 048 */ 049@InterfaceAudience.Private 050abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder { 051 /** 052 * TODO: This datablockencoder is dealing in internals of hfileblocks. Purge reference to HFBs 053 */ 054 private static int INITIAL_KEY_BUFFER_SIZE = 512; 055 056 @Override 057 public ByteBuffer decodeKeyValues(DataInputStream source, 058 HFileBlockDecodingContext blkDecodingCtx) throws IOException { 059 if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) { 060 throw new IOException(this.getClass().getName() + " only accepts " 061 + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context."); 062 } 063 064 HFileBlockDefaultDecodingContext decodingCtx = 065 (HFileBlockDefaultDecodingContext) blkDecodingCtx; 066 if ( 067 decodingCtx.getHFileContext().isIncludesTags() 068 && decodingCtx.getHFileContext().isCompressTags() 069 ) { 070 if (decodingCtx.getTagCompressionContext() != null) { 071 // It will be overhead to create the TagCompressionContext again and again for every block 072 // decoding. 073 decodingCtx.getTagCompressionContext().clear(); 074 } else { 075 try { 076 TagCompressionContext tagCompressionContext = 077 new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE); 078 decodingCtx.setTagCompressionContext(tagCompressionContext); 079 } catch (Exception e) { 080 throw new IOException("Failed to initialize TagCompressionContext", e); 081 } 082 } 083 } 084 return internalDecodeKeyValues(source, 0, 0, decodingCtx); 085 } 086 087 /********************* common prefixes *************************/ 088 // Having this as static is fine but if META is having DBE then we should 089 // change this. 
  /**
   * Compares the row parts of two cells, skipping the first {@code rowCommonPrefix} bytes which
   * the caller has already established to be equal.
   */
  public static int compareCommonRowPrefix(Cell left, Cell right, int rowCommonPrefix) {
    return Bytes.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix,
      left.getRowLength() - rowCommonPrefix, right.getRowArray(),
      right.getRowOffset() + rowCommonPrefix, right.getRowLength() - rowCommonPrefix);
  }

  /**
   * Compares the family parts of two cells beyond the already-matched {@code familyCommonPrefix}
   * bytes.
   */
  public static int compareCommonFamilyPrefix(Cell left, Cell right, int familyCommonPrefix) {
    return Bytes.compareTo(left.getFamilyArray(), left.getFamilyOffset() + familyCommonPrefix,
      left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
      right.getFamilyOffset() + familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix);
  }

  /**
   * Compares the qualifier parts of two cells beyond the already-matched {@code qualCommonPrefix}
   * bytes.
   */
  public static int compareCommonQualifierPrefix(Cell left, Cell right, int qualCommonPrefix) {
    return Bytes.compareTo(left.getQualifierArray(), left.getQualifierOffset() + qualCommonPrefix,
      left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(),
      right.getQualifierOffset() + qualCommonPrefix, right.getQualifierLength() - qualCommonPrefix);
  }

  /**
   * Mutable per-position state held by a {@link BufferedEncodedSeeker}: the decoded (uncompacted)
   * key bytes, plus offsets/lengths into the current block buffer for the value and tags parts.
   * Two instances ("current" and "previous") are swapped as the seeker advances.
   */
  protected static class SeekerState {
    protected ByteBuff currentBuffer;
    protected TagCompressionContext tagCompressionContext;
    // -1 means this state is invalid (see isValid()/invalidate()).
    protected int valueOffset = -1;
    protected int keyLength;
    protected int valueLength;
    // Number of leading key bytes shared with the previously decoded key.
    protected int lastCommonPrefix;
    protected int tagsLength = 0;
    protected int tagsOffset = -1;
    protected int tagsCompressedLength = 0;
    protected boolean uncompressTags = true;

    /** We need to store a copy of the key. */
    protected byte[] keyBuffer = HConstants.EMPTY_BYTE_ARRAY;
    protected byte[] tagsBuffer = HConstants.EMPTY_BYTE_ARRAY;

    protected long memstoreTS;
    protected int nextKvOffset;
    protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
    private final ObjectIntPair<ByteBuffer> tmpPair;
    private final boolean includeTags;

    public SeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) {
      this.tmpPair = tmpPair;
      this.includeTags = includeTags;
    }

    protected boolean isValid() {
      return valueOffset != -1;
    }

    protected void invalidate() {
      valueOffset = -1;
      tagsCompressedLength = 0;
      currentKey.clear();
      uncompressTags = true;
      currentBuffer = null;
    }

    /** Grows keyBuffer to the next power of two >= keyLength (min INITIAL_KEY_BUFFER_SIZE). */
    protected void ensureSpaceForKey() {
      if (keyLength > keyBuffer.length) {
        int newKeyBufferLength =
          Integer.highestOneBit(Math.max(INITIAL_KEY_BUFFER_SIZE, keyLength) - 1) << 1;
        byte[] newKeyBuffer = new byte[newKeyBufferLength];
        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
        keyBuffer = newKeyBuffer;
      }
    }

    /** Grows tagsBuffer to the next power of two >= tagsLength (min INITIAL_KEY_BUFFER_SIZE). */
    protected void ensureSpaceForTags() {
      if (tagsLength > tagsBuffer.length) {
        int newTagsBufferLength =
          Integer.highestOneBit(Math.max(INITIAL_KEY_BUFFER_SIZE, tagsLength) - 1) << 1;
        byte[] newTagsBuffer = new byte[newTagsBufferLength];
        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
        tagsBuffer = newTagsBuffer;
      }
    }

    /** Points currentKey at the first keyLength bytes of keyBuffer and records the mvcc value. */
    protected void setKey(byte[] keyBuffer, long memTS) {
      currentKey.setKey(keyBuffer, 0, keyLength);
      memstoreTS = memTS;
    }

    /**
     * Copy the state from the next one into this instance (the previous state placeholder). Used to
     * save the previous state when we are advancing the seeker to the next key/value.
     */
    protected void copyFromNext(SeekerState nextState) {
      if (keyBuffer.length != nextState.keyBuffer.length) {
        keyBuffer = nextState.keyBuffer.clone();
      } else if (!isValid()) {
        // Note: we can only call isValid before we override our state, so this
        // comes before all the assignments at the end of this method.
        System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0, nextState.keyLength);
      } else {
        // don't copy the common prefix between this key and the previous one
        System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix, keyBuffer,
          nextState.lastCommonPrefix, nextState.keyLength - nextState.lastCommonPrefix);
      }
      currentKey.set(nextState.currentKey);

      valueOffset = nextState.valueOffset;
      keyLength = nextState.keyLength;
      valueLength = nextState.valueLength;
      lastCommonPrefix = nextState.lastCommonPrefix;
      nextKvOffset = nextState.nextKvOffset;
      memstoreTS = nextState.memstoreTS;
      currentBuffer = nextState.currentBuffer;
      tagsOffset = nextState.tagsOffset;
      tagsLength = nextState.tagsLength;
      if (nextState.tagCompressionContext != null) {
        tagCompressionContext = nextState.tagCompressionContext;
      }
    }

    /**
     * Materializes the current position as a Cell. The key bytes are deep-copied; value (and,
     * without tag compression, tags) bytes are copied out of the block buffer. Returns an onheap
     * or offheap cell depending on whether the backing buffer has an accessible array.
     */
    public Cell toCell() {
      // Buffer backing the value and tags part from the HFileBlock's buffer
      // When tag compression in use, this will be only the value bytes area.
      ByteBuffer valAndTagsBuffer;
      int vOffset;
      int valAndTagsLength = this.valueLength;
      int tagsLenSerializationSize = 0;
      if (this.includeTags && this.tagCompressionContext == null) {
        // Include the tags part also. This will be the tags bytes + 2 bytes of for storing tags
        // length
        tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + this.valueLength);
        valAndTagsLength += tagsLenSerializationSize + this.tagsLength;
      }
      this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair);
      valAndTagsBuffer = this.tmpPair.getFirst();
      vOffset = this.tmpPair.getSecond();// This is the offset to value part in the BB
      if (valAndTagsBuffer.hasArray()) {
        return toOnheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      } else {
        return toOffheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      }
    }

    // Builds an OnheapDecodedCell, copying key, value and tags into fresh byte[]s.
    private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
      int tagsLenSerializationSize) {
      byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          // Tags sit right after the value + the 2-byte tags-length in the block buffer.
          tOffset =
            valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength + tagsLenSerializationSize;
          tagsArray = Bytes.copy(valAndTagsBuffer.array(), tOffset, this.tagsLength);
          tOffset = 0;
        } else {
          // Tag compression: tags were already uncompressed into tagsBuffer by decodeTags().
          tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
          tOffset = 0;
        }
      }
      return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
        currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
        currentKey.getQualifierOffset(), currentKey.getQualifierLength(), currentKey.getTimestamp(),
        currentKey.getTypeByte(), Bytes.copy(valAndTagsBuffer.array(),
          valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength),
        0, this.valueLength, memstoreTS, tagsArray, tOffset, this.tagsLength);
    }

    // Builds an OffheapDecodedExtendedCell; value/tags are copied out into wrapped heap buffers.
    private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
      int tagsLenSerializationSize) {
      ByteBuffer tagsBuf = HConstants.EMPTY_BYTE_BUFFER;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          tOffset = vOffset + this.valueLength + tagsLenSerializationSize;
          byte[] output = new byte[this.tagsLength];
          ByteBufferUtils.copyFromBufferToArray(output, valAndTagsBuffer, tOffset, 0,
            this.tagsLength);
          tagsBuf = ByteBuffer.wrap(output);
          tOffset = 0;
        } else {
          tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength));
          tOffset = 0;
        }
      }

      if (this.valueLength > 0) {
        byte[] output = new byte[this.valueLength];
        ByteBufferUtils.copyFromBufferToArray(output, valAndTagsBuffer, vOffset, 0,
          this.valueLength);
        valAndTagsBuffer = ByteBuffer.wrap(output);
        vOffset = 0;
      }

      return new OffheapDecodedExtendedCell(
        ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)), currentKey.getRowLength(),
        currentKey.getFamilyOffset(), currentKey.getFamilyLength(), currentKey.getQualifierOffset(),
        currentKey.getQualifierLength(), currentKey.getTimestamp(), currentKey.getTypeByte(),
        valAndTagsBuffer, vOffset, this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
    }
  }

  /**
   * Copies only the key part of the keybuffer by doing a deep copy and passes the seeker state
   * members for taking a clone. Note that the value byte[] part is still pointing to the
   * currentBuffer and represented by the valueOffset and valueLength
   */
  // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
  // there. So this has to be an instance of ExtendedCell.
  protected static class OnheapDecodedCell implements ExtendedCell {
    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
      + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
      + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY));
    // Flat key bytes in KeyValue key format (2-byte row length, row, 1-byte family length, ...).
    private byte[] keyOnlyBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timestamp;
    private byte typeByte;
    private byte[] valueBuffer;
    private int valueOffset;
    private int valueLength;
    private byte[] tagsBuffer;
    private int tagsOffset;
    private int tagsLength;
    private long seqId;

    protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset,
      byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
      byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] tagsBuffer,
      int tagsOffset, int tagsLength) {
      this.keyOnlyBuffer = keyBuffer;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timestamp = timeStamp;
      this.typeByte = typeByte;
      this.valueBuffer = valueBuffer;
      this.valueOffset = valueOffset;
      this.valueLength = valueLen;
      this.tagsBuffer = tagsBuffer;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      setSequenceId(seqId);
    }

    @Override
    public byte[] getRowArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getFamilyArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getQualifierArray() {
      return keyOnlyBuffer;
    }

    @Override
    public int getRowOffset() {
      // Row bytes start right after the 2-byte row length in the key buffer.
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public short getRowLength() {
      return rowLength;
    }

    @Override
    public int getFamilyOffset() {
      return familyOffset;
    }

    @Override
    public byte getFamilyLength() {
      return familyLength;
    }

    @Override
    public int getQualifierOffset() {
      return qualifierOffset;
    }

    @Override
    public int getQualifierLength() {
      return qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return timestamp;
    }

    @Override
    public byte getTypeByte() {
      return typeByte;
    }

    @Override
    public long getSequenceId() {
      return seqId;
    }

    @Override
    public byte[] getValueArray() {
      return this.valueBuffer;
    }

    @Override
    public int getValueOffset() {
      return valueOffset;
    }

    @Override
    public int getValueLength() {
      return valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      return this.tagsBuffer;
    }

    @Override
    public int getTagsOffset() {
      return this.tagsOffset;
    }

    @Override
    public int getTagsLength() {
      return tagsLength;
    }

    @Override
    public String toString() {
      return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
        + getValueLength() + "/seqid=" + seqId;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    @Override
    public long heapSize() {
      return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
    }

    @Override
    public int write(OutputStream out, boolean withTags) throws IOException {
      int lenToWrite = getSerializedSize(withTags);
      ByteBufferUtils.putInt(out, keyOnlyBuffer.length);
      ByteBufferUtils.putInt(out, valueLength);
      // Write key
      out.write(keyOnlyBuffer);
      // Write value
      out.write(this.valueBuffer, this.valueOffset, this.valueLength);
      if (withTags && this.tagsLength > 0) {
        // 2 bytes tags length followed by tags bytes
        // tags length is serialized with 2 bytes only(short way) even if the type is int.
        // As this is non -ve numbers, we save the sign bit. See HBASE-11437
        out.write((byte) (0xff & (this.tagsLength >> 8)));
        out.write((byte) (0xff & this.tagsLength));
        out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
      }
      return lenToWrite;
    }

    @Override
    public int getSerializedSize(boolean withTags) {
      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
        withTags);
    }

    @Override
    public void write(ByteBuffer buf, int offset) {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(long ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(byte[] ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public ExtendedCell deepClone() {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }
  }

  /**
   * Offheap counterpart of {@link OnheapDecodedCell}: the key is always held in an onheap
   * ByteBuffer, while value/tags are referenced via ByteBuffers (copied out of the block buffer
   * by {@code SeekerState#toOffheapCell}).
   */
  protected static class OffheapDecodedExtendedCell extends ByteBufferExtendedCell {
    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
      + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
      + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.BYTE_BUFFER));
    private ByteBuffer keyBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timestamp;
    private byte typeByte;
    private ByteBuffer valueBuffer;
    private int valueOffset;
    private int valueLength;
    private ByteBuffer tagsBuffer;
    private int tagsOffset;
    private int tagsLength;
    private long seqId;

    protected OffheapDecodedExtendedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset,
      byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
      ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer,
      int tagsOffset, int tagsLength) {
      // The keyBuffer is always onheap
      assert keyBuffer.hasArray();
      assert keyBuffer.arrayOffset() == 0;
      this.keyBuffer = keyBuffer;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timestamp = timeStamp;
      this.typeByte = typeByte;
      this.valueBuffer = valueBuffer;
      this.valueOffset = valueOffset;
      this.valueLength = valueLen;
      this.tagsBuffer = tagsBuffer;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      setSequenceId(seqId);
    }

    @Override
    public byte[] getRowArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getRowOffset() {
      return getRowPosition();
    }

    @Override
    public short getRowLength() {
      return this.rowLength;
    }

    @Override
    public byte[] getFamilyArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getFamilyOffset() {
      return getFamilyPosition();
    }

    @Override
    public byte getFamilyLength() {
      return this.familyLength;
    }

    @Override
    public byte[] getQualifierArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getQualifierOffset() {
      return getQualifierPosition();
    }

    @Override
    public int getQualifierLength() {
      return this.qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return this.timestamp;
    }

    @Override
    public byte getTypeByte() {
      return this.typeByte;
    }

    @Override
    public long getSequenceId() {
      return this.seqId;
    }

    @Override
    public byte[] getValueArray() {
      // Note: allocates a fresh copy on every call.
      return CellUtil.cloneValue(this);
    }

    @Override
    public int getValueOffset() {
      return 0;
    }

    @Override
    public int getValueLength() {
      return this.valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      // Note: allocates a fresh copy on every call.
      return PrivateCellUtil.cloneTags(this);
    }

    @Override
    public int getTagsOffset() {
      return 0;
    }

    @Override
    public int getTagsLength() {
      return this.tagsLength;
    }

    @Override
    public ByteBuffer getRowByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getRowPosition() {
      // Row bytes start right after the 2-byte row length in the key buffer.
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public ByteBuffer getFamilyByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getFamilyPosition() {
      return this.familyOffset;
    }

    @Override
    public ByteBuffer getQualifierByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getQualifierPosition() {
      return this.qualifierOffset;
    }

    @Override
    public ByteBuffer getValueByteBuffer() {
      return this.valueBuffer;
    }

    @Override
    public int getValuePosition() {
      return this.valueOffset;
    }

    @Override
    public ByteBuffer getTagsByteBuffer() {
      return this.tagsBuffer;
    }

    @Override
    public int getTagsPosition() {
      return this.tagsOffset;
    }

    @Override
    public long heapSize() {
      return FIXED_OVERHEAD;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    @Override
    public int write(OutputStream out, boolean withTags) throws IOException {
      int lenToWrite = getSerializedSize(withTags);
      ByteBufferUtils.putInt(out, keyBuffer.capacity());
      ByteBufferUtils.putInt(out, valueLength);
      // Write key
      out.write(keyBuffer.array());
      // Write value
      ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset, this.valueLength);
      if (withTags && this.tagsLength > 0) {
        // 2 bytes tags length followed by tags bytes
        // tags length is serialized with 2 bytes only(short way) even if the type is int.
        // As this is non -ve numbers, we save the sign bit. See HBASE-11437
        out.write((byte) (0xff & (this.tagsLength >> 8)));
        out.write((byte) (0xff & this.tagsLength));
        ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
      }
      return lenToWrite;
    }

    @Override
    public int getSerializedSize(boolean withTags) {
      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
        withTags);
    }

    @Override
    public void setTimestamp(long ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(byte[] ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void write(ByteBuffer buf, int offset) {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public ExtendedCell deepClone() {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }
  }

  /**
   * Base seeker over a buffered-encoded block. Keeps two SeekerState instances ("current" and
   * "previous") so that one step of backtracking (seekBefore) is possible without re-decoding.
   * Subclasses supply the encoding-specific decodeFirst()/decodeNext().
   */
  protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
    extends AbstractEncodedSeeker {
    protected ByteBuff currentBuffer;
    protected TagCompressionContext tagCompressionContext = null;
    protected KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
    protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>();
    protected STATE current, previous;

    public BufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
      super(decodingCtx);
      if (decodingCtx.getHFileContext().isCompressTags()) {
        try {
          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
        } catch (Exception e) {
          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
        }
      }
      current = createSeekerState(); // always valid
      previous = createSeekerState(); // may not be valid
    }

    @Override
    public int compareKey(CellComparator comparator, Cell key) {
      keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
      return PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, keyOnlyKV);
    }

    @Override
    public void setCurrentBuffer(ByteBuff buffer) {
      if (this.tagCompressionContext != null) {
        // Reset the dictionary; a new block starts a fresh tag-compression stream.
        this.tagCompressionContext.clear();
      }
      currentBuffer = buffer;
      current.currentBuffer = currentBuffer;
      if (tagCompressionContext != null) {
        current.tagCompressionContext = tagCompressionContext;
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public Cell getKey() {
      byte[] key = new byte[current.keyLength];
      System.arraycopy(current.keyBuffer, 0, key, 0, current.keyLength);
      return new KeyValue.KeyOnlyKeyValue(key);
    }

    @Override
    public ByteBuffer getValueShallowCopy() {
      currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
      ByteBuffer dup = tmpPair.getFirst().duplicate();
      dup.position(tmpPair.getSecond());
      dup.limit(tmpPair.getSecond() + current.valueLength);
      return dup.slice();
    }

    @Override
    public Cell getCell() {
      return current.toCell();
    }

    @Override
    public void rewind() {
      currentBuffer.rewind();
      if (tagCompressionContext != null) {
        tagCompressionContext.clear();
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public boolean next() {
      if (!currentBuffer.hasRemaining()) {
        return false;
      }
      decodeNext();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
      return true;
    }

    protected void decodeTags() {
      current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
      if (tagCompressionContext != null) {
        if (current.uncompressTags) {
          // Tag compression is been used. uncompress it into tagsBuffer
          current.ensureSpaceForTags();
          try {
            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
              current.tagsBuffer, 0, current.tagsLength);
          } catch (IOException e) {
            throw new RuntimeException("Exception while uncompressing tags", e);
          }
        } else {
          // Tags for this KV were already uncompressed (set by moveToPrevious()); just skip the
          // compressed bytes so the dictionary state stays consistent.
          currentBuffer.skip(current.tagsCompressedLength);
          current.uncompressTags = true;// Reset this.
        }
        current.tagsOffset = -1;
      } else {
        // When tag compress is not used, let us not do copying of tags bytes into tagsBuffer.
        // Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer()
        current.tagsOffset = currentBuffer.position();
        currentBuffer.skip(current.tagsLength);
      }
    }

    @Override
    public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
      int rowCommonPrefix = 0;
      int familyCommonPrefix = 0;
      int qualCommonPrefix = 0;
      previous.invalidate();
      do {
        int comp;
        keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
        if (current.lastCommonPrefix != 0) {
          // The KV format has row key length also in the byte array. The
          // common prefix
          // includes it. So we need to subtract to find out the common prefix
          // in the
          // row part alone
          rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
        }
        if (current.lastCommonPrefix <= 2) {
          rowCommonPrefix = 0;
        }
        rowCommonPrefix += findCommonPrefixInRowPart(seekCell, keyOnlyKV, rowCommonPrefix);
        comp = compareCommonRowPrefix(seekCell, keyOnlyKV, rowCommonPrefix);
        if (comp == 0) {
          comp = compareTypeBytes(seekCell, keyOnlyKV);
          if (comp == 0) {
            // Subtract the fixed row key length and the family key fixed length
            familyCommonPrefix = Math.max(0, Math.min(familyCommonPrefix,
              current.lastCommonPrefix - (3 + keyOnlyKV.getRowLength())));
            familyCommonPrefix +=
              findCommonPrefixInFamilyPart(seekCell, keyOnlyKV, familyCommonPrefix);
            comp = compareCommonFamilyPrefix(seekCell, keyOnlyKV, familyCommonPrefix);
            if (comp == 0) {
              // subtract the rowkey fixed length and the family key fixed
              // length
              qualCommonPrefix = Math.max(0, Math.min(qualCommonPrefix, current.lastCommonPrefix
                - (3 + keyOnlyKV.getRowLength() + keyOnlyKV.getFamilyLength())));
              qualCommonPrefix +=
                findCommonPrefixInQualifierPart(seekCell, keyOnlyKV, qualCommonPrefix);
              comp = compareCommonQualifierPrefix(seekCell, keyOnlyKV, qualCommonPrefix);
              if (comp == 0) {
                comp = CellComparator.getInstance().compareTimestamps(seekCell, keyOnlyKV);
                if (comp == 0) {
                  // Compare types. Let the delete types sort ahead of puts;
                  // i.e. types
                  // of higher numbers sort before those of lesser numbers.
                  // Maximum
                  // (255)
                  // appears ahead of everything, and minimum (0) appears
                  // after
                  // everything.
                  comp = (0xff & keyOnlyKV.getTypeByte()) - (0xff & seekCell.getTypeByte());
                }
              }
            }
          }
        }
        if (comp == 0) { // exact match
          if (seekBefore) {
            if (!previous.isValid()) {
              // The caller (seekBefore) has to ensure that we are not at the
              // first key in the block.
              throw new IllegalStateException(
                "Cannot seekBefore if " + "positioned at the first key in the block: key="
                  + Bytes.toStringBinary(seekCell.getRowArray()));
            }
            moveToPrevious();
            return 1;
          }
          return 0;
        }

        if (comp < 0) { // already too large, check previous
          if (previous.isValid()) {
            moveToPrevious();
          } else {
            return HConstants.INDEX_KEY_MAGIC; // using optimized index key
          }
          return 1;
        }

        // move to next, if more data is available
        if (currentBuffer.hasRemaining()) {
          previous.copyFromNext(current);
          decodeNext();
          current.setKey(current.keyBuffer, current.memstoreTS);
        } else {
          break;
        }
      } while (true);

      // we hit the end of the block, not an exact match
      return 1;
    }

    // Orders "fake" minimum keys (empty family+qualifier, Minimum type byte) relative to real
    // ones; returns 0 when neither side is such a key so the normal comparison continues.
    private int compareTypeBytes(Cell key, Cell right) {
      if (
        key.getFamilyLength() + key.getQualifierLength() == 0
          && key.getTypeByte() == Type.Minimum.getCode()
      ) {
        // left is "bigger", i.e. it appears later in the sorted order
        return 1;
      }
      if (
        right.getFamilyLength() + right.getQualifierLength() == 0
          && right.getTypeByte() == Type.Minimum.getCode()
      ) {
        return -1;
      }
      return 0;
    }

    private static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
      return Bytes.findCommonPrefix(left.getRowArray(), right.getRowArray(),
        left.getRowLength() - rowCommonPrefix, right.getRowLength() - rowCommonPrefix,
        left.getRowOffset() + rowCommonPrefix, right.getRowOffset() + rowCommonPrefix);
    }

    private static int findCommonPrefixInFamilyPart(Cell left, Cell right, int familyCommonPrefix) {
      return Bytes.findCommonPrefix(left.getFamilyArray(), right.getFamilyArray(),
        left.getFamilyLength() - familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix,
        left.getFamilyOffset() + familyCommonPrefix, right.getFamilyOffset() + familyCommonPrefix);
    }

    private static int findCommonPrefixInQualifierPart(Cell left, Cell right,
      int qualifierCommonPrefix) {
      return Bytes.findCommonPrefix(left.getQualifierArray(), right.getQualifierArray(),
        left.getQualifierLength() - qualifierCommonPrefix,
        right.getQualifierLength() - qualifierCommonPrefix,
        left.getQualifierOffset() + qualifierCommonPrefix,
        right.getQualifierOffset() + qualifierCommonPrefix);
    }

    // Swaps current/previous and repositions the buffer after the restored KV. Can only step
    // back once; previous is invalidated afterwards.
    private void moveToPrevious() {
      if (!previous.isValid()) {
        throw new IllegalStateException(
          "Can move back only once and not in first key in the block.");
      }

      STATE tmp = previous;
      previous = current;
      current = tmp;

      // move after last key value
      currentBuffer.position(current.nextKvOffset);
      // Already decoded the tag bytes. We cache this tags into current state and also the total
      // compressed length of the tags bytes. For the next time decodeNext() we don't need to decode
      // the tags again. This might pollute the Data Dictionary what we use for the compression.
      // When current.uncompressTags is false, we will just reuse the current.tagsBuffer and skip
      // 'tagsCompressedLength' bytes of source stream.
      // See in decodeTags()
      current.tagsBuffer = previous.tagsBuffer;
      current.tagsCompressedLength = previous.tagsCompressedLength;
      current.uncompressTags = false;
      // The current key has to be reset with the previous Cell
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @SuppressWarnings("unchecked")
    protected STATE createSeekerState() {
      // This will fail for non-default seeker state if the subclass does not
      // override this method.
      return (STATE) new SeekerState(this.tmpPair, this.includesTags());
    }

    abstract protected void decodeFirst();

    abstract protected void decodeNext();
  }

  /**
   * Writes the post-KV trailer (tags and/or mvcc) for one cell during encoding.
   * @return unencoded size added
   */
  protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
    HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
    int size = 0;
    if (encodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = cell.getTagsLength();
      ByteBufferUtils.putCompressedInt(out, tagsLength);
      // There are some tags to be written
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
        // When tag compression is enabled, tagCompressionContext will have a not null value. Write
        // the tags using Dictionary compression in such a case
        if (tagCompressionContext != null) {
          // Not passing tagsLength considering that parsing of the tagsLength is not costly
          PrivateCellUtil.compressTags(out, cell, tagCompressionContext);
        } else {
          PrivateCellUtil.writeTags(out, cell, tagsLength);
        }
      }
      size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
    }
    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
      // Copy memstore timestamp from the byte buffer to the output stream.
      long memstoreTS = cell.getSequenceId();
      WritableUtils.writeVLong(out, memstoreTS);
      // TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be
      // avoided.
      size += WritableUtils.getVIntSize(memstoreTS);
    }
    return size;
  }

  // Decoding-side counterpart of afterEncodingKeyValue(): reads the tags (and mvcc) trailer from
  // the source stream into the destination KeyValue-format buffer.
  protected final void afterDecodingKeyValue(DataInputStream source, ByteBuffer dest,
    HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    if (decodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = ByteBufferUtils.readCompressedInt(source);
      // Put as unsigned short
      dest.put((byte) ((tagsLength >> 8) & 0xff));
      dest.put((byte) (tagsLength & 0xff));
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
        // When tag compression is been used in this file, tagCompressionContext will have a not
        // null value passed.
        if (tagCompressionContext != null) {
          tagCompressionContext.uncompressTags(source, dest, tagsLength);
        } else {
          ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
        }
      }
    }
    if (decodingCtx.getHFileContext().isIncludesMvcc()) {
      long memstoreTS = -1;
      try {
        // Copy memstore timestamp from the data input stream to the byte
        // buffer.
1075 memstoreTS = WritableUtils.readVLong(source); 1076 ByteBufferUtils.writeVLong(dest, memstoreTS); 1077 } catch (IOException ex) { 1078 throw new RuntimeException( 1079 "Unable to copy memstore timestamp " + memstoreTS + " after decoding a key/value"); 1080 } 1081 } 1082 } 1083 1084 protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source, 1085 int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) 1086 throws IOException; 1087 1088 /** 1089 * Asserts that there is at least the given amount of unfilled space remaining in the given 1090 * buffer. 1091 * @param out typically, the buffer we are writing to 1092 * @param length the required space in the buffer 1093 * @throws EncoderBufferTooSmallException If there are no enough bytes. 1094 */ 1095 protected static void ensureSpace(ByteBuffer out, int length) 1096 throws EncoderBufferTooSmallException { 1097 if (out.position() + length > out.limit()) { 1098 throw new EncoderBufferTooSmallException("Buffer position=" + out.position() 1099 + ", buffer limit=" + out.limit() + ", length to be written=" + length); 1100 } 1101 } 1102 1103 @Override 1104 public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out) 1105 throws IOException { 1106 if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) { 1107 throw new IOException(this.getClass().getName() + " only accepts " 1108 + HFileBlockDefaultEncodingContext.class.getName() + " as the " + "encoding context."); 1109 } 1110 1111 HFileBlockDefaultEncodingContext encodingCtx = 1112 (HFileBlockDefaultEncodingContext) blkEncodingCtx; 1113 encodingCtx.prepareEncoding(out); 1114 if ( 1115 encodingCtx.getHFileContext().isIncludesTags() 1116 && encodingCtx.getHFileContext().isCompressTags() 1117 ) { 1118 if (encodingCtx.getTagCompressionContext() != null) { 1119 // It will be overhead to create the TagCompressionContext again and again for every block 1120 // encoding. 
1121 encodingCtx.getTagCompressionContext().clear(); 1122 } else { 1123 try { 1124 TagCompressionContext tagCompressionContext = 1125 new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE); 1126 encodingCtx.setTagCompressionContext(tagCompressionContext); 1127 } catch (Exception e) { 1128 throw new IOException("Failed to initialize TagCompressionContext", e); 1129 } 1130 } 1131 } 1132 StreamUtils.writeInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding() 1133 blkEncodingCtx.setEncodingState(new EncodingState()); 1134 } 1135 1136 @Override 1137 public void encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out) 1138 throws IOException { 1139 EncodingState state = encodingCtx.getEncodingState(); 1140 int posBeforeEncode = out.size(); 1141 int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out); 1142 state.postCellEncode(encodedKvSize, out.size() - posBeforeEncode); 1143 } 1144 1145 public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx, 1146 DataOutputStream out) throws IOException; 1147 1148 @Override 1149 public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out, 1150 byte[] uncompressedBytesWithHeader) throws IOException { 1151 EncodingState state = encodingCtx.getEncodingState(); 1152 // Write the unencodedDataSizeWritten (with header size) 1153 Bytes.putInt(uncompressedBytesWithHeader, 1154 HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE, 1155 state.getUnencodedDataSizeWritten()); 1156 postEncoding(encodingCtx); 1157 } 1158 1159}