001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
021import edu.umd.cs.findbugs.annotations.CheckForNull;
022import java.io.ByteArrayInputStream;
023import java.io.DataInputStream;
024import java.io.EOFException;
025import java.io.FileNotFoundException;
026import java.io.IOException;
027import java.io.InterruptedIOException;
028import java.lang.reflect.InvocationTargetException;
029import java.lang.reflect.Method;
030import java.net.InetSocketAddress;
031import java.net.URI;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.Collection;
035import java.util.Collections;
036import java.util.HashMap;
037import java.util.HashSet;
038import java.util.Iterator;
039import java.util.List;
040import java.util.Locale;
041import java.util.Map;
042import java.util.Optional;
043import java.util.Set;
044import java.util.Vector;
045import java.util.concurrent.ConcurrentHashMap;
046import java.util.concurrent.ExecutionException;
047import java.util.concurrent.ExecutorService;
048import java.util.concurrent.Executors;
049import java.util.concurrent.Future;
050import java.util.concurrent.FutureTask;
051import java.util.concurrent.ThreadPoolExecutor;
052import java.util.concurrent.TimeUnit;
053import java.util.regex.Pattern;
054import org.apache.commons.lang3.ArrayUtils;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.fs.BlockLocation;
057import org.apache.hadoop.fs.FSDataInputStream;
058import org.apache.hadoop.fs.FSDataOutputStream;
059import org.apache.hadoop.fs.FileStatus;
060import org.apache.hadoop.fs.FileSystem;
061import org.apache.hadoop.fs.FileUtil;
062import org.apache.hadoop.fs.Path;
063import org.apache.hadoop.fs.PathFilter;
064import org.apache.hadoop.fs.StorageType;
065import org.apache.hadoop.fs.permission.FsPermission;
066import org.apache.hadoop.hbase.ClusterId;
067import org.apache.hadoop.hbase.HConstants;
068import org.apache.hadoop.hbase.HDFSBlocksDistribution;
069import org.apache.hadoop.hbase.TableName;
070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
071import org.apache.hadoop.hbase.client.RegionInfo;
072import org.apache.hadoop.hbase.client.RegionInfoBuilder;
073import org.apache.hadoop.hbase.exceptions.DeserializationException;
074import org.apache.hadoop.hbase.fs.HFileSystem;
075import org.apache.hadoop.hbase.io.HFileLink;
076import org.apache.hadoop.hbase.master.HMaster;
077import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
078import org.apache.hadoop.hdfs.DFSClient;
079import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
080import org.apache.hadoop.hdfs.DFSUtil;
081import org.apache.hadoop.hdfs.DistributedFileSystem;
082import org.apache.hadoop.hdfs.protocol.HdfsConstants;
083import org.apache.hadoop.io.IOUtils;
084import org.apache.hadoop.ipc.RemoteException;
085import org.apache.hadoop.util.Progressable;
086import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
087import org.apache.yetus.audience.InterfaceAudience;
088import org.slf4j.Logger;
089import org.slf4j.LoggerFactory;
090
091import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
092import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
093import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
094import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
095import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
096
097import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
098import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
099
100/**
101 * Utility methods for interacting with the underlying file system.
102 */
103@InterfaceAudience.Private
104public final class FSUtils {
105  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
106
107  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
108  private static final int DEFAULT_THREAD_POOLSIZE = 2;
109
110  /** Set to true on Windows platforms */
111  @VisibleForTesting // currently only used in testing. TODO refactor into a test class
112  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
113
114  private FSUtils() {
115  }
116
117  /**
   * @return True if <code>fs</code> is an instance of DistributedFileSystem
   * @throws IOException if resolving the backing filesystem fails
120   */
121  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
122    FileSystem fileSystem = fs;
123    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
124    // Check its backing fs for dfs-ness.
125    if (fs instanceof HFileSystem) {
126      fileSystem = ((HFileSystem)fs).getBackingFs();
127    }
128    return fileSystem instanceof DistributedFileSystem;
129  }
130
131  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
   * consider the scheme; i.e. if the schemes differ but the path or subpath matches, the two will
   * equate.
   * @param pathToSearch Path we will be trying to match.
   * @param pathTail Path to match against the tail of <code>pathToSearch</code>
   * @return True if <code>pathTail</code> is the tail of the path of <code>pathToSearch</code>
138   */
139  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
140    Path tailPath = pathTail;
141    String tailName;
142    Path toSearch = pathToSearch;
143    String toSearchName;
144    boolean result = false;
145
146    if (pathToSearch.depth() != pathTail.depth()) {
147      return false;
148    }
149
150    do {
151      tailName = tailPath.getName();
152      if (tailName == null || tailName.isEmpty()) {
153        result = true;
154        break;
155      }
156      toSearchName = toSearch.getName();
157      if (toSearchName == null || toSearchName.isEmpty()) {
158        break;
159      }
160      // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
161      tailPath = tailPath.getParent();
162      toSearch = toSearch.getParent();
163    } while(tailName.equals(toSearchName));
164    return result;
165  }
166
167  /**
   * Delete the region directory if it exists.
   * @return True if the region directory was deleted.
   * @throws IOException if deleting the directory fails
171   */
172  public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri)
173    throws IOException {
174    Path rootDir = CommonFSUtils.getRootDir(conf);
175    FileSystem fs = rootDir.getFileSystem(conf);
176    return CommonFSUtils.deleteDirectory(fs,
177      new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
178  }
179
  /**
181   * Create the specified file on the filesystem. By default, this will:
182   * <ol>
183   * <li>overwrite the file if it exists</li>
184   * <li>apply the umask in the configuration (if it is enabled)</li>
185   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the configured column family replication, or the filesystem default replication if it
   * is {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li>
188   * <li>use the default block size</li>
189   * <li>not track progress</li>
190   * </ol>
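   * <p>
   * Illustrative usage (a sketch; the path, permission, and favored node are hypothetical):
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * FileSystem fs = FileSystem.get(conf);
   * InetSocketAddress[] favoredNodes =
   *   new InetSocketAddress[] { new InetSocketAddress("datanode1.example.com", 50010) };
   * try (FSDataOutputStream out = FSUtils.create(conf, fs, new Path("/hbase/example"),
   *     FsPermission.getFileDefault(), favoredNodes)) {
   *   out.write(Bytes.toBytes("example data"));
   * }
   * }</pre>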
191   * @param conf configurations
192   * @param fs {@link FileSystem} on which to write the file
193   * @param path {@link Path} to the file to write
194   * @param perm permissions
195   * @param favoredNodes favored data nodes
196   * @return output stream to the created file
197   * @throws IOException if the file cannot be created
198   */
199  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
200    FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
201    if (fs instanceof HFileSystem) {
202      FileSystem backingFs = ((HFileSystem) fs).getBackingFs();
203      if (backingFs instanceof DistributedFileSystem) {
204        // Try to use the favoredNodes version via reflection to allow backwards-
205        // compatibility.
206        short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION,
207          String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION)));
208        try {
209          return (FSDataOutputStream) (DistributedFileSystem.class
210            .getDeclaredMethod("create", Path.class, FsPermission.class, boolean.class, int.class,
211              short.class, long.class, Progressable.class, InetSocketAddress[].class)
212            .invoke(backingFs, path, perm, true, CommonFSUtils.getDefaultBufferSize(backingFs),
213              replication > 0 ? replication : CommonFSUtils.getDefaultReplication(backingFs, path),
214              CommonFSUtils.getDefaultBlockSize(backingFs, path), null, favoredNodes));
215        } catch (InvocationTargetException ite) {
          // Function was properly called, but threw its own exception.
217          throw new IOException(ite.getCause());
218        } catch (NoSuchMethodException e) {
219          LOG.debug("DFS Client does not support most favored nodes create; using default create");
220          LOG.trace("Ignoring; use default create", e);
221        } catch (IllegalArgumentException | SecurityException | IllegalAccessException e) {
222          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
223        }
224      }
225    }
226    return CommonFSUtils.create(fs, path, perm, true);
227  }
228
229  /**
230   * Checks to see if the specified file system is available
231   *
232   * @param fs filesystem
233   * @throws IOException e
234   */
235  public static void checkFileSystemAvailable(final FileSystem fs)
236  throws IOException {
237    if (!(fs instanceof DistributedFileSystem)) {
238      return;
239    }
240    IOException exception = null;
241    DistributedFileSystem dfs = (DistributedFileSystem) fs;
242    try {
243      if (dfs.exists(new Path("/"))) {
244        return;
245      }
246    } catch (IOException e) {
247      exception = e instanceof RemoteException ?
248              ((RemoteException)e).unwrapRemoteException() : e;
249    }
250    try {
251      fs.close();
252    } catch (Exception e) {
253      LOG.error("file system close failed: ", e);
254    }
255    throw new IOException("File system is not available", exception);
256  }
257
258  /**
259   * We use reflection because {@link DistributedFileSystem#setSafeMode(
260   * HdfsConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1
261   *
   * @param dfs the DistributedFileSystem to check
   * @return whether we're in safe mode
   * @throws IOException if querying the safe mode state fails
265   */
266  private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
267    boolean inSafeMode = false;
268    try {
269      Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
270          org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class});
271      inSafeMode = (Boolean) m.invoke(dfs,
272        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true);
273    } catch (Exception e) {
274      if (e instanceof IOException) throw (IOException) e;
275
276      // Check whether dfs is on safemode.
277      inSafeMode = dfs.setSafeMode(
278        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET);
279    }
280    return inSafeMode;
281  }
282
283  /**
284   * Check whether dfs is in safemode.
   * @param conf configuration used to look up the filesystem
   * @throws IOException if the filesystem is in safe mode or the check fails
287   */
288  public static void checkDfsSafeMode(final Configuration conf)
289  throws IOException {
290    boolean isInSafeMode = false;
291    FileSystem fs = FileSystem.get(conf);
292    if (fs instanceof DistributedFileSystem) {
293      DistributedFileSystem dfs = (DistributedFileSystem)fs;
294      isInSafeMode = isInSafeMode(dfs);
295    }
296    if (isInSafeMode) {
297      throw new IOException("File system is in safemode, it can't be written now");
298    }
299  }
300
301  /**
302   * Verifies current version of file system
303   *
304   * @param fs filesystem object
305   * @param rootdir root hbase directory
306   * @return null if no version file exists, version string otherwise
307   * @throws IOException if the version file fails to open
308   * @throws DeserializationException if the version data cannot be translated into a version
309   */
310  public static String getVersion(FileSystem fs, Path rootdir)
311  throws IOException, DeserializationException {
312    final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
313    FileStatus[] status = null;
314    try {
315      // hadoop 2.0 throws FNFE if directory does not exist.
316      // hadoop 1.0 returns null if directory does not exist.
317      status = fs.listStatus(versionFile);
318    } catch (FileNotFoundException fnfe) {
319      return null;
320    }
321    if (ArrayUtils.getLength(status) == 0) {
322      return null;
323    }
324    String version = null;
325    byte [] content = new byte [(int)status[0].getLen()];
326    FSDataInputStream s = fs.open(versionFile);
327    try {
328      IOUtils.readFully(s, content, 0, content.length);
329      if (ProtobufUtil.isPBMagicPrefix(content)) {
330        version = parseVersionFrom(content);
331      } else {
332        // Presume it pre-pb format.
333        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) {
334          version = dis.readUTF();
335        }
336      }
337    } catch (EOFException eof) {
338      LOG.warn("Version file was empty, odd, will try to set it.");
339    } finally {
340      s.close();
341    }
342    return version;
343  }
344
345  /**
346   * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
347   * @param bytes The byte content of the hbase.version file
348   * @return The version found in the file as a String
349   * @throws DeserializationException if the version data cannot be translated into a version
350   */
351  static String parseVersionFrom(final byte [] bytes)
352  throws DeserializationException {
353    ProtobufUtil.expectPBMagicPrefix(bytes);
354    int pblen = ProtobufUtil.lengthOfPBMagic();
355    FSProtos.HBaseVersionFileContent.Builder builder =
356      FSProtos.HBaseVersionFileContent.newBuilder();
357    try {
358      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
359      return builder.getVersion();
360    } catch (IOException e) {
361      // Convert
362      throw new DeserializationException(e);
363    }
364  }
365
366  /**
367   * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
368   * @param version Version to persist
369   * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
370   */
371  static byte [] toVersionByteArray(final String version) {
372    FSProtos.HBaseVersionFileContent.Builder builder =
373      FSProtos.HBaseVersionFileContent.newBuilder();
374    return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
375  }
376
377  /**
378   * Verifies current version of file system
379   *
380   * @param fs file system
381   * @param rootdir root directory of HBase installation
382   * @param message if true, issues a message on System.out
383   * @throws IOException if the version file cannot be opened
384   * @throws DeserializationException if the contents of the version file cannot be parsed
385   */
386  public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
387  throws IOException, DeserializationException {
388    checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
389  }
390
391  /**
392   * Verifies current version of file system
393   *
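   * <p>
   * Illustrative usage (a sketch; the wait interval is arbitrary):
   * <pre>{@code
   * FSUtils.checkVersion(fs, rootDir, true, 10 * 1000,
   *   HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
   * }</pre>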
394   * @param fs file system
395   * @param rootdir root directory of HBase installation
396   * @param message if true, issues a message on System.out
397   * @param wait wait interval
398   * @param retries number of times to retry
399   *
400   * @throws IOException if the version file cannot be opened
401   * @throws DeserializationException if the contents of the version file cannot be parsed
402   */
403  public static void checkVersion(FileSystem fs, Path rootdir,
404      boolean message, int wait, int retries)
405  throws IOException, DeserializationException {
406    String version = getVersion(fs, rootdir);
407    String msg;
408    if (version == null) {
409      if (!metaRegionExists(fs, rootdir)) {
410        // rootDir is empty (no version file and no root region)
411        // just create new version file (HBASE-1195)
412        setVersion(fs, rootdir, wait, retries);
413        return;
414      } else {
415        msg = "hbase.version file is missing. Is your hbase.rootdir valid? " +
416            "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " +
417            "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2";
418      }
419    } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) {
420      return;
421    } else {
422      msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version +
423          " but software requires version " + HConstants.FILE_SYSTEM_VERSION +
424          ". Consult http://hbase.apache.org/book.html for further information about " +
425          "upgrading HBase.";
426    }
427
    // Version is deprecated; migration is required.
429    // Output on stdout so user sees it in terminal.
430    if (message) {
431      System.out.println("WARNING! " + msg);
432    }
433    throw new FileSystemVersionException(msg);
434  }
435
436  /**
437   * Sets version of file system
438   *
439   * @param fs filesystem object
440   * @param rootdir hbase root
441   * @throws IOException e
442   */
443  public static void setVersion(FileSystem fs, Path rootdir)
444  throws IOException {
445    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
446      HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
447  }
448
449  /**
450   * Sets version of file system
451   *
452   * @param fs filesystem object
453   * @param rootdir hbase root
454   * @param wait time to wait for retry
455   * @param retries number of times to retry before failing
456   * @throws IOException e
457   */
458  public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
459  throws IOException {
460    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
461  }
462
463
464  /**
465   * Sets version of file system
466   *
467   * @param fs filesystem object
468   * @param rootdir hbase root directory
469   * @param version version to set
470   * @param wait time to wait for retry
471   * @param retries number of times to retry before throwing an IOException
472   * @throws IOException e
473   */
474  public static void setVersion(FileSystem fs, Path rootdir, String version,
475      int wait, int retries) throws IOException {
476    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
477    Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
478      HConstants.VERSION_FILE_NAME);
479    while (true) {
480      try {
481        // Write the version to a temporary file
482        FSDataOutputStream s = fs.create(tempVersionFile);
483        try {
484          s.write(toVersionByteArray(version));
485          s.close();
486          s = null;
487          // Move the temp version file to its normal location. Returns false
488          // if the rename failed. Throw an IOE in that case.
489          if (!fs.rename(tempVersionFile, versionFile)) {
490            throw new IOException("Unable to move temp version file to " + versionFile);
491          }
492        } finally {
493          // Cleaning up the temporary if the rename failed would be trying
494          // too hard. We'll unconditionally create it again the next time
495          // through anyway, files are overwritten by default by create().
496
497          // Attempt to close the stream on the way out if it is still open.
498          try {
499            if (s != null) s.close();
500          } catch (IOException ignore) { }
501        }
502        LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
503        return;
504      } catch (IOException e) {
505        if (retries > 0) {
506          LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
507          fs.delete(versionFile, false);
508          try {
509            if (wait > 0) {
510              Thread.sleep(wait);
511            }
512          } catch (InterruptedException ie) {
513            throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
514          }
515          retries--;
516        } else {
517          throw e;
518        }
519      }
520    }
521  }
522
523  /**
524   * Checks that a cluster ID file exists in the HBase root directory
525   * @param fs the root directory FileSystem
526   * @param rootdir the HBase root directory in HDFS
527   * @param wait how long to wait between retries
528   * @return <code>true</code> if the file exists, otherwise <code>false</code>
529   * @throws IOException if checking the FileSystem fails
530   */
531  public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
532      long wait) throws IOException {
533    while (true) {
534      try {
535        Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
536        return fs.exists(filePath);
537      } catch (IOException ioe) {
538        if (wait > 0L) {
539          LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe);
540          try {
541            Thread.sleep(wait);
542          } catch (InterruptedException e) {
543            Thread.currentThread().interrupt();
544            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
545          }
546        } else {
547          throw ioe;
548        }
549      }
550    }
551  }
552
553  /**
554   * Returns the value of the unique cluster ID stored for this HBase instance.
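   * <p>
   * Illustrative usage (a sketch):
   * <pre>{@code
   * Path rootDir = CommonFSUtils.getRootDir(conf);
   * FileSystem fs = rootDir.getFileSystem(conf);
   * ClusterId clusterId = FSUtils.getClusterId(fs, rootDir);
   * String id = clusterId == null ? "unknown" : clusterId.toString();
   * }</pre>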
555   * @param fs the root directory FileSystem
556   * @param rootdir the path to the HBase root directory
557   * @return the unique cluster identifier
558   * @throws IOException if reading the cluster ID file fails
559   */
560  public static ClusterId getClusterId(FileSystem fs, Path rootdir)
561  throws IOException {
562    Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
563    ClusterId clusterId = null;
564    FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath):  null;
565    if (status != null) {
566      int len = Ints.checkedCast(status.getLen());
567      byte [] content = new byte[len];
568      FSDataInputStream in = fs.open(idPath);
569      try {
570        in.readFully(content);
571      } catch (EOFException eof) {
572        LOG.warn("Cluster ID file {} is empty", idPath);
573      } finally{
574        in.close();
575      }
576      try {
577        clusterId = ClusterId.parseFrom(content);
578      } catch (DeserializationException e) {
579        throw new IOException("content=" + Bytes.toString(content), e);
580      }
581      // If not pb'd, make it so.
582      if (!ProtobufUtil.isPBMagicPrefix(content)) {
583        String cid = null;
584        in = fs.open(idPath);
585        try {
586          cid = in.readUTF();
587          clusterId = new ClusterId(cid);
588        } catch (EOFException eof) {
589          LOG.warn("Cluster ID file {} is empty", idPath);
590        } finally {
591          in.close();
592        }
593        rewriteAsPb(fs, rootdir, idPath, clusterId);
594      }
595      return clusterId;
596    } else {
597      LOG.warn("Cluster ID file does not exist at {}", idPath);
598    }
599    return clusterId;
600  }
601
602  /**
   * Rewrite the cluster ID file at <code>p</code> as a protobuf-serialized file.
   * @param cid the cluster ID to write
   * @throws IOException if the rename, write, or delete of the old file fails
605   */
606  private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
607      final ClusterId cid)
608  throws IOException {
609    // Rewrite the file as pb.  Move aside the old one first, write new
610    // then delete the moved-aside file.
611    Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
612    if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
613    setClusterId(fs, rootdir, cid, 100);
614    if (!fs.delete(movedAsideName, false)) {
615      throw new IOException("Failed delete of " + movedAsideName);
616    }
617    LOG.debug("Rewrote the hbase.id file as pb");
618  }
619
620  /**
621   * Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root
   * directory. If any operation on the ID file fails, and {@code wait} is a positive value, the
   * method will keep retrying to produce the ID file until the thread is forcibly interrupted.
624   *
625   * @param fs the root directory FileSystem
626   * @param rootdir the path to the HBase root directory
627   * @param clusterId the unique identifier to store
628   * @param wait how long (in milliseconds) to wait between retries
   * @throws IOException if writing to the FileSystem fails and no positive wait value was given
630   */
631  public static void setClusterId(final FileSystem fs, final Path rootdir,
632      final ClusterId clusterId, final long wait) throws IOException {
633
634    final Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
635    final Path tempDir = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY);
636    final Path tempIdFile = new Path(tempDir, HConstants.CLUSTER_ID_FILE_NAME);
637
638    LOG.debug("Create cluster ID file [{}] with ID: {}", idFile, clusterId);
639
640    while (true) {
641      Optional<IOException> failure = Optional.empty();
642
643      LOG.debug("Write the cluster ID file to a temporary location: {}", tempIdFile);
644      try (FSDataOutputStream s = fs.create(tempIdFile)) {
645        s.write(clusterId.toByteArray());
646      } catch (IOException ioe) {
647        failure = Optional.of(ioe);
648      }
649
650      if (!failure.isPresent()) {
651        try {
652          LOG.debug("Move the temporary cluster ID file to its target location [{}]:[{}]",
653            tempIdFile, idFile);
654
655          if (!fs.rename(tempIdFile, idFile)) {
656            failure =
657                Optional.of(new IOException("Unable to move temp cluster ID file to " + idFile));
658          }
659        } catch (IOException ioe) {
660          failure = Optional.of(ioe);
661        }
662      }
663
664      if (failure.isPresent()) {
665        final IOException cause = failure.get();
666        if (wait > 0L) {
667          LOG.warn("Unable to create cluster ID file in {}, retrying in {}ms", rootdir, wait,
668            cause);
669          try {
670            Thread.sleep(wait);
671          } catch (InterruptedException e) {
672            Thread.currentThread().interrupt();
673            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
674          }
675          continue;
676        } else {
677          throw cause;
678        }
679      } else {
680        return;
681      }
682    }
683  }
684
685  /**
686   * If DFS, check safe mode and if so, wait until we clear it.
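   * <p>
   * Illustrative usage (a sketch; the poll interval is arbitrary):
   * <pre>{@code
   * // Block until HDFS leaves safe mode, checking every 10 seconds.
   * FSUtils.waitOnSafeMode(conf, 10 * 1000L);
   * }</pre>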
687   * @param conf configuration
688   * @param wait Sleep between retries
689   * @throws IOException e
690   */
691  public static void waitOnSafeMode(final Configuration conf,
692    final long wait)
693  throws IOException {
694    FileSystem fs = FileSystem.get(conf);
695    if (!(fs instanceof DistributedFileSystem)) return;
696    DistributedFileSystem dfs = (DistributedFileSystem)fs;
697    // Make sure dfs is not in safe mode
698    while (isInSafeMode(dfs)) {
699      LOG.info("Waiting for dfs to exit safe mode...");
700      try {
701        Thread.sleep(wait);
702      } catch (InterruptedException e) {
703        Thread.currentThread().interrupt();
704        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
705      }
706    }
707  }
708
709  /**
710   * Checks if meta region exists
711   * @param fs file system
712   * @param rootDir root directory of HBase installation
713   * @return true if exists
714   */
715  public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
716    Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
717    return fs.exists(metaRegionDir);
718  }
719
720  /**
721   * Compute HDFS blocks distribution of a given file, or a portion of the file
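   * <p>
   * Illustrative usage (a sketch; the file path and host name are hypothetical):
   * <pre>{@code
   * FileStatus status = fs.getFileStatus(new Path("/hbase/data/default/t1/r1/cf/hfile1"));
   * HDFSBlocksDistribution dist =
   *   FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
   * float localityIndex = dist.getBlockLocalityIndex("regionserver1.example.com");
   * }</pre>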
722   * @param fs file system
723   * @param status file status of the file
724   * @param start start position of the portion
725   * @param length length of the portion
726   * @return The HDFS blocks distribution
727   */
728  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
729    final FileSystem fs, FileStatus status, long start, long length)
730    throws IOException {
731    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
732    BlockLocation [] blockLocations =
733      fs.getFileBlockLocations(status, start, length);
734    addToHDFSBlocksDistribution(blocksDistribution, blockLocations);
735    return blocksDistribution;
736  }
737
738  /**
739   * Update blocksDistribution with blockLocations
740   * @param blocksDistribution the hdfs blocks distribution
741   * @param blockLocations an array containing block location
742   */
743  static public void addToHDFSBlocksDistribution(
744      HDFSBlocksDistribution blocksDistribution, BlockLocation[] blockLocations)
745      throws IOException {
746    for (BlockLocation bl : blockLocations) {
747      String[] hosts = bl.getHosts();
748      long len = bl.getLength();
749      StorageType[] storageTypes = bl.getStorageTypes();
750      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
751    }
752  }
753
  // TODO move this method OUT of FSUtils. No dependencies on HMaster
755  /**
756   * Returns the total overall fragmentation percentage. Includes hbase:meta and
757   * -ROOT- as well.
758   *
759   * @param master  The master defining the HBase root and file system
   * @return The overall fragmentation percentage, or -1 if no tables are found
761   * @throws IOException When scanning the directory fails
762   */
763  public static int getTotalTableFragmentation(final HMaster master)
764  throws IOException {
765    Map<String, Integer> map = getTableFragmentation(master);
766    return map.isEmpty() ? -1 :  map.get("-TOTAL-");
767  }
768
769  /**
770   * Runs through the HBase rootdir and checks how many stores for each table
771   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
772   * percentage across all tables is stored under the special key "-TOTAL-".
773   *
774   * @param master  The master defining the HBase root and file system.
775   * @return A map for each table and its percentage (never null).
776   *
777   * @throws IOException When scanning the directory fails.
778   */
779  public static Map<String, Integer> getTableFragmentation(final HMaster master)
780    throws IOException {
781    Path path = CommonFSUtils.getRootDir(master.getConfiguration());
782    // since HMaster.getFileSystem() is package private
783    FileSystem fs = path.getFileSystem(master.getConfiguration());
784    return getTableFragmentation(fs, path);
785  }
786
787  /**
788   * Runs through the HBase rootdir and checks how many stores for each table
789   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
790   * percentage across all tables is stored under the special key "-TOTAL-".
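   * <p>
   * Illustrative usage (a sketch):
   * <pre>{@code
   * Map<String, Integer> frags = FSUtils.getTableFragmentation(fs, rootDir);
   * int overallPercentage = frags.get("-TOTAL-");
   * }</pre>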
791   *
792   * @param fs  The file system to use
793   * @param hbaseRootDir  The root directory to scan
794   * @return A map for each table and its percentage (never null)
795   * @throws IOException When scanning the directory fails
796   */
797  public static Map<String, Integer> getTableFragmentation(
798    final FileSystem fs, final Path hbaseRootDir)
799  throws IOException {
800    Map<String, Integer> frags = new HashMap<>();
801    int cfCountTotal = 0;
802    int cfFragTotal = 0;
803    PathFilter regionFilter = new RegionDirFilter(fs);
804    PathFilter familyFilter = new FamilyDirFilter(fs);
805    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
806    for (Path d : tableDirs) {
807      int cfCount = 0;
808      int cfFrag = 0;
809      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
810      for (FileStatus regionDir : regionDirs) {
811        Path dd = regionDir.getPath();
812        // else its a region name, now look in region for families
813        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
814        for (FileStatus familyDir : familyDirs) {
815          cfCount++;
816          cfCountTotal++;
817          Path family = familyDir.getPath();
818          // now in family make sure only one file
819          FileStatus[] familyStatus = fs.listStatus(family);
820          if (familyStatus.length > 1) {
821            cfFrag++;
822            cfFragTotal++;
823          }
824        }
825      }
826      // compute percentage per table and store in result list
827      frags.put(CommonFSUtils.getTableName(d).getNameAsString(),
828        cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
829    }
830    // set overall percentage for all tables
831    frags.put("-TOTAL-",
832      cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
833    return frags;
834  }
835
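  /**
   * Renames <code>src</code> to <code>dst</code>, first deleting <code>dst</code> if it already
   * exists.
   * @throws IOException if the delete or the rename fails
   */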
836  public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
837    if (fs.exists(dst) && !fs.delete(dst, false)) {
838      throw new IOException("Can not delete " + dst);
839    }
840    if (!fs.rename(src, dst)) {
841      throw new IOException("Can not rename from " + src + " to " + dst);
842    }
843  }
844
845  /**
846   * A {@link PathFilter} that returns only regular files.
847   */
848  static class FileFilter extends AbstractFileStatusFilter {
849    private final FileSystem fs;
850
851    public FileFilter(final FileSystem fs) {
852      this.fs = fs;
853    }
854
855    @Override
856    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
857      try {
858        return isFile(fs, isDir, p);
859      } catch (IOException e) {
860        LOG.warn("Unable to verify if path={} is a regular file", p, e);
861        return false;
862      }
863    }
864  }
865
866  /**
867   * Directory filter that doesn't include any of the directories in the specified blacklist
868   */
869  public static class BlackListDirFilter extends AbstractFileStatusFilter {
870    private final FileSystem fs;
871    private List<String> blacklist;
872
873    /**
     * Create a filter on the given filesystem with the specified blacklist
875     * @param fs filesystem to filter
876     * @param directoryNameBlackList list of the names of the directories to filter. If
877     *          <tt>null</tt>, all directories are returned
878     */
879    @SuppressWarnings("unchecked")
880    public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
881      this.fs = fs;
882      blacklist =
883        (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
884          : directoryNameBlackList);
885    }
886
887    @Override
888    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
889      if (!isValidName(p.getName())) {
890        return false;
891      }
892
893      try {
894        return isDirectory(fs, isDir, p);
895      } catch (IOException e) {
896        LOG.warn("An error occurred while verifying if [{}] is a valid directory."
897            + " Returning 'not valid' and continuing.", p, e);
898        return false;
899      }
900    }
901
902    protected boolean isValidName(final String name) {
903      return !blacklist.contains(name);
904    }
905  }
906
907  /**
908   * A {@link PathFilter} that only allows directories.
909   */
910  public static class DirFilter extends BlackListDirFilter {
911
912    public DirFilter(FileSystem fs) {
913      super(fs, null);
914    }
915  }
916
917  /**
918   * A {@link PathFilter} that returns usertable directories. To get all directories use the
919   * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
920   */
921  public static class UserTableDirFilter extends BlackListDirFilter {
922    public UserTableDirFilter(FileSystem fs) {
923      super(fs, HConstants.HBASE_NON_TABLE_DIRS);
924    }
925
926    @Override
927    protected boolean isValidName(final String name) {
928      if (!super.isValidName(name))
929        return false;
930
931      try {
932        TableName.isLegalTableQualifierName(Bytes.toBytes(name));
933      } catch (IllegalArgumentException e) {
934        LOG.info("Invalid table name: {}", name);
935        return false;
936      }
937      return true;
938    }
939  }
940
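  /**
   * Returns all table directories found under the namespace directories beneath
   * <code>rootdir</code>.
   * @param fs the filesystem to use
   * @param rootdir the HBase root directory
   * @return List of table directory paths
   * @throws IOException if listing the namespace directories fails
   */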
941  public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
942      throws IOException {
943    List<Path> tableDirs = new ArrayList<>();
944
945    for (FileStatus status : fs
946        .globStatus(new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
947      tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
948    }
949    return tableDirs;
950  }
951
952  /**
   * @param fs the filesystem to use
   * @param rootdir the directory to scan
   * @return All the table directories under <code>rootdir</code>. Ignores non-table hbase folders
   *   such as .logs, .oldlogs, and .corrupt.
   * @throws IOException if listing the directory fails
958   */
959  public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
960      throws IOException {
961    // presumes any directory under hbase.rootdir is a table
962    FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
963    List<Path> tabledirs = new ArrayList<>(dirs.length);
964    for (FileStatus dir: dirs) {
965      tabledirs.add(dir.getPath());
966    }
967    return tabledirs;
968  }
969
970  /**
   * Filter for all dirs that are legal region directory names (hex-encoded or older numeric names).
972   */
973  public static class RegionDirFilter extends AbstractFileStatusFilter {
974    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
975    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
976    final FileSystem fs;
977
978    public RegionDirFilter(FileSystem fs) {
979      this.fs = fs;
980    }
981
982    @Override
983    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
984      if (!regionDirPattern.matcher(p.getName()).matches()) {
985        return false;
986      }
987
988      try {
989        return isDirectory(fs, isDir, p);
990      } catch (IOException ioe) {
991        // Maybe the file was moved or the fs was disconnected.
992        LOG.warn("Skipping file {} due to IOException", p, ioe);
993        return false;
994      }
995    }
996  }
997
998  /**
999   * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1000   * .tableinfo
1001   * @param fs A file system for the Path
1002   * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
1003   * @return List of paths to valid region directories in table dir.
   * @throws IOException if listing the table directory fails
1005   */
1006  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1007    // assumes we are in a table dir.
1008    List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1009    if (rds == null) {
1010      return Collections.emptyList();
1011    }
1012    List<Path> regionDirs = new ArrayList<>(rds.size());
1013    for (FileStatus rdfs: rds) {
1014      Path rdPath = rdfs.getPath();
1015      regionDirs.add(rdPath);
1016    }
1017    return regionDirs;
1018  }
1019
1020  public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) {
1021    return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region);
1022  }
1023
1024  public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
1025    return getRegionDirFromTableDir(tableDir,
1026        ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
1027  }
1028
1029  public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) {
1030    return new Path(tableDir, encodedRegionName);
1031  }
1032
1033  /**
1034   * Filter for all dirs that are legal column family names.  This is generally used for colfam
1035   * dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1036   */
1037  public static class FamilyDirFilter extends AbstractFileStatusFilter {
1038    final FileSystem fs;
1039
1040    public FamilyDirFilter(FileSystem fs) {
1041      this.fs = fs;
1042    }
1043
1044    @Override
1045    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1046      try {
1047        // throws IAE if invalid
1048        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(Bytes.toBytes(p.getName()));
1049      } catch (IllegalArgumentException iae) {
1050        // path name is an invalid family name and thus is excluded.
1051        return false;
1052      }
1053
1054      try {
1055        return isDirectory(fs, isDir, p);
1056      } catch (IOException ioe) {
1057        // Maybe the file was moved or the fs was disconnected.
1058        LOG.warn("Skipping file {} due to IOException", p, ioe);
1059        return false;
1060      }
1061    }
1062  }
1063
1064  /**
1065   * Given a particular region dir, return all the familydirs inside it
1066   *
1067   * @param fs A file system for the Path
1068   * @param regionDir Path to a specific region directory
1069   * @return List of paths to valid family directories in region dir.
   * @throws IOException if listing the region directory fails
1071   */
1072  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1073    // assumes we are in a region dir.
1074    FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1075    List<Path> familyDirs = new ArrayList<>(fds.length);
1076    for (FileStatus fdfs: fds) {
1077      Path fdPath = fdfs.getPath();
1078      familyDirs.add(fdPath);
1079    }
1080    return familyDirs;
1081  }
1082
1083  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
1084    List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs));
1085    if (fds == null) {
1086      return Collections.emptyList();
1087    }
1088    List<Path> referenceFiles = new ArrayList<>(fds.size());
1089    for (FileStatus fdfs: fds) {
1090      Path fdPath = fdfs.getPath();
1091      referenceFiles.add(fdPath);
1092    }
1093    return referenceFiles;
1094  }
1095
1096  /**
1097   * Filter for HFiles that excludes reference files.
1098   */
1099  public static class HFileFilter extends AbstractFileStatusFilter {
1100    final FileSystem fs;
1101
1102    public HFileFilter(FileSystem fs) {
1103      this.fs = fs;
1104    }
1105
1106    @Override
1107    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1108      if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) {
1109        return false;
1110      }
1111
1112      try {
1113        return isFile(fs, isDir, p);
1114      } catch (IOException ioe) {
1115        // Maybe the file was moved or the fs was disconnected.
1116        LOG.warn("Skipping file {} due to IOException", p, ioe);
1117        return false;
1118      }
1119    }
1120  }
1121
1122  /**
   * Filter for HFileLinks (StoreFiles and HFiles not included).
   * The filter itself does not consider whether a link is a file or not.
1125   */
1126  public static class HFileLinkFilter implements PathFilter {
1127
1128    @Override
1129    public boolean accept(Path p) {
1130      return HFileLink.isHFileLink(p);
1131    }
1132  }
1133
1134  public static class ReferenceFileFilter extends AbstractFileStatusFilter {
1135
1136    private final FileSystem fs;
1137
1138    public ReferenceFileFilter(FileSystem fs) {
1139      this.fs = fs;
1140    }
1141
1142    @Override
1143    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1144      if (!StoreFileInfo.isReference(p)) {
1145        return false;
1146      }
1147
1148      try {
1149        // only files can be references.
1150        return isFile(fs, isDir, p);
1151      } catch (IOException ioe) {
1152        // Maybe the file was moved or the fs was disconnected.
1153        LOG.warn("Skipping file {} due to IOException", p, ioe);
1154        return false;
1155      }
1156    }
1157  }
1158
1159  /**
   * Called every so often by the storefile map builder getTableStoreFilePathMap to
1161   * report progress.
1162   */
1163  interface ProgressReporter {
1164    /**
1165     * @param status File or directory we are about to process.
1166     */
1167    void progress(FileStatus status);
1168  }
1169
1170  /**
1171   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1172   * table StoreFile names to the full Path.
1173   * <br>
1174   * Example...<br>
1175   * Key = 3944417774205889744  <br>
1176   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1177   *
1178   * @param map map to add values.  If null, this method will create and populate one to return
1179   * @param fs  The file system to use.
1180   * @param hbaseRootDir  The root directory to scan.
1181   * @param tableName name of the table to scan.
1182   * @return Map keyed by StoreFile name with a value of the full Path.
1183   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException if the thread is interrupted during the scan
1185   */
1186  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1187  final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1188  throws IOException, InterruptedException {
1189    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null,
1190        (ProgressReporter)null);
1191  }
1192
1193  /**
1194   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
   * table StoreFile names to the full Path.  Note that because this method can be called
   * on a 'live' HBase system, we will skip files that no longer exist by the time
   * we traverse them; similarly, the user of the result needs to consider that some
   * entries in this map may not exist by the time this call completes.
1199   * <br>
1200   * Example...<br>
1201   * Key = 3944417774205889744  <br>
1202   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1203   *
1204   * @param resultMap map to add values.  If null, this method will create and populate one to return
1205   * @param fs  The file system to use.
1206   * @param hbaseRootDir  The root directory to scan.
1207   * @param tableName name of the table to scan.
1208   * @param sfFilter optional path filter to apply to store files
1209   * @param executor optional executor service to parallelize this operation
1210   * @param progressReporter Instance or null; gets called every time we move to new region of
1211   *   family dir and for each store file.
1212   * @return Map keyed by StoreFile name with a value of the full Path.
1213   * @throws IOException When scanning the directory fails.
1214   * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead.
1215   */
1216  @Deprecated
1217  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1218      final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1219      ExecutorService executor, final HbckErrorReporter progressReporter)
1220      throws IOException, InterruptedException {
1221    return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor,
1222        new ProgressReporter() {
1223          @Override
1224          public void progress(FileStatus status) {
1225            // status is not used in this implementation.
1226            progressReporter.progress();
1227          }
1228        });
1229  }
1230
1231  /**
1232   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
   * table StoreFile names to the full Path.  Note that because this method can be called
   * on a 'live' HBase system, we will skip files that no longer exist by the time
   * we traverse them; similarly, the user of the result needs to consider that some
   * entries in this map may not exist by the time this call completes.
1237   * <br>
1238   * Example...<br>
1239   * Key = 3944417774205889744  <br>
1240   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
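   * <p>
   * Illustrative usage (a sketch; the table name and pool size are hypothetical):
   * <pre>{@code
   * ExecutorService pool = Executors.newFixedThreadPool(4);
   * try {
   *   Map<String, Path> storeFiles = FSUtils.getTableStoreFilePathMap(null, fs, rootDir,
   *     TableName.valueOf("exampleTable"), null, pool, status -> {});
   * } finally {
   *   pool.shutdown();
   * }
   * }</pre>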
1241   *
1242   * @param resultMap map to add values.  If null, this method will create and populate one
1243   *   to return
1244   * @param fs  The file system to use.
1245   * @param hbaseRootDir  The root directory to scan.
1246   * @param tableName name of the table to scan.
1247   * @param sfFilter optional path filter to apply to store files
1248   * @param executor optional executor service to parallelize this operation
1249   * @param progressReporter Instance or null; gets called every time we move to new region of
1250   *   family dir and for each store file.
1251   * @return Map keyed by StoreFile name with a value of the full Path.
1252   * @throws IOException When scanning the directory fails.
1253   * @throws InterruptedException the thread is interrupted, either before or during the activity.
1254   */
1255  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1256      final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1257      ExecutorService executor, final ProgressReporter progressReporter)
1258    throws IOException, InterruptedException {
1259
1260    final Map<String, Path> finalResultMap =
1261        resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;
1262
1263    // only include the directory paths to tables
1264    Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
1265    // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
1266    // should be regions.
1267    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
1268    final Vector<Exception> exceptions = new Vector<>();
1269
1270    try {
      List<FileStatus> regionDirs =
        FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1272      if (regionDirs == null) {
1273        return finalResultMap;
1274      }
1275
1276      final List<Future<?>> futures = new ArrayList<>(regionDirs.size());
1277
1278      for (FileStatus regionDir : regionDirs) {
1279        if (null != progressReporter) {
1280          progressReporter.progress(regionDir);
1281        }
1282        final Path dd = regionDir.getPath();
1283
1284        if (!exceptions.isEmpty()) {
1285          break;
1286        }
1287
1288        Runnable getRegionStoreFileMapCall = new Runnable() {
1289          @Override
1290          public void run() {
1291            try {
1292              HashMap<String,Path> regionStoreFileMap = new HashMap<>();
1293              List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
1294              if (familyDirs == null) {
1295                if (!fs.exists(dd)) {
1296                  LOG.warn("Skipping region because it no longer exists: " + dd);
1297                } else {
1298                  LOG.warn("Skipping region because it has no family dirs: " + dd);
1299                }
1300                return;
1301              }
1302              for (FileStatus familyDir : familyDirs) {
1303                if (null != progressReporter) {
1304                  progressReporter.progress(familyDir);
1305                }
1306                Path family = familyDir.getPath();
1307                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1308                  continue;
1309                }
1310                // now in family, iterate over the StoreFiles and
1311                // put in map
1312                FileStatus[] familyStatus = fs.listStatus(family);
1313                for (FileStatus sfStatus : familyStatus) {
1314                  if (null != progressReporter) {
1315                    progressReporter.progress(sfStatus);
1316                  }
1317                  Path sf = sfStatus.getPath();
1318                  if (sfFilter == null || sfFilter.accept(sf)) {
                    regionStoreFileMap.put(sf.getName(), sf);
1320                  }
1321                }
1322              }
1323              finalResultMap.putAll(regionStoreFileMap);
1324            } catch (Exception e) {
1325              LOG.error("Could not get region store file map for region: " + dd, e);
1326              exceptions.add(e);
1327            }
1328          }
1329        };
1330
1331        // If executor is available, submit async tasks to exec concurrently, otherwise
1332        // just do serial sync execution
1333        if (executor != null) {
1334          Future<?> future = executor.submit(getRegionStoreFileMapCall);
1335          futures.add(future);
1336        } else {
1337          FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
1338          future.run();
1339          futures.add(future);
1340        }
1341      }
1342
1343      // Ensure all pending tasks are complete (or that we run into an exception)
1344      for (Future<?> f : futures) {
1345        if (!exceptions.isEmpty()) {
1346          break;
1347        }
1348        try {
1349          f.get();
1350        } catch (ExecutionException e) {
1351          LOG.error("Unexpected exec exception!  Should've been caught already.  (Bug?)", e);
1352          // Shouldn't happen, we already logged/caught any exceptions in the Runnable
1353        }
1354      }
1355    } catch (IOException e) {
1356      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
1357      exceptions.add(e);
1358    } finally {
1359      if (!exceptions.isEmpty()) {
1360        // Just throw the first exception as an indication something bad happened
1361        // Don't need to propagate all the exceptions, we already logged them all anyway
1362        Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class);
1363        throw new IOException(exceptions.firstElement());
1364      }
1365    }
1366
1367    return finalResultMap;
1368  }
1369
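  /**
   * Counts the reference files under all the family directories of the given region directory.
   * IOExceptions encountered while listing are logged and the partial count is returned.
   * @param fs A file system for the Path
   * @param p Path to a region directory
   * @return number of reference files found
   */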
1370  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1371    int result = 0;
1372    try {
1373      for (Path familyDir:getFamilyDirs(fs, p)){
1374        result += getReferenceFilePaths(fs, familyDir).size();
1375      }
1376    } catch (IOException e) {
1377      LOG.warn("Error counting reference files", e);
1378    }
1379    return result;
1380  }
1381
1382  /**
1383   * Runs through the HBase rootdir and creates a reverse lookup map for
1384   * table StoreFile names to the full Path.
1385   * <br>
1386   * Example...<br>
1387   * Key = 3944417774205889744  <br>
1388   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1389   *
1390   * @param fs  The file system to use.
1391   * @param hbaseRootDir  The root directory to scan.
1392   * @return Map keyed by StoreFile name with a value of the full Path.
1393   * @throws IOException When scanning the directory fails.
1394   */
1395  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1396      final Path hbaseRootDir)
1397  throws IOException, InterruptedException {
1398    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter)null);
1399  }
1400
1401  /**
1402   * Runs through the HBase rootdir and creates a reverse lookup map for
1403   * table StoreFile names to the full Path.
1404   * <br>
1405   * Example...<br>
1406   * Key = 3944417774205889744  <br>
1407   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1408   *
1409   * @param fs  The file system to use.
1410   * @param hbaseRootDir  The root directory to scan.
1411   * @param sfFilter optional path filter to apply to store files
1412   * @param executor optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to a new region or
1414   *   family dir and for each store file.
1415   * @return Map keyed by StoreFile name with a value of the full Path.
1416   * @throws IOException When scanning the directory fails.
   * @deprecated Since 2.3.0. Will be removed in hbase4. Use {@link
1418   *   #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)}
1419   */
1420  @Deprecated
1421  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1422      final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1423      HbckErrorReporter progressReporter)
1424    throws IOException, InterruptedException {
1425    return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor,
1426        new ProgressReporter() {
1427          @Override
1428          public void progress(FileStatus status) {
1429            // status is not used in this implementation.
1430            progressReporter.progress();
1431          }
1432        });
1433  }
1434
1435  /**
1436   * Runs through the HBase rootdir and creates a reverse lookup map for
1437   * table StoreFile names to the full Path.
1438   * <br>
1439   * Example...<br>
1440   * Key = 3944417774205889744  <br>
1441   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
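   * <p>
   * Illustrative parallel usage sketch (the pool size is an arbitrary example; a {@code null}
   * filter and reporter are accepted):
   * <pre>{@code
   * ExecutorService pool = Executors.newFixedThreadPool(8);
   * try {
   *   Map<String, Path> storeFiles =
   *       FSUtils.getTableStoreFilePathMap(fs, rootDir, null, pool, (ProgressReporter) null);
   * } finally {
   *   pool.shutdown();
   * }
   * }</pre>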
1442   *
1443   * @param fs  The file system to use.
1444   * @param hbaseRootDir  The root directory to scan.
1445   * @param sfFilter optional path filter to apply to store files
1446   * @param executor optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to a new region or
1448   *   family dir and for each store file.
1449   * @return Map keyed by StoreFile name with a value of the full Path.
1450   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException if the scan is interrupted while waiting on parallel tasks
1452   */
1453  public static Map<String, Path> getTableStoreFilePathMap(
1454    final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter,
1455        ExecutorService executor, ProgressReporter progressReporter)
1456  throws IOException, InterruptedException {
1457    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);
1458
1459    // if this method looks similar to 'getTableFragmentation' that is because
1460    // it was borrowed from it.
1461
1462    // only include the directory paths to tables
1463    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1464      getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir),
1465        sfFilter, executor, progressReporter);
1466    }
1467    return map;
1468  }
1469
1470  /**
1471   * Filters FileStatuses in an array and returns a list
1472   *
1473   * @param input   An array of FileStatuses
1474   * @param filter  A required filter to filter the array
1475   * @return        A list of FileStatuses
1476   */
1477  public static List<FileStatus> filterFileStatuses(FileStatus[] input,
1478      FileStatusFilter filter) {
1479    if (input == null) return null;
1480    return filterFileStatuses(Iterators.forArray(input), filter);
1481  }
1482
1483  /**
1484   * Filters FileStatuses in an iterator and returns a list
1485   *
1486   * @param input   An iterator of FileStatuses
   * @param filter  A required filter to apply to the iterator's elements
1488   * @return        A list of FileStatuses
1489   */
1490  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1491      FileStatusFilter filter) {
1492    if (input == null) return null;
1493    ArrayList<FileStatus> results = new ArrayList<>();
1494    while (input.hasNext()) {
1495      FileStatus f = input.next();
1496      if (filter.accept(f)) {
1497        results.add(f);
1498      }
1499    }
1500    return results;
1501  }
1502
1503  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
   * This accommodates differences between Hadoop versions: Hadoop 1 does not throw a
   * FileNotFoundException but returns an empty FileStatus[], while Hadoop 2 throws
   * FileNotFoundException.
1508   *
1509   * @param fs file system
1510   * @param dir directory
1511   * @param filter file status filter
   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
   * @throws IOException if listing the directory fails for a reason other than FileNotFoundException
1513   */
1514  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs,
1515      final Path dir, final FileStatusFilter filter) throws IOException {
1516    FileStatus [] status = null;
1517    try {
1518      status = fs.listStatus(dir);
1519    } catch (FileNotFoundException fnfe) {
1520      LOG.trace("{} does not exist", dir);
1521      return null;
1522    }
1523
1524    if (ArrayUtils.getLength(status) == 0)  {
1525      return null;
1526    }
1527
1528    if (filter == null) {
1529      return Arrays.asList(status);
1530    } else {
1531      List<FileStatus> status2 = filterFileStatuses(status, filter);
1532      if (status2 == null || status2.isEmpty()) {
1533        return null;
1534      } else {
1535        return status2;
1536      }
1537    }
1538  }
1539
1540  /**
   * Scans the root path of the file system to determine the degree of locality of each region
   * on each of the servers having at least one block of that region.
   * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}.
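   * <p>
   * Illustrative usage sketch:
   * <pre>{@code
   * Map<String, Map<String, Float>> locality =
   *     FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
   * // maps region encoded name -> (server host -> fraction of the region's blocks held locally)
   * }</pre>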
1545   *
1546   * @param conf
1547   *          the configuration to use
1548   * @return the mapping from region encoded name to a map of server names to
1549   *           locality fraction
1550   * @throws IOException
1551   *           in case of file system errors or interrupts
1552   */
1553  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1554      final Configuration conf) throws IOException {
1555    return getRegionDegreeLocalityMappingFromFS(
1556        conf, null,
1557        conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1558
1559  }
1560
1561  /**
   * Scans the root path of the file system to determine the degree of locality of each region
   * on each of the servers having at least one block of that region.
1565   *
1566   * @param conf
1567   *          the configuration to use
1568   * @param desiredTable
1569   *          the table you wish to scan locality for
1570   * @param threadPoolSize
1571   *          the thread pool size to use
1572   * @return the mapping from region encoded name to a map of server names to
1573   *           locality fraction
1574   * @throws IOException
1575   *           in case of file system errors or interrupts
1576   */
1577  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1578      final Configuration conf, final String desiredTable, int threadPoolSize)
1579      throws IOException {
1580    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
1581    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
1582    return regionDegreeLocalityMapping;
1583  }
1584
1585  /**
   * Scans the root path of the file system to determine the degree of locality of each region
   * on each of the servers having at least one block of that region. The output map parameter
   * is optional and may be null.
1590   *
1591   * @param conf
1592   *          the configuration to use
1593   * @param desiredTable
1594   *          the table you wish to scan locality for
1595   * @param threadPoolSize
1596   *          the thread pool size to use
1597   * @param regionDegreeLocalityMapping
1598   *          the map into which to put the locality degree mapping or null,
1599   *          must be a thread-safe implementation
1600   * @throws IOException
1601   *           in case of file system errors or interrupts
1602   */
1603  private static void getRegionLocalityMappingFromFS(final Configuration conf,
1604    final String desiredTable, int threadPoolSize,
1605    final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
1606    final FileSystem fs = FileSystem.get(conf);
1607    final Path rootPath = CommonFSUtils.getRootDir(conf);
1608    final long startTime = EnvironmentEdgeManager.currentTime();
1609    final Path queryPath;
1610    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1611    if (null == desiredTable) {
1612      queryPath =
1613        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1614    } else {
1615      queryPath = new Path(
1616        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1617    }
1618
1619    // reject all paths that are not appropriate
1620    PathFilter pathFilter = new PathFilter() {
1621      @Override
1622      public boolean accept(Path path) {
        // path names a candidate region directory; the glob may also match noise entries
1624        if (null == path) {
1625          return false;
1626        }
1627
1628        // no parent?
1629        Path parent = path.getParent();
1630        if (null == parent) {
1631          return false;
1632        }
1633
1634        String regionName = path.getName();
1635        if (null == regionName) {
1636          return false;
1637        }
1638
1639        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
1640          return false;
1641        }
1642        return true;
1643      }
1644    };
1645
1646    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1647
1648    if (LOG.isDebugEnabled()) {
1649      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
1650    }
1651
1652    if (null == statusList) {
1653      return;
1654    }
1655
1656    // lower the number of threads in case we have very few expected regions
1657    threadPoolSize = Math.min(threadPoolSize, statusList.length);
1658
1659    // run in multiple threads
1660    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
1661      new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d")
1662        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
1663    try {
1664      // ignore all file status items that are not of interest
1665      for (FileStatus regionStatus : statusList) {
1666        if (null == regionStatus || !regionStatus.isDirectory()) {
1667          continue;
1668        }
1669
1670        final Path regionPath = regionStatus.getPath();
1671        if (null != regionPath) {
1672          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
1673        }
1674      }
1675    } finally {
1676      tpe.shutdown();
1677      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1678        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1679      try {
1680        // here we wait until TPE terminates, which is either naturally or by
1681        // exceptions in the execution of the threads
1682        while (!tpe.awaitTermination(threadWakeFrequency,
1683            TimeUnit.MILLISECONDS)) {
1684          // printing out rough estimate, so as to not introduce
1685          // AtomicInteger
1686          LOG.info("Locality checking is underway: { Scanned Regions : "
1687              + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
1688              + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
1689        }
1690      } catch (InterruptedException e) {
1691        Thread.currentThread().interrupt();
1692        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1693      }
1694    }
1695
1696    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
1697    LOG.info("Scan DFS for locality info takes {}ms", overhead);
1698  }
1699
1700  /**
1701   * Do our short circuit read setup.
1702   * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
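   * <p>
   * Illustrative configuration sketch (the property values are examples, not recommendations):
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * conf.setBoolean("dfs.client.read.shortcircuit", true);
   * conf.setBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
   * FSUtils.setupShortCircuitRead(conf);
   * }</pre>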
   * @param conf the configuration to check and amend
1704   */
1705  public static void setupShortCircuitRead(final Configuration conf) {
1706    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1707    boolean shortCircuitSkipChecksum =
1708      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1709    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1710    if (shortCircuitSkipChecksum) {
1711      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
1712        "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
1713        "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
1714      assert !shortCircuitSkipChecksum; //this will fail if assertions are on
1715    }
1716    checkShortCircuitReadBufferSize(conf);
1717  }
1718
1719  /**
1720   * Check if short circuit read buffer size is set and if not, set it to hbase value.
   * @param conf the configuration to check and, if unset, update
1722   */
1723  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1724    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1725    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in Hadoop 2
1727    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1728    int size = conf.getInt(dfsKey, notSet);
1729    // If a size is set, return -- we will use it.
1730    if (size != notSet) return;
1731    // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
1732    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1733    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
1734  }
1735
1736  /**
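   * Returns the {@link DFSHedgedReadMetrics} instance shared by DFSClients, obtained reflectively
   * because the accessor is package private.
   * <p>
   * Illustrative usage sketch (the result may be null, e.g. when not running on hdfs):
   * <pre>{@code
   * DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
   * if (metrics != null) {
   *   LOG.info("Hedged read metrics: {}", metrics);
   * }
   * }</pre>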
   * @param c the configuration to use when resolving the FileSystem
   * @return The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs.
   * @throws IOException if the FileSystem cannot be obtained
1740   */
1741  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
1742      throws IOException {
1743    if (!CommonFSUtils.isHDFS(c)) {
1744      return null;
1745    }
1746    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
1747    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
1748    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
1749    final String name = "getHedgedReadMetrics";
1750    DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
    Method m;
    try {
      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException | SecurityException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
          e.getMessage());
      return null;
    }
1763    m.setAccessible(true);
1764    try {
1765      return (DFSHedgedReadMetrics)m.invoke(dfsclient);
1766    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
1767      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
1768          e.getMessage());
1769      return null;
1770    }
1771  }
1772
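  /**
   * Recursively copies the {@code src} tree on {@code srcFS} to {@code dst} on {@code dstFS}.
   * Directories are created on the calling thread while individual file copies run on a fixed
   * thread pool of the given size; the call blocks until every copy has completed.
   * @return the list of destination paths that were created (directories and copied files)
   * @throws IOException if any copy fails or if the wait for completion is interrupted
   */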
1773  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1774      Configuration conf, int threads) throws IOException {
1775    ExecutorService pool = Executors.newFixedThreadPool(threads);
1776    List<Future<Void>> futures = new ArrayList<>();
1777    List<Path> traversedPaths;
1778    try {
1779      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
1780      for (Future<Void> future : futures) {
1781        future.get();
1782      }
1783    } catch (ExecutionException | InterruptedException | IOException e) {
1784      throw new IOException("Copy snapshot reference files failed", e);
1785    } finally {
1786      pool.shutdownNow();
1787    }
1788    return traversedPaths;
1789  }
1790
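  /**
   * Recursive worker for {@link #copyFilesParallel(FileSystem, Path, FileSystem, Path,
   * Configuration, int)}: directories are created eagerly on the calling thread, while each file
   * copy is submitted to {@code pool} and its {@link Future} is collected in {@code futures} for
   * the caller to wait on.
   */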
1791  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1792      Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
1793    List<Path> traversedPaths = new ArrayList<>();
1794    traversedPaths.add(dst);
1795    FileStatus currentFileStatus = srcFS.getFileStatus(src);
1796    if (currentFileStatus.isDirectory()) {
1797      if (!dstFS.mkdirs(dst)) {
1798        throw new IOException("Create directory failed: " + dst);
1799      }
1800      FileStatus[] subPaths = srcFS.listStatus(src);
1801      for (FileStatus subPath : subPaths) {
1802        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
1803          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
1804      }
1805    } else {
1806      Future<Void> future = pool.submit(() -> {
1807        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
1808        return null;
1809      });
1810      futures.add(future);
1811    }
1812    return traversedPaths;
1813  }
1814
  /**
   * @param fs the DistributedFileSystem to inspect
   * @param conf the configuration used to resolve HA nameservice addresses
   * @return A set containing all namenode addresses of fs
   */
1818  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
1819    Configuration conf) {
1820    Set<InetSocketAddress> addresses = new HashSet<>();
1821    String serviceName = fs.getCanonicalServiceName();
1822
1823    if (serviceName.startsWith("ha-hdfs")) {
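      // HA case: the canonical service name looks like "ha-hdfs:<nameservice>"; collect every
      // namenode RPC address configured for that nameservice.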
1824      try {
1825        Map<String, Map<String, InetSocketAddress>> addressMap =
1826          DFSUtil.getNNServiceRpcAddressesForCluster(conf);
1827        String nameService = serviceName.substring(serviceName.indexOf(":") + 1);
1828        if (addressMap.containsKey(nameService)) {
1829          Map<String, InetSocketAddress> nnMap = addressMap.get(nameService);
1830          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
1831            InetSocketAddress addr = e2.getValue();
1832            addresses.add(addr);
1833          }
1834        }
1835      } catch (Exception e) {
1836        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
1837      }
1838    } else {
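      // Non-HA case: the filesystem URI names the single namenode. If the URI carries no port,
      // fall back to the port embedded in the canonical service name ("host:port").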
1839      URI uri = fs.getUri();
1840      int port = uri.getPort();
1841      if (port < 0) {
1842        int idx = serviceName.indexOf(':');
1843        port = Integer.parseInt(serviceName.substring(idx + 1));
1844      }
1845      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
1846      addresses.add(addr);
1847    }
1848
1849    return addresses;
1850  }
1851
1852  /**
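   * Checks whether {@code srcFs} and {@code desFs} refer to the same underlying hdfs instance.
   * <p>
   * Illustrative usage sketch (the URIs are examples only):
   * <pre>{@code
   * FileSystem srcFs = new Path("hdfs://activeNamenode:8020/hbase").getFileSystem(conf);
   * FileSystem desFs = new Path("hdfs://nameservice1/hbase").getFileSystem(conf);
   * boolean same = FSUtils.isSameHdfs(conf, srcFs, desFs);
   * }</pre>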
   * @param conf the Configuration of HBase
   * @param srcFs the source FileSystem
   * @param desFs the destination FileSystem
   * @return Whether srcFs and desFs are on the same hdfs or not
1855   */
1856  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
    // getCanonicalServiceName gives both srcFs and desFs a unified format
    // that contains scheme, host and port.
1859    String srcServiceName = srcFs.getCanonicalServiceName();
1860    String desServiceName = desFs.getCanonicalServiceName();
1861
1862    if (srcServiceName == null || desServiceName == null) {
1863      return false;
1864    }
1865    if (srcServiceName.equals(desServiceName)) {
1866      return true;
1867    }
1868    if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
1869      Collection<String> internalNameServices =
1870        conf.getTrimmedStringCollection("dfs.internal.nameservices");
      if (!internalNameServices.isEmpty()) {
        return internalNameServices.contains(srcServiceName.split(":")[1]);
      }
1878    }
1879    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
1880      // If one serviceName is an HA format while the other is a non-HA format,
1881      // maybe they refer to the same FileSystem.
1882      // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
1883      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
1884      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
      if (!Sets.intersection(srcAddrs, desAddrs).isEmpty()) {
1886        return true;
1887      }
1888    }
1889
1890    return false;
1891  }
1892}