/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * Wrapper for input stream(s) that takes care of the interaction of FS and HBase checksums, as
 * well as closing streams. Initialization is not thread-safe, but normal operation is; see method
 * comments.
 */
@InterfaceAudience.Private
public class FSDataInputStreamWrapper implements Closeable {

  private final HFileSystem hfs;
  private final Path path;
  private final FileLink link;
  private final boolean doCloseStreams;
  private final boolean dropBehind;
  private final long readahead;

  /**
   * Two stream handles, one with and one without FS-level checksum. HDFS checksum setting is on FS
   * level, not single read level, so you have to keep two FS objects and two handles open to
   * interleave different reads freely, which is very sad. This is what we do:
   * 1) First, we need to read the trailer of HFile to determine checksum parameters. We always use
   * FS checksum to do that, so ctor opens {@link #stream}.
   * 2.1) After that, if HBase checksum is not used, we'd just always use {@link #stream};
   * 2.2) If HBase checksum can be used, we'll open {@link #streamNoFsChecksum}, and close
   * {@link #stream}. User MUST call prepareForBlockReader for that to happen; if they don't, (2.1)
   * will be the default.
   * 3) The users can call {@link #shouldUseHBaseChecksum()}, and pass its result to
   * {@link #getStream(boolean)} to get stream (if Java had out/pointer params we could return both
   * in one call). This stream is guaranteed to be set.
   * 4) The first time HBase checksum fails, one would call {@link #fallbackToFsChecksum(int)}.
   * That will take lock, and open {@link #stream}. While this is going on, others will continue to
   * use the old stream; if they also want to fall back, they'll also call
   * {@link #fallbackToFsChecksum(int)}, and block until {@link #stream} is set.
   * 5) After some number of checksumOk() calls, we will go back to using HBase checksum. We will
   * have 2 handles; however we presume checksums fail so rarely that we don't care.
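   * <p>
   * A rough usage sketch of the flow above ({@code fs}, {@code hfilePath} and the checksum-failure
   * handling are illustrative only; a typical caller is the HFile block reader):
   *
   * <pre>
   * // Sketch only: error handling and block parsing omitted.
   * FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fs, hfilePath);
   * wrapper.prepareForBlockReader(false); // may switch to the no-FS-checksum stream
   * boolean useHBaseChecksum = wrapper.shouldUseHBaseChecksum();
   * FSDataInputStream in = wrapper.getStream(useHBaseChecksum);
   * // ... read a block from 'in'; if HBase checksum verification of the block fails ...
   * in = wrapper.fallbackToFsChecksum(3); // use FS checksum for the next few reads
   * // ... then report each subsequent successful read ...
   * wrapper.checksumOk(); // after enough of these, HBase checksum is used again
   * </pre>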
   */
  private volatile FSDataInputStream stream = null;
  private volatile FSDataInputStream streamNoFsChecksum = null;
  private final Object streamNoFsChecksumFirstCreateLock = new Object();

  // The configuration states that we should validate hbase checksums
  private boolean useHBaseChecksumConfigured;

  // Record the current state of this reader with respect to
  // validating checksums in HBase. This is originally set the same
  // value as useHBaseChecksumConfigured, but can change state as and when
  // we encounter checksum verification failures.
  private volatile boolean useHBaseChecksum;

  // In the case of a checksum failure, do this many succeeding
  // reads without hbase checksum verification.
  private AtomicInteger hbaseChecksumOffCount = new AtomicInteger(-1);

  private final static ReadStatistics readStatistics = new ReadStatistics();

  private static class ReadStatistics {
    long totalBytesRead;
    long totalLocalBytesRead;
    long totalShortCircuitBytesRead;
    long totalZeroCopyBytesRead;
  }

  protected Path readerPath;

  public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
    this(fs, path, false, -1L);
  }

  public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind, long readahead)
    throws IOException {
    this(fs, null, path, dropBehind, readahead);
  }

  public FSDataInputStreamWrapper(FileSystem fs, FileLink link, boolean dropBehind, long readahead)
    throws IOException {
    this(fs, link, null, dropBehind, readahead);
  }

  private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path, boolean dropBehind,
    long readahead) throws IOException {
    assert (path == null) != (link == null);
    this.path = path;
    this.link = link;
    this.doCloseStreams = true;
    this.dropBehind = dropBehind;
    this.readahead = readahead;
    // If the fs is not an instance of HFileSystem, then create an instance of HFileSystem
    // that wraps over the specified fs. In this case, we will not be able to avoid
    // checksumming inside the filesystem.
    this.hfs = (fs instanceof HFileSystem) ? (HFileSystem) fs : new HFileSystem(fs);

    // Initially we are going to read the tail block. Open the reader w/FS checksum.
    this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
    this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
    this.readerPath = this.stream.getWrappedStream() instanceof FileLink.FileLinkInputStream
      ? ((FileLink.FileLinkInputStream) this.stream.getWrappedStream()).getCurrentPath()
      : path;
    setStreamOptions(stream);
  }

  private void setStreamOptions(FSDataInputStream in) {
    try {
      in.setDropBehind(dropBehind);
    } catch (Exception e) {
      // Skipped.
    }
    if (readahead >= 0) {
      try {
        in.setReadahead(readahead);
      } catch (Exception e) {
        // Skipped.
      }
    }
  }

  /**
   * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any reads
   * finish and before any other reads start (what happens in reality is we read the tail, then
   * call this based on what's in the tail, then read blocks).
   * @param forceNoHBaseChecksum Force not using HBase checksum.
   */
  public void prepareForBlockReader(boolean forceNoHBaseChecksum) throws IOException {
    if (hfs == null) return;
    assert this.stream != null && !this.useHBaseChecksumConfigured;
    boolean useHBaseChecksum =
      !forceNoHBaseChecksum && hfs.useHBaseChecksum() && (hfs.getNoChecksumFs() != hfs);

    if (useHBaseChecksum) {
      FileSystem fsNc = hfs.getNoChecksumFs();
      this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path);
      setStreamOptions(streamNoFsChecksum);
      this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum;
      // Close the checksum stream; we will reopen it if we get an HBase checksum failure.
      this.stream.close();
      this.stream = null;
    }
  }

  /** For use in tests. */
  public FSDataInputStreamWrapper(FSDataInputStream fsdis) {
    this(fsdis, fsdis);
  }

  /** For use in tests. */
  public FSDataInputStreamWrapper(FSDataInputStream fsdis, FSDataInputStream noChecksum) {
    doCloseStreams = false;
    stream = fsdis;
    streamNoFsChecksum = noChecksum;
    path = null;
    link = null;
    hfs = null;
    useHBaseChecksumConfigured = useHBaseChecksum = false;
    dropBehind = false;
    readahead = 0;
  }

  /** Returns whether we are presently using HBase checksum. */
  public boolean shouldUseHBaseChecksum() {
    return this.useHBaseChecksum;
  }

  /**
   * Get the stream to use. Thread-safe.
   * @param useHBaseChecksum must be the value that shouldUseHBaseChecksum has returned at some
   *                         point in the past, otherwise the result is undefined.
   */
  public FSDataInputStream getStream(boolean useHBaseChecksum) {
    return useHBaseChecksum ? this.streamNoFsChecksum : this.stream;
  }

  /**
   * A read from the non-checksum stream failed; fall back to FS checksum. Thread-safe.
   * @param offCount For how many checksumOk calls to turn off the HBase checksum.
   */
  public FSDataInputStream fallbackToFsChecksum(int offCount) throws IOException {
    // checksumOffCount is speculative, but let's try to reset it less.
    boolean partOfConvoy = false;
    if (this.stream == null) {
      synchronized (streamNoFsChecksumFirstCreateLock) {
        partOfConvoy = (this.stream != null);
        if (!partOfConvoy) {
          this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
        }
      }
    }
    if (!partOfConvoy) {
      this.useHBaseChecksum = false;
      this.hbaseChecksumOffCount.set(offCount);
    }
    return this.stream;
  }

  /** Report that checksum was ok, so we may ponder going back to HBase checksum. */
  public void checksumOk() {
    if (
      this.useHBaseChecksumConfigured && !this.useHBaseChecksum
        && (this.hbaseChecksumOffCount.getAndDecrement() < 0)
    ) {
      // The stream we need is already open (because we were using HBase checksum in the past).
      assert this.streamNoFsChecksum != null;
      this.useHBaseChecksum = true;
    }
  }

  private void updateInputStreamStatistics(FSDataInputStream stream) {
    // If the underlying file system is HDFS, update read statistics upon close.
    if (stream instanceof HdfsDataInputStream) {
      /**
       * Because HDFS ReadStatistics is calculated per input stream, it is not feasible to update
       * the aggregated number in real time. Instead, the metrics are updated when an input stream
       * is closed.
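       * The aggregated totals are then exposed via the static getters on this class, for example
       * (illustrative):
       *   long hdfsBytesRead = FSDataInputStreamWrapper.getTotalBytesRead();
       *   long shortCircuitBytesRead = FSDataInputStreamWrapper.getShortCircuitBytesRead();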
       */
      HdfsDataInputStream hdfsDataInputStream = (HdfsDataInputStream) stream;
      synchronized (readStatistics) {
        readStatistics.totalBytesRead +=
          hdfsDataInputStream.getReadStatistics().getTotalBytesRead();
        readStatistics.totalLocalBytesRead +=
          hdfsDataInputStream.getReadStatistics().getTotalLocalBytesRead();
        readStatistics.totalShortCircuitBytesRead +=
          hdfsDataInputStream.getReadStatistics().getTotalShortCircuitBytesRead();
        readStatistics.totalZeroCopyBytesRead +=
          hdfsDataInputStream.getReadStatistics().getTotalZeroCopyBytesRead();
      }
    }
  }

  public static long getTotalBytesRead() {
    synchronized (readStatistics) {
      return readStatistics.totalBytesRead;
    }
  }

  public static long getLocalBytesRead() {
    synchronized (readStatistics) {
      return readStatistics.totalLocalBytesRead;
    }
  }

  public static long getShortCircuitBytesRead() {
    synchronized (readStatistics) {
      return readStatistics.totalShortCircuitBytesRead;
    }
  }

  public static long getZeroCopyBytesRead() {
    synchronized (readStatistics) {
      return readStatistics.totalZeroCopyBytesRead;
    }
  }

  /** Close stream(s) if necessary. */
  @Override
  public void close() {
    if (!doCloseStreams) {
      return;
    }
    updateInputStreamStatistics(this.streamNoFsChecksum);
    // We do not care about close exceptions, as the streams are only used for reading; there is
    // no data loss concern.
    Closeables.closeQuietly(streamNoFsChecksum);

    updateInputStreamStatistics(stream);
    Closeables.closeQuietly(stream);
  }

  public HFileSystem getHfs() {
    return this.hfs;
  }

  /**
   * This will free sockets and file descriptors held by the stream only when the stream implements
   * org.apache.hadoop.fs.CanUnbuffer. NOT THREAD SAFE. Must be called only when all the clients
   * using this stream to read the blocks have finished reading. If by chance the stream is
   * unbuffered and there are clients still holding this stream for reads, then on the next client
   * read request a new socket will be opened by the DataNode, without the client knowing about it,
   * to serve that read request. Note: if this socket is idle for some time, the DataNode will
   * close it and the socket will move into the CLOSE_WAIT state; on the next client request on
   * this stream, the current socket will be closed and a new socket will be opened to serve the
   * request.
   */
  public void unbuffer() {
    // TODO: it may make sense to always unbuffer both streams. We'd need to carefully
    // research the usages to know if that is safe. For now just unbuffer the current one.
    FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum());
    if (stream != null) {
      stream.unbuffer();
    }
  }

  public Path getReaderPath() {
    return readerPath;
  }

  // For tests
  void setShouldUseHBaseChecksum() {
    useHBaseChecksumConfigured = true;
    useHBaseChecksum = true;
  }
}