/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Utility methods for interacting with the underlying file system.
 * <p/>
 * Note that {@link #setStoragePolicy(FileSystem, Path, String)} is tested in TestFSUtils and
 * pre-commit will run the hbase-server tests if there's code change in this class. See
 * <a href="https://issues.apache.org/jira/browse/HBASE-20838">HBASE-20838</a> for more details.
 */
@InterfaceAudience.Private
public abstract class CommonFSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(CommonFSUtils.class);

  /** Parameter name for HBase WAL directory */
  public static final String HBASE_WAL_DIR = "hbase.wal.dir";

  /** Parameter to disable stream capability enforcement checks */
  public static final String UNSAFE_STREAM_CAPABILITY_ENFORCE =
    "hbase.unsafe.stream.capability.enforce";

  /** Full access permissions (starting point for a umask) */
  public static final String FULL_RWX_PERMISSIONS = "777";

  protected CommonFSUtils() {
    super();
  }

  /**
   * Compares a path against a root path, looking only at the path components. Does not consider
   * schemes; i.e. even if the schemes differ, as long as <code>path</code> starts with
   * <code>rootPath</code>, the function returns true.
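   * <p>
   * Illustrative example (hypothetical paths): {@code isStartingWithPath(new Path("/hbase"),
   * "hdfs://nn:8020/hbase/data")} returns true because only the path components are compared.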
   * @param rootPath value to check for
   * @param path subject to check
   * @return True if <code>path</code> starts with <code>rootPath</code>
   */
  public static boolean isStartingWithPath(final Path rootPath, final String path) {
    String uriRootPath = rootPath.toUri().getPath();
    String tailUriPath = (new Path(path)).toUri().getPath();
    return tailUriPath.startsWith(uriRootPath);
  }

  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. Does not consider schemes; i.e. even if the schemes differ, if the path or
   * a subpath matches, the two equate.
   * @param pathToSearch Path we will be trying to match against.
   * @param pathTail what to match
   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
    return isMatchingTail(pathToSearch, new Path(pathTail));
  }

  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
   * consider schemes; i.e. even if the schemes differ, if the path or a subpath matches, the two
   * equate.
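   * <p>
   * Illustrative usage (hypothetical paths):
   * <pre>{@code
   * // true: only the path components are compared, scheme and authority are ignored
   * boolean sameTail = CommonFSUtils.isMatchingTail(
   *     new Path("hdfs://namenode:8020/hbase/data"), new Path("/hbase/data"));
   * }</pre>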
   * @param pathToSearch Path we will be trying to match against
   * @param pathTail what to match
   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
    if (pathToSearch.depth() != pathTail.depth()) {
      return false;
    }
    Path tailPath = pathTail;
    String tailName;
    Path toSearch = pathToSearch;
    String toSearchName;
    boolean result = false;
    do {
      tailName = tailPath.getName();
      if (tailName == null || tailName.length() <= 0) {
        result = true;
        break;
      }
      toSearchName = toSearch.getName();
      if (toSearchName == null || toSearchName.length() <= 0) {
        break;
      }
      // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
      tailPath = tailPath.getParent();
      toSearch = toSearch.getParent();
    } while(tailName.equals(toSearchName));
    return result;
  }

  /**
   * Delete if exists.
   * @param fs filesystem object
   * @param dir directory to delete
   * @return True if deleted <code>dir</code>
   * @throws IOException e
   */
  public static boolean deleteDirectory(final FileSystem fs, final Path dir) throws IOException {
    return fs.exists(dir) && fs.delete(dir, true);
  }

  /**
   * Return the number of bytes that large input files should optimally be split into to minimize
   * i/o time.
   *
   * Uses reflection to search for getDefaultBlockSize(Path f); if the method doesn't exist, falls
   * back to using getDefaultBlockSize().
   *
   * @param fs filesystem object
   * @param path the path whose filesystem block size is to be looked up
   * @return the default block size for the path's filesystem
   * @throws IOException e
   */
  public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
    Method m = null;
    Class<? extends FileSystem> cls = fs.getClass();
    try {
      m = cls.getMethod("getDefaultBlockSize", Path.class);
    } catch (NoSuchMethodException e) {
      LOG.info("FileSystem doesn't support getDefaultBlockSize");
    } catch (SecurityException e) {
      LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
      m = null; // could happen on setAccessible()
    }
    if (m == null) {
      return fs.getDefaultBlockSize(path);
    } else {
      try {
        Object ret = m.invoke(fs, path);
        return ((Long)ret).longValue();
      } catch (Exception e) {
        throw new IOException(e);
      }
    }
  }

  /*
   * Get the default replication.
   *
   * Uses reflection to search for getDefaultReplication(Path f); if the method doesn't exist,
   * falls back to using getDefaultReplication().
   *
   * @param fs filesystem object
   * @param path path of file
   * @return default replication for the path's filesystem
   * @throws IOException e
   */
  public static short getDefaultReplication(final FileSystem fs, final Path path)
      throws IOException {
    Method m = null;
    Class<? extends FileSystem> cls = fs.getClass();
    try {
      m = cls.getMethod("getDefaultReplication", Path.class);
    } catch (NoSuchMethodException e) {
      LOG.info("FileSystem doesn't support getDefaultReplication");
    } catch (SecurityException e) {
      LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
      m = null; // could happen on setAccessible()
    }
    if (m == null) {
      return fs.getDefaultReplication(path);
    } else {
      try {
        Object ret = m.invoke(fs, path);
        return ((Number)ret).shortValue();
      } catch (Exception e) {
        throw new IOException(e);
      }
    }
  }

  /**
   * Returns the default buffer size to use during writes.
   *
   * The size of the buffer should probably be a multiple of hardware
   * page size (4096 on Intel x86), and it determines how much data is
   * buffered during read and write operations.
   *
   * @param fs filesystem object
   * @return default buffer size to use during writes
   */
  public static int getDefaultBufferSize(final FileSystem fs) {
    return fs.getConf().getInt("io.file.buffer.size", 4096);
  }

  /**
   * Create the specified file on the filesystem. By default, this will:
   * <ol>
   * <li>apply the umask in the configuration (if it is enabled)</li>
   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the default replication</li>
   * <li>use the default block size</li>
   * <li>not track progress</li>
   * </ol>
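   * <p>
   * Illustrative usage together with {@link #getFilePermissions(FileSystem, Configuration, String)}
   * (the path and config key below are hypothetical):
   * <pre>{@code
   * FsPermission perm = CommonFSUtils.getFilePermissions(fs, conf, "hbase.data.umask");
   * try (FSDataOutputStream out =
   *     CommonFSUtils.create(fs, new Path("/hbase/.tmp/example"), perm, true)) {
   *   // ... write data ...
   * }
   * }</pre>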
   *
   * @param fs {@link FileSystem} on which to write the file
   * @param path {@link Path} to the file to write
   * @param perm initial permissions
   * @param overwrite Whether or not the created file should be overwritten.
   * @return output stream to the created file
   * @throws IOException if the file cannot be created
   */
  public static FSDataOutputStream create(FileSystem fs, Path path,
      FsPermission perm, boolean overwrite) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Creating file={} with permission={}, overwrite={}", path, perm, overwrite);
    }
    return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
        getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
  }

  /**
   * Get the file permissions specified in the configuration, if they are
   * enabled.
   *
   * @param fs filesystem that the file will be created on.
   * @param conf configuration to read for determining if permissions are
   *          enabled and which to use
   * @param permssionConfKey property key in the configuration to use when
   *          finding the permission
   * @return the permission to use when creating a new file on the fs. If
   *         special permissions are not specified in the configuration, then
   *         the default permissions on the fs will be returned.
   */
  public static FsPermission getFilePermissions(final FileSystem fs,
      final Configuration conf, final String permssionConfKey) {
    boolean enablePermissions = conf.getBoolean(
        HConstants.ENABLE_DATA_FILE_UMASK, false);

    if (enablePermissions) {
      try {
        FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
        // make sure that we have a mask, if not, go default.
        String mask = conf.get(permssionConfKey);
        if (mask == null) {
          return FsPermission.getFileDefault();
        }
        // apply the umask
        FsPermission umask = new FsPermission(mask);
        return perm.applyUMask(umask);
      } catch (IllegalArgumentException e) {
        LOG.warn(
            "Incorrect umask attempted to be created: "
                + conf.get(permssionConfKey)
                + ", using default file permissions.", e);
        return FsPermission.getFileDefault();
      }
    }
    return FsPermission.getFileDefault();
  }

  /**
   * Verifies root directory path is a valid URI with a scheme
   *
   * @param root root directory path
   * @return Passed <code>root</code> argument.
   * @throws IOException if not a valid URI with a scheme
   */
  public static Path validateRootPath(Path root) throws IOException {
    try {
      URI rootURI = new URI(root.toString());
      String scheme = rootURI.getScheme();
      if (scheme == null) {
        throw new IOException("Root directory does not have a scheme");
      }
      return root;
    } catch (URISyntaxException e) {
      IOException io = new IOException("Root directory path is not a valid " +
        "URI -- check your " + HConstants.HBASE_DIR + " configuration");
      io.initCause(e);
      throw io;
    }
  }

  /**
   * Checks for the presence of the WAL log root path (using the provided conf object) in the given
   * path. If it exists, this method removes it and returns the String representation of the
   * remaining relative path.
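   * <p>
   * Illustrative example (the WAL root below is hypothetical):
   * <pre>{@code
   * // assuming hbase.wal.dir resolves to hdfs://nn:8020/hbase-wal
   * String rel = CommonFSUtils.removeWALRootPath(
   *     new Path("hdfs://nn:8020/hbase-wal/WALs/rs1"), conf); // "WALs/rs1"
   * }</pre>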
   * @param path must not be null
   * @param conf must not be null
   * @return String representation of the remaining relative path
   * @throws IOException from underlying filesystem
   */
  public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
    Path root = getWALRootDir(conf);
    String pathStr = path.toString();
    // If the path does not contain the WAL root path, return it as-is.
    if (!pathStr.startsWith(root.toString())) {
      return pathStr;
    }
    // Otherwise strip the root path plus the trailing "/".
    return pathStr.substring(root.toString().length() + 1);
  }

  /**
   * Return the 'path' component of a Path.  In Hadoop, Path is a URI.  This
   * method returns the 'path' component of a Path's URI: e.g. If a Path is
   * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
   * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
   * This method is useful if you want to print out a Path without qualifying
   * Filesystem instance.
   * @param p Filesystem Path whose 'path' component we are to return.
   * @return Path portion of the Filesystem
   */
  public static String getPath(Path p) {
    return p.toUri().getPath();
  }

  /**
   * @param c configuration
   * @return {@link Path} to hbase root directory from
   *     configuration as a qualified Path.
   * @throws IOException e
   */
  public static Path getRootDir(final Configuration c) throws IOException {
    Path p = new Path(c.get(HConstants.HBASE_DIR));
    FileSystem fs = p.getFileSystem(c);
    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  public static void setRootDir(final Configuration c, final Path root) {
    c.set(HConstants.HBASE_DIR, root.toString());
  }

  public static void setFsDefault(final Configuration c, final Path root) {
    c.set("fs.defaultFS", root.toString());    // for hadoop 0.21+
  }

  public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
    Path p = getRootDir(c);
    return p.getFileSystem(c);
  }

  /**
   * @param c configuration
   * @return {@link Path} to hbase log root directory: e.g. {@value HBASE_WAL_DIR} from
   *     configuration as a qualified Path. Defaults to HBase root dir.
   * @throws IOException e
   */
  public static Path getWALRootDir(final Configuration c) throws IOException {

    Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR)));
    if (!isValidWALRootDir(p, c)) {
      return getRootDir(c);
    }
    FileSystem fs = p.getFileSystem(c);
    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  @VisibleForTesting
  public static void setWALRootDir(final Configuration c, final Path root) {
    c.set(HBASE_WAL_DIR, root.toString());
  }

  public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
    Path p = getWALRootDir(c);
    FileSystem fs = p.getFileSystem(c);
    // hadoop-core does fs caching, so need to propagate this if set
    String enforceStreamCapability = c.get(UNSAFE_STREAM_CAPABILITY_ENFORCE);
    if (enforceStreamCapability != null) {
      fs.getConf().set(UNSAFE_STREAM_CAPABILITY_ENFORCE, enforceStreamCapability);
    }
    return fs;
  }

  private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
    Path rootDir = getRootDir(c);
    FileSystem fs = walDir.getFileSystem(c);
    Path qualifiedWalDir = walDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    if (!qualifiedWalDir.equals(rootDir)) {
      if (qualifiedWalDir.toString().startsWith(rootDir.toString() + "/")) {
        throw new IllegalStateException("Illegal WAL directory specified. " +
            "WAL directories are not permitted to be under the root directory if set.");
      }
    }
    return true;
  }

  /**
   * Returns the WAL region directory based on the given table name and region name.
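   * <p>
   * Illustrative example (paths and names are hypothetical): with the WAL root at
   * {@code /hbase-wal}, table {@code ns1:tbl} and encoded region name {@code abc123}, this
   * returns {@code /hbase-wal/data/ns1/tbl/abc123}.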
   * @param conf configuration to determine WALRootDir
   * @param tableName Table that the region is under
   * @param encodedRegionName Region name used for creating the final region directory
   * @return the region directory used to store WALs under the WALRootDir
   * @throws IOException if there is an exception determining the WALRootDir
   */
  public static Path getWALRegionDir(final Configuration conf, final TableName tableName,
      final String encodedRegionName) throws IOException {
    return new Path(getWALTableDir(conf, tableName), encodedRegionName);
  }

  /**
   * Returns the Table directory under the WALRootDir for the specified table name
   * @param conf configuration used to get the WALRootDir
   * @param tableName Table to get the directory for
   * @return a path to the WAL table directory for the specified table
   * @throws IOException if there is an exception determining the WALRootDir
   */
  public static Path getWALTableDir(final Configuration conf, final TableName tableName)
      throws IOException {
    Path baseDir = new Path(getWALRootDir(conf), HConstants.BASE_NAMESPACE_DIR);
    return new Path(new Path(baseDir, tableName.getNamespaceAsString()),
      tableName.getQualifierAsString());
  }

  /**
   * For backward compatibility with HBASE-20734, where we store recovered edits in a wrong
   * directory without BASE_NAMESPACE_DIR. See HBASE-22617 for more details.
   * @deprecated For compatibility, will be removed in 4.0.0.
   */
  @Deprecated
  public static Path getWrongWALRegionDir(final Configuration conf, final TableName tableName,
      final String encodedRegionName) throws IOException {
    Path wrongTableDir = new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()),
      tableName.getQualifierAsString());
    return new Path(wrongTableDir, encodedRegionName);
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
   * path rootdir
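   * <p>
   * Illustrative example: for rootdir {@code /hbase} and table {@code ns1:tbl} this returns
   * {@code /hbase/data/ns1/tbl}.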
   *
   * @param rootdir qualified path of HBase root directory
   * @param tableName name of table
   * @return {@link org.apache.hadoop.fs.Path} for table
   */
  public static Path getTableDir(Path rootdir, final TableName tableName) {
    return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
        tableName.getQualifierAsString());
  }

  /**
   * Returns the {@link org.apache.hadoop.hbase.TableName} object for the table that the given
   * table directory path points to.
   *
   * @param tablePath path of table
   * @return {@link org.apache.hadoop.hbase.TableName} for the table
   */
  public static TableName getTableName(Path tablePath) {
    return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing
   * the namespace directory under path rootdir
   *
   * @param rootdir qualified path of HBase root directory
   * @param namespace namespace name
   * @return {@link org.apache.hadoop.fs.Path} for the namespace
   */
  public static Path getNamespaceDir(Path rootdir, final String namespace) {
    return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
        new Path(namespace)));
  }

  // this mapping means that under a federated FileSystem implementation, we'll
  // only log the first failure from any of the underlying FileSystems at WARN and all others
  // will be at DEBUG.
  private static final Map<FileSystem, Boolean> warningMap = new ConcurrentHashMap<>();

  /**
   * Sets storage policy for given path.
   * If the passed path is a directory, we'll set the storage policy for all files
   * created in the future in said directory. Note that this change in storage
   * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle.
   * If we're running on a version of FileSystem that doesn't support the given storage policy
   * (or storage policies at all), then we'll issue a log message and continue.
   *
   * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
   *
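   * Illustrative usage (the directory below is hypothetical):
   * <pre>{@code
   * CommonFSUtils.setStoragePolicy(fs, new Path("/hbase/data/default/t1/cf"), "ONE_SSD");
   * }</pre>
   *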
   * @param fs We only do anything if it implements a setStoragePolicy method
   * @param path the Path whose storage policy is to be set
   * @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+
   *   org.apache.hadoop.hdfs.protocol.HdfsConstants for possible list e.g
   *   'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
   */
  public static void setStoragePolicy(final FileSystem fs, final Path path,
      final String storagePolicy) {
    try {
      setStoragePolicy(fs, path, storagePolicy, false);
    } catch (IOException e) {
      // should never arrive here
      LOG.warn("We have chosen not to throw exceptions, but one was unexpectedly thrown", e);
    }
  }

  static void setStoragePolicy(final FileSystem fs, final Path path, final String storagePolicy,
      boolean throwException) throws IOException {
    if (storagePolicy == null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("We were passed a null storagePolicy, exiting early.");
      }
      return;
    }
    String trimmedStoragePolicy = storagePolicy.trim();
    if (trimmedStoragePolicy.isEmpty()) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("We were passed an empty storagePolicy, exiting early.");
      }
      return;
    } else {
      trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT);
    }
    if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.",
          trimmedStoragePolicy);
      }
      return;
    }
    try {
      invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
    } catch (IOException e) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Failed to invoke set storage policy API on FS", e);
      }
      if (throwException) {
        throw e;
      }
    }
  }

  /*
   * All args have been checked and are good. Run the setStoragePolicy invocation.
   */
  private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
      final String storagePolicy) throws IOException {
    Method m = null;
    Exception toThrow = null;
    try {
      m = fs.getClass().getDeclaredMethod("setStoragePolicy", Path.class, String.class);
      m.setAccessible(true);
    } catch (NoSuchMethodException e) {
      toThrow = e;
      final String msg = "FileSystem doesn't support setStoragePolicy; HDFS-6584, HDFS-9345 " +
          "not available. This is normal and expected on earlier Hadoop versions.";
      if (!warningMap.containsKey(fs)) {
        warningMap.put(fs, true);
        LOG.warn(msg, e);
      } else if (LOG.isDebugEnabled()) {
        LOG.debug(msg, e);
      }
      m = null;
    } catch (SecurityException e) {
      toThrow = e;
      final String msg = "No access to setStoragePolicy on FileSystem from the SecurityManager; " +
          "HDFS-6584, HDFS-9345 not available. This is unusual and probably warrants an email " +
          "to the user@hbase mailing list. Please be sure to include a link to your configs, and " +
          "logs that include this message and period of time before it. Logs around service " +
          "start up will probably be useful as well.";
      if (!warningMap.containsKey(fs)) {
        warningMap.put(fs, true);
        LOG.warn(msg, e);
      } else if (LOG.isDebugEnabled()) {
        LOG.debug(msg, e);
      }
      m = null; // could happen on setAccessible() or getDeclaredMethod()
    }
    if (m != null) {
      try {
        m.invoke(fs, path, storagePolicy);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Set storagePolicy={} for path={}", storagePolicy, path);
        }
      } catch (Exception e) {
        toThrow = e;
        // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
        // misuse than a runtime problem with HDFS.
        if (!warningMap.containsKey(fs)) {
          warningMap.put(fs, true);
          LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path + ". " +
              "DEBUG log level might have more details.", e);
        } else if (LOG.isDebugEnabled()) {
          LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
        }
        // check for lack of HDFS-7228
        if (e instanceof InvocationTargetException) {
          final Throwable exception = e.getCause();
          if (exception instanceof RemoteException &&
              HadoopIllegalArgumentException.class.getName().equals(
                ((RemoteException)exception).getClassName())) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Given storage policy, '" + storagePolicy + "', was rejected and probably " +
                "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
                "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
                "more information see the 'ArchivalStorage' docs for your Hadoop release.");
            }
          // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation
          // that throws UnsupportedOperationException
          } else if (exception instanceof UnsupportedOperationException) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("The underlying FileSystem implementation doesn't support " +
                  "setStoragePolicy. This is probably intentional on their part, since HDFS-9345 " +
                  "appears to be present in your version of Hadoop. For more information check " +
                  "the Hadoop documentation on 'ArchivalStorage', the Hadoop FileSystem " +
                  "specification docs from HADOOP-11981, and/or related documentation from the " +
                  "provider of the underlying FileSystem (its name should appear in the " +
                  "stacktrace that accompanies this message). Note in particular that Hadoop's " +
                  "local filesystem implementation doesn't support storage policies.", exception);
            }
          }
        }
      }
    }
    if (toThrow != null) {
      throw new IOException(toThrow);
    }
  }

  /**
   * @param conf must not be null
   * @return True if the filesystem's scheme is 'hdfs'.
   * @throws IOException from underlying FileSystem
   */
  public static boolean isHDFS(final Configuration conf) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    String scheme = fs.getUri().getScheme();
    return scheme.equalsIgnoreCase("hdfs");
  }

  /**
   * Checks if the given path is the one with 'recovered.edits' dir.
   * @param path must not be null
   * @return True if we recovered edits
   */
  public static boolean isRecoveredEdits(Path path) {
    return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
  }

  /**
   * @param conf must not be null
   * @return Returns the filesystem of the hbase rootdir.
   * @throws IOException from underlying FileSystem
   */
  public static FileSystem getCurrentFileSystem(Configuration conf) throws IOException {
    return getRootDir(conf).getFileSystem(conf);
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
   * This accommodates differences between Hadoop versions, where Hadoop 1
   * does not throw a FileNotFoundException and returns an empty FileStatus[],
   * while Hadoop 2 will throw FileNotFoundException.
   *
   * Where possible, prefer FSUtils#listStatusWithStatusFilter(FileSystem,
   * Path, FileStatusFilter) instead.
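   * <p>
   * Illustrative usage (the directory is hypothetical):
   * <pre>{@code
   * FileStatus[] children = CommonFSUtils.listStatus(fs, new Path("/hbase/archive"), null);
   * if (children == null) {
   *   // the directory is missing or empty
   * }
   * }</pre>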
   *
   * @param fs file system
   * @param dir directory
   * @param filter path filter
   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
   */
  public static FileStatus[] listStatus(final FileSystem fs,
      final Path dir, final PathFilter filter) throws IOException {
    FileStatus[] status = null;
    try {
      status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
    } catch (FileNotFoundException fnfe) {
      // if directory doesn't exist, return null
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} doesn't exist", dir);
      }
    }
    if (status == null || status.length < 1) {
      return null;
    }
    return status;
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
   * This accommodates differences between Hadoop versions.
   *
   * @param fs file system
   * @param dir directory
   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
   */
  public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
    return listStatus(fs, dir, null);
  }

  /**
   * Calls fs.listFiles() to get FileStatus and BlockLocations together to reduce RPC calls.
   *
   * @param fs file system
   * @param dir directory
   * @return LocatedFileStatus list
   */
  public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs,
      final Path dir) throws IOException {
    List<LocatedFileStatus> status = null;
    try {
      RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs
          .listFiles(dir, false);
      while (locatedFileStatusRemoteIterator.hasNext()) {
        if (status == null) {
          status = Lists.newArrayList();
        }
        status.add(locatedFileStatusRemoteIterator.next());
      }
    } catch (FileNotFoundException fnfe) {
      // if directory doesn't exist, return null
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} doesn't exist", dir);
      }
    }
    return status;
  }

  /**
   * Calls fs.delete() and returns the value returned by fs.delete()
   *
   * @param fs must not be null
   * @param path must not be null
   * @param recursive delete tree rooted at path
   * @return the value returned by fs.delete()
   * @throws IOException from underlying FileSystem
   */
  public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
      throws IOException {
    return fs.delete(path, recursive);
  }

  /**
   * Calls fs.exists(). Checks if the specified path exists
   *
   * @param fs must not be null
   * @param path must not be null
   * @return the value returned by fs.exists()
   * @throws IOException from underlying FileSystem
   */
  public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
    return fs.exists(path);
  }

  /**
   * Log the current state of the filesystem from a certain root directory.
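   * <p>
   * Illustrative usage:
   * <pre>{@code
   * CommonFSUtils.logFileSystemState(fs, CommonFSUtils.getRootDir(conf), LOG);
   * }</pre>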
   * @param fs filesystem to investigate
   * @param root root file/directory to start logging from
   * @param log log to output information
   * @throws IOException if an unexpected exception occurs
   */
  public static void logFileSystemState(final FileSystem fs, final Path root, Logger log)
      throws IOException {
    log.debug("File system contents for path {}", root);
    logFSTree(log, fs, root, "|-");
  }

  /**
   * Recursive helper to log the state of the FS
   *
   * @see #logFileSystemState(FileSystem, Path, Logger)
   */
  private static void logFSTree(Logger log, final FileSystem fs, final Path root, String prefix)
      throws IOException {
    FileStatus[] files = listStatus(fs, root, null);
    if (files == null) {
      return;
    }

    for (FileStatus file : files) {
      if (file.isDirectory()) {
        log.debug(prefix + file.getPath().getName() + "/");
        logFSTree(log, fs, file.getPath(), prefix + "---");
      } else {
        log.debug(prefix + file.getPath().getName());
      }
    }
  }

  public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
      throws IOException {
    // set the modify time for TimeToLive Cleaner
    fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
    return fs.rename(src, dest);
  }

  /**
   * Check if short circuit read buffer size is set and if not, set it to hbase value.
   * @param conf must not be null
   */
  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
    int size = conf.getInt(dfsKey, notSet);
    // If a size is set, return -- we will use it.
    if (size != notSet) {
      return;
    }
    // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }

  private static class DfsBuilderUtility {
    static Class<?> dfsClass = null;
    static Method createMethod;
    static Method overwriteMethod;
    static Method bufferSizeMethod;
    static Method blockSizeMethod;
    static Method recursiveMethod;
    static Method replicateMethod;
    static Method replicationMethod;
    static Method buildMethod;
    static boolean allMethodsPresent = false;

    static {
      String dfsName = "org.apache.hadoop.hdfs.DistributedFileSystem";
      String builderName = dfsName + "$HdfsDataOutputStreamBuilder";
      Class<?> builderClass = null;

      try {
        dfsClass = Class.forName(dfsName);
      } catch (ClassNotFoundException e) {
        LOG.debug("{} not available, will not use builder API for file creation.", dfsName);
      }
      try {
        builderClass = Class.forName(builderName);
      } catch (ClassNotFoundException e) {
        LOG.debug("{} not available, will not use builder API for file creation.", builderName);
      }

      if (dfsClass != null && builderClass != null) {
        try {
          createMethod = dfsClass.getMethod("createFile", Path.class);
          overwriteMethod = builderClass.getMethod("overwrite", boolean.class);
          bufferSizeMethod = builderClass.getMethod("bufferSize", int.class);
          blockSizeMethod = builderClass.getMethod("blockSize", long.class);
          recursiveMethod = builderClass.getMethod("recursive");
          replicateMethod = builderClass.getMethod("replicate");
          replicationMethod = builderClass.getMethod("replication", short.class);
          buildMethod = builderClass.getMethod("build");

          allMethodsPresent = true;
          LOG.debug("Using builder API via reflection for DFS file creation.");
        } catch (NoSuchMethodException e) {
          LOG.debug("Could not find method on builder; will use old DFS API for file creation {}",
              e.getMessage());
        }
      }
    }

    /**
     * Attempt to use builder API via reflection to create a file with the given parameters and
     * replication enabled.
     */
    static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable,
        int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
      if (allMethodsPresent && dfsClass.isInstance(fs)) {
        try {
          Object builder;

          builder = createMethod.invoke(fs, path);
          builder = overwriteMethod.invoke(builder, overwritable);
          builder = bufferSizeMethod.invoke(builder, bufferSize);
          builder = blockSizeMethod.invoke(builder, blockSize);
          if (isRecursive) {
            builder = recursiveMethod.invoke(builder);
          }
          builder = replicateMethod.invoke(builder);
          builder = replicationMethod.invoke(builder, replication);
          return (FSDataOutputStream) buildMethod.invoke(builder);
        } catch (IllegalAccessException | InvocationTargetException e) {
          // Should have caught this failure during initialization, so log full trace here
          LOG.warn("Couldn't use reflection with builder API", e);
        }
      }

      if (isRecursive) {
        return fs.create(path, overwritable, bufferSize, replication, blockSize, null);
      }
      return fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null);
    }

    /**
     * Attempt to use builder API via reflection to create a file with the given parameters and
     * replication enabled.
     */
    static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable)
        throws IOException {
      if (allMethodsPresent && dfsClass.isInstance(fs)) {
        try {
          Object builder;

          builder = createMethod.invoke(fs, path);
          builder = overwriteMethod.invoke(builder, overwritable);
          builder = replicateMethod.invoke(builder);
          return (FSDataOutputStream) buildMethod.invoke(builder);
        } catch (IllegalAccessException | InvocationTargetException e) {
          // Should have caught this failure during initialization, so log full trace here
          LOG.warn("Couldn't use reflection with builder API", e);
        }
      }

      return fs.create(path, overwritable);
    }
  }

  /**
   * Attempt to use builder API via reflection to create a file with the given parameters and
   * replication enabled.
   * <p>
   * Will not attempt to enable replication when passed an HFileSystem.
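   * <p>
   * Illustrative usage (the path below is hypothetical):
   * <pre>{@code
   * FSDataOutputStream out = CommonFSUtils.createForWal(
   *     CommonFSUtils.getWALFileSystem(conf), new Path("/hbase-wal/WALs/rs1/wal.0"), false);
   * }</pre>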
   */
  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable)
      throws IOException {
    return DfsBuilderUtility.createHelper(fs, path, overwritable);
  }

  /**
   * Attempt to use builder API via reflection to create a file with the given parameters and
   * replication enabled.
   * <p>
   * Will not attempt to enable replication when passed an HFileSystem.
   */
  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable,
      int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
    return DfsBuilderUtility.createHelper(fs, path, overwritable, bufferSize, replication,
        blockSize, isRecursive);
  }

  // Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and
  // not until we attempt to reference it.
  private static class StreamCapabilities {
    public static final boolean PRESENT;
    public static final Class<?> CLASS;
    public static final Method METHOD;
    static {
      boolean tmp = false;
      Class<?> clazz = null;
      Method method = null;
      try {
        clazz = Class.forName("org.apache.hadoop.fs.StreamCapabilities");
        method = clazz.getMethod("hasCapability", String.class);
        tmp = true;
      } catch (ClassNotFoundException | NoSuchMethodException | SecurityException exception) {
        LOG.warn("Your Hadoop installation does not include the StreamCapabilities class from " +
                 "HDFS-11644, so we will skip checking if any FSDataOutputStreams actually " +
                 "support hflush/hsync. If you are running on top of HDFS this probably just " +
                 "means you have an older version and this can be ignored. If you are running on " +
                 "top of an alternate FileSystem implementation you should manually verify that " +
                 "hflush and hsync are implemented; otherwise you risk data loss and hard to " +
                 "diagnose errors when our assumptions are violated.");
        LOG.debug("The first request to check for StreamCapabilities came from this stacktrace.",
            exception);
      } finally {
        PRESENT = tmp;
        CLASS = clazz;
        METHOD = method;
      }
    }
  }

  /**
   * If our FileSystem version includes the StreamCapabilities class, check if
   * the given stream has a particular capability.
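   * <p>
   * Illustrative usage:
   * <pre>{@code
   * try (FSDataOutputStream out = fs.create(path)) {
   *   if (!CommonFSUtils.hasCapability(out, "hflush")) {
   *     throw new StreamLacksCapabilityException("hflush");
   *   }
   * }
   * }</pre>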
   * @param stream capabilities are per-stream instance, so check this one specifically. Must not
   *        be null.
   * @param capability what to look for, per Hadoop Common's FileSystem docs
   * @return true if there are no StreamCapabilities. false if there are, but this stream doesn't
   *         implement it. return result of asking the stream otherwise.
   */
  public static boolean hasCapability(FSDataOutputStream stream, String capability) {
    // be consistent whether or not StreamCapabilities is present
    if (stream == null) {
      throw new NullPointerException("stream parameter must not be null.");
    }
    // If o.a.h.fs.StreamCapabilities doesn't exist, assume everyone does everything
    // otherwise old versions of Hadoop will break.
    boolean result = true;
    if (StreamCapabilities.PRESENT) {
      // if StreamCapabilities is present, but the stream doesn't implement it
      // or we run into a problem invoking the method,
      // we treat that as equivalent to not declaring anything
      result = false;
      if (StreamCapabilities.CLASS.isAssignableFrom(stream.getClass())) {
        try {
          result = ((Boolean)StreamCapabilities.METHOD.invoke(stream, capability)).booleanValue();
        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
            exception) {
          LOG.warn("Your Hadoop installation's StreamCapabilities implementation doesn't match " +
              "our understanding of how it's supposed to work. Please file a JIRA and include " +
              "the following stack trace. In the mean time we're interpreting this behavior " +
              "difference as a lack of capability support, which will probably cause a failure.",
              exception);
        }
      }
    }
    return result;
  }

  /**
   * Helper exception for those cases where the place where we need to check a stream capability
   * is not where we have the needed context to explain the impact and mitigation for a lack.
   */
  public static class StreamLacksCapabilityException extends Exception {
    public StreamLacksCapabilityException(String message, Throwable cause) {
      super(message, cause);
    }
    public StreamLacksCapabilityException(String message) {
      super(message);
    }
  }
}