001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.nio;
019
020import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE;
021
022import java.io.IOException;
023import java.nio.BufferOverflowException;
024import java.nio.BufferUnderflowException;
025import java.nio.ByteBuffer;
026import java.nio.InvalidMarkException;
027import java.nio.channels.FileChannel;
028import java.nio.channels.ReadableByteChannel;
029import java.util.Iterator;
030import java.util.NoSuchElementException;
031import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
032import org.apache.hadoop.hbase.util.ByteBufferUtils;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.hbase.util.ObjectIntPair;
035import org.apache.yetus.audience.InterfaceAudience;
036
037/**
 * Provides a unified view of all the underlying ByteBuffers, so that together they look like one
 * bigger sequential buffer. This class provides APIs similar to those in {@link ByteBuffer} to
 * put/get int, short, long etc. and to perform operations like mark, reset, slice etc. It is to be
 * used when data is split across multiple byte buffers and we don't want to copy them into a
 * single buffer in order to read from it.
042 */
043@InterfaceAudience.Private
044public class MultiByteBuff extends ByteBuff {
045
  // The underlying buffers, in order, that together make up this MBB
  private final ByteBuffer[] items;
  // Pointer to the current item in the MBB
  private ByteBuffer curItem = null;
  // Index of the current item in the MBB
  private int curItemIndex = 0;

  // Limit of this MBB, expressed as an offset over the concatenation of all items
  private int limit = 0;
  // Index of the item in which the limit currently falls
  private int limitedItemIndex;
  // Index of the item that was current when mark() was last called; -1 when no mark is set
  private int markedItemIndex = -1;
  // itemBeginPos[i] is the MBB offset at which items[i] begins; it holds one extra trailing entry
  private final int[] itemBeginPos;
056
057  private Iterator<ByteBuffer> buffsIterator = new Iterator<ByteBuffer>() {
058    @Override
059    public boolean hasNext() {
060      return curItemIndex < limitedItemIndex
061        || (curItemIndex == limitedItemIndex && items[curItemIndex].hasRemaining());
062    }
063
064    @Override
065    public ByteBuffer next() {
066      if (curItemIndex >= items.length) {
067        throw new NoSuchElementException("items overflow");
068      }
069      curItem = items[curItemIndex++];
070      return curItem;
071    }
072  };
073
  /**
   * Creates an MBB over the given buffers, delegating to the recycler-based constructor with the
   * statically imported {@code NONE} recycler.
   * @param items the buffers to be viewed as one sequential buffer
   */
  public MultiByteBuff(ByteBuffer... items) {
    this(NONE, items);
  }
077
  /**
   * Creates an MBB over the given buffers whose reference count is a new {@code RefCnt} built
   * from the given recycler.
   * @param recycler the recycler handed to the new {@code RefCnt}
   * @param items    the buffers to be viewed as one sequential buffer
   */
  public MultiByteBuff(Recycler recycler, ByteBuffer... items) {
    this(new RefCnt(recycler), items);
  }
081
082  MultiByteBuff(RefCnt refCnt, ByteBuffer... items) {
083    this.refCnt = refCnt;
084    assert items != null;
085    assert items.length > 0;
086    this.items = items;
087    this.curItem = this.items[this.curItemIndex];
088    // See below optimization in getInt(int) where we check whether the given index land in current
089    // item. For this we need to check whether the passed index is less than the next item begin
090    // offset. To handle this effectively for the last item buffer, we add an extra item into this
091    // array.
092    itemBeginPos = new int[items.length + 1];
093    int offset = 0;
094    for (int i = 0; i < items.length; i++) {
095      ByteBuffer item = items[i];
096      item.rewind();
097      itemBeginPos[i] = offset;
098      int l = item.limit() - item.position();
099      offset += l;
100    }
101    this.limit = offset;
102    this.itemBeginPos[items.length] = offset + 1;
103    this.limitedItemIndex = this.items.length - 1;
104  }
105
106  private MultiByteBuff(RefCnt refCnt, ByteBuffer[] items, int[] itemBeginPos, int limit,
107    int limitedIndex, int curItemIndex, int markedIndex) {
108    this.refCnt = refCnt;
109    this.items = items;
110    this.curItemIndex = curItemIndex;
111    this.curItem = this.items[this.curItemIndex];
112    this.itemBeginPos = itemBeginPos;
113    this.limit = limit;
114    this.limitedItemIndex = limitedIndex;
115    this.markedItemIndex = markedIndex;
116  }
117
  /**
   * Not supported: this MBB is made up of several buffers and exposes no single backing array.
   * @throws UnsupportedOperationException always; MBB does not support array based operations
   */
  @Override
  public byte[] array() {
    throw new UnsupportedOperationException();
  }
125
  /**
   * Not supported, for the same reason as {@link #array()}.
   * @throws UnsupportedOperationException always; MBB does not support array based operations
   */
  @Override
  public int arrayOffset() {
    throw new UnsupportedOperationException();
  }
133
  /**
   * Always returns false. MBB does not support array based operations.
   * @return false
   */
  @Override
  public boolean hasArray() {
    return false;
  }
139
140  /** Returns the total capacity of this MultiByteBuffer. */
141  @Override
142  public int capacity() {
143    checkRefCount();
144    int c = 0;
145    for (ByteBuffer item : this.items) {
146      c += item.capacity();
147    }
148    return c;
149  }
150
151  /**
152   * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers n
153   * * @return the byte at the given index
154   */
155  @Override
156  public byte get(int index) {
157    checkRefCount();
158    int itemIndex = getItemIndex(index);
159    return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]);
160  }
161
162  @Override
163  public byte getByteAfterPosition(int offset) {
164    checkRefCount();
165    // Mostly the index specified will land within this current item. Short circuit for that
166    int index = offset + this.position();
167    int itemIndex = getItemIndexFromCurItemIndex(index);
168    return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]);
169  }
170
171  /*
172   * Returns in which sub ByteBuffer, the given element index will be available.
173   */
174  private int getItemIndex(int elemIndex) {
175    if (elemIndex < 0) {
176      throw new IndexOutOfBoundsException();
177    }
178    int index = 1;
179    while (elemIndex >= this.itemBeginPos[index]) {
180      index++;
181      if (index == this.itemBeginPos.length) {
182        throw new IndexOutOfBoundsException();
183      }
184    }
185    return index - 1;
186  }
187
188  /*
189   * Returns in which sub ByteBuffer, the given element index will be available. In this case we are
190   * sure that the item will be after MBB's current position
191   */
192  private int getItemIndexFromCurItemIndex(int elemIndex) {
193    int index = this.curItemIndex;
194    while (elemIndex >= this.itemBeginPos[index]) {
195      index++;
196      if (index == this.itemBeginPos.length) {
197        throw new IndexOutOfBoundsException();
198      }
199    }
200    return index - 1;
201  }
202
203  /**
204   * Fetches the int at the given index. Does not change position of the underlying ByteBuffers n
205   * * @return the int value at the given index
206   */
207  @Override
208  public int getInt(int index) {
209    checkRefCount();
210    // Mostly the index specified will land within this current item. Short circuit for that
211    int itemIndex;
212    if (
213      this.itemBeginPos[this.curItemIndex] <= index
214        && this.itemBeginPos[this.curItemIndex + 1] > index
215    ) {
216      itemIndex = this.curItemIndex;
217    } else {
218      itemIndex = getItemIndex(index);
219    }
220    return getInt(index, itemIndex);
221  }
222
223  @Override
224  public int getIntAfterPosition(int offset) {
225    checkRefCount();
226    // Mostly the index specified will land within this current item. Short circuit for that
227    int index = offset + this.position();
228    int itemIndex;
229    if (this.itemBeginPos[this.curItemIndex + 1] > index) {
230      itemIndex = this.curItemIndex;
231    } else {
232      itemIndex = getItemIndexFromCurItemIndex(index);
233    }
234    return getInt(index, itemIndex);
235  }
236
237  /**
238   * Fetches the short at the given index. Does not change position of the underlying ByteBuffers n
239   * * @return the short value at the given index
240   */
241  @Override
242  public short getShort(int index) {
243    checkRefCount();
244    // Mostly the index specified will land within this current item. Short circuit for that
245    int itemIndex;
246    if (
247      this.itemBeginPos[this.curItemIndex] <= index
248        && this.itemBeginPos[this.curItemIndex + 1] > index
249    ) {
250      itemIndex = this.curItemIndex;
251    } else {
252      itemIndex = getItemIndex(index);
253    }
254    ByteBuffer item = items[itemIndex];
255    int offsetInItem = index - this.itemBeginPos[itemIndex];
256    if (item.limit() - offsetInItem >= Bytes.SIZEOF_SHORT) {
257      return ByteBufferUtils.toShort(item, offsetInItem);
258    }
259    if (items.length - 1 == itemIndex) {
260      // means cur item is the last one and we wont be able to read a int. Throw exception
261      throw new BufferUnderflowException();
262    }
263    ByteBuffer nextItem = items[itemIndex + 1];
264    // Get available one byte from this item and remaining one from next
265    short n = 0;
266    n = (short) (n ^ (ByteBufferUtils.toByte(item, offsetInItem) & 0xFF));
267    n = (short) (n << 8);
268    n = (short) (n ^ (ByteBufferUtils.toByte(nextItem, 0) & 0xFF));
269    return n;
270  }
271
272  @Override
273  public short getShortAfterPosition(int offset) {
274    checkRefCount();
275    // Mostly the index specified will land within this current item. Short circuit for that
276    int index = offset + this.position();
277    int itemIndex;
278    if (this.itemBeginPos[this.curItemIndex + 1] > index) {
279      itemIndex = this.curItemIndex;
280    } else {
281      itemIndex = getItemIndexFromCurItemIndex(index);
282    }
283    return getShort(index, itemIndex);
284  }
285
286  private int getInt(int index, int itemIndex) {
287    ByteBuffer item = items[itemIndex];
288    int offsetInItem = index - this.itemBeginPos[itemIndex];
289    int remainingLen = item.limit() - offsetInItem;
290    if (remainingLen >= Bytes.SIZEOF_INT) {
291      return ByteBufferUtils.toInt(item, offsetInItem);
292    }
293    if (items.length - 1 == itemIndex) {
294      // means cur item is the last one and we wont be able to read a int. Throw exception
295      throw new BufferUnderflowException();
296    }
297    int l = 0;
298    for (int i = 0; i < Bytes.SIZEOF_INT; i++) {
299      l <<= 8;
300      l ^= get(index + i) & 0xFF;
301    }
302    return l;
303  }
304
305  private short getShort(int index, int itemIndex) {
306    ByteBuffer item = items[itemIndex];
307    int offsetInItem = index - this.itemBeginPos[itemIndex];
308    int remainingLen = item.limit() - offsetInItem;
309    if (remainingLen >= Bytes.SIZEOF_SHORT) {
310      return ByteBufferUtils.toShort(item, offsetInItem);
311    }
312    if (items.length - 1 == itemIndex) {
313      // means cur item is the last one and we wont be able to read a short. Throw exception
314      throw new BufferUnderflowException();
315    }
316    ByteBuffer nextItem = items[itemIndex + 1];
317    // Get available bytes from this item and remaining from next
318    short l = 0;
319    for (int i = offsetInItem; i < item.capacity(); i++) {
320      l = (short) (l << 8);
321      l = (short) (l ^ (ByteBufferUtils.toByte(item, i) & 0xFF));
322    }
323    for (int i = 0; i < Bytes.SIZEOF_SHORT - remainingLen; i++) {
324      l = (short) (l << 8);
325      l = (short) (l ^ (ByteBufferUtils.toByte(nextItem, i) & 0xFF));
326    }
327    return l;
328  }
329
330  private long getLong(int index, int itemIndex) {
331    ByteBuffer item = items[itemIndex];
332    int offsetInItem = index - this.itemBeginPos[itemIndex];
333    int remainingLen = item.limit() - offsetInItem;
334    if (remainingLen >= Bytes.SIZEOF_LONG) {
335      return ByteBufferUtils.toLong(item, offsetInItem);
336    }
337    if (items.length - 1 == itemIndex) {
338      // means cur item is the last one and we wont be able to read a long. Throw exception
339      throw new BufferUnderflowException();
340    }
341    long l = 0;
342    for (int i = 0; i < Bytes.SIZEOF_LONG; i++) {
343      l <<= 8;
344      l ^= get(index + i) & 0xFF;
345    }
346    return l;
347  }
348
349  /**
350   * Fetches the long at the given index. Does not change position of the underlying ByteBuffers n
351   * * @return the long value at the given index
352   */
353  @Override
354  public long getLong(int index) {
355    checkRefCount();
356    // Mostly the index specified will land within this current item. Short circuit for that
357    int itemIndex;
358    if (
359      this.itemBeginPos[this.curItemIndex] <= index
360        && this.itemBeginPos[this.curItemIndex + 1] > index
361    ) {
362      itemIndex = this.curItemIndex;
363    } else {
364      itemIndex = getItemIndex(index);
365    }
366    return getLong(index, itemIndex);
367  }
368
369  @Override
370  public long getLongAfterPosition(int offset) {
371    checkRefCount();
372    // Mostly the index specified will land within this current item. Short circuit for that
373    int index = offset + this.position();
374    int itemIndex;
375    if (this.itemBeginPos[this.curItemIndex + 1] > index) {
376      itemIndex = this.curItemIndex;
377    } else {
378      itemIndex = getItemIndexFromCurItemIndex(index);
379    }
380    return getLong(index, itemIndex);
381  }
382
383  /** Returns this MBB's current position */
384  @Override
385  public int position() {
386    checkRefCount();
387    return itemBeginPos[this.curItemIndex] + this.curItem.position();
388  }
389
390  /**
391   * Sets this MBB's position to the given value. n * @return this object
392   */
393  @Override
394  public MultiByteBuff position(int position) {
395    checkRefCount();
396    // Short circuit for positioning within the cur item. Mostly that is the case.
397    if (
398      this.itemBeginPos[this.curItemIndex] <= position
399        && this.itemBeginPos[this.curItemIndex + 1] > position
400    ) {
401      this.curItem.position(position - this.itemBeginPos[this.curItemIndex]);
402      return this;
403    }
404    int itemIndex = getItemIndex(position);
405    // All items from 0 - curItem-1 set position at end.
406    for (int i = 0; i < itemIndex; i++) {
407      this.items[i].position(this.items[i].limit());
408    }
409    // All items after curItem set position at begin
410    for (int i = itemIndex + 1; i < this.items.length; i++) {
411      this.items[i].position(0);
412    }
413    this.curItem = this.items[itemIndex];
414    this.curItem.position(position - this.itemBeginPos[itemIndex]);
415    this.curItemIndex = itemIndex;
416    return this;
417  }
418
419  /**
420   * Rewinds this MBB and the position is set to 0
421   * @return this object
422   */
423  @Override
424  public MultiByteBuff rewind() {
425    checkRefCount();
426    for (int i = 0; i < this.items.length; i++) {
427      this.items[i].rewind();
428    }
429    this.curItemIndex = 0;
430    this.curItem = this.items[this.curItemIndex];
431    this.markedItemIndex = -1;
432    return this;
433  }
434
435  /**
436   * Marks the current position of the MBB
437   * @return this object
438   */
439  @Override
440  public MultiByteBuff mark() {
441    checkRefCount();
442    this.markedItemIndex = this.curItemIndex;
443    this.curItem.mark();
444    return this;
445  }
446
447  /**
448   * Similar to {@link ByteBuffer}.reset(), ensures that this MBB is reset back to last marked
449   * position.
450   * @return This MBB
451   */
452  @Override
453  public MultiByteBuff reset() {
454    checkRefCount();
455    // when the buffer is moved to the next one.. the reset should happen on the previous marked
456    // item and the new one should be taken as the base
457    if (this.markedItemIndex < 0) throw new InvalidMarkException();
458    ByteBuffer markedItem = this.items[this.markedItemIndex];
459    markedItem.reset();
460    this.curItem = markedItem;
461    // All items after the marked position upto the current item should be reset to 0
462    for (int i = this.curItemIndex; i > this.markedItemIndex; i--) {
463      this.items[i].position(0);
464    }
465    this.curItemIndex = this.markedItemIndex;
466    return this;
467  }
468
469  /**
470   * Returns the number of elements between the current position and the limit.
471   * @return the remaining elements in this MBB
472   */
473  @Override
474  public int remaining() {
475    checkRefCount();
476    int remain = 0;
477    for (int i = curItemIndex; i < items.length; i++) {
478      remain += items[i].remaining();
479    }
480    return remain;
481  }
482
483  /**
484   * Returns true if there are elements between the current position and the limt
485   * @return true if there are elements, false otherwise
486   */
487  @Override
488  public final boolean hasRemaining() {
489    checkRefCount();
490    return this.curItem.hasRemaining() || (this.curItemIndex < this.limitedItemIndex
491      && this.items[this.curItemIndex + 1].hasRemaining());
492  }
493
494  /**
495   * A relative method that returns byte at the current position. Increments the current position by
496   * the size of a byte.
497   * @return the byte at the current position
498   */
499  @Override
500  public byte get() {
501    checkRefCount();
502    if (this.curItem.remaining() == 0) {
503      if (items.length - 1 == this.curItemIndex) {
504        // means cur item is the last one and we wont be able to read a long. Throw exception
505        throw new BufferUnderflowException();
506      }
507      this.curItemIndex++;
508      this.curItem = this.items[this.curItemIndex];
509    }
510    return this.curItem.get();
511  }
512
513  /**
514   * Returns the short value at the current position. Also advances the position by the size of
515   * short
516   * @return the short value at the current position
517   */
518  @Override
519  public short getShort() {
520    checkRefCount();
521    int remaining = this.curItem.remaining();
522    if (remaining >= Bytes.SIZEOF_SHORT) {
523      return this.curItem.getShort();
524    }
525    short n = 0;
526    n = (short) (n ^ (get() & 0xFF));
527    n = (short) (n << 8);
528    n = (short) (n ^ (get() & 0xFF));
529    return n;
530  }
531
532  /**
533   * Returns the int value at the current position. Also advances the position by the size of int
534   * @return the int value at the current position
535   */
536  @Override
537  public int getInt() {
538    checkRefCount();
539    int remaining = this.curItem.remaining();
540    if (remaining >= Bytes.SIZEOF_INT) {
541      return this.curItem.getInt();
542    }
543    int n = 0;
544    for (int i = 0; i < Bytes.SIZEOF_INT; i++) {
545      n <<= 8;
546      n ^= get() & 0xFF;
547    }
548    return n;
549  }
550
551  /**
552   * Returns the long value at the current position. Also advances the position by the size of long
553   * @return the long value at the current position
554   */
555  @Override
556  public long getLong() {
557    checkRefCount();
558    int remaining = this.curItem.remaining();
559    if (remaining >= Bytes.SIZEOF_LONG) {
560      return this.curItem.getLong();
561    }
562    long l = 0;
563    for (int i = 0; i < Bytes.SIZEOF_LONG; i++) {
564      l <<= 8;
565      l ^= get() & 0xFF;
566    }
567    return l;
568  }
569
  /**
   * Copies the content from this MBB's current position to the byte array and fills it. Also
   * advances the position of the MBB by the length of the byte[].
   * @param dst the array to fill completely
   */
  @Override
  public void get(byte[] dst) {
    get(dst, 0, dst.length);
  }
578
579  /**
580   * Copies the specified number of bytes from this MBB's current position to the byte[]'s offset.
581   * Also advances the position of the MBB by the given length.
582   */
583  @Override
584  public void get(byte[] dst, int offset, int length) {
585    checkRefCount();
586    while (length > 0) {
587      int toRead = Math.min(length, this.curItem.remaining());
588      ByteBufferUtils.copyFromBufferToArray(dst, this.curItem, this.curItem.position(), offset,
589        toRead);
590      this.curItem.position(this.curItem.position() + toRead);
591      length -= toRead;
592      if (length == 0) break;
593      this.curItemIndex++;
594      this.curItem = this.items[this.curItemIndex];
595      offset += toRead;
596    }
597  }
598
599  @Override
600  public void get(int sourceOffset, byte[] dst, int offset, int length) {
601    checkRefCount();
602    int itemIndex = getItemIndex(sourceOffset);
603    ByteBuffer item = this.items[itemIndex];
604    sourceOffset = sourceOffset - this.itemBeginPos[itemIndex];
605    while (length > 0) {
606      int toRead = Math.min((item.limit() - sourceOffset), length);
607      ByteBufferUtils.copyFromBufferToArray(dst, item, sourceOffset, offset, toRead);
608      length -= toRead;
609      if (length == 0) break;
610      itemIndex++;
611      item = this.items[itemIndex];
612      offset += toRead;
613      sourceOffset = 0;
614    }
615  }
616
617  /**
618   * Marks the limit of this MBB. n * @return This MBB
619   */
620  @Override
621  public MultiByteBuff limit(int limit) {
622    checkRefCount();
623    this.limit = limit;
624    // Normally the limit will try to limit within the last BB item
625    int limitedIndexBegin = this.itemBeginPos[this.limitedItemIndex];
626    if (limit >= limitedIndexBegin && limit < this.itemBeginPos[this.limitedItemIndex + 1]) {
627      this.items[this.limitedItemIndex].limit(limit - limitedIndexBegin);
628      return this;
629    }
630    int itemIndex = getItemIndex(limit);
631    int beginOffset = this.itemBeginPos[itemIndex];
632    int offsetInItem = limit - beginOffset;
633    ByteBuffer item = items[itemIndex];
634    item.limit(offsetInItem);
635    for (int i = this.limitedItemIndex; i < itemIndex; i++) {
636      this.items[i].limit(this.items[i].capacity());
637    }
638    this.limitedItemIndex = itemIndex;
639    for (int i = itemIndex + 1; i < this.items.length; i++) {
640      this.items[i].limit(this.items[i].position());
641    }
642    return this;
643  }
644
  /**
   * Returns the limit of this MBB, as last stored by the constructor or by {@link #limit(int)}.
   * @return limit of the MBB
   */
  @Override
  public int limit() {
    return this.limit;
  }
653
654  /**
655   * Returns an MBB which is a sliced version of this MBB. The position, limit and mark of the new
656   * MBB will be independent than that of the original MBB. The content of the new MBB will start at
657   * this MBB's current position
658   * @return a sliced MBB
659   */
660  @Override
661  public MultiByteBuff slice() {
662    checkRefCount();
663    ByteBuffer[] copy = new ByteBuffer[this.limitedItemIndex - this.curItemIndex + 1];
664    for (int i = curItemIndex, j = 0; i <= this.limitedItemIndex; i++, j++) {
665      copy[j] = this.items[i].slice();
666    }
667    return new MultiByteBuff(refCnt, copy);
668  }
669
670  /**
671   * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark of the
672   * new MBB will be independent than that of the original MBB. The content of the new MBB will
673   * start at this MBB's current position The position, limit and mark of the new MBB would be
674   * identical to this MBB in terms of values.
675   * @return a duplicated MBB
676   */
677  @Override
678  public MultiByteBuff duplicate() {
679    checkRefCount();
680    ByteBuffer[] itemsCopy = new ByteBuffer[this.items.length];
681    for (int i = 0; i < this.items.length; i++) {
682      itemsCopy[i] = items[i].duplicate();
683    }
684    return new MultiByteBuff(refCnt, itemsCopy, this.itemBeginPos, this.limit,
685      this.limitedItemIndex, this.curItemIndex, this.markedItemIndex);
686  }
687
688  /**
689   * Writes a byte to this MBB at the current position and increments the position n * @return this
690   * object
691   */
692  @Override
693  public MultiByteBuff put(byte b) {
694    checkRefCount();
695    if (this.curItem.remaining() == 0) {
696      if (this.curItemIndex == this.items.length - 1) {
697        throw new BufferOverflowException();
698      }
699      this.curItemIndex++;
700      this.curItem = this.items[this.curItemIndex];
701    }
702    this.curItem.put(b);
703    return this;
704  }
705
706  /**
707   * Writes a byte to this MBB at the given index and won't affect the position of any of the
708   * buffers.
709   * @return this object
710   * @throws IndexOutOfBoundsException If <tt>index</tt> is negative or not smaller than the
711   *                                   {@link MultiByteBuff#limit}
712   */
713  @Override
714  public MultiByteBuff put(int index, byte b) {
715    checkRefCount();
716    int itemIndex = getItemIndex(index);
717    ByteBuffer item = items[itemIndex];
718    item.put(index - itemBeginPos[itemIndex], b);
719    return this;
720  }
721
722  /**
723   * Copies from a src BB to this MBB. This will be absolute positional copying and won't affect the
724   * position of any of the buffers.
725   * @param destOffset the position in this MBB to which the copy should happen
726   * @param src        the src MBB
727   * @param srcOffset  the offset in the src MBB from where the elements should be read
728   * @param length     the length upto which the copy should happen
729   * @throws BufferUnderflowException If there are fewer than length bytes remaining in src
730   *                                  ByteBuff.
731   * @throws BufferOverflowException  If there is insufficient available space in this MBB for
732   *                                  length bytes.
733   */
734  @Override
735  public MultiByteBuff put(int destOffset, ByteBuff src, int srcOffset, int length) {
736    checkRefCount();
737    int destItemIndex = getItemIndex(destOffset);
738    int srcItemIndex = getItemIndexForByteBuff(src, srcOffset, length);
739
740    ByteBuffer destItem = this.items[destItemIndex];
741    destOffset = this.getRelativeOffset(destOffset, destItemIndex);
742
743    ByteBuffer srcItem = getItemByteBuffer(src, srcItemIndex);
744    srcOffset = getRelativeOffsetForByteBuff(src, srcOffset, srcItemIndex);
745
746    while (length > 0) {
747      int toWrite = destItem.limit() - destOffset;
748      if (toWrite <= 0) {
749        throw new BufferOverflowException();
750      }
751      int toRead = srcItem.limit() - srcOffset;
752      if (toRead <= 0) {
753        throw new BufferUnderflowException();
754      }
755      int toMove = Math.min(length, Math.min(toRead, toWrite));
756      ByteBufferUtils.copyFromBufferToBuffer(srcItem, destItem, srcOffset, destOffset, toMove);
757      length -= toMove;
758      if (length == 0) {
759        break;
760      }
761      if (toRead < toWrite) {
762        if (++srcItemIndex >= getItemByteBufferCount(src)) {
763          throw new BufferUnderflowException();
764        }
765        srcItem = getItemByteBuffer(src, srcItemIndex);
766        srcOffset = 0;
767        destOffset += toMove;
768      } else if (toRead > toWrite) {
769        if (++destItemIndex >= this.items.length) {
770          throw new BufferOverflowException();
771        }
772        destItem = this.items[destItemIndex];
773        destOffset = 0;
774        srcOffset += toMove;
775      } else {
776        // toRead = toWrite case
777        if (++srcItemIndex >= getItemByteBufferCount(src)) {
778          throw new BufferUnderflowException();
779        }
780        srcItem = getItemByteBuffer(src, srcItemIndex);
781        srcOffset = 0;
782        if (++destItemIndex >= this.items.length) {
783          throw new BufferOverflowException();
784        }
785        destItem = this.items[destItemIndex];
786        destOffset = 0;
787      }
788    }
789    return this;
790  }
791
792  private static ByteBuffer getItemByteBuffer(ByteBuff buf, int byteBufferIndex) {
793    if (buf instanceof SingleByteBuff) {
794      if (byteBufferIndex != 0) {
795        throw new IndexOutOfBoundsException(
796          "index:[" + byteBufferIndex + "],but only index 0 is valid.");
797      }
798      return buf.nioByteBuffers()[0];
799    }
800    MultiByteBuff multiByteBuff = (MultiByteBuff) buf;
801    if (byteBufferIndex < 0 || byteBufferIndex >= multiByteBuff.items.length) {
802      throw new IndexOutOfBoundsException("index:[" + byteBufferIndex + "],but only index [0-"
803        + multiByteBuff.items.length + ") is valid.");
804    }
805    return multiByteBuff.items[byteBufferIndex];
806  }
807
808  private static int getItemIndexForByteBuff(ByteBuff byteBuff, int offset, int length) {
809    if (byteBuff instanceof SingleByteBuff) {
810      ByteBuffer byteBuffer = byteBuff.nioByteBuffers()[0];
811      if (offset + length > byteBuffer.limit()) {
812        throw new BufferUnderflowException();
813      }
814      return 0;
815    }
816    MultiByteBuff multiByteBuff = (MultiByteBuff) byteBuff;
817    return multiByteBuff.getItemIndex(offset);
818  }
819
820  private static int getRelativeOffsetForByteBuff(ByteBuff byteBuff, int globalOffset,
821    int itemIndex) {
822    if (byteBuff instanceof SingleByteBuff) {
823      if (itemIndex != 0) {
824        throw new IndexOutOfBoundsException("index:[" + itemIndex + "],but only index 0 is valid.");
825      }
826      return globalOffset;
827    }
828    return ((MultiByteBuff) byteBuff).getRelativeOffset(globalOffset, itemIndex);
829  }
830
831  private int getRelativeOffset(int globalOffset, int itemIndex) {
832    if (itemIndex < 0 || itemIndex >= this.items.length) {
833      throw new IndexOutOfBoundsException(
834        "index:[" + itemIndex + "],but only index [0-" + this.items.length + ") is valid.");
835    }
836    return globalOffset - this.itemBeginPos[itemIndex];
837  }
838
839  private static int getItemByteBufferCount(ByteBuff buf) {
840    return (buf instanceof SingleByteBuff) ? 1 : ((MultiByteBuff) buf).items.length;
841  }
842
843  /**
844   * Writes an int to this MBB at its current position. Also advances the position by size of int
845   * @param val Int value to write
846   * @return this object
847   */
848  @Override
849  public MultiByteBuff putInt(int val) {
850    checkRefCount();
851    if (this.curItem.remaining() >= Bytes.SIZEOF_INT) {
852      this.curItem.putInt(val);
853      return this;
854    }
855    if (this.curItemIndex == this.items.length - 1) {
856      throw new BufferOverflowException();
857    }
858    // During read, we will read as byte by byte for this case. So just write in Big endian
859    put(int3(val));
860    put(int2(val));
861    put(int1(val));
862    put(int0(val));
863    return this;
864  }
865
866  private static byte int3(int x) {
867    return (byte) (x >> 24);
868  }
869
870  private static byte int2(int x) {
871    return (byte) (x >> 16);
872  }
873
874  private static byte int1(int x) {
875    return (byte) (x >> 8);
876  }
877
  /* Returns the least significant byte (bits 0-7) of the given int. */
  private static byte int0(int x) {
    return (byte) x;
  }
881
  /**
   * Copies the whole of the given byte[] to this MBB by delegating to
   * {@link #put(byte[], int, int)} with offset 0 and the array's length.
   * @param src the bytes to write
   * @return this object
   */
  @Override
  public final MultiByteBuff put(byte[] src) {
    return put(src, 0, src.length);
  }
887
888  /** Copies from the given byte[] to this MBB. */
889  @Override
890  public MultiByteBuff put(byte[] src, int offset, int length) {
891    checkRefCount();
892    if (this.curItem.remaining() >= length) {
893      ByteBufferUtils.copyFromArrayToBuffer(this.curItem, src, offset, length);
894      return this;
895    }
896    int end = offset + length;
897    for (int i = offset; i < end; i++) {
898      this.put(src[i]);
899    }
900    return this;
901  }
902
903  /**
904   * Writes a long to this MBB at its current position. Also advances the position by size of long
905   * @param val Long value to write
906   * @return this object
907   */
908  @Override
909  public MultiByteBuff putLong(long val) {
910    checkRefCount();
911    if (this.curItem.remaining() >= Bytes.SIZEOF_LONG) {
912      this.curItem.putLong(val);
913      return this;
914    }
915    if (this.curItemIndex == this.items.length - 1) {
916      throw new BufferOverflowException();
917    }
918    // During read, we will read as byte by byte for this case. So just write in Big endian
919    put(long7(val));
920    put(long6(val));
921    put(long5(val));
922    put(long4(val));
923    put(long3(val));
924    put(long2(val));
925    put(long1(val));
926    put(long0(val));
927    return this;
928  }
929
930  private static byte long7(long x) {
931    return (byte) (x >> 56);
932  }
933
934  private static byte long6(long x) {
935    return (byte) (x >> 48);
936  }
937
938  private static byte long5(long x) {
939    return (byte) (x >> 40);
940  }
941
942  private static byte long4(long x) {
943    return (byte) (x >> 32);
944  }
945
946  private static byte long3(long x) {
947    return (byte) (x >> 24);
948  }
949
950  private static byte long2(long x) {
951    return (byte) (x >> 16);
952  }
953
954  private static byte long1(long x) {
955    return (byte) (x >> 8);
956  }
957
958  private static byte long0(long x) {
959    return (byte) x;
960  }
961
962  /**
963   * Jumps the current position of this MBB by specified length. n
964   */
965  @Override
966  public MultiByteBuff skip(int length) {
967    checkRefCount();
968    // Get available bytes from this item and remaining from next
969    int jump = 0;
970    while (true) {
971      jump = this.curItem.remaining();
972      if (jump >= length) {
973        this.curItem.position(this.curItem.position() + length);
974        break;
975      }
976      this.curItem.position(this.curItem.position() + jump);
977      length -= jump;
978      this.curItemIndex++;
979      this.curItem = this.items[this.curItemIndex];
980    }
981    return this;
982  }
983
984  /**
985   * Jumps back the current position of this MBB by specified length. n
986   */
987  @Override
988  public MultiByteBuff moveBack(int length) {
989    checkRefCount();
990    while (length != 0) {
991      if (length > curItem.position()) {
992        length -= curItem.position();
993        this.curItem.position(0);
994        this.curItemIndex--;
995        this.curItem = this.items[curItemIndex];
996      } else {
997        this.curItem.position(curItem.position() - length);
998        break;
999      }
1000    }
1001    return this;
1002  }
1003
1004  /**
1005   * Returns bytes from current position till length specified, as a single ByteBuffer. When all
1006   * these bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item
1007   * as such will be returned. So users are warned not to change the position or limit of this
1008   * returned ByteBuffer. The position of the returned byte buffer is at the begin of the required
1009   * bytes. When the required bytes happen to span across multiple ByteBuffers, this API will copy
1010   * the bytes to a newly created ByteBuffer of required size and return that.
1011   * @param length number of bytes required.
1012   * @return bytes from current position till length specified, as a single ByteButter.
1013   */
1014  @Override
1015  public ByteBuffer asSubByteBuffer(int length) {
1016    checkRefCount();
1017    if (this.curItem.remaining() >= length) {
1018      return this.curItem;
1019    }
1020    int offset = 0;
1021    byte[] dupB = new byte[length];
1022    int locCurItemIndex = curItemIndex;
1023    ByteBuffer locCurItem = curItem;
1024    while (length > 0) {
1025      int toRead = Math.min(length, locCurItem.remaining());
1026      ByteBufferUtils.copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset,
1027        toRead);
1028      length -= toRead;
1029      if (length == 0) break;
1030      locCurItemIndex++;
1031      locCurItem = this.items[locCurItemIndex];
1032      offset += toRead;
1033    }
1034    return ByteBuffer.wrap(dupB);
1035  }
1036
1037  /**
1038   * Returns bytes from given offset till length specified, as a single ByteBuffer. When all these
1039   * bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item as
1040   * such will be returned (with offset in this ByteBuffer where the bytes starts). So users are
1041   * warned not to change the position or limit of this returned ByteBuffer. When the required bytes
1042   * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created
1043   * ByteBuffer of required size and return that.
1044   * @param offset the offset in this MBB from where the subBuffer should be created
1045   * @param length the length of the subBuffer
1046   * @param pair   a pair that will have the bytes from the current position till length specified,
1047   *               as a single ByteBuffer and offset in that Buffer where the bytes starts. The
1048   *               method would set the values on the pair that is passed in by the caller
1049   */
1050  @Override
1051  public void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair) {
1052    checkRefCount();
1053    if (this.itemBeginPos[this.curItemIndex] <= offset) {
1054      int relOffsetInCurItem = offset - this.itemBeginPos[this.curItemIndex];
1055      if (this.curItem.limit() - relOffsetInCurItem >= length) {
1056        pair.setFirst(this.curItem);
1057        pair.setSecond(relOffsetInCurItem);
1058        return;
1059      }
1060    }
1061    int itemIndex = getItemIndex(offset);
1062    ByteBuffer item = this.items[itemIndex];
1063    offset = offset - this.itemBeginPos[itemIndex];
1064    if (item.limit() - offset >= length) {
1065      pair.setFirst(item);
1066      pair.setSecond(offset);
1067      return;
1068    }
1069    byte[] dst = new byte[length];
1070    int destOffset = 0;
1071    while (length > 0) {
1072      int toRead = Math.min(length, item.limit() - offset);
1073      ByteBufferUtils.copyFromBufferToArray(dst, item, offset, destOffset, toRead);
1074      length -= toRead;
1075      if (length == 0) break;
1076      itemIndex++;
1077      item = this.items[itemIndex];
1078      destOffset += toRead;
1079      offset = 0;
1080    }
1081    pair.setFirst(ByteBuffer.wrap(dst));
1082    pair.setSecond(0);
1083  }
1084
1085  /**
1086   * Copies the content from an this MBB to a ByteBuffer
1087   * @param out          the ByteBuffer to which the copy has to happen, its position will be
1088   *                     advanced.
1089   * @param sourceOffset the offset in the MBB from which the elements has to be copied
1090   * @param length       the length in the MBB upto which the elements has to be copied
1091   */
1092  @Override
1093  public void get(ByteBuffer out, int sourceOffset, int length) {
1094    checkRefCount();
1095    int itemIndex = getItemIndex(sourceOffset);
1096    ByteBuffer in = this.items[itemIndex];
1097    sourceOffset = sourceOffset - this.itemBeginPos[itemIndex];
1098    while (length > 0) {
1099      int toRead = Math.min(in.limit() - sourceOffset, length);
1100      ByteBufferUtils.copyFromBufferToBuffer(in, out, sourceOffset, toRead);
1101      length -= toRead;
1102      if (length == 0) {
1103        break;
1104      }
1105      itemIndex++;
1106      in = this.items[itemIndex];
1107      sourceOffset = 0;
1108    }
1109  }
1110
1111  /**
1112   * Copy the content from this MBB to a byte[] based on the given offset and length n * the
1113   * position from where the copy should start n * the length upto which the copy has to be done
1114   * @return byte[] with the copied contents from this MBB.
1115   */
1116  @Override
1117  public byte[] toBytes(int offset, int length) {
1118    checkRefCount();
1119    byte[] output = new byte[length];
1120    this.get(offset, output, 0, length);
1121    return output;
1122  }
1123
1124  private int internalRead(ReadableByteChannel channel, long offset, ChannelReader reader)
1125    throws IOException {
1126    checkRefCount();
1127    int total = 0;
1128    while (buffsIterator.hasNext()) {
1129      ByteBuffer buffer = buffsIterator.next();
1130      int len = read(channel, buffer, offset, reader);
1131      if (len > 0) {
1132        total += len;
1133        offset += len;
1134      }
1135      if (buffer.hasRemaining()) {
1136        // reset
1137        curItem = buffer;
1138        curItemIndex = (curItemIndex - 1);
1139        break;
1140      }
1141    }
1142    return total;
1143  }
1144
1145  @Override
1146  public int read(ReadableByteChannel channel) throws IOException {
1147    return internalRead(channel, 0, CHANNEL_READER);
1148  }
1149
1150  @Override
1151  public int read(FileChannel channel, long offset) throws IOException {
1152    return internalRead(channel, offset, FILE_READER);
1153  }
1154
1155  @Override
1156  public int write(FileChannel channel, long offset) throws IOException {
1157    checkRefCount();
1158    int total = 0;
1159    while (buffsIterator.hasNext()) {
1160      ByteBuffer buffer = buffsIterator.next();
1161      while (buffer.hasRemaining()) {
1162        int len = channel.write(buffer, offset);
1163        total += len;
1164        offset += len;
1165      }
1166    }
1167    return total;
1168  }
1169
1170  @Override
1171  public ByteBuffer[] nioByteBuffers() {
1172    checkRefCount();
1173    return this.items;
1174  }
1175
1176  @Override
1177  public boolean equals(Object obj) {
1178    if (!(obj instanceof MultiByteBuff)) return false;
1179    if (this == obj) return true;
1180    MultiByteBuff that = (MultiByteBuff) obj;
1181    if (this.capacity() != that.capacity()) return false;
1182    if (
1183      ByteBuff.compareTo(this, this.position(), this.limit(), that, that.position(), that.limit())
1184          == 0
1185    ) {
1186      return true;
1187    }
1188    return false;
1189  }
1190
1191  @Override
1192  public int hashCode() {
1193    int hash = 0;
1194    for (ByteBuffer b : this.items) {
1195      hash += b.hashCode();
1196    }
1197    return hash;
1198  }
1199
1200  @Override
1201  public MultiByteBuff retain() {
1202    refCnt.retain();
1203    return this;
1204  }
1205}