001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.nio; 019 020import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE; 021 022import java.io.IOException; 023import java.nio.BufferOverflowException; 024import java.nio.BufferUnderflowException; 025import java.nio.ByteBuffer; 026import java.nio.InvalidMarkException; 027import java.nio.channels.FileChannel; 028import java.nio.channels.ReadableByteChannel; 029import java.util.Iterator; 030import java.util.NoSuchElementException; 031 032import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 033import org.apache.hadoop.hbase.util.ByteBufferUtils; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.hadoop.hbase.util.ObjectIntPair; 036import org.apache.yetus.audience.InterfaceAudience; 037 038/** 039 * Provides a unified view of all the underlying ByteBuffers and will look as if a bigger 040 * sequential buffer. This class provides similar APIs as in {@link ByteBuffer} to put/get int, 041 * short, long etc and doing operations like mark, reset, slice etc. This has to be used when 042 * data is split across multiple byte buffers and we don't want copy them to single buffer 043 * for reading from it. 044 */ 045@InterfaceAudience.Private 046public class MultiByteBuff extends ByteBuff { 047 048 private final ByteBuffer[] items; 049 // Pointer to the current item in the MBB 050 private ByteBuffer curItem = null; 051 // Index of the current item in the MBB 052 private int curItemIndex = 0; 053 054 private int limit = 0; 055 private int limitedItemIndex; 056 private int markedItemIndex = -1; 057 private final int[] itemBeginPos; 058 059 private Iterator<ByteBuffer> buffsIterator = new Iterator<ByteBuffer>() { 060 @Override 061 public boolean hasNext() { 062 return curItemIndex < limitedItemIndex || 063 (curItemIndex == limitedItemIndex && items[curItemIndex].hasRemaining()); 064 } 065 066 @Override 067 public ByteBuffer next() { 068 if (curItemIndex >= items.length) { 069 throw new NoSuchElementException("items overflow"); 070 } 071 curItem = items[curItemIndex++]; 072 return curItem; 073 } 074 }; 075 076 public MultiByteBuff(ByteBuffer... items) { 077 this(NONE, items); 078 } 079 080 public MultiByteBuff(Recycler recycler, ByteBuffer... items) { 081 this(new RefCnt(recycler), items); 082 } 083 084 MultiByteBuff(RefCnt refCnt, ByteBuffer... items) { 085 this.refCnt = refCnt; 086 assert items != null; 087 assert items.length > 0; 088 this.items = items; 089 this.curItem = this.items[this.curItemIndex]; 090 // See below optimization in getInt(int) where we check whether the given index land in current 091 // item. For this we need to check whether the passed index is less than the next item begin 092 // offset. To handle this effectively for the last item buffer, we add an extra item into this 093 // array. 094 itemBeginPos = new int[items.length + 1]; 095 int offset = 0; 096 for (int i = 0; i < items.length; i++) { 097 ByteBuffer item = items[i]; 098 item.rewind(); 099 itemBeginPos[i] = offset; 100 int l = item.limit() - item.position(); 101 offset += l; 102 } 103 this.limit = offset; 104 this.itemBeginPos[items.length] = offset + 1; 105 this.limitedItemIndex = this.items.length - 1; 106 } 107 108 private MultiByteBuff(RefCnt refCnt, ByteBuffer[] items, int[] itemBeginPos, int limit, 109 int limitedIndex, int curItemIndex, int markedIndex) { 110 this.refCnt = refCnt; 111 this.items = items; 112 this.curItemIndex = curItemIndex; 113 this.curItem = this.items[this.curItemIndex]; 114 this.itemBeginPos = itemBeginPos; 115 this.limit = limit; 116 this.limitedItemIndex = limitedIndex; 117 this.markedItemIndex = markedIndex; 118 } 119 120 /** 121 * @throws UnsupportedOperationException MBB does not support 122 * array based operations 123 */ 124 @Override 125 public byte[] array() { 126 throw new UnsupportedOperationException(); 127 } 128 129 /** 130 * @throws UnsupportedOperationException MBB does not 131 * support array based operations 132 */ 133 @Override 134 public int arrayOffset() { 135 throw new UnsupportedOperationException(); 136 } 137 138 /** 139 * @return false. MBB does not support array based operations 140 */ 141 @Override 142 public boolean hasArray() { 143 return false; 144 } 145 146 /** 147 * @return the total capacity of this MultiByteBuffer. 148 */ 149 @Override 150 public int capacity() { 151 checkRefCount(); 152 int c = 0; 153 for (ByteBuffer item : this.items) { 154 c += item.capacity(); 155 } 156 return c; 157 } 158 159 /** 160 * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers 161 * @param index 162 * @return the byte at the given index 163 */ 164 @Override 165 public byte get(int index) { 166 checkRefCount(); 167 int itemIndex = getItemIndex(index); 168 return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]); 169 } 170 171 @Override 172 public byte getByteAfterPosition(int offset) { 173 checkRefCount(); 174 // Mostly the index specified will land within this current item. Short circuit for that 175 int index = offset + this.position(); 176 int itemIndex = getItemIndexFromCurItemIndex(index); 177 return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]); 178 } 179 180 /* 181 * Returns in which sub ByteBuffer, the given element index will be available. 182 */ 183 private int getItemIndex(int elemIndex) { 184 int index = 1; 185 while (elemIndex >= this.itemBeginPos[index]) { 186 index++; 187 if (index == this.itemBeginPos.length) { 188 throw new IndexOutOfBoundsException(); 189 } 190 } 191 return index - 1; 192 } 193 194 /* 195 * Returns in which sub ByteBuffer, the given element index will be available. In this case we are 196 * sure that the item will be after MBB's current position 197 */ 198 private int getItemIndexFromCurItemIndex(int elemIndex) { 199 int index = this.curItemIndex; 200 while (elemIndex >= this.itemBeginPos[index]) { 201 index++; 202 if (index == this.itemBeginPos.length) { 203 throw new IndexOutOfBoundsException(); 204 } 205 } 206 return index - 1; 207 } 208 209 /** 210 * Fetches the int at the given index. Does not change position of the underlying ByteBuffers 211 * @param index 212 * @return the int value at the given index 213 */ 214 @Override 215 public int getInt(int index) { 216 checkRefCount(); 217 // Mostly the index specified will land within this current item. Short circuit for that 218 int itemIndex; 219 if (this.itemBeginPos[this.curItemIndex] <= index 220 && this.itemBeginPos[this.curItemIndex + 1] > index) { 221 itemIndex = this.curItemIndex; 222 } else { 223 itemIndex = getItemIndex(index); 224 } 225 return getInt(index, itemIndex); 226 } 227 228 @Override 229 public int getIntAfterPosition(int offset) { 230 checkRefCount(); 231 // Mostly the index specified will land within this current item. Short circuit for that 232 int index = offset + this.position(); 233 int itemIndex; 234 if (this.itemBeginPos[this.curItemIndex + 1] > index) { 235 itemIndex = this.curItemIndex; 236 } else { 237 itemIndex = getItemIndexFromCurItemIndex(index); 238 } 239 return getInt(index, itemIndex); 240 } 241 242 /** 243 * Fetches the short at the given index. Does not change position of the underlying ByteBuffers 244 * @param index 245 * @return the short value at the given index 246 */ 247 @Override 248 public short getShort(int index) { 249 checkRefCount(); 250 // Mostly the index specified will land within this current item. Short circuit for that 251 int itemIndex; 252 if (this.itemBeginPos[this.curItemIndex] <= index 253 && this.itemBeginPos[this.curItemIndex + 1] > index) { 254 itemIndex = this.curItemIndex; 255 } else { 256 itemIndex = getItemIndex(index); 257 } 258 ByteBuffer item = items[itemIndex]; 259 int offsetInItem = index - this.itemBeginPos[itemIndex]; 260 if (item.limit() - offsetInItem >= Bytes.SIZEOF_SHORT) { 261 return ByteBufferUtils.toShort(item, offsetInItem); 262 } 263 if (items.length - 1 == itemIndex) { 264 // means cur item is the last one and we wont be able to read a int. Throw exception 265 throw new BufferUnderflowException(); 266 } 267 ByteBuffer nextItem = items[itemIndex + 1]; 268 // Get available one byte from this item and remaining one from next 269 short n = 0; 270 n = (short) (n ^ (ByteBufferUtils.toByte(item, offsetInItem) & 0xFF)); 271 n = (short) (n << 8); 272 n = (short) (n ^ (ByteBufferUtils.toByte(nextItem, 0) & 0xFF)); 273 return n; 274 } 275 276 @Override 277 public short getShortAfterPosition(int offset) { 278 checkRefCount(); 279 // Mostly the index specified will land within this current item. Short circuit for that 280 int index = offset + this.position(); 281 int itemIndex; 282 if (this.itemBeginPos[this.curItemIndex + 1] > index) { 283 itemIndex = this.curItemIndex; 284 } else { 285 itemIndex = getItemIndexFromCurItemIndex(index); 286 } 287 return getShort(index, itemIndex); 288 } 289 290 private int getInt(int index, int itemIndex) { 291 ByteBuffer item = items[itemIndex]; 292 int offsetInItem = index - this.itemBeginPos[itemIndex]; 293 int remainingLen = item.limit() - offsetInItem; 294 if (remainingLen >= Bytes.SIZEOF_INT) { 295 return ByteBufferUtils.toInt(item, offsetInItem); 296 } 297 if (items.length - 1 == itemIndex) { 298 // means cur item is the last one and we wont be able to read a int. Throw exception 299 throw new BufferUnderflowException(); 300 } 301 int l = 0; 302 for (int i = 0; i < Bytes.SIZEOF_INT; i++) { 303 l <<= 8; 304 l ^= get(index + i) & 0xFF; 305 } 306 return l; 307 } 308 309 private short getShort(int index, int itemIndex) { 310 ByteBuffer item = items[itemIndex]; 311 int offsetInItem = index - this.itemBeginPos[itemIndex]; 312 int remainingLen = item.limit() - offsetInItem; 313 if (remainingLen >= Bytes.SIZEOF_SHORT) { 314 return ByteBufferUtils.toShort(item, offsetInItem); 315 } 316 if (items.length - 1 == itemIndex) { 317 // means cur item is the last one and we wont be able to read a short. Throw exception 318 throw new BufferUnderflowException(); 319 } 320 ByteBuffer nextItem = items[itemIndex + 1]; 321 // Get available bytes from this item and remaining from next 322 short l = 0; 323 for (int i = offsetInItem; i < item.capacity(); i++) { 324 l = (short) (l << 8); 325 l = (short) (l ^ (ByteBufferUtils.toByte(item, i) & 0xFF)); 326 } 327 for (int i = 0; i < Bytes.SIZEOF_SHORT - remainingLen; i++) { 328 l = (short) (l << 8); 329 l = (short) (l ^ (ByteBufferUtils.toByte(nextItem, i) & 0xFF)); 330 } 331 return l; 332 } 333 334 private long getLong(int index, int itemIndex) { 335 ByteBuffer item = items[itemIndex]; 336 int offsetInItem = index - this.itemBeginPos[itemIndex]; 337 int remainingLen = item.limit() - offsetInItem; 338 if (remainingLen >= Bytes.SIZEOF_LONG) { 339 return ByteBufferUtils.toLong(item, offsetInItem); 340 } 341 if (items.length - 1 == itemIndex) { 342 // means cur item is the last one and we wont be able to read a long. Throw exception 343 throw new BufferUnderflowException(); 344 } 345 long l = 0; 346 for (int i = 0; i < Bytes.SIZEOF_LONG; i++) { 347 l <<= 8; 348 l ^= get(index + i) & 0xFF; 349 } 350 return l; 351 } 352 353 /** 354 * Fetches the long at the given index. Does not change position of the underlying ByteBuffers 355 * @param index 356 * @return the long value at the given index 357 */ 358 @Override 359 public long getLong(int index) { 360 checkRefCount(); 361 // Mostly the index specified will land within this current item. Short circuit for that 362 int itemIndex; 363 if (this.itemBeginPos[this.curItemIndex] <= index 364 && this.itemBeginPos[this.curItemIndex + 1] > index) { 365 itemIndex = this.curItemIndex; 366 } else { 367 itemIndex = getItemIndex(index); 368 } 369 return getLong(index, itemIndex); 370 } 371 372 @Override 373 public long getLongAfterPosition(int offset) { 374 checkRefCount(); 375 // Mostly the index specified will land within this current item. Short circuit for that 376 int index = offset + this.position(); 377 int itemIndex; 378 if (this.itemBeginPos[this.curItemIndex + 1] > index) { 379 itemIndex = this.curItemIndex; 380 } else { 381 itemIndex = getItemIndexFromCurItemIndex(index); 382 } 383 return getLong(index, itemIndex); 384 } 385 386 /** 387 * @return this MBB's current position 388 */ 389 @Override 390 public int position() { 391 checkRefCount(); 392 return itemBeginPos[this.curItemIndex] + this.curItem.position(); 393 } 394 395 /** 396 * Sets this MBB's position to the given value. 397 * @param position 398 * @return this object 399 */ 400 @Override 401 public MultiByteBuff position(int position) { 402 checkRefCount(); 403 // Short circuit for positioning within the cur item. Mostly that is the case. 404 if (this.itemBeginPos[this.curItemIndex] <= position 405 && this.itemBeginPos[this.curItemIndex + 1] > position) { 406 this.curItem.position(position - this.itemBeginPos[this.curItemIndex]); 407 return this; 408 } 409 int itemIndex = getItemIndex(position); 410 // All items from 0 - curItem-1 set position at end. 411 for (int i = 0; i < itemIndex; i++) { 412 this.items[i].position(this.items[i].limit()); 413 } 414 // All items after curItem set position at begin 415 for (int i = itemIndex + 1; i < this.items.length; i++) { 416 this.items[i].position(0); 417 } 418 this.curItem = this.items[itemIndex]; 419 this.curItem.position(position - this.itemBeginPos[itemIndex]); 420 this.curItemIndex = itemIndex; 421 return this; 422 } 423 424 /** 425 * Rewinds this MBB and the position is set to 0 426 * @return this object 427 */ 428 @Override 429 public MultiByteBuff rewind() { 430 checkRefCount(); 431 for (int i = 0; i < this.items.length; i++) { 432 this.items[i].rewind(); 433 } 434 this.curItemIndex = 0; 435 this.curItem = this.items[this.curItemIndex]; 436 this.markedItemIndex = -1; 437 return this; 438 } 439 440 /** 441 * Marks the current position of the MBB 442 * @return this object 443 */ 444 @Override 445 public MultiByteBuff mark() { 446 checkRefCount(); 447 this.markedItemIndex = this.curItemIndex; 448 this.curItem.mark(); 449 return this; 450 } 451 452 /** 453 * Similar to {@link ByteBuffer}.reset(), ensures that this MBB 454 * is reset back to last marked position. 455 * @return This MBB 456 */ 457 @Override 458 public MultiByteBuff reset() { 459 checkRefCount(); 460 // when the buffer is moved to the next one.. the reset should happen on the previous marked 461 // item and the new one should be taken as the base 462 if (this.markedItemIndex < 0) throw new InvalidMarkException(); 463 ByteBuffer markedItem = this.items[this.markedItemIndex]; 464 markedItem.reset(); 465 this.curItem = markedItem; 466 // All items after the marked position upto the current item should be reset to 0 467 for (int i = this.curItemIndex; i > this.markedItemIndex; i--) { 468 this.items[i].position(0); 469 } 470 this.curItemIndex = this.markedItemIndex; 471 return this; 472 } 473 474 /** 475 * Returns the number of elements between the current position and the 476 * limit. 477 * @return the remaining elements in this MBB 478 */ 479 @Override 480 public int remaining() { 481 checkRefCount(); 482 int remain = 0; 483 for (int i = curItemIndex; i < items.length; i++) { 484 remain += items[i].remaining(); 485 } 486 return remain; 487 } 488 489 /** 490 * Returns true if there are elements between the current position and the limt 491 * @return true if there are elements, false otherwise 492 */ 493 @Override 494 public final boolean hasRemaining() { 495 checkRefCount(); 496 return this.curItem.hasRemaining() || (this.curItemIndex < this.limitedItemIndex 497 && this.items[this.curItemIndex + 1].hasRemaining()); 498 } 499 500 /** 501 * A relative method that returns byte at the current position. Increments the 502 * current position by the size of a byte. 503 * @return the byte at the current position 504 */ 505 @Override 506 public byte get() { 507 checkRefCount(); 508 if (this.curItem.remaining() == 0) { 509 if (items.length - 1 == this.curItemIndex) { 510 // means cur item is the last one and we wont be able to read a long. Throw exception 511 throw new BufferUnderflowException(); 512 } 513 this.curItemIndex++; 514 this.curItem = this.items[this.curItemIndex]; 515 } 516 return this.curItem.get(); 517 } 518 519 /** 520 * Returns the short value at the current position. Also advances the position by the size 521 * of short 522 * 523 * @return the short value at the current position 524 */ 525 @Override 526 public short getShort() { 527 checkRefCount(); 528 int remaining = this.curItem.remaining(); 529 if (remaining >= Bytes.SIZEOF_SHORT) { 530 return this.curItem.getShort(); 531 } 532 short n = 0; 533 n = (short) (n ^ (get() & 0xFF)); 534 n = (short) (n << 8); 535 n = (short) (n ^ (get() & 0xFF)); 536 return n; 537 } 538 539 /** 540 * Returns the int value at the current position. Also advances the position by the size of int 541 * 542 * @return the int value at the current position 543 */ 544 @Override 545 public int getInt() { 546 checkRefCount(); 547 int remaining = this.curItem.remaining(); 548 if (remaining >= Bytes.SIZEOF_INT) { 549 return this.curItem.getInt(); 550 } 551 int n = 0; 552 for (int i = 0; i < Bytes.SIZEOF_INT; i++) { 553 n <<= 8; 554 n ^= get() & 0xFF; 555 } 556 return n; 557 } 558 559 560 /** 561 * Returns the long value at the current position. Also advances the position by the size of long 562 * 563 * @return the long value at the current position 564 */ 565 @Override 566 public long getLong() { 567 checkRefCount(); 568 int remaining = this.curItem.remaining(); 569 if (remaining >= Bytes.SIZEOF_LONG) { 570 return this.curItem.getLong(); 571 } 572 long l = 0; 573 for (int i = 0; i < Bytes.SIZEOF_LONG; i++) { 574 l <<= 8; 575 l ^= get() & 0xFF; 576 } 577 return l; 578 } 579 580 /** 581 * Copies the content from this MBB's current position to the byte array and fills it. Also 582 * advances the position of the MBB by the length of the byte[]. 583 * @param dst 584 */ 585 @Override 586 public void get(byte[] dst) { 587 get(dst, 0, dst.length); 588 } 589 590 /** 591 * Copies the specified number of bytes from this MBB's current position to the byte[]'s offset. 592 * Also advances the position of the MBB by the given length. 593 * @param dst 594 * @param offset within the current array 595 * @param length upto which the bytes to be copied 596 */ 597 @Override 598 public void get(byte[] dst, int offset, int length) { 599 checkRefCount(); 600 while (length > 0) { 601 int toRead = Math.min(length, this.curItem.remaining()); 602 ByteBufferUtils.copyFromBufferToArray(dst, this.curItem, this.curItem.position(), offset, 603 toRead); 604 this.curItem.position(this.curItem.position() + toRead); 605 length -= toRead; 606 if (length == 0) break; 607 this.curItemIndex++; 608 this.curItem = this.items[this.curItemIndex]; 609 offset += toRead; 610 } 611 } 612 613 @Override 614 public void get(int sourceOffset, byte[] dst, int offset, int length) { 615 checkRefCount(); 616 int itemIndex = getItemIndex(sourceOffset); 617 ByteBuffer item = this.items[itemIndex]; 618 sourceOffset = sourceOffset - this.itemBeginPos[itemIndex]; 619 while (length > 0) { 620 int toRead = Math.min((item.limit() - sourceOffset), length); 621 ByteBufferUtils.copyFromBufferToArray(dst, item, sourceOffset, offset, toRead); 622 length -= toRead; 623 if (length == 0) break; 624 itemIndex++; 625 item = this.items[itemIndex]; 626 offset += toRead; 627 sourceOffset = 0; 628 } 629 } 630 631 /** 632 * Marks the limit of this MBB. 633 * @param limit 634 * @return This MBB 635 */ 636 @Override 637 public MultiByteBuff limit(int limit) { 638 checkRefCount(); 639 this.limit = limit; 640 // Normally the limit will try to limit within the last BB item 641 int limitedIndexBegin = this.itemBeginPos[this.limitedItemIndex]; 642 if (limit >= limitedIndexBegin && limit < this.itemBeginPos[this.limitedItemIndex + 1]) { 643 this.items[this.limitedItemIndex].limit(limit - limitedIndexBegin); 644 return this; 645 } 646 int itemIndex = getItemIndex(limit); 647 int beginOffset = this.itemBeginPos[itemIndex]; 648 int offsetInItem = limit - beginOffset; 649 ByteBuffer item = items[itemIndex]; 650 item.limit(offsetInItem); 651 for (int i = this.limitedItemIndex; i < itemIndex; i++) { 652 this.items[i].limit(this.items[i].capacity()); 653 } 654 this.limitedItemIndex = itemIndex; 655 for (int i = itemIndex + 1; i < this.items.length; i++) { 656 this.items[i].limit(this.items[i].position()); 657 } 658 return this; 659 } 660 661 /** 662 * Returns the limit of this MBB 663 * @return limit of the MBB 664 */ 665 @Override 666 public int limit() { 667 return this.limit; 668 } 669 670 /** 671 * Returns an MBB which is a sliced version of this MBB. The position, limit and mark 672 * of the new MBB will be independent than that of the original MBB. 673 * The content of the new MBB will start at this MBB's current position 674 * @return a sliced MBB 675 */ 676 @Override 677 public MultiByteBuff slice() { 678 checkRefCount(); 679 ByteBuffer[] copy = new ByteBuffer[this.limitedItemIndex - this.curItemIndex + 1]; 680 for (int i = curItemIndex, j = 0; i <= this.limitedItemIndex; i++, j++) { 681 copy[j] = this.items[i].slice(); 682 } 683 return new MultiByteBuff(refCnt, copy); 684 } 685 686 /** 687 * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark of the 688 * new MBB will be independent than that of the original MBB. The content of the new MBB will 689 * start at this MBB's current position The position, limit and mark of the new MBB would be 690 * identical to this MBB in terms of values. 691 * @return a duplicated MBB 692 */ 693 @Override 694 public MultiByteBuff duplicate() { 695 checkRefCount(); 696 ByteBuffer[] itemsCopy = new ByteBuffer[this.items.length]; 697 for (int i = 0; i < this.items.length; i++) { 698 itemsCopy[i] = items[i].duplicate(); 699 } 700 return new MultiByteBuff(refCnt, itemsCopy, this.itemBeginPos, this.limit, 701 this.limitedItemIndex, this.curItemIndex, this.markedItemIndex); 702 } 703 704 /** 705 * Writes a byte to this MBB at the current position and increments the position 706 * @param b 707 * @return this object 708 */ 709 @Override 710 public MultiByteBuff put(byte b) { 711 checkRefCount(); 712 if (this.curItem.remaining() == 0) { 713 if (this.curItemIndex == this.items.length - 1) { 714 throw new BufferOverflowException(); 715 } 716 this.curItemIndex++; 717 this.curItem = this.items[this.curItemIndex]; 718 } 719 this.curItem.put(b); 720 return this; 721 } 722 723 /** 724 * Writes a byte to this MBB at the given index 725 * @param index 726 * @param b 727 * @return this object 728 */ 729 @Override 730 public MultiByteBuff put(int index, byte b) { 731 checkRefCount(); 732 int itemIndex = getItemIndex(limit); 733 ByteBuffer item = items[itemIndex]; 734 item.put(index - itemBeginPos[itemIndex], b); 735 return this; 736 } 737 738 /** 739 * Copies from a src MBB to this MBB. 740 * @param offset the position in this MBB to which the copy should happen 741 * @param src the src MBB 742 * @param srcOffset the offset in the src MBB from where the elements should be read 743 * @param length the length upto which the copy should happen 744 */ 745 @Override 746 public MultiByteBuff put(int offset, ByteBuff src, int srcOffset, int length) { 747 checkRefCount(); 748 int destItemIndex = getItemIndex(offset); 749 int srcItemIndex = getItemIndex(srcOffset); 750 ByteBuffer destItem = this.items[destItemIndex]; 751 offset = offset - this.itemBeginPos[destItemIndex]; 752 753 ByteBuffer srcItem = getItemByteBuffer(src, srcItemIndex); 754 srcOffset = srcOffset - this.itemBeginPos[srcItemIndex]; 755 int toRead, toWrite, toMove; 756 while (length > 0) { 757 toWrite = destItem.limit() - offset; 758 toRead = srcItem.limit() - srcOffset; 759 toMove = Math.min(length, Math.min(toRead, toWrite)); 760 ByteBufferUtils.copyFromBufferToBuffer(srcItem, destItem, srcOffset, offset, toMove); 761 length -= toMove; 762 if (length == 0) break; 763 if (toRead < toWrite) { 764 srcItem = getItemByteBuffer(src, ++srcItemIndex); 765 srcOffset = 0; 766 offset += toMove; 767 } else if (toRead > toWrite) { 768 destItem = this.items[++destItemIndex]; 769 offset = 0; 770 srcOffset += toMove; 771 } else { 772 // toRead = toWrite case 773 srcItem = getItemByteBuffer(src, ++srcItemIndex); 774 srcOffset = 0; 775 destItem = this.items[++destItemIndex]; 776 offset = 0; 777 } 778 } 779 return this; 780 } 781 782 private static ByteBuffer getItemByteBuffer(ByteBuff buf, int index) { 783 return (buf instanceof SingleByteBuff) ? buf.nioByteBuffers()[0] 784 : ((MultiByteBuff) buf).items[index]; 785 } 786 787 /** 788 * Writes an int to this MBB at its current position. Also advances the position by size of int 789 * @param val Int value to write 790 * @return this object 791 */ 792 @Override 793 public MultiByteBuff putInt(int val) { 794 checkRefCount(); 795 if (this.curItem.remaining() >= Bytes.SIZEOF_INT) { 796 this.curItem.putInt(val); 797 return this; 798 } 799 if (this.curItemIndex == this.items.length - 1) { 800 throw new BufferOverflowException(); 801 } 802 // During read, we will read as byte by byte for this case. So just write in Big endian 803 put(int3(val)); 804 put(int2(val)); 805 put(int1(val)); 806 put(int0(val)); 807 return this; 808 } 809 810 private static byte int3(int x) { 811 return (byte) (x >> 24); 812 } 813 814 private static byte int2(int x) { 815 return (byte) (x >> 16); 816 } 817 818 private static byte int1(int x) { 819 return (byte) (x >> 8); 820 } 821 822 private static byte int0(int x) { 823 return (byte) (x); 824 } 825 826 /** 827 * Copies from the given byte[] to this MBB 828 * @param src 829 * @return this MBB 830 */ 831 @Override 832 public final MultiByteBuff put(byte[] src) { 833 return put(src, 0, src.length); 834 } 835 836 /** 837 * Copies from the given byte[] to this MBB 838 * @param src 839 * @param offset the position in the byte array from which the copy should be done 840 * @param length the length upto which the copy should happen 841 * @return this MBB 842 */ 843 @Override 844 public MultiByteBuff put(byte[] src, int offset, int length) { 845 checkRefCount(); 846 if (this.curItem.remaining() >= length) { 847 ByteBufferUtils.copyFromArrayToBuffer(this.curItem, src, offset, length); 848 return this; 849 } 850 int end = offset + length; 851 for (int i = offset; i < end; i++) { 852 this.put(src[i]); 853 } 854 return this; 855 } 856 857 858 /** 859 * Writes a long to this MBB at its current position. Also advances the position by size of long 860 * @param val Long value to write 861 * @return this object 862 */ 863 @Override 864 public MultiByteBuff putLong(long val) { 865 checkRefCount(); 866 if (this.curItem.remaining() >= Bytes.SIZEOF_LONG) { 867 this.curItem.putLong(val); 868 return this; 869 } 870 if (this.curItemIndex == this.items.length - 1) { 871 throw new BufferOverflowException(); 872 } 873 // During read, we will read as byte by byte for this case. So just write in Big endian 874 put(long7(val)); 875 put(long6(val)); 876 put(long5(val)); 877 put(long4(val)); 878 put(long3(val)); 879 put(long2(val)); 880 put(long1(val)); 881 put(long0(val)); 882 return this; 883 } 884 885 private static byte long7(long x) { 886 return (byte) (x >> 56); 887 } 888 889 private static byte long6(long x) { 890 return (byte) (x >> 48); 891 } 892 893 private static byte long5(long x) { 894 return (byte) (x >> 40); 895 } 896 897 private static byte long4(long x) { 898 return (byte) (x >> 32); 899 } 900 901 private static byte long3(long x) { 902 return (byte) (x >> 24); 903 } 904 905 private static byte long2(long x) { 906 return (byte) (x >> 16); 907 } 908 909 private static byte long1(long x) { 910 return (byte) (x >> 8); 911 } 912 913 private static byte long0(long x) { 914 return (byte) (x); 915 } 916 917 /** 918 * Jumps the current position of this MBB by specified length. 919 * @param length 920 */ 921 @Override 922 public MultiByteBuff skip(int length) { 923 checkRefCount(); 924 // Get available bytes from this item and remaining from next 925 int jump = 0; 926 while (true) { 927 jump = this.curItem.remaining(); 928 if (jump >= length) { 929 this.curItem.position(this.curItem.position() + length); 930 break; 931 } 932 this.curItem.position(this.curItem.position() + jump); 933 length -= jump; 934 this.curItemIndex++; 935 this.curItem = this.items[this.curItemIndex]; 936 } 937 return this; 938 } 939 940 /** 941 * Jumps back the current position of this MBB by specified length. 942 * @param length 943 */ 944 @Override 945 public MultiByteBuff moveBack(int length) { 946 checkRefCount(); 947 while (length != 0) { 948 if (length > curItem.position()) { 949 length -= curItem.position(); 950 this.curItem.position(0); 951 this.curItemIndex--; 952 this.curItem = this.items[curItemIndex]; 953 } else { 954 this.curItem.position(curItem.position() - length); 955 break; 956 } 957 } 958 return this; 959 } 960 961 /** 962 * Returns bytes from current position till length specified, as a single ByteBuffer. When all 963 * these bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item 964 * as such will be returned. So users are warned not to change the position or limit of this 965 * returned ByteBuffer. The position of the returned byte buffer is at the begin of the required 966 * bytes. When the required bytes happen to span across multiple ByteBuffers, this API will copy 967 * the bytes to a newly created ByteBuffer of required size and return that. 968 * 969 * @param length number of bytes required. 970 * @return bytes from current position till length specified, as a single ByteButter. 971 */ 972 @Override 973 public ByteBuffer asSubByteBuffer(int length) { 974 checkRefCount(); 975 if (this.curItem.remaining() >= length) { 976 return this.curItem; 977 } 978 int offset = 0; 979 byte[] dupB = new byte[length]; 980 int locCurItemIndex = curItemIndex; 981 ByteBuffer locCurItem = curItem; 982 while (length > 0) { 983 int toRead = Math.min(length, locCurItem.remaining()); 984 ByteBufferUtils.copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset, 985 toRead); 986 length -= toRead; 987 if (length == 0) break; 988 locCurItemIndex++; 989 locCurItem = this.items[locCurItemIndex]; 990 offset += toRead; 991 } 992 return ByteBuffer.wrap(dupB); 993 } 994 995 /** 996 * Returns bytes from given offset till length specified, as a single ByteBuffer. When all these 997 * bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item as 998 * such will be returned (with offset in this ByteBuffer where the bytes starts). So users are 999 * warned not to change the position or limit of this returned ByteBuffer. When the required bytes 1000 * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created 1001 * ByteBuffer of required size and return that. 1002 * 1003 * @param offset the offset in this MBB from where the subBuffer should be created 1004 * @param length the length of the subBuffer 1005 * @param pair a pair that will have the bytes from the current position till length specified, as 1006 * a single ByteBuffer and offset in that Buffer where the bytes starts. The method would 1007 * set the values on the pair that is passed in by the caller 1008 */ 1009 @Override 1010 public void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair) { 1011 checkRefCount(); 1012 if (this.itemBeginPos[this.curItemIndex] <= offset) { 1013 int relOffsetInCurItem = offset - this.itemBeginPos[this.curItemIndex]; 1014 if (this.curItem.limit() - relOffsetInCurItem >= length) { 1015 pair.setFirst(this.curItem); 1016 pair.setSecond(relOffsetInCurItem); 1017 return; 1018 } 1019 } 1020 int itemIndex = getItemIndex(offset); 1021 ByteBuffer item = this.items[itemIndex]; 1022 offset = offset - this.itemBeginPos[itemIndex]; 1023 if (item.limit() - offset >= length) { 1024 pair.setFirst(item); 1025 pair.setSecond(offset); 1026 return; 1027 } 1028 byte[] dst = new byte[length]; 1029 int destOffset = 0; 1030 while (length > 0) { 1031 int toRead = Math.min(length, item.limit() - offset); 1032 ByteBufferUtils.copyFromBufferToArray(dst, item, offset, destOffset, toRead); 1033 length -= toRead; 1034 if (length == 0) break; 1035 itemIndex++; 1036 item = this.items[itemIndex]; 1037 destOffset += toRead; 1038 offset = 0; 1039 } 1040 pair.setFirst(ByteBuffer.wrap(dst)); 1041 pair.setSecond(0); 1042 } 1043 1044 /** 1045 * Copies the content from an this MBB to a ByteBuffer 1046 * @param out the ByteBuffer to which the copy has to happen, its position will be advanced. 1047 * @param sourceOffset the offset in the MBB from which the elements has to be copied 1048 * @param length the length in the MBB upto which the elements has to be copied 1049 */ 1050 @Override 1051 public void get(ByteBuffer out, int sourceOffset, int length) { 1052 checkRefCount(); 1053 int itemIndex = getItemIndex(sourceOffset); 1054 ByteBuffer in = this.items[itemIndex]; 1055 sourceOffset = sourceOffset - this.itemBeginPos[itemIndex]; 1056 while (length > 0) { 1057 int toRead = Math.min(in.limit() - sourceOffset, length); 1058 ByteBufferUtils.copyFromBufferToBuffer(in, out, sourceOffset, toRead); 1059 length -= toRead; 1060 if (length == 0) { 1061 break; 1062 } 1063 itemIndex++; 1064 in = this.items[itemIndex]; 1065 sourceOffset = 0; 1066 } 1067 } 1068 1069 /** 1070 * Copy the content from this MBB to a byte[] based on the given offset and 1071 * length 1072 * 1073 * @param offset 1074 * the position from where the copy should start 1075 * @param length 1076 * the length upto which the copy has to be done 1077 * @return byte[] with the copied contents from this MBB. 1078 */ 1079 @Override 1080 public byte[] toBytes(int offset, int length) { 1081 checkRefCount(); 1082 byte[] output = new byte[length]; 1083 this.get(offset, output, 0, length); 1084 return output; 1085 } 1086 1087 private int internalRead(ReadableByteChannel channel, long offset, 1088 ChannelReader reader) throws IOException { 1089 checkRefCount(); 1090 int total = 0; 1091 while (buffsIterator.hasNext()) { 1092 ByteBuffer buffer = buffsIterator.next(); 1093 int len = read(channel, buffer, offset, reader); 1094 if (len > 0) { 1095 total += len; 1096 offset += len; 1097 } 1098 if (buffer.hasRemaining()) { 1099 break; 1100 } 1101 } 1102 return total; 1103 } 1104 1105 @Override 1106 public int read(ReadableByteChannel channel) throws IOException { 1107 return internalRead(channel, 0, CHANNEL_READER); 1108 } 1109 1110 @Override 1111 public int read(FileChannel channel, long offset) throws IOException { 1112 return internalRead(channel, offset, FILE_READER); 1113 } 1114 1115 @Override 1116 public int write(FileChannel channel, long offset) throws IOException { 1117 checkRefCount(); 1118 int total = 0; 1119 while (buffsIterator.hasNext()) { 1120 ByteBuffer buffer = buffsIterator.next(); 1121 while (buffer.hasRemaining()) { 1122 int len = channel.write(buffer, offset); 1123 total += len; 1124 offset += len; 1125 } 1126 } 1127 return total; 1128 } 1129 1130 @Override 1131 public ByteBuffer[] nioByteBuffers() { 1132 checkRefCount(); 1133 return this.items; 1134 } 1135 1136 @Override 1137 public boolean equals(Object obj) { 1138 if (!(obj instanceof MultiByteBuff)) return false; 1139 if (this == obj) return true; 1140 MultiByteBuff that = (MultiByteBuff) obj; 1141 if (this.capacity() != that.capacity()) return false; 1142 if (ByteBuff.compareTo(this, this.position(), this.limit(), that, that.position(), 1143 that.limit()) == 0) { 1144 return true; 1145 } 1146 return false; 1147 } 1148 1149 @Override 1150 public int hashCode() { 1151 int hash = 0; 1152 for (ByteBuffer b : this.items) { 1153 hash += b.hashCode(); 1154 } 1155 return hash; 1156 } 1157 1158 @Override 1159 public MultiByteBuff retain() { 1160 refCnt.retain(); 1161 return this; 1162 } 1163}