001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.io.InputStream;
023import java.lang.reflect.InvocationTargetException;
024import java.lang.reflect.Method;
025import java.util.concurrent.atomic.AtomicInteger;
026
027import org.apache.commons.io.IOUtils;
028import org.apache.hadoop.fs.FSDataInputStream;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.fs.HFileSystem;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
037
038/**
039 * Wrapper for input stream(s) that takes care of the interaction of FS and HBase checksums,
040 * as well as closing streams. Initialization is not thread-safe, but normal operation is;
041 * see method comments.
042 */
043@InterfaceAudience.Private
044public class FSDataInputStreamWrapper implements Closeable {
045  private static final Logger LOG = LoggerFactory.getLogger(FSDataInputStreamWrapper.class);
046  private static final boolean isLogTraceEnabled = LOG.isTraceEnabled();
047
048  private final HFileSystem hfs;
049  private final Path path;
050  private final FileLink link;
051  private final boolean doCloseStreams;
052  private final boolean dropBehind;
053  private final long readahead;
054
055  /** Two stream handles, one with and one without FS-level checksum.
056   * HDFS checksum setting is on FS level, not single read level, so you have to keep two
057   * FS objects and two handles open to interleave different reads freely, which is very sad.
058   * This is what we do:
059   * 1) First, we need to read the trailer of HFile to determine checksum parameters.
060   *  We always use FS checksum to do that, so ctor opens {@link #stream}.
061   * 2.1) After that, if HBase checksum is not used, we'd just always use {@link #stream};
062   * 2.2) If HBase checksum can be used, we'll open {@link #streamNoFsChecksum},
063   *  and close {@link #stream}. User MUST call prepareForBlockReader for that to happen;
064   *  if they don't, (2.1) will be the default.
065   * 3) The users can call {@link #shouldUseHBaseChecksum()}, and pass its result to
066   *  {@link #getStream(boolean)} to get stream (if Java had out/pointer params we could
067   *  return both in one call). This stream is guaranteed to be set.
068   * 4) The first time HBase checksum fails, one would call {@link #fallbackToFsChecksum(int)}.
069   * That will take lock, and open {@link #stream}. While this is going on, others will
070   * continue to use the old stream; if they also want to fall back, they'll also call
071   * {@link #fallbackToFsChecksum(int)}, and block until {@link #stream} is set.
072   * 5) After some number of checksumOk() calls, we will go back to using HBase checksum.
073   * We will have 2 handles; however we presume checksums fail so rarely that we don't care.
074   */
075  private volatile FSDataInputStream stream = null;
076  private volatile FSDataInputStream streamNoFsChecksum = null;
077  private final Object streamNoFsChecksumFirstCreateLock = new Object();
078
079  // The configuration states that we should validate hbase checksums
080  private boolean useHBaseChecksumConfigured;
081
082  // Record the current state of this reader with respect to
083  // validating checkums in HBase. This is originally set the same
084  // value as useHBaseChecksumConfigured, but can change state as and when
085  // we encounter checksum verification failures.
086  private volatile boolean useHBaseChecksum;
087
088  // In the case of a checksum failure, do these many succeeding
089  // reads without hbase checksum verification.
090  private AtomicInteger hbaseChecksumOffCount = new AtomicInteger(-1);
091
092  private Boolean instanceOfCanUnbuffer = null;
093  // Using reflection to get org.apache.hadoop.fs.CanUnbuffer#unbuffer method to avoid compilation
094  // errors against Hadoop pre 2.6.4 and 2.7.1 versions.
095  private Method unbuffer = null;
096
097  public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
098    this(fs, path, false, -1L);
099  }
100
101  public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind, long readahead) throws IOException {
102    this(fs, null, path, dropBehind, readahead);
103  }
104
105  public FSDataInputStreamWrapper(FileSystem fs, FileLink link,
106                                  boolean dropBehind, long readahead) throws IOException {
107    this(fs, link, null, dropBehind, readahead);
108  }
109
110  private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path, boolean dropBehind,
111      long readahead) throws IOException {
112    assert (path == null) != (link == null);
113    this.path = path;
114    this.link = link;
115    this.doCloseStreams = true;
116    this.dropBehind = dropBehind;
117    this.readahead = readahead;
118    // If the fs is not an instance of HFileSystem, then create an instance of HFileSystem
119    // that wraps over the specified fs. In this case, we will not be able to avoid
120    // checksumming inside the filesystem.
121    this.hfs = (fs instanceof HFileSystem) ? (HFileSystem) fs : new HFileSystem(fs);
122
123    // Initially we are going to read the tail block. Open the reader w/FS checksum.
124    this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
125    this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
126    setStreamOptions(stream);
127  }
128
129  private void setStreamOptions(FSDataInputStream in) {
130    try {
131      in.setDropBehind(dropBehind);
132    } catch (Exception e) {
133      // Skipped.
134    }
135    if (readahead >= 0) {
136      try {
137        in.setReadahead(readahead);
138      } catch (Exception e) {
139        // Skipped.
140      }
141    }
142  }
143
144  /**
145   * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any
146   * reads finish and before any other reads start (what happens in reality is we read the
147   * tail, then call this based on what's in the tail, then read blocks).
148   * @param forceNoHBaseChecksum Force not using HBase checksum.
149   */
150  public void prepareForBlockReader(boolean forceNoHBaseChecksum) throws IOException {
151    if (hfs == null) return;
152    assert this.stream != null && !this.useHBaseChecksumConfigured;
153    boolean useHBaseChecksum =
154        !forceNoHBaseChecksum && hfs.useHBaseChecksum() && (hfs.getNoChecksumFs() != hfs);
155
156    if (useHBaseChecksum) {
157      FileSystem fsNc = hfs.getNoChecksumFs();
158      this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path);
159      setStreamOptions(streamNoFsChecksum);
160      this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum;
161      // Close the checksum stream; we will reopen it if we get an HBase checksum failure.
162      this.stream.close();
163      this.stream = null;
164    }
165  }
166
167  /** For use in tests. */
168  @VisibleForTesting
169  public FSDataInputStreamWrapper(FSDataInputStream fsdis) {
170    this(fsdis, fsdis);
171  }
172
173  /** For use in tests. */
174  @VisibleForTesting
175  public FSDataInputStreamWrapper(FSDataInputStream fsdis, FSDataInputStream noChecksum) {
176    doCloseStreams = false;
177    stream = fsdis;
178    streamNoFsChecksum = noChecksum;
179    path = null;
180    link = null;
181    hfs = null;
182    useHBaseChecksumConfigured = useHBaseChecksum = false;
183    dropBehind = false;
184    readahead = 0;
185  }
186
187  /**
188   * @return Whether we are presently using HBase checksum.
189   */
190  public boolean shouldUseHBaseChecksum() {
191    return this.useHBaseChecksum;
192  }
193
194  /**
195   * Get the stream to use. Thread-safe.
196   * @param useHBaseChecksum must be the value that shouldUseHBaseChecksum has returned
197   *  at some point in the past, otherwise the result is undefined.
198   */
199  public FSDataInputStream getStream(boolean useHBaseChecksum) {
200    return useHBaseChecksum ? this.streamNoFsChecksum : this.stream;
201  }
202
203  /**
204   * Read from non-checksum stream failed, fall back to FS checksum. Thread-safe.
205   * @param offCount For how many checksumOk calls to turn off the HBase checksum.
206   */
207  public FSDataInputStream fallbackToFsChecksum(int offCount) throws IOException {
208    // checksumOffCount is speculative, but let's try to reset it less.
209    boolean partOfConvoy = false;
210    if (this.stream == null) {
211      synchronized (streamNoFsChecksumFirstCreateLock) {
212        partOfConvoy = (this.stream != null);
213        if (!partOfConvoy) {
214          this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
215        }
216      }
217    }
218    if (!partOfConvoy) {
219      this.useHBaseChecksum = false;
220      this.hbaseChecksumOffCount.set(offCount);
221    }
222    return this.stream;
223  }
224
225  /** Report that checksum was ok, so we may ponder going back to HBase checksum. */
226  public void checksumOk() {
227    if (this.useHBaseChecksumConfigured && !this.useHBaseChecksum
228        && (this.hbaseChecksumOffCount.getAndDecrement() < 0)) {
229      // The stream we need is already open (because we were using HBase checksum in the past).
230      assert this.streamNoFsChecksum != null;
231      this.useHBaseChecksum = true;
232    }
233  }
234
235  /** Close stream(s) if necessary. */
236  @Override
237  public void close() {
238    if (!doCloseStreams) {
239      return;
240    }
241    // we do not care about the close exception as it is for reading, no data loss issue.
242    IOUtils.closeQuietly(streamNoFsChecksum);
243    IOUtils.closeQuietly(stream);
244  }
245
246  public HFileSystem getHfs() {
247    return this.hfs;
248  }
249
250  /**
251   * This will free sockets and file descriptors held by the stream only when the stream implements
252   * org.apache.hadoop.fs.CanUnbuffer. NOT THREAD SAFE. Must be called only when all the clients
253   * using this stream to read the blocks have finished reading. If by chance the stream is
254   * unbuffered and there are clients still holding this stream for read then on next client read
255   * request a new socket will be opened by Datanode without client knowing about it and will serve
256   * its read request. Note: If this socket is idle for some time then the DataNode will close the
257   * socket and the socket will move into CLOSE_WAIT state and on the next client request on this
258   * stream, the current socket will be closed and a new socket will be opened to serve the
259   * requests.
260   */
261  @SuppressWarnings({ "rawtypes" })
262  public void unbuffer() {
263    FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum());
264    if (stream != null) {
265      InputStream wrappedStream = stream.getWrappedStream();
266      // CanUnbuffer interface was added as part of HDFS-7694 and the fix is available in Hadoop
267      // 2.6.4+ and 2.7.1+ versions only so check whether the stream object implements the
268      // CanUnbuffer interface or not and based on that call the unbuffer api.
269      final Class<? extends InputStream> streamClass = wrappedStream.getClass();
270      if (this.instanceOfCanUnbuffer == null) {
271        // To ensure we compute whether the stream is instance of CanUnbuffer only once.
272        this.instanceOfCanUnbuffer = false;
273        Class<?>[] streamInterfaces = streamClass.getInterfaces();
274        for (Class c : streamInterfaces) {
275          if (c.getCanonicalName().toString().equals("org.apache.hadoop.fs.CanUnbuffer")) {
276            try {
277              this.unbuffer = streamClass.getDeclaredMethod("unbuffer");
278            } catch (NoSuchMethodException | SecurityException e) {
279              if (isLogTraceEnabled) {
280                LOG.trace("Failed to find 'unbuffer' method in class " + streamClass
281                    + " . So there may be a TCP socket connection "
282                    + "left open in CLOSE_WAIT state.", e);
283              }
284              return;
285            }
286            this.instanceOfCanUnbuffer = true;
287            break;
288          }
289        }
290      }
291      if (this.instanceOfCanUnbuffer) {
292        try {
293          this.unbuffer.invoke(wrappedStream);
294        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
295          if (isLogTraceEnabled) {
296            LOG.trace("Failed to invoke 'unbuffer' method in class " + streamClass
297                + " . So there may be a TCP socket connection left open in CLOSE_WAIT state.", e);
298          }
299        }
300      } else {
301        if (isLogTraceEnabled) {
302          LOG.trace("Failed to find 'unbuffer' method in class " + streamClass
303              + " . So there may be a TCP socket connection "
304              + "left open in CLOSE_WAIT state. For more details check "
305              + "https://issues.apache.org/jira/browse/HBASE-9393");
306        }
307      }
308    }
309  }
310}