001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.nio;
019
020import java.io.IOException;
021import java.nio.BufferOverflowException;
022import java.nio.BufferUnderflowException;
023import java.nio.ByteBuffer;
024import java.nio.InvalidMarkException;
025import java.nio.channels.ReadableByteChannel;
026
027import org.apache.hadoop.hbase.util.ByteBufferUtils;
028import org.apache.hadoop.hbase.util.Bytes;
029import org.apache.hadoop.hbase.util.ObjectIntPair;
030import org.apache.yetus.audience.InterfaceAudience;
031
032import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
033
034/**
035 * Provides a unified view of all the underlying ByteBuffers and will look as if a bigger
036 * sequential buffer. This class provides similar APIs as in {@link ByteBuffer} to put/get int,
037 * short, long etc and doing operations like mark, reset, slice etc. This has to be used when
038 * data is split across multiple byte buffers and we don't want copy them to single buffer
039 * for reading from it.
040 */
041@InterfaceAudience.Private
042public class MultiByteBuff extends ByteBuff {
043
044  private final ByteBuffer[] items;
045  // Pointer to the current item in the MBB
046  private ByteBuffer curItem = null;
047  // Index of the current item in the MBB
048  private int curItemIndex = 0;
049
050  private int limit = 0;
051  private int limitedItemIndex;
052  private int markedItemIndex = -1;
053  private final int[] itemBeginPos;
054
055  public MultiByteBuff(ByteBuffer... items) {
056    assert items != null;
057    assert items.length > 0;
058    this.items = items;
059    this.curItem = this.items[this.curItemIndex];
060    // See below optimization in getInt(int) where we check whether the given index land in current
061    // item. For this we need to check whether the passed index is less than the next item begin
062    // offset. To handle this effectively for the last item buffer, we add an extra item into this
063    // array.
064    itemBeginPos = new int[items.length + 1];
065    int offset = 0;
066    for (int i = 0; i < items.length; i++) {
067      ByteBuffer item = items[i];
068      item.rewind();
069      itemBeginPos[i] = offset;
070      int l = item.limit() - item.position();
071      offset += l;
072    }
073    this.limit = offset;
074    this.itemBeginPos[items.length] = offset + 1;
075    this.limitedItemIndex = this.items.length - 1;
076  }
077
078  private MultiByteBuff(ByteBuffer[] items, int[] itemBeginPos, int limit, int limitedIndex,
079      int curItemIndex, int markedIndex) {
080    this.items = items;
081    this.curItemIndex = curItemIndex;
082    this.curItem = this.items[this.curItemIndex];
083    this.itemBeginPos = itemBeginPos;
084    this.limit = limit;
085    this.limitedItemIndex = limitedIndex;
086    this.markedItemIndex = markedIndex;
087  }
088
089  /**
090   * @throws UnsupportedOperationException MBB does not support
091   * array based operations
092   */
093  @Override
094  public byte[] array() {
095    throw new UnsupportedOperationException();
096  }
097
098  /**
099   * @throws UnsupportedOperationException MBB does not
100   * support array based operations
101   */
102  @Override
103  public int arrayOffset() {
104    throw new UnsupportedOperationException();
105  }
106
107  /**
108   * @return false. MBB does not support array based operations
109   */
110  @Override
111  public boolean hasArray() {
112    return false;
113  }
114
115  /**
116   * @return the total capacity of this MultiByteBuffer.
117   */
118  @Override
119  public int capacity() {
120    int c = 0;
121    for (ByteBuffer item : this.items) {
122      c += item.capacity();
123    }
124    return c;
125  }
126
127  /**
128   * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers
129   * @param index
130   * @return the byte at the given index
131   */
132  @Override
133  public byte get(int index) {
134    int itemIndex = getItemIndex(index);
135    return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]);
136  }
137
138  @Override
139  public byte getByteAfterPosition(int offset) {
140    // Mostly the index specified will land within this current item. Short circuit for that
141    int index = offset + this.position();
142    int itemIndex = getItemIndexFromCurItemIndex(index);
143    return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]);
144  }
145
146  /*
147   * Returns in which sub ByteBuffer, the given element index will be available.
148   */
149  private int getItemIndex(int elemIndex) {
150    int index = 1;
151    while (elemIndex >= this.itemBeginPos[index]) {
152      index++;
153      if (index == this.itemBeginPos.length) {
154        throw new IndexOutOfBoundsException();
155      }
156    }
157    return index - 1;
158  }
159
160  /*
161   * Returns in which sub ByteBuffer, the given element index will be available. In this case we are
162   * sure that the item will be after MBB's current position
163   */
164  private int getItemIndexFromCurItemIndex(int elemIndex) {
165    int index = this.curItemIndex;
166    while (elemIndex >= this.itemBeginPos[index]) {
167      index++;
168      if (index == this.itemBeginPos.length) {
169        throw new IndexOutOfBoundsException();
170      }
171    }
172    return index - 1;
173  }
174
175  /**
176   * Fetches the int at the given index. Does not change position of the underlying ByteBuffers
177   * @param index
178   * @return the int value at the given index
179   */
180  @Override
181  public int getInt(int index) {
182    // Mostly the index specified will land within this current item. Short circuit for that
183    int itemIndex;
184    if (this.itemBeginPos[this.curItemIndex] <= index
185        && this.itemBeginPos[this.curItemIndex + 1] > index) {
186      itemIndex = this.curItemIndex;
187    } else {
188      itemIndex = getItemIndex(index);
189    }
190    return getInt(index, itemIndex);
191  }
192
193  @Override
194  public int getIntAfterPosition(int offset) {
195    // Mostly the index specified will land within this current item. Short circuit for that
196    int index = offset + this.position();
197    int itemIndex;
198    if (this.itemBeginPos[this.curItemIndex + 1] > index) {
199      itemIndex = this.curItemIndex;
200    } else {
201      itemIndex = getItemIndexFromCurItemIndex(index);
202    }
203    return getInt(index, itemIndex);
204  }
205
206  /**
207   * Fetches the short at the given index. Does not change position of the underlying ByteBuffers
208   * @param index
209   * @return the short value at the given index
210   */
211  @Override
212  public short getShort(int index) {
213    // Mostly the index specified will land within this current item. Short circuit for that
214    int itemIndex;
215    if (this.itemBeginPos[this.curItemIndex] <= index
216        && this.itemBeginPos[this.curItemIndex + 1] > index) {
217      itemIndex = this.curItemIndex;
218    } else {
219      itemIndex = getItemIndex(index);
220    }
221    ByteBuffer item = items[itemIndex];
222    int offsetInItem = index - this.itemBeginPos[itemIndex];
223    if (item.limit() - offsetInItem >= Bytes.SIZEOF_SHORT) {
224      return ByteBufferUtils.toShort(item, offsetInItem);
225    }
226    if (items.length - 1 == itemIndex) {
227      // means cur item is the last one and we wont be able to read a int. Throw exception
228      throw new BufferUnderflowException();
229    }
230    ByteBuffer nextItem = items[itemIndex + 1];
231    // Get available one byte from this item and remaining one from next
232    short n = 0;
233    n = (short) (n ^ (ByteBufferUtils.toByte(item, offsetInItem) & 0xFF));
234    n = (short) (n << 8);
235    n = (short) (n ^ (ByteBufferUtils.toByte(nextItem, 0) & 0xFF));
236    return n;
237  }
238
239  @Override
240  public short getShortAfterPosition(int offset) {
241    // Mostly the index specified will land within this current item. Short circuit for that
242    int index = offset + this.position();
243    int itemIndex;
244    if (this.itemBeginPos[this.curItemIndex + 1] > index) {
245      itemIndex = this.curItemIndex;
246    } else {
247      itemIndex = getItemIndexFromCurItemIndex(index);
248    }
249    return getShort(index, itemIndex);
250  }
251
252  private int getInt(int index, int itemIndex) {
253    ByteBuffer item = items[itemIndex];
254    int offsetInItem = index - this.itemBeginPos[itemIndex];
255    int remainingLen = item.limit() - offsetInItem;
256    if (remainingLen >= Bytes.SIZEOF_INT) {
257      return ByteBufferUtils.toInt(item, offsetInItem);
258    }
259    if (items.length - 1 == itemIndex) {
260      // means cur item is the last one and we wont be able to read a int. Throw exception
261      throw new BufferUnderflowException();
262    }
263    int l = 0;
264    for (int i = 0; i < Bytes.SIZEOF_INT; i++) {
265      l <<= 8;
266      l ^= get(index + i) & 0xFF;
267    }
268    return l;
269  }
270
271  private short getShort(int index, int itemIndex) {
272    ByteBuffer item = items[itemIndex];
273    int offsetInItem = index - this.itemBeginPos[itemIndex];
274    int remainingLen = item.limit() - offsetInItem;
275    if (remainingLen >= Bytes.SIZEOF_SHORT) {
276      return ByteBufferUtils.toShort(item, offsetInItem);
277    }
278    if (items.length - 1 == itemIndex) {
279      // means cur item is the last one and we wont be able to read a short. Throw exception
280      throw new BufferUnderflowException();
281    }
282    ByteBuffer nextItem = items[itemIndex + 1];
283    // Get available bytes from this item and remaining from next
284    short l = 0;
285    for (int i = offsetInItem; i < item.capacity(); i++) {
286      l = (short) (l << 8);
287      l = (short) (l ^ (ByteBufferUtils.toByte(item, i) & 0xFF));
288    }
289    for (int i = 0; i < Bytes.SIZEOF_SHORT - remainingLen; i++) {
290      l = (short) (l << 8);
291      l = (short) (l ^ (ByteBufferUtils.toByte(nextItem, i) & 0xFF));
292    }
293    return l;
294  }
295
296  private long getLong(int index, int itemIndex) {
297    ByteBuffer item = items[itemIndex];
298    int offsetInItem = index - this.itemBeginPos[itemIndex];
299    int remainingLen = item.limit() - offsetInItem;
300    if (remainingLen >= Bytes.SIZEOF_LONG) {
301      return ByteBufferUtils.toLong(item, offsetInItem);
302    }
303    if (items.length - 1 == itemIndex) {
304      // means cur item is the last one and we wont be able to read a long. Throw exception
305      throw new BufferUnderflowException();
306    }
307    long l = 0;
308    for (int i = 0; i < Bytes.SIZEOF_LONG; i++) {
309      l <<= 8;
310      l ^= get(index + i) & 0xFF;
311    }
312    return l;
313  }
314
315  /**
316   * Fetches the long at the given index. Does not change position of the underlying ByteBuffers
317   * @param index
318   * @return the long value at the given index
319   */
320  @Override
321  public long getLong(int index) {
322    // Mostly the index specified will land within this current item. Short circuit for that
323    int itemIndex;
324    if (this.itemBeginPos[this.curItemIndex] <= index
325        && this.itemBeginPos[this.curItemIndex + 1] > index) {
326      itemIndex = this.curItemIndex;
327    } else {
328      itemIndex = getItemIndex(index);
329    }
330    return getLong(index, itemIndex);
331  }
332
333  @Override
334  public long getLongAfterPosition(int offset) {
335    // Mostly the index specified will land within this current item. Short circuit for that
336    int index = offset + this.position();
337    int itemIndex;
338    if (this.itemBeginPos[this.curItemIndex + 1] > index) {
339      itemIndex = this.curItemIndex;
340    } else {
341      itemIndex = getItemIndexFromCurItemIndex(index);
342    }
343    return getLong(index, itemIndex);
344  }
345
346  /**
347   * @return this MBB's current position
348   */
349  @Override
350  public int position() {
351    return itemBeginPos[this.curItemIndex] + this.curItem.position();
352  }
353
354  /**
355   * Sets this MBB's position to the given value.
356   * @param position
357   * @return this object
358   */
359  @Override
360  public MultiByteBuff position(int position) {
361    // Short circuit for positioning within the cur item. Mostly that is the case.
362    if (this.itemBeginPos[this.curItemIndex] <= position
363        && this.itemBeginPos[this.curItemIndex + 1] > position) {
364      this.curItem.position(position - this.itemBeginPos[this.curItemIndex]);
365      return this;
366    }
367    int itemIndex = getItemIndex(position);
368    // All items from 0 - curItem-1 set position at end.
369    for (int i = 0; i < itemIndex; i++) {
370      this.items[i].position(this.items[i].limit());
371    }
372    // All items after curItem set position at begin
373    for (int i = itemIndex + 1; i < this.items.length; i++) {
374      this.items[i].position(0);
375    }
376    this.curItem = this.items[itemIndex];
377    this.curItem.position(position - this.itemBeginPos[itemIndex]);
378    this.curItemIndex = itemIndex;
379    return this;
380  }
381
382  /**
383   * Rewinds this MBB and the position is set to 0
384   * @return this object
385   */
386  @Override
387  public MultiByteBuff rewind() {
388    for (int i = 0; i < this.items.length; i++) {
389      this.items[i].rewind();
390    }
391    this.curItemIndex = 0;
392    this.curItem = this.items[this.curItemIndex];
393    this.markedItemIndex = -1;
394    return this;
395  }
396
397  /**
398   * Marks the current position of the MBB
399   * @return this object
400   */
401  @Override
402  public MultiByteBuff mark() {
403    this.markedItemIndex = this.curItemIndex;
404    this.curItem.mark();
405    return this;
406  }
407
408  /**
409   * Similar to {@link ByteBuffer}.reset(), ensures that this MBB
410   * is reset back to last marked position.
411   * @return This MBB
412   */
413  @Override
414  public MultiByteBuff reset() {
415    // when the buffer is moved to the next one.. the reset should happen on the previous marked
416    // item and the new one should be taken as the base
417    if (this.markedItemIndex < 0) throw new InvalidMarkException();
418    ByteBuffer markedItem = this.items[this.markedItemIndex];
419    markedItem.reset();
420    this.curItem = markedItem;
421    // All items after the marked position upto the current item should be reset to 0
422    for (int i = this.curItemIndex; i > this.markedItemIndex; i--) {
423      this.items[i].position(0);
424    }
425    this.curItemIndex = this.markedItemIndex;
426    return this;
427  }
428
429  /**
430   * Returns the number of elements between the current position and the
431   * limit.
432   * @return the remaining elements in this MBB
433   */
434  @Override
435  public int remaining() {
436    int remain = 0;
437    for (int i = curItemIndex; i < items.length; i++) {
438      remain += items[i].remaining();
439    }
440    return remain;
441  }
442
443  /**
444   * Returns true if there are elements between the current position and the limt
445   * @return true if there are elements, false otherwise
446   */
447  @Override
448  public final boolean hasRemaining() {
449    return this.curItem.hasRemaining() || (this.curItemIndex < this.limitedItemIndex
450        && this.items[this.curItemIndex + 1].hasRemaining());
451  }
452
453  /**
454   * A relative method that returns byte at the current position.  Increments the
455   * current position by the size of a byte.
456   * @return the byte at the current position
457   */
458  @Override
459  public byte get() {
460    if (this.curItem.remaining() == 0) {
461      if (items.length - 1 == this.curItemIndex) {
462        // means cur item is the last one and we wont be able to read a long. Throw exception
463        throw new BufferUnderflowException();
464      }
465      this.curItemIndex++;
466      this.curItem = this.items[this.curItemIndex];
467    }
468    return this.curItem.get();
469  }
470
471  /**
472   * Returns the short value at the current position. Also advances the position by the size
473   * of short
474   *
475   * @return the short value at the current position
476   */
477  @Override
478  public short getShort() {
479    int remaining = this.curItem.remaining();
480    if (remaining >= Bytes.SIZEOF_SHORT) {
481      return this.curItem.getShort();
482    }
483    short n = 0;
484    n = (short) (n ^ (get() & 0xFF));
485    n = (short) (n << 8);
486    n = (short) (n ^ (get() & 0xFF));
487    return n;
488  }
489
490  /**
491   * Returns the int value at the current position. Also advances the position by the size of int
492   *
493   * @return the int value at the current position
494   */
495  @Override
496  public int getInt() {
497    int remaining = this.curItem.remaining();
498    if (remaining >= Bytes.SIZEOF_INT) {
499      return this.curItem.getInt();
500    }
501    int n = 0;
502    for (int i = 0; i < Bytes.SIZEOF_INT; i++) {
503      n <<= 8;
504      n ^= get() & 0xFF;
505    }
506    return n;
507  }
508
509
510  /**
511   * Returns the long value at the current position. Also advances the position by the size of long
512   *
513   * @return the long value at the current position
514   */
515  @Override
516  public long getLong() {
517    int remaining = this.curItem.remaining();
518    if (remaining >= Bytes.SIZEOF_LONG) {
519      return this.curItem.getLong();
520    }
521    long l = 0;
522    for (int i = 0; i < Bytes.SIZEOF_LONG; i++) {
523      l <<= 8;
524      l ^= get() & 0xFF;
525    }
526    return l;
527  }
528
529  /**
530   * Copies the content from this MBB's current position to the byte array and fills it. Also
531   * advances the position of the MBB by the length of the byte[].
532   * @param dst
533   */
534  @Override
535  public void get(byte[] dst) {
536    get(dst, 0, dst.length);
537  }
538
539  /**
540   * Copies the specified number of bytes from this MBB's current position to the byte[]'s offset.
541   * Also advances the position of the MBB by the given length.
542   * @param dst
543   * @param offset within the current array
544   * @param length upto which the bytes to be copied
545   */
546  @Override
547  public void get(byte[] dst, int offset, int length) {
548    while (length > 0) {
549      int toRead = Math.min(length, this.curItem.remaining());
550      ByteBufferUtils.copyFromBufferToArray(dst, this.curItem, this.curItem.position(), offset,
551          toRead);
552      this.curItem.position(this.curItem.position() + toRead);
553      length -= toRead;
554      if (length == 0) break;
555      this.curItemIndex++;
556      this.curItem = this.items[this.curItemIndex];
557      offset += toRead;
558    }
559  }
560
561  @Override
562  public void get(int sourceOffset, byte[] dst, int offset, int length) {
563    int itemIndex = getItemIndex(sourceOffset);
564    ByteBuffer item = this.items[itemIndex];
565    sourceOffset = sourceOffset - this.itemBeginPos[itemIndex];
566    while (length > 0) {
567      int toRead = Math.min((item.limit() - sourceOffset), length);
568      ByteBufferUtils.copyFromBufferToArray(dst, item, sourceOffset, offset,
569          toRead);
570      length -= toRead;
571      if (length == 0) break;
572      itemIndex++;
573      item = this.items[itemIndex];
574      offset += toRead;
575      sourceOffset = 0;
576    }
577  }
578
579  /**
580   * Marks the limit of this MBB.
581   * @param limit
582   * @return This MBB
583   */
584  @Override
585  public MultiByteBuff limit(int limit) {
586    this.limit = limit;
587    // Normally the limit will try to limit within the last BB item
588    int limitedIndexBegin = this.itemBeginPos[this.limitedItemIndex];
589    if (limit >= limitedIndexBegin && limit < this.itemBeginPos[this.limitedItemIndex + 1]) {
590      this.items[this.limitedItemIndex].limit(limit - limitedIndexBegin);
591      return this;
592    }
593    int itemIndex = getItemIndex(limit);
594    int beginOffset = this.itemBeginPos[itemIndex];
595    int offsetInItem = limit - beginOffset;
596    ByteBuffer item = items[itemIndex];
597    item.limit(offsetInItem);
598    for (int i = this.limitedItemIndex; i < itemIndex; i++) {
599      this.items[i].limit(this.items[i].capacity());
600    }
601    this.limitedItemIndex = itemIndex;
602    for (int i = itemIndex + 1; i < this.items.length; i++) {
603      this.items[i].limit(this.items[i].position());
604    }
605    return this;
606  }
607
608  /**
609   * Returns the limit of this MBB
610   * @return limit of the MBB
611   */
612  @Override
613  public int limit() {
614    return this.limit;
615  }
616
617  /**
618   * Returns an MBB which is a sliced version of this MBB. The position, limit and mark
619   * of the new MBB will be independent than that of the original MBB.
620   * The content of the new MBB will start at this MBB's current position
621   * @return a sliced MBB
622   */
623  @Override
624  public MultiByteBuff slice() {
625    ByteBuffer[] copy = new ByteBuffer[this.limitedItemIndex - this.curItemIndex + 1];
626    for (int i = curItemIndex, j = 0; i <= this.limitedItemIndex; i++, j++) {
627      copy[j] = this.items[i].slice();
628    }
629    return new MultiByteBuff(copy);
630  }
631
632  /**
633   * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark
634   * of the new MBB will be independent than that of the original MBB.
635   * The content of the new MBB will start at this MBB's current position
636   * The position, limit and mark of the new MBB would be identical to this MBB in terms of
637   * values.
638   * @return a sliced MBB
639   */
640  @Override
641  public MultiByteBuff duplicate() {
642    ByteBuffer[] itemsCopy = new ByteBuffer[this.items.length];
643    for (int i = 0; i < this.items.length; i++) {
644      itemsCopy[i] = items[i].duplicate();
645    }
646    return new MultiByteBuff(itemsCopy, this.itemBeginPos, this.limit, this.limitedItemIndex,
647        this.curItemIndex, this.markedItemIndex);
648  }
649
650  /**
651   * Writes a byte to this MBB at the current position and increments the position
652   * @param b
653   * @return this object
654   */
655  @Override
656  public MultiByteBuff put(byte b) {
657    if (this.curItem.remaining() == 0) {
658      if (this.curItemIndex == this.items.length - 1) {
659        throw new BufferOverflowException();
660      }
661      this.curItemIndex++;
662      this.curItem = this.items[this.curItemIndex];
663    }
664    this.curItem.put(b);
665    return this;
666  }
667
668  /**
669   * Writes a byte to this MBB at the given index
670   * @param index
671   * @param b
672   * @return this object
673   */
674  @Override
675  public MultiByteBuff put(int index, byte b) {
676    int itemIndex = getItemIndex(limit);
677    ByteBuffer item = items[itemIndex];
678    item.put(index - itemBeginPos[itemIndex], b);
679    return this;
680  }
681
682  /**
683   * Copies from a src MBB to this MBB.
684   * @param offset the position in this MBB to which the copy should happen
685   * @param src the src MBB
686   * @param srcOffset the offset in the src MBB from where the elements should be read
687   * @param length the length upto which the copy should happen
688   */
689  @Override
690  public MultiByteBuff put(int offset, ByteBuff src, int srcOffset, int length) {
691    int destItemIndex = getItemIndex(offset);
692    int srcItemIndex = getItemIndex(srcOffset);
693    ByteBuffer destItem = this.items[destItemIndex];
694    offset = offset - this.itemBeginPos[destItemIndex];
695
696    ByteBuffer srcItem = getItemByteBuffer(src, srcItemIndex);
697    srcOffset = srcOffset - this.itemBeginPos[srcItemIndex];
698    int toRead, toWrite, toMove;
699    while (length > 0) {
700      toWrite = destItem.limit() - offset;
701      toRead = srcItem.limit() - srcOffset;
702      toMove = Math.min(length, Math.min(toRead, toWrite));
703      ByteBufferUtils.copyFromBufferToBuffer(srcItem, destItem, srcOffset, offset, toMove);
704      length -= toMove;
705      if (length == 0) break;
706      if (toRead < toWrite) {
707        srcItem = getItemByteBuffer(src, ++srcItemIndex);
708        srcOffset = 0;
709        offset += toMove;
710      } else if (toRead > toWrite) {
711        destItem = this.items[++destItemIndex];
712        offset = 0;
713        srcOffset += toMove;
714      } else {
715        // toRead = toWrite case
716        srcItem = getItemByteBuffer(src, ++srcItemIndex);
717        srcOffset = 0;
718        destItem = this.items[++destItemIndex];
719        offset = 0;
720      }
721    }
722    return this;
723  }
724
725  private static ByteBuffer getItemByteBuffer(ByteBuff buf, int index) {
726    return (buf instanceof SingleByteBuff) ? ((SingleByteBuff) buf).getEnclosingByteBuffer()
727        : ((MultiByteBuff) buf).items[index];
728  }
729
730  /**
731   * Writes an int to this MBB at its current position. Also advances the position by size of int
732   * @param val Int value to write
733   * @return this object
734   */
735  @Override
736  public MultiByteBuff putInt(int val) {
737    if (this.curItem.remaining() >= Bytes.SIZEOF_INT) {
738      this.curItem.putInt(val);
739      return this;
740    }
741    if (this.curItemIndex == this.items.length - 1) {
742      throw new BufferOverflowException();
743    }
744    // During read, we will read as byte by byte for this case. So just write in Big endian
745    put(int3(val));
746    put(int2(val));
747    put(int1(val));
748    put(int0(val));
749    return this;
750  }
751
752  private static byte int3(int x) {
753    return (byte) (x >> 24);
754  }
755
756  private static byte int2(int x) {
757    return (byte) (x >> 16);
758  }
759
760  private static byte int1(int x) {
761    return (byte) (x >> 8);
762  }
763
764  private static byte int0(int x) {
765    return (byte) (x);
766  }
767
768  /**
769   * Copies from the given byte[] to this MBB
770   * @param src
771   * @return this MBB
772   */
773  @Override
774  public final MultiByteBuff put(byte[] src) {
775    return put(src, 0, src.length);
776  }
777
778  /**
779   * Copies from the given byte[] to this MBB
780   * @param src
781   * @param offset the position in the byte array from which the copy should be done
782   * @param length the length upto which the copy should happen
783   * @return this MBB
784   */
785  @Override
786  public MultiByteBuff put(byte[] src, int offset, int length) {
787    if (this.curItem.remaining() >= length) {
788      ByteBufferUtils.copyFromArrayToBuffer(this.curItem, src, offset, length);
789      return this;
790    }
791    int end = offset + length;
792    for (int i = offset; i < end; i++) {
793      this.put(src[i]);
794    }
795    return this;
796  }
797
798
799  /**
800   * Writes a long to this MBB at its current position. Also advances the position by size of long
801   * @param val Long value to write
802   * @return this object
803   */
804  @Override
805  public MultiByteBuff putLong(long val) {
806    if (this.curItem.remaining() >= Bytes.SIZEOF_LONG) {
807      this.curItem.putLong(val);
808      return this;
809    }
810    if (this.curItemIndex == this.items.length - 1) {
811      throw new BufferOverflowException();
812    }
813    // During read, we will read as byte by byte for this case. So just write in Big endian
814    put(long7(val));
815    put(long6(val));
816    put(long5(val));
817    put(long4(val));
818    put(long3(val));
819    put(long2(val));
820    put(long1(val));
821    put(long0(val));
822    return this;
823  }
824
825  private static byte long7(long x) {
826    return (byte) (x >> 56);
827  }
828
829  private static byte long6(long x) {
830    return (byte) (x >> 48);
831  }
832
833  private static byte long5(long x) {
834    return (byte) (x >> 40);
835  }
836
837  private static byte long4(long x) {
838    return (byte) (x >> 32);
839  }
840
841  private static byte long3(long x) {
842    return (byte) (x >> 24);
843  }
844
845  private static byte long2(long x) {
846    return (byte) (x >> 16);
847  }
848
849  private static byte long1(long x) {
850    return (byte) (x >> 8);
851  }
852
853  private static byte long0(long x) {
854    return (byte) (x);
855  }
856
857  /**
858   * Jumps the current position of this MBB by specified length.
859   * @param length
860   */
861  @Override
862  public MultiByteBuff skip(int length) {
863    // Get available bytes from this item and remaining from next
864    int jump = 0;
865    while (true) {
866      jump = this.curItem.remaining();
867      if (jump >= length) {
868        this.curItem.position(this.curItem.position() + length);
869        break;
870      }
871      this.curItem.position(this.curItem.position() + jump);
872      length -= jump;
873      this.curItemIndex++;
874      this.curItem = this.items[this.curItemIndex];
875    }
876    return this;
877  }
878
879  /**
880   * Jumps back the current position of this MBB by specified length.
881   * @param length
882   */
883  @Override
884  public MultiByteBuff moveBack(int length) {
885    while (length != 0) {
886      if (length > curItem.position()) {
887        length -= curItem.position();
888        this.curItem.position(0);
889        this.curItemIndex--;
890        this.curItem = this.items[curItemIndex];
891      } else {
892        this.curItem.position(curItem.position() - length);
893        break;
894      }
895    }
896    return this;
897  }
898
899 /**
900   * Returns bytes from current position till length specified, as a single ByteBuffer. When all
901   * these bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item
902   * as such will be returned. So users are warned not to change the position or limit of this
903   * returned ByteBuffer. The position of the returned byte buffer is at the begin of the required
904   * bytes. When the required bytes happen to span across multiple ByteBuffers, this API will copy
905   * the bytes to a newly created ByteBuffer of required size and return that.
906   *
907   * @param length number of bytes required.
908   * @return bytes from current position till length specified, as a single ByteButter.
909   */
910  @Override
911  public ByteBuffer asSubByteBuffer(int length) {
912    if (this.curItem.remaining() >= length) {
913      return this.curItem;
914    }
915    int offset = 0;
916    byte[] dupB = new byte[length];
917    int locCurItemIndex = curItemIndex;
918    ByteBuffer locCurItem = curItem;
919    while (length > 0) {
920      int toRead = Math.min(length, locCurItem.remaining());
921      ByteBufferUtils
922          .copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset, toRead);
923      length -= toRead;
924      if (length == 0) break;
925      locCurItemIndex++;
926      locCurItem = this.items[locCurItemIndex];
927      offset += toRead;
928    }
929    return ByteBuffer.wrap(dupB);
930  }
931
932  /**
933   * Returns bytes from given offset till length specified, as a single ByteBuffer. When all these
934   * bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item as
935   * such will be returned (with offset in this ByteBuffer where the bytes starts). So users are
936   * warned not to change the position or limit of this returned ByteBuffer. When the required bytes
937   * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created
938   * ByteBuffer of required size and return that.
939   *
940   * @param offset the offset in this MBB from where the subBuffer should be created
941   * @param length the length of the subBuffer
942   * @param pair a pair that will have the bytes from the current position till length specified, as
943   *        a single ByteBuffer and offset in that Buffer where the bytes starts. The method would
944   *        set the values on the pair that is passed in by the caller
945   */
946  @Override
947  public void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair) {
948    if (this.itemBeginPos[this.curItemIndex] <= offset) {
949      int relOffsetInCurItem = offset - this.itemBeginPos[this.curItemIndex];
950      if (this.curItem.limit() - relOffsetInCurItem >= length) {
951        pair.setFirst(this.curItem);
952        pair.setSecond(relOffsetInCurItem);
953        return;
954      }
955    }
956    int itemIndex = getItemIndex(offset);
957    ByteBuffer item = this.items[itemIndex];
958    offset = offset - this.itemBeginPos[itemIndex];
959    if (item.limit() - offset >= length) {
960      pair.setFirst(item);
961      pair.setSecond(offset);
962      return;
963    }
964    byte[] dst = new byte[length];
965    int destOffset = 0;
966    while (length > 0) {
967      int toRead = Math.min(length, item.limit() - offset);
968      ByteBufferUtils.copyFromBufferToArray(dst, item, offset, destOffset, toRead);
969      length -= toRead;
970      if (length == 0) break;
971      itemIndex++;
972      item = this.items[itemIndex];
973      destOffset += toRead;
974      offset = 0;
975    }
976    pair.setFirst(ByteBuffer.wrap(dst));
977    pair.setSecond(0);
978    return;
979  }
980
981  /**
982   * Copies the content from an this MBB to a ByteBuffer
983   * @param out the ByteBuffer to which the copy has to happen
984   * @param sourceOffset the offset in the MBB from which the elements has
985   * to be copied
986   * @param length the length in the MBB upto which the elements has to be copied
987   */
988  @Override
989  public void get(ByteBuffer out, int sourceOffset,
990      int length) {
991      // Not used from real read path actually. So not going with
992      // optimization
993    for (int i = 0; i < length; ++i) {
994      out.put(this.get(sourceOffset + i));
995    }
996  }
997
998  /**
999   * Copy the content from this MBB to a byte[] based on the given offset and
1000   * length
1001   *
1002   * @param offset
1003   *          the position from where the copy should start
1004   * @param length
1005   *          the length upto which the copy has to be done
1006   * @return byte[] with the copied contents from this MBB.
1007   */
1008  @Override
1009  public byte[] toBytes(int offset, int length) {
1010    byte[] output = new byte[length];
1011    this.get(offset, output, 0, length);
1012    return output;
1013  }
1014
1015  @Override
1016  public int read(ReadableByteChannel channel) throws IOException {
1017    int total = 0;
1018    while (true) {
1019      // Read max possible into the current BB
1020      int len = channelRead(channel, this.curItem);
1021      if (len > 0)
1022        total += len;
1023      if (this.curItem.hasRemaining()) {
1024        // We were not able to read enough to fill the current BB itself. Means there is no point in
1025        // doing more reads from Channel. Only this much there for now.
1026        break;
1027      } else {
1028        if (this.curItemIndex >= this.limitedItemIndex) break;
1029        this.curItemIndex++;
1030        this.curItem = this.items[this.curItemIndex];
1031      }
1032    }
1033    return total;
1034  }
1035
1036  @Override
1037  public boolean equals(Object obj) {
1038    if (!(obj instanceof MultiByteBuff)) return false;
1039    if (this == obj) return true;
1040    MultiByteBuff that = (MultiByteBuff) obj;
1041    if (this.capacity() != that.capacity()) return false;
1042    if (ByteBuff.compareTo(this, this.position(), this.limit(), that, that.position(),
1043        that.limit()) == 0) {
1044      return true;
1045    }
1046    return false;
1047  }
1048
1049  @Override
1050  public int hashCode() {
1051    int hash = 0;
1052    for (ByteBuffer b : this.items) {
1053      hash += b.hashCode();
1054    }
1055    return hash;
1056  }
1057
1058  /**
1059   * @return the ByteBuffers which this wraps.
1060   */
1061  @VisibleForTesting
1062  public ByteBuffer[] getEnclosingByteBuffers() {
1063    return this.items;
1064  }
1065}