001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
021import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET;
022
023import edu.umd.cs.findbugs.annotations.CheckForNull;
024import java.io.ByteArrayInputStream;
025import java.io.DataInputStream;
026import java.io.EOFException;
027import java.io.FileNotFoundException;
028import java.io.IOException;
029import java.io.InterruptedIOException;
030import java.lang.reflect.InvocationTargetException;
031import java.lang.reflect.Method;
032import java.net.InetSocketAddress;
033import java.net.URI;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collection;
037import java.util.Collections;
038import java.util.HashMap;
039import java.util.HashSet;
040import java.util.Iterator;
041import java.util.List;
042import java.util.Locale;
043import java.util.Map;
044import java.util.Optional;
045import java.util.Set;
046import java.util.Vector;
047import java.util.concurrent.ConcurrentHashMap;
048import java.util.concurrent.ExecutionException;
049import java.util.concurrent.ExecutorService;
050import java.util.concurrent.Executors;
051import java.util.concurrent.Future;
052import java.util.concurrent.FutureTask;
053import java.util.concurrent.ThreadPoolExecutor;
054import java.util.concurrent.TimeUnit;
055import java.util.regex.Pattern;
056import org.apache.commons.lang3.ArrayUtils;
057import org.apache.hadoop.conf.Configuration;
058import org.apache.hadoop.fs.BlockLocation;
059import org.apache.hadoop.fs.FSDataInputStream;
060import org.apache.hadoop.fs.FSDataOutputStream;
061import org.apache.hadoop.fs.FileStatus;
062import org.apache.hadoop.fs.FileSystem;
063import org.apache.hadoop.fs.FileUtil;
064import org.apache.hadoop.fs.Path;
065import org.apache.hadoop.fs.PathFilter;
066import org.apache.hadoop.fs.StorageType;
067import org.apache.hadoop.fs.permission.FsPermission;
068import org.apache.hadoop.hbase.ClusterIdFile;
069import org.apache.hadoop.hbase.ClusterIdFileParser;
070import org.apache.hadoop.hbase.HConstants;
071import org.apache.hadoop.hbase.HDFSBlocksDistribution;
072import org.apache.hadoop.hbase.TableName;
073import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
074import org.apache.hadoop.hbase.client.RegionInfo;
075import org.apache.hadoop.hbase.client.RegionInfoBuilder;
076import org.apache.hadoop.hbase.exceptions.DeserializationException;
077import org.apache.hadoop.hbase.fs.HFileSystem;
078import org.apache.hadoop.hbase.io.HFileLink;
079import org.apache.hadoop.hbase.master.HMaster;
080import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
081import org.apache.hadoop.hdfs.DFSClient;
082import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
083import org.apache.hadoop.hdfs.DFSUtil;
084import org.apache.hadoop.hdfs.DistributedFileSystem;
085import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
086import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
087import org.apache.hadoop.hdfs.protocol.LocatedBlock;
088import org.apache.hadoop.io.IOUtils;
089import org.apache.hadoop.ipc.RemoteException;
090import org.apache.yetus.audience.InterfaceAudience;
091import org.slf4j.Logger;
092import org.slf4j.LoggerFactory;
093
094import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
095import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
096import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
097import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
098import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
099
100import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
101import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
102
103/**
104 * Utility methods for interacting with the underlying file system.
105 */
106@InterfaceAudience.Private
107public final class FSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);

  // Configuration key and fallback value for a thread pool size. Neither constant is referenced
  // in this part of the file; going by the key name they size a locality-check pool -- TODO
  // confirm against the code that reads them.
  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
  private static final int DEFAULT_THREAD_POOLSIZE = 2;

  /** Set to true on Windows platforms */
  // currently only used in testing. TODO refactor into a test class
  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");

  // Reflective handles used by supportSafeMode() and isInSafeMode(): the
  // org.apache.hadoop.fs.SafeMode type, the org.apache.hadoop.fs.SafeModeAction type, and the
  // action value handed to setSafeMode(). They are looked up by name so that this class still
  // loads when those types are absent from the classpath; all three stay null in that case.
  private static Class<?> safeModeClazz = null;
  private static Class<?> safeModeActionClazz = null;
  private static Object safeModeGet = null;
  // NOTE(review): this block has no 'static' modifier, so it is an instance initializer and only
  // runs when an FSUtils object is constructed. The only constructor visible here is private and
  // no instantiation is visible in this part of the file, so the three fields above probably
  // never leave null -- confirm whether 'static {' was intended.
  // NOTE(review): SAFEMODE_GET is looked up on safeModeClazz rather than on safeModeActionClazz;
  // verify the field name and owning type against the Hadoop SafeMode/SafeModeAction API.
  {
    try {
      safeModeClazz = Class.forName("org.apache.hadoop.fs.SafeMode");
      safeModeActionClazz = Class.forName("org.apache.hadoop.fs.SafeModeAction");
      safeModeGet = safeModeClazz.getField("SAFEMODE_GET").get(null);
    } catch (ClassNotFoundException | NoSuchFieldException e) {
      // A missing type or field is treated as "old Hadoop" and only logged at debug level.
      LOG.debug("SafeMode interface not in the classpath, this means Hadoop 3.3.5 or below.");
    } catch (IllegalAccessException e) {
      // The field exists but cannot be read: treated as fatal.
      LOG.error("SafeModeAction.SAFEMODE_GET is not accessible. "
        + "Unexpected Hadoop version or messy classpath?", e);
      throw new RuntimeException(e);
    }
  }

  // Utility class: only static members are meant to be used, so construction is blocked.
  private FSUtils() {
  }
136
137  /** Returns True is <code>fs</code> is instance of DistributedFileSystem n */
138  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
139    FileSystem fileSystem = fs;
140    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
141    // Check its backing fs for dfs-ness.
142    if (fs instanceof HFileSystem) {
143      fileSystem = ((HFileSystem) fs).getBackingFs();
144    }
145    return fileSystem instanceof DistributedFileSystem;
146  }
147
148  /**
149   * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
150   * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true. Does not consider
151   * schema; i.e. if schemas different but path or subpath matches, the two will equate.
152   * @param pathToSearch Path we will be trying to match.
153   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
154   */
155  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
156    Path tailPath = pathTail;
157    String tailName;
158    Path toSearch = pathToSearch;
159    String toSearchName;
160    boolean result = false;
161
162    if (pathToSearch.depth() != pathTail.depth()) {
163      return false;
164    }
165
166    do {
167      tailName = tailPath.getName();
168      if (tailName == null || tailName.isEmpty()) {
169        result = true;
170        break;
171      }
172      toSearchName = toSearch.getName();
173      if (toSearchName == null || toSearchName.isEmpty()) {
174        break;
175      }
176      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
177      tailPath = tailPath.getParent();
178      toSearch = toSearch.getParent();
179    } while (tailName.equals(toSearchName));
180    return result;
181  }
182
183  /**
184   * Delete the region directory if exists.
185   * @return True if deleted the region directory.
186   */
187  public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri)
188    throws IOException {
189    Path rootDir = CommonFSUtils.getRootDir(conf);
190    FileSystem fs = rootDir.getFileSystem(conf);
191    return CommonFSUtils.deleteDirectory(fs,
192      new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
193  }
194
195  /**
196   * Create the specified file on the filesystem. By default, this will:
197   * <ol>
198   * <li>overwrite the file if it exists</li>
199   * <li>apply the umask in the configuration (if it is enabled)</li>
200   * <li>use the fs configured buffer size (or 4096 if not set)</li>
201   * <li>use the configured column family replication or default replication if
202   * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li>
203   * <li>use the default block size</li>
204   * <li>not track progress</li>
205   * </ol>
206   * @param conf         configurations
207   * @param fs           {@link FileSystem} on which to write the file
208   * @param path         {@link Path} to the file to write
209   * @param perm         permissions
210   * @param favoredNodes favored data nodes
211   * @return output stream to the created file
212   * @throws IOException if the file cannot be created
213   */
214  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
215    FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
216    return create(conf, fs, path, perm, favoredNodes, true);
217  }
218
219  /**
220   * Create the specified file on the filesystem. By default, this will:
221   * <ol>
222   * <li>overwrite the file if it exists</li>
223   * <li>apply the umask in the configuration (if it is enabled)</li>
224   * <li>use the fs configured buffer size (or 4096 if not set)</li>
225   * <li>use the configured column family replication or default replication if
226   * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li>
227   * <li>use the default block size</li>
228   * <li>not track progress</li>
229   * </ol>
230   * @param conf              configurations
231   * @param fs                {@link FileSystem} on which to write the file
232   * @param path              {@link Path} to the file to write
233   * @param perm              permissions
234   * @param favoredNodes      favored data nodes
235   * @param isRecursiveCreate recursively create parent directories
236   * @return output stream to the created file
237   * @throws IOException if the file cannot be created
238   */
239  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
240    FsPermission perm, InetSocketAddress[] favoredNodes, boolean isRecursiveCreate)
241    throws IOException {
242    if (fs instanceof HFileSystem) {
243      FileSystem backingFs = ((HFileSystem) fs).getBackingFs();
244      if (backingFs instanceof DistributedFileSystem) {
245        short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION,
246          String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION)));
247        DistributedFileSystem.HdfsDataOutputStreamBuilder builder =
248          ((DistributedFileSystem) backingFs).createFile(path).recursive().permission(perm)
249            .create();
250        if (favoredNodes != null) {
251          builder.favoredNodes(favoredNodes);
252        }
253        if (replication > 0) {
254          builder.replication(replication);
255        }
256        return builder.build();
257      }
258
259    }
260    return CommonFSUtils.create(fs, path, perm, true, isRecursiveCreate);
261  }
262
263  /**
264   * Checks to see if the specified file system is available
265   * @param fs filesystem
266   * @throws IOException e
267   */
268  public static void checkFileSystemAvailable(final FileSystem fs) throws IOException {
269    if (!(fs instanceof DistributedFileSystem)) {
270      return;
271    }
272    IOException exception = null;
273    DistributedFileSystem dfs = (DistributedFileSystem) fs;
274    try {
275      if (dfs.exists(new Path("/"))) {
276        return;
277      }
278    } catch (IOException e) {
279      exception = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
280    }
281    try {
282      fs.close();
283    } catch (Exception e) {
284      LOG.error("file system close failed: ", e);
285    }
286    throw new IOException("File system is not available", exception);
287  }
288
289  /**
290   * Inquire the Active NameNode's safe mode status.
291   * @param dfs A DistributedFileSystem object representing the underlying HDFS.
292   * @return whether we're in safe mode
293   */
294  private static boolean isInSafeMode(FileSystem dfs) throws IOException {
295    if (isDistributedFileSystem(dfs)) {
296      return ((DistributedFileSystem) dfs).setSafeMode(SAFEMODE_GET, true);
297    } else {
298      try {
299        Object ret = dfs.getClass()
300          .getMethod("setSafeMode", new Class[] { safeModeActionClazz, Boolean.class })
301          .invoke(dfs, safeModeGet, true);
302        return (Boolean) ret;
303      } catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
304        LOG.error("The file system does not support setSafeMode(). Abort.", e);
305        throw new RuntimeException(e);
306      }
307    }
308  }
309
310  /**
311   * Check whether dfs is in safemode.
312   */
313  public static void checkDfsSafeMode(final Configuration conf) throws IOException {
314    boolean isInSafeMode = false;
315    FileSystem fs = FileSystem.get(conf);
316    if (supportSafeMode(fs)) {
317      isInSafeMode = isInSafeMode(fs);
318    }
319    if (isInSafeMode) {
320      throw new IOException("File system is in safemode, it can't be written now");
321    }
322  }
323
324  /**
325   * Verifies current version of file system
326   * @param fs      filesystem object
327   * @param rootdir root hbase directory
328   * @return null if no version file exists, version string otherwise
329   * @throws IOException              if the version file fails to open
330   * @throws DeserializationException if the version data cannot be translated into a version
331   */
332  public static String getVersion(FileSystem fs, Path rootdir)
333    throws IOException, DeserializationException {
334    final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
335    FileStatus[] status = null;
336    try {
337      // hadoop 2.0 throws FNFE if directory does not exist.
338      // hadoop 1.0 returns null if directory does not exist.
339      status = fs.listStatus(versionFile);
340    } catch (FileNotFoundException fnfe) {
341      return null;
342    }
343    if (ArrayUtils.getLength(status) == 0) {
344      return null;
345    }
346    String version = null;
347    byte[] content = new byte[(int) status[0].getLen()];
348    FSDataInputStream s = fs.open(versionFile);
349    try {
350      IOUtils.readFully(s, content, 0, content.length);
351      if (ProtobufUtil.isPBMagicPrefix(content)) {
352        version = parseVersionFrom(content);
353      } else {
354        // Presume it pre-pb format.
355        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) {
356          version = dis.readUTF();
357        }
358      }
359    } catch (EOFException eof) {
360      LOG.warn("Version file was empty, odd, will try to set it.");
361    } finally {
362      s.close();
363    }
364    return version;
365  }
366
367  /**
368   * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
369   * @param bytes The byte content of the hbase.version file
370   * @return The version found in the file as a String
371   * @throws DeserializationException if the version data cannot be translated into a version
372   */
373  static String parseVersionFrom(final byte[] bytes) throws DeserializationException {
374    ProtobufUtil.expectPBMagicPrefix(bytes);
375    int pblen = ProtobufUtil.lengthOfPBMagic();
376    FSProtos.HBaseVersionFileContent.Builder builder =
377      FSProtos.HBaseVersionFileContent.newBuilder();
378    try {
379      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
380      return builder.getVersion();
381    } catch (IOException e) {
382      // Convert
383      throw new DeserializationException(e);
384    }
385  }
386
387  /**
388   * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
389   * @param version Version to persist
390   * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a
391   *         prefix.
392   */
393  static byte[] toVersionByteArray(final String version) {
394    FSProtos.HBaseVersionFileContent.Builder builder =
395      FSProtos.HBaseVersionFileContent.newBuilder();
396    return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
397  }
398
399  /**
400   * Verifies current version of file system
401   * @param fs      file system
402   * @param rootdir root directory of HBase installation
403   * @param message if true, issues a message on System.out
404   * @throws IOException              if the version file cannot be opened
405   * @throws DeserializationException if the contents of the version file cannot be parsed
406   */
407  public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
408    throws IOException, DeserializationException {
409    checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
410  }
411
412  /**
413   * Verifies current version of file system
414   * @param fs      file system
415   * @param rootdir root directory of HBase installation
416   * @param message if true, issues a message on System.out
417   * @param wait    wait interval
418   * @param retries number of times to retry
419   * @throws IOException              if the version file cannot be opened
420   * @throws DeserializationException if the contents of the version file cannot be parsed
421   */
422  public static void checkVersion(FileSystem fs, Path rootdir, boolean message, int wait,
423    int retries) throws IOException, DeserializationException {
424    String version = getVersion(fs, rootdir);
425    String msg;
426    if (version == null) {
427      if (!metaRegionExists(fs, rootdir)) {
428        // rootDir is empty (no version file and no root region)
429        // just create new version file (HBASE-1195)
430        setVersion(fs, rootdir, wait, retries);
431        return;
432      } else {
433        msg = "hbase.version file is missing. Is your hbase.rootdir valid? "
434          + "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. "
435          + "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2";
436      }
437    } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) {
438      return;
439    } else {
440      msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version
441        + " but software requires version " + HConstants.FILE_SYSTEM_VERSION
442        + ". Consult https://hbase.apache.org/docs for further information about "
443        + "upgrading HBase.";
444    }
445
446    // version is deprecated require migration
447    // Output on stdout so user sees it in terminal.
448    if (message) {
449      System.out.println("WARNING! " + msg);
450    }
451    throw new FileSystemVersionException(msg);
452  }
453
454  /**
455   * Sets version of file system
456   * @param fs      filesystem object
457   * @param rootdir hbase root
458   * @throws IOException e
459   */
460  public static void setVersion(FileSystem fs, Path rootdir) throws IOException {
461    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
462      HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
463  }
464
465  /**
466   * Sets version of file system
467   * @param fs      filesystem object
468   * @param rootdir hbase root
469   * @param wait    time to wait for retry
470   * @param retries number of times to retry before failing
471   * @throws IOException e
472   */
473  public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
474    throws IOException {
475    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
476  }
477
478  /**
479   * Sets version of file system
480   * @param fs      filesystem object
481   * @param rootdir hbase root directory
482   * @param version version to set
483   * @param wait    time to wait for retry
484   * @param retries number of times to retry before throwing an IOException
485   * @throws IOException e
486   */
487  public static void setVersion(FileSystem fs, Path rootdir, String version, int wait, int retries)
488    throws IOException {
489    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
490    Path tempVersionFile = new Path(rootdir,
491      HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + HConstants.VERSION_FILE_NAME);
492    while (true) {
493      try {
494        // Write the version to a temporary file
495        FSDataOutputStream s = fs.create(tempVersionFile);
496        try {
497          s.write(toVersionByteArray(version));
498          s.close();
499          s = null;
500          // Move the temp version file to its normal location. Returns false
501          // if the rename failed. Throw an IOE in that case.
502          if (!fs.rename(tempVersionFile, versionFile)) {
503            throw new IOException("Unable to move temp version file to " + versionFile);
504          }
505        } finally {
506          // Cleaning up the temporary if the rename failed would be trying
507          // too hard. We'll unconditionally create it again the next time
508          // through anyway, files are overwritten by default by create().
509
510          // Attempt to close the stream on the way out if it is still open.
511          try {
512            if (s != null) s.close();
513          } catch (IOException ignore) {
514          }
515        }
516        LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
517        return;
518      } catch (IOException e) {
519        if (retries > 0) {
520          LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
521          fs.delete(versionFile, false);
522          try {
523            if (wait > 0) {
524              Thread.sleep(wait);
525            }
526          } catch (InterruptedException ie) {
527            throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
528          }
529          retries--;
530        } else {
531          throw e;
532        }
533      }
534    }
535  }
536
537  /**
538   * Checks that a cluster ID file exists in the HBase root directory
539   * @param fs      the root directory FileSystem
540   * @param rootdir the HBase root directory in HDFS
541   * @param wait    how long to wait between retries
542   * @return <code>true</code> if the file exists, otherwise <code>false</code>
543   * @throws IOException if checking the FileSystem fails
544   */
545  public static boolean checkFileExistsInHbaseRootDir(FileSystem fs, Path rootdir, String file,
546    long wait) throws IOException {
547    while (true) {
548      try {
549        Path filePath = new Path(rootdir, file);
550        return fs.exists(filePath);
551      } catch (IOException ioe) {
552        if (wait > 0L) {
553          LOG.warn("Unable to check file {} in {}, retrying in {}ms", file, rootdir, wait, ioe);
554          try {
555            Thread.sleep(wait);
556          } catch (InterruptedException e) {
557            Thread.currentThread().interrupt();
558            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
559          }
560        } else {
561          throw ioe;
562        }
563      }
564    }
565  }
566
567  /**
568   * Use the given parser object to read and parse contents of Cluster Id file. e.g. Cluster Id or
569   * Active read-replica Cluster Id
570   */
571  public static <T extends ClusterIdFile> T getClusterIdFile(FileSystem fs, Path rootdir,
572    ClusterIdFileParser<T> parser) throws IOException {
573    Path idPath = new Path(rootdir, parser.getFileName());
574    T cs = null;
575    FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null;
576    if (status != null) {
577      int len = Ints.checkedCast(status.getLen());
578      byte[] content = new byte[len];
579      FSDataInputStream in = fs.open(idPath);
580      try {
581        in.readFully(content);
582      } catch (EOFException eof) {
583        LOG.warn("Cluster file {} is empty", idPath);
584      } finally {
585        in.close();
586      }
587      try {
588        cs = parser.parseFrom(content);
589      } catch (DeserializationException e) {
590        throw new IOException("content=" + Bytes.toString(content), e);
591      }
592      // If not pb'd, make it so.
593      if (!ProtobufUtil.isPBMagicPrefix(content)) {
594        String cid = null;
595        in = fs.open(idPath);
596        try {
597          cid = in.readUTF();
598          cs = parser.readString(cid);
599        } catch (EOFException eof) {
600          LOG.warn("Cluster file {} is empty", idPath);
601        } finally {
602          in.close();
603        }
604        rewriteAsPb(fs, rootdir, idPath, parser.getFileName(), cs);
605      }
606      return cs;
607    } else {
608      LOG.warn("Cluster file does not exist at {}", idPath);
609    }
610    return cs;
611  }
612
613  private static <T extends ClusterIdFile> void rewriteAsPb(final FileSystem fs, final Path rootdir,
614    final Path p, final String fileName, final T cs) throws IOException {
615    // Rewrite the file as pb. Move aside the old one first, write new
616    // then delete the moved-aside file.
617    Path movedAsideName = new Path(p + "." + EnvironmentEdgeManager.currentTime());
618    if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
619    setClusterIdFile(fs, rootdir, fileName, cs, 100);
620    if (!fs.delete(movedAsideName, false)) {
621      throw new IOException("Failed delete of " + movedAsideName);
622    }
623    LOG.debug("Rewrote the {} file as pb", fileName);
624  }
625
626  /**
627   * Writes a new unique identifier for this cluster to the Cluster Id ("hbase.id" or
628   * "active.cluster.suffix.id") file in the HBase root directory. If any operations on the ID file
629   * fails, and {@code wait} is a positive value, the method will retry to produce the ID file until
630   * the thread is forcibly interrupted.
631   * @param fs       the root directory FileSystem
632   * @param rootdir  the path to the HBase root directory
633   * @param fileName name of the file to be written
634   * @param cs       the object to be written
635   * @param wait     how long (in milliseconds) to wait between retries
636   * @throws IOException if writing to the FileSystem fails and no wait value
637   */
638  public static <T extends ClusterIdFile> void setClusterIdFile(final FileSystem fs,
639    final Path rootdir, final String fileName, final T cs, final long wait) throws IOException {
640    final Path idFile = new Path(rootdir, fileName);
641    final Path tempDir = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY);
642    final Path tempIdFile = new Path(tempDir, fileName);
643
644    LOG.debug("Cluster file [{}] is present and contains cluster id: {}", idFile, cs);
645    writeClusterInfo(fs, rootdir, idFile, tempIdFile, cs.toByteArray(), wait);
646  }
647
648  /**
649   * Writes information about this cluster to the specified file. For ex, it is used for writing
650   * cluster id in "hbase.id" file in the HBase root directory. Also, used for writing active
651   * cluster suffix in "active_cluster_suffix.id" file. If any operations on the ID file fails, and
652   * {@code wait} is a positive value, the method will retry to produce the ID file until the thread
653   * is forcibly interrupted.
654   */
655  private static void writeClusterInfo(final FileSystem fs, final Path rootdir, final Path idFile,
656    final Path tempIdFile, byte[] fileData, final long wait) throws IOException {
657    while (true) {
658      Optional<IOException> failure = Optional.empty();
659
660      LOG.debug("Write the file to a temporary location: {}", tempIdFile);
661      try (FSDataOutputStream s = fs.create(tempIdFile)) {
662        s.write(fileData);
663      } catch (IOException ioe) {
664        failure = Optional.of(ioe);
665      }
666
667      if (!failure.isPresent()) {
668        try {
669          LOG.debug("Move the temporary file to its target location [{}]:[{}]", tempIdFile, idFile);
670
671          if (!fs.rename(tempIdFile, idFile)) {
672            failure = Optional.of(new IOException("Unable to move temp file to " + idFile));
673          }
674        } catch (IOException ioe) {
675          failure = Optional.of(ioe);
676        }
677      }
678
679      if (failure.isPresent()) {
680        final IOException cause = failure.get();
681        if (wait > 0L) {
682          LOG.warn("Unable to create file in {}, retrying in {}ms", rootdir, wait, cause);
683          try {
684            Thread.sleep(wait);
685          } catch (InterruptedException e) {
686            Thread.currentThread().interrupt();
687            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
688          }
689          continue;
690        } else {
691          throw cause;
692        }
693      } else {
694        return;
695      }
696    }
697  }
698
699  /**
700   * If DFS, check safe mode and if so, wait until we clear it.
701   * @param conf configuration
702   * @param wait Sleep between retries
703   * @throws IOException e
704   */
705  public static void waitOnSafeMode(final Configuration conf, final long wait) throws IOException {
706    FileSystem fs = FileSystem.get(conf);
707    if (!supportSafeMode(fs)) {
708      return;
709    }
710    // Make sure dfs is not in safe mode
711    while (isInSafeMode(fs)) {
712      LOG.info("Waiting for dfs to exit safe mode...");
713      try {
714        Thread.sleep(wait);
715      } catch (InterruptedException e) {
716        Thread.currentThread().interrupt();
717        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
718      }
719    }
720  }
721
722  public static boolean supportSafeMode(FileSystem fs) {
723    // return true if HDFS.
724    if (fs instanceof DistributedFileSystem) {
725      return true;
726    }
727    // return true if the file system implements SafeMode interface.
728    if (safeModeClazz != null) {
729      return (safeModeClazz.isAssignableFrom(fs.getClass()));
730    }
731    // return false if the file system is not HDFS and does not implement SafeMode interface.
732    return false;
733  }
734
735  /**
736   * Checks if meta region exists
737   * @param fs      file system
738   * @param rootDir root directory of HBase installation
739   * @return true if exists
740   */
741  public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
742    Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
743    return fs.exists(metaRegionDir);
744  }
745
746  /**
747   * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams are
748   * backed by a series of LocatedBlocks, which are fetched periodically from the namenode. This
749   * method retrieves those blocks from the input stream and uses them to calculate
750   * HDFSBlockDistribution. The underlying method in DFSInputStream does attempt to use locally
751   * cached blocks, but may hit the namenode if the cache is determined to be incomplete. The method
752   * also involves making copies of all LocatedBlocks rather than return the underlying blocks
753   * themselves.
754   */
755  static public HDFSBlocksDistribution
756    computeHDFSBlocksDistribution(HdfsDataInputStream inputStream) throws IOException {
757    List<LocatedBlock> blocks = inputStream.getAllBlocks();
758    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
759    for (LocatedBlock block : blocks) {
760      String[] hosts = getHostsForLocations(block);
761      long len = block.getBlockSize();
762      StorageType[] storageTypes = block.getStorageTypes();
763      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
764    }
765    return blocksDistribution;
766  }
767
768  private static String[] getHostsForLocations(LocatedBlock block) {
769    DatanodeInfo[] locations = getLocatedBlockLocations(block);
770    String[] hosts = new String[locations.length];
771    for (int i = 0; i < hosts.length; i++) {
772      hosts[i] = locations[i].getHostName();
773    }
774    return hosts;
775  }
776
777  /**
778   * Compute HDFS blocks distribution of a given file, or a portion of the file
779   * @param fs     file system
780   * @param status file status of the file
781   * @param start  start position of the portion
782   * @param length length of the portion
783   * @return The HDFS blocks distribution
784   */
785  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs,
786    FileStatus status, long start, long length) throws IOException {
787    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
788    BlockLocation[] blockLocations = fs.getFileBlockLocations(status, start, length);
789    addToHDFSBlocksDistribution(blocksDistribution, blockLocations);
790    return blocksDistribution;
791  }
792
793  /**
794   * Update blocksDistribution with blockLocations
795   * @param blocksDistribution the hdfs blocks distribution
796   * @param blockLocations     an array containing block location
797   */
798  static public void addToHDFSBlocksDistribution(HDFSBlocksDistribution blocksDistribution,
799    BlockLocation[] blockLocations) throws IOException {
800    for (BlockLocation bl : blockLocations) {
801      String[] hosts = bl.getHosts();
802      long len = bl.getLength();
803      StorageType[] storageTypes = bl.getStorageTypes();
804      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
805    }
806  }
807
808  // TODO move this method OUT of FSUtils. No dependencies to HMaster
809  /**
810   * Returns the total overall fragmentation percentage. Includes hbase:meta and -ROOT- as well.
811   * @param master The master defining the HBase root and file system
812   * @return A map for each table and its percentage (never null)
813   * @throws IOException When scanning the directory fails
814   */
815  public static int getTotalTableFragmentation(final HMaster master) throws IOException {
816    Map<String, Integer> map = getTableFragmentation(master);
817    return map.isEmpty() ? -1 : map.get("-TOTAL-");
818  }
819
820  /**
821   * Runs through the HBase rootdir and checks how many stores for each table have more than one
822   * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is
823   * stored under the special key "-TOTAL-".
824   * @param master The master defining the HBase root and file system.
825   * @return A map for each table and its percentage (never null).
826   * @throws IOException When scanning the directory fails.
827   */
828  public static Map<String, Integer> getTableFragmentation(final HMaster master)
829    throws IOException {
830    Path path = CommonFSUtils.getRootDir(master.getConfiguration());
831    // since HMaster.getFileSystem() is package private
832    FileSystem fs = path.getFileSystem(master.getConfiguration());
833    return getTableFragmentation(fs, path);
834  }
835
836  /**
837   * Runs through the HBase rootdir and checks how many stores for each table have more than one
838   * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is
839   * stored under the special key "-TOTAL-".
840   * @param fs           The file system to use
841   * @param hbaseRootDir The root directory to scan
842   * @return A map for each table and its percentage (never null)
843   * @throws IOException When scanning the directory fails
844   */
845  public static Map<String, Integer> getTableFragmentation(final FileSystem fs,
846    final Path hbaseRootDir) throws IOException {
847    Map<String, Integer> frags = new HashMap<>();
848    int cfCountTotal = 0;
849    int cfFragTotal = 0;
850    PathFilter regionFilter = new RegionDirFilter(fs);
851    PathFilter familyFilter = new FamilyDirFilter(fs);
852    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
853    for (Path d : tableDirs) {
854      int cfCount = 0;
855      int cfFrag = 0;
856      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
857      for (FileStatus regionDir : regionDirs) {
858        Path dd = regionDir.getPath();
859        // else its a region name, now look in region for families
860        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
861        for (FileStatus familyDir : familyDirs) {
862          cfCount++;
863          cfCountTotal++;
864          Path family = familyDir.getPath();
865          // now in family make sure only one file
866          FileStatus[] familyStatus = fs.listStatus(family);
867          if (familyStatus.length > 1) {
868            cfFrag++;
869            cfFragTotal++;
870          }
871        }
872      }
873      // compute percentage per table and store in result list
874      frags.put(CommonFSUtils.getTableName(d).getNameAsString(),
875        cfCount == 0 ? 0 : Math.round((float) cfFrag / cfCount * 100));
876    }
877    // set overall percentage for all tables
878    frags.put("-TOTAL-",
879      cfCountTotal == 0 ? 0 : Math.round((float) cfFragTotal / cfCountTotal * 100));
880    return frags;
881  }
882
883  public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
884    if (fs.exists(dst) && !fs.delete(dst, false)) {
885      throw new IOException("Can not delete " + dst);
886    }
887    if (!fs.rename(src, dst)) {
888      throw new IOException("Can not rename from " + src + " to " + dst);
889    }
890  }
891
892  /**
893   * A {@link PathFilter} that returns only regular files.
894   */
895  static class FileFilter extends AbstractFileStatusFilter {
896    private final FileSystem fs;
897
898    public FileFilter(final FileSystem fs) {
899      this.fs = fs;
900    }
901
902    @Override
903    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
904      try {
905        return isFile(fs, isDir, p);
906      } catch (IOException e) {
907        LOG.warn("Unable to verify if path={} is a regular file", p, e);
908        return false;
909      }
910    }
911  }
912
913  /**
914   * Directory filter that doesn't include any of the directories in the specified blacklist
915   */
916  public static class BlackListDirFilter extends AbstractFileStatusFilter {
917    private final FileSystem fs;
918    private List<String> blacklist;
919
920    /**
921     * Create a filter on the givem filesystem with the specified blacklist
922     * @param fs                     filesystem to filter
923     * @param directoryNameBlackList list of the names of the directories to filter. If
924     *                               <tt>null</tt>, all directories are returned
925     */
926    @SuppressWarnings("unchecked")
927    public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
928      this.fs = fs;
929      blacklist = (List<String>) (directoryNameBlackList == null
930        ? Collections.emptyList()
931        : directoryNameBlackList);
932    }
933
934    @Override
935    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
936      if (!isValidName(p.getName())) {
937        return false;
938      }
939
940      try {
941        return isDirectory(fs, isDir, p);
942      } catch (IOException e) {
943        LOG.warn("An error occurred while verifying if [{}] is a valid directory."
944          + " Returning 'not valid' and continuing.", p, e);
945        return false;
946      }
947    }
948
949    protected boolean isValidName(final String name) {
950      return !blacklist.contains(name);
951    }
952  }
953
954  /**
955   * A {@link PathFilter} that only allows directories.
956   */
957  public static class DirFilter extends BlackListDirFilter {
958
959    public DirFilter(FileSystem fs) {
960      super(fs, null);
961    }
962  }
963
964  /**
965   * A {@link PathFilter} that returns usertable directories. To get all directories use the
966   * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
967   */
968  public static class UserTableDirFilter extends BlackListDirFilter {
969    public UserTableDirFilter(FileSystem fs) {
970      super(fs, HConstants.HBASE_NON_TABLE_DIRS);
971    }
972
973    @Override
974    protected boolean isValidName(final String name) {
975      if (!super.isValidName(name)) return false;
976
977      try {
978        TableName.isLegalTableQualifierName(Bytes.toBytes(name));
979      } catch (IllegalArgumentException e) {
980        LOG.info("Invalid table name: {}", name);
981        return false;
982      }
983      return true;
984    }
985  }
986
987  public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
988    throws IOException {
989    List<Path> tableDirs = new ArrayList<>();
990    Path baseNamespaceDir = new Path(rootdir, HConstants.BASE_NAMESPACE_DIR);
991    if (fs.exists(baseNamespaceDir)) {
992      for (FileStatus status : fs.globStatus(new Path(baseNamespaceDir, "*"))) {
993        tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
994      }
995    }
996    return tableDirs;
997  }
998
999  /**
1000   * @return All the table directories under <code>rootdir</code>. Ignore non table hbase folders
1001   *         such as .logs, .oldlogs, .corrupt folders.
1002   */
1003  public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
1004    throws IOException {
1005    // presumes any directory under hbase.rootdir is a table
1006    FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
1007    List<Path> tabledirs = new ArrayList<>(dirs.length);
1008    for (FileStatus dir : dirs) {
1009      tabledirs.add(dir.getPath());
1010    }
1011    return tabledirs;
1012  }
1013
1014  /**
1015   * A filter to exclude meta tables belonging to foreign clusters. This is essential in a
1016   * read-replica setup where multiple clusters share the same fs.
1017   * @param tablePath The Path to the table directory.
1018   * @return {@code true} if the path is a regular table or the cluster's own meta table.
1019   *         {@code false} if it is a meta table belonging to a different cluster.
1020   */
1021  public static boolean isLocalMetaTable(Path tablePath) {
1022    if (tablePath == null) {
1023      return false;
1024    }
1025    String dirName = tablePath.getName();
1026    if (dirName.startsWith(TableName.META_TABLE_NAME.getQualifierAsString())) {
1027      return TableName.valueOf(TableName.META_TABLE_NAME.getNamespaceAsString(), dirName)
1028        .equals(TableName.META_TABLE_NAME);
1029    }
1030    return true;
1031  }
1032
1033  /**
1034   * Filter for all dirs that don't start with '.'
1035   */
1036  public static class RegionDirFilter extends AbstractFileStatusFilter {
1037    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
1038    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
1039    final FileSystem fs;
1040
1041    public RegionDirFilter(FileSystem fs) {
1042      this.fs = fs;
1043    }
1044
1045    @Override
1046    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1047      if (!regionDirPattern.matcher(p.getName()).matches()) {
1048        return false;
1049      }
1050
1051      try {
1052        return isDirectory(fs, isDir, p);
1053      } catch (IOException ioe) {
1054        // Maybe the file was moved or the fs was disconnected.
1055        LOG.warn("Skipping file {} due to IOException", p, ioe);
1056        return false;
1057      }
1058    }
1059  }
1060
1061  /**
1062   * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1063   * .tableinfo
1064   * @param fs       A file system for the Path
1065   * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
1066   * @return List of paths to valid region directories in table dir.
1067   */
1068  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir)
1069    throws IOException {
1070    // assumes we are in a table dir.
1071    List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1072    if (rds == null) {
1073      return Collections.emptyList();
1074    }
1075    List<Path> regionDirs = new ArrayList<>(rds.size());
1076    for (FileStatus rdfs : rds) {
1077      Path rdPath = rdfs.getPath();
1078      regionDirs.add(rdPath);
1079    }
1080    return regionDirs;
1081  }
1082
1083  public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) {
1084    return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region);
1085  }
1086
1087  public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
1088    return getRegionDirFromTableDir(tableDir,
1089      ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
1090  }
1091
1092  public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) {
1093    return new Path(tableDir, encodedRegionName);
1094  }
1095
1096  /**
1097   * Filter for all dirs that are legal column family names. This is generally used for colfam dirs
1098   * &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1099   */
1100  public static class FamilyDirFilter extends AbstractFileStatusFilter {
1101    final FileSystem fs;
1102
1103    public FamilyDirFilter(FileSystem fs) {
1104      this.fs = fs;
1105    }
1106
1107    @Override
1108    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1109      try {
1110        // throws IAE if invalid
1111        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(Bytes.toBytes(p.getName()));
1112      } catch (IllegalArgumentException iae) {
1113        // path name is an invalid family name and thus is excluded.
1114        return false;
1115      }
1116
1117      try {
1118        return isDirectory(fs, isDir, p);
1119      } catch (IOException ioe) {
1120        // Maybe the file was moved or the fs was disconnected.
1121        LOG.warn("Skipping file {} due to IOException", p, ioe);
1122        return false;
1123      }
1124    }
1125  }
1126
1127  /**
1128   * Given a particular region dir, return all the familydirs inside it
1129   * @param fs        A file system for the Path
1130   * @param regionDir Path to a specific region directory
1131   * @return List of paths to valid family directories in region dir.
1132   */
1133  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir)
1134    throws IOException {
1135    // assumes we are in a region dir.
1136    return getFilePaths(fs, regionDir, new FamilyDirFilter(fs));
1137  }
1138
1139  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir)
1140    throws IOException {
1141    return getFilePaths(fs, familyDir, new ReferenceFileFilter(fs));
1142  }
1143
1144  public static List<Path> getReferenceAndLinkFilePaths(final FileSystem fs, final Path familyDir)
1145    throws IOException {
1146    return getFilePaths(fs, familyDir, new ReferenceAndLinkFileFilter(fs));
1147  }
1148
1149  private static List<Path> getFilePaths(final FileSystem fs, final Path dir,
1150    final PathFilter pathFilter) throws IOException {
1151    FileStatus[] fds = fs.listStatus(dir, pathFilter);
1152    List<Path> files = new ArrayList<>(fds.length);
1153    for (FileStatus fdfs : fds) {
1154      Path fdPath = fdfs.getPath();
1155      files.add(fdPath);
1156    }
1157    return files;
1158  }
1159
1160  public static int getRegionReferenceAndLinkFileCount(final FileSystem fs, final Path p) {
1161    int result = 0;
1162    try {
1163      for (Path familyDir : getFamilyDirs(fs, p)) {
1164        result += getReferenceAndLinkFilePaths(fs, familyDir).size();
1165      }
1166    } catch (IOException e) {
1167      LOG.warn("Error Counting reference files.", e);
1168    }
1169    return result;
1170  }
1171
1172  public static class ReferenceAndLinkFileFilter implements PathFilter {
1173
1174    private final FileSystem fs;
1175
1176    public ReferenceAndLinkFileFilter(FileSystem fs) {
1177      this.fs = fs;
1178    }
1179
1180    @Override
1181    public boolean accept(Path rd) {
1182      try {
1183        // only files can be references.
1184        return !fs.getFileStatus(rd).isDirectory()
1185          && (StoreFileInfo.isReference(rd) || HFileLink.isHFileLink(rd));
1186      } catch (IOException ioe) {
1187        // Maybe the file was moved or the fs was disconnected.
1188        LOG.warn("Skipping file " + rd + " due to IOException", ioe);
1189        return false;
1190      }
1191    }
1192  }
1193
1194  /**
1195   * Filter for HFiles that excludes reference files.
1196   */
1197  public static class HFileFilter extends AbstractFileStatusFilter {
1198    final FileSystem fs;
1199
1200    public HFileFilter(FileSystem fs) {
1201      this.fs = fs;
1202    }
1203
1204    @Override
1205    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1206      if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) {
1207        return false;
1208      }
1209
1210      try {
1211        return isFile(fs, isDir, p);
1212      } catch (IOException ioe) {
1213        // Maybe the file was moved or the fs was disconnected.
1214        LOG.warn("Skipping file {} due to IOException", p, ioe);
1215        return false;
1216      }
1217    }
1218  }
1219
1220  /**
1221   * Filter for HFileLinks (StoreFiles and HFiles not included). the filter itself does not consider
1222   * if a link is file or not.
1223   */
1224  public static class HFileLinkFilter implements PathFilter {
1225
1226    @Override
1227    public boolean accept(Path p) {
1228      return HFileLink.isHFileLink(p);
1229    }
1230  }
1231
1232  public static class ReferenceFileFilter extends AbstractFileStatusFilter {
1233
1234    private final FileSystem fs;
1235
1236    public ReferenceFileFilter(FileSystem fs) {
1237      this.fs = fs;
1238    }
1239
1240    @Override
1241    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1242      if (!StoreFileInfo.isReference(p)) {
1243        return false;
1244      }
1245
1246      try {
1247        // only files can be references.
1248        return isFile(fs, isDir, p);
1249      } catch (IOException ioe) {
1250        // Maybe the file was moved or the fs was disconnected.
1251        LOG.warn("Skipping file {} due to IOException", p, ioe);
1252        return false;
1253      }
1254    }
1255  }
1256
1257  /**
1258   * Called every so-often by storefile map builder getTableStoreFilePathMap to report progress.
1259   */
1260  interface ProgressReporter {
1261    /**
1262     * @param status File or directory we are about to process.
1263     */
1264    void progress(FileStatus status);
1265  }
1266
1267  /**
1268   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
1269   * names to the full Path. <br>
1270   * Example...<br>
1271   * Key = 3944417774205889744 <br>
1272   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1273   * @param map          map to add values. If null, this method will create and populate one to
1274   *                     return
1275   * @param fs           The file system to use.
1276   * @param hbaseRootDir The root directory to scan.
1277   * @param tableName    name of the table to scan.
1278   * @return Map keyed by StoreFile name with a value of the full Path.
1279   * @throws IOException When scanning the directory fails.
1280   */
1281  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1282    final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1283    throws IOException, InterruptedException {
1284    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null,
1285      (ProgressReporter) null);
1286  }
1287
1288  /**
1289   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
1290   * names to the full Path. Note that because this method can be called on a 'live' HBase system
1291   * that we will skip files that no longer exist by the time we traverse them and similarly the
1292   * user of the result needs to consider that some entries in this map may not exist by the time
1293   * this call completes. <br>
1294   * Example...<br>
1295   * Key = 3944417774205889744 <br>
1296   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1297   * @param resultMap        map to add values. If null, this method will create and populate one to
1298   *                         return
1299   * @param fs               The file system to use.
1300   * @param hbaseRootDir     The root directory to scan.
1301   * @param tableName        name of the table to scan.
1302   * @param sfFilter         optional path filter to apply to store files
1303   * @param executor         optional executor service to parallelize this operation
1304   * @param progressReporter Instance or null; gets called every time we move to new region of
1305   *                         family dir and for each store file.
1306   * @return Map keyed by StoreFile name with a value of the full Path.
1307   * @throws IOException When scanning the directory fails.
1308   * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead.
1309   */
1310  @Deprecated
1311  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1312    final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1313    ExecutorService executor, final HbckErrorReporter progressReporter)
1314    throws IOException, InterruptedException {
1315    return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor,
1316      new ProgressReporter() {
1317        @Override
1318        public void progress(FileStatus status) {
1319          // status is not used in this implementation.
1320          progressReporter.progress();
1321        }
1322      });
1323  }
1324
1325  /**
1326   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
1327   * names to the full Path. Note that because this method can be called on a 'live' HBase system
1328   * that we will skip files that no longer exist by the time we traverse them and similarly the
1329   * user of the result needs to consider that some entries in this map may not exist by the time
1330   * this call completes. <br>
1331   * Example...<br>
1332   * Key = 3944417774205889744 <br>
1333   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1334   * @param resultMap        map to add values. If null, this method will create and populate one to
1335   *                         return
1336   * @param fs               The file system to use.
1337   * @param hbaseRootDir     The root directory to scan.
1338   * @param tableName        name of the table to scan.
1339   * @param sfFilter         optional path filter to apply to store files
1340   * @param executor         optional executor service to parallelize this operation
1341   * @param progressReporter Instance or null; gets called every time we move to new region of
1342   *                         family dir and for each store file.
1343   * @return Map keyed by StoreFile name with a value of the full Path.
1344   * @throws IOException          When scanning the directory fails.
1345   * @throws InterruptedException the thread is interrupted, either before or during the activity.
1346   */
1347  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1348    final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1349    ExecutorService executor, final ProgressReporter progressReporter)
1350    throws IOException, InterruptedException {
1351
1352    final Map<String, Path> finalResultMap =
1353      resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;
1354
1355    // only include the directory paths to tables
1356    Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
1357    // Inside a table, there are compaction.dir directories to skip. Otherwise, all else
1358    // should be regions.
1359    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
1360    final Vector<Exception> exceptions = new Vector<>();
1361
1362    try {
1363      List<FileStatus> regionDirs =
1364        FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1365      if (regionDirs == null) {
1366        return finalResultMap;
1367      }
1368
1369      final List<Future<?>> futures = new ArrayList<>(regionDirs.size());
1370
1371      for (FileStatus regionDir : regionDirs) {
1372        if (null != progressReporter) {
1373          progressReporter.progress(regionDir);
1374        }
1375        final Path dd = regionDir.getPath();
1376
1377        if (!exceptions.isEmpty()) {
1378          break;
1379        }
1380
1381        Runnable getRegionStoreFileMapCall = new Runnable() {
1382          @Override
1383          public void run() {
1384            try {
1385              HashMap<String, Path> regionStoreFileMap = new HashMap<>();
1386              List<FileStatus> familyDirs =
1387                FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
1388              if (familyDirs == null) {
1389                if (!fs.exists(dd)) {
1390                  LOG.warn("Skipping region because it no longer exists: " + dd);
1391                } else {
1392                  LOG.warn("Skipping region because it has no family dirs: " + dd);
1393                }
1394                return;
1395              }
1396              for (FileStatus familyDir : familyDirs) {
1397                if (null != progressReporter) {
1398                  progressReporter.progress(familyDir);
1399                }
1400                Path family = familyDir.getPath();
1401                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1402                  continue;
1403                }
1404                // now in family, iterate over the StoreFiles and
1405                // put in map
1406                FileStatus[] familyStatus = fs.listStatus(family);
1407                for (FileStatus sfStatus : familyStatus) {
1408                  if (null != progressReporter) {
1409                    progressReporter.progress(sfStatus);
1410                  }
1411                  Path sf = sfStatus.getPath();
1412                  if (sfFilter == null || sfFilter.accept(sf)) {
1413                    regionStoreFileMap.put(sf.getName(), sf);
1414                  }
1415                }
1416              }
1417              finalResultMap.putAll(regionStoreFileMap);
1418            } catch (Exception e) {
1419              LOG.error("Could not get region store file map for region: " + dd, e);
1420              exceptions.add(e);
1421            }
1422          }
1423        };
1424
1425        // If executor is available, submit async tasks to exec concurrently, otherwise
1426        // just do serial sync execution
1427        if (executor != null) {
1428          Future<?> future = executor.submit(getRegionStoreFileMapCall);
1429          futures.add(future);
1430        } else {
1431          FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
1432          future.run();
1433          futures.add(future);
1434        }
1435      }
1436
1437      // Ensure all pending tasks are complete (or that we run into an exception)
1438      for (Future<?> f : futures) {
1439        if (!exceptions.isEmpty()) {
1440          break;
1441        }
1442        try {
1443          f.get();
1444        } catch (ExecutionException e) {
1445          LOG.error("Unexpected exec exception!  Should've been caught already.  (Bug?)", e);
1446          // Shouldn't happen, we already logged/caught any exceptions in the Runnable
1447        }
1448      }
1449    } catch (IOException e) {
1450      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
1451      exceptions.add(e);
1452    } finally {
1453      if (!exceptions.isEmpty()) {
1454        // Just throw the first exception as an indication something bad happened
1455        // Don't need to propagate all the exceptions, we already logged them all anyway
1456        Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class);
1457        throw new IOException(exceptions.firstElement());
1458      }
1459    }
1460
1461    return finalResultMap;
1462  }
1463
1464  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1465    int result = 0;
1466    try {
1467      for (Path familyDir : getFamilyDirs(fs, p)) {
1468        result += getReferenceFilePaths(fs, familyDir).size();
1469      }
1470    } catch (IOException e) {
1471      LOG.warn("Error counting reference files", e);
1472    }
1473    return result;
1474  }
1475
1476  /**
1477   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1478   * the full Path. <br>
1479   * Example...<br>
1480   * Key = 3944417774205889744 <br>
1481   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1482   * @param fs           The file system to use.
1483   * @param hbaseRootDir The root directory to scan.
1484   * @return Map keyed by StoreFile name with a value of the full Path.
1485   * @throws IOException When scanning the directory fails.
1486   */
1487  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1488    final Path hbaseRootDir) throws IOException, InterruptedException {
1489    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter) null);
1490  }
1491
1492  /**
1493   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1494   * the full Path. <br>
1495   * Example...<br>
1496   * Key = 3944417774205889744 <br>
1497   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1498   * @param fs               The file system to use.
1499   * @param hbaseRootDir     The root directory to scan.
1500   * @param sfFilter         optional path filter to apply to store files
1501   * @param executor         optional executor service to parallelize this operation
1502   * @param progressReporter Instance or null; gets called every time we move to new region of
1503   *                         family dir and for each store file.
1504   * @return Map keyed by StoreFile name with a value of the full Path.
1505   * @throws IOException When scanning the directory fails.
1506   * @deprecated Since 2.3.0. Will be removed in hbase4. Used
1507   *             {@link #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)}
1508   */
1509  @Deprecated
1510  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1511    final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1512    HbckErrorReporter progressReporter) throws IOException, InterruptedException {
1513    return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor, new ProgressReporter() {
1514      @Override
1515      public void progress(FileStatus status) {
1516        // status is not used in this implementation.
1517        progressReporter.progress();
1518      }
1519    });
1520  }
1521
1522  /**
1523   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1524   * the full Path. <br>
1525   * Example...<br>
1526   * Key = 3944417774205889744 <br>
1527   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1528   * @param fs               The file system to use.
1529   * @param hbaseRootDir     The root directory to scan.
1530   * @param sfFilter         optional path filter to apply to store files
1531   * @param executor         optional executor service to parallelize this operation
1532   * @param progressReporter Instance or null; gets called every time we move to new region of
1533   *                         family dir and for each store file.
1534   * @return Map keyed by StoreFile name with a value of the full Path.
1535   * @throws IOException When scanning the directory fails.
1536   */
1537  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1538    final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1539    ProgressReporter progressReporter) throws IOException, InterruptedException {
1540    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);
1541
1542    // if this method looks similar to 'getTableFragmentation' that is because
1543    // it was borrowed from it.
1544
1545    // only include the directory paths to tables
1546    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1547      getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir),
1548        sfFilter, executor, progressReporter);
1549    }
1550    return map;
1551  }
1552
1553  /**
1554   * Filters FileStatuses in an array and returns a list
1555   * @param input  An array of FileStatuses
1556   * @param filter A required filter to filter the array
1557   * @return A list of FileStatuses
1558   */
1559  public static List<FileStatus> filterFileStatuses(FileStatus[] input, FileStatusFilter filter) {
1560    if (input == null) return null;
1561    return filterFileStatuses(Iterators.forArray(input), filter);
1562  }
1563
1564  /**
1565   * Filters FileStatuses in an iterator and returns a list
1566   * @param input  An iterator of FileStatuses
1567   * @param filter A required filter to filter the array
1568   * @return A list of FileStatuses
1569   */
1570  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1571    FileStatusFilter filter) {
1572    if (input == null) return null;
1573    ArrayList<FileStatus> results = new ArrayList<>();
1574    while (input.hasNext()) {
1575      FileStatus f = input.next();
1576      if (filter.accept(f)) {
1577        results.add(f);
1578      }
1579    }
1580    return results;
1581  }
1582
1583  /**
1584   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal This accommodates
1585   * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException, and
1586   * return an empty FileStatus[] while Hadoop 2 will throw FileNotFoundException.
1587   * @param fs     file system
1588   * @param dir    directory
1589   * @param filter file status filter
1590   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
1591   */
1592  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs, final Path dir,
1593    final FileStatusFilter filter) throws IOException {
1594    FileStatus[] status = null;
1595    try {
1596      status = fs.listStatus(dir);
1597    } catch (FileNotFoundException fnfe) {
1598      LOG.trace("{} does not exist", dir);
1599      return null;
1600    }
1601
1602    if (ArrayUtils.getLength(status) == 0) {
1603      return null;
1604    }
1605
1606    if (filter == null) {
1607      return Arrays.asList(status);
1608    } else {
1609      List<FileStatus> status2 = filterFileStatuses(status, filter);
1610      if (status2 == null || status2.isEmpty()) {
1611        return null;
1612      } else {
1613        return status2;
1614      }
1615    }
1616  }
1617
1618  /**
1619   * This function is to scan the root path of the file system to get the degree of locality for
1620   * each region on each of the servers having at least one block of that region. This is used by
1621   * the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer} the configuration to
1622   * use
1623   * @return the mapping from region encoded name to a map of server names to locality fraction in
1624   *         case of file system errors or interrupts
1625   */
1626  public static Map<String, Map<String, Float>>
1627    getRegionDegreeLocalityMappingFromFS(final Configuration conf) throws IOException {
1628    return getRegionDegreeLocalityMappingFromFS(conf, null,
1629      conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1630
1631  }
1632
1633  /**
1634   * This function is to scan the root path of the file system to get the degree of locality for
1635   * each region on each of the servers having at least one block of that region. the configuration
1636   * to use the table you wish to scan locality for the thread pool size to use
1637   * @return the mapping from region encoded name to a map of server names to locality fraction in
1638   *         case of file system errors or interrupts
1639   */
1640  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1641    final Configuration conf, final String desiredTable, int threadPoolSize) throws IOException {
1642    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
1643    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
1644    return regionDegreeLocalityMapping;
1645  }
1646
1647  /**
1648   * This function is to scan the root path of the file system to get either the mapping between the
1649   * region name and its best locality region server or the degree of locality of each region on
1650   * each of the servers having at least one block of that region. The output map parameters are
1651   * both optional. the configuration to use the table you wish to scan locality for the thread pool
1652   * size to use the map into which to put the locality degree mapping or null, must be a
1653   * thread-safe implementation in case of file system errors or interrupts
1654   */
1655  private static void getRegionLocalityMappingFromFS(final Configuration conf,
1656    final String desiredTable, int threadPoolSize,
1657    final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
1658    final FileSystem fs = FileSystem.get(conf);
1659    final Path rootPath = CommonFSUtils.getRootDir(conf);
1660    final long startTime = EnvironmentEdgeManager.currentTime();
1661    final Path queryPath;
1662    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1663    if (null == desiredTable) {
1664      queryPath =
1665        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1666    } else {
1667      queryPath = new Path(
1668        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1669    }
1670
1671    // reject all paths that are not appropriate
1672    PathFilter pathFilter = new PathFilter() {
1673      @Override
1674      public boolean accept(Path path) {
1675        // this is the region name; it may get some noise data
1676        if (null == path) {
1677          return false;
1678        }
1679
1680        // no parent?
1681        Path parent = path.getParent();
1682        if (null == parent) {
1683          return false;
1684        }
1685
1686        String regionName = path.getName();
1687        if (null == regionName) {
1688          return false;
1689        }
1690
1691        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
1692          return false;
1693        }
1694        return true;
1695      }
1696    };
1697
1698    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1699
1700    if (LOG.isDebugEnabled()) {
1701      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
1702    }
1703
1704    if (null == statusList) {
1705      return;
1706    }
1707
1708    // lower the number of threads in case we have very few expected regions
1709    threadPoolSize = Math.min(threadPoolSize, statusList.length);
1710
1711    // run in multiple threads
1712    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
1713      new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").setDaemon(true)
1714        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
1715    try {
1716      // ignore all file status items that are not of interest
1717      for (FileStatus regionStatus : statusList) {
1718        if (null == regionStatus || !regionStatus.isDirectory()) {
1719          continue;
1720        }
1721
1722        final Path regionPath = regionStatus.getPath();
1723        if (null != regionPath) {
1724          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
1725        }
1726      }
1727    } finally {
1728      tpe.shutdown();
1729      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1730        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1731      try {
1732        // here we wait until TPE terminates, which is either naturally or by
1733        // exceptions in the execution of the threads
1734        while (!tpe.awaitTermination(threadWakeFrequency, TimeUnit.MILLISECONDS)) {
1735          // printing out rough estimate, so as to not introduce
1736          // AtomicInteger
1737          LOG.info("Locality checking is underway: { Scanned Regions : "
1738            + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
1739            + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
1740        }
1741      } catch (InterruptedException e) {
1742        Thread.currentThread().interrupt();
1743        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1744      }
1745    }
1746
1747    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
1748    LOG.info("Scan DFS for locality info takes {}ms", overhead);
1749  }
1750
1751  /**
1752   * Do our short circuit read setup. Checks buffer size to use and whether to do checksumming in
1753   * hbase or hdfs.
1754   */
1755  public static void setupShortCircuitRead(final Configuration conf) {
1756    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1757    boolean shortCircuitSkipChecksum =
1758      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1759    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1760    if (shortCircuitSkipChecksum) {
1761      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not "
1762        + "be set to true."
1763        + (useHBaseChecksum
1764          ? " HBase checksum doesn't require "
1765            + "it, see https://issues.apache.org/jira/browse/HBASE-6868."
1766          : ""));
1767      assert !shortCircuitSkipChecksum; // this will fail if assertions are on
1768    }
1769    checkShortCircuitReadBufferSize(conf);
1770  }
1771
1772  /**
1773   * Check if short circuit read buffer size is set and if not, set it to hbase value.
1774   */
1775  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1776    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1777    final int notSet = -1;
1778    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
1779    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1780    int size = conf.getInt(dfsKey, notSet);
1781    // If a size is set, return -- we will use it.
1782    if (size != notSet) return;
1783    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
1784    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1785    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
1786  }
1787
1788  /**
1789   * Returns The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs.
1790   */
1791  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
1792    throws IOException {
1793    if (!CommonFSUtils.isHDFS(c)) {
1794      return null;
1795    }
1796    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
1797    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
1798    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
1799    final String name = "getHedgedReadMetrics";
1800    DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient();
1801    Method m;
1802    try {
1803      m = dfsclient.getClass().getDeclaredMethod(name);
1804    } catch (NoSuchMethodException e) {
1805      LOG.warn(
1806        "Failed find method " + name + " in dfsclient; no hedged read metrics: " + e.getMessage());
1807      return null;
1808    } catch (SecurityException e) {
1809      LOG.warn(
1810        "Failed find method " + name + " in dfsclient; no hedged read metrics: " + e.getMessage());
1811      return null;
1812    }
1813    m.setAccessible(true);
1814    try {
1815      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
1816    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
1817      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: "
1818        + e.getMessage());
1819      return null;
1820    }
1821  }
1822
1823  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1824    Configuration conf, int threads) throws IOException {
1825    ExecutorService pool = Executors.newFixedThreadPool(threads);
1826    List<Future<Void>> futures = new ArrayList<>();
1827    List<Path> traversedPaths;
1828    try {
1829      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
1830      for (Future<Void> future : futures) {
1831        future.get();
1832      }
1833    } catch (ExecutionException | InterruptedException | IOException e) {
1834      throw new IOException("Copy snapshot reference files failed", e);
1835    } finally {
1836      pool.shutdownNow();
1837    }
1838    return traversedPaths;
1839  }
1840
1841  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1842    Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
1843    List<Path> traversedPaths = new ArrayList<>();
1844    traversedPaths.add(dst);
1845    FileStatus currentFileStatus = srcFS.getFileStatus(src);
1846    if (currentFileStatus.isDirectory()) {
1847      if (!dstFS.mkdirs(dst)) {
1848        throw new IOException("Create directory failed: " + dst);
1849      }
1850      FileStatus[] subPaths = srcFS.listStatus(src);
1851      for (FileStatus subPath : subPaths) {
1852        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
1853          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
1854      }
1855    } else {
1856      Future<Void> future = pool.submit(() -> {
1857        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
1858        return null;
1859      });
1860      futures.add(future);
1861    }
1862    return traversedPaths;
1863  }
1864
1865  /** Returns A set containing all namenode addresses of fs */
1866  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
1867    Configuration conf) {
1868    Set<InetSocketAddress> addresses = new HashSet<>();
1869    String serviceName = fs.getCanonicalServiceName();
1870
1871    if (serviceName.startsWith("ha-hdfs")) {
1872      try {
1873        Map<String, Map<String, InetSocketAddress>> addressMap =
1874          DFSUtil.getNNServiceRpcAddressesForCluster(conf);
1875        String nameService = serviceName.substring(serviceName.indexOf(":") + 1);
1876        if (addressMap.containsKey(nameService)) {
1877          Map<String, InetSocketAddress> nnMap = addressMap.get(nameService);
1878          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
1879            InetSocketAddress addr = e2.getValue();
1880            addresses.add(addr);
1881          }
1882        }
1883      } catch (Exception e) {
1884        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
1885      }
1886    } else {
1887      URI uri = fs.getUri();
1888      int port = uri.getPort();
1889      if (port < 0) {
1890        int idx = serviceName.indexOf(':');
1891        port = Integer.parseInt(serviceName.substring(idx + 1));
1892      }
1893      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
1894      addresses.add(addr);
1895    }
1896
1897    return addresses;
1898  }
1899
1900  /**
1901   * @param conf the Configuration of HBase
1902   * @return Whether srcFs and desFs are on same hdfs or not
1903   */
1904  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
1905    // By getCanonicalServiceName, we could make sure both srcFs and desFs
1906    // show a unified format which contains scheme, host and port.
1907    String srcServiceName = srcFs.getCanonicalServiceName();
1908    String desServiceName = desFs.getCanonicalServiceName();
1909
1910    if (srcServiceName == null || desServiceName == null) {
1911      return false;
1912    }
1913    if (srcServiceName.equals(desServiceName)) {
1914      return true;
1915    }
1916    if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
1917      Collection<String> internalNameServices =
1918        conf.getTrimmedStringCollection("dfs.internal.nameservices");
1919      if (!internalNameServices.isEmpty()) {
1920        if (internalNameServices.contains(srcServiceName.split(":")[1])) {
1921          return true;
1922        } else {
1923          return false;
1924        }
1925      }
1926    }
1927    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
1928      // If one serviceName is an HA format while the other is a non-HA format,
1929      // maybe they refer to the same FileSystem.
1930      // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
1931      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
1932      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
1933      if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
1934        return true;
1935      }
1936    }
1937
1938    return false;
1939  }
1940}