001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.encoding; 019 020import java.io.DataInputStream; 021import java.io.DataOutputStream; 022import java.io.IOException; 023import java.io.OutputStream; 024import java.nio.ByteBuffer; 025import org.apache.hadoop.hbase.ByteBufferExtendedCell; 026import org.apache.hadoop.hbase.Cell; 027import org.apache.hadoop.hbase.CellComparator; 028import org.apache.hadoop.hbase.CellUtil; 029import org.apache.hadoop.hbase.ExtendedCell; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.KeyValue; 032import org.apache.hadoop.hbase.KeyValueUtil; 033import org.apache.hadoop.hbase.PrivateCellUtil; 034import org.apache.hadoop.hbase.io.TagCompressionContext; 035import org.apache.hadoop.hbase.io.util.LRUDictionary; 036import org.apache.hadoop.hbase.io.util.StreamUtils; 037import org.apache.hadoop.hbase.nio.ByteBuff; 038import org.apache.hadoop.hbase.util.ByteBufferUtils; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.ClassSize; 041import org.apache.hadoop.hbase.util.ObjectIntPair; 042import org.apache.hadoop.io.WritableUtils; 043import org.apache.yetus.audience.InterfaceAudience; 044 045/** 046 * Base class for all data block encoders that use a buffer. 047 */ 048@InterfaceAudience.Private 049abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder { 050 /** 051 * TODO: This datablockencoder is dealing in internals of hfileblocks. Purge reference to HFBs 052 */ 053 private static int INITIAL_KEY_BUFFER_SIZE = 512; 054 055 @Override 056 public ByteBuffer decodeKeyValues(DataInputStream source, 057 HFileBlockDecodingContext blkDecodingCtx) throws IOException { 058 if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) { 059 throw new IOException(this.getClass().getName() + " only accepts " 060 + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context."); 061 } 062 063 HFileBlockDefaultDecodingContext decodingCtx = 064 (HFileBlockDefaultDecodingContext) blkDecodingCtx; 065 if ( 066 decodingCtx.getHFileContext().isIncludesTags() 067 && decodingCtx.getHFileContext().isCompressTags() 068 ) { 069 if (decodingCtx.getTagCompressionContext() != null) { 070 // It will be overhead to create the TagCompressionContext again and again for every block 071 // decoding. 072 decodingCtx.getTagCompressionContext().clear(); 073 } else { 074 try { 075 TagCompressionContext tagCompressionContext = 076 new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE); 077 decodingCtx.setTagCompressionContext(tagCompressionContext); 078 } catch (Exception e) { 079 throw new IOException("Failed to initialize TagCompressionContext", e); 080 } 081 } 082 } 083 return internalDecodeKeyValues(source, 0, 0, decodingCtx); 084 } 085 086 /********************* common prefixes *************************/ 087 // Having this as static is fine but if META is having DBE then we should 088 // change this. 089 public static int compareCommonRowPrefix(Cell left, Cell right, int rowCommonPrefix) { 090 if (left instanceof ByteBufferExtendedCell) { 091 ByteBufferExtendedCell bbLeft = (ByteBufferExtendedCell) left; 092 if (right instanceof ByteBufferExtendedCell) { 093 ByteBufferExtendedCell bbRight = (ByteBufferExtendedCell) right; 094 return ByteBufferUtils.compareTo(bbLeft.getRowByteBuffer(), 095 bbLeft.getRowPosition() + rowCommonPrefix, left.getRowLength() - rowCommonPrefix, 096 bbRight.getRowByteBuffer(), bbRight.getRowPosition() + rowCommonPrefix, 097 right.getRowLength() - rowCommonPrefix); 098 } else { 099 return ByteBufferUtils.compareTo(bbLeft.getRowByteBuffer(), 100 bbLeft.getRowPosition() + rowCommonPrefix, left.getRowLength() - rowCommonPrefix, 101 right.getRowArray(), right.getRowOffset() + rowCommonPrefix, 102 right.getRowLength() - rowCommonPrefix); 103 } 104 } else { 105 if (right instanceof ByteBufferExtendedCell) { 106 ByteBufferExtendedCell bbRight = (ByteBufferExtendedCell) right; 107 return ByteBufferUtils.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix, 108 left.getRowLength() - rowCommonPrefix, bbRight.getRowByteBuffer(), 109 bbRight.getRowPosition() + rowCommonPrefix, right.getRowLength() - rowCommonPrefix); 110 } else { 111 return Bytes.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix, 112 left.getRowLength() - rowCommonPrefix, right.getRowArray(), 113 right.getRowOffset() + rowCommonPrefix, right.getRowLength() - rowCommonPrefix); 114 } 115 } 116 } 117 118 public static int compareCommonFamilyPrefix(Cell left, Cell right, int familyCommonPrefix) { 119 if (left instanceof ByteBufferExtendedCell) { 120 ByteBufferExtendedCell bbLeft = (ByteBufferExtendedCell) left; 121 if (right instanceof ByteBufferExtendedCell) { 122 ByteBufferExtendedCell bbRight = (ByteBufferExtendedCell) right; 123 return ByteBufferUtils.compareTo(bbLeft.getFamilyByteBuffer(), 124 bbLeft.getFamilyPosition() + familyCommonPrefix, 125 left.getFamilyLength() - familyCommonPrefix, bbRight.getFamilyByteBuffer(), 126 bbRight.getFamilyPosition() + familyCommonPrefix, 127 right.getFamilyLength() - familyCommonPrefix); 128 } else { 129 return ByteBufferUtils.compareTo(bbLeft.getFamilyByteBuffer(), 130 bbLeft.getFamilyPosition() + familyCommonPrefix, 131 left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(), 132 right.getFamilyOffset() + familyCommonPrefix, 133 right.getFamilyLength() - familyCommonPrefix); 134 } 135 } else { 136 if (right instanceof ByteBufferExtendedCell) { 137 ByteBufferExtendedCell bbRight = (ByteBufferExtendedCell) right; 138 return ByteBufferUtils.compareTo(left.getFamilyArray(), 139 left.getFamilyOffset() + familyCommonPrefix, left.getFamilyLength() - familyCommonPrefix, 140 bbRight.getFamilyByteBuffer(), bbRight.getFamilyPosition() + familyCommonPrefix, 141 right.getFamilyLength() - familyCommonPrefix); 142 } else { 143 return Bytes.compareTo(left.getFamilyArray(), left.getFamilyOffset() + familyCommonPrefix, 144 left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(), 145 right.getFamilyOffset() + familyCommonPrefix, 146 right.getFamilyLength() - familyCommonPrefix); 147 } 148 } 149 } 150 151 public static int compareCommonQualifierPrefix(Cell left, Cell right, int qualCommonPrefix) { 152 if (left instanceof ByteBufferExtendedCell) { 153 ByteBufferExtendedCell bbLeft = (ByteBufferExtendedCell) left; 154 if (right instanceof ByteBufferExtendedCell) { 155 ByteBufferExtendedCell bbRight = (ByteBufferExtendedCell) right; 156 return ByteBufferUtils.compareTo(bbLeft.getQualifierByteBuffer(), 157 bbLeft.getQualifierPosition() + qualCommonPrefix, 158 left.getQualifierLength() - qualCommonPrefix, bbRight.getQualifierByteBuffer(), 159 bbRight.getQualifierPosition() + qualCommonPrefix, 160 right.getQualifierLength() - qualCommonPrefix); 161 } else { 162 return ByteBufferUtils.compareTo(bbLeft.getQualifierByteBuffer(), 163 bbLeft.getQualifierPosition() + qualCommonPrefix, 164 left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(), 165 right.getQualifierOffset() + qualCommonPrefix, 166 right.getQualifierLength() - qualCommonPrefix); 167 } 168 } else { 169 if (right instanceof ByteBufferExtendedCell) { 170 ByteBufferExtendedCell bbRight = (ByteBufferExtendedCell) right; 171 return ByteBufferUtils.compareTo(left.getQualifierArray(), 172 left.getQualifierOffset() + qualCommonPrefix, 173 left.getQualifierLength() - qualCommonPrefix, bbRight.getQualifierByteBuffer(), 174 bbRight.getQualifierPosition() + qualCommonPrefix, 175 right.getQualifierLength() - qualCommonPrefix); 176 } else { 177 return Bytes.compareTo(left.getQualifierArray(), 178 left.getQualifierOffset() + qualCommonPrefix, 179 left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(), 180 right.getQualifierOffset() + qualCommonPrefix, 181 right.getQualifierLength() - qualCommonPrefix); 182 } 183 } 184 } 185 186 protected static class SeekerState { 187 protected ByteBuff currentBuffer; 188 protected TagCompressionContext tagCompressionContext; 189 protected int valueOffset = -1; 190 protected int keyLength; 191 protected int valueLength; 192 protected int lastCommonPrefix; 193 protected int tagsLength = 0; 194 protected int tagsOffset = -1; 195 protected int tagsCompressedLength = 0; 196 protected boolean uncompressTags = true; 197 198 /** We need to store a copy of the key. */ 199 protected byte[] keyBuffer = HConstants.EMPTY_BYTE_ARRAY; 200 protected byte[] tagsBuffer = HConstants.EMPTY_BYTE_ARRAY; 201 202 protected long memstoreTS; 203 protected int nextKvOffset; 204 protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue(); 205 // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too 206 // many object creations. 207 private final ObjectIntPair<ByteBuffer> tmpPair; 208 private final boolean includeTags; 209 210 public SeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) { 211 this.tmpPair = tmpPair; 212 this.includeTags = includeTags; 213 } 214 215 protected boolean isValid() { 216 return valueOffset != -1; 217 } 218 219 protected void invalidate() { 220 valueOffset = -1; 221 tagsCompressedLength = 0; 222 currentKey.clear(); 223 uncompressTags = true; 224 currentBuffer = null; 225 } 226 227 protected void ensureSpaceForKey() { 228 if (keyLength > keyBuffer.length) { 229 int newKeyBufferLength = 230 Integer.highestOneBit(Math.max(INITIAL_KEY_BUFFER_SIZE, keyLength) - 1) << 1; 231 byte[] newKeyBuffer = new byte[newKeyBufferLength]; 232 System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length); 233 keyBuffer = newKeyBuffer; 234 } 235 } 236 237 protected void ensureSpaceForTags() { 238 if (tagsLength > tagsBuffer.length) { 239 int newTagsBufferLength = 240 Integer.highestOneBit(Math.max(INITIAL_KEY_BUFFER_SIZE, tagsLength) - 1) << 1; 241 byte[] newTagsBuffer = new byte[newTagsBufferLength]; 242 System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length); 243 tagsBuffer = newTagsBuffer; 244 } 245 } 246 247 protected void setKey(byte[] keyBuffer, long memTS) { 248 currentKey.setKey(keyBuffer, 0, keyLength); 249 memstoreTS = memTS; 250 } 251 252 /** 253 * Copy the state from the next one into this instance (the previous state placeholder). Used to 254 * save the previous state when we are advancing the seeker to the next key/value. 255 */ 256 protected void copyFromNext(SeekerState nextState) { 257 if (keyBuffer.length != nextState.keyBuffer.length) { 258 keyBuffer = nextState.keyBuffer.clone(); 259 } else if (!isValid()) { 260 // Note: we can only call isValid before we override our state, so this 261 // comes before all the assignments at the end of this method. 262 System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0, nextState.keyLength); 263 } else { 264 // don't copy the common prefix between this key and the previous one 265 System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix, keyBuffer, 266 nextState.lastCommonPrefix, nextState.keyLength - nextState.lastCommonPrefix); 267 } 268 currentKey.set(nextState.currentKey); 269 270 valueOffset = nextState.valueOffset; 271 keyLength = nextState.keyLength; 272 valueLength = nextState.valueLength; 273 lastCommonPrefix = nextState.lastCommonPrefix; 274 nextKvOffset = nextState.nextKvOffset; 275 memstoreTS = nextState.memstoreTS; 276 currentBuffer = nextState.currentBuffer; 277 tagsOffset = nextState.tagsOffset; 278 tagsLength = nextState.tagsLength; 279 if (nextState.tagCompressionContext != null) { 280 tagCompressionContext = nextState.tagCompressionContext; 281 } 282 } 283 284 public ExtendedCell toCell() { 285 // Buffer backing the value and tags part from the HFileBlock's buffer 286 // When tag compression in use, this will be only the value bytes area. 287 ByteBuffer valAndTagsBuffer; 288 int vOffset; 289 int valAndTagsLength = this.valueLength; 290 int tagsLenSerializationSize = 0; 291 if (this.includeTags && this.tagCompressionContext == null) { 292 // Include the tags part also. This will be the tags bytes + 2 bytes of for storing tags 293 // length 294 tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + this.valueLength); 295 valAndTagsLength += tagsLenSerializationSize + this.tagsLength; 296 } 297 this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair); 298 valAndTagsBuffer = this.tmpPair.getFirst(); 299 vOffset = this.tmpPair.getSecond();// This is the offset to value part in the BB 300 if (valAndTagsBuffer.hasArray()) { 301 return toOnheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize); 302 } else { 303 return toOffheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize); 304 } 305 } 306 307 private ExtendedCell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset, 308 int tagsLenSerializationSize) { 309 byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY; 310 int tOffset = 0; 311 if (this.includeTags) { 312 if (this.tagCompressionContext == null) { 313 tagsArray = valAndTagsBuffer.array(); 314 tOffset = 315 valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength + tagsLenSerializationSize; 316 } else { 317 tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength); 318 tOffset = 0; 319 } 320 } 321 return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength), 322 currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(), 323 currentKey.getQualifierOffset(), currentKey.getQualifierLength(), currentKey.getTimestamp(), 324 currentKey.getTypeByte(), valAndTagsBuffer.array(), 325 valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength, memstoreTS, tagsArray, tOffset, 326 this.tagsLength); 327 } 328 329 private ExtendedCell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset, 330 int tagsLenSerializationSize) { 331 ByteBuffer tagsBuf = HConstants.EMPTY_BYTE_BUFFER; 332 int tOffset = 0; 333 if (this.includeTags) { 334 if (this.tagCompressionContext == null) { 335 tagsBuf = valAndTagsBuffer; 336 tOffset = vOffset + this.valueLength + tagsLenSerializationSize; 337 } else { 338 tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength)); 339 tOffset = 0; 340 } 341 } 342 return new OffheapDecodedExtendedCell( 343 ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)), currentKey.getRowLength(), 344 currentKey.getFamilyOffset(), currentKey.getFamilyLength(), currentKey.getQualifierOffset(), 345 currentKey.getQualifierLength(), currentKey.getTimestamp(), currentKey.getTypeByte(), 346 valAndTagsBuffer, vOffset, this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength); 347 } 348 } 349 350 /** 351 * Copies only the key part of the keybuffer by doing a deep copy and passes the seeker state 352 * members for taking a clone. Note that the value byte[] part is still pointing to the 353 * currentBuffer and represented by the valueOffset and valueLength 354 */ 355 // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId 356 // there. So this has to be an instance of ExtendedCell. 357 protected static class OnheapDecodedCell implements ExtendedCell { 358 private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT 359 + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT) 360 + Bytes.SIZEOF_SHORT + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY)); 361 private byte[] keyOnlyBuffer; 362 private short rowLength; 363 private int familyOffset; 364 private byte familyLength; 365 private int qualifierOffset; 366 private int qualifierLength; 367 private long timeStamp; 368 private byte typeByte; 369 private byte[] valueBuffer; 370 private int valueOffset; 371 private int valueLength; 372 private byte[] tagsBuffer; 373 private int tagsOffset; 374 private int tagsLength; 375 private long seqId; 376 377 protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset, 378 byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte, 379 byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] tagsBuffer, 380 int tagsOffset, int tagsLength) { 381 this.keyOnlyBuffer = keyBuffer; 382 this.rowLength = rowLength; 383 this.familyOffset = familyOffset; 384 this.familyLength = familyLength; 385 this.qualifierOffset = qualOffset; 386 this.qualifierLength = qualLength; 387 this.timeStamp = timeStamp; 388 this.typeByte = typeByte; 389 this.valueBuffer = valueBuffer; 390 this.valueOffset = valueOffset; 391 this.valueLength = valueLen; 392 this.tagsBuffer = tagsBuffer; 393 this.tagsOffset = tagsOffset; 394 this.tagsLength = tagsLength; 395 setSequenceId(seqId); 396 } 397 398 @Override 399 public byte[] getRowArray() { 400 return keyOnlyBuffer; 401 } 402 403 @Override 404 public byte[] getFamilyArray() { 405 return keyOnlyBuffer; 406 } 407 408 @Override 409 public byte[] getQualifierArray() { 410 return keyOnlyBuffer; 411 } 412 413 @Override 414 public int getRowOffset() { 415 return Bytes.SIZEOF_SHORT; 416 } 417 418 @Override 419 public short getRowLength() { 420 return rowLength; 421 } 422 423 @Override 424 public int getFamilyOffset() { 425 return familyOffset; 426 } 427 428 @Override 429 public byte getFamilyLength() { 430 return familyLength; 431 } 432 433 @Override 434 public int getQualifierOffset() { 435 return qualifierOffset; 436 } 437 438 @Override 439 public int getQualifierLength() { 440 return qualifierLength; 441 } 442 443 @Override 444 public long getTimestamp() { 445 return timeStamp; 446 } 447 448 @Override 449 public byte getTypeByte() { 450 return typeByte; 451 } 452 453 @Override 454 public long getSequenceId() { 455 return seqId; 456 } 457 458 @Override 459 public byte[] getValueArray() { 460 return this.valueBuffer; 461 } 462 463 @Override 464 public int getValueOffset() { 465 return valueOffset; 466 } 467 468 @Override 469 public int getValueLength() { 470 return valueLength; 471 } 472 473 @Override 474 public byte[] getTagsArray() { 475 return this.tagsBuffer; 476 } 477 478 @Override 479 public int getTagsOffset() { 480 return this.tagsOffset; 481 } 482 483 @Override 484 public int getTagsLength() { 485 return tagsLength; 486 } 487 488 @Override 489 public String toString() { 490 return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen=" 491 + getValueLength() + "/seqid=" + seqId; 492 } 493 494 @Override 495 public void setSequenceId(long seqId) { 496 this.seqId = seqId; 497 } 498 499 @Override 500 public long heapSize() { 501 return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength; 502 } 503 504 @Override 505 public int write(OutputStream out, boolean withTags) throws IOException { 506 int lenToWrite = getSerializedSize(withTags); 507 ByteBufferUtils.putInt(out, keyOnlyBuffer.length); 508 ByteBufferUtils.putInt(out, valueLength); 509 // Write key 510 out.write(keyOnlyBuffer); 511 // Write value 512 out.write(this.valueBuffer, this.valueOffset, this.valueLength); 513 if (withTags && this.tagsLength > 0) { 514 // 2 bytes tags length followed by tags bytes 515 // tags length is serialized with 2 bytes only(short way) even if the type is int. 516 // As this is non -ve numbers, we save the sign bit. See HBASE-11437 517 out.write((byte) (0xff & (this.tagsLength >> 8))); 518 out.write((byte) (0xff & this.tagsLength)); 519 out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength); 520 } 521 return lenToWrite; 522 } 523 524 @Override 525 public int getSerializedSize(boolean withTags) { 526 return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength, 527 withTags); 528 } 529 530 @Override 531 public void write(ByteBuffer buf, int offset) { 532 // This is not used in actual flow. Throwing UnsupportedOperationException 533 throw new UnsupportedOperationException(); 534 } 535 536 @Override 537 public void setTimestamp(long ts) throws IOException { 538 // This is not used in actual flow. Throwing UnsupportedOperationException 539 throw new UnsupportedOperationException(); 540 } 541 542 @Override 543 public void setTimestamp(byte[] ts) throws IOException { 544 // This is not used in actual flow. Throwing UnsupportedOperationException 545 throw new UnsupportedOperationException(); 546 } 547 548 @Override 549 public ExtendedCell deepClone() { 550 // This is not used in actual flow. Throwing UnsupportedOperationException 551 throw new UnsupportedOperationException(); 552 } 553 } 554 555 protected static class OffheapDecodedExtendedCell extends ByteBufferExtendedCell { 556 private static final long FIXED_OVERHEAD = 557 (long) ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) 558 + (7 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) 559 + (3 * ClassSize.BYTE_BUFFER); 560 private ByteBuffer keyBuffer; 561 private short rowLength; 562 private int familyOffset; 563 private byte familyLength; 564 private int qualifierOffset; 565 private int qualifierLength; 566 private long timeStamp; 567 private byte typeByte; 568 private ByteBuffer valueBuffer; 569 private int valueOffset; 570 private int valueLength; 571 private ByteBuffer tagsBuffer; 572 private int tagsOffset; 573 private int tagsLength; 574 private long seqId; 575 576 protected OffheapDecodedExtendedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset, 577 byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte, 578 ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer, 579 int tagsOffset, int tagsLength) { 580 // The keyBuffer is always onheap 581 assert keyBuffer.hasArray(); 582 assert keyBuffer.arrayOffset() == 0; 583 this.keyBuffer = keyBuffer; 584 this.rowLength = rowLength; 585 this.familyOffset = familyOffset; 586 this.familyLength = familyLength; 587 this.qualifierOffset = qualOffset; 588 this.qualifierLength = qualLength; 589 this.timeStamp = timeStamp; 590 this.typeByte = typeByte; 591 this.valueBuffer = valueBuffer; 592 this.valueOffset = valueOffset; 593 this.valueLength = valueLen; 594 this.tagsBuffer = tagsBuffer; 595 this.tagsOffset = tagsOffset; 596 this.tagsLength = tagsLength; 597 setSequenceId(seqId); 598 } 599 600 @Override 601 @SuppressWarnings("ByteBufferBackingArray") 602 public byte[] getRowArray() { 603 return this.keyBuffer.array(); 604 } 605 606 @Override 607 public int getRowOffset() { 608 return getRowPosition(); 609 } 610 611 @Override 612 public short getRowLength() { 613 return this.rowLength; 614 } 615 616 @Override 617 @SuppressWarnings("ByteBufferBackingArray") 618 public byte[] getFamilyArray() { 619 return this.keyBuffer.array(); 620 } 621 622 @Override 623 public int getFamilyOffset() { 624 return getFamilyPosition(); 625 } 626 627 @Override 628 public byte getFamilyLength() { 629 return this.familyLength; 630 } 631 632 @Override 633 @SuppressWarnings("ByteBufferBackingArray") 634 public byte[] getQualifierArray() { 635 return this.keyBuffer.array(); 636 } 637 638 @Override 639 public int getQualifierOffset() { 640 return getQualifierPosition(); 641 } 642 643 @Override 644 public int getQualifierLength() { 645 return this.qualifierLength; 646 } 647 648 @Override 649 public long getTimestamp() { 650 return this.timeStamp; 651 } 652 653 @Override 654 public byte getTypeByte() { 655 return this.typeByte; 656 } 657 658 @Override 659 public long getSequenceId() { 660 return this.seqId; 661 } 662 663 @Override 664 public byte[] getValueArray() { 665 return CellUtil.cloneValue(this); 666 } 667 668 @Override 669 public int getValueOffset() { 670 return 0; 671 } 672 673 @Override 674 public int getValueLength() { 675 return this.valueLength; 676 } 677 678 @Override 679 public byte[] getTagsArray() { 680 return PrivateCellUtil.cloneTags(this); 681 } 682 683 @Override 684 public int getTagsOffset() { 685 return 0; 686 } 687 688 @Override 689 public int getTagsLength() { 690 return this.tagsLength; 691 } 692 693 @Override 694 public ByteBuffer getRowByteBuffer() { 695 return this.keyBuffer; 696 } 697 698 @Override 699 public int getRowPosition() { 700 return Bytes.SIZEOF_SHORT; 701 } 702 703 @Override 704 public ByteBuffer getFamilyByteBuffer() { 705 return this.keyBuffer; 706 } 707 708 @Override 709 public int getFamilyPosition() { 710 return this.familyOffset; 711 } 712 713 @Override 714 public ByteBuffer getQualifierByteBuffer() { 715 return this.keyBuffer; 716 } 717 718 @Override 719 public int getQualifierPosition() { 720 return this.qualifierOffset; 721 } 722 723 @Override 724 public ByteBuffer getValueByteBuffer() { 725 return this.valueBuffer; 726 } 727 728 @Override 729 public int getValuePosition() { 730 return this.valueOffset; 731 } 732 733 @Override 734 public ByteBuffer getTagsByteBuffer() { 735 return this.tagsBuffer; 736 } 737 738 @Override 739 public int getTagsPosition() { 740 return this.tagsOffset; 741 } 742 743 @Override 744 public long heapSize() { 745 return FIXED_OVERHEAD; 746 } 747 748 @Override 749 public void setSequenceId(long seqId) { 750 this.seqId = seqId; 751 } 752 753 @Override 754 public int write(OutputStream out, boolean withTags) throws IOException { 755 int lenToWrite = getSerializedSize(withTags); 756 ByteBufferUtils.putInt(out, keyBuffer.remaining()); 757 ByteBufferUtils.putInt(out, valueLength); 758 // Write key 759 out.write(keyBuffer.array(), keyBuffer.arrayOffset(), keyBuffer.remaining()); 760 // Write value 761 ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset, this.valueLength); 762 if (withTags && this.tagsLength > 0) { 763 // 2 bytes tags length followed by tags bytes 764 // tags length is serialized with 2 bytes only(short way) even if the type is int. 765 // As this is non -ve numbers, we save the sign bit. See HBASE-11437 766 out.write((byte) (0xff & (this.tagsLength >> 8))); 767 out.write((byte) (0xff & this.tagsLength)); 768 ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength); 769 } 770 return lenToWrite; 771 } 772 773 @Override 774 public int getSerializedSize(boolean withTags) { 775 return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength, 776 withTags); 777 } 778 779 @Override 780 public void setTimestamp(long ts) throws IOException { 781 // This is not used in actual flow. Throwing UnsupportedOperationException 782 throw new UnsupportedOperationException(); 783 } 784 785 @Override 786 public void setTimestamp(byte[] ts) throws IOException { 787 // This is not used in actual flow. Throwing UnsupportedOperationException 788 throw new UnsupportedOperationException(); 789 } 790 791 @Override 792 public void write(ByteBuffer buf, int offset) { 793 // This is not used in actual flow. Throwing UnsupportedOperationException 794 throw new UnsupportedOperationException(); 795 } 796 797 @Override 798 public ExtendedCell deepClone() { 799 // This is not used in actual flow. Throwing UnsupportedOperationException 800 throw new UnsupportedOperationException(); 801 } 802 } 803 804 protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState> 805 extends AbstractEncodedSeeker { 806 protected ByteBuff currentBuffer; 807 protected TagCompressionContext tagCompressionContext = null; 808 protected KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue(); 809 // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too 810 // many object creations. 811 protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>(); 812 protected STATE current, previous; 813 814 public BufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) { 815 super(decodingCtx); 816 if (decodingCtx.getHFileContext().isCompressTags()) { 817 try { 818 tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE); 819 } catch (Exception e) { 820 throw new RuntimeException("Failed to initialize TagCompressionContext", e); 821 } 822 } 823 current = createSeekerState(); // always valid 824 previous = createSeekerState(); // may not be valid 825 } 826 827 @Override 828 public int compareKey(CellComparator comparator, ExtendedCell key) { 829 keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength); 830 return PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, keyOnlyKV); 831 } 832 833 @Override 834 public void setCurrentBuffer(ByteBuff buffer) { 835 if (this.tagCompressionContext != null) { 836 this.tagCompressionContext.clear(); 837 838 // Prior seekToKeyInBlock may have reset this to false if we fell back to previous 839 // seeker state. This is an optimization so we don't have to uncompress tags again when 840 // reading last state. 841 // In seekBefore flow, if block change happens then rewind is not called and 842 // setCurrentBuffer is called, so need to uncompress any tags we see. 843 current.uncompressTags = true; 844 } 845 currentBuffer = buffer; 846 current.currentBuffer = currentBuffer; 847 if (tagCompressionContext != null) { 848 current.tagCompressionContext = tagCompressionContext; 849 } 850 decodeFirst(); 851 current.setKey(current.keyBuffer, current.memstoreTS); 852 previous.invalidate(); 853 } 854 855 @Override 856 public ExtendedCell getKey() { 857 byte[] key = new byte[current.keyLength]; 858 System.arraycopy(current.keyBuffer, 0, key, 0, current.keyLength); 859 return new KeyValue.KeyOnlyKeyValue(key); 860 } 861 862 @Override 863 public ByteBuffer getValueShallowCopy() { 864 currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair); 865 ByteBuffer dup = tmpPair.getFirst().duplicate(); 866 dup.position(tmpPair.getSecond()); 867 dup.limit(tmpPair.getSecond() + current.valueLength); 868 return dup.slice(); 869 } 870 871 @Override 872 public ExtendedCell getCell() { 873 return current.toCell(); 874 } 875 876 @Override 877 public void rewind() { 878 currentBuffer.rewind(); 879 if (tagCompressionContext != null) { 880 tagCompressionContext.clear(); 881 // Prior seekToKeyInBlock may have reset this to false if we fell back to previous 882 // seeker state. This is an optimization so we don't have to uncompress tags again when 883 // reading last state. 884 // In case of rewind, we are starting from the beginning of the buffer, so we need 885 // to uncompress any tags we see. 886 current.uncompressTags = true; 887 } 888 decodeFirst(); 889 current.setKey(current.keyBuffer, current.memstoreTS); 890 previous.invalidate(); 891 } 892 893 @Override 894 public boolean next() { 895 if (!currentBuffer.hasRemaining()) { 896 return false; 897 } 898 decodeNext(); 899 current.setKey(current.keyBuffer, current.memstoreTS); 900 previous.invalidate(); 901 return true; 902 } 903 904 protected void decodeTags() { 905 current.tagsLength = ByteBuff.readCompressedInt(currentBuffer); 906 if (tagCompressionContext != null) { 907 if (current.uncompressTags) { 908 // Tag compression is been used. uncompress it into tagsBuffer 909 current.ensureSpaceForTags(); 910 try { 911 current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer, 912 current.tagsBuffer, 0, current.tagsLength); 913 } catch (Exception e) { 914 throw new RuntimeException("Exception while uncompressing tags", e); 915 } 916 } else { 917 currentBuffer.skip(current.tagsCompressedLength); 918 current.uncompressTags = true;// Reset this. 919 } 920 current.tagsOffset = -1; 921 } else { 922 // When tag compress is not used, let us not do copying of tags bytes into tagsBuffer. 923 // Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer() 924 current.tagsOffset = currentBuffer.position(); 925 currentBuffer.skip(current.tagsLength); 926 } 927 } 928 929 @Override 930 public int seekToKeyInBlock(ExtendedCell seekCell, boolean seekBefore) { 931 int rowCommonPrefix = 0; 932 int familyCommonPrefix = 0; 933 int qualCommonPrefix = 0; 934 previous.invalidate(); 935 do { 936 int comp; 937 keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength); 938 if (current.lastCommonPrefix != 0) { 939 // The KV format has row key length also in the byte array. The 940 // common prefix 941 // includes it. So we need to subtract to find out the common prefix 942 // in the 943 // row part alone 944 rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2); 945 } 946 if (current.lastCommonPrefix <= 2) { 947 rowCommonPrefix = 0; 948 } 949 rowCommonPrefix += findCommonPrefixInRowPart(seekCell, keyOnlyKV, rowCommonPrefix); 950 comp = compareCommonRowPrefix(seekCell, keyOnlyKV, rowCommonPrefix); 951 if (comp == 0) { 952 comp = compareTypeBytes(seekCell, keyOnlyKV); 953 if (comp == 0) { 954 // Subtract the fixed row key length and the family key fixed length 955 familyCommonPrefix = Math.max(0, Math.min(familyCommonPrefix, 956 current.lastCommonPrefix - (3 + keyOnlyKV.getRowLength()))); 957 familyCommonPrefix += 958 findCommonPrefixInFamilyPart(seekCell, keyOnlyKV, familyCommonPrefix); 959 comp = compareCommonFamilyPrefix(seekCell, keyOnlyKV, familyCommonPrefix); 960 if (comp == 0) { 961 // subtract the rowkey fixed length and the family key fixed 962 // length 963 qualCommonPrefix = Math.max(0, Math.min(qualCommonPrefix, current.lastCommonPrefix 964 - (3 + keyOnlyKV.getRowLength() + keyOnlyKV.getFamilyLength()))); 965 qualCommonPrefix += 966 findCommonPrefixInQualifierPart(seekCell, keyOnlyKV, qualCommonPrefix); 967 comp = compareCommonQualifierPrefix(seekCell, keyOnlyKV, qualCommonPrefix); 968 if (comp == 0) { 969 comp = CellComparator.getInstance().compareTimestamps(seekCell, keyOnlyKV); 970 if (comp == 0) { 971 // Compare types. Let the delete types sort ahead of puts; 972 // i.e. types 973 // of higher numbers sort before those of lesser numbers. 974 // Maximum 975 // (255) 976 // appears ahead of everything, and minimum (0) appears 977 // after 978 // everything. 979 comp = (0xff & keyOnlyKV.getTypeByte()) - (0xff & seekCell.getTypeByte()); 980 } 981 } 982 } 983 } 984 } 985 if (comp == 0) { // exact match 986 if (seekBefore) { 987 if (!previous.isValid()) { 988 // The caller (seekBefore) has to ensure that we are not at the 989 // first key in the block. 990 throw new IllegalStateException( 991 "Cannot seekBefore if " + "positioned at the first key in the block: key=" 992 + Bytes.toStringBinary(seekCell.getRowArray())); 993 } 994 moveToPrevious(); 995 return 1; 996 } 997 return 0; 998 } 999 1000 if (comp < 0) { // already too large, check previous 1001 if (previous.isValid()) { 1002 moveToPrevious(); 1003 } else { 1004 return HConstants.INDEX_KEY_MAGIC; // using optimized index key 1005 } 1006 return 1; 1007 } 1008 1009 // move to next, if more data is available 1010 if (currentBuffer.hasRemaining()) { 1011 previous.copyFromNext(current); 1012 decodeNext(); 1013 current.setKey(current.keyBuffer, current.memstoreTS); 1014 } else { 1015 break; 1016 } 1017 } while (true); 1018 1019 // we hit the end of the block, not an exact match 1020 return 1; 1021 } 1022 1023 private int compareTypeBytes(ExtendedCell key, ExtendedCell right) { 1024 if ( 1025 key.getFamilyLength() + key.getQualifierLength() == 0 1026 && key.getTypeByte() == KeyValue.Type.Minimum.getCode() 1027 ) { 1028 // left is "bigger", i.e. it appears later in the sorted order 1029 return 1; 1030 } 1031 if ( 1032 right.getFamilyLength() + right.getQualifierLength() == 0 1033 && right.getTypeByte() == KeyValue.Type.Minimum.getCode() 1034 ) { 1035 return -1; 1036 } 1037 return 0; 1038 } 1039 1040 // These findCommonPrefix* methods rely on the fact that keyOnlyKv is the "right" cell argument 1041 // and always on-heap 1042 1043 private static int findCommonPrefixInRowPart(Cell left, KeyValue.KeyOnlyKeyValue right, 1044 int rowCommonPrefix) { 1045 if (left instanceof ByteBufferExtendedCell) { 1046 ByteBufferExtendedCell bbLeft = (ByteBufferExtendedCell) left; 1047 return ByteBufferUtils.findCommonPrefix(bbLeft.getRowByteBuffer(), 1048 bbLeft.getRowPosition() + rowCommonPrefix, left.getRowLength() - rowCommonPrefix, 1049 right.getRowArray(), right.getRowOffset() + rowCommonPrefix, 1050 right.getRowLength() - rowCommonPrefix); 1051 } else { 1052 return Bytes.findCommonPrefix(left.getRowArray(), right.getRowArray(), 1053 left.getRowLength() - rowCommonPrefix, right.getRowLength() - rowCommonPrefix, 1054 left.getRowOffset() + rowCommonPrefix, right.getRowOffset() + rowCommonPrefix); 1055 } 1056 } 1057 1058 private static int findCommonPrefixInFamilyPart(Cell left, KeyValue.KeyOnlyKeyValue right, 1059 int familyCommonPrefix) { 1060 if (left instanceof ByteBufferExtendedCell) { 1061 ByteBufferExtendedCell bbLeft = (ByteBufferExtendedCell) left; 1062 return ByteBufferUtils.findCommonPrefix(bbLeft.getFamilyByteBuffer(), 1063 bbLeft.getFamilyPosition() + familyCommonPrefix, 1064 left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(), 1065 right.getFamilyOffset() + familyCommonPrefix, 1066 right.getFamilyLength() - familyCommonPrefix); 1067 } else { 1068 return Bytes.findCommonPrefix(left.getFamilyArray(), right.getFamilyArray(), 1069 left.getFamilyLength() - familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix, 1070 left.getFamilyOffset() + familyCommonPrefix, 1071 right.getFamilyOffset() + familyCommonPrefix); 1072 } 1073 } 1074 1075 private static int findCommonPrefixInQualifierPart(Cell left, KeyValue.KeyOnlyKeyValue right, 1076 int qualifierCommonPrefix) { 1077 if (left instanceof ByteBufferExtendedCell) { 1078 ByteBufferExtendedCell bbLeft = (ByteBufferExtendedCell) left; 1079 return ByteBufferUtils.findCommonPrefix(bbLeft.getQualifierByteBuffer(), 1080 bbLeft.getQualifierPosition() + qualifierCommonPrefix, 1081 left.getQualifierLength() - qualifierCommonPrefix, right.getQualifierArray(), 1082 right.getQualifierOffset() + qualifierCommonPrefix, 1083 right.getQualifierLength() - qualifierCommonPrefix); 1084 } else { 1085 return Bytes.findCommonPrefix(left.getQualifierArray(), right.getQualifierArray(), 1086 left.getQualifierLength() - qualifierCommonPrefix, 1087 right.getQualifierLength() - qualifierCommonPrefix, 1088 left.getQualifierOffset() + qualifierCommonPrefix, 1089 right.getQualifierOffset() + qualifierCommonPrefix); 1090 } 1091 } 1092 1093 private void moveToPrevious() { 1094 if (!previous.isValid()) { 1095 throw new IllegalStateException( 1096 "Can move back only once and not in first key in the block."); 1097 } 1098 1099 STATE tmp = previous; 1100 previous = current; 1101 current = tmp; 1102 1103 // move after last key value 1104 currentBuffer.position(current.nextKvOffset); 1105 // Already decoded the tag bytes. We cache this tags into current state and also the total 1106 // compressed length of the tags bytes. For the next time decodeNext() we don't need to decode 1107 // the tags again. This might pollute the Data Dictionary what we use for the compression. 1108 // When current.uncompressTags is false, we will just reuse the current.tagsBuffer and skip 1109 // 'tagsCompressedLength' bytes of source stream. 1110 // See in decodeTags() 1111 current.tagsBuffer = previous.tagsBuffer; 1112 current.tagsCompressedLength = previous.tagsCompressedLength; 1113 current.uncompressTags = false; 1114 // The current key has to be reset with the previous Cell 1115 current.setKey(current.keyBuffer, current.memstoreTS); 1116 previous.invalidate(); 1117 } 1118 1119 @SuppressWarnings("unchecked") 1120 protected STATE createSeekerState() { 1121 // This will fail for non-default seeker state if the subclass does not 1122 // override this method. 1123 return (STATE) new SeekerState(this.tmpPair, this.includesTags()); 1124 } 1125 1126 abstract protected void decodeFirst(); 1127 1128 abstract protected void decodeNext(); 1129 } 1130 1131 /** Returns unencoded size added */ 1132 protected final int afterEncodingKeyValue(ExtendedCell cell, DataOutputStream out, 1133 HFileBlockDefaultEncodingContext encodingCtx) throws IOException { 1134 int size = 0; 1135 if (encodingCtx.getHFileContext().isIncludesTags()) { 1136 int tagsLength = cell.getTagsLength(); 1137 ByteBufferUtils.putCompressedInt(out, tagsLength); 1138 // There are some tags to be written 1139 if (tagsLength > 0) { 1140 TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext(); 1141 // When tag compression is enabled, tagCompressionContext will have a not null value. Write 1142 // the tags using Dictionary compression in such a case 1143 if (tagCompressionContext != null) { 1144 // Not passing tagsLength considering that parsing of the tagsLength is not costly 1145 PrivateCellUtil.compressTags(out, cell, tagCompressionContext); 1146 } else { 1147 PrivateCellUtil.writeTags(out, cell, tagsLength); 1148 } 1149 } 1150 size += tagsLength + KeyValue.TAGS_LENGTH_SIZE; 1151 } 1152 if (encodingCtx.getHFileContext().isIncludesMvcc()) { 1153 // Copy memstore timestamp from the byte buffer to the output stream. 1154 long memstoreTS = cell.getSequenceId(); 1155 WritableUtils.writeVLong(out, memstoreTS); 1156 // TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be 1157 // avoided. 1158 size += WritableUtils.getVIntSize(memstoreTS); 1159 } 1160 return size; 1161 } 1162 1163 protected final void afterDecodingKeyValue(DataInputStream source, ByteBuffer dest, 1164 HFileBlockDefaultDecodingContext decodingCtx) throws IOException { 1165 if (decodingCtx.getHFileContext().isIncludesTags()) { 1166 int tagsLength = ByteBufferUtils.readCompressedInt(source); 1167 // Put as unsigned short 1168 dest.put((byte) ((tagsLength >> 8) & 0xff)); 1169 dest.put((byte) (tagsLength & 0xff)); 1170 if (tagsLength > 0) { 1171 TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext(); 1172 // When tag compression is been used in this file, tagCompressionContext will have a not 1173 // null value passed. 1174 if (tagCompressionContext != null) { 1175 tagCompressionContext.uncompressTags(source, dest, tagsLength); 1176 } else { 1177 ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength); 1178 } 1179 } 1180 } 1181 if (decodingCtx.getHFileContext().isIncludesMvcc()) { 1182 long memstoreTS = -1; 1183 try { 1184 // Copy memstore timestamp from the data input stream to the byte 1185 // buffer. 1186 memstoreTS = WritableUtils.readVLong(source); 1187 ByteBufferUtils.writeVLong(dest, memstoreTS); 1188 } catch (IOException ex) { 1189 throw new RuntimeException( 1190 "Unable to copy memstore timestamp " + memstoreTS + " after decoding a key/value"); 1191 } 1192 } 1193 } 1194 1195 protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source, 1196 int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) 1197 throws IOException; 1198 1199 /** 1200 * Asserts that there is at least the given amount of unfilled space remaining in the given 1201 * buffer. 1202 * @param out typically, the buffer we are writing to 1203 * @param length the required space in the buffer 1204 * @throws EncoderBufferTooSmallException If there are no enough bytes. 1205 */ 1206 protected static void ensureSpace(ByteBuffer out, int length) 1207 throws EncoderBufferTooSmallException { 1208 if (out.position() + length > out.limit()) { 1209 throw new EncoderBufferTooSmallException("Buffer position=" + out.position() 1210 + ", buffer limit=" + out.limit() + ", length to be written=" + length); 1211 } 1212 } 1213 1214 @Override 1215 public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out) 1216 throws IOException { 1217 if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) { 1218 throw new IOException(this.getClass().getName() + " only accepts " 1219 + HFileBlockDefaultEncodingContext.class.getName() + " as the " + "encoding context."); 1220 } 1221 1222 HFileBlockDefaultEncodingContext encodingCtx = 1223 (HFileBlockDefaultEncodingContext) blkEncodingCtx; 1224 encodingCtx.prepareEncoding(out); 1225 if ( 1226 encodingCtx.getHFileContext().isIncludesTags() 1227 && encodingCtx.getHFileContext().isCompressTags() 1228 ) { 1229 if (encodingCtx.getTagCompressionContext() != null) { 1230 // It will be overhead to create the TagCompressionContext again and again for every block 1231 // encoding. 1232 encodingCtx.getTagCompressionContext().clear(); 1233 } else { 1234 try { 1235 TagCompressionContext tagCompressionContext = 1236 new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE); 1237 encodingCtx.setTagCompressionContext(tagCompressionContext); 1238 } catch (Exception e) { 1239 throw new IOException("Failed to initialize TagCompressionContext", e); 1240 } 1241 } 1242 } 1243 StreamUtils.writeInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding() 1244 blkEncodingCtx.setEncodingState(new EncodingState()); 1245 } 1246 1247 @Override 1248 public void encode(ExtendedCell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out) 1249 throws IOException { 1250 EncodingState state = encodingCtx.getEncodingState(); 1251 int posBeforeEncode = out.size(); 1252 int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out); 1253 state.postCellEncode(encodedKvSize, out.size() - posBeforeEncode); 1254 } 1255 1256 public abstract int internalEncode(ExtendedCell cell, 1257 HFileBlockDefaultEncodingContext encodingCtx, DataOutputStream out) throws IOException; 1258 1259 @Override 1260 public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out, 1261 byte[] uncompressedBytesWithHeader) throws IOException { 1262 EncodingState state = encodingCtx.getEncodingState(); 1263 // Write the unencodedDataSizeWritten (with header size) 1264 Bytes.putInt(uncompressedBytesWithHeader, 1265 HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE, 1266 state.getUnencodedDataSizeWritten()); 1267 postEncoding(encodingCtx); 1268 } 1269 1270}