001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.encoding; 019 020import java.io.DataInputStream; 021import java.io.DataOutputStream; 022import java.io.IOException; 023import java.io.OutputStream; 024import java.nio.ByteBuffer; 025import org.apache.hadoop.hbase.ByteBufferExtendedCell; 026import org.apache.hadoop.hbase.Cell; 027import org.apache.hadoop.hbase.CellComparator; 028import org.apache.hadoop.hbase.CellUtil; 029import org.apache.hadoop.hbase.ExtendedCell; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.KeyValue; 032import org.apache.hadoop.hbase.KeyValue.Type; 033import org.apache.hadoop.hbase.KeyValueUtil; 034import org.apache.hadoop.hbase.PrivateCellUtil; 035import org.apache.hadoop.hbase.io.TagCompressionContext; 036import org.apache.hadoop.hbase.io.util.LRUDictionary; 037import org.apache.hadoop.hbase.io.util.StreamUtils; 038import org.apache.hadoop.hbase.nio.ByteBuff; 039import org.apache.hadoop.hbase.util.ByteBufferUtils; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.hbase.util.ClassSize; 042import org.apache.hadoop.hbase.util.ObjectIntPair; 043import 
org.apache.hadoop.io.WritableUtils; 044import org.apache.yetus.audience.InterfaceAudience; 045 046/** 047 * Base class for all data block encoders that use a buffer. 048 */ 049@InterfaceAudience.Private 050abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder { 051 /** 052 * TODO: This datablockencoder is dealing in internals of hfileblocks. Purge reference to HFBs 053 */ 054 private static int INITIAL_KEY_BUFFER_SIZE = 512; 055 056 @Override 057 public ByteBuffer decodeKeyValues(DataInputStream source, 058 HFileBlockDecodingContext blkDecodingCtx) throws IOException { 059 if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) { 060 throw new IOException(this.getClass().getName() + " only accepts " 061 + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context."); 062 } 063 064 HFileBlockDefaultDecodingContext decodingCtx = 065 (HFileBlockDefaultDecodingContext) blkDecodingCtx; 066 if ( 067 decodingCtx.getHFileContext().isIncludesTags() 068 && decodingCtx.getHFileContext().isCompressTags() 069 ) { 070 if (decodingCtx.getTagCompressionContext() != null) { 071 // It will be overhead to create the TagCompressionContext again and again for every block 072 // decoding. 073 decodingCtx.getTagCompressionContext().clear(); 074 } else { 075 try { 076 TagCompressionContext tagCompressionContext = 077 new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE); 078 decodingCtx.setTagCompressionContext(tagCompressionContext); 079 } catch (Exception e) { 080 throw new IOException("Failed to initialize TagCompressionContext", e); 081 } 082 } 083 } 084 return internalDecodeKeyValues(source, 0, 0, decodingCtx); 085 } 086 087 /********************* common prefixes *************************/ 088 // Having this as static is fine but if META is having DBE then we should 089 // change this. 
public static int compareCommonRowPrefix(Cell left, Cell right, int rowCommonPrefix) {
    // Compares the row components of the two cells, skipping the leading rowCommonPrefix bytes
    // that are already known to be equal.
    return Bytes.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix,
      left.getRowLength() - rowCommonPrefix, right.getRowArray(),
      right.getRowOffset() + rowCommonPrefix, right.getRowLength() - rowCommonPrefix);
  }

  public static int compareCommonFamilyPrefix(Cell left, Cell right, int familyCommonPrefix) {
    // Compares the family components, skipping the leading familyCommonPrefix shared bytes.
    return Bytes.compareTo(left.getFamilyArray(), left.getFamilyOffset() + familyCommonPrefix,
      left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
      right.getFamilyOffset() + familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix);
  }

  public static int compareCommonQualifierPrefix(Cell left, Cell right, int qualCommonPrefix) {
    // Compares the qualifier components, skipping the leading qualCommonPrefix shared bytes.
    return Bytes.compareTo(left.getQualifierArray(), left.getQualifierOffset() + qualCommonPrefix,
      left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(),
      right.getQualifierOffset() + qualCommonPrefix, right.getQualifierLength() - qualCommonPrefix);
  }

  /**
   * Mutable decoding state used by the buffered seekers. Holds a copy of the current decoded key
   * in {@link #keyBuffer} plus offsets/lengths that point into the block's buffer for the value
   * and (when not dictionary-compressed) the tags.
   */
  protected static class SeekerState {
    protected ByteBuff currentBuffer;
    protected TagCompressionContext tagCompressionContext;
    // -1 marks this state as not holding a decoded cell; see isValid()/invalidate().
    protected int valueOffset = -1;
    protected int keyLength;
    protected int valueLength;
    // Number of leading key bytes this key shares with the previously decoded key.
    protected int lastCommonPrefix;
    protected int tagsLength = 0;
    protected int tagsOffset = -1;
    protected int tagsCompressedLength = 0;
    protected boolean uncompressTags = true;

    /** We need to store a copy of the key. */
    protected byte[] keyBuffer = HConstants.EMPTY_BYTE_ARRAY;
    protected byte[] tagsBuffer = HConstants.EMPTY_BYTE_ARRAY;

    protected long memstoreTS;
    protected int nextKvOffset;
    protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
private final ObjectIntPair<ByteBuffer> tmpPair;
    private final boolean includeTags;

    public SeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) {
      this.tmpPair = tmpPair;
      this.includeTags = includeTags;
    }

    /** Returns true if this state currently holds a decoded cell. */
    protected boolean isValid() {
      return valueOffset != -1;
    }

    /** Resets this state so it no longer refers to a decoded cell or the block buffer. */
    protected void invalidate() {
      valueOffset = -1;
      tagsCompressedLength = 0;
      currentKey.clear();
      uncompressTags = true;
      currentBuffer = null;
    }

    /** Grows keyBuffer to the next power of two >= keyLength when the current key does not fit. */
    protected void ensureSpaceForKey() {
      if (keyLength > keyBuffer.length) {
        int newKeyBufferLength =
          Integer.highestOneBit(Math.max(INITIAL_KEY_BUFFER_SIZE, keyLength) - 1) << 1;
        byte[] newKeyBuffer = new byte[newKeyBufferLength];
        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
        keyBuffer = newKeyBuffer;
      }
    }

    /** Grows tagsBuffer to the next power of two >= tagsLength when the tags do not fit. */
    protected void ensureSpaceForTags() {
      if (tagsLength > tagsBuffer.length) {
        int newTagsBufferLength =
          Integer.highestOneBit(Math.max(INITIAL_KEY_BUFFER_SIZE, tagsLength) - 1) << 1;
        byte[] newTagsBuffer = new byte[newTagsBufferLength];
        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
        tagsBuffer = newTagsBuffer;
      }
    }

    /** Points currentKey at the first keyLength bytes of the given buffer and records the mvcc. */
    protected void setKey(byte[] keyBuffer, long memTS) {
      currentKey.setKey(keyBuffer, 0, keyLength);
      memstoreTS = memTS;
    }

    /**
     * Copy the state from the next one into this instance (the previous state placeholder). Used to
     * save the previous state when we are advancing the seeker to the next key/value.
     */
    protected void copyFromNext(SeekerState nextState) {
      if (keyBuffer.length != nextState.keyBuffer.length) {
        keyBuffer = nextState.keyBuffer.clone();
      } else if (!isValid()) {
        // Note: we can only call isValid before we override our state, so this
        // comes before all the assignments at the end of this method.
System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0, nextState.keyLength);
      } else {
        // don't copy the common prefix between this key and the previous one
        System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix, keyBuffer,
          nextState.lastCommonPrefix, nextState.keyLength - nextState.lastCommonPrefix);
      }
      currentKey.set(nextState.currentKey);

      valueOffset = nextState.valueOffset;
      keyLength = nextState.keyLength;
      valueLength = nextState.valueLength;
      lastCommonPrefix = nextState.lastCommonPrefix;
      nextKvOffset = nextState.nextKvOffset;
      memstoreTS = nextState.memstoreTS;
      currentBuffer = nextState.currentBuffer;
      tagsOffset = nextState.tagsOffset;
      tagsLength = nextState.tagsLength;
      if (nextState.tagCompressionContext != null) {
        tagCompressionContext = nextState.tagCompressionContext;
      }
    }

    /**
     * Materializes the current state as a Cell. The key is deep-copied; the value (and, when tags
     * are not dictionary-compressed, the tags) still reference the block's buffer.
     */
    public Cell toCell() {
      // Buffer backing the value and tags part from the HFileBlock's buffer
      // When tag compression in use, this will be only the value bytes area.
      ByteBuffer valAndTagsBuffer;
      int vOffset;
      int valAndTagsLength = this.valueLength;
      int tagsLenSerializationSize = 0;
      if (this.includeTags && this.tagCompressionContext == null) {
        // Include the tags part also.
// This will be the tags bytes + 2 bytes for storing the tags
        // length
        tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + this.valueLength);
        valAndTagsLength += tagsLenSerializationSize + this.tagsLength;
      }
      this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair);
      valAndTagsBuffer = this.tmpPair.getFirst();
      vOffset = this.tmpPair.getSecond();// This is the offset to value part in the BB
      if (valAndTagsBuffer.hasArray()) {
        return toOnheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      } else {
        return toOffheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      }
    }

    // Builds a cell backed by on-heap arrays. Key bytes are copied; value/tags reference the
    // array backing the block buffer, except dictionary-compressed tags which are copied.
    private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
      int tagsLenSerializationSize) {
      byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          tagsArray = valAndTagsBuffer.array();
          tOffset =
            valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength + tagsLenSerializationSize;
        } else {
          // Tags were uncompressed into tagsBuffer earlier; copy out just the live portion.
          tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
          tOffset = 0;
        }
      }
      return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
        currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
        currentKey.getQualifierOffset(), currentKey.getQualifierLength(), currentKey.getTimestamp(),
        currentKey.getTypeByte(), valAndTagsBuffer.array(),
        valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength, memstoreTS, tagsArray, tOffset,
        this.tagsLength);
    }

    // Builds a cell over the (possibly direct) block buffer. Key bytes are copied on-heap.
    private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
      int tagsLenSerializationSize) {
      ByteBuffer tagsBuf = HConstants.EMPTY_BYTE_BUFFER;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          tagsBuf = valAndTagsBuffer;
          tOffset = vOffset + this.valueLength + tagsLenSerializationSize;
        }
else {
          // Tags were uncompressed into tagsBuffer earlier; wrap a copy of the live portion.
          tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength));
          tOffset = 0;
        }
      }
      return new OffheapDecodedExtendedCell(
        ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)), currentKey.getRowLength(),
        currentKey.getFamilyOffset(), currentKey.getFamilyLength(), currentKey.getQualifierOffset(),
        currentKey.getQualifierLength(), currentKey.getTimestamp(), currentKey.getTypeByte(),
        valAndTagsBuffer, vOffset, this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
    }
  }

  /**
   * Copies only the key part of the keybuffer by doing a deep copy and passes the seeker state
   * members for taking a clone. Note that the value byte[] part is still pointing to the
   * currentBuffer and represented by the valueOffset and valueLength
   */
  // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
  // there. So this has to be an instance of ExtendedCell.
  protected static class OnheapDecodedCell implements ExtendedCell {
    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
      + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
      + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY));
    // Serialized key bytes (row length + row/family/qualifier/ts/type); the offsets below index
    // into this array.
    private byte[] keyOnlyBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timestamp;
    private byte typeByte;
    private byte[] valueBuffer;
    private int valueOffset;
    private int valueLength;
    private byte[] tagsBuffer;
    private int tagsOffset;
    private int tagsLength;
    private long seqId;

    protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset,
      byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
      byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] tagsBuffer,
int tagsOffset, int tagsLength) {
      this.keyOnlyBuffer = keyBuffer;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timestamp = timeStamp;
      this.typeByte = typeByte;
      this.valueBuffer = valueBuffer;
      this.valueOffset = valueOffset;
      this.valueLength = valueLen;
      this.tagsBuffer = tagsBuffer;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      setSequenceId(seqId);
    }

    @Override
    public byte[] getRowArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getFamilyArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getQualifierArray() {
      return keyOnlyBuffer;
    }

    @Override
    public int getRowOffset() {
      // Row bytes start right after the 2-byte row length prefix in the key.
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public short getRowLength() {
      return rowLength;
    }

    @Override
    public int getFamilyOffset() {
      return familyOffset;
    }

    @Override
    public byte getFamilyLength() {
      return familyLength;
    }

    @Override
    public int getQualifierOffset() {
      return qualifierOffset;
    }

    @Override
    public int getQualifierLength() {
      return qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return timestamp;
    }

    @Override
    public byte getTypeByte() {
      return typeByte;
    }

    @Override
    public long getSequenceId() {
      return seqId;
    }

    @Override
    public byte[] getValueArray() {
      return this.valueBuffer;
    }

    @Override
    public int getValueOffset() {
      return valueOffset;
    }

    @Override
    public int getValueLength() {
      return valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      return this.tagsBuffer;
    }

    @Override
    public int getTagsOffset() {
      return this.tagsOffset;
    }
@Override
    public int getTagsLength() {
      return tagsLength;
    }

    @Override
    public String toString() {
      return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
        + getValueLength() + "/seqid=" + seqId;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    @Override
    public long heapSize() {
      return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
    }

    /**
     * Serializes this cell in KeyValue format: key length, value length, key bytes, value bytes
     * and optionally the tags (2-byte length prefix + tag bytes).
     */
    @Override
    public int write(OutputStream out, boolean withTags) throws IOException {
      int lenToWrite = getSerializedSize(withTags);
      ByteBufferUtils.putInt(out, keyOnlyBuffer.length);
      ByteBufferUtils.putInt(out, valueLength);
      // Write key
      out.write(keyOnlyBuffer);
      // Write value
      out.write(this.valueBuffer, this.valueOffset, this.valueLength);
      if (withTags && this.tagsLength > 0) {
        // 2 bytes tags length followed by tags bytes
        // tags length is serialized with 2 bytes only(short way) even if the type is int.
        // As this is non -ve numbers, we save the sign bit. See HBASE-11437
        out.write((byte) (0xff & (this.tagsLength >> 8)));
        out.write((byte) (0xff & this.tagsLength));
        out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
      }
      return lenToWrite;
    }

    @Override
    public int getSerializedSize(boolean withTags) {
      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
        withTags);
    }

    @Override
    public void write(ByteBuffer buf, int offset) {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(long ts) throws IOException {
      // This is not used in actual flow.
// Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(byte[] ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public ExtendedCell deepClone() {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }
  }

  /**
   * Off-heap counterpart of OnheapDecodedCell: the key is always an on-heap copy while the value
   * and tags ByteBuffers may be direct (off-heap) slices of the block's buffer.
   */
  protected static class OffheapDecodedExtendedCell extends ByteBufferExtendedCell {
    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
      + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
      + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.BYTE_BUFFER));
    private ByteBuffer keyBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timestamp;
    private byte typeByte;
    private ByteBuffer valueBuffer;
    private int valueOffset;
    private int valueLength;
    private ByteBuffer tagsBuffer;
    private int tagsOffset;
    private int tagsLength;
    private long seqId;

    protected OffheapDecodedExtendedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset,
      byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
      ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer,
      int tagsOffset, int tagsLength) {
      // The keyBuffer is always onheap
      assert keyBuffer.hasArray();
      assert keyBuffer.arrayOffset() == 0;
      this.keyBuffer = keyBuffer;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timestamp = timeStamp;
      this.typeByte =
typeByte;
      this.valueBuffer = valueBuffer;
      this.valueOffset = valueOffset;
      this.valueLength = valueLen;
      this.tagsBuffer = tagsBuffer;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      setSequenceId(seqId);
    }

    @Override
    public byte[] getRowArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getRowOffset() {
      return getRowPosition();
    }

    @Override
    public short getRowLength() {
      return this.rowLength;
    }

    @Override
    public byte[] getFamilyArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getFamilyOffset() {
      return getFamilyPosition();
    }

    @Override
    public byte getFamilyLength() {
      return this.familyLength;
    }

    @Override
    public byte[] getQualifierArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getQualifierOffset() {
      return getQualifierPosition();
    }

    @Override
    public int getQualifierLength() {
      return this.qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return this.timestamp;
    }

    @Override
    public byte getTypeByte() {
      return this.typeByte;
    }

    @Override
    public long getSequenceId() {
      return this.seqId;
    }

    @Override
    public byte[] getValueArray() {
      // Value may live off-heap; this materializes an on-heap copy.
      return CellUtil.cloneValue(this);
    }

    @Override
    public int getValueOffset() {
      return 0;
    }

    @Override
    public int getValueLength() {
      return this.valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      // Tags may live off-heap; this materializes an on-heap copy.
      return PrivateCellUtil.cloneTags(this);
    }

    @Override
    public int getTagsOffset() {
      return 0;
    }

    @Override
    public int getTagsLength() {
      return this.tagsLength;
    }

    @Override
    public ByteBuffer getRowByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getRowPosition() {
      // Row bytes start right after the 2-byte row length prefix in the key.
      return
Bytes.SIZEOF_SHORT;
    }

    @Override
    public ByteBuffer getFamilyByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getFamilyPosition() {
      return this.familyOffset;
    }

    @Override
    public ByteBuffer getQualifierByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getQualifierPosition() {
      return this.qualifierOffset;
    }

    @Override
    public ByteBuffer getValueByteBuffer() {
      return this.valueBuffer;
    }

    @Override
    public int getValuePosition() {
      return this.valueOffset;
    }

    @Override
    public ByteBuffer getTagsByteBuffer() {
      return this.tagsBuffer;
    }

    @Override
    public int getTagsPosition() {
      return this.tagsOffset;
    }

    @Override
    public long heapSize() {
      // Only the fixed object overhead is counted here (unlike the on-heap variant, which also
      // adds the referenced data lengths).
      return FIXED_OVERHEAD;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    /**
     * Serializes this cell in KeyValue format: key length, value length, key bytes, value bytes
     * and optionally the tags (2-byte length prefix + tag bytes).
     */
    @Override
    public int write(OutputStream out, boolean withTags) throws IOException {
      int lenToWrite = getSerializedSize(withTags);
      ByteBufferUtils.putInt(out, keyBuffer.capacity());
      ByteBufferUtils.putInt(out, valueLength);
      // Write key
      out.write(keyBuffer.array());
      // Write value
      ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset, this.valueLength);
      if (withTags && this.tagsLength > 0) {
        // 2 bytes tags length followed by tags bytes
        // tags length is serialized with 2 bytes only(short way) even if the type is int.
        // As this is non -ve numbers, we save the sign bit.
// See HBASE-11437
        out.write((byte) (0xff & (this.tagsLength >> 8)));
        out.write((byte) (0xff & this.tagsLength));
        ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
      }
      return lenToWrite;
    }

    @Override
    public int getSerializedSize(boolean withTags) {
      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
        withTags);
    }

    @Override
    public void setTimestamp(long ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(byte[] ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void write(ByteBuffer buf, int offset) {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public ExtendedCell deepClone() {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }
  }

  /**
   * Base seeker for buffered encodings. Keeps two SeekerState instances — {@code current} and
   * {@code previous} — and advances by decoding cells out of the block buffer.
   */
  protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
    extends AbstractEncodedSeeker {
    protected ByteBuff currentBuffer;
    protected TagCompressionContext tagCompressionContext = null;
    protected KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>();
    protected STATE current, previous;

    public BufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
      super(decodingCtx);
      if (decodingCtx.getHFileContext().isCompressTags()) {
        try {
          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
        } catch (Exception e) {
          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
        }
      }
      current = createSeekerState(); // always valid
      previous = createSeekerState(); // may not be valid
    }

    @Override
    public int compareKey(CellComparator comparator, Cell key) {
      keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
      return PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, keyOnlyKV);
    }

    @Override
    public void setCurrentBuffer(ByteBuff buffer) {
      if (this.tagCompressionContext != null) {
        // Dictionary state is per block; reset before decoding a new block.
        this.tagCompressionContext.clear();
      }
      currentBuffer = buffer;
      current.currentBuffer = currentBuffer;
      if (tagCompressionContext != null) {
        current.tagCompressionContext = tagCompressionContext;
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public Cell getKey() {
      // Deep copy so the returned key stays valid after further seeking.
      byte[] key = new byte[current.keyLength];
      System.arraycopy(current.keyBuffer, 0, key, 0, current.keyLength);
      return new KeyValue.KeyOnlyKeyValue(key);
    }

    @Override
    public ByteBuffer getValueShallowCopy() {
      // Returns a slice over the block buffer's value bytes; no data is copied.
      currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
      ByteBuffer dup = tmpPair.getFirst().duplicate();
      dup.position(tmpPair.getSecond());
      dup.limit(tmpPair.getSecond() + current.valueLength);
      return dup.slice();
    }

    @Override
    public Cell getCell() {
      return current.toCell();
    }

    @Override
    public void rewind() {
      currentBuffer.rewind();
      if
(tagCompressionContext != null) {
        tagCompressionContext.clear();
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public boolean next() {
      if (!currentBuffer.hasRemaining()) {
        return false;
      }
      decodeNext();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
      return true;
    }

    /** Reads the tags part of the current cell, uncompressing via the dictionary when enabled. */
    protected void decodeTags() {
      current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
      if (tagCompressionContext != null) {
        if (current.uncompressTags) {
          // Tag compression is been used. uncompress it into tagsBuffer
          current.ensureSpaceForTags();
          try {
            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
              current.tagsBuffer, 0, current.tagsLength);
          } catch (IOException e) {
            throw new RuntimeException("Exception while uncompressing tags", e);
          }
        } else {
          // Tags for this cell were already uncompressed (see moveToPrevious); just skip them.
          currentBuffer.skip(current.tagsCompressedLength);
          current.uncompressTags = true;// Reset this.
        }
        current.tagsOffset = -1;
      } else {
        // When tag compress is not used, let us not do copying of tags bytes into tagsBuffer.
        // Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer()
        current.tagsOffset = currentBuffer.position();
        currentBuffer.skip(current.tagsLength);
      }
    }

    @Override
    public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
      // Running common-prefix lengths are carried between iterations so each comparison can skip
      // bytes already known to match.
      int rowCommonPrefix = 0;
      int familyCommonPrefix = 0;
      int qualCommonPrefix = 0;
      previous.invalidate();
      do {
        int comp;
        keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
        if (current.lastCommonPrefix != 0) {
          // The KV format has row key length also in the byte array. The
          // common prefix
          // includes it.
// So we need to subtract to find out the common prefix
          // in the
          // row part alone
          rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
        }
        if (current.lastCommonPrefix <= 2) {
          rowCommonPrefix = 0;
        }
        rowCommonPrefix += findCommonPrefixInRowPart(seekCell, keyOnlyKV, rowCommonPrefix);
        comp = compareCommonRowPrefix(seekCell, keyOnlyKV, rowCommonPrefix);
        if (comp == 0) {
          comp = compareTypeBytes(seekCell, keyOnlyKV);
          if (comp == 0) {
            // Subtract the fixed row key length and the family key fixed length
            familyCommonPrefix = Math.max(0, Math.min(familyCommonPrefix,
              current.lastCommonPrefix - (3 + keyOnlyKV.getRowLength())));
            familyCommonPrefix +=
              findCommonPrefixInFamilyPart(seekCell, keyOnlyKV, familyCommonPrefix);
            comp = compareCommonFamilyPrefix(seekCell, keyOnlyKV, familyCommonPrefix);
            if (comp == 0) {
              // subtract the rowkey fixed length and the family key fixed
              // length
              qualCommonPrefix = Math.max(0, Math.min(qualCommonPrefix, current.lastCommonPrefix
                - (3 + keyOnlyKV.getRowLength() + keyOnlyKV.getFamilyLength())));
              qualCommonPrefix +=
                findCommonPrefixInQualifierPart(seekCell, keyOnlyKV, qualCommonPrefix);
              comp = compareCommonQualifierPrefix(seekCell, keyOnlyKV, qualCommonPrefix);
              if (comp == 0) {
                comp = CellComparator.getInstance().compareTimestamps(seekCell, keyOnlyKV);
                if (comp == 0) {
                  // Compare types. Let the delete types sort ahead of puts;
                  // i.e. types
                  // of higher numbers sort before those of lesser numbers.
                  // Maximum
                  // (255)
                  // appears ahead of everything, and minimum (0) appears
                  // after
                  // everything.
comp = (0xff & keyOnlyKV.getTypeByte()) - (0xff & seekCell.getTypeByte());
                }
              }
            }
          }
        }
        if (comp == 0) { // exact match
          if (seekBefore) {
            if (!previous.isValid()) {
              // The caller (seekBefore) has to ensure that we are not at the
              // first key in the block.
              throw new IllegalStateException(
                "Cannot seekBefore if " + "positioned at the first key in the block: key="
                  + Bytes.toStringBinary(seekCell.getRowArray()));
            }
            moveToPrevious();
            return 1;
          }
          return 0;
        }

        if (comp < 0) { // already too large, check previous
          if (previous.isValid()) {
            moveToPrevious();
          } else {
            return HConstants.INDEX_KEY_MAGIC; // using optimized index key
          }
          return 1;
        }

        // move to next, if more data is available
        if (currentBuffer.hasRemaining()) {
          previous.copyFromNext(current);
          decodeNext();
          current.setKey(current.keyBuffer, current.memstoreTS);
        } else {
          break;
        }
      } while (true);

      // we hit the end of the block, not an exact match
      return 1;
    }

    // Orders a fake boundary key (empty family+qualifier with Minimum type) relative to real keys
    // before family/qualifier bytes are compared.
    private int compareTypeBytes(Cell key, Cell right) {
      if (
        key.getFamilyLength() + key.getQualifierLength() == 0
          && key.getTypeByte() == Type.Minimum.getCode()
      ) {
        // left is "bigger", i.e.
// it appears later in the sorted order
        return 1;
      }
      if (
        right.getFamilyLength() + right.getQualifierLength() == 0
          && right.getTypeByte() == Type.Minimum.getCode()
      ) {
        return -1;
      }
      return 0;
    }

    // Length of the additional common prefix in the row parts, starting after the already-known
    // rowCommonPrefix bytes.
    private static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
      return Bytes.findCommonPrefix(left.getRowArray(), right.getRowArray(),
        left.getRowLength() - rowCommonPrefix, right.getRowLength() - rowCommonPrefix,
        left.getRowOffset() + rowCommonPrefix, right.getRowOffset() + rowCommonPrefix);
    }

    private static int findCommonPrefixInFamilyPart(Cell left, Cell right, int familyCommonPrefix) {
      return Bytes.findCommonPrefix(left.getFamilyArray(), right.getFamilyArray(),
        left.getFamilyLength() - familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix,
        left.getFamilyOffset() + familyCommonPrefix, right.getFamilyOffset() + familyCommonPrefix);
    }

    private static int findCommonPrefixInQualifierPart(Cell left, Cell right,
      int qualifierCommonPrefix) {
      return Bytes.findCommonPrefix(left.getQualifierArray(), right.getQualifierArray(),
        left.getQualifierLength() - qualifierCommonPrefix,
        right.getQualifierLength() - qualifierCommonPrefix,
        left.getQualifierOffset() + qualifierCommonPrefix,
        right.getQualifierOffset() + qualifierCommonPrefix);
    }

    // Swaps current/previous so the seeker steps back exactly one cell. Only valid while previous
    // holds a state saved by copyFromNext, and only once in a row.
    private void moveToPrevious() {
      if (!previous.isValid()) {
        throw new IllegalStateException(
          "Can move back only once and not in first key in the block.");
      }

      STATE tmp = previous;
      previous = current;
      current = tmp;

      // move after last key value
      currentBuffer.position(current.nextKvOffset);
      // Already decoded the tag bytes. We cache this tags into current state and also the total
      // compressed length of the tags bytes. For the next time decodeNext() we don't need to decode
      // the tags again.
    // the tags again. This might pollute the Data Dictionary which we use for the compression.
    // When current.uncompressTags is false, we will just reuse the current.tagsBuffer and skip
    // 'tagsCompressedLength' bytes of source stream.
    // See in decodeTags()
    current.tagsBuffer = previous.tagsBuffer;
    current.tagsCompressedLength = previous.tagsCompressedLength;
    current.uncompressTags = false;
    // The current key has to be reset with the previous Cell
    current.setKey(current.keyBuffer, current.memstoreTS);
    // Only one step back is allowed; mark the swapped-out state unusable.
    previous.invalidate();
  }

  /**
   * Creates the per-seeker state object. Subclasses with a non-default {@code STATE} type must
   * override this; otherwise the unchecked cast below would fail at runtime.
   */
  @SuppressWarnings("unchecked")
  protected STATE createSeekerState() {
    // This will fail for non-default seeker state if the subclass does not
    // override this method.
    return (STATE) new SeekerState(this.tmpPair, this.includesTags());
  }

  // Positions the seeker on the first cell of the block.
  abstract protected void decodeFirst();

  // Advances the seeker to the next cell in the block.
  abstract protected void decodeNext();
}

/**
 * Writes the per-cell trailer (tags and/or mvcc/memstore timestamp) that follows the encoded
 * key/value, according to the flags in the encoding context.
 * @return unencoded size added
 */
protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
  HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
  int size = 0;
  if (encodingCtx.getHFileContext().isIncludesTags()) {
    int tagsLength = cell.getTagsLength();
    ByteBufferUtils.putCompressedInt(out, tagsLength);
    // There are some tags to be written
    if (tagsLength > 0) {
      TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
      // When tag compression is enabled, tagCompressionContext will have a not null value.
      // Write the tags using Dictionary compression in such a case
      if (tagCompressionContext != null) {
        // Not passing tagsLength considering that parsing of the tagsLength is not costly
        PrivateCellUtil.compressTags(out, cell, tagCompressionContext);
      } else {
        PrivateCellUtil.writeTags(out, cell, tagsLength);
      }
    }
    size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
  }
  if (encodingCtx.getHFileContext().isIncludesMvcc()) {
    // Copy memstore timestamp from the byte buffer to the output stream.
    long memstoreTS = cell.getSequenceId();
    WritableUtils.writeVLong(out, memstoreTS);
    // TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be
    // avoided.
    size += WritableUtils.getVIntSize(memstoreTS);
  }
  return size;
}

/**
 * Reads the per-cell trailer (tags and/or mvcc/memstore timestamp) from {@code source} and
 * copies it into {@code dest}, mirroring what afterEncodingKeyValue() wrote.
 */
protected final void afterDecodingKeyValue(DataInputStream source, ByteBuffer dest,
  HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
  if (decodingCtx.getHFileContext().isIncludesTags()) {
    int tagsLength = ByteBufferUtils.readCompressedInt(source);
    // Put as unsigned short
    dest.put((byte) ((tagsLength >> 8) & 0xff));
    dest.put((byte) (tagsLength & 0xff));
    if (tagsLength > 0) {
      TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
      // When tag compression is being used in this file, tagCompressionContext will have a not
      // null value passed.
      if (tagCompressionContext != null) {
        tagCompressionContext.uncompressTags(source, dest, tagsLength);
      } else {
        ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
      }
    }
  }
  if (decodingCtx.getHFileContext().isIncludesMvcc()) {
    long memstoreTS = -1;
    try {
      // Copy memstore timestamp from the data input stream to the byte
      // buffer.
1061 memstoreTS = WritableUtils.readVLong(source); 1062 ByteBufferUtils.writeVLong(dest, memstoreTS); 1063 } catch (IOException ex) { 1064 throw new RuntimeException( 1065 "Unable to copy memstore timestamp " + memstoreTS + " after decoding a key/value"); 1066 } 1067 } 1068 } 1069 1070 protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source, 1071 int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) 1072 throws IOException; 1073 1074 /** 1075 * Asserts that there is at least the given amount of unfilled space remaining in the given 1076 * buffer. 1077 * @param out typically, the buffer we are writing to 1078 * @param length the required space in the buffer 1079 * @throws EncoderBufferTooSmallException If there are no enough bytes. 1080 */ 1081 protected static void ensureSpace(ByteBuffer out, int length) 1082 throws EncoderBufferTooSmallException { 1083 if (out.position() + length > out.limit()) { 1084 throw new EncoderBufferTooSmallException("Buffer position=" + out.position() 1085 + ", buffer limit=" + out.limit() + ", length to be written=" + length); 1086 } 1087 } 1088 1089 @Override 1090 public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out) 1091 throws IOException { 1092 if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) { 1093 throw new IOException(this.getClass().getName() + " only accepts " 1094 + HFileBlockDefaultEncodingContext.class.getName() + " as the " + "encoding context."); 1095 } 1096 1097 HFileBlockDefaultEncodingContext encodingCtx = 1098 (HFileBlockDefaultEncodingContext) blkEncodingCtx; 1099 encodingCtx.prepareEncoding(out); 1100 if ( 1101 encodingCtx.getHFileContext().isIncludesTags() 1102 && encodingCtx.getHFileContext().isCompressTags() 1103 ) { 1104 if (encodingCtx.getTagCompressionContext() != null) { 1105 // It will be overhead to create the TagCompressionContext again and again for every block 1106 // encoding. 
      // Reuse the existing context; just reset its dictionary state for the new block.
      encodingCtx.getTagCompressionContext().clear();
    } else {
      try {
        TagCompressionContext tagCompressionContext =
          new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
        encodingCtx.setTagCompressionContext(tagCompressionContext);
      } catch (Exception e) {
        throw new IOException("Failed to initialize TagCompressionContext", e);
      }
    }
  }
  StreamUtils.writeInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
  blkEncodingCtx.setEncodingState(new EncodingState());
}

/**
 * Encodes a single cell and records the encoded/unencoded sizes in the block's encoding state.
 */
@Override
public void encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
  throws IOException {
  EncodingState state = encodingCtx.getEncodingState();
  // Measure the stream before/after so we can report the on-disk (encoded) size written.
  int posBeforeEncode = out.size();
  int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
  state.postCellEncode(encodedKvSize, out.size() - posBeforeEncode);
}

// Subclasses implement the actual per-cell encoding; returns the unencoded KV size.
public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
  DataOutputStream out) throws IOException;

/**
 * Finishes the block: back-patches the dummy length written by startBlockEncoding() with the
 * total unencoded data size, then performs common post-encoding steps.
 */
@Override
public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
  byte[] uncompressedBytesWithHeader) throws IOException {
  EncodingState state = encodingCtx.getEncodingState();
  // Write the unencodedDataSizeWritten (with header size)
  Bytes.putInt(uncompressedBytesWithHeader,
    HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE,
    state.getUnencodedDataSizeWritten());
  postEncoding(encodingCtx);
}
}