001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.lang.reflect.InvocationTargetException;
024import java.lang.reflect.Method;
025import java.net.URI;
026import java.net.URISyntaxException;
027import java.util.List;
028import java.util.Locale;
029import java.util.Map;
030import java.util.concurrent.ConcurrentHashMap;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FSDataOutputStream;
033import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
034import org.apache.hadoop.fs.FileStatus;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.LocatedFileStatus;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.fs.PathFilter;
039import org.apache.hadoop.fs.RemoteIterator;
040import org.apache.hadoop.fs.permission.FsPermission;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
048
049/**
050 * Utility methods for interacting with the underlying file system.
051 * <p/>
052 * Note that {@link #setStoragePolicy(FileSystem, Path, String)} is tested in TestFSUtils and
053 * pre-commit will run the hbase-server tests if there's code change in this class. See
054 * <a href="https://issues.apache.org/jira/browse/HBASE-20838">HBASE-20838</a> for more details.
055 */
056@InterfaceAudience.Private
057public final class CommonFSUtils {
058  private static final Logger LOG = LoggerFactory.getLogger(CommonFSUtils.class);
059
060  /** Parameter name for HBase WAL directory */
061  public static final String HBASE_WAL_DIR = "hbase.wal.dir";
062
063  /** Parameter to disable stream capability enforcement checks */
064  public static final String UNSAFE_STREAM_CAPABILITY_ENFORCE =
065    "hbase.unsafe.stream.capability.enforce";
066
067  /** Full access permissions (starting point for a umask) */
068  public static final String FULL_RWX_PERMISSIONS = "777";
069
070  private CommonFSUtils() {
071  }
072
073  /**
   * Compares path components. Does not consider the scheme; i.e. if the schemes differ but
   * <code>path</code> starts with <code>rootPath</code>, the function returns true.
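   * <p/>
   * A minimal usage sketch (the paths below are hypothetical):
   * <pre>{@code
   * Path root = new Path("hdfs://namenode:8020/hbase");
   * CommonFSUtils.isStartingWithPath(root, "/hbase/data"); // true, scheme is ignored
   * CommonFSUtils.isStartingWithPath(root, "/tmp/hbase");  // false
   * }</pre>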
077   * @param rootPath value to check for
078   * @param path subject to check
079   * @return True if <code>path</code> starts with <code>rootPath</code>
080   */
081  public static boolean isStartingWithPath(final Path rootPath, final String path) {
082    String uriRootPath = rootPath.toUri().getPath();
083    String tailUriPath = (new Path(path)).toUri().getPath();
084    return tailUriPath.startsWith(uriRootPath);
085  }
086
087  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. Does not consider the scheme; i.e. if the schemes differ but the path or
   * subpath matches, the two will equate.
091   * @param pathToSearch Path we will be trying to match against.
092   * @param pathTail what to match
093   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
094   */
095  public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
096    return isMatchingTail(pathToSearch, new Path(pathTail));
097  }
098
099  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. The two paths must be of the same depth. Does not consider the scheme;
   * i.e. if the schemes differ but the path matches, the two will equate.
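   * <p/>
   * A minimal usage sketch (the paths below are hypothetical):
   * <pre>{@code
   * Path toSearch = new Path("hdfs://namenode:8020/a/b/c");
   * CommonFSUtils.isMatchingTail(toSearch, new Path("/a/b/c")); // true, scheme is ignored
   * CommonFSUtils.isMatchingTail(toSearch, new Path("/a/b/x")); // false, last component differs
   * }</pre>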
   * @param pathToSearch Path we will be trying to match against
104   * @param pathTail what to match
105   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
106   */
107  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
108    if (pathToSearch.depth() != pathTail.depth()) {
109      return false;
110    }
111    Path tailPath = pathTail;
112    String tailName;
113    Path toSearch = pathToSearch;
114    String toSearchName;
115    boolean result = false;
116    do {
117      tailName = tailPath.getName();
118      if (tailName == null || tailName.length() <= 0) {
119        result = true;
120        break;
121      }
122      toSearchName = toSearch.getName();
123      if (toSearchName == null || toSearchName.length() <= 0) {
124        break;
125      }
126      // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
127      tailPath = tailPath.getParent();
128      toSearch = toSearch.getParent();
129    } while(tailName.equals(toSearchName));
130    return result;
131  }
132
133  /**
134   * Delete if exists.
135   * @param fs filesystem object
136   * @param dir directory to delete
137   * @return True if deleted <code>dir</code>
138   * @throws IOException e
139   */
140  public static boolean deleteDirectory(final FileSystem fs, final Path dir) throws IOException {
141    return fs.exists(dir) && fs.delete(dir, true);
142  }
143
144  /**
   * Return the number of bytes that large input files should optimally be split into to
   * minimize i/o time.
   *
   * @param fs filesystem object
   * @param path the path whose default block size is queried
   * @return the default block size for the path's filesystem
150   */
151  public static long getDefaultBlockSize(final FileSystem fs, final Path path) {
152    return fs.getDefaultBlockSize(path);
153  }
154
155  /*
156   * Get the default replication.
157   *
158   * @param fs filesystem object
159   * @param f path of file
160   * @return default replication for the path's filesystem
161   */
162  public static short getDefaultReplication(final FileSystem fs, final Path path) {
163    return fs.getDefaultReplication(path);
164  }
165
166  /**
167   * Returns the default buffer size to use during writes.
168   *
169   * The size of the buffer should probably be a multiple of hardware
170   * page size (4096 on Intel x86), and it determines how much data is
171   * buffered during read and write operations.
172   *
173   * @param fs filesystem object
174   * @return default buffer size to use during writes
175   */
176  public static int getDefaultBufferSize(final FileSystem fs) {
177    return fs.getConf().getInt("io.file.buffer.size", 4096);
178  }
179
180  /**
181   * Create the specified file on the filesystem. By default, this will:
182   * <ol>
183   * <li>apply the umask in the configuration (if it is enabled)</li>
184   * <li>use the fs configured buffer size (or 4096 if not set)</li>
185   * <li>use the default replication</li>
186   * <li>use the default block size</li>
187   * <li>not track progress</li>
188   * </ol>
189   *
190   * @param fs {@link FileSystem} on which to write the file
191   * @param path {@link Path} to the file to write
   * @param perm initial permissions
   * @param overwrite Whether or not the file should be overwritten if it already exists.
194   * @return output stream to the created file
195   * @throws IOException if the file cannot be created
196   */
197  public static FSDataOutputStream create(FileSystem fs, Path path,
198      FsPermission perm, boolean overwrite) throws IOException {
199    if (LOG.isTraceEnabled()) {
200      LOG.trace("Creating file={} with permission={}, overwrite={}", path, perm, overwrite);
201    }
202    return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
203        getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
204  }
205
206  /**
207   * Get the file permissions specified in the configuration, if they are
208   * enabled.
209   *
210   * @param fs filesystem that the file will be created on.
211   * @param conf configuration to read for determining if permissions are
212   *          enabled and which to use
   * @param permissionConfKey property key in the configuration to use when
   *          finding the permission
   * @return the permission to use when creating a new file on the fs. If
   *         special permissions are not specified in the configuration, then
   *         the default permissions on the fs will be returned.
218   */
  public static FsPermission getFilePermissions(final FileSystem fs,
      final Configuration conf, final String permissionConfKey) {
221    boolean enablePermissions = conf.getBoolean(
222        HConstants.ENABLE_DATA_FILE_UMASK, false);
223
224    if (enablePermissions) {
225      try {
226        FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
227        // make sure that we have a mask, if not, go default.
        String mask = conf.get(permissionConfKey);
229        if (mask == null) {
230          return FsPermission.getFileDefault();
231        }
        // apply the umask
233        FsPermission umask = new FsPermission(mask);
234        return perm.applyUMask(umask);
235      } catch (IllegalArgumentException e) {
        LOG.warn(
            "Incorrect umask specified in configuration: "
                + conf.get(permissionConfKey)
                + ", using default file permissions.", e);
240        return FsPermission.getFileDefault();
241      }
242    }
243    return FsPermission.getFileDefault();
244  }
245
246  /**
247   * Verifies root directory path is a valid URI with a scheme
248   *
249   * @param root root directory path
250   * @return Passed <code>root</code> argument.
251   * @throws IOException if not a valid URI with a scheme
252   */
253  public static Path validateRootPath(Path root) throws IOException {
254    try {
255      URI rootURI = new URI(root.toString());
256      String scheme = rootURI.getScheme();
257      if (scheme == null) {
258        throw new IOException("Root directory does not have a scheme");
259      }
260      return root;
261    } catch (URISyntaxException e) {
262      throw new IOException("Root directory path is not a valid " +
263        "URI -- check your " + HConstants.HBASE_DIR + " configuration", e);
264    }
265  }
266
267  /**
   * Checks for the presence of the WAL root path (using the provided conf object) in the given
   * path. If it exists, this method removes it and returns the String representation of the
   * remaining relative path.
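   * <p/>
   * For example, assuming a Configuration <code>conf</code> whose WAL root dir resolves to the
   * hypothetical location <code>hdfs://namenode:8020/hbase-wal</code>:
   * <pre>{@code
   * CommonFSUtils.removeWALRootPath(new Path("hdfs://namenode:8020/hbase-wal/WALs/rs1"), conf);
   * // => "WALs/rs1"
   * CommonFSUtils.removeWALRootPath(new Path("/some/other/dir"), conf);
   * // => "/some/other/dir" (returned unchanged, it is not under the WAL root)
   * }</pre>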
271   * @param path must not be null
272   * @param conf must not be null
273   * @return String representation of the remaining relative path
274   * @throws IOException from underlying filesystem
275   */
276  public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
277    Path root = getWALRootDir(conf);
278    String pathStr = path.toString();
    // If the path does not start with the WAL root, return it as is.
    if (!pathStr.startsWith(root.toString())) {
      return pathStr;
    }
    // Otherwise strip the WAL root prefix along with the '/' that follows it.
    return pathStr.substring(root.toString().length() + 1);
285  }
286
287  /**
288   * Return the 'path' component of a Path.  In Hadoop, Path is a URI.  This
289   * method returns the 'path' component of a Path's URI: e.g. If a Path is
290   * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
291   * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
   * This method is useful if you want to print out a Path without the qualifying
   * FileSystem instance.
   * @param p Filesystem Path whose 'path' component we are to return.
   * @return Path portion of the Path's URI
296   */
297  public static String getPath(Path p) {
298    return p.toUri().getPath();
299  }
300
301  /**
302   * @param c configuration
303   * @return {@link Path} to hbase root directory from
304   *     configuration as a qualified Path.
305   * @throws IOException e
306   */
307  public static Path getRootDir(final Configuration c) throws IOException {
308    Path p = new Path(c.get(HConstants.HBASE_DIR));
309    FileSystem fs = p.getFileSystem(c);
310    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
311  }
312
313  public static void setRootDir(final Configuration c, final Path root) {
314    c.set(HConstants.HBASE_DIR, root.toString());
315  }
316
317  public static void setFsDefault(final Configuration c, final Path root) {
318    c.set("fs.defaultFS", root.toString());    // for hadoop 0.21+
319  }
320
321  public static void setFsDefault(final Configuration c, final String uri) {
322    c.set("fs.defaultFS", uri); // for hadoop 0.21+
323  }
324
325  public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
326    Path p = getRootDir(c);
327    return p.getFileSystem(c);
328  }
329
330  /**
   * @param c configuration
   * @return {@link Path} to the HBase WAL root directory, as configured by
   *     {@value HBASE_WAL_DIR}, as a qualified Path; defaults to the HBase root dir if not set.
334   * @throws IOException e
335   */
  public static Path getWALRootDir(final Configuration c) throws IOException {
    Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR)));
339    if (!isValidWALRootDir(p, c)) {
340      return getRootDir(c);
341    }
342    FileSystem fs = p.getFileSystem(c);
343    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
344  }
345
346  /**
   * Returns the URI of the given path in string form, if the path has a scheme.
   * @param c configuration
   * @param p path
   * @return the URI in string form, or null if <code>p</code> has no scheme
   * @throws IOException if an I/O error occurs
352   */
353  public static String getDirUri(final Configuration c, Path p) throws IOException {
354    if (p.toUri().getScheme() != null) {
355      return p.toUri().toString();
356    }
357    return null;
358  }
359
360  public static void setWALRootDir(final Configuration c, final Path root) {
361    c.set(HBASE_WAL_DIR, root.toString());
362  }
363
364  public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
365    Path p = getWALRootDir(c);
366    FileSystem fs = p.getFileSystem(c);
367    // hadoop-core does fs caching, so need to propagate this if set
368    String enforceStreamCapability = c.get(UNSAFE_STREAM_CAPABILITY_ENFORCE);
369    if (enforceStreamCapability != null) {
370      fs.getConf().set(UNSAFE_STREAM_CAPABILITY_ENFORCE, enforceStreamCapability);
371    }
372    return fs;
373  }
374
375  private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
376    Path rootDir = getRootDir(c);
377    FileSystem fs = walDir.getFileSystem(c);
378    Path qualifiedWalDir = walDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
379    if (!qualifiedWalDir.equals(rootDir)) {
380      if (qualifiedWalDir.toString().startsWith(rootDir.toString() + "/")) {
381        throw new IllegalStateException("Illegal WAL directory specified. " +
382          "WAL directories are not permitted to be under root directory: rootDir=" +
383          rootDir.toString() + ", qualifiedWALDir=" + qualifiedWalDir);
384      }
385    }
386    return true;
387  }
388
389  /**
390   * Returns the WAL region directory based on the given table name and region name
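   * <p/>
   * For example, assuming a Configuration <code>conf</code> with a hypothetical WAL root of
   * <code>hdfs://namenode:8020/hbase</code>:
   * <pre>{@code
   * CommonFSUtils.getWALRegionDir(conf, TableName.valueOf("ns", "tbl"), "abc123");
   * // => hdfs://namenode:8020/hbase/data/ns/tbl/abc123
   * }</pre>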
391   * @param conf configuration to determine WALRootDir
392   * @param tableName Table that the region is under
393   * @param encodedRegionName Region name used for creating the final region directory
394   * @return the region directory used to store WALs under the WALRootDir
395   * @throws IOException if there is an exception determining the WALRootDir
396   */
397  public static Path getWALRegionDir(final Configuration conf, final TableName tableName,
398      final String encodedRegionName) throws IOException {
399    return new Path(getWALTableDir(conf, tableName), encodedRegionName);
400  }
401
402  /**
403   * Returns the Table directory under the WALRootDir for the specified table name
404   * @param conf configuration used to get the WALRootDir
405   * @param tableName Table to get the directory for
406   * @return a path to the WAL table directory for the specified table
407   * @throws IOException if there is an exception determining the WALRootDir
408   */
409  public static Path getWALTableDir(final Configuration conf, final TableName tableName)
410      throws IOException {
411    Path baseDir = new Path(getWALRootDir(conf), HConstants.BASE_NAMESPACE_DIR);
412    return new Path(new Path(baseDir, tableName.getNamespaceAsString()),
413      tableName.getQualifierAsString());
414  }
415
416  /**
   * For backward compatibility with HBASE-20734, where we store recovered edits in a wrong
   * directory, one without BASE_NAMESPACE_DIR. See HBASE-22617 for more details.
419   * @deprecated For compatibility, will be removed in 4.0.0.
420   */
421  @Deprecated
422  public static Path getWrongWALRegionDir(final Configuration conf, final TableName tableName,
423      final String encodedRegionName) throws IOException {
424    Path wrongTableDir = new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()),
425      tableName.getQualifierAsString());
426    return new Path(wrongTableDir, encodedRegionName);
427  }
428
429  /**
430   * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
431   * path rootdir
432   *
433   * @param rootdir qualified path of HBase root directory
434   * @param tableName name of table
435   * @return {@link org.apache.hadoop.fs.Path} for table
436   */
437  public static Path getTableDir(Path rootdir, final TableName tableName) {
438    return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
439        tableName.getQualifierAsString());
440  }
441
442  /**
443   * Returns the {@link org.apache.hadoop.fs.Path} object representing the region directory under
444   * path rootdir
445   *
446   * @param rootdir qualified path of HBase root directory
447   * @param tableName name of table
448   * @param regionName The encoded region name
449   * @return {@link org.apache.hadoop.fs.Path} for region
450   */
451  public static Path getRegionDir(Path rootdir, TableName tableName, String regionName) {
452    return new Path(getTableDir(rootdir, tableName), regionName);
453  }
454
455  /**
   * Returns the {@link org.apache.hadoop.hbase.TableName} of the table whose directory is the
   * given <code>tablePath</code>.
   *
   * @param tablePath path of the table directory
   * @return {@link org.apache.hadoop.hbase.TableName} for the table
462   */
463  public static TableName getTableName(Path tablePath) {
464    return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
465  }
466
467  /**
468   * Returns the {@link org.apache.hadoop.fs.Path} object representing
469   * the namespace directory under path rootdir
470   *
471   * @param rootdir qualified path of HBase root directory
472   * @param namespace namespace name
   * @return {@link org.apache.hadoop.fs.Path} for the namespace
474   */
475  public static Path getNamespaceDir(Path rootdir, final String namespace) {
476    return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
477        new Path(namespace)));
478  }
479
480  // this mapping means that under a federated FileSystem implementation, we'll
481  // only log the first failure from any of the underlying FileSystems at WARN and all others
482  // will be at DEBUG.
483  private static final Map<FileSystem, Boolean> warningMap = new ConcurrentHashMap<>();
484
485  /**
486   * Sets storage policy for given path.
487   * If the passed path is a directory, we'll set the storage policy for all files
488   * created in the future in said directory. Note that this change in storage
489   * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle.
490   * If we're running on a version of FileSystem that doesn't support the given storage policy
491   * (or storage policies at all), then we'll issue a log message and continue.
492   *
493   * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
494   *
   * @param fs We only do anything if the implementation provides a setStoragePolicy method
   * @param path the Path whose storage policy is to be set
   * @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+
   *   org.apache.hadoop.hdfs.protocol.HdfsConstants for the possible list, e.g.
   *   'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
500   */
501  public static void setStoragePolicy(final FileSystem fs, final Path path,
502      final String storagePolicy) {
503    try {
504      setStoragePolicy(fs, path, storagePolicy, false);
505    } catch (IOException e) {
      // Should never arrive here: we asked setStoragePolicy not to throw, but it did anyway.
      LOG.warn("We chose not to throw exceptions, but one was unexpectedly thrown", e);
508    }
509  }
510
511  static void setStoragePolicy(final FileSystem fs, final Path path, final String storagePolicy,
512      boolean throwException) throws IOException {
513    if (storagePolicy == null) {
514      if (LOG.isTraceEnabled()) {
515        LOG.trace("We were passed a null storagePolicy, exiting early.");
516      }
517      return;
518    }
519    String trimmedStoragePolicy = storagePolicy.trim();
520    if (trimmedStoragePolicy.isEmpty()) {
521      LOG.trace("We were passed an empty storagePolicy, exiting early.");
522      return;
523    } else {
524      trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT);
525    }
526    if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) {
527      LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.", trimmedStoragePolicy);
528      return;
529    }
530    try {
531      invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
532    } catch (IOException e) {
533      LOG.trace("Failed to invoke set storage policy API on FS", e);
534      if (throwException) {
535        throw e;
536      }
537    }
538  }
539
540  /*
541   * All args have been checked and are good. Run the setStoragePolicy invocation.
542   */
543  private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
544      final String storagePolicy) throws IOException {
545    Exception toThrow = null;
546
547    try {
548      fs.setStoragePolicy(path, storagePolicy);
549      LOG.debug("Set storagePolicy={} for path={}", storagePolicy, path);
550    } catch (Exception e) {
551      toThrow = e;
552      // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
553      // misuse than a runtime problem with HDFS.
554      if (!warningMap.containsKey(fs)) {
555        warningMap.put(fs, true);
556        LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path + ". " +
557            "DEBUG log level might have more details.", e);
558      } else if (LOG.isDebugEnabled()) {
559        LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
560      }
561
562      // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation
563      // that throws UnsupportedOperationException
564      if (e instanceof UnsupportedOperationException) {
565        if (LOG.isDebugEnabled()) {
566          LOG.debug("The underlying FileSystem implementation doesn't support " +
567              "setStoragePolicy. This is probably intentional on their part, since HDFS-9345 " +
568              "appears to be present in your version of Hadoop. For more information check " +
569              "the Hadoop documentation on 'ArchivalStorage', the Hadoop FileSystem " +
570              "specification docs from HADOOP-11981, and/or related documentation from the " +
571              "provider of the underlying FileSystem (its name should appear in the " +
572              "stacktrace that accompanies this message). Note in particular that Hadoop's " +
573              "local filesystem implementation doesn't support storage policies.", e);
574        }
575      }
576    }
577
578    if (toThrow != null) {
579      throw new IOException(toThrow);
580    }
581  }
582
583  /**
584   * @param conf must not be null
   * @return True if the configured filesystem's scheme is 'hdfs'.
586   * @throws IOException from underlying FileSystem
587   */
588  public static boolean isHDFS(final Configuration conf) throws IOException {
589    FileSystem fs = FileSystem.get(conf);
590    String scheme = fs.getUri().getScheme();
591    return scheme.equalsIgnoreCase("hdfs");
592  }
593
594  /**
   * Checks if the given path contains a 'recovered.edits' directory component.
   * @param path must not be null
   * @return True if the path is under a 'recovered.edits' directory
598   */
599  public static boolean isRecoveredEdits(Path path) {
600    return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
601  }
602
603  /**
604   * @param conf must not be null
   * @return the filesystem of the hbase rootdir.
606   * @throws IOException from underlying FileSystem
607   */
608  public static FileSystem getCurrentFileSystem(Configuration conf) throws IOException {
609    return getRootDir(conf).getFileSystem(conf);
610  }
611
612  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
   * This accommodates differences between Hadoop versions, where Hadoop 1
   * does not throw a FileNotFoundException but returns an empty FileStatus[],
   * while Hadoop 2 will throw a FileNotFoundException.
617   *
618   * Where possible, prefer FSUtils#listStatusWithStatusFilter(FileSystem,
619   * Path, FileStatusFilter) instead.
620   *
621   * @param fs file system
622   * @param dir directory
623   * @param filter path filter
624   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
625   */
626  public static FileStatus[] listStatus(final FileSystem fs,
627      final Path dir, final PathFilter filter) throws IOException {
628    FileStatus [] status = null;
629    try {
630      status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
631    } catch (FileNotFoundException fnfe) {
632      // if directory doesn't exist, return null
633      if (LOG.isTraceEnabled()) {
634        LOG.trace("{} doesn't exist", dir);
635      }
636    }
637    if (status == null || status.length < 1) {
638      return null;
639    }
640    return status;
641  }
642
643  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
   * This accommodates differences between Hadoop versions.
646   *
647   * @param fs file system
648   * @param dir directory
649   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
650   */
651  public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
652    return listStatus(fs, dir, null);
653  }
654
655  /**
   * Calls fs.listFiles() to get FileStatus and BlockLocations together, reducing the number of
   * RPC calls.
   *
   * @param fs file system
   * @param dir directory
   * @return the list of LocatedFileStatus, or null if <code>dir</code> is empty or doesn't exist
661   */
662  public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs,
663      final Path dir) throws IOException {
664    List<LocatedFileStatus> status = null;
665    try {
666      RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs
667          .listFiles(dir, false);
668      while (locatedFileStatusRemoteIterator.hasNext()) {
669        if (status == null) {
670          status = Lists.newArrayList();
671        }
672        status.add(locatedFileStatusRemoteIterator.next());
673      }
674    } catch (FileNotFoundException fnfe) {
675      // if directory doesn't exist, return null
676      if (LOG.isTraceEnabled()) {
677        LOG.trace("{} doesn't exist", dir);
678      }
679    }
680    return status;
681  }
682
683  /**
   * Calls fs.delete() and returns the value returned by fs.delete()
685   *
686   * @param fs must not be null
687   * @param path must not be null
688   * @param recursive delete tree rooted at path
   * @return the value returned by fs.delete()
690   * @throws IOException from underlying FileSystem
691   */
692  public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
693      throws IOException {
694    return fs.delete(path, recursive);
695  }
696
697  /**
   * Calls fs.exists() to check whether the specified path exists.
699   *
700   * @param fs must not be null
701   * @param path must not be null
702   * @return the value returned by fs.exists()
703   * @throws IOException from underlying FileSystem
704   */
705  public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
706    return fs.exists(path);
707  }
708
709  /**
710   * Log the current state of the filesystem from a certain root directory
711   * @param fs filesystem to investigate
712   * @param root root file/directory to start logging from
713   * @param log log to output information
714   * @throws IOException if an unexpected exception occurs
715   */
716  public static void logFileSystemState(final FileSystem fs, final Path root, Logger log)
717      throws IOException {
718    log.debug("File system contents for path {}", root);
719    logFSTree(log, fs, root, "|-");
720  }
721
722  /**
723   * Recursive helper to log the state of the FS
724   *
725   * @see #logFileSystemState(FileSystem, Path, Logger)
726   */
727  private static void logFSTree(Logger log, final FileSystem fs, final Path root, String prefix)
728      throws IOException {
729    FileStatus[] files = listStatus(fs, root, null);
730    if (files == null) {
731      return;
732    }
733
734    for (FileStatus file : files) {
735      if (file.isDirectory()) {
736        log.debug(prefix + file.getPath().getName() + "/");
737        logFSTree(log, fs, file.getPath(), prefix + "---");
738      } else {
739        log.debug(prefix + file.getPath().getName());
740      }
741    }
742  }
743
744  public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
745      throws IOException {
746    // set the modify time for TimeToLive Cleaner
747    fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
748    return fs.rename(src, dest);
749  }
750
751  /**
752   * Check if short circuit read buffer size is set and if not, set it to hbase value.
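   * <p/>
   * A minimal usage sketch:
   * <pre>{@code
   * Configuration conf = new Configuration();
   * CommonFSUtils.checkShortCircuitReadBufferSize(conf);
   * // "dfs.client.read.shortcircuit.buffer.size" is now set if it was previously unset
   * }</pre>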
753   * @param conf must not be null
754   */
755  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
756    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
757    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in Hadoop 2
759    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
760    int size = conf.getInt(dfsKey, notSet);
761    // If a size is set, return -- we will use it.
762    if (size != notSet) {
763      return;
764    }
765    // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
766    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
767    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
768  }
769
770  private static final class DfsBuilderUtility {
771    private static final Class<?> BUILDER;
772    private static final Method REPLICATE;
773
774    static {
      String builderName =
        "org.apache.hadoop.hdfs.DistributedFileSystem$HdfsDataOutputStreamBuilder";
776      Class<?> builderClass = null;
777      try {
778        builderClass = Class.forName(builderName);
779      } catch (ClassNotFoundException e) {
780        LOG.debug("{} not available, will not set replicate when creating output stream", builderName);
781      }
782      Method replicateMethod = null;
783      if (builderClass != null) {
784        try {
785          replicateMethod = builderClass.getMethod("replicate");
786          LOG.debug("Using builder API via reflection for DFS file creation.");
787        } catch (NoSuchMethodException e) {
788          LOG.debug("Could not find replicate method on builder; will not set replicate when" +
789            " creating output stream", e);
790        }
791      }
792      BUILDER = builderClass;
793      REPLICATE = replicateMethod;
794    }
795
796    /**
797     * Attempt to use builder API via reflection to call the replicate method on the given builder.
798     */
799    static void replicate(FSDataOutputStreamBuilder<?, ?> builder) {
800      if (BUILDER != null && REPLICATE != null && BUILDER.isAssignableFrom(builder.getClass())) {
801        try {
802          REPLICATE.invoke(builder);
803        } catch (IllegalAccessException | InvocationTargetException e) {
804          // Should have caught this failure during initialization, so log full trace here
805          LOG.warn("Couldn't use reflection with builder API", e);
806        }
807      }
808    }
809  }
810
811  /**
812   * Attempt to use builder API via reflection to create a file with the given parameters and
813   * replication enabled.
814   * <p/>
815   * Will not attempt to enable replication when passed an HFileSystem.
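   * <p/>
   * A minimal usage sketch, assuming a FileSystem <code>fs</code> (the path below is
   * hypothetical):
   * <pre>{@code
   * try (FSDataOutputStream out =
   *     CommonFSUtils.createForWal(fs, new Path("/hbase/WALs/rs1/wal.0"), false)) {
   *   // write WAL data to out
   * }
   * }</pre>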
816   */
817  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite)
818    throws IOException {
819    FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite);
820    DfsBuilderUtility.replicate(builder);
821    return builder.build();
822  }
823
824  /**
825   * Attempt to use builder API via reflection to create a file with the given parameters and
826   * replication enabled.
827   * <p/>
828   * Will not attempt to enable replication when passed an HFileSystem.
829   */
830  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite,
831    int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
832    FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite)
833      .bufferSize(bufferSize).replication(replication).blockSize(blockSize);
834    if (isRecursive) {
835      builder.recursive();
836    }
837    DfsBuilderUtility.replicate(builder);
838    return builder.build();
839  }
840
841  /**
   * Helper exception for those cases where the place where we need to check a stream capability
   * is not the place where we have the context needed to explain the impact and mitigation of a
   * missing capability.
844   */
845  public static class StreamLacksCapabilityException extends Exception {
846    public StreamLacksCapabilityException(String message, Throwable cause) {
847      super(message, cause);
848    }
849    public StreamLacksCapabilityException(String message) {
850      super(message);
851    }
852  }
853}