/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.util;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.io.IOUtils;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Static helpers that read bytes from an input stream into a {@link ByteBuff}, either directly
 * through the stream's {@link ByteBufferReadable} support or via a temporary on-heap byte[].
 */
@InterfaceAudience.Private
public final class BlockIOUtils {

  /** Size of the temporary on-heap array used by {@link #readFullyWithHeapBuffer}. */
  private static final int HEAP_COPY_BUFFER_SIZE = 1024;

  // Disallow instantiation
  private BlockIOUtils() {
  }

  /**
   * Unwraps any nested {@link FSDataInputStream}s and reports whether the innermost wrapped
   * stream implements {@link ByteBufferReadable}.
   * @param is the stream to inspect
   * @return true if the innermost wrapped stream is a {@link ByteBufferReadable}
   */
  public static boolean isByteBufferReadable(FSDataInputStream is) {
    InputStream cur = is.getWrappedStream();
    while (cur instanceof FSDataInputStream) {
      cur = ((FSDataInputStream) cur).getWrappedStream();
    }
    return cur instanceof ByteBufferReadable;
  }

  /**
   * Reads exactly <code>length</code> bytes from the stream into the destination. If the stream
   * supports ByteBuffer reads the bytes go straight into the destination's NIO buffers (whose
   * limits are adjusted while reading); otherwise they are read into an on-heap byte[] first and
   * then copied.
   * @param buf the destination {@link ByteBuff}
   * @param dis the input stream to read from
   * @param length number of bytes to read
   * @throws IOException if the stream ends early, the destination has too little room, or any
   *           other I/O error happens
   */
  public static void readFully(ByteBuff buf, FSDataInputStream dis, int length) throws IOException {
    if (!isByteBufferReadable(dis)) {
      // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
      // the destination ByteBuff.
      byte[] heapBuf = new byte[length];
      IOUtils.readFully(dis, heapBuf, 0, length);
      copyToByteBuff(heapBuf, 0, length, buf);
      return;
    }
    ByteBuffer[] buffers = buf.nioByteBuffers();
    int remain = length;
    int idx = 0;
    while (remain > 0) {
      idx = nextWritableIndex(buffers, idx, remain);
      ByteBuffer cur = buffers[idx];
      // Clip the limit so the stream can never hand us more than we still need.
      cur.limit(cur.position() + Math.min(remain, cur.remaining()));
      int bytesRead = dis.read(cur);
      if (bytesRead < 0) {
        throw new IOException(
            "Premature EOF from inputStream, but still need " + remain + " bytes");
      }
      remain -= bytesRead;
    }
  }

  /**
   * Copies exactly <code>length</code> bytes from the stream into the destination by way of a
   * temporary on-heap byte[] (currently 1024 bytes).
   * @param in the InputStream to read
   * @param out the destination {@link ByteBuff}
   * @param length number of bytes to read
   * @throws IllegalArgumentException if <code>length</code> is negative
   * @throws IOException if the stream ends early or any other I/O error happens
   */
  public static void readFullyWithHeapBuffer(InputStream in, ByteBuff out, int length)
      throws IOException {
    // Validate before allocating so an invalid call does no work at all.
    if (length < 0) {
      throw new IllegalArgumentException("Length must not be negative: " + length);
    }
    byte[] buffer = new byte[HEAP_COPY_BUFFER_SIZE];
    int remain = length;
    while (remain > 0) {
      int count = in.read(buffer, 0, Math.min(remain, buffer.length));
      if (count < 0) {
        throw new IOException(
            "Premature EOF from inputStream, but still need " + remain + " bytes");
      }
      out.put(buffer, 0, count);
      remain -= count;
    }
  }

  /**
   * Reads at least <code>necessaryLen</code> bytes from the stream and, if available, up to
   * <code>extraLen</code> more. Analogous to
   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but with a number of optional
   * "extra" bytes. Reading continues until everything was read or the stream ends.
   * @param in the input stream to read from
   * @param buf the buffer to read into
   * @param bufOffset the destination offset in the buffer
   * @param necessaryLen the number of bytes that are absolutely necessary to read
   * @param extraLen the number of extra bytes that would be nice to read
   * @return true if all <code>necessaryLen + extraLen</code> bytes were read (this includes the
   *         case where <code>extraLen</code> is 0)
   * @throws IOException if the stream ends before the necessary bytes were read
   */
  private static boolean readWithExtraOnHeap(InputStream in, byte[] buf, int bufOffset,
      int necessaryLen, int extraLen) throws IOException {
    int bytesRemaining = necessaryLen + extraLen;
    while (bytesRemaining > 0) {
      int ret = in.read(buf, bufOffset, bytesRemaining);
      if (ret < 0) {
        if (bytesRemaining <= extraLen) {
          // We could not read the "extra data", but that is OK.
          break;
        }
        throw new IOException("Premature EOF from inputStream (read returned " + ret
            + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
            + " extra bytes, successfully read " + (necessaryLen + extraLen - bytesRemaining));
      }
      bufOffset += ret;
      bytesRemaining -= ret;
    }
    return bytesRemaining <= 0;
  }

  /**
   * Reads at least <code>necessaryLen</code> bytes into the destination and opportunistically up
   * to <code>extraLen</code> more, depending on how much the last read delivered.
   * <p>
   * If the stream supports ByteBuffer reads, bytes go straight into the destination's NIO buffers
   * (whose limits are adjusted while reading) and reading stops as soon as the necessary bytes
   * are in. Otherwise the bytes are read into an on-heap byte[] of
   * <code>necessaryLen + extraLen</code> bytes and that whole array is copied into the
   * destination, whether or not the extra bytes were actually read.
   * @param buf the destination {@link ByteBuff}.
   * @param dis input stream to read.
   * @param necessaryLen bytes which we must read
   * @param extraLen bytes which we may read
   * @return true if the extra bytes were read as well, false otherwise. Note that for
   *         <code>extraLen == 0</code> the direct ByteBuffer path returns false while the on-heap
   *         fallback path returns true.
   * @throws IOException if failed to read the necessary bytes.
   */
  public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int necessaryLen,
      int extraLen) throws IOException {
    if (!isByteBufferReadable(dis)) {
      // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
      // the destination ByteBuff.
      byte[] heapBuf = new byte[necessaryLen + extraLen];
      boolean ret = readWithExtraOnHeap(dis, heapBuf, 0, necessaryLen, extraLen);
      copyToByteBuff(heapBuf, 0, heapBuf.length, buf);
      return ret;
    }
    ByteBuffer[] buffers = buf.nioByteBuffers();
    int bytesRead = 0;
    int remain = necessaryLen + extraLen;
    int idx = 0;
    while (bytesRead < necessaryLen) {
      idx = nextWritableIndex(buffers, idx, remain);
      ByteBuffer cur = buffers[idx];
      // Clip the limit so a single read never exceeds necessaryLen + extraLen in total.
      cur.limit(cur.position() + Math.min(remain, cur.remaining()));
      int ret = dis.read(cur);
      if (ret < 0) {
        throw new IOException("Premature EOF from inputStream (read returned " + ret
            + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
            + " extra bytes, successfully read " + bytesRead);
      }
      bytesRead += ret;
      remain -= ret;
    }
    return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
  }

  /**
   * Reads at least <code>necessaryLen</code> bytes from the stream and, if available, up to
   * <code>extraLen</code> more. Analogous to
   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional reads and
   * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
   * read. Only the bytes actually read are copied into the destination.
   * @param buff ByteBuff to read into.
   * @param dis the input stream to read from
   * @param position the position within the stream from which to start reading
   * @param necessaryLen the number of bytes that are absolutely necessary to read
   * @param extraLen the number of extra bytes that would be nice to read
   * @return true if and only if extraLen is &gt; 0 and reading those extra bytes was successful
   * @throws IOException if failed to read the necessary bytes
   */
  public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
      int necessaryLen, int extraLen) throws IOException {
    int remain = necessaryLen + extraLen;
    byte[] buf = new byte[remain];
    int bytesRead = 0;
    while (bytesRead < necessaryLen) {
      int ret = dis.read(position + bytesRead, buf, bytesRead, remain);
      if (ret < 0) {
        throw new IOException("Premature EOF from inputStream (positional read returned " + ret
            + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
            + " extra bytes, successfully read " + bytesRead);
      }
      bytesRead += ret;
      remain -= ret;
    }
    // Copy the bytes from the on-heap byte[] to the ByteBuffer[] now. Once HDFS-3246 is resolved
    // the bytes can be read into the ByteBuffer[] directly without allocating an on-heap byte[].
    // TODO The copy is kept here so that the upper layer sees a ByteBuffer[]-based
    // preadWithExtra; only this method needs refactoring once ByteBuffer pread is available.
    copyToByteBuff(buf, 0, bytesRead, buff);
    return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
  }

  /**
   * Copies <code>len</code> bytes of <code>buf</code>, starting at <code>offset</code>, into the
   * NIO buffers backing <code>out</code>, filling them in order.
   * @param buf the source array
   * @param offset index of the first byte to copy
   * @param len number of bytes to copy
   * @param out the destination {@link ByteBuff}
   * @return <code>len</code>
   * @throws IOException if the offset/length are out of range or the destination runs out of
   *           room
   */
  private static int copyToByteBuff(byte[] buf, int offset, int len, ByteBuff out)
      throws IOException {
    if (offset < 0 || len < 0 || offset + len > buf.length) {
      throw new IOException("Invalid offset=" + offset + " and len=" + len + ", cap=" + buf.length);
    }
    ByteBuffer[] buffers = out.nioByteBuffers();
    int idx = 0;
    int remain = len;
    while (remain > 0) {
      idx = nextWritableIndex(buffers, idx, remain);
      ByteBuffer cur = buffers[idx];
      int copyLen = Math.min(cur.remaining(), remain);
      cur.put(buf, offset, copyLen);
      remain -= copyLen;
      offset += copyLen;
    }
    return len;
  }

  /**
   * Finds the first buffer at or after <code>idx</code> that still has room.
   * @param buffers the candidate destination buffers
   * @param idx index to start searching from
   * @param remain number of bytes still to be written, used only for the error message
   * @return index of the first buffer with remaining capacity
   * @throws IOException if every buffer from <code>idx</code> onwards is full, or there are none
   */
  private static int nextWritableIndex(ByteBuffer[] buffers, int idx, int remain)
      throws IOException {
    int i = idx;
    while (i < buffers.length && !buffers[i].hasRemaining()) {
      i++;
    }
    if (i >= buffers.length) {
      throw new IOException("Not enough ByteBuffers to read the reminding " + remain + " bytes");
    }
    return i;
  }
}