001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with this 004 * work for additional information regarding copyright ownership. The ASF 005 * licenses this file to you under the Apache License, Version 2.0 (the 006 * "License"); you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 014 * License for the specific language governing permissions and limitations 015 * under the License. 016 */ 017package org.apache.hadoop.hbase.io.encoding; 018 019import java.io.DataInputStream; 020import java.io.DataOutputStream; 021import java.io.IOException; 022import java.io.OutputStream; 023import java.nio.ByteBuffer; 024import org.apache.hadoop.hbase.ByteBufferExtendedCell; 025import org.apache.hadoop.hbase.Cell; 026import org.apache.hadoop.hbase.CellComparator; 027import org.apache.hadoop.hbase.CellUtil; 028import org.apache.hadoop.hbase.ExtendedCell; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.KeyValue; 031import org.apache.hadoop.hbase.KeyValue.Type; 032import org.apache.hadoop.hbase.KeyValueUtil; 033import org.apache.hadoop.hbase.PrivateCellUtil; 034import org.apache.hadoop.hbase.io.TagCompressionContext; 035import org.apache.hadoop.hbase.io.util.LRUDictionary; 036import org.apache.hadoop.hbase.io.util.StreamUtils; 037import org.apache.hadoop.hbase.nio.ByteBuff; 038import org.apache.hadoop.hbase.util.ByteBufferUtils; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.ClassSize; 041import org.apache.hadoop.hbase.util.ObjectIntPair; 042import 
org.apache.hadoop.io.WritableUtils; 043import org.apache.yetus.audience.InterfaceAudience; 044 045/** 046 * Base class for all data block encoders that use a buffer. 047 */ 048@InterfaceAudience.Private 049abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder { 050 /** 051 * TODO: This datablockencoder is dealing in internals of hfileblocks. Purge reference to HFBs 052 */ 053 private static int INITIAL_KEY_BUFFER_SIZE = 512; 054 055 @Override 056 public ByteBuffer decodeKeyValues(DataInputStream source, 057 HFileBlockDecodingContext blkDecodingCtx) throws IOException { 058 if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) { 059 throw new IOException(this.getClass().getName() + " only accepts " 060 + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context."); 061 } 062 063 HFileBlockDefaultDecodingContext decodingCtx = 064 (HFileBlockDefaultDecodingContext) blkDecodingCtx; 065 if (decodingCtx.getHFileContext().isIncludesTags() 066 && decodingCtx.getHFileContext().isCompressTags()) { 067 if (decodingCtx.getTagCompressionContext() != null) { 068 // It will be overhead to create the TagCompressionContext again and again for every block 069 // decoding. 070 decodingCtx.getTagCompressionContext().clear(); 071 } else { 072 try { 073 TagCompressionContext tagCompressionContext = new TagCompressionContext( 074 LRUDictionary.class, Byte.MAX_VALUE); 075 decodingCtx.setTagCompressionContext(tagCompressionContext); 076 } catch (Exception e) { 077 throw new IOException("Failed to initialize TagCompressionContext", e); 078 } 079 } 080 } 081 return internalDecodeKeyValues(source, 0, 0, decodingCtx); 082 } 083 084 /********************* common prefixes *************************/ 085 // Having this as static is fine but if META is having DBE then we should 086 // change this. 
  /**
   * Compares the row parts of the two cells, skipping the first {@code rowCommonPrefix} bytes
   * which the caller already knows to be equal.
   */
  public static int compareCommonRowPrefix(Cell left, Cell right, int rowCommonPrefix) {
    return Bytes.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix,
        left.getRowLength() - rowCommonPrefix, right.getRowArray(), right.getRowOffset()
            + rowCommonPrefix, right.getRowLength() - rowCommonPrefix);
  }

  /**
   * Compares the family parts of the two cells, skipping the first {@code familyCommonPrefix}
   * bytes which the caller already knows to be equal.
   */
  public static int compareCommonFamilyPrefix(Cell left, Cell right, int familyCommonPrefix) {
    return Bytes.compareTo(left.getFamilyArray(), left.getFamilyOffset() + familyCommonPrefix,
        left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
        right.getFamilyOffset() + familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix);
  }

  /**
   * Compares the qualifier parts of the two cells, skipping the first {@code qualCommonPrefix}
   * bytes which the caller already knows to be equal.
   */
  public static int compareCommonQualifierPrefix(Cell left, Cell right, int qualCommonPrefix) {
    return Bytes.compareTo(left.getQualifierArray(), left.getQualifierOffset() + qualCommonPrefix,
        left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(),
        right.getQualifierOffset() + qualCommonPrefix, right.getQualifierLength()
            - qualCommonPrefix);
  }

  /**
   * Mutable per-seeker decoding state: the current key (copied into {@link #keyBuffer}), and
   * offsets/lengths of the value and tags parts, which keep pointing into
   * {@link #currentBuffer}.
   */
  protected static class SeekerState {
    protected ByteBuff currentBuffer;
    protected TagCompressionContext tagCompressionContext;
    // -1 marks this state as invalid; see isValid()/invalidate()
    protected int valueOffset = -1;
    protected int keyLength;
    protected int valueLength;
    // number of leading key bytes shared with the previously decoded key
    protected int lastCommonPrefix;
    protected int tagsLength = 0;
    protected int tagsOffset = -1;
    protected int tagsCompressedLength = 0;
    protected boolean uncompressTags = true;

    /** We need to store a copy of the key. */
    protected byte[] keyBuffer = HConstants.EMPTY_BYTE_ARRAY;
    protected byte[] tagsBuffer = HConstants.EMPTY_BYTE_ARRAY;

    protected long memstoreTS;
    protected int nextKvOffset;
    protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
    private final ObjectIntPair<ByteBuffer> tmpPair;
    private final boolean includeTags;

    public SeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) {
      this.tmpPair = tmpPair;
      this.includeTags = includeTags;
    }

    /** Returns true when this state holds a decoded entry (valueOffset has been set). */
    protected boolean isValid() {
      return valueOffset != -1;
    }

    /** Resets this state so isValid() returns false; drops the buffer reference. */
    protected void invalidate() {
      valueOffset = -1;
      tagsCompressedLength = 0;
      currentKey.clear();
      uncompressTags = true;
      currentBuffer = null;
    }

    /**
     * Grows keyBuffer (next power of two >= max(INITIAL_KEY_BUFFER_SIZE, keyLength)) if the
     * current key does not fit; existing bytes are preserved.
     */
    protected void ensureSpaceForKey() {
      if (keyLength > keyBuffer.length) {
        int newKeyBufferLength = Integer.highestOneBit(Math.max(
            INITIAL_KEY_BUFFER_SIZE, keyLength) - 1) << 1;
        byte[] newKeyBuffer = new byte[newKeyBufferLength];
        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
        keyBuffer = newKeyBuffer;
      }
    }

    /** Same growth policy as ensureSpaceForKey(), applied to tagsBuffer. */
    protected void ensureSpaceForTags() {
      if (tagsLength > tagsBuffer.length) {
        int newTagsBufferLength = Integer.highestOneBit(Math.max(
            INITIAL_KEY_BUFFER_SIZE, tagsLength) - 1) << 1;
        byte[] newTagsBuffer = new byte[newTagsBufferLength];
        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
        tagsBuffer = newTagsBuffer;
      }
    }

    /** Re-points currentKey at the given buffer (first keyLength bytes) and records the mvcc. */
    protected void setKey(byte[] keyBuffer, long memTS) {
      currentKey.setKey(keyBuffer, 0, keyLength);
      memstoreTS = memTS;
    }

    /**
     * Copy the state from the next one into this instance (the previous state
     * placeholder). Used to save the previous state when we are advancing the
     * seeker to the next key/value.
     */
    protected void copyFromNext(SeekerState nextState) {
      if (keyBuffer.length != nextState.keyBuffer.length) {
        keyBuffer = nextState.keyBuffer.clone();
      } else if (!isValid()) {
        // Note: we can only call isValid before we override our state, so this
        // comes before all the assignments at the end of this method.
        System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
            nextState.keyLength);
      } else {
        // don't copy the common prefix between this key and the previous one
        System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
            keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
                - nextState.lastCommonPrefix);
      }
      currentKey.set(nextState.currentKey);

      valueOffset = nextState.valueOffset;
      keyLength = nextState.keyLength;
      valueLength = nextState.valueLength;
      lastCommonPrefix = nextState.lastCommonPrefix;
      nextKvOffset = nextState.nextKvOffset;
      memstoreTS = nextState.memstoreTS;
      currentBuffer = nextState.currentBuffer;
      tagsOffset = nextState.tagsOffset;
      tagsLength = nextState.tagsLength;
      if (nextState.tagCompressionContext != null) {
        tagCompressionContext = nextState.tagCompressionContext;
      }
    }

    /**
     * Materializes the current position as a Cell. The key is deep-copied; the value (and,
     * without tag compression, the tags) still reference the block's backing buffer. Returns an
     * on-heap or off-heap cell depending on whether that backing buffer has an array.
     */
    public Cell toCell() {
      // Buffer backing the value and tags part from the HFileBlock's buffer
      // When tag compression in use, this will be only the value bytes area.
      ByteBuffer valAndTagsBuffer;
      int vOffset;
      int valAndTagsLength = this.valueLength;
      int tagsLenSerializationSize = 0;
      if (this.includeTags && this.tagCompressionContext == null) {
        // Include the tags part also. This will be the tags bytes + 2 bytes of for storing tags
        // length
        tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + this.valueLength);
        valAndTagsLength += tagsLenSerializationSize + this.tagsLength;
      }
      this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair);
      valAndTagsBuffer = this.tmpPair.getFirst();
      vOffset = this.tmpPair.getSecond();// This is the offset to value part in the BB
      if (valAndTagsBuffer.hasArray()) {
        return toOnheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      } else {
        return toOffheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
      }
    }

    // Builds an OnheapDecodedCell; with compressed tags the uncompressed tags are copied out of
    // tagsBuffer, otherwise the tags point into the value-and-tags array.
    private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
        int tagsLenSerializationSize) {
      byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          tagsArray = valAndTagsBuffer.array();
          tOffset = valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength
              + tagsLenSerializationSize;
        } else {
          tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
          tOffset = 0;
        }
      }
      return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
          currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
          currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
          currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer.array(),
          valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength, memstoreTS, tagsArray,
          tOffset, this.tagsLength);
    }

    // Off-heap counterpart of toOnheapCell(); key and (when compressed) tags are wrapped copies.
    private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
        int tagsLenSerializationSize) {
      ByteBuffer tagsBuf = HConstants.EMPTY_BYTE_BUFFER;
      int tOffset = 0;
      if (this.includeTags) {
        if (this.tagCompressionContext == null) {
          tagsBuf = valAndTagsBuffer;
          tOffset = vOffset + this.valueLength + tagsLenSerializationSize;
        } else {
          tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength));
          tOffset = 0;
        }
      }
      return new OffheapDecodedExtendedCell(
          ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)), currentKey.getRowLength(),
          currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
          currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
          currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer, vOffset,
          this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
    }
  }

  /**
   * Copies only the key part of the keybuffer by doing a deep copy and passes the
   * seeker state members for taking a clone.
   * Note that the value byte[] part is still pointing to the currentBuffer and
   * represented by the valueOffset and valueLength
   */
  // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
  // there. So this has to be an instance of ExtendedCell.
  protected static class OnheapDecodedCell implements ExtendedCell {
    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
        + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
        + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY));
    // flat KeyValue-format key: 2-byte row length, row, 1-byte family length, family, qualifier,
    // 8-byte timestamp, 1-byte type
    private byte[] keyOnlyBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timestamp;
    private byte typeByte;
    private byte[] valueBuffer;
    private int valueOffset;
    private int valueLength;
    private byte[] tagsBuffer;
    private int tagsOffset;
    private int tagsLength;
    private long seqId;

    protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset,
        byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
        byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[]
tagsBuffer,
        int tagsOffset, int tagsLength) {
      this.keyOnlyBuffer = keyBuffer;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timestamp = timeStamp;
      this.typeByte = typeByte;
      this.valueBuffer = valueBuffer;
      this.valueOffset = valueOffset;
      this.valueLength = valueLen;
      this.tagsBuffer = tagsBuffer;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      setSequenceId(seqId);
    }

    // Row, family and qualifier all live inside the single key-only buffer.
    @Override
    public byte[] getRowArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getFamilyArray() {
      return keyOnlyBuffer;
    }

    @Override
    public byte[] getQualifierArray() {
      return keyOnlyBuffer;
    }

    @Override
    public int getRowOffset() {
      // Row bytes start right after the 2-byte row length prefix.
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public short getRowLength() {
      return rowLength;
    }

    @Override
    public int getFamilyOffset() {
      return familyOffset;
    }

    @Override
    public byte getFamilyLength() {
      return familyLength;
    }

    @Override
    public int getQualifierOffset() {
      return qualifierOffset;
    }

    @Override
    public int getQualifierLength() {
      return qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return timestamp;
    }

    @Override
    public byte getTypeByte() {
      return typeByte;
    }

    @Override
    public long getSequenceId() {
      return seqId;
    }

    @Override
    public byte[] getValueArray() {
      return this.valueBuffer;
    }

    @Override
    public int getValueOffset() {
      return valueOffset;
    }

    @Override
    public int getValueLength() {
      return valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      return this.tagsBuffer;
    }

    @Override
    public int getTagsOffset() {
      return this.tagsOffset;
    }

    @Override
    public int getTagsLength() {
      return tagsLength;
    }

    @Override
    public String toString() {
      return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
          + getValueLength() + "/seqid=" + seqId;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    @Override
    public long heapSize() {
      return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
    }

    /**
     * Serializes this cell in KeyValue format: key length, value length, key bytes, value bytes
     * and (optionally) a 2-byte tags length followed by the tags bytes.
     */
    @Override
    public int write(OutputStream out, boolean withTags) throws IOException {
      int lenToWrite = getSerializedSize(withTags);
      ByteBufferUtils.putInt(out, keyOnlyBuffer.length);
      ByteBufferUtils.putInt(out, valueLength);
      // Write key
      out.write(keyOnlyBuffer);
      // Write value
      out.write(this.valueBuffer, this.valueOffset, this.valueLength);
      if (withTags && this.tagsLength > 0) {
        // 2 bytes tags length followed by tags bytes
        // tags length is serialized with 2 bytes only(short way) even if the type is int.
        // As this is non -ve numbers, we save the sign bit. See HBASE-11437
        out.write((byte) (0xff & (this.tagsLength >> 8)));
        out.write((byte) (0xff & this.tagsLength));
        out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
      }
      return lenToWrite;
    }

    @Override
    public int getSerializedSize(boolean withTags) {
      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
          withTags);
    }

    @Override
    public void write(ByteBuffer buf, int offset) {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(long ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(byte[] ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public ExtendedCell deepClone() {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }
  }

  /**
   * Off-heap counterpart of OnheapDecodedCell: the key is a deep-copied on-heap buffer while
   * value/tags reference (possibly direct) ByteBuffers.
   */
  protected static class OffheapDecodedExtendedCell extends ByteBufferExtendedCell {
    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
        + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
        + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.BYTE_BUFFER));
    private ByteBuffer keyBuffer;
    private short rowLength;
    private int familyOffset;
    private byte familyLength;
    private int qualifierOffset;
    private int qualifierLength;
    private long timestamp;
    private byte typeByte;
    private ByteBuffer valueBuffer;
    private int valueOffset;
    private int valueLength;
    private ByteBuffer tagsBuffer;
    private int tagsOffset;
    private int tagsLength;
    private long seqId;

    protected OffheapDecodedExtendedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset,
        byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
        ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer,
        int tagsOffset, int tagsLength) {
      // The keyBuffer is always onheap
      assert keyBuffer.hasArray();
      assert keyBuffer.arrayOffset() == 0;
      this.keyBuffer = keyBuffer;
      this.rowLength = rowLength;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifierOffset = qualOffset;
      this.qualifierLength = qualLength;
      this.timestamp = timeStamp;
      this.typeByte =
typeByte;
      this.valueBuffer = valueBuffer;
      this.valueOffset = valueOffset;
      this.valueLength = valueLen;
      this.tagsBuffer = tagsBuffer;
      this.tagsOffset = tagsOffset;
      this.tagsLength = tagsLength;
      setSequenceId(seqId);
    }

    // Key accessors are backed by the on-heap key buffer (asserted in the constructor).
    @Override
    public byte[] getRowArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getRowOffset() {
      return getRowPosition();
    }

    @Override
    public short getRowLength() {
      return this.rowLength;
    }

    @Override
    public byte[] getFamilyArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getFamilyOffset() {
      return getFamilyPosition();
    }

    @Override
    public byte getFamilyLength() {
      return this.familyLength;
    }

    @Override
    public byte[] getQualifierArray() {
      return this.keyBuffer.array();
    }

    @Override
    public int getQualifierOffset() {
      return getQualifierPosition();
    }

    @Override
    public int getQualifierLength() {
      return this.qualifierLength;
    }

    @Override
    public long getTimestamp() {
      return this.timestamp;
    }

    @Override
    public byte getTypeByte() {
      return this.typeByte;
    }

    @Override
    public long getSequenceId() {
      return this.seqId;
    }

    // Value/tags may be off-heap, so the array accessors materialize copies.
    @Override
    public byte[] getValueArray() {
      return CellUtil.cloneValue(this);
    }

    @Override
    public int getValueOffset() {
      return 0;
    }

    @Override
    public int getValueLength() {
      return this.valueLength;
    }

    @Override
    public byte[] getTagsArray() {
      return CellUtil.cloneTags(this);
    }

    @Override
    public int getTagsOffset() {
      return 0;
    }

    @Override
    public int getTagsLength() {
      return this.tagsLength;
    }

    @Override
    public ByteBuffer getRowByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getRowPosition() {
      // Row bytes start right after the 2-byte row length prefix.
      return Bytes.SIZEOF_SHORT;
    }

    @Override
    public ByteBuffer getFamilyByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getFamilyPosition() {
      return this.familyOffset;
    }

    @Override
    public ByteBuffer getQualifierByteBuffer() {
      return this.keyBuffer;
    }

    @Override
    public int getQualifierPosition() {
      return this.qualifierOffset;
    }

    @Override
    public ByteBuffer getValueByteBuffer() {
      return this.valueBuffer;
    }

    @Override
    public int getValuePosition() {
      return this.valueOffset;
    }

    @Override
    public ByteBuffer getTagsByteBuffer() {
      return this.tagsBuffer;
    }

    @Override
    public int getTagsPosition() {
      return this.tagsOffset;
    }

    @Override
    public long heapSize() {
      return FIXED_OVERHEAD;
    }

    @Override
    public void setSequenceId(long seqId) {
      this.seqId = seqId;
    }

    /**
     * Serializes this cell in KeyValue format: key length, value length, key bytes, value bytes
     * and (optionally) a 2-byte tags length followed by the tags bytes.
     */
    @Override
    public int write(OutputStream out, boolean withTags) throws IOException {
      int lenToWrite = getSerializedSize(withTags);
      ByteBufferUtils.putInt(out, keyBuffer.capacity());
      ByteBufferUtils.putInt(out, valueLength);
      // Write key
      out.write(keyBuffer.array());
      // Write value
      ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset, this.valueLength);
      if (withTags && this.tagsLength > 0) {
        // 2 bytes tags length followed by tags bytes
        // tags length is serialized with 2 bytes only(short way) even if the type is int.
        // As this is non -ve numbers, we save the sign bit. See HBASE-11437
        out.write((byte) (0xff & (this.tagsLength >> 8)));
        out.write((byte) (0xff & this.tagsLength));
        ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
      }
      return lenToWrite;
    }

    @Override
    public int getSerializedSize(boolean withTags) {
      return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
          withTags);
    }

    @Override
    public void setTimestamp(long ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void setTimestamp(byte[] ts) throws IOException {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public void write(ByteBuffer buf, int offset) {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }

    @Override
    public ExtendedCell deepClone() {
      // This is not used in actual flow. Throwing UnsupportedOperationException
      throw new UnsupportedOperationException();
    }
  }

  /**
   * Base seeker implementation for buffered encoders, holding a current and a previous
   * {@link SeekerState} that are swapped while advancing through the block.
   */
  protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
      extends AbstractEncodedSeeker {
    protected ByteBuff currentBuffer;
    protected TagCompressionContext tagCompressionContext = null;
    protected KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
    // many object creations.
    protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>();
    protected STATE current, previous;

    public BufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
      super(decodingCtx);
      if (decodingCtx.getHFileContext().isCompressTags()) {
        try {
          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
        } catch (Exception e) {
          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
        }
      }
      current = createSeekerState(); // always valid
      previous = createSeekerState(); // may not be valid
    }

    /** Compares the given key against the current position's key, ignoring mvcc. */
    @Override
    public int compareKey(CellComparator comparator, Cell key) {
      keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
      return PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, keyOnlyKV);
    }

    /** Points this seeker at a new block buffer and decodes its first entry. */
    @Override
    public void setCurrentBuffer(ByteBuff buffer) {
      if (this.tagCompressionContext != null) {
        this.tagCompressionContext.clear();
      }
      currentBuffer = buffer;
      current.currentBuffer = currentBuffer;
      if(tagCompressionContext != null) {
        current.tagCompressionContext = tagCompressionContext;
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    /** Returns a deep copy of the current key as a key-only KeyValue. */
    @Override
    public Cell getKey() {
      byte[] key = new byte[current.keyLength];
      System.arraycopy(current.keyBuffer, 0, key, 0, current.keyLength);
      return new KeyValue.KeyOnlyKeyValue(key);
    }

    /** Returns a sliced view of the current value; no bytes are copied. */
    @Override
    public ByteBuffer getValueShallowCopy() {
      currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
      ByteBuffer dup = tmpPair.getFirst().duplicate();
      dup.position(tmpPair.getSecond());
      dup.limit(tmpPair.getSecond() + current.valueLength);
      return dup.slice();
    }

    @Override
    public Cell getCell() {
      return current.toCell();
    }

    /** Repositions the seeker at the first entry of the current block. */
    @Override
    public void rewind() {
      currentBuffer.rewind();
      if (tagCompressionContext != null) {
        tagCompressionContext.clear();
      }
      decodeFirst();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    /** Advances to the next entry; returns false at the end of the block. */
    @Override
    public boolean next() {
      if (!currentBuffer.hasRemaining()) {
        return false;
      }
      decodeNext();
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
      return true;
    }

    /**
     * Reads the tags section at the current buffer position: uncompresses into
     * current.tagsBuffer when tag compression is on, otherwise just records offset/length.
     */
    protected void decodeTags() {
      current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
      if (tagCompressionContext != null) {
        if (current.uncompressTags) {
          // Tag compression is been used. uncompress it into tagsBuffer
          current.ensureSpaceForTags();
          try {
            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
                current.tagsBuffer, 0, current.tagsLength);
          } catch (IOException e) {
            throw new RuntimeException("Exception while uncompressing tags", e);
          }
        } else {
          currentBuffer.skip(current.tagsCompressedLength);
          current.uncompressTags = true;// Reset this.
        }
        current.tagsOffset = -1;
      } else {
        // When tag compress is not used, let us not do copying of tags bytes into tagsBuffer.
        // Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer()
        current.tagsOffset = currentBuffer.position();
        currentBuffer.skip(current.tagsLength);
      }
    }

    /**
     * Seeks within the current block, tracking the common prefix already compared for row,
     * family and qualifier so each component is re-compared only past that prefix.
     */
    @Override
    public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
      int rowCommonPrefix = 0;
      int familyCommonPrefix = 0;
      int qualCommonPrefix = 0;
      previous.invalidate();
      do {
        int comp;
        keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
        if (current.lastCommonPrefix != 0) {
          // The KV format has row key length also in the byte array. The
          // common prefix
          // includes it. So we need to subtract to find out the common prefix
          // in the
          // row part alone
          rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
        }
        if (current.lastCommonPrefix <= 2) {
          rowCommonPrefix = 0;
        }
        rowCommonPrefix += findCommonPrefixInRowPart(seekCell, keyOnlyKV, rowCommonPrefix);
        comp = compareCommonRowPrefix(seekCell, keyOnlyKV, rowCommonPrefix);
        if (comp == 0) {
          comp = compareTypeBytes(seekCell, keyOnlyKV);
          if (comp == 0) {
            // Subtract the fixed row key length and the family key fixed length
            familyCommonPrefix = Math.max(
                0,
                Math.min(familyCommonPrefix,
                    current.lastCommonPrefix - (3 + keyOnlyKV.getRowLength())));
            familyCommonPrefix += findCommonPrefixInFamilyPart(seekCell, keyOnlyKV,
                familyCommonPrefix);
            comp = compareCommonFamilyPrefix(seekCell, keyOnlyKV, familyCommonPrefix);
            if (comp == 0) {
              // subtract the rowkey fixed length and the family key fixed
              // length
              qualCommonPrefix = Math.max(
                  0,
                  Math.min(
                      qualCommonPrefix,
                      current.lastCommonPrefix
                          - (3 + keyOnlyKV.getRowLength() + keyOnlyKV.getFamilyLength())));
              qualCommonPrefix += findCommonPrefixInQualifierPart(seekCell, keyOnlyKV,
                  qualCommonPrefix);
              comp = compareCommonQualifierPrefix(seekCell, keyOnlyKV, qualCommonPrefix);
              if (comp == 0) {
                comp = CellComparator.getInstance().compareTimestamps(seekCell, keyOnlyKV);
                if (comp == 0) {
                  // Compare types. Let the delete types sort ahead of puts;
                  // i.e. types
                  // of higher numbers sort before those of lesser numbers.
                  // Maximum
                  // (255)
                  // appears ahead of everything, and minimum (0) appears
                  // after
                  // everything.
893 comp = (0xff & keyOnlyKV.getTypeByte()) - (0xff & seekCell.getTypeByte()); 894 } 895 } 896 } 897 } 898 } 899 if (comp == 0) { // exact match 900 if (seekBefore) { 901 if (!previous.isValid()) { 902 // The caller (seekBefore) has to ensure that we are not at the 903 // first key in the block. 904 throw new IllegalStateException("Cannot seekBefore if " 905 + "positioned at the first key in the block: key=" 906 + Bytes.toStringBinary(seekCell.getRowArray())); 907 } 908 moveToPrevious(); 909 return 1; 910 } 911 return 0; 912 } 913 914 if (comp < 0) { // already too large, check previous 915 if (previous.isValid()) { 916 moveToPrevious(); 917 } else { 918 return HConstants.INDEX_KEY_MAGIC; // using optimized index key 919 } 920 return 1; 921 } 922 923 // move to next, if more data is available 924 if (currentBuffer.hasRemaining()) { 925 previous.copyFromNext(current); 926 decodeNext(); 927 current.setKey(current.keyBuffer, current.memstoreTS); 928 } else { 929 break; 930 } 931 } while (true); 932 933 // we hit the end of the block, not an exact match 934 return 1; 935 } 936 937 private int compareTypeBytes(Cell key, Cell right) { 938 if (key.getFamilyLength() + key.getQualifierLength() == 0 939 && key.getTypeByte() == Type.Minimum.getCode()) { 940 // left is "bigger", i.e. 
it appears later in the sorted order 941 return 1; 942 } 943 if (right.getFamilyLength() + right.getQualifierLength() == 0 944 && right.getTypeByte() == Type.Minimum.getCode()) { 945 return -1; 946 } 947 return 0; 948 } 949 950 private static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) { 951 return Bytes.findCommonPrefix(left.getRowArray(), right.getRowArray(), left.getRowLength() 952 - rowCommonPrefix, right.getRowLength() - rowCommonPrefix, left.getRowOffset() 953 + rowCommonPrefix, right.getRowOffset() + rowCommonPrefix); 954 } 955 956 private static int findCommonPrefixInFamilyPart(Cell left, Cell right, int familyCommonPrefix) { 957 return Bytes 958 .findCommonPrefix(left.getFamilyArray(), right.getFamilyArray(), left.getFamilyLength() 959 - familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix, 960 left.getFamilyOffset() + familyCommonPrefix, right.getFamilyOffset() 961 + familyCommonPrefix); 962 } 963 964 private static int findCommonPrefixInQualifierPart(Cell left, Cell right, 965 int qualifierCommonPrefix) { 966 return Bytes.findCommonPrefix(left.getQualifierArray(), right.getQualifierArray(), 967 left.getQualifierLength() - qualifierCommonPrefix, right.getQualifierLength() 968 - qualifierCommonPrefix, left.getQualifierOffset() + qualifierCommonPrefix, 969 right.getQualifierOffset() + qualifierCommonPrefix); 970 } 971 972 private void moveToPrevious() { 973 if (!previous.isValid()) { 974 throw new IllegalStateException( 975 "Can move back only once and not in first key in the block."); 976 } 977 978 STATE tmp = previous; 979 previous = current; 980 current = tmp; 981 982 // move after last key value 983 currentBuffer.position(current.nextKvOffset); 984 // Already decoded the tag bytes. We cache this tags into current state and also the total 985 // compressed length of the tags bytes. For the next time decodeNext() we don't need to decode 986 // the tags again. 
      // This might pollute the Data Dictionary what we use for the compression.
      // When current.uncompressTags is false, we will just reuse the current.tagsBuffer and skip
      // 'tagsCompressedLength' bytes of source stream.
      // See in decodeTags()
      current.tagsBuffer = previous.tagsBuffer;
      current.tagsCompressedLength = previous.tagsCompressedLength;
      current.uncompressTags = false;
      // The current key has to be reset with the previous Cell
      current.setKey(current.keyBuffer, current.memstoreTS);
      previous.invalidate();
    }

    /**
     * Builds the default {@link SeekerState}; subclasses that use a custom
     * STATE type must override this, otherwise the unchecked cast below fails.
     */
    @SuppressWarnings("unchecked")
    protected STATE createSeekerState() {
      // This will fail for non-default seeker state if the subclass does not
      // override this method.
      return (STATE) new SeekerState(this.tmpPair, this.includesTags());
    }

    // Decodes the first key/value of the block into the current state.
    abstract protected void decodeFirst();

    // Decodes the next key/value of the block into the current state.
    abstract protected void decodeNext();
  }

  /**
   * Writes the per-cell trailer (tags and/or mvcc sequence id) that follows an
   * encoded key/value, depending on what the HFile context includes.
   * @return unencoded size added
   */
  protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
      HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
    int size = 0;
    if (encodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = cell.getTagsLength();
      ByteBufferUtils.putCompressedInt(out, tagsLength);
      // There are some tags to be written
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
        // When tag compression is enabled, tagCompressionContext will have a not null value.
        // Write the tags using Dictionary compression in such a case
        if (tagCompressionContext != null) {
          // Not passing tagsLength considering that parsing of the tagsLength is not costly
          PrivateCellUtil.compressTags(out, cell, tagCompressionContext);
        } else {
          PrivateCellUtil.writeTags(out, cell, tagsLength);
        }
      }
      size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
    }
    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
      // Copy memstore timestamp from the byte buffer to the output stream.
      long memstoreTS = cell.getSequenceId();
      WritableUtils.writeVLong(out, memstoreTS);
      // TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be
      // avoided.
      size += WritableUtils.getVIntSize(memstoreTS);
    }
    return size;
  }

  /**
   * Reads the per-cell trailer (tags and/or mvcc sequence id) from {@code source}
   * and copies it into {@code dest}; mirrors what afterEncodingKeyValue() wrote.
   */
  protected final void afterDecodingKeyValue(DataInputStream source,
      ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    if (decodingCtx.getHFileContext().isIncludesTags()) {
      int tagsLength = ByteBufferUtils.readCompressedInt(source);
      // Put as unsigned short
      dest.put((byte) ((tagsLength >> 8) & 0xff));
      dest.put((byte) (tagsLength & 0xff));
      if (tagsLength > 0) {
        TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
        // When tag compression is been used in this file, tagCompressionContext will have a not
        // null value passed.
        if (tagCompressionContext != null) {
          tagCompressionContext.uncompressTags(source, dest, tagsLength);
        } else {
          ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
        }
      }
    }
    if (decodingCtx.getHFileContext().isIncludesMvcc()) {
      long memstoreTS = -1;
      try {
        // Copy memstore timestamp from the data input stream to the byte
        // buffer.
1066 memstoreTS = WritableUtils.readVLong(source); 1067 ByteBufferUtils.writeVLong(dest, memstoreTS); 1068 } catch (IOException ex) { 1069 throw new RuntimeException("Unable to copy memstore timestamp " + 1070 memstoreTS + " after decoding a key/value"); 1071 } 1072 } 1073 } 1074 1075 protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source, 1076 int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) 1077 throws IOException; 1078 1079 /** 1080 * Asserts that there is at least the given amount of unfilled space 1081 * remaining in the given buffer. 1082 * @param out typically, the buffer we are writing to 1083 * @param length the required space in the buffer 1084 * @throws EncoderBufferTooSmallException If there are no enough bytes. 1085 */ 1086 protected static void ensureSpace(ByteBuffer out, int length) 1087 throws EncoderBufferTooSmallException { 1088 if (out.position() + length > out.limit()) { 1089 throw new EncoderBufferTooSmallException( 1090 "Buffer position=" + out.position() + 1091 ", buffer limit=" + out.limit() + 1092 ", length to be written=" + length); 1093 } 1094 } 1095 1096 @Override 1097 public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out) 1098 throws IOException { 1099 if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) { 1100 throw new IOException(this.getClass().getName() + " only accepts " 1101 + HFileBlockDefaultEncodingContext.class.getName() + " as the " + 1102 "encoding context."); 1103 } 1104 1105 HFileBlockDefaultEncodingContext encodingCtx = 1106 (HFileBlockDefaultEncodingContext) blkEncodingCtx; 1107 encodingCtx.prepareEncoding(out); 1108 if (encodingCtx.getHFileContext().isIncludesTags() 1109 && encodingCtx.getHFileContext().isCompressTags()) { 1110 if (encodingCtx.getTagCompressionContext() != null) { 1111 // It will be overhead to create the TagCompressionContext again and again for every block 1112 // encoding. 
        encodingCtx.getTagCompressionContext().clear();
      } else {
        try {
          TagCompressionContext tagCompressionContext = new TagCompressionContext(
              LRUDictionary.class, Byte.MAX_VALUE);
          encodingCtx.setTagCompressionContext(tagCompressionContext);
        } catch (Exception e) {
          throw new IOException("Failed to initialize TagCompressionContext", e);
        }
      }
    }
    StreamUtils.writeInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
    blkEncodingCtx.setEncodingState(new EncodingState());
  }

  /**
   * Encodes a single cell through internalEncode() and records both the encoded
   * size and the raw bytes written into the block's EncodingState.
   */
  @Override
  public void encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
      throws IOException {
    EncodingState state = encodingCtx.getEncodingState();
    int posBeforeEncode = out.size();
    int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
    state.postCellEncode(encodedKvSize, out.size() - posBeforeEncode);
  }

  // Encodes one cell; returns the encoded key/value size. Implemented by subclasses.
  public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
      DataOutputStream out) throws IOException;

  /**
   * Finalizes the block: patches the dummy length written by startBlockEncoding()
   * with the actual unencoded data size, then runs common post-encoding steps.
   */
  @Override
  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
      byte[] uncompressedBytesWithHeader) throws IOException {
    EncodingState state = encodingCtx.getEncodingState();
    // Write the unencodedDataSizeWritten (with header size)
    Bytes.putInt(uncompressedBytesWithHeader,
        HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE,
        state.getUnencodedDataSizeWritten());
    postEncoding(encodingCtx);
  }
}