/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Base class for all data block encoders that use a buffer.
 */
@InterfaceAudience.Private
abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
  /**
   * TODO: This data block encoder is dealing in the internals of HFileBlocks. Purge the
   * references to HFileBlocks.
   */
  private static int INITIAL_KEY_BUFFER_SIZE = 512;

  @Override
  public ByteBuffer decodeKeyValues(DataInputStream source,
    HFileBlockDecodingContext blkDecodingCtx) throws IOException {
    if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
      throw new IOException(this.getClass().getName() + " only accepts "
        + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
    }

    HFileBlockDefaultDecodingContext decodingCtx =
      (HFileBlockDefaultDecodingContext) blkDecodingCtx;
    if (
      decodingCtx.getHFileContext().isIncludesTags()
        && decodingCtx.getHFileContext().isCompressTags()
    ) {
      if (decodingCtx.getTagCompressionContext() != null) {
        // It would be overhead to create the TagCompressionContext again and again for every
        // block decoding.
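        // Instead we reuse the context across blocks; clear() resets its dictionary state so
        // entries from the previous block do not carry over into this one.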
        decodingCtx.getTagCompressionContext().clear();
      } else {
        try {
          TagCompressionContext tagCompressionContext =
            new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
          decodingCtx.setTagCompressionContext(tagCompressionContext);
        } catch (Exception e) {
          throw new IOException("Failed to initialize TagCompressionContext", e);
        }
      }
    }
    return internalDecodeKeyValues(source, 0, 0, decodingCtx);
  }

  /********************* common prefixes *************************/
  // Having these as static is fine for now, but if META starts using DBE then we should
  // change this.
  public static int compareCommonRowPrefix(Cell left, Cell right, int rowCommonPrefix) {
    return Bytes.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix,
      left.getRowLength() - rowCommonPrefix, right.getRowArray(),
      right.getRowOffset() + rowCommonPrefix, right.getRowLength() - rowCommonPrefix);
  }

  public static int compareCommonFamilyPrefix(Cell left, Cell right, int familyCommonPrefix) {
    return Bytes.compareTo(left.getFamilyArray(), left.getFamilyOffset() + familyCommonPrefix,
      left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
      right.getFamilyOffset() + familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix);
  }

  public static int compareCommonQualifierPrefix(Cell left, Cell right, int qualCommonPrefix) {
    return Bytes.compareTo(left.getQualifierArray(), left.getQualifierOffset() + qualCommonPrefix,
      left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(),
      right.getQualifierOffset() + qualCommonPrefix,
      right.getQualifierLength() - qualCommonPrefix);
  }

  protected static class SeekerState {
    protected ByteBuff currentBuffer;
    protected TagCompressionContext tagCompressionContext;
    protected int valueOffset = -1;
    protected int keyLength;
    protected int valueLength;
    protected int lastCommonPrefix;
    protected int tagsLength = 0;
    protected int tagsOffset = -1;
    protected int tagsCompressedLength = 0;
    protected boolean uncompressTags = true;

    /** We need to store a copy of the key. */
    protected byte[] keyBuffer = HConstants.EMPTY_BYTE_ARRAY;
    protected byte[] tagsBuffer = HConstants.EMPTY_BYTE_ARRAY;

    protected long memstoreTS;
    protected int nextKvOffset;
    protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
    private final ObjectIntPair<ByteBuffer> tmpPair;
    private final boolean includeTags;

    public SeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) {
      this.tmpPair = tmpPair;
      this.includeTags = includeTags;
    }

    protected boolean isValid() {
      return valueOffset != -1;
    }

    protected void invalidate() {
      valueOffset = -1;
      tagsCompressedLength = 0;
      currentKey.clear();
      uncompressTags = true;
      currentBuffer = null;
    }

    protected void ensureSpaceForKey() {
      if (keyLength > keyBuffer.length) {
        int newKeyBufferLength =
          Integer.highestOneBit(Math.max(INITIAL_KEY_BUFFER_SIZE, keyLength) - 1) << 1;
        byte[] newKeyBuffer = new byte[newKeyBufferLength];
        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
        keyBuffer = newKeyBuffer;
      }
    }

    protected void ensureSpaceForTags() {
      if (tagsLength > tagsBuffer.length) {
        int newTagsBufferLength =
          Integer.highestOneBit(Math.max(INITIAL_KEY_BUFFER_SIZE, tagsLength) - 1) << 1;
        byte[] newTagsBuffer = new byte[newTagsBufferLength];
        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
        tagsBuffer = newTagsBuffer;
      }
    }

    protected void setKey(byte[] keyBuffer, long memTS) {
      currentKey.setKey(keyBuffer, 0, keyLength);
      memstoreTS = memTS;
    }

    /**
     * Copy the state from the next one into this instance (the previous state placeholder). Used
     * to save the previous state when we are advancing the seeker to the next key/value.
     */
    protected void copyFromNext(SeekerState nextState) {
      if (keyBuffer.length != nextState.keyBuffer.length) {
        keyBuffer = nextState.keyBuffer.clone();
      } else if (!isValid()) {
        // Note: we can only call isValid before we override our state, so this
        // comes before all the assignments at the end of this method.
        System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0, nextState.keyLength);
      } else {
        // don't copy the common prefix between this key and the previous one
        System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix, keyBuffer,
          nextState.lastCommonPrefix, nextState.keyLength - nextState.lastCommonPrefix);
      }
      currentKey.set(nextState.currentKey);

      valueOffset = nextState.valueOffset;
      keyLength = nextState.keyLength;
      valueLength = nextState.valueLength;
      lastCommonPrefix = nextState.lastCommonPrefix;
      nextKvOffset = nextState.nextKvOffset;
      memstoreTS = nextState.memstoreTS;
      currentBuffer = nextState.currentBuffer;
      tagsOffset = nextState.tagsOffset;
      tagsLength = nextState.tagsLength;
      if (nextState.tagCompressionContext != null) {
        tagCompressionContext = nextState.tagCompressionContext;
      }
    }

    public Cell toCell() {
      // Buffer backing the value and tags part from the HFileBlock's buffer.
      // When tag compression is in use, this will be only the value bytes area.
      ByteBuffer valAndTagsBuffer;
      int vOffset;
      int valAndTagsLength = this.valueLength;
      int tagsLenSerializationSize = 0;
      if (this.includeTags && this.tagCompressionContext == null) {
        // Include the tags part also. This will be the tags bytes plus the 2 bytes used for
        // storing the tags length.
        tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + this.valueLength);
        valAndTagsLength += tagsLenSerializationSize + this.tagsLength;
      }
      this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair);
      valAndTagsBuffer = this.tmpPair.getFirst();
      vOffset = this.tmpPair.getSecond(); // This is the offset to the value part in the ByteBuffer
      if (valAndTagsBuffer.hasArray()) {
        return toOnheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      } else {
        return toOffheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      }
    }

    private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
      int tagsLenSerializationSize) {
      byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          tagsArray = valAndTagsBuffer.array();
          tOffset =
            valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength + tagsLenSerializationSize;
        } else {
          tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
          tOffset = 0;
        }
      }
      return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
        currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
        currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
        currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer.array(),
        valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength, memstoreTS, tagsArray, tOffset,
        this.tagsLength);
    }

    private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
      int tagsLenSerializationSize) {
      ByteBuffer tagsBuf = HConstants.EMPTY_BYTE_BUFFER;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          tagsBuf = valAndTagsBuffer;
          tOffset = vOffset + this.valueLength + tagsLenSerializationSize;
        } else {
          tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength));
          tOffset = 0;
        }
      }
      return new OffheapDecodedExtendedCell(
        ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)), currentKey.getRowLength(),
        currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
        currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
        currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer, vOffset,
        this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
    }
  }

  /**
   * Copies only the key part of the keybuffer by doing a deep copy and passes the seeker state
   * members for taking a clone. Note that the value byte[] part is still pointing to the
   * currentBuffer and is represented by the valueOffset and valueLength.
   */
  // We return this as a Cell to the upper layers of the read flow and they might try setting a
  // new SeqId there. So this has to be an instance of ExtendedCell.
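  // Note on layout: keyOnlyBuffer holds a serialized KeyValue key, i.e. a 2 byte row length
  // followed by the row bytes, a 1 byte family length followed by the family bytes, the
  // qualifier bytes, an 8 byte timestamp and a 1 byte type. This is why getRowOffset() below
  // returns Bytes.SIZEOF_SHORT.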
  protected static class OnheapDecodedCell implements ExtendedCell {
    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
      + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
      + Bytes.SIZEOF_SHORT + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY));
    private byte[] keyOnlyBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timeStamp;
    private byte typeByte;
    private byte[] valueBuffer;
    private int valueOffset;
    private int valueLength;
    private byte[] tagsBuffer;
    private int tagsOffset;
    private int tagsLength;
    private long seqId;

    protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset,
      byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
      byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] tagsBuffer,
      int tagsOffset, int tagsLength) {
      this.keyOnlyBuffer = keyBuffer;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timeStamp = timeStamp;
      this.typeByte = typeByte;
      this.valueBuffer = valueBuffer;
      this.valueOffset = valueOffset;
      this.valueLength = valueLen;
      this.tagsBuffer = tagsBuffer;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      setSequenceId(seqId);
    }

    @Override
    public byte[] getRowArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getFamilyArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getQualifierArray() {
      return keyOnlyBuffer;
    }

    @Override
    public int getRowOffset() {
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public short getRowLength() {
      return rowLength;
    }

    @Override
    public int getFamilyOffset() {
      return familyOffset;
    }

    @Override
    public byte getFamilyLength() {
      return familyLength;
    }

    @Override
    public int getQualifierOffset() {
      return qualifierOffset;
    }

    @Override
    public int getQualifierLength() {
      return qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return timeStamp;
    }

    @Override
    public byte getTypeByte() {
      return typeByte;
    }

    @Override
    public long getSequenceId() {
      return seqId;
    }

    @Override
    public byte[] getValueArray() {
      return this.valueBuffer;
    }

    @Override
    public int getValueOffset() {
      return valueOffset;
    }

    @Override
    public int getValueLength() {
      return valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      return this.tagsBuffer;
    }

    @Override
    public int getTagsOffset() {
      return this.tagsOffset;
    }

    @Override
    public int getTagsLength() {
      return tagsLength;
    }

    @Override
    public String toString() {
      return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
        + getValueLength() + "/seqid=" + seqId;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    @Override
    public long heapSize() {
      return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength
        + tagsLength;
    }

    @Override
    public int write(OutputStream out, boolean withTags) throws IOException {
      int lenToWrite = getSerializedSize(withTags);
      ByteBufferUtils.putInt(out, keyOnlyBuffer.length);
      ByteBufferUtils.putInt(out, valueLength);
      // Write key
      out.write(keyOnlyBuffer);
      // Write value
      out.write(this.valueBuffer, this.valueOffset, this.valueLength);
      if (withTags && this.tagsLength > 0) {
        // 2 bytes tags length followed by tags bytes
        // tags length is serialized with 2 bytes only (short form) even if the type is int.
        // As this is a non-negative number, we save the sign bit. See HBASE-11437
        out.write((byte) (0xff & (this.tagsLength >> 8)));
        out.write((byte) (0xff & this.tagsLength));
        out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
      }
      return lenToWrite;
    }

    @Override
    public int getSerializedSize(boolean withTags) {
      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
        withTags);
    }

    @Override
    public void write(ByteBuffer buf, int offset) {
      // This is not used in the actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(long ts) throws IOException {
      // This is not used in the actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(byte[] ts) throws IOException {
      // This is not used in the actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public ExtendedCell deepClone() {
      // This is not used in the actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }
  }

  protected static class OffheapDecodedExtendedCell extends ByteBufferExtendedCell {
    private static final long FIXED_OVERHEAD =
      (long) ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE)
        + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_SHORT)
        + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.BYTE_BUFFER);
    private ByteBuffer keyBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timeStamp;
    private byte typeByte;
    private ByteBuffer valueBuffer;
    private int valueOffset;
    private int valueLength;
    private ByteBuffer tagsBuffer;
    private int tagsOffset;
    private int tagsLength;
    private long seqId;

    protected OffheapDecodedExtendedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset,
      byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
      ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer,
      int tagsOffset, int tagsLength) {
      // The keyBuffer is always onheap
      assert keyBuffer.hasArray();
      assert keyBuffer.arrayOffset() == 0;
      this.keyBuffer = keyBuffer;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timeStamp = timeStamp;
      this.typeByte = typeByte;
      this.valueBuffer = valueBuffer;
      this.valueOffset = valueOffset;
      this.valueLength = valueLen;
      this.tagsBuffer = tagsBuffer;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      setSequenceId(seqId);
    }

    @Override
    @SuppressWarnings("ByteBufferBackingArray")
    public byte[] getRowArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getRowOffset() {
      return getRowPosition();
    }

    @Override
    public short getRowLength() {
      return this.rowLength;
    }

    @Override
    @SuppressWarnings("ByteBufferBackingArray")
    public byte[] getFamilyArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getFamilyOffset() {
      return getFamilyPosition();
    }

    @Override
    public byte getFamilyLength() {
      return this.familyLength;
    }

    @Override
    @SuppressWarnings("ByteBufferBackingArray")
    public byte[] getQualifierArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getQualifierOffset() {
      return getQualifierPosition();
    }

    @Override
    public int getQualifierLength() {
      return this.qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return this.timeStamp;
    }

    @Override
    public byte getTypeByte() {
      return this.typeByte;
    }

    @Override
    public long getSequenceId() {
      return this.seqId;
    }

    @Override
    public byte[] getValueArray() {
      return CellUtil.cloneValue(this);
    }

    @Override
    public int getValueOffset() {
      return 0;
    }

    @Override
    public int getValueLength() {
      return this.valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      return PrivateCellUtil.cloneTags(this);
    }

    @Override
    public int getTagsOffset() {
      return 0;
    }

    @Override
    public int getTagsLength() {
      return this.tagsLength;
    }

    @Override
    public ByteBuffer getRowByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getRowPosition() {
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public ByteBuffer getFamilyByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getFamilyPosition() {
      return this.familyOffset;
    }

    @Override
    public ByteBuffer getQualifierByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getQualifierPosition() {
      return this.qualifierOffset;
    }

    @Override
    public ByteBuffer getValueByteBuffer() {
      return this.valueBuffer;
    }

    @Override
    public int getValuePosition() {
      return this.valueOffset;
    }

    @Override
    public ByteBuffer getTagsByteBuffer() {
      return this.tagsBuffer;
    }

    @Override
    public int getTagsPosition() {
      return this.tagsOffset;
    }

    @Override
    public long heapSize() {
      return FIXED_OVERHEAD;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    @Override
    public int write(OutputStream out, boolean withTags) throws IOException {
      int lenToWrite = getSerializedSize(withTags);
      ByteBufferUtils.putInt(out, keyBuffer.remaining());
      ByteBufferUtils.putInt(out, valueLength);
      // Write key
      out.write(keyBuffer.array(), keyBuffer.arrayOffset(), keyBuffer.remaining());
      // Write value
      ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset,
        this.valueLength);
      if (withTags && this.tagsLength > 0) {
        // 2 bytes tags length followed by tags bytes
        // tags length is serialized with 2 bytes only (short form) even if the type is int.
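        // For example, tagsLength = 260 (0x0104) is written as the two bytes 0x01 then 0x04.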
        // As this is a non-negative number, we save the sign bit. See HBASE-11437
        out.write((byte) (0xff & (this.tagsLength >> 8)));
        out.write((byte) (0xff & this.tagsLength));
        ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
      }
      return lenToWrite;
    }

    @Override
    public int getSerializedSize(boolean withTags) {
      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
        withTags);
    }

    @Override
    public void setTimestamp(long ts) throws IOException {
      // This is not used in the actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(byte[] ts) throws IOException {
      // This is not used in the actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void write(ByteBuffer buf, int offset) {
      // This is not used in the actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public ExtendedCell deepClone() {
      // This is not used in the actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }
  }

  protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
    extends AbstractEncodedSeeker {
    protected ByteBuff currentBuffer;
    protected TagCompressionContext tagCompressionContext = null;
    protected KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
    protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>();
    protected STATE current, previous;

    public BufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
      super(decodingCtx);
      if (decodingCtx.getHFileContext().isCompressTags()) {
        try {
          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
        } catch (Exception e) {
          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
        }
      }
      current = createSeekerState(); // always valid
      previous = createSeekerState(); // may not be valid
    }

    @Override
    public int compareKey(CellComparator comparator, Cell key) {
      keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
      return PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, keyOnlyKV);
    }

    @Override
    public void setCurrentBuffer(ByteBuff buffer) {
      if (this.tagCompressionContext != null) {
        this.tagCompressionContext.clear();
      }
      currentBuffer = buffer;
      current.currentBuffer = currentBuffer;
      if (tagCompressionContext != null) {
        current.tagCompressionContext = tagCompressionContext;
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public Cell getKey() {
      byte[] key = new byte[current.keyLength];
      System.arraycopy(current.keyBuffer, 0, key, 0, current.keyLength);
      return new KeyValue.KeyOnlyKeyValue(key);
    }

    @Override
    public ByteBuffer getValueShallowCopy() {
      currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
      ByteBuffer dup = tmpPair.getFirst().duplicate();
      dup.position(tmpPair.getSecond());
      dup.limit(tmpPair.getSecond() + current.valueLength);
      return dup.slice();
    }

    @Override
    public Cell getCell() {
      return current.toCell();
    }

    @Override
    public void rewind() {
      currentBuffer.rewind();
      if (tagCompressionContext != null) {
        tagCompressionContext.clear();
        // A prior seekToKeyInBlock may have reset this to false if we fell back to the previous
        // seeker state. That is an optimization so we don't have to uncompress the tags again
        // when re-reading the last state.
        // In case of rewind, we are starting from the beginning of the buffer, so we need
        // to uncompress any tags we see.
        // It may make sense to reset this in setCurrentBuffer as well, but we seem to only call
        // setCurrentBuffer after StoreFileScanner.seekAtOrAfter, which calls next to consume the
        // seeker state. Rewind is called by seekBefore, which doesn't, and leaves us in this
        // state.
        current.uncompressTags = true;
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @Override
    public boolean next() {
      if (!currentBuffer.hasRemaining()) {
        return false;
      }
      decodeNext();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
      return true;
    }

    protected void decodeTags() {
      current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
      if (tagCompressionContext != null) {
        if (current.uncompressTags) {
          // Tag compression is being used. Uncompress the tags into tagsBuffer.
          current.ensureSpaceForTags();
          try {
            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
              current.tagsBuffer, 0, current.tagsLength);
          } catch (Exception e) {
            throw new RuntimeException("Exception while uncompressing tags", e);
          }
        } else {
          currentBuffer.skip(current.tagsCompressedLength);
          current.uncompressTags = true; // Reset this.
        }
        current.tagsOffset = -1;
      } else {
        // When tag compression is not used, let us not copy the tags bytes into tagsBuffer.
        // Just mark the tags offset so as to create the KV buffer later in getKeyValueBuffer().
        current.tagsOffset = currentBuffer.position();
        currentBuffer.skip(current.tagsLength);
      }
    }

    @Override
    public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
      int rowCommonPrefix = 0;
      int familyCommonPrefix = 0;
      int qualCommonPrefix = 0;
      previous.invalidate();
      do {
        int comp;
        keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
        if (current.lastCommonPrefix != 0) {
          // The KV format has the row key length also in the byte array, and the common prefix
          // includes it. So we need to subtract it to find the common prefix of the row part
          // alone.
          rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
        }
        if (current.lastCommonPrefix <= 2) {
          rowCommonPrefix = 0;
        }
        rowCommonPrefix += findCommonPrefixInRowPart(seekCell, keyOnlyKV, rowCommonPrefix);
        comp = compareCommonRowPrefix(seekCell, keyOnlyKV, rowCommonPrefix);
        if (comp == 0) {
          comp = compareTypeBytes(seekCell, keyOnlyKV);
          if (comp == 0) {
            // Subtract the fixed row key length and the family key fixed length
            familyCommonPrefix = Math.max(0, Math.min(familyCommonPrefix,
              current.lastCommonPrefix - (3 + keyOnlyKV.getRowLength())));
            familyCommonPrefix +=
              findCommonPrefixInFamilyPart(seekCell, keyOnlyKV, familyCommonPrefix);
            comp = compareCommonFamilyPrefix(seekCell, keyOnlyKV, familyCommonPrefix);
            if (comp == 0) {
              // Subtract the rowkey fixed length and the family key fixed length
              qualCommonPrefix = Math.max(0, Math.min(qualCommonPrefix, current.lastCommonPrefix
                - (3 + keyOnlyKV.getRowLength() + keyOnlyKV.getFamilyLength())));
              qualCommonPrefix +=
                findCommonPrefixInQualifierPart(seekCell, keyOnlyKV, qualCommonPrefix);
              comp = compareCommonQualifierPrefix(seekCell, keyOnlyKV, qualCommonPrefix);
              if (comp == 0) {
                comp = CellComparator.getInstance().compareTimestamps(seekCell, keyOnlyKV);
                if (comp == 0) {
                  // Compare types. Let the delete types sort ahead of puts; i.e. types of higher
                  // numbers sort before those of lesser numbers. Maximum (255) appears ahead of
                  // everything, and minimum (0) appears after everything.
                  comp = (0xff & keyOnlyKV.getTypeByte()) - (0xff & seekCell.getTypeByte());
                }
              }
            }
          }
        }
        if (comp == 0) { // exact match
          if (seekBefore) {
            if (!previous.isValid()) {
              // The caller (seekBefore) has to ensure that we are not at the
              // first key in the block.
              throw new IllegalStateException(
                "Cannot seekBefore if positioned at the first key in the block: key="
                  + Bytes.toStringBinary(seekCell.getRowArray()));
            }
            moveToPrevious();
            return 1;
          }
          return 0;
        }

        if (comp < 0) { // already too large, check previous
          if (previous.isValid()) {
            moveToPrevious();
          } else {
            return HConstants.INDEX_KEY_MAGIC; // using optimized index key
          }
          return 1;
        }

        // move to next, if more data is available
        if (currentBuffer.hasRemaining()) {
          previous.copyFromNext(current);
          decodeNext();
          current.setKey(current.keyBuffer, current.memstoreTS);
        } else {
          break;
        }
      } while (true);

      // we hit the end of the block, not an exact match
      return 1;
    }

    private int compareTypeBytes(Cell key, Cell right) {
      if (
        key.getFamilyLength() + key.getQualifierLength() == 0
          && key.getTypeByte() == KeyValue.Type.Minimum.getCode()
      ) {
        // left is "bigger", i.e. it appears later in the sorted order
        return 1;
      }
      if (
        right.getFamilyLength() + right.getQualifierLength() == 0
          && right.getTypeByte() == KeyValue.Type.Minimum.getCode()
      ) {
        return -1;
      }
      return 0;
    }

    private static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
      return Bytes.findCommonPrefix(left.getRowArray(), right.getRowArray(),
        left.getRowLength() - rowCommonPrefix, right.getRowLength() - rowCommonPrefix,
        left.getRowOffset() + rowCommonPrefix, right.getRowOffset() + rowCommonPrefix);
    }

    private static int findCommonPrefixInFamilyPart(Cell left, Cell right,
      int familyCommonPrefix) {
      return Bytes.findCommonPrefix(left.getFamilyArray(), right.getFamilyArray(),
        left.getFamilyLength() - familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix,
        left.getFamilyOffset() + familyCommonPrefix, right.getFamilyOffset() + familyCommonPrefix);
    }

    private static int findCommonPrefixInQualifierPart(Cell left, Cell right,
      int qualifierCommonPrefix) {
      return Bytes.findCommonPrefix(left.getQualifierArray(), right.getQualifierArray(),
        left.getQualifierLength() - qualifierCommonPrefix,
        right.getQualifierLength() - qualifierCommonPrefix,
        left.getQualifierOffset() + qualifierCommonPrefix,
        right.getQualifierOffset() + qualifierCommonPrefix);
    }

    private void moveToPrevious() {
      if (!previous.isValid()) {
        throw new IllegalStateException(
          "Can move back only once, and not if positioned at the first key in the block.");
      }

      STATE tmp = previous;
      previous = current;
      current = tmp;

      // move after last key value
      currentBuffer.position(current.nextKvOffset);
      // We have already decoded the tag bytes. We cache these tags in the current state, along
      // with the total compressed length of the tag bytes. On the next decodeNext() we don't
      // need to decode the tags again; doing so would pollute the data dictionary that we use
      // for the compression.
      // When current.uncompressTags is false, we will just reuse current.tagsBuffer and skip
      // 'tagsCompressedLength' bytes of the source stream.
      // See decodeTags()
      current.tagsBuffer = previous.tagsBuffer;
      current.tagsCompressedLength = previous.tagsCompressedLength;
      current.uncompressTags = false;
      // The current key has to be reset with the previous Cell
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @SuppressWarnings("unchecked")
    protected STATE createSeekerState() {
      // This will fail for non-default seeker state if the subclass does not
      // override this method.
      return (STATE) new SeekerState(this.tmpPair, this.includesTags());
    }

    protected abstract void decodeFirst();

    protected abstract void decodeNext();
  }

  /** Returns unencoded size added */
  protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
    HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
    int size = 0;
    if (encodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = cell.getTagsLength();
      ByteBufferUtils.putCompressedInt(out, tagsLength);
      // There are some tags to be written
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
        // When tag compression is enabled, tagCompressionContext will have a non-null value.
        // Write the tags using Dictionary compression in such a case.
        if (tagCompressionContext != null) {
          // Not passing tagsLength considering that parsing the tagsLength is not costly
          PrivateCellUtil.compressTags(out, cell, tagCompressionContext);
        } else {
          PrivateCellUtil.writeTags(out, cell, tagsLength);
        }
      }
      size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
    }
    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
      // Copy memstore timestamp from the byte buffer to the output stream.
      long memstoreTS = cell.getSequenceId();
      WritableUtils.writeVLong(out, memstoreTS);
      // TODO use a writeVLong which returns the #bytes written so that parsing it twice can be
      // avoided.
      size += WritableUtils.getVIntSize(memstoreTS);
    }
    return size;
  }

  protected final void afterDecodingKeyValue(DataInputStream source, ByteBuffer dest,
    HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    if (decodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = ByteBufferUtils.readCompressedInt(source);
      // Put as unsigned short
      dest.put((byte) ((tagsLength >> 8) & 0xff));
      dest.put((byte) (tagsLength & 0xff));
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
        // When tag compression is being used in this file, tagCompressionContext will have a
        // non-null value.
        if (tagCompressionContext != null) {
          tagCompressionContext.uncompressTags(source, dest, tagsLength);
        } else {
          ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
        }
      }
    }
    if (decodingCtx.getHFileContext().isIncludesMvcc()) {
      long memstoreTS = -1;
      try {
        // Copy memstore timestamp from the data input stream to the byte buffer.
        memstoreTS = WritableUtils.readVLong(source);
        ByteBufferUtils.writeVLong(dest, memstoreTS);
      } catch (IOException ex) {
        throw new RuntimeException(
          "Unable to copy memstore timestamp " + memstoreTS + " after decoding a key/value");
      }
    }
  }

  protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
    int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
    throws IOException;

  /**
   * Asserts that there is at least the given amount of unfilled space remaining in the given
   * buffer.
   * @param out    typically, the buffer we are writing to
   * @param length the required space in the buffer
   * @throws EncoderBufferTooSmallException If there are not enough bytes.
   */
  protected static void ensureSpace(ByteBuffer out, int length)
    throws EncoderBufferTooSmallException {
    if (out.position() + length > out.limit()) {
      throw new EncoderBufferTooSmallException("Buffer position=" + out.position()
        + ", buffer limit=" + out.limit() + ", length to be written=" + length);
    }
  }

  @Override
  public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
    throws IOException {
    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
      throw new IOException(this.getClass().getName() + " only accepts "
        + HFileBlockDefaultEncodingContext.class.getName() + " as the encoding context.");
    }

    HFileBlockDefaultEncodingContext encodingCtx =
      (HFileBlockDefaultEncodingContext) blkEncodingCtx;
    encodingCtx.prepareEncoding(out);
    if (
      encodingCtx.getHFileContext().isIncludesTags()
        && encodingCtx.getHFileContext().isCompressTags()
    ) {
      if (encodingCtx.getTagCompressionContext() != null) {
        // It would be overhead to create the TagCompressionContext again and again for every
        // block encoding.
        encodingCtx.getTagCompressionContext().clear();
      } else {
        try {
          TagCompressionContext tagCompressionContext =
            new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
          encodingCtx.setTagCompressionContext(tagCompressionContext);
        } catch (Exception e) {
          throw new IOException("Failed to initialize TagCompressionContext", e);
        }
      }
    }
    StreamUtils.writeInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
    blkEncodingCtx.setEncodingState(new EncodingState());
  }

  @Override
  public void encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
    throws IOException {
    EncodingState state = encodingCtx.getEncodingState();
    int posBeforeEncode = out.size();
    int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
    state.postCellEncode(encodedKvSize, out.size() - posBeforeEncode);
  }

  public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
    DataOutputStream out) throws IOException;

  @Override
  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
    byte[] uncompressedBytesWithHeader) throws IOException {
    EncodingState state = encodingCtx.getEncodingState();
    // Write the unencodedDataSizeWritten (with header size)
    Bytes.putInt(uncompressedBytesWithHeader,
      HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE,
      state.getUnencodedDataSizeWritten());
    postEncoding(encodingCtx);
  }

}