View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.io;
19  
20  import java.io.IOException;
21  
22  import org.apache.hadoop.fs.FSDataInputStream;
23  import org.apache.hadoop.fs.FileSystem;
24  import org.apache.hadoop.fs.Path;
25  import org.apache.hadoop.hbase.fs.HFileSystem;
26  
27  import com.google.common.annotations.VisibleForTesting;
28  
29  /**
30   * Wrapper for input stream(s) that takes care of the interaction of FS and HBase checksums,
31   * as well as closing streams. Initialization is not thread-safe, but normal operation is;
32   * see method comments.
33   */
34  public class FSDataInputStreamWrapper {
35    private final HFileSystem hfs;
36    private final Path path;
37    private final FileLink link;
38    private final boolean doCloseStreams;
39  
40    /** Two stream handles, one with and one without FS-level checksum.
41     * HDFS checksum setting is on FS level, not single read level, so you have to keep two
42     * FS objects and two handles open to interleave different reads freely, which is very sad.
43     * This is what we do:
44     * 1) First, we need to read the trailer of HFile to determine checksum parameters.
45     *  We always use FS checksum to do that, so ctor opens {@link #stream}.
46     * 2.1) After that, if HBase checksum is not used, we'd just always use {@link #stream};
47     * 2.2) If HBase checksum can be used, we'll open {@link #streamNoFsChecksum},
48     *  and close {@link #stream}. User MUST call prepareForBlockReader for that to happen;
49     *  if they don't, (2.1) will be the default.
50     * 3) The users can call {@link #shouldUseHBaseChecksum()}, and pass its result to
51     *  {@link #getStream(boolean)} to get stream (if Java had out/pointer params we could
52     *  return both in one call). This stream is guaranteed to be set.
53     * 4) The first time HBase checksum fails, one would call {@link #fallbackToFsChecksum(int)}.
54     * That will take lock, and open {@link #stream}. While this is going on, others will
55     * continue to use the old stream; if they also want to fall back, they'll also call
56     * {@link #fallbackToFsChecksum(int)}, and block until {@link #stream} is set.
57     * 5) After some number of checksumOk() calls, we will go back to using HBase checksum.
58     * We will have 2 handles; however we presume checksums fail so rarely that we don't care.
59     */
60    private volatile FSDataInputStream stream = null;
61    private volatile FSDataInputStream streamNoFsChecksum = null;
62    private Object streamNoFsChecksumFirstCreateLock = new Object();
63  
64    // The configuration states that we should validate hbase checksums
65    private boolean useHBaseChecksumConfigured;
66  
67    // Record the current state of this reader with respect to
68    // validating checkums in HBase. This is originally set the same
69    // value as useHBaseChecksumConfigured, but can change state as and when
70    // we encounter checksum verification failures.
71    private volatile boolean useHBaseChecksum;
72  
73    // In the case of a checksum failure, do these many succeeding
74    // reads without hbase checksum verification.
75    private volatile int hbaseChecksumOffCount = -1;
76  
77    public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
78      this(fs, null, path, false);
79    }
80  
81    public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind) throws IOException {
82      this(fs, null, path, dropBehind);
83    }
84  
85    public FSDataInputStreamWrapper(FileSystem fs, FileLink link) throws IOException {
86      this(fs, link, null, false);
87    }
88    public FSDataInputStreamWrapper(FileSystem fs, FileLink link,
89                                    boolean dropBehind) throws IOException {
90      this(fs, link, null, dropBehind);
91    }
92  
93    private FSDataInputStreamWrapper(FileSystem fs, FileLink link,
94                                     Path path, boolean dropBehind) throws IOException {
95      assert (path == null) != (link == null);
96      this.path = path;
97      this.link = link;
98      this.doCloseStreams = true;
99      // If the fs is not an instance of HFileSystem, then create an instance of HFileSystem
100     // that wraps over the specified fs. In this case, we will not be able to avoid
101     // checksumming inside the filesystem.
102     this.hfs = (fs instanceof HFileSystem) ? (HFileSystem)fs : new HFileSystem(fs);
103 
104     // Initially we are going to read the tail block. Open the reader w/FS checksum.
105     this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
106     this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
107     try {
108       this.stream.setDropBehind(dropBehind);
109     } catch (Exception e) {
110       // Skipped.
111     }
112   }
113 
114 
115   /**
116    * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any
117    * reads finish and before any other reads start (what happens in reality is we read the
118    * tail, then call this based on what's in the tail, then read blocks).
119    * @param forceNoHBaseChecksum Force not using HBase checksum.
120    */
121   public void prepareForBlockReader(boolean forceNoHBaseChecksum) throws IOException {
122     if (hfs == null) return;
123     assert this.stream != null && !this.useHBaseChecksumConfigured;
124     boolean useHBaseChecksum =
125         !forceNoHBaseChecksum && hfs.useHBaseChecksum() && (hfs.getNoChecksumFs() != hfs);
126 
127     if (useHBaseChecksum) {
128       FileSystem fsNc = hfs.getNoChecksumFs();
129       this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path);
130       this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum;
131       // Close the checksum stream; we will reopen it if we get an HBase checksum failure.
132       this.stream.close();
133       this.stream = null;
134     }
135   }
136 
137   /** For use in tests. */
138   @VisibleForTesting
139   public FSDataInputStreamWrapper(FSDataInputStream fsdis) {
140     this(fsdis, fsdis);
141   }
142 
143   /** For use in tests. */
144   @VisibleForTesting
145   public FSDataInputStreamWrapper(FSDataInputStream fsdis, FSDataInputStream noChecksum) {
146     doCloseStreams = false;
147     stream = fsdis;
148     streamNoFsChecksum = noChecksum;
149     path = null;
150     link = null;
151     hfs = null;
152     useHBaseChecksumConfigured = useHBaseChecksum = false;
153   }
154 
155   /**
156    * @return Whether we are presently using HBase checksum.
157    */
158   public boolean shouldUseHBaseChecksum() {
159     return this.useHBaseChecksum;
160   }
161 
162   /**
163    * Get the stream to use. Thread-safe.
164    * @param useHBaseChecksum must be the value that shouldUseHBaseChecksum has returned
165    *  at some point in the past, otherwise the result is undefined.
166    */
167   public FSDataInputStream getStream(boolean useHBaseChecksum) {
168     return useHBaseChecksum ? this.streamNoFsChecksum : this.stream;
169   }
170 
171   /**
172    * Read from non-checksum stream failed, fall back to FS checksum. Thread-safe.
173    * @param offCount For how many checksumOk calls to turn off the HBase checksum.
174    */
175   public FSDataInputStream fallbackToFsChecksum(int offCount) throws IOException {
176     // checksumOffCount is speculative, but let's try to reset it less.
177     boolean partOfConvoy = false;
178     if (this.stream == null) {
179       synchronized (streamNoFsChecksumFirstCreateLock) {
180         partOfConvoy = (this.stream != null);
181         if (!partOfConvoy) {
182           this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
183         }
184       }
185     }
186     if (!partOfConvoy) {
187       this.useHBaseChecksum = false;
188       this.hbaseChecksumOffCount = offCount;
189     }
190     return this.stream;
191   }
192 
193   /** Report that checksum was ok, so we may ponder going back to HBase checksum. */
194   public void checksumOk() {
195     if (this.useHBaseChecksumConfigured && !this.useHBaseChecksum
196         && (this.hbaseChecksumOffCount-- < 0)) {
197       // The stream we need is already open (because we were using HBase checksum in the past).
198       assert this.streamNoFsChecksum != null;
199       this.useHBaseChecksum = true;
200     }
201   }
202 
203   /** Close stream(s) if necessary. */
204   public void close() throws IOException {
205     if (!doCloseStreams) return;
206     try {
207       if (stream != streamNoFsChecksum && streamNoFsChecksum != null) {
208         streamNoFsChecksum.close();
209         streamNoFsChecksum = null;
210       }
211     } finally {
212       if (stream != null) {
213         stream.close();
214         stream = null;
215       }
216     }
217   }
218 
219   public HFileSystem getHfs() {
220     return this.hfs;
221   }
222 }