/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.fs;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Progressable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An encapsulation for the FileSystem object that hbase uses to access data. This class allows the
 * flexibility of using separate filesystem objects for reading and writing hfiles and wals.
 */
@InterfaceAudience.Private
public class HFileSystem extends FilterFileSystem {
  public static final Logger LOG = LoggerFactory.getLogger(HFileSystem.class);

  private final FileSystem noChecksumFs; // read hfile data from storage
  private final boolean useHBaseChecksum;
  private static volatile byte unspecifiedStoragePolicyId = Byte.MIN_VALUE;

  /**
   * Create a FileSystem object for HBase regionservers.
   * @param conf             The configuration to be used for the filesystem
   * @param useHBaseChecksum if true, then use checksum verification in hbase, otherwise delegate
   *                         checksum verification to the FileSystem.
   */
  public HFileSystem(Configuration conf, boolean useHBaseChecksum) throws IOException {

    // Create the default filesystem with checksum verification switched on.
    // By default, any operation to this FilterFileSystem occurs on
    // the underlying filesystem that has checksums switched on.
    // FS.get(URI, conf) documents that if the FS has not been created yet it will initialize
    // and return a newly created FS, and that otherwise it will return the FS that was already
    // created. We take pains to funnel all of our FileSystem instantiation through this call so
    // that we never need to call FS.initialize ourselves and do not have to track any state to
    // avoid calling initialize more than once.
    this.fs = FileSystem.get(getDefaultUri(conf), conf);
    this.useHBaseChecksum = useHBaseChecksum;

    // disable checksum verification for local fileSystem, see HBASE-11218
    if (fs instanceof LocalFileSystem) {
      fs.setWriteChecksum(false);
      fs.setVerifyChecksum(false);
    }

    addLocationsOrderInterceptor(conf);

    // If hbase checksum verification is switched on, then create a new
    // filesystem object that has cksum verification turned off.
    // We will avoid verifying checksums in the fs client, instead do it
    // inside of hbase.
    // If this is the local file system hadoop has a bug where seeks
    // do not go to the correct location if setVerifyChecksum(false) is called.
    // This manifests itself in that incorrect data is read and HFileBlocks won't be able to read
    // their header magic numbers. See HBASE-5885
    if (useHBaseChecksum && !(fs instanceof LocalFileSystem)) {
      conf = new Configuration(conf);
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
      this.noChecksumFs = maybeWrapFileSystem(newInstanceFileSystem(conf), conf);
      this.noChecksumFs.setVerifyChecksum(false);
    } else {
      this.noChecksumFs = maybeWrapFileSystem(fs, conf);
    }

    this.fs = maybeWrapFileSystem(this.fs, conf);
  }
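
  // A minimal usage sketch (nothing below is taken from this file beyond its own public methods;
  // the configuration setup is the standard HBase one): a regionserver-style caller constructs an
  // HFileSystem with hbase checksums enabled, reads hfile data through the checksum-free
  // filesystem, and writes through the checksummed one.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   HFileSystem hfs = new HFileSystem(conf, true);
  //   FileSystem readFs = hfs.getNoChecksumFs(); // reads skip fs-level checksum verification
  //   FileSystem writeFs = hfs.getBackingFs();   // writes still go through the checksummed fs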

  /**
   * Wrap a FileSystem object within an HFileSystem. The noChecksumFs and writeFs are both set to
   * the same specified fs. Does not verify hbase checksums while reading data from the filesystem.
   * @param fs Set the noChecksumFs and writeFs to this specified filesystem.
   */
  public HFileSystem(FileSystem fs) {
    this.fs = fs;
    this.noChecksumFs = fs;
    this.useHBaseChecksum = false;
  }

  /**
   * Returns the filesystem that is specially set up for doing reads from storage. This object
   * avoids doing checksum verifications for reads.
   * @return The FileSystem object that can be used to read data from files.
   */
  public FileSystem getNoChecksumFs() {
    return noChecksumFs;
  }

  /**
   * Returns the underlying filesystem
   * @return The underlying FileSystem for this FilterFileSystem object.
   */
  public FileSystem getBackingFs() throws IOException {
    return fs;
  }

  /**
   * Set the source path (directory/file) to the specified storage policy.
   * @param path       The source path (directory/file).
   * @param policyName The name of the storage policy: 'HOT', 'COLD', etc. See
   *                   org.apache.hadoop.hdfs.protocol.HdfsConstants in Hadoop 2.6+ for the
   *                   possible values, e.g. 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD',
   *                   'LAZY_PERSIST'.
   */
  public void setStoragePolicy(Path path, String policyName) {
    CommonFSUtils.setStoragePolicy(this.fs, path, policyName);
  }

  /**
   * Get the storage policy of the source path (directory/file).
   * @param path The source path (directory/file).
   * @return Storage policy name, or {@code null} if the filesystem is not a
   *         {@link DistributedFileSystem} or an exception was thrown while fetching the policy.
   */
  @Nullable
  public String getStoragePolicyName(Path path) {
    try {
      Object blockStoragePolicySpi =
        ReflectionUtils.invokeMethod(this.fs, "getStoragePolicy", path);
      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
    } catch (Exception e) {
      // Maybe fail because of using old HDFS version, try the old way
      if (LOG.isTraceEnabled()) {
        LOG.trace("Failed to get policy directly", e);
      }
      return getStoragePolicyForOldHDFSVersion(path);
    }
  }
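
  // Hedged usage sketch (the path and policy name below are illustrative only): given an
  // HFileSystem hfs, a caller sets a storage policy on a directory and reads it back to confirm
  // what the filesystem actually applied.
  //
  //   Path familyDir = new Path("/hbase/data/default/t1/r1/cf"); // hypothetical path
  //   hfs.setStoragePolicy(familyDir, "ONE_SSD");
  //   String applied = hfs.getStoragePolicyName(familyDir); // "ONE_SSD" on HDFS, null otherwise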

  /**
   * Before Hadoop 2.8.0, the FileSystem interface has no getStoragePolicy method, so we need to
   * stay compatible with older versions. See HADOOP-12161 for more details.
   * @param path Path to get storage policy against
   * @return the storage policy name
   */
  private String getStoragePolicyForOldHDFSVersion(Path path) {
    try {
      if (this.fs instanceof DistributedFileSystem) {
        DistributedFileSystem dfs = (DistributedFileSystem) this.fs;
        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
        if (null != status) {
          if (unspecifiedStoragePolicyId < 0) {
            // Get the unspecified id field through reflection to avoid compilation error.
            // In later version BlockStoragePolicySuite#ID_UNSPECIFIED is moved to
            // HdfsConstants#BLOCK_STORAGE_POLICY_ID_UNSPECIFIED
            Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
            unspecifiedStoragePolicyId = idUnspecified.getByte(BlockStoragePolicySuite.class);
          }
          byte storagePolicyId = status.getStoragePolicy();
          if (storagePolicyId != unspecifiedStoragePolicyId) {
            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
            for (BlockStoragePolicy policy : policies) {
              if (policy.getId() == storagePolicyId) {
                return policy.getName();
              }
            }
          }
        }
      }
    } catch (Throwable e) {
      LOG.warn("failed to get block storage policy of [" + path + "]", e);
    }

    return null;
  }

  /**
   * Are we verifying checksums in HBase?
   * @return True, if hbase is configured to verify checksums, otherwise false.
   */
  public boolean useHBaseChecksum() {
    return useHBaseChecksum;
  }

  /**
   * Close this filesystem object
   */
  @Override
  public void close() throws IOException {
    super.close();
    if (this.noChecksumFs != fs) {
      this.noChecksumFs.close();
    }
  }

  /**
   * Returns a brand new instance of the FileSystem. It does not use the FileSystem.Cache. In newer
   * versions of HDFS, we can directly invoke FileSystem.newInstance(Configuration).
   * @param conf Configuration
   * @return A new instance of the filesystem
   */
  private static FileSystem newInstanceFileSystem(Configuration conf) throws IOException {
    URI uri = FileSystem.getDefaultUri(conf);
    FileSystem fs = null;
    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
    if (clazz != null) {
      // This will be true for Hadoop 1.0, or 0.20.
      fs = (FileSystem) org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf);
      fs.initialize(uri, conf);
    } else {
      // For Hadoop 2.0, we have to go through FileSystem for the filesystem
      // implementation to be loaded by the service loader in case it has not
      // been loaded yet.
      Configuration clone = new Configuration(conf);
      clone.setBoolean("fs." + uri.getScheme() + ".impl.disable.cache", true);
      fs = FileSystem.get(uri, clone);
    }
    if (fs == null) {
      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
    }

    return fs;
  }
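
  // Note on the cache bypass in newInstanceFileSystem above: FileSystem.get() normally returns a
  // shared instance from the FileSystem cache, so mutating it (e.g. with setVerifyChecksum) would
  // affect every other user of that instance. Cloning the Configuration and setting
  // "fs.<scheme>.impl.disable.cache" forces a private instance that this class can safely
  // reconfigure for checksum-free reads.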

  /**
   * Returns the FileSystem wrapped in the class specified by the hbase.fs.wrapper property, if
   * one is set in the configuration; otherwise returns the unmodified FS instance that was passed
   * in as the argument.
   * @param base Filesystem instance to wrap
   * @param conf Configuration
   * @return wrapped instance of FS, or the same instance if no wrapping configured.
   */
  private FileSystem maybeWrapFileSystem(FileSystem base, Configuration conf) {
    try {
      Class<?> clazz = conf.getClass("hbase.fs.wrapper", null);
      if (clazz != null) {
        return (FileSystem) clazz.getConstructor(FileSystem.class, Configuration.class)
          .newInstance(base, conf);
      }
    } catch (Exception e) {
      LOG.error("Failed to wrap filesystem", e);
    }
    return base;
  }
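
  // Hedged configuration sketch for hbase.fs.wrapper (the wrapper class name below is
  // hypothetical): the only contract maybeWrapFileSystem relies on is that the configured class
  // is a FileSystem subclass exposing a (FileSystem, Configuration) constructor.
  //
  //   conf.set("hbase.fs.wrapper", "org.example.AuditingFileSystem"); // hypothetical class
  //   // every filesystem handed out by HFileSystem is then constructed as
  //   //   new AuditingFileSystem(base, conf)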

  public static boolean addLocationsOrderInterceptor(Configuration conf) throws IOException {
    return addLocationsOrderInterceptor(conf, new ReorderWALBlocks());
  }

  /**
   * Add an interceptor on the calls to the namenode#getBlockLocations from the DFSClient linked to
   * this FileSystem. See HBASE-6435 for the background.
   * <p/>
   * There should be no reason, except testing, to create a specific ReorderBlocks.
   * @return true if the interceptor was added, false otherwise.
   */
  static boolean addLocationsOrderInterceptor(Configuration conf, final ReorderBlocks lrb) {
    if (!conf.getBoolean("hbase.filesystem.reorder.blocks", true)) { // activated by default
      LOG.debug("addLocationsOrderInterceptor configured to false");
      return false;
    }

    FileSystem fs;
    try {
      fs = FileSystem.get(conf);
    } catch (IOException e) {
      LOG.warn("Can't get the file system from the conf.", e);
      return false;
    }

    if (!(fs instanceof DistributedFileSystem)) {
      LOG.debug("The file system is not a DistributedFileSystem. "
309        + "Skipping on block location reordering");
      return false;
    }

    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    DFSClient dfsc = dfs.getClient();
    if (dfsc == null) {
      LOG.warn("The DistributedFileSystem does not contain a DFSClient. Can't add the location "
        + "block reordering interceptor. Continuing, but this is unexpected.");
      return false;
    }

    try {
      Field nf = DFSClient.class.getDeclaredField("namenode");
      nf.setAccessible(true);
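      // Clear the final modifier on DFSClient#namenode so the field can be re-assigned to the
      // reordering proxy created below.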
      Field modifiersField = Field.class.getDeclaredField("modifiers");
      modifiersField.setAccessible(true);
      modifiersField.setInt(nf, nf.getModifiers() & ~Modifier.FINAL);

      ClientProtocol namenode = (ClientProtocol) nf.get(dfsc);
      if (namenode == null) {
        LOG.warn("The DFSClient is not linked to a namenode. Can't add the location block"
          + " reordering interceptor. Continuing, but this is unexpected.");
        return false;
      }

      ClientProtocol cp1 = createReorderingProxy(namenode, lrb, conf);
      nf.set(dfsc, cp1);
      LOG.info("Added intercepting call to namenode#getBlockLocations so we can do block"
        + " reordering using class " + lrb.getClass().getName());
    } catch (NoSuchFieldException e) {
      LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.", e);
      return false;
    } catch (IllegalAccessException e) {
      LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.", e);
      return false;
    }

    return true;
  }

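  /**
   * Wrap the namenode {@link ClientProtocol} in a JDK dynamic proxy that routes every successful
   * getBlockLocations result through the supplied {@link ReorderBlocks} before handing it back to
   * the DFSClient, and that stops the underlying RPC proxy when close() is called.
   */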
  private static ClientProtocol createReorderingProxy(final ClientProtocol cp,
    final ReorderBlocks lrb, final Configuration conf) {
    return (ClientProtocol) Proxy.newProxyInstance(cp.getClass().getClassLoader(),
      new Class[] { ClientProtocol.class, Closeable.class }, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
          try {
            if ((args == null || args.length == 0) && "close".equals(method.getName())) {
              RPC.stopProxy(cp);
              return null;
            } else {
              Object res = method.invoke(cp, args);
              if (
                res != null && args != null && args.length == 3
                  && "getBlockLocations".equals(method.getName()) && res instanceof LocatedBlocks
                  && args[0] instanceof String && args[0] != null
              ) {
                lrb.reorderBlocks(conf, (LocatedBlocks) res, (String) args[0]);
              }
              return res;
            }
          } catch (InvocationTargetException ite) {
            // We will get this for all exceptions, checked or not, thrown by any layer,
            // including the functional exceptions
            Throwable cause = ite.getCause();
            if (cause == null) {
              throw new RuntimeException("Proxy invocation failed and getCause is null", ite);
            }
            if (cause instanceof UndeclaredThrowableException) {
              Throwable causeCause = cause.getCause();
              if (causeCause == null) {
                throw new RuntimeException("UndeclaredThrowableException had null cause!");
              }
              cause = causeCause;
            }
            throw cause;
          }
        }
      });
  }

  /**
   * Interface to implement to add a specific reordering logic in hdfs.
   */
  interface ReorderBlocks {
    /**
     * @param conf - the conf to use
     * @param lbs  - the LocatedBlocks to reorder
     * @param src  - the file name currently read
     * @throws IOException - if something went wrong
     */
    void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src) throws IOException;
  }

  /**
   * We put at lowest priority the WAL file blocks that are on the same datanode as the original
   * regionserver which created these files. This is because we fear that the datanode is actually
   * dead, so reading from it would time out. For example, if a WAL written by regionserver rs1
   * has block locations [rs1, dnB, dnC], the reorder moves rs1 to the end: [dnB, dnC, rs1].
   */
  static class ReorderWALBlocks implements ReorderBlocks {
    @Override
    public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src)
      throws IOException {

      ServerName sn = AbstractFSWALProvider.getServerNameFromWALDirectoryName(conf, src);
      if (sn == null) {
        // It's not a WAL
        return;
      }

      // OK, so it's a WAL
      String hostName = sn.getHostname();
      if (LOG.isTraceEnabled()) {
        LOG.trace(src + " is a WAL file, so reordering blocks, last hostname will be: " + hostName);
      }

      // Just check for all blocks
      for (LocatedBlock lb : lbs.getLocatedBlocks()) {
        DatanodeInfo[] dnis = lb.getLocations();
        if (dnis != null && dnis.length > 1) {
          boolean found = false;
          for (int i = 0; i < dnis.length - 1 && !found; i++) {
            if (hostName.equals(dnis[i].getHostName())) {
              // advance the other locations by one and put this one at the last place.
              DatanodeInfo toLast = dnis[i];
              System.arraycopy(dnis, i + 1, dnis, i, dnis.length - i - 1);
              dnis[dnis.length - 1] = toLast;
              found = true;
            }
          }
        }
      }
    }
  }

  /**
   * Create a new HFileSystem object, similar to FileSystem.get(). This returns a filesystem object
   * that avoids checksum verification in the filesystem for hfileblock-reads. For these blocks,
   * checksum verification is done by HBase.
   */
  static public FileSystem get(Configuration conf) throws IOException {
    return new HFileSystem(conf, true);
  }

  /**
   * Wrap a LocalFileSystem within an HFileSystem.
   */
  static public FileSystem getLocalFs(Configuration conf) throws IOException {
    return new HFileSystem(FileSystem.getLocal(conf));
  }

  /**
   * The org.apache.hadoop.fs.FilterFileSystem does not yet support createNonRecursive. This is a
   * hadoop bug and when it is fixed in Hadoop, this definition will go away.
   */
  @Override
  @SuppressWarnings("deprecation")
  public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize,
    short replication, long blockSize, Progressable progress) throws IOException {
    return fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, progress);
  }
}