001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.nio; 019 020import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE; 021 022import java.io.IOException; 023import java.nio.BufferOverflowException; 024import java.nio.BufferUnderflowException; 025import java.nio.ByteBuffer; 026import java.nio.InvalidMarkException; 027import java.nio.channels.FileChannel; 028import java.nio.channels.ReadableByteChannel; 029import java.util.Iterator; 030import java.util.NoSuchElementException; 031import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 032import org.apache.hadoop.hbase.util.ByteBufferUtils; 033import org.apache.hadoop.hbase.util.Bytes; 034import org.apache.hadoop.hbase.util.ObjectIntPair; 035import org.apache.yetus.audience.InterfaceAudience; 036 037/** 038 * Provides a unified view of all the underlying ByteBuffers and will look as if a bigger sequential 039 * buffer. This class provides similar APIs as in {@link ByteBuffer} to put/get int, short, long etc 040 * and doing operations like mark, reset, slice etc. 
This has to be used when data is split across 041 * multiple byte buffers and we don't want copy them to single buffer for reading from it. 042 */ 043@InterfaceAudience.Private 044public class MultiByteBuff extends ByteBuff { 045 046 private final ByteBuffer[] items; 047 // Pointer to the current item in the MBB 048 private ByteBuffer curItem = null; 049 // Index of the current item in the MBB 050 private int curItemIndex = 0; 051 052 private int limit = 0; 053 private int limitedItemIndex; 054 private int markedItemIndex = -1; 055 private final int[] itemBeginPos; 056 057 private Iterator<ByteBuffer> buffsIterator = new Iterator<ByteBuffer>() { 058 @Override 059 public boolean hasNext() { 060 return curItemIndex < limitedItemIndex 061 || (curItemIndex == limitedItemIndex && items[curItemIndex].hasRemaining()); 062 } 063 064 @Override 065 public ByteBuffer next() { 066 if (curItemIndex >= items.length) { 067 throw new NoSuchElementException("items overflow"); 068 } 069 curItem = items[curItemIndex++]; 070 return curItem; 071 } 072 }; 073 074 public MultiByteBuff(ByteBuffer... items) { 075 this(NONE, items); 076 } 077 078 public MultiByteBuff(Recycler recycler, ByteBuffer... items) { 079 this(new RefCnt(recycler), items); 080 } 081 082 MultiByteBuff(RefCnt refCnt, ByteBuffer... items) { 083 this.refCnt = refCnt; 084 assert items != null; 085 assert items.length > 0; 086 this.items = items; 087 this.curItem = this.items[this.curItemIndex]; 088 // See below optimization in getInt(int) where we check whether the given index land in current 089 // item. For this we need to check whether the passed index is less than the next item begin 090 // offset. To handle this effectively for the last item buffer, we add an extra item into this 091 // array. 
092 itemBeginPos = new int[items.length + 1]; 093 int offset = 0; 094 for (int i = 0; i < items.length; i++) { 095 ByteBuffer item = items[i]; 096 item.rewind(); 097 itemBeginPos[i] = offset; 098 int l = item.limit() - item.position(); 099 offset += l; 100 } 101 this.limit = offset; 102 this.itemBeginPos[items.length] = offset + 1; 103 this.limitedItemIndex = this.items.length - 1; 104 } 105 106 private MultiByteBuff(RefCnt refCnt, ByteBuffer[] items, int[] itemBeginPos, int limit, 107 int limitedIndex, int curItemIndex, int markedIndex) { 108 this.refCnt = refCnt; 109 this.items = items; 110 this.curItemIndex = curItemIndex; 111 this.curItem = this.items[this.curItemIndex]; 112 this.itemBeginPos = itemBeginPos; 113 this.limit = limit; 114 this.limitedItemIndex = limitedIndex; 115 this.markedItemIndex = markedIndex; 116 } 117 118 /** 119 * @throws UnsupportedOperationException MBB does not support array based operations 120 */ 121 @Override 122 public byte[] array() { 123 throw new UnsupportedOperationException(); 124 } 125 126 /** 127 * @throws UnsupportedOperationException MBB does not support array based operations 128 */ 129 @Override 130 public int arrayOffset() { 131 throw new UnsupportedOperationException(); 132 } 133 134 /** Returns false. MBB does not support array based operations */ 135 @Override 136 public boolean hasArray() { 137 return false; 138 } 139 140 /** Returns the total capacity of this MultiByteBuffer. */ 141 @Override 142 public int capacity() { 143 checkRefCount(); 144 int c = 0; 145 for (ByteBuffer item : this.items) { 146 c += item.capacity(); 147 } 148 return c; 149 } 150 151 /** 152 * Fetches the byte at the given index. 
Does not change position of the underlying ByteBuffers 153 * @return the byte at the given index 154 */ 155 @Override 156 public byte get(int index) { 157 checkRefCount(); 158 int itemIndex = getItemIndex(index); 159 return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]); 160 } 161 162 @Override 163 public byte getByteAfterPosition(int offset) { 164 checkRefCount(); 165 // Mostly the index specified will land within this current item. Short circuit for that 166 int index = offset + this.position(); 167 int itemIndex = getItemIndexFromCurItemIndex(index); 168 return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]); 169 } 170 171 /* 172 * Returns in which sub ByteBuffer, the given element index will be available. 173 */ 174 private int getItemIndex(int elemIndex) { 175 if (elemIndex < 0) { 176 throw new IndexOutOfBoundsException(); 177 } 178 int index = 1; 179 while (elemIndex >= this.itemBeginPos[index]) { 180 index++; 181 if (index == this.itemBeginPos.length) { 182 throw new IndexOutOfBoundsException(); 183 } 184 } 185 return index - 1; 186 } 187 188 /* 189 * Returns in which sub ByteBuffer, the given element index will be available. In this case we are 190 * sure that the item will be after MBB's current position 191 */ 192 private int getItemIndexFromCurItemIndex(int elemIndex) { 193 int index = this.curItemIndex; 194 while (elemIndex >= this.itemBeginPos[index]) { 195 index++; 196 if (index == this.itemBeginPos.length) { 197 throw new IndexOutOfBoundsException(); 198 } 199 } 200 return index - 1; 201 } 202 203 /** 204 * Fetches the int at the given index. Does not change position of the underlying ByteBuffers 205 * @return the int value at the given index 206 */ 207 @Override 208 public int getInt(int index) { 209 checkRefCount(); 210 // Mostly the index specified will land within this current item. 
Short circuit for that 211 int itemIndex; 212 if ( 213 this.itemBeginPos[this.curItemIndex] <= index 214 && this.itemBeginPos[this.curItemIndex + 1] > index 215 ) { 216 itemIndex = this.curItemIndex; 217 } else { 218 itemIndex = getItemIndex(index); 219 } 220 return getInt(index, itemIndex); 221 } 222 223 @Override 224 public int getIntAfterPosition(int offset) { 225 checkRefCount(); 226 // Mostly the index specified will land within this current item. Short circuit for that 227 int index = offset + this.position(); 228 int itemIndex; 229 if (this.itemBeginPos[this.curItemIndex + 1] > index) { 230 itemIndex = this.curItemIndex; 231 } else { 232 itemIndex = getItemIndexFromCurItemIndex(index); 233 } 234 return getInt(index, itemIndex); 235 } 236 237 /** 238 * Fetches the short at the given index. Does not change position of the underlying ByteBuffers 239 * @return the short value at the given index 240 */ 241 @Override 242 public short getShort(int index) { 243 checkRefCount(); 244 // Mostly the index specified will land within this current item. Short circuit for that 245 int itemIndex; 246 if ( 247 this.itemBeginPos[this.curItemIndex] <= index 248 && this.itemBeginPos[this.curItemIndex + 1] > index 249 ) { 250 itemIndex = this.curItemIndex; 251 } else { 252 itemIndex = getItemIndex(index); 253 } 254 ByteBuffer item = items[itemIndex]; 255 int offsetInItem = index - this.itemBeginPos[itemIndex]; 256 if (item.limit() - offsetInItem >= Bytes.SIZEOF_SHORT) { 257 return ByteBufferUtils.toShort(item, offsetInItem); 258 } 259 if (items.length - 1 == itemIndex) { 260 // means cur item is the last one and we wont be able to read a int. 
Throw exception 261 throw new BufferUnderflowException(); 262 } 263 ByteBuffer nextItem = items[itemIndex + 1]; 264 // Get available one byte from this item and remaining one from next 265 short n = 0; 266 n = (short) (n ^ (ByteBufferUtils.toByte(item, offsetInItem) & 0xFF)); 267 n = (short) (n << 8); 268 n = (short) (n ^ (ByteBufferUtils.toByte(nextItem, 0) & 0xFF)); 269 return n; 270 } 271 272 @Override 273 public short getShortAfterPosition(int offset) { 274 checkRefCount(); 275 // Mostly the index specified will land within this current item. Short circuit for that 276 int index = offset + this.position(); 277 int itemIndex; 278 if (this.itemBeginPos[this.curItemIndex + 1] > index) { 279 itemIndex = this.curItemIndex; 280 } else { 281 itemIndex = getItemIndexFromCurItemIndex(index); 282 } 283 return getShort(index, itemIndex); 284 } 285 286 private int getInt(int index, int itemIndex) { 287 ByteBuffer item = items[itemIndex]; 288 int offsetInItem = index - this.itemBeginPos[itemIndex]; 289 int remainingLen = item.limit() - offsetInItem; 290 if (remainingLen >= Bytes.SIZEOF_INT) { 291 return ByteBufferUtils.toInt(item, offsetInItem); 292 } 293 if (items.length - 1 == itemIndex) { 294 // means cur item is the last one and we wont be able to read a int. Throw exception 295 throw new BufferUnderflowException(); 296 } 297 int l = 0; 298 for (int i = 0; i < Bytes.SIZEOF_INT; i++) { 299 l <<= 8; 300 l ^= get(index + i) & 0xFF; 301 } 302 return l; 303 } 304 305 private short getShort(int index, int itemIndex) { 306 ByteBuffer item = items[itemIndex]; 307 int offsetInItem = index - this.itemBeginPos[itemIndex]; 308 int remainingLen = item.limit() - offsetInItem; 309 if (remainingLen >= Bytes.SIZEOF_SHORT) { 310 return ByteBufferUtils.toShort(item, offsetInItem); 311 } 312 if (items.length - 1 == itemIndex) { 313 // means cur item is the last one and we wont be able to read a short. 
Throw exception 314 throw new BufferUnderflowException(); 315 } 316 ByteBuffer nextItem = items[itemIndex + 1]; 317 // Get available bytes from this item and remaining from next 318 short l = 0; 319 for (int i = offsetInItem; i < item.capacity(); i++) { 320 l = (short) (l << 8); 321 l = (short) (l ^ (ByteBufferUtils.toByte(item, i) & 0xFF)); 322 } 323 for (int i = 0; i < Bytes.SIZEOF_SHORT - remainingLen; i++) { 324 l = (short) (l << 8); 325 l = (short) (l ^ (ByteBufferUtils.toByte(nextItem, i) & 0xFF)); 326 } 327 return l; 328 } 329 330 private long getLong(int index, int itemIndex) { 331 ByteBuffer item = items[itemIndex]; 332 int offsetInItem = index - this.itemBeginPos[itemIndex]; 333 int remainingLen = item.limit() - offsetInItem; 334 if (remainingLen >= Bytes.SIZEOF_LONG) { 335 return ByteBufferUtils.toLong(item, offsetInItem); 336 } 337 if (items.length - 1 == itemIndex) { 338 // means cur item is the last one and we wont be able to read a long. Throw exception 339 throw new BufferUnderflowException(); 340 } 341 long l = 0; 342 for (int i = 0; i < Bytes.SIZEOF_LONG; i++) { 343 l <<= 8; 344 l ^= get(index + i) & 0xFF; 345 } 346 return l; 347 } 348 349 /** 350 * Fetches the long at the given index. Does not change position of the underlying ByteBuffers 351 * @return the long value at the given index 352 */ 353 @Override 354 public long getLong(int index) { 355 checkRefCount(); 356 // Mostly the index specified will land within this current item. Short circuit for that 357 int itemIndex; 358 if ( 359 this.itemBeginPos[this.curItemIndex] <= index 360 && this.itemBeginPos[this.curItemIndex + 1] > index 361 ) { 362 itemIndex = this.curItemIndex; 363 } else { 364 itemIndex = getItemIndex(index); 365 } 366 return getLong(index, itemIndex); 367 } 368 369 @Override 370 public long getLongAfterPosition(int offset) { 371 checkRefCount(); 372 // Mostly the index specified will land within this current item. 
Short circuit for that 373 int index = offset + this.position(); 374 int itemIndex; 375 if (this.itemBeginPos[this.curItemIndex + 1] > index) { 376 itemIndex = this.curItemIndex; 377 } else { 378 itemIndex = getItemIndexFromCurItemIndex(index); 379 } 380 return getLong(index, itemIndex); 381 } 382 383 /** Returns this MBB's current position */ 384 @Override 385 public int position() { 386 checkRefCount(); 387 return itemBeginPos[this.curItemIndex] + this.curItem.position(); 388 } 389 390 /** 391 * Sets this MBB's position to the given value. 392 * @return this object 393 */ 394 @Override 395 public MultiByteBuff position(int position) { 396 checkRefCount(); 397 // Short circuit for positioning within the cur item. Mostly that is the case. 398 if ( 399 this.itemBeginPos[this.curItemIndex] <= position 400 && this.itemBeginPos[this.curItemIndex + 1] > position 401 ) { 402 this.curItem.position(position - this.itemBeginPos[this.curItemIndex]); 403 return this; 404 } 405 int itemIndex = getItemIndex(position); 406 // All items from 0 - curItem-1 set position at end. 
407 for (int i = 0; i < itemIndex; i++) { 408 this.items[i].position(this.items[i].limit()); 409 } 410 // All items after curItem set position at begin 411 for (int i = itemIndex + 1; i < this.items.length; i++) { 412 this.items[i].position(0); 413 } 414 this.curItem = this.items[itemIndex]; 415 this.curItem.position(position - this.itemBeginPos[itemIndex]); 416 this.curItemIndex = itemIndex; 417 return this; 418 } 419 420 /** 421 * Rewinds this MBB and the position is set to 0 422 * @return this object 423 */ 424 @Override 425 public MultiByteBuff rewind() { 426 checkRefCount(); 427 for (int i = 0; i < this.items.length; i++) { 428 this.items[i].rewind(); 429 } 430 this.curItemIndex = 0; 431 this.curItem = this.items[this.curItemIndex]; 432 this.markedItemIndex = -1; 433 return this; 434 } 435 436 /** 437 * Marks the current position of the MBB 438 * @return this object 439 */ 440 @Override 441 public MultiByteBuff mark() { 442 checkRefCount(); 443 this.markedItemIndex = this.curItemIndex; 444 this.curItem.mark(); 445 return this; 446 } 447 448 /** 449 * Similar to {@link ByteBuffer}.reset(), ensures that this MBB is reset back to last marked 450 * position. 451 * @return This MBB 452 */ 453 @Override 454 public MultiByteBuff reset() { 455 checkRefCount(); 456 // when the buffer is moved to the next one.. the reset should happen on the previous marked 457 // item and the new one should be taken as the base 458 if (this.markedItemIndex < 0) throw new InvalidMarkException(); 459 ByteBuffer markedItem = this.items[this.markedItemIndex]; 460 markedItem.reset(); 461 this.curItem = markedItem; 462 // All items after the marked position upto the current item should be reset to 0 463 for (int i = this.curItemIndex; i > this.markedItemIndex; i--) { 464 this.items[i].position(0); 465 } 466 this.curItemIndex = this.markedItemIndex; 467 return this; 468 } 469 470 /** 471 * Returns the number of elements between the current position and the limit. 
472 * @return the remaining elements in this MBB 473 */ 474 @Override 475 public int remaining() { 476 checkRefCount(); 477 int remain = 0; 478 for (int i = curItemIndex; i < items.length; i++) { 479 remain += items[i].remaining(); 480 } 481 return remain; 482 } 483 484 /** 485 * Returns true if there are elements between the current position and the limt 486 * @return true if there are elements, false otherwise 487 */ 488 @Override 489 public final boolean hasRemaining() { 490 checkRefCount(); 491 return this.curItem.hasRemaining() || (this.curItemIndex < this.limitedItemIndex 492 && this.items[this.curItemIndex + 1].hasRemaining()); 493 } 494 495 /** 496 * A relative method that returns byte at the current position. Increments the current position by 497 * the size of a byte. 498 * @return the byte at the current position 499 */ 500 @Override 501 public byte get() { 502 checkRefCount(); 503 if (this.curItem.remaining() == 0) { 504 if (items.length - 1 == this.curItemIndex) { 505 // means cur item is the last one and we wont be able to read a long. Throw exception 506 throw new BufferUnderflowException(); 507 } 508 this.curItemIndex++; 509 this.curItem = this.items[this.curItemIndex]; 510 } 511 return this.curItem.get(); 512 } 513 514 /** 515 * Returns the short value at the current position. Also advances the position by the size of 516 * short 517 * @return the short value at the current position 518 */ 519 @Override 520 public short getShort() { 521 checkRefCount(); 522 int remaining = this.curItem.remaining(); 523 if (remaining >= Bytes.SIZEOF_SHORT) { 524 return this.curItem.getShort(); 525 } 526 short n = 0; 527 n = (short) (n ^ (get() & 0xFF)); 528 n = (short) (n << 8); 529 n = (short) (n ^ (get() & 0xFF)); 530 return n; 531 } 532 533 /** 534 * Returns the int value at the current position. 
Also advances the position by the size of int 535 * @return the int value at the current position 536 */ 537 @Override 538 public int getInt() { 539 checkRefCount(); 540 int remaining = this.curItem.remaining(); 541 if (remaining >= Bytes.SIZEOF_INT) { 542 return this.curItem.getInt(); 543 } 544 int n = 0; 545 for (int i = 0; i < Bytes.SIZEOF_INT; i++) { 546 n <<= 8; 547 n ^= get() & 0xFF; 548 } 549 return n; 550 } 551 552 /** 553 * Returns the long value at the current position. Also advances the position by the size of long 554 * @return the long value at the current position 555 */ 556 @Override 557 public long getLong() { 558 checkRefCount(); 559 int remaining = this.curItem.remaining(); 560 if (remaining >= Bytes.SIZEOF_LONG) { 561 return this.curItem.getLong(); 562 } 563 long l = 0; 564 for (int i = 0; i < Bytes.SIZEOF_LONG; i++) { 565 l <<= 8; 566 l ^= get() & 0xFF; 567 } 568 return l; 569 } 570 571 /** 572 * Copies the content from this MBB's current position to the byte array and fills it. Also 573 * advances the position of the MBB by the length of the byte[]. 574 */ 575 @Override 576 public void get(byte[] dst) { 577 get(dst, 0, dst.length); 578 } 579 580 /** 581 * Copies the specified number of bytes from this MBB's current position to the byte[]'s offset. 582 * Also advances the position of the MBB by the given length. 
583 */ 584 @Override 585 public void get(byte[] dst, int offset, int length) { 586 checkRefCount(); 587 while (length > 0) { 588 int toRead = Math.min(length, this.curItem.remaining()); 589 ByteBufferUtils.copyFromBufferToArray(dst, this.curItem, this.curItem.position(), offset, 590 toRead); 591 this.curItem.position(this.curItem.position() + toRead); 592 length -= toRead; 593 if (length == 0) break; 594 this.curItemIndex++; 595 this.curItem = this.items[this.curItemIndex]; 596 offset += toRead; 597 } 598 } 599 600 @Override 601 public void get(int sourceOffset, byte[] dst, int offset, int length) { 602 checkRefCount(); 603 int itemIndex = getItemIndex(sourceOffset); 604 ByteBuffer item = this.items[itemIndex]; 605 sourceOffset = sourceOffset - this.itemBeginPos[itemIndex]; 606 while (length > 0) { 607 int toRead = Math.min((item.limit() - sourceOffset), length); 608 ByteBufferUtils.copyFromBufferToArray(dst, item, sourceOffset, offset, toRead); 609 length -= toRead; 610 if (length == 0) break; 611 itemIndex++; 612 item = this.items[itemIndex]; 613 offset += toRead; 614 sourceOffset = 0; 615 } 616 } 617 618 /** 619 * Marks the limit of this MBB. 
620 * @return This MBB 621 */ 622 @Override 623 public MultiByteBuff limit(int limit) { 624 checkRefCount(); 625 this.limit = limit; 626 // Normally the limit will try to limit within the last BB item 627 int limitedIndexBegin = this.itemBeginPos[this.limitedItemIndex]; 628 if (limit >= limitedIndexBegin && limit < this.itemBeginPos[this.limitedItemIndex + 1]) { 629 this.items[this.limitedItemIndex].limit(limit - limitedIndexBegin); 630 return this; 631 } 632 int itemIndex = getItemIndex(limit); 633 int beginOffset = this.itemBeginPos[itemIndex]; 634 int offsetInItem = limit - beginOffset; 635 ByteBuffer item = items[itemIndex]; 636 item.limit(offsetInItem); 637 for (int i = this.limitedItemIndex; i < itemIndex; i++) { 638 this.items[i].limit(this.items[i].capacity()); 639 } 640 this.limitedItemIndex = itemIndex; 641 for (int i = itemIndex + 1; i < this.items.length; i++) { 642 this.items[i].limit(this.items[i].position()); 643 } 644 return this; 645 } 646 647 /** 648 * Returns the limit of this MBB 649 * @return limit of the MBB 650 */ 651 @Override 652 public int limit() { 653 return this.limit; 654 } 655 656 /** 657 * Returns an MBB which is a sliced version of this MBB. The position, limit and mark of the new 658 * MBB will be independent than that of the original MBB. The content of the new MBB will start at 659 * this MBB's current position 660 * @return a sliced MBB 661 */ 662 @Override 663 public MultiByteBuff slice() { 664 checkRefCount(); 665 ByteBuffer[] copy = new ByteBuffer[this.limitedItemIndex - this.curItemIndex + 1]; 666 for (int i = curItemIndex, j = 0; i <= this.limitedItemIndex; i++, j++) { 667 copy[j] = this.items[i].slice(); 668 } 669 return new MultiByteBuff(refCnt, copy); 670 } 671 672 /** 673 * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark of the 674 * new MBB will be independent than that of the original MBB. 
The content of the new MBB will 675 * start at this MBB's current position The position, limit and mark of the new MBB would be 676 * identical to this MBB in terms of values. 677 * @return a duplicated MBB 678 */ 679 @Override 680 public MultiByteBuff duplicate() { 681 checkRefCount(); 682 ByteBuffer[] itemsCopy = new ByteBuffer[this.items.length]; 683 for (int i = 0; i < this.items.length; i++) { 684 itemsCopy[i] = items[i].duplicate(); 685 } 686 return new MultiByteBuff(refCnt, itemsCopy, this.itemBeginPos, this.limit, 687 this.limitedItemIndex, this.curItemIndex, this.markedItemIndex); 688 } 689 690 /** 691 * Writes a byte to this MBB at the current position and increments the position 692 * @return this object 693 */ 694 @Override 695 public MultiByteBuff put(byte b) { 696 checkRefCount(); 697 if (this.curItem.remaining() == 0) { 698 if (this.curItemIndex == this.items.length - 1) { 699 throw new BufferOverflowException(); 700 } 701 this.curItemIndex++; 702 this.curItem = this.items[this.curItemIndex]; 703 } 704 this.curItem.put(b); 705 return this; 706 } 707 708 /** 709 * Writes a byte to this MBB at the given index and won't affect the position of any of the 710 * buffers. 711 * @return this object 712 * @throws IndexOutOfBoundsException If <tt>index</tt> is negative or not smaller than the 713 * {@link MultiByteBuff#limit} 714 */ 715 @Override 716 public MultiByteBuff put(int index, byte b) { 717 checkRefCount(); 718 int itemIndex = getItemIndex(index); 719 ByteBuffer item = items[itemIndex]; 720 item.put(index - itemBeginPos[itemIndex], b); 721 return this; 722 } 723 724 /** 725 * Copies from a src BB to this MBB. This will be absolute positional copying and won't affect the 726 * position of any of the buffers. 
727 * @param destOffset the position in this MBB to which the copy should happen 728 * @param src the src MBB 729 * @param srcOffset the offset in the src MBB from where the elements should be read 730 * @param length the length upto which the copy should happen 731 * @throws BufferUnderflowException If there are fewer than length bytes remaining in src 732 * ByteBuff. 733 * @throws BufferOverflowException If there is insufficient available space in this MBB for 734 * length bytes. 735 */ 736 @Override 737 public MultiByteBuff put(int destOffset, ByteBuff src, int srcOffset, int length) { 738 checkRefCount(); 739 int destItemIndex = getItemIndex(destOffset); 740 int srcItemIndex = getItemIndexForByteBuff(src, srcOffset, length); 741 742 ByteBuffer destItem = this.items[destItemIndex]; 743 destOffset = this.getRelativeOffset(destOffset, destItemIndex); 744 745 ByteBuffer srcItem = getItemByteBuffer(src, srcItemIndex); 746 srcOffset = getRelativeOffsetForByteBuff(src, srcOffset, srcItemIndex); 747 748 while (length > 0) { 749 int toWrite = destItem.limit() - destOffset; 750 if (toWrite <= 0) { 751 throw new BufferOverflowException(); 752 } 753 int toRead = srcItem.limit() - srcOffset; 754 if (toRead <= 0) { 755 throw new BufferUnderflowException(); 756 } 757 int toMove = Math.min(length, Math.min(toRead, toWrite)); 758 ByteBufferUtils.copyFromBufferToBuffer(srcItem, destItem, srcOffset, destOffset, toMove); 759 length -= toMove; 760 if (length == 0) { 761 break; 762 } 763 if (toRead < toWrite) { 764 if (++srcItemIndex >= getItemByteBufferCount(src)) { 765 throw new BufferUnderflowException(); 766 } 767 srcItem = getItemByteBuffer(src, srcItemIndex); 768 srcOffset = 0; 769 destOffset += toMove; 770 } else if (toRead > toWrite) { 771 if (++destItemIndex >= this.items.length) { 772 throw new BufferOverflowException(); 773 } 774 destItem = this.items[destItemIndex]; 775 destOffset = 0; 776 srcOffset += toMove; 777 } else { 778 // toRead = toWrite case 779 if 
(++srcItemIndex >= getItemByteBufferCount(src)) { 780 throw new BufferUnderflowException(); 781 } 782 srcItem = getItemByteBuffer(src, srcItemIndex); 783 srcOffset = 0; 784 if (++destItemIndex >= this.items.length) { 785 throw new BufferOverflowException(); 786 } 787 destItem = this.items[destItemIndex]; 788 destOffset = 0; 789 } 790 } 791 return this; 792 } 793 794 private static ByteBuffer getItemByteBuffer(ByteBuff buf, int byteBufferIndex) { 795 if (buf instanceof SingleByteBuff) { 796 if (byteBufferIndex != 0) { 797 throw new IndexOutOfBoundsException( 798 "index:[" + byteBufferIndex + "],but only index 0 is valid."); 799 } 800 return buf.nioByteBuffers()[0]; 801 } 802 MultiByteBuff multiByteBuff = (MultiByteBuff) buf; 803 if (byteBufferIndex < 0 || byteBufferIndex >= multiByteBuff.items.length) { 804 throw new IndexOutOfBoundsException("index:[" + byteBufferIndex + "],but only index [0-" 805 + multiByteBuff.items.length + ") is valid."); 806 } 807 return multiByteBuff.items[byteBufferIndex]; 808 } 809 810 private static int getItemIndexForByteBuff(ByteBuff byteBuff, int offset, int length) { 811 if (byteBuff instanceof SingleByteBuff) { 812 ByteBuffer byteBuffer = byteBuff.nioByteBuffers()[0]; 813 if (offset + length > byteBuffer.limit()) { 814 throw new BufferUnderflowException(); 815 } 816 return 0; 817 } 818 MultiByteBuff multiByteBuff = (MultiByteBuff) byteBuff; 819 return multiByteBuff.getItemIndex(offset); 820 } 821 822 private static int getRelativeOffsetForByteBuff(ByteBuff byteBuff, int globalOffset, 823 int itemIndex) { 824 if (byteBuff instanceof SingleByteBuff) { 825 if (itemIndex != 0) { 826 throw new IndexOutOfBoundsException("index:[" + itemIndex + "],but only index 0 is valid."); 827 } 828 return globalOffset; 829 } 830 return ((MultiByteBuff) byteBuff).getRelativeOffset(globalOffset, itemIndex); 831 } 832 833 private int getRelativeOffset(int globalOffset, int itemIndex) { 834 if (itemIndex < 0 || itemIndex >= this.items.length) { 835 throw 
new IndexOutOfBoundsException( 836 "index:[" + itemIndex + "],but only index [0-" + this.items.length + ") is valid."); 837 } 838 return globalOffset - this.itemBeginPos[itemIndex]; 839 } 840 841 private static int getItemByteBufferCount(ByteBuff buf) { 842 return (buf instanceof SingleByteBuff) ? 1 : ((MultiByteBuff) buf).items.length; 843 } 844 845 /** 846 * Writes an int to this MBB at its current position. Also advances the position by size of int 847 * @param val Int value to write 848 * @return this object 849 */ 850 @Override 851 public MultiByteBuff putInt(int val) { 852 checkRefCount(); 853 if (this.curItem.remaining() >= Bytes.SIZEOF_INT) { 854 this.curItem.putInt(val); 855 return this; 856 } 857 if (this.curItemIndex == this.items.length - 1) { 858 throw new BufferOverflowException(); 859 } 860 // During read, we will read as byte by byte for this case. So just write in Big endian 861 put(int3(val)); 862 put(int2(val)); 863 put(int1(val)); 864 put(int0(val)); 865 return this; 866 } 867 868 private static byte int3(int x) { 869 return (byte) (x >> 24); 870 } 871 872 private static byte int2(int x) { 873 return (byte) (x >> 16); 874 } 875 876 private static byte int1(int x) { 877 return (byte) (x >> 8); 878 } 879 880 private static byte int0(int x) { 881 return (byte) x; 882 } 883 884 /** Copies from the given byte[] to this MBB */ 885 @Override 886 public final MultiByteBuff put(byte[] src) { 887 return put(src, 0, src.length); 888 } 889 890 /** Copies from the given byte[] to this MBB. */ 891 @Override 892 public MultiByteBuff put(byte[] src, int offset, int length) { 893 checkRefCount(); 894 if (this.curItem.remaining() >= length) { 895 ByteBufferUtils.copyFromArrayToBuffer(this.curItem, src, offset, length); 896 return this; 897 } 898 int end = offset + length; 899 for (int i = offset; i < end; i++) { 900 this.put(src[i]); 901 } 902 return this; 903 } 904 905 /** 906 * Writes a long to this MBB at its current position. 
Also advances the position by size of long 907 * @param val Long value to write 908 * @return this object 909 */ 910 @Override 911 public MultiByteBuff putLong(long val) { 912 checkRefCount(); 913 if (this.curItem.remaining() >= Bytes.SIZEOF_LONG) { 914 this.curItem.putLong(val); 915 return this; 916 } 917 if (this.curItemIndex == this.items.length - 1) { 918 throw new BufferOverflowException(); 919 } 920 // During read, we will read as byte by byte for this case. So just write in Big endian 921 put(long7(val)); 922 put(long6(val)); 923 put(long5(val)); 924 put(long4(val)); 925 put(long3(val)); 926 put(long2(val)); 927 put(long1(val)); 928 put(long0(val)); 929 return this; 930 } 931 932 private static byte long7(long x) { 933 return (byte) (x >> 56); 934 } 935 936 private static byte long6(long x) { 937 return (byte) (x >> 48); 938 } 939 940 private static byte long5(long x) { 941 return (byte) (x >> 40); 942 } 943 944 private static byte long4(long x) { 945 return (byte) (x >> 32); 946 } 947 948 private static byte long3(long x) { 949 return (byte) (x >> 24); 950 } 951 952 private static byte long2(long x) { 953 return (byte) (x >> 16); 954 } 955 956 private static byte long1(long x) { 957 return (byte) (x >> 8); 958 } 959 960 private static byte long0(long x) { 961 return (byte) x; 962 } 963 964 /** 965 * Jumps the current position of this MBB by specified length. 966 */ 967 @Override 968 public MultiByteBuff skip(int length) { 969 checkRefCount(); 970 // Get available bytes from this item and remaining from next 971 int jump = 0; 972 while (true) { 973 jump = this.curItem.remaining(); 974 if (jump >= length) { 975 this.curItem.position(this.curItem.position() + length); 976 break; 977 } 978 this.curItem.position(this.curItem.position() + jump); 979 length -= jump; 980 this.curItemIndex++; 981 this.curItem = this.items[this.curItemIndex]; 982 } 983 return this; 984 } 985 986 /** 987 * Jumps back the current position of this MBB by specified length. 
988 */ 989 @Override 990 public MultiByteBuff moveBack(int length) { 991 checkRefCount(); 992 while (length != 0) { 993 if (length > curItem.position()) { 994 length -= curItem.position(); 995 this.curItem.position(0); 996 this.curItemIndex--; 997 this.curItem = this.items[curItemIndex]; 998 } else { 999 this.curItem.position(curItem.position() - length); 1000 break; 1001 } 1002 } 1003 return this; 1004 } 1005 1006 /** 1007 * Returns bytes from current position till length specified, as a single ByteBuffer. When all 1008 * these bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item 1009 * as such will be returned. So users are warned not to change the position or limit of this 1010 * returned ByteBuffer. The position of the returned byte buffer is at the begin of the required 1011 * bytes. When the required bytes happen to span across multiple ByteBuffers, this API will copy 1012 * the bytes to a newly created ByteBuffer of required size and return that. 1013 * @param length number of bytes required. 1014 * @return bytes from current position till length specified, as a single ByteButter. 1015 */ 1016 @Override 1017 public ByteBuffer asSubByteBuffer(int length) { 1018 checkRefCount(); 1019 if (this.curItem.remaining() >= length) { 1020 return this.curItem; 1021 } 1022 int offset = 0; 1023 byte[] dupB = new byte[length]; 1024 int locCurItemIndex = curItemIndex; 1025 ByteBuffer locCurItem = curItem; 1026 while (length > 0) { 1027 int toRead = Math.min(length, locCurItem.remaining()); 1028 ByteBufferUtils.copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset, 1029 toRead); 1030 length -= toRead; 1031 if (length == 0) break; 1032 locCurItemIndex++; 1033 locCurItem = this.items[locCurItemIndex]; 1034 offset += toRead; 1035 } 1036 return ByteBuffer.wrap(dupB); 1037 } 1038 1039 /** 1040 * Returns bytes from given offset till length specified, as a single ByteBuffer. 
When all these 1041 * bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item as 1042 * such will be returned (with offset in this ByteBuffer where the bytes starts). So users are 1043 * warned not to change the position or limit of this returned ByteBuffer. When the required bytes 1044 * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created 1045 * ByteBuffer of required size and return that. 1046 * @param offset the offset in this MBB from where the subBuffer should be created 1047 * @param length the length of the subBuffer 1048 * @param pair a pair that will have the bytes from the current position till length specified, 1049 * as a single ByteBuffer and offset in that Buffer where the bytes starts. The 1050 * method would set the values on the pair that is passed in by the caller 1051 */ 1052 @Override 1053 public void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair) { 1054 checkRefCount(); 1055 if (this.itemBeginPos[this.curItemIndex] <= offset) { 1056 int relOffsetInCurItem = offset - this.itemBeginPos[this.curItemIndex]; 1057 if (this.curItem.limit() - relOffsetInCurItem >= length) { 1058 pair.setFirst(this.curItem); 1059 pair.setSecond(relOffsetInCurItem); 1060 return; 1061 } 1062 } 1063 int itemIndex = getItemIndex(offset); 1064 ByteBuffer item = this.items[itemIndex]; 1065 offset = offset - this.itemBeginPos[itemIndex]; 1066 if (item.limit() - offset >= length) { 1067 pair.setFirst(item); 1068 pair.setSecond(offset); 1069 return; 1070 } 1071 byte[] dst = new byte[length]; 1072 int destOffset = 0; 1073 while (length > 0) { 1074 int toRead = Math.min(length, item.limit() - offset); 1075 ByteBufferUtils.copyFromBufferToArray(dst, item, offset, destOffset, toRead); 1076 length -= toRead; 1077 if (length == 0) break; 1078 itemIndex++; 1079 item = this.items[itemIndex]; 1080 destOffset += toRead; 1081 offset = 0; 1082 } 1083 pair.setFirst(ByteBuffer.wrap(dst)); 1084 
pair.setSecond(0); 1085 } 1086 1087 /** 1088 * Copies the content from an this MBB to a ByteBuffer 1089 * @param out the ByteBuffer to which the copy has to happen, its position will be 1090 * advanced. 1091 * @param sourceOffset the offset in the MBB from which the elements has to be copied 1092 * @param length the length in the MBB upto which the elements has to be copied 1093 */ 1094 @Override 1095 public void get(ByteBuffer out, int sourceOffset, int length) { 1096 checkRefCount(); 1097 int itemIndex = getItemIndex(sourceOffset); 1098 ByteBuffer in = this.items[itemIndex]; 1099 sourceOffset = sourceOffset - this.itemBeginPos[itemIndex]; 1100 while (length > 0) { 1101 int toRead = Math.min(in.limit() - sourceOffset, length); 1102 ByteBufferUtils.copyFromBufferToBuffer(in, out, sourceOffset, toRead); 1103 length -= toRead; 1104 if (length == 0) { 1105 break; 1106 } 1107 itemIndex++; 1108 in = this.items[itemIndex]; 1109 sourceOffset = 0; 1110 } 1111 } 1112 1113 /** 1114 * Copy the content from this MBB to a byte[] based on the given offset and length the position 1115 * from where the copy should start the length upto which the copy has to be done 1116 * @return byte[] with the copied contents from this MBB. 
1117 */ 1118 @Override 1119 public byte[] toBytes(int offset, int length) { 1120 checkRefCount(); 1121 byte[] output = new byte[length]; 1122 this.get(offset, output, 0, length); 1123 return output; 1124 } 1125 1126 private int internalRead(ReadableByteChannel channel, long offset, ChannelReader reader) 1127 throws IOException { 1128 checkRefCount(); 1129 int total = 0; 1130 while (buffsIterator.hasNext()) { 1131 ByteBuffer buffer = buffsIterator.next(); 1132 int len = read(channel, buffer, offset, reader); 1133 if (len > 0) { 1134 total += len; 1135 offset += len; 1136 } 1137 if (buffer.hasRemaining()) { 1138 // reset 1139 curItem = buffer; 1140 curItemIndex = (curItemIndex - 1); 1141 break; 1142 } 1143 } 1144 return total; 1145 } 1146 1147 @Override 1148 public int read(ReadableByteChannel channel) throws IOException { 1149 return internalRead(channel, 0, CHANNEL_READER); 1150 } 1151 1152 @Override 1153 public int read(FileChannel channel, long offset) throws IOException { 1154 return internalRead(channel, offset, FILE_READER); 1155 } 1156 1157 @Override 1158 public int write(FileChannel channel, long offset) throws IOException { 1159 checkRefCount(); 1160 int total = 0; 1161 while (buffsIterator.hasNext()) { 1162 ByteBuffer buffer = buffsIterator.next(); 1163 while (buffer.hasRemaining()) { 1164 int len = channel.write(buffer, offset); 1165 total += len; 1166 offset += len; 1167 } 1168 } 1169 return total; 1170 } 1171 1172 @Override 1173 public ByteBuffer[] nioByteBuffers() { 1174 checkRefCount(); 1175 return this.items; 1176 } 1177 1178 @Override 1179 public boolean equals(Object obj) { 1180 if (!(obj instanceof MultiByteBuff)) return false; 1181 if (this == obj) return true; 1182 MultiByteBuff that = (MultiByteBuff) obj; 1183 if (this.capacity() != that.capacity()) return false; 1184 if ( 1185 ByteBuff.compareTo(this, this.position(), this.limit(), that, that.position(), that.limit()) 1186 == 0 1187 ) { 1188 return true; 1189 } 1190 return false; 1191 } 1192 
/**
   * Returns the sum of the {@code hashCode()} values of all underlying ByteBuffer items.
   * <p>
   * NOTE(review): equals() in this class compares content through ByteBuff.compareTo; whether
   * two equal instances whose content is split differently across items produce the same sum is
   * not evident from here - confirm.
   */
  @Override
  public int hashCode() {
    int sum = 0;
    for (int i = 0; i < this.items.length; i++) {
      sum += this.items[i].hashCode();
    }
    return sum;
  }

  /**
   * Calls {@code retain()} on the reference count of this MBB.
   * @return this object
   */
  @Override
  public MultiByteBuff retain() {
    refCnt.retain();
    return this;
  }
}