/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Utility methods for interacting with the underlying file system.
 * <p/>
 * Note that {@link #setStoragePolicy(FileSystem, Path, String)} is tested in TestFSUtils and
 * pre-commit will run the hbase-server tests if there is a code change in this class. See
 * <a href="https://issues.apache.org/jira/browse/HBASE-20838">HBASE-20838</a> for more details.
 */
@InterfaceAudience.Private
public final class CommonFSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(CommonFSUtils.class);

  /** Parameter name for HBase WAL directory */
  public static final String HBASE_WAL_DIR = "hbase.wal.dir";

  /** Parameter to disable stream capability enforcement checks */
  public static final String UNSAFE_STREAM_CAPABILITY_ENFORCE =
    "hbase.unsafe.stream.capability.enforce";

  /** Full access permissions (starting point for a umask) */
  public static final String FULL_RWX_PERMISSIONS = "777";

  private CommonFSUtils() {
  }

  /**
   * Compare path components. Does not consider the scheme; i.e. if the schemes
   * differ but <code>path</code> starts with <code>rootPath</code>,
   * then the function returns true.
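   * <p/>
   * An illustrative example (hypothetical paths); the second call also returns true because
   * only the path components are compared, not the schemes:
   * <pre>{@code
   * isStartingWithPath(new Path("hdfs://nn/hbase"), "hdfs://nn/hbase/data/ns/t1"); // true
   * isStartingWithPath(new Path("hdfs://nn/hbase"), "file:///hbase/data/ns/t1");   // also true
   * }</pre>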
   * @param rootPath value to check for
   * @param path subject to check
   * @return True if <code>path</code> starts with <code>rootPath</code>
   */
  public static boolean isStartingWithPath(final Path rootPath, final String path) {
    String uriRootPath = rootPath.toUri().getPath();
    String tailUriPath = (new Path(path)).toUri().getPath();
    return tailUriPath.startsWith(uriRootPath);
  }

  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. Does not consider the scheme; i.e. if the schemes differ but the path or
   * subpath matches, the two will equate.
   * @param pathToSearch Path we will be trying to match against.
   * @param pathTail what to match
   * @return True if <code>pathTail</code> is a tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
    return isMatchingTail(pathToSearch, new Path(pathTail));
  }

  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
   * consider the scheme; i.e. if the schemes differ but the path or subpath matches, the two will
   * equate.
   * @param pathToSearch Path we will be trying to match against
   * @param pathTail what to match
   * @return True if <code>pathTail</code> is a tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
    if (pathToSearch.depth() != pathTail.depth()) {
      return false;
    }
    Path tailPath = pathTail;
    String tailName;
    Path toSearch = pathToSearch;
    String toSearchName;
    boolean result = false;
    do {
      tailName = tailPath.getName();
      if (tailName == null || tailName.length() <= 0) {
        result = true;
        break;
      }
      toSearchName = toSearch.getName();
      if (toSearchName == null || toSearchName.length() <= 0) {
        break;
      }
      // Move up a parent on each path for the next go around. Path doesn't let us go off the end.
      tailPath = tailPath.getParent();
      toSearch = toSearch.getParent();
    } while (tailName.equals(toSearchName));
    return result;
  }

  /**
   * Delete if exists.
   * @param fs filesystem object
   * @param dir directory to delete
   * @return True if deleted <code>dir</code>
   * @throws IOException e
   */
  public static boolean deleteDirectory(final FileSystem fs, final Path dir) throws IOException {
    return fs.exists(dir) && fs.delete(dir, true);
  }

  /**
   * Return the number of bytes that large input files should optimally be split into to
   * minimize i/o time.
   *
   * @param fs filesystem object
   * @param path path of file
   * @return the default block size for the path's filesystem
   */
  public static long getDefaultBlockSize(final FileSystem fs, final Path path) {
    return fs.getDefaultBlockSize(path);
  }

  /**
   * Get the default replication.
   *
   * @param fs filesystem object
   * @param path path of file
   * @return default replication for the path's filesystem
   */
  public static short getDefaultReplication(final FileSystem fs, final Path path) {
    return fs.getDefaultReplication(path);
  }

  /**
   * Returns the default buffer size to use during writes.
   *
   * The size of the buffer should probably be a multiple of hardware
   * page size (4096 on Intel x86), and it determines how much data is
   * buffered during read and write operations.
   *
   * @param fs filesystem object
   * @return default buffer size to use during writes
   */
  public static int getDefaultBufferSize(final FileSystem fs) {
    return fs.getConf().getInt("io.file.buffer.size", 4096);
  }

  /**
   * Create the specified file on the filesystem. By default, this will:
   * <ol>
   * <li>apply the umask in the configuration (if it is enabled)</li>
   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the default replication</li>
   * <li>use the default block size</li>
   * <li>not track progress</li>
   * </ol>
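   * <p/>
   * An illustrative call, assuming umask-based permissions are enabled in the configuration (the
   * umask key shown is only an example):
   * <pre>{@code
   * FsPermission perms = getFilePermissions(fs, conf, "hbase.data.umask");
   * try (FSDataOutputStream out = create(fs, path, perms, true)) {
   *   // write file contents ...
   * }
   * }</pre>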
   *
   * @param fs {@link FileSystem} on which to write the file
   * @param path {@link Path} to the file to write
   * @param perm initial permissions
   * @param overwrite Whether or not the created file should be overwritten.
   * @return output stream to the created file
   * @throws IOException if the file cannot be created
   */
  public static FSDataOutputStream create(FileSystem fs, Path path,
      FsPermission perm, boolean overwrite) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Creating file={} with permission={}, overwrite={}", path, perm, overwrite);
    }
    return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
        getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
  }

  /**
   * Get the file permissions specified in the configuration, if they are
   * enabled.
   *
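   * For example, with umask handling enabled, a umask of {@code "022"} applied to the full
   * {@code "777"} permissions yields {@code "755"}; the configuration key below is illustrative:
   * <pre>{@code
   * conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
   * conf.set("hbase.data.umask", "022");
   * FsPermission perm = getFilePermissions(fs, conf, "hbase.data.umask"); // rwxr-xr-x
   * }</pre>
   *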
   * @param fs filesystem that the file will be created on.
   * @param conf configuration to read for determining if permissions are
   *          enabled and which to use
   * @param permssionConfKey property key in the configuration to use when
   *          finding the permission
   * @return the permission to use when creating a new file on the fs. If
   *         special permissions are not specified in the configuration, then
   *         the default permissions on the fs will be returned.
   */
  public static FsPermission getFilePermissions(final FileSystem fs,
      final Configuration conf, final String permssionConfKey) {
    boolean enablePermissions = conf.getBoolean(
        HConstants.ENABLE_DATA_FILE_UMASK, false);

    if (enablePermissions) {
      try {
        FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
        // make sure that we have a mask, if not, go default.
        String mask = conf.get(permssionConfKey);
        if (mask == null) {
          return FsPermission.getFileDefault();
        }
        // apply the umask
        FsPermission umask = new FsPermission(mask);
        return perm.applyUMask(umask);
      } catch (IllegalArgumentException e) {
        LOG.warn(
            "Incorrect umask specified: "
                + conf.get(permssionConfKey)
                + ", using default file permissions.", e);
        return FsPermission.getFileDefault();
      }
    }
    return FsPermission.getFileDefault();
  }

  /**
   * Verifies root directory path is a valid URI with a scheme
   *
   * @param root root directory path
   * @return Passed <code>root</code> argument.
   * @throws IOException if not a valid URI with a scheme
   */
  public static Path validateRootPath(Path root) throws IOException {
    try {
      URI rootURI = new URI(root.toString());
      String scheme = rootURI.getScheme();
      if (scheme == null) {
        throw new IOException("Root directory does not have a scheme");
      }
      return root;
    } catch (URISyntaxException e) {
      throw new IOException("Root directory path is not a valid " +
        "URI -- check your " + HConstants.HBASE_DIR + " configuration", e);
    }
  }

  /**
   * Checks for the presence of the WAL log root path (using the provided conf object) in the given
   * path. If it exists, this method removes it and returns the String representation of the
   * remaining relative path.
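   * <p/>
   * For example (illustrative paths), with the WAL root directory at
   * {@code hdfs://nn/hbase-wal}:
   * <pre>{@code
   * removeWALRootPath(new Path("hdfs://nn/hbase-wal/WALs/server1"), conf); // "WALs/server1"
   * removeWALRootPath(new Path("/tmp/other"), conf);                       // unchanged
   * }</pre>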
   * @param path must not be null
   * @param conf must not be null
   * @return String representation of the remaining relative path
   * @throws IOException from underlying filesystem
   */
  public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
    Path root = getWALRootDir(conf);
    String pathStr = path.toString();
    // if the path does not contain the WAL root as a prefix, return it unchanged.
    if (!pathStr.startsWith(root.toString())) {
      return pathStr;
    }
    // otherwise strip the WAL root prefix, plus the trailing "/", and return the relative path.
    return pathStr.substring(root.toString().length() + 1);
  }

  /**
   * Return the 'path' component of a Path. In Hadoop, Path is a URI. This
   * method returns the 'path' component of a Path's URI: e.g. If a Path is
   * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
   * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
   * This method is useful if you want to print out a Path without qualifying the
   * FileSystem instance.
   * @param p Filesystem Path whose 'path' component we are to return.
   * @return the 'path' portion of the Path's URI
   */
  public static String getPath(Path p) {
    return p.toUri().getPath();
  }

  /**
   * @param c configuration
   * @return {@link Path} to hbase root directory from
   *     configuration as a qualified Path.
   * @throws IOException e
   */
  public static Path getRootDir(final Configuration c) throws IOException {
    Path p = new Path(c.get(HConstants.HBASE_DIR));
    FileSystem fs = p.getFileSystem(c);
    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  public static void setRootDir(final Configuration c, final Path root) {
    c.set(HConstants.HBASE_DIR, root.toString());
  }

  public static void setFsDefault(final Configuration c, final Path root) {
    c.set("fs.defaultFS", root.toString());    // for hadoop 0.21+
  }

  public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
    Path p = getRootDir(c);
    return p.getFileSystem(c);
  }

  /**
   * @param c configuration
   * @return {@link Path} to hbase log root directory: e.g. {@value HBASE_WAL_DIR} from
   *     configuration as a qualified Path. Defaults to HBase root dir.
   * @throws IOException e
   */
  public static Path getWALRootDir(final Configuration c) throws IOException {
    Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR)));
    if (!isValidWALRootDir(p, c)) {
      return getRootDir(c);
    }
    FileSystem fs = p.getFileSystem(c);
    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  @VisibleForTesting
  public static void setWALRootDir(final Configuration c, final Path root) {
    c.set(HBASE_WAL_DIR, root.toString());
  }

  public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
    Path p = getWALRootDir(c);
    FileSystem fs = p.getFileSystem(c);
    // hadoop-core does fs caching, so need to propagate this if set
    String enforceStreamCapability = c.get(UNSAFE_STREAM_CAPABILITY_ENFORCE);
    if (enforceStreamCapability != null) {
      fs.getConf().set(UNSAFE_STREAM_CAPABILITY_ENFORCE, enforceStreamCapability);
    }
    return fs;
  }

  private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
    Path rootDir = getRootDir(c);
    FileSystem fs = walDir.getFileSystem(c);
    Path qualifiedWalDir = walDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    if (!qualifiedWalDir.equals(rootDir)) {
      if (qualifiedWalDir.toString().startsWith(rootDir.toString() + "/")) {
        throw new IllegalStateException("Illegal WAL directory specified. " +
            "WAL directories are not permitted to be under the root directory if set.");
      }
    }
    return true;
  }

  /**
   * Returns the WAL region directory based on the given table name and region name
   * @param conf configuration to determine WALRootDir
   * @param tableName Table that the region is under
   * @param encodedRegionName Region name used for creating the final region directory
   * @return the region directory used to store WALs under the WALRootDir
   * @throws IOException if there is an exception determining the WALRootDir
   */
  public static Path getWALRegionDir(final Configuration conf, final TableName tableName,
      final String encodedRegionName) throws IOException {
    return new Path(getWALTableDir(conf, tableName), encodedRegionName);
  }

  /**
   * Returns the Table directory under the WALRootDir for the specified table name
   * @param conf configuration used to get the WALRootDir
   * @param tableName Table to get the directory for
   * @return a path to the WAL table directory for the specified table
   * @throws IOException if there is an exception determining the WALRootDir
   */
  public static Path getWALTableDir(final Configuration conf, final TableName tableName)
      throws IOException {
    Path baseDir = new Path(getWALRootDir(conf), HConstants.BASE_NAMESPACE_DIR);
    return new Path(new Path(baseDir, tableName.getNamespaceAsString()),
      tableName.getQualifierAsString());
  }

  /**
   * For backward compatibility with HBASE-20734, where we stored recovered edits in the wrong
   * directory, without BASE_NAMESPACE_DIR. See HBASE-22617 for more details.
   * @deprecated For compatibility, will be removed in 4.0.0.
   */
  @Deprecated
  public static Path getWrongWALRegionDir(final Configuration conf, final TableName tableName,
      final String encodedRegionName) throws IOException {
    Path wrongTableDir = new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()),
      tableName.getQualifierAsString());
    return new Path(wrongTableDir, encodedRegionName);
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
   * path rootdir
   *
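   * An illustrative call; the namespace directory name comes from
   * {@code HConstants.BASE_NAMESPACE_DIR}:
   * <pre>{@code
   * Path rootDir = new Path("hdfs://nn:8020/hbase");
   * Path tableDir = getTableDir(rootDir, TableName.valueOf("ns1", "t1"));
   * // => <rootDir>/<BASE_NAMESPACE_DIR>/ns1/t1
   * }</pre>
   *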
   * @param rootdir qualified path of HBase root directory
   * @param tableName name of table
   * @return {@link org.apache.hadoop.fs.Path} for table
   */
  public static Path getTableDir(Path rootdir, final TableName tableName) {
    return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
        tableName.getQualifierAsString());
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing the region directory under
   * path rootdir
   *
   * @param rootdir qualified path of HBase root directory
   * @param tableName name of table
   * @param regionName The encoded region name
   * @return {@link org.apache.hadoop.fs.Path} for region
   */
  public static Path getRegionDir(Path rootdir, TableName tableName, String regionName) {
    return new Path(getTableDir(rootdir, tableName), regionName);
  }

  /**
   * Returns the {@link org.apache.hadoop.hbase.TableName} object for the table whose directory
   * the given path represents.
   *
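   * For example (illustrative path), the namespace and qualifier are taken from the last two
   * path components:
   * <pre>{@code
   * TableName tn = getTableName(new Path("hdfs://nn:8020/hbase/data/ns1/t1"));
   * // tn.getNamespaceAsString() => "ns1", tn.getQualifierAsString() => "t1"
   * }</pre>
   *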
   * @param tablePath path of table
   * @return {@link org.apache.hadoop.hbase.TableName} for the table
   */
  public static TableName getTableName(Path tablePath) {
    return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing
   * the namespace directory under path rootdir
   *
   * @param rootdir qualified path of HBase root directory
   * @param namespace namespace name
   * @return {@link org.apache.hadoop.fs.Path} for the namespace
   */
  public static Path getNamespaceDir(Path rootdir, final String namespace) {
    return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
        new Path(namespace)));
  }

  // this mapping means that under a federated FileSystem implementation, we'll
  // only log the first failure from any of the underlying FileSystems at WARN and all others
  // will be at DEBUG.
  private static final Map<FileSystem, Boolean> warningMap = new ConcurrentHashMap<>();

  /**
   * Sets storage policy for given path.
   * If the passed path is a directory, we'll set the storage policy for all files
   * created in the future in said directory. Note that this change in storage
   * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle.
   * If we're running on a version of FileSystem that doesn't support the given storage policy
   * (or storage policies at all), then we'll issue a log message and continue.
   *
   * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
   *
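   * An illustrative call (the directory shown is hypothetical); failures to apply the policy are
   * logged rather than thrown:
   * <pre>{@code
   * setStoragePolicy(fs, new Path("/hbase/WALs"), "ONE_SSD");
   * }</pre>
   *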
   * @param fs We only do anything if it implements a setStoragePolicy method
   * @param path the Path whose storage policy is to be set
   * @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+
   *   org.apache.hadoop.hdfs.protocol.HdfsConstants for the possible list, e.g.
   *   'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
   */
  public static void setStoragePolicy(final FileSystem fs, final Path path,
      final String storagePolicy) {
    try {
      setStoragePolicy(fs, path, storagePolicy, false);
    } catch (IOException e) {
      // should never arrive here
      LOG.warn("We have chosen not to throw the exception, but one was unexpectedly thrown", e);
    }
  }

  static void setStoragePolicy(final FileSystem fs, final Path path, final String storagePolicy,
      boolean throwException) throws IOException {
    if (storagePolicy == null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("We were passed a null storagePolicy, exiting early.");
      }
      return;
    }
    String trimmedStoragePolicy = storagePolicy.trim();
    if (trimmedStoragePolicy.isEmpty()) {
      LOG.trace("We were passed an empty storagePolicy, exiting early.");
      return;
    } else {
      trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT);
    }
    if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) {
      LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.", trimmedStoragePolicy);
      return;
    }
    try {
      invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
    } catch (IOException e) {
      LOG.trace("Failed to invoke set storage policy API on FS", e);
      if (throwException) {
        throw e;
      }
    }
  }

  /*
   * All args have been checked and are good. Run the setStoragePolicy invocation.
   */
  private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
      final String storagePolicy) throws IOException {
    Exception toThrow = null;

    try {
      fs.setStoragePolicy(path, storagePolicy);
      LOG.debug("Set storagePolicy={} for path={}", storagePolicy, path);
    } catch (Exception e) {
      toThrow = e;
      // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
      // misuse than a runtime problem with HDFS.
      if (!warningMap.containsKey(fs)) {
        warningMap.put(fs, true);
        LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path + ". " +
            "DEBUG log level might have more details.", e);
      } else if (LOG.isDebugEnabled()) {
        LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
      }

      // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation
      // that throws UnsupportedOperationException
      if (e instanceof UnsupportedOperationException) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("The underlying FileSystem implementation doesn't support " +
              "setStoragePolicy. This is probably intentional on their part, since HDFS-9345 " +
              "appears to be present in your version of Hadoop. For more information check " +
              "the Hadoop documentation on 'ArchivalStorage', the Hadoop FileSystem " +
              "specification docs from HADOOP-11981, and/or related documentation from the " +
              "provider of the underlying FileSystem (its name should appear in the " +
              "stacktrace that accompanies this message). Note in particular that Hadoop's " +
              "local filesystem implementation doesn't support storage policies.", e);
        }
      }
    }

    if (toThrow != null) {
      throw new IOException(toThrow);
    }
  }

  /**
   * @param conf must not be null
   * @return True if the filesystem for this configuration has the scheme 'hdfs'.
   * @throws IOException from underlying FileSystem
   */
  public static boolean isHDFS(final Configuration conf) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    String scheme = fs.getUri().getScheme();
    return scheme.equalsIgnoreCase("hdfs");
  }

  /**
   * Checks if the given path contains a 'recovered.edits' directory.
   * @param path must not be null
   * @return True if the path contains a 'recovered.edits' directory
   */
  public static boolean isRecoveredEdits(Path path) {
    return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
  }

  /**
   * @param conf must not be null
   * @return the filesystem of the hbase rootdir.
   * @throws IOException from underlying FileSystem
   */
  public static FileSystem getCurrentFileSystem(Configuration conf) throws IOException {
    return getRootDir(conf).getFileSystem(conf);
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
   * This accommodates differences between Hadoop versions, where Hadoop 1
   * does not throw a FileNotFoundException but returns an empty FileStatus[],
   * while Hadoop 2 throws a FileNotFoundException.
   *
   * Where possible, prefer FSUtils#listStatusWithStatusFilter(FileSystem,
   * Path, FileStatusFilter) instead.
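   * <p/>
   * An illustrative call; a {@code null} result means the directory is missing or empty, so the
   * caller does not need to handle FileNotFoundException:
   * <pre>{@code
   * FileStatus[] children = listStatus(fs, dir, p -> !p.getName().startsWith("."));
   * if (children == null) {
   *   return; // nothing to do
   * }
   * }</pre>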
   *
   * @param fs file system
   * @param dir directory
   * @param filter path filter
   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
   */
  public static FileStatus[] listStatus(final FileSystem fs,
      final Path dir, final PathFilter filter) throws IOException {
    FileStatus[] status = null;
    try {
      status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
    } catch (FileNotFoundException fnfe) {
      // if directory doesn't exist, return null
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} doesn't exist", dir);
      }
    }
    if (status == null || status.length < 1) {
      return null;
    }
    return status;
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
   * This accommodates differences between Hadoop versions.
   *
   * @param fs file system
   * @param dir directory
   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
   */
  public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
    return listStatus(fs, dir, null);
  }

  /**
   * Calls fs.listFiles() to get FileStatus and BlockLocations together, reducing RPC calls.
   *
   * @param fs file system
   * @param dir directory
   * @return LocatedFileStatus list
   */
  public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs,
      final Path dir) throws IOException {
    List<LocatedFileStatus> status = null;
    try {
      RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs
          .listFiles(dir, false);
      while (locatedFileStatusRemoteIterator.hasNext()) {
        if (status == null) {
          status = Lists.newArrayList();
        }
        status.add(locatedFileStatusRemoteIterator.next());
      }
    } catch (FileNotFoundException fnfe) {
      // if directory doesn't exist, return null
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} doesn't exist", dir);
      }
    }
    return status;
  }

  /**
   * Calls fs.delete() and returns its result.
   *
   * @param fs must not be null
   * @param path must not be null
   * @param recursive delete tree rooted at path
   * @return the value returned by fs.delete()
   * @throws IOException from underlying FileSystem
   */
  public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
      throws IOException {
    return fs.delete(path, recursive);
  }

  /**
   * Calls fs.exists(). Checks if the specified path exists.
   *
   * @param fs must not be null
   * @param path must not be null
   * @return the value returned by fs.exists()
   * @throws IOException from underlying FileSystem
   */
  public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
    return fs.exists(path);
  }

  /**
   * Log the current state of the filesystem from a certain root directory
   * @param fs filesystem to investigate
   * @param root root file/directory to start logging from
   * @param log log to output information
   * @throws IOException if an unexpected exception occurs
   */
  public static void logFileSystemState(final FileSystem fs, final Path root, Logger log)
      throws IOException {
    log.debug("File system contents for path {}", root);
    logFSTree(log, fs, root, "|-");
  }

  /**
   * Recursive helper to log the state of the FS
   *
   * @see #logFileSystemState(FileSystem, Path, Logger)
   */
  private static void logFSTree(Logger log, final FileSystem fs, final Path root, String prefix)
      throws IOException {
    FileStatus[] files = listStatus(fs, root, null);
    if (files == null) {
      return;
    }

    for (FileStatus file : files) {
      if (file.isDirectory()) {
        log.debug(prefix + file.getPath().getName() + "/");
        logFSTree(log, fs, file.getPath(), prefix + "---");
      } else {
        log.debug(prefix + file.getPath().getName());
      }
    }
  }

  public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
      throws IOException {
    // set the modify time for TimeToLive Cleaner
    fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
    return fs.rename(src, dest);
  }

  /**
   * Check if short circuit read buffer size is set and if not, set it to hbase value.
   * @param conf must not be null
   */
  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
    int size = conf.getInt(dfsKey, notSet);
    // If a size is set, return -- we will use it.
    if (size != notSet) {
      return;
    }
    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }

  private static final class DfsBuilderUtility {
    private static final Class<?> BUILDER;
    private static final Method REPLICATE;

    static {
      String builderName =
        "org.apache.hadoop.hdfs.DistributedFileSystem$HdfsDataOutputStreamBuilder";
      Class<?> builderClass = null;
      try {
        builderClass = Class.forName(builderName);
      } catch (ClassNotFoundException e) {
        LOG.debug("{} not available, will not set replicate when creating output stream",
          builderName);
      }
      Method replicateMethod = null;
      if (builderClass != null) {
        try {
          replicateMethod = builderClass.getMethod("replicate");
          LOG.debug("Using builder API via reflection for DFS file creation.");
        } catch (NoSuchMethodException e) {
          LOG.debug("Could not find replicate method on builder; will not set replicate when" +
            " creating output stream", e);
        }
      }
      BUILDER = builderClass;
      REPLICATE = replicateMethod;
    }

    /**
     * Attempt to use builder API via reflection to call the replicate method on the given builder.
     */
    static void replicate(FSDataOutputStreamBuilder<?, ?> builder) {
      if (BUILDER != null && REPLICATE != null && BUILDER.isAssignableFrom(builder.getClass())) {
        try {
          REPLICATE.invoke(builder);
        } catch (IllegalAccessException | InvocationTargetException e) {
          // Should have caught this failure during initialization, so log full trace here
          LOG.warn("Couldn't use reflection with builder API", e);
        }
      }
    }
  }

  /**
   * Attempt to use builder API via reflection to create a file with the given parameters and
   * replication enabled.
   * <p/>
   * Will not attempt to enable replication when passed an HFileSystem.
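   * <p/>
   * An illustrative call (the filesystem and path variables are hypothetical):
   * <pre>{@code
   * try (FSDataOutputStream out = createForWal(walFs, walPath, false)) {
   *   // write WAL header and entries ...
   * }
   * }</pre>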
   */
  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite)
    throws IOException {
    FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite);
    DfsBuilderUtility.replicate(builder);
    return builder.build();
  }

  /**
   * Attempt to use builder API via reflection to create a file with the given parameters and
   * replication enabled.
   * <p/>
   * Will not attempt to enable replication when passed an HFileSystem.
   */
  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite,
    int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
    FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite)
      .bufferSize(bufferSize).replication(replication).blockSize(blockSize);
    if (isRecursive) {
      builder.recursive();
    }
    DfsBuilderUtility.replicate(builder);
    return builder.build();
  }

  /**
   * Helper exception for those cases where the place where we need to check a stream capability
   * is not where we have the needed context to explain the impact and mitigation for its absence.
   */
  public static class StreamLacksCapabilityException extends Exception {
    public StreamLacksCapabilityException(String message, Throwable cause) {
      super(message, cause);
    }

    public StreamLacksCapabilityException(String message) {
      super(message);
    }
  }
}