001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.io.util;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.nio.ByteBuffer;
024
025import org.apache.hadoop.fs.ByteBufferReadable;
026import org.apache.hadoop.fs.FSDataInputStream;
027import org.apache.hadoop.hbase.nio.ByteBuff;
028import org.apache.hadoop.io.IOUtils;
029import org.apache.yetus.audience.InterfaceAudience;
030
031@InterfaceAudience.Private
032public final class BlockIOUtils {
033
034  // Disallow instantiation
035  private BlockIOUtils() {
036
037  }
038
039  public static boolean isByteBufferReadable(FSDataInputStream is) {
040    InputStream cur = is.getWrappedStream();
041    for (;;) {
042      if ((cur instanceof FSDataInputStream)) {
043        cur = ((FSDataInputStream) cur).getWrappedStream();
044      } else {
045        break;
046      }
047    }
048    return cur instanceof ByteBufferReadable;
049  }
050
051  /**
052   * Read length bytes into ByteBuffers directly.
053   * @param buf the destination {@link ByteBuff}
054   * @param dis the HDFS input stream which implement the ByteBufferReadable interface.
055   * @param length bytes to read.
056   * @throws IOException exception to throw if any error happen
057   */
058  public static void readFully(ByteBuff buf, FSDataInputStream dis, int length) throws IOException {
059    if (!isByteBufferReadable(dis)) {
060      // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
061      // the destination ByteBuff.
062      byte[] heapBuf = new byte[length];
063      IOUtils.readFully(dis, heapBuf, 0, length);
064      copyToByteBuff(heapBuf, 0, length, buf);
065      return;
066    }
067    ByteBuffer[] buffers = buf.nioByteBuffers();
068    int remain = length;
069    int idx = 0;
070    ByteBuffer cur = buffers[idx];
071    while (remain > 0) {
072      while (!cur.hasRemaining()) {
073        if (++idx >= buffers.length) {
074          throw new IOException(
075              "Not enough ByteBuffers to read the reminding " + remain + " " + "bytes");
076        }
077        cur = buffers[idx];
078      }
079      cur.limit(cur.position() + Math.min(remain, cur.remaining()));
080      int bytesRead = dis.read(cur);
081      if (bytesRead < 0) {
082        throw new IOException(
083            "Premature EOF from inputStream, but still need " + remain + " " + "bytes");
084      }
085      remain -= bytesRead;
086    }
087  }
088
089  /**
090   * Copying bytes from InputStream to {@link ByteBuff} by using an temporary heap byte[] (default
091   * size is 1024 now).
092   * @param in the InputStream to read
093   * @param out the destination {@link ByteBuff}
094   * @param length to read
095   * @throws IOException if any io error encountered.
096   */
097  public static void readFullyWithHeapBuffer(InputStream in, ByteBuff out, int length)
098      throws IOException {
099    byte[] buffer = new byte[1024];
100    if (length < 0) {
101      throw new IllegalArgumentException("Length must not be negative: " + length);
102    }
103    int remain = length, count;
104    while (remain > 0) {
105      count = in.read(buffer, 0, Math.min(remain, buffer.length));
106      if (count < 0) {
107        throw new IOException(
108            "Premature EOF from inputStream, but still need " + remain + " bytes");
109      }
110      out.put(buffer, 0, count);
111      remain -= count;
112    }
113  }
114
115  /**
116   * Read from an input stream at least <code>necessaryLen</code> and if possible,
117   * <code>extraLen</code> also if available. Analogous to
118   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a number of "extra"
119   * bytes to also optionally read.
120   * @param in the input stream to read from
121   * @param buf the buffer to read into
122   * @param bufOffset the destination offset in the buffer
123   * @param necessaryLen the number of bytes that are absolutely necessary to read
124   * @param extraLen the number of extra bytes that would be nice to read
125   * @return true if succeeded reading the extra bytes
126   * @throws IOException if failed to read the necessary bytes
127   */
128  private static boolean readWithExtraOnHeap(InputStream in, byte[] buf, int bufOffset,
129      int necessaryLen, int extraLen) throws IOException {
130    int bytesRemaining = necessaryLen + extraLen;
131    while (bytesRemaining > 0) {
132      int ret = in.read(buf, bufOffset, bytesRemaining);
133      if (ret < 0) {
134        if (bytesRemaining <= extraLen) {
135          // We could not read the "extra data", but that is OK.
136          break;
137        }
138        throw new IOException("Premature EOF from inputStream (read " + "returned " + ret
139            + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
140            + " extra bytes, " + "successfully read " + (necessaryLen + extraLen - bytesRemaining));
141      }
142      bufOffset += ret;
143      bytesRemaining -= ret;
144    }
145    return bytesRemaining <= 0;
146  }
147
148  /**
149   * Read bytes into ByteBuffers directly, those buffers either contains the extraLen bytes or only
150   * contains necessaryLen bytes, which depends on how much bytes do the last time we read.
151   * @param buf the destination {@link ByteBuff}.
152   * @param dis input stream to read.
153   * @param necessaryLen bytes which we must read
154   * @param extraLen bytes which we may read
155   * @return if the returned flag is true, then we've finished to read the extraLen into our
156   *         ByteBuffers, otherwise we've not read the extraLen bytes yet.
157   * @throws IOException if failed to read the necessary bytes.
158   */
159  public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int necessaryLen,
160      int extraLen) throws IOException {
161    if (!isByteBufferReadable(dis)) {
162      // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
163      // the destination ByteBuff.
164      byte[] heapBuf = new byte[necessaryLen + extraLen];
165      boolean ret = readWithExtraOnHeap(dis, heapBuf, 0, necessaryLen, extraLen);
166      copyToByteBuff(heapBuf, 0, heapBuf.length, buf);
167      return ret;
168    }
169    ByteBuffer[] buffers = buf.nioByteBuffers();
170    int bytesRead = 0;
171    int remain = necessaryLen + extraLen;
172    int idx = 0;
173    ByteBuffer cur = buffers[idx];
174    while (bytesRead < necessaryLen) {
175      while (!cur.hasRemaining()) {
176        if (++idx >= buffers.length) {
177          throw new IOException("Not enough ByteBuffers to read the reminding " + remain + "bytes");
178        }
179        cur = buffers[idx];
180      }
181      cur.limit(cur.position() + Math.min(remain, cur.remaining()));
182      int ret = dis.read(cur);
183      if (ret < 0) {
184        throw new IOException("Premature EOF from inputStream (read returned " + ret
185            + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
186            + " extra bytes, successfully read " + bytesRead);
187      }
188      bytesRead += ret;
189      remain -= ret;
190    }
191    return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
192  }
193
194  /**
195   * Read from an input stream at least <code>necessaryLen</code> and if possible,
196   * <code>extraLen</code> also if available. Analogous to
197   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
198   * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
199   * read.
200   * @param buff ByteBuff to read into.
201   * @param dis the input stream to read from
202   * @param position the position within the stream from which to start reading
203   * @param necessaryLen the number of bytes that are absolutely necessary to read
204   * @param extraLen the number of extra bytes that would be nice to read
205   * @return true if and only if extraLen is > 0 and reading those extra bytes was successful
206   * @throws IOException if failed to read the necessary bytes
207   */
208  public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
209      int necessaryLen, int extraLen) throws IOException {
210    int remain = necessaryLen + extraLen;
211    byte[] buf = new byte[remain];
212    int bytesRead = 0;
213    while (bytesRead < necessaryLen) {
214      int ret = dis.read(position + bytesRead, buf, bytesRead, remain);
215      if (ret < 0) {
216        throw new IOException("Premature EOF from inputStream (positional read returned " + ret
217            + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
218            + " extra bytes, successfully read " + bytesRead);
219      }
220      bytesRead += ret;
221      remain -= ret;
222    }
223    // Copy the bytes from on-heap bytes[] to ByteBuffer[] now, and after resolving HDFS-3246, we
224    // will read the bytes to ByteBuffer[] directly without allocating any on-heap byte[].
225    // TODO I keep the bytes copy here, because I want to abstract the ByteBuffer[]
226    // preadWithExtra method for the upper layer, only need to refactor this method if the
227    // ByteBuffer pread is OK.
228    copyToByteBuff(buf, 0, bytesRead, buff);
229    return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
230  }
231
232  private static int copyToByteBuff(byte[] buf, int offset, int len, ByteBuff out)
233      throws IOException {
234    if (offset < 0 || len < 0 || offset + len > buf.length) {
235      throw new IOException("Invalid offset=" + offset + " and len=" + len + ", cap=" + buf.length);
236    }
237    ByteBuffer[] buffers = out.nioByteBuffers();
238    int idx = 0, remain = len, copyLen;
239    ByteBuffer cur = buffers[idx];
240    while (remain > 0) {
241      while (!cur.hasRemaining()) {
242        if (++idx >= buffers.length) {
243          throw new IOException("Not enough ByteBuffers to read the reminding " + remain + "bytes");
244        }
245        cur = buffers[idx];
246      }
247      copyLen = Math.min(cur.remaining(), remain);
248      cur.put(buf, offset, copyLen);
249      remain -= copyLen;
250      offset += copyLen;
251    }
252    return len;
253  }
254}