001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.util;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.lang.reflect.InvocationTargetException;
023import java.lang.reflect.Method;
024import java.nio.ByteBuffer;
025import org.apache.hadoop.fs.ByteBufferReadable;
026import org.apache.hadoop.fs.FSDataInputStream;
027import org.apache.hadoop.hbase.nio.ByteBuff;
028import org.apache.hadoop.io.IOUtils;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033@InterfaceAudience.Private
034public final class BlockIOUtils {
035  private static final Logger LOG = LoggerFactory.getLogger(BlockIOUtils.class);
036  // TODO: remove the reflection when we update to Hadoop 3.3 or above.
037  private static Method byteBufferPositionedReadMethod;
038
039  static {
040    initByteBufferPositionReadableMethod();
041  }
042
043  // Disallow instantiation
044  private BlockIOUtils() {
045
046  }
047
048  private static void initByteBufferPositionReadableMethod() {
049    try {
050      // long position, ByteBuffer buf
051      byteBufferPositionedReadMethod =
052        FSDataInputStream.class.getMethod("read", long.class, ByteBuffer.class);
053    } catch (NoSuchMethodException e) {
054      LOG.debug("Unable to find positioned bytebuffer read API of FSDataInputStream. "
055        + "preadWithExtra() will use a temporary on-heap byte array.");
056    }
057  }
058
059  public static boolean isByteBufferReadable(FSDataInputStream is) {
060    InputStream cur = is.getWrappedStream();
061    for (;;) {
062      if ((cur instanceof FSDataInputStream)) {
063        cur = ((FSDataInputStream) cur).getWrappedStream();
064      } else {
065        break;
066      }
067    }
068    return cur instanceof ByteBufferReadable;
069  }
070
071  /**
072   * Read length bytes into ByteBuffers directly.
073   * @param buf    the destination {@link ByteBuff}
074   * @param dis    the HDFS input stream which implement the ByteBufferReadable interface.
075   * @param length bytes to read.
076   * @throws IOException exception to throw if any error happen
077   */
078  public static void readFully(ByteBuff buf, FSDataInputStream dis, int length) throws IOException {
079    if (!isByteBufferReadable(dis)) {
080      // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
081      // the destination ByteBuff.
082      byte[] heapBuf = new byte[length];
083      IOUtils.readFully(dis, heapBuf, 0, length);
084      copyToByteBuff(heapBuf, 0, length, buf);
085      return;
086    }
087    ByteBuffer[] buffers = buf.nioByteBuffers();
088    int remain = length;
089    int idx = 0;
090    ByteBuffer cur = buffers[idx];
091    while (remain > 0) {
092      while (!cur.hasRemaining()) {
093        if (++idx >= buffers.length) {
094          throw new IOException(
095            "Not enough ByteBuffers to read the reminding " + remain + " " + "bytes");
096        }
097        cur = buffers[idx];
098      }
099      cur.limit(cur.position() + Math.min(remain, cur.remaining()));
100      int bytesRead = dis.read(cur);
101      if (bytesRead < 0) {
102        throw new IOException(
103          "Premature EOF from inputStream, but still need " + remain + " " + "bytes");
104      }
105      remain -= bytesRead;
106    }
107  }
108
109  /**
110   * Copying bytes from InputStream to {@link ByteBuff} by using an temporary heap byte[] (default
111   * size is 1024 now).
112   * @param in     the InputStream to read
113   * @param out    the destination {@link ByteBuff}
114   * @param length to read
115   * @throws IOException if any io error encountered.
116   */
117  public static void readFullyWithHeapBuffer(InputStream in, ByteBuff out, int length)
118    throws IOException {
119    byte[] buffer = new byte[1024];
120    if (length < 0) {
121      throw new IllegalArgumentException("Length must not be negative: " + length);
122    }
123    int remain = length, count;
124    while (remain > 0) {
125      count = in.read(buffer, 0, Math.min(remain, buffer.length));
126      if (count < 0) {
127        throw new IOException(
128          "Premature EOF from inputStream, but still need " + remain + " bytes");
129      }
130      out.put(buffer, 0, count);
131      remain -= count;
132    }
133  }
134
135  /**
136   * Read from an input stream at least <code>necessaryLen</code> and if possible,
137   * <code>extraLen</code> also if available. Analogous to
138   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a number of "extra"
139   * bytes to also optionally read.
140   * @param in           the input stream to read from
141   * @param buf          the buffer to read into
142   * @param bufOffset    the destination offset in the buffer
143   * @param necessaryLen the number of bytes that are absolutely necessary to read
144   * @param extraLen     the number of extra bytes that would be nice to read
145   * @return true if succeeded reading the extra bytes
146   * @throws IOException if failed to read the necessary bytes
147   */
148  private static boolean readWithExtraOnHeap(InputStream in, byte[] buf, int bufOffset,
149    int necessaryLen, int extraLen) throws IOException {
150    int bytesRemaining = necessaryLen + extraLen;
151    while (bytesRemaining > 0) {
152      int ret = in.read(buf, bufOffset, bytesRemaining);
153      if (ret < 0) {
154        if (bytesRemaining <= extraLen) {
155          // We could not read the "extra data", but that is OK.
156          break;
157        }
158        throw new IOException("Premature EOF from inputStream (read " + "returned " + ret
159          + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
160          + " extra bytes, " + "successfully read " + (necessaryLen + extraLen - bytesRemaining));
161      }
162      bufOffset += ret;
163      bytesRemaining -= ret;
164    }
165    return bytesRemaining <= 0;
166  }
167
168  /**
169   * Read bytes into ByteBuffers directly, those buffers either contains the extraLen bytes or only
170   * contains necessaryLen bytes, which depends on how much bytes do the last time we read.
171   * @param buf          the destination {@link ByteBuff}.
172   * @param dis          input stream to read.
173   * @param necessaryLen bytes which we must read
174   * @param extraLen     bytes which we may read
175   * @return if the returned flag is true, then we've finished to read the extraLen into our
176   *         ByteBuffers, otherwise we've not read the extraLen bytes yet.
177   * @throws IOException if failed to read the necessary bytes.
178   */
179  public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int necessaryLen,
180    int extraLen) throws IOException {
181    if (!isByteBufferReadable(dis)) {
182      // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
183      // the destination ByteBuff.
184      byte[] heapBuf = new byte[necessaryLen + extraLen];
185      boolean ret = readWithExtraOnHeap(dis, heapBuf, 0, necessaryLen, extraLen);
186      copyToByteBuff(heapBuf, 0, heapBuf.length, buf);
187      return ret;
188    }
189    ByteBuffer[] buffers = buf.nioByteBuffers();
190    int bytesRead = 0;
191    int remain = necessaryLen + extraLen;
192    int idx = 0;
193    ByteBuffer cur = buffers[idx];
194    while (bytesRead < necessaryLen) {
195      while (!cur.hasRemaining()) {
196        if (++idx >= buffers.length) {
197          throw new IOException("Not enough ByteBuffers to read the reminding " + remain + "bytes");
198        }
199        cur = buffers[idx];
200      }
201      cur.limit(cur.position() + Math.min(remain, cur.remaining()));
202      int ret = dis.read(cur);
203      if (ret < 0) {
204        throw new IOException("Premature EOF from inputStream (read returned " + ret
205          + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
206          + " extra bytes, successfully read " + bytesRead);
207      }
208      bytesRead += ret;
209      remain -= ret;
210    }
211    return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
212  }
213
214  /**
215   * Read from an input stream at least <code>necessaryLen</code> and if possible,
216   * <code>extraLen</code> also if available. Analogous to
217   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
218   * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
219   * read. If the input stream supports ByteBufferPositionedReadable, it reads to the byte buffer
220   * directly, and does not allocate a temporary byte array.
221   * @param buff         ByteBuff to read into.
222   * @param dis          the input stream to read from
223   * @param position     the position within the stream from which to start reading
224   * @param necessaryLen the number of bytes that are absolutely necessary to read
225   * @param extraLen     the number of extra bytes that would be nice to read
226   * @return true if and only if extraLen is > 0 and reading those extra bytes was successful
227   * @throws IOException if failed to read the necessary bytes
228   */
229  public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
230    int necessaryLen, int extraLen) throws IOException {
231    return preadWithExtra(buff, dis, position, necessaryLen, extraLen, false);
232  }
233
234  /**
235   * Read from an input stream at least <code>necessaryLen</code> and if possible,
236   * <code>extraLen</code> also if available. Analogous to
237   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
238   * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
239   * read. If the input stream supports ByteBufferPositionedReadable, it reads to the byte buffer
240   * directly, and does not allocate a temporary byte array.
241   * @param buff         ByteBuff to read into.
242   * @param dis          the input stream to read from
243   * @param position     the position within the stream from which to start reading
244   * @param necessaryLen the number of bytes that are absolutely necessary to read
245   * @param extraLen     the number of extra bytes that would be nice to read
246   * @param readAllBytes whether we must read the necessaryLen and extraLen
247   * @return true if and only if extraLen is > 0 and reading those extra bytes was successful
248   * @throws IOException if failed to read the necessary bytes
249   */
250  public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
251    int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
252    boolean preadbytebuffer = dis.hasCapability("in:preadbytebuffer");
253
254    if (preadbytebuffer) {
255      return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen, readAllBytes);
256    } else {
257      return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen, readAllBytes);
258    }
259  }
260
261  private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis, long position,
262    int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
263    int remain = necessaryLen + extraLen;
264    byte[] buf = new byte[remain];
265    int bytesRead = 0;
266    int lengthMustRead = readAllBytes ? remain : necessaryLen;
267    while (bytesRead < lengthMustRead) {
268      int ret = dis.read(position + bytesRead, buf, bytesRead, remain);
269      if (ret < 0) {
270        throw new IOException("Premature EOF from inputStream (positional read returned " + ret
271          + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
272          + " extra bytes, successfully read " + bytesRead);
273      }
274      bytesRead += ret;
275      remain -= ret;
276    }
277    copyToByteBuff(buf, 0, bytesRead, buff);
278    return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
279  }
280
281  private static boolean preadWithExtraDirectly(ByteBuff buff, FSDataInputStream dis, long position,
282    int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
283    int remain = necessaryLen + extraLen, bytesRead = 0, idx = 0;
284    ByteBuffer[] buffers = buff.nioByteBuffers();
285    ByteBuffer cur = buffers[idx];
286    int lengthMustRead = readAllBytes ? remain : necessaryLen;
287    while (bytesRead < lengthMustRead) {
288      int ret;
289      while (!cur.hasRemaining()) {
290        if (++idx >= buffers.length) {
291          throw new IOException("Not enough ByteBuffers to read the reminding " + remain + "bytes");
292        }
293        cur = buffers[idx];
294      }
295      cur.limit(cur.position() + Math.min(remain, cur.remaining()));
296      try {
297        ret = (Integer) byteBufferPositionedReadMethod.invoke(dis, position + bytesRead, cur);
298      } catch (IllegalAccessException e) {
299        throw new IOException("Unable to invoke ByteBuffer positioned read when trying to read "
300          + bytesRead + " bytes from position " + position, e);
301      } catch (InvocationTargetException e) {
302        throw new IOException("Encountered an exception when invoking ByteBuffer positioned read"
303          + " when trying to read " + bytesRead + " bytes from position " + position, e);
304      } catch (NullPointerException e) {
305        throw new IOException("something is null");
306      } catch (Exception e) {
307        throw e;
308      }
309      if (ret < 0) {
310        throw new IOException("Premature EOF from inputStream (positional read returned " + ret
311          + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
312          + " extra bytes, successfully read " + bytesRead);
313      }
314      bytesRead += ret;
315      remain -= ret;
316    }
317
318    return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
319  }
320
321  private static int copyToByteBuff(byte[] buf, int offset, int len, ByteBuff out)
322    throws IOException {
323    if (offset < 0 || len < 0 || offset + len > buf.length) {
324      throw new IOException("Invalid offset=" + offset + " and len=" + len + ", cap=" + buf.length);
325    }
326    ByteBuffer[] buffers = out.nioByteBuffers();
327    int idx = 0, remain = len, copyLen;
328    ByteBuffer cur = buffers[idx];
329    while (remain > 0) {
330      while (!cur.hasRemaining()) {
331        if (++idx >= buffers.length) {
332          throw new IOException("Not enough ByteBuffers to read the reminding " + remain + "bytes");
333        }
334        cur = buffers[idx];
335      }
336      copyLen = Math.min(cur.remaining(), remain);
337      cur.put(buf, offset, copyLen);
338      remain -= copyLen;
339      offset += copyLen;
340    }
341    return len;
342  }
343}