1   /*
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.fs;
22  
23  import java.io.Closeable;
24  import java.io.IOException;
25  import java.lang.reflect.Field;
26  import java.lang.reflect.InvocationHandler;
27  import java.lang.reflect.InvocationTargetException;
28  import java.lang.reflect.Method;
29  import java.lang.reflect.Modifier;
30  import java.lang.reflect.Proxy;
31  import java.lang.reflect.UndeclaredThrowableException;
32  import java.net.URI;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FSDataOutputStream;
38  import org.apache.hadoop.fs.FileSystem;
39  import org.apache.hadoop.fs.FilterFileSystem;
40  import org.apache.hadoop.fs.LocalFileSystem;
41  import org.apache.hadoop.fs.Path;
42  import org.apache.hadoop.hbase.ServerName;
43  import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
44  import org.apache.hadoop.hdfs.DFSClient;
45  import org.apache.hadoop.hdfs.DistributedFileSystem;
46  import org.apache.hadoop.hdfs.protocol.ClientProtocol;
47  import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
48  import org.apache.hadoop.hdfs.protocol.LocatedBlock;
49  import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
50  import org.apache.hadoop.ipc.RPC;
51  import org.apache.hadoop.util.Progressable;
52  import org.apache.hadoop.util.ReflectionUtils;
53  
54  /**
55   * An encapsulation of the FileSystem object that HBase uses to access
56   * data. This class allows the flexibility of using separate filesystem
57   * objects for reading and writing hfiles and WALs.
58   * In the future, if we want to put WALs in a different filesystem,
59   * this is the place to make it happen.
60   */
61  public class HFileSystem extends FilterFileSystem {
62    public static final Log LOG = LogFactory.getLog(HFileSystem.class);
63  
64    private final FileSystem noChecksumFs;   // read hfile data from storage
65    private final boolean useHBaseChecksum;
66  
67    /**
68     * Create a FileSystem object for HBase regionservers.
69     * @param conf The configuration to be used for the filesystem
70     * @param useHBaseChecksum if true, then use
71     *        checksum verification in HBase, otherwise
72     *        delegate checksum verification to the FileSystem.
73     */
74    public HFileSystem(Configuration conf, boolean useHBaseChecksum)
75      throws IOException {
76  
77      // Create the default filesystem with checksum verification switched on.
78      // By default, any operation to this FilterFileSystem occurs on
79      // the underlying filesystem that has checksums switched on.
80      this.fs = FileSystem.get(conf);
81      this.useHBaseChecksum = useHBaseChecksum;
82      
83      fs.initialize(getDefaultUri(conf), conf);
84      
85      // disable checksum verification for local fileSystem, see HBASE-11218
86      if (fs instanceof LocalFileSystem) {
87        fs.setWriteChecksum(false);
88        fs.setVerifyChecksum(false);
89      }
90  
91      addLocationsOrderInterceptor(conf);
92  
93      // If hbase checksum verification is switched on, then create a new
94      // filesystem object that has checksum verification turned off.
95      // We will avoid verifying checksums in the fs client; instead we do it
96      // inside of hbase.
97      // If this is the local filesystem, hadoop has a bug where seeks
98      // do not go to the correct location if setVerifyChecksum(false) is called.
99      // This manifests itself in that incorrect data is read and HFileBlocks won't be able to read
100     // their header magic numbers. See HBASE-5885
101     if (useHBaseChecksum && !(fs instanceof LocalFileSystem)) {
102       conf = new Configuration(conf);
103       conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
104       this.noChecksumFs = maybeWrapFileSystem(newInstanceFileSystem(conf), conf);
105       this.noChecksumFs.setVerifyChecksum(false);
106     } else {
107       this.noChecksumFs = maybeWrapFileSystem(fs, conf);
108     }
109
110     this.fs = maybeWrapFileSystem(this.fs, conf);
111   }
112
113   /**
114    * Wrap a FileSystem object within an HFileSystem. The noChecksumFs and
115    * writeFs are both set to be the same specified fs.
116    * Do not verify hbase-checksums while reading data from the filesystem.
117    * @param fs Set the noChecksumFs and writeFs to this specified filesystem.
118    */
119   public HFileSystem(FileSystem fs) {
120     this.fs = fs;
121     this.noChecksumFs = fs;
122     this.useHBaseChecksum = false;
123   }
124
125   /**
126    * Returns the filesystem that is specially set up for
127    * doing reads from storage. This object avoids doing
128    * checksum verifications for reads.
129    * @return The FileSystem object that can be used to read data
130    *         from files.
131    */
132   public FileSystem getNoChecksumFs() {
133     return noChecksumFs;
134   }
135
136   /**
137    * Returns the underlying filesystem
138    * @return The underlying FileSystem for this FilterFileSystem object.
139    */
140   public FileSystem getBackingFs() throws IOException {
141     return fs;
142   }
143
144   /**
145    * Are we verifying checksums in HBase?
146    * @return True, if hbase is configured to verify checksums,
147    *         otherwise false.
148    */
149   public boolean useHBaseChecksum() {
150     return useHBaseChecksum;
151   }
152
153   /**
154    * Close this filesystem object
155    */
156   @Override
157   public void close() throws IOException {
158     super.close();
159     if (this.noChecksumFs != fs) {
160       this.noChecksumFs.close();
161     }
162   }
163
164   /**
165    * Returns a brand new instance of the FileSystem. It does not use
166    * the FileSystem.Cache. In newer versions of HDFS, we can directly
167    * invoke FileSystem.newInstance(Configuration).
168    * 
169    * @param conf Configuration
170    * @return A new instance of the filesystem
171    */
172   private static FileSystem newInstanceFileSystem(Configuration conf)
173     throws IOException {
174     URI uri = FileSystem.getDefaultUri(conf);
175     FileSystem fs = null;
176     Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
177     if (clazz != null) {
178       // This will be true for Hadoop 1.0, or 0.20.
179       fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
180       fs.initialize(uri, conf);
181     } else {
182       // For Hadoop 2.0, we have to go through FileSystem for the filesystem
183       // implementation to be loaded by the service loader in case it has not
184       // been loaded yet.
185       Configuration clone = new Configuration(conf);
186       clone.setBoolean("fs." + uri.getScheme() + ".impl.disable.cache", true);
187       fs = FileSystem.get(uri, clone);
188     }
189     if (fs == null) {
190       throw new IOException("No FileSystem for scheme: " + uri.getScheme());
191     }
192
193     return fs;
194   }
195
196   /**
196    * Returns an instance of the filesystem wrapped into the class specified in
197    * the hbase.fs.wrapper property, if one is set in the configuration; otherwise
198    * returns the unmodified FS instance passed in as an argument.
200    * @param base Filesystem instance to wrap
201    * @param conf Configuration
202    * @return wrapped instance of FS, or the same instance if no wrapping configured.
203    */
204   private FileSystem maybeWrapFileSystem(FileSystem base, Configuration conf) {
205     try {
206       Class<?> clazz = conf.getClass("hbase.fs.wrapper", null);
207       if (clazz != null) {
208         return (FileSystem) clazz.getConstructor(FileSystem.class, Configuration.class)
209           .newInstance(base, conf);
210       }
211     } catch (Exception e) {
212       LOG.error("Failed to wrap filesystem", e);
213     }
214     return base;
215   }
216
217   public static boolean addLocationsOrderInterceptor(Configuration conf) throws IOException {
218     return addLocationsOrderInterceptor(conf, new ReorderWALBlocks());
219   }
220
221   /**
222    * Add an interceptor on the calls to the namenode#getBlockLocations from the DFSClient
223    * linked to this FileSystem. See HBASE-6435 for the background.
224    * <p/>
225    * There should be no reason, except testing, to create a specific ReorderBlocks.
226    *
227    * @return true if the interceptor was added, false otherwise.
228    */
229   static boolean addLocationsOrderInterceptor(Configuration conf, final ReorderBlocks lrb) {
230     if (!conf.getBoolean("hbase.filesystem.reorder.blocks", true)) {  // activated by default
231       LOG.debug("addLocationsOrderInterceptor configured to false");
232       return false;
233     }
234 
235     FileSystem fs;
236     try {
237       fs = FileSystem.get(conf);
238     } catch (IOException e) {
239       LOG.warn("Can't get the file system from the conf.", e);
240       return false;
241     }
242
243     if (!(fs instanceof DistributedFileSystem)) {
244       LOG.debug("The file system is not a DistributedFileSystem. " +
245           "Skipping block location reordering");
246       return false;
247     }
248
249     DistributedFileSystem dfs = (DistributedFileSystem) fs;
250     DFSClient dfsc = dfs.getClient();
251     if (dfsc == null) {
252       LOG.warn("The DistributedFileSystem does not contain a DFSClient. Can't add the location " +
253           "block reordering interceptor. Continuing, but this is unexpected."
254       );
255       return false;
256     }
257
258     try {
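          // DFSClient#namenode is a private final field; clear the final modifier via
          // reflection so the field can be replaced with a block-reordering proxy.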
259       Field nf = DFSClient.class.getDeclaredField("namenode");
260       nf.setAccessible(true);
261       Field modifiersField = Field.class.getDeclaredField("modifiers");
262       modifiersField.setAccessible(true);
263       modifiersField.setInt(nf, nf.getModifiers() & ~Modifier.FINAL);
264 
265       ClientProtocol namenode = (ClientProtocol) nf.get(dfsc);
266       if (namenode == null) {
267         LOG.warn("The DFSClient is not linked to a namenode. Can't add the location block" +
268             " reordering interceptor. Continuing, but this is unexpected."
269         );
270         return false;
271       }
272
273       ClientProtocol cp1 = createReorderingProxy(namenode, lrb, conf);
274       nf.set(dfsc, cp1);
275       LOG.info("Added intercepting call to namenode#getBlockLocations so we can do block reordering" +
276         " using class " + lrb.getClass());
277     } catch (NoSuchFieldException e) {
278       LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.", e);
279       return false;
280     } catch (IllegalAccessException e) {
281       LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.", e);
282       return false;
283     }
284
285     return true;
286   }
287
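      /**
       * Wraps the namenode's ClientProtocol in a dynamic proxy: close() stops the
       * underlying RPC proxy, and the result of getBlockLocations is passed through
       * the ReorderBlocks hook so block locations can be reordered before being
       * returned to the DFSClient.
       */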
288   private static ClientProtocol createReorderingProxy(final ClientProtocol cp,
289       final ReorderBlocks lrb, final Configuration conf) {
290     return (ClientProtocol) Proxy.newProxyInstance
291         (cp.getClass().getClassLoader(),
292             new Class[]{ClientProtocol.class, Closeable.class},
293             new InvocationHandler() {
294               public Object invoke(Object proxy, Method method,
295                                    Object[] args) throws Throwable {
296                 try {
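                      // Intercept close() (no arguments) and shut down the underlying
                      // RPC proxy instead of forwarding the call.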
297                   if ((args == null || args.length == 0)
298                       && "close".equals(method.getName())) {
299                     RPC.stopProxy(cp);
300                     return null;
301                   } else {
302                     Object res = method.invoke(cp, args);
303                     if (res != null && args != null && args.length == 3
304                         && "getBlockLocations".equals(method.getName())
305                         && res instanceof LocatedBlocks
306                         && args[0] instanceof String
307                         && args[0] != null) {
308                       lrb.reorderBlocks(conf, (LocatedBlocks) res, (String) args[0]);
309                     }
310                     return res;
311                   }
312                 } catch (InvocationTargetException ite) {
313                   // We will get this for all exceptions, checked or not, thrown
314                   // by any layer, including functional exceptions
315                   Throwable cause = ite.getCause();
316                   if (cause == null){
317                     throw new RuntimeException(
318                       "Proxy invocation failed and getCause is null", ite);
319                   }
320                   if (cause instanceof UndeclaredThrowableException) {
321                     Throwable causeCause = cause.getCause();
322                     if (causeCause == null) {
323                       throw new RuntimeException("UndeclaredThrowableException had null cause!");
324                     }
325                     cause = cause.getCause();
326                   }
327                   throw cause;
328                 }
329               }
330             });
331   }
332 
333   /**
334    * Interface to implement in order to add specific block reordering logic in hdfs.
335    */
336   interface ReorderBlocks {
337     /**
338      *
339      * @param conf - the conf to use
340      * @param lbs - the LocatedBlocks to reorder
341      * @param src - the file name currently read
342      * @throws IOException - if something went wrong
343      */
344     void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src) throws IOException;
345   }
346
347   /**
348    * We give the lowest priority to the WAL file blocks that are on the same datanode
349    * as the original regionserver which created these files, because we fear that the
350    * datanode is actually dead, so reading from it would time out.
351    */
352   static class ReorderWALBlocks implements ReorderBlocks {
353     public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src)
354         throws IOException {
355
356       ServerName sn = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, src);
357       if (sn == null) {
358         // It's not a WAL
359         return;
360       }
361
362       // OK, so it's a WAL
363       String hostName = sn.getHostname();
364       if (LOG.isTraceEnabled()) {
365         LOG.trace(src +
366             " is a WAL file, so reordering blocks, last hostname will be: " + hostName);
367       }
368
369       // Just check for all blocks
370       for (LocatedBlock lb : lbs.getLocatedBlocks()) {
371         DatanodeInfo[] dnis = lb.getLocations();
372         if (dnis != null && dnis.length > 1) {
373           boolean found = false;
374           for (int i = 0; i < dnis.length - 1 && !found; i++) {
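                // Scan only up to length - 1: if the suspect host is already in the
                // last slot, it is already at the lowest priority and nothing needs to move.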
375             if (hostName.equals(dnis[i].getHostName())) {
376               // advance the other locations by one and put this one at the last place.
377               DatanodeInfo toLast = dnis[i];
378               System.arraycopy(dnis, i + 1, dnis, i, dnis.length - i - 1);
379               dnis[dnis.length - 1] = toLast;
380               found = true;
381             }
382           }
383         }
384       }
385     }
386   }
387
388   /**
389    * Create a new HFileSystem object, similar to FileSystem.get().
390    * This returns a filesystem object that avoids checksum
391    * verification in the filesystem for hfileblock-reads.
392    * For these blocks, checksum verification is done by HBase.
393    */
394   static public FileSystem get(Configuration conf) throws IOException {
395     return new HFileSystem(conf, true);
396   }
397
398   /**
399    * Wrap a LocalFileSystem within a HFileSystem.
400    */
401   static public FileSystem getLocalFs(Configuration conf) throws IOException {
402     return new HFileSystem(FileSystem.getLocal(conf));
403   }
404
405   /**
406    * The org.apache.hadoop.fs.FilterFileSystem does not yet support
407    * createNonRecursive. This is a Hadoop bug; once it is fixed in Hadoop,
408    * this definition will go away.
409    */
410   @SuppressWarnings("deprecation")
411   public FSDataOutputStream createNonRecursive(Path f,
412       boolean overwrite,
413       int bufferSize, short replication, long blockSize,
414       Progressable progress) throws IOException {
415     return fs.createNonRecursive(f, overwrite, bufferSize, replication,
416                                  blockSize, progress);
417   }
418 }