001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.io.InputStream; 023import java.util.concurrent.atomic.AtomicInteger; 024import org.apache.hadoop.fs.CanUnbuffer; 025import org.apache.hadoop.fs.FSDataInputStream; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.fs.HFileSystem; 029import org.apache.hadoop.hdfs.client.HdfsDataInputStream; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 035 036/** 037 * Wrapper for input stream(s) that takes care of the interaction of FS and HBase checksums, as well 038 * as closing streams. Initialization is not thread-safe, but normal operation is; see method 039 * comments. 040 */ 041@InterfaceAudience.Private 042public class FSDataInputStreamWrapper implements Closeable { 043 private static final Logger LOG = LoggerFactory.getLogger(FSDataInputStreamWrapper.class); 044 private static final boolean isLogTraceEnabled = LOG.isTraceEnabled(); 045 046 private final HFileSystem hfs; 047 private final Path path; 048 private final FileLink link; 049 private final boolean doCloseStreams; 050 private final boolean dropBehind; 051 private final long readahead; 052 053 /** 054 * Two stream handles, one with and one without FS-level checksum. HDFS checksum setting is on FS 055 * level, not single read level, so you have to keep two FS objects and two handles open to 056 * interleave different reads freely, which is very sad. This is what we do: 1) First, we need to 057 * read the trailer of HFile to determine checksum parameters. We always use FS checksum to do 058 * that, so ctor opens {@link #stream}. 2.1) After that, if HBase checksum is not used, we'd just 059 * always use {@link #stream}; 2.2) If HBase checksum can be used, we'll open 060 * {@link #streamNoFsChecksum}, and close {@link #stream}. User MUST call prepareForBlockReader 061 * for that to happen; if they don't, (2.1) will be the default. 3) The users can call 062 * {@link #shouldUseHBaseChecksum()}, and pass its result to {@link #getStream(boolean)} to get 063 * stream (if Java had out/pointer params we could return both in one call). This stream is 064 * guaranteed to be set. 4) The first time HBase checksum fails, one would call 065 * {@link #fallbackToFsChecksum(int)}. That will take lock, and open {@link #stream}. While this 066 * is going on, others will continue to use the old stream; if they also want to fall back, 067 * they'll also call {@link #fallbackToFsChecksum(int)}, and block until {@link #stream} is set. 068 * 5) After some number of checksumOk() calls, we will go back to using HBase checksum. We will 069 * have 2 handles; however we presume checksums fail so rarely that we don't care. 070 */ 071 private volatile FSDataInputStream stream = null; 072 private volatile FSDataInputStream streamNoFsChecksum = null; 073 private final Object streamNoFsChecksumFirstCreateLock = new Object(); 074 075 // The configuration states that we should validate hbase checksums 076 private boolean useHBaseChecksumConfigured; 077 078 // Record the current state of this reader with respect to 079 // validating checkums in HBase. This is originally set the same 080 // value as useHBaseChecksumConfigured, but can change state as and when 081 // we encounter checksum verification failures. 082 private volatile boolean useHBaseChecksum; 083 084 // In the case of a checksum failure, do these many succeeding 085 // reads without hbase checksum verification. 086 private AtomicInteger hbaseChecksumOffCount = new AtomicInteger(-1); 087 088 private final static ReadStatistics readStatistics = new ReadStatistics(); 089 090 private static class ReadStatistics { 091 long totalBytesRead; 092 long totalLocalBytesRead; 093 long totalShortCircuitBytesRead; 094 long totalZeroCopyBytesRead; 095 } 096 097 private Boolean instanceOfCanUnbuffer = null; 098 private CanUnbuffer unbuffer = null; 099 100 public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException { 101 this(fs, path, false, -1L); 102 } 103 104 public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind, long readahead) 105 throws IOException { 106 this(fs, null, path, dropBehind, readahead); 107 } 108 109 public FSDataInputStreamWrapper(FileSystem fs, FileLink link, boolean dropBehind, long readahead) 110 throws IOException { 111 this(fs, link, null, dropBehind, readahead); 112 } 113 114 private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path, boolean dropBehind, 115 long readahead) throws IOException { 116 assert (path == null) != (link == null); 117 this.path = path; 118 this.link = link; 119 this.doCloseStreams = true; 120 this.dropBehind = dropBehind; 121 this.readahead = readahead; 122 // If the fs is not an instance of HFileSystem, then create an instance of HFileSystem 123 // that wraps over the specified fs. In this case, we will not be able to avoid 124 // checksumming inside the filesystem. 125 this.hfs = (fs instanceof HFileSystem) ? (HFileSystem) fs : new HFileSystem(fs); 126 127 // Initially we are going to read the tail block. Open the reader w/FS checksum. 128 this.useHBaseChecksumConfigured = this.useHBaseChecksum = false; 129 this.stream = (link != null) ? link.open(hfs) : hfs.open(path); 130 setStreamOptions(stream); 131 } 132 133 private void setStreamOptions(FSDataInputStream in) { 134 try { 135 in.setDropBehind(dropBehind); 136 } catch (Exception e) { 137 // Skipped. 138 } 139 if (readahead >= 0) { 140 try { 141 in.setReadahead(readahead); 142 } catch (Exception e) { 143 // Skipped. 144 } 145 } 146 } 147 148 /** 149 * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any reads 150 * finish and before any other reads start (what happens in reality is we read the tail, then call 151 * this based on what's in the tail, then read blocks). 152 * @param forceNoHBaseChecksum Force not using HBase checksum. 153 */ 154 public void prepareForBlockReader(boolean forceNoHBaseChecksum) throws IOException { 155 if (hfs == null) return; 156 assert this.stream != null && !this.useHBaseChecksumConfigured; 157 boolean useHBaseChecksum = 158 !forceNoHBaseChecksum && hfs.useHBaseChecksum() && (hfs.getNoChecksumFs() != hfs); 159 160 if (useHBaseChecksum) { 161 FileSystem fsNc = hfs.getNoChecksumFs(); 162 this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path); 163 setStreamOptions(streamNoFsChecksum); 164 this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum; 165 // Close the checksum stream; we will reopen it if we get an HBase checksum failure. 166 this.stream.close(); 167 this.stream = null; 168 } 169 } 170 171 /** For use in tests. */ 172 public FSDataInputStreamWrapper(FSDataInputStream fsdis) { 173 this(fsdis, fsdis); 174 } 175 176 /** For use in tests. */ 177 public FSDataInputStreamWrapper(FSDataInputStream fsdis, FSDataInputStream noChecksum) { 178 doCloseStreams = false; 179 stream = fsdis; 180 streamNoFsChecksum = noChecksum; 181 path = null; 182 link = null; 183 hfs = null; 184 useHBaseChecksumConfigured = useHBaseChecksum = false; 185 dropBehind = false; 186 readahead = 0; 187 } 188 189 /** 190 * @return Whether we are presently using HBase checksum. 191 */ 192 public boolean shouldUseHBaseChecksum() { 193 return this.useHBaseChecksum; 194 } 195 196 /** 197 * Get the stream to use. Thread-safe. 198 * @param useHBaseChecksum must be the value that shouldUseHBaseChecksum has returned at some 199 * point in the past, otherwise the result is undefined. 200 */ 201 public FSDataInputStream getStream(boolean useHBaseChecksum) { 202 return useHBaseChecksum ? this.streamNoFsChecksum : this.stream; 203 } 204 205 /** 206 * Read from non-checksum stream failed, fall back to FS checksum. Thread-safe. 207 * @param offCount For how many checksumOk calls to turn off the HBase checksum. 208 */ 209 public FSDataInputStream fallbackToFsChecksum(int offCount) throws IOException { 210 // checksumOffCount is speculative, but let's try to reset it less. 211 boolean partOfConvoy = false; 212 if (this.stream == null) { 213 synchronized (streamNoFsChecksumFirstCreateLock) { 214 partOfConvoy = (this.stream != null); 215 if (!partOfConvoy) { 216 this.stream = (link != null) ? link.open(hfs) : hfs.open(path); 217 } 218 } 219 } 220 if (!partOfConvoy) { 221 this.useHBaseChecksum = false; 222 this.hbaseChecksumOffCount.set(offCount); 223 } 224 return this.stream; 225 } 226 227 /** Report that checksum was ok, so we may ponder going back to HBase checksum. */ 228 public void checksumOk() { 229 if ( 230 this.useHBaseChecksumConfigured && !this.useHBaseChecksum 231 && (this.hbaseChecksumOffCount.getAndDecrement() < 0) 232 ) { 233 // The stream we need is already open (because we were using HBase checksum in the past). 234 assert this.streamNoFsChecksum != null; 235 this.useHBaseChecksum = true; 236 } 237 } 238 239 private void updateInputStreamStatistics(FSDataInputStream stream) { 240 // If the underlying file system is HDFS, update read statistics upon close. 241 if (stream instanceof HdfsDataInputStream) { 242 /** 243 * Because HDFS ReadStatistics is calculated per input stream, it is not feasible to update 244 * the aggregated number in real time. Instead, the metrics are updated when an input stream 245 * is closed. 246 */ 247 HdfsDataInputStream hdfsDataInputStream = (HdfsDataInputStream) stream; 248 synchronized (readStatistics) { 249 readStatistics.totalBytesRead += 250 hdfsDataInputStream.getReadStatistics().getTotalBytesRead(); 251 readStatistics.totalLocalBytesRead += 252 hdfsDataInputStream.getReadStatistics().getTotalLocalBytesRead(); 253 readStatistics.totalShortCircuitBytesRead += 254 hdfsDataInputStream.getReadStatistics().getTotalShortCircuitBytesRead(); 255 readStatistics.totalZeroCopyBytesRead += 256 hdfsDataInputStream.getReadStatistics().getTotalZeroCopyBytesRead(); 257 } 258 } 259 } 260 261 public static long getTotalBytesRead() { 262 synchronized (readStatistics) { 263 return readStatistics.totalBytesRead; 264 } 265 } 266 267 public static long getLocalBytesRead() { 268 synchronized (readStatistics) { 269 return readStatistics.totalLocalBytesRead; 270 } 271 } 272 273 public static long getShortCircuitBytesRead() { 274 synchronized (readStatistics) { 275 return readStatistics.totalShortCircuitBytesRead; 276 } 277 } 278 279 public static long getZeroCopyBytesRead() { 280 synchronized (readStatistics) { 281 return readStatistics.totalZeroCopyBytesRead; 282 } 283 } 284 285 /** CloseClose stream(s) if necessary. */ 286 @Override 287 public void close() { 288 if (!doCloseStreams) { 289 return; 290 } 291 updateInputStreamStatistics(this.streamNoFsChecksum); 292 // we do not care about the close exception as it is for reading, no data loss issue. 293 Closeables.closeQuietly(streamNoFsChecksum); 294 295 updateInputStreamStatistics(stream); 296 Closeables.closeQuietly(stream); 297 } 298 299 public HFileSystem getHfs() { 300 return this.hfs; 301 } 302 303 /** 304 * This will free sockets and file descriptors held by the stream only when the stream implements 305 * org.apache.hadoop.fs.CanUnbuffer. NOT THREAD SAFE. Must be called only when all the clients 306 * using this stream to read the blocks have finished reading. If by chance the stream is 307 * unbuffered and there are clients still holding this stream for read then on next client read 308 * request a new socket will be opened by Datanode without client knowing about it and will serve 309 * its read request. Note: If this socket is idle for some time then the DataNode will close the 310 * socket and the socket will move into CLOSE_WAIT state and on the next client request on this 311 * stream, the current socket will be closed and a new socket will be opened to serve the 312 * requests. 313 */ 314 @SuppressWarnings({ "rawtypes" }) 315 public void unbuffer() { 316 FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum()); 317 if (stream != null) { 318 InputStream wrappedStream = stream.getWrappedStream(); 319 // CanUnbuffer interface was added as part of HDFS-7694 and the fix is available in Hadoop 320 // 2.6.4+ and 2.7.1+ versions only so check whether the stream object implements the 321 // CanUnbuffer interface or not and based on that call the unbuffer api. 322 final Class<? extends InputStream> streamClass = wrappedStream.getClass(); 323 if (this.instanceOfCanUnbuffer == null) { 324 // To ensure we compute whether the stream is instance of CanUnbuffer only once. 325 this.instanceOfCanUnbuffer = false; 326 if (wrappedStream instanceof CanUnbuffer) { 327 this.unbuffer = (CanUnbuffer) wrappedStream; 328 this.instanceOfCanUnbuffer = true; 329 } 330 } 331 if (this.instanceOfCanUnbuffer) { 332 try { 333 this.unbuffer.unbuffer(); 334 } catch (UnsupportedOperationException e) { 335 if (isLogTraceEnabled) { 336 LOG.trace("Failed to invoke 'unbuffer' method in class " + streamClass 337 + " . So there may be the stream does not support unbuffering.", e); 338 } 339 } 340 } else { 341 if (isLogTraceEnabled) { 342 LOG.trace("Failed to find 'unbuffer' method in class " + streamClass); 343 } 344 } 345 } 346 } 347}