001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.net.URI;
023import java.net.URISyntaxException;
024import java.util.List;
025import java.util.Locale;
026import java.util.Map;
027import java.util.concurrent.ConcurrentHashMap;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FSDataOutputStream;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.LocatedFileStatus;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.fs.PathFilter;
035import org.apache.hadoop.fs.RemoteIterator;
036import org.apache.hadoop.fs.permission.FsPermission;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
044
045/**
046 * Utility methods for interacting with the underlying file system.
047 * <p/>
048 * Note that {@link #setStoragePolicy(FileSystem, Path, String)} is tested in TestFSUtils and
049 * pre-commit will run the hbase-server tests if there's code change in this class. See
050 * <a href="https://issues.apache.org/jira/browse/HBASE-20838">HBASE-20838</a> for more details.
051 */
052@InterfaceAudience.Private
053public final class CommonFSUtils {
054  private static final Logger LOG = LoggerFactory.getLogger(CommonFSUtils.class);
055
056  /** Parameter name for HBase WAL directory */
057  public static final String HBASE_WAL_DIR = "hbase.wal.dir";
058
059  /** Parameter to disable stream capability enforcement checks */
060  public static final String UNSAFE_STREAM_CAPABILITY_ENFORCE =
061    "hbase.unsafe.stream.capability.enforce";
062
063  /** Full access permissions (starting point for a umask) */
064  public static final String FULL_RWX_PERMISSIONS = "777";
065
066  private CommonFSUtils() {
067  }
068
069  /**
070   * Compare of path component. Does not consider schema; i.e. if schemas different but
071   * <code>path</code> starts with <code>rootPath</code>, then the function returns true
072   * @param rootPath value to check for
073   * @param path     subject to check
074   * @return True if <code>path</code> starts with <code>rootPath</code>
075   */
076  public static boolean isStartingWithPath(final Path rootPath, final String path) {
077    String uriRootPath = rootPath.toUri().getPath();
078    String tailUriPath = (new Path(path)).toUri().getPath();
079    return tailUriPath.startsWith(uriRootPath);
080  }
081
082  /**
083   * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
084   * '/a/b/c' part. Does not consider schema; i.e. if schemas different but path or subpath matches,
085   * the two will equate.
086   * @param pathToSearch Path we will be trying to match against.
087   * @param pathTail     what to match
088   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
089   */
090  public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
091    return isMatchingTail(pathToSearch, new Path(pathTail));
092  }
093
094  /**
095   * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
096   * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true. Does not consider
097   * schema; i.e. if schemas different but path or subpath matches, the two will equate.
098   * @param pathToSearch Path we will be trying to match agains against
099   * @param pathTail     what to match
100   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
101   */
102  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
103    if (pathToSearch.depth() != pathTail.depth()) {
104      return false;
105    }
106    Path tailPath = pathTail;
107    String tailName;
108    Path toSearch = pathToSearch;
109    String toSearchName;
110    boolean result = false;
111    do {
112      tailName = tailPath.getName();
113      if (tailName == null || tailName.length() <= 0) {
114        result = true;
115        break;
116      }
117      toSearchName = toSearch.getName();
118      if (toSearchName == null || toSearchName.length() <= 0) {
119        break;
120      }
121      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
122      tailPath = tailPath.getParent();
123      toSearch = toSearch.getParent();
124    } while (tailName.equals(toSearchName));
125    return result;
126  }
127
128  /**
129   * Delete if exists.
130   * @param fs  filesystem object
131   * @param dir directory to delete
132   * @return True if deleted <code>dir</code>
133   * @throws IOException e
134   */
135  public static boolean deleteDirectory(final FileSystem fs, final Path dir) throws IOException {
136    return fs.exists(dir) && fs.delete(dir, true);
137  }
138
139  /**
140   * Return the number of bytes that large input files should be optimally be split into to minimize
141   * i/o time.
142   * @param fs filesystem object
143   * @return the default block size for the path's filesystem
144   */
145  public static long getDefaultBlockSize(final FileSystem fs, final Path path) {
146    return fs.getDefaultBlockSize(path);
147  }
148
149  /*
150   * Get the default replication.
151   * @param fs filesystem object
152   * @param f path of file
153   * @return default replication for the path's filesystem
154   */
155  public static short getDefaultReplication(final FileSystem fs, final Path path) {
156    return fs.getDefaultReplication(path);
157  }
158
159  /**
160   * Returns the default buffer size to use during writes. The size of the buffer should probably be
161   * a multiple of hardware page size (4096 on Intel x86), and it determines how much data is
162   * buffered during read and write operations.
163   * @param fs filesystem object
164   * @return default buffer size to use during writes
165   */
166  public static int getDefaultBufferSize(final FileSystem fs) {
167    return fs.getConf().getInt("io.file.buffer.size", 4096);
168  }
169
170  /**
171   * Create the specified file on the filesystem. By default, this will:
172   * <ol>
173   * <li>apply the umask in the configuration (if it is enabled)</li>
174   * <li>use the fs configured buffer size (or 4096 if not set)</li>
175   * <li>use the default replication</li>
176   * <li>use the default block size</li>
177   * <li>not track progress</li>
178   * </ol>
179   * @param fs        {@link FileSystem} on which to write the file
180   * @param path      {@link Path} to the file to write
181   * @param perm      intial permissions
182   * @param overwrite Whether or not the created file should be overwritten.
183   * @return output stream to the created file
184   * @throws IOException if the file cannot be created
185   */
186  public static FSDataOutputStream create(FileSystem fs, Path path, FsPermission perm,
187    boolean overwrite) throws IOException {
188    if (LOG.isTraceEnabled()) {
189      LOG.trace("Creating file={} with permission={}, overwrite={}", path, perm, overwrite);
190    }
191    return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
192      getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
193  }
194
195  /**
196   * Get the file permissions specified in the configuration, if they are enabled.
197   * @param fs               filesystem that the file will be created on.
198   * @param conf             configuration to read for determining if permissions are enabled and
199   *                         which to use
200   * @param permssionConfKey property key in the configuration to use when finding the permission
201   * @return the permission to use when creating a new file on the fs. If special permissions are
202   *         not specified in the configuration, then the default permissions on the the fs will be
203   *         returned.
204   */
205  public static FsPermission getFilePermissions(final FileSystem fs, final Configuration conf,
206    final String permssionConfKey) {
207    boolean enablePermissions = conf.getBoolean(HConstants.ENABLE_DATA_FILE_UMASK, false);
208
209    if (enablePermissions) {
210      try {
211        FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
212        // make sure that we have a mask, if not, go default.
213        String mask = conf.get(permssionConfKey);
214        if (mask == null) {
215          return FsPermission.getFileDefault();
216        }
217        // appy the umask
218        FsPermission umask = new FsPermission(mask);
219        return perm.applyUMask(umask);
220      } catch (IllegalArgumentException e) {
221        LOG.warn("Incorrect umask attempted to be created: " + conf.get(permssionConfKey)
222          + ", using default file permissions.", e);
223        return FsPermission.getFileDefault();
224      }
225    }
226    return FsPermission.getFileDefault();
227  }
228
229  /**
230   * Verifies root directory path is a valid URI with a scheme
231   * @param root root directory path
232   * @return Passed <code>root</code> argument.
233   * @throws IOException if not a valid URI with a scheme
234   */
235  public static Path validateRootPath(Path root) throws IOException {
236    try {
237      URI rootURI = new URI(root.toString());
238      String scheme = rootURI.getScheme();
239      if (scheme == null) {
240        throw new IOException("Root directory does not have a scheme");
241      }
242      return root;
243    } catch (URISyntaxException e) {
244      throw new IOException("Root directory path is not a valid " + "URI -- check your "
245        + HConstants.HBASE_DIR + " configuration", e);
246    }
247  }
248
249  /**
250   * Checks for the presence of the WAL log root path (using the provided conf object) in the given
251   * path. If it exists, this method removes it and returns the String representation of remaining
252   * relative path.
253   * @param path must not be null
254   * @param conf must not be null
255   * @return String representation of the remaining relative path
256   * @throws IOException from underlying filesystem
257   */
258  public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
259    Path root = getWALRootDir(conf);
260    String pathStr = path.toString();
261    // check that the path is absolute... it has the root path in it.
262    if (!pathStr.startsWith(root.toString())) {
263      return pathStr;
264    }
265    // if not, return as it is.
266    return pathStr.substring(root.toString().length() + 1);// remove the "/" too.
267  }
268
269  /**
270   * Return the 'path' component of a Path. In Hadoop, Path is a URI. This method returns the 'path'
271   * component of a Path's URI: e.g. If a Path is
272   * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>, this method returns
273   * <code>/hbase_trunk/TestTable/compaction.dir</code>. This method is useful if you want to print
274   * out a Path without qualifying Filesystem instance.
275   * @param p Filesystem Path whose 'path' component we are to return.
276   * @return Path portion of the Filesystem
277   */
278  public static String getPath(Path p) {
279    return p.toUri().getPath();
280  }
281
282  /**
283   * @param c configuration
284   * @return {@link Path} to hbase root directory from configuration as a qualified Path.
285   * @throws IOException e
286   */
287  public static Path getRootDir(final Configuration c) throws IOException {
288    Path p = new Path(c.get(HConstants.HBASE_DIR));
289    FileSystem fs = p.getFileSystem(c);
290    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
291  }
292
293  public static void setRootDir(final Configuration c, final Path root) {
294    c.set(HConstants.HBASE_DIR, root.toString());
295  }
296
297  public static void setFsDefault(final Configuration c, final Path root) {
298    c.set("fs.defaultFS", root.toString()); // for hadoop 0.21+
299  }
300
301  public static void setFsDefault(final Configuration c, final String uri) {
302    c.set("fs.defaultFS", uri); // for hadoop 0.21+
303  }
304
305  public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
306    Path p = getRootDir(c);
307    return p.getFileSystem(c);
308  }
309
310  /**
311   * @param c configuration
312   * @return {@link Path} to hbase log root directory: e.g. {@value HBASE_WAL_DIR} from
313   *         configuration as a qualified Path. Defaults to HBase root dir.
314   * @throws IOException e
315   */
316  public static Path getWALRootDir(final Configuration c) throws IOException {
317    Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR)));
318    if (!isValidWALRootDir(p, c)) {
319      return getRootDir(c);
320    }
321    FileSystem fs = p.getFileSystem(c);
322    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
323  }
324
325  /**
326   * Returns the URI in the string format
327   * @param c configuration
328   * @param p path
329   * @return - the URI's to string format n
330   */
331  public static String getDirUri(final Configuration c, Path p) throws IOException {
332    if (p.toUri().getScheme() != null) {
333      return p.toUri().toString();
334    }
335    return null;
336  }
337
338  public static void setWALRootDir(final Configuration c, final Path root) {
339    c.set(HBASE_WAL_DIR, root.toString());
340  }
341
342  public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
343    Path p = getWALRootDir(c);
344    FileSystem fs = p.getFileSystem(c);
345    // hadoop-core does fs caching, so need to propagate this if set
346    String enforceStreamCapability = c.get(UNSAFE_STREAM_CAPABILITY_ENFORCE);
347    if (enforceStreamCapability != null) {
348      fs.getConf().set(UNSAFE_STREAM_CAPABILITY_ENFORCE, enforceStreamCapability);
349    }
350    return fs;
351  }
352
353  private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
354    Path rootDir = getRootDir(c);
355    FileSystem fs = walDir.getFileSystem(c);
356    Path qualifiedWalDir = walDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
357    if (!qualifiedWalDir.equals(rootDir)) {
358      if (qualifiedWalDir.toString().startsWith(rootDir.toString() + "/")) {
359        throw new IllegalStateException("Illegal WAL directory specified. "
360          + "WAL directories are not permitted to be under root directory: rootDir="
361          + rootDir.toString() + ", qualifiedWALDir=" + qualifiedWalDir);
362      }
363    }
364    return true;
365  }
366
367  /**
368   * Returns the WAL region directory based on the given table name and region name
369   * @param conf              configuration to determine WALRootDir
370   * @param tableName         Table that the region is under
371   * @param encodedRegionName Region name used for creating the final region directory
372   * @return the region directory used to store WALs under the WALRootDir
373   * @throws IOException if there is an exception determining the WALRootDir
374   */
375  public static Path getWALRegionDir(final Configuration conf, final TableName tableName,
376    final String encodedRegionName) throws IOException {
377    return new Path(getWALTableDir(conf, tableName), encodedRegionName);
378  }
379
380  /**
381   * Returns the Table directory under the WALRootDir for the specified table name
382   * @param conf      configuration used to get the WALRootDir
383   * @param tableName Table to get the directory for
384   * @return a path to the WAL table directory for the specified table
385   * @throws IOException if there is an exception determining the WALRootDir
386   */
387  public static Path getWALTableDir(final Configuration conf, final TableName tableName)
388    throws IOException {
389    Path baseDir = new Path(getWALRootDir(conf), HConstants.BASE_NAMESPACE_DIR);
390    return new Path(new Path(baseDir, tableName.getNamespaceAsString()),
391      tableName.getQualifierAsString());
392  }
393
394  /**
395   * For backward compatibility with HBASE-20734, where we store recovered edits in a wrong
396   * directory without BASE_NAMESPACE_DIR. See HBASE-22617 for more details.
397   * @deprecated For compatibility, will be removed in 4.0.0.
398   */
399  @Deprecated
400  public static Path getWrongWALRegionDir(final Configuration conf, final TableName tableName,
401    final String encodedRegionName) throws IOException {
402    Path wrongTableDir = new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()),
403      tableName.getQualifierAsString());
404    return new Path(wrongTableDir, encodedRegionName);
405  }
406
407  /**
408   * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
409   * path rootdir
410   * @param rootdir   qualified path of HBase root directory
411   * @param tableName name of table
412   * @return {@link org.apache.hadoop.fs.Path} for table
413   */
414  public static Path getTableDir(Path rootdir, final TableName tableName) {
415    return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
416      tableName.getQualifierAsString());
417  }
418
419  /**
420   * Returns the {@link org.apache.hadoop.fs.Path} object representing the region directory under
421   * path rootdir
422   * @param rootdir    qualified path of HBase root directory
423   * @param tableName  name of table
424   * @param regionName The encoded region name
425   * @return {@link org.apache.hadoop.fs.Path} for region
426   */
427  public static Path getRegionDir(Path rootdir, TableName tableName, String regionName) {
428    return new Path(getTableDir(rootdir, tableName), regionName);
429  }
430
431  /**
432   * Returns the {@link org.apache.hadoop.hbase.TableName} object representing the table directory
433   * under path rootdir
434   * @param tablePath path of table
435   * @return {@link org.apache.hadoop.fs.Path} for table
436   */
437  public static TableName getTableName(Path tablePath) {
438    return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
439  }
440
441  /**
442   * Returns the {@link org.apache.hadoop.fs.Path} object representing the namespace directory under
443   * path rootdir
444   * @param rootdir   qualified path of HBase root directory
445   * @param namespace namespace name
446   * @return {@link org.apache.hadoop.fs.Path} for table
447   */
448  public static Path getNamespaceDir(Path rootdir, final String namespace) {
449    return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, new Path(namespace)));
450  }
451
452  // this mapping means that under a federated FileSystem implementation, we'll
453  // only log the first failure from any of the underlying FileSystems at WARN and all others
454  // will be at DEBUG.
455  private static final Map<FileSystem, Boolean> warningMap = new ConcurrentHashMap<>();
456
457  /**
458   * Sets storage policy for given path. If the passed path is a directory, we'll set the storage
459   * policy for all files created in the future in said directory. Note that this change in storage
460   * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle. If
461   * we're running on a version of FileSystem that doesn't support the given storage policy (or
462   * storage policies at all), then we'll issue a log message and continue. See
463   * http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
464   * @param fs            We only do anything it implements a setStoragePolicy method
465   * @param path          the Path whose storage policy is to be set
466   * @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+
467   *                      org.apache.hadoop.hdfs.protocol.HdfsConstants for possible list e.g
468   *                      'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
469   */
470  public static void setStoragePolicy(final FileSystem fs, final Path path,
471    final String storagePolicy) {
472    try {
473      setStoragePolicy(fs, path, storagePolicy, false);
474    } catch (IOException e) {
475      // should never arrive here
476      LOG.warn("We have chosen not to throw exception but some unexpectedly thrown out", e);
477    }
478  }
479
480  static void setStoragePolicy(final FileSystem fs, final Path path, final String storagePolicy,
481    boolean throwException) throws IOException {
482    if (storagePolicy == null) {
483      if (LOG.isTraceEnabled()) {
484        LOG.trace("We were passed a null storagePolicy, exiting early.");
485      }
486      return;
487    }
488    String trimmedStoragePolicy = storagePolicy.trim();
489    if (trimmedStoragePolicy.isEmpty()) {
490      LOG.trace("We were passed an empty storagePolicy, exiting early.");
491      return;
492    } else {
493      trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT);
494    }
495    if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) {
496      LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.", trimmedStoragePolicy);
497      return;
498    }
499    try {
500      invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
501    } catch (IOException e) {
502      LOG.trace("Failed to invoke set storage policy API on FS", e);
503      if (throwException) {
504        throw e;
505      }
506    }
507  }
508
509  /*
510   * All args have been checked and are good. Run the setStoragePolicy invocation.
511   */
512  private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
513    final String storagePolicy) throws IOException {
514    Exception toThrow = null;
515
516    try {
517      fs.setStoragePolicy(path, storagePolicy);
518      LOG.debug("Set storagePolicy={} for path={}", storagePolicy, path);
519    } catch (Exception e) {
520      toThrow = e;
521      // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
522      // misuse than a runtime problem with HDFS.
523      if (!warningMap.containsKey(fs)) {
524        warningMap.put(fs, true);
525        LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path + ". "
526          + "DEBUG log level might have more details.", e);
527      } else if (LOG.isDebugEnabled()) {
528        LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
529      }
530
531      // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation
532      // that throws UnsupportedOperationException
533      if (e instanceof UnsupportedOperationException) {
534        if (LOG.isDebugEnabled()) {
535          LOG.debug("The underlying FileSystem implementation doesn't support "
536            + "setStoragePolicy. This is probably intentional on their part, since HDFS-9345 "
537            + "appears to be present in your version of Hadoop. For more information check "
538            + "the Hadoop documentation on 'ArchivalStorage', the Hadoop FileSystem "
539            + "specification docs from HADOOP-11981, and/or related documentation from the "
540            + "provider of the underlying FileSystem (its name should appear in the "
541            + "stacktrace that accompanies this message). Note in particular that Hadoop's "
542            + "local filesystem implementation doesn't support storage policies.", e);
543        }
544      }
545    }
546
547    if (toThrow != null) {
548      throw new IOException(toThrow);
549    }
550  }
551
552  /**
553   * @param conf must not be null
554   * @return True if this filesystem whose scheme is 'hdfs'.
555   * @throws IOException from underlying FileSystem
556   */
557  public static boolean isHDFS(final Configuration conf) throws IOException {
558    FileSystem fs = FileSystem.get(conf);
559    String scheme = fs.getUri().getScheme();
560    return scheme.equalsIgnoreCase("hdfs");
561  }
562
563  /**
564   * Checks if the given path is the one with 'recovered.edits' dir.
565   * @param path must not be null
566   * @return True if we recovered edits
567   */
568  public static boolean isRecoveredEdits(Path path) {
569    return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
570  }
571
572  /**
573   * @param conf must not be null
574   * @return Returns the filesystem of the hbase rootdir.
575   * @throws IOException from underlying FileSystem
576   */
577  public static FileSystem getCurrentFileSystem(Configuration conf) throws IOException {
578    return getRootDir(conf).getFileSystem(conf);
579  }
580
581  /**
582   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal This accommodates
583   * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException, and
584   * return an empty FileStatus[] while Hadoop 2 will throw FileNotFoundException. Where possible,
585   * prefer FSUtils#listStatusWithStatusFilter(FileSystem, Path, FileStatusFilter) instead.
586   * @param fs     file system
587   * @param dir    directory
588   * @param filter path filter
589   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
590   */
591  public static FileStatus[] listStatus(final FileSystem fs, final Path dir,
592    final PathFilter filter) throws IOException {
593    FileStatus[] status = null;
594    try {
595      status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
596    } catch (FileNotFoundException fnfe) {
597      // if directory doesn't exist, return null
598      if (LOG.isTraceEnabled()) {
599        LOG.trace("{} doesn't exist", dir);
600      }
601    }
602    if (status == null || status.length < 1) {
603      return null;
604    }
605    return status;
606  }
607
608  /**
609   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal This would accommodates
610   * differences between hadoop versions
611   * @param fs  file system
612   * @param dir directory
613   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
614   */
615  public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
616    return listStatus(fs, dir, null);
617  }
618
619  /**
620   * Calls fs.listFiles() to get FileStatus and BlockLocations together for reducing rpc call
621   * @param fs  file system
622   * @param dir directory
623   * @return LocatedFileStatus list
624   */
625  public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs, final Path dir)
626    throws IOException {
627    List<LocatedFileStatus> status = null;
628    try {
629      RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(dir, false);
630      while (locatedFileStatusRemoteIterator.hasNext()) {
631        if (status == null) {
632          status = Lists.newArrayList();
633        }
634        status.add(locatedFileStatusRemoteIterator.next());
635      }
636    } catch (FileNotFoundException fnfe) {
637      // if directory doesn't exist, return null
638      if (LOG.isTraceEnabled()) {
639        LOG.trace("{} doesn't exist", dir);
640      }
641    }
642    return status;
643  }
644
645  /**
646   * Calls fs.delete() and returns the value returned by the fs.delete()
647   * @param fs        must not be null
648   * @param path      must not be null
649   * @param recursive delete tree rooted at path
650   * @return the value returned by the fs.delete()
651   * @throws IOException from underlying FileSystem
652   */
653  public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
654    throws IOException {
655    return fs.delete(path, recursive);
656  }
657
658  /**
659   * Calls fs.exists(). Checks if the specified path exists
660   * @param fs   must not be null
661   * @param path must not be null
662   * @return the value returned by fs.exists()
663   * @throws IOException from underlying FileSystem
664   */
665  public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
666    return fs.exists(path);
667  }
668
669  /**
670   * Log the current state of the filesystem from a certain root directory
671   * @param fs   filesystem to investigate
672   * @param root root file/directory to start logging from
673   * @param log  log to output information
674   * @throws IOException if an unexpected exception occurs
675   */
676  public static void logFileSystemState(final FileSystem fs, final Path root, Logger log)
677    throws IOException {
678    log.debug("File system contents for path {}", root);
679    logFSTree(log, fs, root, "|-");
680  }
681
682  /**
683   * Recursive helper to log the state of the FS
684   * @see #logFileSystemState(FileSystem, Path, Logger)
685   */
686  private static void logFSTree(Logger log, final FileSystem fs, final Path root, String prefix)
687    throws IOException {
688    FileStatus[] files = listStatus(fs, root, null);
689    if (files == null) {
690      return;
691    }
692
693    for (FileStatus file : files) {
694      if (file.isDirectory()) {
695        log.debug(prefix + file.getPath().getName() + "/");
696        logFSTree(log, fs, file.getPath(), prefix + "---");
697      } else {
698        log.debug(prefix + file.getPath().getName());
699      }
700    }
701  }
702
703  public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
704    throws IOException {
705    // set the modify time for TimeToLive Cleaner
706    fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
707    return fs.rename(src, dest);
708  }
709
710  /**
711   * Check if short circuit read buffer size is set and if not, set it to hbase value.
712   * @param conf must not be null
713   */
714  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
715    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
716    final int notSet = -1;
717    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
718    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
719    int size = conf.getInt(dfsKey, notSet);
720    // If a size is set, return -- we will use it.
721    if (size != notSet) {
722      return;
723    }
724    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
725    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
726    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
727  }
728
729  /**
730   * Helper exception for those cases where the place where we need to check a stream capability is
731   * not where we have the needed context to explain the impact and mitigation for a lack.
732   */
733  public static class StreamLacksCapabilityException extends Exception {
734    public StreamLacksCapabilityException(String message, Throwable cause) {
735      super(message, cause);
736    }
737
738    public StreamLacksCapabilityException(String message) {
739      super(message);
740    }
741  }
742}