/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Wrapper for input stream(s) that takes care of the interaction of FS and HBase checksums,
 * as well as closing streams. Initialization is not thread-safe, but normal operation is;
 * see method comments.
 */
@InterfaceAudience.Private
public class FSDataInputStreamWrapper implements Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(FSDataInputStreamWrapper.class);
  private static final boolean isLogTraceEnabled = LOG.isTraceEnabled();

  private final HFileSystem hfs;
  private final Path path;
  private final FileLink link;
  private final boolean doCloseStreams;
  private final boolean dropBehind;
  private final long readahead;

  /** Two stream handles, one with and one without FS-level checksum.
   * The HDFS checksum setting is per filesystem, not per read, so we have to keep two
   * FS objects and two handles open to interleave the two kinds of reads freely.
   * This is what we do:
   * 1) First, we need to read the HFile trailer to determine the checksum parameters.
   *  We always use the FS checksum to do that, so the ctor opens {@link #stream}.
   * 2.1) After that, if HBase checksum is not used, we just always use {@link #stream};
   * 2.2) If HBase checksum can be used, we'll open {@link #streamNoFsChecksum},
   *  and close {@link #stream}. The user MUST call prepareForBlockReader for that to happen;
   *  if they don't, (2.1) is the default.
   * 3) Users call {@link #shouldUseHBaseChecksum()}, and pass its result to
   *  {@link #getStream(boolean)} to get the stream (if Java had out/pointer params we could
   *  return both in one call). This stream is guaranteed to be set.
   * 4) The first time HBase checksum fails, one calls {@link #fallbackToFsChecksum(int)}.
   * That takes a lock and opens {@link #stream}. While this is going on, others will
   * continue to use the old stream; if they also want to fall back, they will also call
   * {@link #fallbackToFsChecksum(int)}, and block until {@link #stream} is set.
   * 5) After some number of checksumOk() calls, we go back to using HBase checksum.
   * We will then have two handles open; however, we presume checksums fail so rarely
   * that we don't care.
   */
  private volatile FSDataInputStream stream = null;
  private volatile FSDataInputStream streamNoFsChecksum = null;
  private final Object streamNoFsChecksumFirstCreateLock = new Object();

  // The configuration states that we should validate HBase checksums.
  private boolean useHBaseChecksumConfigured;

  // Records the current state of this reader with respect to validating
  // checksums in HBase. It is initially set to the same value as
  // useHBaseChecksumConfigured, but can change as and when we encounter
  // checksum verification failures.
  private volatile boolean useHBaseChecksum;

  // In the case of a checksum failure, do this many subsequent
  // reads without HBase checksum verification.
  private final AtomicInteger hbaseChecksumOffCount = new AtomicInteger(-1);

  private static final ReadStatistics readStatistics = new ReadStatistics();

  private static class ReadStatistics {
    long totalBytesRead;
    long totalLocalBytesRead;
    long totalShortCircuitBytesRead;
    long totalZeroCopyBytesRead;
  }

  private Boolean instanceOfCanUnbuffer = null;
  private CanUnbuffer unbuffer = null;

  public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
    this(fs, path, false, -1L);
  }

  public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind, long readahead)
      throws IOException {
    this(fs, null, path, dropBehind, readahead);
  }

  public FSDataInputStreamWrapper(FileSystem fs, FileLink link,
                                  boolean dropBehind, long readahead) throws IOException {
    this(fs, link, null, dropBehind, readahead);
  }

  private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path, boolean dropBehind,
      long readahead) throws IOException {
    assert (path == null) != (link == null);
    this.path = path;
    this.link = link;
    this.doCloseStreams = true;
    this.dropBehind = dropBehind;
    this.readahead = readahead;
    // If the fs is not an instance of HFileSystem, then create an instance of HFileSystem
    // that wraps over the specified fs. In this case, we will not be able to avoid
    // checksumming inside the filesystem.
    this.hfs = (fs instanceof HFileSystem) ? (HFileSystem) fs : new HFileSystem(fs);

    // Initially we are going to read the tail block. Open the reader w/FS checksum.
    this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
    this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
    setStreamOptions(stream);
  }

  private void setStreamOptions(FSDataInputStream in) {
    try {
      in.setDropBehind(dropBehind);
    } catch (Exception e) {
      // Best-effort hint; ignore streams that do not support drop-behind.
    }
    if (readahead >= 0) {
      try {
        in.setReadahead(readahead);
      } catch (Exception e) {
        // Best-effort hint; ignore streams that do not support readahead.
      }
    }
  }

  /**
   * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any
   * reads finish and before any other reads start (what happens in reality is we read the
   * tail, then call this based on what's in the tail, then read blocks).
   * @param forceNoHBaseChecksum Force not using HBase checksum.
   */
  public void prepareForBlockReader(boolean forceNoHBaseChecksum) throws IOException {
    if (hfs == null) return;
    assert this.stream != null && !this.useHBaseChecksumConfigured;
    boolean useHBaseChecksum =
        !forceNoHBaseChecksum && hfs.useHBaseChecksum() && (hfs.getNoChecksumFs() != hfs);

    if (useHBaseChecksum) {
      FileSystem fsNc = hfs.getNoChecksumFs();
      this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path);
      setStreamOptions(streamNoFsChecksum);
      this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum;
      // Close the checksum stream; we will reopen it if we get an HBase checksum failure.
      this.stream.close();
      this.stream = null;
    }
  }
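
  /*
   * A minimal sketch of the intended call sequence described in the field comment above. The
   * surrounding variables (fs, path, trailerSupportsHBaseChecksum) are hypothetical; only the
   * FSDataInputStreamWrapper calls are real:
   *
   *   FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fs, path);
   *   // The ctor always opens the FS-checksum stream, so read the trailer from getStream(false).
   *   FSDataInputStream trailerStream = wrapper.getStream(false);
   *   // ... read and parse the trailer ...
   *   // Switch to the no-FS-checksum stream if the trailer allows HBase checksums.
   *   wrapper.prepareForBlockReader(!trailerSupportsHBaseChecksum);
   *   // From here on, always pair shouldUseHBaseChecksum() with getStream().
   *   FSDataInputStream blockStream = wrapper.getStream(wrapper.shouldUseHBaseChecksum());
   */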

  /** For use in tests. */
  @VisibleForTesting
  public FSDataInputStreamWrapper(FSDataInputStream fsdis) {
    this(fsdis, fsdis);
  }

  /** For use in tests. */
  @VisibleForTesting
  public FSDataInputStreamWrapper(FSDataInputStream fsdis, FSDataInputStream noChecksum) {
    doCloseStreams = false;
    stream = fsdis;
    streamNoFsChecksum = noChecksum;
    path = null;
    link = null;
    hfs = null;
    useHBaseChecksumConfigured = useHBaseChecksum = false;
    dropBehind = false;
    readahead = 0;
  }

  /**
   * @return Whether we are presently using HBase checksum.
   */
  public boolean shouldUseHBaseChecksum() {
    return this.useHBaseChecksum;
  }

  /**
   * Get the stream to use. Thread-safe.
   * @param useHBaseChecksum must be the value that shouldUseHBaseChecksum has returned
   *  at some point in the past, otherwise the result is undefined.
   */
  public FSDataInputStream getStream(boolean useHBaseChecksum) {
    return useHBaseChecksum ? this.streamNoFsChecksum : this.stream;
  }

  /**
   * A read from the non-checksum stream failed; fall back to FS checksum. Thread-safe.
   * @param offCount For how many checksumOk() calls to keep HBase checksum turned off.
   */
  public FSDataInputStream fallbackToFsChecksum(int offCount) throws IOException {
    // hbaseChecksumOffCount is speculative, but let's try to reset it less often.
    boolean partOfConvoy = false;
    if (this.stream == null) {
      synchronized (streamNoFsChecksumFirstCreateLock) {
        partOfConvoy = (this.stream != null);
        if (!partOfConvoy) {
          this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
        }
      }
    }
    if (!partOfConvoy) {
      this.useHBaseChecksum = false;
      this.hbaseChecksumOffCount.set(offCount);
    }
    return this.stream;
  }

  /** Report that checksum was ok, so we may ponder going back to HBase checksum. */
  public void checksumOk() {
    if (this.useHBaseChecksumConfigured && !this.useHBaseChecksum
        && (this.hbaseChecksumOffCount.getAndDecrement() < 0)) {
      // The stream we need is already open (because we were using HBase checksum in the past).
      assert this.streamNoFsChecksum != null;
      this.useHBaseChecksum = true;
    }
  }
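
  /*
   * A minimal sketch of the failure/recovery cycle, assuming a hypothetical caller with its own
   * block verification and a CHECKSUM_OFF_COUNT constant; only the wrapper calls are real:
   *
   *   boolean useHBaseChecksum = wrapper.shouldUseHBaseChecksum();
   *   FSDataInputStream in = wrapper.getStream(useHBaseChecksum);
   *   // ... read a block from in and verify it ...
   *   if (useHBaseChecksum && hbaseChecksumFailed) {
   *     // Re-read through the FS-checksum stream; HBase checksum stays off for roughly
   *     // CHECKSUM_OFF_COUNT subsequent successful reads.
   *     in = wrapper.fallbackToFsChecksum(CHECKSUM_OFF_COUNT);
   *     // ... re-read the block from in ...
   *   } else {
   *     // Report a good read; after enough of these the wrapper flips back to HBase checksum.
   *     wrapper.checksumOk();
   *   }
   */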

  private void updateInputStreamStatistics(FSDataInputStream stream) {
    // If the underlying file system is HDFS, update read statistics upon close.
    if (stream instanceof HdfsDataInputStream) {
      /*
       * Because HDFS ReadStatistics is calculated per input stream, it is not
       * feasible to update the aggregated number in real time. Instead, the
       * metrics are updated when an input stream is closed.
       */
      HdfsDataInputStream hdfsDataInputStream = (HdfsDataInputStream) stream;
      synchronized (readStatistics) {
        readStatistics.totalBytesRead += hdfsDataInputStream.getReadStatistics().
          getTotalBytesRead();
        readStatistics.totalLocalBytesRead += hdfsDataInputStream.getReadStatistics().
          getTotalLocalBytesRead();
        readStatistics.totalShortCircuitBytesRead += hdfsDataInputStream.getReadStatistics().
          getTotalShortCircuitBytesRead();
        readStatistics.totalZeroCopyBytesRead += hdfsDataInputStream.getReadStatistics().
          getTotalZeroCopyBytesRead();
      }
    }
  }

  public static long getTotalBytesRead() {
    synchronized (readStatistics) {
      return readStatistics.totalBytesRead;
    }
  }

  public static long getLocalBytesRead() {
    synchronized (readStatistics) {
      return readStatistics.totalLocalBytesRead;
    }
  }

  public static long getShortCircuitBytesRead() {
    synchronized (readStatistics) {
      return readStatistics.totalShortCircuitBytesRead;
    }
  }

  public static long getZeroCopyBytesRead() {
    synchronized (readStatistics) {
      return readStatistics.totalZeroCopyBytesRead;
    }
  }
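
  /*
   * A minimal sketch of how a metrics reporter might read these process-wide aggregates; the
   * reporter and its setGauge API are hypothetical, only the static getters are real. Note the
   * counters only advance when a wrapped HDFS stream is closed, so they lag live reads:
   *
   *   metrics.setGauge("hdfsBytesRead", FSDataInputStreamWrapper.getTotalBytesRead());
   *   metrics.setGauge("hdfsLocalBytesRead", FSDataInputStreamWrapper.getLocalBytesRead());
   *   metrics.setGauge("hdfsShortCircuitBytesRead",
   *       FSDataInputStreamWrapper.getShortCircuitBytesRead());
   *   metrics.setGauge("hdfsZeroCopyBytesRead", FSDataInputStreamWrapper.getZeroCopyBytesRead());
   */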

  /** Close stream(s) if necessary. */
  @Override
  public void close() {
    if (!doCloseStreams) {
      return;
    }
    updateInputStreamStatistics(this.streamNoFsChecksum);
    // We do not care about close exceptions: these are read-only streams, so there is no
    // data-loss concern.
    IOUtils.closeQuietly(streamNoFsChecksum);

    updateInputStreamStatistics(stream);
    IOUtils.closeQuietly(stream);
  }

  public HFileSystem getHfs() {
    return this.hfs;
  }

  /**
   * Frees the sockets and file descriptors held by the stream, but only when the stream implements
   * org.apache.hadoop.fs.CanUnbuffer. NOT THREAD SAFE. Must be called only after all the clients
   * using this stream to read blocks have finished reading. If the stream is unbuffered while some
   * clients are still holding it for reads, then on the next read request a new socket is opened
   * to the DataNode, without the client being aware of it, and that socket serves the request.
   * Note: if such a socket is idle for some time, the DataNode closes it, the socket moves into
   * the CLOSE_WAIT state, and on the next client request on this stream the current socket is
   * closed and a new one is opened to serve the requests.
   */
  @SuppressWarnings({ "rawtypes" })
  public void unbuffer() {
    FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum());
    if (stream != null) {
      InputStream wrappedStream = stream.getWrappedStream();
      // The CanUnbuffer interface was added in HDFS-7694 and is only available in Hadoop 2.6.4+
      // and 2.7.1+, so check whether the stream object implements CanUnbuffer and, based on that,
      // call the unbuffer api.
      final Class<? extends InputStream> streamClass = wrappedStream.getClass();
      if (this.instanceOfCanUnbuffer == null) {
        // Compute whether the stream is an instance of CanUnbuffer only once.
        this.instanceOfCanUnbuffer = false;
        if (wrappedStream instanceof CanUnbuffer) {
          this.unbuffer = (CanUnbuffer) wrappedStream;
          this.instanceOfCanUnbuffer = true;
        }
      }
      if (this.instanceOfCanUnbuffer) {
        try {
          this.unbuffer.unbuffer();
        } catch (UnsupportedOperationException e) {
          if (isLogTraceEnabled) {
            LOG.trace("Failed to invoke 'unbuffer' method in class " + streamClass
                + ". The stream may not support unbuffering.", e);
          }
        }
      } else {
        if (isLogTraceEnabled) {
          LOG.trace("Stream class " + streamClass + " does not implement CanUnbuffer;"
              + " skipping unbuffer.");
        }
      }
    }
  }
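
  /*
   * A minimal usage sketch, assuming a hypothetical reader that parks the wrapper in a cache
   * between requests; only the unbuffer() call is real:
   *
   *   // ... all outstanding block reads against this wrapper have completed ...
   *   wrapper.unbuffer();                 // release the DataNode socket and file descriptors
   *   readerCache.put(fileName, reader);  // hypothetical cache of idle readers
   */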
}