001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.io.InputStream; 023import java.util.concurrent.atomic.AtomicInteger; 024 025import org.apache.commons.io.IOUtils; 026import org.apache.hadoop.fs.CanUnbuffer; 027import org.apache.hadoop.fs.FSDataInputStream; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.fs.HFileSystem; 031import org.apache.hadoop.hdfs.DFSInputStream; 032import org.apache.hadoop.hdfs.client.HdfsDataInputStream; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037/** 038 * Wrapper for input stream(s) that takes care of the interaction of FS and HBase checksums, 039 * as well as closing streams. Initialization is not thread-safe, but normal operation is; 040 * see method comments. 041 */ 042@InterfaceAudience.Private 043public class FSDataInputStreamWrapper implements Closeable { 044 private static final Logger LOG = LoggerFactory.getLogger(FSDataInputStreamWrapper.class); 045 private static final boolean isLogTraceEnabled = LOG.isTraceEnabled(); 046 047 private final HFileSystem hfs; 048 private final Path path; 049 private final FileLink link; 050 private final boolean doCloseStreams; 051 private final boolean dropBehind; 052 private final long readahead; 053 054 /** Two stream handles, one with and one without FS-level checksum. 055 * HDFS checksum setting is on FS level, not single read level, so you have to keep two 056 * FS objects and two handles open to interleave different reads freely, which is very sad. 057 * This is what we do: 058 * 1) First, we need to read the trailer of HFile to determine checksum parameters. 059 * We always use FS checksum to do that, so ctor opens {@link #stream}. 060 * 2.1) After that, if HBase checksum is not used, we'd just always use {@link #stream}; 061 * 2.2) If HBase checksum can be used, we'll open {@link #streamNoFsChecksum}, 062 * and close {@link #stream}. User MUST call prepareForBlockReader for that to happen; 063 * if they don't, (2.1) will be the default. 064 * 3) The users can call {@link #shouldUseHBaseChecksum()}, and pass its result to 065 * {@link #getStream(boolean)} to get stream (if Java had out/pointer params we could 066 * return both in one call). This stream is guaranteed to be set. 067 * 4) The first time HBase checksum fails, one would call {@link #fallbackToFsChecksum(int)}. 068 * That will take lock, and open {@link #stream}. While this is going on, others will 069 * continue to use the old stream; if they also want to fall back, they'll also call 070 * {@link #fallbackToFsChecksum(int)}, and block until {@link #stream} is set. 071 * 5) After some number of checksumOk() calls, we will go back to using HBase checksum. 072 * We will have 2 handles; however we presume checksums fail so rarely that we don't care. 073 */ 074 private volatile FSDataInputStream stream = null; 075 private volatile FSDataInputStream streamNoFsChecksum = null; 076 private final Object streamNoFsChecksumFirstCreateLock = new Object(); 077 078 // The configuration states that we should validate hbase checksums 079 private boolean useHBaseChecksumConfigured; 080 081 // Record the current state of this reader with respect to 082 // validating checkums in HBase. This is originally set the same 083 // value as useHBaseChecksumConfigured, but can change state as and when 084 // we encounter checksum verification failures. 085 private volatile boolean useHBaseChecksum; 086 087 // In the case of a checksum failure, do these many succeeding 088 // reads without hbase checksum verification. 089 private AtomicInteger hbaseChecksumOffCount = new AtomicInteger(-1); 090 091 private final static ReadStatistics readStatistics = new ReadStatistics(); 092 093 private static class ReadStatistics { 094 long totalBytesRead; 095 long totalLocalBytesRead; 096 long totalShortCircuitBytesRead; 097 long totalZeroCopyBytesRead; 098 } 099 100 private Boolean instanceOfCanUnbuffer = null; 101 private CanUnbuffer unbuffer = null; 102 103 public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException { 104 this(fs, path, false, -1L); 105 } 106 107 public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind, long readahead) throws IOException { 108 this(fs, null, path, dropBehind, readahead); 109 } 110 111 public FSDataInputStreamWrapper(FileSystem fs, FileLink link, 112 boolean dropBehind, long readahead) throws IOException { 113 this(fs, link, null, dropBehind, readahead); 114 } 115 116 private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path, boolean dropBehind, 117 long readahead) throws IOException { 118 assert (path == null) != (link == null); 119 this.path = path; 120 this.link = link; 121 this.doCloseStreams = true; 122 this.dropBehind = dropBehind; 123 this.readahead = readahead; 124 // If the fs is not an instance of HFileSystem, then create an instance of HFileSystem 125 // that wraps over the specified fs. In this case, we will not be able to avoid 126 // checksumming inside the filesystem. 127 this.hfs = (fs instanceof HFileSystem) ? (HFileSystem) fs : new HFileSystem(fs); 128 129 // Initially we are going to read the tail block. Open the reader w/FS checksum. 130 this.useHBaseChecksumConfigured = this.useHBaseChecksum = false; 131 this.stream = (link != null) ? link.open(hfs) : hfs.open(path); 132 setStreamOptions(stream); 133 } 134 135 private void setStreamOptions(FSDataInputStream in) { 136 try { 137 in.setDropBehind(dropBehind); 138 } catch (Exception e) { 139 // Skipped. 140 } 141 if (readahead >= 0) { 142 try { 143 in.setReadahead(readahead); 144 } catch (Exception e) { 145 // Skipped. 146 } 147 } 148 } 149 150 /** 151 * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any 152 * reads finish and before any other reads start (what happens in reality is we read the 153 * tail, then call this based on what's in the tail, then read blocks). 154 * @param forceNoHBaseChecksum Force not using HBase checksum. 155 */ 156 public void prepareForBlockReader(boolean forceNoHBaseChecksum) throws IOException { 157 if (hfs == null) return; 158 assert this.stream != null && !this.useHBaseChecksumConfigured; 159 boolean useHBaseChecksum = 160 !forceNoHBaseChecksum && hfs.useHBaseChecksum() && (hfs.getNoChecksumFs() != hfs); 161 162 if (useHBaseChecksum) { 163 FileSystem fsNc = hfs.getNoChecksumFs(); 164 this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path); 165 setStreamOptions(streamNoFsChecksum); 166 this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum; 167 // Close the checksum stream; we will reopen it if we get an HBase checksum failure. 168 this.stream.close(); 169 this.stream = null; 170 } 171 } 172 173 /** For use in tests. */ 174 public FSDataInputStreamWrapper(FSDataInputStream fsdis) { 175 this(fsdis, fsdis); 176 } 177 178 /** For use in tests. */ 179 public FSDataInputStreamWrapper(FSDataInputStream fsdis, FSDataInputStream noChecksum) { 180 doCloseStreams = false; 181 stream = fsdis; 182 streamNoFsChecksum = noChecksum; 183 path = null; 184 link = null; 185 hfs = null; 186 useHBaseChecksumConfigured = useHBaseChecksum = false; 187 dropBehind = false; 188 readahead = 0; 189 } 190 191 /** 192 * @return Whether we are presently using HBase checksum. 193 */ 194 public boolean shouldUseHBaseChecksum() { 195 return this.useHBaseChecksum; 196 } 197 198 /** 199 * Get the stream to use. Thread-safe. 200 * @param useHBaseChecksum must be the value that shouldUseHBaseChecksum has returned 201 * at some point in the past, otherwise the result is undefined. 202 */ 203 public FSDataInputStream getStream(boolean useHBaseChecksum) { 204 return useHBaseChecksum ? this.streamNoFsChecksum : this.stream; 205 } 206 207 /** 208 * Read from non-checksum stream failed, fall back to FS checksum. Thread-safe. 209 * @param offCount For how many checksumOk calls to turn off the HBase checksum. 210 */ 211 public FSDataInputStream fallbackToFsChecksum(int offCount) throws IOException { 212 // checksumOffCount is speculative, but let's try to reset it less. 213 boolean partOfConvoy = false; 214 if (this.stream == null) { 215 synchronized (streamNoFsChecksumFirstCreateLock) { 216 partOfConvoy = (this.stream != null); 217 if (!partOfConvoy) { 218 this.stream = (link != null) ? link.open(hfs) : hfs.open(path); 219 } 220 } 221 } 222 if (!partOfConvoy) { 223 this.useHBaseChecksum = false; 224 this.hbaseChecksumOffCount.set(offCount); 225 } 226 return this.stream; 227 } 228 229 /** Report that checksum was ok, so we may ponder going back to HBase checksum. */ 230 public void checksumOk() { 231 if (this.useHBaseChecksumConfigured && !this.useHBaseChecksum 232 && (this.hbaseChecksumOffCount.getAndDecrement() < 0)) { 233 // The stream we need is already open (because we were using HBase checksum in the past). 234 assert this.streamNoFsChecksum != null; 235 this.useHBaseChecksum = true; 236 } 237 } 238 239 private void updateInputStreamStatistics(FSDataInputStream stream) { 240 // If the underlying file system is HDFS, update read statistics upon close. 241 if (stream instanceof HdfsDataInputStream) { 242 /** 243 * Because HDFS ReadStatistics is calculated per input stream, it is not 244 * feasible to update the aggregated number in real time. Instead, the 245 * metrics are updated when an input stream is closed. 246 */ 247 HdfsDataInputStream hdfsDataInputStream = (HdfsDataInputStream)stream; 248 synchronized (readStatistics) { 249 readStatistics.totalBytesRead += hdfsDataInputStream.getReadStatistics(). 250 getTotalBytesRead(); 251 readStatistics.totalLocalBytesRead += hdfsDataInputStream.getReadStatistics(). 252 getTotalLocalBytesRead(); 253 readStatistics.totalShortCircuitBytesRead += hdfsDataInputStream.getReadStatistics(). 254 getTotalShortCircuitBytesRead(); 255 readStatistics.totalZeroCopyBytesRead += hdfsDataInputStream.getReadStatistics(). 256 getTotalZeroCopyBytesRead(); 257 } 258 } 259 } 260 261 public static long getTotalBytesRead() { 262 synchronized (readStatistics) { 263 return readStatistics.totalBytesRead; 264 } 265 } 266 267 public static long getLocalBytesRead() { 268 synchronized (readStatistics) { 269 return readStatistics.totalLocalBytesRead; 270 } 271 } 272 273 public static long getShortCircuitBytesRead() { 274 synchronized (readStatistics) { 275 return readStatistics.totalShortCircuitBytesRead; 276 } 277 } 278 279 public static long getZeroCopyBytesRead() { 280 synchronized (readStatistics) { 281 return readStatistics.totalZeroCopyBytesRead; 282 } 283 } 284 285 /** CloseClose stream(s) if necessary. */ 286 @Override 287 public void close() { 288 if (!doCloseStreams) { 289 return; 290 } 291 updateInputStreamStatistics(this.streamNoFsChecksum); 292 // we do not care about the close exception as it is for reading, no data loss issue. 293 IOUtils.closeQuietly(streamNoFsChecksum); 294 295 296 updateInputStreamStatistics(stream); 297 IOUtils.closeQuietly(stream); 298 } 299 300 public HFileSystem getHfs() { 301 return this.hfs; 302 } 303 304 /** 305 * This will free sockets and file descriptors held by the stream only when the stream implements 306 * org.apache.hadoop.fs.CanUnbuffer. NOT THREAD SAFE. Must be called only when all the clients 307 * using this stream to read the blocks have finished reading. If by chance the stream is 308 * unbuffered and there are clients still holding this stream for read then on next client read 309 * request a new socket will be opened by Datanode without client knowing about it and will serve 310 * its read request. Note: If this socket is idle for some time then the DataNode will close the 311 * socket and the socket will move into CLOSE_WAIT state and on the next client request on this 312 * stream, the current socket will be closed and a new socket will be opened to serve the 313 * requests. 314 */ 315 @SuppressWarnings({ "rawtypes" }) 316 public void unbuffer() { 317 FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum()); 318 if (stream != null) { 319 InputStream wrappedStream = stream.getWrappedStream(); 320 // CanUnbuffer interface was added as part of HDFS-7694 and the fix is available in Hadoop 321 // 2.6.4+ and 2.7.1+ versions only so check whether the stream object implements the 322 // CanUnbuffer interface or not and based on that call the unbuffer api. 323 final Class<? extends InputStream> streamClass = wrappedStream.getClass(); 324 if (this.instanceOfCanUnbuffer == null) { 325 // To ensure we compute whether the stream is instance of CanUnbuffer only once. 326 this.instanceOfCanUnbuffer = false; 327 if (wrappedStream instanceof CanUnbuffer) { 328 this.unbuffer = (CanUnbuffer) wrappedStream; 329 this.instanceOfCanUnbuffer = true; 330 } 331 } 332 if (this.instanceOfCanUnbuffer) { 333 try { 334 this.unbuffer.unbuffer(); 335 } catch (UnsupportedOperationException e){ 336 if (isLogTraceEnabled) { 337 LOG.trace("Failed to invoke 'unbuffer' method in class " + streamClass 338 + " . So there may be the stream does not support unbuffering.", e); 339 } 340 } 341 } else { 342 if (isLogTraceEnabled) { 343 LOG.trace("Failed to find 'unbuffer' method in class " + streamClass); 344 } 345 } 346 } 347 } 348}