/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * Wrapper for input stream(s) that takes care of the interaction of FS and HBase checksums, as
 * well as closing streams. Initialization is not thread-safe, but normal operation is; see method
 * comments.
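 * <p>
 * A rough usage sketch (illustrative only; {@code fs}, {@code path} and the block-reading logic
 * are assumed to exist on the caller's side, and error handling is omitted):
 *
 * <pre>{@code
 * FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fs, path);
 * try {
 *   // The trailer is always read with FS checksum; this may switch us to HBase checksum.
 *   wrapper.prepareForBlockReader(false);
 *   FSDataInputStream in = wrapper.getStream(wrapper.shouldUseHBaseChecksum());
 *   // ... read blocks from 'in' ...
 * } finally {
 *   wrapper.close();
 * }
 * }</pre>
 */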
@InterfaceAudience.Private
public class FSDataInputStreamWrapper implements Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(FSDataInputStreamWrapper.class);
  private static final boolean isLogTraceEnabled = LOG.isTraceEnabled();

  private final HFileSystem hfs;
  private final Path path;
  private final FileLink link;
  private final boolean doCloseStreams;
  private final boolean dropBehind;
  private final long readahead;

  /**
   * Two stream handles, one with and one without FS-level checksum. HDFS checksum setting is on
   * the FS level, not the single-read level, so you have to keep two FS objects and two handles
   * open to interleave different reads freely, which is very sad. This is what we do:
   * 1) First, we need to read the trailer of the HFile to determine checksum parameters. We
   * always use the FS checksum to do that, so the ctor opens {@link #stream}.
   * 2.1) After that, if HBase checksum is not used, we'd just always use {@link #stream};
   * 2.2) If HBase checksum can be used, we'll open {@link #streamNoFsChecksum}, and close
   * {@link #stream}. User MUST call prepareForBlockReader for that to happen; if they don't,
   * (2.1) will be the default.
   * 3) The users can call {@link #shouldUseHBaseChecksum()}, and pass its result to
   * {@link #getStream(boolean)} to get the stream (if Java had out/pointer params we could return
   * both in one call). This stream is guaranteed to be set.
   * 4) The first time HBase checksum fails, one would call {@link #fallbackToFsChecksum(int)}.
   * That will take the lock, and open {@link #stream}. While this is going on, others will
   * continue to use the old stream; if they also want to fall back, they'll also call
   * {@link #fallbackToFsChecksum(int)}, and block until {@link #stream} is set.
   * 5) After some number of checksumOk() calls, we will go back to using HBase checksum. We will
   * have 2 handles; however we presume checksums fail so rarely that we don't care.
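   * <p>
   * A sketch of that per-read protocol (illustrative; {@code verifyBlock} and {@code OFF_COUNT}
   * are hypothetical caller-side names):
   *
   * <pre>{@code
   * FSDataInputStream in = wrapper.getStream(wrapper.shouldUseHBaseChecksum());
   * if (!verifyBlock(in)) {
   *   // HBase checksum failed; re-read through the FS-checksum stream.
   *   in = wrapper.fallbackToFsChecksum(OFF_COUNT);
   * } else {
   *   // Successful verifications count toward re-enabling HBase checksum.
   *   wrapper.checksumOk();
   * }
   * }</pre>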
   */
  private volatile FSDataInputStream stream = null;
  private volatile FSDataInputStream streamNoFsChecksum = null;
  private final Object streamNoFsChecksumFirstCreateLock = new Object();

  // The configuration states that we should validate hbase checksums
  private boolean useHBaseChecksumConfigured;

  // Record the current state of this reader with respect to
  // validating checksums in HBase. This is originally set to the same
  // value as useHBaseChecksumConfigured, but can change state as and when
  // we encounter checksum verification failures.
  private volatile boolean useHBaseChecksum;

  // In the case of a checksum failure, do this many succeeding
  // reads without hbase checksum verification.
  private AtomicInteger hbaseChecksumOffCount = new AtomicInteger(-1);

  private final static ReadStatistics readStatistics = new ReadStatistics();

  private static class ReadStatistics {
    long totalBytesRead;
    long totalLocalBytesRead;
    long totalShortCircuitBytesRead;
    long totalZeroCopyBytesRead;
  }

  private Boolean instanceOfCanUnbuffer = null;
  private CanUnbuffer unbuffer = null;

  public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
    this(fs, path, false, -1L);
  }

  public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind, long readahead)
    throws IOException {
    this(fs, null, path, dropBehind, readahead);
  }

  public FSDataInputStreamWrapper(FileSystem fs, FileLink link, boolean dropBehind, long readahead)
    throws IOException {
    this(fs, link, null, dropBehind, readahead);
  }

  private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path, boolean dropBehind,
    long readahead) throws IOException {
    assert (path == null) != (link == null);
    this.path = path;
    this.link = link;
    this.doCloseStreams = true;
    this.dropBehind = dropBehind;
    this.readahead = readahead;
    // If the fs is not an instance of HFileSystem, then create an instance of HFileSystem
    // that wraps over the specified fs. In this case, we will not be able to avoid
    // checksumming inside the filesystem.
    this.hfs = (fs instanceof HFileSystem) ? (HFileSystem) fs : new HFileSystem(fs);

    // Initially we are going to read the tail block. Open the reader w/FS checksum.
    this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
    this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
    setStreamOptions(stream);
  }

  private void setStreamOptions(FSDataInputStream in) {
    try {
      in.setDropBehind(dropBehind);
    } catch (Exception e) {
      // Skipped.
    }
    if (readahead >= 0) {
      try {
        in.setReadahead(readahead);
      } catch (Exception e) {
        // Skipped.
      }
    }
  }

  /**
   * Prepares the streams for the block reader. NOT THREAD SAFE. Must be called once, after any
   * reads finish and before any other reads start (what happens in reality is we read the tail,
   * then call this based on what's in the tail, then read blocks).
   * @param forceNoHBaseChecksum Force not using HBase checksum.
   */
  public void prepareForBlockReader(boolean forceNoHBaseChecksum) throws IOException {
    if (hfs == null) return;
    assert this.stream != null && !this.useHBaseChecksumConfigured;
    boolean useHBaseChecksum =
      !forceNoHBaseChecksum && hfs.useHBaseChecksum() && (hfs.getNoChecksumFs() != hfs);

    if (useHBaseChecksum) {
      FileSystem fsNc = hfs.getNoChecksumFs();
      this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path);
      setStreamOptions(streamNoFsChecksum);
      this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum;
      // Close the checksum stream; we will reopen it if we get an HBase checksum failure.
      this.stream.close();
      this.stream = null;
    }
  }

  /** For use in tests. */
  public FSDataInputStreamWrapper(FSDataInputStream fsdis) {
    this(fsdis, fsdis);
  }

  /** For use in tests. */
  public FSDataInputStreamWrapper(FSDataInputStream fsdis, FSDataInputStream noChecksum) {
    doCloseStreams = false;
    stream = fsdis;
    streamNoFsChecksum = noChecksum;
    path = null;
    link = null;
    hfs = null;
    useHBaseChecksumConfigured = useHBaseChecksum = false;
    dropBehind = false;
    readahead = 0;
  }

  /** Returns whether we are presently using HBase checksum. */
  public boolean shouldUseHBaseChecksum() {
    return this.useHBaseChecksum;
  }

  /**
   * Get the stream to use. Thread-safe.
   * @param useHBaseChecksum must be the value that shouldUseHBaseChecksum has returned at some
   *                         point in the past, otherwise the result is undefined.
   */
  public FSDataInputStream getStream(boolean useHBaseChecksum) {
    return useHBaseChecksum ? this.streamNoFsChecksum : this.stream;
  }

  /**
   * Read from the non-checksum stream failed, fall back to FS checksum. Thread-safe.
   * @param offCount For how many checksumOk calls to turn off the HBase checksum.
   */
  public FSDataInputStream fallbackToFsChecksum(int offCount) throws IOException {
    // checksumOffCount is speculative, but let's try to reset it less.
    boolean partOfConvoy = false;
    if (this.stream == null) {
      synchronized (streamNoFsChecksumFirstCreateLock) {
        partOfConvoy = (this.stream != null);
        if (!partOfConvoy) {
          this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
        }
      }
    }
    if (!partOfConvoy) {
      this.useHBaseChecksum = false;
      this.hbaseChecksumOffCount.set(offCount);
    }
    return this.stream;
  }

  /** Report that checksum was ok, so we may ponder going back to HBase checksum. */
  public void checksumOk() {
    if (
      this.useHBaseChecksumConfigured && !this.useHBaseChecksum
        && (this.hbaseChecksumOffCount.getAndDecrement() < 0)
    ) {
      // The stream we need is already open (because we were using HBase checksum in the past).
      assert this.streamNoFsChecksum != null;
      this.useHBaseChecksum = true;
    }
  }

  private void updateInputStreamStatistics(FSDataInputStream stream) {
    // If the underlying file system is HDFS, update read statistics upon close.
    if (stream instanceof HdfsDataInputStream) {
      /**
       * Because HDFS ReadStatistics is calculated per input stream, it is not feasible to update
       * the aggregated number in real time. Instead, the metrics are updated when an input stream
       * is closed.
       */
      HdfsDataInputStream hdfsDataInputStream = (HdfsDataInputStream) stream;
      synchronized (readStatistics) {
        readStatistics.totalBytesRead +=
          hdfsDataInputStream.getReadStatistics().getTotalBytesRead();
        readStatistics.totalLocalBytesRead +=
          hdfsDataInputStream.getReadStatistics().getTotalLocalBytesRead();
        readStatistics.totalShortCircuitBytesRead +=
          hdfsDataInputStream.getReadStatistics().getTotalShortCircuitBytesRead();
        readStatistics.totalZeroCopyBytesRead +=
          hdfsDataInputStream.getReadStatistics().getTotalZeroCopyBytesRead();
      }
    }
  }

  public static long getTotalBytesRead() {
    synchronized (readStatistics) {
      return readStatistics.totalBytesRead;
    }
  }

  public static long getLocalBytesRead() {
    synchronized (readStatistics) {
      return readStatistics.totalLocalBytesRead;
    }
  }

  public static long getShortCircuitBytesRead() {
    synchronized (readStatistics) {
      return readStatistics.totalShortCircuitBytesRead;
    }
  }

  public static long getZeroCopyBytesRead() {
    synchronized (readStatistics) {
      return readStatistics.totalZeroCopyBytesRead;
    }
  }

  /** Close stream(s) if necessary. */
  @Override
  public void close() {
    if (!doCloseStreams) {
      return;
    }
    updateInputStreamStatistics(this.streamNoFsChecksum);
    // we do not care about the close exception as it is for reading, no data loss issue.
    Closeables.closeQuietly(streamNoFsChecksum);

    updateInputStreamStatistics(stream);
    Closeables.closeQuietly(stream);
  }

  public HFileSystem getHfs() {
    return this.hfs;
  }

  /**
   * This will free sockets and file descriptors held by the stream only when the stream implements
   * org.apache.hadoop.fs.CanUnbuffer. NOT THREAD SAFE. Must be called only when all the clients
   * using this stream to read the blocks have finished reading. If the stream is unbuffered while
   * clients are still holding it for reads, then on the next read request the DataNode will open a
   * new socket to serve it, without the client knowing. Note: if that socket is idle for some time
   * the DataNode will close it, the socket will move into the CLOSE_WAIT state, and on the next
   * client request on this stream the current socket will be closed and a new socket opened to
   * serve the requests.
   */
  @SuppressWarnings({ "rawtypes" })
  public void unbuffer() {
    FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum());
    if (stream != null) {
      InputStream wrappedStream = stream.getWrappedStream();
      // The CanUnbuffer interface was added as part of HDFS-7694 and the fix is available only in
      // Hadoop 2.6.4+ and 2.7.1+, so check whether the stream object implements the CanUnbuffer
      // interface and call the unbuffer api accordingly.
      final Class<? extends InputStream> streamClass = wrappedStream.getClass();
      if (this.instanceOfCanUnbuffer == null) {
        // To ensure we compute whether the stream is an instance of CanUnbuffer only once.
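        // Cache the result so later calls can reuse the 'unbuffer' reference directly instead of
        // repeating the instanceof check.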
        this.instanceOfCanUnbuffer = false;
        if (wrappedStream instanceof CanUnbuffer) {
          this.unbuffer = (CanUnbuffer) wrappedStream;
          this.instanceOfCanUnbuffer = true;
        }
      }
      if (this.instanceOfCanUnbuffer) {
        try {
          this.unbuffer.unbuffer();
        } catch (UnsupportedOperationException e) {
          if (isLogTraceEnabled) {
            LOG.trace("Failed to invoke 'unbuffer' method in class " + streamClass
              + ". The stream may not support unbuffering.", e);
          }
        }
      } else {
        if (isLogTraceEnabled) {
          LOG.trace("Failed to find 'unbuffer' method in class " + streamClass);
        }
      }
    }
  }
}