001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io;
019
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
031
032/**
033 * Wrapper for input stream(s) that takes care of the interaction of FS and HBase checksums, as well
034 * as closing streams. Initialization is not thread-safe, but normal operation is; see method
035 * comments.
036 */
037@InterfaceAudience.Private
038public class FSDataInputStreamWrapper implements Closeable {
039
040  private final HFileSystem hfs;
041  private final Path path;
042  private final FileLink link;
043  private final boolean doCloseStreams;
044  private final boolean dropBehind;
045  private final long readahead;
046
047  /**
048   * Two stream handles, one with and one without FS-level checksum. HDFS checksum setting is on FS
049   * level, not single read level, so you have to keep two FS objects and two handles open to
050   * interleave different reads freely, which is very sad. This is what we do: 1) First, we need to
051   * read the trailer of HFile to determine checksum parameters. We always use FS checksum to do
052   * that, so ctor opens {@link #stream}. 2.1) After that, if HBase checksum is not used, we'd just
053   * always use {@link #stream}; 2.2) If HBase checksum can be used, we'll open
054   * {@link #streamNoFsChecksum}, and close {@link #stream}. User MUST call prepareForBlockReader
055   * for that to happen; if they don't, (2.1) will be the default. 3) The users can call
056   * {@link #shouldUseHBaseChecksum()}, and pass its result to {@link #getStream(boolean)} to get
057   * stream (if Java had out/pointer params we could return both in one call). This stream is
058   * guaranteed to be set. 4) The first time HBase checksum fails, one would call
059   * {@link #fallbackToFsChecksum(int)}. That will take lock, and open {@link #stream}. While this
060   * is going on, others will continue to use the old stream; if they also want to fall back,
061   * they'll also call {@link #fallbackToFsChecksum(int)}, and block until {@link #stream} is set.
062   * 5) After some number of checksumOk() calls, we will go back to using HBase checksum. We will
063   * have 2 handles; however we presume checksums fail so rarely that we don't care.
064   */
065  private volatile FSDataInputStream stream = null;
066  private volatile FSDataInputStream streamNoFsChecksum = null;
067  private final Object streamNoFsChecksumFirstCreateLock = new Object();
068
069  // The configuration states that we should validate hbase checksums
070  private boolean useHBaseChecksumConfigured;
071
072  // Record the current state of this reader with respect to
073  // validating checkums in HBase. This is originally set the same
074  // value as useHBaseChecksumConfigured, but can change state as and when
075  // we encounter checksum verification failures.
076  private volatile boolean useHBaseChecksum;
077
078  // In the case of a checksum failure, do these many succeeding
079  // reads without hbase checksum verification.
080  private AtomicInteger hbaseChecksumOffCount = new AtomicInteger(-1);
081
082  private final static ReadStatistics readStatistics = new ReadStatistics();
083
084  private static class ReadStatistics {
085    long totalBytesRead;
086    long totalLocalBytesRead;
087    long totalShortCircuitBytesRead;
088    long totalZeroCopyBytesRead;
089  }
090
091  protected Path readerPath;
092
093  public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
094    this(fs, path, false, -1L);
095  }
096
097  public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind, long readahead)
098    throws IOException {
099    this(fs, null, path, dropBehind, readahead);
100  }
101
102  public FSDataInputStreamWrapper(FileSystem fs, FileLink link, boolean dropBehind, long readahead)
103    throws IOException {
104    this(fs, link, null, dropBehind, readahead);
105  }
106
107  private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path, boolean dropBehind,
108    long readahead) throws IOException {
109    assert (path == null) != (link == null);
110    this.path = path;
111    this.link = link;
112    this.doCloseStreams = true;
113    this.dropBehind = dropBehind;
114    this.readahead = readahead;
115    // If the fs is not an instance of HFileSystem, then create an instance of HFileSystem
116    // that wraps over the specified fs. In this case, we will not be able to avoid
117    // checksumming inside the filesystem.
118    this.hfs = (fs instanceof HFileSystem) ? (HFileSystem) fs : new HFileSystem(fs);
119
120    // Initially we are going to read the tail block. Open the reader w/FS checksum.
121    this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
122    this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
123    this.readerPath = this.stream.getWrappedStream() instanceof FileLink.FileLinkInputStream
124      ? ((FileLink.FileLinkInputStream) this.stream.getWrappedStream()).getCurrentPath()
125      : path;
126    setStreamOptions(stream);
127  }
128
129  private void setStreamOptions(FSDataInputStream in) {
130    try {
131      in.setDropBehind(dropBehind);
132    } catch (Exception e) {
133      // Skipped.
134    }
135    if (readahead >= 0) {
136      try {
137        in.setReadahead(readahead);
138      } catch (Exception e) {
139        // Skipped.
140      }
141    }
142  }
143
144  /**
145   * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any reads
146   * finish and before any other reads start (what happens in reality is we read the tail, then call
147   * this based on what's in the tail, then read blocks).
148   * @param forceNoHBaseChecksum Force not using HBase checksum.
149   */
150  public void prepareForBlockReader(boolean forceNoHBaseChecksum) throws IOException {
151    if (hfs == null) return;
152    assert this.stream != null && !this.useHBaseChecksumConfigured;
153    boolean useHBaseChecksum =
154      !forceNoHBaseChecksum && hfs.useHBaseChecksum() && (hfs.getNoChecksumFs() != hfs);
155
156    if (useHBaseChecksum) {
157      FileSystem fsNc = hfs.getNoChecksumFs();
158      this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path);
159      setStreamOptions(streamNoFsChecksum);
160      this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum;
161      // Close the checksum stream; we will reopen it if we get an HBase checksum failure.
162      this.stream.close();
163      this.stream = null;
164    }
165  }
166
167  /** For use in tests. */
168  public FSDataInputStreamWrapper(FSDataInputStream fsdis) {
169    this(fsdis, fsdis);
170  }
171
172  /** For use in tests. */
173  public FSDataInputStreamWrapper(FSDataInputStream fsdis, FSDataInputStream noChecksum) {
174    doCloseStreams = false;
175    stream = fsdis;
176    streamNoFsChecksum = noChecksum;
177    path = null;
178    link = null;
179    hfs = null;
180    useHBaseChecksumConfigured = useHBaseChecksum = false;
181    dropBehind = false;
182    readahead = 0;
183  }
184
185  /** Returns Whether we are presently using HBase checksum. */
186  public boolean shouldUseHBaseChecksum() {
187    return this.useHBaseChecksum;
188  }
189
190  /**
191   * Get the stream to use. Thread-safe.
192   * @param useHBaseChecksum must be the value that shouldUseHBaseChecksum has returned at some
193   *                         point in the past, otherwise the result is undefined.
194   */
195  public FSDataInputStream getStream(boolean useHBaseChecksum) {
196    return useHBaseChecksum ? this.streamNoFsChecksum : this.stream;
197  }
198
199  /**
200   * Read from non-checksum stream failed, fall back to FS checksum. Thread-safe.
201   * @param offCount For how many checksumOk calls to turn off the HBase checksum.
202   */
203  public FSDataInputStream fallbackToFsChecksum(int offCount) throws IOException {
204    // checksumOffCount is speculative, but let's try to reset it less.
205    boolean partOfConvoy = false;
206    if (this.stream == null) {
207      synchronized (streamNoFsChecksumFirstCreateLock) {
208        partOfConvoy = (this.stream != null);
209        if (!partOfConvoy) {
210          this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
211        }
212      }
213    }
214    if (!partOfConvoy) {
215      this.useHBaseChecksum = false;
216      this.hbaseChecksumOffCount.set(offCount);
217    }
218    return this.stream;
219  }
220
221  /** Report that checksum was ok, so we may ponder going back to HBase checksum. */
222  public void checksumOk() {
223    if (
224      this.useHBaseChecksumConfigured && !this.useHBaseChecksum
225        && (this.hbaseChecksumOffCount.getAndDecrement() < 0)
226    ) {
227      // The stream we need is already open (because we were using HBase checksum in the past).
228      assert this.streamNoFsChecksum != null;
229      this.useHBaseChecksum = true;
230    }
231  }
232
233  private void updateInputStreamStatistics(FSDataInputStream stream) {
234    // If the underlying file system is HDFS, update read statistics upon close.
235    if (stream instanceof HdfsDataInputStream) {
236      /**
237       * Because HDFS ReadStatistics is calculated per input stream, it is not feasible to update
238       * the aggregated number in real time. Instead, the metrics are updated when an input stream
239       * is closed.
240       */
241      HdfsDataInputStream hdfsDataInputStream = (HdfsDataInputStream) stream;
242      synchronized (readStatistics) {
243        readStatistics.totalBytesRead +=
244          hdfsDataInputStream.getReadStatistics().getTotalBytesRead();
245        readStatistics.totalLocalBytesRead +=
246          hdfsDataInputStream.getReadStatistics().getTotalLocalBytesRead();
247        readStatistics.totalShortCircuitBytesRead +=
248          hdfsDataInputStream.getReadStatistics().getTotalShortCircuitBytesRead();
249        readStatistics.totalZeroCopyBytesRead +=
250          hdfsDataInputStream.getReadStatistics().getTotalZeroCopyBytesRead();
251      }
252    }
253  }
254
255  public static long getTotalBytesRead() {
256    synchronized (readStatistics) {
257      return readStatistics.totalBytesRead;
258    }
259  }
260
261  public static long getLocalBytesRead() {
262    synchronized (readStatistics) {
263      return readStatistics.totalLocalBytesRead;
264    }
265  }
266
267  public static long getShortCircuitBytesRead() {
268    synchronized (readStatistics) {
269      return readStatistics.totalShortCircuitBytesRead;
270    }
271  }
272
273  public static long getZeroCopyBytesRead() {
274    synchronized (readStatistics) {
275      return readStatistics.totalZeroCopyBytesRead;
276    }
277  }
278
279  /** CloseClose stream(s) if necessary. */
280  @Override
281  public void close() {
282    if (!doCloseStreams) {
283      return;
284    }
285    updateInputStreamStatistics(this.streamNoFsChecksum);
286    // we do not care about the close exception as it is for reading, no data loss issue.
287    Closeables.closeQuietly(streamNoFsChecksum);
288
289    updateInputStreamStatistics(stream);
290    Closeables.closeQuietly(stream);
291  }
292
293  public HFileSystem getHfs() {
294    return this.hfs;
295  }
296
297  /**
298   * This will free sockets and file descriptors held by the stream only when the stream implements
299   * org.apache.hadoop.fs.CanUnbuffer. NOT THREAD SAFE. Must be called only when all the clients
300   * using this stream to read the blocks have finished reading. If by chance the stream is
301   * unbuffered and there are clients still holding this stream for read then on next client read
302   * request a new socket will be opened by Datanode without client knowing about it and will serve
303   * its read request. Note: If this socket is idle for some time then the DataNode will close the
304   * socket and the socket will move into CLOSE_WAIT state and on the next client request on this
305   * stream, the current socket will be closed and a new socket will be opened to serve the
306   * requests.
307   */
308  public void unbuffer() {
309    // todo: it may make sense to always unbuffer both streams. we'd need to carefully
310    // research the usages to know if that is safe. for now just do the current.
311    FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum());
312    if (stream != null) {
313      stream.unbuffer();
314    }
315  }
316
317  public Path getReaderPath() {
318    return readerPath;
319  }
320
321  // For tests
322  void setShouldUseHBaseChecksum() {
323    useHBaseChecksumConfigured = true;
324    useHBaseChecksum = true;
325  }
326}