/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.apache.hadoop.fs.CommonPathCapabilities.FS_STORAGEPOLICY;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Utility methods for interacting with the underlying file system.
 * <p/>
 * Note that {@link #setStoragePolicy(FileSystem, Path, String)} is tested in TestFSUtils and
 * pre-commit will run the hbase-server tests if there is a code change in this class. See
 * <a href="https://issues.apache.org/jira/browse/HBASE-20838">HBASE-20838</a> for more details.
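 * <p/>
 * A minimal usage sketch (the configuration values involved are illustrative, not required
 * defaults):
 *
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * Path rootDir = CommonFSUtils.getRootDir(conf);        // qualified hbase.rootdir
 * Path walRootDir = CommonFSUtils.getWALRootDir(conf);  // hbase.wal.dir, or the root dir if unset
 * FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf);
 * }</pre>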
 */
@InterfaceAudience.Private
public final class CommonFSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(CommonFSUtils.class);

  /** Parameter name for HBase WAL directory */
  public static final String HBASE_WAL_DIR = "hbase.wal.dir";

  /** Parameter to disable stream capability enforcement checks */
  public static final String UNSAFE_STREAM_CAPABILITY_ENFORCE =
    "hbase.unsafe.stream.capability.enforce";

  /** Full access permissions (starting point for a umask) */
  public static final String FULL_RWX_PERMISSIONS = "777";

  private CommonFSUtils() {
  }

  /**
   * Compares the path component only. Does not consider the scheme; i.e. even if the schemes
   * differ, if <code>path</code> starts with <code>rootPath</code>, the function returns true.
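   * <p/>
   * For example (illustrative paths; only the path components are compared):
   *
   * <pre>{@code
   * CommonFSUtils.isStartingWithPath(new Path("hdfs://ns/hbase"), "/hbase/data/ns1/tbl1"); // true
   * CommonFSUtils.isStartingWithPath(new Path("hdfs://ns/hbase"), "/other/dir");           // false
   * }</pre>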
   * @param rootPath value to check for
   * @param path     subject to check
   * @return True if <code>path</code> starts with <code>rootPath</code>
   */
  public static boolean isStartingWithPath(final Path rootPath, final String path) {
    String uriRootPath = rootPath.toUri().getPath();
    String tailUriPath = new Path(path).toUri().getPath();
    return tailUriPath.startsWith(uriRootPath);
  }

  /**
   * Compares the path component of the Path URI; e.g. for hdfs://a/b/c and /a/b/c, it compares
   * the '/a/b/c' part. Does not consider the scheme; i.e. even if the schemes differ, the two
   * equate if the path or subpath matches.
   * @param pathToSearch Path we will be trying to match against.
   * @param pathTail     what to match
   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
    return isMatchingTail(pathToSearch, new Path(pathTail));
  }

  /**
   * Compares the path component of the Path URI; e.g. for hdfs://a/b/c and /a/b/c, it compares
   * the '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
   * consider the scheme; i.e. even if the schemes differ, the two equate if the path or subpath
   * matches.
   * @param pathToSearch Path we will be trying to match against
   * @param pathTail     what to match
   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
    if (pathToSearch.depth() != pathTail.depth()) {
      return false;
    }
    Path tailPath = pathTail;
    String tailName;
    Path toSearch = pathToSearch;
    String toSearchName;
    boolean result = false;
    do {
      tailName = tailPath.getName();
      if (tailName == null || tailName.length() <= 0) {
        result = true;
        break;
      }
      toSearchName = toSearch.getName();
      if (toSearchName == null || toSearchName.length() <= 0) {
        break;
      }
      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
      tailPath = tailPath.getParent();
      toSearch = toSearch.getParent();
    } while (tailName.equals(toSearchName));
    return result;
  }

  /**
   * Delete if exists.
   * @param fs  filesystem object
   * @param dir directory to delete
   * @return True if deleted <code>dir</code>
   * @throws IOException e
   */
  public static boolean deleteDirectory(final FileSystem fs, final Path dir) throws IOException {
    return fs.exists(dir) && fs.delete(dir, true);
  }

  /**
   * Return the number of bytes that large input files should optimally be split into to minimize
   * I/O time.
   * @param fs filesystem object
   * @return the default block size for the path's filesystem
   */
  public static long getDefaultBlockSize(final FileSystem fs, final Path path) {
    return fs.getDefaultBlockSize(path);
  }

  /**
   * Get the default replication.
   * @param fs   filesystem object
   * @param path path of file
   * @return default replication for the path's filesystem
   */
  public static short getDefaultReplication(final FileSystem fs, final Path path) {
    return fs.getDefaultReplication(path);
  }

  /**
   * Returns the default buffer size to use during writes. The size of the buffer should probably be
   * a multiple of hardware page size (4096 on Intel x86), and it determines how much data is
   * buffered during read and write operations.
   * @param fs filesystem object
   * @return default buffer size to use during writes
   */
  public static int getDefaultBufferSize(final FileSystem fs) {
    return fs.getConf().getInt("io.file.buffer.size", 4096);
  }

  /**
   * Create the specified file on the filesystem. By default, this will:
   * <ol>
   * <li>apply the umask in the configuration (if it is enabled)</li>
   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the default replication</li>
   * <li>use the default block size</li>
   * <li>not track progress</li>
   * </ol>
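   * <p/>
   * A minimal usage sketch (the path and umask key below are illustrative):
   *
   * <pre>{@code
   * FsPermission perm = CommonFSUtils.getFilePermissions(fs, conf, "hbase.data.umask");
   * try (FSDataOutputStream out = CommonFSUtils.create(fs, new Path("/hbase/.tmp/example"), perm, true)) {
   *   out.writeBytes("example");
   * }
   * }</pre>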
   * @param fs        {@link FileSystem} on which to write the file
   * @param path      {@link Path} to the file to write
   * @param perm      initial permissions
   * @param overwrite Whether or not the created file should be overwritten.
   * @return output stream to the created file
   * @throws IOException if the file cannot be created
   */
  public static FSDataOutputStream create(FileSystem fs, Path path, FsPermission perm,
    boolean overwrite) throws IOException {
    return create(fs, path, perm, overwrite, true);
  }

  /**
   * Create the specified file on the filesystem. By default, this will:
   * <ol>
   * <li>apply the umask in the configuration (if it is enabled)</li>
   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the default replication</li>
   * <li>use the default block size</li>
   * <li>not track progress</li>
   * </ol>
   * @param fs                {@link FileSystem} on which to write the file
   * @param path              {@link Path} to the file to write
   * @param perm              initial permissions
   * @param overwrite         Whether or not the created file should be overwritten.
   * @param isRecursiveCreate recursively create parent directories
   * @return output stream to the created file
   * @throws IOException if the file cannot be created
   */
  public static FSDataOutputStream create(FileSystem fs, Path path, FsPermission perm,
    boolean overwrite, boolean isRecursiveCreate) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Creating file={} with permission={}, overwrite={}, recursive={}", path, perm,
        overwrite, isRecursiveCreate);
    }
    if (isRecursiveCreate) {
      return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
        getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
    } else {
      return fs.createNonRecursive(path, perm, overwrite, getDefaultBufferSize(fs),
        getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
    }
  }

  /**
   * Get the file permissions specified in the configuration, if they are enabled.
   * @param fs               filesystem that the file will be created on.
   * @param conf             configuration to read for determining if permissions are enabled and
   *                         which to use
   * @param permissionConfKey property key in the configuration to use when finding the permission
   * @return the permission to use when creating a new file on the fs. If special permissions are
   *         not specified in the configuration, then the default permissions on the fs will be
   *         returned.
   */
  public static FsPermission getFilePermissions(final FileSystem fs, final Configuration conf,
    final String permissionConfKey) {
    boolean enablePermissions = conf.getBoolean(HConstants.ENABLE_DATA_FILE_UMASK, false);

    if (enablePermissions) {
      try {
        FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
        // make sure that we have a mask, if not, go default.
        String mask = conf.get(permissionConfKey);
        if (mask == null) {
          return FsPermission.getFileDefault();
        }
        // apply the umask
        FsPermission umask = new FsPermission(mask);
        return perm.applyUMask(umask);
      } catch (IllegalArgumentException e) {
        LOG.warn("Incorrect umask attempted to be created: " + conf.get(permissionConfKey)
          + ", using default file permissions.", e);
        return FsPermission.getFileDefault();
      }
    }
    return FsPermission.getFileDefault();
  }

  /**
   * Verifies root directory path is a valid URI with a scheme
   * @param root root directory path
   * @return Passed <code>root</code> argument.
   * @throws IOException if not a valid URI with a scheme
   */
  public static Path validateRootPath(Path root) throws IOException {
    try {
      URI rootURI = new URI(root.toString());
      String scheme = rootURI.getScheme();
      if (scheme == null) {
        throw new IOException("Root directory does not have a scheme");
      }
      return root;
    } catch (URISyntaxException e) {
      throw new IOException("Root directory path is not a valid URI -- check your "
        + HConstants.HBASE_DIR + " configuration", e);
    }
  }

  /**
   * Checks for the presence of the WAL root path (using the provided conf object) in the given
   * path. If it exists, this method removes it and returns the String representation of the
   * remaining relative path.
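   * <p/>
   * For example (illustrative paths, assuming the WAL root resolves to hdfs://ns/hbase):
   *
   * <pre>{@code
   * // "hdfs://ns/hbase/WALs/server1" becomes "WALs/server1"; paths outside the WAL root are
   * // returned unchanged.
   * String relative = CommonFSUtils.removeWALRootPath(walPath, conf);
   * }</pre>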
   * @param path must not be null
   * @param conf must not be null
   * @return String representation of the remaining relative path
   * @throws IOException from underlying filesystem
   */
  public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
    Path root = getWALRootDir(conf);
    String pathStr = path.toString();
    // If the path does not start with the WAL root, it is already relative (or lives elsewhere),
    // so return it as-is.
    if (!pathStr.startsWith(root.toString())) {
      return pathStr;
    }
    // Otherwise strip the root path and the trailing "/".
    return pathStr.substring(root.toString().length() + 1);
  }

  /**
   * Return the 'path' component of a Path. In Hadoop, Path is a URI. This method returns the 'path'
   * component of a Path's URI: e.g. If a Path is
   * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>, this method returns
   * <code>/hbase_trunk/TestTable/compaction.dir</code>. This method is useful if you want to print
   * out a Path without the qualifying FileSystem instance.
   * @param p Filesystem Path whose 'path' component we are to return.
   * @return the 'path' component of the Path's URI
   */
  public static String getPath(Path p) {
    return p.toUri().getPath();
  }

  /**
   * Get the path for the root data directory
   * @param c configuration
   * @return {@link Path} to hbase root directory from configuration as a qualified Path.
   * @throws IOException e
   */
  public static Path getRootDir(final Configuration c) throws IOException {
    Path p = new Path(c.get(HConstants.HBASE_DIR));
    FileSystem fs = p.getFileSystem(c);
    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  public static void setRootDir(final Configuration c, final Path root) {
    c.set(HConstants.HBASE_DIR, root.toString());
  }

  public static void setFsDefault(final Configuration c, final Path root) {
    c.set("fs.defaultFS", root.toString()); // for hadoop 0.21+
  }

  public static void setFsDefault(final Configuration c, final String uri) {
    c.set("fs.defaultFS", uri); // for hadoop 0.21+
  }

  public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
    Path p = getRootDir(c);
    return p.getFileSystem(c);
  }

  /**
   * Get the path for the root directory for WAL data
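   * <p/>
   * A minimal configuration sketch (the WAL location below is illustrative):
   *
   * <pre>{@code
   * // Point WALs at a filesystem separate from the one holding hbase.rootdir.
   * conf.set(CommonFSUtils.HBASE_WAL_DIR, "hdfs://walcluster/hbasewal");
   * Path walRoot = CommonFSUtils.getWALRootDir(conf);
   * }</pre>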
   * @param c configuration
   * @return {@link Path} to the HBase WAL root directory, taken from the {@value HBASE_WAL_DIR}
   *         configuration property as a qualified Path. Defaults to the HBase root dir.
   * @throws IOException e
   */
  public static Path getWALRootDir(final Configuration c) throws IOException {
    Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR)));
    if (!isValidWALRootDir(p, c)) {
      return getRootDir(c);
    }
    FileSystem fs = p.getFileSystem(c);
    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  /**
   * Returns the URI of the given path in string format, or null if the path has no scheme.
   * @param c configuration
   * @param p path
   * @return the URI in string format, or null if <code>p</code> has no scheme
   */
  public static String getDirUri(final Configuration c, Path p) throws IOException {
    if (p.toUri().getScheme() != null) {
      return p.toUri().toString();
    }
    return null;
  }

  public static void setWALRootDir(final Configuration c, final Path root) {
    c.set(HBASE_WAL_DIR, root.toString());
  }

  public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
    Path p = getWALRootDir(c);
    FileSystem fs = p.getFileSystem(c);
    // hadoop-core does fs caching, so need to propagate this if set
    String enforceStreamCapability = c.get(UNSAFE_STREAM_CAPABILITY_ENFORCE);
    if (enforceStreamCapability != null) {
      fs.getConf().set(UNSAFE_STREAM_CAPABILITY_ENFORCE, enforceStreamCapability);
    }
    return fs;
  }

  private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
    Path rootDir = getRootDir(c);
    FileSystem fs = walDir.getFileSystem(c);
    Path qualifiedWalDir = walDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    if (!qualifiedWalDir.equals(rootDir)) {
      if (qualifiedWalDir.toString().startsWith(rootDir.toString() + "/")) {
        throw new IllegalStateException("Illegal WAL directory specified. "
          + "WAL directories are not permitted to be under root directory: rootDir="
          + rootDir.toString() + ", qualifiedWALDir=" + qualifiedWalDir);
      }
    }
    return true;
  }

  /**
   * Returns the WAL region directory based on the given table name and region name
   * @param conf              configuration to determine WALRootDir
   * @param tableName         Table that the region is under
   * @param encodedRegionName Region name used for creating the final region directory
   * @return the region directory used to store WALs under the WALRootDir
   * @throws IOException if there is an exception determining the WALRootDir
   */
  public static Path getWALRegionDir(final Configuration conf, final TableName tableName,
    final String encodedRegionName) throws IOException {
    return new Path(getWALTableDir(conf, tableName), encodedRegionName);
  }

  /**
   * Returns the Table directory under the WALRootDir for the specified table name
   * @param conf      configuration used to get the WALRootDir
   * @param tableName Table to get the directory for
   * @return a path to the WAL table directory for the specified table
   * @throws IOException if there is an exception determining the WALRootDir
   */
  public static Path getWALTableDir(final Configuration conf, final TableName tableName)
    throws IOException {
    Path baseDir = new Path(getWALRootDir(conf), HConstants.BASE_NAMESPACE_DIR);
    return new Path(new Path(baseDir, tableName.getNamespaceAsString()),
      tableName.getQualifierAsString());
  }

  /**
   * For backward compatibility with HBASE-20734, where we stored recovered edits in the wrong
   * directory, without BASE_NAMESPACE_DIR. See HBASE-22617 for more details.
   * @deprecated For compatibility, will be removed in 4.0.0.
   */
  @Deprecated
  public static Path getWrongWALRegionDir(final Configuration conf, final TableName tableName,
    final String encodedRegionName) throws IOException {
    Path wrongTableDir = new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()),
      tableName.getQualifierAsString());
    return new Path(wrongTableDir, encodedRegionName);
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
   * path rootdir
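   * <p/>
   * For example (illustrative root dir):
   *
   * <pre>{@code
   * // With rootdir hdfs://ns/hbase, table "ns1:tbl1" maps to hdfs://ns/hbase/data/ns1/tbl1
   * Path tableDir = CommonFSUtils.getTableDir(rootDir, TableName.valueOf("ns1", "tbl1"));
   * }</pre>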
   * @param rootdir   qualified path of HBase root directory
   * @param tableName name of table
   * @return {@link org.apache.hadoop.fs.Path} for table
   */
  public static Path getTableDir(Path rootdir, final TableName tableName) {
    return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
      tableName.getQualifierAsString());
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing the region directory under
   * path rootdir
   * @param rootdir    qualified path of HBase root directory
   * @param tableName  name of table
   * @param regionName The encoded region name
   * @return {@link org.apache.hadoop.fs.Path} for region
   */
  public static Path getRegionDir(Path rootdir, TableName tableName, String regionName) {
    return new Path(getTableDir(rootdir, tableName), regionName);
  }

  /**
   * Returns the {@link org.apache.hadoop.hbase.TableName} object for the table corresponding to
   * the given table directory path
   * @param tablePath path of the table directory
   * @return {@link org.apache.hadoop.hbase.TableName} for the table
   */
  public static TableName getTableName(Path tablePath) {
    return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing the namespace directory under
   * path rootdir
   * @param rootdir   qualified path of HBase root directory
   * @param namespace namespace name
   * @return {@link org.apache.hadoop.fs.Path} for the namespace directory
   */
  public static Path getNamespaceDir(Path rootdir, final String namespace) {
    return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, new Path(namespace)));
  }

  // this mapping means that under a federated FileSystem implementation, we'll
  // only log the first failure from any of the underlying FileSystems at WARN and all others
  // will be at DEBUG.
  private static final Map<FileSystem, Boolean> warningMap = new ConcurrentHashMap<>();

  /**
   * Sets storage policy for given path. If the passed path is a directory, we'll set the storage
   * policy for all files created in the future in said directory. Note that this change in storage
   * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle. If
   * we're running on a version of FileSystem that doesn't support the given storage policy (or
   * storage policies at all), then we'll issue a log message and continue. See
   * http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
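   * <p/>
   * A minimal usage sketch (the path and policy are illustrative):
   *
   * <pre>{@code
   * // Ask the filesystem to keep new files under this directory on SSD, where supported.
   * CommonFSUtils.setStoragePolicy(fs, new Path("/hbase/data/ns1/tbl1/cf"), "ONE_SSD");
   * }</pre>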
   * @param fs            We only do anything if it implements a setStoragePolicy method
   * @param path          the Path whose storage policy is to be set
   * @param storagePolicy Policy to set on <code>path</code>; see the Hadoop 2.6+
   *                      org.apache.hadoop.hdfs.protocol.HdfsConstants class for the possible
   *                      list, e.g. 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
   */
  public static void setStoragePolicy(final FileSystem fs, final Path path,
    final String storagePolicy) {
    try {
      setStoragePolicy(fs, path, storagePolicy, false);
    } catch (IOException e) {
      // should never arrive here
      LOG.warn("We chose not to throw an exception, but one was unexpectedly thrown", e);
    }
  }

  static void setStoragePolicy(final FileSystem fs, final Path path, final String storagePolicy,
    boolean throwException) throws IOException {
    if (storagePolicy == null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("We were passed a null storagePolicy, exiting early.");
      }
      return;
    }
    String trimmedStoragePolicy = storagePolicy.trim();
    if (trimmedStoragePolicy.isEmpty()) {
      LOG.trace("We were passed an empty storagePolicy, exiting early.");
      return;
    } else {
      trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT);
    }
    if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) {
      LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.", trimmedStoragePolicy);
      return;
    }
    try {
      invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
    } catch (IOException e) {
      LOG.trace("Failed to invoke set storage policy API on FS", e);
      if (throwException) {
        throw e;
      }
    }
  }

  /*
   * All args have been checked and are good. Run the setStoragePolicy invocation.
   */
  private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
    final String storagePolicy) throws IOException {
    Exception toThrow = null;

    if (!fs.hasPathCapability(path, FS_STORAGEPOLICY)) {
      LOG.debug("The file system does not support storage policy.");
      return;
    }

    try {
      fs.setStoragePolicy(path, storagePolicy);
      LOG.debug("Set storagePolicy={} for path={}", storagePolicy, path);
    } catch (Exception e) {
      toThrow = e;
      // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
      // misuse than a runtime problem with HDFS.
      if (!warningMap.containsKey(fs)) {
        warningMap.put(fs, true);
        LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path + ". "
          + "DEBUG log level might have more details.", e);
      } else if (LOG.isDebugEnabled()) {
        LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
      }

      // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation
      // that throws UnsupportedOperationException
      if (e instanceof UnsupportedOperationException) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("The underlying FileSystem implementation doesn't support "
            + "setStoragePolicy. This is probably intentional on their part, since HDFS-9345 "
            + "appears to be present in your version of Hadoop. For more information check "
            + "the Hadoop documentation on 'ArchivalStorage', the Hadoop FileSystem "
            + "specification docs from HADOOP-11981, and/or related documentation from the "
            + "provider of the underlying FileSystem (its name should appear in the "
            + "stacktrace that accompanies this message). Note in particular that Hadoop's "
            + "local filesystem implementation doesn't support storage policies.", e);
        }
      }
    }

    if (toThrow != null) {
      throw new IOException(toThrow);
    }
  }

  /**
   * Return true if this is a filesystem whose scheme is 'hdfs'.
   * @throws IOException from underlying FileSystem
   */
  public static boolean isHDFS(final Configuration conf) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    String scheme = fs.getUri().getScheme();
    return scheme.equalsIgnoreCase("hdfs");
  }

  /**
   * Checks if the given path contains the 'recovered.edits' directory.
   * @param path must not be null
   * @return True if the path contains the recovered.edits directory
   */
  public static boolean isRecoveredEdits(Path path) {
    return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
  }

  /**
   * Returns the filesystem of the hbase rootdir.
   * @throws IOException from underlying FileSystem
   */
  public static FileSystem getCurrentFileSystem(Configuration conf) throws IOException {
    return getRootDir(conf).getFileSystem(conf);
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
   * differences between Hadoop versions, where Hadoop 1 does not throw a FileNotFoundException
   * but returns an empty FileStatus[], while Hadoop 2 throws FileNotFoundException. Where possible,
   * prefer FSUtils#listStatusWithStatusFilter(FileSystem, Path, FileStatusFilter) instead.
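   * <p/>
   * Callers should handle the possible null return, for example:
   *
   * <pre>{@code
   * FileStatus[] children = CommonFSUtils.listStatus(fs, dir, null);
   * if (children == null) {
   *   // dir is empty or does not exist
   * }
   * }</pre>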
   * @param fs     file system
   * @param dir    directory
   * @param filter path filter
   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
   */
  public static FileStatus[] listStatus(final FileSystem fs, final Path dir,
    final PathFilter filter) throws IOException {
    FileStatus[] status = null;
    try {
      status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
    } catch (FileNotFoundException fnfe) {
      // if directory doesn't exist, return null
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} doesn't exist", dir);
      }
    }
    if (status == null || status.length < 1) {
      return null;
    }
    return status;
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
   * differences between Hadoop versions.
   * @param fs  file system
   * @param dir directory
   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
   */
  public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
    return listStatus(fs, dir, null);
  }

  /**
   * Calls fs.listFiles() to get FileStatus and BlockLocations together, reducing RPC calls
   * @param fs  file system
   * @param dir directory
   * @return LocatedFileStatus list
   */
  public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs, final Path dir)
    throws IOException {
    List<LocatedFileStatus> status = null;
    try {
      RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(dir, false);
      while (locatedFileStatusRemoteIterator.hasNext()) {
        if (status == null) {
          status = Lists.newArrayList();
        }
        status.add(locatedFileStatusRemoteIterator.next());
      }
    } catch (FileNotFoundException fnfe) {
      // if directory doesn't exist, return null
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} doesn't exist", dir);
      }
    }
    return status;
  }

  /**
   * Calls fs.delete() and returns its result.
   * @param fs        must not be null
   * @param path      must not be null
   * @param recursive delete tree rooted at path
   * @return the value returned by fs.delete()
   * @throws IOException from underlying FileSystem
   */
  public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
    throws IOException {
    return fs.delete(path, recursive);
  }

  /**
   * Calls fs.exists() to check if the specified path exists.
   * @param fs   must not be null
   * @param path must not be null
   * @return the value returned by fs.exists()
   * @throws IOException from underlying FileSystem
   */
  public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
    return fs.exists(path);
  }

  /**
   * Log the current state of the filesystem from a certain root directory
   * @param fs   filesystem to investigate
   * @param root root file/directory to start logging from
   * @param log  log to output information
   * @throws IOException if an unexpected exception occurs
   */
  public static void logFileSystemState(final FileSystem fs, final Path root, Logger log)
    throws IOException {
    log.debug("File system contents for path {}", root);
    logFSTree(log, fs, root, "|-");
  }

  /**
   * Recursive helper to log the state of the FS
   * @see #logFileSystemState(FileSystem, Path, Logger)
   */
  private static void logFSTree(Logger log, final FileSystem fs, final Path root, String prefix)
    throws IOException {
    FileStatus[] files = listStatus(fs, root, null);
    if (files == null) {
      return;
    }

    for (FileStatus file : files) {
      if (file.isDirectory()) {
        log.debug(prefix + file.getPath().getName() + "/");
        logFSTree(log, fs, file.getPath(), prefix + "---");
      } else {
        log.debug(prefix + file.getPath().getName());
      }
    }
  }

  public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
    throws IOException {
    // set the modify time for TimeToLive Cleaner
    fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
    return fs.rename(src, dest);
  }

  /**
   * Check if short circuit read buffer size is set and if not, set it to hbase value.
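   * <p/>
   * A minimal usage sketch:
   *
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * CommonFSUtils.checkShortCircuitReadBufferSize(conf);
   * // dfs.client.read.shortcircuit.buffer.size now has a value if it was previously unset
   * }</pre>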
   * @param conf must not be null
   */
  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
    int size = conf.getInt(dfsKey, notSet);
    // If a size is set, return -- we will use it.
    if (size != notSet) {
      return;
    }
    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }

  /**
   * Helper exception for those cases where the place where we need to check a stream capability is
   * not where we have the needed context to explain the impact and mitigation of the missing
   * capability.
   */
  public static class StreamLacksCapabilityException extends Exception {
    public StreamLacksCapabilityException(String message, Throwable cause) {
      super(message, cause);
    }

    public StreamLacksCapabilityException(String message) {
      super(message);
    }
  }
}