001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.lang.reflect.InvocationTargetException;
024import java.lang.reflect.Method;
025import java.net.URI;
026import java.net.URISyntaxException;
027import java.util.List;
028import java.util.Locale;
029import java.util.Map;
030import java.util.concurrent.ConcurrentHashMap;
031import org.apache.hadoop.HadoopIllegalArgumentException;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FSDataOutputStream;
034import org.apache.hadoop.fs.FileStatus;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.LocatedFileStatus;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.fs.PathFilter;
039import org.apache.hadoop.fs.RemoteIterator;
040import org.apache.hadoop.fs.permission.FsPermission;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.ipc.RemoteException;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
049import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
050
051/**
052 * Utility methods for interacting with the underlying file system.
053 */
054@InterfaceAudience.Private
055public abstract class CommonFSUtils {
056  private static final Logger LOG = LoggerFactory.getLogger(CommonFSUtils.class);
057
058  /** Parameter name for HBase WAL directory */
059  public static final String HBASE_WAL_DIR = "hbase.wal.dir";
060
061  /** Parameter to disable stream capability enforcement checks */
  public static final String UNSAFE_STREAM_CAPABILITY_ENFORCE =
      "hbase.unsafe.stream.capability.enforce";
063
064  /** Full access permissions (starting point for a umask) */
065  public static final String FULL_RWX_PERMISSIONS = "777";
066
067  protected CommonFSUtils() {
068    super();
069  }
070
071  /**
072   * Compare of path component. Does not consider schema; i.e. if schemas
073   * different but <code>path</code> starts with <code>rootPath</code>,
074   * then the function returns true
075   * @param rootPath value to check for
076   * @param path subject to check
077   * @return True if <code>path</code> starts with <code>rootPath</code>
078   */
079  public static boolean isStartingWithPath(final Path rootPath, final String path) {
080    String uriRootPath = rootPath.toUri().getPath();
081    String tailUriPath = (new Path(path)).toUri().getPath();
082    return tailUriPath.startsWith(uriRootPath);
083  }
084
085  /**
086   * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
087   * '/a/b/c' part. Does not consider schema; i.e. if schemas different but path or subpath matches,
088   * the two will equate.
089   * @param pathToSearch Path we will be trying to match against.
090   * @param pathTail what to match
091   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
092   */
093  public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
094    return isMatchingTail(pathToSearch, new Path(pathTail));
095  }
096
097  /**
098   * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
099   * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true.  Does not consider
100   * schema; i.e. if schemas different but path or subpath matches, the two will equate.
101   * @param pathToSearch Path we will be trying to match agains against
102   * @param pathTail what to match
103   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
104   */
105  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
106    if (pathToSearch.depth() != pathTail.depth()) {
107      return false;
108    }
109    Path tailPath = pathTail;
110    String tailName;
111    Path toSearch = pathToSearch;
112    String toSearchName;
113    boolean result = false;
114    do {
115      tailName = tailPath.getName();
116      if (tailName == null || tailName.length() <= 0) {
117        result = true;
118        break;
119      }
120      toSearchName = toSearch.getName();
121      if (toSearchName == null || toSearchName.length() <= 0) {
122        break;
123      }
124      // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
125      tailPath = tailPath.getParent();
126      toSearch = toSearch.getParent();
127    } while(tailName.equals(toSearchName));
128    return result;
129  }
130
131  /**
132   * Delete if exists.
133   * @param fs filesystem object
134   * @param dir directory to delete
135   * @return True if deleted <code>dir</code>
136   * @throws IOException e
137   */
  public static boolean deleteDirectory(final FileSystem fs, final Path dir)
      throws IOException {
140    return fs.exists(dir) && fs.delete(dir, true);
141  }
142
143  /**
144   * Return the number of bytes that large input files should be optimally
145   * be split into to minimize i/o time.
146   *
147   * use reflection to search for getDefaultBlockSize(Path f)
148   * if the method doesn't exist, fall back to using getDefaultBlockSize()
149   *
150   * @param fs filesystem object
151   * @return the default block size for the path's filesystem
152   * @throws IOException e
153   */
154  public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
155    Method m = null;
156    Class<? extends FileSystem> cls = fs.getClass();
157    try {
158      m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
159    } catch (NoSuchMethodException e) {
160      LOG.info("FileSystem doesn't support getDefaultBlockSize");
161    } catch (SecurityException e) {
162      LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
163      m = null; // could happen on setAccessible()
164    }
165    if (m == null) {
166      return fs.getDefaultBlockSize(path);
167    } else {
168      try {
169        Object ret = m.invoke(fs, path);
170        return ((Long)ret).longValue();
171      } catch (Exception e) {
172        throw new IOException(e);
173      }
174    }
175  }
176
177  /*
178   * Get the default replication.
179   *
180   * use reflection to search for getDefaultReplication(Path f)
181   * if the method doesn't exist, fall back to using getDefaultReplication()
182   *
183   * @param fs filesystem object
184   * @param f path of file
185   * @return default replication for the path's filesystem
186   * @throws IOException e
187   */
188  public static short getDefaultReplication(final FileSystem fs, final Path path)
189      throws IOException {
190    Method m = null;
191    Class<? extends FileSystem> cls = fs.getClass();
192    try {
193      m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
194    } catch (NoSuchMethodException e) {
195      LOG.info("FileSystem doesn't support getDefaultReplication");
196    } catch (SecurityException e) {
197      LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
198      m = null; // could happen on setAccessible()
199    }
200    if (m == null) {
201      return fs.getDefaultReplication(path);
202    } else {
203      try {
204        Object ret = m.invoke(fs, path);
205        return ((Number)ret).shortValue();
206      } catch (Exception e) {
207        throw new IOException(e);
208      }
209    }
210  }
211
212  /**
213   * Returns the default buffer size to use during writes.
214   *
215   * The size of the buffer should probably be a multiple of hardware
216   * page size (4096 on Intel x86), and it determines how much data is
217   * buffered during read and write operations.
218   *
219   * @param fs filesystem object
220   * @return default buffer size to use during writes
221   */
222  public static int getDefaultBufferSize(final FileSystem fs) {
223    return fs.getConf().getInt("io.file.buffer.size", 4096);
224  }
225
226  /**
227   * Create the specified file on the filesystem. By default, this will:
228   * <ol>
229   * <li>apply the umask in the configuration (if it is enabled)</li>
230   * <li>use the fs configured buffer size (or 4096 if not set)</li>
231   * <li>use the default replication</li>
232   * <li>use the default block size</li>
233   * <li>not track progress</li>
234   * </ol>
235   *
236   * @param fs {@link FileSystem} on which to write the file
237   * @param path {@link Path} to the file to write
   * @param perm initial permissions
239   * @param overwrite Whether or not the created file should be overwritten.
240   * @return output stream to the created file
241   * @throws IOException if the file cannot be created
242   */
243  public static FSDataOutputStream create(FileSystem fs, Path path,
244      FsPermission perm, boolean overwrite) throws IOException {
245    if (LOG.isTraceEnabled()) {
246      LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
247    }
248    return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
249        getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
250  }
251
252  /**
253   * Get the file permissions specified in the configuration, if they are
254   * enabled.
255   *
256   * @param fs filesystem that the file will be created on.
257   * @param conf configuration to read for determining if permissions are
258   *          enabled and which to use
   * @param permissionConfKey property key in the configuration to use when
   *          finding the permission
   * @return the permission to use when creating a new file on the fs. If
   *         special permissions are not specified in the configuration, then
   *         the default permissions on the fs will be returned.
264   */
  public static FsPermission getFilePermissions(final FileSystem fs,
      final Configuration conf, final String permissionConfKey) {
267    boolean enablePermissions = conf.getBoolean(
268        HConstants.ENABLE_DATA_FILE_UMASK, false);
269
270    if (enablePermissions) {
271      try {
272        FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
273        // make sure that we have a mask, if not, go default.
        String mask = conf.get(permissionConfKey);
275        if (mask == null) {
276          return FsPermission.getFileDefault();
277        }
        // apply the umask
279        FsPermission umask = new FsPermission(mask);
280        return perm.applyUMask(umask);
281      } catch (IllegalArgumentException e) {
        LOG.warn("Incorrect umask specified: " + conf.get(permissionConfKey)
            + ", using default file permissions.", e);
286        return FsPermission.getFileDefault();
287      }
288    }
289    return FsPermission.getFileDefault();
290  }
291
292  /**
293   * Verifies root directory path is a valid URI with a scheme
294   *
295   * @param root root directory path
296   * @return Passed <code>root</code> argument.
297   * @throws IOException if not a valid URI with a scheme
298   */
299  public static Path validateRootPath(Path root) throws IOException {
300    try {
301      URI rootURI = new URI(root.toString());
302      String scheme = rootURI.getScheme();
303      if (scheme == null) {
304        throw new IOException("Root directory does not have a scheme");
305      }
306      return root;
307    } catch (URISyntaxException e) {
308      IOException io = new IOException("Root directory path is not a valid " +
309        "URI -- check your " + HConstants.HBASE_DIR + " configuration");
310      io.initCause(e);
311      throw io;
312    }
313  }
314
315  /**
316   * Checks for the presence of the WAL log root path (using the provided conf object) in the given
317   * path. If it exists, this method removes it and returns the String representation of remaining
318   * relative path.
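   * <p>For example, with a (hypothetical) WAL root of {@code hdfs://nn/hbase-wal}:
   * <pre>{@code
   * removeWALRootPath(new Path("hdfs://nn/hbase-wal/WALs/server1"), conf); // "WALs/server1"
   * removeWALRootPath(new Path("/unrelated/path"), conf);                  // returned unchanged
   * }</pre>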
319   * @param path must not be null
320   * @param conf must not be null
321   * @return String representation of the remaining relative path
322   * @throws IOException from underlying filesystem
323   */
324  public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
325    Path root = getWALRootDir(conf);
326    String pathStr = path.toString();
    // If the path does not start with the WAL root, return it as is.
    if (!pathStr.startsWith(root.toString())) {
      return pathStr;
    }
    // Otherwise strip the WAL root prefix, including the "/" separator.
    return pathStr.substring(root.toString().length() + 1);
333  }
334
335  /**
336   * Return the 'path' component of a Path.  In Hadoop, Path is an URI.  This
337   * method returns the 'path' component of a Path's URI: e.g. If a Path is
338   * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
339   * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
340   * This method is useful if you want to print out a Path without qualifying
341   * Filesystem instance.
342   * @param p Filesystem Path whose 'path' component we are to return.
343   * @return Path portion of the Filesystem
344   */
345  public static String getPath(Path p) {
346    return p.toUri().getPath();
347  }
348
349  /**
350   * @param c configuration
351   * @return {@link Path} to hbase root directory from
352   *     configuration as a qualified Path.
353   * @throws IOException e
354   */
355  public static Path getRootDir(final Configuration c) throws IOException {
356    Path p = new Path(c.get(HConstants.HBASE_DIR));
357    FileSystem fs = p.getFileSystem(c);
358    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
359  }
360
361  public static void setRootDir(final Configuration c, final Path root) throws IOException {
362    c.set(HConstants.HBASE_DIR, root.toString());
363  }
364
365  public static void setFsDefault(final Configuration c, final Path root) throws IOException {
366    c.set("fs.defaultFS", root.toString());    // for hadoop 0.21+
367  }
368
369  public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
370    Path p = getRootDir(c);
371    return p.getFileSystem(c);
372  }
373
374  /**
375   * @param c configuration
376   * @return {@link Path} to hbase log root directory: e.g. {@value HBASE_WAL_DIR} from
377   *     configuration as a qualified Path. Defaults to HBase root dir.
378   * @throws IOException e
379   */
  public static Path getWALRootDir(final Configuration c) throws IOException {
    Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR)));
383    if (!isValidWALRootDir(p, c)) {
384      return getRootDir(c);
385    }
386    FileSystem fs = p.getFileSystem(c);
387    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
388  }
389
390  @VisibleForTesting
391  public static void setWALRootDir(final Configuration c, final Path root) throws IOException {
392    c.set(HBASE_WAL_DIR, root.toString());
393  }
394
395  public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
396    Path p = getWALRootDir(c);
397    FileSystem fs = p.getFileSystem(c);
    // hadoop-core does fs caching, so need to propagate this if set
399    String enforceStreamCapability = c.get(UNSAFE_STREAM_CAPABILITY_ENFORCE);
400    if (enforceStreamCapability != null) {
401      fs.getConf().set(UNSAFE_STREAM_CAPABILITY_ENFORCE, enforceStreamCapability);
402    }
403    return fs;
404  }
405
406  private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
407    Path rootDir = getRootDir(c);
408    FileSystem fs = walDir.getFileSystem(c);
409    Path qualifiedWalDir = walDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
410    if (!qualifiedWalDir.equals(rootDir)) {
411      if (qualifiedWalDir.toString().startsWith(rootDir.toString() + "/")) {
412        throw new IllegalStateException("Illegal WAL directory specified. " +
413            "WAL directories are not permitted to be under the root directory if set.");
414      }
415    }
416    return true;
417  }
418
419  /**
420   * Returns the WAL region directory based on the given table name and region name
421   * @param conf configuration to determine WALRootDir
422   * @param tableName Table that the region is under
423   * @param encodedRegionName Region name used for creating the final region directory
424   * @return the region directory used to store WALs under the WALRootDir
425   * @throws IOException if there is an exception determining the WALRootDir
426   */
427  public static Path getWALRegionDir(final Configuration conf, final TableName tableName,
428      final String encodedRegionName) throws IOException {
429    return new Path(getWALTableDir(conf, tableName), encodedRegionName);
430  }
431
432  /**
433   * Returns the Table directory under the WALRootDir for the specified table name
434   * @param conf configuration used to get the WALRootDir
435   * @param tableName Table to get the directory for
436   * @return a path to the WAL table directory for the specified table
437   * @throws IOException if there is an exception determining the WALRootDir
438   */
439  public static Path getWALTableDir(final Configuration conf, final TableName tableName)
440      throws IOException {
441    Path baseDir = new Path(getWALRootDir(conf), HConstants.BASE_NAMESPACE_DIR);
442    return new Path(new Path(baseDir, tableName.getNamespaceAsString()),
443      tableName.getQualifierAsString());
444  }
445
446  /**
   * For backward compatibility with HBASE-20734, where we stored recovered edits in the wrong
   * directory, missing BASE_NAMESPACE_DIR. See HBASE-22617 for more details.
449   * @deprecated For compatibility, will be removed in 4.0.0.
450   */
451  @Deprecated
452  public static Path getWrongWALRegionDir(final Configuration conf, final TableName tableName,
453      final String encodedRegionName) throws IOException {
454    Path wrongTableDir = new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()),
455      tableName.getQualifierAsString());
456    return new Path(wrongTableDir, encodedRegionName);
457  }
458
459  /**
460   * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
461   * path rootdir
462   *
463   * @param rootdir qualified path of HBase root directory
464   * @param tableName name of table
465   * @return {@link org.apache.hadoop.fs.Path} for table
466   */
467  public static Path getTableDir(Path rootdir, final TableName tableName) {
468    return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
469        tableName.getQualifierAsString());
470  }
471
472  /**
473   * Returns the {@link org.apache.hadoop.hbase.TableName} object representing
474   * the table directory under
475   * path rootdir
476   *
477   * @param tablePath path of table
478   * @return {@link org.apache.hadoop.fs.Path} for table
479   */
480  public static TableName getTableName(Path tablePath) {
481    return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
482  }
483
484  /**
485   * Returns the {@link org.apache.hadoop.fs.Path} object representing
486   * the namespace directory under path rootdir
487   *
488   * @param rootdir qualified path of HBase root directory
489   * @param namespace namespace name
   * @return {@link org.apache.hadoop.fs.Path} for the namespace
491   */
492  public static Path getNamespaceDir(Path rootdir, final String namespace) {
493    return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
494        new Path(namespace)));
495  }
496
497  // this mapping means that under a federated FileSystem implementation, we'll
498  // only log the first failure from any of the underlying FileSystems at WARN and all others
499  // will be at DEBUG.
500  private static final Map<FileSystem, Boolean> warningMap =
501      new ConcurrentHashMap<FileSystem, Boolean>();
502
503  /**
504   * Sets storage policy for given path.
505   * If the passed path is a directory, we'll set the storage policy for all files
506   * created in the future in said directory. Note that this change in storage
507   * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle.
508   * If we're running on a version of FileSystem that doesn't support the given storage policy
509   * (or storage policies at all), then we'll issue a log message and continue.
510   *
511   * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
512   *
   * @param fs We only do anything if it implements a setStoragePolicy method
514   * @param path the Path whose storage policy is to be set
515   * @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+
   *   org.apache.hadoop.hdfs.protocol.HdfsConstants for the possible list, e.g.
517   *   'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
518   */
519  public static void setStoragePolicy(final FileSystem fs, final Path path,
520      final String storagePolicy) {
521    try {
522      setStoragePolicy(fs, path, storagePolicy, false);
523    } catch (IOException e) {
524      // should never arrive here
525      LOG.warn("We have chosen not to throw exception but some unexpectedly thrown out", e);
526    }
527  }
528
529  static void setStoragePolicy(final FileSystem fs, final Path path, final String storagePolicy,
530      boolean throwException) throws IOException {
531    if (storagePolicy == null) {
532      if (LOG.isTraceEnabled()) {
533        LOG.trace("We were passed a null storagePolicy, exiting early.");
534      }
535      return;
536    }
537    String trimmedStoragePolicy = storagePolicy.trim();
538    if (trimmedStoragePolicy.isEmpty()) {
539      if (LOG.isTraceEnabled()) {
540        LOG.trace("We were passed an empty storagePolicy, exiting early.");
541      }
542      return;
543    } else {
544      trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT);
545    }
546    if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) {
547      if (LOG.isTraceEnabled()) {
548        LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.",
549          trimmedStoragePolicy);
550      }
551      return;
552    }
553    try {
554      invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
555    } catch (IOException e) {
556      if (LOG.isTraceEnabled()) {
557        LOG.trace("Failed to invoke set storage policy API on FS", e);
558      }
559      if (throwException) {
560        throw e;
561      }
562    }
563  }
564
565  /*
566   * All args have been checked and are good. Run the setStoragePolicy invocation.
567   */
568  private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
569      final String storagePolicy) throws IOException {
570    Method m = null;
571    Exception toThrow = null;
572    try {
573      m = fs.getClass().getDeclaredMethod("setStoragePolicy",
574        new Class<?>[] { Path.class, String.class });
575      m.setAccessible(true);
576    } catch (NoSuchMethodException e) {
577      toThrow = e;
578      final String msg = "FileSystem doesn't support setStoragePolicy; HDFS-6584, HDFS-9345 " +
579          "not available. This is normal and expected on earlier Hadoop versions.";
580      if (!warningMap.containsKey(fs)) {
581        warningMap.put(fs, true);
582        LOG.warn(msg, e);
583      } else if (LOG.isDebugEnabled()) {
584        LOG.debug(msg, e);
585      }
586      m = null;
587    } catch (SecurityException e) {
588      toThrow = e;
589      final String msg = "No access to setStoragePolicy on FileSystem from the SecurityManager; " +
590          "HDFS-6584, HDFS-9345 not available. This is unusual and probably warrants an email " +
591          "to the user@hbase mailing list. Please be sure to include a link to your configs, and " +
592          "logs that include this message and period of time before it. Logs around service " +
593          "start up will probably be useful as well.";
594      if (!warningMap.containsKey(fs)) {
595        warningMap.put(fs, true);
596        LOG.warn(msg, e);
597      } else if (LOG.isDebugEnabled()) {
598        LOG.debug(msg, e);
599      }
600      m = null; // could happen on setAccessible() or getDeclaredMethod()
601    }
602    if (m != null) {
603      try {
604        m.invoke(fs, path, storagePolicy);
605        if (LOG.isDebugEnabled()) {
606          LOG.debug("Set storagePolicy=" + storagePolicy + " for path=" + path);
607        }
608      } catch (Exception e) {
609        toThrow = e;
610        // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
611        // misuse than a runtime problem with HDFS.
612        if (!warningMap.containsKey(fs)) {
613          warningMap.put(fs, true);
614          LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path + ". " +
615              "DEBUG log level might have more details.", e);
616        } else if (LOG.isDebugEnabled()) {
617          LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
618        }
619        // check for lack of HDFS-7228
620        if (e instanceof InvocationTargetException) {
621          final Throwable exception = e.getCause();
622          if (exception instanceof RemoteException &&
623              HadoopIllegalArgumentException.class.getName().equals(
624                ((RemoteException)exception).getClassName())) {
625            if (LOG.isDebugEnabled()) {
626              LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " +
627                "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
628                "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
629                "more information see the 'ArchivalStorage' docs for your Hadoop release.");
630            }
631          // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation
632          // that throws UnsupportedOperationException
633          } else if (exception instanceof UnsupportedOperationException) {
634            if (LOG.isDebugEnabled()) {
635              LOG.debug("The underlying FileSystem implementation doesn't support " +
636                  "setStoragePolicy. This is probably intentional on their part, since HDFS-9345 " +
637                  "appears to be present in your version of Hadoop. For more information check " +
638                  "the Hadoop documentation on 'ArchivalStorage', the Hadoop FileSystem " +
639                  "specification docs from HADOOP-11981, and/or related documentation from the " +
640                  "provider of the underlying FileSystem (its name should appear in the " +
641                  "stacktrace that accompanies this message). Note in particular that Hadoop's " +
642                  "local filesystem implementation doesn't support storage policies.", exception);
643            }
644          }
645        }
646      }
647    }
648    if (toThrow != null) {
649      throw new IOException(toThrow);
650    }
651  }
652
653  /**
654   * @param conf must not be null
   * @return True if this filesystem's scheme is 'hdfs'.
656   * @throws IOException from underlying FileSystem
657   */
658  public static boolean isHDFS(final Configuration conf) throws IOException {
659    FileSystem fs = FileSystem.get(conf);
660    String scheme = fs.getUri().getScheme();
661    return scheme.equalsIgnoreCase("hdfs");
662  }
663
664  /**
665   * Checks if the given path is the one with 'recovered.edits' dir.
666   * @param path must not be null
667   * @return True if we recovered edits
668   */
669  public static boolean isRecoveredEdits(Path path) {
670    return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
671  }
672
673  /**
674   * @param conf must not be null
675   * @return Returns the filesystem of the hbase rootdir.
676   * @throws IOException from underlying FileSystem
677   */
  public static FileSystem getCurrentFileSystem(Configuration conf) throws IOException {
680    return getRootDir(conf).getFileSystem(conf);
681  }
682
683  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
   * This accommodates differences between hadoop versions, where hadoop 1
   * does not throw a FileNotFoundException but returns an empty FileStatus[],
   * while Hadoop 2 will throw a FileNotFoundException.
688   *
689   * Where possible, prefer FSUtils#listStatusWithStatusFilter(FileSystem,
690   * Path, FileStatusFilter) instead.
691   *
692   * @param fs file system
693   * @param dir directory
694   * @param filter path filter
695   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
696   */
697  public static FileStatus [] listStatus(final FileSystem fs,
698      final Path dir, final PathFilter filter) throws IOException {
699    FileStatus [] status = null;
700    try {
701      status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
702    } catch (FileNotFoundException fnfe) {
703      // if directory doesn't exist, return null
704      if (LOG.isTraceEnabled()) {
705        LOG.trace(dir + " doesn't exist");
706      }
707    }
708    if (status == null || status.length < 1) {
709      return null;
710    }
711    return status;
712  }
713
714  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
   * This accommodates differences between hadoop versions.
717   *
718   * @param fs file system
719   * @param dir directory
720   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
721   */
722  public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
723    return listStatus(fs, dir, null);
724  }
725
726  /**
727   * Calls fs.listFiles() to get FileStatus and BlockLocations together for reducing rpc call
728   *
729   * @param fs file system
730   * @param dir directory
731   * @return LocatedFileStatus list
732   */
733  public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs,
734      final Path dir) throws IOException {
735    List<LocatedFileStatus> status = null;
736    try {
737      RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs
738          .listFiles(dir, false);
739      while (locatedFileStatusRemoteIterator.hasNext()) {
740        if (status == null) {
741          status = Lists.newArrayList();
742        }
743        status.add(locatedFileStatusRemoteIterator.next());
744      }
745    } catch (FileNotFoundException fnfe) {
746      // if directory doesn't exist, return null
747      if (LOG.isTraceEnabled()) {
748        LOG.trace(dir + " doesn't exist");
749      }
750    }
751    return status;
752  }
753
754  /**
755   * Calls fs.delete() and returns the value returned by the fs.delete()
756   *
757   * @param fs must not be null
758   * @param path must not be null
759   * @param recursive delete tree rooted at path
760   * @return the value returned by the fs.delete()
761   * @throws IOException from underlying FileSystem
762   */
763  public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
764      throws IOException {
765    return fs.delete(path, recursive);
766  }
767
768  /**
769   * Calls fs.exists(). Checks if the specified path exists
770   *
771   * @param fs must not be null
772   * @param path must not be null
773   * @return the value returned by fs.exists()
774   * @throws IOException from underlying FileSystem
775   */
776  public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
777    return fs.exists(path);
778  }
779
780  /**
781   * Log the current state of the filesystem from a certain root directory
782   * @param fs filesystem to investigate
783   * @param root root file/directory to start logging from
784   * @param LOG log to output information
785   * @throws IOException if an unexpected exception occurs
786   */
787  public static void logFileSystemState(final FileSystem fs, final Path root, Logger LOG)
788      throws IOException {
789    LOG.debug("File system contents for path " + root);
790    logFSTree(LOG, fs, root, "|-");
791  }
792
793  /**
794   * Recursive helper to log the state of the FS
795   *
796   * @see #logFileSystemState(FileSystem, Path, Logger)
797   */
798  private static void logFSTree(Logger LOG, final FileSystem fs, final Path root, String prefix)
799      throws IOException {
800    FileStatus[] files = listStatus(fs, root, null);
801    if (files == null) {
802      return;
803    }
804
805    for (FileStatus file : files) {
806      if (file.isDirectory()) {
807        LOG.debug(prefix + file.getPath().getName() + "/");
808        logFSTree(LOG, fs, file.getPath(), prefix + "---");
809      } else {
810        LOG.debug(prefix + file.getPath().getName());
811      }
812    }
813  }
814
815  public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
816      throws IOException {
817    // set the modify time for TimeToLive Cleaner
818    fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
819    return fs.rename(src, dest);
820  }
821
822  /**
823   * Do our short circuit read setup.
824   * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
825   * @param conf must not be null
826   */
827  public static void setupShortCircuitRead(final Configuration conf) {
828    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
829    boolean shortCircuitSkipChecksum =
830      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
831    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
832    if (shortCircuitSkipChecksum) {
833      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
834        "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
835        "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
836      assert !shortCircuitSkipChecksum; //this will fail if assertions are on
837    }
838    checkShortCircuitReadBufferSize(conf);
839  }
840
841  /**
842   * Check if short circuit read buffer size is set and if not, set it to hbase value.
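   *
   * <p>For example, setting {@code hbase.dfs.client.read.shortcircuit.buffer.size} to 131072
   * makes this method set {@code dfs.client.read.shortcircuit.buffer.size} to that value when
   * the HDFS key is otherwise unset.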
843   * @param conf must not be null
844   */
845  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
846    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
847    final int notSet = -1;
848    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
849    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
850    int size = conf.getInt(dfsKey, notSet);
851    // If a size is set, return -- we will use it.
852    if (size != notSet) {
853      return;
854    }
855    // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
856    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
857    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
858  }
859
860  private static class DfsBuilderUtility {
861    static Class<?> dfsClass = null;
862    static Method createMethod;
863    static Method overwriteMethod;
864    static Method bufferSizeMethod;
865    static Method blockSizeMethod;
866    static Method recursiveMethod;
867    static Method replicateMethod;
868    static Method replicationMethod;
869    static Method buildMethod;
870    static boolean allMethodsPresent = false;
871
872    static {
873      String dfsName = "org.apache.hadoop.hdfs.DistributedFileSystem";
874      String builderName = dfsName + "$HdfsDataOutputStreamBuilder";
875      Class<?> builderClass = null;
876
877      try {
878        dfsClass = Class.forName(dfsName);
879      } catch (ClassNotFoundException e) {
880        LOG.debug("{} not available, will not use builder API for file creation.", dfsName);
881      }
882      try {
883        builderClass = Class.forName(builderName);
884      } catch (ClassNotFoundException e) {
885        LOG.debug("{} not available, will not use builder API for file creation.", builderName);
886      }
887
888      if (dfsClass != null && builderClass != null) {
889        try {
890          createMethod = dfsClass.getMethod("createFile", Path.class);
891          overwriteMethod = builderClass.getMethod("overwrite", boolean.class);
892          bufferSizeMethod = builderClass.getMethod("bufferSize", int.class);
893          blockSizeMethod = builderClass.getMethod("blockSize", long.class);
894          recursiveMethod = builderClass.getMethod("recursive");
895          replicateMethod = builderClass.getMethod("replicate");
896          replicationMethod = builderClass.getMethod("replication", short.class);
897          buildMethod = builderClass.getMethod("build");
898
899          allMethodsPresent = true;
900          LOG.debug("Using builder API via reflection for DFS file creation.");
901        } catch (NoSuchMethodException e) {
902          LOG.debug("Could not find method on builder; will use old DFS API for file creation {}",
903              e.getMessage());
904        }
905      }
906    }
907
908    /**
909     * Attempt to use builder API via reflection to create a file with the given parameters and
910     * replication enabled.
911     */
912    static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable,
913        int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
914      if (allMethodsPresent && dfsClass.isInstance(fs)) {
915        try {
916          Object builder;
917
918          builder = createMethod.invoke(fs, path);
919          builder = overwriteMethod.invoke(builder, overwritable);
920          builder = bufferSizeMethod.invoke(builder, bufferSize);
921          builder = blockSizeMethod.invoke(builder, blockSize);
922          if (isRecursive) {
923            builder = recursiveMethod.invoke(builder);
924          }
925          builder = replicateMethod.invoke(builder);
926          builder = replicationMethod.invoke(builder, replication);
927          return (FSDataOutputStream) buildMethod.invoke(builder);
928        } catch (IllegalAccessException | InvocationTargetException e) {
929          // Should have caught this failure during initialization, so log full trace here
930          LOG.warn("Couldn't use reflection with builder API", e);
931        }
932      }
933
934      if (isRecursive) {
935        return fs.create(path, overwritable, bufferSize, replication, blockSize, null);
936      }
937      return fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null);
938    }
939
940    /**
941     * Attempt to use builder API via reflection to create a file with the given parameters and
942     * replication enabled.
943     */
944    static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable)
945        throws IOException {
946      if (allMethodsPresent && dfsClass.isInstance(fs)) {
947        try {
948          Object builder;
949
950          builder = createMethod.invoke(fs, path);
951          builder = overwriteMethod.invoke(builder, overwritable);
952          builder = replicateMethod.invoke(builder);
953          return (FSDataOutputStream) buildMethod.invoke(builder);
954        } catch (IllegalAccessException | InvocationTargetException e) {
955          // Should have caught this failure during initialization, so log full trace here
956          LOG.warn("Couldn't use reflection with builder API", e);
957        }
958      }
959
960      return fs.create(path, overwritable);
961    }
962  }
963
964  /**
965   * Attempt to use builder API via reflection to create a file with the given parameters and
966   * replication enabled.
967   * <p>
968   * Will not attempt to enable replication when passed an HFileSystem.
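   * <p>
   * A minimal usage sketch (the path below is hypothetical):
   * <pre>{@code
   * FSDataOutputStream out =
   *     CommonFSUtils.createForWal(walFs, new Path("/hbase-wal/WALs/wal.0"), false);
   * }</pre>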
969   */
970  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable)
971      throws IOException {
972    return DfsBuilderUtility.createHelper(fs, path, overwritable);
973  }
974
975  /**
976   * Attempt to use builder API via reflection to create a file with the given parameters and
977   * replication enabled.
978   * <p>
979   * Will not attempt to enable replication when passed an HFileSystem.
980   */
981  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable,
982      int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
983    return DfsBuilderUtility.createHelper(fs, path, overwritable, bufferSize, replication,
984        blockSize, isRecursive);
985  }
986
987  // Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and
988  // not until we attempt to reference it.
989  private static class StreamCapabilities {
990    public static final boolean PRESENT;
991    public static final Class<?> CLASS;
992    public static final Method METHOD;
993    static {
994      boolean tmp = false;
995      Class<?> clazz = null;
996      Method method = null;
997      try {
998        clazz = Class.forName("org.apache.hadoop.fs.StreamCapabilities");
999        method = clazz.getMethod("hasCapability", String.class);
1000        tmp = true;
1001      } catch(ClassNotFoundException|NoSuchMethodException|SecurityException exception) {
1002        LOG.warn("Your Hadoop installation does not include the StreamCapabilities class from " +
1003                 "HDFS-11644, so we will skip checking if any FSDataOutputStreams actually " +
1004                 "support hflush/hsync. If you are running on top of HDFS this probably just " +
1005                 "means you have an older version and this can be ignored. If you are running on " +
1006                 "top of an alternate FileSystem implementation you should manually verify that " +
1007                 "hflush and hsync are implemented; otherwise you risk data loss and hard to " +
1008                 "diagnose errors when our assumptions are violated.");
1009        LOG.debug("The first request to check for StreamCapabilities came from this stacktrace.",
1010            exception);
1011      } finally {
1012        PRESENT = tmp;
1013        CLASS = clazz;
1014        METHOD = method;
1015      }
1016    }
1017  }
1018
1019  /**
1020   * If our FileSystem version includes the StreamCapabilities class, check if
1021   * the given stream has a particular capability.
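   * <p>A usage sketch; {@code "hflush"} is the capability name Hadoop uses for hflush support:
   * <pre>{@code
   * FSDataOutputStream out = fs.create(path);
   * if (!CommonFSUtils.hasCapability(out, "hflush")) {
   *   throw new StreamLacksCapabilityException("hflush");
   * }
   * }</pre>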
   * @param stream capabilities are per-stream instance, so check this one specifically; must not
   *        be null
1024   * @param capability what to look for, per Hadoop Common's FileSystem docs
   * @return true if there is no StreamCapabilities class; false if it is present but this stream
   *         does not implement it; otherwise the result of asking the stream.
1027   */
1028  public static boolean hasCapability(FSDataOutputStream stream, String capability) {
1029    // be consistent whether or not StreamCapabilities is present
1030    if (stream == null) {
1031      throw new NullPointerException("stream parameter must not be null.");
1032    }
1033    // If o.a.h.fs.StreamCapabilities doesn't exist, assume everyone does everything
1034    // otherwise old versions of Hadoop will break.
1035    boolean result = true;
1036    if (StreamCapabilities.PRESENT) {
1037      // if StreamCapabilities is present, but the stream doesn't implement it
1038      // or we run into a problem invoking the method,
1039      // we treat that as equivalent to not declaring anything
1040      result = false;
1041      if (StreamCapabilities.CLASS.isAssignableFrom(stream.getClass())) {
1042        try {
1043          result = ((Boolean)StreamCapabilities.METHOD.invoke(stream, capability)).booleanValue();
1044        } catch (IllegalAccessException|IllegalArgumentException|InvocationTargetException
1045            exception) {
1046          LOG.warn("Your Hadoop installation's StreamCapabilities implementation doesn't match " +
1047              "our understanding of how it's supposed to work. Please file a JIRA and include " +
1048              "the following stack trace. In the mean time we're interpreting this behavior " +
1049              "difference as a lack of capability support, which will probably cause a failure.",
1050              exception);
1051        }
1052      }
1053    }
1054    return result;
1055  }
1056
1057  /**
   * Helper exception for those cases where the place where we need to check a stream capability
   * is not where we have the needed context to explain the impact and mitigation of the missing
   * capability.
1060   */
1061  public static class StreamLacksCapabilityException extends Exception {
1062    public StreamLacksCapabilityException(String message, Throwable cause) {
1063      super(message, cause);
1064    }
1065    public StreamLacksCapabilityException(String message) {
1066      super(message);
1067    }
1068  }
1069}