001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.util; 019 020import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DIRECT_BYTES_READ_KEY; 021import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.HEAP_BYTES_READ_KEY; 022 023import io.opentelemetry.api.common.Attributes; 024import io.opentelemetry.api.common.AttributesBuilder; 025import io.opentelemetry.api.trace.Span; 026import io.opentelemetry.context.Context; 027import java.io.IOException; 028import java.io.InputStream; 029import java.lang.reflect.InvocationTargetException; 030import java.lang.reflect.Method; 031import java.nio.ByteBuffer; 032import java.util.Optional; 033import org.apache.hadoop.fs.ByteBufferReadable; 034import org.apache.hadoop.fs.FSDataInputStream; 035import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer; 036import org.apache.hadoop.hbase.nio.ByteBuff; 037import org.apache.hadoop.io.IOUtils; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042@InterfaceAudience.Private 043public final class BlockIOUtils { 044 private static final Logger LOG = LoggerFactory.getLogger(BlockIOUtils.class); 045 // TODO: remove the reflection when we update to Hadoop 3.3 or above. 046 private static Method byteBufferPositionedReadMethod; 047 048 static { 049 initByteBufferPositionReadableMethod(); 050 } 051 052 // Disallow instantiation 053 private BlockIOUtils() { 054 055 } 056 057 private static void initByteBufferPositionReadableMethod() { 058 try { 059 // long position, ByteBuffer buf 060 byteBufferPositionedReadMethod = 061 FSDataInputStream.class.getMethod("read", long.class, ByteBuffer.class); 062 } catch (NoSuchMethodException e) { 063 LOG.debug("Unable to find positioned bytebuffer read API of FSDataInputStream. " 064 + "preadWithExtra() will use a temporary on-heap byte array."); 065 } 066 } 067 068 public static boolean isByteBufferReadable(FSDataInputStream is) { 069 InputStream cur = is.getWrappedStream(); 070 for (;;) { 071 if ((cur instanceof FSDataInputStream)) { 072 cur = ((FSDataInputStream) cur).getWrappedStream(); 073 } else { 074 break; 075 } 076 } 077 return cur instanceof ByteBufferReadable; 078 } 079 080 /** 081 * Read length bytes into ByteBuffers directly. 082 * @param buf the destination {@link ByteBuff} 083 * @param dis the HDFS input stream which implement the ByteBufferReadable interface. 084 * @param length bytes to read. 085 * @throws IOException exception to throw if any error happen 086 */ 087 public static void readFully(ByteBuff buf, FSDataInputStream dis, int length) throws IOException { 088 final Span span = Span.current(); 089 final AttributesBuilder attributesBuilder = builderFromContext(Context.current()); 090 if (!isByteBufferReadable(dis)) { 091 // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to 092 // the destination ByteBuff. 093 byte[] heapBuf = new byte[length]; 094 IOUtils.readFully(dis, heapBuf, 0, length); 095 annotateHeapBytesRead(attributesBuilder, length); 096 span.addEvent("BlockIOUtils.readFully", attributesBuilder.build()); 097 copyToByteBuff(heapBuf, 0, length, buf); 098 return; 099 } 100 int directBytesRead = 0, heapBytesRead = 0; 101 ByteBuffer[] buffers = buf.nioByteBuffers(); 102 int remain = length; 103 int idx = 0; 104 ByteBuffer cur = buffers[idx]; 105 try { 106 while (remain > 0) { 107 while (!cur.hasRemaining()) { 108 if (++idx >= buffers.length) { 109 throw new IOException( 110 "Not enough ByteBuffers to read the reminding " + remain + " " + "bytes"); 111 } 112 cur = buffers[idx]; 113 } 114 cur.limit(cur.position() + Math.min(remain, cur.remaining())); 115 int bytesRead = dis.read(cur); 116 if (bytesRead < 0) { 117 throw new IOException( 118 "Premature EOF from inputStream, but still need " + remain + " " + "bytes"); 119 } 120 remain -= bytesRead; 121 if (cur.isDirect()) { 122 directBytesRead += bytesRead; 123 } else { 124 heapBytesRead += bytesRead; 125 } 126 } 127 } finally { 128 annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead); 129 span.addEvent("BlockIOUtils.readFully", attributesBuilder.build()); 130 } 131 } 132 133 /** 134 * Copying bytes from InputStream to {@link ByteBuff} by using an temporary heap byte[] (default 135 * size is 1024 now). 136 * @param in the InputStream to read 137 * @param out the destination {@link ByteBuff} 138 * @param length to read 139 * @throws IOException if any io error encountered. 140 */ 141 public static void readFullyWithHeapBuffer(InputStream in, ByteBuff out, int length) 142 throws IOException { 143 if (length < 0) { 144 throw new IllegalArgumentException("Length must not be negative: " + length); 145 } 146 int heapBytesRead = 0; 147 int remain = length, count; 148 byte[] buffer = new byte[1024]; 149 try { 150 while (remain > 0) { 151 count = in.read(buffer, 0, Math.min(remain, buffer.length)); 152 if (count < 0) { 153 throw new IOException( 154 "Premature EOF from inputStream, but still need " + remain + " bytes"); 155 } 156 out.put(buffer, 0, count); 157 remain -= count; 158 heapBytesRead += count; 159 } 160 } finally { 161 final Span span = Span.current(); 162 final AttributesBuilder attributesBuilder = builderFromContext(Context.current()); 163 annotateHeapBytesRead(attributesBuilder, heapBytesRead); 164 span.addEvent("BlockIOUtils.readFullyWithHeapBuffer", attributesBuilder.build()); 165 } 166 } 167 168 /** 169 * Read from an input stream at least <code>necessaryLen</code> and if possible, 170 * <code>extraLen</code> also if available. Analogous to 171 * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a number of "extra" 172 * bytes to also optionally read. 173 * @param in the input stream to read from 174 * @param buf the buffer to read into 175 * @param bufOffset the destination offset in the buffer 176 * @param necessaryLen the number of bytes that are absolutely necessary to read 177 * @param extraLen the number of extra bytes that would be nice to read 178 * @return true if succeeded reading the extra bytes 179 * @throws IOException if failed to read the necessary bytes 180 */ 181 private static boolean readWithExtraOnHeap(InputStream in, byte[] buf, int bufOffset, 182 int necessaryLen, int extraLen) throws IOException { 183 int heapBytesRead = 0; 184 int bytesRemaining = necessaryLen + extraLen; 185 try { 186 while (bytesRemaining > 0) { 187 int ret = in.read(buf, bufOffset, bytesRemaining); 188 if (ret < 0) { 189 if (bytesRemaining <= extraLen) { 190 // We could not read the "extra data", but that is OK. 191 break; 192 } 193 throw new IOException("Premature EOF from inputStream (read " + "returned " + ret 194 + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen 195 + " extra bytes, " + "successfully read " + (necessaryLen + extraLen - bytesRemaining)); 196 } 197 bufOffset += ret; 198 bytesRemaining -= ret; 199 heapBytesRead += ret; 200 } 201 } finally { 202 final Span span = Span.current(); 203 final AttributesBuilder attributesBuilder = builderFromContext(Context.current()); 204 annotateHeapBytesRead(attributesBuilder, heapBytesRead); 205 span.addEvent("BlockIOUtils.readWithExtra", attributesBuilder.build()); 206 } 207 return bytesRemaining <= 0; 208 } 209 210 /** 211 * Read bytes into ByteBuffers directly, those buffers either contains the extraLen bytes or only 212 * contains necessaryLen bytes, which depends on how much bytes do the last time we read. 213 * @param buf the destination {@link ByteBuff}. 214 * @param dis input stream to read. 215 * @param necessaryLen bytes which we must read 216 * @param extraLen bytes which we may read 217 * @return if the returned flag is true, then we've finished to read the extraLen into our 218 * ByteBuffers, otherwise we've not read the extraLen bytes yet. 219 * @throws IOException if failed to read the necessary bytes. 220 */ 221 public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int necessaryLen, 222 int extraLen) throws IOException { 223 if (!isByteBufferReadable(dis)) { 224 // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to 225 // the destination ByteBuff. 226 byte[] heapBuf = new byte[necessaryLen + extraLen]; 227 boolean ret = readWithExtraOnHeap(dis, heapBuf, 0, necessaryLen, extraLen); 228 copyToByteBuff(heapBuf, 0, heapBuf.length, buf); 229 return ret; 230 } 231 int directBytesRead = 0, heapBytesRead = 0; 232 ByteBuffer[] buffers = buf.nioByteBuffers(); 233 int bytesRead = 0; 234 int remain = necessaryLen + extraLen; 235 int idx = 0; 236 ByteBuffer cur = buffers[idx]; 237 try { 238 while (bytesRead < necessaryLen) { 239 while (!cur.hasRemaining()) { 240 if (++idx >= buffers.length) { 241 throw new IOException( 242 "Not enough ByteBuffers to read the reminding " + remain + "bytes"); 243 } 244 cur = buffers[idx]; 245 } 246 cur.limit(cur.position() + Math.min(remain, cur.remaining())); 247 int ret = dis.read(cur); 248 if (ret < 0) { 249 throw new IOException("Premature EOF from inputStream (read returned " + ret 250 + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen 251 + " extra bytes, successfully read " + bytesRead); 252 } 253 bytesRead += ret; 254 remain -= ret; 255 if (cur.isDirect()) { 256 directBytesRead += ret; 257 } else { 258 heapBytesRead += ret; 259 } 260 } 261 } finally { 262 final Span span = Span.current(); 263 final AttributesBuilder attributesBuilder = builderFromContext(Context.current()); 264 annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead); 265 span.addEvent("BlockIOUtils.readWithExtra", attributesBuilder.build()); 266 } 267 return (extraLen > 0) && (bytesRead == necessaryLen + extraLen); 268 } 269 270 /** 271 * Read from an input stream at least <code>necessaryLen</code> and if possible, 272 * <code>extraLen</code> also if available. Analogous to 273 * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and 274 * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to 275 * read. If the input stream supports ByteBufferPositionedReadable, it reads to the byte buffer 276 * directly, and does not allocate a temporary byte array. 277 * @param buff ByteBuff to read into. 278 * @param dis the input stream to read from 279 * @param position the position within the stream from which to start reading 280 * @param necessaryLen the number of bytes that are absolutely necessary to read 281 * @param extraLen the number of extra bytes that would be nice to read 282 * @return true if and only if extraLen is > 0 and reading those extra bytes was successful 283 * @throws IOException if failed to read the necessary bytes 284 */ 285 public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position, 286 int necessaryLen, int extraLen) throws IOException { 287 return preadWithExtra(buff, dis, position, necessaryLen, extraLen, false); 288 } 289 290 /** 291 * Read from an input stream at least <code>necessaryLen</code> and if possible, 292 * <code>extraLen</code> also if available. Analogous to 293 * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and 294 * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to 295 * read. If the input stream supports ByteBufferPositionedReadable, it reads to the byte buffer 296 * directly, and does not allocate a temporary byte array. 297 * @param buff ByteBuff to read into. 298 * @param dis the input stream to read from 299 * @param position the position within the stream from which to start reading 300 * @param necessaryLen the number of bytes that are absolutely necessary to read 301 * @param extraLen the number of extra bytes that would be nice to read 302 * @param readAllBytes whether we must read the necessaryLen and extraLen 303 * @return true if and only if extraLen is > 0 and reading those extra bytes was successful 304 * @throws IOException if failed to read the necessary bytes 305 */ 306 public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position, 307 int necessaryLen, int extraLen, boolean readAllBytes) throws IOException { 308 boolean preadbytebuffer = dis.hasCapability("in:preadbytebuffer"); 309 310 if (preadbytebuffer) { 311 return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen, readAllBytes); 312 } else { 313 return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen, readAllBytes); 314 } 315 } 316 317 private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis, long position, 318 int necessaryLen, int extraLen, boolean readAllBytes) throws IOException { 319 int remain = necessaryLen + extraLen; 320 byte[] buf = new byte[remain]; 321 int bytesRead = 0; 322 int lengthMustRead = readAllBytes ? remain : necessaryLen; 323 try { 324 while (bytesRead < lengthMustRead) { 325 int ret = dis.read(position + bytesRead, buf, bytesRead, remain); 326 if (ret < 0) { 327 throw new IOException("Premature EOF from inputStream (positional read returned " + ret 328 + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen 329 + " extra bytes, successfully read " + bytesRead); 330 } 331 bytesRead += ret; 332 remain -= ret; 333 } 334 } finally { 335 final Span span = Span.current(); 336 final AttributesBuilder attributesBuilder = builderFromContext(Context.current()); 337 annotateHeapBytesRead(attributesBuilder, bytesRead); 338 span.addEvent("BlockIOUtils.preadWithExtra", attributesBuilder.build()); 339 } 340 copyToByteBuff(buf, 0, bytesRead, buff); 341 return (extraLen > 0) && (bytesRead == necessaryLen + extraLen); 342 } 343 344 private static boolean preadWithExtraDirectly(ByteBuff buff, FSDataInputStream dis, long position, 345 int necessaryLen, int extraLen, boolean readAllBytes) throws IOException { 346 int directBytesRead = 0, heapBytesRead = 0; 347 int remain = necessaryLen + extraLen, bytesRead = 0, idx = 0; 348 ByteBuffer[] buffers = buff.nioByteBuffers(); 349 ByteBuffer cur = buffers[idx]; 350 int lengthMustRead = readAllBytes ? remain : necessaryLen; 351 try { 352 while (bytesRead < lengthMustRead) { 353 int ret; 354 while (!cur.hasRemaining()) { 355 if (++idx >= buffers.length) { 356 throw new IOException( 357 "Not enough ByteBuffers to read the reminding " + remain + "bytes"); 358 } 359 cur = buffers[idx]; 360 } 361 cur.limit(cur.position() + Math.min(remain, cur.remaining())); 362 try { 363 ret = (Integer) byteBufferPositionedReadMethod.invoke(dis, position + bytesRead, cur); 364 } catch (IllegalAccessException e) { 365 throw new IOException("Unable to invoke ByteBuffer positioned read when trying to read " 366 + bytesRead + " bytes from position " + position, e); 367 } catch (InvocationTargetException e) { 368 throw new IOException("Encountered an exception when invoking ByteBuffer positioned read" 369 + " when trying to read " + bytesRead + " bytes from position " + position, e); 370 } 371 if (ret < 0) { 372 throw new IOException("Premature EOF from inputStream (positional read returned " + ret 373 + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen 374 + " extra bytes, successfully read " + bytesRead); 375 } 376 bytesRead += ret; 377 remain -= ret; 378 if (cur.isDirect()) { 379 directBytesRead += bytesRead; 380 } else { 381 heapBytesRead += bytesRead; 382 } 383 } 384 } finally { 385 final Span span = Span.current(); 386 final AttributesBuilder attributesBuilder = builderFromContext(Context.current()); 387 annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead); 388 span.addEvent("BlockIOUtils.preadWithExtra", attributesBuilder.build()); 389 } 390 391 return (extraLen > 0) && (bytesRead == necessaryLen + extraLen); 392 } 393 394 private static int copyToByteBuff(byte[] buf, int offset, int len, ByteBuff out) 395 throws IOException { 396 if (offset < 0 || len < 0 || offset + len > buf.length) { 397 throw new IOException("Invalid offset=" + offset + " and len=" + len + ", cap=" + buf.length); 398 } 399 ByteBuffer[] buffers = out.nioByteBuffers(); 400 int idx = 0, remain = len, copyLen; 401 ByteBuffer cur = buffers[idx]; 402 while (remain > 0) { 403 while (!cur.hasRemaining()) { 404 if (++idx >= buffers.length) { 405 throw new IOException("Not enough ByteBuffers to read the reminding " + remain + "bytes"); 406 } 407 cur = buffers[idx]; 408 } 409 copyLen = Math.min(cur.remaining(), remain); 410 cur.put(buf, offset, copyLen); 411 remain -= copyLen; 412 offset += copyLen; 413 } 414 return len; 415 } 416 417 /** 418 * Construct a fresh {@link AttributesBuilder} from the provided {@link Context}, populated with 419 * relevant attributes populated by {@link HFileContextAttributesBuilderConsumer#CONTEXT_KEY}. 420 */ 421 private static AttributesBuilder builderFromContext(Context context) { 422 final AttributesBuilder attributesBuilder = Attributes.builder(); 423 Optional.ofNullable(context) 424 .map(val -> val.get(HFileContextAttributesBuilderConsumer.CONTEXT_KEY)) 425 .ifPresent(c -> c.accept(attributesBuilder)); 426 return attributesBuilder; 427 } 428 429 /** 430 * Conditionally annotate {@code span} with the appropriate attribute when value is non-zero. 431 */ 432 private static void annotateHeapBytesRead(AttributesBuilder attributesBuilder, 433 int heapBytesRead) { 434 annotateBytesRead(attributesBuilder, 0, heapBytesRead); 435 } 436 437 /** 438 * Conditionally annotate {@code attributesBuilder} with appropriate attributes when values are 439 * non-zero. 440 */ 441 private static void annotateBytesRead(AttributesBuilder attributesBuilder, long directBytesRead, 442 long heapBytesRead) { 443 if (directBytesRead > 0) { 444 attributesBuilder.put(DIRECT_BYTES_READ_KEY, directBytesRead); 445 } 446 if (heapBytesRead > 0) { 447 attributesBuilder.put(HEAP_BYTES_READ_KEY, heapBytesRead); 448 } 449 } 450}