/*
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.fs;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Progressable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An encapsulation of the FileSystem object that HBase uses to access data. This class
 * allows the flexibility of using separate filesystem objects for reading and writing
 * hfiles and WALs.
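 *
 * <p>A minimal usage sketch (the configuration and variable names below are illustrative only):
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * // Verify hfile checksums inside HBase rather than in the underlying filesystem.
 * HFileSystem hfs = new HFileSystem(conf, true);
 * FileSystem readFs = hfs.getNoChecksumFs(); // for hfile data reads
 * FileSystem writeFs = hfs.getBackingFs();   // for all other operations
 * }</pre>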
 */
@InterfaceAudience.Private
public class HFileSystem extends FilterFileSystem {
  public static final Logger LOG = LoggerFactory.getLogger(HFileSystem.class);

  private final FileSystem noChecksumFs;   // read hfile data from storage
  private final boolean useHBaseChecksum;
  private static volatile byte unspecifiedStoragePolicyId = Byte.MIN_VALUE;

  /**
   * Create a FileSystem object for HBase regionservers.
   * @param conf The configuration to be used for the filesystem
   * @param useHBaseChecksum if true, verify checksums in HBase, otherwise
   *        delegate checksum verification to the FileSystem.
   */
  public HFileSystem(Configuration conf, boolean useHBaseChecksum)
    throws IOException {

    // Create the default filesystem with checksum verification switched on.
    // By default, any operation to this FilterFileSystem occurs on
    // the underlying filesystem that has checksums switched on.
    // This FS#get(URI, conf) clearly indicates in the javadoc that if the FS is
    // not created it will initialize the FS and return that created FS. If it is
    // already created it will just return the FS that was already created.
    // We take pains to funnel all of our FileSystem instantiation through this call to ensure
    // we never need to call FS.initialize ourselves so that we do not have to track any state to
    // avoid calling initialize more than once.
    this.fs = FileSystem.get(getDefaultUri(conf), conf);
    this.useHBaseChecksum = useHBaseChecksum;

    // disable checksum verification for local fileSystem, see HBASE-11218
    if (fs instanceof LocalFileSystem) {
      fs.setWriteChecksum(false);
      fs.setVerifyChecksum(false);
    }

    addLocationsOrderInterceptor(conf);

    // If hbase checksum verification is switched on, then create a new
    // filesystem object that has checksum verification turned off.
    // We will avoid verifying checksums in the fs client, and instead do it
    // inside of hbase.
    // If this is the local filesystem, hadoop has a bug where seeks
    // do not go to the correct location if setVerifyChecksum(false) is called.
    // This manifests itself in incorrect data being read, so HFileBlocks won't be able to read
    // their header magic numbers. See HBASE-5885.
    if (useHBaseChecksum && !(fs instanceof LocalFileSystem)) {
      conf = new Configuration(conf);
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
      this.noChecksumFs = maybeWrapFileSystem(newInstanceFileSystem(conf), conf);
      this.noChecksumFs.setVerifyChecksum(false);
    } else {
      this.noChecksumFs = maybeWrapFileSystem(fs, conf);
    }

    this.fs = maybeWrapFileSystem(this.fs, conf);
  }

  /**
   * Wrap a FileSystem object within an HFileSystem. The noChecksumFs and the
   * write fs are both set to the same specified fs.
   * Do not verify hbase-checksums while reading data from the filesystem.
   * @param fs Set the noChecksumFs and writeFs to this specified filesystem.
   */
  public HFileSystem(FileSystem fs) {
    this.fs = fs;
    this.noChecksumFs = fs;
    this.useHBaseChecksum = false;
  }

  /**
   * Returns the filesystem that is specially set up for
   * doing reads from storage. This object avoids doing
   * checksum verification for reads.
   * @return The FileSystem object that can be used to read data
   *         from files.
   */
  public FileSystem getNoChecksumFs() {
    return noChecksumFs;
  }

  /**
   * Returns the underlying filesystem
   * @return The underlying FileSystem for this FilterFileSystem object.
   */
  public FileSystem getBackingFs() throws IOException {
    return fs;
  }

  /**
   * Set the source path (directory/file) to the specified storage policy.
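   * <p>For example, given an {@code HFileSystem} instance {@code hfs} (the path and policy
   * name below are illustrative only):
   * <pre>{@code
   * hfs.setStoragePolicy(new Path("/hbase/data/default/t1"), "ALL_SSD");
   * String policy = hfs.getStoragePolicyName(new Path("/hbase/data/default/t1")); // "ALL_SSD"
   * }</pre>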
   * @param path The source path (directory/file).
   * @param policyName The name of the storage policy: 'HOT', 'COLD', etc.
   * See hadoop 2.6+ org.apache.hadoop.hdfs.protocol.HdfsConstants for the possible list, e.g.
   * 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
   */
  public void setStoragePolicy(Path path, String policyName) {
    CommonFSUtils.setStoragePolicy(this.fs, path, policyName);
  }

  /**
   * Get the storage policy of the source path (directory/file).
   * @param path The source path (directory/file).
   * @return Storage policy name, or {@code null} if not using {@link DistributedFileSystem} or
   *         if an exception was thrown when trying to get the policy
   */
  @Nullable
  public String getStoragePolicyName(Path path) {
    try {
      Object blockStoragePolicySpi =
          ReflectionUtils.invokeMethod(this.fs, "getStoragePolicy", path);
      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
    } catch (Exception e) {
      // Maybe fail because of using old HDFS version, try the old way
      if (LOG.isTraceEnabled()) {
        LOG.trace("Failed to get policy directly", e);
      }
      return getStoragePolicyForOldHDFSVersion(path);
    }
  }

  /**
   * Before Hadoop 2.8.0, there is no getStoragePolicy method in the FileSystem interface, and we
   * need to stay compatible with those versions. See HADOOP-12161 for more details.
   * @param path Path to get the storage policy for
   * @return the storage policy name
   */
  private String getStoragePolicyForOldHDFSVersion(Path path) {
    try {
      if (this.fs instanceof DistributedFileSystem) {
        DistributedFileSystem dfs = (DistributedFileSystem) this.fs;
        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
        if (null != status) {
          if (unspecifiedStoragePolicyId < 0) {
            // Get the unspecified id field through reflection to avoid compilation error.
            // In later version BlockStoragePolicySuite#ID_UNSPECIFIED is moved to
            // HdfsConstants#BLOCK_STORAGE_POLICY_ID_UNSPECIFIED
            Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
            unspecifiedStoragePolicyId = idUnspecified.getByte(BlockStoragePolicySuite.class);
          }
          byte storagePolicyId = status.getStoragePolicy();
          if (storagePolicyId != unspecifiedStoragePolicyId) {
            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
            for (BlockStoragePolicy policy : policies) {
              if (policy.getId() == storagePolicyId) {
                return policy.getName();
              }
            }
          }
        }
      }
    } catch (Throwable e) {
      LOG.warn("failed to get block storage policy of [" + path + "]", e);
    }

    return null;
  }

  /**
   * Are we verifying checksums in HBase?
   * @return True, if hbase is configured to verify checksums,
   *         otherwise false.
   */
  public boolean useHBaseChecksum() {
    return useHBaseChecksum;
  }

  /**
   * Close this filesystem object
   */
  @Override
  public void close() throws IOException {
    super.close();
    if (this.noChecksumFs != fs) {
      this.noChecksumFs.close();
    }
  }

  /**
   * Returns a brand new instance of the FileSystem. It does not use
   * the FileSystem.Cache. In newer versions of HDFS, we can directly
   * invoke FileSystem.newInstance(Configuration).
   *
   * @param conf Configuration
   * @return A new instance of the filesystem
   */
  private static FileSystem newInstanceFileSystem(Configuration conf) throws IOException {
    URI uri = FileSystem.getDefaultUri(conf);
    FileSystem fs = null;
    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
    if (clazz != null) {
      // This will be true for Hadoop 1.0, or 0.20.
      fs = (FileSystem) org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf);
      fs.initialize(uri, conf);
    } else {
      // For Hadoop 2.0, we have to go through FileSystem for the filesystem
      // implementation to be loaded by the service loader in case it has not
      // been loaded yet.
      Configuration clone = new Configuration(conf);
      clone.setBoolean("fs." + uri.getScheme() + ".impl.disable.cache", true);
      fs = FileSystem.get(uri, clone);
    }
    if (fs == null) {
      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
    }

    return fs;
  }

  /**
   * Returns an instance of the filesystem wrapped into the class specified by the
   * hbase.fs.wrapper property, if one is set in the configuration; otherwise returns
   * the unmodified FS instance passed in as an argument.
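   * <p>For example, a wrapper can be configured like this (the wrapper class name is
   * hypothetical; any FileSystem subclass with a {@code (FileSystem, Configuration)}
   * constructor will work):
   * <pre>{@code
   * conf.setClass("hbase.fs.wrapper", MyMeteredFileSystem.class, FileSystem.class);
   * }</pre>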
   * @param base Filesystem instance to wrap
   * @param conf Configuration
   * @return wrapped instance of FS, or the same instance if no wrapping configured.
   */
  private FileSystem maybeWrapFileSystem(FileSystem base, Configuration conf) {
    try {
      Class<?> clazz = conf.getClass("hbase.fs.wrapper", null);
      if (clazz != null) {
        return (FileSystem) clazz.getConstructor(FileSystem.class, Configuration.class)
          .newInstance(base, conf);
      }
    } catch (Exception e) {
      LOG.error("Failed to wrap filesystem", e);
    }
    return base;
  }

  public static boolean addLocationsOrderInterceptor(Configuration conf) throws IOException {
    return addLocationsOrderInterceptor(conf, new ReorderWALBlocks());
  }

  /**
   * Add an interceptor on the calls to the namenode#getBlockLocations from the DFSClient
   * linked to this FileSystem. See HBASE-6435 for the background.
   * <p/>
   * There should be no reason, except testing, to create a specific ReorderBlocks.
   *
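   * The interceptor is active by default; it can be switched off through configuration, for
   * example:
   * <pre>{@code
   * conf.setBoolean("hbase.filesystem.reorder.blocks", false); // disable block reordering
   * }</pre>
   *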
   * @return true if the interceptor was added, false otherwise.
   */
  static boolean addLocationsOrderInterceptor(Configuration conf, final ReorderBlocks lrb) {
    if (!conf.getBoolean("hbase.filesystem.reorder.blocks", true)) {  // activated by default
      LOG.debug("addLocationsOrderInterceptor configured to false");
      return false;
    }

    FileSystem fs;
    try {
      fs = FileSystem.get(conf);
    } catch (IOException e) {
      LOG.warn("Can't get the file system from the conf.", e);
      return false;
    }

    if (!(fs instanceof DistributedFileSystem)) {
      LOG.debug("The file system is not a DistributedFileSystem. " +
          "Skipping block location reordering");
      return false;
    }

    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    DFSClient dfsc = dfs.getClient();
    if (dfsc == null) {
      LOG.warn("The DistributedFileSystem does not contain a DFSClient. Can't add the location " +
          "block reordering interceptor. Continuing, but this is unexpected."
      );
      return false;
    }

    try {
      Field nf = DFSClient.class.getDeclaredField("namenode");
      nf.setAccessible(true);
      Field modifiersField = Field.class.getDeclaredField("modifiers");
      modifiersField.setAccessible(true);
      modifiersField.setInt(nf, nf.getModifiers() & ~Modifier.FINAL);

      ClientProtocol namenode = (ClientProtocol) nf.get(dfsc);
      if (namenode == null) {
        LOG.warn("The DFSClient is not linked to a namenode. Can't add the location block" +
            " reordering interceptor. Continuing, but this is unexpected."
        );
        return false;
      }

      ClientProtocol cp1 = createReorderingProxy(namenode, lrb, conf);
      nf.set(dfsc, cp1);
      LOG.info("Added intercepting call to namenode#getBlockLocations so we can do block" +
        " reordering using class " + lrb.getClass().getName());
    } catch (NoSuchFieldException e) {
      LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.", e);
      return false;
    } catch (IllegalAccessException e) {
      LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.", e);
      return false;
    }

    return true;
  }

  private static ClientProtocol createReorderingProxy(final ClientProtocol cp,
      final ReorderBlocks lrb, final Configuration conf) {
    return (ClientProtocol) Proxy.newProxyInstance(cp.getClass().getClassLoader(),
        new Class[]{ClientProtocol.class, Closeable.class}, new InvocationHandler() {
          @Override
          public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            try {
              if ((args == null || args.length == 0) && "close".equals(method.getName())) {
                RPC.stopProxy(cp);
                return null;
              } else {
                Object res = method.invoke(cp, args);
                if (res != null && args != null && args.length == 3
                    && "getBlockLocations".equals(method.getName())
                    && res instanceof LocatedBlocks
                    && args[0] instanceof String
                    && args[0] != null) {
                  lrb.reorderBlocks(conf, (LocatedBlocks) res, (String) args[0]);
                }
                return res;
              }
            } catch (InvocationTargetException ite) {
              // We will get this for all exceptions, checked or not, thrown
              // by any layer, including functional exceptions
              Throwable cause = ite.getCause();
              if (cause == null) {
                throw new RuntimeException("Proxy invocation failed and getCause is null", ite);
              }
              if (cause instanceof UndeclaredThrowableException) {
                Throwable causeCause = cause.getCause();
                if (causeCause == null) {
                  throw new RuntimeException("UndeclaredThrowableException had null cause!");
                }
                cause = causeCause;
              }
              throw cause;
            }
          }
        });
  }

  /**
   * Interface to implement to add specific block reordering logic in HDFS.
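   * <p>A minimal sketch of an implementation, for illustration only (the implementation
   * HBase actually installs is {@link ReorderWALBlocks} below); it mutates the replica
   * ordering of each block in place:
   * <pre>{@code
   * class SwapFirstAndLast implements ReorderBlocks {
   *   public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src) {
   *     for (LocatedBlock lb : lbs.getLocatedBlocks()) {
   *       DatanodeInfo[] locs = lb.getLocations();
   *       if (locs.length > 1) {
   *         DatanodeInfo tmp = locs[0];
   *         locs[0] = locs[locs.length - 1];
   *         locs[locs.length - 1] = tmp;
   *       }
   *     }
   *   }
   * }
   * }</pre>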
   */
  interface ReorderBlocks {
    /**
     *
     * @param conf - the conf to use
     * @param lbs - the LocatedBlocks to reorder
     * @param src - the file name currently read
     * @throws IOException - if something went wrong
     */
    void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src) throws IOException;
  }

  /**
   * We put at the lowest priority the WAL file blocks that are on the same datanode
   * as the original regionserver which created these files. This is because we fear that the
   * datanode is actually dead, so using it would time out.
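   * <p>For example, if a WAL was written by a regionserver on {@code host-a} and a block's
   * replicas are ordered {@code [host-a, host-b, host-c]}, the locations are reordered to
   * {@code [host-b, host-c, host-a]}.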
   */
  static class ReorderWALBlocks implements ReorderBlocks {
    @Override
    public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src)
        throws IOException {

      ServerName sn = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, src);
      if (sn == null) {
        // It's not a WAL
        return;
      }

      // Ok, so it's a WAL
      String hostName = sn.getHostname();
      if (LOG.isTraceEnabled()) {
        LOG.trace(src +
            " is a WAL file, so reordering blocks, last hostname will be: " + hostName);
      }

      // Check all the blocks
      for (LocatedBlock lb : lbs.getLocatedBlocks()) {
        DatanodeInfo[] dnis = lb.getLocations();
        if (dnis != null && dnis.length > 1) {
          boolean found = false;
          for (int i = 0; i < dnis.length - 1 && !found; i++) {
            if (hostName.equals(dnis[i].getHostName())) {
              // advance the other locations by one and put this one at the last place.
              DatanodeInfo toLast = dnis[i];
              System.arraycopy(dnis, i + 1, dnis, i, dnis.length - i - 1);
              dnis[dnis.length - 1] = toLast;
              found = true;
            }
          }
        }
      }
    }
  }

  /**
   * Create a new HFileSystem object, similar to FileSystem.get().
   * This returns a filesystem object that avoids checksum
   * verification in the filesystem for HFileBlock reads.
   * For these blocks, checksum verification is done by HBase.
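   * <p>For example, given a {@code Configuration conf} (illustrative):
   * <pre>{@code
   * FileSystem fs = HFileSystem.get(conf);
   * boolean hbaseChecksums = ((HFileSystem) fs).useHBaseChecksum(); // true
   * }</pre>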
   */
  public static FileSystem get(Configuration conf) throws IOException {
    return new HFileSystem(conf, true);
  }

  /**
   * Wrap a LocalFileSystem within an HFileSystem.
   */
  public static FileSystem getLocalFs(Configuration conf) throws IOException {
    return new HFileSystem(FileSystem.getLocal(conf));
  }

  /**
   * The org.apache.hadoop.fs.FilterFileSystem does not yet support
   * createNonRecursive. This is a Hadoop bug; when it is fixed in Hadoop,
   * this definition will go away.
   */
  @Override
  @SuppressWarnings("deprecation")
  public FSDataOutputStream createNonRecursive(Path f,
      boolean overwrite,
      int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return fs.createNonRecursive(f, overwrite, bufferSize, replication,
                                 blockSize, progress);
  }
}