001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.io.InputStream; 023import java.util.concurrent.atomic.AtomicInteger; 024 025import org.apache.commons.io.IOUtils; 026import org.apache.hadoop.fs.CanUnbuffer; 027import org.apache.hadoop.fs.FSDataInputStream; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.fs.HFileSystem; 031import org.apache.hadoop.hdfs.DFSInputStream; 032import org.apache.hadoop.hdfs.client.HdfsDataInputStream; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 038 039/** 040 * Wrapper for input stream(s) that takes care of the interaction of FS and HBase checksums, 041 * as well as closing streams. Initialization is not thread-safe, but normal operation is; 042 * see method comments. 043 */ 044@InterfaceAudience.Private 045public class FSDataInputStreamWrapper implements Closeable { 046 private static final Logger LOG = LoggerFactory.getLogger(FSDataInputStreamWrapper.class); 047 private static final boolean isLogTraceEnabled = LOG.isTraceEnabled(); 048 049 private final HFileSystem hfs; 050 private final Path path; 051 private final FileLink link; 052 private final boolean doCloseStreams; 053 private final boolean dropBehind; 054 private final long readahead; 055 056 /** Two stream handles, one with and one without FS-level checksum. 057 * HDFS checksum setting is on FS level, not single read level, so you have to keep two 058 * FS objects and two handles open to interleave different reads freely, which is very sad. 059 * This is what we do: 060 * 1) First, we need to read the trailer of HFile to determine checksum parameters. 061 * We always use FS checksum to do that, so ctor opens {@link #stream}. 062 * 2.1) After that, if HBase checksum is not used, we'd just always use {@link #stream}; 063 * 2.2) If HBase checksum can be used, we'll open {@link #streamNoFsChecksum}, 064 * and close {@link #stream}. User MUST call prepareForBlockReader for that to happen; 065 * if they don't, (2.1) will be the default. 066 * 3) The users can call {@link #shouldUseHBaseChecksum()}, and pass its result to 067 * {@link #getStream(boolean)} to get stream (if Java had out/pointer params we could 068 * return both in one call). This stream is guaranteed to be set. 069 * 4) The first time HBase checksum fails, one would call {@link #fallbackToFsChecksum(int)}. 070 * That will take lock, and open {@link #stream}. While this is going on, others will 071 * continue to use the old stream; if they also want to fall back, they'll also call 072 * {@link #fallbackToFsChecksum(int)}, and block until {@link #stream} is set. 073 * 5) After some number of checksumOk() calls, we will go back to using HBase checksum. 074 * We will have 2 handles; however we presume checksums fail so rarely that we don't care. 075 */ 076 private volatile FSDataInputStream stream = null; 077 private volatile FSDataInputStream streamNoFsChecksum = null; 078 private final Object streamNoFsChecksumFirstCreateLock = new Object(); 079 080 // The configuration states that we should validate hbase checksums 081 private boolean useHBaseChecksumConfigured; 082 083 // Record the current state of this reader with respect to 084 // validating checkums in HBase. This is originally set the same 085 // value as useHBaseChecksumConfigured, but can change state as and when 086 // we encounter checksum verification failures. 087 private volatile boolean useHBaseChecksum; 088 089 // In the case of a checksum failure, do these many succeeding 090 // reads without hbase checksum verification. 091 private AtomicInteger hbaseChecksumOffCount = new AtomicInteger(-1); 092 093 private final static ReadStatistics readStatistics = new ReadStatistics(); 094 095 private static class ReadStatistics { 096 long totalBytesRead; 097 long totalLocalBytesRead; 098 long totalShortCircuitBytesRead; 099 long totalZeroCopyBytesRead; 100 } 101 102 private Boolean instanceOfCanUnbuffer = null; 103 private CanUnbuffer unbuffer = null; 104 105 public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException { 106 this(fs, path, false, -1L); 107 } 108 109 public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind, long readahead) throws IOException { 110 this(fs, null, path, dropBehind, readahead); 111 } 112 113 public FSDataInputStreamWrapper(FileSystem fs, FileLink link, 114 boolean dropBehind, long readahead) throws IOException { 115 this(fs, link, null, dropBehind, readahead); 116 } 117 118 private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path, boolean dropBehind, 119 long readahead) throws IOException { 120 assert (path == null) != (link == null); 121 this.path = path; 122 this.link = link; 123 this.doCloseStreams = true; 124 this.dropBehind = dropBehind; 125 this.readahead = readahead; 126 // If the fs is not an instance of HFileSystem, then create an instance of HFileSystem 127 // that wraps over the specified fs. In this case, we will not be able to avoid 128 // checksumming inside the filesystem. 129 this.hfs = (fs instanceof HFileSystem) ? (HFileSystem) fs : new HFileSystem(fs); 130 131 // Initially we are going to read the tail block. Open the reader w/FS checksum. 132 this.useHBaseChecksumConfigured = this.useHBaseChecksum = false; 133 this.stream = (link != null) ? link.open(hfs) : hfs.open(path); 134 setStreamOptions(stream); 135 } 136 137 private void setStreamOptions(FSDataInputStream in) { 138 try { 139 in.setDropBehind(dropBehind); 140 } catch (Exception e) { 141 // Skipped. 142 } 143 if (readahead >= 0) { 144 try { 145 in.setReadahead(readahead); 146 } catch (Exception e) { 147 // Skipped. 148 } 149 } 150 } 151 152 /** 153 * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any 154 * reads finish and before any other reads start (what happens in reality is we read the 155 * tail, then call this based on what's in the tail, then read blocks). 156 * @param forceNoHBaseChecksum Force not using HBase checksum. 157 */ 158 public void prepareForBlockReader(boolean forceNoHBaseChecksum) throws IOException { 159 if (hfs == null) return; 160 assert this.stream != null && !this.useHBaseChecksumConfigured; 161 boolean useHBaseChecksum = 162 !forceNoHBaseChecksum && hfs.useHBaseChecksum() && (hfs.getNoChecksumFs() != hfs); 163 164 if (useHBaseChecksum) { 165 FileSystem fsNc = hfs.getNoChecksumFs(); 166 this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path); 167 setStreamOptions(streamNoFsChecksum); 168 this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum; 169 // Close the checksum stream; we will reopen it if we get an HBase checksum failure. 170 this.stream.close(); 171 this.stream = null; 172 } 173 } 174 175 /** For use in tests. */ 176 @VisibleForTesting 177 public FSDataInputStreamWrapper(FSDataInputStream fsdis) { 178 this(fsdis, fsdis); 179 } 180 181 /** For use in tests. */ 182 @VisibleForTesting 183 public FSDataInputStreamWrapper(FSDataInputStream fsdis, FSDataInputStream noChecksum) { 184 doCloseStreams = false; 185 stream = fsdis; 186 streamNoFsChecksum = noChecksum; 187 path = null; 188 link = null; 189 hfs = null; 190 useHBaseChecksumConfigured = useHBaseChecksum = false; 191 dropBehind = false; 192 readahead = 0; 193 } 194 195 /** 196 * @return Whether we are presently using HBase checksum. 197 */ 198 public boolean shouldUseHBaseChecksum() { 199 return this.useHBaseChecksum; 200 } 201 202 /** 203 * Get the stream to use. Thread-safe. 204 * @param useHBaseChecksum must be the value that shouldUseHBaseChecksum has returned 205 * at some point in the past, otherwise the result is undefined. 206 */ 207 public FSDataInputStream getStream(boolean useHBaseChecksum) { 208 return useHBaseChecksum ? this.streamNoFsChecksum : this.stream; 209 } 210 211 /** 212 * Read from non-checksum stream failed, fall back to FS checksum. Thread-safe. 213 * @param offCount For how many checksumOk calls to turn off the HBase checksum. 214 */ 215 public FSDataInputStream fallbackToFsChecksum(int offCount) throws IOException { 216 // checksumOffCount is speculative, but let's try to reset it less. 217 boolean partOfConvoy = false; 218 if (this.stream == null) { 219 synchronized (streamNoFsChecksumFirstCreateLock) { 220 partOfConvoy = (this.stream != null); 221 if (!partOfConvoy) { 222 this.stream = (link != null) ? link.open(hfs) : hfs.open(path); 223 } 224 } 225 } 226 if (!partOfConvoy) { 227 this.useHBaseChecksum = false; 228 this.hbaseChecksumOffCount.set(offCount); 229 } 230 return this.stream; 231 } 232 233 /** Report that checksum was ok, so we may ponder going back to HBase checksum. */ 234 public void checksumOk() { 235 if (this.useHBaseChecksumConfigured && !this.useHBaseChecksum 236 && (this.hbaseChecksumOffCount.getAndDecrement() < 0)) { 237 // The stream we need is already open (because we were using HBase checksum in the past). 238 assert this.streamNoFsChecksum != null; 239 this.useHBaseChecksum = true; 240 } 241 } 242 243 private void updateInputStreamStatistics(FSDataInputStream stream) { 244 // If the underlying file system is HDFS, update read statistics upon close. 245 if (stream instanceof HdfsDataInputStream) { 246 /** 247 * Because HDFS ReadStatistics is calculated per input stream, it is not 248 * feasible to update the aggregated number in real time. Instead, the 249 * metrics are updated when an input stream is closed. 250 */ 251 HdfsDataInputStream hdfsDataInputStream = (HdfsDataInputStream)stream; 252 synchronized (readStatistics) { 253 readStatistics.totalBytesRead += hdfsDataInputStream.getReadStatistics(). 254 getTotalBytesRead(); 255 readStatistics.totalLocalBytesRead += hdfsDataInputStream.getReadStatistics(). 256 getTotalLocalBytesRead(); 257 readStatistics.totalShortCircuitBytesRead += hdfsDataInputStream.getReadStatistics(). 258 getTotalShortCircuitBytesRead(); 259 readStatistics.totalZeroCopyBytesRead += hdfsDataInputStream.getReadStatistics(). 260 getTotalZeroCopyBytesRead(); 261 } 262 } 263 } 264 265 public static long getTotalBytesRead() { 266 synchronized (readStatistics) { 267 return readStatistics.totalBytesRead; 268 } 269 } 270 271 public static long getLocalBytesRead() { 272 synchronized (readStatistics) { 273 return readStatistics.totalLocalBytesRead; 274 } 275 } 276 277 public static long getShortCircuitBytesRead() { 278 synchronized (readStatistics) { 279 return readStatistics.totalShortCircuitBytesRead; 280 } 281 } 282 283 public static long getZeroCopyBytesRead() { 284 synchronized (readStatistics) { 285 return readStatistics.totalZeroCopyBytesRead; 286 } 287 } 288 289 /** CloseClose stream(s) if necessary. */ 290 @Override 291 public void close() { 292 if (!doCloseStreams) { 293 return; 294 } 295 updateInputStreamStatistics(this.streamNoFsChecksum); 296 // we do not care about the close exception as it is for reading, no data loss issue. 297 IOUtils.closeQuietly(streamNoFsChecksum); 298 299 300 updateInputStreamStatistics(stream); 301 IOUtils.closeQuietly(stream); 302 } 303 304 public HFileSystem getHfs() { 305 return this.hfs; 306 } 307 308 /** 309 * This will free sockets and file descriptors held by the stream only when the stream implements 310 * org.apache.hadoop.fs.CanUnbuffer. NOT THREAD SAFE. Must be called only when all the clients 311 * using this stream to read the blocks have finished reading. If by chance the stream is 312 * unbuffered and there are clients still holding this stream for read then on next client read 313 * request a new socket will be opened by Datanode without client knowing about it and will serve 314 * its read request. Note: If this socket is idle for some time then the DataNode will close the 315 * socket and the socket will move into CLOSE_WAIT state and on the next client request on this 316 * stream, the current socket will be closed and a new socket will be opened to serve the 317 * requests. 318 */ 319 @SuppressWarnings({ "rawtypes" }) 320 public void unbuffer() { 321 FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum()); 322 if (stream != null) { 323 InputStream wrappedStream = stream.getWrappedStream(); 324 // CanUnbuffer interface was added as part of HDFS-7694 and the fix is available in Hadoop 325 // 2.6.4+ and 2.7.1+ versions only so check whether the stream object implements the 326 // CanUnbuffer interface or not and based on that call the unbuffer api. 327 final Class<? extends InputStream> streamClass = wrappedStream.getClass(); 328 if (this.instanceOfCanUnbuffer == null) { 329 // To ensure we compute whether the stream is instance of CanUnbuffer only once. 330 this.instanceOfCanUnbuffer = false; 331 if (wrappedStream instanceof CanUnbuffer) { 332 this.unbuffer = (CanUnbuffer) wrappedStream; 333 this.instanceOfCanUnbuffer = true; 334 } 335 } 336 if (this.instanceOfCanUnbuffer) { 337 try { 338 this.unbuffer.unbuffer(); 339 } catch (UnsupportedOperationException e){ 340 if (isLogTraceEnabled) { 341 LOG.trace("Failed to invoke 'unbuffer' method in class " + streamClass 342 + " . So there may be the stream does not support unbuffering.", e); 343 } 344 } 345 } else { 346 if (isLogTraceEnabled) { 347 LOG.trace("Failed to find 'unbuffer' method in class " + streamClass); 348 } 349 } 350 } 351 } 352}