001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.nio;
019
020import java.io.IOException;
021import java.nio.ByteBuffer;
022import java.nio.channels.FileChannel;
023import java.nio.channels.ReadableByteChannel;
024import java.util.List;
025
026import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
027import org.apache.hadoop.hbase.util.ByteBufferUtils;
028import org.apache.hadoop.hbase.util.Bytes;
029import org.apache.hadoop.hbase.util.ObjectIntPair;
030import org.apache.yetus.audience.InterfaceAudience;
031
032import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil;
033
034
035/**
036 * An abstract class that abstracts out as to how the byte buffers are used, either single or
037 * multiple. We have this interface because the java's ByteBuffers cannot be sub-classed. This class
038 * provides APIs similar to the ones provided in java's nio ByteBuffers and allows you to do
039 * positional reads/writes and relative reads and writes on the underlying BB. In addition to it, we
040 * have some additional APIs which helps us in the read path. <br/>
041 * The ByteBuff implement {@link HBaseReferenceCounted} interface which mean need to maintains a
042 * {@link RefCnt} inside, if ensure that the ByteBuff won't be used any more, we must do a
043 * {@link ByteBuff#release()} to recycle its NIO ByteBuffers. when considering the
044 * {@link ByteBuff#duplicate()} or {@link ByteBuff#slice()}, releasing either the duplicated one or
045 * the original one will free its memory, because they share the same NIO ByteBuffers. when you want
046 * to retain the NIO ByteBuffers even if the origin one called {@link ByteBuff#release()}, you can
047 * do like this:
048 *
049 * <pre>
050 *   ByteBuff original = ...;
051 *   ByteBuff dup = original.duplicate();
052 *   dup.retain();
053 *   original.release();
054 *   // The NIO buffers can still be accessed unless you release the duplicated one
055 *   dup.get(...);
056 *   dup.release();
057 *   // Both the original and dup can not access the NIO buffers any more.
058 * </pre>
059 */
060@InterfaceAudience.Private
061public abstract class ByteBuff implements HBaseReferenceCounted {
062  private static final String REFERENCE_COUNT_NAME = "ReferenceCount";
063  private static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB.
064
065  protected RefCnt refCnt;
066
067  /*************************** Methods for reference count **********************************/
068
069  protected void checkRefCount() {
070    ObjectUtil.checkPositive(refCnt(), REFERENCE_COUNT_NAME);
071  }
072
073  public int refCnt() {
074    return refCnt.refCnt();
075  }
076
077  @Override
078  public boolean release() {
079    return refCnt.release();
080  }
081
082  /******************************* Methods for ByteBuff **************************************/
083
084  /**
085   * @return this ByteBuff's current position
086   */
087  public abstract int position();
088
089  /**
090   * Sets this ByteBuff's position to the given value.
091   * @param position
092   * @return this object
093   */
094  public abstract ByteBuff position(int position);
095
096  /**
097   * Jumps the current position of this ByteBuff by specified length.
098   * @param len the length to be skipped
099   */
100  public abstract ByteBuff skip(int len);
101
102  /**
103   * Jumps back the current position of this ByteBuff by specified length.
104   * @param len the length to move back
105   */
106  public abstract ByteBuff moveBack(int len);
107
108  /**
109   * @return the total capacity of this ByteBuff.
110   */
111  public abstract int capacity();
112
113  /**
114   * Returns the limit of this ByteBuff
115   * @return limit of the ByteBuff
116   */
117  public abstract int limit();
118
119  /**
120   * Marks the limit of this ByteBuff.
121   * @param limit
122   * @return This ByteBuff
123   */
124  public abstract ByteBuff limit(int limit);
125
126  /**
127   * Rewinds this ByteBuff and the position is set to 0
128   * @return this object
129   */
130  public abstract ByteBuff rewind();
131
132  /**
133   * Marks the current position of the ByteBuff
134   * @return this object
135   */
136  public abstract ByteBuff mark();
137
138  /**
139   * Returns bytes from current position till length specified, as a single ByteBuffer. When all
140   * these bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item
141   * as such will be returned. So users are warned not to change the position or limit of this
142   * returned ByteBuffer. The position of the returned byte buffer is at the begin of the required
143   * bytes. When the required bytes happen to span across multiple ByteBuffers, this API will copy
144   * the bytes to a newly created ByteBuffer of required size and return that.
145   *
146   * @param length number of bytes required.
147   * @return bytes from current position till length specified, as a single ByteButter.
148   */
149  public abstract ByteBuffer asSubByteBuffer(int length);
150
151  /**
152   * Returns bytes from given offset till length specified, as a single ByteBuffer. When all these
153   * bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item as
154   * such will be returned (with offset in this ByteBuffer where the bytes starts). So users are
155   * warned not to change the position or limit of this returned ByteBuffer. When the required bytes
156   * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created
157   * ByteBuffer of required size and return that.
158   *
159   * @param offset the offset in this ByteBuff from where the subBuffer should be created
160   * @param length the length of the subBuffer
161   * @param pair a pair that will have the bytes from the current position till length specified,
162   *        as a single ByteBuffer and offset in that Buffer where the bytes starts.
163   *        Since this API gets called in a loop we are passing a pair to it which could be created
164   *        outside the loop and the method would set the values on the pair that is passed in by
165   *        the caller. Thus it avoids more object creations that would happen if the pair that is
166   *        returned is created by this method every time.
167   */
168  public abstract void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair);
169
170  /**
171   * Returns the number of elements between the current position and the
172   * limit.
173   * @return the remaining elements in this ByteBuff
174   */
175  public abstract int remaining();
176
177  /**
178   * Returns true if there are elements between the current position and the limt
179   * @return true if there are elements, false otherwise
180   */
181  public abstract boolean hasRemaining();
182
183  /**
184   * Similar to {@link ByteBuffer}.reset(), ensures that this ByteBuff
185   * is reset back to last marked position.
186   * @return This ByteBuff
187   */
188  public abstract ByteBuff reset();
189
190  /**
191   * Returns an ByteBuff which is a sliced version of this ByteBuff. The position, limit and mark
192   * of the new ByteBuff will be independent than that of the original ByteBuff.
193   * The content of the new ByteBuff will start at this ByteBuff's current position
194   * @return a sliced ByteBuff
195   */
196  public abstract ByteBuff slice();
197
198  /**
199   * Returns an ByteBuff which is a duplicate version of this ByteBuff. The
200   * position, limit and mark of the new ByteBuff will be independent than that
201   * of the original ByteBuff. The content of the new ByteBuff will start at
202   * this ByteBuff's current position The position, limit and mark of the new
203   * ByteBuff would be identical to this ByteBuff in terms of values.
204   *
205   * @return a sliced ByteBuff
206   */
207  public abstract ByteBuff duplicate();
208
209  /**
210   * A relative method that returns byte at the current position.  Increments the
211   * current position by the size of a byte.
212   * @return the byte at the current position
213   */
214  public abstract byte get();
215
216  /**
217   * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers
218   * @param index
219   * @return the byte at the given index
220   */
221  public abstract byte get(int index);
222
223  /**
224   * Fetches the byte at the given offset from current position. Does not change position
225   * of the underlying ByteBuffers.
226   *
227   * @param offset
228   * @return the byte value at the given index.
229   */
230  public abstract byte getByteAfterPosition(int offset);
231
232  /**
233   * Writes a byte to this ByteBuff at the current position and increments the position
234   * @param b
235   * @return this object
236   */
237  public abstract ByteBuff put(byte b);
238
239  /**
240   * Writes a byte to this ByteBuff at the given index
241   * @param index
242   * @param b
243   * @return this object
244   */
245  public abstract ByteBuff put(int index, byte b);
246
247  /**
248   * Copies the specified number of bytes from this ByteBuff's current position to
249   * the byte[]'s offset. Also advances the position of the ByteBuff by the given length.
250   * @param dst
251   * @param offset within the current array
252   * @param length upto which the bytes to be copied
253   */
254  public abstract void get(byte[] dst, int offset, int length);
255
256  /**
257   * Copies the specified number of bytes from this ByteBuff's given position to
258   * the byte[]'s offset. The position of the ByteBuff remains in the current position only
259   * @param sourceOffset the offset in this ByteBuff from where the copy should happen
260   * @param dst the byte[] to which the ByteBuff's content is to be copied
261   * @param offset within the current array
262   * @param length upto which the bytes to be copied
263   */
264  public abstract void get(int sourceOffset, byte[] dst, int offset, int length);
265
266  /**
267   * Copies the content from this ByteBuff's current position to the byte array and fills it. Also
268   * advances the position of the ByteBuff by the length of the byte[].
269   * @param dst
270   */
271  public abstract void get(byte[] dst);
272
273  /**
274   * Copies from the given byte[] to this ByteBuff
275   * @param src
276   * @param offset the position in the byte array from which the copy should be done
277   * @param length the length upto which the copy should happen
278   * @return this ByteBuff
279   */
280  public abstract ByteBuff put(byte[] src, int offset, int length);
281
282  /**
283   * Copies from the given byte[] to this ByteBuff
284   * @param src
285   * @return this ByteBuff
286   */
287  public abstract ByteBuff put(byte[] src);
288
289  /**
290   * @return true or false if the underlying BB support hasArray
291   */
292  public abstract boolean hasArray();
293
294  /**
295   * @return the byte[] if the underlying BB has single BB and hasArray true
296   */
297  public abstract byte[] array();
298
299  /**
300   * @return the arrayOffset of the byte[] incase of a single BB backed ByteBuff
301   */
302  public abstract int arrayOffset();
303
304  /**
305   * Returns the short value at the current position. Also advances the position by the size
306   * of short
307   *
308   * @return the short value at the current position
309   */
310  public abstract short getShort();
311
312  /**
313   * Fetches the short value at the given index. Does not change position of the
314   * underlying ByteBuffers. The caller is sure that the index will be after
315   * the current position of this ByteBuff. So even if the current short does not fit in the
316   * current item we can safely move to the next item and fetch the remaining bytes forming
317   * the short
318   *
319   * @param index
320   * @return the short value at the given index
321   */
322  public abstract short getShort(int index);
323
324  /**
325   * Fetches the short value at the given offset from current position. Does not change position
326   * of the underlying ByteBuffers.
327   *
328   * @param offset
329   * @return the short value at the given index.
330   */
331  public abstract short getShortAfterPosition(int offset);
332
333  /**
334   * Returns the int value at the current position. Also advances the position by the size of int
335   *
336   * @return the int value at the current position
337   */
338  public abstract int getInt();
339
340  /**
341   * Writes an int to this ByteBuff at its current position. Also advances the position
342   * by size of int
343   * @param value Int value to write
344   * @return this object
345   */
346  public abstract ByteBuff putInt(int value);
347
348  /**
349   * Fetches the int at the given index. Does not change position of the underlying ByteBuffers.
350   * Even if the current int does not fit in the
351   * current item we can safely move to the next item and fetch the remaining bytes forming
352   * the int
353   *
354   * @param index
355   * @return the int value at the given index
356   */
357  public abstract int getInt(int index);
358
359  /**
360   * Fetches the int value at the given offset from current position. Does not change position
361   * of the underlying ByteBuffers.
362   *
363   * @param offset
364   * @return the int value at the given index.
365   */
366  public abstract int getIntAfterPosition(int offset);
367
368  /**
369   * Returns the long value at the current position. Also advances the position by the size of long
370   *
371   * @return the long value at the current position
372   */
373  public abstract long getLong();
374
375  /**
376   * Writes a long to this ByteBuff at its current position.
377   * Also advances the position by size of long
378   * @param value Long value to write
379   * @return this object
380   */
381  public abstract ByteBuff putLong(long value);
382
383  /**
384   * Fetches the long at the given index. Does not change position of the
385   * underlying ByteBuffers. The caller is sure that the index will be after
386   * the current position of this ByteBuff. So even if the current long does not fit in the
387   * current item we can safely move to the next item and fetch the remaining bytes forming
388   * the long
389   *
390   * @param index
391   * @return the long value at the given index
392   */
393  public abstract long getLong(int index);
394
395  /**
396   * Fetches the long value at the given offset from current position. Does not change position
397   * of the underlying ByteBuffers.
398   *
399   * @param offset
400   * @return the long value at the given index.
401   */
402  public abstract long getLongAfterPosition(int offset);
403
404  /**
405   * Copy the content from this ByteBuff to a byte[].
406   * @return byte[] with the copied contents from this ByteBuff.
407   */
408  public byte[] toBytes() {
409    return toBytes(0, this.limit());
410  }
411
412  /**
413   * Copy the content from this ByteBuff to a byte[] based on the given offset and
414   * length
415   *
416   * @param offset
417   *          the position from where the copy should start
418   * @param length
419   *          the length upto which the copy has to be done
420   * @return byte[] with the copied contents from this ByteBuff.
421   */
422  public abstract byte[] toBytes(int offset, int length);
423
424  /**
425   * Copies the content from this ByteBuff to a ByteBuffer
426   * Note : This will advance the position marker of {@code out} but not change the position maker
427   * for this ByteBuff
428   * @param out the ByteBuffer to which the copy has to happen
429   * @param sourceOffset the offset in the ByteBuff from which the elements has
430   * to be copied
431   * @param length the length in this ByteBuff upto which the elements has to be copied
432   */
433  public abstract void get(ByteBuffer out, int sourceOffset, int length);
434
435  /**
436   * Copies the contents from the src ByteBuff to this ByteBuff. This will be
437   * absolute positional copying and
438   * won't affect the position of any of the buffers.
439   * @param offset the position in this ByteBuff to which the copy should happen
440   * @param src the src ByteBuff
441   * @param srcOffset the offset in the src ByteBuff from where the elements should be read
442   * @param length the length up to which the copy should happen
443   */
444  public abstract ByteBuff put(int offset, ByteBuff src, int srcOffset, int length);
445
446  /**
447   * Reads bytes from the given channel into this ByteBuff
448   * @param channel
449   * @return The number of bytes read from the channel
450   * @throws IOException
451   */
452  public abstract int read(ReadableByteChannel channel) throws IOException;
453
454  /**
455   * Reads bytes from FileChannel into this ByteBuff
456   */
457  public abstract int read(FileChannel channel, long offset) throws IOException;
458
459  /**
460   * Write this ByteBuff's data into target file
461   */
462  public abstract int write(FileChannel channel, long offset) throws IOException;
463
464  /**
465   * function interface for Channel read
466   */
467  @FunctionalInterface
468  interface ChannelReader {
469    int read(ReadableByteChannel channel, ByteBuffer buf, long offset) throws IOException;
470  }
471
472  static final ChannelReader CHANNEL_READER = (channel, buf, offset) -> {
473    return channel.read(buf);
474  };
475
476  static final ChannelReader FILE_READER = (channel, buf, offset) -> {
477    return ((FileChannel)channel).read(buf, offset);
478  };
479
480  // static helper methods
481  public static int read(ReadableByteChannel channel, ByteBuffer buf, long offset,
482      ChannelReader reader) throws IOException {
483    if (buf.remaining() <= NIO_BUFFER_LIMIT) {
484      return reader.read(channel, buf, offset);
485    }
486    int originalLimit = buf.limit();
487    int initialRemaining = buf.remaining();
488    int ret = 0;
489
490    while (buf.remaining() > 0) {
491      try {
492        int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
493        buf.limit(buf.position() + ioSize);
494        offset += ret;
495        ret = reader.read(channel, buf, offset);
496        if (ret < ioSize) {
497          break;
498        }
499      } finally {
500        buf.limit(originalLimit);
501      }
502    }
503    int nBytes = initialRemaining - buf.remaining();
504    return (nBytes > 0) ? nBytes : ret;
505  }
506
507  /**
508   * Read integer from ByteBuff coded in 7 bits and increment position.
509   * @return Read integer.
510   */
511  public static int readCompressedInt(ByteBuff buf) {
512    byte b = buf.get();
513    if ((b & ByteBufferUtils.NEXT_BIT_MASK) != 0) {
514      return (b & ByteBufferUtils.VALUE_MASK)
515          + (readCompressedInt(buf) << ByteBufferUtils.NEXT_BIT_SHIFT);
516    }
517    return b & ByteBufferUtils.VALUE_MASK;
518  }
519
520  /**
521   * Compares two ByteBuffs
522   *
523   * @param buf1 the first ByteBuff
524   * @param o1 the offset in the first ByteBuff from where the compare has to happen
525   * @param len1 the length in the first ByteBuff upto which the compare has to happen
526   * @param buf2 the second ByteBuff
527   * @param o2 the offset in the second ByteBuff from where the compare has to happen
528   * @param len2 the length in the second ByteBuff upto which the compare has to happen
529   * @return Positive if buf1 is bigger than buf2, 0 if they are equal, and negative if buf1 is
530   *         smaller than buf2.
531   */
532  public static int compareTo(ByteBuff buf1, int o1, int len1, ByteBuff buf2,
533      int o2, int len2) {
534    if (buf1.hasArray() && buf2.hasArray()) {
535      return Bytes.compareTo(buf1.array(), buf1.arrayOffset() + o1, len1, buf2.array(),
536          buf2.arrayOffset() + o2, len2);
537    }
538    int end1 = o1 + len1;
539    int end2 = o2 + len2;
540    for (int i = o1, j = o2; i < end1 && j < end2; i++, j++) {
541      int a = buf1.get(i) & 0xFF;
542      int b = buf2.get(j) & 0xFF;
543      if (a != b) {
544        return a - b;
545      }
546    }
547    return len1 - len2;
548  }
549
550  /**
551   * Read long which was written to fitInBytes bytes and increment position.
552   * @param fitInBytes In how many bytes given long is stored.
553   * @return The value of parsed long.
554   */
555  public static long readLong(ByteBuff in, final int fitInBytes) {
556    long tmpLength = 0;
557    for (int i = 0; i < fitInBytes; ++i) {
558      tmpLength |= (in.get() & 0xffl) << (8l * i);
559    }
560    return tmpLength;
561  }
562
563  public abstract ByteBuffer[] nioByteBuffers();
564
565  @Override
566  public String toString() {
567    return this.getClass().getSimpleName() + "[pos=" + position() + ", lim=" + limit() +
568        ", cap= " + capacity() + "]";
569  }
570
571  /********************************* ByteBuff wrapper methods ***********************************/
572
573  /**
574   * In theory, the upstream should never construct an ByteBuff by passing an given refCnt, so
575   * please don't use this public method in other place. Make the method public here because the
576   * BucketEntry#wrapAsCacheable in hbase-server module will use its own refCnt and ByteBuffers from
577   * IOEngine to composite an HFileBlock's ByteBuff, we didn't find a better way so keep the public
578   * way here.
579   */
580  public static ByteBuff wrap(ByteBuffer[] buffers, RefCnt refCnt) {
581    if (buffers == null || buffers.length == 0) {
582      throw new IllegalArgumentException("buffers shouldn't be null or empty");
583    }
584    return buffers.length == 1 ? new SingleByteBuff(refCnt, buffers[0])
585        : new MultiByteBuff(refCnt, buffers);
586  }
587
588  public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) {
589    return wrap(buffers, RefCnt.create(recycler));
590  }
591
592  public static ByteBuff wrap(ByteBuffer[] buffers) {
593    return wrap(buffers, RefCnt.create());
594  }
595
596  public static ByteBuff wrap(List<ByteBuffer> buffers, Recycler recycler) {
597    return wrap(buffers, RefCnt.create(recycler));
598  }
599
600  public static ByteBuff wrap(List<ByteBuffer> buffers) {
601    return wrap(buffers, RefCnt.create());
602  }
603
604  public static ByteBuff wrap(ByteBuffer buffer) {
605    return wrap(buffer, RefCnt.create());
606  }
607
608  /**
609   * Make this private because we don't want to expose the refCnt related wrap method to upstream.
610   */
611  private static ByteBuff wrap(List<ByteBuffer> buffers, RefCnt refCnt) {
612    if (buffers == null || buffers.size() == 0) {
613      throw new IllegalArgumentException("buffers shouldn't be null or empty");
614    }
615    return buffers.size() == 1 ? new SingleByteBuff(refCnt, buffers.get(0))
616        : new MultiByteBuff(refCnt, buffers.toArray(new ByteBuffer[0]));
617  }
618
619  /**
620   * Make this private because we don't want to expose the refCnt related wrap method to upstream.
621   */
622  private static ByteBuff wrap(ByteBuffer buffer, RefCnt refCnt) {
623    return new SingleByteBuff(refCnt, buffer);
624  }
625}