001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with this 004 * work for additional information regarding copyright ownership. The ASF 005 * licenses this file to you under the Apache License, Version 2.0 (the 006 * "License"); you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 014 * License for the specific language governing permissions and limitations 015 * under the License. 016 */ 017package org.apache.hadoop.hbase.io.encoding; 018 019import java.io.DataInputStream; 020import java.io.DataOutputStream; 021import java.io.IOException; 022import java.io.OutputStream; 023import java.nio.ByteBuffer; 024import org.apache.hadoop.hbase.ByteBufferExtendedCell; 025import org.apache.hadoop.hbase.Cell; 026import org.apache.hadoop.hbase.CellComparator; 027import org.apache.hadoop.hbase.CellUtil; 028import org.apache.hadoop.hbase.ExtendedCell; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.KeyValue; 031import org.apache.hadoop.hbase.KeyValue.Type; 032import org.apache.hadoop.hbase.KeyValueUtil; 033import org.apache.hadoop.hbase.PrivateCellUtil; 034import org.apache.hadoop.hbase.io.TagCompressionContext; 035import org.apache.hadoop.hbase.io.util.LRUDictionary; 036import org.apache.hadoop.hbase.io.util.StreamUtils; 037import org.apache.hadoop.hbase.nio.ByteBuff; 038import org.apache.hadoop.hbase.util.ByteBufferUtils; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.ClassSize; 041import org.apache.hadoop.hbase.util.ObjectIntPair; 042import 
org.apache.hadoop.io.WritableUtils; 043import org.apache.yetus.audience.InterfaceAudience; 044 045/** 046 * Base class for all data block encoders that use a buffer. 047 */ 048@InterfaceAudience.Private 049abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder { 050 /** 051 * TODO: This datablockencoder is dealing in internals of hfileblocks. Purge reference to HFBs 052 */ 053 private static int INITIAL_KEY_BUFFER_SIZE = 512; 054 055 @Override 056 public ByteBuffer decodeKeyValues(DataInputStream source, 057 HFileBlockDecodingContext blkDecodingCtx) throws IOException { 058 if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) { 059 throw new IOException(this.getClass().getName() + " only accepts " 060 + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context."); 061 } 062 063 HFileBlockDefaultDecodingContext decodingCtx = 064 (HFileBlockDefaultDecodingContext) blkDecodingCtx; 065 if (decodingCtx.getHFileContext().isIncludesTags() 066 && decodingCtx.getHFileContext().isCompressTags()) { 067 if (decodingCtx.getTagCompressionContext() != null) { 068 // It will be overhead to create the TagCompressionContext again and again for every block 069 // decoding. 070 decodingCtx.getTagCompressionContext().clear(); 071 } else { 072 try { 073 TagCompressionContext tagCompressionContext = new TagCompressionContext( 074 LRUDictionary.class, Byte.MAX_VALUE); 075 decodingCtx.setTagCompressionContext(tagCompressionContext); 076 } catch (Exception e) { 077 throw new IOException("Failed to initialize TagCompressionContext", e); 078 } 079 } 080 } 081 return internalDecodeKeyValues(source, 0, 0, decodingCtx); 082 } 083 084 /********************* common prefixes *************************/ 085 // Having this as static is fine but if META is having DBE then we should 086 // change this. 
  /**
   * Compares the row components of two cells, skipping the first {@code rowCommonPrefix} bytes
   * that the caller has already established to be equal.
   * @param left typically the seek cell
   * @param right typically the cell decoded from the block
   * @param rowCommonPrefix count of leading row bytes known to match on both sides
   * @return negative/zero/positive per the usual comparator contract
   */
  public static int compareCommonRowPrefix(Cell left, Cell right, int rowCommonPrefix) {
    return Bytes.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix,
        left.getRowLength() - rowCommonPrefix, right.getRowArray(), right.getRowOffset()
            + rowCommonPrefix, right.getRowLength() - rowCommonPrefix);
  }

  /**
   * Compares the family components of two cells, skipping the first {@code familyCommonPrefix}
   * bytes already known to be equal.
   */
  public static int compareCommonFamilyPrefix(Cell left, Cell right, int familyCommonPrefix) {
    return Bytes.compareTo(left.getFamilyArray(), left.getFamilyOffset() + familyCommonPrefix,
        left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
        right.getFamilyOffset() + familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix);
  }

  /**
   * Compares the qualifier components of two cells, skipping the first {@code qualCommonPrefix}
   * bytes already known to be equal.
   */
  public static int compareCommonQualifierPrefix(Cell left, Cell right, int qualCommonPrefix) {
    return Bytes.compareTo(left.getQualifierArray(), left.getQualifierOffset() + qualCommonPrefix,
        left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(),
        right.getQualifierOffset() + qualCommonPrefix, right.getQualifierLength()
            - qualCommonPrefix);
  }

  /**
   * Mutable per-position decoding state used by {@link BufferedEncodedSeeker}: the current key is
   * copied into {@link #keyBuffer}, while value and tags are referenced by offset/length into the
   * encoded block's buffer ({@link #currentBuffer}).
   */
  protected static class SeekerState {
    // The encoded block currently being decoded; value/tags offsets below point into it.
    protected ByteBuff currentBuffer;
    // Non-null only when tag compression is in use for this block.
    protected TagCompressionContext tagCompressionContext;
    // -1 marks this state as invalid/unset; see isValid()/invalidate().
    protected int valueOffset = -1;
    protected int keyLength;
    protected int valueLength;
    // Number of leading key bytes shared with the previously decoded key.
    protected int lastCommonPrefix;
    protected int tagsLength = 0;
    // Offset of the tags bytes in currentBuffer; -1 when tags were uncompressed into tagsBuffer.
    protected int tagsOffset = -1;
    // On-disk (compressed) size of the tags; used to re-skip them without decoding again.
    protected int tagsCompressedLength = 0;
    protected boolean uncompressTags = true;

    /** We need to store a copy of the key. */
    protected byte[] keyBuffer = HConstants.EMPTY_BYTE_ARRAY;
    protected byte[] tagsBuffer = HConstants.EMPTY_BYTE_ARRAY;

    protected long memstoreTS;
    // Position in currentBuffer just past this KV, i.e. where the next KV starts.
    protected int nextKvOffset;
    protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
    private final ObjectIntPair<ByteBuffer> tmpPair;
    private final boolean includeTags;

    public SeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) {
      this.tmpPair = tmpPair;
      this.includeTags = includeTags;
    }

    /** Returns true when this state holds a decoded position (valueOffset has been set). */
    protected boolean isValid() {
      return valueOffset != -1;
    }

    /** Resets this state so {@link #isValid()} returns false; drops the buffer reference. */
    protected void invalidate() {
      valueOffset = -1;
      tagsCompressedLength = 0;
      currentKey.clear();
      uncompressTags = true;
      currentBuffer = null;
    }

    /**
     * Grows {@link #keyBuffer} (preserving its contents) to the next power of two at or above
     * {@code keyLength}, but at least INITIAL_KEY_BUFFER_SIZE.
     */
    protected void ensureSpaceForKey() {
      if (keyLength > keyBuffer.length) {
        // highestOneBit(x - 1) << 1 == smallest power of two >= x.
        int newKeyBufferLength = Integer.highestOneBit(Math.max(
            INITIAL_KEY_BUFFER_SIZE, keyLength) - 1) << 1;
        byte[] newKeyBuffer = new byte[newKeyBufferLength];
        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
        keyBuffer = newKeyBuffer;
      }
    }

    /** Same growth policy as {@link #ensureSpaceForKey()}, applied to {@link #tagsBuffer}. */
    protected void ensureSpaceForTags() {
      if (tagsLength > tagsBuffer.length) {
        int newTagsBufferLength = Integer.highestOneBit(Math.max(
            INITIAL_KEY_BUFFER_SIZE, tagsLength) - 1) << 1;
        byte[] newTagsBuffer = new byte[newTagsBufferLength];
        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
        tagsBuffer = newTagsBuffer;
      }
    }

    /** Points {@link #currentKey} at the first {@code keyLength} bytes of the given buffer. */
    protected void setKey(byte[] keyBuffer, long memTS) {
      currentKey.setKey(keyBuffer, 0, keyLength);
      memstoreTS = memTS;
    }

    /**
     * Copy the state from the next one into this instance (the previous state
     * placeholder). Used to save the previous state when we are advancing the
     * seeker to the next key/value.
     */
    protected void copyFromNext(SeekerState nextState) {
      if (keyBuffer.length != nextState.keyBuffer.length) {
        keyBuffer = nextState.keyBuffer.clone();
      } else if (!isValid()) {
        // Note: we can only call isValid before we override our state, so this
        // comes before all the assignments at the end of this method.
        System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
            nextState.keyLength);
      } else {
        // don't copy the common prefix between this key and the previous one
        System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
            keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
                - nextState.lastCommonPrefix);
      }
      currentKey.set(nextState.currentKey);

      valueOffset = nextState.valueOffset;
      keyLength = nextState.keyLength;
      valueLength = nextState.valueLength;
      lastCommonPrefix = nextState.lastCommonPrefix;
      nextKvOffset = nextState.nextKvOffset;
      memstoreTS = nextState.memstoreTS;
      currentBuffer = nextState.currentBuffer;
      tagsOffset = nextState.tagsOffset;
      tagsLength = nextState.tagsLength;
      if (nextState.tagCompressionContext != null) {
        tagCompressionContext = nextState.tagCompressionContext;
      }
    }

    /**
     * Materializes the current position as a Cell. The key is deep-copied; value (and, when tags
     * are stored uncompressed, the tags) still reference the block's buffer. Returns an on-heap or
     * off-heap cell depending on where that buffer lives.
     */
    public Cell toCell() {
      // Buffer backing the value and tags part from the HFileBlock's buffer
      // When tag compression in use, this will be only the value bytes area.
      ByteBuffer valAndTagsBuffer;
      int vOffset;
      int valAndTagsLength = this.valueLength;
      int tagsLenSerializationSize = 0;
      if (this.includeTags && this.tagCompressionContext == null) {
        // Include the tags part also. This will be the tags bytes + 2 bytes of for storing tags
        // length
        tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + this.valueLength);
        valAndTagsLength += tagsLenSerializationSize + this.tagsLength;
      }
      this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair);
      valAndTagsBuffer = this.tmpPair.getFirst();
      vOffset = this.tmpPair.getSecond();// This is the offset to value part in the BB
      if (valAndTagsBuffer.hasArray()) {
        return toOnheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      } else {
        return toOffheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      }
    }

    /** Builds an {@link OnheapDecodedCell} over the array backing {@code valAndTagsBuffer}. */
    private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
        int tagsLenSerializationSize) {
      byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          // Tags sit in the block buffer right after value + the 2-byte tags-length field.
          tagsArray = valAndTagsBuffer.array();
          tOffset = valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength
              + tagsLenSerializationSize;
        } else {
          // Tags were uncompressed into tagsBuffer; copy so the cell owns its bytes.
          tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
          tOffset = 0;
        }
      }
      return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
          currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
          currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
          currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer.array(),
          valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength, memstoreTS, tagsArray,
          tOffset, this.tagsLength);
    }

    /** Builds an {@link OffheapDecodedExtendedCell} over the (direct) {@code valAndTagsBuffer}. */
    private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
        int tagsLenSerializationSize) {
      ByteBuffer tagsBuf = HConstants.EMPTY_BYTE_BUFFER;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          tagsBuf = valAndTagsBuffer;
          tOffset = vOffset + this.valueLength + tagsLenSerializationSize;
        } else {
          tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength));
          tOffset = 0;
        }
      }
      return new OffheapDecodedExtendedCell(
          ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)), currentKey.getRowLength(),
          currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
          currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
          currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer, vOffset,
          this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
    }
  }

  /**
   * Copies only the key part of the keybuffer by doing a deep copy and passes the
   * seeker state members for taking a clone.
   * Note that the value byte[] part is still pointing to the currentBuffer and
   * represented by the valueOffset and valueLength
   */
  // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
  // there. So this has to be an instance of ExtendedCell.
  protected static class OnheapDecodedCell implements ExtendedCell {
    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
        + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
        + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY));
    // Serialized key bytes (row len + row + family len + family + qualifier + ts + type).
    private byte[] keyOnlyBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timestamp;
    private byte typeByte;
    private byte[] valueBuffer;
    private int valueOffset;
    private int valueLength;
    private byte[] tagsBuffer;
    private int tagsOffset;
    private int tagsLength;
    private long seqId;

    protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset,
        byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
        byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] tagsBuffer,
        int tagsOffset, int tagsLength) {
      this.keyOnlyBuffer = keyBuffer;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timestamp = timeStamp;
      this.typeByte = typeByte;
      this.valueBuffer = valueBuffer;
      this.valueOffset = valueOffset;
      this.valueLength = valueLen;
      this.tagsBuffer = tagsBuffer;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      setSequenceId(seqId);
    }

    // Row/family/qualifier all resolve against the single key-only buffer, using the
    // offsets/lengths captured at construction time.
    @Override
    public byte[] getRowArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getFamilyArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getQualifierArray() {
      return keyOnlyBuffer;
    }

    @Override
    public int getRowOffset() {
      // Row bytes start right after the 2-byte row length field.
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public short getRowLength() {
      return rowLength;
    }

    @Override
    public int getFamilyOffset() {
      return familyOffset;
    }

    @Override
    public byte getFamilyLength() {
      return familyLength;
    }

    @Override
    public int getQualifierOffset() {
      return qualifierOffset;
    }

    @Override
    public int getQualifierLength() {
      return qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return timestamp;
    }

    @Override
    public byte getTypeByte() {
      return typeByte;
    }

    @Override
    public long getSequenceId() {
      return seqId;
    }

    @Override
    public byte[] getValueArray() {
      return this.valueBuffer;
    }

    @Override
    public int getValueOffset() {
      return valueOffset;
    }

    @Override
    public int getValueLength() {
      return valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      return this.tagsBuffer;
    }

    @Override
    public int getTagsOffset() {
      return this.tagsOffset;
    }

    @Override
    public int getTagsLength() {
      return tagsLength;
    }

    @Override
    public String toString() {
      return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
          + getValueLength() + "/seqid=" + seqId;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    @Override
    public long heapSize() {
      return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
    }

    /**
     * Serializes this cell in KeyValue format: key length, value length, key bytes, value bytes
     * and (optionally) a 2-byte tags length followed by the tags bytes.
     */
    @Override
    public int write(OutputStream out, boolean withTags) throws IOException {
      int lenToWrite = getSerializedSize(withTags);
      ByteBufferUtils.putInt(out, keyOnlyBuffer.length);
      ByteBufferUtils.putInt(out, valueLength);
      // Write key
      out.write(keyOnlyBuffer);
      // Write value
      out.write(this.valueBuffer, this.valueOffset, this.valueLength);
      if (withTags && this.tagsLength > 0) {
        // 2 bytes tags length followed by tags bytes
        // tags length is serialized with 2 bytes only(short way) even if the type is int.
        // As this is non -ve numbers, we save the sign bit. See HBASE-11437
        out.write((byte) (0xff & (this.tagsLength >> 8)));
        out.write((byte) (0xff & this.tagsLength));
        out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
      }
      return lenToWrite;
    }

    @Override
    public int getSerializedSize(boolean withTags) {
      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
          withTags);
    }

    @Override
    public void write(ByteBuffer buf, int offset) {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(long ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(byte[] ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public ExtendedCell deepClone() {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }
  }

  /**
   * Off-heap counterpart of {@link OnheapDecodedCell}: key bytes live in an on-heap ByteBuffer,
   * while value (and possibly tags) reference the block's (direct) ByteBuffer.
   */
  protected static class OffheapDecodedExtendedCell extends ByteBufferExtendedCell {
    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
        + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
        + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.BYTE_BUFFER));
    private ByteBuffer keyBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timestamp;
    private byte typeByte;
    private ByteBuffer valueBuffer;
    private int valueOffset;
    private int valueLength;
    private ByteBuffer tagsBuffer;
    private int tagsOffset;
    private int tagsLength;
    private long seqId;

    protected OffheapDecodedExtendedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset,
        byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
        ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer,
        int tagsOffset, int tagsLength) {
      // The keyBuffer is always onheap
      assert keyBuffer.hasArray();
      assert keyBuffer.arrayOffset() == 0;
      this.keyBuffer = keyBuffer;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timestamp = timeStamp;
      this.typeByte = typeByte;
      this.valueBuffer = valueBuffer;
      this.valueOffset = valueOffset;
      this.valueLength = valueLen;
      this.tagsBuffer = tagsBuffer;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      setSequenceId(seqId);
    }

    @Override
    public byte[] getRowArray() {
      // Safe: the constructor asserts keyBuffer is heap-backed with arrayOffset 0.
      return this.keyBuffer.array();
    }

    @Override
    public int getRowOffset() {
      return getRowPosition();
    }

    @Override
    public short getRowLength() {
      return this.rowLength;
    }

    @Override
    public byte[] getFamilyArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getFamilyOffset() {
      return getFamilyPosition();
    }

    @Override
    public byte getFamilyLength() {
      return this.familyLength;
    }

    @Override
    public byte[] getQualifierArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getQualifierOffset() {
      return getQualifierPosition();
    }

    @Override
    public int getQualifierLength() {
      return this.qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return this.timestamp;
    }

    @Override
    public byte getTypeByte() {
      return this.typeByte;
    }

    @Override
    public long getSequenceId() {
      return this.seqId;
    }

    // Value/tags may be off-heap, so the array accessors clone into fresh heap arrays
    // (offset is then 0). Use the ByteBuffer accessors to avoid the copy.
    @Override
    public byte[] getValueArray() {
      return CellUtil.cloneValue(this);
    }

    @Override
    public int getValueOffset() {
      return 0;
    }

    @Override
    public int getValueLength() {
      return this.valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      return CellUtil.cloneTags(this);
    }

    @Override
    public int getTagsOffset() {
      return 0;
    }

    @Override
    public int getTagsLength() {
      return this.tagsLength;
    }

    @Override
    public ByteBuffer getRowByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getRowPosition() {
      // Row bytes start right after the 2-byte row length field.
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public ByteBuffer getFamilyByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getFamilyPosition() {
      return this.familyOffset;
    }

    @Override
    public ByteBuffer getQualifierByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getQualifierPosition() {
      return this.qualifierOffset;
    }

    @Override
    public ByteBuffer getValueByteBuffer() {
      return this.valueBuffer;
    }

    @Override
    public int getValuePosition() {
      return this.valueOffset;
    }

    @Override
    public ByteBuffer getTagsByteBuffer() {
      return this.tagsBuffer;
    }

    @Override
    public int getTagsPosition() {
      return this.tagsOffset;
    }

    @Override
    public long heapSize() {
      return FIXED_OVERHEAD;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    /** Same KeyValue-format serialization as {@link OnheapDecodedCell#write(OutputStream, boolean)}. */
    @Override
    public int write(OutputStream out, boolean withTags) throws IOException {
      int lenToWrite = getSerializedSize(withTags);
      ByteBufferUtils.putInt(out, keyBuffer.capacity());
      ByteBufferUtils.putInt(out, valueLength);
      // Write key
      out.write(keyBuffer.array());
      // Write value
      ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset, this.valueLength);
      if (withTags && this.tagsLength > 0) {
        // 2 bytes tags length followed by tags bytes
        // tags length is serialized with 2 bytes only(short way) even if the type is int.
        // As this is non -ve numbers, we save the sign bit. See HBASE-11437
        out.write((byte) (0xff & (this.tagsLength >> 8)));
        out.write((byte) (0xff & this.tagsLength));
        ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
      }
      return lenToWrite;
    }

    @Override
    public int getSerializedSize(boolean withTags) {
      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
          withTags);
    }

    @Override
    public void setTimestamp(long ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(byte[] ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void write(ByteBuffer buf, int offset) {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public ExtendedCell deepClone() {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }
  }

  /**
   * Base seeker over a buffer-encoded block. Keeps two {@link SeekerState}s — {@code current}
   * and {@code previous} — so the seek logic can step back exactly one position.
   */
  protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
      extends AbstractEncodedSeeker {
    protected ByteBuff currentBuffer;
    protected TagCompressionContext tagCompressionContext = null;
    protected KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
    protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>();
    protected STATE current, previous;

    public BufferedEncodedSeeker(CellComparator comparator,
        HFileBlockDecodingContext decodingCtx) {
      super(comparator, decodingCtx);
      if (decodingCtx.getHFileContext().isCompressTags()) {
        try {
          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
        } catch (Exception e) {
          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
        }
      }
      current = createSeekerState(); // always valid
      previous = createSeekerState(); // may not be valid
    }

    /** Compares the given key against the seeker's current key, ignoring mvcc/sequence id. */
    @Override
    public int compareKey(CellComparator comparator, Cell key) {
      keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
      return PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, keyOnlyKV);
    }

    /** Attaches a new encoded block and positions the seeker at its first key/value. */
    @Override
    public void setCurrentBuffer(ByteBuff buffer) {
      if (this.tagCompressionContext != null) {
        // New block: reset the tag dictionary state.
        this.tagCompressionContext.clear();
      }
      currentBuffer = buffer;
      current.currentBuffer = currentBuffer;
      if(tagCompressionContext != null) {
        current.tagCompressionContext = tagCompressionContext;
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    /** Returns a deep copy of the current key as a key-only KeyValue. */
    @Override
    public Cell getKey() {
      byte[] key = new byte[current.keyLength];
      System.arraycopy(current.keyBuffer, 0, key, 0, current.keyLength);
      return new KeyValue.KeyOnlyKeyValue(key);
    }

    /** Returns a shallow (shared-bytes) slice of the current value from the block buffer. */
    @Override
    public ByteBuffer getValueShallowCopy() {
      currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
      ByteBuffer dup = tmpPair.getFirst().duplicate();
      dup.position(tmpPair.getSecond());
      dup.limit(tmpPair.getSecond() + current.valueLength);
      return dup.slice();
    }

    @Override
    public Cell getCell() {
      return current.toCell();
    }

    /** Rewinds to the first key/value of the current block. */
    @Override
    public void rewind() {
      currentBuffer.rewind();
      if (tagCompressionContext != null) {
        tagCompressionContext.clear();
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    /** Advances to the next key/value; returns false at the end of the block. */
    @Override
    public boolean next() {
      if (!currentBuffer.hasRemaining()) {
        return false;
      }
      decodeNext();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
      return true;
    }

    /**
     * Consumes the tags portion of the current KV. With tag compression the tags are either
     * uncompressed into {@code current.tagsBuffer} or, if this position was restored via
     * {@link #moveToPrevious()}, skipped (the already-uncompressed bytes are reused). Without
     * compression only the offset is recorded — no copying.
     */
    protected void decodeTags() {
      current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
      if (tagCompressionContext != null) {
        if (current.uncompressTags) {
          // Tag compression is been used. uncompress it into tagsBuffer
          current.ensureSpaceForTags();
          try {
            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
                current.tagsBuffer, 0, current.tagsLength);
          } catch (IOException e) {
            throw new RuntimeException("Exception while uncompressing tags", e);
          }
        } else {
          currentBuffer.skip(current.tagsCompressedLength);
          current.uncompressTags = true;// Reset this.
        }
        current.tagsOffset = -1;
      } else {
        // When tag compress is not used, let us not do copying of tags bytes into tagsBuffer.
        // Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer()
        current.tagsOffset = currentBuffer.position();
        currentBuffer.skip(current.tagsLength);
      }
    }

    /**
     * Seeks within the current block, comparing component-wise and carrying forward the common
     * prefix already matched for row/family/qualifier to avoid re-comparing bytes.
     * @return 0 on exact match, 1 when positioned at the last key before (or at, with seekBefore)
     *         the requested one, or {@link HConstants#INDEX_KEY_MAGIC} for an optimized index key.
     */
    @Override
    public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
      int rowCommonPrefix = 0;
      int familyCommonPrefix = 0;
      int qualCommonPrefix = 0;
      previous.invalidate();
      do {
        int comp;
        keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
        if (current.lastCommonPrefix != 0) {
          // The KV format has row key length also in the byte array. The
          // common prefix
          // includes it. So we need to subtract to find out the common prefix
          // in the
          // row part alone
          rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
        }
        if (current.lastCommonPrefix <= 2) {
          rowCommonPrefix = 0;
        }
        rowCommonPrefix += findCommonPrefixInRowPart(seekCell, keyOnlyKV, rowCommonPrefix);
        comp = compareCommonRowPrefix(seekCell, keyOnlyKV, rowCommonPrefix);
        if (comp == 0) {
          comp = compareTypeBytes(seekCell, keyOnlyKV);
          if (comp == 0) {
            // Subtract the fixed row key length and the family key fixed length
            familyCommonPrefix = Math.max(
                0,
                Math.min(familyCommonPrefix,
                    current.lastCommonPrefix - (3 + keyOnlyKV.getRowLength())));
            familyCommonPrefix += findCommonPrefixInFamilyPart(seekCell, keyOnlyKV,
                familyCommonPrefix);
            comp = compareCommonFamilyPrefix(seekCell, keyOnlyKV, familyCommonPrefix);
            if (comp == 0) {
              // subtract the rowkey fixed length and the family key fixed
              // length
              qualCommonPrefix = Math.max(
                  0,
                  Math.min(
                      qualCommonPrefix,
                      current.lastCommonPrefix
                          - (3 + keyOnlyKV.getRowLength() + keyOnlyKV.getFamilyLength())));
              qualCommonPrefix += findCommonPrefixInQualifierPart(seekCell, keyOnlyKV,
                  qualCommonPrefix);
              comp = compareCommonQualifierPrefix(seekCell, keyOnlyKV, qualCommonPrefix);
              if (comp == 0) {
                comp = CellComparator.getInstance().compareTimestamps(seekCell, keyOnlyKV);
                if (comp == 0) {
                  // Compare types. Let the delete types sort ahead of puts;
                  // i.e. types
                  // of higher numbers sort before those of lesser numbers.
                  // Maximum
                  // (255)
                  // appears ahead of everything, and minimum (0) appears
                  // after
                  // everything.
                  comp = (0xff & keyOnlyKV.getTypeByte()) - (0xff & seekCell.getTypeByte());
                }
              }
            }
          }
        }
        if (comp == 0) { // exact match
          if (seekBefore) {
            if (!previous.isValid()) {
              // The caller (seekBefore) has to ensure that we are not at the
              // first key in the block.
              throw new IllegalStateException("Cannot seekBefore if "
                  + "positioned at the first key in the block: key="
                  + Bytes.toStringBinary(seekCell.getRowArray()));
            }
            moveToPrevious();
            return 1;
          }
          return 0;
        }

        if (comp < 0) { // already too large, check previous
          if (previous.isValid()) {
            moveToPrevious();
          } else {
            return HConstants.INDEX_KEY_MAGIC; // using optimized index key
          }
          return 1;
        }

        // move to next, if more data is available
        if (currentBuffer.hasRemaining()) {
          previous.copyFromNext(current);
          decodeNext();
          current.setKey(current.keyBuffer, current.memstoreTS);
        } else {
          break;
        }
      } while (true);

      // we hit the end of the block, not an exact match
      return 1;
    }

    /**
     * Orders "fake" minimum keys (empty family+qualifier, Type.Minimum) relative to real keys;
     * returns 0 when neither side is such a key so normal comparison proceeds.
     */
    private int compareTypeBytes(Cell key, Cell right) {
      if (key.getFamilyLength() + key.getQualifierLength() == 0
          && key.getTypeByte() == Type.Minimum.getCode()) {
        // left is "bigger", i.e. it appears later in the sorted order
        return 1;
      }
      if (right.getFamilyLength() + right.getQualifierLength() == 0
          && right.getTypeByte() == Type.Minimum.getCode()) {
        return -1;
      }
      return 0;
    }

    // The find* helpers return how many ADDITIONAL bytes match beyond the already-known
    // common prefix; callers accumulate the result.
    private static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
      return Bytes.findCommonPrefix(left.getRowArray(), right.getRowArray(), left.getRowLength()
          - rowCommonPrefix, right.getRowLength() - rowCommonPrefix, left.getRowOffset()
          + rowCommonPrefix, right.getRowOffset() + rowCommonPrefix);
    }

    private static int findCommonPrefixInFamilyPart(Cell left, Cell right, int familyCommonPrefix) {
      return Bytes
          .findCommonPrefix(left.getFamilyArray(), right.getFamilyArray(), left.getFamilyLength()
              - familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix,
              left.getFamilyOffset() + familyCommonPrefix, right.getFamilyOffset()
                  + familyCommonPrefix);
    }

    private static int findCommonPrefixInQualifierPart(Cell left, Cell right,
        int qualifierCommonPrefix) {
      return Bytes.findCommonPrefix(left.getQualifierArray(), right.getQualifierArray(),
          left.getQualifierLength() - qualifierCommonPrefix, right.getQualifierLength()
              - qualifierCommonPrefix, left.getQualifierOffset() + qualifierCommonPrefix,
          right.getQualifierOffset() + qualifierCommonPrefix);
    }

    /**
     * Steps the seeker back exactly one position by swapping {@code current}/{@code previous}
     * and repositioning the buffer. Only valid once after an advance; see the guard below.
     */
    private void moveToPrevious() {
      if (!previous.isValid()) {
        throw new IllegalStateException(
            "Can move back only once and not in first key in the block.");
      }

      STATE tmp = previous;
      previous = current;
      current = tmp;

      // move after last key value
      currentBuffer.position(current.nextKvOffset);
      // Already decoded the tag bytes. We cache this tags into current state and also the total
      // compressed length of the tags bytes. For the next time decodeNext() we don't need to decode
      // the tags again. This might pollute the Data Dictionary what we use for the compression.
      // When current.uncompressTags is false, we will just reuse the current.tagsBuffer and skip
      // 'tagsCompressedLength' bytes of source stream.
      // See in decodeTags()
      current.tagsBuffer = previous.tagsBuffer;
      current.tagsCompressedLength = previous.tagsCompressedLength;
      current.uncompressTags = false;
      // The current key has to be reset with the previous Cell
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    @SuppressWarnings("unchecked")
    protected STATE createSeekerState() {
      // This will fail for non-default seeker state if the subclass does not
      // override this method.
      return (STATE) new SeekerState(this.tmpPair, this.includesTags());
    }

    abstract protected void decodeFirst();
    abstract protected void decodeNext();
  }

  /**
   * Writes the per-KV trailer (tags and/or mvcc) that follows the encoded key/value.
   * @param cell the cell whose tags/sequence id are being appended
   * @param out destination stream
   * @param encodingCtx supplies the HFile context flags and the optional tag compression context
   * @return unencoded size added
   * @throws IOException on write failure
   */
  protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
      HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
    int size = 0;
    if (encodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = cell.getTagsLength();
      ByteBufferUtils.putCompressedInt(out, tagsLength);
      // There are some tags to be written
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
        // When tag compression is enabled, tagCompressionContext will have a not null value. Write
        // the tags using Dictionary compression in such a case
        if (tagCompressionContext != null) {
          // Not passing tagsLength considering that parsing of the tagsLength is not costly
          PrivateCellUtil.compressTags(out, cell, tagCompressionContext);
        } else {
          PrivateCellUtil.writeTags(out, cell, tagsLength);
        }
      }
      size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
    }
    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
      // Copy memstore timestamp from the byte buffer to the output stream.
      long memstoreTS = cell.getSequenceId();
      WritableUtils.writeVLong(out, memstoreTS);
      // TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be
      // avoided.
      size += WritableUtils.getVIntSize(memstoreTS);
    }
    return size;
  }

  protected final void afterDecodingKeyValue(DataInputStream source,
      ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    if (decodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = ByteBufferUtils.readCompressedInt(source);
      // Put as unsigned short
      dest.put((byte) ((tagsLength >> 8) & 0xff));
      dest.put((byte) (tagsLength & 0xff));
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
        // When tag compression is been used in this file, tagCompressionContext will have a not
        // null value passed.
        if (tagCompressionContext != null) {
          tagCompressionContext.uncompressTags(source, dest, tagsLength);
        } else {
          ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
        }
      }
    }
    if (decodingCtx.getHFileContext().isIncludesMvcc()) {
      long memstoreTS = -1;
      try {
        // Copy memstore timestamp from the data input stream to the byte
        // buffer.
1071 memstoreTS = WritableUtils.readVLong(source); 1072 ByteBufferUtils.writeVLong(dest, memstoreTS); 1073 } catch (IOException ex) { 1074 throw new RuntimeException("Unable to copy memstore timestamp " + 1075 memstoreTS + " after decoding a key/value"); 1076 } 1077 } 1078 } 1079 1080 protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source, 1081 int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) 1082 throws IOException; 1083 1084 /** 1085 * Asserts that there is at least the given amount of unfilled space 1086 * remaining in the given buffer. 1087 * @param out typically, the buffer we are writing to 1088 * @param length the required space in the buffer 1089 * @throws EncoderBufferTooSmallException If there are no enough bytes. 1090 */ 1091 protected static void ensureSpace(ByteBuffer out, int length) 1092 throws EncoderBufferTooSmallException { 1093 if (out.position() + length > out.limit()) { 1094 throw new EncoderBufferTooSmallException( 1095 "Buffer position=" + out.position() + 1096 ", buffer limit=" + out.limit() + 1097 ", length to be written=" + length); 1098 } 1099 } 1100 1101 @Override 1102 public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out) 1103 throws IOException { 1104 if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) { 1105 throw new IOException (this.getClass().getName() + " only accepts " 1106 + HFileBlockDefaultEncodingContext.class.getName() + " as the " + 1107 "encoding context."); 1108 } 1109 1110 HFileBlockDefaultEncodingContext encodingCtx = 1111 (HFileBlockDefaultEncodingContext) blkEncodingCtx; 1112 encodingCtx.prepareEncoding(out); 1113 if (encodingCtx.getHFileContext().isIncludesTags() 1114 && encodingCtx.getHFileContext().isCompressTags()) { 1115 if (encodingCtx.getTagCompressionContext() != null) { 1116 // It will be overhead to create the TagCompressionContext again and again for every block 1117 // encoding. 
1118 encodingCtx.getTagCompressionContext().clear(); 1119 } else { 1120 try { 1121 TagCompressionContext tagCompressionContext = new TagCompressionContext( 1122 LRUDictionary.class, Byte.MAX_VALUE); 1123 encodingCtx.setTagCompressionContext(tagCompressionContext); 1124 } catch (Exception e) { 1125 throw new IOException("Failed to initialize TagCompressionContext", e); 1126 } 1127 } 1128 } 1129 StreamUtils.writeInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding() 1130 blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState()); 1131 } 1132 1133 private static class BufferedDataBlockEncodingState extends EncodingState { 1134 int unencodedDataSizeWritten = 0; 1135 } 1136 1137 @Override 1138 public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out) 1139 throws IOException { 1140 BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx 1141 .getEncodingState(); 1142 int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out); 1143 state.unencodedDataSizeWritten += encodedKvSize; 1144 return encodedKvSize; 1145 } 1146 1147 public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx, 1148 DataOutputStream out) throws IOException; 1149 1150 @Override 1151 public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out, 1152 byte[] uncompressedBytesWithHeader) throws IOException { 1153 BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx 1154 .getEncodingState(); 1155 // Write the unencodedDataSizeWritten (with header size) 1156 Bytes.putInt(uncompressedBytesWithHeader, 1157 HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten 1158 ); 1159 postEncoding(encodingCtx); 1160 } 1161 1162}