001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
021import edu.umd.cs.findbugs.annotations.CheckForNull;
022import java.io.ByteArrayInputStream;
023import java.io.DataInputStream;
024import java.io.EOFException;
025import java.io.FileNotFoundException;
026import java.io.IOException;
027import java.io.InterruptedIOException;
028import java.lang.reflect.InvocationTargetException;
029import java.lang.reflect.Method;
030import java.net.InetSocketAddress;
031import java.net.URI;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.Collection;
035import java.util.Collections;
036import java.util.HashMap;
037import java.util.HashSet;
038import java.util.Iterator;
039import java.util.List;
040import java.util.Locale;
041import java.util.Map;
042import java.util.Optional;
043import java.util.Set;
044import java.util.Vector;
045import java.util.concurrent.ConcurrentHashMap;
046import java.util.concurrent.ExecutionException;
047import java.util.concurrent.ExecutorService;
048import java.util.concurrent.Executors;
049import java.util.concurrent.Future;
050import java.util.concurrent.FutureTask;
051import java.util.concurrent.ThreadPoolExecutor;
052import java.util.concurrent.TimeUnit;
053import java.util.regex.Pattern;
054import org.apache.commons.lang3.ArrayUtils;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.fs.BlockLocation;
057import org.apache.hadoop.fs.FSDataInputStream;
058import org.apache.hadoop.fs.FSDataOutputStream;
059import org.apache.hadoop.fs.FileStatus;
060import org.apache.hadoop.fs.FileSystem;
061import org.apache.hadoop.fs.FileUtil;
062import org.apache.hadoop.fs.Path;
063import org.apache.hadoop.fs.PathFilter;
064import org.apache.hadoop.fs.StorageType;
065import org.apache.hadoop.fs.permission.FsPermission;
066import org.apache.hadoop.hbase.ClusterId;
067import org.apache.hadoop.hbase.HConstants;
068import org.apache.hadoop.hbase.HDFSBlocksDistribution;
069import org.apache.hadoop.hbase.TableName;
070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
071import org.apache.hadoop.hbase.client.RegionInfo;
072import org.apache.hadoop.hbase.client.RegionInfoBuilder;
073import org.apache.hadoop.hbase.exceptions.DeserializationException;
074import org.apache.hadoop.hbase.fs.HFileSystem;
075import org.apache.hadoop.hbase.io.HFileLink;
076import org.apache.hadoop.hbase.master.HMaster;
077import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
078import org.apache.hadoop.hdfs.DFSClient;
079import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
080import org.apache.hadoop.hdfs.DFSUtil;
081import org.apache.hadoop.hdfs.DistributedFileSystem;
082import org.apache.hadoop.hdfs.protocol.HdfsConstants;
083import org.apache.hadoop.io.IOUtils;
084import org.apache.hadoop.ipc.RemoteException;
085import org.apache.hadoop.util.Progressable;
086import org.apache.yetus.audience.InterfaceAudience;
087import org.slf4j.Logger;
088import org.slf4j.LoggerFactory;
089
090import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
091import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
092import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
093import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
094import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
095
096import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
098
099/**
100 * Utility methods for interacting with the underlying file system.
101 */
102@InterfaceAudience.Private
103public final class FSUtils {
104  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
105
106  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
107  private static final int DEFAULT_THREAD_POOLSIZE = 2;
108
109  /** Set to true on Windows platforms */
110  @VisibleForTesting // currently only used in testing. TODO refactor into a test class
111  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
112
113  private FSUtils() {
114  }
115
  /**
   * @return True if <code>fs</code> is an instance of DistributedFileSystem (or is backed by one)
   * @throws IOException if the backing filesystem cannot be determined
   */
120  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
121    FileSystem fileSystem = fs;
122    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
123    // Check its backing fs for dfs-ness.
124    if (fs instanceof HFileSystem) {
125      fileSystem = ((HFileSystem)fs).getBackingFs();
126    }
127    return fileSystem instanceof DistributedFileSystem;
128  }
129
  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c are passed in,
   * the '/a/b/c' parts are compared. Does not consider the scheme; i.e. if the schemes differ
   * but the paths match component for component, the two equate. Note that both paths must have
   * the same depth; a shorter tail such as 'b/c' will therefore not match 'hdfs://a/b/c'.
   * @param pathToSearch Path we will be trying to match.
   * @param pathTail the candidate tail to compare against <code>pathToSearch</code>
   * @return True if <code>pathTail</code> is a tail of the path of <code>pathToSearch</code>
   */
138  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
139    Path tailPath = pathTail;
140    String tailName;
141    Path toSearch = pathToSearch;
142    String toSearchName;
143    boolean result = false;
144
145    if (pathToSearch.depth() != pathTail.depth()) {
146      return false;
147    }
148
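    // Walk both paths from the leaf component upward, comparing names at each level. If the
    // tail runs out of components (its name becomes empty) before a mismatch is found, the
    // tail matches; if the searched path runs out first or a name differs, it does not.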
149    do {
150      tailName = tailPath.getName();
151      if (tailName == null || tailName.isEmpty()) {
152        result = true;
153        break;
154      }
155      toSearchName = toSearch.getName();
156      if (toSearchName == null || toSearchName.isEmpty()) {
157        break;
158      }
159      // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
160      tailPath = tailPath.getParent();
161      toSearch = toSearch.getParent();
162    } while(tailName.equals(toSearchName));
163    return result;
164  }
165
  /**
   * Delete the region directory if it exists.
   * @return True if the region directory was deleted.
   * @throws IOException if the delete fails
   */
171  public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri)
172    throws IOException {
173    Path rootDir = CommonFSUtils.getRootDir(conf);
174    FileSystem fs = rootDir.getFileSystem(conf);
175    return CommonFSUtils.deleteDirectory(fs,
176      new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
177  }
178
  /**
180   * Create the specified file on the filesystem. By default, this will:
181   * <ol>
182   * <li>overwrite the file if it exists</li>
183   * <li>apply the umask in the configuration (if it is enabled)</li>
184   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the configured column family replication, or the filesystem default if the
   * configured value is {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li>
187   * <li>use the default block size</li>
188   * <li>not track progress</li>
189   * </ol>
190   * @param conf configurations
191   * @param fs {@link FileSystem} on which to write the file
192   * @param path {@link Path} to the file to write
193   * @param perm permissions
194   * @param favoredNodes favored data nodes
195   * @return output stream to the created file
196   * @throws IOException if the file cannot be created
197   */
198  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
199    FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
200    if (fs instanceof HFileSystem) {
201      FileSystem backingFs = ((HFileSystem) fs).getBackingFs();
202      if (backingFs instanceof DistributedFileSystem) {
203        // Try to use the favoredNodes version via reflection to allow backwards-
204        // compatibility.
205        short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION,
206          String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION)));
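        // A non-positive value (the DEFAULT_DFS_REPLICATION sentinel) means "no explicit
        // setting"; the filesystem's default replication for the path is used instead below.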
207        try {
208          return (FSDataOutputStream) (DistributedFileSystem.class
209            .getDeclaredMethod("create", Path.class, FsPermission.class, boolean.class, int.class,
210              short.class, long.class, Progressable.class, InetSocketAddress[].class)
211            .invoke(backingFs, path, perm, true, CommonFSUtils.getDefaultBufferSize(backingFs),
212              replication > 0 ? replication : CommonFSUtils.getDefaultReplication(backingFs, path),
213              CommonFSUtils.getDefaultBlockSize(backingFs, path), null, favoredNodes));
214        } catch (InvocationTargetException ite) {
          // The method was invoked successfully but threw its own exception.
216          throw new IOException(ite.getCause());
217        } catch (NoSuchMethodException e) {
218          LOG.debug("DFS Client does not support most favored nodes create; using default create");
219          LOG.trace("Ignoring; use default create", e);
220        } catch (IllegalArgumentException | SecurityException | IllegalAccessException e) {
221          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
222        }
223      }
224    }
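    // Either this is not an HFileSystem backed by DFS, or the reflective favored-nodes create
    // was unavailable; fall back to a plain create with overwrite enabled.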
225    return CommonFSUtils.create(fs, path, perm, true);
226  }
227
228  /**
229   * Checks to see if the specified file system is available
230   *
231   * @param fs filesystem
232   * @throws IOException e
233   */
234  public static void checkFileSystemAvailable(final FileSystem fs)
235  throws IOException {
236    if (!(fs instanceof DistributedFileSystem)) {
237      return;
238    }
239    IOException exception = null;
240    DistributedFileSystem dfs = (DistributedFileSystem) fs;
241    try {
242      if (dfs.exists(new Path("/"))) {
243        return;
244      }
245    } catch (IOException e) {
246      exception = e instanceof RemoteException ?
247              ((RemoteException)e).unwrapRemoteException() : e;
248    }
249    try {
250      fs.close();
251    } catch (Exception e) {
252      LOG.error("file system close failed: ", e);
253    }
254    throw new IOException("File system is not available", exception);
255  }
256
  /**
   * We use reflection because {@link DistributedFileSystem#setSafeMode(
   * HdfsConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1
   *
   * @param dfs the filesystem to query
   * @return whether we're in safe mode
   * @throws IOException if querying safe mode status fails
   */
265  private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
266    boolean inSafeMode = false;
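    // The two-argument setSafeMode(action, isChecked) does not exist on older Hadoop versions,
    // so attempt it via reflection first and fall back to the single-argument form on failure.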
267    try {
268      Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
269          org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class});
270      inSafeMode = (Boolean) m.invoke(dfs,
271        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true);
272    } catch (Exception e) {
273      if (e instanceof IOException) throw (IOException) e;
274
275      // Check whether dfs is on safemode.
276      inSafeMode = dfs.setSafeMode(
277        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET);
278    }
279    return inSafeMode;
280  }
281
  /**
   * Check whether dfs is in safemode.
   * @param conf configuration to use when looking up the filesystem
   * @throws IOException if the filesystem is in safe mode or the check fails
   */
287  public static void checkDfsSafeMode(final Configuration conf)
288  throws IOException {
289    boolean isInSafeMode = false;
290    FileSystem fs = FileSystem.get(conf);
291    if (fs instanceof DistributedFileSystem) {
292      DistributedFileSystem dfs = (DistributedFileSystem)fs;
293      isInSafeMode = isInSafeMode(dfs);
294    }
295    if (isInSafeMode) {
296      throw new IOException("File system is in safemode, it can't be written now");
297    }
298  }
299
300  /**
   * Returns the version of the file system recorded in the version file, if any
302   *
303   * @param fs filesystem object
304   * @param rootdir root hbase directory
305   * @return null if no version file exists, version string otherwise
306   * @throws IOException if the version file fails to open
307   * @throws DeserializationException if the version data cannot be translated into a version
308   */
309  public static String getVersion(FileSystem fs, Path rootdir)
310  throws IOException, DeserializationException {
311    final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
312    FileStatus[] status = null;
313    try {
314      // hadoop 2.0 throws FNFE if directory does not exist.
315      // hadoop 1.0 returns null if directory does not exist.
316      status = fs.listStatus(versionFile);
317    } catch (FileNotFoundException fnfe) {
318      return null;
319    }
320    if (ArrayUtils.getLength(status) == 0) {
321      return null;
322    }
323    String version = null;
324    byte [] content = new byte [(int)status[0].getLen()];
325    FSDataInputStream s = fs.open(versionFile);
326    try {
327      IOUtils.readFully(s, content, 0, content.length);
328      if (ProtobufUtil.isPBMagicPrefix(content)) {
329        version = parseVersionFrom(content);
330      } else {
        // Presume it is in the pre-protobuf format.
332        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) {
333          version = dis.readUTF();
334        }
335      }
336    } catch (EOFException eof) {
337      LOG.warn("Version file was empty, odd, will try to set it.");
338    } finally {
339      s.close();
340    }
341    return version;
342  }
343
344  /**
345   * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
346   * @param bytes The byte content of the hbase.version file
347   * @return The version found in the file as a String
348   * @throws DeserializationException if the version data cannot be translated into a version
349   */
350  static String parseVersionFrom(final byte [] bytes)
351  throws DeserializationException {
352    ProtobufUtil.expectPBMagicPrefix(bytes);
353    int pblen = ProtobufUtil.lengthOfPBMagic();
354    FSProtos.HBaseVersionFileContent.Builder builder =
355      FSProtos.HBaseVersionFileContent.newBuilder();
356    try {
357      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
358      return builder.getVersion();
359    } catch (IOException e) {
360      // Convert
361      throw new DeserializationException(e);
362    }
363  }
364
365  /**
366   * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
367   * @param version Version to persist
368   * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
369   */
370  static byte [] toVersionByteArray(final String version) {
371    FSProtos.HBaseVersionFileContent.Builder builder =
372      FSProtos.HBaseVersionFileContent.newBuilder();
373    return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
374  }
375
376  /**
377   * Verifies current version of file system
378   *
379   * @param fs file system
380   * @param rootdir root directory of HBase installation
381   * @param message if true, issues a message on System.out
382   * @throws IOException if the version file cannot be opened
383   * @throws DeserializationException if the contents of the version file cannot be parsed
384   */
385  public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
386  throws IOException, DeserializationException {
387    checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
388  }
389
390  /**
391   * Verifies current version of file system
392   *
393   * @param fs file system
394   * @param rootdir root directory of HBase installation
395   * @param message if true, issues a message on System.out
   * @param wait wait interval, in milliseconds, between retries
397   * @param retries number of times to retry
398   *
399   * @throws IOException if the version file cannot be opened
400   * @throws DeserializationException if the contents of the version file cannot be parsed
401   */
402  public static void checkVersion(FileSystem fs, Path rootdir,
403      boolean message, int wait, int retries)
404  throws IOException, DeserializationException {
405    String version = getVersion(fs, rootdir);
406    String msg;
407    if (version == null) {
408      if (!metaRegionExists(fs, rootdir)) {
409        // rootDir is empty (no version file and no root region)
410        // just create new version file (HBASE-1195)
411        setVersion(fs, rootdir, wait, retries);
412        return;
413      } else {
414        msg = "hbase.version file is missing. Is your hbase.rootdir valid? " +
415            "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " +
416            "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2";
417      }
418    } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) {
419      return;
420    } else {
421      msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version +
422          " but software requires version " + HConstants.FILE_SYSTEM_VERSION +
423          ". Consult http://hbase.apache.org/book.html for further information about " +
424          "upgrading HBase.";
425    }
426
    // The version file is missing or out of date; migration is required.
428    // Output on stdout so user sees it in terminal.
429    if (message) {
430      System.out.println("WARNING! " + msg);
431    }
432    throw new FileSystemVersionException(msg);
433  }
434
435  /**
436   * Sets version of file system
437   *
438   * @param fs filesystem object
439   * @param rootdir hbase root
440   * @throws IOException e
441   */
442  public static void setVersion(FileSystem fs, Path rootdir)
443  throws IOException {
444    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
445      HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
446  }
447
448  /**
449   * Sets version of file system
450   *
451   * @param fs filesystem object
452   * @param rootdir hbase root
453   * @param wait time to wait for retry
454   * @param retries number of times to retry before failing
455   * @throws IOException e
456   */
457  public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
458  throws IOException {
459    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
460  }
461
462
463  /**
464   * Sets version of file system
465   *
466   * @param fs filesystem object
467   * @param rootdir hbase root directory
468   * @param version version to set
469   * @param wait time to wait for retry
470   * @param retries number of times to retry before throwing an IOException
471   * @throws IOException e
472   */
473  public static void setVersion(FileSystem fs, Path rootdir, String version,
474      int wait, int retries) throws IOException {
475    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
476    Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
477      HConstants.VERSION_FILE_NAME);
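    // Retry until the write succeeds or we exhaust the retries. Each attempt writes the version
    // to a temp file first and then renames it into place so readers never see a partial file.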
478    while (true) {
479      try {
480        // Write the version to a temporary file
481        FSDataOutputStream s = fs.create(tempVersionFile);
482        try {
483          s.write(toVersionByteArray(version));
484          s.close();
485          s = null;
486          // Move the temp version file to its normal location. Returns false
487          // if the rename failed. Throw an IOE in that case.
488          if (!fs.rename(tempVersionFile, versionFile)) {
489            throw new IOException("Unable to move temp version file to " + versionFile);
490          }
491        } finally {
          // Cleaning up the temporary file if the rename failed would be trying
          // too hard. We'll unconditionally create it again the next time
          // through anyway; files are overwritten by default by create().
495
496          // Attempt to close the stream on the way out if it is still open.
497          try {
498            if (s != null) s.close();
499          } catch (IOException ignore) { }
500        }
501        LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
502        return;
503      } catch (IOException e) {
504        if (retries > 0) {
505          LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
506          fs.delete(versionFile, false);
507          try {
508            if (wait > 0) {
509              Thread.sleep(wait);
510            }
511          } catch (InterruptedException ie) {
512            throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
513          }
514          retries--;
515        } else {
516          throw e;
517        }
518      }
519    }
520  }
521
522  /**
523   * Checks that a cluster ID file exists in the HBase root directory
524   * @param fs the root directory FileSystem
525   * @param rootdir the HBase root directory in HDFS
526   * @param wait how long to wait between retries
527   * @return <code>true</code> if the file exists, otherwise <code>false</code>
528   * @throws IOException if checking the FileSystem fails
529   */
530  public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
531      long wait) throws IOException {
532    while (true) {
533      try {
534        Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
535        return fs.exists(filePath);
536      } catch (IOException ioe) {
537        if (wait > 0L) {
538          LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe);
539          try {
540            Thread.sleep(wait);
541          } catch (InterruptedException e) {
542            Thread.currentThread().interrupt();
543            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
544          }
545        } else {
546          throw ioe;
547        }
548      }
549    }
550  }
551
552  /**
553   * Returns the value of the unique cluster ID stored for this HBase instance.
554   * @param fs the root directory FileSystem
555   * @param rootdir the path to the HBase root directory
556   * @return the unique cluster identifier
557   * @throws IOException if reading the cluster ID file fails
558   */
559  public static ClusterId getClusterId(FileSystem fs, Path rootdir)
560  throws IOException {
561    Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
562    ClusterId clusterId = null;
    FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null;
564    if (status != null) {
565      int len = Ints.checkedCast(status.getLen());
566      byte [] content = new byte[len];
567      FSDataInputStream in = fs.open(idPath);
568      try {
569        in.readFully(content);
570      } catch (EOFException eof) {
571        LOG.warn("Cluster ID file {} is empty", idPath);
572      } finally{
573        in.close();
574      }
575      try {
576        clusterId = ClusterId.parseFrom(content);
577      } catch (DeserializationException e) {
578        throw new IOException("content=" + Bytes.toString(content), e);
579      }
580      // If not pb'd, make it so.
581      if (!ProtobufUtil.isPBMagicPrefix(content)) {
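        // Legacy (pre-protobuf) cluster ID files store the ID as a UTF string; re-read the file
        // that way and then rewrite it in protobuf format for future readers.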
582        String cid = null;
583        in = fs.open(idPath);
584        try {
585          cid = in.readUTF();
586          clusterId = new ClusterId(cid);
587        } catch (EOFException eof) {
588          LOG.warn("Cluster ID file {} is empty", idPath);
589        } finally {
590          in.close();
591        }
592        rewriteAsPb(fs, rootdir, idPath, clusterId);
593      }
594      return clusterId;
595    } else {
596      LOG.warn("Cluster ID file does not exist at {}", idPath);
597    }
598    return clusterId;
599  }
600
  /**
   * Rewrite the cluster ID file at <code>p</code> in protobuf format.
   * @param fs filesystem the cluster ID file lives on
   * @param rootdir the HBase root directory
   * @param p path to the existing cluster ID file
   * @param cid the cluster ID to persist
   * @throws IOException if the rewrite fails
   */
605  private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
606      final ClusterId cid)
607  throws IOException {
608    // Rewrite the file as pb.  Move aside the old one first, write new
609    // then delete the moved-aside file.
610    Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
611    if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
612    setClusterId(fs, rootdir, cid, 100);
613    if (!fs.delete(movedAsideName, false)) {
614      throw new IOException("Failed delete of " + movedAsideName);
615    }
616    LOG.debug("Rewrote the hbase.id file as pb");
617  }
618
619  /**
620   * Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root
   * directory. If any operation on the ID file fails, and {@code wait} is a positive value, the
   * method will keep retrying to produce the ID file until the thread is interrupted.
623   *
624   * @param fs the root directory FileSystem
625   * @param rootdir the path to the HBase root directory
626   * @param clusterId the unique identifier to store
627   * @param wait how long (in milliseconds) to wait between retries
   * @throws IOException if writing to the FileSystem fails and no wait value was supplied
629   */
630  public static void setClusterId(final FileSystem fs, final Path rootdir,
631      final ClusterId clusterId, final long wait) throws IOException {
632
633    final Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
634    final Path tempDir = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY);
635    final Path tempIdFile = new Path(tempDir, HConstants.CLUSTER_ID_FILE_NAME);
636
637    LOG.debug("Create cluster ID file [{}] with ID: {}", idFile, clusterId);
638
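    // Each attempt writes the ID to a temp file and renames it into place. Any failure is
    // captured in 'failure' and either retried (when wait > 0) or rethrown to the caller.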
639    while (true) {
640      Optional<IOException> failure = Optional.empty();
641
642      LOG.debug("Write the cluster ID file to a temporary location: {}", tempIdFile);
643      try (FSDataOutputStream s = fs.create(tempIdFile)) {
644        s.write(clusterId.toByteArray());
645      } catch (IOException ioe) {
646        failure = Optional.of(ioe);
647      }
648
649      if (!failure.isPresent()) {
650        try {
651          LOG.debug("Move the temporary cluster ID file to its target location [{}]:[{}]",
652            tempIdFile, idFile);
653
654          if (!fs.rename(tempIdFile, idFile)) {
655            failure =
656                Optional.of(new IOException("Unable to move temp cluster ID file to " + idFile));
657          }
658        } catch (IOException ioe) {
659          failure = Optional.of(ioe);
660        }
661      }
662
663      if (failure.isPresent()) {
664        final IOException cause = failure.get();
665        if (wait > 0L) {
666          LOG.warn("Unable to create cluster ID file in {}, retrying in {}ms", rootdir, wait,
667            cause);
668          try {
669            Thread.sleep(wait);
670          } catch (InterruptedException e) {
671            Thread.currentThread().interrupt();
672            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
673          }
674          continue;
675        } else {
676          throw cause;
677        }
678      } else {
679        return;
680      }
681    }
682  }
683
684  /**
685   * If DFS, check safe mode and if so, wait until we clear it.
686   * @param conf configuration
687   * @param wait Sleep between retries
688   * @throws IOException e
689   */
690  public static void waitOnSafeMode(final Configuration conf,
691    final long wait)
692  throws IOException {
693    FileSystem fs = FileSystem.get(conf);
694    if (!(fs instanceof DistributedFileSystem)) return;
695    DistributedFileSystem dfs = (DistributedFileSystem)fs;
696    // Make sure dfs is not in safe mode
697    while (isInSafeMode(dfs)) {
698      LOG.info("Waiting for dfs to exit safe mode...");
699      try {
700        Thread.sleep(wait);
701      } catch (InterruptedException e) {
702        Thread.currentThread().interrupt();
703        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
704      }
705    }
706  }
707
708  /**
709   * Checks if meta region exists
710   * @param fs file system
711   * @param rootDir root directory of HBase installation
   * @return true if the meta region directory exists
   * @throws IOException if checking the filesystem fails
   */
714  public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
715    Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
716    return fs.exists(metaRegionDir);
717  }
718
719  /**
720   * Compute HDFS blocks distribution of a given file, or a portion of the file
721   * @param fs file system
722   * @param status file status of the file
723   * @param start start position of the portion
724   * @param length length of the portion
725   * @return The HDFS blocks distribution
726   */
  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(
728    final FileSystem fs, FileStatus status, long start, long length)
729    throws IOException {
730    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
731    BlockLocation [] blockLocations =
732      fs.getFileBlockLocations(status, start, length);
733    addToHDFSBlocksDistribution(blocksDistribution, blockLocations);
734    return blocksDistribution;
735  }
736
737  /**
738   * Update blocksDistribution with blockLocations
739   * @param blocksDistribution the hdfs blocks distribution
   * @param blockLocations an array of block locations
741   */
  public static void addToHDFSBlocksDistribution(
743      HDFSBlocksDistribution blocksDistribution, BlockLocation[] blockLocations)
744      throws IOException {
745    for (BlockLocation bl : blockLocations) {
746      String[] hosts = bl.getHosts();
747      long len = bl.getLength();
748      StorageType[] storageTypes = bl.getStorageTypes();
749      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
750    }
751  }
752
753  // TODO move this method OUT of FSUtils. No dependencies to HMaster
754  /**
755   * Returns the total overall fragmentation percentage. Includes hbase:meta and
756   * -ROOT- as well.
757   *
758   * @param master  The master defining the HBase root and file system
   * @return the overall fragmentation percentage, or -1 if no tables were found
760   * @throws IOException When scanning the directory fails
761   */
762  public static int getTotalTableFragmentation(final HMaster master)
763  throws IOException {
764    Map<String, Integer> map = getTableFragmentation(master);
765    return map.isEmpty() ? -1 :  map.get("-TOTAL-");
766  }
767
768  /**
769   * Runs through the HBase rootdir and checks how many stores for each table
770   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
771   * percentage across all tables is stored under the special key "-TOTAL-".
772   *
773   * @param master  The master defining the HBase root and file system.
774   * @return A map for each table and its percentage (never null).
775   *
776   * @throws IOException When scanning the directory fails.
777   */
778  public static Map<String, Integer> getTableFragmentation(final HMaster master)
779    throws IOException {
780    Path path = CommonFSUtils.getRootDir(master.getConfiguration());
781    // since HMaster.getFileSystem() is package private
782    FileSystem fs = path.getFileSystem(master.getConfiguration());
783    return getTableFragmentation(fs, path);
784  }
785
786  /**
787   * Runs through the HBase rootdir and checks how many stores for each table
788   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
789   * percentage across all tables is stored under the special key "-TOTAL-".
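   * <p>A hypothetical usage sketch, assuming an already-resolved FileSystem and HBase root dir:
   * <pre>
   *   Map&lt;String, Integer&gt; frags = FSUtils.getTableFragmentation(fs, hbaseRootDir);
   *   int overallPercent = frags.get("-TOTAL-"); // fragmentation across all tables
   * </pre>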
790   *
791   * @param fs  The file system to use
792   * @param hbaseRootDir  The root directory to scan
793   * @return A map for each table and its percentage (never null)
794   * @throws IOException When scanning the directory fails
795   */
796  public static Map<String, Integer> getTableFragmentation(
797    final FileSystem fs, final Path hbaseRootDir)
798  throws IOException {
799    Map<String, Integer> frags = new HashMap<>();
800    int cfCountTotal = 0;
801    int cfFragTotal = 0;
802    PathFilter regionFilter = new RegionDirFilter(fs);
803    PathFilter familyFilter = new FamilyDirFilter(fs);
804    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
805    for (Path d : tableDirs) {
806      int cfCount = 0;
807      int cfFrag = 0;
808      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
809      for (FileStatus regionDir : regionDirs) {
810        Path dd = regionDir.getPath();
        // this is a region directory; now look inside it for family directories
812        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
813        for (FileStatus familyDir : familyDirs) {
814          cfCount++;
815          cfCountTotal++;
816          Path family = familyDir.getPath();
817          // now in family make sure only one file
818          FileStatus[] familyStatus = fs.listStatus(family);
819          if (familyStatus.length > 1) {
820            cfFrag++;
821            cfFragTotal++;
822          }
823        }
824      }
825      // compute percentage per table and store in result list
826      frags.put(CommonFSUtils.getTableName(d).getNameAsString(),
827        cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
828    }
829    // set overall percentage for all tables
830    frags.put("-TOTAL-",
831      cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
832    return frags;
833  }
834
835  public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
836    if (fs.exists(dst) && !fs.delete(dst, false)) {
837      throw new IOException("Can not delete " + dst);
838    }
839    if (!fs.rename(src, dst)) {
840      throw new IOException("Can not rename from " + src + " to " + dst);
841    }
842  }
843
844  /**
845   * A {@link PathFilter} that returns only regular files.
846   */
847  static class FileFilter extends AbstractFileStatusFilter {
848    private final FileSystem fs;
849
850    public FileFilter(final FileSystem fs) {
851      this.fs = fs;
852    }
853
854    @Override
855    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
856      try {
857        return isFile(fs, isDir, p);
858      } catch (IOException e) {
859        LOG.warn("Unable to verify if path={} is a regular file", p, e);
860        return false;
861      }
862    }
863  }
864
865  /**
866   * Directory filter that doesn't include any of the directories in the specified blacklist
867   */
868  public static class BlackListDirFilter extends AbstractFileStatusFilter {
869    private final FileSystem fs;
870    private List<String> blacklist;
871
872    /**
     * Create a filter on the given filesystem with the specified blacklist
874     * @param fs filesystem to filter
875     * @param directoryNameBlackList list of the names of the directories to filter. If
876     *          <tt>null</tt>, all directories are returned
877     */
878    @SuppressWarnings("unchecked")
879    public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
880      this.fs = fs;
881      blacklist =
882        (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
883          : directoryNameBlackList);
884    }
885
886    @Override
887    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
888      if (!isValidName(p.getName())) {
889        return false;
890      }
891
892      try {
893        return isDirectory(fs, isDir, p);
894      } catch (IOException e) {
895        LOG.warn("An error occurred while verifying if [{}] is a valid directory."
896            + " Returning 'not valid' and continuing.", p, e);
897        return false;
898      }
899    }
900
901    protected boolean isValidName(final String name) {
902      return !blacklist.contains(name);
903    }
904  }
905
906  /**
907   * A {@link PathFilter} that only allows directories.
908   */
909  public static class DirFilter extends BlackListDirFilter {
910
911    public DirFilter(FileSystem fs) {
912      super(fs, null);
913    }
914  }
915
916  /**
917   * A {@link PathFilter} that returns usertable directories. To get all directories use the
918   * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
919   */
920  public static class UserTableDirFilter extends BlackListDirFilter {
921    public UserTableDirFilter(FileSystem fs) {
922      super(fs, HConstants.HBASE_NON_TABLE_DIRS);
923    }
924
925    @Override
926    protected boolean isValidName(final String name) {
      if (!super.isValidName(name)) {
        return false;
      }
929
930      try {
931        TableName.isLegalTableQualifierName(Bytes.toBytes(name));
932      } catch (IllegalArgumentException e) {
933        LOG.info("Invalid table name: {}", name);
934        return false;
935      }
936      return true;
937    }
938  }
939
940  public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
941      throws IOException {
942    List<Path> tableDirs = new ArrayList<>();
943
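    // Table directories live under <rootdir>/<BASE_NAMESPACE_DIR>/<namespace>/<table>; glob the
    // namespace level and then collect the table directories beneath each namespace.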
944    for (FileStatus status : fs
945        .globStatus(new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
946      tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
947    }
948    return tableDirs;
949  }
950
  /**
   * @param fs the filesystem to use
   * @param rootdir the directory to scan (presumed to contain table directories)
   * @return All the table directories under <code>rootdir</code>. Ignores non-table directories
   *   such as .logs, .oldlogs and .corrupt.
   * @throws IOException if listing the directory fails
   */
958  public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
959      throws IOException {
960    // presumes any directory under hbase.rootdir is a table
961    FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
962    List<Path> tabledirs = new ArrayList<>(dirs.length);
963    for (FileStatus dir: dirs) {
964      tabledirs.add(dir.getPath());
965    }
966    return tabledirs;
967  }
968
  /**
   * Filter that accepts region directories, i.e. directories whose names are hex encoded
   * region names (or older numeric region names).
   */
972  public static class RegionDirFilter extends AbstractFileStatusFilter {
973    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
974    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
975    final FileSystem fs;
976
977    public RegionDirFilter(FileSystem fs) {
978      this.fs = fs;
979    }
980
981    @Override
982    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
983      if (!regionDirPattern.matcher(p.getName()).matches()) {
984        return false;
985      }
986
987      try {
988        return isDirectory(fs, isDir, p);
989      } catch (IOException ioe) {
990        // Maybe the file was moved or the fs was disconnected.
991        LOG.warn("Skipping file {} due to IOException", p, ioe);
992        return false;
993      }
994    }
995  }
996
997  /**
998   * Given a particular table dir, return all the regiondirs inside it, excluding files such as
999   * .tableinfo
1000   * @param fs A file system for the Path
1001   * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
1002   * @return List of paths to valid region directories in table dir.
   * @throws IOException if listing the table directory fails
1004   */
  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir)
      throws IOException {
1006    // assumes we are in a table dir.
1007    List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1008    if (rds == null) {
1009      return Collections.emptyList();
1010    }
1011    List<Path> regionDirs = new ArrayList<>(rds.size());
1012    for (FileStatus rdfs: rds) {
1013      Path rdPath = rdfs.getPath();
1014      regionDirs.add(rdPath);
1015    }
1016    return regionDirs;
1017  }
1018
1019  public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) {
1020    return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region);
1021  }
1022
1023  public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
1024    return getRegionDirFromTableDir(tableDir,
1025        ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
1026  }
1027
1028  public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) {
1029    return new Path(tableDir, encodedRegionName);
1030  }
1031
1032  /**
1033   * Filter for all dirs that are legal column family names.  This is generally used for colfam
1034   * dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1035   */
1036  public static class FamilyDirFilter extends AbstractFileStatusFilter {
1037    final FileSystem fs;
1038
1039    public FamilyDirFilter(FileSystem fs) {
1040      this.fs = fs;
1041    }
1042
1043    @Override
1044    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1045      try {
1046        // throws IAE if invalid
1047        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(Bytes.toBytes(p.getName()));
1048      } catch (IllegalArgumentException iae) {
1049        // path name is an invalid family name and thus is excluded.
1050        return false;
1051      }
1052
1053      try {
1054        return isDirectory(fs, isDir, p);
1055      } catch (IOException ioe) {
1056        // Maybe the file was moved or the fs was disconnected.
1057        LOG.warn("Skipping file {} due to IOException", p, ioe);
1058        return false;
1059      }
1060    }
1061  }
1062
1063  /**
1064   * Given a particular region dir, return all the familydirs inside it
1065   *
1066   * @param fs A file system for the Path
1067   * @param regionDir Path to a specific region directory
1068   * @return List of paths to valid family directories in region dir.
   * @throws IOException if listing the region directory fails
1070   */
  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir)
      throws IOException {
1072    // assumes we are in a region dir.
1073    FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1074    List<Path> familyDirs = new ArrayList<>(fds.length);
1075    for (FileStatus fdfs: fds) {
1076      Path fdPath = fdfs.getPath();
1077      familyDirs.add(fdPath);
1078    }
1079    return familyDirs;
1080  }
1081
  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir)
      throws IOException {
1083    List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs));
1084    if (fds == null) {
1085      return Collections.emptyList();
1086    }
1087    List<Path> referenceFiles = new ArrayList<>(fds.size());
1088    for (FileStatus fdfs: fds) {
1089      Path fdPath = fdfs.getPath();
1090      referenceFiles.add(fdPath);
1091    }
1092    return referenceFiles;
1093  }
1094
1095  /**
1096   * Filter for HFiles that excludes reference files.
1097   */
1098  public static class HFileFilter extends AbstractFileStatusFilter {
1099    final FileSystem fs;
1100
1101    public HFileFilter(FileSystem fs) {
1102      this.fs = fs;
1103    }
1104
1105    @Override
1106    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1107      if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) {
1108        return false;
1109      }
1110
1111      try {
1112        return isFile(fs, isDir, p);
1113      } catch (IOException ioe) {
1114        // Maybe the file was moved or the fs was disconnected.
1115        LOG.warn("Skipping file {} due to IOException", p, ioe);
1116        return false;
1117      }
1118    }
1119  }
1120
1121  /**
1122   * Filter for HFileLinks (StoreFiles and HFiles not included).
   * The filter itself does not check whether the link points to a file or a directory.
1124   */
1125  public static class HFileLinkFilter implements PathFilter {
1126
1127    @Override
1128    public boolean accept(Path p) {
1129      return HFileLink.isHFileLink(p);
1130    }
1131  }
1132
1133  public static class ReferenceFileFilter extends AbstractFileStatusFilter {
1134
1135    private final FileSystem fs;
1136
1137    public ReferenceFileFilter(FileSystem fs) {
1138      this.fs = fs;
1139    }
1140
1141    @Override
1142    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1143      if (!StoreFileInfo.isReference(p)) {
1144        return false;
1145      }
1146
1147      try {
1148        // only files can be references.
1149        return isFile(fs, isDir, p);
1150      } catch (IOException ioe) {
1151        // Maybe the file was moved or the fs was disconnected.
1152        LOG.warn("Skipping file {} due to IOException", p, ioe);
1153        return false;
1154      }
1155    }
1156  }
1157
1158  /**
   * Called periodically by the storefile map builder getTableStoreFilePathMap to
1160   * report progress.
1161   */
1162  interface ProgressReporter {
1163    /**
1164     * @param status File or directory we are about to process.
1165     */
1166    void progress(FileStatus status);
1167  }
1168
1169  /**
1170   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1171   * table StoreFile names to the full Path.
1172   * <br>
1173   * Example...<br>
1174   * Key = 3944417774205889744  <br>
1175   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1176   *
1177   * @param map map to add values.  If null, this method will create and populate one to return
1178   * @param fs  The file system to use.
1179   * @param hbaseRootDir  The root directory to scan.
1180   * @param tableName name of the table to scan.
1181   * @return Map keyed by StoreFile name with a value of the full Path.
1182   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException if the thread is interrupted, either before or during the activity
1184   */
1185  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1186  final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1187  throws IOException, InterruptedException {
1188    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null,
1189        (ProgressReporter)null);
1190  }
1191
1192  /**
1193   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
   * table StoreFile names to the full Path.  Note that because this method can be called
   * on a 'live' HBase system, we will skip files that no longer exist by the time
   * we traverse them, and similarly the user of the result needs to consider that some
1197   * entries in this map may not exist by the time this call completes.
1198   * <br>
1199   * Example...<br>
1200   * Key = 3944417774205889744  <br>
1201   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1202   *
1203   * @param resultMap map to add values.  If null, this method will create and populate one to return
1204   * @param fs  The file system to use.
1205   * @param hbaseRootDir  The root directory to scan.
1206   * @param tableName name of the table to scan.
1207   * @param sfFilter optional path filter to apply to store files
1208   * @param executor optional executor service to parallelize this operation
1209   * @param progressReporter Instance or null; gets called every time we move to new region of
1210   *   family dir and for each store file.
1211   * @return Map keyed by StoreFile name with a value of the full Path.
1212   * @throws IOException When scanning the directory fails.
1213   * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead.
1214   */
1215  @Deprecated
1216  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1217      final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1218      ExecutorService executor, final HbckErrorReporter progressReporter)
1219      throws IOException, InterruptedException {
1220    return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor,
1221        new ProgressReporter() {
1222          @Override
1223          public void progress(FileStatus status) {
1224            // status is not used in this implementation.
1225            progressReporter.progress();
1226          }
1227        });
1228  }
1229
1230  /**
1231   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
   * table StoreFile names to the full Path.  Note that because this method can be called
   * on a 'live' HBase system, we will skip files that no longer exist by the time
   * we traverse them, and similarly the user of the result needs to consider that some
1235   * entries in this map may not exist by the time this call completes.
1236   * <br>
1237   * Example...<br>
1238   * Key = 3944417774205889744  <br>
1239   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1240   *
1241   * @param resultMap map to add values.  If null, this method will create and populate one
1242   *   to return
1243   * @param fs  The file system to use.
1244   * @param hbaseRootDir  The root directory to scan.
1245   * @param tableName name of the table to scan.
1246   * @param sfFilter optional path filter to apply to store files
1247   * @param executor optional executor service to parallelize this operation
1248   * @param progressReporter Instance or null; gets called every time we move to new region of
1249   *   family dir and for each store file.
1250   * @return Map keyed by StoreFile name with a value of the full Path.
1251   * @throws IOException When scanning the directory fails.
1252   * @throws InterruptedException the thread is interrupted, either before or during the activity.
1253   */
1254  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1255      final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1256      ExecutorService executor, final ProgressReporter progressReporter)
1257    throws IOException, InterruptedException {
1258
1259    final Map<String, Path> finalResultMap =
1260        resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;
1261
1262    // only include the directory paths to tables
1263    Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
1264    // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
1265    // should be regions.
1266    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
1267    final Vector<Exception> exceptions = new Vector<>();
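    // Exceptions thrown by the per-region tasks are collected here; once all submitted work has
    // been accounted for, the first collected exception (if any) is rethrown in the finally block.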
1268
1269    try {
      List<FileStatus> regionDirs =
          FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1271      if (regionDirs == null) {
1272        return finalResultMap;
1273      }
1274
1275      final List<Future<?>> futures = new ArrayList<>(regionDirs.size());
1276
1277      for (FileStatus regionDir : regionDirs) {
1278        if (null != progressReporter) {
1279          progressReporter.progress(regionDir);
1280        }
1281        final Path dd = regionDir.getPath();
1282
1283        if (!exceptions.isEmpty()) {
1284          break;
1285        }
1286
1287        Runnable getRegionStoreFileMapCall = new Runnable() {
1288          @Override
1289          public void run() {
1290            try {
1291              HashMap<String,Path> regionStoreFileMap = new HashMap<>();
1292              List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
1293              if (familyDirs == null) {
1294                if (!fs.exists(dd)) {
1295                  LOG.warn("Skipping region because it no longer exists: " + dd);
1296                } else {
1297                  LOG.warn("Skipping region because it has no family dirs: " + dd);
1298                }
1299                return;
1300              }
1301              for (FileStatus familyDir : familyDirs) {
1302                if (null != progressReporter) {
1303                  progressReporter.progress(familyDir);
1304                }
1305                Path family = familyDir.getPath();
1306                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1307                  continue;
1308                }
1309                // now in family, iterate over the StoreFiles and
1310                // put in map
1311                FileStatus[] familyStatus = fs.listStatus(family);
1312                for (FileStatus sfStatus : familyStatus) {
1313                  if (null != progressReporter) {
1314                    progressReporter.progress(sfStatus);
1315                  }
1316                  Path sf = sfStatus.getPath();
1317                  if (sfFilter == null || sfFilter.accept(sf)) {
                    regionStoreFileMap.put(sf.getName(), sf);
1319                  }
1320                }
1321              }
1322              finalResultMap.putAll(regionStoreFileMap);
1323            } catch (Exception e) {
1324              LOG.error("Could not get region store file map for region: " + dd, e);
1325              exceptions.add(e);
1326            }
1327          }
1328        };
1329
1330        // If executor is available, submit async tasks to exec concurrently, otherwise
1331        // just do serial sync execution
1332        if (executor != null) {
1333          Future<?> future = executor.submit(getRegionStoreFileMapCall);
1334          futures.add(future);
1335        } else {
1336          FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
1337          future.run();
1338          futures.add(future);
1339        }
1340      }
1341
1342      // Ensure all pending tasks are complete (or that we run into an exception)
1343      for (Future<?> f : futures) {
1344        if (!exceptions.isEmpty()) {
1345          break;
1346        }
1347        try {
1348          f.get();
1349        } catch (ExecutionException e) {
1350          LOG.error("Unexpected exec exception!  Should've been caught already.  (Bug?)", e);
1351          // Shouldn't happen, we already logged/caught any exceptions in the Runnable
1352        }
1353      }
1354    } catch (IOException e) {
1355      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
1356      exceptions.add(e);
1357    } finally {
1358      if (!exceptions.isEmpty()) {
1359        // Just throw the first exception as an indication something bad happened
1360        // Don't need to propagate all the exceptions, we already logged them all anyway
1361        Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class);
1362        throw new IOException(exceptions.firstElement());
1363      }
1364    }
1365
1366    return finalResultMap;
1367  }
1368
1369  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1370    int result = 0;
1371    try {
1372      for (Path familyDir:getFamilyDirs(fs, p)){
1373        result += getReferenceFilePaths(fs, familyDir).size();
1374      }
1375    } catch (IOException e) {
1376      LOG.warn("Error counting reference files", e);
1377    }
1378    return result;
1379  }
1380
1381  /**
1382   * Runs through the HBase rootdir and creates a reverse lookup map for
1383   * table StoreFile names to the full Path.
1384   * <br>
1385   * Example...<br>
1386   * Key = 3944417774205889744  <br>
1387   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1388   *
1389   * @param fs  The file system to use.
1390   * @param hbaseRootDir  The root directory to scan.
1391   * @return Map keyed by StoreFile name with a value of the full Path.
1392   * @throws IOException When scanning the directory fails.
1393   */
1394  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1395      final Path hbaseRootDir)
1396  throws IOException, InterruptedException {
1397    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter)null);
1398  }
1399
1400  /**
1401   * Runs through the HBase rootdir and creates a reverse lookup map for
1402   * table StoreFile names to the full Path.
1403   * <br>
1404   * Example...<br>
1405   * Key = 3944417774205889744  <br>
1406   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1407   *
1408   * @param fs  The file system to use.
1409   * @param hbaseRootDir  The root directory to scan.
1410   * @param sfFilter optional path filter to apply to store files
1411   * @param executor optional executor service to parallelize this operation
1412   * @param progressReporter Instance or null; gets called every time we move to a new region
1413   *   or family dir and for each store file.
1414   * @return Map keyed by StoreFile name with a value of the full Path.
1415   * @throws IOException When scanning the directory fails.
1416   * @deprecated Since 2.3.0. Will be removed in hbase 4. Use {@link
1417   *   #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)}
1418   */
1419  @Deprecated
1420  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1421      final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1422      HbckErrorReporter progressReporter)
1423    throws IOException, InterruptedException {
1424    return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor,
1425        new ProgressReporter() {
1426          @Override
1427          public void progress(FileStatus status) {
1428            // status is not used in this implementation.
1429            progressReporter.progress();
1430          }
1431        });
1432  }
1433
1434  /**
1435   * Runs through the HBase rootdir and creates a reverse lookup map for
1436   * table StoreFile names to the full Path.
1437   * <br>
1438   * Example...<br>
1439   * Key = 3944417774205889744  <br>
1440   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1441   *
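   * <p>
   * A usage sketch with an optional store file filter and a caller-owned executor (either may be
   * null; the filter and pool size below are illustrative only):
   * <pre>{@code
   * ExecutorService pool = Executors.newFixedThreadPool(8);
   * try {
   *   PathFilter sfFilter = path -> !path.getName().startsWith(".");
   *   Map<String, Path> storeFiles =
   *     getTableStoreFilePathMap(fs, rootDir, sfFilter, pool, null);
   * } finally {
   *   pool.shutdown();
   * }
   * }</pre>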
1442   * @param fs  The file system to use.
1443   * @param hbaseRootDir  The root directory to scan.
1444   * @param sfFilter optional path filter to apply to store files
1445   * @param executor optional executor service to parallelize this operation
1446   * @param progressReporter Instance or null; gets called every time we move to a new region
1447   *   or family dir and for each store file.
1448   * @return Map keyed by StoreFile name with a value of the full Path.
1449   * @throws IOException When scanning the directory fails.
1450   * @throws InterruptedException if interrupted while waiting for the executor tasks to complete
1451   */
1452  public static Map<String, Path> getTableStoreFilePathMap(
1453    final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter,
1454    ExecutorService executor, ProgressReporter progressReporter)
1455  throws IOException, InterruptedException {
1456    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);
1457
1458    // if this method looks similar to 'getTableFragmentation' that is because
1459    // it was borrowed from it.
1460
1461    // only include the directory paths to tables
1462    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1463      getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir),
1464        sfFilter, executor, progressReporter);
1465    }
1466    return map;
1467  }
1468
1469  /**
1470   * Filters FileStatuses in an array and returns a list
1471   *
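   * <p>
   * For example, keeping only directories (a sketch; {@code fs} and {@code dir} are assumed to
   * be initialized):
   * <pre>{@code
   * FileStatus[] statuses = fs.listStatus(dir);
   * List<FileStatus> dirsOnly = filterFileStatuses(statuses, status -> status.isDirectory());
   * }</pre>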
1472   * @param input   An array of FileStatuses
1473   * @param filter  A required filter to filter the array
1474   * @return        A list of FileStatuses
1475   */
1476  public static List<FileStatus> filterFileStatuses(FileStatus[] input,
1477      FileStatusFilter filter) {
1478    if (input == null) return null;
1479    return filterFileStatuses(Iterators.forArray(input), filter);
1480  }
1481
1482  /**
1483   * Filters FileStatuses in an iterator and returns a list
1484   *
1485   * @param input   An iterator of FileStatuses
1486   * @param filter  A required filter to apply to the iterator's elements
1487   * @return        A list of FileStatuses
1488   */
1489  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1490      FileStatusFilter filter) {
1491    if (input == null) return null;
1492    ArrayList<FileStatus> results = new ArrayList<>();
1493    while (input.hasNext()) {
1494      FileStatus f = input.next();
1495      if (filter.accept(f)) {
1496        results.add(f);
1497      }
1498    }
1499    return results;
1500  }
1501
1502  /**
1503   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1504   * This accommodates differences between Hadoop versions, where Hadoop 1
1505   * does not throw a FileNotFoundException but returns an empty FileStatus[],
1506   * while Hadoop 2 will throw FileNotFoundException.
1507   *
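   * <p>
   * A usage sketch (names are illustrative only):
   * <pre>{@code
   * List<FileStatus> regionDirs =
   *   listStatusWithStatusFilter(fs, tableDir, status -> status.isDirectory());
   * if (regionDirs == null) {
   *   // tableDir is missing or empty, or nothing matched the filter
   * }
   * }</pre>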
1508   * @param fs file system
1509   * @param dir directory
1510   * @param filter file status filter
1511   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
1512   */
1513  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs,
1514      final Path dir, final FileStatusFilter filter) throws IOException {
1515    FileStatus [] status = null;
1516    try {
1517      status = fs.listStatus(dir);
1518    } catch (FileNotFoundException fnfe) {
1519      LOG.trace("{} does not exist", dir);
1520      return null;
1521    }
1522
1523    if (ArrayUtils.getLength(status) == 0) {
1524      return null;
1525    }
1526
1527    if (filter == null) {
1528      return Arrays.asList(status);
1529    } else {
1530      List<FileStatus> status2 = filterFileStatuses(status, filter);
1531      if (status2 == null || status2.isEmpty()) {
1532        return null;
1533      } else {
1534        return status2;
1535      }
1536    }
1537  }
1538
1539  /**
1540   * Scans the root path of the file system to get the degree of locality for each region
1541   * on each of the servers having at least one block of that region.
1542   * This is used by the tool
1543   * {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}.
1544   *
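   * <p>
   * A usage sketch (the output handling is illustrative only):
   * <pre>{@code
   * Map<String, Map<String, Float>> locality = getRegionDegreeLocalityMappingFromFS(conf);
   * locality.forEach((encodedRegionName, byServer) ->
   *   byServer.forEach((server, fraction) ->
   *     LOG.info("{} on {}: {}", encodedRegionName, server, fraction)));
   * }</pre>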
1545   * @param conf
1546   *          the configuration to use
1547   * @return the mapping from region encoded name to a map of server names to
1548   *           locality fraction
1549   * @throws IOException
1550   *           in case of file system errors or interrupts
1551   */
1552  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1553      final Configuration conf) throws IOException {
1554    return getRegionDegreeLocalityMappingFromFS(
1555        conf, null,
1556        conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1557
1558  }
1559
1560  /**
1561   * Scans the root path of the file system to get the degree of locality for each region
1562   * on each of the servers having at least one block of that region, restricted to the
1563   * given table if one is specified.
1564   *
1565   * @param conf
1566   *          the configuration to use
1567   * @param desiredTable
1568   *          the table you wish to scan locality for
1569   * @param threadPoolSize
1570   *          the thread pool size to use
1571   * @return the mapping from region encoded name to a map of server names to
1572   *           locality fraction
1573   * @throws IOException
1574   *           in case of file system errors or interrupts
1575   */
1576  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1577      final Configuration conf, final String desiredTable, int threadPoolSize)
1578      throws IOException {
1579    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
1580    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
1581    return regionDegreeLocalityMapping;
1582  }
1583
1584  /**
1585   * Scans the root path of the file system to get the degree of locality of each
1586   * region on each of the servers having at least one block of that region,
1587   * restricted to the given table if one is specified. The output map parameter
1588   * is optional.
1589   *
1590   * @param conf
1591   *          the configuration to use
1592   * @param desiredTable
1593   *          the table you wish to scan locality for
1594   * @param threadPoolSize
1595   *          the thread pool size to use
1596   * @param regionDegreeLocalityMapping
1597   *          the map into which to put the locality degree mapping or null,
1598   *          must be a thread-safe implementation
1599   * @throws IOException
1600   *           in case of file system errors or interrupts
1601   */
1602  private static void getRegionLocalityMappingFromFS(final Configuration conf,
1603    final String desiredTable, int threadPoolSize,
1604    final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
1605    final FileSystem fs = FileSystem.get(conf);
1606    final Path rootPath = CommonFSUtils.getRootDir(conf);
1607    final long startTime = EnvironmentEdgeManager.currentTime();
1608    final Path queryPath;
1609    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1610    if (null == desiredTable) {
1611      queryPath =
1612        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1613    } else {
1614      queryPath = new Path(
1615        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1616    }
1617
1618    // reject all paths that are not appropriate
1619    PathFilter pathFilter = new PathFilter() {
1620      @Override
1621      public boolean accept(Path path) {
1622        // the path name is the region name; the glob may match some noise data
1623        if (null == path) {
1624          return false;
1625        }
1626
1627        // no parent?
1628        Path parent = path.getParent();
1629        if (null == parent) {
1630          return false;
1631        }
1632
1633        String regionName = path.getName();
1634        if (null == regionName) {
1635          return false;
1636        }
1637
1638        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
1639          return false;
1640        }
1641        return true;
1642      }
1643    };
1644
1645    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1646
1647    if (LOG.isDebugEnabled()) {
1648      LOG.debug("Query Path: {} ; matched files: {}", queryPath, Arrays.toString(statusList));
1649    }
1650
1651    if (null == statusList) {
1652      return;
1653    }
1654
1655    // lower the number of threads in case we have very few expected regions
1656    threadPoolSize = Math.min(threadPoolSize, statusList.length);
1657
1658    // run in multiple threads
1659    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
1660      Threads.newDaemonThreadFactory("FSRegionQuery"));
1661    try {
1662      // ignore all file status items that are not of interest
1663      for (FileStatus regionStatus : statusList) {
1664        if (null == regionStatus || !regionStatus.isDirectory()) {
1665          continue;
1666        }
1667
1668        final Path regionPath = regionStatus.getPath();
1669        if (null != regionPath) {
1670          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
1671        }
1672      }
1673    } finally {
1674      tpe.shutdown();
1675      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1676        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1677      try {
1678        // here we wait until TPE terminates, which is either naturally or by
1679        // exceptions in the execution of the threads
1680        while (!tpe.awaitTermination(threadWakeFrequency,
1681            TimeUnit.MILLISECONDS)) {
1682          // printing out rough estimate, so as to not introduce
1683          // AtomicInteger
1684          LOG.info("Locality checking is underway: { Scanned Regions : "
1685              + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
1686              + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
1687        }
1688      } catch (InterruptedException e) {
1689        Thread.currentThread().interrupt();
1690        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1691      }
1692    }
1693
1694    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
1695    LOG.info("Scanning DFS for locality info took {}ms", overhead);
1696  }
1697
1698  /**
1699   * Do our short circuit read setup.
1700   * Checks the buffer size to use and whether to do checksumming in HBase or HDFS.
1701   * @param conf the Configuration to check and update
1702   */
1703  public static void setupShortCircuitRead(final Configuration conf) {
1704    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1705    boolean shortCircuitSkipChecksum =
1706      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1707    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1708    if (shortCircuitSkipChecksum) {
1709      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
1710        "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
1711        "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
1712      assert !shortCircuitSkipChecksum; //this will fail if assertions are on
1713    }
1714    checkShortCircuitReadBufferSize(conf);
1715  }
1716
1717  /**
1718   * Check if short circuit read buffer size is set and if not, set it to hbase value.
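   * <p>
   * For example, with the dfs key unset the HBase default (2 * {@code HConstants.DEFAULT_BLOCKSIZE})
   * is applied, and an explicit "hbase."-prefixed value wins over that default (a sketch):
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * conf.setInt("hbase.dfs.client.read.shortcircuit.buffer.size", 131072);
   * checkShortCircuitReadBufferSize(conf);
   * // conf now has dfs.client.read.shortcircuit.buffer.size = 131072
   * }</pre>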
1719   * @param conf the Configuration to check and update
1720   */
1721  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1722    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1723    final int notSet = -1;
1724    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in hadoop 2
1725    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1726    int size = conf.getInt(dfsKey, notSet);
1727    // If a size is set, return -- we will use it.
1728    if (size != notSet) return;
1729    // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
1730    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1731    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
1732  }
1733
1734  /**
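   * Returns the DFSClient's shared {@link DFSHedgedReadMetrics} instance, obtained via
   * reflection because the accessor on DFSClient is package-private.
   * <p>
   * A usage sketch (the {@code getHedgedReadOps} accessor on the returned object is an assumption
   * of this example, not something this method guarantees):
   * <pre>{@code
   * DFSHedgedReadMetrics metrics = getDFSHedgedReadMetrics(conf);
   * if (metrics != null) {
   *   LOG.info("Hedged read ops so far: {}", metrics.getHedgedReadOps());
   * }
   * }</pre>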
1735   * @param c the Configuration used to obtain the FileSystem
1736   * @return The DFSClient DFSHedgedReadMetrics instance, or null if it cannot be found or the filesystem is not HDFS.
1737   * @throws IOException if the FileSystem cannot be obtained
1738   */
1739  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
1740      throws IOException {
1741    if (!CommonFSUtils.isHDFS(c)) {
1742      return null;
1743    }
1744    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
1745    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
1746    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
1747    final String name = "getHedgedReadMetrics";
1748    DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
1749    Method m;
1750    try {
1751      m = dfsclient.getClass().getDeclaredMethod(name);
1752    } catch (NoSuchMethodException e) {
1753      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
1754          e.getMessage());
1755      return null;
1756    } catch (SecurityException e) {
1757      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
1758          e.getMessage());
1759      return null;
1760    }
1761    m.setAccessible(true);
1762    try {
1763      return (DFSHedgedReadMetrics)m.invoke(dfsclient);
1764    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
1765      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
1766          e.getMessage());
1767      return null;
1768    }
1769  }
1770
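  /**
   * Recursively copies the tree under {@code src} to {@code dst}. Directories are created
   * synchronously while individual file copies are submitted to a fixed-size thread pool; the
   * method blocks until every copy completes and wraps any failure in an IOException.
   * <p>
   * A usage sketch (paths and thread count are illustrative only):
   * <pre>{@code
   * FileSystem fs = FileSystem.get(conf);
   * List<Path> created = copyFilesParallel(fs, new Path("/hbase/.tmp/src"),
   *   fs, new Path("/hbase/.tmp/dst"), conf, 4);
   * }</pre>
   * @return the destination paths that were traversed (directories and copied files)
   * @throws IOException if the copy fails or is interrupted
   */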
1771  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1772      Configuration conf, int threads) throws IOException {
1773    ExecutorService pool = Executors.newFixedThreadPool(threads);
1774    List<Future<Void>> futures = new ArrayList<>();
1775    List<Path> traversedPaths;
1776    try {
1777      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
1778      for (Future<Void> future : futures) {
1779        future.get();
1780      }
1781    } catch (ExecutionException | InterruptedException | IOException e) {
1782      throw new IOException("Copy snapshot reference files failed", e);
1783    } finally {
1784      pool.shutdownNow();
1785    }
1786    return traversedPaths;
1787  }
1788
1789  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1790      Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
1791    List<Path> traversedPaths = new ArrayList<>();
1792    traversedPaths.add(dst);
1793    FileStatus currentFileStatus = srcFS.getFileStatus(src);
1794    if (currentFileStatus.isDirectory()) {
1795      if (!dstFS.mkdirs(dst)) {
1796        throw new IOException("Create directory failed: " + dst);
1797      }
1798      FileStatus[] subPaths = srcFS.listStatus(src);
1799      for (FileStatus subPath : subPaths) {
1800        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
1801          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
1802      }
1803    } else {
1804      Future<Void> future = pool.submit(() -> {
1805        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
1806        return null;
1807      });
1808      futures.add(future);
1809    }
1810    return traversedPaths;
1811  }
1812
1813  /**
1814   * @return A set containing all NameNode addresses of the given fs
1815   */
1816  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
1817    Configuration conf) {
1818    Set<InetSocketAddress> addresses = new HashSet<>();
1819    String serviceName = fs.getCanonicalServiceName();
1820
1821    if (serviceName.startsWith("ha-hdfs")) {
1822      try {
1823        Map<String, Map<String, InetSocketAddress>> addressMap =
1824          DFSUtil.getNNServiceRpcAddressesForCluster(conf);
1825        String nameService = serviceName.substring(serviceName.indexOf(":") + 1);
1826        if (addressMap.containsKey(nameService)) {
1827          Map<String, InetSocketAddress> nnMap = addressMap.get(nameService);
1828          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
1829            InetSocketAddress addr = e2.getValue();
1830            addresses.add(addr);
1831          }
1832        }
1833      } catch (Exception e) {
1834        LOG.warn("DFSUtil.getNNServiceRpcAddressesForCluster failed. serviceName=" + serviceName, e);
1835      }
1836    } else {
1837      URI uri = fs.getUri();
1838      int port = uri.getPort();
1839      if (port < 0) {
1840        int idx = serviceName.indexOf(':');
1841        port = Integer.parseInt(serviceName.substring(idx + 1));
1842      }
1843      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
1844      addresses.add(addr);
1845    }
1846
1847    return addresses;
1848  }
1849
1850  /**
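   * Checks whether the given FileSystem instances point at the same HDFS cluster, comparing
   * canonical service names and, for HA deployments, the configured nameservices and resolved
   * NameNode addresses.
   * <p>
   * A usage sketch (the cluster URIs are illustrative only):
   * <pre>{@code
   * FileSystem srcFs = new Path("hdfs://cluster-a/hbase").getFileSystem(conf);
   * FileSystem dstFs = new Path("hdfs://cluster-b/backup").getFileSystem(conf);
   * boolean sameCluster = isSameHdfs(conf, srcFs, dstFs);
   * }</pre>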
1851   * @param conf the Configuration of HBase
1852   * @return Whether srcFs and desFs point at the same HDFS instance
1853   */
1854  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
1855    // getCanonicalServiceName gives both srcFs and desFs a unified format that
1856    // contains scheme, host and port.
1857    String srcServiceName = srcFs.getCanonicalServiceName();
1858    String desServiceName = desFs.getCanonicalServiceName();
1859
1860    if (srcServiceName == null || desServiceName == null) {
1861      return false;
1862    }
1863    if (srcServiceName.equals(desServiceName)) {
1864      return true;
1865    }
1866    if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
1867      Collection<String> internalNameServices =
1868        conf.getTrimmedStringCollection("dfs.internal.nameservices");
1869      if (!internalNameServices.isEmpty()) {
1870        if (internalNameServices.contains(srcServiceName.split(":")[1])) {
1871          return true;
1872        } else {
1873          return false;
1874        }
1875      }
1876    }
1877    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
1878      // If one serviceName is an HA format while the other is a non-HA format,
1879      // maybe they refer to the same FileSystem.
1880      // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
1881      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
1882      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
1883      if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
1884        return true;
1885      }
1886    }
1887
1888    return false;
1889  }
1890}