001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.io.InputStream; 023import java.lang.reflect.InvocationTargetException; 024import java.lang.reflect.Method; 025import java.util.concurrent.atomic.AtomicInteger; 026 027import org.apache.commons.io.IOUtils; 028import org.apache.hadoop.fs.FSDataInputStream; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.fs.HFileSystem; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 037 038/** 039 * Wrapper for input stream(s) that takes care of the interaction of FS and HBase checksums, 040 * as well as closing streams. Initialization is not thread-safe, but normal operation is; 041 * see method comments. 042 */ 043@InterfaceAudience.Private 044public class FSDataInputStreamWrapper implements Closeable { 045 private static final Logger LOG = LoggerFactory.getLogger(FSDataInputStreamWrapper.class); 046 private static final boolean isLogTraceEnabled = LOG.isTraceEnabled(); 047 048 private final HFileSystem hfs; 049 private final Path path; 050 private final FileLink link; 051 private final boolean doCloseStreams; 052 private final boolean dropBehind; 053 private final long readahead; 054 055 /** Two stream handles, one with and one without FS-level checksum. 056 * HDFS checksum setting is on FS level, not single read level, so you have to keep two 057 * FS objects and two handles open to interleave different reads freely, which is very sad. 058 * This is what we do: 059 * 1) First, we need to read the trailer of HFile to determine checksum parameters. 060 * We always use FS checksum to do that, so ctor opens {@link #stream}. 061 * 2.1) After that, if HBase checksum is not used, we'd just always use {@link #stream}; 062 * 2.2) If HBase checksum can be used, we'll open {@link #streamNoFsChecksum}, 063 * and close {@link #stream}. User MUST call prepareForBlockReader for that to happen; 064 * if they don't, (2.1) will be the default. 065 * 3) The users can call {@link #shouldUseHBaseChecksum()}, and pass its result to 066 * {@link #getStream(boolean)} to get stream (if Java had out/pointer params we could 067 * return both in one call). This stream is guaranteed to be set. 068 * 4) The first time HBase checksum fails, one would call {@link #fallbackToFsChecksum(int)}. 069 * That will take lock, and open {@link #stream}. While this is going on, others will 070 * continue to use the old stream; if they also want to fall back, they'll also call 071 * {@link #fallbackToFsChecksum(int)}, and block until {@link #stream} is set. 072 * 5) After some number of checksumOk() calls, we will go back to using HBase checksum. 073 * We will have 2 handles; however we presume checksums fail so rarely that we don't care. 074 */ 075 private volatile FSDataInputStream stream = null; 076 private volatile FSDataInputStream streamNoFsChecksum = null; 077 private final Object streamNoFsChecksumFirstCreateLock = new Object(); 078 079 // The configuration states that we should validate hbase checksums 080 private boolean useHBaseChecksumConfigured; 081 082 // Record the current state of this reader with respect to 083 // validating checkums in HBase. This is originally set the same 084 // value as useHBaseChecksumConfigured, but can change state as and when 085 // we encounter checksum verification failures. 086 private volatile boolean useHBaseChecksum; 087 088 // In the case of a checksum failure, do these many succeeding 089 // reads without hbase checksum verification. 090 private AtomicInteger hbaseChecksumOffCount = new AtomicInteger(-1); 091 092 private Boolean instanceOfCanUnbuffer = null; 093 // Using reflection to get org.apache.hadoop.fs.CanUnbuffer#unbuffer method to avoid compilation 094 // errors against Hadoop pre 2.6.4 and 2.7.1 versions. 095 private Method unbuffer = null; 096 097 public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException { 098 this(fs, path, false, -1L); 099 } 100 101 public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind, long readahead) throws IOException { 102 this(fs, null, path, dropBehind, readahead); 103 } 104 105 public FSDataInputStreamWrapper(FileSystem fs, FileLink link, 106 boolean dropBehind, long readahead) throws IOException { 107 this(fs, link, null, dropBehind, readahead); 108 } 109 110 private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path, boolean dropBehind, 111 long readahead) throws IOException { 112 assert (path == null) != (link == null); 113 this.path = path; 114 this.link = link; 115 this.doCloseStreams = true; 116 this.dropBehind = dropBehind; 117 this.readahead = readahead; 118 // If the fs is not an instance of HFileSystem, then create an instance of HFileSystem 119 // that wraps over the specified fs. In this case, we will not be able to avoid 120 // checksumming inside the filesystem. 121 this.hfs = (fs instanceof HFileSystem) ? (HFileSystem) fs : new HFileSystem(fs); 122 123 // Initially we are going to read the tail block. Open the reader w/FS checksum. 124 this.useHBaseChecksumConfigured = this.useHBaseChecksum = false; 125 this.stream = (link != null) ? link.open(hfs) : hfs.open(path); 126 setStreamOptions(stream); 127 } 128 129 private void setStreamOptions(FSDataInputStream in) { 130 try { 131 in.setDropBehind(dropBehind); 132 } catch (Exception e) { 133 // Skipped. 134 } 135 if (readahead >= 0) { 136 try { 137 in.setReadahead(readahead); 138 } catch (Exception e) { 139 // Skipped. 140 } 141 } 142 } 143 144 /** 145 * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any 146 * reads finish and before any other reads start (what happens in reality is we read the 147 * tail, then call this based on what's in the tail, then read blocks). 148 * @param forceNoHBaseChecksum Force not using HBase checksum. 149 */ 150 public void prepareForBlockReader(boolean forceNoHBaseChecksum) throws IOException { 151 if (hfs == null) return; 152 assert this.stream != null && !this.useHBaseChecksumConfigured; 153 boolean useHBaseChecksum = 154 !forceNoHBaseChecksum && hfs.useHBaseChecksum() && (hfs.getNoChecksumFs() != hfs); 155 156 if (useHBaseChecksum) { 157 FileSystem fsNc = hfs.getNoChecksumFs(); 158 this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path); 159 setStreamOptions(streamNoFsChecksum); 160 this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum; 161 // Close the checksum stream; we will reopen it if we get an HBase checksum failure. 162 this.stream.close(); 163 this.stream = null; 164 } 165 } 166 167 /** For use in tests. */ 168 @VisibleForTesting 169 public FSDataInputStreamWrapper(FSDataInputStream fsdis) { 170 this(fsdis, fsdis); 171 } 172 173 /** For use in tests. */ 174 @VisibleForTesting 175 public FSDataInputStreamWrapper(FSDataInputStream fsdis, FSDataInputStream noChecksum) { 176 doCloseStreams = false; 177 stream = fsdis; 178 streamNoFsChecksum = noChecksum; 179 path = null; 180 link = null; 181 hfs = null; 182 useHBaseChecksumConfigured = useHBaseChecksum = false; 183 dropBehind = false; 184 readahead = 0; 185 } 186 187 /** 188 * @return Whether we are presently using HBase checksum. 189 */ 190 public boolean shouldUseHBaseChecksum() { 191 return this.useHBaseChecksum; 192 } 193 194 /** 195 * Get the stream to use. Thread-safe. 196 * @param useHBaseChecksum must be the value that shouldUseHBaseChecksum has returned 197 * at some point in the past, otherwise the result is undefined. 198 */ 199 public FSDataInputStream getStream(boolean useHBaseChecksum) { 200 return useHBaseChecksum ? this.streamNoFsChecksum : this.stream; 201 } 202 203 /** 204 * Read from non-checksum stream failed, fall back to FS checksum. Thread-safe. 205 * @param offCount For how many checksumOk calls to turn off the HBase checksum. 206 */ 207 public FSDataInputStream fallbackToFsChecksum(int offCount) throws IOException { 208 // checksumOffCount is speculative, but let's try to reset it less. 209 boolean partOfConvoy = false; 210 if (this.stream == null) { 211 synchronized (streamNoFsChecksumFirstCreateLock) { 212 partOfConvoy = (this.stream != null); 213 if (!partOfConvoy) { 214 this.stream = (link != null) ? link.open(hfs) : hfs.open(path); 215 } 216 } 217 } 218 if (!partOfConvoy) { 219 this.useHBaseChecksum = false; 220 this.hbaseChecksumOffCount.set(offCount); 221 } 222 return this.stream; 223 } 224 225 /** Report that checksum was ok, so we may ponder going back to HBase checksum. */ 226 public void checksumOk() { 227 if (this.useHBaseChecksumConfigured && !this.useHBaseChecksum 228 && (this.hbaseChecksumOffCount.getAndDecrement() < 0)) { 229 // The stream we need is already open (because we were using HBase checksum in the past). 230 assert this.streamNoFsChecksum != null; 231 this.useHBaseChecksum = true; 232 } 233 } 234 235 /** Close stream(s) if necessary. */ 236 @Override 237 public void close() { 238 if (!doCloseStreams) { 239 return; 240 } 241 // we do not care about the close exception as it is for reading, no data loss issue. 242 IOUtils.closeQuietly(streamNoFsChecksum); 243 IOUtils.closeQuietly(stream); 244 } 245 246 public HFileSystem getHfs() { 247 return this.hfs; 248 } 249 250 /** 251 * This will free sockets and file descriptors held by the stream only when the stream implements 252 * org.apache.hadoop.fs.CanUnbuffer. NOT THREAD SAFE. Must be called only when all the clients 253 * using this stream to read the blocks have finished reading. If by chance the stream is 254 * unbuffered and there are clients still holding this stream for read then on next client read 255 * request a new socket will be opened by Datanode without client knowing about it and will serve 256 * its read request. Note: If this socket is idle for some time then the DataNode will close the 257 * socket and the socket will move into CLOSE_WAIT state and on the next client request on this 258 * stream, the current socket will be closed and a new socket will be opened to serve the 259 * requests. 260 */ 261 @SuppressWarnings({ "rawtypes" }) 262 public void unbuffer() { 263 FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum()); 264 if (stream != null) { 265 InputStream wrappedStream = stream.getWrappedStream(); 266 // CanUnbuffer interface was added as part of HDFS-7694 and the fix is available in Hadoop 267 // 2.6.4+ and 2.7.1+ versions only so check whether the stream object implements the 268 // CanUnbuffer interface or not and based on that call the unbuffer api. 269 final Class<? extends InputStream> streamClass = wrappedStream.getClass(); 270 if (this.instanceOfCanUnbuffer == null) { 271 // To ensure we compute whether the stream is instance of CanUnbuffer only once. 272 this.instanceOfCanUnbuffer = false; 273 Class<?>[] streamInterfaces = streamClass.getInterfaces(); 274 for (Class c : streamInterfaces) { 275 if (c.getCanonicalName().toString().equals("org.apache.hadoop.fs.CanUnbuffer")) { 276 try { 277 this.unbuffer = streamClass.getDeclaredMethod("unbuffer"); 278 } catch (NoSuchMethodException | SecurityException e) { 279 if (isLogTraceEnabled) { 280 LOG.trace("Failed to find 'unbuffer' method in class " + streamClass 281 + " . So there may be a TCP socket connection " 282 + "left open in CLOSE_WAIT state.", e); 283 } 284 return; 285 } 286 this.instanceOfCanUnbuffer = true; 287 break; 288 } 289 } 290 } 291 if (this.instanceOfCanUnbuffer) { 292 try { 293 this.unbuffer.invoke(wrappedStream); 294 } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { 295 if (isLogTraceEnabled) { 296 LOG.trace("Failed to invoke 'unbuffer' method in class " + streamClass 297 + " . So there may be a TCP socket connection left open in CLOSE_WAIT state.", e); 298 } 299 } 300 } else { 301 if (isLogTraceEnabled) { 302 LOG.trace("Failed to find 'unbuffer' method in class " + streamClass 303 + " . So there may be a TCP socket connection " 304 + "left open in CLOSE_WAIT state. For more details check " 305 + "https://issues.apache.org/jira/browse/HBASE-9393"); 306 } 307 } 308 } 309 } 310}