1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hbase.io; 19 20 import java.io.IOException; 21 22 import org.apache.hadoop.fs.FSDataInputStream; 23 import org.apache.hadoop.fs.FileSystem; 24 import org.apache.hadoop.fs.Path; 25 import org.apache.hadoop.hbase.fs.HFileSystem; 26 import org.apache.hadoop.hbase.io.FileLink; 27 28 import com.google.common.annotations.VisibleForTesting; 29 30 /** 31 * Wrapper for input stream(s) that takes care of the interaction of FS and HBase checksums, 32 * as well as closing streams. Initialization is not thread-safe, but normal operation is; 33 * see method comments. 34 */ 35 public class FSDataInputStreamWrapper { 36 private final HFileSystem hfs; 37 private final Path path; 38 private final FileLink link; 39 private final boolean doCloseStreams; 40 41 /** Two stream handles, one with and one without FS-level checksum. 42 * HDFS checksum setting is on FS level, not single read level, so you have to keep two 43 * FS objects and two handles open to interleave different reads freely, which is very sad. 44 * This is what we do: 45 * 1) First, we need to read the trailer of HFile to determine checksum parameters. 46 * We always use FS checksum to do that, so ctor opens {@link #stream}. 47 * 2.1) After that, if HBase checksum is not used, we'd just always use {@link #stream}; 48 * 2.2) If HBase checksum can be used, we'll open {@link #streamNoFsChecksum}, 49 * and close {@link #stream}. User MUST call prepareForBlockReader for that to happen; 50 * if they don't, (2.1) will be the default. 51 * 3) The users can call {@link #shouldUseHBaseChecksum()}, and pass its result to 52 * {@link #getStream(boolean)} to get stream (if Java had out/pointer params we could 53 * return both in one call). This stream is guaranteed to be set. 54 * 4) The first time HBase checksum fails, one would call {@link #fallbackToFsChecksum(int)}. 55 * That will take lock, and open {@link #stream}. While this is going on, others will 56 * continue to use the old stream; if they also want to fall back, they'll also call 57 * {@link #fallbackToFsChecksum(int)}, and block until {@link #stream} is set. 58 * 5) After some number of checksumOk() calls, we will go back to using HBase checksum. 59 * We will have 2 handles; however we presume checksums fail so rarely that we don't care. 60 */ 61 private volatile FSDataInputStream stream = null; 62 private volatile FSDataInputStream streamNoFsChecksum = null; 63 private Object streamNoFsChecksumFirstCreateLock = new Object(); 64 65 // The configuration states that we should validate hbase checksums 66 private boolean useHBaseChecksumConfigured; 67 68 // Record the current state of this reader with respect to 69 // validating checkums in HBase. This is originally set the same 70 // value as useHBaseChecksumConfigured, but can change state as and when 71 // we encounter checksum verification failures. 72 private volatile boolean useHBaseChecksum; 73 74 // In the case of a checksum failure, do these many succeeding 75 // reads without hbase checksum verification. 76 private volatile int hbaseChecksumOffCount = -1; 77 78 public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException { 79 this(fs, null, path); 80 } 81 82 public FSDataInputStreamWrapper(FileSystem fs, FileLink link) throws IOException { 83 this(fs, link, null); 84 } 85 86 private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path) throws IOException { 87 assert (path == null) != (link == null); 88 this.path = path; 89 this.link = link; 90 this.doCloseStreams = true; 91 // If the fs is not an instance of HFileSystem, then create an instance of HFileSystem 92 // that wraps over the specified fs. In this case, we will not be able to avoid 93 // checksumming inside the filesystem. 94 this.hfs = (fs instanceof HFileSystem) ? (HFileSystem)fs : new HFileSystem(fs); 95 96 // Initially we are going to read the tail block. Open the reader w/FS checksum. 97 this.useHBaseChecksumConfigured = this.useHBaseChecksum = false; 98 this.stream = (link != null) ? link.open(hfs) : hfs.open(path); 99 } 100 101 /** 102 * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any 103 * reads finish and before any other reads start (what happens in reality is we read the 104 * tail, then call this based on what's in the tail, then read blocks). 105 * @param forceNoHBaseChecksum Force not using HBase checksum. 106 */ 107 public void prepareForBlockReader(boolean forceNoHBaseChecksum) throws IOException { 108 if (hfs == null) return; 109 assert this.stream != null && !this.useHBaseChecksumConfigured; 110 boolean useHBaseChecksum = 111 !forceNoHBaseChecksum && hfs.useHBaseChecksum() && (hfs.getNoChecksumFs() != hfs); 112 113 if (useHBaseChecksum) { 114 FileSystem fsNc = hfs.getNoChecksumFs(); 115 this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path); 116 this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum; 117 // Close the checksum stream; we will reopen it if we get an HBase checksum failure. 118 this.stream.close(); 119 this.stream = null; 120 } 121 } 122 123 /** For use in tests. */ 124 @VisibleForTesting 125 public FSDataInputStreamWrapper(FSDataInputStream fsdis) { 126 this(fsdis, fsdis); 127 } 128 129 /** For use in tests. */ 130 @VisibleForTesting 131 public FSDataInputStreamWrapper(FSDataInputStream fsdis, FSDataInputStream noChecksum) { 132 doCloseStreams = false; 133 stream = fsdis; 134 streamNoFsChecksum = noChecksum; 135 path = null; 136 link = null; 137 hfs = null; 138 useHBaseChecksumConfigured = useHBaseChecksum = false; 139 } 140 141 /** 142 * @return Whether we are presently using HBase checksum. 143 */ 144 public boolean shouldUseHBaseChecksum() { 145 return this.useHBaseChecksum; 146 } 147 148 /** 149 * Get the stream to use. Thread-safe. 150 * @param useHBaseChecksum must be the value that shouldUseHBaseChecksum has returned 151 * at some point in the past, otherwise the result is undefined. 152 */ 153 public FSDataInputStream getStream(boolean useHBaseChecksum) { 154 return useHBaseChecksum ? this.streamNoFsChecksum : this.stream; 155 } 156 157 /** 158 * Read from non-checksum stream failed, fall back to FS checksum. Thread-safe. 159 * @param offCount For how many checksumOk calls to turn off the HBase checksum. 160 */ 161 public FSDataInputStream fallbackToFsChecksum(int offCount) throws IOException { 162 // checksumOffCount is speculative, but let's try to reset it less. 163 boolean partOfConvoy = false; 164 if (this.stream == null) { 165 synchronized (streamNoFsChecksumFirstCreateLock) { 166 partOfConvoy = (this.stream != null); 167 if (!partOfConvoy) { 168 this.stream = (link != null) ? link.open(hfs) : hfs.open(path); 169 } 170 } 171 } 172 if (!partOfConvoy) { 173 this.useHBaseChecksum = false; 174 this.hbaseChecksumOffCount = offCount; 175 } 176 return this.stream; 177 } 178 179 /** Report that checksum was ok, so we may ponder going back to HBase checksum. */ 180 public void checksumOk() { 181 if (this.useHBaseChecksumConfigured && !this.useHBaseChecksum 182 && (this.hbaseChecksumOffCount-- < 0)) { 183 // The stream we need is already open (because we were using HBase checksum in the past). 184 assert this.streamNoFsChecksum != null; 185 this.useHBaseChecksum = true; 186 } 187 } 188 189 /** Close stream(s) if necessary. */ 190 public void close() throws IOException { 191 if (!doCloseStreams) return; 192 try { 193 if (stream != streamNoFsChecksum && streamNoFsChecksum != null) { 194 streamNoFsChecksum.close(); 195 streamNoFsChecksum = null; 196 } 197 } finally { 198 if (stream != null) { 199 stream.close(); 200 stream = null; 201 } 202 } 203 } 204 205 public HFileSystem getHfs() { 206 return this.hfs; 207 } 208 }