001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.util;
019
020import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DIRECT_BYTES_READ_KEY;
021import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.HEAP_BYTES_READ_KEY;
022
023import io.opentelemetry.api.common.Attributes;
024import io.opentelemetry.api.common.AttributesBuilder;
025import io.opentelemetry.api.trace.Span;
026import io.opentelemetry.context.Context;
027import java.io.IOException;
028import java.io.InputStream;
029import java.lang.reflect.InvocationTargetException;
030import java.lang.reflect.Method;
031import java.nio.ByteBuffer;
032import java.util.Optional;
033import org.apache.hadoop.fs.ByteBufferReadable;
034import org.apache.hadoop.fs.FSDataInputStream;
035import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer;
036import org.apache.hadoop.hbase.nio.ByteBuff;
037import org.apache.hadoop.io.IOUtils;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042@InterfaceAudience.Private
043public final class BlockIOUtils {
044  private static final Logger LOG = LoggerFactory.getLogger(BlockIOUtils.class);
045  // TODO: remove the reflection when we update to Hadoop 3.3 or above.
046  private static Method byteBufferPositionedReadMethod;
047
048  static {
049    initByteBufferPositionReadableMethod();
050  }
051
052  // Disallow instantiation
053  private BlockIOUtils() {
054
055  }
056
057  private static void initByteBufferPositionReadableMethod() {
058    try {
059      // long position, ByteBuffer buf
060      byteBufferPositionedReadMethod =
061        FSDataInputStream.class.getMethod("read", long.class, ByteBuffer.class);
062    } catch (NoSuchMethodException e) {
063      LOG.debug("Unable to find positioned bytebuffer read API of FSDataInputStream. "
064        + "preadWithExtra() will use a temporary on-heap byte array.");
065    }
066  }
067
068  public static boolean isByteBufferReadable(FSDataInputStream is) {
069    InputStream cur = is.getWrappedStream();
070    for (;;) {
071      if ((cur instanceof FSDataInputStream)) {
072        cur = ((FSDataInputStream) cur).getWrappedStream();
073      } else {
074        break;
075      }
076    }
077    return cur instanceof ByteBufferReadable;
078  }
079
080  /**
081   * Read length bytes into ByteBuffers directly.
082   * @param buf    the destination {@link ByteBuff}
083   * @param dis    the HDFS input stream which implement the ByteBufferReadable interface.
084   * @param length bytes to read.
085   * @throws IOException exception to throw if any error happen
086   */
087  public static void readFully(ByteBuff buf, FSDataInputStream dis, int length) throws IOException {
088    final Span span = Span.current();
089    final AttributesBuilder attributesBuilder = builderFromContext(Context.current());
090    if (!isByteBufferReadable(dis)) {
091      // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
092      // the destination ByteBuff.
093      byte[] heapBuf = new byte[length];
094      IOUtils.readFully(dis, heapBuf, 0, length);
095      annotateHeapBytesRead(attributesBuilder, length);
096      span.addEvent("BlockIOUtils.readFully", attributesBuilder.build());
097      copyToByteBuff(heapBuf, 0, length, buf);
098      return;
099    }
100    int directBytesRead = 0, heapBytesRead = 0;
101    ByteBuffer[] buffers = buf.nioByteBuffers();
102    int remain = length;
103    int idx = 0;
104    ByteBuffer cur = buffers[idx];
105    try {
106      while (remain > 0) {
107        while (!cur.hasRemaining()) {
108          if (++idx >= buffers.length) {
109            throw new IOException(
110              "Not enough ByteBuffers to read the reminding " + remain + " " + "bytes");
111          }
112          cur = buffers[idx];
113        }
114        cur.limit(cur.position() + Math.min(remain, cur.remaining()));
115        int bytesRead = dis.read(cur);
116        if (bytesRead < 0) {
117          throw new IOException(
118            "Premature EOF from inputStream, but still need " + remain + " " + "bytes");
119        }
120        remain -= bytesRead;
121        if (cur.isDirect()) {
122          directBytesRead += bytesRead;
123        } else {
124          heapBytesRead += bytesRead;
125        }
126      }
127    } finally {
128      annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead);
129      span.addEvent("BlockIOUtils.readFully", attributesBuilder.build());
130    }
131  }
132
133  /**
134   * Copying bytes from InputStream to {@link ByteBuff} by using an temporary heap byte[] (default
135   * size is 1024 now).
136   * @param in     the InputStream to read
137   * @param out    the destination {@link ByteBuff}
138   * @param length to read
139   * @throws IOException if any io error encountered.
140   */
141  public static void readFullyWithHeapBuffer(InputStream in, ByteBuff out, int length)
142    throws IOException {
143    if (length < 0) {
144      throw new IllegalArgumentException("Length must not be negative: " + length);
145    }
146    int heapBytesRead = 0;
147    int remain = length, count;
148    byte[] buffer = new byte[1024];
149    try {
150      while (remain > 0) {
151        count = in.read(buffer, 0, Math.min(remain, buffer.length));
152        if (count < 0) {
153          throw new IOException(
154            "Premature EOF from inputStream, but still need " + remain + " bytes");
155        }
156        out.put(buffer, 0, count);
157        remain -= count;
158        heapBytesRead += count;
159      }
160    } finally {
161      final Span span = Span.current();
162      final AttributesBuilder attributesBuilder = builderFromContext(Context.current());
163      annotateHeapBytesRead(attributesBuilder, heapBytesRead);
164      span.addEvent("BlockIOUtils.readFullyWithHeapBuffer", attributesBuilder.build());
165    }
166  }
167
168  /**
169   * Read from an input stream at least <code>necessaryLen</code> and if possible,
170   * <code>extraLen</code> also if available. Analogous to
171   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a number of "extra"
172   * bytes to also optionally read.
173   * @param in           the input stream to read from
174   * @param buf          the buffer to read into
175   * @param bufOffset    the destination offset in the buffer
176   * @param necessaryLen the number of bytes that are absolutely necessary to read
177   * @param extraLen     the number of extra bytes that would be nice to read
178   * @return true if succeeded reading the extra bytes
179   * @throws IOException if failed to read the necessary bytes
180   */
181  private static boolean readWithExtraOnHeap(InputStream in, byte[] buf, int bufOffset,
182    int necessaryLen, int extraLen) throws IOException {
183    int heapBytesRead = 0;
184    int bytesRemaining = necessaryLen + extraLen;
185    try {
186      while (bytesRemaining > 0) {
187        int ret = in.read(buf, bufOffset, bytesRemaining);
188        if (ret < 0) {
189          if (bytesRemaining <= extraLen) {
190            // We could not read the "extra data", but that is OK.
191            break;
192          }
193          throw new IOException("Premature EOF from inputStream (read " + "returned " + ret
194            + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
195            + " extra bytes, " + "successfully read " + (necessaryLen + extraLen - bytesRemaining));
196        }
197        bufOffset += ret;
198        bytesRemaining -= ret;
199        heapBytesRead += ret;
200      }
201    } finally {
202      final Span span = Span.current();
203      final AttributesBuilder attributesBuilder = builderFromContext(Context.current());
204      annotateHeapBytesRead(attributesBuilder, heapBytesRead);
205      span.addEvent("BlockIOUtils.readWithExtra", attributesBuilder.build());
206    }
207    return bytesRemaining <= 0;
208  }
209
210  /**
211   * Read bytes into ByteBuffers directly, those buffers either contains the extraLen bytes or only
212   * contains necessaryLen bytes, which depends on how much bytes do the last time we read.
213   * @param buf          the destination {@link ByteBuff}.
214   * @param dis          input stream to read.
215   * @param necessaryLen bytes which we must read
216   * @param extraLen     bytes which we may read
217   * @return if the returned flag is true, then we've finished to read the extraLen into our
218   *         ByteBuffers, otherwise we've not read the extraLen bytes yet.
219   * @throws IOException if failed to read the necessary bytes.
220   */
221  public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int necessaryLen,
222    int extraLen) throws IOException {
223    if (!isByteBufferReadable(dis)) {
224      // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
225      // the destination ByteBuff.
226      byte[] heapBuf = new byte[necessaryLen + extraLen];
227      boolean ret = readWithExtraOnHeap(dis, heapBuf, 0, necessaryLen, extraLen);
228      copyToByteBuff(heapBuf, 0, heapBuf.length, buf);
229      return ret;
230    }
231    int directBytesRead = 0, heapBytesRead = 0;
232    ByteBuffer[] buffers = buf.nioByteBuffers();
233    int bytesRead = 0;
234    int remain = necessaryLen + extraLen;
235    int idx = 0;
236    ByteBuffer cur = buffers[idx];
237    try {
238      while (bytesRead < necessaryLen) {
239        while (!cur.hasRemaining()) {
240          if (++idx >= buffers.length) {
241            throw new IOException(
242              "Not enough ByteBuffers to read the reminding " + remain + "bytes");
243          }
244          cur = buffers[idx];
245        }
246        cur.limit(cur.position() + Math.min(remain, cur.remaining()));
247        int ret = dis.read(cur);
248        if (ret < 0) {
249          throw new IOException("Premature EOF from inputStream (read returned " + ret
250            + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
251            + " extra bytes, successfully read " + bytesRead);
252        }
253        bytesRead += ret;
254        remain -= ret;
255        if (cur.isDirect()) {
256          directBytesRead += ret;
257        } else {
258          heapBytesRead += ret;
259        }
260      }
261    } finally {
262      final Span span = Span.current();
263      final AttributesBuilder attributesBuilder = builderFromContext(Context.current());
264      annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead);
265      span.addEvent("BlockIOUtils.readWithExtra", attributesBuilder.build());
266    }
267    return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
268  }
269
270  /**
271   * Read from an input stream at least <code>necessaryLen</code> and if possible,
272   * <code>extraLen</code> also if available. Analogous to
273   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
274   * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
275   * read. If the input stream supports ByteBufferPositionedReadable, it reads to the byte buffer
276   * directly, and does not allocate a temporary byte array.
277   * @param buff         ByteBuff to read into.
278   * @param dis          the input stream to read from
279   * @param position     the position within the stream from which to start reading
280   * @param necessaryLen the number of bytes that are absolutely necessary to read
281   * @param extraLen     the number of extra bytes that would be nice to read
282   * @return true if and only if extraLen is > 0 and reading those extra bytes was successful
283   * @throws IOException if failed to read the necessary bytes
284   */
285  public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
286    int necessaryLen, int extraLen) throws IOException {
287    return preadWithExtra(buff, dis, position, necessaryLen, extraLen, false);
288  }
289
290  /**
291   * Read from an input stream at least <code>necessaryLen</code> and if possible,
292   * <code>extraLen</code> also if available. Analogous to
293   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
294   * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
295   * read. If the input stream supports ByteBufferPositionedReadable, it reads to the byte buffer
296   * directly, and does not allocate a temporary byte array.
297   * @param buff         ByteBuff to read into.
298   * @param dis          the input stream to read from
299   * @param position     the position within the stream from which to start reading
300   * @param necessaryLen the number of bytes that are absolutely necessary to read
301   * @param extraLen     the number of extra bytes that would be nice to read
302   * @param readAllBytes whether we must read the necessaryLen and extraLen
303   * @return true if and only if extraLen is > 0 and reading those extra bytes was successful
304   * @throws IOException if failed to read the necessary bytes
305   */
306  public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
307    int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
308    boolean preadbytebuffer = dis.hasCapability("in:preadbytebuffer");
309
310    if (preadbytebuffer) {
311      return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen, readAllBytes);
312    } else {
313      return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen, readAllBytes);
314    }
315  }
316
317  private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis, long position,
318    int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
319    int remain = necessaryLen + extraLen;
320    byte[] buf = new byte[remain];
321    int bytesRead = 0;
322    int lengthMustRead = readAllBytes ? remain : necessaryLen;
323    try {
324      while (bytesRead < lengthMustRead) {
325        int ret = dis.read(position + bytesRead, buf, bytesRead, remain);
326        if (ret < 0) {
327          throw new IOException("Premature EOF from inputStream (positional read returned " + ret
328            + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
329            + " extra bytes, successfully read " + bytesRead);
330        }
331        bytesRead += ret;
332        remain -= ret;
333      }
334    } finally {
335      final Span span = Span.current();
336      final AttributesBuilder attributesBuilder = builderFromContext(Context.current());
337      annotateHeapBytesRead(attributesBuilder, bytesRead);
338      span.addEvent("BlockIOUtils.preadWithExtra", attributesBuilder.build());
339    }
340    copyToByteBuff(buf, 0, bytesRead, buff);
341    return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
342  }
343
344  private static boolean preadWithExtraDirectly(ByteBuff buff, FSDataInputStream dis, long position,
345    int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
346    int directBytesRead = 0, heapBytesRead = 0;
347    int remain = necessaryLen + extraLen, bytesRead = 0, idx = 0;
348    ByteBuffer[] buffers = buff.nioByteBuffers();
349    ByteBuffer cur = buffers[idx];
350    int lengthMustRead = readAllBytes ? remain : necessaryLen;
351    try {
352      while (bytesRead < lengthMustRead) {
353        int ret;
354        while (!cur.hasRemaining()) {
355          if (++idx >= buffers.length) {
356            throw new IOException(
357              "Not enough ByteBuffers to read the reminding " + remain + "bytes");
358          }
359          cur = buffers[idx];
360        }
361        cur.limit(cur.position() + Math.min(remain, cur.remaining()));
362        try {
363          ret = (Integer) byteBufferPositionedReadMethod.invoke(dis, position + bytesRead, cur);
364        } catch (IllegalAccessException e) {
365          throw new IOException("Unable to invoke ByteBuffer positioned read when trying to read "
366            + bytesRead + " bytes from position " + position, e);
367        } catch (InvocationTargetException e) {
368          throw new IOException("Encountered an exception when invoking ByteBuffer positioned read"
369            + " when trying to read " + bytesRead + " bytes from position " + position, e);
370        } catch (NullPointerException e) {
371          throw new IOException("something is null");
372        } catch (Exception e) {
373          throw e;
374        }
375        if (ret < 0) {
376          throw new IOException("Premature EOF from inputStream (positional read returned " + ret
377            + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
378            + " extra bytes, successfully read " + bytesRead);
379        }
380        bytesRead += ret;
381        remain -= ret;
382        if (cur.isDirect()) {
383          directBytesRead += bytesRead;
384        } else {
385          heapBytesRead += bytesRead;
386        }
387      }
388    } finally {
389      final Span span = Span.current();
390      final AttributesBuilder attributesBuilder = builderFromContext(Context.current());
391      annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead);
392      span.addEvent("BlockIOUtils.preadWithExtra", attributesBuilder.build());
393    }
394
395    return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
396  }
397
398  private static int copyToByteBuff(byte[] buf, int offset, int len, ByteBuff out)
399    throws IOException {
400    if (offset < 0 || len < 0 || offset + len > buf.length) {
401      throw new IOException("Invalid offset=" + offset + " and len=" + len + ", cap=" + buf.length);
402    }
403    ByteBuffer[] buffers = out.nioByteBuffers();
404    int idx = 0, remain = len, copyLen;
405    ByteBuffer cur = buffers[idx];
406    while (remain > 0) {
407      while (!cur.hasRemaining()) {
408        if (++idx >= buffers.length) {
409          throw new IOException("Not enough ByteBuffers to read the reminding " + remain + "bytes");
410        }
411        cur = buffers[idx];
412      }
413      copyLen = Math.min(cur.remaining(), remain);
414      cur.put(buf, offset, copyLen);
415      remain -= copyLen;
416      offset += copyLen;
417    }
418    return len;
419  }
420
421  /**
422   * Construct a fresh {@link AttributesBuilder} from the provided {@link Context}, populated with
423   * relevant attributes populated by {@link HFileContextAttributesBuilderConsumer#CONTEXT_KEY}.
424   */
425  private static AttributesBuilder builderFromContext(Context context) {
426    final AttributesBuilder attributesBuilder = Attributes.builder();
427    Optional.ofNullable(context)
428      .map(val -> val.get(HFileContextAttributesBuilderConsumer.CONTEXT_KEY))
429      .ifPresent(c -> c.accept(attributesBuilder));
430    return attributesBuilder;
431  }
432
433  /**
434   * Conditionally annotate {@code span} with the appropriate attribute when value is non-zero.
435   */
436  private static void annotateHeapBytesRead(AttributesBuilder attributesBuilder,
437    int heapBytesRead) {
438    annotateBytesRead(attributesBuilder, 0, heapBytesRead);
439  }
440
441  /**
442   * Conditionally annotate {@code attributesBuilder} with appropriate attributes when values are
443   * non-zero.
444   */
445  private static void annotateBytesRead(AttributesBuilder attributesBuilder, long directBytesRead,
446    long heapBytesRead) {
447    if (directBytesRead > 0) {
448      attributesBuilder.put(DIRECT_BYTES_READ_KEY, directBytesRead);
449    }
450    if (heapBytesRead > 0) {
451      attributesBuilder.put(HEAP_BYTES_READ_KEY, heapBytesRead);
452    }
453  }
454}