1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.io;
19
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.lang.reflect.InvocationTargetException;
23 import java.lang.reflect.Method;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.fs.FSDataInputStream;
28 import org.apache.hadoop.fs.FileSystem;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.hbase.fs.HFileSystem;
31
32 import com.google.common.annotations.VisibleForTesting;
33
34
35
36
37
38
39 public class FSDataInputStreamWrapper {
40 private static final Log LOG = LogFactory.getLog(FSDataInputStreamWrapper.class);
41 private static final boolean isLogTraceEnabled = LOG.isTraceEnabled();
42
43 private final HFileSystem hfs;
44 private final Path path;
45 private final FileLink link;
46 private final boolean doCloseStreams;
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 private volatile FSDataInputStream stream = null;
69 private volatile FSDataInputStream streamNoFsChecksum = null;
70 private Object streamNoFsChecksumFirstCreateLock = new Object();
71
72
73 private boolean useHBaseChecksumConfigured;
74
75
76
77
78
79 private volatile boolean useHBaseChecksum;
80
81
82
83 private volatile int hbaseChecksumOffCount = -1;
84
85 private Boolean instanceOfCanUnbuffer = null;
86
87
88 private Method unbuffer = null;
89
90 public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
91 this(fs, null, path, false);
92 }
93
94 public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind) throws IOException {
95 this(fs, null, path, dropBehind);
96 }
97
98 public FSDataInputStreamWrapper(FileSystem fs, FileLink link) throws IOException {
99 this(fs, link, null, false);
100 }
101 public FSDataInputStreamWrapper(FileSystem fs, FileLink link,
102 boolean dropBehind) throws IOException {
103 this(fs, link, null, dropBehind);
104 }
105
106 private FSDataInputStreamWrapper(FileSystem fs, FileLink link,
107 Path path, boolean dropBehind) throws IOException {
108 assert (path == null) != (link == null);
109 this.path = path;
110 this.link = link;
111 this.doCloseStreams = true;
112
113
114
115 this.hfs = (fs instanceof HFileSystem) ? (HFileSystem)fs : new HFileSystem(fs);
116
117
118 this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
119 this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
120 try {
121 this.stream.setDropBehind(dropBehind);
122 } catch (Exception e) {
123
124 }
125 }
126
127
128
129
130
131
132
133
134 public void prepareForBlockReader(boolean forceNoHBaseChecksum) throws IOException {
135 if (hfs == null) return;
136 assert this.stream != null && !this.useHBaseChecksumConfigured;
137 boolean useHBaseChecksum =
138 !forceNoHBaseChecksum && hfs.useHBaseChecksum() && (hfs.getNoChecksumFs() != hfs);
139
140 if (useHBaseChecksum) {
141 FileSystem fsNc = hfs.getNoChecksumFs();
142 this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path);
143 this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum;
144
145 this.stream.close();
146 this.stream = null;
147 }
148 }
149
150
151 @VisibleForTesting
152 public FSDataInputStreamWrapper(FSDataInputStream fsdis) {
153 this(fsdis, fsdis);
154 }
155
156
157 @VisibleForTesting
158 public FSDataInputStreamWrapper(FSDataInputStream fsdis, FSDataInputStream noChecksum) {
159 doCloseStreams = false;
160 stream = fsdis;
161 streamNoFsChecksum = noChecksum;
162 path = null;
163 link = null;
164 hfs = null;
165 useHBaseChecksumConfigured = useHBaseChecksum = false;
166 }
167
168
169
170
171 public boolean shouldUseHBaseChecksum() {
172 return this.useHBaseChecksum;
173 }
174
175
176
177
178
179
180 public FSDataInputStream getStream(boolean useHBaseChecksum) {
181 return useHBaseChecksum ? this.streamNoFsChecksum : this.stream;
182 }
183
184
185
186
187
188 public FSDataInputStream fallbackToFsChecksum(int offCount) throws IOException {
189
190 boolean partOfConvoy = false;
191 if (this.stream == null) {
192 synchronized (streamNoFsChecksumFirstCreateLock) {
193 partOfConvoy = (this.stream != null);
194 if (!partOfConvoy) {
195 this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
196 }
197 }
198 }
199 if (!partOfConvoy) {
200 this.useHBaseChecksum = false;
201 this.hbaseChecksumOffCount = offCount;
202 }
203 return this.stream;
204 }
205
206
207 public void checksumOk() {
208 if (this.useHBaseChecksumConfigured && !this.useHBaseChecksum
209 && (this.hbaseChecksumOffCount-- < 0)) {
210
211 assert this.streamNoFsChecksum != null;
212 this.useHBaseChecksum = true;
213 }
214 }
215
216
217 public void close() throws IOException {
218 if (!doCloseStreams) return;
219 try {
220 if (stream != streamNoFsChecksum && streamNoFsChecksum != null) {
221 streamNoFsChecksum.close();
222 streamNoFsChecksum = null;
223 }
224 } finally {
225 if (stream != null) {
226 stream.close();
227 stream = null;
228 }
229 }
230 }
231
232 public HFileSystem getHfs() {
233 return this.hfs;
234 }
235
236
237
238
239
240
241
242
243
244
245
246
247 @SuppressWarnings({ "rawtypes" })
248 public void unbuffer() {
249 FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum());
250 if (stream != null) {
251 InputStream wrappedStream = stream.getWrappedStream();
252
253
254
255 final Class<? extends InputStream> streamClass = wrappedStream.getClass();
256 if (this.instanceOfCanUnbuffer == null) {
257
258 this.instanceOfCanUnbuffer = false;
259 Class<?>[] streamInterfaces = streamClass.getInterfaces();
260 for (Class c : streamInterfaces) {
261 if (c.getCanonicalName().toString().equals("org.apache.hadoop.fs.CanUnbuffer")) {
262 try {
263 this.unbuffer = streamClass.getDeclaredMethod("unbuffer");
264 } catch (NoSuchMethodException | SecurityException e) {
265 if (isLogTraceEnabled) {
266 LOG.trace("Failed to find 'unbuffer' method in class " + streamClass
267 + " . So there may be a TCP socket connection "
268 + "left open in CLOSE_WAIT state.", e);
269 }
270 return;
271 }
272 this.instanceOfCanUnbuffer = true;
273 break;
274 }
275 }
276 }
277 if (this.instanceOfCanUnbuffer) {
278 try {
279 this.unbuffer.invoke(wrappedStream);
280 } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
281 if (isLogTraceEnabled) {
282 LOG.trace("Failed to invoke 'unbuffer' method in class " + streamClass
283 + " . So there may be a TCP socket connection left open in CLOSE_WAIT state.", e);
284 }
285 }
286 } else {
287 if (isLogTraceEnabled) {
288 LOG.trace("Failed to find 'unbuffer' method in class " + streamClass
289 + " . So there may be a TCP socket connection "
290 + "left open in CLOSE_WAIT state. For more details check "
291 + "https://issues.apache.org/jira/browse/HBASE-9393");
292 }
293 }
294 }
295 }
296 }