001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.net.URI;
023import java.net.URISyntaxException;
024import java.util.List;
025import java.util.Locale;
026import java.util.Map;
027import java.util.concurrent.ConcurrentHashMap;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FSDataOutputStream;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.LocatedFileStatus;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.fs.PathFilter;
035import org.apache.hadoop.fs.RemoteIterator;
036import org.apache.hadoop.fs.permission.FsPermission;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
044
045/**
046 * Utility methods for interacting with the underlying file system.
047 * <p/>
048 * Note that {@link #setStoragePolicy(FileSystem, Path, String)} is tested in TestFSUtils and
049 * pre-commit will run the hbase-server tests if there's code change in this class. See
050 * <a href="https://issues.apache.org/jira/browse/HBASE-20838">HBASE-20838</a> for more details.
051 */
052@InterfaceAudience.Private
053public final class CommonFSUtils {
054  private static final Logger LOG = LoggerFactory.getLogger(CommonFSUtils.class);
055
056  /** Parameter name for HBase WAL directory */
057  public static final String HBASE_WAL_DIR = "hbase.wal.dir";
058
059  /** Parameter to disable stream capability enforcement checks */
060  public static final String UNSAFE_STREAM_CAPABILITY_ENFORCE =
061    "hbase.unsafe.stream.capability.enforce";
062
063  /** Full access permissions (starting point for a umask) */
064  public static final String FULL_RWX_PERMISSIONS = "777";
065
066  private CommonFSUtils() {
067  }
068
069  /**
   * Compares path components. Does not consider the URI scheme; i.e. even if the schemes differ,
   * if <code>path</code> starts with <code>rootPath</code>, the function returns true.
072   * @param rootPath value to check for
073   * @param path     subject to check
074   * @return True if <code>path</code> starts with <code>rootPath</code>
075   */
076  public static boolean isStartingWithPath(final Path rootPath, final String path) {
077    String uriRootPath = rootPath.toUri().getPath();
078    String tailUriPath = new Path(path).toUri().getPath();
079    return tailUriPath.startsWith(uriRootPath);
080  }
081
082  /**
   * Compare the path component of the Path URIs; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. Does not consider the scheme; i.e. even if the schemes differ, the two will
   * equate if the path or a subpath matches.
086   * @param pathToSearch Path we will be trying to match against.
087   * @param pathTail     what to match
088   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
089   */
090  public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
091    return isMatchingTail(pathToSearch, new Path(pathTail));
092  }
093
094  /**
   * Compare the path component of the Path URIs; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
   * consider the scheme; i.e. even if the schemes differ, the two will equate if the path or a
   * subpath matches.
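   * <p>
   * A hypothetical illustration (paths are made up):
   * <pre>
   * isMatchingTail(new Path("hdfs://namenode:8020/a/b/c"), new Path("/a/b/c")); // true
   * isMatchingTail(new Path("hdfs://namenode:8020/a/b/c"), new Path("/a/b/d")); // false
   * </pre>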
   * @param pathToSearch Path we will be trying to match against
099   * @param pathTail     what to match
100   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
101   */
102  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
103    if (pathToSearch.depth() != pathTail.depth()) {
104      return false;
105    }
106    Path tailPath = pathTail;
107    String tailName;
108    Path toSearch = pathToSearch;
109    String toSearchName;
110    boolean result = false;
111    do {
112      tailName = tailPath.getName();
113      if (tailName == null || tailName.length() <= 0) {
114        result = true;
115        break;
116      }
117      toSearchName = toSearch.getName();
118      if (toSearchName == null || toSearchName.length() <= 0) {
119        break;
120      }
121      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
122      tailPath = tailPath.getParent();
123      toSearch = toSearch.getParent();
124    } while (tailName.equals(toSearchName));
125    return result;
126  }
127
128  /**
129   * Delete if exists.
130   * @param fs  filesystem object
131   * @param dir directory to delete
132   * @return True if deleted <code>dir</code>
133   * @throws IOException e
134   */
135  public static boolean deleteDirectory(final FileSystem fs, final Path dir) throws IOException {
136    return fs.exists(dir) && fs.delete(dir, true);
137  }
138
139  /**
   * Return the number of bytes that large input files should optimally be split into to minimize
   * I/O time.
   * @param fs   filesystem object
   * @param path path within the filesystem
   * @return the default block size for the path's filesystem
144   */
145  public static long getDefaultBlockSize(final FileSystem fs, final Path path) {
146    return fs.getDefaultBlockSize(path);
147  }
148
149  /*
150   * Get the default replication.
151   * @param fs filesystem object
152   * @param f path of file
153   * @return default replication for the path's filesystem
154   */
155  public static short getDefaultReplication(final FileSystem fs, final Path path) {
156    return fs.getDefaultReplication(path);
157  }
158
159  /**
160   * Returns the default buffer size to use during writes. The size of the buffer should probably be
161   * a multiple of hardware page size (4096 on Intel x86), and it determines how much data is
162   * buffered during read and write operations.
163   * @param fs filesystem object
164   * @return default buffer size to use during writes
165   */
166  public static int getDefaultBufferSize(final FileSystem fs) {
167    return fs.getConf().getInt("io.file.buffer.size", 4096);
168  }
169
170  /**
171   * Create the specified file on the filesystem. By default, this will:
172   * <ol>
173   * <li>apply the umask in the configuration (if it is enabled)</li>
174   * <li>use the fs configured buffer size (or 4096 if not set)</li>
175   * <li>use the default replication</li>
176   * <li>use the default block size</li>
177   * <li>not track progress</li>
178   * </ol>
179   * @param fs        {@link FileSystem} on which to write the file
180   * @param path      {@link Path} to the file to write
   * @param perm      initial permissions
182   * @param overwrite Whether or not the created file should be overwritten.
183   * @return output stream to the created file
184   * @throws IOException if the file cannot be created
185   */
186  public static FSDataOutputStream create(FileSystem fs, Path path, FsPermission perm,
187    boolean overwrite) throws IOException {
188    if (LOG.isTraceEnabled()) {
189      LOG.trace("Creating file={} with permission={}, overwrite={}", path, perm, overwrite);
190    }
191    return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
192      getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
193  }
194
195  /**
196   * Get the file permissions specified in the configuration, if they are enabled.
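   * <p>
   * A minimal sketch of the umask behaviour (key name and values are illustrative):
   * <pre>
   * conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
   * conf.set("hbase.data.umask", "077");
   * // full rwx (777) with umask 077 yields rwx------ (700)
   * FsPermission perm = CommonFSUtils.getFilePermissions(fs, conf, "hbase.data.umask");
   * </pre>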
197   * @param fs               filesystem that the file will be created on.
198   * @param conf             configuration to read for determining if permissions are enabled and
199   *                         which to use
200   * @param permssionConfKey property key in the configuration to use when finding the permission
201   * @return the permission to use when creating a new file on the fs. If special permissions are
   *         not specified in the configuration, then the default permissions on the fs will be
203   *         returned.
204   */
205  public static FsPermission getFilePermissions(final FileSystem fs, final Configuration conf,
206    final String permssionConfKey) {
207    boolean enablePermissions = conf.getBoolean(HConstants.ENABLE_DATA_FILE_UMASK, false);
208
209    if (enablePermissions) {
210      try {
211        FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
212        // make sure that we have a mask, if not, go default.
213        String mask = conf.get(permssionConfKey);
214        if (mask == null) {
215          return FsPermission.getFileDefault();
216        }
        // apply the umask
218        FsPermission umask = new FsPermission(mask);
219        return perm.applyUMask(umask);
220      } catch (IllegalArgumentException e) {
221        LOG.warn("Incorrect umask attempted to be created: " + conf.get(permssionConfKey)
222          + ", using default file permissions.", e);
223        return FsPermission.getFileDefault();
224      }
225    }
226    return FsPermission.getFileDefault();
227  }
228
229  /**
230   * Verifies root directory path is a valid URI with a scheme
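   * <p>
   * For example (hypothetical paths):
   * <pre>
   * validateRootPath(new Path("hdfs://namenode:8020/hbase")); // ok, has a scheme
   * validateRootPath(new Path("/hbase"));                     // throws IOException, no scheme
   * </pre>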
231   * @param root root directory path
232   * @return Passed <code>root</code> argument.
233   * @throws IOException if not a valid URI with a scheme
234   */
235  public static Path validateRootPath(Path root) throws IOException {
236    try {
237      URI rootURI = new URI(root.toString());
238      String scheme = rootURI.getScheme();
239      if (scheme == null) {
240        throw new IOException("Root directory does not have a scheme");
241      }
242      return root;
243    } catch (URISyntaxException e) {
244      throw new IOException("Root directory path is not a valid " + "URI -- check your "
245        + HConstants.HBASE_DIR + " configuration", e);
246    }
247  }
248
249  /**
   * Checks for the presence of the WAL root path (using the provided conf object) in the given
   * path. If it is present, this method removes it and returns the String representation of the
   * remaining relative path.
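   * <p>
   * A hypothetical illustration, assuming the WAL root dir resolves to hdfs://nn:8020/hbase-wal:
   * <pre>
   * removeWALRootPath(new Path("hdfs://nn:8020/hbase-wal/WALs/rs1"), conf); // returns "WALs/rs1"
   * removeWALRootPath(new Path("/other/dir"), conf);                        // returned unchanged
   * </pre>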
253   * @param path must not be null
254   * @param conf must not be null
255   * @return String representation of the remaining relative path
256   * @throws IOException from underlying filesystem
257   */
258  public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
259    Path root = getWALRootDir(conf);
260    String pathStr = path.toString();
    // If the path does not start with the WAL root, it is already relative; return it as-is.
    if (!pathStr.startsWith(root.toString())) {
      return pathStr;
    }
    // Otherwise strip the WAL root prefix, plus the trailing "/".
    return pathStr.substring(root.toString().length() + 1);
267  }
268
269  /**
270   * Return the 'path' component of a Path. In Hadoop, Path is a URI. This method returns the 'path'
271   * component of a Path's URI: e.g. If a Path is
272   * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>, this method returns
273   * <code>/hbase_trunk/TestTable/compaction.dir</code>. This method is useful if you want to print
274   * out a Path without qualifying Filesystem instance.
275   * @param p Filesystem Path whose 'path' component we are to return.
276   * @return Path portion of the Filesystem
277   */
278  public static String getPath(Path p) {
279    return p.toUri().getPath();
280  }
281
282  /**
283   * Get the path for the root data directory
284   * @param c configuration
285   * @return {@link Path} to hbase root directory from configuration as a qualified Path.
286   * @throws IOException e
287   */
288  public static Path getRootDir(final Configuration c) throws IOException {
289    Path p = new Path(c.get(HConstants.HBASE_DIR));
290    FileSystem fs = p.getFileSystem(c);
291    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
292  }
293
294  public static void setRootDir(final Configuration c, final Path root) {
295    c.set(HConstants.HBASE_DIR, root.toString());
296  }
297
298  public static void setFsDefault(final Configuration c, final Path root) {
299    c.set("fs.defaultFS", root.toString()); // for hadoop 0.21+
300  }
301
302  public static void setFsDefault(final Configuration c, final String uri) {
303    c.set("fs.defaultFS", uri); // for hadoop 0.21+
304  }
305
306  public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
307    Path p = getRootDir(c);
308    return p.getFileSystem(c);
309  }
310
311  /**
312   * Get the path for the root directory for WAL data
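   * <p>
   * For example, a deployment might place WALs on a separate filesystem (values are illustrative):
   * <pre>
   * conf.set(CommonFSUtils.HBASE_WAL_DIR, "hdfs://walcluster:8020/hbase-wal");
   * Path walRoot = CommonFSUtils.getWALRootDir(conf); // qualified hdfs://walcluster:8020/hbase-wal
   * </pre>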
313   * @param c configuration
   * @return {@link Path} to the HBase WAL root directory, read from the {@value HBASE_WAL_DIR}
   *         configuration property, as a qualified Path. Defaults to the HBase root dir.
316   * @throws IOException e
317   */
318  public static Path getWALRootDir(final Configuration c) throws IOException {
319    Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR)));
320    if (!isValidWALRootDir(p, c)) {
321      return getRootDir(c);
322    }
323    FileSystem fs = p.getFileSystem(c);
324    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
325  }
326
327  /**
   * Returns the URI of the given path in string format, or null if the path has no scheme.
   * @param c configuration
   * @param p path
   * @return the URI in string format, or null if <code>p</code> has no scheme
332   */
333  public static String getDirUri(final Configuration c, Path p) throws IOException {
334    if (p.toUri().getScheme() != null) {
335      return p.toUri().toString();
336    }
337    return null;
338  }
339
340  public static void setWALRootDir(final Configuration c, final Path root) {
341    c.set(HBASE_WAL_DIR, root.toString());
342  }
343
344  public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
345    Path p = getWALRootDir(c);
346    FileSystem fs = p.getFileSystem(c);
347    // hadoop-core does fs caching, so need to propagate this if set
348    String enforceStreamCapability = c.get(UNSAFE_STREAM_CAPABILITY_ENFORCE);
349    if (enforceStreamCapability != null) {
350      fs.getConf().set(UNSAFE_STREAM_CAPABILITY_ENFORCE, enforceStreamCapability);
351    }
352    return fs;
353  }
354
355  private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
356    Path rootDir = getRootDir(c);
357    FileSystem fs = walDir.getFileSystem(c);
358    Path qualifiedWalDir = walDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
359    if (!qualifiedWalDir.equals(rootDir)) {
360      if (qualifiedWalDir.toString().startsWith(rootDir.toString() + "/")) {
361        throw new IllegalStateException("Illegal WAL directory specified. "
362          + "WAL directories are not permitted to be under root directory: rootDir="
363          + rootDir.toString() + ", qualifiedWALDir=" + qualifiedWalDir);
364      }
365    }
366    return true;
367  }
368
369  /**
370   * Returns the WAL region directory based on the given table name and region name
371   * @param conf              configuration to determine WALRootDir
372   * @param tableName         Table that the region is under
373   * @param encodedRegionName Region name used for creating the final region directory
374   * @return the region directory used to store WALs under the WALRootDir
375   * @throws IOException if there is an exception determining the WALRootDir
376   */
377  public static Path getWALRegionDir(final Configuration conf, final TableName tableName,
378    final String encodedRegionName) throws IOException {
379    return new Path(getWALTableDir(conf, tableName), encodedRegionName);
380  }
381
382  /**
383   * Returns the Table directory under the WALRootDir for the specified table name
384   * @param conf      configuration used to get the WALRootDir
385   * @param tableName Table to get the directory for
386   * @return a path to the WAL table directory for the specified table
387   * @throws IOException if there is an exception determining the WALRootDir
388   */
389  public static Path getWALTableDir(final Configuration conf, final TableName tableName)
390    throws IOException {
391    Path baseDir = new Path(getWALRootDir(conf), HConstants.BASE_NAMESPACE_DIR);
392    return new Path(new Path(baseDir, tableName.getNamespaceAsString()),
393      tableName.getQualifierAsString());
394  }
395
396  /**
   * For backward compatibility with HBASE-20734, where we stored recovered edits in the wrong
   * directory, one without BASE_NAMESPACE_DIR. See HBASE-22617 for more details.
399   * @deprecated For compatibility, will be removed in 4.0.0.
400   */
401  @Deprecated
402  public static Path getWrongWALRegionDir(final Configuration conf, final TableName tableName,
403    final String encodedRegionName) throws IOException {
404    Path wrongTableDir = new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()),
405      tableName.getQualifierAsString());
406    return new Path(wrongTableDir, encodedRegionName);
407  }
408
409  /**
410   * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
411   * path rootdir
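   * <p>
   * For example, with a hypothetical rootdir of hdfs://nn:8020/hbase and table 'ns1:tbl1', this
   * returns hdfs://nn:8020/hbase/data/ns1/tbl1 (BASE_NAMESPACE_DIR is 'data'):
   * <pre>
   * Path tableDir = CommonFSUtils.getTableDir(rootdir, TableName.valueOf("ns1", "tbl1"));
   * </pre>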
412   * @param rootdir   qualified path of HBase root directory
413   * @param tableName name of table
414   * @return {@link org.apache.hadoop.fs.Path} for table
415   */
416  public static Path getTableDir(Path rootdir, final TableName tableName) {
417    return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
418      tableName.getQualifierAsString());
419  }
420
421  /**
422   * Returns the {@link org.apache.hadoop.fs.Path} object representing the region directory under
423   * path rootdir
424   * @param rootdir    qualified path of HBase root directory
425   * @param tableName  name of table
426   * @param regionName The encoded region name
427   * @return {@link org.apache.hadoop.fs.Path} for region
428   */
429  public static Path getRegionDir(Path rootdir, TableName tableName, String regionName) {
430    return new Path(getTableDir(rootdir, tableName), regionName);
431  }
432
433  /**
   * Returns the {@link org.apache.hadoop.hbase.TableName} corresponding to the given table
   * directory under path rootdir
   * @param tablePath path of table
   * @return {@link org.apache.hadoop.hbase.TableName} for the table
438   */
439  public static TableName getTableName(Path tablePath) {
440    return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
441  }
442
443  /**
444   * Returns the {@link org.apache.hadoop.fs.Path} object representing the namespace directory under
445   * path rootdir
446   * @param rootdir   qualified path of HBase root directory
447   * @param namespace namespace name
   * @return {@link org.apache.hadoop.fs.Path} for the namespace
449   */
450  public static Path getNamespaceDir(Path rootdir, final String namespace) {
451    return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, new Path(namespace)));
452  }
453
454  // this mapping means that under a federated FileSystem implementation, we'll
455  // only log the first failure from any of the underlying FileSystems at WARN and all others
456  // will be at DEBUG.
457  private static final Map<FileSystem, Boolean> warningMap = new ConcurrentHashMap<>();
458
459  /**
460   * Sets storage policy for given path. If the passed path is a directory, we'll set the storage
461   * policy for all files created in the future in said directory. Note that this change in storage
462   * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle. If
463   * we're running on a version of FileSystem that doesn't support the given storage policy (or
464   * storage policies at all), then we'll issue a log message and continue. See
465   * http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
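   * <p>
   * A hypothetical invocation (path and policy are illustrative):
   * <pre>
   * // ask HDFS to keep future files under this family directory on SSD
   * CommonFSUtils.setStoragePolicy(fs, new Path("/hbase/data/default/t1/region/cf"), "ONE_SSD");
   * </pre>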
   * @param fs            We only do anything if it implements a setStoragePolicy method
467   * @param path          the Path whose storage policy is to be set
468   * @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+
   *                      org.apache.hadoop.hdfs.protocol.HdfsConstants for the possible list, e.g.
470   *                      'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
471   */
472  public static void setStoragePolicy(final FileSystem fs, final Path path,
473    final String storagePolicy) {
474    try {
475      setStoragePolicy(fs, path, storagePolicy, false);
476    } catch (IOException e) {
      // Should never get here: we passed throwException=false above.
      LOG.warn("Unexpected exception while setting storage policy; not rethrowing", e);
479    }
480  }
481
482  static void setStoragePolicy(final FileSystem fs, final Path path, final String storagePolicy,
483    boolean throwException) throws IOException {
484    if (storagePolicy == null) {
485      if (LOG.isTraceEnabled()) {
486        LOG.trace("We were passed a null storagePolicy, exiting early.");
487      }
488      return;
489    }
490    String trimmedStoragePolicy = storagePolicy.trim();
491    if (trimmedStoragePolicy.isEmpty()) {
492      LOG.trace("We were passed an empty storagePolicy, exiting early.");
493      return;
494    } else {
495      trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT);
496    }
497    if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) {
498      LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.", trimmedStoragePolicy);
499      return;
500    }
501    try {
502      invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
503    } catch (IOException e) {
504      LOG.trace("Failed to invoke set storage policy API on FS", e);
505      if (throwException) {
506        throw e;
507      }
508    }
509  }
510
511  /*
512   * All args have been checked and are good. Run the setStoragePolicy invocation.
513   */
514  private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
515    final String storagePolicy) throws IOException {
516    Exception toThrow = null;
517
518    try {
519      fs.setStoragePolicy(path, storagePolicy);
520      LOG.debug("Set storagePolicy={} for path={}", storagePolicy, path);
521    } catch (Exception e) {
522      toThrow = e;
523      // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
524      // misuse than a runtime problem with HDFS.
525      if (!warningMap.containsKey(fs)) {
526        warningMap.put(fs, true);
527        LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path + ". "
528          + "DEBUG log level might have more details.", e);
529      } else if (LOG.isDebugEnabled()) {
530        LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
531      }
532
533      // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation
534      // that throws UnsupportedOperationException
535      if (e instanceof UnsupportedOperationException) {
536        if (LOG.isDebugEnabled()) {
537          LOG.debug("The underlying FileSystem implementation doesn't support "
538            + "setStoragePolicy. This is probably intentional on their part, since HDFS-9345 "
539            + "appears to be present in your version of Hadoop. For more information check "
540            + "the Hadoop documentation on 'ArchivalStorage', the Hadoop FileSystem "
541            + "specification docs from HADOOP-11981, and/or related documentation from the "
542            + "provider of the underlying FileSystem (its name should appear in the "
543            + "stacktrace that accompanies this message). Note in particular that Hadoop's "
544            + "local filesystem implementation doesn't support storage policies.", e);
545        }
546      }
547    }
548
549    if (toThrow != null) {
550      throw new IOException(toThrow);
551    }
552  }
553
554  /**
555   * Return true if this is a filesystem whose scheme is 'hdfs'.
556   * @throws IOException from underlying FileSystem
557   */
558  public static boolean isHDFS(final Configuration conf) throws IOException {
559    FileSystem fs = FileSystem.get(conf);
560    String scheme = fs.getUri().getScheme();
561    return scheme.equalsIgnoreCase("hdfs");
562  }
563
564  /**
   * Checks if the given path contains the 'recovered.edits' directory.
   * @param path must not be null
   * @return True if the path contains the 'recovered.edits' directory
568   */
569  public static boolean isRecoveredEdits(Path path) {
570    return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
571  }
572
573  /**
574   * Returns the filesystem of the hbase rootdir.
575   * @throws IOException from underlying FileSystem
576   */
577  public static FileSystem getCurrentFileSystem(Configuration conf) throws IOException {
578    return getRootDir(conf).getFileSystem(conf);
579  }
580
581  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
   * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException and
   * instead returns an empty FileStatus[], while Hadoop 2 will throw FileNotFoundException. Where
   * possible, prefer FSUtils#listStatusWithStatusFilter(FileSystem, Path, FileStatusFilter)
   * instead.
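   * <p>
   * Callers must handle the null return, e.g. (a hypothetical sketch):
   * <pre>
   * FileStatus[] children = CommonFSUtils.listStatus(fs, dir, null);
   * if (children != null) {
   *   for (FileStatus child : children) {
   *     // process child
   *   }
   * }
   * </pre>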
586   * @param fs     file system
587   * @param dir    directory
588   * @param filter path filter
589   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
590   */
591  public static FileStatus[] listStatus(final FileSystem fs, final Path dir,
592    final PathFilter filter) throws IOException {
593    FileStatus[] status = null;
594    try {
595      status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
596    } catch (FileNotFoundException fnfe) {
597      // if directory doesn't exist, return null
598      if (LOG.isTraceEnabled()) {
599        LOG.trace("{} doesn't exist", dir);
600      }
601    }
602    if (status == null || status.length < 1) {
603      return null;
604    }
605    return status;
606  }
607
608  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
   * differences between hadoop versions.
611   * @param fs  file system
612   * @param dir directory
613   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
614   */
615  public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
616    return listStatus(fs, dir, null);
617  }
618
619  /**
   * Calls fs.listFiles() to get FileStatus and BlockLocations together, reducing the number of RPC
   * calls
   * @param fs  file system
   * @param dir directory
   * @return LocatedFileStatus list, or null if dir is empty or doesn't exist
624   */
625  public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs, final Path dir)
626    throws IOException {
627    List<LocatedFileStatus> status = null;
628    try {
629      RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(dir, false);
630      while (locatedFileStatusRemoteIterator.hasNext()) {
631        if (status == null) {
632          status = Lists.newArrayList();
633        }
634        status.add(locatedFileStatusRemoteIterator.next());
635      }
636    } catch (FileNotFoundException fnfe) {
637      // if directory doesn't exist, return null
638      if (LOG.isTraceEnabled()) {
639        LOG.trace("{} doesn't exist", dir);
640      }
641    }
642    return status;
643  }
644
645  /**
   * Calls fs.delete() and returns its result.
   * @param fs        must not be null
   * @param path      must not be null
   * @param recursive delete tree rooted at path
   * @return the value returned by fs.delete()
651   * @throws IOException from underlying FileSystem
652   */
653  public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
654    throws IOException {
655    return fs.delete(path, recursive);
656  }
657
658  /**
   * Calls fs.exists() to check if the specified path exists.
660   * @param fs   must not be null
661   * @param path must not be null
662   * @return the value returned by fs.exists()
663   * @throws IOException from underlying FileSystem
664   */
665  public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
666    return fs.exists(path);
667  }
668
669  /**
670   * Log the current state of the filesystem from a certain root directory
671   * @param fs   filesystem to investigate
672   * @param root root file/directory to start logging from
673   * @param log  log to output information
674   * @throws IOException if an unexpected exception occurs
675   */
676  public static void logFileSystemState(final FileSystem fs, final Path root, Logger log)
677    throws IOException {
678    log.debug("File system contents for path {}", root);
679    logFSTree(log, fs, root, "|-");
680  }
681
682  /**
683   * Recursive helper to log the state of the FS
684   * @see #logFileSystemState(FileSystem, Path, Logger)
685   */
686  private static void logFSTree(Logger log, final FileSystem fs, final Path root, String prefix)
687    throws IOException {
688    FileStatus[] files = listStatus(fs, root, null);
689    if (files == null) {
690      return;
691    }
692
693    for (FileStatus file : files) {
694      if (file.isDirectory()) {
695        log.debug(prefix + file.getPath().getName() + "/");
696        logFSTree(log, fs, file.getPath(), prefix + "---");
697      } else {
698        log.debug(prefix + file.getPath().getName());
699      }
700    }
701  }
702
703  public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
704    throws IOException {
705    // set the modify time for TimeToLive Cleaner
706    fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
707    return fs.rename(src, dest);
708  }
709
710  /**
711   * Check if short circuit read buffer size is set and if not, set it to hbase value.
712   * @param conf must not be null
713   */
714  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
715    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
716    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in hadoop 2
718    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
719    int size = conf.getInt(dfsKey, notSet);
720    // If a size is set, return -- we will use it.
721    if (size != notSet) {
722      return;
723    }
    // But the short circuit buffer size is normally not set. Put in place the size hbase wants.
725    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
726    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
727  }
728
729  /**
   * Helper exception for those cases where the place where we need to check a stream capability is
   * not the place where we have the context needed to explain the impact and mitigation of the
   * capability being absent.
732   */
733  public static class StreamLacksCapabilityException extends Exception {
734    public StreamLacksCapabilityException(String message, Throwable cause) {
735      super(message, cause);
736    }
737
738    public StreamLacksCapabilityException(String message) {
739      super(message);
740    }
741  }
742}