001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.util; 019 020import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DIRECT_BYTES_READ_KEY; 021import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.HEAP_BYTES_READ_KEY; 022 023import io.opentelemetry.api.common.Attributes; 024import io.opentelemetry.api.common.AttributesBuilder; 025import io.opentelemetry.api.trace.Span; 026import io.opentelemetry.context.Context; 027import java.io.IOException; 028import java.io.InputStream; 029import java.lang.reflect.InvocationTargetException; 030import java.lang.reflect.Method; 031import java.nio.ByteBuffer; 032import java.util.Optional; 033import org.apache.hadoop.fs.ByteBufferReadable; 034import org.apache.hadoop.fs.FSDataInputStream; 035import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer; 036import org.apache.hadoop.hbase.nio.ByteBuff; 037import org.apache.hadoop.io.IOUtils; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042@InterfaceAudience.Private 043public final class BlockIOUtils { 044 private static final Logger LOG = LoggerFactory.getLogger(BlockIOUtils.class); 045 // TODO: remove the reflection when we update to Hadoop 3.3 or above. 046 private static Method byteBufferPositionedReadMethod; 047 048 static { 049 initByteBufferPositionReadableMethod(); 050 } 051 052 // Disallow instantiation 053 private BlockIOUtils() { 054 055 } 056 057 private static void initByteBufferPositionReadableMethod() { 058 try { 059 // long position, ByteBuffer buf 060 byteBufferPositionedReadMethod = 061 FSDataInputStream.class.getMethod("read", long.class, ByteBuffer.class); 062 } catch (NoSuchMethodException e) { 063 LOG.debug("Unable to find positioned bytebuffer read API of FSDataInputStream. " 064 + "preadWithExtra() will use a temporary on-heap byte array."); 065 } 066 } 067 068 public static boolean isByteBufferReadable(FSDataInputStream is) { 069 InputStream cur = is.getWrappedStream(); 070 for (;;) { 071 if ((cur instanceof FSDataInputStream)) { 072 cur = ((FSDataInputStream) cur).getWrappedStream(); 073 } else { 074 break; 075 } 076 } 077 return cur instanceof ByteBufferReadable; 078 } 079 080 /** 081 * Read length bytes into ByteBuffers directly. 082 * @param buf the destination {@link ByteBuff} 083 * @param dis the HDFS input stream which implement the ByteBufferReadable interface. 084 * @param length bytes to read. 085 * @throws IOException exception to throw if any error happen 086 */ 087 public static void readFully(ByteBuff buf, FSDataInputStream dis, int length) throws IOException { 088 final Span span = Span.current(); 089 if (!isByteBufferReadable(dis)) { 090 // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to 091 // the destination ByteBuff. 092 byte[] heapBuf = new byte[length]; 093 IOUtils.readFully(dis, heapBuf, 0, length); 094 span.addEvent("BlockIOUtils.readFully", getHeapBytesReadAttributes(span, length)); 095 copyToByteBuff(heapBuf, 0, length, buf); 096 return; 097 } 098 int directBytesRead = 0, heapBytesRead = 0; 099 ByteBuffer[] buffers = buf.nioByteBuffers(); 100 int remain = length; 101 int idx = 0; 102 ByteBuffer cur = buffers[idx]; 103 try { 104 while (remain > 0) { 105 while (!cur.hasRemaining()) { 106 if (++idx >= buffers.length) { 107 throw new IOException( 108 "Not enough ByteBuffers to read the reminding " + remain + " " + "bytes"); 109 } 110 cur = buffers[idx]; 111 } 112 cur.limit(cur.position() + Math.min(remain, cur.remaining())); 113 int bytesRead = dis.read(cur); 114 if (bytesRead < 0) { 115 throw new IOException( 116 "Premature EOF from inputStream, but still need " + remain + " " + "bytes"); 117 } 118 remain -= bytesRead; 119 if (cur.isDirect()) { 120 directBytesRead += bytesRead; 121 } else { 122 heapBytesRead += bytesRead; 123 } 124 } 125 } finally { 126 span.addEvent("BlockIOUtils.readFully", 127 getDirectAndHeapBytesReadAttributes(span, directBytesRead, heapBytesRead)); 128 } 129 } 130 131 /** 132 * Copying bytes from InputStream to {@link ByteBuff} by using an temporary heap byte[] (default 133 * size is 1024 now). 134 * @param in the InputStream to read 135 * @param out the destination {@link ByteBuff} 136 * @param length to read 137 * @throws IOException if any io error encountered. 138 */ 139 public static void readFullyWithHeapBuffer(InputStream in, ByteBuff out, int length) 140 throws IOException { 141 if (length < 0) { 142 throw new IllegalArgumentException("Length must not be negative: " + length); 143 } 144 int heapBytesRead = 0; 145 int remain = length, count; 146 byte[] buffer = new byte[1024]; 147 try { 148 while (remain > 0) { 149 count = in.read(buffer, 0, Math.min(remain, buffer.length)); 150 if (count < 0) { 151 throw new IOException( 152 "Premature EOF from inputStream, but still need " + remain + " bytes"); 153 } 154 out.put(buffer, 0, count); 155 remain -= count; 156 heapBytesRead += count; 157 } 158 } finally { 159 final Span span = Span.current(); 160 span.addEvent("BlockIOUtils.readFullyWithHeapBuffer", 161 getHeapBytesReadAttributes(span, heapBytesRead)); 162 } 163 } 164 165 /** 166 * Read from an input stream at least <code>necessaryLen</code> and if possible, 167 * <code>extraLen</code> also if available. Analogous to 168 * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a number of "extra" 169 * bytes to also optionally read. 170 * @param in the input stream to read from 171 * @param buf the buffer to read into 172 * @param bufOffset the destination offset in the buffer 173 * @param necessaryLen the number of bytes that are absolutely necessary to read 174 * @param extraLen the number of extra bytes that would be nice to read 175 * @return true if succeeded reading the extra bytes 176 * @throws IOException if failed to read the necessary bytes 177 */ 178 private static boolean readWithExtraOnHeap(InputStream in, byte[] buf, int bufOffset, 179 int necessaryLen, int extraLen) throws IOException { 180 int heapBytesRead = 0; 181 int bytesRemaining = necessaryLen + extraLen; 182 try { 183 while (bytesRemaining > 0) { 184 int ret = in.read(buf, bufOffset, bytesRemaining); 185 if (ret < 0) { 186 if (bytesRemaining <= extraLen) { 187 // We could not read the "extra data", but that is OK. 188 break; 189 } 190 throw new IOException("Premature EOF from inputStream (read " + "returned " + ret 191 + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen 192 + " extra bytes, " + "successfully read " + (necessaryLen + extraLen - bytesRemaining)); 193 } 194 bufOffset += ret; 195 bytesRemaining -= ret; 196 heapBytesRead += ret; 197 } 198 } finally { 199 final Span span = Span.current(); 200 span.addEvent("BlockIOUtils.readWithExtra", getHeapBytesReadAttributes(span, heapBytesRead)); 201 } 202 return bytesRemaining <= 0; 203 } 204 205 /** 206 * Read bytes into ByteBuffers directly, those buffers either contains the extraLen bytes or only 207 * contains necessaryLen bytes, which depends on how much bytes do the last time we read. 208 * @param buf the destination {@link ByteBuff}. 209 * @param dis input stream to read. 210 * @param necessaryLen bytes which we must read 211 * @param extraLen bytes which we may read 212 * @return if the returned flag is true, then we've finished to read the extraLen into our 213 * ByteBuffers, otherwise we've not read the extraLen bytes yet. 214 * @throws IOException if failed to read the necessary bytes. 215 */ 216 public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int necessaryLen, 217 int extraLen) throws IOException { 218 if (!isByteBufferReadable(dis)) { 219 // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to 220 // the destination ByteBuff. 221 byte[] heapBuf = new byte[necessaryLen + extraLen]; 222 boolean ret = readWithExtraOnHeap(dis, heapBuf, 0, necessaryLen, extraLen); 223 copyToByteBuff(heapBuf, 0, heapBuf.length, buf); 224 return ret; 225 } 226 int directBytesRead = 0, heapBytesRead = 0; 227 ByteBuffer[] buffers = buf.nioByteBuffers(); 228 int bytesRead = 0; 229 int remain = necessaryLen + extraLen; 230 int idx = 0; 231 ByteBuffer cur = buffers[idx]; 232 try { 233 while (bytesRead < necessaryLen) { 234 while (!cur.hasRemaining()) { 235 if (++idx >= buffers.length) { 236 throw new IOException( 237 "Not enough ByteBuffers to read the reminding " + remain + "bytes"); 238 } 239 cur = buffers[idx]; 240 } 241 cur.limit(cur.position() + Math.min(remain, cur.remaining())); 242 int ret = dis.read(cur); 243 if (ret < 0) { 244 throw new IOException("Premature EOF from inputStream (read returned " + ret 245 + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen 246 + " extra bytes, successfully read " + bytesRead); 247 } 248 bytesRead += ret; 249 remain -= ret; 250 if (cur.isDirect()) { 251 directBytesRead += ret; 252 } else { 253 heapBytesRead += ret; 254 } 255 } 256 } finally { 257 final Span span = Span.current(); 258 span.addEvent("BlockIOUtils.readWithExtra", 259 getDirectAndHeapBytesReadAttributes(span, directBytesRead, heapBytesRead)); 260 } 261 return (extraLen > 0) && (bytesRead == necessaryLen + extraLen); 262 } 263 264 /** 265 * Read from an input stream at least <code>necessaryLen</code> and if possible, 266 * <code>extraLen</code> also if available. Analogous to 267 * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and 268 * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to 269 * read. If the input stream supports ByteBufferPositionedReadable, it reads to the byte buffer 270 * directly, and does not allocate a temporary byte array. 271 * @param buff ByteBuff to read into. 272 * @param dis the input stream to read from 273 * @param position the position within the stream from which to start reading 274 * @param necessaryLen the number of bytes that are absolutely necessary to read 275 * @param extraLen the number of extra bytes that would be nice to read 276 * @return true if and only if extraLen is > 0 and reading those extra bytes was successful 277 * @throws IOException if failed to read the necessary bytes 278 */ 279 public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position, 280 int necessaryLen, int extraLen) throws IOException { 281 return preadWithExtra(buff, dis, position, necessaryLen, extraLen, false); 282 } 283 284 /** 285 * Read from an input stream at least <code>necessaryLen</code> and if possible, 286 * <code>extraLen</code> also if available. Analogous to 287 * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and 288 * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to 289 * read. If the input stream supports ByteBufferPositionedReadable, it reads to the byte buffer 290 * directly, and does not allocate a temporary byte array. 291 * @param buff ByteBuff to read into. 292 * @param dis the input stream to read from 293 * @param position the position within the stream from which to start reading 294 * @param necessaryLen the number of bytes that are absolutely necessary to read 295 * @param extraLen the number of extra bytes that would be nice to read 296 * @param readAllBytes whether we must read the necessaryLen and extraLen 297 * @return true if and only if extraLen is > 0 and reading those extra bytes was successful 298 * @throws IOException if failed to read the necessary bytes 299 */ 300 public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position, 301 int necessaryLen, int extraLen, boolean readAllBytes) throws IOException { 302 boolean preadbytebuffer = dis.hasCapability("in:preadbytebuffer"); 303 304 if (preadbytebuffer) { 305 return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen, readAllBytes); 306 } else { 307 return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen, readAllBytes); 308 } 309 } 310 311 private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis, long position, 312 int necessaryLen, int extraLen, boolean readAllBytes) throws IOException { 313 int remain = necessaryLen + extraLen; 314 byte[] buf = new byte[remain]; 315 int bytesRead = 0; 316 int lengthMustRead = readAllBytes ? remain : necessaryLen; 317 try { 318 while (bytesRead < lengthMustRead) { 319 int ret = dis.read(position + bytesRead, buf, bytesRead, remain); 320 if (ret < 0) { 321 throw new IOException("Premature EOF from inputStream (positional read returned " + ret 322 + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen 323 + " extra bytes, successfully read " + bytesRead); 324 } 325 bytesRead += ret; 326 remain -= ret; 327 } 328 } finally { 329 final Span span = Span.current(); 330 span.addEvent("BlockIOUtils.preadWithExtra", getHeapBytesReadAttributes(span, bytesRead)); 331 } 332 copyToByteBuff(buf, 0, bytesRead, buff); 333 return (extraLen > 0) && (bytesRead == necessaryLen + extraLen); 334 } 335 336 private static boolean preadWithExtraDirectly(ByteBuff buff, FSDataInputStream dis, long position, 337 int necessaryLen, int extraLen, boolean readAllBytes) throws IOException { 338 int directBytesRead = 0, heapBytesRead = 0; 339 int remain = necessaryLen + extraLen, bytesRead = 0, idx = 0; 340 ByteBuffer[] buffers = buff.nioByteBuffers(); 341 ByteBuffer cur = buffers[idx]; 342 int lengthMustRead = readAllBytes ? remain : necessaryLen; 343 try { 344 while (bytesRead < lengthMustRead) { 345 int ret; 346 while (!cur.hasRemaining()) { 347 if (++idx >= buffers.length) { 348 throw new IOException( 349 "Not enough ByteBuffers to read the reminding " + remain + "bytes"); 350 } 351 cur = buffers[idx]; 352 } 353 cur.limit(cur.position() + Math.min(remain, cur.remaining())); 354 try { 355 ret = (Integer) byteBufferPositionedReadMethod.invoke(dis, position + bytesRead, cur); 356 } catch (IllegalAccessException e) { 357 throw new IOException("Unable to invoke ByteBuffer positioned read when trying to read " 358 + bytesRead + " bytes from position " + position, e); 359 } catch (InvocationTargetException e) { 360 throw new IOException("Encountered an exception when invoking ByteBuffer positioned read" 361 + " when trying to read " + bytesRead + " bytes from position " + position, e); 362 } 363 if (ret < 0) { 364 throw new IOException("Premature EOF from inputStream (positional read returned " + ret 365 + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen 366 + " extra bytes, successfully read " + bytesRead); 367 } 368 bytesRead += ret; 369 remain -= ret; 370 if (cur.isDirect()) { 371 directBytesRead += bytesRead; 372 } else { 373 heapBytesRead += bytesRead; 374 } 375 } 376 } finally { 377 final Span span = Span.current(); 378 span.addEvent("BlockIOUtils.preadWithExtra", 379 getDirectAndHeapBytesReadAttributes(span, directBytesRead, heapBytesRead)); 380 } 381 382 return (extraLen > 0) && (bytesRead == necessaryLen + extraLen); 383 } 384 385 private static int copyToByteBuff(byte[] buf, int offset, int len, ByteBuff out) 386 throws IOException { 387 if (offset < 0 || len < 0 || offset + len > buf.length) { 388 throw new IOException("Invalid offset=" + offset + " and len=" + len + ", cap=" + buf.length); 389 } 390 ByteBuffer[] buffers = out.nioByteBuffers(); 391 int idx = 0, remain = len, copyLen; 392 ByteBuffer cur = buffers[idx]; 393 while (remain > 0) { 394 while (!cur.hasRemaining()) { 395 if (++idx >= buffers.length) { 396 throw new IOException("Not enough ByteBuffers to read the reminding " + remain + "bytes"); 397 } 398 cur = buffers[idx]; 399 } 400 copyLen = Math.min(cur.remaining(), remain); 401 cur.put(buf, offset, copyLen); 402 remain -= copyLen; 403 offset += copyLen; 404 } 405 return len; 406 } 407 408 /** 409 * Builds OpenTelemetry attributes to be recorded on a span for an event which reads direct and 410 * heap bytes. This will short-circuit and record nothing if OpenTelemetry isn't enabled. 411 */ 412 private static Attributes getDirectAndHeapBytesReadAttributes(Span span, int directBytesRead, 413 int heapBytesRead) { 414 // It's expensive to record these attributes, so we avoid the cost of doing this if the span 415 // isn't going to be persisted 416 if (!span.isRecording()) { 417 return Attributes.empty(); 418 } 419 420 final AttributesBuilder attributesBuilder = builderFromContext(Context.current()); 421 annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead); 422 423 return attributesBuilder.build(); 424 } 425 426 /** 427 * Builds OpenTelemtry attributes to be recorded on a span for an event which reads just heap 428 * bytes. This will short-circuit and record nothing if OpenTelemetry isn't enabled. 429 */ 430 private static Attributes getHeapBytesReadAttributes(Span span, int heapBytesRead) { 431 // It's expensive to record these attributes, so we avoid the cost of doing this if the span 432 // isn't going to be persisted 433 if (!span.isRecording()) { 434 return Attributes.empty(); 435 } 436 437 final AttributesBuilder attributesBuilder = builderFromContext(Context.current()); 438 annotateHeapBytesRead(attributesBuilder, heapBytesRead); 439 440 return attributesBuilder.build(); 441 } 442 443 /** 444 * Construct a fresh {@link AttributesBuilder} from the provided {@link Context}, populated with 445 * relevant attributes populated by {@link HFileContextAttributesBuilderConsumer#CONTEXT_KEY}. 446 */ 447 private static AttributesBuilder builderFromContext(Context context) { 448 final AttributesBuilder attributesBuilder = Attributes.builder(); 449 Optional.ofNullable(context) 450 .map(val -> val.get(HFileContextAttributesBuilderConsumer.CONTEXT_KEY)) 451 .ifPresent(c -> c.accept(attributesBuilder)); 452 return attributesBuilder; 453 } 454 455 /** 456 * Conditionally annotate {@code span} with the appropriate attribute when value is non-zero. 457 */ 458 private static void annotateHeapBytesRead(AttributesBuilder attributesBuilder, 459 int heapBytesRead) { 460 annotateBytesRead(attributesBuilder, 0, heapBytesRead); 461 } 462 463 /** 464 * Conditionally annotate {@code attributesBuilder} with appropriate attributes when values are 465 * non-zero. 466 */ 467 private static void annotateBytesRead(AttributesBuilder attributesBuilder, int directBytesRead, 468 int heapBytesRead) { 469 if (directBytesRead > 0) { 470 attributesBuilder.put(DIRECT_BYTES_READ_KEY, directBytesRead); 471 } 472 if (heapBytesRead > 0) { 473 attributesBuilder.put(HEAP_BYTES_READ_KEY, heapBytesRead); 474 } 475 } 476}