001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.nio; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.IOException; 022import java.nio.ByteBuffer; 023import java.nio.channels.FileChannel; 024import java.nio.channels.ReadableByteChannel; 025import java.util.List; 026import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 027import org.apache.hadoop.hbase.util.ByteBufferUtils; 028import org.apache.hadoop.hbase.util.Bytes; 029import org.apache.hadoop.hbase.util.ObjectIntPair; 030import org.apache.yetus.audience.InterfaceAudience; 031 032import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil; 033 034/** 035 * An abstract class that abstracts out as to how the byte buffers are used, either single or 036 * multiple. We have this interface because the java's ByteBuffers cannot be sub-classed. This class 037 * provides APIs similar to the ones provided in java's nio ByteBuffers and allows you to do 038 * positional reads/writes and relative reads and writes on the underlying BB. In addition to it, we 039 * have some additional APIs which helps us in the read path. <br/> 040 * The ByteBuff implement {@link HBaseReferenceCounted} interface which mean need to maintains a 041 * {@link RefCnt} inside, if ensure that the ByteBuff won't be used any more, we must do a 042 * {@link ByteBuff#release()} to recycle its NIO ByteBuffers. when considering the 043 * {@link ByteBuff#duplicate()} or {@link ByteBuff#slice()}, releasing either the duplicated one or 044 * the original one will free its memory, because they share the same NIO ByteBuffers. when you want 045 * to retain the NIO ByteBuffers even if the origin one called {@link ByteBuff#release()}, you can 046 * do like this: 047 * 048 * <pre> 049 * ByteBuff original = ...; 050 * ByteBuff dup = original.duplicate(); 051 * dup.retain(); 052 * original.release(); 053 * // The NIO buffers can still be accessed unless you release the duplicated one 054 * dup.get(...); 055 * dup.release(); 056 * // Both the original and dup can not access the NIO buffers any more. 057 * </pre> 058 */ 059@InterfaceAudience.Private 060public abstract class ByteBuff implements HBaseReferenceCounted { 061 private static final String REFERENCE_COUNT_NAME = "ReferenceCount"; 062 private static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB. 063 064 protected RefCnt refCnt; 065 066 /*************************** Methods for reference count **********************************/ 067 068 /** 069 * Checks that there are still references to the buffer. This protects against the case where a 070 * ByteBuff method (i.e. slice, get, etc) could be called against a buffer whose backing data may 071 * have been released. We only need to do this check if the refCnt has a recycler. If there's no 072 * recycler, the backing data will be handled by normal java GC and won't get incorrectly 073 * released. So we can avoid the overhead of checking the refCnt on every call. See HBASE-27710. 074 */ 075 protected void checkRefCount() { 076 if (refCnt.hasRecycler()) { 077 ObjectUtil.checkPositive(refCnt(), REFERENCE_COUNT_NAME); 078 } 079 } 080 081 @Override 082 public int refCnt() { 083 return refCnt.refCnt(); 084 } 085 086 @Override 087 public boolean release() { 088 return refCnt.release(); 089 } 090 091 /******************************* Methods for ByteBuff **************************************/ 092 093 /** Returns this ByteBuff's current position */ 094 public abstract int position(); 095 096 /** 097 * Sets this ByteBuff's position to the given value. 098 * @return this object 099 */ 100 public abstract ByteBuff position(int position); 101 102 /** 103 * Jumps the current position of this ByteBuff by specified length. 104 * @param len the length to be skipped 105 */ 106 public abstract ByteBuff skip(int len); 107 108 /** 109 * Jumps back the current position of this ByteBuff by specified length. 110 * @param len the length to move back 111 */ 112 public abstract ByteBuff moveBack(int len); 113 114 /** Returns the total capacity of this ByteBuff. */ 115 public abstract int capacity(); 116 117 /** Returns the limit of this ByteBuff */ 118 public abstract int limit(); 119 120 /** Marks the limit of this ByteBuff */ 121 public abstract ByteBuff limit(int limit); 122 123 /** Rewinds this ByteBuff and the position is set to 0 */ 124 public abstract ByteBuff rewind(); 125 126 /** Marks the current position of the ByteBuff */ 127 public abstract ByteBuff mark(); 128 129 /** 130 * Returns bytes from current position till length specified, as a single ByteBuffer. When all 131 * these bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item 132 * as such will be returned. So users are warned not to change the position or limit of this 133 * returned ByteBuffer. The position of the returned byte buffer is at the begin of the required 134 * bytes. When the required bytes happen to span across multiple ByteBuffers, this API will copy 135 * the bytes to a newly created ByteBuffer of required size and return that. 136 * @param length number of bytes required. 137 * @return bytes from current position till length specified, as a single ByteButter. 138 */ 139 public abstract ByteBuffer asSubByteBuffer(int length); 140 141 /** 142 * Returns bytes from given offset till length specified, as a single ByteBuffer. When all these 143 * bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item as 144 * such will be returned (with offset in this ByteBuffer where the bytes starts). So users are 145 * warned not to change the position or limit of this returned ByteBuffer. When the required bytes 146 * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created 147 * ByteBuffer of required size and return that. 148 * @param offset the offset in this ByteBuff from where the subBuffer should be created 149 * @param length the length of the subBuffer 150 * @param pair a pair that will have the bytes from the current position till length specified, 151 * as a single ByteBuffer and offset in that Buffer where the bytes starts. Since 152 * this API gets called in a loop we are passing a pair to it which could be created 153 * outside the loop and the method would set the values on the pair that is passed 154 * in by the caller. Thus it avoids more object creations that would happen if the 155 * pair that is returned is created by this method every time. 156 */ 157 public abstract void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair); 158 159 /** Returns the number of elements between the current position and the limit. */ 160 public abstract int remaining(); 161 162 /** Returns true if there are elements between the current position and the limit. */ 163 public abstract boolean hasRemaining(); 164 165 /** 166 * Similar to {@link ByteBuffer}.reset(), ensures that this ByteBuff is reset back to last marked 167 * position. 168 * @return This ByteBuff 169 */ 170 public abstract ByteBuff reset(); 171 172 /** 173 * Returns an ByteBuff which is a sliced version of this ByteBuff. The position, limit and mark of 174 * the new ByteBuff will be independent than that of the original ByteBuff. The content of the new 175 * ByteBuff will start at this ByteBuff's current position 176 * @return a sliced ByteBuff 177 */ 178 public abstract ByteBuff slice(); 179 180 /** 181 * Returns an ByteBuff which is a duplicate version of this ByteBuff. The position, limit and mark 182 * of the new ByteBuff will be independent than that of the original ByteBuff. The content of the 183 * new ByteBuff will start at this ByteBuff's current position The position, limit and mark of the 184 * new ByteBuff would be identical to this ByteBuff in terms of values. 185 * @return a sliced ByteBuff 186 */ 187 public abstract ByteBuff duplicate(); 188 189 /** 190 * A relative method that returns byte at the current position. Increments the current position by 191 * the size of a byte. 192 * @return the byte at the current position 193 */ 194 public abstract byte get(); 195 196 /** 197 * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers 198 * @return the byte at the given index 199 */ 200 public abstract byte get(int index); 201 202 /** 203 * Fetches the byte at the given offset from current position. Does not change position of the 204 * underlying ByteBuffers. 205 * @return the byte value at the given index. 206 */ 207 public abstract byte getByteAfterPosition(int offset); 208 209 /** 210 * Writes a byte to this ByteBuff at the current position and increments the position 211 * @return this object 212 */ 213 public abstract ByteBuff put(byte b); 214 215 /** 216 * Writes a byte to this ByteBuff at the given index 217 * @return this object 218 */ 219 public abstract ByteBuff put(int index, byte b); 220 221 /** 222 * Copies the specified number of bytes from this ByteBuff's current position to the byte[]'s 223 * offset. Also advances the position of the ByteBuff by the given length. 224 * @param dst the byte[] to which the ByteBuff's content is to be copied 225 * @param offset within the current array 226 * @param length upto which the bytes to be copied 227 */ 228 public abstract void get(byte[] dst, int offset, int length); 229 230 /** 231 * Copies the specified number of bytes from this ByteBuff's given position to the byte[]'s 232 * offset. The position of the ByteBuff remains in the current position only 233 * @param sourceOffset the offset in this ByteBuff from where the copy should happen 234 * @param dst the byte[] to which the ByteBuff's content is to be copied 235 * @param offset within the current array 236 * @param length upto which the bytes to be copied 237 */ 238 public abstract void get(int sourceOffset, byte[] dst, int offset, int length); 239 240 /** 241 * Copies the content from this ByteBuff's current position to the byte array and fills it. Also 242 * advances the position of the ByteBuff by the length of the byte[]. 243 * @param dst the byte[] to which the ByteBuff's content is to be copied 244 */ 245 public abstract void get(byte[] dst); 246 247 /** 248 * Copies from the given byte[] to this ByteBuff 249 * @param src source byte array 250 * @param offset the position in the byte array from which the copy should be done 251 * @param length the length upto which the copy should happen 252 * @return this ByteBuff 253 */ 254 public abstract ByteBuff put(byte[] src, int offset, int length); 255 256 /** 257 * Copies from the given byte[] to this ByteBuff 258 * @return this ByteBuff 259 * @param src source byte array 260 * @return this ByteBuff 261 */ 262 public abstract ByteBuff put(byte[] src); 263 264 /** Returns true or false if the underlying BB support hasArray */ 265 public abstract boolean hasArray(); 266 267 /** Returns the byte[] if the underlying BB has single BB and hasArray true */ 268 public abstract byte[] array(); 269 270 /** Returns the arrayOffset of the byte[] incase of a single BB backed ByteBuff */ 271 public abstract int arrayOffset(); 272 273 /** 274 * Returns the short value at the current position. Also advances the position by the size of 275 * short. 276 */ 277 public abstract short getShort(); 278 279 /** 280 * Fetches the short value at the given index. Does not change position of the underlying 281 * ByteBuffers. The caller is sure that the index will be after the current position of this 282 * ByteBuff. So even if the current short does not fit in the current item we can safely move to 283 * the next item and fetch the remaining bytes forming the short 284 * @return the short value at the given index 285 */ 286 public abstract short getShort(int index); 287 288 /** 289 * Fetches the short value at the given offset from current position. Does not change position of 290 * the underlying ByteBuffers. 291 * @return the short value at the given index. 292 */ 293 public abstract short getShortAfterPosition(int offset); 294 295 /** 296 * Returns the int value at the current position. Also advances the position by the size of int. 297 */ 298 public abstract int getInt(); 299 300 /** 301 * Writes an int to this ByteBuff at its current position. Also advances the position by size of 302 * int. 303 */ 304 public abstract ByteBuff putInt(int value); 305 306 /** 307 * Fetches the int at the given index. Does not change position of the underlying ByteBuffers. 308 * Even if the current int does not fit in the current item we can safely move to the next item 309 * and fetch the remaining bytes forming the int. 310 */ 311 public abstract int getInt(int index); 312 313 /** 314 * Fetches the int value at the given offset from current position. Does not change position of 315 * the underlying ByteBuffers. 316 */ 317 public abstract int getIntAfterPosition(int offset); 318 319 /** 320 * Returns the long value at the current position. Also advances the position by the size of long. 321 */ 322 public abstract long getLong(); 323 324 /** 325 * Writes a long to this ByteBuff at its current position. Also advances the position by size of 326 * long. 327 */ 328 public abstract ByteBuff putLong(long value); 329 330 /** 331 * Fetches the long at the given index. Does not change position of the underlying ByteBuffers. 332 * The caller is sure that the index will be after the current position of this ByteBuff. So even 333 * if the current long does not fit in the current item we can safely move to the next item and 334 * fetch the remaining bytes forming the long 335 * @return the long value at the given index 336 */ 337 public abstract long getLong(int index); 338 339 /** 340 * Fetches the long value at the given offset from current position. Does not change position of 341 * the underlying ByteBuffers. 342 * @return the long value at the given index. 343 */ 344 public abstract long getLongAfterPosition(int offset); 345 346 /** 347 * Copy the content from this ByteBuff to a byte[]. 348 */ 349 public byte[] toBytes() { 350 return toBytes(0, this.limit()); 351 } 352 353 /** 354 * Copy the content from this ByteBuff to a byte[] based on the given offset and length. 355 */ 356 public abstract byte[] toBytes(int offset, int length); 357 358 /** 359 * Copies the content from this ByteBuff to a ByteBuffer Note : This will advance the position 360 * marker of {@code out} but not change the position maker for this ByteBuff 361 * @param out the ByteBuffer to which the copy has to happen 362 * @param sourceOffset the offset in the ByteBuff from which the elements has to be copied 363 * @param length the length in this ByteBuff upto which the elements has to be copied 364 */ 365 public abstract void get(ByteBuffer out, int sourceOffset, int length); 366 367 /** 368 * Copies the contents from the src ByteBuff to this ByteBuff. This will be absolute positional 369 * copying and won't affect the position of any of the buffers. 370 * @param offset the position in this ByteBuff to which the copy should happen 371 * @param src the src ByteBuff 372 * @param srcOffset the offset in the src ByteBuff from where the elements should be read 373 * @param length the length up to which the copy should happen 374 */ 375 public abstract ByteBuff put(int offset, ByteBuff src, int srcOffset, int length); 376 377 /** Reads bytes from the given channel into this ByteBuf. */ 378 public abstract int read(ReadableByteChannel channel) throws IOException; 379 380 /** Reads bytes from FileChannel into this ByteBuff */ 381 public abstract int read(FileChannel channel, long offset) throws IOException; 382 383 /** Write this ByteBuff's data into target file */ 384 public abstract int write(FileChannel channel, long offset) throws IOException; 385 386 /** Functional interface for Channel read */ 387 @FunctionalInterface 388 interface ChannelReader { 389 int read(ReadableByteChannel channel, ByteBuffer buf, long offset) throws IOException; 390 } 391 392 static final ChannelReader CHANNEL_READER = (channel, buf, offset) -> { 393 return channel.read(buf); 394 }; 395 396 static final ChannelReader FILE_READER = (channel, buf, offset) -> { 397 return ((FileChannel) channel).read(buf, offset); 398 }; 399 400 // static helper methods 401 public static int read(ReadableByteChannel channel, ByteBuffer buf, long offset, 402 ChannelReader reader) throws IOException { 403 if (buf.remaining() <= NIO_BUFFER_LIMIT) { 404 return reader.read(channel, buf, offset); 405 } 406 int originalLimit = buf.limit(); 407 int initialRemaining = buf.remaining(); 408 int ret = 0; 409 410 while (buf.remaining() > 0) { 411 try { 412 int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT); 413 buf.limit(buf.position() + ioSize); 414 offset += ret; 415 ret = reader.read(channel, buf, offset); 416 if (ret < ioSize) { 417 break; 418 } 419 } finally { 420 buf.limit(originalLimit); 421 } 422 } 423 int nBytes = initialRemaining - buf.remaining(); 424 return (nBytes > 0) ? nBytes : ret; 425 } 426 427 /** Read integer from ByteBuff coded in 7 bits and increment position. */ 428 public static int readCompressedInt(ByteBuff buf) { 429 byte b = buf.get(); 430 if ((b & ByteBufferUtils.NEXT_BIT_MASK) != 0) { 431 return (b & ByteBufferUtils.VALUE_MASK) 432 + (readCompressedInt(buf) << ByteBufferUtils.NEXT_BIT_SHIFT); 433 } 434 return b & ByteBufferUtils.VALUE_MASK; 435 } 436 437 /** 438 * Compares two ByteBuffs 439 * @param buf1 the first ByteBuff 440 * @param o1 the offset in the first ByteBuff from where the compare has to happen 441 * @param len1 the length in the first ByteBuff upto which the compare has to happen 442 * @param buf2 the second ByteBuff 443 * @param o2 the offset in the second ByteBuff from where the compare has to happen 444 * @param len2 the length in the second ByteBuff upto which the compare has to happen 445 * @return Positive if buf1 is bigger than buf2, 0 if they are equal, and negative if buf1 is 446 * smaller than buf2. 447 */ 448 public static int compareTo(ByteBuff buf1, int o1, int len1, ByteBuff buf2, int o2, int len2) { 449 if (buf1.hasArray() && buf2.hasArray()) { 450 return Bytes.compareTo(buf1.array(), buf1.arrayOffset() + o1, len1, buf2.array(), 451 buf2.arrayOffset() + o2, len2); 452 } 453 int end1 = o1 + len1; 454 int end2 = o2 + len2; 455 for (int i = o1, j = o2; i < end1 && j < end2; i++, j++) { 456 int a = buf1.get(i) & 0xFF; 457 int b = buf2.get(j) & 0xFF; 458 if (a != b) { 459 return a - b; 460 } 461 } 462 return len1 - len2; 463 } 464 465 /** 466 * Read long which was written to fitInBytes bytes and increment position. 467 * @param fitInBytes In how many bytes given long is stored. 468 * @return The value of parsed long. 469 */ 470 public static long readLong(ByteBuff in, final int fitInBytes) { 471 long tmpLength = 0; 472 for (int i = 0; i < fitInBytes; ++i) { 473 tmpLength |= (in.get() & 0xffl) << (8l * i); 474 } 475 return tmpLength; 476 } 477 478 public abstract ByteBuffer[] nioByteBuffers(); 479 480 @Override 481 public String toString() { 482 return this.getClass().getSimpleName() + "[pos=" + position() + ", lim=" + limit() + ", cap= " 483 + capacity() + "]"; 484 } 485 486 /********************************* ByteBuff wrapper methods ***********************************/ 487 488 /** 489 * In theory, the upstream should never construct an ByteBuff by passing an given refCnt, so 490 * please don't use this public method in other place. Make the method public here because the 491 * BucketEntry#wrapAsCacheable in hbase-server module will use its own refCnt and ByteBuffers from 492 * IOEngine to composite an HFileBlock's ByteBuff, we didn't find a better way so keep the public 493 * way here. 494 */ 495 public static ByteBuff wrap(ByteBuffer[] buffers, RefCnt refCnt) { 496 if (buffers == null || buffers.length == 0) { 497 throw new IllegalArgumentException("buffers shouldn't be null or empty"); 498 } 499 return buffers.length == 1 500 ? new SingleByteBuff(refCnt, buffers[0]) 501 : new MultiByteBuff(refCnt, buffers); 502 } 503 504 public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) { 505 return wrap(buffers, RefCnt.create(recycler)); 506 } 507 508 public static ByteBuff wrap(ByteBuffer[] buffers) { 509 return wrap(buffers, RefCnt.create()); 510 } 511 512 public static ByteBuff wrap(List<ByteBuffer> buffers, Recycler recycler) { 513 return wrap(buffers, RefCnt.create(recycler)); 514 } 515 516 public static ByteBuff wrap(List<ByteBuffer> buffers) { 517 return wrap(buffers, RefCnt.create()); 518 } 519 520 public static ByteBuff wrap(ByteBuffer buffer) { 521 return wrap(buffer, RefCnt.create()); 522 } 523 524 /** 525 * Calling this method in strategic locations where ByteBuffs are referenced may help diagnose 526 * potential buffer leaks. We pass the buffer itself as a default hint, but one can use 527 * {@link #touch(Object)} to pass their own hint as well. 528 */ 529 @Override 530 public ByteBuff touch() { 531 return touch(this); 532 } 533 534 @Override 535 public ByteBuff touch(Object hint) { 536 refCnt.touch(hint); 537 return this; 538 } 539 540 @RestrictedApi(explanation = "Should only be called in tests", link = "", 541 allowedOnPath = ".*/src/test/.*") 542 public RefCnt getRefCnt() { 543 return refCnt; 544 } 545 546 // Make this private because we don't want to expose the refCnt related wrap method to upstream. 547 private static ByteBuff wrap(List<ByteBuffer> buffers, RefCnt refCnt) { 548 if (buffers == null || buffers.size() == 0) { 549 throw new IllegalArgumentException("buffers shouldn't be null or empty"); 550 } 551 return buffers.size() == 1 552 ? new SingleByteBuff(refCnt, buffers.get(0)) 553 : new MultiByteBuff(refCnt, buffers.toArray(new ByteBuffer[0])); 554 } 555 556 // Make this private because we don't want to expose the refCnt related wrap method to upstream. 557 private static ByteBuff wrap(ByteBuffer buffer, RefCnt refCnt) { 558 return new SingleByteBuff(refCnt, buffer); 559 } 560}