001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.nio; 019 020import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE; 021 022import java.io.IOException; 023import java.nio.BufferOverflowException; 024import java.nio.BufferUnderflowException; 025import java.nio.ByteBuffer; 026import java.nio.InvalidMarkException; 027import java.nio.channels.FileChannel; 028import java.nio.channels.ReadableByteChannel; 029import java.util.Iterator; 030import java.util.NoSuchElementException; 031import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 032import org.apache.hadoop.hbase.util.ByteBufferUtils; 033import org.apache.hadoop.hbase.util.Bytes; 034import org.apache.hadoop.hbase.util.ObjectIntPair; 035import org.apache.yetus.audience.InterfaceAudience; 036 037/** 038 * Provides a unified view of all the underlying ByteBuffers and will look as if a bigger sequential 039 * buffer. This class provides similar APIs as in {@link ByteBuffer} to put/get int, short, long etc 040 * and doing operations like mark, reset, slice etc. This has to be used when data is split across 041 * multiple byte buffers and we don't want copy them to single buffer for reading from it. 042 */ 043@InterfaceAudience.Private 044public class MultiByteBuff extends ByteBuff { 045 046 private final ByteBuffer[] items; 047 // Pointer to the current item in the MBB 048 private ByteBuffer curItem = null; 049 // Index of the current item in the MBB 050 private int curItemIndex = 0; 051 052 private int limit = 0; 053 private int limitedItemIndex; 054 private int markedItemIndex = -1; 055 private final int[] itemBeginPos; 056 057 private Iterator<ByteBuffer> buffsIterator = new Iterator<ByteBuffer>() { 058 @Override 059 public boolean hasNext() { 060 return curItemIndex < limitedItemIndex 061 || (curItemIndex == limitedItemIndex && items[curItemIndex].hasRemaining()); 062 } 063 064 @Override 065 public ByteBuffer next() { 066 if (curItemIndex >= items.length) { 067 throw new NoSuchElementException("items overflow"); 068 } 069 curItem = items[curItemIndex++]; 070 return curItem; 071 } 072 }; 073 074 public MultiByteBuff(ByteBuffer... items) { 075 this(NONE, items); 076 } 077 078 public MultiByteBuff(Recycler recycler, ByteBuffer... items) { 079 this(new RefCnt(recycler), items); 080 } 081 082 MultiByteBuff(RefCnt refCnt, ByteBuffer... items) { 083 this.refCnt = refCnt; 084 assert items != null; 085 assert items.length > 0; 086 this.items = items; 087 this.curItem = this.items[this.curItemIndex]; 088 // See below optimization in getInt(int) where we check whether the given index land in current 089 // item. For this we need to check whether the passed index is less than the next item begin 090 // offset. To handle this effectively for the last item buffer, we add an extra item into this 091 // array. 092 itemBeginPos = new int[items.length + 1]; 093 int offset = 0; 094 for (int i = 0; i < items.length; i++) { 095 ByteBuffer item = items[i]; 096 item.rewind(); 097 itemBeginPos[i] = offset; 098 int l = item.limit() - item.position(); 099 offset += l; 100 } 101 this.limit = offset; 102 this.itemBeginPos[items.length] = offset + 1; 103 this.limitedItemIndex = this.items.length - 1; 104 } 105 106 private MultiByteBuff(RefCnt refCnt, ByteBuffer[] items, int[] itemBeginPos, int limit, 107 int limitedIndex, int curItemIndex, int markedIndex) { 108 this.refCnt = refCnt; 109 this.items = items; 110 this.curItemIndex = curItemIndex; 111 this.curItem = this.items[this.curItemIndex]; 112 this.itemBeginPos = itemBeginPos; 113 this.limit = limit; 114 this.limitedItemIndex = limitedIndex; 115 this.markedItemIndex = markedIndex; 116 } 117 118 /** 119 * @throws UnsupportedOperationException MBB does not support array based operations 120 */ 121 @Override 122 public byte[] array() { 123 throw new UnsupportedOperationException(); 124 } 125 126 /** 127 * @throws UnsupportedOperationException MBB does not support array based operations 128 */ 129 @Override 130 public int arrayOffset() { 131 throw new UnsupportedOperationException(); 132 } 133 134 /** Returns false. MBB does not support array based operations */ 135 @Override 136 public boolean hasArray() { 137 return false; 138 } 139 140 /** Returns the total capacity of this MultiByteBuffer. */ 141 @Override 142 public int capacity() { 143 checkRefCount(); 144 int c = 0; 145 for (ByteBuffer item : this.items) { 146 c += item.capacity(); 147 } 148 return c; 149 } 150 151 /** 152 * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers n 153 * * @return the byte at the given index 154 */ 155 @Override 156 public byte get(int index) { 157 checkRefCount(); 158 int itemIndex = getItemIndex(index); 159 return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]); 160 } 161 162 @Override 163 public byte getByteAfterPosition(int offset) { 164 checkRefCount(); 165 // Mostly the index specified will land within this current item. Short circuit for that 166 int index = offset + this.position(); 167 int itemIndex = getItemIndexFromCurItemIndex(index); 168 return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]); 169 } 170 171 /* 172 * Returns in which sub ByteBuffer, the given element index will be available. 173 */ 174 private int getItemIndex(int elemIndex) { 175 if (elemIndex < 0) { 176 throw new IndexOutOfBoundsException(); 177 } 178 int index = 1; 179 while (elemIndex >= this.itemBeginPos[index]) { 180 index++; 181 if (index == this.itemBeginPos.length) { 182 throw new IndexOutOfBoundsException(); 183 } 184 } 185 return index - 1; 186 } 187 188 /* 189 * Returns in which sub ByteBuffer, the given element index will be available. In this case we are 190 * sure that the item will be after MBB's current position 191 */ 192 private int getItemIndexFromCurItemIndex(int elemIndex) { 193 int index = this.curItemIndex; 194 while (elemIndex >= this.itemBeginPos[index]) { 195 index++; 196 if (index == this.itemBeginPos.length) { 197 throw new IndexOutOfBoundsException(); 198 } 199 } 200 return index - 1; 201 } 202 203 /** 204 * Fetches the int at the given index. Does not change position of the underlying ByteBuffers n 205 * * @return the int value at the given index 206 */ 207 @Override 208 public int getInt(int index) { 209 checkRefCount(); 210 // Mostly the index specified will land within this current item. Short circuit for that 211 int itemIndex; 212 if ( 213 this.itemBeginPos[this.curItemIndex] <= index 214 && this.itemBeginPos[this.curItemIndex + 1] > index 215 ) { 216 itemIndex = this.curItemIndex; 217 } else { 218 itemIndex = getItemIndex(index); 219 } 220 return getInt(index, itemIndex); 221 } 222 223 @Override 224 public int getIntAfterPosition(int offset) { 225 checkRefCount(); 226 // Mostly the index specified will land within this current item. Short circuit for that 227 int index = offset + this.position(); 228 int itemIndex; 229 if (this.itemBeginPos[this.curItemIndex + 1] > index) { 230 itemIndex = this.curItemIndex; 231 } else { 232 itemIndex = getItemIndexFromCurItemIndex(index); 233 } 234 return getInt(index, itemIndex); 235 } 236 237 /** 238 * Fetches the short at the given index. Does not change position of the underlying ByteBuffers n 239 * * @return the short value at the given index 240 */ 241 @Override 242 public short getShort(int index) { 243 checkRefCount(); 244 // Mostly the index specified will land within this current item. Short circuit for that 245 int itemIndex; 246 if ( 247 this.itemBeginPos[this.curItemIndex] <= index 248 && this.itemBeginPos[this.curItemIndex + 1] > index 249 ) { 250 itemIndex = this.curItemIndex; 251 } else { 252 itemIndex = getItemIndex(index); 253 } 254 ByteBuffer item = items[itemIndex]; 255 int offsetInItem = index - this.itemBeginPos[itemIndex]; 256 if (item.limit() - offsetInItem >= Bytes.SIZEOF_SHORT) { 257 return ByteBufferUtils.toShort(item, offsetInItem); 258 } 259 if (items.length - 1 == itemIndex) { 260 // means cur item is the last one and we wont be able to read a int. Throw exception 261 throw new BufferUnderflowException(); 262 } 263 ByteBuffer nextItem = items[itemIndex + 1]; 264 // Get available one byte from this item and remaining one from next 265 short n = 0; 266 n = (short) (n ^ (ByteBufferUtils.toByte(item, offsetInItem) & 0xFF)); 267 n = (short) (n << 8); 268 n = (short) (n ^ (ByteBufferUtils.toByte(nextItem, 0) & 0xFF)); 269 return n; 270 } 271 272 @Override 273 public short getShortAfterPosition(int offset) { 274 checkRefCount(); 275 // Mostly the index specified will land within this current item. Short circuit for that 276 int index = offset + this.position(); 277 int itemIndex; 278 if (this.itemBeginPos[this.curItemIndex + 1] > index) { 279 itemIndex = this.curItemIndex; 280 } else { 281 itemIndex = getItemIndexFromCurItemIndex(index); 282 } 283 return getShort(index, itemIndex); 284 } 285 286 private int getInt(int index, int itemIndex) { 287 ByteBuffer item = items[itemIndex]; 288 int offsetInItem = index - this.itemBeginPos[itemIndex]; 289 int remainingLen = item.limit() - offsetInItem; 290 if (remainingLen >= Bytes.SIZEOF_INT) { 291 return ByteBufferUtils.toInt(item, offsetInItem); 292 } 293 if (items.length - 1 == itemIndex) { 294 // means cur item is the last one and we wont be able to read a int. Throw exception 295 throw new BufferUnderflowException(); 296 } 297 int l = 0; 298 for (int i = 0; i < Bytes.SIZEOF_INT; i++) { 299 l <<= 8; 300 l ^= get(index + i) & 0xFF; 301 } 302 return l; 303 } 304 305 private short getShort(int index, int itemIndex) { 306 ByteBuffer item = items[itemIndex]; 307 int offsetInItem = index - this.itemBeginPos[itemIndex]; 308 int remainingLen = item.limit() - offsetInItem; 309 if (remainingLen >= Bytes.SIZEOF_SHORT) { 310 return ByteBufferUtils.toShort(item, offsetInItem); 311 } 312 if (items.length - 1 == itemIndex) { 313 // means cur item is the last one and we wont be able to read a short. Throw exception 314 throw new BufferUnderflowException(); 315 } 316 ByteBuffer nextItem = items[itemIndex + 1]; 317 // Get available bytes from this item and remaining from next 318 short l = 0; 319 for (int i = offsetInItem; i < item.capacity(); i++) { 320 l = (short) (l << 8); 321 l = (short) (l ^ (ByteBufferUtils.toByte(item, i) & 0xFF)); 322 } 323 for (int i = 0; i < Bytes.SIZEOF_SHORT - remainingLen; i++) { 324 l = (short) (l << 8); 325 l = (short) (l ^ (ByteBufferUtils.toByte(nextItem, i) & 0xFF)); 326 } 327 return l; 328 } 329 330 private long getLong(int index, int itemIndex) { 331 ByteBuffer item = items[itemIndex]; 332 int offsetInItem = index - this.itemBeginPos[itemIndex]; 333 int remainingLen = item.limit() - offsetInItem; 334 if (remainingLen >= Bytes.SIZEOF_LONG) { 335 return ByteBufferUtils.toLong(item, offsetInItem); 336 } 337 if (items.length - 1 == itemIndex) { 338 // means cur item is the last one and we wont be able to read a long. Throw exception 339 throw new BufferUnderflowException(); 340 } 341 long l = 0; 342 for (int i = 0; i < Bytes.SIZEOF_LONG; i++) { 343 l <<= 8; 344 l ^= get(index + i) & 0xFF; 345 } 346 return l; 347 } 348 349 /** 350 * Fetches the long at the given index. Does not change position of the underlying ByteBuffers n 351 * * @return the long value at the given index 352 */ 353 @Override 354 public long getLong(int index) { 355 checkRefCount(); 356 // Mostly the index specified will land within this current item. Short circuit for that 357 int itemIndex; 358 if ( 359 this.itemBeginPos[this.curItemIndex] <= index 360 && this.itemBeginPos[this.curItemIndex + 1] > index 361 ) { 362 itemIndex = this.curItemIndex; 363 } else { 364 itemIndex = getItemIndex(index); 365 } 366 return getLong(index, itemIndex); 367 } 368 369 @Override 370 public long getLongAfterPosition(int offset) { 371 checkRefCount(); 372 // Mostly the index specified will land within this current item. Short circuit for that 373 int index = offset + this.position(); 374 int itemIndex; 375 if (this.itemBeginPos[this.curItemIndex + 1] > index) { 376 itemIndex = this.curItemIndex; 377 } else { 378 itemIndex = getItemIndexFromCurItemIndex(index); 379 } 380 return getLong(index, itemIndex); 381 } 382 383 /** Returns this MBB's current position */ 384 @Override 385 public int position() { 386 checkRefCount(); 387 return itemBeginPos[this.curItemIndex] + this.curItem.position(); 388 } 389 390 /** 391 * Sets this MBB's position to the given value. n * @return this object 392 */ 393 @Override 394 public MultiByteBuff position(int position) { 395 checkRefCount(); 396 // Short circuit for positioning within the cur item. Mostly that is the case. 397 if ( 398 this.itemBeginPos[this.curItemIndex] <= position 399 && this.itemBeginPos[this.curItemIndex + 1] > position 400 ) { 401 this.curItem.position(position - this.itemBeginPos[this.curItemIndex]); 402 return this; 403 } 404 int itemIndex = getItemIndex(position); 405 // All items from 0 - curItem-1 set position at end. 406 for (int i = 0; i < itemIndex; i++) { 407 this.items[i].position(this.items[i].limit()); 408 } 409 // All items after curItem set position at begin 410 for (int i = itemIndex + 1; i < this.items.length; i++) { 411 this.items[i].position(0); 412 } 413 this.curItem = this.items[itemIndex]; 414 this.curItem.position(position - this.itemBeginPos[itemIndex]); 415 this.curItemIndex = itemIndex; 416 return this; 417 } 418 419 /** 420 * Rewinds this MBB and the position is set to 0 421 * @return this object 422 */ 423 @Override 424 public MultiByteBuff rewind() { 425 checkRefCount(); 426 for (int i = 0; i < this.items.length; i++) { 427 this.items[i].rewind(); 428 } 429 this.curItemIndex = 0; 430 this.curItem = this.items[this.curItemIndex]; 431 this.markedItemIndex = -1; 432 return this; 433 } 434 435 /** 436 * Marks the current position of the MBB 437 * @return this object 438 */ 439 @Override 440 public MultiByteBuff mark() { 441 checkRefCount(); 442 this.markedItemIndex = this.curItemIndex; 443 this.curItem.mark(); 444 return this; 445 } 446 447 /** 448 * Similar to {@link ByteBuffer}.reset(), ensures that this MBB is reset back to last marked 449 * position. 450 * @return This MBB 451 */ 452 @Override 453 public MultiByteBuff reset() { 454 checkRefCount(); 455 // when the buffer is moved to the next one.. the reset should happen on the previous marked 456 // item and the new one should be taken as the base 457 if (this.markedItemIndex < 0) throw new InvalidMarkException(); 458 ByteBuffer markedItem = this.items[this.markedItemIndex]; 459 markedItem.reset(); 460 this.curItem = markedItem; 461 // All items after the marked position upto the current item should be reset to 0 462 for (int i = this.curItemIndex; i > this.markedItemIndex; i--) { 463 this.items[i].position(0); 464 } 465 this.curItemIndex = this.markedItemIndex; 466 return this; 467 } 468 469 /** 470 * Returns the number of elements between the current position and the limit. 471 * @return the remaining elements in this MBB 472 */ 473 @Override 474 public int remaining() { 475 checkRefCount(); 476 int remain = 0; 477 for (int i = curItemIndex; i < items.length; i++) { 478 remain += items[i].remaining(); 479 } 480 return remain; 481 } 482 483 /** 484 * Returns true if there are elements between the current position and the limt 485 * @return true if there are elements, false otherwise 486 */ 487 @Override 488 public final boolean hasRemaining() { 489 checkRefCount(); 490 return this.curItem.hasRemaining() || (this.curItemIndex < this.limitedItemIndex 491 && this.items[this.curItemIndex + 1].hasRemaining()); 492 } 493 494 /** 495 * A relative method that returns byte at the current position. Increments the current position by 496 * the size of a byte. 497 * @return the byte at the current position 498 */ 499 @Override 500 public byte get() { 501 checkRefCount(); 502 if (this.curItem.remaining() == 0) { 503 if (items.length - 1 == this.curItemIndex) { 504 // means cur item is the last one and we wont be able to read a long. Throw exception 505 throw new BufferUnderflowException(); 506 } 507 this.curItemIndex++; 508 this.curItem = this.items[this.curItemIndex]; 509 } 510 return this.curItem.get(); 511 } 512 513 /** 514 * Returns the short value at the current position. Also advances the position by the size of 515 * short 516 * @return the short value at the current position 517 */ 518 @Override 519 public short getShort() { 520 checkRefCount(); 521 int remaining = this.curItem.remaining(); 522 if (remaining >= Bytes.SIZEOF_SHORT) { 523 return this.curItem.getShort(); 524 } 525 short n = 0; 526 n = (short) (n ^ (get() & 0xFF)); 527 n = (short) (n << 8); 528 n = (short) (n ^ (get() & 0xFF)); 529 return n; 530 } 531 532 /** 533 * Returns the int value at the current position. Also advances the position by the size of int 534 * @return the int value at the current position 535 */ 536 @Override 537 public int getInt() { 538 checkRefCount(); 539 int remaining = this.curItem.remaining(); 540 if (remaining >= Bytes.SIZEOF_INT) { 541 return this.curItem.getInt(); 542 } 543 int n = 0; 544 for (int i = 0; i < Bytes.SIZEOF_INT; i++) { 545 n <<= 8; 546 n ^= get() & 0xFF; 547 } 548 return n; 549 } 550 551 /** 552 * Returns the long value at the current position. Also advances the position by the size of long 553 * @return the long value at the current position 554 */ 555 @Override 556 public long getLong() { 557 checkRefCount(); 558 int remaining = this.curItem.remaining(); 559 if (remaining >= Bytes.SIZEOF_LONG) { 560 return this.curItem.getLong(); 561 } 562 long l = 0; 563 for (int i = 0; i < Bytes.SIZEOF_LONG; i++) { 564 l <<= 8; 565 l ^= get() & 0xFF; 566 } 567 return l; 568 } 569 570 /** 571 * Copies the content from this MBB's current position to the byte array and fills it. Also 572 * advances the position of the MBB by the length of the byte[]. n 573 */ 574 @Override 575 public void get(byte[] dst) { 576 get(dst, 0, dst.length); 577 } 578 579 /** 580 * Copies the specified number of bytes from this MBB's current position to the byte[]'s offset. 581 * Also advances the position of the MBB by the given length. 582 */ 583 @Override 584 public void get(byte[] dst, int offset, int length) { 585 checkRefCount(); 586 while (length > 0) { 587 int toRead = Math.min(length, this.curItem.remaining()); 588 ByteBufferUtils.copyFromBufferToArray(dst, this.curItem, this.curItem.position(), offset, 589 toRead); 590 this.curItem.position(this.curItem.position() + toRead); 591 length -= toRead; 592 if (length == 0) break; 593 this.curItemIndex++; 594 this.curItem = this.items[this.curItemIndex]; 595 offset += toRead; 596 } 597 } 598 599 @Override 600 public void get(int sourceOffset, byte[] dst, int offset, int length) { 601 checkRefCount(); 602 int itemIndex = getItemIndex(sourceOffset); 603 ByteBuffer item = this.items[itemIndex]; 604 sourceOffset = sourceOffset - this.itemBeginPos[itemIndex]; 605 while (length > 0) { 606 int toRead = Math.min((item.limit() - sourceOffset), length); 607 ByteBufferUtils.copyFromBufferToArray(dst, item, sourceOffset, offset, toRead); 608 length -= toRead; 609 if (length == 0) break; 610 itemIndex++; 611 item = this.items[itemIndex]; 612 offset += toRead; 613 sourceOffset = 0; 614 } 615 } 616 617 /** 618 * Marks the limit of this MBB. n * @return This MBB 619 */ 620 @Override 621 public MultiByteBuff limit(int limit) { 622 checkRefCount(); 623 this.limit = limit; 624 // Normally the limit will try to limit within the last BB item 625 int limitedIndexBegin = this.itemBeginPos[this.limitedItemIndex]; 626 if (limit >= limitedIndexBegin && limit < this.itemBeginPos[this.limitedItemIndex + 1]) { 627 this.items[this.limitedItemIndex].limit(limit - limitedIndexBegin); 628 return this; 629 } 630 int itemIndex = getItemIndex(limit); 631 int beginOffset = this.itemBeginPos[itemIndex]; 632 int offsetInItem = limit - beginOffset; 633 ByteBuffer item = items[itemIndex]; 634 item.limit(offsetInItem); 635 for (int i = this.limitedItemIndex; i < itemIndex; i++) { 636 this.items[i].limit(this.items[i].capacity()); 637 } 638 this.limitedItemIndex = itemIndex; 639 for (int i = itemIndex + 1; i < this.items.length; i++) { 640 this.items[i].limit(this.items[i].position()); 641 } 642 return this; 643 } 644 645 /** 646 * Returns the limit of this MBB 647 * @return limit of the MBB 648 */ 649 @Override 650 public int limit() { 651 return this.limit; 652 } 653 654 /** 655 * Returns an MBB which is a sliced version of this MBB. The position, limit and mark of the new 656 * MBB will be independent than that of the original MBB. The content of the new MBB will start at 657 * this MBB's current position 658 * @return a sliced MBB 659 */ 660 @Override 661 public MultiByteBuff slice() { 662 checkRefCount(); 663 ByteBuffer[] copy = new ByteBuffer[this.limitedItemIndex - this.curItemIndex + 1]; 664 for (int i = curItemIndex, j = 0; i <= this.limitedItemIndex; i++, j++) { 665 copy[j] = this.items[i].slice(); 666 } 667 return new MultiByteBuff(refCnt, copy); 668 } 669 670 /** 671 * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark of the 672 * new MBB will be independent than that of the original MBB. The content of the new MBB will 673 * start at this MBB's current position The position, limit and mark of the new MBB would be 674 * identical to this MBB in terms of values. 675 * @return a duplicated MBB 676 */ 677 @Override 678 public MultiByteBuff duplicate() { 679 checkRefCount(); 680 ByteBuffer[] itemsCopy = new ByteBuffer[this.items.length]; 681 for (int i = 0; i < this.items.length; i++) { 682 itemsCopy[i] = items[i].duplicate(); 683 } 684 return new MultiByteBuff(refCnt, itemsCopy, this.itemBeginPos, this.limit, 685 this.limitedItemIndex, this.curItemIndex, this.markedItemIndex); 686 } 687 688 /** 689 * Writes a byte to this MBB at the current position and increments the position n * @return this 690 * object 691 */ 692 @Override 693 public MultiByteBuff put(byte b) { 694 checkRefCount(); 695 if (this.curItem.remaining() == 0) { 696 if (this.curItemIndex == this.items.length - 1) { 697 throw new BufferOverflowException(); 698 } 699 this.curItemIndex++; 700 this.curItem = this.items[this.curItemIndex]; 701 } 702 this.curItem.put(b); 703 return this; 704 } 705 706 /** 707 * Writes a byte to this MBB at the given index and won't affect the position of any of the 708 * buffers. 709 * @return this object 710 * @throws IndexOutOfBoundsException If <tt>index</tt> is negative or not smaller than the 711 * {@link MultiByteBuff#limit} 712 */ 713 @Override 714 public MultiByteBuff put(int index, byte b) { 715 checkRefCount(); 716 int itemIndex = getItemIndex(index); 717 ByteBuffer item = items[itemIndex]; 718 item.put(index - itemBeginPos[itemIndex], b); 719 return this; 720 } 721 722 /** 723 * Copies from a src BB to this MBB. This will be absolute positional copying and won't affect the 724 * position of any of the buffers. 725 * @param destOffset the position in this MBB to which the copy should happen 726 * @param src the src MBB 727 * @param srcOffset the offset in the src MBB from where the elements should be read 728 * @param length the length upto which the copy should happen 729 * @throws BufferUnderflowException If there are fewer than length bytes remaining in src 730 * ByteBuff. 731 * @throws BufferOverflowException If there is insufficient available space in this MBB for 732 * length bytes. 733 */ 734 @Override 735 public MultiByteBuff put(int destOffset, ByteBuff src, int srcOffset, int length) { 736 checkRefCount(); 737 int destItemIndex = getItemIndex(destOffset); 738 int srcItemIndex = getItemIndexForByteBuff(src, srcOffset, length); 739 740 ByteBuffer destItem = this.items[destItemIndex]; 741 destOffset = this.getRelativeOffset(destOffset, destItemIndex); 742 743 ByteBuffer srcItem = getItemByteBuffer(src, srcItemIndex); 744 srcOffset = getRelativeOffsetForByteBuff(src, srcOffset, srcItemIndex); 745 746 while (length > 0) { 747 int toWrite = destItem.limit() - destOffset; 748 if (toWrite <= 0) { 749 throw new BufferOverflowException(); 750 } 751 int toRead = srcItem.limit() - srcOffset; 752 if (toRead <= 0) { 753 throw new BufferUnderflowException(); 754 } 755 int toMove = Math.min(length, Math.min(toRead, toWrite)); 756 ByteBufferUtils.copyFromBufferToBuffer(srcItem, destItem, srcOffset, destOffset, toMove); 757 length -= toMove; 758 if (length == 0) { 759 break; 760 } 761 if (toRead < toWrite) { 762 if (++srcItemIndex >= getItemByteBufferCount(src)) { 763 throw new BufferUnderflowException(); 764 } 765 srcItem = getItemByteBuffer(src, srcItemIndex); 766 srcOffset = 0; 767 destOffset += toMove; 768 } else if (toRead > toWrite) { 769 if (++destItemIndex >= this.items.length) { 770 throw new BufferOverflowException(); 771 } 772 destItem = this.items[destItemIndex]; 773 destOffset = 0; 774 srcOffset += toMove; 775 } else { 776 // toRead = toWrite case 777 if (++srcItemIndex >= getItemByteBufferCount(src)) { 778 throw new BufferUnderflowException(); 779 } 780 srcItem = getItemByteBuffer(src, srcItemIndex); 781 srcOffset = 0; 782 if (++destItemIndex >= this.items.length) { 783 throw new BufferOverflowException(); 784 } 785 destItem = this.items[destItemIndex]; 786 destOffset = 0; 787 } 788 } 789 return this; 790 } 791 792 private static ByteBuffer getItemByteBuffer(ByteBuff buf, int byteBufferIndex) { 793 if (buf instanceof SingleByteBuff) { 794 if (byteBufferIndex != 0) { 795 throw new IndexOutOfBoundsException( 796 "index:[" + byteBufferIndex + "],but only index 0 is valid."); 797 } 798 return buf.nioByteBuffers()[0]; 799 } 800 MultiByteBuff multiByteBuff = (MultiByteBuff) buf; 801 if (byteBufferIndex < 0 || byteBufferIndex >= multiByteBuff.items.length) { 802 throw new IndexOutOfBoundsException("index:[" + byteBufferIndex + "],but only index [0-" 803 + multiByteBuff.items.length + ") is valid."); 804 } 805 return multiByteBuff.items[byteBufferIndex]; 806 } 807 808 private static int getItemIndexForByteBuff(ByteBuff byteBuff, int offset, int length) { 809 if (byteBuff instanceof SingleByteBuff) { 810 ByteBuffer byteBuffer = byteBuff.nioByteBuffers()[0]; 811 if (offset + length > byteBuffer.limit()) { 812 throw new BufferUnderflowException(); 813 } 814 return 0; 815 } 816 MultiByteBuff multiByteBuff = (MultiByteBuff) byteBuff; 817 return multiByteBuff.getItemIndex(offset); 818 } 819 820 private static int getRelativeOffsetForByteBuff(ByteBuff byteBuff, int globalOffset, 821 int itemIndex) { 822 if (byteBuff instanceof SingleByteBuff) { 823 if (itemIndex != 0) { 824 throw new IndexOutOfBoundsException("index:[" + itemIndex + "],but only index 0 is valid."); 825 } 826 return globalOffset; 827 } 828 return ((MultiByteBuff) byteBuff).getRelativeOffset(globalOffset, itemIndex); 829 } 830 831 private int getRelativeOffset(int globalOffset, int itemIndex) { 832 if (itemIndex < 0 || itemIndex >= this.items.length) { 833 throw new IndexOutOfBoundsException( 834 "index:[" + itemIndex + "],but only index [0-" + this.items.length + ") is valid."); 835 } 836 return globalOffset - this.itemBeginPos[itemIndex]; 837 } 838 839 private static int getItemByteBufferCount(ByteBuff buf) { 840 return (buf instanceof SingleByteBuff) ? 1 : ((MultiByteBuff) buf).items.length; 841 } 842 843 /** 844 * Writes an int to this MBB at its current position. Also advances the position by size of int 845 * @param val Int value to write 846 * @return this object 847 */ 848 @Override 849 public MultiByteBuff putInt(int val) { 850 checkRefCount(); 851 if (this.curItem.remaining() >= Bytes.SIZEOF_INT) { 852 this.curItem.putInt(val); 853 return this; 854 } 855 if (this.curItemIndex == this.items.length - 1) { 856 throw new BufferOverflowException(); 857 } 858 // During read, we will read as byte by byte for this case. So just write in Big endian 859 put(int3(val)); 860 put(int2(val)); 861 put(int1(val)); 862 put(int0(val)); 863 return this; 864 } 865 866 private static byte int3(int x) { 867 return (byte) (x >> 24); 868 } 869 870 private static byte int2(int x) { 871 return (byte) (x >> 16); 872 } 873 874 private static byte int1(int x) { 875 return (byte) (x >> 8); 876 } 877 878 private static byte int0(int x) { 879 return (byte) x; 880 } 881 882 /** Copies from the given byte[] to this MBB */ 883 @Override 884 public final MultiByteBuff put(byte[] src) { 885 return put(src, 0, src.length); 886 } 887 888 /** Copies from the given byte[] to this MBB. */ 889 @Override 890 public MultiByteBuff put(byte[] src, int offset, int length) { 891 checkRefCount(); 892 if (this.curItem.remaining() >= length) { 893 ByteBufferUtils.copyFromArrayToBuffer(this.curItem, src, offset, length); 894 return this; 895 } 896 int end = offset + length; 897 for (int i = offset; i < end; i++) { 898 this.put(src[i]); 899 } 900 return this; 901 } 902 903 /** 904 * Writes a long to this MBB at its current position. Also advances the position by size of long 905 * @param val Long value to write 906 * @return this object 907 */ 908 @Override 909 public MultiByteBuff putLong(long val) { 910 checkRefCount(); 911 if (this.curItem.remaining() >= Bytes.SIZEOF_LONG) { 912 this.curItem.putLong(val); 913 return this; 914 } 915 if (this.curItemIndex == this.items.length - 1) { 916 throw new BufferOverflowException(); 917 } 918 // During read, we will read as byte by byte for this case. So just write in Big endian 919 put(long7(val)); 920 put(long6(val)); 921 put(long5(val)); 922 put(long4(val)); 923 put(long3(val)); 924 put(long2(val)); 925 put(long1(val)); 926 put(long0(val)); 927 return this; 928 } 929 930 private static byte long7(long x) { 931 return (byte) (x >> 56); 932 } 933 934 private static byte long6(long x) { 935 return (byte) (x >> 48); 936 } 937 938 private static byte long5(long x) { 939 return (byte) (x >> 40); 940 } 941 942 private static byte long4(long x) { 943 return (byte) (x >> 32); 944 } 945 946 private static byte long3(long x) { 947 return (byte) (x >> 24); 948 } 949 950 private static byte long2(long x) { 951 return (byte) (x >> 16); 952 } 953 954 private static byte long1(long x) { 955 return (byte) (x >> 8); 956 } 957 958 private static byte long0(long x) { 959 return (byte) x; 960 } 961 962 /** 963 * Jumps the current position of this MBB by specified length. n 964 */ 965 @Override 966 public MultiByteBuff skip(int length) { 967 checkRefCount(); 968 // Get available bytes from this item and remaining from next 969 int jump = 0; 970 while (true) { 971 jump = this.curItem.remaining(); 972 if (jump >= length) { 973 this.curItem.position(this.curItem.position() + length); 974 break; 975 } 976 this.curItem.position(this.curItem.position() + jump); 977 length -= jump; 978 this.curItemIndex++; 979 this.curItem = this.items[this.curItemIndex]; 980 } 981 return this; 982 } 983 984 /** 985 * Jumps back the current position of this MBB by specified length. n 986 */ 987 @Override 988 public MultiByteBuff moveBack(int length) { 989 checkRefCount(); 990 while (length != 0) { 991 if (length > curItem.position()) { 992 length -= curItem.position(); 993 this.curItem.position(0); 994 this.curItemIndex--; 995 this.curItem = this.items[curItemIndex]; 996 } else { 997 this.curItem.position(curItem.position() - length); 998 break; 999 } 1000 } 1001 return this; 1002 } 1003 1004 /** 1005 * Returns bytes from current position till length specified, as a single ByteBuffer. When all 1006 * these bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item 1007 * as such will be returned. So users are warned not to change the position or limit of this 1008 * returned ByteBuffer. The position of the returned byte buffer is at the begin of the required 1009 * bytes. When the required bytes happen to span across multiple ByteBuffers, this API will copy 1010 * the bytes to a newly created ByteBuffer of required size and return that. 1011 * @param length number of bytes required. 1012 * @return bytes from current position till length specified, as a single ByteButter. 1013 */ 1014 @Override 1015 public ByteBuffer asSubByteBuffer(int length) { 1016 checkRefCount(); 1017 if (this.curItem.remaining() >= length) { 1018 return this.curItem; 1019 } 1020 int offset = 0; 1021 byte[] dupB = new byte[length]; 1022 int locCurItemIndex = curItemIndex; 1023 ByteBuffer locCurItem = curItem; 1024 while (length > 0) { 1025 int toRead = Math.min(length, locCurItem.remaining()); 1026 ByteBufferUtils.copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset, 1027 toRead); 1028 length -= toRead; 1029 if (length == 0) break; 1030 locCurItemIndex++; 1031 locCurItem = this.items[locCurItemIndex]; 1032 offset += toRead; 1033 } 1034 return ByteBuffer.wrap(dupB); 1035 } 1036 1037 /** 1038 * Returns bytes from given offset till length specified, as a single ByteBuffer. When all these 1039 * bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item as 1040 * such will be returned (with offset in this ByteBuffer where the bytes starts). So users are 1041 * warned not to change the position or limit of this returned ByteBuffer. When the required bytes 1042 * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created 1043 * ByteBuffer of required size and return that. 1044 * @param offset the offset in this MBB from where the subBuffer should be created 1045 * @param length the length of the subBuffer 1046 * @param pair a pair that will have the bytes from the current position till length specified, 1047 * as a single ByteBuffer and offset in that Buffer where the bytes starts. The 1048 * method would set the values on the pair that is passed in by the caller 1049 */ 1050 @Override 1051 public void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair) { 1052 checkRefCount(); 1053 if (this.itemBeginPos[this.curItemIndex] <= offset) { 1054 int relOffsetInCurItem = offset - this.itemBeginPos[this.curItemIndex]; 1055 if (this.curItem.limit() - relOffsetInCurItem >= length) { 1056 pair.setFirst(this.curItem); 1057 pair.setSecond(relOffsetInCurItem); 1058 return; 1059 } 1060 } 1061 int itemIndex = getItemIndex(offset); 1062 ByteBuffer item = this.items[itemIndex]; 1063 offset = offset - this.itemBeginPos[itemIndex]; 1064 if (item.limit() - offset >= length) { 1065 pair.setFirst(item); 1066 pair.setSecond(offset); 1067 return; 1068 } 1069 byte[] dst = new byte[length]; 1070 int destOffset = 0; 1071 while (length > 0) { 1072 int toRead = Math.min(length, item.limit() - offset); 1073 ByteBufferUtils.copyFromBufferToArray(dst, item, offset, destOffset, toRead); 1074 length -= toRead; 1075 if (length == 0) break; 1076 itemIndex++; 1077 item = this.items[itemIndex]; 1078 destOffset += toRead; 1079 offset = 0; 1080 } 1081 pair.setFirst(ByteBuffer.wrap(dst)); 1082 pair.setSecond(0); 1083 } 1084 1085 /** 1086 * Copies the content from an this MBB to a ByteBuffer 1087 * @param out the ByteBuffer to which the copy has to happen, its position will be 1088 * advanced. 1089 * @param sourceOffset the offset in the MBB from which the elements has to be copied 1090 * @param length the length in the MBB upto which the elements has to be copied 1091 */ 1092 @Override 1093 public void get(ByteBuffer out, int sourceOffset, int length) { 1094 checkRefCount(); 1095 int itemIndex = getItemIndex(sourceOffset); 1096 ByteBuffer in = this.items[itemIndex]; 1097 sourceOffset = sourceOffset - this.itemBeginPos[itemIndex]; 1098 while (length > 0) { 1099 int toRead = Math.min(in.limit() - sourceOffset, length); 1100 ByteBufferUtils.copyFromBufferToBuffer(in, out, sourceOffset, toRead); 1101 length -= toRead; 1102 if (length == 0) { 1103 break; 1104 } 1105 itemIndex++; 1106 in = this.items[itemIndex]; 1107 sourceOffset = 0; 1108 } 1109 } 1110 1111 /** 1112 * Copy the content from this MBB to a byte[] based on the given offset and length n * the 1113 * position from where the copy should start n * the length upto which the copy has to be done 1114 * @return byte[] with the copied contents from this MBB. 1115 */ 1116 @Override 1117 public byte[] toBytes(int offset, int length) { 1118 checkRefCount(); 1119 byte[] output = new byte[length]; 1120 this.get(offset, output, 0, length); 1121 return output; 1122 } 1123 1124 private int internalRead(ReadableByteChannel channel, long offset, ChannelReader reader) 1125 throws IOException { 1126 checkRefCount(); 1127 int total = 0; 1128 while (buffsIterator.hasNext()) { 1129 ByteBuffer buffer = buffsIterator.next(); 1130 int len = read(channel, buffer, offset, reader); 1131 if (len > 0) { 1132 total += len; 1133 offset += len; 1134 } 1135 if (buffer.hasRemaining()) { 1136 // reset 1137 curItem = buffer; 1138 curItemIndex = (curItemIndex - 1); 1139 break; 1140 } 1141 } 1142 return total; 1143 } 1144 1145 @Override 1146 public int read(ReadableByteChannel channel) throws IOException { 1147 return internalRead(channel, 0, CHANNEL_READER); 1148 } 1149 1150 @Override 1151 public int read(FileChannel channel, long offset) throws IOException { 1152 return internalRead(channel, offset, FILE_READER); 1153 } 1154 1155 @Override 1156 public int write(FileChannel channel, long offset) throws IOException { 1157 checkRefCount(); 1158 int total = 0; 1159 while (buffsIterator.hasNext()) { 1160 ByteBuffer buffer = buffsIterator.next(); 1161 while (buffer.hasRemaining()) { 1162 int len = channel.write(buffer, offset); 1163 total += len; 1164 offset += len; 1165 } 1166 } 1167 return total; 1168 } 1169 1170 @Override 1171 public ByteBuffer[] nioByteBuffers() { 1172 checkRefCount(); 1173 return this.items; 1174 } 1175 1176 @Override 1177 public boolean equals(Object obj) { 1178 if (!(obj instanceof MultiByteBuff)) return false; 1179 if (this == obj) return true; 1180 MultiByteBuff that = (MultiByteBuff) obj; 1181 if (this.capacity() != that.capacity()) return false; 1182 if ( 1183 ByteBuff.compareTo(this, this.position(), this.limit(), that, that.position(), that.limit()) 1184 == 0 1185 ) { 1186 return true; 1187 } 1188 return false; 1189 } 1190 1191 @Override 1192 public int hashCode() { 1193 int hash = 0; 1194 for (ByteBuffer b : this.items) { 1195 hash += b.hashCode(); 1196 } 1197 return hash; 1198 } 1199 1200 @Override 1201 public MultiByteBuff retain() { 1202 refCnt.retain(); 1203 return this; 1204 } 1205}