001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.encoding; 019 020import java.io.DataInputStream; 021import java.io.DataOutputStream; 022import java.io.IOException; 023import java.io.OutputStream; 024import java.nio.ByteBuffer; 025import org.apache.hadoop.hbase.ByteBufferExtendedCell; 026import org.apache.hadoop.hbase.Cell; 027import org.apache.hadoop.hbase.CellComparator; 028import org.apache.hadoop.hbase.CellUtil; 029import org.apache.hadoop.hbase.ExtendedCell; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.KeyValue; 032import org.apache.hadoop.hbase.KeyValueUtil; 033import org.apache.hadoop.hbase.PrivateCellUtil; 034import org.apache.hadoop.hbase.io.TagCompressionContext; 035import org.apache.hadoop.hbase.io.util.LRUDictionary; 036import org.apache.hadoop.hbase.io.util.StreamUtils; 037import org.apache.hadoop.hbase.nio.ByteBuff; 038import org.apache.hadoop.hbase.util.ByteBufferUtils; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.ClassSize; 041import org.apache.hadoop.hbase.util.ObjectIntPair; 042import org.apache.hadoop.io.WritableUtils; 043import 
org.apache.yetus.audience.InterfaceAudience; 044 045/** 046 * Base class for all data block encoders that use a buffer. 047 */ 048@InterfaceAudience.Private 049abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder { 050 /** 051 * TODO: This datablockencoder is dealing in internals of hfileblocks. Purge reference to HFBs 052 */ 053 private static int INITIAL_KEY_BUFFER_SIZE = 512; 054 055 @Override 056 public ByteBuffer decodeKeyValues(DataInputStream source, 057 HFileBlockDecodingContext blkDecodingCtx) throws IOException { 058 if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) { 059 throw new IOException(this.getClass().getName() + " only accepts " 060 + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context."); 061 } 062 063 HFileBlockDefaultDecodingContext decodingCtx = 064 (HFileBlockDefaultDecodingContext) blkDecodingCtx; 065 if ( 066 decodingCtx.getHFileContext().isIncludesTags() 067 && decodingCtx.getHFileContext().isCompressTags() 068 ) { 069 if (decodingCtx.getTagCompressionContext() != null) { 070 // It will be overhead to create the TagCompressionContext again and again for every block 071 // decoding. 072 decodingCtx.getTagCompressionContext().clear(); 073 } else { 074 try { 075 TagCompressionContext tagCompressionContext = 076 new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE); 077 decodingCtx.setTagCompressionContext(tagCompressionContext); 078 } catch (Exception e) { 079 throw new IOException("Failed to initialize TagCompressionContext", e); 080 } 081 } 082 } 083 return internalDecodeKeyValues(source, 0, 0, decodingCtx); 084 } 085 086 /********************* common prefixes *************************/ 087 // Having this as static is fine but if META is having DBE then we should 088 // change this. 
  /**
   * Compares the row parts of two cells, skipping the leading {@code rowCommonPrefix} bytes that
   * are already known to be equal.
   */
  public static int compareCommonRowPrefix(Cell left, Cell right, int rowCommonPrefix) {
    return Bytes.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix,
      left.getRowLength() - rowCommonPrefix, right.getRowArray(),
      right.getRowOffset() + rowCommonPrefix, right.getRowLength() - rowCommonPrefix);
  }

  /**
   * Compares the family parts of two cells, skipping the leading {@code familyCommonPrefix} bytes
   * that are already known to be equal.
   */
  public static int compareCommonFamilyPrefix(Cell left, Cell right, int familyCommonPrefix) {
    return Bytes.compareTo(left.getFamilyArray(), left.getFamilyOffset() + familyCommonPrefix,
      left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
      right.getFamilyOffset() + familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix);
  }

  /**
   * Compares the qualifier parts of two cells, skipping the leading {@code qualCommonPrefix} bytes
   * that are already known to be equal.
   */
  public static int compareCommonQualifierPrefix(Cell left, Cell right, int qualCommonPrefix) {
    return Bytes.compareTo(left.getQualifierArray(), left.getQualifierOffset() + qualCommonPrefix,
      left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(),
      right.getQualifierOffset() + qualCommonPrefix, right.getQualifierLength() - qualCommonPrefix);
  }

  /**
   * Mutable per-position decoding state used by {@link BufferedEncodedSeeker}: a copy of the
   * current key plus offsets/lengths locating the value and tags inside {@code currentBuffer}.
   */
  protected static class SeekerState {
    protected ByteBuff currentBuffer;
    protected TagCompressionContext tagCompressionContext;
    // -1 marks this state as invalid; see isValid()/invalidate().
    protected int valueOffset = -1;
    protected int keyLength;
    protected int valueLength;
    // Number of leading key bytes shared with the previously decoded key.
    protected int lastCommonPrefix;
    protected int tagsLength = 0;
    // -1 when tags were uncompressed into tagsBuffer instead of being addressed in currentBuffer.
    protected int tagsOffset = -1;
    protected int tagsCompressedLength = 0;
    protected boolean uncompressTags = true;

    /** We need to store a copy of the key. */
    protected byte[] keyBuffer = HConstants.EMPTY_BYTE_ARRAY;
    protected byte[] tagsBuffer = HConstants.EMPTY_BYTE_ARRAY;

    protected long memstoreTS;
    protected int nextKvOffset;
    protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
    private final ObjectIntPair<ByteBuffer> tmpPair;
    private final boolean includeTags;

    public SeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) {
      this.tmpPair = tmpPair;
      this.includeTags = includeTags;
    }

    protected boolean isValid() {
      return valueOffset != -1;
    }

    protected void invalidate() {
      valueOffset = -1;
      tagsCompressedLength = 0;
      currentKey.clear();
      uncompressTags = true;
      currentBuffer = null;
    }

    /**
     * Grows keyBuffer (preserving its contents) to the next power of two that can hold keyLength
     * bytes, never shrinking below INITIAL_KEY_BUFFER_SIZE.
     */
    protected void ensureSpaceForKey() {
      if (keyLength > keyBuffer.length) {
        int newKeyBufferLength =
          Integer.highestOneBit(Math.max(INITIAL_KEY_BUFFER_SIZE, keyLength) - 1) << 1;
        byte[] newKeyBuffer = new byte[newKeyBufferLength];
        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
        keyBuffer = newKeyBuffer;
      }
    }

    /**
     * Grows tagsBuffer (preserving its contents) to the next power of two that can hold tagsLength
     * bytes, never shrinking below INITIAL_KEY_BUFFER_SIZE.
     */
    protected void ensureSpaceForTags() {
      if (tagsLength > tagsBuffer.length) {
        int newTagsBufferLength =
          Integer.highestOneBit(Math.max(INITIAL_KEY_BUFFER_SIZE, tagsLength) - 1) << 1;
        byte[] newTagsBuffer = new byte[newTagsBufferLength];
        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
        tagsBuffer = newTagsBuffer;
      }
    }

    protected void setKey(byte[] keyBuffer, long memTS) {
      currentKey.setKey(keyBuffer, 0, keyLength);
      memstoreTS = memTS;
    }

    /**
     * Copy the state from the next one into this instance (the previous state placeholder). Used to
     * save the previous state when we are advancing the seeker to the next key/value.
     */
    protected void copyFromNext(SeekerState nextState) {
      if (keyBuffer.length != nextState.keyBuffer.length) {
        keyBuffer = nextState.keyBuffer.clone();
      } else if (!isValid()) {
        // Note: we can only call isValid before we override our state, so this
        // comes before all the assignments at the end of this method.
        System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0, nextState.keyLength);
      } else {
        // don't copy the common prefix between this key and the previous one
        System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix, keyBuffer,
          nextState.lastCommonPrefix, nextState.keyLength - nextState.lastCommonPrefix);
      }
      currentKey.set(nextState.currentKey);

      valueOffset = nextState.valueOffset;
      keyLength = nextState.keyLength;
      valueLength = nextState.valueLength;
      lastCommonPrefix = nextState.lastCommonPrefix;
      nextKvOffset = nextState.nextKvOffset;
      memstoreTS = nextState.memstoreTS;
      currentBuffer = nextState.currentBuffer;
      tagsOffset = nextState.tagsOffset;
      tagsLength = nextState.tagsLength;
      if (nextState.tagCompressionContext != null) {
        tagCompressionContext = nextState.tagCompressionContext;
      }
    }

    /**
     * Materializes the current position as a Cell. The key is deep-copied; the value (and, when
     * tags are not dictionary-compressed, the tags) still reference the block's buffer.
     */
    public Cell toCell() {
      // Buffer backing the value and tags part from the HFileBlock's buffer
      // When tag compression in use, this will be only the value bytes area.
      ByteBuffer valAndTagsBuffer;
      int vOffset;
      int valAndTagsLength = this.valueLength;
      int tagsLenSerializationSize = 0;
      if (this.includeTags && this.tagCompressionContext == null) {
        // Include the tags part also. This will be the tags bytes + 2 bytes of for storing tags
        // length
        tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + this.valueLength);
        valAndTagsLength += tagsLenSerializationSize + this.tagsLength;
      }
      this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair);
      valAndTagsBuffer = this.tmpPair.getFirst();
      vOffset = this.tmpPair.getSecond(); // This is the offset to value part in the BB
      // Pick the cell flavor matching where the block bytes actually live.
      if (valAndTagsBuffer.hasArray()) {
        return toOnheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      } else {
        return toOffheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      }
    }

    // Builds an OnheapDecodedCell; tags come either from the block's backing array or, when
    // dictionary compression was used, from a copy of the uncompressed tagsBuffer.
    private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
      int tagsLenSerializationSize) {
      byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          tagsArray = valAndTagsBuffer.array();
          tOffset =
            valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength + tagsLenSerializationSize;
        } else {
          tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
          tOffset = 0;
        }
      }
      return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
        currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
        currentKey.getQualifierOffset(), currentKey.getQualifierLength(), currentKey.getTimestamp(),
        currentKey.getTypeByte(), valAndTagsBuffer.array(),
        valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength, memstoreTS, tagsArray, tOffset,
        this.tagsLength);
    }

    // Off-heap counterpart of toOnheapCell: value/tags stay as ByteBuffer views; the key is
    // wrapped into an on-heap ByteBuffer copy.
    private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
      int tagsLenSerializationSize) {
      ByteBuffer tagsBuf = HConstants.EMPTY_BYTE_BUFFER;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          tagsBuf = valAndTagsBuffer;
          tOffset = vOffset + this.valueLength + tagsLenSerializationSize;
        } else {
          tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength));
          tOffset = 0;
        }
      }
      return new OffheapDecodedExtendedCell(
        ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)), currentKey.getRowLength(),
        currentKey.getFamilyOffset(), currentKey.getFamilyLength(), currentKey.getQualifierOffset(),
        currentKey.getQualifierLength(), currentKey.getTimestamp(), currentKey.getTypeByte(),
        valAndTagsBuffer, vOffset, this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
    }
  }

  /**
   * Copies only the key part of the keybuffer by doing a deep copy and passes the seeker state
   * members for taking a clone. Note that the value byte[] part is still pointing to the
   * currentBuffer and represented by the valueOffset and valueLength
   */
  // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
  // there. So this has to be an instance of ExtendedCell.
  protected static class OnheapDecodedCell implements ExtendedCell {
    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
      + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
      + Bytes.SIZEOF_SHORT + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY));
    // Serialized key bytes; row/family/qualifier getters all index into this one array.
    private byte[] keyOnlyBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timeStamp;
    private byte typeByte;
    private byte[] valueBuffer;
    private int valueOffset;
    private int valueLength;
    private byte[] tagsBuffer;
    private int tagsOffset;
    private int tagsLength;
    private long seqId;

    protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset,
      byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
      byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] tagsBuffer,
      int tagsOffset, int tagsLength) {
      this.keyOnlyBuffer = keyBuffer;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timeStamp = timeStamp;
      this.typeByte = typeByte;
      this.valueBuffer = valueBuffer;
      this.valueOffset = valueOffset;
      this.valueLength = valueLen;
      this.tagsBuffer = tagsBuffer;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      setSequenceId(seqId);
    }

    @Override
    public byte[] getRowArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getFamilyArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getQualifierArray() {
      return keyOnlyBuffer;
    }

    @Override
    public int getRowOffset() {
      // The row bytes start right after the 2-byte row-length prefix at the head of the key.
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public short getRowLength() {
      return rowLength;
    }

    @Override
    public int getFamilyOffset() {
      return familyOffset;
    }

    @Override
    public byte getFamilyLength() {
      return familyLength;
    }

    @Override
    public int getQualifierOffset() {
      return qualifierOffset;
    }

    @Override
    public int getQualifierLength() {
      return qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return timeStamp;
    }

    @Override
    public byte getTypeByte() {
      return typeByte;
    }

    @Override
    public long getSequenceId() {
      return seqId;
    }

    @Override
    public byte[] getValueArray() {
      return this.valueBuffer;
    }

    @Override
    public int getValueOffset() {
      return valueOffset;
    }

    @Override
    public int getValueLength() {
      return valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      return this.tagsBuffer;
    }

    @Override
    public int getTagsOffset() {
      return this.tagsOffset;
    }

    @Override
    public int getTagsLength() {
      return tagsLength;
    }

    @Override
    public String toString() {
      return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
        + getValueLength() + "/seqid=" + seqId;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    @Override
    public long heapSize() {
      // Fixed object overhead plus the variable-length components this cell carries.
      return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
    }

    /**
     * Serializes this cell in KeyValue layout: key length, value length, key, value, and
     * optionally a 2-byte tags length followed by the tag bytes.
     */
    @Override
    public int write(OutputStream out, boolean withTags) throws IOException {
      int lenToWrite = getSerializedSize(withTags);
      ByteBufferUtils.putInt(out, keyOnlyBuffer.length);
      ByteBufferUtils.putInt(out, valueLength);
      // Write key
      out.write(keyOnlyBuffer);
      // Write value
      out.write(this.valueBuffer, this.valueOffset, this.valueLength);
      if (withTags && this.tagsLength > 0) {
        // 2 bytes tags length followed by tags bytes
        // tags length is serialized with 2 bytes only(short way) even if the type is int.
        // As this is non -ve numbers, we save the sign bit. See HBASE-11437
        out.write((byte) (0xff & (this.tagsLength >> 8)));
        out.write((byte) (0xff & this.tagsLength));
        out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
      }
      return lenToWrite;
    }

    @Override
    public int getSerializedSize(boolean withTags) {
      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
        withTags);
    }

    @Override
    public void write(ByteBuffer buf, int offset) {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(long ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(byte[] ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public ExtendedCell deepClone() {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }
  }

  /**
   * Off-heap counterpart of {@link OnheapDecodedCell}: the key lives in an on-heap ByteBuffer
   * while value and tags may reference (possibly off-heap) ByteBuffers from the block.
   */
  protected static class OffheapDecodedExtendedCell extends ByteBufferExtendedCell {
    private static final long FIXED_OVERHEAD =
      (long) ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)
        + (7 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE)
        + (3 * ClassSize.BYTE_BUFFER);
    private ByteBuffer keyBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timeStamp;
    private byte typeByte;
    private ByteBuffer valueBuffer;
    private int valueOffset;
    private int valueLength;
    private ByteBuffer tagsBuffer;
    private int tagsOffset;
    private int tagsLength;
    private long seqId;

    protected OffheapDecodedExtendedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset,
      byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
      ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer,
      int tagsOffset, int tagsLength) {
      // The keyBuffer is always onheap
      assert keyBuffer.hasArray();
      assert keyBuffer.arrayOffset() == 0;
      this.keyBuffer = keyBuffer;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timeStamp = timeStamp;
      this.typeByte
      = typeByte;
      this.valueBuffer = valueBuffer;
      this.valueOffset = valueOffset;
      this.valueLength = valueLen;
      this.tagsBuffer = tagsBuffer;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      setSequenceId(seqId);
    }

    @Override
    @SuppressWarnings("ByteBufferBackingArray")
    public byte[] getRowArray() {
      // Safe: the constructor asserts keyBuffer is on-heap with arrayOffset 0.
      return this.keyBuffer.array();
    }

    @Override
    public int getRowOffset() {
      return getRowPosition();
    }

    @Override
    public short getRowLength() {
      return this.rowLength;
    }

    @Override
    @SuppressWarnings("ByteBufferBackingArray")
    public byte[] getFamilyArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getFamilyOffset() {
      return getFamilyPosition();
    }

    @Override
    public byte getFamilyLength() {
      return this.familyLength;
    }

    @Override
    @SuppressWarnings("ByteBufferBackingArray")
    public byte[] getQualifierArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getQualifierOffset() {
      return getQualifierPosition();
    }

    @Override
    public int getQualifierLength() {
      return this.qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return this.timeStamp;
    }

    @Override
    public byte getTypeByte() {
      return this.typeByte;
    }

    @Override
    public long getSequenceId() {
      return this.seqId;
    }

    @Override
    public byte[] getValueArray() {
      // The value may live off-heap, so expose it only as a fresh on-heap copy.
      return CellUtil.cloneValue(this);
    }

    @Override
    public int getValueOffset() {
      // Offset into the copy returned by getValueArray(), which always starts at 0.
      return 0;
    }

    @Override
    public int getValueLength() {
      return this.valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      // Same as getValueArray(): tags are exposed as a fresh on-heap copy.
      return CellUtil.cloneTags(this);
    }

    @Override
    public int getTagsOffset() {
      return 0;
    }

    @Override
    public int getTagsLength() {
      return this.tagsLength;
    }

    @Override
    public ByteBuffer getRowByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getRowPosition() {
      // The row bytes start right after the 2-byte row-length prefix at the head of the key.
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public ByteBuffer getFamilyByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getFamilyPosition() {
      return this.familyOffset;
    }

    @Override
    public ByteBuffer getQualifierByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getQualifierPosition() {
      return this.qualifierOffset;
    }

    @Override
    public ByteBuffer getValueByteBuffer() {
      return this.valueBuffer;
    }

    @Override
    public int getValuePosition() {
      return this.valueOffset;
    }

    @Override
    public ByteBuffer getTagsByteBuffer() {
      return this.tagsBuffer;
    }

    @Override
    public int getTagsPosition() {
      return this.tagsOffset;
    }

    @Override
    public long heapSize() {
      // Only fixed overhead is counted; the referenced buffers are accounted for elsewhere.
      return FIXED_OVERHEAD;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    /**
     * Serializes this cell in KeyValue layout: key length, value length, key, value, and
     * optionally a 2-byte tags length followed by the tag bytes.
     */
    @Override
    public int write(OutputStream out, boolean withTags) throws IOException {
      int lenToWrite = getSerializedSize(withTags);
      ByteBufferUtils.putInt(out, keyBuffer.remaining());
      ByteBufferUtils.putInt(out, valueLength);
      // Write key
      out.write(keyBuffer.array(), keyBuffer.arrayOffset(), keyBuffer.remaining());
      // Write value
      ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset, this.valueLength);
      if (withTags && this.tagsLength > 0) {
        // 2 bytes tags length followed by tags bytes
        // tags length is serialized with 2 bytes only(short way) even if the type is int.
        // As this is non -ve numbers, we save the sign bit. See HBASE-11437
        out.write((byte) (0xff & (this.tagsLength >> 8)));
        out.write((byte) (0xff & this.tagsLength));
        ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
      }
      return lenToWrite;
    }

    @Override
    public int getSerializedSize(boolean withTags) {
      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
        withTags);
    }

    @Override
    public void setTimestamp(long ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(byte[] ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void write(ByteBuffer buf, int offset) {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public ExtendedCell deepClone() {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }
  }

  /**
   * Seeker over buffer-encoded blocks. Keeps two {@link SeekerState}s: the current position and
   * (for seek-before) the previous one.
   */
  protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
    extends AbstractEncodedSeeker {
    protected ByteBuff currentBuffer;
    protected TagCompressionContext tagCompressionContext = null;
    protected KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
    protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>();
    protected STATE current, previous;

    public BufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
      super(decodingCtx);
      if (decodingCtx.getHFileContext().isCompressTags()) {
        try {
          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
        } catch (Exception e) {
          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
        }
      }
      current = createSeekerState(); // always valid
      previous = createSeekerState(); // may not be valid
    }

    @Override
    public int compareKey(CellComparator comparator, Cell key) {
      keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
      return PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, keyOnlyKV);
    }

    @Override
    public void setCurrentBuffer(ByteBuff buffer) {
      // Reset dictionary state so tags from a prior block cannot leak into this one.
      if (this.tagCompressionContext != null) {
        this.tagCompressionContext.clear();
      }
      currentBuffer = buffer;
      current.currentBuffer = currentBuffer;
      if (tagCompressionContext != null) {
        current.tagCompressionContext = tagCompressionContext;
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public Cell getKey() {
      // Deep copy: the returned key must stay valid after the seeker advances.
      byte[] key = new byte[current.keyLength];
      System.arraycopy(current.keyBuffer, 0, key, 0, current.keyLength);
      return new KeyValue.KeyOnlyKeyValue(key);
    }

    @Override
    public ByteBuffer getValueShallowCopy() {
      currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
      ByteBuffer dup = tmpPair.getFirst().duplicate();
      dup.position(tmpPair.getSecond());
      dup.limit(tmpPair.getSecond() + current.valueLength);
      return dup.slice();
    }

    @Override
    public Cell getCell() {
      return current.toCell();
    }

    @Override
    public void rewind() {
      currentBuffer.rewind();
      if (tagCompressionContext != null) {
        tagCompressionContext.clear();
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public boolean next() {
      if (!currentBuffer.hasRemaining()) {
        return false;
      }
      decodeNext();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
      return true;
    }

    /**
     * Reads the tags portion at the current buffer position into {@code current}, either
     * uncompressing via the dictionary or just recording the offset/length in the block.
     */
    protected void decodeTags() {
      current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
      if (tagCompressionContext != null) {
        if (current.uncompressTags) {
          // Tag compression is been used. uncompress it into tagsBuffer
          current.ensureSpaceForTags();
          try {
            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
              current.tagsBuffer, 0, current.tagsLength);
          } catch (IOException e) {
            throw new RuntimeException("Exception while uncompressing tags", e);
          }
        } else {
          // Tags were already uncompressed for this KV (see moveToPrevious()); just skip them.
          currentBuffer.skip(current.tagsCompressedLength);
          current.uncompressTags = true; // Reset this.
        }
        current.tagsOffset = -1;
      } else {
        // When tag compress is not used, let us not do copying of tags bytes into tagsBuffer.
        // Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer()
        current.tagsOffset = currentBuffer.position();
        currentBuffer.skip(current.tagsLength);
      }
    }

    /**
     * Seeks within the current block, reusing the common-prefix bookkeeping across iterations.
     * Returns 0 on an exact match, 1 when positioned at the last cell before/at the seek key (or
     * when moved to the previous cell for seekBefore), and HConstants.INDEX_KEY_MAGIC when the
     * seek key sorts before the first key of the block.
     */
    @Override
    public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
      int rowCommonPrefix = 0;
      int familyCommonPrefix = 0;
      int qualCommonPrefix = 0;
      previous.invalidate();
      do {
        int comp;
        keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
        if (current.lastCommonPrefix != 0) {
          // The KV format has row key length also in the byte array. The
          // common prefix
          // includes it. So we need to subtract to find out the common prefix
          // in the
          // row part alone
          rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
        }
        if (current.lastCommonPrefix <= 2) {
          rowCommonPrefix = 0;
        }
        rowCommonPrefix += findCommonPrefixInRowPart(seekCell, keyOnlyKV, rowCommonPrefix);
        comp = compareCommonRowPrefix(seekCell, keyOnlyKV, rowCommonPrefix);
        if (comp == 0) {
          comp = compareTypeBytes(seekCell, keyOnlyKV);
          if (comp == 0) {
            // Subtract the fixed row key length and the family key fixed length
            familyCommonPrefix = Math.max(0, Math.min(familyCommonPrefix,
              current.lastCommonPrefix - (3 + keyOnlyKV.getRowLength())));
            familyCommonPrefix +=
              findCommonPrefixInFamilyPart(seekCell, keyOnlyKV, familyCommonPrefix);
            comp = compareCommonFamilyPrefix(seekCell, keyOnlyKV, familyCommonPrefix);
            if (comp == 0) {
              // subtract the rowkey fixed length and the family key fixed
              // length
              qualCommonPrefix = Math.max(0, Math.min(qualCommonPrefix, current.lastCommonPrefix
                - (3 + keyOnlyKV.getRowLength() + keyOnlyKV.getFamilyLength())));
              qualCommonPrefix +=
                findCommonPrefixInQualifierPart(seekCell, keyOnlyKV, qualCommonPrefix);
              comp = compareCommonQualifierPrefix(seekCell, keyOnlyKV, qualCommonPrefix);
              if (comp == 0) {
                comp = CellComparator.getInstance().compareTimestamps(seekCell, keyOnlyKV);
                if (comp == 0) {
                  // Compare types. Let the delete types sort ahead of puts;
                  // i.e. types
                  // of higher numbers sort before those of lesser numbers.
                  // Maximum
                  // (255)
                  // appears ahead of everything, and minimum (0) appears
                  // after
                  // everything.
                  comp = (0xff & keyOnlyKV.getTypeByte()) - (0xff & seekCell.getTypeByte());
                }
              }
            }
          }
        }
        if (comp == 0) { // exact match
          if (seekBefore) {
            if (!previous.isValid()) {
              // The caller (seekBefore) has to ensure that we are not at the
              // first key in the block.
              throw new IllegalStateException(
                "Cannot seekBefore if " + "positioned at the first key in the block: key="
                  + Bytes.toStringBinary(seekCell.getRowArray()));
            }
            moveToPrevious();
            return 1;
          }
          return 0;
        }

        if (comp < 0) { // already too large, check previous
          if (previous.isValid()) {
            moveToPrevious();
          } else {
            return HConstants.INDEX_KEY_MAGIC; // using optimized index key
          }
          return 1;
        }

        // move to next, if more data is available
        if (currentBuffer.hasRemaining()) {
          previous.copyFromNext(current);
          decodeNext();
          current.setKey(current.keyBuffer, current.memstoreTS);
        } else {
          break;
        }
      } while (true);

      // we hit the end of the block, not an exact match
      return 1;
    }

    /**
     * Orders the fake "minimum" keys (empty family+qualifier with Type.Minimum) relative to real
     * keys; returns 0 when neither side is such a key.
     */
    private int compareTypeBytes(Cell key, Cell right) {
      if (
        key.getFamilyLength() + key.getQualifierLength() == 0
          && key.getTypeByte() == KeyValue.Type.Minimum.getCode()
      ) {
        // left is "bigger", i.e. it appears later in the sorted order
        return 1;
      }
      if (
        right.getFamilyLength() + right.getQualifierLength() == 0
          && right.getTypeByte() == KeyValue.Type.Minimum.getCode()
      ) {
        return -1;
      }
      return 0;
    }

    // Length of the additional common prefix in the row parts, past the bytes already known equal.
    private static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
      return Bytes.findCommonPrefix(left.getRowArray(), right.getRowArray(),
        left.getRowLength() - rowCommonPrefix, right.getRowLength() - rowCommonPrefix,
        left.getRowOffset() + rowCommonPrefix, right.getRowOffset() + rowCommonPrefix);
    }

    // Same as findCommonPrefixInRowPart, but for the family part.
    private static int findCommonPrefixInFamilyPart(Cell left, Cell right, int familyCommonPrefix) {
      return Bytes.findCommonPrefix(left.getFamilyArray(), right.getFamilyArray(),
        left.getFamilyLength() - familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix,
        left.getFamilyOffset() + familyCommonPrefix, right.getFamilyOffset() + familyCommonPrefix);
    }

    // Same as findCommonPrefixInRowPart, but for the qualifier part.
    private static int findCommonPrefixInQualifierPart(Cell left, Cell right,
      int qualifierCommonPrefix) {
      return Bytes.findCommonPrefix(left.getQualifierArray(), right.getQualifierArray(),
        left.getQualifierLength() - qualifierCommonPrefix,
        right.getQualifierLength() - qualifierCommonPrefix,
        left.getQualifierOffset() + qualifierCommonPrefix,
        right.getQualifierOffset() + qualifierCommonPrefix);
    }

    /**
     * Steps back one key/value by swapping {@code current} and {@code previous}; can only be done
     * once in a row and never at the first key of the block.
     */
    private void moveToPrevious() {
      if (!previous.isValid()) {
        throw new IllegalStateException(
          "Can move back only once and not in first key in the block.");
      }

      STATE tmp = previous;
      previous = current;
      current = tmp;

      // move after last key value
      currentBuffer.position(current.nextKvOffset);
      // Already decoded the tag bytes. We cache this tags into current state and also the total
      // compressed length of the tags bytes. For the next time decodeNext() we don't need to decode
      // the tags again. This might pollute the Data Dictionary what we use for the compression.
      // When current.uncompressTags is false, we will just reuse the current.tagsBuffer and skip
      // 'tagsCompressedLength' bytes of source stream.
      // See in decodeTags()
      current.tagsBuffer = previous.tagsBuffer;
      current.tagsCompressedLength = previous.tagsCompressedLength;
      current.uncompressTags = false;
      // The current key has to be reset with the previous Cell
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @SuppressWarnings("unchecked")
    protected STATE createSeekerState() {
      // This will fail for non-default seeker state if the subclass does not
      // override this method.
      return (STATE) new SeekerState(this.tmpPair, this.includesTags());
    }

    // Decodes the first key/value of currentBuffer (called after positioning at the block start).
    abstract protected void decodeFirst();

    // Decodes the next key/value at the current buffer position.
    abstract protected void decodeNext();
  }

  /** Returns unencoded size added */
  protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
    HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
    int size = 0;
    if (encodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = cell.getTagsLength();
      ByteBufferUtils.putCompressedInt(out, tagsLength);
      // There are some tags to be written
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
        // When tag compression is enabled, tagCompressionContext will have a not null value. Write
        // the tags using Dictionary compression in such a case
        if (tagCompressionContext != null) {
          // Not passing tagsLength considering that parsing of the tagsLength is not costly
          PrivateCellUtil.compressTags(out, cell, tagCompressionContext);
        } else {
          PrivateCellUtil.writeTags(out, cell, tagsLength);
        }
      }
      size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
    }
    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
      // Copy memstore timestamp from the byte buffer to the output stream.
      long memstoreTS = cell.getSequenceId();
      WritableUtils.writeVLong(out, memstoreTS);
      // TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be
      // avoided.
      size += WritableUtils.getVIntSize(memstoreTS);
    }
    return size;
  }

  // NOTE(review): this method is truncated in the visible portion of the file; the remainder of
  // its body lies beyond this chunk.
  protected final void afterDecodingKeyValue(DataInputStream source, ByteBuffer dest,
    HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    if (decodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = ByteBufferUtils.readCompressedInt(source);
      // Put as unsigned short
      dest.put((byte) ((tagsLength >> 8) & 0xff));
      dest.put((byte) (tagsLength & 0xff));
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
        // When tag compression is been used in this file, tagCompressionContext will have a not
        // null value passed.
        if (tagCompressionContext != null) {
          tagCompressionContext.uncompressTags(source, dest, tagsLength);
        } else {
          ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
        }
      }
    }
    if (decodingCtx.getHFileContext().isIncludesMvcc()) {
      long memstoreTS = -1;
      try {
        // Copy memstore timestamp from the data input stream to the byte
        // buffer.
1062 memstoreTS = WritableUtils.readVLong(source); 1063 ByteBufferUtils.writeVLong(dest, memstoreTS); 1064 } catch (IOException ex) { 1065 throw new RuntimeException( 1066 "Unable to copy memstore timestamp " + memstoreTS + " after decoding a key/value"); 1067 } 1068 } 1069 } 1070 1071 protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source, 1072 int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) 1073 throws IOException; 1074 1075 /** 1076 * Asserts that there is at least the given amount of unfilled space remaining in the given 1077 * buffer. 1078 * @param out typically, the buffer we are writing to 1079 * @param length the required space in the buffer 1080 * @throws EncoderBufferTooSmallException If there are no enough bytes. 1081 */ 1082 protected static void ensureSpace(ByteBuffer out, int length) 1083 throws EncoderBufferTooSmallException { 1084 if (out.position() + length > out.limit()) { 1085 throw new EncoderBufferTooSmallException("Buffer position=" + out.position() 1086 + ", buffer limit=" + out.limit() + ", length to be written=" + length); 1087 } 1088 } 1089 1090 @Override 1091 public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out) 1092 throws IOException { 1093 if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) { 1094 throw new IOException(this.getClass().getName() + " only accepts " 1095 + HFileBlockDefaultEncodingContext.class.getName() + " as the " + "encoding context."); 1096 } 1097 1098 HFileBlockDefaultEncodingContext encodingCtx = 1099 (HFileBlockDefaultEncodingContext) blkEncodingCtx; 1100 encodingCtx.prepareEncoding(out); 1101 if ( 1102 encodingCtx.getHFileContext().isIncludesTags() 1103 && encodingCtx.getHFileContext().isCompressTags() 1104 ) { 1105 if (encodingCtx.getTagCompressionContext() != null) { 1106 // It will be overhead to create the TagCompressionContext again and again for every block 1107 // encoding. 
1108 encodingCtx.getTagCompressionContext().clear(); 1109 } else { 1110 try { 1111 TagCompressionContext tagCompressionContext = 1112 new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE); 1113 encodingCtx.setTagCompressionContext(tagCompressionContext); 1114 } catch (Exception e) { 1115 throw new IOException("Failed to initialize TagCompressionContext", e); 1116 } 1117 } 1118 } 1119 StreamUtils.writeInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding() 1120 blkEncodingCtx.setEncodingState(new EncodingState()); 1121 } 1122 1123 @Override 1124 public void encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out) 1125 throws IOException { 1126 EncodingState state = encodingCtx.getEncodingState(); 1127 int posBeforeEncode = out.size(); 1128 int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out); 1129 state.postCellEncode(encodedKvSize, out.size() - posBeforeEncode); 1130 } 1131 1132 public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx, 1133 DataOutputStream out) throws IOException; 1134 1135 @Override 1136 public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out, 1137 byte[] uncompressedBytesWithHeader) throws IOException { 1138 EncodingState state = encodingCtx.getEncodingState(); 1139 // Write the unencodedDataSizeWritten (with header size) 1140 Bytes.putInt(uncompressedBytesWithHeader, 1141 HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE, 1142 state.getUnencodedDataSizeWritten()); 1143 postEncoding(encodingCtx); 1144 } 1145 1146}