001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.util;
019
020import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DIRECT_BYTES_READ_KEY;
021import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.HEAP_BYTES_READ_KEY;
022
023import io.opentelemetry.api.common.Attributes;
024import io.opentelemetry.api.common.AttributesBuilder;
025import io.opentelemetry.api.trace.Span;
026import io.opentelemetry.context.Context;
027import java.io.IOException;
028import java.io.InputStream;
029import java.lang.reflect.InvocationTargetException;
030import java.lang.reflect.Method;
031import java.nio.ByteBuffer;
032import java.util.Optional;
033import org.apache.hadoop.fs.ByteBufferReadable;
034import org.apache.hadoop.fs.FSDataInputStream;
035import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer;
036import org.apache.hadoop.hbase.nio.ByteBuff;
037import org.apache.hadoop.io.IOUtils;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042@InterfaceAudience.Private
043public final class BlockIOUtils {
  // Class-wide SLF4J logger.
  private static final Logger LOG = LoggerFactory.getLogger(BlockIOUtils.class);
  // TODO: remove the reflection when we update to Hadoop 3.3 or above.
  // Reflective handle to FSDataInputStream#read(long, ByteBuffer). It is assigned only by
  // initByteBufferPositionReadableMethod() and stays null when that lookup fails.
  private static Method byteBufferPositionedReadMethod;

  // Resolve the reflective handle once, when the class is initialized.
  static {
    initByteBufferPositionReadableMethod();
  }
051
052  // Disallow instantiation
053  private BlockIOUtils() {
054
055  }
056
057  private static void initByteBufferPositionReadableMethod() {
058    try {
059      // long position, ByteBuffer buf
060      byteBufferPositionedReadMethod =
061        FSDataInputStream.class.getMethod("read", long.class, ByteBuffer.class);
062    } catch (NoSuchMethodException e) {
063      LOG.debug("Unable to find positioned bytebuffer read API of FSDataInputStream. "
064        + "preadWithExtra() will use a temporary on-heap byte array.");
065    }
066  }
067
068  public static boolean isByteBufferReadable(FSDataInputStream is) {
069    InputStream cur = is.getWrappedStream();
070    for (;;) {
071      if ((cur instanceof FSDataInputStream)) {
072        cur = ((FSDataInputStream) cur).getWrappedStream();
073      } else {
074        break;
075      }
076    }
077    return cur instanceof ByteBufferReadable;
078  }
079
080  /**
081   * Read length bytes into ByteBuffers directly.
082   * @param buf    the destination {@link ByteBuff}
083   * @param dis    the HDFS input stream which implement the ByteBufferReadable interface.
084   * @param length bytes to read.
085   * @throws IOException exception to throw if any error happen
086   */
087  public static void readFully(ByteBuff buf, FSDataInputStream dis, int length) throws IOException {
088    final Span span = Span.current();
089    if (!isByteBufferReadable(dis)) {
090      // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
091      // the destination ByteBuff.
092      byte[] heapBuf = new byte[length];
093      IOUtils.readFully(dis, heapBuf, 0, length);
094      span.addEvent("BlockIOUtils.readFully", getHeapBytesReadAttributes(span, length));
095      copyToByteBuff(heapBuf, 0, length, buf);
096      return;
097    }
098    int directBytesRead = 0, heapBytesRead = 0;
099    ByteBuffer[] buffers = buf.nioByteBuffers();
100    int remain = length;
101    int idx = 0;
102    ByteBuffer cur = buffers[idx];
103    try {
104      while (remain > 0) {
105        while (!cur.hasRemaining()) {
106          if (++idx >= buffers.length) {
107            throw new IOException(
108              "Not enough ByteBuffers to read the reminding " + remain + " " + "bytes");
109          }
110          cur = buffers[idx];
111        }
112        cur.limit(cur.position() + Math.min(remain, cur.remaining()));
113        int bytesRead = dis.read(cur);
114        if (bytesRead < 0) {
115          throw new IOException(
116            "Premature EOF from inputStream, but still need " + remain + " " + "bytes");
117        }
118        remain -= bytesRead;
119        if (cur.isDirect()) {
120          directBytesRead += bytesRead;
121        } else {
122          heapBytesRead += bytesRead;
123        }
124      }
125    } finally {
126      span.addEvent("BlockIOUtils.readFully",
127        getDirectAndHeapBytesReadAttributes(span, directBytesRead, heapBytesRead));
128    }
129  }
130
131  /**
132   * Copying bytes from InputStream to {@link ByteBuff} by using an temporary heap byte[] (default
133   * size is 1024 now).
134   * @param in     the InputStream to read
135   * @param out    the destination {@link ByteBuff}
136   * @param length to read
137   * @throws IOException if any io error encountered.
138   */
139  public static void readFullyWithHeapBuffer(InputStream in, ByteBuff out, int length)
140    throws IOException {
141    if (length < 0) {
142      throw new IllegalArgumentException("Length must not be negative: " + length);
143    }
144    int heapBytesRead = 0;
145    int remain = length, count;
146    byte[] buffer = new byte[1024];
147    try {
148      while (remain > 0) {
149        count = in.read(buffer, 0, Math.min(remain, buffer.length));
150        if (count < 0) {
151          throw new IOException(
152            "Premature EOF from inputStream, but still need " + remain + " bytes");
153        }
154        out.put(buffer, 0, count);
155        remain -= count;
156        heapBytesRead += count;
157      }
158    } finally {
159      final Span span = Span.current();
160      span.addEvent("BlockIOUtils.readFullyWithHeapBuffer",
161        getHeapBytesReadAttributes(span, heapBytesRead));
162    }
163  }
164
165  /**
166   * Read from an input stream at least <code>necessaryLen</code> and if possible,
167   * <code>extraLen</code> also if available. Analogous to
168   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a number of "extra"
169   * bytes to also optionally read.
170   * @param in           the input stream to read from
171   * @param buf          the buffer to read into
172   * @param bufOffset    the destination offset in the buffer
173   * @param necessaryLen the number of bytes that are absolutely necessary to read
174   * @param extraLen     the number of extra bytes that would be nice to read
175   * @return true if succeeded reading the extra bytes
176   * @throws IOException if failed to read the necessary bytes
177   */
178  private static boolean readWithExtraOnHeap(InputStream in, byte[] buf, int bufOffset,
179    int necessaryLen, int extraLen) throws IOException {
180    int heapBytesRead = 0;
181    int bytesRemaining = necessaryLen + extraLen;
182    try {
183      while (bytesRemaining > 0) {
184        int ret = in.read(buf, bufOffset, bytesRemaining);
185        if (ret < 0) {
186          if (bytesRemaining <= extraLen) {
187            // We could not read the "extra data", but that is OK.
188            break;
189          }
190          throw new IOException("Premature EOF from inputStream (read " + "returned " + ret
191            + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
192            + " extra bytes, " + "successfully read " + (necessaryLen + extraLen - bytesRemaining));
193        }
194        bufOffset += ret;
195        bytesRemaining -= ret;
196        heapBytesRead += ret;
197      }
198    } finally {
199      final Span span = Span.current();
200      span.addEvent("BlockIOUtils.readWithExtra", getHeapBytesReadAttributes(span, heapBytesRead));
201    }
202    return bytesRemaining <= 0;
203  }
204
205  /**
206   * Read bytes into ByteBuffers directly, those buffers either contains the extraLen bytes or only
207   * contains necessaryLen bytes, which depends on how much bytes do the last time we read.
208   * @param buf          the destination {@link ByteBuff}.
209   * @param dis          input stream to read.
210   * @param necessaryLen bytes which we must read
211   * @param extraLen     bytes which we may read
212   * @return if the returned flag is true, then we've finished to read the extraLen into our
213   *         ByteBuffers, otherwise we've not read the extraLen bytes yet.
214   * @throws IOException if failed to read the necessary bytes.
215   */
216  public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int necessaryLen,
217    int extraLen) throws IOException {
218    if (!isByteBufferReadable(dis)) {
219      // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
220      // the destination ByteBuff.
221      byte[] heapBuf = new byte[necessaryLen + extraLen];
222      boolean ret = readWithExtraOnHeap(dis, heapBuf, 0, necessaryLen, extraLen);
223      copyToByteBuff(heapBuf, 0, heapBuf.length, buf);
224      return ret;
225    }
226    int directBytesRead = 0, heapBytesRead = 0;
227    ByteBuffer[] buffers = buf.nioByteBuffers();
228    int bytesRead = 0;
229    int remain = necessaryLen + extraLen;
230    int idx = 0;
231    ByteBuffer cur = buffers[idx];
232    try {
233      while (bytesRead < necessaryLen) {
234        while (!cur.hasRemaining()) {
235          if (++idx >= buffers.length) {
236            throw new IOException(
237              "Not enough ByteBuffers to read the reminding " + remain + "bytes");
238          }
239          cur = buffers[idx];
240        }
241        cur.limit(cur.position() + Math.min(remain, cur.remaining()));
242        int ret = dis.read(cur);
243        if (ret < 0) {
244          throw new IOException("Premature EOF from inputStream (read returned " + ret
245            + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
246            + " extra bytes, successfully read " + bytesRead);
247        }
248        bytesRead += ret;
249        remain -= ret;
250        if (cur.isDirect()) {
251          directBytesRead += ret;
252        } else {
253          heapBytesRead += ret;
254        }
255      }
256    } finally {
257      final Span span = Span.current();
258      span.addEvent("BlockIOUtils.readWithExtra",
259        getDirectAndHeapBytesReadAttributes(span, directBytesRead, heapBytesRead));
260    }
261    return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
262  }
263
264  /**
265   * Read from an input stream at least <code>necessaryLen</code> and if possible,
266   * <code>extraLen</code> also if available. Analogous to
267   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
268   * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
269   * read. If the input stream supports ByteBufferPositionedReadable, it reads to the byte buffer
270   * directly, and does not allocate a temporary byte array.
271   * @param buff         ByteBuff to read into.
272   * @param dis          the input stream to read from
273   * @param position     the position within the stream from which to start reading
274   * @param necessaryLen the number of bytes that are absolutely necessary to read
275   * @param extraLen     the number of extra bytes that would be nice to read
276   * @return true if and only if extraLen is > 0 and reading those extra bytes was successful
277   * @throws IOException if failed to read the necessary bytes
278   */
279  public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
280    int necessaryLen, int extraLen) throws IOException {
281    return preadWithExtra(buff, dis, position, necessaryLen, extraLen, false);
282  }
283
284  /**
285   * Read from an input stream at least <code>necessaryLen</code> and if possible,
286   * <code>extraLen</code> also if available. Analogous to
287   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
288   * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
289   * read. If the input stream supports ByteBufferPositionedReadable, it reads to the byte buffer
290   * directly, and does not allocate a temporary byte array.
291   * @param buff         ByteBuff to read into.
292   * @param dis          the input stream to read from
293   * @param position     the position within the stream from which to start reading
294   * @param necessaryLen the number of bytes that are absolutely necessary to read
295   * @param extraLen     the number of extra bytes that would be nice to read
296   * @param readAllBytes whether we must read the necessaryLen and extraLen
297   * @return true if and only if extraLen is > 0 and reading those extra bytes was successful
298   * @throws IOException if failed to read the necessary bytes
299   */
300  public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
301    int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
302    boolean preadbytebuffer = dis.hasCapability("in:preadbytebuffer");
303
304    if (preadbytebuffer) {
305      return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen, readAllBytes);
306    } else {
307      return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen, readAllBytes);
308    }
309  }
310
311  private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis, long position,
312    int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
313    int remain = necessaryLen + extraLen;
314    byte[] buf = new byte[remain];
315    int bytesRead = 0;
316    int lengthMustRead = readAllBytes ? remain : necessaryLen;
317    try {
318      while (bytesRead < lengthMustRead) {
319        int ret = dis.read(position + bytesRead, buf, bytesRead, remain);
320        if (ret < 0) {
321          throw new IOException("Premature EOF from inputStream (positional read returned " + ret
322            + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
323            + " extra bytes, successfully read " + bytesRead);
324        }
325        bytesRead += ret;
326        remain -= ret;
327      }
328    } finally {
329      final Span span = Span.current();
330      span.addEvent("BlockIOUtils.preadWithExtra", getHeapBytesReadAttributes(span, bytesRead));
331    }
332    copyToByteBuff(buf, 0, bytesRead, buff);
333    return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
334  }
335
336  private static boolean preadWithExtraDirectly(ByteBuff buff, FSDataInputStream dis, long position,
337    int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
338    int directBytesRead = 0, heapBytesRead = 0;
339    int remain = necessaryLen + extraLen, bytesRead = 0, idx = 0;
340    ByteBuffer[] buffers = buff.nioByteBuffers();
341    ByteBuffer cur = buffers[idx];
342    int lengthMustRead = readAllBytes ? remain : necessaryLen;
343    try {
344      while (bytesRead < lengthMustRead) {
345        int ret;
346        while (!cur.hasRemaining()) {
347          if (++idx >= buffers.length) {
348            throw new IOException(
349              "Not enough ByteBuffers to read the reminding " + remain + "bytes");
350          }
351          cur = buffers[idx];
352        }
353        cur.limit(cur.position() + Math.min(remain, cur.remaining()));
354        try {
355          ret = (Integer) byteBufferPositionedReadMethod.invoke(dis, position + bytesRead, cur);
356        } catch (IllegalAccessException e) {
357          throw new IOException("Unable to invoke ByteBuffer positioned read when trying to read "
358            + bytesRead + " bytes from position " + position, e);
359        } catch (InvocationTargetException e) {
360          throw new IOException("Encountered an exception when invoking ByteBuffer positioned read"
361            + " when trying to read " + bytesRead + " bytes from position " + position, e);
362        }
363        if (ret < 0) {
364          throw new IOException("Premature EOF from inputStream (positional read returned " + ret
365            + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
366            + " extra bytes, successfully read " + bytesRead);
367        }
368        bytesRead += ret;
369        remain -= ret;
370        if (cur.isDirect()) {
371          directBytesRead += bytesRead;
372        } else {
373          heapBytesRead += bytesRead;
374        }
375      }
376    } finally {
377      final Span span = Span.current();
378      span.addEvent("BlockIOUtils.preadWithExtra",
379        getDirectAndHeapBytesReadAttributes(span, directBytesRead, heapBytesRead));
380    }
381
382    return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
383  }
384
385  private static int copyToByteBuff(byte[] buf, int offset, int len, ByteBuff out)
386    throws IOException {
387    if (offset < 0 || len < 0 || offset + len > buf.length) {
388      throw new IOException("Invalid offset=" + offset + " and len=" + len + ", cap=" + buf.length);
389    }
390    ByteBuffer[] buffers = out.nioByteBuffers();
391    int idx = 0, remain = len, copyLen;
392    ByteBuffer cur = buffers[idx];
393    while (remain > 0) {
394      while (!cur.hasRemaining()) {
395        if (++idx >= buffers.length) {
396          throw new IOException("Not enough ByteBuffers to read the reminding " + remain + "bytes");
397        }
398        cur = buffers[idx];
399      }
400      copyLen = Math.min(cur.remaining(), remain);
401      cur.put(buf, offset, copyLen);
402      remain -= copyLen;
403      offset += copyLen;
404    }
405    return len;
406  }
407
408  /**
409   * Builds OpenTelemetry attributes to be recorded on a span for an event which reads direct and
410   * heap bytes. This will short-circuit and record nothing if OpenTelemetry isn't enabled.
411   */
412  private static Attributes getDirectAndHeapBytesReadAttributes(Span span, int directBytesRead,
413    int heapBytesRead) {
414    // It's expensive to record these attributes, so we avoid the cost of doing this if the span
415    // isn't going to be persisted
416    if (!span.isRecording()) {
417      return Attributes.empty();
418    }
419
420    final AttributesBuilder attributesBuilder = builderFromContext(Context.current());
421    annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead);
422
423    return attributesBuilder.build();
424  }
425
426  /**
427   * Builds OpenTelemtry attributes to be recorded on a span for an event which reads just heap
428   * bytes. This will short-circuit and record nothing if OpenTelemetry isn't enabled.
429   */
430  private static Attributes getHeapBytesReadAttributes(Span span, int heapBytesRead) {
431    // It's expensive to record these attributes, so we avoid the cost of doing this if the span
432    // isn't going to be persisted
433    if (!span.isRecording()) {
434      return Attributes.empty();
435    }
436
437    final AttributesBuilder attributesBuilder = builderFromContext(Context.current());
438    annotateHeapBytesRead(attributesBuilder, heapBytesRead);
439
440    return attributesBuilder.build();
441  }
442
443  /**
444   * Construct a fresh {@link AttributesBuilder} from the provided {@link Context}, populated with
445   * relevant attributes populated by {@link HFileContextAttributesBuilderConsumer#CONTEXT_KEY}.
446   */
447  private static AttributesBuilder builderFromContext(Context context) {
448    final AttributesBuilder attributesBuilder = Attributes.builder();
449    Optional.ofNullable(context)
450      .map(val -> val.get(HFileContextAttributesBuilderConsumer.CONTEXT_KEY))
451      .ifPresent(c -> c.accept(attributesBuilder));
452    return attributesBuilder;
453  }
454
455  /**
456   * Conditionally annotate {@code span} with the appropriate attribute when value is non-zero.
457   */
458  private static void annotateHeapBytesRead(AttributesBuilder attributesBuilder,
459    int heapBytesRead) {
460    annotateBytesRead(attributesBuilder, 0, heapBytesRead);
461  }
462
463  /**
464   * Conditionally annotate {@code attributesBuilder} with appropriate attributes when values are
465   * non-zero.
466   */
467  private static void annotateBytesRead(AttributesBuilder attributesBuilder, int directBytesRead,
468    int heapBytesRead) {
469    if (directBytesRead > 0) {
470      attributesBuilder.put(DIRECT_BYTES_READ_KEY, directBytesRead);
471    }
472    if (heapBytesRead > 0) {
473      attributesBuilder.put(HEAP_BYTES_READ_KEY, heapBytesRead);
474    }
475  }
476}