001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.nio; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.IOException; 022import java.nio.ByteBuffer; 023import java.nio.channels.FileChannel; 024import java.nio.channels.ReadableByteChannel; 025import java.util.List; 026import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 027import org.apache.hadoop.hbase.util.ByteBufferUtils; 028import org.apache.hadoop.hbase.util.Bytes; 029import org.apache.hadoop.hbase.util.ObjectIntPair; 030import org.apache.yetus.audience.InterfaceAudience; 031 032import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil; 033 034/** 035 * An abstract class that abstracts out as to how the byte buffers are used, either single or 036 * multiple. We have this interface because the java's ByteBuffers cannot be sub-classed. This class 037 * provides APIs similar to the ones provided in java's nio ByteBuffers and allows you to do 038 * positional reads/writes and relative reads and writes on the underlying BB. In addition to it, we 039 * have some additional APIs which helps us in the read path. <br/> 040 * The ByteBuff implement {@link HBaseReferenceCounted} interface which mean need to maintains a 041 * {@link RefCnt} inside, if ensure that the ByteBuff won't be used any more, we must do a 042 * {@link ByteBuff#release()} to recycle its NIO ByteBuffers. when considering the 043 * {@link ByteBuff#duplicate()} or {@link ByteBuff#slice()}, releasing either the duplicated one or 044 * the original one will free its memory, because they share the same NIO ByteBuffers. when you want 045 * to retain the NIO ByteBuffers even if the origin one called {@link ByteBuff#release()}, you can 046 * do like this: 047 * 048 * <pre> 049 * ByteBuff original = ...; 050 * ByteBuff dup = original.duplicate(); 051 * dup.retain(); 052 * original.release(); 053 * // The NIO buffers can still be accessed unless you release the duplicated one 054 * dup.get(...); 055 * dup.release(); 056 * // Both the original and dup can not access the NIO buffers any more. 057 * </pre> 058 */ 059@InterfaceAudience.Private 060public abstract class ByteBuff implements HBaseReferenceCounted { 061 private static final String REFERENCE_COUNT_NAME = "ReferenceCount"; 062 private static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB. 063 064 protected RefCnt refCnt; 065 066 /*************************** Methods for reference count **********************************/ 067 068 protected void checkRefCount() { 069 ObjectUtil.checkPositive(refCnt(), REFERENCE_COUNT_NAME); 070 } 071 072 @Override 073 public int refCnt() { 074 return refCnt.refCnt(); 075 } 076 077 @Override 078 public boolean release() { 079 return refCnt.release(); 080 } 081 082 /******************************* Methods for ByteBuff **************************************/ 083 084 /** Returns this ByteBuff's current position */ 085 public abstract int position(); 086 087 /** 088 * Sets this ByteBuff's position to the given value. 089 * @return this object 090 */ 091 public abstract ByteBuff position(int position); 092 093 /** 094 * Jumps the current position of this ByteBuff by specified length. 095 * @param len the length to be skipped 096 */ 097 public abstract ByteBuff skip(int len); 098 099 /** 100 * Jumps back the current position of this ByteBuff by specified length. 101 * @param len the length to move back 102 */ 103 public abstract ByteBuff moveBack(int len); 104 105 /** Returns the total capacity of this ByteBuff. */ 106 public abstract int capacity(); 107 108 /** Returns the limit of this ByteBuff */ 109 public abstract int limit(); 110 111 /** Marks the limit of this ByteBuff */ 112 public abstract ByteBuff limit(int limit); 113 114 /** Rewinds this ByteBuff and the position is set to 0 */ 115 public abstract ByteBuff rewind(); 116 117 /** Marks the current position of the ByteBuff */ 118 public abstract ByteBuff mark(); 119 120 /** 121 * Returns bytes from current position till length specified, as a single ByteBuffer. When all 122 * these bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item 123 * as such will be returned. So users are warned not to change the position or limit of this 124 * returned ByteBuffer. The position of the returned byte buffer is at the begin of the required 125 * bytes. When the required bytes happen to span across multiple ByteBuffers, this API will copy 126 * the bytes to a newly created ByteBuffer of required size and return that. 127 * @param length number of bytes required. 128 * @return bytes from current position till length specified, as a single ByteButter. 129 */ 130 public abstract ByteBuffer asSubByteBuffer(int length); 131 132 /** 133 * Returns bytes from given offset till length specified, as a single ByteBuffer. When all these 134 * bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item as 135 * such will be returned (with offset in this ByteBuffer where the bytes starts). So users are 136 * warned not to change the position or limit of this returned ByteBuffer. When the required bytes 137 * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created 138 * ByteBuffer of required size and return that. 139 * @param offset the offset in this ByteBuff from where the subBuffer should be created 140 * @param length the length of the subBuffer 141 * @param pair a pair that will have the bytes from the current position till length specified, 142 * as a single ByteBuffer and offset in that Buffer where the bytes starts. Since 143 * this API gets called in a loop we are passing a pair to it which could be created 144 * outside the loop and the method would set the values on the pair that is passed 145 * in by the caller. Thus it avoids more object creations that would happen if the 146 * pair that is returned is created by this method every time. 147 */ 148 public abstract void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair); 149 150 /** Returns the number of elements between the current position and the limit. */ 151 public abstract int remaining(); 152 153 /** Returns true if there are elements between the current position and the limit. */ 154 public abstract boolean hasRemaining(); 155 156 /** 157 * Similar to {@link ByteBuffer}.reset(), ensures that this ByteBuff is reset back to last marked 158 * position. 159 * @return This ByteBuff 160 */ 161 public abstract ByteBuff reset(); 162 163 /** 164 * Returns an ByteBuff which is a sliced version of this ByteBuff. The position, limit and mark of 165 * the new ByteBuff will be independent than that of the original ByteBuff. The content of the new 166 * ByteBuff will start at this ByteBuff's current position 167 * @return a sliced ByteBuff 168 */ 169 public abstract ByteBuff slice(); 170 171 /** 172 * Returns an ByteBuff which is a duplicate version of this ByteBuff. The position, limit and mark 173 * of the new ByteBuff will be independent than that of the original ByteBuff. The content of the 174 * new ByteBuff will start at this ByteBuff's current position The position, limit and mark of the 175 * new ByteBuff would be identical to this ByteBuff in terms of values. 176 * @return a sliced ByteBuff 177 */ 178 public abstract ByteBuff duplicate(); 179 180 /** 181 * A relative method that returns byte at the current position. Increments the current position by 182 * the size of a byte. 183 * @return the byte at the current position 184 */ 185 public abstract byte get(); 186 187 /** 188 * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers 189 * @return the byte at the given index 190 */ 191 public abstract byte get(int index); 192 193 /** 194 * Fetches the byte at the given offset from current position. Does not change position of the 195 * underlying ByteBuffers. 196 * @return the byte value at the given index. 197 */ 198 public abstract byte getByteAfterPosition(int offset); 199 200 /** 201 * Writes a byte to this ByteBuff at the current position and increments the position 202 * @return this object 203 */ 204 public abstract ByteBuff put(byte b); 205 206 /** 207 * Writes a byte to this ByteBuff at the given index 208 * @return this object 209 */ 210 public abstract ByteBuff put(int index, byte b); 211 212 /** 213 * Copies the specified number of bytes from this ByteBuff's current position to the byte[]'s 214 * offset. Also advances the position of the ByteBuff by the given length. 215 * @param dst the byte[] to which the ByteBuff's content is to be copied 216 * @param offset within the current array 217 * @param length upto which the bytes to be copied 218 */ 219 public abstract void get(byte[] dst, int offset, int length); 220 221 /** 222 * Copies the specified number of bytes from this ByteBuff's given position to the byte[]'s 223 * offset. The position of the ByteBuff remains in the current position only 224 * @param sourceOffset the offset in this ByteBuff from where the copy should happen 225 * @param dst the byte[] to which the ByteBuff's content is to be copied 226 * @param offset within the current array 227 * @param length upto which the bytes to be copied 228 */ 229 public abstract void get(int sourceOffset, byte[] dst, int offset, int length); 230 231 /** 232 * Copies the content from this ByteBuff's current position to the byte array and fills it. Also 233 * advances the position of the ByteBuff by the length of the byte[]. 234 * @param dst the byte[] to which the ByteBuff's content is to be copied 235 */ 236 public abstract void get(byte[] dst); 237 238 /** 239 * Copies from the given byte[] to this ByteBuff 240 * @param src source byte array 241 * @param offset the position in the byte array from which the copy should be done 242 * @param length the length upto which the copy should happen 243 * @return this ByteBuff 244 */ 245 public abstract ByteBuff put(byte[] src, int offset, int length); 246 247 /** 248 * Copies from the given byte[] to this ByteBuff 249 * @return this ByteBuff 250 * @param src source byte array 251 * @return this ByteBuff 252 */ 253 public abstract ByteBuff put(byte[] src); 254 255 /** Returns true or false if the underlying BB support hasArray */ 256 public abstract boolean hasArray(); 257 258 /** Returns the byte[] if the underlying BB has single BB and hasArray true */ 259 public abstract byte[] array(); 260 261 /** Returns the arrayOffset of the byte[] incase of a single BB backed ByteBuff */ 262 public abstract int arrayOffset(); 263 264 /** 265 * Returns the short value at the current position. Also advances the position by the size of 266 * short. 267 */ 268 public abstract short getShort(); 269 270 /** 271 * Fetches the short value at the given index. Does not change position of the underlying 272 * ByteBuffers. The caller is sure that the index will be after the current position of this 273 * ByteBuff. So even if the current short does not fit in the current item we can safely move to 274 * the next item and fetch the remaining bytes forming the short 275 * @return the short value at the given index 276 */ 277 public abstract short getShort(int index); 278 279 /** 280 * Fetches the short value at the given offset from current position. Does not change position of 281 * the underlying ByteBuffers. 282 * @return the short value at the given index. 283 */ 284 public abstract short getShortAfterPosition(int offset); 285 286 /** 287 * Returns the int value at the current position. Also advances the position by the size of int. 288 */ 289 public abstract int getInt(); 290 291 /** 292 * Writes an int to this ByteBuff at its current position. Also advances the position by size of 293 * int. 294 */ 295 public abstract ByteBuff putInt(int value); 296 297 /** 298 * Fetches the int at the given index. Does not change position of the underlying ByteBuffers. 299 * Even if the current int does not fit in the current item we can safely move to the next item 300 * and fetch the remaining bytes forming the int. 301 */ 302 public abstract int getInt(int index); 303 304 /** 305 * Fetches the int value at the given offset from current position. Does not change position of 306 * the underlying ByteBuffers. 307 */ 308 public abstract int getIntAfterPosition(int offset); 309 310 /** 311 * Returns the long value at the current position. Also advances the position by the size of long. 312 */ 313 public abstract long getLong(); 314 315 /** 316 * Writes a long to this ByteBuff at its current position. Also advances the position by size of 317 * long. 318 */ 319 public abstract ByteBuff putLong(long value); 320 321 /** 322 * Fetches the long at the given index. Does not change position of the underlying ByteBuffers. 323 * The caller is sure that the index will be after the current position of this ByteBuff. So even 324 * if the current long does not fit in the current item we can safely move to the next item and 325 * fetch the remaining bytes forming the long 326 * @return the long value at the given index 327 */ 328 public abstract long getLong(int index); 329 330 /** 331 * Fetches the long value at the given offset from current position. Does not change position of 332 * the underlying ByteBuffers. 333 * @return the long value at the given index. 334 */ 335 public abstract long getLongAfterPosition(int offset); 336 337 /** 338 * Copy the content from this ByteBuff to a byte[]. 339 */ 340 public byte[] toBytes() { 341 return toBytes(0, this.limit()); 342 } 343 344 /** 345 * Copy the content from this ByteBuff to a byte[] based on the given offset and length. 346 */ 347 public abstract byte[] toBytes(int offset, int length); 348 349 /** 350 * Copies the content from this ByteBuff to a ByteBuffer Note : This will advance the position 351 * marker of {@code out} but not change the position maker for this ByteBuff 352 * @param out the ByteBuffer to which the copy has to happen 353 * @param sourceOffset the offset in the ByteBuff from which the elements has to be copied 354 * @param length the length in this ByteBuff upto which the elements has to be copied 355 */ 356 public abstract void get(ByteBuffer out, int sourceOffset, int length); 357 358 /** 359 * Copies the contents from the src ByteBuff to this ByteBuff. This will be absolute positional 360 * copying and won't affect the position of any of the buffers. 361 * @param offset the position in this ByteBuff to which the copy should happen 362 * @param src the src ByteBuff 363 * @param srcOffset the offset in the src ByteBuff from where the elements should be read 364 * @param length the length up to which the copy should happen 365 */ 366 public abstract ByteBuff put(int offset, ByteBuff src, int srcOffset, int length); 367 368 /** Reads bytes from the given channel into this ByteBuf. */ 369 public abstract int read(ReadableByteChannel channel) throws IOException; 370 371 /** Reads bytes from FileChannel into this ByteBuff */ 372 public abstract int read(FileChannel channel, long offset) throws IOException; 373 374 /** Write this ByteBuff's data into target file */ 375 public abstract int write(FileChannel channel, long offset) throws IOException; 376 377 /** Functional interface for Channel read */ 378 @FunctionalInterface 379 interface ChannelReader { 380 int read(ReadableByteChannel channel, ByteBuffer buf, long offset) throws IOException; 381 } 382 383 static final ChannelReader CHANNEL_READER = (channel, buf, offset) -> { 384 return channel.read(buf); 385 }; 386 387 static final ChannelReader FILE_READER = (channel, buf, offset) -> { 388 return ((FileChannel) channel).read(buf, offset); 389 }; 390 391 // static helper methods 392 public static int read(ReadableByteChannel channel, ByteBuffer buf, long offset, 393 ChannelReader reader) throws IOException { 394 if (buf.remaining() <= NIO_BUFFER_LIMIT) { 395 return reader.read(channel, buf, offset); 396 } 397 int originalLimit = buf.limit(); 398 int initialRemaining = buf.remaining(); 399 int ret = 0; 400 401 while (buf.remaining() > 0) { 402 try { 403 int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT); 404 buf.limit(buf.position() + ioSize); 405 offset += ret; 406 ret = reader.read(channel, buf, offset); 407 if (ret < ioSize) { 408 break; 409 } 410 } finally { 411 buf.limit(originalLimit); 412 } 413 } 414 int nBytes = initialRemaining - buf.remaining(); 415 return (nBytes > 0) ? nBytes : ret; 416 } 417 418 /** Read integer from ByteBuff coded in 7 bits and increment position. */ 419 public static int readCompressedInt(ByteBuff buf) { 420 byte b = buf.get(); 421 if ((b & ByteBufferUtils.NEXT_BIT_MASK) != 0) { 422 return (b & ByteBufferUtils.VALUE_MASK) 423 + (readCompressedInt(buf) << ByteBufferUtils.NEXT_BIT_SHIFT); 424 } 425 return b & ByteBufferUtils.VALUE_MASK; 426 } 427 428 /** 429 * Compares two ByteBuffs 430 * @param buf1 the first ByteBuff 431 * @param o1 the offset in the first ByteBuff from where the compare has to happen 432 * @param len1 the length in the first ByteBuff upto which the compare has to happen 433 * @param buf2 the second ByteBuff 434 * @param o2 the offset in the second ByteBuff from where the compare has to happen 435 * @param len2 the length in the second ByteBuff upto which the compare has to happen 436 * @return Positive if buf1 is bigger than buf2, 0 if they are equal, and negative if buf1 is 437 * smaller than buf2. 438 */ 439 public static int compareTo(ByteBuff buf1, int o1, int len1, ByteBuff buf2, int o2, int len2) { 440 if (buf1.hasArray() && buf2.hasArray()) { 441 return Bytes.compareTo(buf1.array(), buf1.arrayOffset() + o1, len1, buf2.array(), 442 buf2.arrayOffset() + o2, len2); 443 } 444 int end1 = o1 + len1; 445 int end2 = o2 + len2; 446 for (int i = o1, j = o2; i < end1 && j < end2; i++, j++) { 447 int a = buf1.get(i) & 0xFF; 448 int b = buf2.get(j) & 0xFF; 449 if (a != b) { 450 return a - b; 451 } 452 } 453 return len1 - len2; 454 } 455 456 /** 457 * Read long which was written to fitInBytes bytes and increment position. 458 * @param fitInBytes In how many bytes given long is stored. 459 * @return The value of parsed long. 460 */ 461 public static long readLong(ByteBuff in, final int fitInBytes) { 462 long tmpLength = 0; 463 for (int i = 0; i < fitInBytes; ++i) { 464 tmpLength |= (in.get() & 0xffl) << (8l * i); 465 } 466 return tmpLength; 467 } 468 469 public abstract ByteBuffer[] nioByteBuffers(); 470 471 @Override 472 public String toString() { 473 return this.getClass().getSimpleName() + "[pos=" + position() + ", lim=" + limit() + ", cap= " 474 + capacity() + "]"; 475 } 476 477 /********************************* ByteBuff wrapper methods ***********************************/ 478 479 /** 480 * In theory, the upstream should never construct an ByteBuff by passing an given refCnt, so 481 * please don't use this public method in other place. Make the method public here because the 482 * BucketEntry#wrapAsCacheable in hbase-server module will use its own refCnt and ByteBuffers from 483 * IOEngine to composite an HFileBlock's ByteBuff, we didn't find a better way so keep the public 484 * way here. 485 */ 486 public static ByteBuff wrap(ByteBuffer[] buffers, RefCnt refCnt) { 487 if (buffers == null || buffers.length == 0) { 488 throw new IllegalArgumentException("buffers shouldn't be null or empty"); 489 } 490 return buffers.length == 1 491 ? new SingleByteBuff(refCnt, buffers[0]) 492 : new MultiByteBuff(refCnt, buffers); 493 } 494 495 public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) { 496 return wrap(buffers, RefCnt.create(recycler)); 497 } 498 499 public static ByteBuff wrap(ByteBuffer[] buffers) { 500 return wrap(buffers, RefCnt.create()); 501 } 502 503 public static ByteBuff wrap(List<ByteBuffer> buffers, Recycler recycler) { 504 return wrap(buffers, RefCnt.create(recycler)); 505 } 506 507 public static ByteBuff wrap(List<ByteBuffer> buffers) { 508 return wrap(buffers, RefCnt.create()); 509 } 510 511 public static ByteBuff wrap(ByteBuffer buffer) { 512 return wrap(buffer, RefCnt.create()); 513 } 514 515 /** 516 * Calling this method in strategic locations where ByteBuffs are referenced may help diagnose 517 * potential buffer leaks. We pass the buffer itself as a default hint, but one can use 518 * {@link #touch(Object)} to pass their own hint as well. 519 */ 520 @Override 521 public ByteBuff touch() { 522 return touch(this); 523 } 524 525 @Override 526 public ByteBuff touch(Object hint) { 527 refCnt.touch(hint); 528 return this; 529 } 530 531 @RestrictedApi(explanation = "Should only be called in tests", link = "", 532 allowedOnPath = ".*/src/test/.*") 533 public RefCnt getRefCnt() { 534 return refCnt; 535 } 536 537 // Make this private because we don't want to expose the refCnt related wrap method to upstream. 538 private static ByteBuff wrap(List<ByteBuffer> buffers, RefCnt refCnt) { 539 if (buffers == null || buffers.size() == 0) { 540 throw new IllegalArgumentException("buffers shouldn't be null or empty"); 541 } 542 return buffers.size() == 1 543 ? new SingleByteBuff(refCnt, buffers.get(0)) 544 : new MultiByteBuff(refCnt, buffers.toArray(new ByteBuffer[0])); 545 } 546 547 // Make this private because we don't want to expose the refCnt related wrap method to upstream. 548 private static ByteBuff wrap(ByteBuffer buffer, RefCnt refCnt) { 549 return new SingleByteBuff(refCnt, buffer); 550 } 551}