001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.nio;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.IOException;
022import java.nio.ByteBuffer;
023import java.nio.channels.FileChannel;
024import java.nio.channels.ReadableByteChannel;
025import java.util.List;
026import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
027import org.apache.hadoop.hbase.util.ByteBufferUtils;
028import org.apache.hadoop.hbase.util.Bytes;
029import org.apache.hadoop.hbase.util.ObjectIntPair;
030import org.apache.yetus.audience.InterfaceAudience;
031
032import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil;
033
034/**
035 * An abstract class that abstracts out as to how the byte buffers are used, either single or
036 * multiple. We have this interface because the java's ByteBuffers cannot be sub-classed. This class
037 * provides APIs similar to the ones provided in java's nio ByteBuffers and allows you to do
038 * positional reads/writes and relative reads and writes on the underlying BB. In addition to it, we
039 * have some additional APIs which helps us in the read path. <br/>
040 * The ByteBuff implement {@link HBaseReferenceCounted} interface which mean need to maintains a
041 * {@link RefCnt} inside, if ensure that the ByteBuff won't be used any more, we must do a
042 * {@link ByteBuff#release()} to recycle its NIO ByteBuffers. when considering the
043 * {@link ByteBuff#duplicate()} or {@link ByteBuff#slice()}, releasing either the duplicated one or
044 * the original one will free its memory, because they share the same NIO ByteBuffers. when you want
045 * to retain the NIO ByteBuffers even if the origin one called {@link ByteBuff#release()}, you can
046 * do like this:
047 *
048 * <pre>
049 *   ByteBuff original = ...;
050 *   ByteBuff dup = original.duplicate();
051 *   dup.retain();
052 *   original.release();
053 *   // The NIO buffers can still be accessed unless you release the duplicated one
054 *   dup.get(...);
055 *   dup.release();
056 *   // Both the original and dup can not access the NIO buffers any more.
057 * </pre>
058 */
059@InterfaceAudience.Private
060public abstract class ByteBuff implements HBaseReferenceCounted {
061  private static final String REFERENCE_COUNT_NAME = "ReferenceCount";
062  private static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB.
063
064  protected RefCnt refCnt;
065
066  /*************************** Methods for reference count **********************************/
067
068  protected void checkRefCount() {
069    ObjectUtil.checkPositive(refCnt(), REFERENCE_COUNT_NAME);
070  }
071
072  public int refCnt() {
073    return refCnt.refCnt();
074  }
075
076  @Override
077  public boolean release() {
078    return refCnt.release();
079  }
080
081  /******************************* Methods for ByteBuff **************************************/
082
083  /** Returns this ByteBuff's current position */
084  public abstract int position();
085
086  /**
087   * Sets this ByteBuff's position to the given value. n * @return this object
088   */
089  public abstract ByteBuff position(int position);
090
091  /**
092   * Jumps the current position of this ByteBuff by specified length.
093   * @param len the length to be skipped
094   */
095  public abstract ByteBuff skip(int len);
096
097  /**
098   * Jumps back the current position of this ByteBuff by specified length.
099   * @param len the length to move back
100   */
101  public abstract ByteBuff moveBack(int len);
102
103  /** Returns the total capacity of this ByteBuff. */
104  public abstract int capacity();
105
106  /**
107   * Returns the limit of this ByteBuff
108   * @return limit of the ByteBuff
109   */
110  public abstract int limit();
111
112  /**
113   * Marks the limit of this ByteBuff. n * @return This ByteBuff
114   */
115  public abstract ByteBuff limit(int limit);
116
117  /**
118   * Rewinds this ByteBuff and the position is set to 0
119   * @return this object
120   */
121  public abstract ByteBuff rewind();
122
123  /**
124   * Marks the current position of the ByteBuff
125   * @return this object
126   */
127  public abstract ByteBuff mark();
128
129  /**
130   * Returns bytes from current position till length specified, as a single ByteBuffer. When all
131   * these bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item
132   * as such will be returned. So users are warned not to change the position or limit of this
133   * returned ByteBuffer. The position of the returned byte buffer is at the begin of the required
134   * bytes. When the required bytes happen to span across multiple ByteBuffers, this API will copy
135   * the bytes to a newly created ByteBuffer of required size and return that.
136   * @param length number of bytes required.
137   * @return bytes from current position till length specified, as a single ByteButter.
138   */
139  public abstract ByteBuffer asSubByteBuffer(int length);
140
141  /**
142   * Returns bytes from given offset till length specified, as a single ByteBuffer. When all these
143   * bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item as
144   * such will be returned (with offset in this ByteBuffer where the bytes starts). So users are
145   * warned not to change the position or limit of this returned ByteBuffer. When the required bytes
146   * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created
147   * ByteBuffer of required size and return that.
148   * @param offset the offset in this ByteBuff from where the subBuffer should be created
149   * @param length the length of the subBuffer
150   * @param pair   a pair that will have the bytes from the current position till length specified,
151   *               as a single ByteBuffer and offset in that Buffer where the bytes starts. Since
152   *               this API gets called in a loop we are passing a pair to it which could be created
153   *               outside the loop and the method would set the values on the pair that is passed
154   *               in by the caller. Thus it avoids more object creations that would happen if the
155   *               pair that is returned is created by this method every time.
156   */
157  public abstract void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair);
158
159  /**
160   * Returns the number of elements between the current position and the limit.
161   * @return the remaining elements in this ByteBuff
162   */
163  public abstract int remaining();
164
165  /**
166   * Returns true if there are elements between the current position and the limt
167   * @return true if there are elements, false otherwise
168   */
169  public abstract boolean hasRemaining();
170
171  /**
172   * Similar to {@link ByteBuffer}.reset(), ensures that this ByteBuff is reset back to last marked
173   * position.
174   * @return This ByteBuff
175   */
176  public abstract ByteBuff reset();
177
178  /**
179   * Returns an ByteBuff which is a sliced version of this ByteBuff. The position, limit and mark of
180   * the new ByteBuff will be independent than that of the original ByteBuff. The content of the new
181   * ByteBuff will start at this ByteBuff's current position
182   * @return a sliced ByteBuff
183   */
184  public abstract ByteBuff slice();
185
186  /**
187   * Returns an ByteBuff which is a duplicate version of this ByteBuff. The position, limit and mark
188   * of the new ByteBuff will be independent than that of the original ByteBuff. The content of the
189   * new ByteBuff will start at this ByteBuff's current position The position, limit and mark of the
190   * new ByteBuff would be identical to this ByteBuff in terms of values.
191   * @return a sliced ByteBuff
192   */
193  public abstract ByteBuff duplicate();
194
195  /**
196   * A relative method that returns byte at the current position. Increments the current position by
197   * the size of a byte.
198   * @return the byte at the current position
199   */
200  public abstract byte get();
201
202  /**
203   * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers n
204   * * @return the byte at the given index
205   */
206  public abstract byte get(int index);
207
208  /**
209   * Fetches the byte at the given offset from current position. Does not change position of the
210   * underlying ByteBuffers. n * @return the byte value at the given index.
211   */
212  public abstract byte getByteAfterPosition(int offset);
213
214  /**
215   * Writes a byte to this ByteBuff at the current position and increments the position n * @return
216   * this object
217   */
218  public abstract ByteBuff put(byte b);
219
220  /**
221   * Writes a byte to this ByteBuff at the given index nn * @return this object
222   */
223  public abstract ByteBuff put(int index, byte b);
224
225  /**
226   * Copies the specified number of bytes from this ByteBuff's current position to the byte[]'s
227   * offset. Also advances the position of the ByteBuff by the given length. n * @param offset
228   * within the current array
229   * @param length upto which the bytes to be copied
230   */
231  public abstract void get(byte[] dst, int offset, int length);
232
233  /**
234   * Copies the specified number of bytes from this ByteBuff's given position to the byte[]'s
235   * offset. The position of the ByteBuff remains in the current position only
236   * @param sourceOffset the offset in this ByteBuff from where the copy should happen
237   * @param dst          the byte[] to which the ByteBuff's content is to be copied
238   * @param offset       within the current array
239   * @param length       upto which the bytes to be copied
240   */
241  public abstract void get(int sourceOffset, byte[] dst, int offset, int length);
242
243  /**
244   * Copies the content from this ByteBuff's current position to the byte array and fills it. Also
245   * advances the position of the ByteBuff by the length of the byte[]. n
246   */
247  public abstract void get(byte[] dst);
248
249  /**
250   * Copies from the given byte[] to this ByteBuff n * @param offset the position in the byte array
251   * from which the copy should be done
252   * @param length the length upto which the copy should happen
253   * @return this ByteBuff
254   */
255  public abstract ByteBuff put(byte[] src, int offset, int length);
256
257  /**
258   * Copies from the given byte[] to this ByteBuff n * @return this ByteBuff
259   */
260  public abstract ByteBuff put(byte[] src);
261
262  /** Returns true or false if the underlying BB support hasArray */
263  public abstract boolean hasArray();
264
265  /** Returns the byte[] if the underlying BB has single BB and hasArray true */
266  public abstract byte[] array();
267
268  /** Returns the arrayOffset of the byte[] incase of a single BB backed ByteBuff */
269  public abstract int arrayOffset();
270
271  /**
272   * Returns the short value at the current position. Also advances the position by the size of
273   * short
274   * @return the short value at the current position
275   */
276  public abstract short getShort();
277
278  /**
279   * Fetches the short value at the given index. Does not change position of the underlying
280   * ByteBuffers. The caller is sure that the index will be after the current position of this
281   * ByteBuff. So even if the current short does not fit in the current item we can safely move to
282   * the next item and fetch the remaining bytes forming the short n * @return the short value at
283   * the given index
284   */
285  public abstract short getShort(int index);
286
287  /**
288   * Fetches the short value at the given offset from current position. Does not change position of
289   * the underlying ByteBuffers. n * @return the short value at the given index.
290   */
291  public abstract short getShortAfterPosition(int offset);
292
293  /**
294   * Returns the int value at the current position. Also advances the position by the size of int
295   * @return the int value at the current position
296   */
297  public abstract int getInt();
298
299  /**
300   * Writes an int to this ByteBuff at its current position. Also advances the position by size of
301   * int
302   * @param value Int value to write
303   * @return this object
304   */
305  public abstract ByteBuff putInt(int value);
306
307  /**
308   * Fetches the int at the given index. Does not change position of the underlying ByteBuffers.
309   * Even if the current int does not fit in the current item we can safely move to the next item
310   * and fetch the remaining bytes forming the int n * @return the int value at the given index
311   */
312  public abstract int getInt(int index);
313
314  /**
315   * Fetches the int value at the given offset from current position. Does not change position of
316   * the underlying ByteBuffers. n * @return the int value at the given index.
317   */
318  public abstract int getIntAfterPosition(int offset);
319
320  /**
321   * Returns the long value at the current position. Also advances the position by the size of long
322   * @return the long value at the current position
323   */
324  public abstract long getLong();
325
326  /**
327   * Writes a long to this ByteBuff at its current position. Also advances the position by size of
328   * long
329   * @param value Long value to write
330   * @return this object
331   */
332  public abstract ByteBuff putLong(long value);
333
334  /**
335   * Fetches the long at the given index. Does not change position of the underlying ByteBuffers.
336   * The caller is sure that the index will be after the current position of this ByteBuff. So even
337   * if the current long does not fit in the current item we can safely move to the next item and
338   * fetch the remaining bytes forming the long n * @return the long value at the given index
339   */
340  public abstract long getLong(int index);
341
342  /**
343   * Fetches the long value at the given offset from current position. Does not change position of
344   * the underlying ByteBuffers. n * @return the long value at the given index.
345   */
346  public abstract long getLongAfterPosition(int offset);
347
348  /**
349   * Copy the content from this ByteBuff to a byte[].
350   * @return byte[] with the copied contents from this ByteBuff.
351   */
352  public byte[] toBytes() {
353    return toBytes(0, this.limit());
354  }
355
356  /**
357   * Copy the content from this ByteBuff to a byte[] based on the given offset and length n * the
358   * position from where the copy should start n * the length upto which the copy has to be done
359   * @return byte[] with the copied contents from this ByteBuff.
360   */
361  public abstract byte[] toBytes(int offset, int length);
362
363  /**
364   * Copies the content from this ByteBuff to a ByteBuffer Note : This will advance the position
365   * marker of {@code out} but not change the position maker for this ByteBuff
366   * @param out          the ByteBuffer to which the copy has to happen
367   * @param sourceOffset the offset in the ByteBuff from which the elements has to be copied
368   * @param length       the length in this ByteBuff upto which the elements has to be copied
369   */
370  public abstract void get(ByteBuffer out, int sourceOffset, int length);
371
372  /**
373   * Copies the contents from the src ByteBuff to this ByteBuff. This will be absolute positional
374   * copying and won't affect the position of any of the buffers.
375   * @param offset    the position in this ByteBuff to which the copy should happen
376   * @param src       the src ByteBuff
377   * @param srcOffset the offset in the src ByteBuff from where the elements should be read
378   * @param length    the length up to which the copy should happen
379   */
380  public abstract ByteBuff put(int offset, ByteBuff src, int srcOffset, int length);
381
382  /**
383   * Reads bytes from the given channel into this ByteBuff n * @return The number of bytes read from
384   * the channel n
385   */
386  public abstract int read(ReadableByteChannel channel) throws IOException;
387
388  /**
389   * Reads bytes from FileChannel into this ByteBuff
390   */
391  public abstract int read(FileChannel channel, long offset) throws IOException;
392
393  /**
394   * Write this ByteBuff's data into target file
395   */
396  public abstract int write(FileChannel channel, long offset) throws IOException;
397
398  /**
399   * function interface for Channel read
400   */
401  @FunctionalInterface
402  interface ChannelReader {
403    int read(ReadableByteChannel channel, ByteBuffer buf, long offset) throws IOException;
404  }
405
406  static final ChannelReader CHANNEL_READER = (channel, buf, offset) -> {
407    return channel.read(buf);
408  };
409
410  static final ChannelReader FILE_READER = (channel, buf, offset) -> {
411    return ((FileChannel) channel).read(buf, offset);
412  };
413
414  // static helper methods
415  public static int read(ReadableByteChannel channel, ByteBuffer buf, long offset,
416    ChannelReader reader) throws IOException {
417    if (buf.remaining() <= NIO_BUFFER_LIMIT) {
418      return reader.read(channel, buf, offset);
419    }
420    int originalLimit = buf.limit();
421    int initialRemaining = buf.remaining();
422    int ret = 0;
423
424    while (buf.remaining() > 0) {
425      try {
426        int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
427        buf.limit(buf.position() + ioSize);
428        offset += ret;
429        ret = reader.read(channel, buf, offset);
430        if (ret < ioSize) {
431          break;
432        }
433      } finally {
434        buf.limit(originalLimit);
435      }
436    }
437    int nBytes = initialRemaining - buf.remaining();
438    return (nBytes > 0) ? nBytes : ret;
439  }
440
441  /**
442   * Read integer from ByteBuff coded in 7 bits and increment position.
443   * @return Read integer.
444   */
445  public static int readCompressedInt(ByteBuff buf) {
446    byte b = buf.get();
447    if ((b & ByteBufferUtils.NEXT_BIT_MASK) != 0) {
448      return (b & ByteBufferUtils.VALUE_MASK)
449        + (readCompressedInt(buf) << ByteBufferUtils.NEXT_BIT_SHIFT);
450    }
451    return b & ByteBufferUtils.VALUE_MASK;
452  }
453
454  /**
455   * Compares two ByteBuffs
456   * @param buf1 the first ByteBuff
457   * @param o1   the offset in the first ByteBuff from where the compare has to happen
458   * @param len1 the length in the first ByteBuff upto which the compare has to happen
459   * @param buf2 the second ByteBuff
460   * @param o2   the offset in the second ByteBuff from where the compare has to happen
461   * @param len2 the length in the second ByteBuff upto which the compare has to happen
462   * @return Positive if buf1 is bigger than buf2, 0 if they are equal, and negative if buf1 is
463   *         smaller than buf2.
464   */
465  public static int compareTo(ByteBuff buf1, int o1, int len1, ByteBuff buf2, int o2, int len2) {
466    if (buf1.hasArray() && buf2.hasArray()) {
467      return Bytes.compareTo(buf1.array(), buf1.arrayOffset() + o1, len1, buf2.array(),
468        buf2.arrayOffset() + o2, len2);
469    }
470    int end1 = o1 + len1;
471    int end2 = o2 + len2;
472    for (int i = o1, j = o2; i < end1 && j < end2; i++, j++) {
473      int a = buf1.get(i) & 0xFF;
474      int b = buf2.get(j) & 0xFF;
475      if (a != b) {
476        return a - b;
477      }
478    }
479    return len1 - len2;
480  }
481
482  /**
483   * Read long which was written to fitInBytes bytes and increment position.
484   * @param fitInBytes In how many bytes given long is stored.
485   * @return The value of parsed long.
486   */
487  public static long readLong(ByteBuff in, final int fitInBytes) {
488    long tmpLength = 0;
489    for (int i = 0; i < fitInBytes; ++i) {
490      tmpLength |= (in.get() & 0xffl) << (8l * i);
491    }
492    return tmpLength;
493  }
494
495  public abstract ByteBuffer[] nioByteBuffers();
496
497  @Override
498  public String toString() {
499    return this.getClass().getSimpleName() + "[pos=" + position() + ", lim=" + limit() + ", cap= "
500      + capacity() + "]";
501  }
502
503  /********************************* ByteBuff wrapper methods ***********************************/
504
505  /**
506   * In theory, the upstream should never construct an ByteBuff by passing an given refCnt, so
507   * please don't use this public method in other place. Make the method public here because the
508   * BucketEntry#wrapAsCacheable in hbase-server module will use its own refCnt and ByteBuffers from
509   * IOEngine to composite an HFileBlock's ByteBuff, we didn't find a better way so keep the public
510   * way here.
511   */
512  public static ByteBuff wrap(ByteBuffer[] buffers, RefCnt refCnt) {
513    if (buffers == null || buffers.length == 0) {
514      throw new IllegalArgumentException("buffers shouldn't be null or empty");
515    }
516    return buffers.length == 1
517      ? new SingleByteBuff(refCnt, buffers[0])
518      : new MultiByteBuff(refCnt, buffers);
519  }
520
521  public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) {
522    return wrap(buffers, RefCnt.create(recycler));
523  }
524
525  public static ByteBuff wrap(ByteBuffer[] buffers) {
526    return wrap(buffers, RefCnt.create());
527  }
528
529  public static ByteBuff wrap(List<ByteBuffer> buffers, Recycler recycler) {
530    return wrap(buffers, RefCnt.create(recycler));
531  }
532
533  public static ByteBuff wrap(List<ByteBuffer> buffers) {
534    return wrap(buffers, RefCnt.create());
535  }
536
537  public static ByteBuff wrap(ByteBuffer buffer) {
538    return wrap(buffer, RefCnt.create());
539  }
540
541  /**
542   * Calling this method in strategic locations where ByteBuffs are referenced may help diagnose
543   * potential buffer leaks. We pass the buffer itself as a default hint, but one can use
544   * {@link #touch(Object)} to pass their own hint as well.
545   */
546  @Override
547  public ByteBuff touch() {
548    return touch(this);
549  }
550
551  @Override
552  public ByteBuff touch(Object hint) {
553    refCnt.touch(hint);
554    return this;
555  }
556
557  @RestrictedApi(explanation = "Should only be called in tests", link = "",
558      allowedOnPath = ".*/src/test/.*")
559  public RefCnt getRefCnt() {
560    return refCnt;
561  }
562
563  /**
564   * Make this private because we don't want to expose the refCnt related wrap method to upstream.
565   */
566  private static ByteBuff wrap(List<ByteBuffer> buffers, RefCnt refCnt) {
567    if (buffers == null || buffers.size() == 0) {
568      throw new IllegalArgumentException("buffers shouldn't be null or empty");
569    }
570    return buffers.size() == 1
571      ? new SingleByteBuff(refCnt, buffers.get(0))
572      : new MultiByteBuff(refCnt, buffers.toArray(new ByteBuffer[0]));
573  }
574
575  /**
576   * Make this private because we don't want to expose the refCnt related wrap method to upstream.
577   */
578  private static ByteBuff wrap(ByteBuffer buffer, RefCnt refCnt) {
579    return new SingleByteBuff(refCnt, buffer);
580  }
581}