/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.util;

import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.io.IOUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers that read bytes from an {@link InputStream} / {@link FSDataInputStream} into a
 * {@link ByteBuff}, either directly into its backing {@link ByteBuffer}s or through a temporary
 * on-heap byte array.
 */
@InterfaceAudience.Private
public final class BlockIOUtils {
  private static final Logger LOG = LoggerFactory.getLogger(BlockIOUtils.class);

  // TODO: remove the reflection when we update to Hadoop 3.3 or above.
  /**
   * Handle on {@code FSDataInputStream.read(long, ByteBuffer)}, or {@code null} when the Hadoop
   * version on the classpath does not declare that method.
   */
  private static Method byteBufferPositionedReadMethod;

  static {
    initByteBufferPositionReadableMethod();
  }

  // Disallow instantiation
  private BlockIOUtils() {

  }

  /**
   * Looks up the positioned ByteBuffer read method of {@link FSDataInputStream} by reflection and
   * stores it in {@link #byteBufferPositionedReadMethod}. The field is left {@code null} if the
   * method does not exist.
   */
  private static void initByteBufferPositionReadableMethod() {
    try {
      // long position, ByteBuffer buf
      byteBufferPositionedReadMethod =
        FSDataInputStream.class.getMethod("read", long.class, ByteBuffer.class);
    } catch (NoSuchMethodException e) {
      LOG.debug("Unable to find positioned bytebuffer read API of FSDataInputStream. "
        + "preadWithExtra() will use a temporary on-heap byte array.");
    }
  }

  /**
   * Unwraps nested {@link FSDataInputStream}s and reports whether the innermost wrapped stream
   * implements {@link ByteBufferReadable}.
   * @param is the stream to inspect
   * @return true if the innermost wrapped stream is a {@link ByteBufferReadable}
   */
  public static boolean isByteBufferReadable(FSDataInputStream is) {
    InputStream cur = is.getWrappedStream();
    while (cur instanceof FSDataInputStream) {
      cur = ((FSDataInputStream) cur).getWrappedStream();
    }
    return cur instanceof ByteBufferReadable;
  }

  /**
   * Read length bytes into ByteBuffers directly.
   * @param buf    the destination {@link ByteBuff}
   * @param dis    the HDFS input stream which implement the ByteBufferReadable interface.
   * @param length bytes to read.
   * @throws IOException exception to throw if any error happen
   */
  public static void readFully(ByteBuff buf, FSDataInputStream dis, int length) throws IOException {
    if (!isByteBufferReadable(dis)) {
      // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
      // the destination ByteBuff.
      byte[] heapBuf = new byte[length];
      IOUtils.readFully(dis, heapBuf, 0, length);
      copyToByteBuff(heapBuf, 0, length, buf);
      return;
    }
    ByteBuffer[] buffers = buf.nioByteBuffers();
    int remain = length;
    int idx = 0;
    ByteBuffer cur = buffers[idx];
    while (remain > 0) {
      // Advance to the next backing buffer that still has room.
      while (!cur.hasRemaining()) {
        if (++idx >= buffers.length) {
          throw new IOException(
            "Not enough ByteBuffers to read the reminding " + remain + " bytes");
        }
        cur = buffers[idx];
      }
      // Cap the limit so the stream cannot hand us more than what is still wanted.
      cur.limit(cur.position() + Math.min(remain, cur.remaining()));
      int bytesRead = dis.read(cur);
      if (bytesRead < 0) {
        throw new IOException(
          "Premature EOF from inputStream, but still need " + remain + " bytes");
      }
      remain -= bytesRead;
    }
  }

  /**
   * Copying bytes from InputStream to {@link ByteBuff} by using an temporary heap byte[] (default
   * size is 1024 now).
   * @param in     the InputStream to read
   * @param out    the destination {@link ByteBuff}
   * @param length to read
   * @throws IOException              if any io error encountered.
   * @throws IllegalArgumentException if {@code length} is negative
   */
  public static void readFullyWithHeapBuffer(InputStream in, ByteBuff out, int length)
    throws IOException {
    byte[] buffer = new byte[1024];
    if (length < 0) {
      throw new IllegalArgumentException("Length must not be negative: " + length);
    }
    int remain = length, count;
    while (remain > 0) {
      count = in.read(buffer, 0, Math.min(remain, buffer.length));
      if (count < 0) {
        throw new IOException(
          "Premature EOF from inputStream, but still need " + remain + " bytes");
      }
      out.put(buffer, 0, count);
      remain -= count;
    }
  }

  /**
   * Read from an input stream at least <code>necessaryLen</code> and if possible,
   * <code>extraLen</code> also if available. Analogous to
   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a number of "extra"
   * bytes to also optionally read.
   * @param in           the input stream to read from
   * @param buf          the buffer to read into
   * @param bufOffset    the destination offset in the buffer
   * @param necessaryLen the number of bytes that are absolutely necessary to read
   * @param extraLen     the number of extra bytes that would be nice to read
   * @return true if succeeded reading the extra bytes
   * @throws IOException if failed to read the necessary bytes
   */
  private static boolean readWithExtraOnHeap(InputStream in, byte[] buf, int bufOffset,
    int necessaryLen, int extraLen) throws IOException {
    int bytesRemaining = necessaryLen + extraLen;
    while (bytesRemaining > 0) {
      int ret = in.read(buf, bufOffset, bytesRemaining);
      if (ret < 0) {
        if (bytesRemaining <= extraLen) {
          // We could not read the "extra data", but that is OK.
          break;
        }
        throw new IOException("Premature EOF from inputStream (read " + "returned " + ret
          + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
          + " extra bytes, " + "successfully read " + (necessaryLen + extraLen - bytesRemaining));
      }
      bufOffset += ret;
      bytesRemaining -= ret;
    }
    return bytesRemaining <= 0;
  }

  /**
   * Read bytes into ByteBuffers directly, those buffers either contains the extraLen bytes or only
   * contains necessaryLen bytes, which depends on how much bytes do the last time we read.
   * @param buf          the destination {@link ByteBuff}.
   * @param dis          input stream to read.
   * @param necessaryLen bytes which we must read
   * @param extraLen     bytes which we may read
   * @return if the returned flag is true, then we've finished to read the extraLen into our
   *         ByteBuffers, otherwise we've not read the extraLen bytes yet.
   * @throws IOException if failed to read the necessary bytes.
   */
  public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int necessaryLen,
    int extraLen) throws IOException {
    if (!isByteBufferReadable(dis)) {
      // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
      // the destination ByteBuff.
      byte[] heapBuf = new byte[necessaryLen + extraLen];
      boolean ret = readWithExtraOnHeap(dis, heapBuf, 0, necessaryLen, extraLen);
      // The whole heap buffer is copied, including any extra bytes that could not be read (those
      // positions keep the array's initial zero value).
      copyToByteBuff(heapBuf, 0, heapBuf.length, buf);
      return ret;
    }
    ByteBuffer[] buffers = buf.nioByteBuffers();
    int bytesRead = 0;
    int remain = necessaryLen + extraLen;
    int idx = 0;
    ByteBuffer cur = buffers[idx];
    // Only the necessary bytes are mandatory; extra bytes are taken when a read happens to
    // return them.
    while (bytesRead < necessaryLen) {
      while (!cur.hasRemaining()) {
        if (++idx >= buffers.length) {
          throw new IOException(
            "Not enough ByteBuffers to read the reminding " + remain + " bytes");
        }
        cur = buffers[idx];
      }
      cur.limit(cur.position() + Math.min(remain, cur.remaining()));
      int ret = dis.read(cur);
      if (ret < 0) {
        throw new IOException("Premature EOF from inputStream (read returned " + ret
          + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
          + " extra bytes, successfully read " + bytesRead);
      }
      bytesRead += ret;
      remain -= ret;
    }
    return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
  }

  /**
   * Read from an input stream at least <code>necessaryLen</code> and if possible,
   * <code>extraLen</code> also if available. Analogous to
   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
   * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
   * read. If the input stream supports ByteBufferPositionedReadable, it reads to the byte buffer
   * directly, and does not allocate a temporary byte array.
   * @param buff         ByteBuff to read into.
   * @param dis          the input stream to read from
   * @param position     the position within the stream from which to start reading
   * @param necessaryLen the number of bytes that are absolutely necessary to read
   * @param extraLen     the number of extra bytes that would be nice to read
   * @return true if and only if extraLen is > 0 and reading those extra bytes was successful
   * @throws IOException if failed to read the necessary bytes
   */
  public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
    int necessaryLen, int extraLen) throws IOException {
    return preadWithExtra(buff, dis, position, necessaryLen, extraLen, false);
  }

  /**
   * Read from an input stream at least <code>necessaryLen</code> and if possible,
   * <code>extraLen</code> also if available. Analogous to
   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
   * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
   * read. If the input stream reports the <code>in:preadbytebuffer</code> capability and the
   * positioned ByteBuffer read method was found on {@link FSDataInputStream}, it reads to the byte
   * buffer directly, and does not allocate a temporary byte array; otherwise a temporary on-heap
   * byte array is used.
   * @param buff         ByteBuff to read into.
   * @param dis          the input stream to read from
   * @param position     the position within the stream from which to start reading
   * @param necessaryLen the number of bytes that are absolutely necessary to read
   * @param extraLen     the number of extra bytes that would be nice to read
   * @param readAllBytes whether we must read the necessaryLen and extraLen
   * @return true if and only if extraLen is > 0 and reading those extra bytes was successful
   * @throws IOException if failed to read the necessary bytes
   */
  public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
    int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
    boolean preadbytebuffer = dis.hasCapability("in:preadbytebuffer");

    // The direct path needs both the stream capability and the reflectively resolved method. If
    // the method could not be resolved, fall back to the on-heap path instead of failing.
    if (preadbytebuffer && byteBufferPositionedReadMethod != null) {
      return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen, readAllBytes);
    } else {
      return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen, readAllBytes);
    }
  }

  /**
   * Positional read through a temporary on-heap byte array, whose content is then copied into
   * {@code buff}.
   * @param buff         ByteBuff to copy the read bytes into
   * @param dis          the input stream to read from
   * @param position     the position within the stream from which to start reading
   * @param necessaryLen the number of bytes that are absolutely necessary to read
   * @param extraLen     the number of extra bytes that would be nice to read
   * @param readAllBytes whether we must read the necessaryLen and extraLen
   * @return true if and only if extraLen is > 0 and reading those extra bytes was successful
   * @throws IOException if the stream ends before the mandatory bytes were read
   */
  private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis, long position,
    int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
    int remain = necessaryLen + extraLen;
    byte[] buf = new byte[remain];
    int bytesRead = 0;
    int lengthMustRead = readAllBytes ? remain : necessaryLen;
    while (bytesRead < lengthMustRead) {
      int ret = dis.read(position + bytesRead, buf, bytesRead, remain);
      if (ret < 0) {
        throw new IOException("Premature EOF from inputStream (positional read returned " + ret
          + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
          + " extra bytes, successfully read " + bytesRead);
      }
      bytesRead += ret;
      remain -= ret;
    }
    // Only the bytes actually read are copied to the destination.
    copyToByteBuff(buf, 0, bytesRead, buff);
    return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
  }

  /**
   * Positional read straight into the backing {@link ByteBuffer}s of {@code buff}, invoking the
   * reflectively resolved {@code FSDataInputStream.read(long, ByteBuffer)}.
   * @param buff         ByteBuff to read into
   * @param dis          the input stream to read from
   * @param position     the position within the stream from which to start reading
   * @param necessaryLen the number of bytes that are absolutely necessary to read
   * @param extraLen     the number of extra bytes that would be nice to read
   * @param readAllBytes whether we must read the necessaryLen and extraLen
   * @return true if and only if extraLen is > 0 and reading those extra bytes was successful
   * @throws IOException if the positioned read method is unavailable, the reflective call fails,
   *                     the destination has too little room, or the stream ends before the
   *                     mandatory bytes were read
   */
  private static boolean preadWithExtraDirectly(ByteBuff buff, FSDataInputStream dis, long position,
    int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
    if (byteBufferPositionedReadMethod == null) {
      throw new IOException("ByteBuffer positioned read API of FSDataInputStream is not available");
    }
    int remain = necessaryLen + extraLen, bytesRead = 0, idx = 0;
    ByteBuffer[] buffers = buff.nioByteBuffers();
    ByteBuffer cur = buffers[idx];
    int lengthMustRead = readAllBytes ? remain : necessaryLen;
    while (bytesRead < lengthMustRead) {
      int ret;
      while (!cur.hasRemaining()) {
        if (++idx >= buffers.length) {
          throw new IOException(
            "Not enough ByteBuffers to read the reminding " + remain + " bytes");
        }
        cur = buffers[idx];
      }
      cur.limit(cur.position() + Math.min(remain, cur.remaining()));
      try {
        ret = (Integer) byteBufferPositionedReadMethod.invoke(dis, position + bytesRead, cur);
      } catch (IllegalAccessException e) {
        throw new IOException("Unable to invoke ByteBuffer positioned read when trying to read "
          + bytesRead + " bytes from position " + position, e);
      } catch (InvocationTargetException e) {
        throw new IOException("Encountered an exception when invoking ByteBuffer positioned read"
          + " when trying to read " + bytesRead + " bytes from position " + position, e);
      }
      if (ret < 0) {
        throw new IOException("Premature EOF from inputStream (positional read returned " + ret
          + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
          + " extra bytes, successfully read " + bytesRead);
      }
      bytesRead += ret;
      remain -= ret;
    }

    return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
  }

  /**
   * Copies {@code len} bytes of {@code buf}, starting at {@code offset}, into the backing
   * {@link ByteBuffer}s of {@code out}, filling them in order.
   * @param buf    the source byte array
   * @param offset index of the first byte to copy
   * @param len    number of bytes to copy
   * @param out    the destination {@link ByteBuff}
   * @return {@code len}
   * @throws IOException if the range is outside {@code buf} or {@code out} has too little room
   */
  private static int copyToByteBuff(byte[] buf, int offset, int len, ByteBuff out)
    throws IOException {
    // Compare against (buf.length - offset) rather than summing offset + len, which can overflow.
    if (offset < 0 || len < 0 || len > buf.length - offset) {
      throw new IOException("Invalid offset=" + offset + " and len=" + len + ", cap=" + buf.length);
    }
    ByteBuffer[] buffers = out.nioByteBuffers();
    int idx = 0, remain = len, copyLen;
    ByteBuffer cur = buffers[idx];
    while (remain > 0) {
      while (!cur.hasRemaining()) {
        if (++idx >= buffers.length) {
          throw new IOException(
            "Not enough ByteBuffers to read the reminding " + remain + " bytes");
        }
        cur = buffers[idx];
      }
      copyLen = Math.min(cur.remaining(), remain);
      cur.put(buf, offset, copyLen);
      remain -= copyLen;
      offset += copyLen;
    }
    return len;
  }
}