001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.io.InputStream;
023import java.util.concurrent.atomic.AtomicInteger;
024import org.apache.hadoop.fs.CanUnbuffer;
025import org.apache.hadoop.fs.FSDataInputStream;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.fs.HFileSystem;
029import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
035
036/**
037 * Wrapper for input stream(s) that takes care of the interaction of FS and HBase checksums, as well
038 * as closing streams. Initialization is not thread-safe, but normal operation is; see method
039 * comments.
040 */
041@InterfaceAudience.Private
042public class FSDataInputStreamWrapper implements Closeable {
043  private static final Logger LOG = LoggerFactory.getLogger(FSDataInputStreamWrapper.class);
044  private static final boolean isLogTraceEnabled = LOG.isTraceEnabled();
045
046  private final HFileSystem hfs;
047  private final Path path;
048  private final FileLink link;
049  private final boolean doCloseStreams;
050  private final boolean dropBehind;
051  private final long readahead;
052
053  /**
054   * Two stream handles, one with and one without FS-level checksum. HDFS checksum setting is on FS
055   * level, not single read level, so you have to keep two FS objects and two handles open to
056   * interleave different reads freely, which is very sad. This is what we do: 1) First, we need to
057   * read the trailer of HFile to determine checksum parameters. We always use FS checksum to do
058   * that, so ctor opens {@link #stream}. 2.1) After that, if HBase checksum is not used, we'd just
059   * always use {@link #stream}; 2.2) If HBase checksum can be used, we'll open
060   * {@link #streamNoFsChecksum}, and close {@link #stream}. User MUST call prepareForBlockReader
061   * for that to happen; if they don't, (2.1) will be the default. 3) The users can call
062   * {@link #shouldUseHBaseChecksum()}, and pass its result to {@link #getStream(boolean)} to get
063   * stream (if Java had out/pointer params we could return both in one call). This stream is
064   * guaranteed to be set. 4) The first time HBase checksum fails, one would call
065   * {@link #fallbackToFsChecksum(int)}. That will take lock, and open {@link #stream}. While this
066   * is going on, others will continue to use the old stream; if they also want to fall back,
067   * they'll also call {@link #fallbackToFsChecksum(int)}, and block until {@link #stream} is set.
068   * 5) After some number of checksumOk() calls, we will go back to using HBase checksum. We will
069   * have 2 handles; however we presume checksums fail so rarely that we don't care.
070   */
071  private volatile FSDataInputStream stream = null;
072  private volatile FSDataInputStream streamNoFsChecksum = null;
073  private final Object streamNoFsChecksumFirstCreateLock = new Object();
074
075  // The configuration states that we should validate hbase checksums
076  private boolean useHBaseChecksumConfigured;
077
  // Record the current state of this reader with respect to
  // validating checksums in HBase. This is originally set to the same
  // value as useHBaseChecksumConfigured, but can change state as and when
  // we encounter checksum verification failures.
082  private volatile boolean useHBaseChecksum;
083
  // In the case of a checksum failure, do this many succeeding
  // reads without hbase checksum verification.
086  private AtomicInteger hbaseChecksumOffCount = new AtomicInteger(-1);
087
088  private final static ReadStatistics readStatistics = new ReadStatistics();
089
090  private static class ReadStatistics {
091    long totalBytesRead;
092    long totalLocalBytesRead;
093    long totalShortCircuitBytesRead;
094    long totalZeroCopyBytesRead;
095  }
096
097  private Boolean instanceOfCanUnbuffer = null;
098  private CanUnbuffer unbuffer = null;
099
100  protected Path readerPath;
101
102  public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
103    this(fs, path, false, -1L);
104  }
105
106  public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind, long readahead)
107    throws IOException {
108    this(fs, null, path, dropBehind, readahead);
109  }
110
111  public FSDataInputStreamWrapper(FileSystem fs, FileLink link, boolean dropBehind, long readahead)
112    throws IOException {
113    this(fs, link, null, dropBehind, readahead);
114  }
115
116  private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path, boolean dropBehind,
117    long readahead) throws IOException {
118    assert (path == null) != (link == null);
119    this.path = path;
120    this.link = link;
121    this.doCloseStreams = true;
122    this.dropBehind = dropBehind;
123    this.readahead = readahead;
124    // If the fs is not an instance of HFileSystem, then create an instance of HFileSystem
125    // that wraps over the specified fs. In this case, we will not be able to avoid
126    // checksumming inside the filesystem.
127    this.hfs = (fs instanceof HFileSystem) ? (HFileSystem) fs : new HFileSystem(fs);
128
129    // Initially we are going to read the tail block. Open the reader w/FS checksum.
130    this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
131    this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
132    this.readerPath = this.stream.getWrappedStream() instanceof FileLink.FileLinkInputStream
133      ? ((FileLink.FileLinkInputStream) this.stream.getWrappedStream()).getCurrentPath()
134      : path;
135    setStreamOptions(stream);
136  }
137
138  private void setStreamOptions(FSDataInputStream in) {
139    try {
140      in.setDropBehind(dropBehind);
141    } catch (Exception e) {
142      // Skipped.
143    }
144    if (readahead >= 0) {
145      try {
146        in.setReadahead(readahead);
147      } catch (Exception e) {
148        // Skipped.
149      }
150    }
151  }
152
153  /**
154   * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any reads
155   * finish and before any other reads start (what happens in reality is we read the tail, then call
156   * this based on what's in the tail, then read blocks).
157   * @param forceNoHBaseChecksum Force not using HBase checksum.
158   */
159  public void prepareForBlockReader(boolean forceNoHBaseChecksum) throws IOException {
160    if (hfs == null) return;
161    assert this.stream != null && !this.useHBaseChecksumConfigured;
162    boolean useHBaseChecksum =
163      !forceNoHBaseChecksum && hfs.useHBaseChecksum() && (hfs.getNoChecksumFs() != hfs);
164
165    if (useHBaseChecksum) {
166      FileSystem fsNc = hfs.getNoChecksumFs();
167      this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path);
168      setStreamOptions(streamNoFsChecksum);
169      this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum;
170      // Close the checksum stream; we will reopen it if we get an HBase checksum failure.
171      this.stream.close();
172      this.stream = null;
173    }
174  }
175
176  /** For use in tests. */
177  public FSDataInputStreamWrapper(FSDataInputStream fsdis) {
178    this(fsdis, fsdis);
179  }
180
181  /** For use in tests. */
182  public FSDataInputStreamWrapper(FSDataInputStream fsdis, FSDataInputStream noChecksum) {
183    doCloseStreams = false;
184    stream = fsdis;
185    streamNoFsChecksum = noChecksum;
186    path = null;
187    link = null;
188    hfs = null;
189    useHBaseChecksumConfigured = useHBaseChecksum = false;
190    dropBehind = false;
191    readahead = 0;
192  }
193
194  /** Returns Whether we are presently using HBase checksum. */
195  public boolean shouldUseHBaseChecksum() {
196    return this.useHBaseChecksum;
197  }
198
199  /**
200   * Get the stream to use. Thread-safe.
201   * @param useHBaseChecksum must be the value that shouldUseHBaseChecksum has returned at some
202   *                         point in the past, otherwise the result is undefined.
203   */
204  public FSDataInputStream getStream(boolean useHBaseChecksum) {
205    return useHBaseChecksum ? this.streamNoFsChecksum : this.stream;
206  }
207
208  /**
209   * Read from non-checksum stream failed, fall back to FS checksum. Thread-safe.
210   * @param offCount For how many checksumOk calls to turn off the HBase checksum.
211   */
212  public FSDataInputStream fallbackToFsChecksum(int offCount) throws IOException {
213    // checksumOffCount is speculative, but let's try to reset it less.
214    boolean partOfConvoy = false;
215    if (this.stream == null) {
216      synchronized (streamNoFsChecksumFirstCreateLock) {
217        partOfConvoy = (this.stream != null);
218        if (!partOfConvoy) {
219          this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
220        }
221      }
222    }
223    if (!partOfConvoy) {
224      this.useHBaseChecksum = false;
225      this.hbaseChecksumOffCount.set(offCount);
226    }
227    return this.stream;
228  }
229
230  /** Report that checksum was ok, so we may ponder going back to HBase checksum. */
231  public void checksumOk() {
232    if (
233      this.useHBaseChecksumConfigured && !this.useHBaseChecksum
234        && (this.hbaseChecksumOffCount.getAndDecrement() < 0)
235    ) {
236      // The stream we need is already open (because we were using HBase checksum in the past).
237      assert this.streamNoFsChecksum != null;
238      this.useHBaseChecksum = true;
239    }
240  }
241
242  private void updateInputStreamStatistics(FSDataInputStream stream) {
243    // If the underlying file system is HDFS, update read statistics upon close.
244    if (stream instanceof HdfsDataInputStream) {
245      /**
246       * Because HDFS ReadStatistics is calculated per input stream, it is not feasible to update
247       * the aggregated number in real time. Instead, the metrics are updated when an input stream
248       * is closed.
249       */
250      HdfsDataInputStream hdfsDataInputStream = (HdfsDataInputStream) stream;
251      synchronized (readStatistics) {
252        readStatistics.totalBytesRead +=
253          hdfsDataInputStream.getReadStatistics().getTotalBytesRead();
254        readStatistics.totalLocalBytesRead +=
255          hdfsDataInputStream.getReadStatistics().getTotalLocalBytesRead();
256        readStatistics.totalShortCircuitBytesRead +=
257          hdfsDataInputStream.getReadStatistics().getTotalShortCircuitBytesRead();
258        readStatistics.totalZeroCopyBytesRead +=
259          hdfsDataInputStream.getReadStatistics().getTotalZeroCopyBytesRead();
260      }
261    }
262  }
263
264  public static long getTotalBytesRead() {
265    synchronized (readStatistics) {
266      return readStatistics.totalBytesRead;
267    }
268  }
269
270  public static long getLocalBytesRead() {
271    synchronized (readStatistics) {
272      return readStatistics.totalLocalBytesRead;
273    }
274  }
275
276  public static long getShortCircuitBytesRead() {
277    synchronized (readStatistics) {
278      return readStatistics.totalShortCircuitBytesRead;
279    }
280  }
281
282  public static long getZeroCopyBytesRead() {
283    synchronized (readStatistics) {
284      return readStatistics.totalZeroCopyBytesRead;
285    }
286  }
287
288  /** CloseClose stream(s) if necessary. */
289  @Override
290  public void close() {
291    if (!doCloseStreams) {
292      return;
293    }
294    updateInputStreamStatistics(this.streamNoFsChecksum);
295    // we do not care about the close exception as it is for reading, no data loss issue.
296    Closeables.closeQuietly(streamNoFsChecksum);
297
298    updateInputStreamStatistics(stream);
299    Closeables.closeQuietly(stream);
300  }
301
302  public HFileSystem getHfs() {
303    return this.hfs;
304  }
305
306  /**
307   * This will free sockets and file descriptors held by the stream only when the stream implements
308   * org.apache.hadoop.fs.CanUnbuffer. NOT THREAD SAFE. Must be called only when all the clients
309   * using this stream to read the blocks have finished reading. If by chance the stream is
310   * unbuffered and there are clients still holding this stream for read then on next client read
311   * request a new socket will be opened by Datanode without client knowing about it and will serve
312   * its read request. Note: If this socket is idle for some time then the DataNode will close the
313   * socket and the socket will move into CLOSE_WAIT state and on the next client request on this
314   * stream, the current socket will be closed and a new socket will be opened to serve the
315   * requests.
316   */
317  @SuppressWarnings({ "rawtypes" })
318  public void unbuffer() {
319    FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum());
320    if (stream != null) {
321      InputStream wrappedStream = stream.getWrappedStream();
322      // CanUnbuffer interface was added as part of HDFS-7694 and the fix is available in Hadoop
323      // 2.6.4+ and 2.7.1+ versions only so check whether the stream object implements the
324      // CanUnbuffer interface or not and based on that call the unbuffer api.
325      final Class<? extends InputStream> streamClass = wrappedStream.getClass();
326      if (this.instanceOfCanUnbuffer == null) {
327        // To ensure we compute whether the stream is instance of CanUnbuffer only once.
328        this.instanceOfCanUnbuffer = false;
329        if (wrappedStream instanceof CanUnbuffer) {
330          this.unbuffer = (CanUnbuffer) wrappedStream;
331          this.instanceOfCanUnbuffer = true;
332        }
333      }
334      if (this.instanceOfCanUnbuffer) {
335        try {
336          this.unbuffer.unbuffer();
337        } catch (UnsupportedOperationException e) {
338          if (isLogTraceEnabled) {
339            LOG.trace("Failed to invoke 'unbuffer' method in class " + streamClass
340              + " . So there may be the stream does not support unbuffering.", e);
341          }
342        }
343      } else {
344        if (isLogTraceEnabled) {
345          LOG.trace("Failed to find 'unbuffer' method in class " + streamClass);
346        }
347      }
348    }
349  }
350
351  public Path getReaderPath() {
352    return readerPath;
353  }
354}