/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.nio;

import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.InvalidMarkException;
import java.nio.channels.ReadableByteChannel;

import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Provides a unified view of all the underlying ByteBuffers so that they look like one bigger
 * sequential buffer. This class provides APIs similar to those of {@link ByteBuffer} to put/get
 * int, short, long etc. and to do operations like mark, reset, slice etc. This has to be used
 * when data is split across multiple byte buffers and we don't want to copy them into a single
 * buffer for reading.
 */
@InterfaceAudience.Private
public class MultiByteBuff extends ByteBuff {

  private final ByteBuffer[] items;
  // Pointer to the current item in the MBB
  private ByteBuffer curItem = null;
  // Index of the current item in the MBB
  private int curItemIndex = 0;

  private int limit = 0;
  private int limitedItemIndex;
  private int markedItemIndex = -1;
  // itemBeginPos[i] is the MBB-wide offset at which items[i] begins. It has one extra trailing
  // entry so that itemBeginPos[i + 1] is valid for the last item as well.
  private final int[] itemBeginPos;

  /**
   * Creates an MBB over the given buffers. Every buffer is rewound, and the MBB's limit is the
   * sum of the limits of the buffers.
   * @param items the buffers to wrap; must be non-null and non-empty
   */
  public MultiByteBuff(ByteBuffer... items) {
    assert items != null;
    assert items.length > 0;
    this.items = items;
    this.curItem = this.items[this.curItemIndex];
    // See below optimization in getInt(int) where we check whether the given index lands in the
    // current item. For this we need to check whether the passed index is less than the next
    // item's begin offset. To handle this effectively for the last item buffer, we add an extra
    // item into this array.
    itemBeginPos = new int[items.length + 1];
    int offset = 0;
    for (int i = 0; i < items.length; i++) {
      ByteBuffer item = items[i];
      item.rewind();
      itemBeginPos[i] = offset;
      int l = item.limit() - item.position();
      offset += l;
    }
    this.limit = offset;
    this.itemBeginPos[items.length] = offset + 1;
    this.limitedItemIndex = this.items.length - 1;
  }

  /**
   * Internal constructor used by {@link #duplicate()}; takes all state as-is without rewinding
   * the items.
   */
  private MultiByteBuff(ByteBuffer[] items, int[] itemBeginPos, int limit, int limitedIndex,
      int curItemIndex, int markedIndex) {
    this.items = items;
    this.curItemIndex = curItemIndex;
    this.curItem = this.items[this.curItemIndex];
    this.itemBeginPos = itemBeginPos;
    this.limit = limit;
    this.limitedItemIndex = limitedIndex;
    this.markedItemIndex = markedIndex;
  }

  /**
   * @throws UnsupportedOperationException MBB does not support
   * array based operations
   */
  @Override
  public byte[] array() {
    throw new UnsupportedOperationException();
  }

  /**
   * @throws UnsupportedOperationException MBB does not
   * support array based operations
   */
  @Override
  public int arrayOffset() {
    throw new UnsupportedOperationException();
  }

  /**
   * @return false. MBB does not support array based operations
   */
  @Override
  public boolean hasArray() {
    return false;
  }

  /**
   * @return the total capacity of this MultiByteBuffer, i.e. the sum of the capacities of all
   *         the wrapped buffers.
   */
  @Override
  public int capacity() {
    int c = 0;
    for (ByteBuffer item : this.items) {
      c += item.capacity();
    }
    return c;
  }

  /**
   * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers
   * @param index the MBB-wide index of the byte
   * @return the byte at the given index
   */
  @Override
  public byte get(int index) {
    int itemIndex = getItemIndex(index);
    return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]);
  }

  /**
   * Fetches the byte that is {@code offset} bytes after the current position. Does not change
   * the position.
   * @param offset the distance from the current position
   * @return the byte at (position + offset)
   */
  @Override
  public byte getByteAfterPosition(int offset) {
    // The index is at or after the current position, so the search can start at the current item
    int index = offset + this.position();
    int itemIndex = getItemIndexFromCurItemIndex(index);
    return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]);
  }

  /*
   * Returns in which sub ByteBuffer, the given element index will be available.
   */
  private int getItemIndex(int elemIndex) {
    int index = 1;
    while (elemIndex >= this.itemBeginPos[index]) {
      index++;
      if (index == this.itemBeginPos.length) {
        throw new IndexOutOfBoundsException();
      }
    }
    return index - 1;
  }

  /*
   * Returns in which sub ByteBuffer, the given element index will be available. In this case we
   * are sure that the item will be after MBB's current position
   */
  private int getItemIndexFromCurItemIndex(int elemIndex) {
    int index = this.curItemIndex;
    while (elemIndex >= this.itemBeginPos[index]) {
      index++;
      if (index == this.itemBeginPos.length) {
        throw new IndexOutOfBoundsException();
      }
    }
    return index - 1;
  }

  /**
   * Fetches the int at the given index. Does not change position of the underlying ByteBuffers
   * @param index the MBB-wide index at which the int starts
   * @return the int value at the given index
   */
  @Override
  public int getInt(int index) {
    // Mostly the index specified will land within this current item. Short circuit for that
    int itemIndex;
    if (this.itemBeginPos[this.curItemIndex] <= index
        && this.itemBeginPos[this.curItemIndex + 1] > index) {
      itemIndex = this.curItemIndex;
    } else {
      itemIndex = getItemIndex(index);
    }
    return getInt(index, itemIndex);
  }

  /**
   * Fetches the int that starts {@code offset} bytes after the current position. Does not change
   * the position.
   * @param offset the distance from the current position
   * @return the int value at (position + offset)
   */
  @Override
  public int getIntAfterPosition(int offset) {
    // Mostly the index specified will land within this current item. Short circuit for that
    int index = offset + this.position();
    int itemIndex;
    if (this.itemBeginPos[this.curItemIndex + 1] > index) {
      itemIndex = this.curItemIndex;
    } else {
      itemIndex = getItemIndexFromCurItemIndex(index);
    }
    return getInt(index, itemIndex);
  }

  /**
   * Fetches the short at the given index. Does not change position of the underlying ByteBuffers
   * @param index the MBB-wide index at which the short starts
   * @return the short value at the given index
   */
  @Override
  public short getShort(int index) {
    // Mostly the index specified will land within this current item. Short circuit for that
    int itemIndex;
    if (this.itemBeginPos[this.curItemIndex] <= index
        && this.itemBeginPos[this.curItemIndex + 1] > index) {
      itemIndex = this.curItemIndex;
    } else {
      itemIndex = getItemIndex(index);
    }
    ByteBuffer item = items[itemIndex];
    int offsetInItem = index - this.itemBeginPos[itemIndex];
    if (item.limit() - offsetInItem >= Bytes.SIZEOF_SHORT) {
      return ByteBufferUtils.toShort(item, offsetInItem);
    }
    if (items.length - 1 == itemIndex) {
      // means cur item is the last one and we wont be able to read a short. Throw exception
      throw new BufferUnderflowException();
    }
    ByteBuffer nextItem = items[itemIndex + 1];
    // Get available one byte from this item and remaining one from next
    short n = 0;
    n = (short) (n ^ (ByteBufferUtils.toByte(item, offsetInItem) & 0xFF));
    n = (short) (n << 8);
    n = (short) (n ^ (ByteBufferUtils.toByte(nextItem, 0) & 0xFF));
    return n;
  }

  /**
   * Fetches the short that starts {@code offset} bytes after the current position. Does not
   * change the position.
   * @param offset the distance from the current position
   * @return the short value at (position + offset)
   */
  @Override
  public short getShortAfterPosition(int offset) {
    // Mostly the index specified will land within this current item. Short circuit for that
    int index = offset + this.position();
    int itemIndex;
    if (this.itemBeginPos[this.curItemIndex + 1] > index) {
      itemIndex = this.curItemIndex;
    } else {
      itemIndex = getItemIndexFromCurItemIndex(index);
    }
    return getShort(index, itemIndex);
  }

  /*
   * Reads a big-endian int starting at the MBB-wide index, which is known to lie in the item at
   * itemIndex. Falls back to byte-by-byte reads when the int spans more than one item.
   */
  private int getInt(int index, int itemIndex) {
    ByteBuffer item = items[itemIndex];
    int offsetInItem = index - this.itemBeginPos[itemIndex];
    int remainingLen = item.limit() - offsetInItem;
    if (remainingLen >= Bytes.SIZEOF_INT) {
      return ByteBufferUtils.toInt(item, offsetInItem);
    }
    if (items.length - 1 == itemIndex) {
      // means cur item is the last one and we wont be able to read a int. Throw exception
      throw new BufferUnderflowException();
    }
    int l = 0;
    for (int i = 0; i < Bytes.SIZEOF_INT; i++) {
      l <<= 8;
      l ^= get(index + i) & 0xFF;
    }
    return l;
  }

  /*
   * Reads a big-endian short starting at the MBB-wide index, which is known to lie in the item
   * at itemIndex. When the short spans two items, the bytes are stitched together.
   */
  private short getShort(int index, int itemIndex) {
    ByteBuffer item = items[itemIndex];
    int offsetInItem = index - this.itemBeginPos[itemIndex];
    int remainingLen = item.limit() - offsetInItem;
    if (remainingLen >= Bytes.SIZEOF_SHORT) {
      return ByteBufferUtils.toShort(item, offsetInItem);
    }
    if (items.length - 1 == itemIndex) {
      // means cur item is the last one and we wont be able to read a short. Throw exception
      throw new BufferUnderflowException();
    }
    ByteBuffer nextItem = items[itemIndex + 1];
    // Get available bytes from this item and remaining from next
    short l = 0;
    // Bound by limit(), not capacity(): remainingLen above is computed from limit(), and the two
    // loops together must consume exactly SIZEOF_SHORT bytes.
    for (int i = offsetInItem; i < item.limit(); i++) {
      l = (short) (l << 8);
      l = (short) (l ^ (ByteBufferUtils.toByte(item, i) & 0xFF));
    }
    for (int i = 0; i < Bytes.SIZEOF_SHORT - remainingLen; i++) {
      l = (short) (l << 8);
      l = (short) (l ^ (ByteBufferUtils.toByte(nextItem, i) & 0xFF));
    }
    return l;
  }

  /*
   * Reads a big-endian long starting at the MBB-wide index, which is known to lie in the item at
   * itemIndex. Falls back to byte-by-byte reads when the long spans more than one item.
   */
  private long getLong(int index, int itemIndex) {
    ByteBuffer item = items[itemIndex];
    int offsetInItem = index - this.itemBeginPos[itemIndex];
    int remainingLen = item.limit() - offsetInItem;
    if (remainingLen >= Bytes.SIZEOF_LONG) {
      return ByteBufferUtils.toLong(item, offsetInItem);
    }
    if (items.length - 1 == itemIndex) {
      // means cur item is the last one and we wont be able to read a long. Throw exception
      throw new BufferUnderflowException();
    }
    long l = 0;
    for (int i = 0; i < Bytes.SIZEOF_LONG; i++) {
      l <<= 8;
      l ^= get(index + i) & 0xFF;
    }
    return l;
  }

  /**
   * Fetches the long at the given index. Does not change position of the underlying ByteBuffers
   * @param index the MBB-wide index at which the long starts
   * @return the long value at the given index
   */
  @Override
  public long getLong(int index) {
    // Mostly the index specified will land within this current item. Short circuit for that
    int itemIndex;
    if (this.itemBeginPos[this.curItemIndex] <= index
        && this.itemBeginPos[this.curItemIndex + 1] > index) {
      itemIndex = this.curItemIndex;
    } else {
      itemIndex = getItemIndex(index);
    }
    return getLong(index, itemIndex);
  }

  /**
   * Fetches the long that starts {@code offset} bytes after the current position. Does not
   * change the position.
   * @param offset the distance from the current position
   * @return the long value at (position + offset)
   */
  @Override
  public long getLongAfterPosition(int offset) {
    // Mostly the index specified will land within this current item. Short circuit for that
    int index = offset + this.position();
    int itemIndex;
    if (this.itemBeginPos[this.curItemIndex + 1] > index) {
      itemIndex = this.curItemIndex;
    } else {
      itemIndex = getItemIndexFromCurItemIndex(index);
    }
    return getLong(index, itemIndex);
  }

  /**
   * @return this MBB's current position
   */
  @Override
  public int position() {
    return itemBeginPos[this.curItemIndex] + this.curItem.position();
  }

  /**
   * Sets this MBB's position to the given value.
   * @param position the new MBB-wide position
   * @return this object
   */
  @Override
  public MultiByteBuff position(int position) {
    // Short circuit for positioning within the cur item. Mostly that is the case.
    if (this.itemBeginPos[this.curItemIndex] <= position
        && this.itemBeginPos[this.curItemIndex + 1] > position) {
      this.curItem.position(position - this.itemBeginPos[this.curItemIndex]);
      return this;
    }
    int itemIndex = getItemIndex(position);
    // All items from 0 - curItem-1 set position at end.
    for (int i = 0; i < itemIndex; i++) {
      this.items[i].position(this.items[i].limit());
    }
    // All items after curItem set position at begin
    for (int i = itemIndex + 1; i < this.items.length; i++) {
      this.items[i].position(0);
    }
    this.curItem = this.items[itemIndex];
    this.curItem.position(position - this.itemBeginPos[itemIndex]);
    this.curItemIndex = itemIndex;
    return this;
  }

  /**
   * Rewinds this MBB and the position is set to 0. Any mark is discarded.
   * @return this object
   */
  @Override
  public MultiByteBuff rewind() {
    for (int i = 0; i < this.items.length; i++) {
      this.items[i].rewind();
    }
    this.curItemIndex = 0;
    this.curItem = this.items[this.curItemIndex];
    this.markedItemIndex = -1;
    return this;
  }

  /**
   * Marks the current position of the MBB
   * @return this object
   */
  @Override
  public MultiByteBuff mark() {
    this.markedItemIndex = this.curItemIndex;
    this.curItem.mark();
    return this;
  }

  /**
   * Similar to {@link ByteBuffer}.reset(), ensures that this MBB
   * is reset back to last marked position.
   * @return This MBB
   * @throws InvalidMarkException if no mark has been set
   */
  @Override
  public MultiByteBuff reset() {
    // when the buffer is moved to the next one.. the reset should happen on the previous marked
    // item and the new one should be taken as the base
    if (this.markedItemIndex < 0) {
      throw new InvalidMarkException();
    }
    ByteBuffer markedItem = this.items[this.markedItemIndex];
    markedItem.reset();
    this.curItem = markedItem;
    // All items after the marked position upto the current item should be reset to 0
    for (int i = this.curItemIndex; i > this.markedItemIndex; i--) {
      this.items[i].position(0);
    }
    this.curItemIndex = this.markedItemIndex;
    return this;
  }

  /**
   * Returns the number of elements between the current position and the
   * limit.
   * @return the remaining elements in this MBB
   */
  @Override
  public int remaining() {
    int remain = 0;
    for (int i = curItemIndex; i < items.length; i++) {
      remain += items[i].remaining();
    }
    return remain;
  }

  /**
   * Returns true if there are elements between the current position and the limit. Only the
   * current item and the item immediately after it are inspected.
   * @return true if there are elements, false otherwise
   */
  @Override
  public final boolean hasRemaining() {
    return this.curItem.hasRemaining() || (this.curItemIndex < this.limitedItemIndex
        && this.items[this.curItemIndex + 1].hasRemaining());
  }

  /**
   * A relative method that returns byte at the current position.  Increments the
   * current position by the size of a byte.
   * @return the byte at the current position
   * @throws BufferUnderflowException if the last item has no bytes remaining
   */
  @Override
  public byte get() {
    if (this.curItem.remaining() == 0) {
      if (items.length - 1 == this.curItemIndex) {
        // means cur item is the last one and we wont be able to read a byte. Throw exception
        throw new BufferUnderflowException();
      }
      this.curItemIndex++;
      this.curItem = this.items[this.curItemIndex];
    }
    return this.curItem.get();
  }

  /**
   * Returns the short value at the current position. Also advances the position by the size
   * of short
   *
   * @return the short value at the current position
   */
  @Override
  public short getShort() {
    int remaining = this.curItem.remaining();
    if (remaining >= Bytes.SIZEOF_SHORT) {
      return this.curItem.getShort();
    }
    // The short spans two items; assemble it big-endian from two single byte reads
    short n = 0;
    n = (short) (n ^ (get() & 0xFF));
    n = (short) (n << 8);
    n = (short) (n ^ (get() & 0xFF));
    return n;
  }

  /**
   * Returns the int value at the current position. Also advances the position by the size of int
   *
   * @return the int value at the current position
   */
  @Override
  public int getInt() {
    int remaining = this.curItem.remaining();
    if (remaining >= Bytes.SIZEOF_INT) {
      return this.curItem.getInt();
    }
    // The int spans multiple items; assemble it big-endian from single byte reads
    int n = 0;
    for (int i = 0; i < Bytes.SIZEOF_INT; i++) {
      n <<= 8;
      n ^= get() & 0xFF;
    }
    return n;
  }

  /**
   * Returns the long value at the current position. Also advances the position by the size of
   * long
   *
   * @return the long value at the current position
   */
  @Override
  public long getLong() {
    int remaining = this.curItem.remaining();
    if (remaining >= Bytes.SIZEOF_LONG) {
      return this.curItem.getLong();
    }
    // The long spans multiple items; assemble it big-endian from single byte reads
    long l = 0;
    for (int i = 0; i < Bytes.SIZEOF_LONG; i++) {
      l <<= 8;
      l ^= get() & 0xFF;
    }
    return l;
  }

  /**
   * Copies the content from this MBB's current position to the byte array and fills it. Also
   * advances the position of the MBB by the length of the byte[].
   * @param dst the array to fill
   */
  @Override
  public void get(byte[] dst) {
    get(dst, 0, dst.length);
  }

  /**
   * Copies the specified number of bytes from this MBB's current position to the byte[]'s offset.
   * Also advances the position of the MBB by the given length.
   * @param dst the destination array
   * @param offset within the current array
   * @param length upto which the bytes to be copied
   */
  @Override
  public void get(byte[] dst, int offset, int length) {
    while (length > 0) {
      int toRead = Math.min(length, this.curItem.remaining());
      ByteBufferUtils.copyFromBufferToArray(dst, this.curItem, this.curItem.position(), offset,
          toRead);
      this.curItem.position(this.curItem.position() + toRead);
      length -= toRead;
      if (length == 0) {
        break;
      }
      this.curItemIndex++;
      this.curItem = this.items[this.curItemIndex];
      offset += toRead;
    }
  }

  /**
   * Copies the specified number of bytes from the given offset of this MBB to the byte[]'s
   * offset. The position of the MBB is not changed.
   * @param sourceOffset the MBB-wide offset from which the copy starts
   * @param dst the destination array
   * @param offset the offset within the destination array
   * @param length the number of bytes to copy
   */
  @Override
  public void get(int sourceOffset, byte[] dst, int offset, int length) {
    int itemIndex = getItemIndex(sourceOffset);
    ByteBuffer item = this.items[itemIndex];
    sourceOffset = sourceOffset - this.itemBeginPos[itemIndex];
    while (length > 0) {
      int toRead = Math.min((item.limit() - sourceOffset), length);
      ByteBufferUtils.copyFromBufferToArray(dst, item, sourceOffset, offset, toRead);
      length -= toRead;
      if (length == 0) {
        break;
      }
      itemIndex++;
      item = this.items[itemIndex];
      offset += toRead;
      sourceOffset = 0;
    }
  }

  /**
   * Marks the limit of this MBB.
   * @param limit the new MBB-wide limit
   * @return This MBB
   */
  @Override
  public MultiByteBuff limit(int limit) {
    this.limit = limit;
    // Normally the limit will try to limit within the last BB item
    int limitedIndexBegin = this.itemBeginPos[this.limitedItemIndex];
    if (limit >= limitedIndexBegin && limit < this.itemBeginPos[this.limitedItemIndex + 1]) {
      this.items[this.limitedItemIndex].limit(limit - limitedIndexBegin);
      return this;
    }
    int itemIndex = getItemIndex(limit);
    int beginOffset = this.itemBeginPos[itemIndex];
    int offsetInItem = limit - beginOffset;
    ByteBuffer item = items[itemIndex];
    item.limit(offsetInItem);
    // Items between the old limited item and the new one become fully readable again
    for (int i = this.limitedItemIndex; i < itemIndex; i++) {
      this.items[i].limit(this.items[i].capacity());
    }
    this.limitedItemIndex = itemIndex;
    // Items after the new limited item must not expose any bytes
    for (int i = itemIndex + 1; i < this.items.length; i++) {
      this.items[i].limit(this.items[i].position());
    }
    return this;
  }

  /**
   * Returns the limit of this MBB
   * @return limit of the MBB
   */
  @Override
  public int limit() {
    return this.limit;
  }

  /**
   * Returns an MBB which is a sliced version of this MBB. The position, limit and mark
   * of the new MBB will be independent than that of the original MBB.
   * The content of the new MBB will start at this MBB's current position
   * @return a sliced MBB
   */
  @Override
  public MultiByteBuff slice() {
    ByteBuffer[] copy = new ByteBuffer[this.limitedItemIndex - this.curItemIndex + 1];
    for (int i = curItemIndex, j = 0; i <= this.limitedItemIndex; i++, j++) {
      copy[j] = this.items[i].slice();
    }
    return new MultiByteBuff(copy);
  }

  /**
   * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark
   * of the new MBB will be independent than that of the original MBB, but they start out
   * identical to this MBB's in terms of values. Each wrapped buffer is duplicated via
   * {@link ByteBuffer#duplicate()}.
   * @return a duplicated MBB
   */
  @Override
  public MultiByteBuff duplicate() {
    ByteBuffer[] itemsCopy = new ByteBuffer[this.items.length];
    for (int i = 0; i < this.items.length; i++) {
      itemsCopy[i] = items[i].duplicate();
    }
    return new MultiByteBuff(itemsCopy, this.itemBeginPos, this.limit, this.limitedItemIndex,
        this.curItemIndex, this.markedItemIndex);
  }

  /**
   * Writes a byte to this MBB at the current position and increments the position
   * @param b the byte to write
   * @return this object
   * @throws BufferOverflowException if the last item has no space remaining
   */
  @Override
  public MultiByteBuff put(byte b) {
    if (this.curItem.remaining() == 0) {
      if (this.curItemIndex == this.items.length - 1) {
        throw new BufferOverflowException();
      }
      this.curItemIndex++;
      this.curItem = this.items[this.curItemIndex];
    }
    this.curItem.put(b);
    return this;
  }

  /**
   * Writes a byte to this MBB at the given index. Does not change the position.
   * @param index the MBB-wide index to write at
   * @param b the byte to write
   * @return this object
   */
  @Override
  public MultiByteBuff put(int index, byte b) {
    // Locate the item that holds 'index' (not the item that holds the limit)
    int itemIndex = getItemIndex(index);
    ByteBuffer item = items[itemIndex];
    item.put(index - itemBeginPos[itemIndex], b);
    return this;
  }

  /**
   * Copies from a src MBB to this MBB.
   * @param offset the position in this MBB to which the copy should happen
   * @param src the src MBB
   * @param srcOffset the offset in the src MBB from where the elements should be read
   * @param length the length upto which the copy should happen
   * @return this object
   * @throws BufferUnderflowException if src is a {@link SingleByteBuff} that does not hold
   *           {@code length} bytes from {@code srcOffset}
   */
  @Override
  public MultiByteBuff put(int offset, ByteBuff src, int srcOffset, int length) {
    int destItemIndex = getItemIndex(offset);
    ByteBuffer destItem = this.items[destItemIndex];
    offset = offset - this.itemBeginPos[destItemIndex];

    // The source item and the offset within it must be resolved against the SOURCE's layout;
    // this buffer's itemBeginPos says nothing about how src is split.
    int srcItemIndex;
    if (src instanceof SingleByteBuff) {
      // A single buffer has nothing to roll over to, so make sure it holds enough bytes rather
      // than silently re-reading it from the start.
      if (srcOffset + length > ((SingleByteBuff) src).getEnclosingByteBuffer().limit()) {
        throw new BufferUnderflowException();
      }
      srcItemIndex = 0;
    } else {
      MultiByteBuff srcMbb = (MultiByteBuff) src;
      srcItemIndex = srcMbb.getItemIndex(srcOffset);
      srcOffset = srcOffset - srcMbb.itemBeginPos[srcItemIndex];
    }
    ByteBuffer srcItem = getItemByteBuffer(src, srcItemIndex);

    int toRead, toWrite, toMove;
    while (length > 0) {
      toWrite = destItem.limit() - offset;
      toRead = srcItem.limit() - srcOffset;
      toMove = Math.min(length, Math.min(toRead, toWrite));
      ByteBufferUtils.copyFromBufferToBuffer(srcItem, destItem, srcOffset, offset, toMove);
      length -= toMove;
      if (length == 0) {
        break;
      }
      if (toRead < toWrite) {
        // Source item exhausted first: move to the next source item, stay in the same dest item
        srcItem = getItemByteBuffer(src, ++srcItemIndex);
        srcOffset = 0;
        offset += toMove;
      } else if (toRead > toWrite) {
        // Dest item filled first: move to the next dest item, stay in the same source item
        destItem = this.items[++destItemIndex];
        offset = 0;
        srcOffset += toMove;
      } else {
        // toRead = toWrite case
        srcItem = getItemByteBuffer(src, ++srcItemIndex);
        srcOffset = 0;
        destItem = this.items[++destItemIndex];
        offset = 0;
      }
    }
    return this;
  }

  /*
   * Returns the ByteBuffer at the given item index of the passed ByteBuff. For a SingleByteBuff
   * the index is ignored and its only buffer is returned.
   */
  private static ByteBuffer getItemByteBuffer(ByteBuff buf, int index) {
    return (buf instanceof SingleByteBuff) ? ((SingleByteBuff) buf).getEnclosingByteBuffer()
        : ((MultiByteBuff) buf).items[index];
  }

  /**
   * Writes an int to this MBB at its current position. Also advances the position by size of int
   * @param val Int value to write
   * @return this object
   * @throws BufferOverflowException if the int does not fit in the current item and the current
   *           item is the last one
   */
  @Override
  public MultiByteBuff putInt(int val) {
    if (this.curItem.remaining() >= Bytes.SIZEOF_INT) {
      this.curItem.putInt(val);
      return this;
    }
    if (this.curItemIndex == this.items.length - 1) {
      throw new BufferOverflowException();
    }
    // During read, we will read as byte by byte for this case. So just write in Big endian
    put(int3(val));
    put(int2(val));
    put(int1(val));
    put(int0(val));
    return this;
  }

  private static byte int3(int x) {
    return (byte) (x >> 24);
  }

  private static byte int2(int x) {
    return (byte) (x >> 16);
  }

  private static byte int1(int x) {
    return (byte) (x >> 8);
  }

  private static byte int0(int x) {
    return (byte) (x);
  }

  /**
   * Copies from the given byte[] to this MBB
   * @param src the source array
   * @return this MBB
   */
  @Override
  public final MultiByteBuff put(byte[] src) {
    return put(src, 0, src.length);
  }

  /**
   * Copies from the given byte[] to this MBB
   * @param src the source array
   * @param offset the position in the byte array from which the copy should be done
   * @param length the length upto which the copy should happen
   * @return this MBB
   */
  @Override
  public MultiByteBuff put(byte[] src, int offset, int length) {
    if (this.curItem.remaining() >= length) {
      ByteBufferUtils.copyFromArrayToBuffer(this.curItem, src, offset, length);
      return this;
    }
    // Does not fit in the current item; write byte by byte so put(byte) rolls over items
    int end = offset + length;
    for (int i = offset; i < end; i++) {
      this.put(src[i]);
    }
    return this;
  }

  /**
   * Writes a long to this MBB at its current position. Also advances the position by size of long
   * @param val Long value to write
   * @return this object
   * @throws BufferOverflowException if the long does not fit in the current item and the current
   *           item is the last one
   */
  @Override
  public MultiByteBuff putLong(long val) {
    if (this.curItem.remaining() >= Bytes.SIZEOF_LONG) {
      this.curItem.putLong(val);
      return this;
    }
    if (this.curItemIndex == this.items.length - 1) {
      throw new BufferOverflowException();
    }
    // During read, we will read as byte by byte for this case. So just write in Big endian
    put(long7(val));
    put(long6(val));
    put(long5(val));
    put(long4(val));
    put(long3(val));
    put(long2(val));
    put(long1(val));
    put(long0(val));
    return this;
  }

  private static byte long7(long x) {
    return (byte) (x >> 56);
  }

  private static byte long6(long x) {
    return (byte) (x >> 48);
  }

  private static byte long5(long x) {
    return (byte) (x >> 40);
  }

  private static byte long4(long x) {
    return (byte) (x >> 32);
  }

  private static byte long3(long x) {
    return (byte) (x >> 24);
  }

  private static byte long2(long x) {
    return (byte) (x >> 16);
  }

  private static byte long1(long x) {
    return (byte) (x >> 8);
  }

  private static byte long0(long x) {
    return (byte) (x);
  }

  /**
   * Jumps the current position of this MBB by specified length.
   * @param length the number of bytes to skip
   * @return this object
   */
  @Override
  public MultiByteBuff skip(int length) {
    // Consume what the current item has and carry the rest over to the following items
    while (true) {
      int jump = this.curItem.remaining();
      if (jump >= length) {
        this.curItem.position(this.curItem.position() + length);
        break;
      }
      this.curItem.position(this.curItem.position() + jump);
      length -= jump;
      this.curItemIndex++;
      this.curItem = this.items[this.curItemIndex];
    }
    return this;
  }

  /**
   * Jumps back the current position of this MBB by specified length.
   * @param length the number of bytes to move back
   * @return this object
   */
  @Override
  public MultiByteBuff moveBack(int length) {
    while (length != 0) {
      if (length > curItem.position()) {
        // Not enough room in the current item; rewind it fully and continue in the previous one
        length -= curItem.position();
        this.curItem.position(0);
        this.curItemIndex--;
        this.curItem = this.items[curItemIndex];
      } else {
        this.curItem.position(curItem.position() - length);
        break;
      }
    }
    return this;
  }

  /**
   * Returns bytes from current position till length specified, as a single ByteBuffer. When all
   * these bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item
   * as such will be returned. So users are warned not to change the position or limit of this
   * returned ByteBuffer. The position of the returned byte buffer is at the begin of the required
   * bytes. When the required bytes happen to span across multiple ByteBuffers, this API will copy
   * the bytes to a newly created ByteBuffer of required size and return that.
   *
   * @param length number of bytes required.
   * @return bytes from current position till length specified, as a single ByteBuffer.
   */
  @Override
  public ByteBuffer asSubByteBuffer(int length) {
    if (this.curItem.remaining() >= length) {
      return this.curItem;
    }
    int offset = 0;
    byte[] dupB = new byte[length];
    // Walk local copies of the cursor so that this MBB's position is left untouched
    int locCurItemIndex = curItemIndex;
    ByteBuffer locCurItem = curItem;
    while (length > 0) {
      int toRead = Math.min(length, locCurItem.remaining());
      ByteBufferUtils.copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset,
          toRead);
      length -= toRead;
      if (length == 0) {
        break;
      }
      locCurItemIndex++;
      locCurItem = this.items[locCurItemIndex];
      offset += toRead;
    }
    return ByteBuffer.wrap(dupB);
  }

  /**
   * Returns bytes from given offset till length specified, as a single ByteBuffer. When all these
   * bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item as
   * such will be returned (with offset in this ByteBuffer where the bytes starts). So users are
   * warned not to change the position or limit of this returned ByteBuffer. When the required bytes
   * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created
   * ByteBuffer of required size and return that.
   *
   * @param offset the offset in this MBB from where the subBuffer should be created
   * @param length the length of the subBuffer
   * @param pair a pair that will have the bytes from the given offset till length specified, as
   *          a single ByteBuffer and offset in that Buffer where the bytes starts. The method would
   *          set the values on the pair that is passed in by the caller
   */
  @Override
  public void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair) {
    // Short circuit: the whole range lies in the current item
    if (this.itemBeginPos[this.curItemIndex] <= offset) {
      int relOffsetInCurItem = offset - this.itemBeginPos[this.curItemIndex];
      if (this.curItem.limit() - relOffsetInCurItem >= length) {
        pair.setFirst(this.curItem);
        pair.setSecond(relOffsetInCurItem);
        return;
      }
    }
    int itemIndex = getItemIndex(offset);
    ByteBuffer item = this.items[itemIndex];
    offset = offset - this.itemBeginPos[itemIndex];
    if (item.limit() - offset >= length) {
      pair.setFirst(item);
      pair.setSecond(offset);
      return;
    }
    // The range spans multiple items; copy it into a fresh heap buffer
    byte[] dst = new byte[length];
    int destOffset = 0;
    while (length > 0) {
      int toRead = Math.min(length, item.limit() - offset);
      ByteBufferUtils.copyFromBufferToArray(dst, item, offset, destOffset, toRead);
      length -= toRead;
      if (length == 0) {
        break;
      }
      itemIndex++;
      item = this.items[itemIndex];
      destOffset += toRead;
      offset = 0;
    }
    pair.setFirst(ByteBuffer.wrap(dst));
    pair.setSecond(0);
  }

  /**
   * Copies the content from this MBB to a ByteBuffer
   * @param out the ByteBuffer to which the copy has to happen
   * @param sourceOffset the offset in the MBB from which the elements has
   * to be copied
   * @param length the length in the MBB upto which the elements has to be copied
   */
  @Override
  public void get(ByteBuffer out, int sourceOffset, int length) {
    // Not used from real read path actually. So not going with
    // optimization
    for (int i = 0; i < length; ++i) {
      out.put(this.get(sourceOffset + i));
    }
  }

  /**
   * Copy the content from this MBB to a byte[] based on the given offset and
   * length
   *
   * @param offset
   *          the position from where the copy should start
   * @param length
   *          the length upto which the copy has to be done
   * @return byte[] with the copied contents from this MBB.
   */
  @Override
  public byte[] toBytes(int offset, int length) {
    byte[] output = new byte[length];
    this.get(offset, output, 0, length);
    return output;
  }

  /**
   * Reads from the channel into this MBB starting at the current item, moving on to the next
   * items (up to the limited item) as each one fills up.
   * @param channel the channel to read from
   * @return the total number of bytes read
   * @throws IOException if the underlying channel read fails
   */
  @Override
  public int read(ReadableByteChannel channel) throws IOException {
    int total = 0;
    while (true) {
      // Read max possible into the current BB
      int len = channelRead(channel, this.curItem);
      if (len > 0) {
        total += len;
      }
      if (this.curItem.hasRemaining()) {
        // We were not able to read enough to fill the current BB itself. Means there is no point
        // in doing more reads from Channel. Only this much there for now.
        break;
      } else {
        if (this.curItemIndex >= this.limitedItemIndex) {
          break;
        }
        this.curItemIndex++;
        this.curItem = this.items[this.curItemIndex];
      }
    }
    return total;
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (!(obj instanceof MultiByteBuff)) {
      return false;
    }
    MultiByteBuff that = (MultiByteBuff) obj;
    if (this.capacity() != that.capacity()) {
      return false;
    }
    // NOTE(review): ByteBuff.compareTo is defined outside this file; confirm that passing
    // limit() as its 3rd and 6th arguments matches what that method expects.
    return ByteBuff.compareTo(this, this.position(), this.limit(), that, that.position(),
        that.limit()) == 0;
  }

  @Override
  public int hashCode() {
    // NOTE(review): this is the sum of the per-item hash codes, so it depends on how the content
    // is split across items, while equals() compares content only - confirm this is acceptable.
    int hash = 0;
    for (ByteBuffer b : this.items) {
      hash += b.hashCode();
    }
    return hash;
  }

  /**
   * @return the ByteBuffers which this wraps.
   */
  @VisibleForTesting
  public ByteBuffer[] getEnclosingByteBuffers() {
    return this.items;
  }
}