1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.fs;
22
23 import java.io.Closeable;
24 import java.io.IOException;
25 import java.lang.reflect.Field;
26 import java.lang.reflect.InvocationHandler;
27 import java.lang.reflect.InvocationTargetException;
28 import java.lang.reflect.Method;
29 import java.lang.reflect.Modifier;
30 import java.lang.reflect.Proxy;
31 import java.lang.reflect.UndeclaredThrowableException;
32 import java.net.URI;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.fs.FSDataOutputStream;
38 import org.apache.hadoop.fs.FileSystem;
39 import org.apache.hadoop.fs.FilterFileSystem;
40 import org.apache.hadoop.fs.LocalFileSystem;
41 import org.apache.hadoop.fs.Path;
42 import org.apache.hadoop.hbase.ServerName;
43 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
44 import org.apache.hadoop.hdfs.DFSClient;
45 import org.apache.hadoop.hdfs.DistributedFileSystem;
46 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
47 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
48 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
49 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
50 import org.apache.hadoop.ipc.RPC;
51 import org.apache.hadoop.util.Progressable;
52 import org.apache.hadoop.util.ReflectionUtils;
53
54
55
56
57
58
59
60
61 public class HFileSystem extends FilterFileSystem {
62 public static final Log LOG = LogFactory.getLog(HFileSystem.class);
63
64 private final FileSystem noChecksumFs;
65 private final boolean useHBaseChecksum;
66
67
68
69
70
71
72
73
74 public HFileSystem(Configuration conf, boolean useHBaseChecksum)
75 throws IOException {
76
77
78
79
80 this.fs = FileSystem.get(conf);
81 this.useHBaseChecksum = useHBaseChecksum;
82
83 fs.initialize(getDefaultUri(conf), conf);
84
85
86 if (fs instanceof LocalFileSystem) {
87 fs.setWriteChecksum(false);
88 fs.setVerifyChecksum(false);
89 }
90
91 addLocationsOrderInterceptor(conf);
92
93
94
95
96
97
98
99
100
101 if (useHBaseChecksum && !(fs instanceof LocalFileSystem)) {
102 conf = new Configuration(conf);
103 conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
104 this.noChecksumFs = newInstanceFileSystem(conf);
105 this.noChecksumFs.setVerifyChecksum(false);
106 } else {
107 this.noChecksumFs = fs;
108 }
109 }
110
111
112
113
114
115
116
/**
 * Wraps an existing filesystem. The given filesystem is used both as the backing
 * filesystem and as the "no checksum" filesystem, and HBase checksums are reported
 * as not in use.
 *
 * @param fs the filesystem to wrap
 */
public HFileSystem(FileSystem fs) {
  this.useHBaseChecksum = false;
  this.fs = fs;
  this.noChecksumFs = fs;
}
122
123
124
125
126
127
128
129
130 public FileSystem getNoChecksumFs() {
131 return noChecksumFs;
132 }
133
134
135
136
137
138 public FileSystem getBackingFs() throws IOException {
139 return fs;
140 }
141
142
143
144
145
146
147 public boolean useHBaseChecksum() {
148 return useHBaseChecksum;
149 }
150
151
152
153
154 @Override
155 public void close() throws IOException {
156 super.close();
157 if (this.noChecksumFs != fs) {
158 this.noChecksumFs.close();
159 }
160 }
161
162
163
164
165
166
167
168
169
170 private static FileSystem newInstanceFileSystem(Configuration conf)
171 throws IOException {
172 URI uri = FileSystem.getDefaultUri(conf);
173 FileSystem fs = null;
174 Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
175 if (clazz != null) {
176
177 fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
178 fs.initialize(uri, conf);
179 } else {
180
181
182
183 Configuration clone = new Configuration(conf);
184 clone.setBoolean("fs." + uri.getScheme() + ".impl.disable.cache", true);
185 fs = FileSystem.get(uri, clone);
186 }
187 if (fs == null) {
188 throw new IOException("No FileSystem for scheme: " + uri.getScheme());
189 }
190
191 return fs;
192 }
193
194 public static boolean addLocationsOrderInterceptor(Configuration conf) throws IOException {
195 return addLocationsOrderInterceptor(conf, new ReorderWALBlocks());
196 }
197
198
199
200
201
202
203
204
205
206 static boolean addLocationsOrderInterceptor(Configuration conf, final ReorderBlocks lrb) {
207 if (!conf.getBoolean("hbase.filesystem.reorder.blocks", true)) {
208 LOG.debug("addLocationsOrderInterceptor configured to false");
209 return false;
210 }
211
212 FileSystem fs;
213 try {
214 fs = FileSystem.get(conf);
215 } catch (IOException e) {
216 LOG.warn("Can't get the file system from the conf.", e);
217 return false;
218 }
219
220 if (!(fs instanceof DistributedFileSystem)) {
221 LOG.debug("The file system is not a DistributedFileSystem. " +
222 "Skipping on block location reordering");
223 return false;
224 }
225
226 DistributedFileSystem dfs = (DistributedFileSystem) fs;
227 DFSClient dfsc = dfs.getClient();
228 if (dfsc == null) {
229 LOG.warn("The DistributedFileSystem does not contain a DFSClient. Can't add the location " +
230 "block reordering interceptor. Continuing, but this is unexpected."
231 );
232 return false;
233 }
234
235 try {
236 Field nf = DFSClient.class.getDeclaredField("namenode");
237 nf.setAccessible(true);
238 Field modifiersField = Field.class.getDeclaredField("modifiers");
239 modifiersField.setAccessible(true);
240 modifiersField.setInt(nf, nf.getModifiers() & ~Modifier.FINAL);
241
242 ClientProtocol namenode = (ClientProtocol) nf.get(dfsc);
243 if (namenode == null) {
244 LOG.warn("The DFSClient is not linked to a namenode. Can't add the location block" +
245 " reordering interceptor. Continuing, but this is unexpected."
246 );
247 return false;
248 }
249
250 ClientProtocol cp1 = createReorderingProxy(namenode, lrb, conf);
251 nf.set(dfsc, cp1);
252 LOG.info("Added intercepting call to namenode#getBlockLocations so can do block reordering" +
253 " using class " + lrb.getClass().getName());
254 } catch (NoSuchFieldException e) {
255 LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.", e);
256 return false;
257 } catch (IllegalAccessException e) {
258 LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.", e);
259 return false;
260 }
261
262 return true;
263 }
264
265 private static ClientProtocol createReorderingProxy(final ClientProtocol cp,
266 final ReorderBlocks lrb, final Configuration conf) {
267 return (ClientProtocol) Proxy.newProxyInstance
268 (cp.getClass().getClassLoader(),
269 new Class[]{ClientProtocol.class, Closeable.class},
270 new InvocationHandler() {
271 public Object invoke(Object proxy, Method method,
272 Object[] args) throws Throwable {
273 try {
274 if ((args == null || args.length == 0)
275 && "close".equals(method.getName())) {
276 RPC.stopProxy(cp);
277 return null;
278 } else {
279 Object res = method.invoke(cp, args);
280 if (res != null && args != null && args.length == 3
281 && "getBlockLocations".equals(method.getName())
282 && res instanceof LocatedBlocks
283 && args[0] instanceof String
284 && args[0] != null) {
285 lrb.reorderBlocks(conf, (LocatedBlocks) res, (String) args[0]);
286 }
287 return res;
288 }
289 } catch (InvocationTargetException ite) {
290
291
292 Throwable cause = ite.getCause();
293 if (cause == null){
294 throw new RuntimeException(
295 "Proxy invocation failed and getCause is null", ite);
296 }
297 if (cause instanceof UndeclaredThrowableException) {
298 Throwable causeCause = cause.getCause();
299 if (causeCause == null) {
300 throw new RuntimeException("UndeclaredThrowableException had null cause!");
301 }
302 cause = cause.getCause();
303 }
304 throw cause;
305 }
306 }
307 });
308 }
309
310
311
312
313 interface ReorderBlocks {
314
315
316
317
318
319
320
321 void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src) throws IOException;
322 }
323
324
325
326
327
328
329 static class ReorderWALBlocks implements ReorderBlocks {
330 public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src)
331 throws IOException {
332
333 ServerName sn = DefaultWALProvider.getServerNameFromWALDirectoryName(conf, src);
334 if (sn == null) {
335
336 return;
337 }
338
339
340 String hostName = sn.getHostname();
341 if (LOG.isTraceEnabled()) {
342 LOG.trace(src +
343 " is an WAL file, so reordering blocks, last hostname will be:" + hostName);
344 }
345
346
347 for (LocatedBlock lb : lbs.getLocatedBlocks()) {
348 DatanodeInfo[] dnis = lb.getLocations();
349 if (dnis != null && dnis.length > 1) {
350 boolean found = false;
351 for (int i = 0; i < dnis.length - 1 && !found; i++) {
352 if (hostName.equals(dnis[i].getHostName())) {
353
354 DatanodeInfo toLast = dnis[i];
355 System.arraycopy(dnis, i + 1, dnis, i, dnis.length - i - 1);
356 dnis[dnis.length - 1] = toLast;
357 found = true;
358 }
359 }
360 }
361 }
362 }
363 }
364
365
366
367
368
369
370
371 static public FileSystem get(Configuration conf) throws IOException {
372 return new HFileSystem(conf, true);
373 }
374
375
376
377
378 static public FileSystem getLocalFs(Configuration conf) throws IOException {
379 return new HFileSystem(FileSystem.getLocal(conf));
380 }
381
382
383
384
385
386
387 @SuppressWarnings("deprecation")
388 public FSDataOutputStream createNonRecursive(Path f,
389 boolean overwrite,
390 int bufferSize, short replication, long blockSize,
391 Progressable progress) throws IOException {
392 return fs.createNonRecursive(f, overwrite, bufferSize, replication,
393 blockSize, progress);
394 }
395 }