001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.lang.reflect.InvocationTargetException;
024import java.lang.reflect.Method;
025import java.net.URI;
026import java.net.URISyntaxException;
027import java.util.List;
028import java.util.Locale;
029import java.util.Map;
030import java.util.Objects;
031import java.util.concurrent.ConcurrentHashMap;
032import org.apache.hadoop.HadoopIllegalArgumentException;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FSDataOutputStream;
035import org.apache.hadoop.fs.FileStatus;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.LocatedFileStatus;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.fs.PathFilter;
040import org.apache.hadoop.fs.RemoteIterator;
041import org.apache.hadoop.fs.permission.FsPermission;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.ipc.RemoteException;
045import org.apache.yetus.audience.InterfaceAudience;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
050import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
051
052/**
053 * Utility methods for interacting with the underlying file system.
054 * <p/>
055 * Note that {@link #setStoragePolicy(FileSystem, Path, String)} is tested in TestFSUtils and
056 * pre-commit will run the hbase-server tests if there's code change in this class. See
057 * <a href="https://issues.apache.org/jira/browse/HBASE-20838">HBASE-20838</a> for more details.
058 */
059@InterfaceAudience.Private
060public abstract class CommonFSUtils {
061  private static final Logger LOG = LoggerFactory.getLogger(CommonFSUtils.class);
062
063  /** Parameter name for HBase WAL directory */
064  public static final String HBASE_WAL_DIR = "hbase.wal.dir";
065
066  /** Parameter to disable stream capability enforcement checks */
067  public static final String UNSAFE_STREAM_CAPABILITY_ENFORCE =
068    "hbase.unsafe.stream.capability.enforce";
069
070  /** Full access permissions (starting point for a umask) */
071  public static final String FULL_RWX_PERMISSIONS = "777";
072
073  protected CommonFSUtils() {
074    super();
075  }
076
077  /**
   * Compares path components, ignoring the URI scheme; i.e. even if the schemes
   * differ, if <code>path</code> starts with <code>rootPath</code>,
   * then the function returns true.
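   * <p/>
   * A minimal usage sketch; the paths below are purely illustrative:
   * <pre>{@code
   * Path root = new Path("hdfs://namenode:8020/hbase");
   * // true: only the path component "/hbase" is compared, the scheme is ignored
   * boolean under = CommonFSUtils.isStartingWithPath(root, "/hbase/data/default/t1");
   * // false: "/tmp/hbase" does not start with "/hbase"
   * boolean outside = CommonFSUtils.isStartingWithPath(root, "/tmp/hbase");
   * }</pre>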
081   * @param rootPath value to check for
082   * @param path subject to check
083   * @return True if <code>path</code> starts with <code>rootPath</code>
084   */
085  public static boolean isStartingWithPath(final Path rootPath, final String path) {
086    String uriRootPath = rootPath.toUri().getPath();
087    String tailUriPath = (new Path(path)).toUri().getPath();
088    return tailUriPath.startsWith(uriRootPath);
089  }
090
091  /**
   * Compare the path component of the Path URI; e.g. for hdfs://a/b/c and /a/b/c, it compares the
   * '/a/b/c' part. Does not consider the scheme; i.e. even if the schemes differ, if the path or
   * subpath matches, the two will equate.
095   * @param pathToSearch Path we will be trying to match against.
096   * @param pathTail what to match
097   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
098   */
099  public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
100    return isMatchingTail(pathToSearch, new Path(pathTail));
101  }
102
103  /**
   * Compare the path component of the Path URI; e.g. for hdfs://a/b/c and /a/b/c, it compares the
   * '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
   * consider the scheme; i.e. even if the schemes differ, if the path or subpath matches, the two
   * will equate.
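   * <p/>
   * A small sketch of the tail matching described above:
   * <pre>{@code
   * Path toSearch = new Path("hdfs://a/b/c");
   * boolean matches = CommonFSUtils.isMatchingTail(toSearch, new Path("b/c"));   // true
   * boolean differs = CommonFSUtils.isMatchingTail(toSearch, new Path("b/d"));   // false
   * }</pre>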
   * @param pathToSearch Path we will be trying to match against
108   * @param pathTail what to match
109   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
110   */
111  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
112    if (pathToSearch.depth() != pathTail.depth()) {
113      return false;
114    }
115    Path tailPath = pathTail;
116    String tailName;
117    Path toSearch = pathToSearch;
118    String toSearchName;
119    boolean result = false;
120    do {
121      tailName = tailPath.getName();
122      if (tailName == null || tailName.length() <= 0) {
123        result = true;
124        break;
125      }
126      toSearchName = toSearch.getName();
127      if (toSearchName == null || toSearchName.length() <= 0) {
128        break;
129      }
130      // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
131      tailPath = tailPath.getParent();
132      toSearch = toSearch.getParent();
133    } while(tailName.equals(toSearchName));
134    return result;
135  }
136
137  /**
138   * Delete if exists.
139   * @param fs filesystem object
140   * @param dir directory to delete
141   * @return True if deleted <code>dir</code>
142   * @throws IOException e
143   */
144  public static boolean deleteDirectory(final FileSystem fs, final Path dir) throws IOException {
145    return fs.exists(dir) && fs.delete(dir, true);
146  }
147
148  /**
   * Return the number of bytes that large input files should optimally
   * be split into to minimize I/O time.
   *
   * @param fs filesystem object
   * @param path path within the filesystem
   * @return the default block size for the path's filesystem
154   */
155  public static long getDefaultBlockSize(final FileSystem fs, final Path path) {
156    return fs.getDefaultBlockSize(path);
157  }
158
  /**
   * Get the default replication.
   *
   * @param fs filesystem object
   * @param path path of file
   * @return default replication for the path's filesystem
   */
166  public static short getDefaultReplication(final FileSystem fs, final Path path) {
167    return fs.getDefaultReplication(path);
168  }
169
170  /**
171   * Returns the default buffer size to use during writes.
172   *
173   * The size of the buffer should probably be a multiple of hardware
174   * page size (4096 on Intel x86), and it determines how much data is
175   * buffered during read and write operations.
176   *
177   * @param fs filesystem object
178   * @return default buffer size to use during writes
179   */
180  public static int getDefaultBufferSize(final FileSystem fs) {
181    return fs.getConf().getInt("io.file.buffer.size", 4096);
182  }
183
184  /**
185   * Create the specified file on the filesystem. By default, this will:
186   * <ol>
187   * <li>apply the umask in the configuration (if it is enabled)</li>
188   * <li>use the fs configured buffer size (or 4096 if not set)</li>
189   * <li>use the default replication</li>
190   * <li>use the default block size</li>
191   * <li>not track progress</li>
192   * </ol>
193   *
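   * A minimal usage sketch; the path and permission are illustrative:
   * <pre>{@code
   * FileSystem fs = FileSystem.get(conf);
   * FSDataOutputStream out =
   *     CommonFSUtils.create(fs, new Path("/hbase/.tmp/example"), new FsPermission("700"), true);
   * }</pre>
   *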
194   * @param fs {@link FileSystem} on which to write the file
195   * @param path {@link Path} to the file to write
   * @param perm initial permissions
197   * @param overwrite Whether or not the created file should be overwritten.
198   * @return output stream to the created file
199   * @throws IOException if the file cannot be created
200   */
201  public static FSDataOutputStream create(FileSystem fs, Path path,
202      FsPermission perm, boolean overwrite) throws IOException {
203    if (LOG.isTraceEnabled()) {
204      LOG.trace("Creating file={} with permission={}, overwrite={}", path, perm, overwrite);
205    }
206    return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
207        getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
208  }
209
210  /**
211   * Get the file permissions specified in the configuration, if they are
212   * enabled.
213   *
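   * A small sketch of how the umask is applied; the umask configuration key passed in via
   * <code>permssionConfKey</code> ("hbase.data.umask" below) is illustrative:
   * <pre>{@code
   * conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
   * conf.set("hbase.data.umask", "077");
   * // FULL_RWX_PERMISSIONS (777) masked with 077 yields 700 for newly created files
   * FsPermission perm = CommonFSUtils.getFilePermissions(fs, conf, "hbase.data.umask");
   * }</pre>
   *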
214   * @param fs filesystem that the file will be created on.
215   * @param conf configuration to read for determining if permissions are
216   *          enabled and which to use
217   * @param permssionConfKey property key in the configuration to use when
218   *          finding the permission
219   * @return the permission to use when creating a new file on the fs. If
220   *         special permissions are not specified in the configuration, then
   *         the default permissions on the fs will be returned.
222   */
223  public static FsPermission getFilePermissions(final FileSystem fs,
224      final Configuration conf, final String permssionConfKey) {
225    boolean enablePermissions = conf.getBoolean(
226        HConstants.ENABLE_DATA_FILE_UMASK, false);
227
228    if (enablePermissions) {
229      try {
230        FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
231        // make sure that we have a mask, if not, go default.
232        String mask = conf.get(permssionConfKey);
233        if (mask == null) {
234          return FsPermission.getFileDefault();
235        }
        // apply the umask
237        FsPermission umask = new FsPermission(mask);
238        return perm.applyUMask(umask);
239      } catch (IllegalArgumentException e) {
        LOG.warn(
            "Invalid umask configured: "
                + conf.get(permssionConfKey)
                + ", using default file permissions.", e);
244        return FsPermission.getFileDefault();
245      }
246    }
247    return FsPermission.getFileDefault();
248  }
249
250  /**
251   * Verifies root directory path is a valid URI with a scheme
252   *
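   * A small illustration; the paths are hypothetical:
   * <pre>{@code
   * CommonFSUtils.validateRootPath(new Path("hdfs://nn:8020/hbase")); // returns the Path
   * CommonFSUtils.validateRootPath(new Path("/hbase"));               // throws IOException (no scheme)
   * }</pre>
   *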
253   * @param root root directory path
254   * @return Passed <code>root</code> argument.
255   * @throws IOException if not a valid URI with a scheme
256   */
257  public static Path validateRootPath(Path root) throws IOException {
258    try {
259      URI rootURI = new URI(root.toString());
260      String scheme = rootURI.getScheme();
261      if (scheme == null) {
262        throw new IOException("Root directory does not have a scheme");
263      }
264      return root;
265    } catch (URISyntaxException e) {
266      throw new IOException("Root directory path is not a valid " +
267        "URI -- check your " + HConstants.HBASE_DIR + " configuration", e);
268    }
269  }
270
271  /**
272   * Checks for the presence of the WAL log root path (using the provided conf object) in the given
273   * path. If it exists, this method removes it and returns the String representation of remaining
274   * relative path.
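   * <p/>
   * A small illustration, assuming {@value #HBASE_WAL_DIR} is set to
   * {@code hdfs://nn:8020/hbasewal} (paths are hypothetical):
   * <pre>{@code
   * // yields "data/default/t1/r1/wal.1" -- the WAL root and the trailing '/' are removed
   * String relative = CommonFSUtils.removeWALRootPath(
   *     new Path("hdfs://nn:8020/hbasewal/data/default/t1/r1/wal.1"), conf);
   * }</pre>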
275   * @param path must not be null
276   * @param conf must not be null
277   * @return String representation of the remaining relative path
278   * @throws IOException from underlying filesystem
279   */
280  public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
281    Path root = getWALRootDir(conf);
282    String pathStr = path.toString();
    // if the path does not start with the WAL root, return it unchanged
    if (!pathStr.startsWith(root.toString())) {
      return pathStr;
    }
    // otherwise strip the WAL root prefix, plus the trailing "/"
    return pathStr.substring(root.toString().length() + 1);
289  }
290
291  /**
292   * Return the 'path' component of a Path.  In Hadoop, Path is a URI.  This
293   * method returns the 'path' component of a Path's URI: e.g. If a Path is
294   * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
295   * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
296   * This method is useful if you want to print out a Path without qualifying
297   * Filesystem instance.
298   * @param p Filesystem Path whose 'path' component we are to return.
299   * @return Path portion of the Filesystem
300   */
301  public static String getPath(Path p) {
302    return p.toUri().getPath();
303  }
304
305  /**
306   * @param c configuration
307   * @return {@link Path} to hbase root directory from
308   *     configuration as a qualified Path.
309   * @throws IOException e
310   */
311  public static Path getRootDir(final Configuration c) throws IOException {
312    Path p = new Path(c.get(HConstants.HBASE_DIR));
313    FileSystem fs = p.getFileSystem(c);
314    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
315  }
316
317  public static void setRootDir(final Configuration c, final Path root) {
318    c.set(HConstants.HBASE_DIR, root.toString());
319  }
320
321  public static void setFsDefault(final Configuration c, final Path root) {
322    c.set("fs.defaultFS", root.toString());    // for hadoop 0.21+
323  }
324
325  public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
326    Path p = getRootDir(c);
327    return p.getFileSystem(c);
328  }
329
330  /**
331   * @param c configuration
   * @return {@link Path} to the hbase WAL root directory, i.e. the {@value #HBASE_WAL_DIR}
   *     directory from the configuration, as a qualified Path. Defaults to the HBase root dir.
334   * @throws IOException e
335   */
336  public static Path getWALRootDir(final Configuration c) throws IOException {
337    Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR)));
338    if (!isValidWALRootDir(p, c)) {
339      return getRootDir(c);
340    }
341    FileSystem fs = p.getFileSystem(c);
342    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
343  }
344
345  @VisibleForTesting
346  public static void setWALRootDir(final Configuration c, final Path root) {
347    c.set(HBASE_WAL_DIR, root.toString());
348  }
349
350  public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
351    Path p = getWALRootDir(c);
352    FileSystem fs = p.getFileSystem(c);
    // hadoop-core does fs caching, so we need to propagate this setting if present
354    String enforceStreamCapability = c.get(UNSAFE_STREAM_CAPABILITY_ENFORCE);
355    if (enforceStreamCapability != null) {
356      fs.getConf().set(UNSAFE_STREAM_CAPABILITY_ENFORCE, enforceStreamCapability);
357    }
358    return fs;
359  }
360
361  private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
362    Path rootDir = getRootDir(c);
363    FileSystem fs = walDir.getFileSystem(c);
364    Path qualifiedWalDir = walDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
365    if (!qualifiedWalDir.equals(rootDir)) {
366      if (qualifiedWalDir.toString().startsWith(rootDir.toString() + "/")) {
367        throw new IllegalStateException("Illegal WAL directory specified. " +
368            "WAL directories are not permitted to be under the root directory if set.");
369      }
370    }
371    return true;
372  }
373
374  /**
375   * Returns the WAL region directory based on the given table name and region name
376   * @param conf configuration to determine WALRootDir
377   * @param tableName Table that the region is under
378   * @param encodedRegionName Region name used for creating the final region directory
379   * @return the region directory used to store WALs under the WALRootDir
380   * @throws IOException if there is an exception determining the WALRootDir
381   */
382  public static Path getWALRegionDir(final Configuration conf, final TableName tableName,
383      final String encodedRegionName) throws IOException {
384    return new Path(getWALTableDir(conf, tableName), encodedRegionName);
385  }
386
387  /**
388   * Returns the Table directory under the WALRootDir for the specified table name
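   * <p/>
   * A small illustration, assuming {@value #HBASE_WAL_DIR} is {@code hdfs://nn:8020/hbasewal}:
   * <pre>{@code
   * // yields hdfs://nn:8020/hbasewal/data/default/t1
   * Path walTableDir = CommonFSUtils.getWALTableDir(conf, TableName.valueOf("t1"));
   * }</pre>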
389   * @param conf configuration used to get the WALRootDir
390   * @param tableName Table to get the directory for
391   * @return a path to the WAL table directory for the specified table
392   * @throws IOException if there is an exception determining the WALRootDir
393   */
394  public static Path getWALTableDir(final Configuration conf, final TableName tableName)
395      throws IOException {
396    Path baseDir = new Path(getWALRootDir(conf), HConstants.BASE_NAMESPACE_DIR);
397    return new Path(new Path(baseDir, tableName.getNamespaceAsString()),
398      tableName.getQualifierAsString());
399  }
400
401  /**
402   * For backward compatibility with HBASE-20734, where we store recovered edits in a wrong
403   * directory without BASE_NAMESPACE_DIR. See HBASE-22617 for more details.
404   * @deprecated For compatibility, will be removed in 4.0.0.
405   */
406  @Deprecated
407  public static Path getWrongWALRegionDir(final Configuration conf, final TableName tableName,
408      final String encodedRegionName) throws IOException {
409    Path wrongTableDir = new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()),
410      tableName.getQualifierAsString());
411    return new Path(wrongTableDir, encodedRegionName);
412  }
413
414  /**
415   * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
416   * path rootdir
417   *
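   * A small illustration; the root directory is hypothetical:
   * <pre>{@code
   * Path rootdir = new Path("hdfs://nn:8020/hbase");
   * // yields hdfs://nn:8020/hbase/data/default/mytable
   * Path tableDir = CommonFSUtils.getTableDir(rootdir, TableName.valueOf("mytable"));
   * }</pre>
   *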
418   * @param rootdir qualified path of HBase root directory
419   * @param tableName name of table
420   * @return {@link org.apache.hadoop.fs.Path} for table
421   */
422  public static Path getTableDir(Path rootdir, final TableName tableName) {
423    return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
424        tableName.getQualifierAsString());
425  }
426
427  /**
   * Returns the {@link org.apache.hadoop.hbase.TableName} object for the table that the given
   * table directory represents.
431   *
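   * A small illustration; the path is hypothetical:
   * <pre>{@code
   * // yields the TableName for namespace "default" and qualifier "mytable"
   * TableName tn = CommonFSUtils.getTableName(new Path("/hbase/data/default/mytable"));
   * }</pre>
   *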
432   * @param tablePath path of table
   * @return {@link org.apache.hadoop.hbase.TableName} for the table
434   */
435  public static TableName getTableName(Path tablePath) {
436    return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
437  }
438
439  /**
440   * Returns the {@link org.apache.hadoop.fs.Path} object representing
441   * the namespace directory under path rootdir
442   *
443   * @param rootdir qualified path of HBase root directory
444   * @param namespace namespace name
   * @return {@link org.apache.hadoop.fs.Path} for the namespace directory
446   */
447  public static Path getNamespaceDir(Path rootdir, final String namespace) {
448    return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
449        new Path(namespace)));
450  }
451
452  // this mapping means that under a federated FileSystem implementation, we'll
453  // only log the first failure from any of the underlying FileSystems at WARN and all others
454  // will be at DEBUG.
455  private static final Map<FileSystem, Boolean> warningMap = new ConcurrentHashMap<>();
456
457  /**
458   * Sets storage policy for given path.
459   * If the passed path is a directory, we'll set the storage policy for all files
460   * created in the future in said directory. Note that this change in storage
461   * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle.
462   * If we're running on a version of FileSystem that doesn't support the given storage policy
463   * (or storage policies at all), then we'll issue a log message and continue.
464   *
465   * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
466   *
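   * A minimal usage sketch; the path and the chosen policy are illustrative:
   * <pre>{@code
   * FileSystem fs = FileSystem.get(conf);
   * // best effort: logs and continues if the FileSystem or Hadoop version lacks support
   * CommonFSUtils.setStoragePolicy(fs, new Path("/hbase/data/default/t1/r1/cf"), "ONE_SSD");
   * }</pre>
   *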
   * @param fs We only do anything if it implements a setStoragePolicy method
   * @param path the Path whose storage policy is to be set
   * @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+
   *   org.apache.hadoop.hdfs.protocol.HdfsConstants for the possible list, e.g.
   *   'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
472   */
473  public static void setStoragePolicy(final FileSystem fs, final Path path,
474      final String storagePolicy) {
475    try {
476      setStoragePolicy(fs, path, storagePolicy, false);
477    } catch (IOException e) {
      // should never arrive here, since we pass throwException=false
      LOG.warn("We have chosen not to throw exceptions, but one was unexpectedly thrown", e);
480    }
481  }
482
483  static void setStoragePolicy(final FileSystem fs, final Path path, final String storagePolicy,
484      boolean throwException) throws IOException {
485    if (storagePolicy == null) {
486      if (LOG.isTraceEnabled()) {
487        LOG.trace("We were passed a null storagePolicy, exiting early.");
488      }
489      return;
490    }
491    String trimmedStoragePolicy = storagePolicy.trim();
492    if (trimmedStoragePolicy.isEmpty()) {
493      if (LOG.isTraceEnabled()) {
494        LOG.trace("We were passed an empty storagePolicy, exiting early.");
495      }
496      return;
497    } else {
498      trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT);
499    }
500    if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) {
501      if (LOG.isTraceEnabled()) {
502        LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.",
503          trimmedStoragePolicy);
504      }
505      return;
506    }
507    try {
508      invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
509    } catch (IOException e) {
510      if (LOG.isTraceEnabled()) {
511        LOG.trace("Failed to invoke set storage policy API on FS", e);
512      }
513      if (throwException) {
514        throw e;
515      }
516    }
517  }
518
519  /*
520   * All args have been checked and are good. Run the setStoragePolicy invocation.
521   */
522  private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
523      final String storagePolicy) throws IOException {
524    Exception toThrow = null;
525
526    try {
527      fs.setStoragePolicy(path, storagePolicy);
528
529      if (LOG.isDebugEnabled()) {
530        LOG.debug("Set storagePolicy={} for path={}", storagePolicy, path);
531      }
532    } catch (Exception e) {
533      toThrow = e;
534      // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
535      // misuse than a runtime problem with HDFS.
536      if (!warningMap.containsKey(fs)) {
537        warningMap.put(fs, true);
538        LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path + ". " +
539            "DEBUG log level might have more details.", e);
540      } else if (LOG.isDebugEnabled()) {
541        LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
542      }
543
544      // check for lack of HDFS-7228
545      if (e instanceof RemoteException &&
546          HadoopIllegalArgumentException.class.getName().equals(
547            ((RemoteException)e).getClassName())) {
548        if (LOG.isDebugEnabled()) {
549          LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " +
550            "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
551            "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
552            "more information see the 'ArchivalStorage' docs for your Hadoop release.");
553        }
554      // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation
555      // that throws UnsupportedOperationException
556      } else if (e instanceof UnsupportedOperationException) {
557        if (LOG.isDebugEnabled()) {
558          LOG.debug("The underlying FileSystem implementation doesn't support " +
559              "setStoragePolicy. This is probably intentional on their part, since HDFS-9345 " +
560              "appears to be present in your version of Hadoop. For more information check " +
561              "the Hadoop documentation on 'ArchivalStorage', the Hadoop FileSystem " +
562              "specification docs from HADOOP-11981, and/or related documentation from the " +
563              "provider of the underlying FileSystem (its name should appear in the " +
564              "stacktrace that accompanies this message). Note in particular that Hadoop's " +
565              "local filesystem implementation doesn't support storage policies.", e);
566        }
567      }
568    }
569
570    if (toThrow != null) {
571      throw new IOException(toThrow);
572    }
573  }
574
575  /**
576   * @param conf must not be null
   * @return True if the configured default filesystem's scheme is 'hdfs'.
578   * @throws IOException from underlying FileSystem
579   */
580  public static boolean isHDFS(final Configuration conf) throws IOException {
581    FileSystem fs = FileSystem.get(conf);
582    String scheme = fs.getUri().getScheme();
583    return scheme.equalsIgnoreCase("hdfs");
584  }
585
586  /**
   * Checks if the given path contains the 'recovered.edits' directory.
   * @param path must not be null
   * @return True if the path contains the recovered edits directory
590   */
591  public static boolean isRecoveredEdits(Path path) {
592    return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
593  }
594
595  /**
596   * @param conf must not be null
597   * @return Returns the filesystem of the hbase rootdir.
598   * @throws IOException from underlying FileSystem
599   */
600  public static FileSystem getCurrentFileSystem(Configuration conf) throws IOException {
601    return getRootDir(conf).getFileSystem(conf);
602  }
603
604  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
   * This accommodates differences between hadoop versions, where hadoop 1
   * does not throw a FileNotFoundException but returns an empty FileStatus[],
   * while Hadoop 2 will throw FileNotFoundException.
609   *
610   * Where possible, prefer FSUtils#listStatusWithStatusFilter(FileSystem,
611   * Path, FileStatusFilter) instead.
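   *
   * A usage sketch; the directory and filter are illustrative:
   * <pre>{@code
   * FileStatus[] children = CommonFSUtils.listStatus(fs, tableDir,
   *     p -> !p.getName().startsWith("."));
   * if (children == null) {
   *   // the directory does not exist or is empty
   * }
   * }</pre>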
612   *
613   * @param fs file system
614   * @param dir directory
615   * @param filter path filter
616   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
617   */
618  public static FileStatus[] listStatus(final FileSystem fs,
619      final Path dir, final PathFilter filter) throws IOException {
620    FileStatus [] status = null;
621    try {
622      status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
623    } catch (FileNotFoundException fnfe) {
624      // if directory doesn't exist, return null
625      if (LOG.isTraceEnabled()) {
626        LOG.trace("{} doesn't exist", dir);
627      }
628    }
629    if (status == null || status.length < 1) {
630      return null;
631    }
632    return status;
633  }
634
635  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
   * This accommodates differences between hadoop versions.
638   *
639   * @param fs file system
640   * @param dir directory
641   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
642   */
643  public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
644    return listStatus(fs, dir, null);
645  }
646
647  /**
   * Calls fs.listFiles() to get FileStatus and BlockLocations together, reducing RPC calls
649   *
650   * @param fs file system
651   * @param dir directory
652   * @return LocatedFileStatus list
653   */
654  public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs,
655      final Path dir) throws IOException {
656    List<LocatedFileStatus> status = null;
657    try {
658      RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs
659          .listFiles(dir, false);
660      while (locatedFileStatusRemoteIterator.hasNext()) {
661        if (status == null) {
662          status = Lists.newArrayList();
663        }
664        status.add(locatedFileStatusRemoteIterator.next());
665      }
666    } catch (FileNotFoundException fnfe) {
667      // if directory doesn't exist, return null
668      if (LOG.isTraceEnabled()) {
669        LOG.trace("{} doesn't exist", dir);
670      }
671    }
672    return status;
673  }
674
675  /**
676   * Calls fs.delete() and returns the value returned by the fs.delete()
677   *
678   * @param fs must not be null
679   * @param path must not be null
680   * @param recursive delete tree rooted at path
681   * @return the value returned by the fs.delete()
682   * @throws IOException from underlying FileSystem
683   */
684  public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
685      throws IOException {
686    return fs.delete(path, recursive);
687  }
688
689  /**
690   * Calls fs.exists(). Checks if the specified path exists
691   *
692   * @param fs must not be null
693   * @param path must not be null
694   * @return the value returned by fs.exists()
695   * @throws IOException from underlying FileSystem
696   */
697  public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
698    return fs.exists(path);
699  }
700
701  /**
702   * Log the current state of the filesystem from a certain root directory
703   * @param fs filesystem to investigate
704   * @param root root file/directory to start logging from
705   * @param log log to output information
706   * @throws IOException if an unexpected exception occurs
707   */
708  public static void logFileSystemState(final FileSystem fs, final Path root, Logger log)
709      throws IOException {
710    log.debug("File system contents for path {}", root);
711    logFSTree(log, fs, root, "|-");
712  }
713
714  /**
715   * Recursive helper to log the state of the FS
716   *
717   * @see #logFileSystemState(FileSystem, Path, Logger)
718   */
719  private static void logFSTree(Logger log, final FileSystem fs, final Path root, String prefix)
720      throws IOException {
721    FileStatus[] files = listStatus(fs, root, null);
722    if (files == null) {
723      return;
724    }
725
726    for (FileStatus file : files) {
727      if (file.isDirectory()) {
728        log.debug(prefix + file.getPath().getName() + "/");
729        logFSTree(log, fs, file.getPath(), prefix + "---");
730      } else {
731        log.debug(prefix + file.getPath().getName());
732      }
733    }
734  }
735
736  public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
737      throws IOException {
738    // set the modify time for TimeToLive Cleaner
739    fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
740    return fs.rename(src, dest);
741  }
742
743  /**
   * Check if the short circuit read buffer size is set and, if not, set it to the value HBase
   * prefers, read from the "hbase."-prefixed version of the DFS key.
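   * <p/>
   * A small illustration of the precedence described above:
   * <pre>{@code
   * Configuration conf = new Configuration();
   * conf.setInt("hbase.dfs.client.read.shortcircuit.buffer.size", 64 * 1024);
   * // dfs.client.read.shortcircuit.buffer.size becomes 65536 because it was not already set
   * CommonFSUtils.checkShortCircuitReadBufferSize(conf);
   * }</pre>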
745   * @param conf must not be null
746   */
747  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
748    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
749    final int notSet = -1;
750    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
751    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
752    int size = conf.getInt(dfsKey, notSet);
753    // If a size is set, return -- we will use it.
754    if (size != notSet) {
755      return;
756    }
757    // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
758    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
759    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
760  }
761
762  private static class DfsBuilderUtility {
763    static Class<?> dfsClass = null;
764    static Method createMethod;
765    static Method overwriteMethod;
766    static Method bufferSizeMethod;
767    static Method blockSizeMethod;
768    static Method recursiveMethod;
769    static Method replicateMethod;
770    static Method replicationMethod;
771    static Method buildMethod;
772    static boolean allMethodsPresent = false;
773
774    static {
775      String dfsName = "org.apache.hadoop.hdfs.DistributedFileSystem";
776      String builderName = dfsName + "$HdfsDataOutputStreamBuilder";
777      Class<?> builderClass = null;
778
779      try {
780        dfsClass = Class.forName(dfsName);
781      } catch (ClassNotFoundException e) {
782        LOG.debug("{} not available, will not use builder API for file creation.", dfsName);
783      }
784      try {
785        builderClass = Class.forName(builderName);
786      } catch (ClassNotFoundException e) {
787        LOG.debug("{} not available, will not use builder API for file creation.", builderName);
788      }
789
790      if (dfsClass != null && builderClass != null) {
791        try {
792          createMethod = dfsClass.getMethod("createFile", Path.class);
793          overwriteMethod = builderClass.getMethod("overwrite", boolean.class);
794          bufferSizeMethod = builderClass.getMethod("bufferSize", int.class);
795          blockSizeMethod = builderClass.getMethod("blockSize", long.class);
796          recursiveMethod = builderClass.getMethod("recursive");
797          replicateMethod = builderClass.getMethod("replicate");
798          replicationMethod = builderClass.getMethod("replication", short.class);
799          buildMethod = builderClass.getMethod("build");
800
801          allMethodsPresent = true;
802          LOG.debug("Using builder API via reflection for DFS file creation.");
803        } catch (NoSuchMethodException e) {
804          LOG.debug("Could not find method on builder; will use old DFS API for file creation {}",
805              e.getMessage());
806        }
807      }
808    }
809
810    /**
811     * Attempt to use builder API via reflection to create a file with the given parameters and
812     * replication enabled.
813     */
814    static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable,
815        int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
816      if (allMethodsPresent && dfsClass.isInstance(fs)) {
817        try {
818          Object builder;
819
820          builder = createMethod.invoke(fs, path);
821          builder = overwriteMethod.invoke(builder, overwritable);
822          builder = bufferSizeMethod.invoke(builder, bufferSize);
823          builder = blockSizeMethod.invoke(builder, blockSize);
824          if (isRecursive) {
825            builder = recursiveMethod.invoke(builder);
826          }
827          builder = replicateMethod.invoke(builder);
828          builder = replicationMethod.invoke(builder, replication);
829          return (FSDataOutputStream) buildMethod.invoke(builder);
830        } catch (IllegalAccessException | InvocationTargetException e) {
831          // Should have caught this failure during initialization, so log full trace here
832          LOG.warn("Couldn't use reflection with builder API", e);
833        }
834      }
835
836      if (isRecursive) {
837        return fs.create(path, overwritable, bufferSize, replication, blockSize, null);
838      }
839      return fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null);
840    }
841
842    /**
843     * Attempt to use builder API via reflection to create a file with the given parameters and
844     * replication enabled.
845     */
846    static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable)
847        throws IOException {
848      if (allMethodsPresent && dfsClass.isInstance(fs)) {
849        try {
850          Object builder;
851
852          builder = createMethod.invoke(fs, path);
853          builder = overwriteMethod.invoke(builder, overwritable);
854          builder = replicateMethod.invoke(builder);
855          return (FSDataOutputStream) buildMethod.invoke(builder);
856        } catch (IllegalAccessException | InvocationTargetException e) {
857          // Should have caught this failure during initialization, so log full trace here
858          LOG.warn("Couldn't use reflection with builder API", e);
859        }
860      }
861
862      return fs.create(path, overwritable);
863    }
864  }
865
866  /**
867   * Attempt to use builder API via reflection to create a file with the given parameters and
868   * replication enabled.
869   * <p>
870   * Will not attempt to enable replication when passed an HFileSystem.
871   */
872  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable)
873      throws IOException {
874    return DfsBuilderUtility.createHelper(fs, path, overwritable);
875  }
876
877  /**
878   * Attempt to use builder API via reflection to create a file with the given parameters and
879   * replication enabled.
880   * <p>
881   * Will not attempt to enable replication when passed an HFileSystem.
882   */
883  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable,
884      int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
885    return DfsBuilderUtility.createHelper(fs, path, overwritable, bufferSize, replication,
886        blockSize, isRecursive);
887  }
888
889  // Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and
890  // not until we attempt to reference it.
891  private static class StreamCapabilities {
892    public static final boolean PRESENT;
893    public static final Class<?> CLASS;
894    public static final Method METHOD;
895    static {
896      boolean tmp = false;
897      Class<?> clazz = null;
898      Method method = null;
899      try {
900        clazz = Class.forName("org.apache.hadoop.fs.StreamCapabilities");
901        method = clazz.getMethod("hasCapability", String.class);
902        tmp = true;
903      } catch(ClassNotFoundException|NoSuchMethodException|SecurityException exception) {
904        LOG.warn("Your Hadoop installation does not include the StreamCapabilities class from " +
905                 "HDFS-11644, so we will skip checking if any FSDataOutputStreams actually " +
906                 "support hflush/hsync. If you are running on top of HDFS this probably just " +
907                 "means you have an older version and this can be ignored. If you are running on " +
908                 "top of an alternate FileSystem implementation you should manually verify that " +
909                 "hflush and hsync are implemented; otherwise you risk data loss and hard to " +
910                 "diagnose errors when our assumptions are violated.");
911        LOG.debug("The first request to check for StreamCapabilities came from this stacktrace.",
912            exception);
913      } finally {
914        PRESENT = tmp;
915        CLASS = clazz;
916        METHOD = method;
917      }
918    }
919  }
920
921  /**
922   * If our FileSystem version includes the StreamCapabilities class, check if the given stream has
923   * a particular capability.
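   * <p/>
   * A usage sketch; how the stream is obtained is illustrative:
   * <pre>{@code
   * FSDataOutputStream out = CommonFSUtils.createForWal(fs, walPath, false);
   * if (!CommonFSUtils.hasCapability(out, "hflush")) {
   *   throw new StreamLacksCapabilityException("hflush");
   * }
   * }</pre>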
   * @param stream capabilities are per-stream instance, so check this one specifically; must not
   *          be null
   * @param capability what to look for, per Hadoop Common's FileSystem docs
   * @return true if the StreamCapabilities class is not present; false if it is present but this
   *         stream doesn't implement it; otherwise the result of asking the stream.
   * @throws NullPointerException if {@code stream} is {@code null}
930   */
931  public static boolean hasCapability(FSDataOutputStream stream, String capability) {
932    // be consistent whether or not StreamCapabilities is present
933    Objects.requireNonNull(stream, "stream cannot be null");
934    // If o.a.h.fs.StreamCapabilities doesn't exist, assume everyone does everything
935    // otherwise old versions of Hadoop will break.
936    boolean result = true;
937    if (StreamCapabilities.PRESENT) {
938      // if StreamCapabilities is present, but the stream doesn't implement it
939      // or we run into a problem invoking the method,
940      // we treat that as equivalent to not declaring anything
941      result = false;
942      if (StreamCapabilities.CLASS.isAssignableFrom(stream.getClass())) {
943        try {
944          result = ((Boolean)StreamCapabilities.METHOD.invoke(stream, capability)).booleanValue();
945        } catch (IllegalAccessException|IllegalArgumentException|InvocationTargetException
946            exception) {
947          LOG.warn("Your Hadoop installation's StreamCapabilities implementation doesn't match " +
948              "our understanding of how it's supposed to work. Please file a JIRA and include " +
949              "the following stack trace. In the mean time we're interpreting this behavior " +
950              "difference as a lack of capability support, which will probably cause a failure.",
951              exception);
952        }
953      }
954    }
955    return result;
956  }
957
958  /**
   * Helper exception for those cases where the place where we need to check a stream capability
   * is not where we have the context needed to explain the impact of, and mitigation for, the
   * missing capability.
961   */
962  public static class StreamLacksCapabilityException extends Exception {
963    public StreamLacksCapabilityException(String message, Throwable cause) {
964      super(message, cause);
965    }
966    public StreamLacksCapabilityException(String message) {
967      super(message);
968    }
969  }
970}