001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.nio; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.IOException; 022import java.nio.ByteBuffer; 023import java.nio.channels.FileChannel; 024import java.nio.channels.ReadableByteChannel; 025import java.util.List; 026import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 027import org.apache.hadoop.hbase.util.ByteBufferUtils; 028import org.apache.hadoop.hbase.util.Bytes; 029import org.apache.hadoop.hbase.util.ObjectIntPair; 030import org.apache.yetus.audience.InterfaceAudience; 031 032import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil; 033 034/** 035 * An abstract class that abstracts out as to how the byte buffers are used, either single or 036 * multiple. We have this interface because the java's ByteBuffers cannot be sub-classed. This class 037 * provides APIs similar to the ones provided in java's nio ByteBuffers and allows you to do 038 * positional reads/writes and relative reads and writes on the underlying BB. In addition to it, we 039 * have some additional APIs which helps us in the read path. <br/> 040 * The ByteBuff implement {@link HBaseReferenceCounted} interface which mean need to maintains a 041 * {@link RefCnt} inside, if ensure that the ByteBuff won't be used any more, we must do a 042 * {@link ByteBuff#release()} to recycle its NIO ByteBuffers. when considering the 043 * {@link ByteBuff#duplicate()} or {@link ByteBuff#slice()}, releasing either the duplicated one or 044 * the original one will free its memory, because they share the same NIO ByteBuffers. when you want 045 * to retain the NIO ByteBuffers even if the origin one called {@link ByteBuff#release()}, you can 046 * do like this: 047 * 048 * <pre> 049 * ByteBuff original = ...; 050 * ByteBuff dup = original.duplicate(); 051 * dup.retain(); 052 * original.release(); 053 * // The NIO buffers can still be accessed unless you release the duplicated one 054 * dup.get(...); 055 * dup.release(); 056 * // Both the original and dup can not access the NIO buffers any more. 057 * </pre> 058 */ 059@InterfaceAudience.Private 060public abstract class ByteBuff implements HBaseReferenceCounted { 061 private static final String REFERENCE_COUNT_NAME = "ReferenceCount"; 062 private static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB. 063 064 protected RefCnt refCnt; 065 066 /*************************** Methods for reference count **********************************/ 067 068 protected void checkRefCount() { 069 ObjectUtil.checkPositive(refCnt(), REFERENCE_COUNT_NAME); 070 } 071 072 @Override 073 public int refCnt() { 074 return refCnt.refCnt(); 075 } 076 077 @Override 078 public boolean release() { 079 return refCnt.release(); 080 } 081 082 /******************************* Methods for ByteBuff **************************************/ 083 084 /** Returns this ByteBuff's current position */ 085 public abstract int position(); 086 087 /** 088 * Sets this ByteBuff's position to the given value. n * @return this object 089 */ 090 public abstract ByteBuff position(int position); 091 092 /** 093 * Jumps the current position of this ByteBuff by specified length. 094 * @param len the length to be skipped 095 */ 096 public abstract ByteBuff skip(int len); 097 098 /** 099 * Jumps back the current position of this ByteBuff by specified length. 100 * @param len the length to move back 101 */ 102 public abstract ByteBuff moveBack(int len); 103 104 /** Returns the total capacity of this ByteBuff. */ 105 public abstract int capacity(); 106 107 /** Returns the limit of this ByteBuff */ 108 public abstract int limit(); 109 110 /** Marks the limit of this ByteBuff */ 111 public abstract ByteBuff limit(int limit); 112 113 /** Rewinds this ByteBuff and the position is set to 0 */ 114 public abstract ByteBuff rewind(); 115 116 /** Marks the current position of the ByteBuff */ 117 public abstract ByteBuff mark(); 118 119 /** 120 * Returns bytes from current position till length specified, as a single ByteBuffer. When all 121 * these bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item 122 * as such will be returned. So users are warned not to change the position or limit of this 123 * returned ByteBuffer. The position of the returned byte buffer is at the begin of the required 124 * bytes. When the required bytes happen to span across multiple ByteBuffers, this API will copy 125 * the bytes to a newly created ByteBuffer of required size and return that. 126 * @param length number of bytes required. 127 * @return bytes from current position till length specified, as a single ByteButter. 128 */ 129 public abstract ByteBuffer asSubByteBuffer(int length); 130 131 /** 132 * Returns bytes from given offset till length specified, as a single ByteBuffer. When all these 133 * bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item as 134 * such will be returned (with offset in this ByteBuffer where the bytes starts). So users are 135 * warned not to change the position or limit of this returned ByteBuffer. When the required bytes 136 * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created 137 * ByteBuffer of required size and return that. 138 * @param offset the offset in this ByteBuff from where the subBuffer should be created 139 * @param length the length of the subBuffer 140 * @param pair a pair that will have the bytes from the current position till length specified, 141 * as a single ByteBuffer and offset in that Buffer where the bytes starts. Since 142 * this API gets called in a loop we are passing a pair to it which could be created 143 * outside the loop and the method would set the values on the pair that is passed 144 * in by the caller. Thus it avoids more object creations that would happen if the 145 * pair that is returned is created by this method every time. 146 */ 147 public abstract void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair); 148 149 /** Returns the number of elements between the current position and the limit. */ 150 public abstract int remaining(); 151 152 /** Returns true if there are elements between the current position and the limit. */ 153 public abstract boolean hasRemaining(); 154 155 /** 156 * Similar to {@link ByteBuffer}.reset(), ensures that this ByteBuff is reset back to last marked 157 * position. 158 * @return This ByteBuff 159 */ 160 public abstract ByteBuff reset(); 161 162 /** 163 * Returns an ByteBuff which is a sliced version of this ByteBuff. The position, limit and mark of 164 * the new ByteBuff will be independent than that of the original ByteBuff. The content of the new 165 * ByteBuff will start at this ByteBuff's current position 166 * @return a sliced ByteBuff 167 */ 168 public abstract ByteBuff slice(); 169 170 /** 171 * Returns an ByteBuff which is a duplicate version of this ByteBuff. The position, limit and mark 172 * of the new ByteBuff will be independent than that of the original ByteBuff. The content of the 173 * new ByteBuff will start at this ByteBuff's current position The position, limit and mark of the 174 * new ByteBuff would be identical to this ByteBuff in terms of values. 175 * @return a sliced ByteBuff 176 */ 177 public abstract ByteBuff duplicate(); 178 179 /** 180 * A relative method that returns byte at the current position. Increments the current position by 181 * the size of a byte. 182 * @return the byte at the current position 183 */ 184 public abstract byte get(); 185 186 /** 187 * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers n 188 * @return the byte at the given index 189 */ 190 public abstract byte get(int index); 191 192 /** 193 * Fetches the byte at the given offset from current position. Does not change position of the 194 * underlying ByteBuffers. 195 * @return the byte value at the given index. 196 */ 197 public abstract byte getByteAfterPosition(int offset); 198 199 /** 200 * Writes a byte to this ByteBuff at the current position and increments the position 201 * @return this object 202 */ 203 public abstract ByteBuff put(byte b); 204 205 /** 206 * Writes a byte to this ByteBuff at the given index 207 * @return this object 208 */ 209 public abstract ByteBuff put(int index, byte b); 210 211 /** 212 * Copies the specified number of bytes from this ByteBuff's current position to the byte[]'s 213 * offset. Also advances the position of the ByteBuff by the given length. 214 * @param dst the byte[] to which the ByteBuff's content is to be copied 215 * @param offset within the current array 216 * @param length upto which the bytes to be copied 217 */ 218 public abstract void get(byte[] dst, int offset, int length); 219 220 /** 221 * Copies the specified number of bytes from this ByteBuff's given position to the byte[]'s 222 * offset. The position of the ByteBuff remains in the current position only 223 * @param sourceOffset the offset in this ByteBuff from where the copy should happen 224 * @param dst the byte[] to which the ByteBuff's content is to be copied 225 * @param offset within the current array 226 * @param length upto which the bytes to be copied 227 */ 228 public abstract void get(int sourceOffset, byte[] dst, int offset, int length); 229 230 /** 231 * Copies the content from this ByteBuff's current position to the byte array and fills it. Also 232 * advances the position of the ByteBuff by the length of the byte[]. 233 * @param dst the byte[] to which the ByteBuff's content is to be copied 234 */ 235 public abstract void get(byte[] dst); 236 237 /** 238 * Copies from the given byte[] to this ByteBuff 239 * @param src source byte array 240 * @param offset the position in the byte array from which the copy should be done 241 * @param length the length upto which the copy should happen 242 * @return this ByteBuff 243 */ 244 public abstract ByteBuff put(byte[] src, int offset, int length); 245 246 /** 247 * Copies from the given byte[] to this ByteBuff n * @return this ByteBuff 248 * @param src source byte array 249 * @return this ByteBuff 250 */ 251 public abstract ByteBuff put(byte[] src); 252 253 /** Returns true or false if the underlying BB support hasArray */ 254 public abstract boolean hasArray(); 255 256 /** Returns the byte[] if the underlying BB has single BB and hasArray true */ 257 public abstract byte[] array(); 258 259 /** Returns the arrayOffset of the byte[] incase of a single BB backed ByteBuff */ 260 public abstract int arrayOffset(); 261 262 /** 263 * Returns the short value at the current position. Also advances the position by the size of 264 * short. 265 */ 266 public abstract short getShort(); 267 268 /** 269 * Fetches the short value at the given index. Does not change position of the underlying 270 * ByteBuffers. The caller is sure that the index will be after the current position of this 271 * ByteBuff. So even if the current short does not fit in the current item we can safely move to 272 * the next item and fetch the remaining bytes forming the short n * @return the short value at 273 * the given index 274 */ 275 public abstract short getShort(int index); 276 277 /** 278 * Fetches the short value at the given offset from current position. Does not change position of 279 * the underlying ByteBuffers. n * @return the short value at the given index. 280 */ 281 public abstract short getShortAfterPosition(int offset); 282 283 /** 284 * Returns the int value at the current position. Also advances the position by the size of int. 285 */ 286 public abstract int getInt(); 287 288 /** 289 * Writes an int to this ByteBuff at its current position. Also advances the position by size of 290 * int. 291 */ 292 public abstract ByteBuff putInt(int value); 293 294 /** 295 * Fetches the int at the given index. Does not change position of the underlying ByteBuffers. 296 * Even if the current int does not fit in the current item we can safely move to the next item 297 * and fetch the remaining bytes forming the int. 298 */ 299 public abstract int getInt(int index); 300 301 /** 302 * Fetches the int value at the given offset from current position. Does not change position of 303 * the underlying ByteBuffers. 304 */ 305 public abstract int getIntAfterPosition(int offset); 306 307 /** 308 * Returns the long value at the current position. Also advances the position by the size of long. 309 */ 310 public abstract long getLong(); 311 312 /** 313 * Writes a long to this ByteBuff at its current position. Also advances the position by size of 314 * long. 315 */ 316 public abstract ByteBuff putLong(long value); 317 318 /** 319 * Fetches the long at the given index. Does not change position of the underlying ByteBuffers. 320 * The caller is sure that the index will be after the current position of this ByteBuff. So even 321 * if the current long does not fit in the current item we can safely move to the next item and 322 * fetch the remaining bytes forming the long n * @return the long value at the given index 323 */ 324 public abstract long getLong(int index); 325 326 /** 327 * Fetches the long value at the given offset from current position. Does not change position of 328 * the underlying ByteBuffers. n * @return the long value at the given index. 329 */ 330 public abstract long getLongAfterPosition(int offset); 331 332 /** 333 * Copy the content from this ByteBuff to a byte[]. 334 */ 335 public byte[] toBytes() { 336 return toBytes(0, this.limit()); 337 } 338 339 /** 340 * Copy the content from this ByteBuff to a byte[] based on the given offset and length. 341 */ 342 public abstract byte[] toBytes(int offset, int length); 343 344 /** 345 * Copies the content from this ByteBuff to a ByteBuffer Note : This will advance the position 346 * marker of {@code out} but not change the position maker for this ByteBuff 347 * @param out the ByteBuffer to which the copy has to happen 348 * @param sourceOffset the offset in the ByteBuff from which the elements has to be copied 349 * @param length the length in this ByteBuff upto which the elements has to be copied 350 */ 351 public abstract void get(ByteBuffer out, int sourceOffset, int length); 352 353 /** 354 * Copies the contents from the src ByteBuff to this ByteBuff. This will be absolute positional 355 * copying and won't affect the position of any of the buffers. 356 * @param offset the position in this ByteBuff to which the copy should happen 357 * @param src the src ByteBuff 358 * @param srcOffset the offset in the src ByteBuff from where the elements should be read 359 * @param length the length up to which the copy should happen 360 */ 361 public abstract ByteBuff put(int offset, ByteBuff src, int srcOffset, int length); 362 363 /** Reads bytes from the given channel into this ByteBuf. */ 364 public abstract int read(ReadableByteChannel channel) throws IOException; 365 366 /** Reads bytes from FileChannel into this ByteBuff */ 367 public abstract int read(FileChannel channel, long offset) throws IOException; 368 369 /** Write this ByteBuff's data into target file */ 370 public abstract int write(FileChannel channel, long offset) throws IOException; 371 372 /** Functional interface for Channel read */ 373 @FunctionalInterface 374 interface ChannelReader { 375 int read(ReadableByteChannel channel, ByteBuffer buf, long offset) throws IOException; 376 } 377 378 static final ChannelReader CHANNEL_READER = (channel, buf, offset) -> { 379 return channel.read(buf); 380 }; 381 382 static final ChannelReader FILE_READER = (channel, buf, offset) -> { 383 return ((FileChannel) channel).read(buf, offset); 384 }; 385 386 // static helper methods 387 public static int read(ReadableByteChannel channel, ByteBuffer buf, long offset, 388 ChannelReader reader) throws IOException { 389 if (buf.remaining() <= NIO_BUFFER_LIMIT) { 390 return reader.read(channel, buf, offset); 391 } 392 int originalLimit = buf.limit(); 393 int initialRemaining = buf.remaining(); 394 int ret = 0; 395 396 while (buf.remaining() > 0) { 397 try { 398 int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT); 399 buf.limit(buf.position() + ioSize); 400 offset += ret; 401 ret = reader.read(channel, buf, offset); 402 if (ret < ioSize) { 403 break; 404 } 405 } finally { 406 buf.limit(originalLimit); 407 } 408 } 409 int nBytes = initialRemaining - buf.remaining(); 410 return (nBytes > 0) ? nBytes : ret; 411 } 412 413 /** Read integer from ByteBuff coded in 7 bits and increment position. */ 414 public static int readCompressedInt(ByteBuff buf) { 415 byte b = buf.get(); 416 if ((b & ByteBufferUtils.NEXT_BIT_MASK) != 0) { 417 return (b & ByteBufferUtils.VALUE_MASK) 418 + (readCompressedInt(buf) << ByteBufferUtils.NEXT_BIT_SHIFT); 419 } 420 return b & ByteBufferUtils.VALUE_MASK; 421 } 422 423 /** 424 * Compares two ByteBuffs 425 * @param buf1 the first ByteBuff 426 * @param o1 the offset in the first ByteBuff from where the compare has to happen 427 * @param len1 the length in the first ByteBuff upto which the compare has to happen 428 * @param buf2 the second ByteBuff 429 * @param o2 the offset in the second ByteBuff from where the compare has to happen 430 * @param len2 the length in the second ByteBuff upto which the compare has to happen 431 * @return Positive if buf1 is bigger than buf2, 0 if they are equal, and negative if buf1 is 432 * smaller than buf2. 433 */ 434 public static int compareTo(ByteBuff buf1, int o1, int len1, ByteBuff buf2, int o2, int len2) { 435 if (buf1.hasArray() && buf2.hasArray()) { 436 return Bytes.compareTo(buf1.array(), buf1.arrayOffset() + o1, len1, buf2.array(), 437 buf2.arrayOffset() + o2, len2); 438 } 439 int end1 = o1 + len1; 440 int end2 = o2 + len2; 441 for (int i = o1, j = o2; i < end1 && j < end2; i++, j++) { 442 int a = buf1.get(i) & 0xFF; 443 int b = buf2.get(j) & 0xFF; 444 if (a != b) { 445 return a - b; 446 } 447 } 448 return len1 - len2; 449 } 450 451 /** 452 * Read long which was written to fitInBytes bytes and increment position. 453 * @param fitInBytes In how many bytes given long is stored. 454 * @return The value of parsed long. 455 */ 456 public static long readLong(ByteBuff in, final int fitInBytes) { 457 long tmpLength = 0; 458 for (int i = 0; i < fitInBytes; ++i) { 459 tmpLength |= (in.get() & 0xffl) << (8l * i); 460 } 461 return tmpLength; 462 } 463 464 public abstract ByteBuffer[] nioByteBuffers(); 465 466 @Override 467 public String toString() { 468 return this.getClass().getSimpleName() + "[pos=" + position() + ", lim=" + limit() + ", cap= " 469 + capacity() + "]"; 470 } 471 472 /********************************* ByteBuff wrapper methods ***********************************/ 473 474 /** 475 * In theory, the upstream should never construct an ByteBuff by passing an given refCnt, so 476 * please don't use this public method in other place. Make the method public here because the 477 * BucketEntry#wrapAsCacheable in hbase-server module will use its own refCnt and ByteBuffers from 478 * IOEngine to composite an HFileBlock's ByteBuff, we didn't find a better way so keep the public 479 * way here. 480 */ 481 public static ByteBuff wrap(ByteBuffer[] buffers, RefCnt refCnt) { 482 if (buffers == null || buffers.length == 0) { 483 throw new IllegalArgumentException("buffers shouldn't be null or empty"); 484 } 485 return buffers.length == 1 486 ? new SingleByteBuff(refCnt, buffers[0]) 487 : new MultiByteBuff(refCnt, buffers); 488 } 489 490 public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) { 491 return wrap(buffers, RefCnt.create(recycler)); 492 } 493 494 public static ByteBuff wrap(ByteBuffer[] buffers) { 495 return wrap(buffers, RefCnt.create()); 496 } 497 498 public static ByteBuff wrap(List<ByteBuffer> buffers, Recycler recycler) { 499 return wrap(buffers, RefCnt.create(recycler)); 500 } 501 502 public static ByteBuff wrap(List<ByteBuffer> buffers) { 503 return wrap(buffers, RefCnt.create()); 504 } 505 506 public static ByteBuff wrap(ByteBuffer buffer) { 507 return wrap(buffer, RefCnt.create()); 508 } 509 510 /** 511 * Calling this method in strategic locations where ByteBuffs are referenced may help diagnose 512 * potential buffer leaks. We pass the buffer itself as a default hint, but one can use 513 * {@link #touch(Object)} to pass their own hint as well. 514 */ 515 @Override 516 public ByteBuff touch() { 517 return touch(this); 518 } 519 520 @Override 521 public ByteBuff touch(Object hint) { 522 refCnt.touch(hint); 523 return this; 524 } 525 526 @RestrictedApi(explanation = "Should only be called in tests", link = "", 527 allowedOnPath = ".*/src/test/.*") 528 public RefCnt getRefCnt() { 529 return refCnt; 530 } 531 532 // Make this private because we don't want to expose the refCnt related wrap method to upstream. 533 private static ByteBuff wrap(List<ByteBuffer> buffers, RefCnt refCnt) { 534 if (buffers == null || buffers.size() == 0) { 535 throw new IllegalArgumentException("buffers shouldn't be null or empty"); 536 } 537 return buffers.size() == 1 538 ? new SingleByteBuff(refCnt, buffers.get(0)) 539 : new MultiByteBuff(refCnt, buffers.toArray(new ByteBuffer[0])); 540 } 541 542 // Make this private because we don't want to expose the refCnt related wrap method to upstream. 543 private static ByteBuff wrap(ByteBuffer buffer, RefCnt refCnt) { 544 return new SingleByteBuff(refCnt, buffer); 545 } 546}