001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.nio;
019
020import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE;
021
022import java.io.IOException;
023import java.nio.BufferOverflowException;
024import java.nio.BufferUnderflowException;
025import java.nio.ByteBuffer;
026import java.nio.InvalidMarkException;
027import java.nio.channels.FileChannel;
028import java.nio.channels.ReadableByteChannel;
029import java.util.Iterator;
030import java.util.NoSuchElementException;
031import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
032import org.apache.hadoop.hbase.util.ByteBufferUtils;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.hbase.util.ObjectIntPair;
035import org.apache.yetus.audience.InterfaceAudience;
036
037/**
038 * Provides a unified view of all the underlying ByteBuffers and will look as if a bigger sequential
039 * buffer. This class provides similar APIs as in {@link ByteBuffer} to put/get int, short, long etc
040 * and doing operations like mark, reset, slice etc. This has to be used when data is split across
041 * multiple byte buffers and we don't want copy them to single buffer for reading from it.
042 */
043@InterfaceAudience.Private
044public class MultiByteBuff extends ByteBuff {
045
046  private final ByteBuffer[] items;
047  // Pointer to the current item in the MBB
048  private ByteBuffer curItem = null;
049  // Index of the current item in the MBB
050  private int curItemIndex = 0;
051
052  private int limit = 0;
053  private int limitedItemIndex;
054  private int markedItemIndex = -1;
055  private final int[] itemBeginPos;
056
057  private Iterator<ByteBuffer> buffsIterator = new Iterator<ByteBuffer>() {
058    @Override
059    public boolean hasNext() {
060      return curItemIndex < limitedItemIndex
061        || (curItemIndex == limitedItemIndex && items[curItemIndex].hasRemaining());
062    }
063
064    @Override
065    public ByteBuffer next() {
066      if (curItemIndex >= items.length) {
067        throw new NoSuchElementException("items overflow");
068      }
069      curItem = items[curItemIndex++];
070      return curItem;
071    }
072  };
073
074  public MultiByteBuff(ByteBuffer... items) {
075    this(NONE, items);
076  }
077
078  public MultiByteBuff(Recycler recycler, ByteBuffer... items) {
079    this(new RefCnt(recycler), items);
080  }
081
082  MultiByteBuff(RefCnt refCnt, ByteBuffer... items) {
083    this.refCnt = refCnt;
084    assert items != null;
085    assert items.length > 0;
086    this.items = items;
087    this.curItem = this.items[this.curItemIndex];
088    // See below optimization in getInt(int) where we check whether the given index land in current
089    // item. For this we need to check whether the passed index is less than the next item begin
090    // offset. To handle this effectively for the last item buffer, we add an extra item into this
091    // array.
092    itemBeginPos = new int[items.length + 1];
093    int offset = 0;
094    for (int i = 0; i < items.length; i++) {
095      ByteBuffer item = items[i];
096      item.rewind();
097      itemBeginPos[i] = offset;
098      int l = item.limit() - item.position();
099      offset += l;
100    }
101    this.limit = offset;
102    this.itemBeginPos[items.length] = offset + 1;
103    this.limitedItemIndex = this.items.length - 1;
104  }
105
106  private MultiByteBuff(RefCnt refCnt, ByteBuffer[] items, int[] itemBeginPos, int limit,
107    int limitedIndex, int curItemIndex, int markedIndex) {
108    this.refCnt = refCnt;
109    this.items = items;
110    this.curItemIndex = curItemIndex;
111    this.curItem = this.items[this.curItemIndex];
112    this.itemBeginPos = itemBeginPos;
113    this.limit = limit;
114    this.limitedItemIndex = limitedIndex;
115    this.markedItemIndex = markedIndex;
116  }
117
118  /**
119   * @throws UnsupportedOperationException MBB does not support array based operations
120   */
121  @Override
122  public byte[] array() {
123    throw new UnsupportedOperationException();
124  }
125
126  /**
127   * @throws UnsupportedOperationException MBB does not support array based operations
128   */
129  @Override
130  public int arrayOffset() {
131    throw new UnsupportedOperationException();
132  }
133
134  /** Returns false. MBB does not support array based operations */
135  @Override
136  public boolean hasArray() {
137    return false;
138  }
139
140  /** Returns the total capacity of this MultiByteBuffer. */
141  @Override
142  public int capacity() {
143    checkRefCount();
144    int c = 0;
145    for (ByteBuffer item : this.items) {
146      c += item.capacity();
147    }
148    return c;
149  }
150
151  /**
152   * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers
153   * @return the byte at the given index
154   */
155  @Override
156  public byte get(int index) {
157    checkRefCount();
158    int itemIndex = getItemIndex(index);
159    return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]);
160  }
161
162  @Override
163  public byte getByteAfterPosition(int offset) {
164    checkRefCount();
165    // Mostly the index specified will land within this current item. Short circuit for that
166    int index = offset + this.position();
167    int itemIndex = getItemIndexFromCurItemIndex(index);
168    return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]);
169  }
170
171  /*
172   * Returns in which sub ByteBuffer, the given element index will be available.
173   */
174  private int getItemIndex(int elemIndex) {
175    if (elemIndex < 0) {
176      throw new IndexOutOfBoundsException();
177    }
178    int index = 1;
179    while (elemIndex >= this.itemBeginPos[index]) {
180      index++;
181      if (index == this.itemBeginPos.length) {
182        throw new IndexOutOfBoundsException();
183      }
184    }
185    return index - 1;
186  }
187
188  /*
189   * Returns in which sub ByteBuffer, the given element index will be available. In this case we are
190   * sure that the item will be after MBB's current position
191   */
192  private int getItemIndexFromCurItemIndex(int elemIndex) {
193    int index = this.curItemIndex;
194    while (elemIndex >= this.itemBeginPos[index]) {
195      index++;
196      if (index == this.itemBeginPos.length) {
197        throw new IndexOutOfBoundsException();
198      }
199    }
200    return index - 1;
201  }
202
203  /**
204   * Fetches the int at the given index. Does not change position of the underlying ByteBuffers
205   * @return the int value at the given index
206   */
207  @Override
208  public int getInt(int index) {
209    checkRefCount();
210    // Mostly the index specified will land within this current item. Short circuit for that
211    int itemIndex;
212    if (
213      this.itemBeginPos[this.curItemIndex] <= index
214        && this.itemBeginPos[this.curItemIndex + 1] > index
215    ) {
216      itemIndex = this.curItemIndex;
217    } else {
218      itemIndex = getItemIndex(index);
219    }
220    return getInt(index, itemIndex);
221  }
222
223  @Override
224  public int getIntAfterPosition(int offset) {
225    checkRefCount();
226    // Mostly the index specified will land within this current item. Short circuit for that
227    int index = offset + this.position();
228    int itemIndex;
229    if (this.itemBeginPos[this.curItemIndex + 1] > index) {
230      itemIndex = this.curItemIndex;
231    } else {
232      itemIndex = getItemIndexFromCurItemIndex(index);
233    }
234    return getInt(index, itemIndex);
235  }
236
237  /**
238   * Fetches the short at the given index. Does not change position of the underlying ByteBuffers
239   * @return the short value at the given index
240   */
241  @Override
242  public short getShort(int index) {
243    checkRefCount();
244    // Mostly the index specified will land within this current item. Short circuit for that
245    int itemIndex;
246    if (
247      this.itemBeginPos[this.curItemIndex] <= index
248        && this.itemBeginPos[this.curItemIndex + 1] > index
249    ) {
250      itemIndex = this.curItemIndex;
251    } else {
252      itemIndex = getItemIndex(index);
253    }
254    ByteBuffer item = items[itemIndex];
255    int offsetInItem = index - this.itemBeginPos[itemIndex];
256    if (item.limit() - offsetInItem >= Bytes.SIZEOF_SHORT) {
257      return ByteBufferUtils.toShort(item, offsetInItem);
258    }
259    if (items.length - 1 == itemIndex) {
260      // means cur item is the last one and we wont be able to read a int. Throw exception
261      throw new BufferUnderflowException();
262    }
263    ByteBuffer nextItem = items[itemIndex + 1];
264    // Get available one byte from this item and remaining one from next
265    short n = 0;
266    n = (short) (n ^ (ByteBufferUtils.toByte(item, offsetInItem) & 0xFF));
267    n = (short) (n << 8);
268    n = (short) (n ^ (ByteBufferUtils.toByte(nextItem, 0) & 0xFF));
269    return n;
270  }
271
272  @Override
273  public short getShortAfterPosition(int offset) {
274    checkRefCount();
275    // Mostly the index specified will land within this current item. Short circuit for that
276    int index = offset + this.position();
277    int itemIndex;
278    if (this.itemBeginPos[this.curItemIndex + 1] > index) {
279      itemIndex = this.curItemIndex;
280    } else {
281      itemIndex = getItemIndexFromCurItemIndex(index);
282    }
283    return getShort(index, itemIndex);
284  }
285
286  private int getInt(int index, int itemIndex) {
287    ByteBuffer item = items[itemIndex];
288    int offsetInItem = index - this.itemBeginPos[itemIndex];
289    int remainingLen = item.limit() - offsetInItem;
290    if (remainingLen >= Bytes.SIZEOF_INT) {
291      return ByteBufferUtils.toInt(item, offsetInItem);
292    }
293    if (items.length - 1 == itemIndex) {
294      // means cur item is the last one and we wont be able to read a int. Throw exception
295      throw new BufferUnderflowException();
296    }
297    int l = 0;
298    for (int i = 0; i < Bytes.SIZEOF_INT; i++) {
299      l <<= 8;
300      l ^= get(index + i) & 0xFF;
301    }
302    return l;
303  }
304
305  private short getShort(int index, int itemIndex) {
306    ByteBuffer item = items[itemIndex];
307    int offsetInItem = index - this.itemBeginPos[itemIndex];
308    int remainingLen = item.limit() - offsetInItem;
309    if (remainingLen >= Bytes.SIZEOF_SHORT) {
310      return ByteBufferUtils.toShort(item, offsetInItem);
311    }
312    if (items.length - 1 == itemIndex) {
313      // means cur item is the last one and we wont be able to read a short. Throw exception
314      throw new BufferUnderflowException();
315    }
316    ByteBuffer nextItem = items[itemIndex + 1];
317    // Get available bytes from this item and remaining from next
318    short l = 0;
319    for (int i = offsetInItem; i < item.capacity(); i++) {
320      l = (short) (l << 8);
321      l = (short) (l ^ (ByteBufferUtils.toByte(item, i) & 0xFF));
322    }
323    for (int i = 0; i < Bytes.SIZEOF_SHORT - remainingLen; i++) {
324      l = (short) (l << 8);
325      l = (short) (l ^ (ByteBufferUtils.toByte(nextItem, i) & 0xFF));
326    }
327    return l;
328  }
329
330  private long getLong(int index, int itemIndex) {
331    ByteBuffer item = items[itemIndex];
332    int offsetInItem = index - this.itemBeginPos[itemIndex];
333    int remainingLen = item.limit() - offsetInItem;
334    if (remainingLen >= Bytes.SIZEOF_LONG) {
335      return ByteBufferUtils.toLong(item, offsetInItem);
336    }
337    if (items.length - 1 == itemIndex) {
338      // means cur item is the last one and we wont be able to read a long. Throw exception
339      throw new BufferUnderflowException();
340    }
341    long l = 0;
342    for (int i = 0; i < Bytes.SIZEOF_LONG; i++) {
343      l <<= 8;
344      l ^= get(index + i) & 0xFF;
345    }
346    return l;
347  }
348
349  /**
350   * Fetches the long at the given index. Does not change position of the underlying ByteBuffers
351   * @return the long value at the given index
352   */
353  @Override
354  public long getLong(int index) {
355    checkRefCount();
356    // Mostly the index specified will land within this current item. Short circuit for that
357    int itemIndex;
358    if (
359      this.itemBeginPos[this.curItemIndex] <= index
360        && this.itemBeginPos[this.curItemIndex + 1] > index
361    ) {
362      itemIndex = this.curItemIndex;
363    } else {
364      itemIndex = getItemIndex(index);
365    }
366    return getLong(index, itemIndex);
367  }
368
369  @Override
370  public long getLongAfterPosition(int offset) {
371    checkRefCount();
372    // Mostly the index specified will land within this current item. Short circuit for that
373    int index = offset + this.position();
374    int itemIndex;
375    if (this.itemBeginPos[this.curItemIndex + 1] > index) {
376      itemIndex = this.curItemIndex;
377    } else {
378      itemIndex = getItemIndexFromCurItemIndex(index);
379    }
380    return getLong(index, itemIndex);
381  }
382
383  /** Returns this MBB's current position */
384  @Override
385  public int position() {
386    checkRefCount();
387    return itemBeginPos[this.curItemIndex] + this.curItem.position();
388  }
389
390  /**
391   * Sets this MBB's position to the given value.
392   * @return this object
393   */
394  @Override
395  public MultiByteBuff position(int position) {
396    checkRefCount();
397    // Short circuit for positioning within the cur item. Mostly that is the case.
398    if (
399      this.itemBeginPos[this.curItemIndex] <= position
400        && this.itemBeginPos[this.curItemIndex + 1] > position
401    ) {
402      this.curItem.position(position - this.itemBeginPos[this.curItemIndex]);
403      return this;
404    }
405    int itemIndex = getItemIndex(position);
406    // All items from 0 - curItem-1 set position at end.
407    for (int i = 0; i < itemIndex; i++) {
408      this.items[i].position(this.items[i].limit());
409    }
410    // All items after curItem set position at begin
411    for (int i = itemIndex + 1; i < this.items.length; i++) {
412      this.items[i].position(0);
413    }
414    this.curItem = this.items[itemIndex];
415    this.curItem.position(position - this.itemBeginPos[itemIndex]);
416    this.curItemIndex = itemIndex;
417    return this;
418  }
419
420  /**
421   * Rewinds this MBB and the position is set to 0
422   * @return this object
423   */
424  @Override
425  public MultiByteBuff rewind() {
426    checkRefCount();
427    for (int i = 0; i < this.items.length; i++) {
428      this.items[i].rewind();
429    }
430    this.curItemIndex = 0;
431    this.curItem = this.items[this.curItemIndex];
432    this.markedItemIndex = -1;
433    return this;
434  }
435
436  /**
437   * Marks the current position of the MBB
438   * @return this object
439   */
440  @Override
441  public MultiByteBuff mark() {
442    checkRefCount();
443    this.markedItemIndex = this.curItemIndex;
444    this.curItem.mark();
445    return this;
446  }
447
448  /**
449   * Similar to {@link ByteBuffer}.reset(), ensures that this MBB is reset back to last marked
450   * position.
451   * @return This MBB
452   */
453  @Override
454  public MultiByteBuff reset() {
455    checkRefCount();
456    // when the buffer is moved to the next one.. the reset should happen on the previous marked
457    // item and the new one should be taken as the base
458    if (this.markedItemIndex < 0) throw new InvalidMarkException();
459    ByteBuffer markedItem = this.items[this.markedItemIndex];
460    markedItem.reset();
461    this.curItem = markedItem;
462    // All items after the marked position upto the current item should be reset to 0
463    for (int i = this.curItemIndex; i > this.markedItemIndex; i--) {
464      this.items[i].position(0);
465    }
466    this.curItemIndex = this.markedItemIndex;
467    return this;
468  }
469
470  /**
471   * Returns the number of elements between the current position and the limit.
472   * @return the remaining elements in this MBB
473   */
474  @Override
475  public int remaining() {
476    checkRefCount();
477    int remain = 0;
478    for (int i = curItemIndex; i < items.length; i++) {
479      remain += items[i].remaining();
480    }
481    return remain;
482  }
483
484  /**
485   * Returns true if there are elements between the current position and the limt
486   * @return true if there are elements, false otherwise
487   */
488  @Override
489  public final boolean hasRemaining() {
490    checkRefCount();
491    return this.curItem.hasRemaining() || (this.curItemIndex < this.limitedItemIndex
492      && this.items[this.curItemIndex + 1].hasRemaining());
493  }
494
495  /**
496   * A relative method that returns byte at the current position. Increments the current position by
497   * the size of a byte.
498   * @return the byte at the current position
499   */
500  @Override
501  public byte get() {
502    checkRefCount();
503    if (this.curItem.remaining() == 0) {
504      if (items.length - 1 == this.curItemIndex) {
505        // means cur item is the last one and we wont be able to read a long. Throw exception
506        throw new BufferUnderflowException();
507      }
508      this.curItemIndex++;
509      this.curItem = this.items[this.curItemIndex];
510    }
511    return this.curItem.get();
512  }
513
514  /**
515   * Returns the short value at the current position. Also advances the position by the size of
516   * short
517   * @return the short value at the current position
518   */
519  @Override
520  public short getShort() {
521    checkRefCount();
522    int remaining = this.curItem.remaining();
523    if (remaining >= Bytes.SIZEOF_SHORT) {
524      return this.curItem.getShort();
525    }
526    short n = 0;
527    n = (short) (n ^ (get() & 0xFF));
528    n = (short) (n << 8);
529    n = (short) (n ^ (get() & 0xFF));
530    return n;
531  }
532
533  /**
534   * Returns the int value at the current position. Also advances the position by the size of int
535   * @return the int value at the current position
536   */
537  @Override
538  public int getInt() {
539    checkRefCount();
540    int remaining = this.curItem.remaining();
541    if (remaining >= Bytes.SIZEOF_INT) {
542      return this.curItem.getInt();
543    }
544    int n = 0;
545    for (int i = 0; i < Bytes.SIZEOF_INT; i++) {
546      n <<= 8;
547      n ^= get() & 0xFF;
548    }
549    return n;
550  }
551
552  /**
553   * Returns the long value at the current position. Also advances the position by the size of long
554   * @return the long value at the current position
555   */
556  @Override
557  public long getLong() {
558    checkRefCount();
559    int remaining = this.curItem.remaining();
560    if (remaining >= Bytes.SIZEOF_LONG) {
561      return this.curItem.getLong();
562    }
563    long l = 0;
564    for (int i = 0; i < Bytes.SIZEOF_LONG; i++) {
565      l <<= 8;
566      l ^= get() & 0xFF;
567    }
568    return l;
569  }
570
571  /**
572   * Copies the content from this MBB's current position to the byte array and fills it. Also
573   * advances the position of the MBB by the length of the byte[].
574   */
575  @Override
576  public void get(byte[] dst) {
577    get(dst, 0, dst.length);
578  }
579
580  /**
581   * Copies the specified number of bytes from this MBB's current position to the byte[]'s offset.
582   * Also advances the position of the MBB by the given length.
583   */
584  @Override
585  public void get(byte[] dst, int offset, int length) {
586    checkRefCount();
587    while (length > 0) {
588      int toRead = Math.min(length, this.curItem.remaining());
589      ByteBufferUtils.copyFromBufferToArray(dst, this.curItem, this.curItem.position(), offset,
590        toRead);
591      this.curItem.position(this.curItem.position() + toRead);
592      length -= toRead;
593      if (length == 0) break;
594      this.curItemIndex++;
595      this.curItem = this.items[this.curItemIndex];
596      offset += toRead;
597    }
598  }
599
600  @Override
601  public void get(int sourceOffset, byte[] dst, int offset, int length) {
602    checkRefCount();
603    int itemIndex = getItemIndex(sourceOffset);
604    ByteBuffer item = this.items[itemIndex];
605    sourceOffset = sourceOffset - this.itemBeginPos[itemIndex];
606    while (length > 0) {
607      int toRead = Math.min((item.limit() - sourceOffset), length);
608      ByteBufferUtils.copyFromBufferToArray(dst, item, sourceOffset, offset, toRead);
609      length -= toRead;
610      if (length == 0) break;
611      itemIndex++;
612      item = this.items[itemIndex];
613      offset += toRead;
614      sourceOffset = 0;
615    }
616  }
617
618  /**
619   * Marks the limit of this MBB.
620   * @return This MBB
621   */
622  @Override
623  public MultiByteBuff limit(int limit) {
624    checkRefCount();
625    this.limit = limit;
626    // Normally the limit will try to limit within the last BB item
627    int limitedIndexBegin = this.itemBeginPos[this.limitedItemIndex];
628    if (limit >= limitedIndexBegin && limit < this.itemBeginPos[this.limitedItemIndex + 1]) {
629      this.items[this.limitedItemIndex].limit(limit - limitedIndexBegin);
630      return this;
631    }
632    int itemIndex = getItemIndex(limit);
633    int beginOffset = this.itemBeginPos[itemIndex];
634    int offsetInItem = limit - beginOffset;
635    ByteBuffer item = items[itemIndex];
636    item.limit(offsetInItem);
637    for (int i = this.limitedItemIndex; i < itemIndex; i++) {
638      this.items[i].limit(this.items[i].capacity());
639    }
640    this.limitedItemIndex = itemIndex;
641    for (int i = itemIndex + 1; i < this.items.length; i++) {
642      this.items[i].limit(this.items[i].position());
643    }
644    return this;
645  }
646
647  /**
648   * Returns the limit of this MBB
649   * @return limit of the MBB
650   */
651  @Override
652  public int limit() {
653    return this.limit;
654  }
655
656  /**
657   * Returns an MBB which is a sliced version of this MBB. The position, limit and mark of the new
658   * MBB will be independent than that of the original MBB. The content of the new MBB will start at
659   * this MBB's current position
660   * @return a sliced MBB
661   */
662  @Override
663  public MultiByteBuff slice() {
664    checkRefCount();
665    ByteBuffer[] copy = new ByteBuffer[this.limitedItemIndex - this.curItemIndex + 1];
666    for (int i = curItemIndex, j = 0; i <= this.limitedItemIndex; i++, j++) {
667      copy[j] = this.items[i].slice();
668    }
669    return new MultiByteBuff(refCnt, copy);
670  }
671
672  /**
673   * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark of the
674   * new MBB will be independent than that of the original MBB. The content of the new MBB will
675   * start at this MBB's current position The position, limit and mark of the new MBB would be
676   * identical to this MBB in terms of values.
677   * @return a duplicated MBB
678   */
679  @Override
680  public MultiByteBuff duplicate() {
681    checkRefCount();
682    ByteBuffer[] itemsCopy = new ByteBuffer[this.items.length];
683    for (int i = 0; i < this.items.length; i++) {
684      itemsCopy[i] = items[i].duplicate();
685    }
686    return new MultiByteBuff(refCnt, itemsCopy, this.itemBeginPos, this.limit,
687      this.limitedItemIndex, this.curItemIndex, this.markedItemIndex);
688  }
689
690  /**
691   * Writes a byte to this MBB at the current position and increments the position
692   * @return this object
693   */
694  @Override
695  public MultiByteBuff put(byte b) {
696    checkRefCount();
697    if (this.curItem.remaining() == 0) {
698      if (this.curItemIndex == this.items.length - 1) {
699        throw new BufferOverflowException();
700      }
701      this.curItemIndex++;
702      this.curItem = this.items[this.curItemIndex];
703    }
704    this.curItem.put(b);
705    return this;
706  }
707
708  /**
709   * Writes a byte to this MBB at the given index and won't affect the position of any of the
710   * buffers.
711   * @return this object
712   * @throws IndexOutOfBoundsException If <tt>index</tt> is negative or not smaller than the
713   *                                   {@link MultiByteBuff#limit}
714   */
715  @Override
716  public MultiByteBuff put(int index, byte b) {
717    checkRefCount();
718    int itemIndex = getItemIndex(index);
719    ByteBuffer item = items[itemIndex];
720    item.put(index - itemBeginPos[itemIndex], b);
721    return this;
722  }
723
724  /**
725   * Copies from a src BB to this MBB. This will be absolute positional copying and won't affect the
726   * position of any of the buffers.
727   * @param destOffset the position in this MBB to which the copy should happen
728   * @param src        the src MBB
729   * @param srcOffset  the offset in the src MBB from where the elements should be read
730   * @param length     the length upto which the copy should happen
731   * @throws BufferUnderflowException If there are fewer than length bytes remaining in src
732   *                                  ByteBuff.
733   * @throws BufferOverflowException  If there is insufficient available space in this MBB for
734   *                                  length bytes.
735   */
736  @Override
737  public MultiByteBuff put(int destOffset, ByteBuff src, int srcOffset, int length) {
738    checkRefCount();
739    int destItemIndex = getItemIndex(destOffset);
740    int srcItemIndex = getItemIndexForByteBuff(src, srcOffset, length);
741
742    ByteBuffer destItem = this.items[destItemIndex];
743    destOffset = this.getRelativeOffset(destOffset, destItemIndex);
744
745    ByteBuffer srcItem = getItemByteBuffer(src, srcItemIndex);
746    srcOffset = getRelativeOffsetForByteBuff(src, srcOffset, srcItemIndex);
747
748    while (length > 0) {
749      int toWrite = destItem.limit() - destOffset;
750      if (toWrite <= 0) {
751        throw new BufferOverflowException();
752      }
753      int toRead = srcItem.limit() - srcOffset;
754      if (toRead <= 0) {
755        throw new BufferUnderflowException();
756      }
757      int toMove = Math.min(length, Math.min(toRead, toWrite));
758      ByteBufferUtils.copyFromBufferToBuffer(srcItem, destItem, srcOffset, destOffset, toMove);
759      length -= toMove;
760      if (length == 0) {
761        break;
762      }
763      if (toRead < toWrite) {
764        if (++srcItemIndex >= getItemByteBufferCount(src)) {
765          throw new BufferUnderflowException();
766        }
767        srcItem = getItemByteBuffer(src, srcItemIndex);
768        srcOffset = 0;
769        destOffset += toMove;
770      } else if (toRead > toWrite) {
771        if (++destItemIndex >= this.items.length) {
772          throw new BufferOverflowException();
773        }
774        destItem = this.items[destItemIndex];
775        destOffset = 0;
776        srcOffset += toMove;
777      } else {
778        // toRead = toWrite case
779        if (++srcItemIndex >= getItemByteBufferCount(src)) {
780          throw new BufferUnderflowException();
781        }
782        srcItem = getItemByteBuffer(src, srcItemIndex);
783        srcOffset = 0;
784        if (++destItemIndex >= this.items.length) {
785          throw new BufferOverflowException();
786        }
787        destItem = this.items[destItemIndex];
788        destOffset = 0;
789      }
790    }
791    return this;
792  }
793
794  private static ByteBuffer getItemByteBuffer(ByteBuff buf, int byteBufferIndex) {
795    if (buf instanceof SingleByteBuff) {
796      if (byteBufferIndex != 0) {
797        throw new IndexOutOfBoundsException(
798          "index:[" + byteBufferIndex + "],but only index 0 is valid.");
799      }
800      return buf.nioByteBuffers()[0];
801    }
802    MultiByteBuff multiByteBuff = (MultiByteBuff) buf;
803    if (byteBufferIndex < 0 || byteBufferIndex >= multiByteBuff.items.length) {
804      throw new IndexOutOfBoundsException("index:[" + byteBufferIndex + "],but only index [0-"
805        + multiByteBuff.items.length + ") is valid.");
806    }
807    return multiByteBuff.items[byteBufferIndex];
808  }
809
810  private static int getItemIndexForByteBuff(ByteBuff byteBuff, int offset, int length) {
811    if (byteBuff instanceof SingleByteBuff) {
812      ByteBuffer byteBuffer = byteBuff.nioByteBuffers()[0];
813      if (offset + length > byteBuffer.limit()) {
814        throw new BufferUnderflowException();
815      }
816      return 0;
817    }
818    MultiByteBuff multiByteBuff = (MultiByteBuff) byteBuff;
819    return multiByteBuff.getItemIndex(offset);
820  }
821
822  private static int getRelativeOffsetForByteBuff(ByteBuff byteBuff, int globalOffset,
823    int itemIndex) {
824    if (byteBuff instanceof SingleByteBuff) {
825      if (itemIndex != 0) {
826        throw new IndexOutOfBoundsException("index:[" + itemIndex + "],but only index 0 is valid.");
827      }
828      return globalOffset;
829    }
830    return ((MultiByteBuff) byteBuff).getRelativeOffset(globalOffset, itemIndex);
831  }
832
833  private int getRelativeOffset(int globalOffset, int itemIndex) {
834    if (itemIndex < 0 || itemIndex >= this.items.length) {
835      throw new IndexOutOfBoundsException(
836        "index:[" + itemIndex + "],but only index [0-" + this.items.length + ") is valid.");
837    }
838    return globalOffset - this.itemBeginPos[itemIndex];
839  }
840
841  private static int getItemByteBufferCount(ByteBuff buf) {
842    return (buf instanceof SingleByteBuff) ? 1 : ((MultiByteBuff) buf).items.length;
843  }
844
845  /**
846   * Writes an int to this MBB at its current position. Also advances the position by size of int
847   * @param val Int value to write
848   * @return this object
849   */
850  @Override
851  public MultiByteBuff putInt(int val) {
852    checkRefCount();
853    if (this.curItem.remaining() >= Bytes.SIZEOF_INT) {
854      this.curItem.putInt(val);
855      return this;
856    }
857    if (this.curItemIndex == this.items.length - 1) {
858      throw new BufferOverflowException();
859    }
860    // During read, we will read as byte by byte for this case. So just write in Big endian
861    put(int3(val));
862    put(int2(val));
863    put(int1(val));
864    put(int0(val));
865    return this;
866  }
867
868  private static byte int3(int x) {
869    return (byte) (x >> 24);
870  }
871
872  private static byte int2(int x) {
873    return (byte) (x >> 16);
874  }
875
876  private static byte int1(int x) {
877    return (byte) (x >> 8);
878  }
879
880  private static byte int0(int x) {
881    return (byte) x;
882  }
883
884  /** Copies from the given byte[] to this MBB */
885  @Override
886  public final MultiByteBuff put(byte[] src) {
887    return put(src, 0, src.length);
888  }
889
890  /** Copies from the given byte[] to this MBB. */
891  @Override
892  public MultiByteBuff put(byte[] src, int offset, int length) {
893    checkRefCount();
894    if (this.curItem.remaining() >= length) {
895      ByteBufferUtils.copyFromArrayToBuffer(this.curItem, src, offset, length);
896      return this;
897    }
898    int end = offset + length;
899    for (int i = offset; i < end; i++) {
900      this.put(src[i]);
901    }
902    return this;
903  }
904
905  /**
906   * Writes a long to this MBB at its current position. Also advances the position by size of long
907   * @param val Long value to write
908   * @return this object
909   */
910  @Override
911  public MultiByteBuff putLong(long val) {
912    checkRefCount();
913    if (this.curItem.remaining() >= Bytes.SIZEOF_LONG) {
914      this.curItem.putLong(val);
915      return this;
916    }
917    if (this.curItemIndex == this.items.length - 1) {
918      throw new BufferOverflowException();
919    }
920    // During read, we will read as byte by byte for this case. So just write in Big endian
921    put(long7(val));
922    put(long6(val));
923    put(long5(val));
924    put(long4(val));
925    put(long3(val));
926    put(long2(val));
927    put(long1(val));
928    put(long0(val));
929    return this;
930  }
931
932  private static byte long7(long x) {
933    return (byte) (x >> 56);
934  }
935
936  private static byte long6(long x) {
937    return (byte) (x >> 48);
938  }
939
940  private static byte long5(long x) {
941    return (byte) (x >> 40);
942  }
943
944  private static byte long4(long x) {
945    return (byte) (x >> 32);
946  }
947
948  private static byte long3(long x) {
949    return (byte) (x >> 24);
950  }
951
952  private static byte long2(long x) {
953    return (byte) (x >> 16);
954  }
955
956  private static byte long1(long x) {
957    return (byte) (x >> 8);
958  }
959
960  private static byte long0(long x) {
961    return (byte) x;
962  }
963
964  /**
965   * Jumps the current position of this MBB by specified length.
966   */
967  @Override
968  public MultiByteBuff skip(int length) {
969    checkRefCount();
970    // Get available bytes from this item and remaining from next
971    int jump = 0;
972    while (true) {
973      jump = this.curItem.remaining();
974      if (jump >= length) {
975        this.curItem.position(this.curItem.position() + length);
976        break;
977      }
978      this.curItem.position(this.curItem.position() + jump);
979      length -= jump;
980      this.curItemIndex++;
981      this.curItem = this.items[this.curItemIndex];
982    }
983    return this;
984  }
985
986  /**
987   * Jumps back the current position of this MBB by specified length.
988   */
989  @Override
990  public MultiByteBuff moveBack(int length) {
991    checkRefCount();
992    while (length != 0) {
993      if (length > curItem.position()) {
994        length -= curItem.position();
995        this.curItem.position(0);
996        this.curItemIndex--;
997        this.curItem = this.items[curItemIndex];
998      } else {
999        this.curItem.position(curItem.position() - length);
1000        break;
1001      }
1002    }
1003    return this;
1004  }
1005
1006  /**
1007   * Returns bytes from current position till length specified, as a single ByteBuffer. When all
1008   * these bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item
1009   * as such will be returned. So users are warned not to change the position or limit of this
1010   * returned ByteBuffer. The position of the returned byte buffer is at the begin of the required
1011   * bytes. When the required bytes happen to span across multiple ByteBuffers, this API will copy
1012   * the bytes to a newly created ByteBuffer of required size and return that.
1013   * @param length number of bytes required.
1014   * @return bytes from current position till length specified, as a single ByteButter.
1015   */
1016  @Override
1017  public ByteBuffer asSubByteBuffer(int length) {
1018    checkRefCount();
1019    if (this.curItem.remaining() >= length) {
1020      return this.curItem;
1021    }
1022    int offset = 0;
1023    byte[] dupB = new byte[length];
1024    int locCurItemIndex = curItemIndex;
1025    ByteBuffer locCurItem = curItem;
1026    while (length > 0) {
1027      int toRead = Math.min(length, locCurItem.remaining());
1028      ByteBufferUtils.copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset,
1029        toRead);
1030      length -= toRead;
1031      if (length == 0) break;
1032      locCurItemIndex++;
1033      locCurItem = this.items[locCurItemIndex];
1034      offset += toRead;
1035    }
1036    return ByteBuffer.wrap(dupB);
1037  }
1038
1039  /**
1040   * Returns bytes from given offset till length specified, as a single ByteBuffer. When all these
1041   * bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item as
1042   * such will be returned (with offset in this ByteBuffer where the bytes starts). So users are
1043   * warned not to change the position or limit of this returned ByteBuffer. When the required bytes
1044   * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created
1045   * ByteBuffer of required size and return that.
1046   * @param offset the offset in this MBB from where the subBuffer should be created
1047   * @param length the length of the subBuffer
1048   * @param pair   a pair that will have the bytes from the current position till length specified,
1049   *               as a single ByteBuffer and offset in that Buffer where the bytes starts. The
1050   *               method would set the values on the pair that is passed in by the caller
1051   */
1052  @Override
1053  public void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair) {
1054    checkRefCount();
1055    if (this.itemBeginPos[this.curItemIndex] <= offset) {
1056      int relOffsetInCurItem = offset - this.itemBeginPos[this.curItemIndex];
1057      if (this.curItem.limit() - relOffsetInCurItem >= length) {
1058        pair.setFirst(this.curItem);
1059        pair.setSecond(relOffsetInCurItem);
1060        return;
1061      }
1062    }
1063    int itemIndex = getItemIndex(offset);
1064    ByteBuffer item = this.items[itemIndex];
1065    offset = offset - this.itemBeginPos[itemIndex];
1066    if (item.limit() - offset >= length) {
1067      pair.setFirst(item);
1068      pair.setSecond(offset);
1069      return;
1070    }
1071    byte[] dst = new byte[length];
1072    int destOffset = 0;
1073    while (length > 0) {
1074      int toRead = Math.min(length, item.limit() - offset);
1075      ByteBufferUtils.copyFromBufferToArray(dst, item, offset, destOffset, toRead);
1076      length -= toRead;
1077      if (length == 0) break;
1078      itemIndex++;
1079      item = this.items[itemIndex];
1080      destOffset += toRead;
1081      offset = 0;
1082    }
1083    pair.setFirst(ByteBuffer.wrap(dst));
1084    pair.setSecond(0);
1085  }
1086
1087  /**
1088   * Copies the content from an this MBB to a ByteBuffer
1089   * @param out          the ByteBuffer to which the copy has to happen, its position will be
1090   *                     advanced.
1091   * @param sourceOffset the offset in the MBB from which the elements has to be copied
1092   * @param length       the length in the MBB upto which the elements has to be copied
1093   */
1094  @Override
1095  public void get(ByteBuffer out, int sourceOffset, int length) {
1096    checkRefCount();
1097    int itemIndex = getItemIndex(sourceOffset);
1098    ByteBuffer in = this.items[itemIndex];
1099    sourceOffset = sourceOffset - this.itemBeginPos[itemIndex];
1100    while (length > 0) {
1101      int toRead = Math.min(in.limit() - sourceOffset, length);
1102      ByteBufferUtils.copyFromBufferToBuffer(in, out, sourceOffset, toRead);
1103      length -= toRead;
1104      if (length == 0) {
1105        break;
1106      }
1107      itemIndex++;
1108      in = this.items[itemIndex];
1109      sourceOffset = 0;
1110    }
1111  }
1112
1113  /**
1114   * Copy the content from this MBB to a byte[] based on the given offset and length the position
1115   * from where the copy should start the length upto which the copy has to be done
1116   * @return byte[] with the copied contents from this MBB.
1117   */
1118  @Override
1119  public byte[] toBytes(int offset, int length) {
1120    checkRefCount();
1121    byte[] output = new byte[length];
1122    this.get(offset, output, 0, length);
1123    return output;
1124  }
1125
1126  private int internalRead(ReadableByteChannel channel, long offset, ChannelReader reader)
1127    throws IOException {
1128    checkRefCount();
1129    int total = 0;
1130    while (buffsIterator.hasNext()) {
1131      ByteBuffer buffer = buffsIterator.next();
1132      int len = read(channel, buffer, offset, reader);
1133      if (len > 0) {
1134        total += len;
1135        offset += len;
1136      }
1137      if (buffer.hasRemaining()) {
1138        // reset
1139        curItem = buffer;
1140        curItemIndex = (curItemIndex - 1);
1141        break;
1142      }
1143    }
1144    return total;
1145  }
1146
1147  @Override
1148  public int read(ReadableByteChannel channel) throws IOException {
1149    return internalRead(channel, 0, CHANNEL_READER);
1150  }
1151
1152  @Override
1153  public int read(FileChannel channel, long offset) throws IOException {
1154    return internalRead(channel, offset, FILE_READER);
1155  }
1156
1157  @Override
1158  public int write(FileChannel channel, long offset) throws IOException {
1159    checkRefCount();
1160    int total = 0;
1161    while (buffsIterator.hasNext()) {
1162      ByteBuffer buffer = buffsIterator.next();
1163      while (buffer.hasRemaining()) {
1164        int len = channel.write(buffer, offset);
1165        total += len;
1166        offset += len;
1167      }
1168    }
1169    return total;
1170  }
1171
1172  @Override
1173  public ByteBuffer[] nioByteBuffers() {
1174    checkRefCount();
1175    return this.items;
1176  }
1177
1178  @Override
1179  public boolean equals(Object obj) {
1180    if (!(obj instanceof MultiByteBuff)) return false;
1181    if (this == obj) return true;
1182    MultiByteBuff that = (MultiByteBuff) obj;
1183    if (this.capacity() != that.capacity()) return false;
1184    if (
1185      ByteBuff.compareTo(this, this.position(), this.limit(), that, that.position(), that.limit())
1186          == 0
1187    ) {
1188      return true;
1189    }
1190    return false;
1191  }
1192
1193  @Override
1194  public int hashCode() {
1195    int hash = 0;
1196    for (ByteBuffer b : this.items) {
1197      hash += b.hashCode();
1198    }
1199    return hash;
1200  }
1201
1202  @Override
1203  public MultiByteBuff retain() {
1204    refCnt.retain();
1205    return this;
1206  }
1207}