001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.nio;
019
020import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE;
021
022import java.io.IOException;
023import java.nio.BufferOverflowException;
024import java.nio.BufferUnderflowException;
025import java.nio.ByteBuffer;
026import java.nio.InvalidMarkException;
027import java.nio.channels.FileChannel;
028import java.nio.channels.ReadableByteChannel;
029import java.util.Iterator;
030import java.util.NoSuchElementException;
031
032import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
033import org.apache.hadoop.hbase.util.ByteBufferUtils;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.util.ObjectIntPair;
036import org.apache.yetus.audience.InterfaceAudience;
037
038/**
039 * Provides a unified view of all the underlying ByteBuffers and will look as if a bigger
040 * sequential buffer. This class provides similar APIs as in {@link ByteBuffer} to put/get int,
041 * short, long etc and doing operations like mark, reset, slice etc. This has to be used when
042 * data is split across multiple byte buffers and we don't want copy them to single buffer
043 * for reading from it.
044 */
045@InterfaceAudience.Private
046public class MultiByteBuff extends ByteBuff {
047
048  private final ByteBuffer[] items;
049  // Pointer to the current item in the MBB
050  private ByteBuffer curItem = null;
051  // Index of the current item in the MBB
052  private int curItemIndex = 0;
053
054  private int limit = 0;
055  private int limitedItemIndex;
056  private int markedItemIndex = -1;
057  private final int[] itemBeginPos;
058
059  private Iterator<ByteBuffer> buffsIterator = new Iterator<ByteBuffer>() {
060    @Override
061    public boolean hasNext() {
062      return curItemIndex < limitedItemIndex ||
063          (curItemIndex == limitedItemIndex && items[curItemIndex].hasRemaining());
064    }
065
066    @Override
067    public ByteBuffer next() {
068      if (curItemIndex >= items.length) {
069        throw new NoSuchElementException("items overflow");
070      }
071      curItem = items[curItemIndex++];
072      return curItem;
073    }
074  };
075
076  public MultiByteBuff(ByteBuffer... items) {
077    this(NONE, items);
078  }
079
080  public MultiByteBuff(Recycler recycler, ByteBuffer... items) {
081    this(new RefCnt(recycler), items);
082  }
083
084  MultiByteBuff(RefCnt refCnt, ByteBuffer... items) {
085    this.refCnt = refCnt;
086    assert items != null;
087    assert items.length > 0;
088    this.items = items;
089    this.curItem = this.items[this.curItemIndex];
090    // See below optimization in getInt(int) where we check whether the given index land in current
091    // item. For this we need to check whether the passed index is less than the next item begin
092    // offset. To handle this effectively for the last item buffer, we add an extra item into this
093    // array.
094    itemBeginPos = new int[items.length + 1];
095    int offset = 0;
096    for (int i = 0; i < items.length; i++) {
097      ByteBuffer item = items[i];
098      item.rewind();
099      itemBeginPos[i] = offset;
100      int l = item.limit() - item.position();
101      offset += l;
102    }
103    this.limit = offset;
104    this.itemBeginPos[items.length] = offset + 1;
105    this.limitedItemIndex = this.items.length - 1;
106  }
107
108  private MultiByteBuff(RefCnt refCnt, ByteBuffer[] items, int[] itemBeginPos, int limit,
109      int limitedIndex, int curItemIndex, int markedIndex) {
110    this.refCnt = refCnt;
111    this.items = items;
112    this.curItemIndex = curItemIndex;
113    this.curItem = this.items[this.curItemIndex];
114    this.itemBeginPos = itemBeginPos;
115    this.limit = limit;
116    this.limitedItemIndex = limitedIndex;
117    this.markedItemIndex = markedIndex;
118  }
119
120  /**
121   * @throws UnsupportedOperationException MBB does not support
122   * array based operations
123   */
124  @Override
125  public byte[] array() {
126    throw new UnsupportedOperationException();
127  }
128
129  /**
130   * @throws UnsupportedOperationException MBB does not
131   * support array based operations
132   */
133  @Override
134  public int arrayOffset() {
135    throw new UnsupportedOperationException();
136  }
137
138  /**
139   * @return false. MBB does not support array based operations
140   */
141  @Override
142  public boolean hasArray() {
143    return false;
144  }
145
146  /**
147   * @return the total capacity of this MultiByteBuffer.
148   */
149  @Override
150  public int capacity() {
151    checkRefCount();
152    int c = 0;
153    for (ByteBuffer item : this.items) {
154      c += item.capacity();
155    }
156    return c;
157  }
158
159  /**
160   * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers
161   * @param index
162   * @return the byte at the given index
163   */
164  @Override
165  public byte get(int index) {
166    checkRefCount();
167    int itemIndex = getItemIndex(index);
168    return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]);
169  }
170
171  @Override
172  public byte getByteAfterPosition(int offset) {
173    checkRefCount();
174    // Mostly the index specified will land within this current item. Short circuit for that
175    int index = offset + this.position();
176    int itemIndex = getItemIndexFromCurItemIndex(index);
177    return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]);
178  }
179
180  /*
181   * Returns in which sub ByteBuffer, the given element index will be available.
182   */
183  private int getItemIndex(int elemIndex) {
184    int index = 1;
185    while (elemIndex >= this.itemBeginPos[index]) {
186      index++;
187      if (index == this.itemBeginPos.length) {
188        throw new IndexOutOfBoundsException();
189      }
190    }
191    return index - 1;
192  }
193
194  /*
195   * Returns in which sub ByteBuffer, the given element index will be available. In this case we are
196   * sure that the item will be after MBB's current position
197   */
198  private int getItemIndexFromCurItemIndex(int elemIndex) {
199    int index = this.curItemIndex;
200    while (elemIndex >= this.itemBeginPos[index]) {
201      index++;
202      if (index == this.itemBeginPos.length) {
203        throw new IndexOutOfBoundsException();
204      }
205    }
206    return index - 1;
207  }
208
209  /**
210   * Fetches the int at the given index. Does not change position of the underlying ByteBuffers
211   * @param index
212   * @return the int value at the given index
213   */
214  @Override
215  public int getInt(int index) {
216    checkRefCount();
217    // Mostly the index specified will land within this current item. Short circuit for that
218    int itemIndex;
219    if (this.itemBeginPos[this.curItemIndex] <= index
220        && this.itemBeginPos[this.curItemIndex + 1] > index) {
221      itemIndex = this.curItemIndex;
222    } else {
223      itemIndex = getItemIndex(index);
224    }
225    return getInt(index, itemIndex);
226  }
227
228  @Override
229  public int getIntAfterPosition(int offset) {
230    checkRefCount();
231    // Mostly the index specified will land within this current item. Short circuit for that
232    int index = offset + this.position();
233    int itemIndex;
234    if (this.itemBeginPos[this.curItemIndex + 1] > index) {
235      itemIndex = this.curItemIndex;
236    } else {
237      itemIndex = getItemIndexFromCurItemIndex(index);
238    }
239    return getInt(index, itemIndex);
240  }
241
242  /**
243   * Fetches the short at the given index. Does not change position of the underlying ByteBuffers
244   * @param index
245   * @return the short value at the given index
246   */
247  @Override
248  public short getShort(int index) {
249    checkRefCount();
250    // Mostly the index specified will land within this current item. Short circuit for that
251    int itemIndex;
252    if (this.itemBeginPos[this.curItemIndex] <= index
253        && this.itemBeginPos[this.curItemIndex + 1] > index) {
254      itemIndex = this.curItemIndex;
255    } else {
256      itemIndex = getItemIndex(index);
257    }
258    ByteBuffer item = items[itemIndex];
259    int offsetInItem = index - this.itemBeginPos[itemIndex];
260    if (item.limit() - offsetInItem >= Bytes.SIZEOF_SHORT) {
261      return ByteBufferUtils.toShort(item, offsetInItem);
262    }
263    if (items.length - 1 == itemIndex) {
264      // means cur item is the last one and we wont be able to read a int. Throw exception
265      throw new BufferUnderflowException();
266    }
267    ByteBuffer nextItem = items[itemIndex + 1];
268    // Get available one byte from this item and remaining one from next
269    short n = 0;
270    n = (short) (n ^ (ByteBufferUtils.toByte(item, offsetInItem) & 0xFF));
271    n = (short) (n << 8);
272    n = (short) (n ^ (ByteBufferUtils.toByte(nextItem, 0) & 0xFF));
273    return n;
274  }
275
276  @Override
277  public short getShortAfterPosition(int offset) {
278    checkRefCount();
279    // Mostly the index specified will land within this current item. Short circuit for that
280    int index = offset + this.position();
281    int itemIndex;
282    if (this.itemBeginPos[this.curItemIndex + 1] > index) {
283      itemIndex = this.curItemIndex;
284    } else {
285      itemIndex = getItemIndexFromCurItemIndex(index);
286    }
287    return getShort(index, itemIndex);
288  }
289
290  private int getInt(int index, int itemIndex) {
291    ByteBuffer item = items[itemIndex];
292    int offsetInItem = index - this.itemBeginPos[itemIndex];
293    int remainingLen = item.limit() - offsetInItem;
294    if (remainingLen >= Bytes.SIZEOF_INT) {
295      return ByteBufferUtils.toInt(item, offsetInItem);
296    }
297    if (items.length - 1 == itemIndex) {
298      // means cur item is the last one and we wont be able to read a int. Throw exception
299      throw new BufferUnderflowException();
300    }
301    int l = 0;
302    for (int i = 0; i < Bytes.SIZEOF_INT; i++) {
303      l <<= 8;
304      l ^= get(index + i) & 0xFF;
305    }
306    return l;
307  }
308
309  private short getShort(int index, int itemIndex) {
310    ByteBuffer item = items[itemIndex];
311    int offsetInItem = index - this.itemBeginPos[itemIndex];
312    int remainingLen = item.limit() - offsetInItem;
313    if (remainingLen >= Bytes.SIZEOF_SHORT) {
314      return ByteBufferUtils.toShort(item, offsetInItem);
315    }
316    if (items.length - 1 == itemIndex) {
317      // means cur item is the last one and we wont be able to read a short. Throw exception
318      throw new BufferUnderflowException();
319    }
320    ByteBuffer nextItem = items[itemIndex + 1];
321    // Get available bytes from this item and remaining from next
322    short l = 0;
323    for (int i = offsetInItem; i < item.capacity(); i++) {
324      l = (short) (l << 8);
325      l = (short) (l ^ (ByteBufferUtils.toByte(item, i) & 0xFF));
326    }
327    for (int i = 0; i < Bytes.SIZEOF_SHORT - remainingLen; i++) {
328      l = (short) (l << 8);
329      l = (short) (l ^ (ByteBufferUtils.toByte(nextItem, i) & 0xFF));
330    }
331    return l;
332  }
333
334  private long getLong(int index, int itemIndex) {
335    ByteBuffer item = items[itemIndex];
336    int offsetInItem = index - this.itemBeginPos[itemIndex];
337    int remainingLen = item.limit() - offsetInItem;
338    if (remainingLen >= Bytes.SIZEOF_LONG) {
339      return ByteBufferUtils.toLong(item, offsetInItem);
340    }
341    if (items.length - 1 == itemIndex) {
342      // means cur item is the last one and we wont be able to read a long. Throw exception
343      throw new BufferUnderflowException();
344    }
345    long l = 0;
346    for (int i = 0; i < Bytes.SIZEOF_LONG; i++) {
347      l <<= 8;
348      l ^= get(index + i) & 0xFF;
349    }
350    return l;
351  }
352
353  /**
354   * Fetches the long at the given index. Does not change position of the underlying ByteBuffers
355   * @param index
356   * @return the long value at the given index
357   */
358  @Override
359  public long getLong(int index) {
360    checkRefCount();
361    // Mostly the index specified will land within this current item. Short circuit for that
362    int itemIndex;
363    if (this.itemBeginPos[this.curItemIndex] <= index
364        && this.itemBeginPos[this.curItemIndex + 1] > index) {
365      itemIndex = this.curItemIndex;
366    } else {
367      itemIndex = getItemIndex(index);
368    }
369    return getLong(index, itemIndex);
370  }
371
372  @Override
373  public long getLongAfterPosition(int offset) {
374    checkRefCount();
375    // Mostly the index specified will land within this current item. Short circuit for that
376    int index = offset + this.position();
377    int itemIndex;
378    if (this.itemBeginPos[this.curItemIndex + 1] > index) {
379      itemIndex = this.curItemIndex;
380    } else {
381      itemIndex = getItemIndexFromCurItemIndex(index);
382    }
383    return getLong(index, itemIndex);
384  }
385
386  /**
387   * @return this MBB's current position
388   */
389  @Override
390  public int position() {
391    checkRefCount();
392    return itemBeginPos[this.curItemIndex] + this.curItem.position();
393  }
394
395  /**
396   * Sets this MBB's position to the given value.
397   * @param position
398   * @return this object
399   */
400  @Override
401  public MultiByteBuff position(int position) {
402    checkRefCount();
403    // Short circuit for positioning within the cur item. Mostly that is the case.
404    if (this.itemBeginPos[this.curItemIndex] <= position
405        && this.itemBeginPos[this.curItemIndex + 1] > position) {
406      this.curItem.position(position - this.itemBeginPos[this.curItemIndex]);
407      return this;
408    }
409    int itemIndex = getItemIndex(position);
410    // All items from 0 - curItem-1 set position at end.
411    for (int i = 0; i < itemIndex; i++) {
412      this.items[i].position(this.items[i].limit());
413    }
414    // All items after curItem set position at begin
415    for (int i = itemIndex + 1; i < this.items.length; i++) {
416      this.items[i].position(0);
417    }
418    this.curItem = this.items[itemIndex];
419    this.curItem.position(position - this.itemBeginPos[itemIndex]);
420    this.curItemIndex = itemIndex;
421    return this;
422  }
423
424  /**
425   * Rewinds this MBB and the position is set to 0
426   * @return this object
427   */
428  @Override
429  public MultiByteBuff rewind() {
430    checkRefCount();
431    for (int i = 0; i < this.items.length; i++) {
432      this.items[i].rewind();
433    }
434    this.curItemIndex = 0;
435    this.curItem = this.items[this.curItemIndex];
436    this.markedItemIndex = -1;
437    return this;
438  }
439
440  /**
441   * Marks the current position of the MBB
442   * @return this object
443   */
444  @Override
445  public MultiByteBuff mark() {
446    checkRefCount();
447    this.markedItemIndex = this.curItemIndex;
448    this.curItem.mark();
449    return this;
450  }
451
452  /**
453   * Similar to {@link ByteBuffer}.reset(), ensures that this MBB
454   * is reset back to last marked position.
455   * @return This MBB
456   */
457  @Override
458  public MultiByteBuff reset() {
459    checkRefCount();
460    // when the buffer is moved to the next one.. the reset should happen on the previous marked
461    // item and the new one should be taken as the base
462    if (this.markedItemIndex < 0) throw new InvalidMarkException();
463    ByteBuffer markedItem = this.items[this.markedItemIndex];
464    markedItem.reset();
465    this.curItem = markedItem;
466    // All items after the marked position upto the current item should be reset to 0
467    for (int i = this.curItemIndex; i > this.markedItemIndex; i--) {
468      this.items[i].position(0);
469    }
470    this.curItemIndex = this.markedItemIndex;
471    return this;
472  }
473
474  /**
475   * Returns the number of elements between the current position and the
476   * limit.
477   * @return the remaining elements in this MBB
478   */
479  @Override
480  public int remaining() {
481    checkRefCount();
482    int remain = 0;
483    for (int i = curItemIndex; i < items.length; i++) {
484      remain += items[i].remaining();
485    }
486    return remain;
487  }
488
489  /**
490   * Returns true if there are elements between the current position and the limt
491   * @return true if there are elements, false otherwise
492   */
493  @Override
494  public final boolean hasRemaining() {
495    checkRefCount();
496    return this.curItem.hasRemaining() || (this.curItemIndex < this.limitedItemIndex
497        && this.items[this.curItemIndex + 1].hasRemaining());
498  }
499
500  /**
501   * A relative method that returns byte at the current position.  Increments the
502   * current position by the size of a byte.
503   * @return the byte at the current position
504   */
505  @Override
506  public byte get() {
507    checkRefCount();
508    if (this.curItem.remaining() == 0) {
509      if (items.length - 1 == this.curItemIndex) {
510        // means cur item is the last one and we wont be able to read a long. Throw exception
511        throw new BufferUnderflowException();
512      }
513      this.curItemIndex++;
514      this.curItem = this.items[this.curItemIndex];
515    }
516    return this.curItem.get();
517  }
518
519  /**
520   * Returns the short value at the current position. Also advances the position by the size
521   * of short
522   *
523   * @return the short value at the current position
524   */
525  @Override
526  public short getShort() {
527    checkRefCount();
528    int remaining = this.curItem.remaining();
529    if (remaining >= Bytes.SIZEOF_SHORT) {
530      return this.curItem.getShort();
531    }
532    short n = 0;
533    n = (short) (n ^ (get() & 0xFF));
534    n = (short) (n << 8);
535    n = (short) (n ^ (get() & 0xFF));
536    return n;
537  }
538
539  /**
540   * Returns the int value at the current position. Also advances the position by the size of int
541   *
542   * @return the int value at the current position
543   */
544  @Override
545  public int getInt() {
546    checkRefCount();
547    int remaining = this.curItem.remaining();
548    if (remaining >= Bytes.SIZEOF_INT) {
549      return this.curItem.getInt();
550    }
551    int n = 0;
552    for (int i = 0; i < Bytes.SIZEOF_INT; i++) {
553      n <<= 8;
554      n ^= get() & 0xFF;
555    }
556    return n;
557  }
558
559
560  /**
561   * Returns the long value at the current position. Also advances the position by the size of long
562   *
563   * @return the long value at the current position
564   */
565  @Override
566  public long getLong() {
567    checkRefCount();
568    int remaining = this.curItem.remaining();
569    if (remaining >= Bytes.SIZEOF_LONG) {
570      return this.curItem.getLong();
571    }
572    long l = 0;
573    for (int i = 0; i < Bytes.SIZEOF_LONG; i++) {
574      l <<= 8;
575      l ^= get() & 0xFF;
576    }
577    return l;
578  }
579
580  /**
581   * Copies the content from this MBB's current position to the byte array and fills it. Also
582   * advances the position of the MBB by the length of the byte[].
583   * @param dst
584   */
585  @Override
586  public void get(byte[] dst) {
587    get(dst, 0, dst.length);
588  }
589
590  /**
591   * Copies the specified number of bytes from this MBB's current position to the byte[]'s offset.
592   * Also advances the position of the MBB by the given length.
593   * @param dst
594   * @param offset within the current array
595   * @param length upto which the bytes to be copied
596   */
597  @Override
598  public void get(byte[] dst, int offset, int length) {
599    checkRefCount();
600    while (length > 0) {
601      int toRead = Math.min(length, this.curItem.remaining());
602      ByteBufferUtils.copyFromBufferToArray(dst, this.curItem, this.curItem.position(), offset,
603        toRead);
604      this.curItem.position(this.curItem.position() + toRead);
605      length -= toRead;
606      if (length == 0) break;
607      this.curItemIndex++;
608      this.curItem = this.items[this.curItemIndex];
609      offset += toRead;
610    }
611  }
612
613  @Override
614  public void get(int sourceOffset, byte[] dst, int offset, int length) {
615    checkRefCount();
616    int itemIndex = getItemIndex(sourceOffset);
617    ByteBuffer item = this.items[itemIndex];
618    sourceOffset = sourceOffset - this.itemBeginPos[itemIndex];
619    while (length > 0) {
620      int toRead = Math.min((item.limit() - sourceOffset), length);
621      ByteBufferUtils.copyFromBufferToArray(dst, item, sourceOffset, offset, toRead);
622      length -= toRead;
623      if (length == 0) break;
624      itemIndex++;
625      item = this.items[itemIndex];
626      offset += toRead;
627      sourceOffset = 0;
628    }
629  }
630
631  /**
632   * Marks the limit of this MBB.
633   * @param limit
634   * @return This MBB
635   */
636  @Override
637  public MultiByteBuff limit(int limit) {
638    checkRefCount();
639    this.limit = limit;
640    // Normally the limit will try to limit within the last BB item
641    int limitedIndexBegin = this.itemBeginPos[this.limitedItemIndex];
642    if (limit >= limitedIndexBegin && limit < this.itemBeginPos[this.limitedItemIndex + 1]) {
643      this.items[this.limitedItemIndex].limit(limit - limitedIndexBegin);
644      return this;
645    }
646    int itemIndex = getItemIndex(limit);
647    int beginOffset = this.itemBeginPos[itemIndex];
648    int offsetInItem = limit - beginOffset;
649    ByteBuffer item = items[itemIndex];
650    item.limit(offsetInItem);
651    for (int i = this.limitedItemIndex; i < itemIndex; i++) {
652      this.items[i].limit(this.items[i].capacity());
653    }
654    this.limitedItemIndex = itemIndex;
655    for (int i = itemIndex + 1; i < this.items.length; i++) {
656      this.items[i].limit(this.items[i].position());
657    }
658    return this;
659  }
660
661  /**
662   * Returns the limit of this MBB
663   * @return limit of the MBB
664   */
665  @Override
666  public int limit() {
667    return this.limit;
668  }
669
670  /**
671   * Returns an MBB which is a sliced version of this MBB. The position, limit and mark
672   * of the new MBB will be independent than that of the original MBB.
673   * The content of the new MBB will start at this MBB's current position
674   * @return a sliced MBB
675   */
676  @Override
677  public MultiByteBuff slice() {
678    checkRefCount();
679    ByteBuffer[] copy = new ByteBuffer[this.limitedItemIndex - this.curItemIndex + 1];
680    for (int i = curItemIndex, j = 0; i <= this.limitedItemIndex; i++, j++) {
681      copy[j] = this.items[i].slice();
682    }
683    return new MultiByteBuff(refCnt, copy);
684  }
685
686  /**
687   * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark of the
688   * new MBB will be independent than that of the original MBB. The content of the new MBB will
689   * start at this MBB's current position The position, limit and mark of the new MBB would be
690   * identical to this MBB in terms of values.
691   * @return a duplicated MBB
692   */
693  @Override
694  public MultiByteBuff duplicate() {
695    checkRefCount();
696    ByteBuffer[] itemsCopy = new ByteBuffer[this.items.length];
697    for (int i = 0; i < this.items.length; i++) {
698      itemsCopy[i] = items[i].duplicate();
699    }
700    return new MultiByteBuff(refCnt, itemsCopy, this.itemBeginPos, this.limit,
701        this.limitedItemIndex, this.curItemIndex, this.markedItemIndex);
702  }
703
704  /**
705   * Writes a byte to this MBB at the current position and increments the position
706   * @param b
707   * @return this object
708   */
709  @Override
710  public MultiByteBuff put(byte b) {
711    checkRefCount();
712    if (this.curItem.remaining() == 0) {
713      if (this.curItemIndex == this.items.length - 1) {
714        throw new BufferOverflowException();
715      }
716      this.curItemIndex++;
717      this.curItem = this.items[this.curItemIndex];
718    }
719    this.curItem.put(b);
720    return this;
721  }
722
723  /**
724   * Writes a byte to this MBB at the given index
725   * @param index
726   * @param b
727   * @return this object
728   */
729  @Override
730  public MultiByteBuff put(int index, byte b) {
731    checkRefCount();
732    int itemIndex = getItemIndex(limit);
733    ByteBuffer item = items[itemIndex];
734    item.put(index - itemBeginPos[itemIndex], b);
735    return this;
736  }
737
738  /**
739   * Copies from a src MBB to this MBB.
740   * @param offset the position in this MBB to which the copy should happen
741   * @param src the src MBB
742   * @param srcOffset the offset in the src MBB from where the elements should be read
743   * @param length the length upto which the copy should happen
744   */
745  @Override
746  public MultiByteBuff put(int offset, ByteBuff src, int srcOffset, int length) {
747    checkRefCount();
748    int destItemIndex = getItemIndex(offset);
749    int srcItemIndex = getItemIndex(srcOffset);
750    ByteBuffer destItem = this.items[destItemIndex];
751    offset = offset - this.itemBeginPos[destItemIndex];
752
753    ByteBuffer srcItem = getItemByteBuffer(src, srcItemIndex);
754    srcOffset = srcOffset - this.itemBeginPos[srcItemIndex];
755    int toRead, toWrite, toMove;
756    while (length > 0) {
757      toWrite = destItem.limit() - offset;
758      toRead = srcItem.limit() - srcOffset;
759      toMove = Math.min(length, Math.min(toRead, toWrite));
760      ByteBufferUtils.copyFromBufferToBuffer(srcItem, destItem, srcOffset, offset, toMove);
761      length -= toMove;
762      if (length == 0) break;
763      if (toRead < toWrite) {
764        srcItem = getItemByteBuffer(src, ++srcItemIndex);
765        srcOffset = 0;
766        offset += toMove;
767      } else if (toRead > toWrite) {
768        destItem = this.items[++destItemIndex];
769        offset = 0;
770        srcOffset += toMove;
771      } else {
772        // toRead = toWrite case
773        srcItem = getItemByteBuffer(src, ++srcItemIndex);
774        srcOffset = 0;
775        destItem = this.items[++destItemIndex];
776        offset = 0;
777      }
778    }
779    return this;
780  }
781
782  private static ByteBuffer getItemByteBuffer(ByteBuff buf, int index) {
783    return (buf instanceof SingleByteBuff) ? buf.nioByteBuffers()[0]
784        : ((MultiByteBuff) buf).items[index];
785  }
786
787  /**
788   * Writes an int to this MBB at its current position. Also advances the position by size of int
789   * @param val Int value to write
790   * @return this object
791   */
792  @Override
793  public MultiByteBuff putInt(int val) {
794    checkRefCount();
795    if (this.curItem.remaining() >= Bytes.SIZEOF_INT) {
796      this.curItem.putInt(val);
797      return this;
798    }
799    if (this.curItemIndex == this.items.length - 1) {
800      throw new BufferOverflowException();
801    }
802    // During read, we will read as byte by byte for this case. So just write in Big endian
803    put(int3(val));
804    put(int2(val));
805    put(int1(val));
806    put(int0(val));
807    return this;
808  }
809
810  private static byte int3(int x) {
811    return (byte) (x >> 24);
812  }
813
814  private static byte int2(int x) {
815    return (byte) (x >> 16);
816  }
817
818  private static byte int1(int x) {
819    return (byte) (x >> 8);
820  }
821
822  private static byte int0(int x) {
823    return (byte) (x);
824  }
825
826  /**
827   * Copies from the given byte[] to this MBB
828   * @param src
829   * @return this MBB
830   */
831  @Override
832  public final MultiByteBuff put(byte[] src) {
833    return put(src, 0, src.length);
834  }
835
836  /**
837   * Copies from the given byte[] to this MBB
838   * @param src
839   * @param offset the position in the byte array from which the copy should be done
840   * @param length the length upto which the copy should happen
841   * @return this MBB
842   */
843  @Override
844  public MultiByteBuff put(byte[] src, int offset, int length) {
845    checkRefCount();
846    if (this.curItem.remaining() >= length) {
847      ByteBufferUtils.copyFromArrayToBuffer(this.curItem, src, offset, length);
848      return this;
849    }
850    int end = offset + length;
851    for (int i = offset; i < end; i++) {
852      this.put(src[i]);
853    }
854    return this;
855  }
856
857
858  /**
859   * Writes a long to this MBB at its current position. Also advances the position by size of long
860   * @param val Long value to write
861   * @return this object
862   */
863  @Override
864  public MultiByteBuff putLong(long val) {
865    checkRefCount();
866    if (this.curItem.remaining() >= Bytes.SIZEOF_LONG) {
867      this.curItem.putLong(val);
868      return this;
869    }
870    if (this.curItemIndex == this.items.length - 1) {
871      throw new BufferOverflowException();
872    }
873    // During read, we will read as byte by byte for this case. So just write in Big endian
874    put(long7(val));
875    put(long6(val));
876    put(long5(val));
877    put(long4(val));
878    put(long3(val));
879    put(long2(val));
880    put(long1(val));
881    put(long0(val));
882    return this;
883  }
884
885  private static byte long7(long x) {
886    return (byte) (x >> 56);
887  }
888
889  private static byte long6(long x) {
890    return (byte) (x >> 48);
891  }
892
893  private static byte long5(long x) {
894    return (byte) (x >> 40);
895  }
896
897  private static byte long4(long x) {
898    return (byte) (x >> 32);
899  }
900
901  private static byte long3(long x) {
902    return (byte) (x >> 24);
903  }
904
905  private static byte long2(long x) {
906    return (byte) (x >> 16);
907  }
908
909  private static byte long1(long x) {
910    return (byte) (x >> 8);
911  }
912
913  private static byte long0(long x) {
914    return (byte) (x);
915  }
916
917  /**
918   * Jumps the current position of this MBB by specified length.
919   * @param length
920   */
921  @Override
922  public MultiByteBuff skip(int length) {
923    checkRefCount();
924    // Get available bytes from this item and remaining from next
925    int jump = 0;
926    while (true) {
927      jump = this.curItem.remaining();
928      if (jump >= length) {
929        this.curItem.position(this.curItem.position() + length);
930        break;
931      }
932      this.curItem.position(this.curItem.position() + jump);
933      length -= jump;
934      this.curItemIndex++;
935      this.curItem = this.items[this.curItemIndex];
936    }
937    return this;
938  }
939
940  /**
941   * Jumps back the current position of this MBB by specified length.
942   * @param length
943   */
944  @Override
945  public MultiByteBuff moveBack(int length) {
946    checkRefCount();
947    while (length != 0) {
948      if (length > curItem.position()) {
949        length -= curItem.position();
950        this.curItem.position(0);
951        this.curItemIndex--;
952        this.curItem = this.items[curItemIndex];
953      } else {
954        this.curItem.position(curItem.position() - length);
955        break;
956      }
957    }
958    return this;
959  }
960
961 /**
962   * Returns bytes from current position till length specified, as a single ByteBuffer. When all
963   * these bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item
964   * as such will be returned. So users are warned not to change the position or limit of this
965   * returned ByteBuffer. The position of the returned byte buffer is at the begin of the required
966   * bytes. When the required bytes happen to span across multiple ByteBuffers, this API will copy
967   * the bytes to a newly created ByteBuffer of required size and return that.
968   *
969   * @param length number of bytes required.
970   * @return bytes from current position till length specified, as a single ByteButter.
971   */
972  @Override
973  public ByteBuffer asSubByteBuffer(int length) {
974    checkRefCount();
975    if (this.curItem.remaining() >= length) {
976      return this.curItem;
977    }
978    int offset = 0;
979    byte[] dupB = new byte[length];
980    int locCurItemIndex = curItemIndex;
981    ByteBuffer locCurItem = curItem;
982    while (length > 0) {
983      int toRead = Math.min(length, locCurItem.remaining());
984      ByteBufferUtils.copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset,
985        toRead);
986      length -= toRead;
987      if (length == 0) break;
988      locCurItemIndex++;
989      locCurItem = this.items[locCurItemIndex];
990      offset += toRead;
991    }
992    return ByteBuffer.wrap(dupB);
993  }
994
995  /**
996   * Returns bytes from given offset till length specified, as a single ByteBuffer. When all these
997   * bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item as
998   * such will be returned (with offset in this ByteBuffer where the bytes starts). So users are
999   * warned not to change the position or limit of this returned ByteBuffer. When the required bytes
1000   * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created
1001   * ByteBuffer of required size and return that.
1002   *
1003   * @param offset the offset in this MBB from where the subBuffer should be created
1004   * @param length the length of the subBuffer
1005   * @param pair a pair that will have the bytes from the current position till length specified, as
1006   *        a single ByteBuffer and offset in that Buffer where the bytes starts. The method would
1007   *        set the values on the pair that is passed in by the caller
1008   */
1009  @Override
1010  public void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair) {
1011    checkRefCount();
1012    if (this.itemBeginPos[this.curItemIndex] <= offset) {
1013      int relOffsetInCurItem = offset - this.itemBeginPos[this.curItemIndex];
1014      if (this.curItem.limit() - relOffsetInCurItem >= length) {
1015        pair.setFirst(this.curItem);
1016        pair.setSecond(relOffsetInCurItem);
1017        return;
1018      }
1019    }
1020    int itemIndex = getItemIndex(offset);
1021    ByteBuffer item = this.items[itemIndex];
1022    offset = offset - this.itemBeginPos[itemIndex];
1023    if (item.limit() - offset >= length) {
1024      pair.setFirst(item);
1025      pair.setSecond(offset);
1026      return;
1027    }
1028    byte[] dst = new byte[length];
1029    int destOffset = 0;
1030    while (length > 0) {
1031      int toRead = Math.min(length, item.limit() - offset);
1032      ByteBufferUtils.copyFromBufferToArray(dst, item, offset, destOffset, toRead);
1033      length -= toRead;
1034      if (length == 0) break;
1035      itemIndex++;
1036      item = this.items[itemIndex];
1037      destOffset += toRead;
1038      offset = 0;
1039    }
1040    pair.setFirst(ByteBuffer.wrap(dst));
1041    pair.setSecond(0);
1042  }
1043
1044  /**
1045   * Copies the content from an this MBB to a ByteBuffer
1046   * @param out the ByteBuffer to which the copy has to happen, its position will be advanced.
1047   * @param sourceOffset the offset in the MBB from which the elements has to be copied
1048   * @param length the length in the MBB upto which the elements has to be copied
1049   */
1050  @Override
1051  public void get(ByteBuffer out, int sourceOffset, int length) {
1052    checkRefCount();
1053    int itemIndex = getItemIndex(sourceOffset);
1054    ByteBuffer in = this.items[itemIndex];
1055    sourceOffset = sourceOffset - this.itemBeginPos[itemIndex];
1056    while (length > 0) {
1057      int toRead = Math.min(in.limit() - sourceOffset, length);
1058      ByteBufferUtils.copyFromBufferToBuffer(in, out, sourceOffset, toRead);
1059      length -= toRead;
1060      if (length == 0) {
1061        break;
1062      }
1063      itemIndex++;
1064      in = this.items[itemIndex];
1065      sourceOffset = 0;
1066    }
1067  }
1068
1069  /**
1070   * Copy the content from this MBB to a byte[] based on the given offset and
1071   * length
1072   *
1073   * @param offset
1074   *          the position from where the copy should start
1075   * @param length
1076   *          the length upto which the copy has to be done
1077   * @return byte[] with the copied contents from this MBB.
1078   */
1079  @Override
1080  public byte[] toBytes(int offset, int length) {
1081    checkRefCount();
1082    byte[] output = new byte[length];
1083    this.get(offset, output, 0, length);
1084    return output;
1085  }
1086
1087  private int internalRead(ReadableByteChannel channel, long offset,
1088      ChannelReader reader) throws IOException {
1089    checkRefCount();
1090    int total = 0;
1091    while (buffsIterator.hasNext()) {
1092      ByteBuffer buffer = buffsIterator.next();
1093      int len = read(channel, buffer, offset, reader);
1094      if (len > 0) {
1095        total += len;
1096        offset += len;
1097      }
1098      if (buffer.hasRemaining()) {
1099        break;
1100      }
1101    }
1102    return total;
1103  }
1104
1105  @Override
1106  public int read(ReadableByteChannel channel) throws IOException {
1107    return internalRead(channel, 0, CHANNEL_READER);
1108  }
1109
1110  @Override
1111  public int read(FileChannel channel, long offset) throws IOException {
1112    return internalRead(channel, offset, FILE_READER);
1113  }
1114
1115  @Override
1116  public int write(FileChannel channel, long offset) throws IOException {
1117    checkRefCount();
1118    int total = 0;
1119    while (buffsIterator.hasNext()) {
1120      ByteBuffer buffer = buffsIterator.next();
1121      while (buffer.hasRemaining()) {
1122        int len = channel.write(buffer, offset);
1123        total += len;
1124        offset += len;
1125      }
1126    }
1127    return total;
1128  }
1129
1130  @Override
1131  public ByteBuffer[] nioByteBuffers() {
1132    checkRefCount();
1133    return this.items;
1134  }
1135
1136  @Override
1137  public boolean equals(Object obj) {
1138    if (!(obj instanceof MultiByteBuff)) return false;
1139    if (this == obj) return true;
1140    MultiByteBuff that = (MultiByteBuff) obj;
1141    if (this.capacity() != that.capacity()) return false;
1142    if (ByteBuff.compareTo(this, this.position(), this.limit(), that, that.position(),
1143      that.limit()) == 0) {
1144      return true;
1145    }
1146    return false;
1147  }
1148
1149  @Override
1150  public int hashCode() {
1151    int hash = 0;
1152    for (ByteBuffer b : this.items) {
1153      hash += b.hashCode();
1154    }
1155    return hash;
1156  }
1157
1158  @Override
1159  public MultiByteBuff retain() {
1160    refCnt.retain();
1161    return this;
1162  }
1163}