001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.nio; 019 020import java.io.IOException; 021import java.nio.ByteBuffer; 022import java.nio.channels.FileChannel; 023import java.nio.channels.ReadableByteChannel; 024import java.util.List; 025 026import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 027import org.apache.hadoop.hbase.util.ByteBufferUtils; 028import org.apache.hadoop.hbase.util.Bytes; 029import org.apache.hadoop.hbase.util.ObjectIntPair; 030import org.apache.yetus.audience.InterfaceAudience; 031 032import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil; 033 034 035/** 036 * An abstract class that abstracts out as to how the byte buffers are used, either single or 037 * multiple. We have this interface because the java's ByteBuffers cannot be sub-classed. This class 038 * provides APIs similar to the ones provided in java's nio ByteBuffers and allows you to do 039 * positional reads/writes and relative reads and writes on the underlying BB. In addition to it, we 040 * have some additional APIs which helps us in the read path. <br/> 041 * The ByteBuff implement {@link HBaseReferenceCounted} interface which mean need to maintains a 042 * {@link RefCnt} inside, if ensure that the ByteBuff won't be used any more, we must do a 043 * {@link ByteBuff#release()} to recycle its NIO ByteBuffers. when considering the 044 * {@link ByteBuff#duplicate()} or {@link ByteBuff#slice()}, releasing either the duplicated one or 045 * the original one will free its memory, because they share the same NIO ByteBuffers. when you want 046 * to retain the NIO ByteBuffers even if the origin one called {@link ByteBuff#release()}, you can 047 * do like this: 048 * 049 * <pre> 050 * ByteBuff original = ...; 051 * ByteBuff dup = original.duplicate(); 052 * dup.retain(); 053 * original.release(); 054 * // The NIO buffers can still be accessed unless you release the duplicated one 055 * dup.get(...); 056 * dup.release(); 057 * // Both the original and dup can not access the NIO buffers any more. 058 * </pre> 059 */ 060@InterfaceAudience.Private 061public abstract class ByteBuff implements HBaseReferenceCounted { 062 private static final String REFERENCE_COUNT_NAME = "ReferenceCount"; 063 private static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB. 064 065 protected RefCnt refCnt; 066 067 /*************************** Methods for reference count **********************************/ 068 069 protected void checkRefCount() { 070 ObjectUtil.checkPositive(refCnt(), REFERENCE_COUNT_NAME); 071 } 072 073 public int refCnt() { 074 return refCnt.refCnt(); 075 } 076 077 @Override 078 public boolean release() { 079 return refCnt.release(); 080 } 081 082 /******************************* Methods for ByteBuff **************************************/ 083 084 /** 085 * @return this ByteBuff's current position 086 */ 087 public abstract int position(); 088 089 /** 090 * Sets this ByteBuff's position to the given value. 091 * @param position 092 * @return this object 093 */ 094 public abstract ByteBuff position(int position); 095 096 /** 097 * Jumps the current position of this ByteBuff by specified length. 098 * @param len the length to be skipped 099 */ 100 public abstract ByteBuff skip(int len); 101 102 /** 103 * Jumps back the current position of this ByteBuff by specified length. 104 * @param len the length to move back 105 */ 106 public abstract ByteBuff moveBack(int len); 107 108 /** 109 * @return the total capacity of this ByteBuff. 110 */ 111 public abstract int capacity(); 112 113 /** 114 * Returns the limit of this ByteBuff 115 * @return limit of the ByteBuff 116 */ 117 public abstract int limit(); 118 119 /** 120 * Marks the limit of this ByteBuff. 121 * @param limit 122 * @return This ByteBuff 123 */ 124 public abstract ByteBuff limit(int limit); 125 126 /** 127 * Rewinds this ByteBuff and the position is set to 0 128 * @return this object 129 */ 130 public abstract ByteBuff rewind(); 131 132 /** 133 * Marks the current position of the ByteBuff 134 * @return this object 135 */ 136 public abstract ByteBuff mark(); 137 138 /** 139 * Returns bytes from current position till length specified, as a single ByteBuffer. When all 140 * these bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item 141 * as such will be returned. So users are warned not to change the position or limit of this 142 * returned ByteBuffer. The position of the returned byte buffer is at the begin of the required 143 * bytes. When the required bytes happen to span across multiple ByteBuffers, this API will copy 144 * the bytes to a newly created ByteBuffer of required size and return that. 145 * 146 * @param length number of bytes required. 147 * @return bytes from current position till length specified, as a single ByteButter. 148 */ 149 public abstract ByteBuffer asSubByteBuffer(int length); 150 151 /** 152 * Returns bytes from given offset till length specified, as a single ByteBuffer. When all these 153 * bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item as 154 * such will be returned (with offset in this ByteBuffer where the bytes starts). So users are 155 * warned not to change the position or limit of this returned ByteBuffer. When the required bytes 156 * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created 157 * ByteBuffer of required size and return that. 158 * 159 * @param offset the offset in this ByteBuff from where the subBuffer should be created 160 * @param length the length of the subBuffer 161 * @param pair a pair that will have the bytes from the current position till length specified, 162 * as a single ByteBuffer and offset in that Buffer where the bytes starts. 163 * Since this API gets called in a loop we are passing a pair to it which could be created 164 * outside the loop and the method would set the values on the pair that is passed in by 165 * the caller. Thus it avoids more object creations that would happen if the pair that is 166 * returned is created by this method every time. 167 */ 168 public abstract void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair); 169 170 /** 171 * Returns the number of elements between the current position and the 172 * limit. 173 * @return the remaining elements in this ByteBuff 174 */ 175 public abstract int remaining(); 176 177 /** 178 * Returns true if there are elements between the current position and the limt 179 * @return true if there are elements, false otherwise 180 */ 181 public abstract boolean hasRemaining(); 182 183 /** 184 * Similar to {@link ByteBuffer}.reset(), ensures that this ByteBuff 185 * is reset back to last marked position. 186 * @return This ByteBuff 187 */ 188 public abstract ByteBuff reset(); 189 190 /** 191 * Returns an ByteBuff which is a sliced version of this ByteBuff. The position, limit and mark 192 * of the new ByteBuff will be independent than that of the original ByteBuff. 193 * The content of the new ByteBuff will start at this ByteBuff's current position 194 * @return a sliced ByteBuff 195 */ 196 public abstract ByteBuff slice(); 197 198 /** 199 * Returns an ByteBuff which is a duplicate version of this ByteBuff. The 200 * position, limit and mark of the new ByteBuff will be independent than that 201 * of the original ByteBuff. The content of the new ByteBuff will start at 202 * this ByteBuff's current position The position, limit and mark of the new 203 * ByteBuff would be identical to this ByteBuff in terms of values. 204 * 205 * @return a sliced ByteBuff 206 */ 207 public abstract ByteBuff duplicate(); 208 209 /** 210 * A relative method that returns byte at the current position. Increments the 211 * current position by the size of a byte. 212 * @return the byte at the current position 213 */ 214 public abstract byte get(); 215 216 /** 217 * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers 218 * @param index 219 * @return the byte at the given index 220 */ 221 public abstract byte get(int index); 222 223 /** 224 * Fetches the byte at the given offset from current position. Does not change position 225 * of the underlying ByteBuffers. 226 * 227 * @param offset 228 * @return the byte value at the given index. 229 */ 230 public abstract byte getByteAfterPosition(int offset); 231 232 /** 233 * Writes a byte to this ByteBuff at the current position and increments the position 234 * @param b 235 * @return this object 236 */ 237 public abstract ByteBuff put(byte b); 238 239 /** 240 * Writes a byte to this ByteBuff at the given index 241 * @param index 242 * @param b 243 * @return this object 244 */ 245 public abstract ByteBuff put(int index, byte b); 246 247 /** 248 * Copies the specified number of bytes from this ByteBuff's current position to 249 * the byte[]'s offset. Also advances the position of the ByteBuff by the given length. 250 * @param dst 251 * @param offset within the current array 252 * @param length upto which the bytes to be copied 253 */ 254 public abstract void get(byte[] dst, int offset, int length); 255 256 /** 257 * Copies the specified number of bytes from this ByteBuff's given position to 258 * the byte[]'s offset. The position of the ByteBuff remains in the current position only 259 * @param sourceOffset the offset in this ByteBuff from where the copy should happen 260 * @param dst the byte[] to which the ByteBuff's content is to be copied 261 * @param offset within the current array 262 * @param length upto which the bytes to be copied 263 */ 264 public abstract void get(int sourceOffset, byte[] dst, int offset, int length); 265 266 /** 267 * Copies the content from this ByteBuff's current position to the byte array and fills it. Also 268 * advances the position of the ByteBuff by the length of the byte[]. 269 * @param dst 270 */ 271 public abstract void get(byte[] dst); 272 273 /** 274 * Copies from the given byte[] to this ByteBuff 275 * @param src 276 * @param offset the position in the byte array from which the copy should be done 277 * @param length the length upto which the copy should happen 278 * @return this ByteBuff 279 */ 280 public abstract ByteBuff put(byte[] src, int offset, int length); 281 282 /** 283 * Copies from the given byte[] to this ByteBuff 284 * @param src 285 * @return this ByteBuff 286 */ 287 public abstract ByteBuff put(byte[] src); 288 289 /** 290 * @return true or false if the underlying BB support hasArray 291 */ 292 public abstract boolean hasArray(); 293 294 /** 295 * @return the byte[] if the underlying BB has single BB and hasArray true 296 */ 297 public abstract byte[] array(); 298 299 /** 300 * @return the arrayOffset of the byte[] incase of a single BB backed ByteBuff 301 */ 302 public abstract int arrayOffset(); 303 304 /** 305 * Returns the short value at the current position. Also advances the position by the size 306 * of short 307 * 308 * @return the short value at the current position 309 */ 310 public abstract short getShort(); 311 312 /** 313 * Fetches the short value at the given index. Does not change position of the 314 * underlying ByteBuffers. The caller is sure that the index will be after 315 * the current position of this ByteBuff. So even if the current short does not fit in the 316 * current item we can safely move to the next item and fetch the remaining bytes forming 317 * the short 318 * 319 * @param index 320 * @return the short value at the given index 321 */ 322 public abstract short getShort(int index); 323 324 /** 325 * Fetches the short value at the given offset from current position. Does not change position 326 * of the underlying ByteBuffers. 327 * 328 * @param offset 329 * @return the short value at the given index. 330 */ 331 public abstract short getShortAfterPosition(int offset); 332 333 /** 334 * Returns the int value at the current position. Also advances the position by the size of int 335 * 336 * @return the int value at the current position 337 */ 338 public abstract int getInt(); 339 340 /** 341 * Writes an int to this ByteBuff at its current position. Also advances the position 342 * by size of int 343 * @param value Int value to write 344 * @return this object 345 */ 346 public abstract ByteBuff putInt(int value); 347 348 /** 349 * Fetches the int at the given index. Does not change position of the underlying ByteBuffers. 350 * Even if the current int does not fit in the 351 * current item we can safely move to the next item and fetch the remaining bytes forming 352 * the int 353 * 354 * @param index 355 * @return the int value at the given index 356 */ 357 public abstract int getInt(int index); 358 359 /** 360 * Fetches the int value at the given offset from current position. Does not change position 361 * of the underlying ByteBuffers. 362 * 363 * @param offset 364 * @return the int value at the given index. 365 */ 366 public abstract int getIntAfterPosition(int offset); 367 368 /** 369 * Returns the long value at the current position. Also advances the position by the size of long 370 * 371 * @return the long value at the current position 372 */ 373 public abstract long getLong(); 374 375 /** 376 * Writes a long to this ByteBuff at its current position. 377 * Also advances the position by size of long 378 * @param value Long value to write 379 * @return this object 380 */ 381 public abstract ByteBuff putLong(long value); 382 383 /** 384 * Fetches the long at the given index. Does not change position of the 385 * underlying ByteBuffers. The caller is sure that the index will be after 386 * the current position of this ByteBuff. So even if the current long does not fit in the 387 * current item we can safely move to the next item and fetch the remaining bytes forming 388 * the long 389 * 390 * @param index 391 * @return the long value at the given index 392 */ 393 public abstract long getLong(int index); 394 395 /** 396 * Fetches the long value at the given offset from current position. Does not change position 397 * of the underlying ByteBuffers. 398 * 399 * @param offset 400 * @return the long value at the given index. 401 */ 402 public abstract long getLongAfterPosition(int offset); 403 404 /** 405 * Copy the content from this ByteBuff to a byte[]. 406 * @return byte[] with the copied contents from this ByteBuff. 407 */ 408 public byte[] toBytes() { 409 return toBytes(0, this.limit()); 410 } 411 412 /** 413 * Copy the content from this ByteBuff to a byte[] based on the given offset and 414 * length 415 * 416 * @param offset 417 * the position from where the copy should start 418 * @param length 419 * the length upto which the copy has to be done 420 * @return byte[] with the copied contents from this ByteBuff. 421 */ 422 public abstract byte[] toBytes(int offset, int length); 423 424 /** 425 * Copies the content from this ByteBuff to a ByteBuffer 426 * Note : This will advance the position marker of {@code out} but not change the position maker 427 * for this ByteBuff 428 * @param out the ByteBuffer to which the copy has to happen 429 * @param sourceOffset the offset in the ByteBuff from which the elements has 430 * to be copied 431 * @param length the length in this ByteBuff upto which the elements has to be copied 432 */ 433 public abstract void get(ByteBuffer out, int sourceOffset, int length); 434 435 /** 436 * Copies the contents from the src ByteBuff to this ByteBuff. This will be 437 * absolute positional copying and 438 * won't affect the position of any of the buffers. 439 * @param offset the position in this ByteBuff to which the copy should happen 440 * @param src the src ByteBuff 441 * @param srcOffset the offset in the src ByteBuff from where the elements should be read 442 * @param length the length up to which the copy should happen 443 */ 444 public abstract ByteBuff put(int offset, ByteBuff src, int srcOffset, int length); 445 446 /** 447 * Reads bytes from the given channel into this ByteBuff 448 * @param channel 449 * @return The number of bytes read from the channel 450 * @throws IOException 451 */ 452 public abstract int read(ReadableByteChannel channel) throws IOException; 453 454 /** 455 * Reads bytes from FileChannel into this ByteBuff 456 */ 457 public abstract int read(FileChannel channel, long offset) throws IOException; 458 459 /** 460 * Write this ByteBuff's data into target file 461 */ 462 public abstract int write(FileChannel channel, long offset) throws IOException; 463 464 /** 465 * function interface for Channel read 466 */ 467 @FunctionalInterface 468 interface ChannelReader { 469 int read(ReadableByteChannel channel, ByteBuffer buf, long offset) throws IOException; 470 } 471 472 static final ChannelReader CHANNEL_READER = (channel, buf, offset) -> { 473 return channel.read(buf); 474 }; 475 476 static final ChannelReader FILE_READER = (channel, buf, offset) -> { 477 return ((FileChannel)channel).read(buf, offset); 478 }; 479 480 // static helper methods 481 public static int read(ReadableByteChannel channel, ByteBuffer buf, long offset, 482 ChannelReader reader) throws IOException { 483 if (buf.remaining() <= NIO_BUFFER_LIMIT) { 484 return reader.read(channel, buf, offset); 485 } 486 int originalLimit = buf.limit(); 487 int initialRemaining = buf.remaining(); 488 int ret = 0; 489 490 while (buf.remaining() > 0) { 491 try { 492 int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT); 493 buf.limit(buf.position() + ioSize); 494 offset += ret; 495 ret = reader.read(channel, buf, offset); 496 if (ret < ioSize) { 497 break; 498 } 499 } finally { 500 buf.limit(originalLimit); 501 } 502 } 503 int nBytes = initialRemaining - buf.remaining(); 504 return (nBytes > 0) ? nBytes : ret; 505 } 506 507 /** 508 * Read integer from ByteBuff coded in 7 bits and increment position. 509 * @return Read integer. 510 */ 511 public static int readCompressedInt(ByteBuff buf) { 512 byte b = buf.get(); 513 if ((b & ByteBufferUtils.NEXT_BIT_MASK) != 0) { 514 return (b & ByteBufferUtils.VALUE_MASK) 515 + (readCompressedInt(buf) << ByteBufferUtils.NEXT_BIT_SHIFT); 516 } 517 return b & ByteBufferUtils.VALUE_MASK; 518 } 519 520 /** 521 * Compares two ByteBuffs 522 * 523 * @param buf1 the first ByteBuff 524 * @param o1 the offset in the first ByteBuff from where the compare has to happen 525 * @param len1 the length in the first ByteBuff upto which the compare has to happen 526 * @param buf2 the second ByteBuff 527 * @param o2 the offset in the second ByteBuff from where the compare has to happen 528 * @param len2 the length in the second ByteBuff upto which the compare has to happen 529 * @return Positive if buf1 is bigger than buf2, 0 if they are equal, and negative if buf1 is 530 * smaller than buf2. 531 */ 532 public static int compareTo(ByteBuff buf1, int o1, int len1, ByteBuff buf2, 533 int o2, int len2) { 534 if (buf1.hasArray() && buf2.hasArray()) { 535 return Bytes.compareTo(buf1.array(), buf1.arrayOffset() + o1, len1, buf2.array(), 536 buf2.arrayOffset() + o2, len2); 537 } 538 int end1 = o1 + len1; 539 int end2 = o2 + len2; 540 for (int i = o1, j = o2; i < end1 && j < end2; i++, j++) { 541 int a = buf1.get(i) & 0xFF; 542 int b = buf2.get(j) & 0xFF; 543 if (a != b) { 544 return a - b; 545 } 546 } 547 return len1 - len2; 548 } 549 550 /** 551 * Read long which was written to fitInBytes bytes and increment position. 552 * @param fitInBytes In how many bytes given long is stored. 553 * @return The value of parsed long. 554 */ 555 public static long readLong(ByteBuff in, final int fitInBytes) { 556 long tmpLength = 0; 557 for (int i = 0; i < fitInBytes; ++i) { 558 tmpLength |= (in.get() & 0xffl) << (8l * i); 559 } 560 return tmpLength; 561 } 562 563 public abstract ByteBuffer[] nioByteBuffers(); 564 565 @Override 566 public String toString() { 567 return this.getClass().getSimpleName() + "[pos=" + position() + ", lim=" + limit() + 568 ", cap= " + capacity() + "]"; 569 } 570 571 /********************************* ByteBuff wrapper methods ***********************************/ 572 573 /** 574 * In theory, the upstream should never construct an ByteBuff by passing an given refCnt, so 575 * please don't use this public method in other place. Make the method public here because the 576 * BucketEntry#wrapAsCacheable in hbase-server module will use its own refCnt and ByteBuffers from 577 * IOEngine to composite an HFileBlock's ByteBuff, we didn't find a better way so keep the public 578 * way here. 579 */ 580 public static ByteBuff wrap(ByteBuffer[] buffers, RefCnt refCnt) { 581 if (buffers == null || buffers.length == 0) { 582 throw new IllegalArgumentException("buffers shouldn't be null or empty"); 583 } 584 return buffers.length == 1 ? new SingleByteBuff(refCnt, buffers[0]) 585 : new MultiByteBuff(refCnt, buffers); 586 } 587 588 public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) { 589 return wrap(buffers, RefCnt.create(recycler)); 590 } 591 592 public static ByteBuff wrap(ByteBuffer[] buffers) { 593 return wrap(buffers, RefCnt.create()); 594 } 595 596 public static ByteBuff wrap(List<ByteBuffer> buffers, Recycler recycler) { 597 return wrap(buffers, RefCnt.create(recycler)); 598 } 599 600 public static ByteBuff wrap(List<ByteBuffer> buffers) { 601 return wrap(buffers, RefCnt.create()); 602 } 603 604 public static ByteBuff wrap(ByteBuffer buffer) { 605 return wrap(buffer, RefCnt.create()); 606 } 607 608 /** 609 * Make this private because we don't want to expose the refCnt related wrap method to upstream. 610 */ 611 private static ByteBuff wrap(List<ByteBuffer> buffers, RefCnt refCnt) { 612 if (buffers == null || buffers.size() == 0) { 613 throw new IllegalArgumentException("buffers shouldn't be null or empty"); 614 } 615 return buffers.size() == 1 ? new SingleByteBuff(refCnt, buffers.get(0)) 616 : new MultiByteBuff(refCnt, buffers.toArray(new ByteBuffer[0])); 617 } 618 619 /** 620 * Make this private because we don't want to expose the refCnt related wrap method to upstream. 621 */ 622 private static ByteBuff wrap(ByteBuffer buffer, RefCnt refCnt) { 623 return new SingleByteBuff(refCnt, buffer); 624 } 625}