/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * Wrapper for input stream(s) that takes care of the interaction of FS and HBase checksums, as
 * well as closing streams. Initialization is not thread-safe, but normal operation is; see method
 * comments.
 */
@InterfaceAudience.Private
public class FSDataInputStreamWrapper implements Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(FSDataInputStreamWrapper.class);
  private static final boolean isLogTraceEnabled = LOG.isTraceEnabled();

  private final HFileSystem hfs;
  private final Path path;
  private final FileLink link;
  private final boolean doCloseStreams;
  private final boolean dropBehind;
  private final long readahead;

  /**
   * Two stream handles, one with and one without FS-level checksum. HDFS checksum setting is on FS
   * level, not single read level, so you have to keep two FS objects and two handles open to
   * interleave different reads freely, which is very sad. This is what we do: 1) First, we need to
   * read the trailer of HFile to determine checksum parameters. We always use FS checksum to do
   * that, so ctor opens {@link #stream}. 2.1) After that, if HBase checksum is not used, we'd just
   * always use {@link #stream}; 2.2) If HBase checksum can be used, we'll open
   * {@link #streamNoFsChecksum}, and close {@link #stream}. User MUST call prepareForBlockReader
   * for that to happen; if they don't, (2.1) will be the default. 3) The users can call
   * {@link #shouldUseHBaseChecksum()}, and pass its result to {@link #getStream(boolean)} to get
   * stream (if Java had out/pointer params we could return both in one call). This stream is
   * guaranteed to be set. 4) The first time HBase checksum fails, one would call
   * {@link #fallbackToFsChecksum(int)}. That will take lock, and open {@link #stream}. While this
   * is going on, others will continue to use the old stream; if they also want to fall back,
   * they'll also call {@link #fallbackToFsChecksum(int)}, and block until {@link #stream} is set.
   * 5) After some number of checksumOk() calls, we will go back to using HBase checksum. We will
   * have 2 handles; however we presume checksums fail so rarely that we don't care.
   */
  private volatile FSDataInputStream stream = null;
  private volatile FSDataInputStream streamNoFsChecksum = null;
  private final Object streamNoFsChecksumFirstCreateLock = new Object();

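  /*
   * Illustrative sketch of how a caller might drive the protocol described above. The names
   * wrapper, fs, path and offCount are placeholders; the methods are the ones defined on this
   * class, and the actual trailer/block reads are elided.
   *
   *   FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fs, path);
   *   // The constructor opened the FS-checksum stream; read the HFile trailer through it.
   *   FSDataInputStream trailerIn = wrapper.getStream(false);
   *   // Based on the trailer, decide whether HBase checksums can be used.
   *   wrapper.prepareForBlockReader(false);
   *   // For each block read, re-query which stream to use.
   *   FSDataInputStream in = wrapper.getStream(wrapper.shouldUseHBaseChecksum());
   *   // On an HBase checksum mismatch, fall back to FS checksums for a while.
   *   in = wrapper.fallbackToFsChecksum(offCount);
   *   // Report each subsequent successful verification; HBase checksums are eventually re-enabled.
   *   wrapper.checksumOk();
   *   // When done with the file.
   *   wrapper.close();
   */
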
  // The configuration states that we should validate hbase checksums
  private boolean useHBaseChecksumConfigured;

  // Record the current state of this reader with respect to
  // validating checksums in HBase. This is originally set to the same
  // value as useHBaseChecksumConfigured, but can change state as and when
  // we encounter checksum verification failures.
  private volatile boolean useHBaseChecksum;

  // In the case of a checksum failure, do this many succeeding
  // reads without hbase checksum verification.
  private AtomicInteger hbaseChecksumOffCount = new AtomicInteger(-1);

  private final static ReadStatistics readStatistics = new ReadStatistics();

  private static class ReadStatistics {
    long totalBytesRead;
    long totalLocalBytesRead;
    long totalShortCircuitBytesRead;
    long totalZeroCopyBytesRead;
  }

  private Boolean instanceOfCanUnbuffer = null;
  private CanUnbuffer unbuffer = null;

  protected Path readerPath;

  public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
    this(fs, path, false, -1L);
  }

  public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind, long readahead)
    throws IOException {
    this(fs, null, path, dropBehind, readahead);
  }

  public FSDataInputStreamWrapper(FileSystem fs, FileLink link, boolean dropBehind, long readahead)
    throws IOException {
    this(fs, link, null, dropBehind, readahead);
  }

  private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path, boolean dropBehind,
    long readahead) throws IOException {
    assert (path == null) != (link == null);
    this.path = path;
    this.link = link;
    this.doCloseStreams = true;
    this.dropBehind = dropBehind;
    this.readahead = readahead;
    // If the fs is not an instance of HFileSystem, then create an instance of HFileSystem
    // that wraps over the specified fs. In this case, we will not be able to avoid
    // checksumming inside the filesystem.
    this.hfs = (fs instanceof HFileSystem) ? (HFileSystem) fs : new HFileSystem(fs);

    // Initially we are going to read the tail block. Open the reader w/FS checksum.
    this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
    this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
    this.readerPath = this.stream.getWrappedStream() instanceof FileLink.FileLinkInputStream
      ? ((FileLink.FileLinkInputStream) this.stream.getWrappedStream()).getCurrentPath()
      : path;
    setStreamOptions(stream);
  }

  private void setStreamOptions(FSDataInputStream in) {
    try {
      in.setDropBehind(dropBehind);
    } catch (Exception e) {
      // Skipped.
    }
    if (readahead >= 0) {
      try {
        in.setReadahead(readahead);
      } catch (Exception e) {
        // Skipped.
      }
    }
  }

  /**
   * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any reads
   * finish and before any other reads start (what happens in reality is we read the tail, then
   * call this based on what's in the tail, then read blocks).
   * @param forceNoHBaseChecksum Force not using HBase checksum.
   */
  public void prepareForBlockReader(boolean forceNoHBaseChecksum) throws IOException {
    if (hfs == null) return;
    assert this.stream != null && !this.useHBaseChecksumConfigured;
    boolean useHBaseChecksum =
      !forceNoHBaseChecksum && hfs.useHBaseChecksum() && (hfs.getNoChecksumFs() != hfs);

    if (useHBaseChecksum) {
      FileSystem fsNc = hfs.getNoChecksumFs();
      this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path);
      setStreamOptions(streamNoFsChecksum);
      this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum;
      // Close the checksum stream; we will reopen it if we get an HBase checksum failure.
      this.stream.close();
      this.stream = null;
    }
  }

  /** For use in tests. */
  public FSDataInputStreamWrapper(FSDataInputStream fsdis) {
    this(fsdis, fsdis);
  }

  /** For use in tests. */
  public FSDataInputStreamWrapper(FSDataInputStream fsdis, FSDataInputStream noChecksum) {
    doCloseStreams = false;
    stream = fsdis;
    streamNoFsChecksum = noChecksum;
    path = null;
    link = null;
    hfs = null;
    useHBaseChecksumConfigured = useHBaseChecksum = false;
    dropBehind = false;
    readahead = 0;
  }

  /** Returns whether we are presently using HBase checksum. */
  public boolean shouldUseHBaseChecksum() {
    return this.useHBaseChecksum;
  }

  /**
   * Get the stream to use. Thread-safe.
   * @param useHBaseChecksum must be the value that shouldUseHBaseChecksum has returned at some
   *                         point in the past, otherwise the result is undefined.
   */
  public FSDataInputStream getStream(boolean useHBaseChecksum) {
    return useHBaseChecksum ? this.streamNoFsChecksum : this.stream;
  }

  /**
   * Read from non-checksum stream failed, fall back to FS checksum. Thread-safe.
   * @param offCount For how many checksumOk calls to turn off the HBase checksum.
   */
  public FSDataInputStream fallbackToFsChecksum(int offCount) throws IOException {
    // checksumOffCount is speculative, but let's try to reset it less.
    boolean partOfConvoy = false;
    if (this.stream == null) {
      synchronized (streamNoFsChecksumFirstCreateLock) {
        partOfConvoy = (this.stream != null);
        if (!partOfConvoy) {
          this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
        }
      }
    }
    if (!partOfConvoy) {
      this.useHBaseChecksum = false;
      this.hbaseChecksumOffCount.set(offCount);
    }
    return this.stream;
  }

  /** Report that checksum was ok, so we may ponder going back to HBase checksum. */
  public void checksumOk() {
    if (
      this.useHBaseChecksumConfigured && !this.useHBaseChecksum
        && (this.hbaseChecksumOffCount.getAndDecrement() < 0)
    ) {
      // The stream we need is already open (because we were using HBase checksum in the past).
      assert this.streamNoFsChecksum != null;
      this.useHBaseChecksum = true;
    }
  }

  private void updateInputStreamStatistics(FSDataInputStream stream) {
    // If the underlying file system is HDFS, update read statistics upon close.
    if (stream instanceof HdfsDataInputStream) {
      /**
       * Because HDFS ReadStatistics is calculated per input stream, it is not feasible to update
       * the aggregated number in real time. Instead, the metrics are updated when an input stream
       * is closed.
       */
      HdfsDataInputStream hdfsDataInputStream = (HdfsDataInputStream) stream;
      synchronized (readStatistics) {
        readStatistics.totalBytesRead +=
          hdfsDataInputStream.getReadStatistics().getTotalBytesRead();
        readStatistics.totalLocalBytesRead +=
          hdfsDataInputStream.getReadStatistics().getTotalLocalBytesRead();
        readStatistics.totalShortCircuitBytesRead +=
          hdfsDataInputStream.getReadStatistics().getTotalShortCircuitBytesRead();
        readStatistics.totalZeroCopyBytesRead +=
          hdfsDataInputStream.getReadStatistics().getTotalZeroCopyBytesRead();
      }
    }
  }

  public static long getTotalBytesRead() {
    synchronized (readStatistics) {
      return readStatistics.totalBytesRead;
    }
  }

  public static long getLocalBytesRead() {
    synchronized (readStatistics) {
      return readStatistics.totalLocalBytesRead;
    }
  }

  public static long getShortCircuitBytesRead() {
    synchronized (readStatistics) {
      return readStatistics.totalShortCircuitBytesRead;
    }
  }

  public static long getZeroCopyBytesRead() {
    synchronized (readStatistics) {
      return readStatistics.totalZeroCopyBytesRead;
    }
  }

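  /*
   * Illustrative sketch of how the aggregate counters above might be consumed. Since per-stream
   * HDFS read statistics are only folded into the static totals when a stream is closed (see
   * updateInputStreamStatistics), a hypothetical metrics reporter would just poll the static
   * getters periodically:
   *
   *   long totalBytes = FSDataInputStreamWrapper.getTotalBytesRead();
   *   long localBytes = FSDataInputStreamWrapper.getLocalBytesRead();
   *   long shortCircuitBytes = FSDataInputStreamWrapper.getShortCircuitBytesRead();
   *   long zeroCopyBytes = FSDataInputStreamWrapper.getZeroCopyBytesRead();
   */
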
  /** Close stream(s) if necessary. */
  @Override
  public void close() {
    if (!doCloseStreams) {
      return;
    }
    updateInputStreamStatistics(this.streamNoFsChecksum);
    // we do not care about the close exception as it is for reading; no data loss issue.
    Closeables.closeQuietly(streamNoFsChecksum);

    updateInputStreamStatistics(stream);
    Closeables.closeQuietly(stream);
  }

  public HFileSystem getHfs() {
    return this.hfs;
  }

  /**
   * This will free sockets and file descriptors held by the stream only when the stream implements
   * org.apache.hadoop.fs.CanUnbuffer. NOT THREAD SAFE. Must be called only when all the clients
   * using this stream to read the blocks have finished reading. If the stream is unbuffered while
   * clients are still holding it for reads, then on the next client read request a new socket will
   * be opened by the DataNode, without the client knowing about it, to serve that read request.
   * Note: if that socket is idle for some time then the DataNode will close it and the socket will
   * move into the CLOSE_WAIT state, and on the next client request on this stream the current
   * socket will be closed and a new socket will be opened to serve the requests.
   */
  @SuppressWarnings({ "rawtypes" })
  public void unbuffer() {
    FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum());
    if (stream != null) {
      InputStream wrappedStream = stream.getWrappedStream();
      // The CanUnbuffer interface was added as part of HDFS-7694 and is only available in Hadoop
      // 2.6.4+ and 2.7.1+, so check whether the stream object implements the CanUnbuffer
      // interface and, based on that, call the unbuffer API.
      final Class<? extends InputStream> streamClass = wrappedStream.getClass();
      if (this.instanceOfCanUnbuffer == null) {
        // Ensure we compute whether the stream is an instance of CanUnbuffer only once.
        this.instanceOfCanUnbuffer = false;
        if (wrappedStream instanceof CanUnbuffer) {
          this.unbuffer = (CanUnbuffer) wrappedStream;
          this.instanceOfCanUnbuffer = true;
        }
      }
      if (this.instanceOfCanUnbuffer) {
        try {
          this.unbuffer.unbuffer();
        } catch (UnsupportedOperationException e) {
          if (isLogTraceEnabled) {
            LOG.trace("Failed to invoke 'unbuffer' method in class " + streamClass
              + ". The stream may not support unbuffering.", e);
          }
        }
      } else {
        if (isLogTraceEnabled) {
          LOG.trace("Failed to find 'unbuffer' method in class " + streamClass);
        }
      }
    }
  }

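  /*
   * Illustrative sketch of the calling pattern the unbuffer() contract above implies, with
   * wrapper as a placeholder for an instance of this class: call unbuffer() only once every
   * reader is done with the current burst of block reads.
   *
   *   FSDataInputStream in = wrapper.getStream(wrapper.shouldUseHBaseChecksum());
   *   // ... read the blocks that are needed right now ...
   *   wrapper.unbuffer(); // free the socket/file descriptor until the next read
   */
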
  public Path getReaderPath() {
    return readerPath;
  }
}