001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
021import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET;
022
023import edu.umd.cs.findbugs.annotations.CheckForNull;
024import java.io.ByteArrayInputStream;
025import java.io.DataInputStream;
026import java.io.EOFException;
027import java.io.FileNotFoundException;
028import java.io.IOException;
029import java.io.InterruptedIOException;
030import java.lang.reflect.InvocationTargetException;
031import java.lang.reflect.Method;
032import java.net.InetSocketAddress;
033import java.net.URI;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collection;
037import java.util.Collections;
038import java.util.HashMap;
039import java.util.HashSet;
040import java.util.Iterator;
041import java.util.List;
042import java.util.Locale;
043import java.util.Map;
044import java.util.Optional;
045import java.util.Set;
046import java.util.Vector;
047import java.util.concurrent.ConcurrentHashMap;
048import java.util.concurrent.ExecutionException;
049import java.util.concurrent.ExecutorService;
050import java.util.concurrent.Executors;
051import java.util.concurrent.Future;
052import java.util.concurrent.FutureTask;
053import java.util.concurrent.ThreadPoolExecutor;
054import java.util.concurrent.TimeUnit;
055import java.util.regex.Pattern;
056import org.apache.commons.lang3.ArrayUtils;
057import org.apache.hadoop.conf.Configuration;
058import org.apache.hadoop.fs.BlockLocation;
059import org.apache.hadoop.fs.FSDataInputStream;
060import org.apache.hadoop.fs.FSDataOutputStream;
061import org.apache.hadoop.fs.FileStatus;
062import org.apache.hadoop.fs.FileSystem;
063import org.apache.hadoop.fs.FileUtil;
064import org.apache.hadoop.fs.Path;
065import org.apache.hadoop.fs.PathFilter;
066import org.apache.hadoop.fs.StorageType;
067import org.apache.hadoop.fs.permission.FsPermission;
068import org.apache.hadoop.hbase.ClusterId;
069import org.apache.hadoop.hbase.HConstants;
070import org.apache.hadoop.hbase.HDFSBlocksDistribution;
071import org.apache.hadoop.hbase.TableName;
072import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
073import org.apache.hadoop.hbase.client.RegionInfo;
074import org.apache.hadoop.hbase.client.RegionInfoBuilder;
075import org.apache.hadoop.hbase.exceptions.DeserializationException;
076import org.apache.hadoop.hbase.fs.HFileSystem;
077import org.apache.hadoop.hbase.io.HFileLink;
078import org.apache.hadoop.hbase.master.HMaster;
079import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
080import org.apache.hadoop.hdfs.DFSClient;
081import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
082import org.apache.hadoop.hdfs.DFSUtil;
083import org.apache.hadoop.hdfs.DistributedFileSystem;
084import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
085import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
086import org.apache.hadoop.hdfs.protocol.LocatedBlock;
087import org.apache.hadoop.io.IOUtils;
088import org.apache.hadoop.ipc.RemoteException;
089import org.apache.yetus.audience.InterfaceAudience;
090import org.slf4j.Logger;
091import org.slf4j.LoggerFactory;
092
093import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
094import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
095import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
096import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
097import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
098
099import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
101
102/**
103 * Utility methods for interacting with the underlying file system.
104 */
105@InterfaceAudience.Private
106public final class FSUtils {
107  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
108
109  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
110  private static final int DEFAULT_THREAD_POOLSIZE = 2;
111
112  /** Set to true on Windows platforms */
113  // currently only used in testing. TODO refactor into a test class
114  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
115
116  private FSUtils() {
117  }
118
119  /** Returns True is <code>fs</code> is instance of DistributedFileSystem n */
120  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
121    FileSystem fileSystem = fs;
122    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
123    // Check its backing fs for dfs-ness.
124    if (fs instanceof HFileSystem) {
125      fileSystem = ((HFileSystem) fs).getBackingFs();
126    }
127    return fileSystem instanceof DistributedFileSystem;
128  }
129
130  /**
131   * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
132   * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true. Does not consider
133   * schema; i.e. if schemas different but path or subpath matches, the two will equate.
134   * @param pathToSearch Path we will be trying to match.
135   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
136   */
137  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
138    Path tailPath = pathTail;
139    String tailName;
140    Path toSearch = pathToSearch;
141    String toSearchName;
142    boolean result = false;
143
144    if (pathToSearch.depth() != pathTail.depth()) {
145      return false;
146    }
147
148    do {
149      tailName = tailPath.getName();
150      if (tailName == null || tailName.isEmpty()) {
151        result = true;
152        break;
153      }
154      toSearchName = toSearch.getName();
155      if (toSearchName == null || toSearchName.isEmpty()) {
156        break;
157      }
158      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
159      tailPath = tailPath.getParent();
160      toSearch = toSearch.getParent();
161    } while (tailName.equals(toSearchName));
162    return result;
163  }
164
165  /**
166   * Delete the region directory if exists.
167   * @return True if deleted the region directory.
168   */
169  public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri)
170    throws IOException {
171    Path rootDir = CommonFSUtils.getRootDir(conf);
172    FileSystem fs = rootDir.getFileSystem(conf);
173    return CommonFSUtils.deleteDirectory(fs,
174      new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
175  }
176
177  /**
178   * Create the specified file on the filesystem. By default, this will:
179   * <ol>
180   * <li>overwrite the file if it exists</li>
181   * <li>apply the umask in the configuration (if it is enabled)</li>
182   * <li>use the fs configured buffer size (or 4096 if not set)</li>
183   * <li>use the configured column family replication or default replication if
184   * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li>
185   * <li>use the default block size</li>
186   * <li>not track progress</li>
187   * </ol>
188   * @param conf         configurations
189   * @param fs           {@link FileSystem} on which to write the file
190   * @param path         {@link Path} to the file to write
191   * @param perm         permissions
192   * @param favoredNodes favored data nodes
193   * @return output stream to the created file
194   * @throws IOException if the file cannot be created
195   */
196  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
197    FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
198    if (fs instanceof HFileSystem) {
199      FileSystem backingFs = ((HFileSystem) fs).getBackingFs();
200      if (backingFs instanceof DistributedFileSystem) {
201        short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION,
202          String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION)));
203        DistributedFileSystem.HdfsDataOutputStreamBuilder builder =
204          ((DistributedFileSystem) backingFs).createFile(path).recursive().permission(perm)
205            .create();
206        if (favoredNodes != null) {
207          builder.favoredNodes(favoredNodes);
208        }
209        if (replication > 0) {
210          builder.replication(replication);
211        }
212        return builder.build();
213      }
214
215    }
216    return CommonFSUtils.create(fs, path, perm, true);
217  }
218
219  /**
220   * Checks to see if the specified file system is available
221   * @param fs filesystem
222   * @throws IOException e
223   */
224  public static void checkFileSystemAvailable(final FileSystem fs) throws IOException {
225    if (!(fs instanceof DistributedFileSystem)) {
226      return;
227    }
228    IOException exception = null;
229    DistributedFileSystem dfs = (DistributedFileSystem) fs;
230    try {
231      if (dfs.exists(new Path("/"))) {
232        return;
233      }
234    } catch (IOException e) {
235      exception = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
236    }
237    try {
238      fs.close();
239    } catch (Exception e) {
240      LOG.error("file system close failed: ", e);
241    }
242    throw new IOException("File system is not available", exception);
243  }
244
245  /**
246   * Inquire the Active NameNode's safe mode status.
247   * @param dfs A DistributedFileSystem object representing the underlying HDFS.
248   * @return whether we're in safe mode
249   */
250  private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
251    return dfs.setSafeMode(SAFEMODE_GET, true);
252  }
253
254  /**
255   * Check whether dfs is in safemode.
256   */
257  public static void checkDfsSafeMode(final Configuration conf) throws IOException {
258    boolean isInSafeMode = false;
259    FileSystem fs = FileSystem.get(conf);
260    if (fs instanceof DistributedFileSystem) {
261      DistributedFileSystem dfs = (DistributedFileSystem) fs;
262      isInSafeMode = isInSafeMode(dfs);
263    }
264    if (isInSafeMode) {
265      throw new IOException("File system is in safemode, it can't be written now");
266    }
267  }
268
269  /**
270   * Verifies current version of file system
271   * @param fs      filesystem object
272   * @param rootdir root hbase directory
273   * @return null if no version file exists, version string otherwise
274   * @throws IOException              if the version file fails to open
275   * @throws DeserializationException if the version data cannot be translated into a version
276   */
277  public static String getVersion(FileSystem fs, Path rootdir)
278    throws IOException, DeserializationException {
279    final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
280    FileStatus[] status = null;
281    try {
282      // hadoop 2.0 throws FNFE if directory does not exist.
283      // hadoop 1.0 returns null if directory does not exist.
284      status = fs.listStatus(versionFile);
285    } catch (FileNotFoundException fnfe) {
286      return null;
287    }
288    if (ArrayUtils.getLength(status) == 0) {
289      return null;
290    }
291    String version = null;
292    byte[] content = new byte[(int) status[0].getLen()];
293    FSDataInputStream s = fs.open(versionFile);
294    try {
295      IOUtils.readFully(s, content, 0, content.length);
296      if (ProtobufUtil.isPBMagicPrefix(content)) {
297        version = parseVersionFrom(content);
298      } else {
299        // Presume it pre-pb format.
300        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) {
301          version = dis.readUTF();
302        }
303      }
304    } catch (EOFException eof) {
305      LOG.warn("Version file was empty, odd, will try to set it.");
306    } finally {
307      s.close();
308    }
309    return version;
310  }
311
312  /**
313   * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
314   * @param bytes The byte content of the hbase.version file
315   * @return The version found in the file as a String
316   * @throws DeserializationException if the version data cannot be translated into a version
317   */
318  static String parseVersionFrom(final byte[] bytes) throws DeserializationException {
319    ProtobufUtil.expectPBMagicPrefix(bytes);
320    int pblen = ProtobufUtil.lengthOfPBMagic();
321    FSProtos.HBaseVersionFileContent.Builder builder =
322      FSProtos.HBaseVersionFileContent.newBuilder();
323    try {
324      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
325      return builder.getVersion();
326    } catch (IOException e) {
327      // Convert
328      throw new DeserializationException(e);
329    }
330  }
331
332  /**
333   * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
334   * @param version Version to persist
335   * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a
336   *         prefix.
337   */
338  static byte[] toVersionByteArray(final String version) {
339    FSProtos.HBaseVersionFileContent.Builder builder =
340      FSProtos.HBaseVersionFileContent.newBuilder();
341    return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
342  }
343
344  /**
345   * Verifies current version of file system
346   * @param fs      file system
347   * @param rootdir root directory of HBase installation
348   * @param message if true, issues a message on System.out
349   * @throws IOException              if the version file cannot be opened
350   * @throws DeserializationException if the contents of the version file cannot be parsed
351   */
352  public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
353    throws IOException, DeserializationException {
354    checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
355  }
356
357  /**
358   * Verifies current version of file system
359   * @param fs      file system
360   * @param rootdir root directory of HBase installation
361   * @param message if true, issues a message on System.out
362   * @param wait    wait interval
363   * @param retries number of times to retry
364   * @throws IOException              if the version file cannot be opened
365   * @throws DeserializationException if the contents of the version file cannot be parsed
366   */
367  public static void checkVersion(FileSystem fs, Path rootdir, boolean message, int wait,
368    int retries) throws IOException, DeserializationException {
369    String version = getVersion(fs, rootdir);
370    String msg;
371    if (version == null) {
372      if (!metaRegionExists(fs, rootdir)) {
373        // rootDir is empty (no version file and no root region)
374        // just create new version file (HBASE-1195)
375        setVersion(fs, rootdir, wait, retries);
376        return;
377      } else {
378        msg = "hbase.version file is missing. Is your hbase.rootdir valid? "
379          + "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. "
380          + "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2";
381      }
382    } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) {
383      return;
384    } else {
385      msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version
386        + " but software requires version " + HConstants.FILE_SYSTEM_VERSION
387        + ". Consult http://hbase.apache.org/book.html for further information about "
388        + "upgrading HBase.";
389    }
390
391    // version is deprecated require migration
392    // Output on stdout so user sees it in terminal.
393    if (message) {
394      System.out.println("WARNING! " + msg);
395    }
396    throw new FileSystemVersionException(msg);
397  }
398
399  /**
400   * Sets version of file system
401   * @param fs      filesystem object
402   * @param rootdir hbase root
403   * @throws IOException e
404   */
405  public static void setVersion(FileSystem fs, Path rootdir) throws IOException {
406    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
407      HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
408  }
409
410  /**
411   * Sets version of file system
412   * @param fs      filesystem object
413   * @param rootdir hbase root
414   * @param wait    time to wait for retry
415   * @param retries number of times to retry before failing
416   * @throws IOException e
417   */
418  public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
419    throws IOException {
420    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
421  }
422
423  /**
424   * Sets version of file system
425   * @param fs      filesystem object
426   * @param rootdir hbase root directory
427   * @param version version to set
428   * @param wait    time to wait for retry
429   * @param retries number of times to retry before throwing an IOException
430   * @throws IOException e
431   */
432  public static void setVersion(FileSystem fs, Path rootdir, String version, int wait, int retries)
433    throws IOException {
434    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
435    Path tempVersionFile = new Path(rootdir,
436      HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + HConstants.VERSION_FILE_NAME);
437    while (true) {
438      try {
439        // Write the version to a temporary file
440        FSDataOutputStream s = fs.create(tempVersionFile);
441        try {
442          s.write(toVersionByteArray(version));
443          s.close();
444          s = null;
445          // Move the temp version file to its normal location. Returns false
446          // if the rename failed. Throw an IOE in that case.
447          if (!fs.rename(tempVersionFile, versionFile)) {
448            throw new IOException("Unable to move temp version file to " + versionFile);
449          }
450        } finally {
451          // Cleaning up the temporary if the rename failed would be trying
452          // too hard. We'll unconditionally create it again the next time
453          // through anyway, files are overwritten by default by create().
454
455          // Attempt to close the stream on the way out if it is still open.
456          try {
457            if (s != null) s.close();
458          } catch (IOException ignore) {
459          }
460        }
461        LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
462        return;
463      } catch (IOException e) {
464        if (retries > 0) {
465          LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
466          fs.delete(versionFile, false);
467          try {
468            if (wait > 0) {
469              Thread.sleep(wait);
470            }
471          } catch (InterruptedException ie) {
472            throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
473          }
474          retries--;
475        } else {
476          throw e;
477        }
478      }
479    }
480  }
481
482  /**
483   * Checks that a cluster ID file exists in the HBase root directory
484   * @param fs      the root directory FileSystem
485   * @param rootdir the HBase root directory in HDFS
486   * @param wait    how long to wait between retries
487   * @return <code>true</code> if the file exists, otherwise <code>false</code>
488   * @throws IOException if checking the FileSystem fails
489   */
490  public static boolean checkClusterIdExists(FileSystem fs, Path rootdir, long wait)
491    throws IOException {
492    while (true) {
493      try {
494        Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
495        return fs.exists(filePath);
496      } catch (IOException ioe) {
497        if (wait > 0L) {
498          LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe);
499          try {
500            Thread.sleep(wait);
501          } catch (InterruptedException e) {
502            Thread.currentThread().interrupt();
503            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
504          }
505        } else {
506          throw ioe;
507        }
508      }
509    }
510  }
511
512  /**
513   * Returns the value of the unique cluster ID stored for this HBase instance.
514   * @param fs      the root directory FileSystem
515   * @param rootdir the path to the HBase root directory
516   * @return the unique cluster identifier
517   * @throws IOException if reading the cluster ID file fails
518   */
519  public static ClusterId getClusterId(FileSystem fs, Path rootdir) throws IOException {
520    Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
521    ClusterId clusterId = null;
522    FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null;
523    if (status != null) {
524      int len = Ints.checkedCast(status.getLen());
525      byte[] content = new byte[len];
526      FSDataInputStream in = fs.open(idPath);
527      try {
528        in.readFully(content);
529      } catch (EOFException eof) {
530        LOG.warn("Cluster ID file {} is empty", idPath);
531      } finally {
532        in.close();
533      }
534      try {
535        clusterId = ClusterId.parseFrom(content);
536      } catch (DeserializationException e) {
537        throw new IOException("content=" + Bytes.toString(content), e);
538      }
539      // If not pb'd, make it so.
540      if (!ProtobufUtil.isPBMagicPrefix(content)) {
541        String cid = null;
542        in = fs.open(idPath);
543        try {
544          cid = in.readUTF();
545          clusterId = new ClusterId(cid);
546        } catch (EOFException eof) {
547          LOG.warn("Cluster ID file {} is empty", idPath);
548        } finally {
549          in.close();
550        }
551        rewriteAsPb(fs, rootdir, idPath, clusterId);
552      }
553      return clusterId;
554    } else {
555      LOG.warn("Cluster ID file does not exist at {}", idPath);
556    }
557    return clusterId;
558  }
559
560  /**
561   *   */
562  private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
563    final ClusterId cid) throws IOException {
564    // Rewrite the file as pb. Move aside the old one first, write new
565    // then delete the moved-aside file.
566    Path movedAsideName = new Path(p + "." + EnvironmentEdgeManager.currentTime());
567    if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
568    setClusterId(fs, rootdir, cid, 100);
569    if (!fs.delete(movedAsideName, false)) {
570      throw new IOException("Failed delete of " + movedAsideName);
571    }
572    LOG.debug("Rewrote the hbase.id file as pb");
573  }
574
575  /**
576   * Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root
577   * directory. If any operations on the ID file fails, and {@code wait} is a positive value, the
578   * method will retry to produce the ID file until the thread is forcibly interrupted.
579   * @param fs        the root directory FileSystem
580   * @param rootdir   the path to the HBase root directory
581   * @param clusterId the unique identifier to store
582   * @param wait      how long (in milliseconds) to wait between retries
583   * @throws IOException if writing to the FileSystem fails and no wait value
584   */
585  public static void setClusterId(final FileSystem fs, final Path rootdir,
586    final ClusterId clusterId, final long wait) throws IOException {
587
588    final Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
589    final Path tempDir = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY);
590    final Path tempIdFile = new Path(tempDir, HConstants.CLUSTER_ID_FILE_NAME);
591
592    LOG.debug("Create cluster ID file [{}] with ID: {}", idFile, clusterId);
593
594    while (true) {
595      Optional<IOException> failure = Optional.empty();
596
597      LOG.debug("Write the cluster ID file to a temporary location: {}", tempIdFile);
598      try (FSDataOutputStream s = fs.create(tempIdFile)) {
599        s.write(clusterId.toByteArray());
600      } catch (IOException ioe) {
601        failure = Optional.of(ioe);
602      }
603
604      if (!failure.isPresent()) {
605        try {
606          LOG.debug("Move the temporary cluster ID file to its target location [{}]:[{}]",
607            tempIdFile, idFile);
608
609          if (!fs.rename(tempIdFile, idFile)) {
610            failure =
611              Optional.of(new IOException("Unable to move temp cluster ID file to " + idFile));
612          }
613        } catch (IOException ioe) {
614          failure = Optional.of(ioe);
615        }
616      }
617
618      if (failure.isPresent()) {
619        final IOException cause = failure.get();
620        if (wait > 0L) {
621          LOG.warn("Unable to create cluster ID file in {}, retrying in {}ms", rootdir, wait,
622            cause);
623          try {
624            Thread.sleep(wait);
625          } catch (InterruptedException e) {
626            Thread.currentThread().interrupt();
627            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
628          }
629          continue;
630        } else {
631          throw cause;
632        }
633      } else {
634        return;
635      }
636    }
637  }
638
639  /**
640   * If DFS, check safe mode and if so, wait until we clear it.
641   * @param conf configuration
642   * @param wait Sleep between retries
643   * @throws IOException e
644   */
645  public static void waitOnSafeMode(final Configuration conf, final long wait) throws IOException {
646    FileSystem fs = FileSystem.get(conf);
647    if (!(fs instanceof DistributedFileSystem)) return;
648    DistributedFileSystem dfs = (DistributedFileSystem) fs;
649    // Make sure dfs is not in safe mode
650    while (isInSafeMode(dfs)) {
651      LOG.info("Waiting for dfs to exit safe mode...");
652      try {
653        Thread.sleep(wait);
654      } catch (InterruptedException e) {
655        Thread.currentThread().interrupt();
656        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
657      }
658    }
659  }
660
661  /**
662   * Checks if meta region exists
663   * @param fs      file system
664   * @param rootDir root directory of HBase installation
665   * @return true if exists
666   */
667  public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
668    Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
669    return fs.exists(metaRegionDir);
670  }
671
672  /**
673   * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams are
674   * backed by a series of LocatedBlocks, which are fetched periodically from the namenode. This
675   * method retrieves those blocks from the input stream and uses them to calculate
676   * HDFSBlockDistribution. The underlying method in DFSInputStream does attempt to use locally
677   * cached blocks, but may hit the namenode if the cache is determined to be incomplete. The method
678   * also involves making copies of all LocatedBlocks rather than return the underlying blocks
679   * themselves.
680   */
681  static public HDFSBlocksDistribution
682    computeHDFSBlocksDistribution(HdfsDataInputStream inputStream) throws IOException {
683    List<LocatedBlock> blocks = inputStream.getAllBlocks();
684    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
685    for (LocatedBlock block : blocks) {
686      String[] hosts = getHostsForLocations(block);
687      long len = block.getBlockSize();
688      StorageType[] storageTypes = block.getStorageTypes();
689      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
690    }
691    return blocksDistribution;
692  }
693
694  private static String[] getHostsForLocations(LocatedBlock block) {
695    DatanodeInfo[] locations = getLocatedBlockLocations(block);
696    String[] hosts = new String[locations.length];
697    for (int i = 0; i < hosts.length; i++) {
698      hosts[i] = locations[i].getHostName();
699    }
700    return hosts;
701  }
702
703  /**
704   * Compute HDFS blocks distribution of a given file, or a portion of the file
705   * @param fs     file system
706   * @param status file status of the file
707   * @param start  start position of the portion
708   * @param length length of the portion
709   * @return The HDFS blocks distribution
710   */
711  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs,
712    FileStatus status, long start, long length) throws IOException {
713    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
714    BlockLocation[] blockLocations = fs.getFileBlockLocations(status, start, length);
715    addToHDFSBlocksDistribution(blocksDistribution, blockLocations);
716    return blocksDistribution;
717  }
718
719  /**
720   * Update blocksDistribution with blockLocations
721   * @param blocksDistribution the hdfs blocks distribution
722   * @param blockLocations     an array containing block location
723   */
724  static public void addToHDFSBlocksDistribution(HDFSBlocksDistribution blocksDistribution,
725    BlockLocation[] blockLocations) throws IOException {
726    for (BlockLocation bl : blockLocations) {
727      String[] hosts = bl.getHosts();
728      long len = bl.getLength();
729      StorageType[] storageTypes = bl.getStorageTypes();
730      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
731    }
732  }
733
734  // TODO move this method OUT of FSUtils. No dependencies to HMaster
735  /**
736   * Returns the total overall fragmentation percentage. Includes hbase:meta and -ROOT- as well.
737   * @param master The master defining the HBase root and file system
738   * @return A map for each table and its percentage (never null)
739   * @throws IOException When scanning the directory fails
740   */
741  public static int getTotalTableFragmentation(final HMaster master) throws IOException {
742    Map<String, Integer> map = getTableFragmentation(master);
743    return map.isEmpty() ? -1 : map.get("-TOTAL-");
744  }
745
746  /**
747   * Runs through the HBase rootdir and checks how many stores for each table have more than one
748   * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is
749   * stored under the special key "-TOTAL-".
750   * @param master The master defining the HBase root and file system.
751   * @return A map for each table and its percentage (never null).
752   * @throws IOException When scanning the directory fails.
753   */
754  public static Map<String, Integer> getTableFragmentation(final HMaster master)
755    throws IOException {
756    Path path = CommonFSUtils.getRootDir(master.getConfiguration());
757    // since HMaster.getFileSystem() is package private
758    FileSystem fs = path.getFileSystem(master.getConfiguration());
759    return getTableFragmentation(fs, path);
760  }
761
762  /**
763   * Runs through the HBase rootdir and checks how many stores for each table have more than one
764   * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is
765   * stored under the special key "-TOTAL-".
766   * @param fs           The file system to use
767   * @param hbaseRootDir The root directory to scan
768   * @return A map for each table and its percentage (never null)
769   * @throws IOException When scanning the directory fails
770   */
771  public static Map<String, Integer> getTableFragmentation(final FileSystem fs,
772    final Path hbaseRootDir) throws IOException {
773    Map<String, Integer> frags = new HashMap<>();
774    int cfCountTotal = 0;
775    int cfFragTotal = 0;
776    PathFilter regionFilter = new RegionDirFilter(fs);
777    PathFilter familyFilter = new FamilyDirFilter(fs);
778    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
779    for (Path d : tableDirs) {
780      int cfCount = 0;
781      int cfFrag = 0;
782      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
783      for (FileStatus regionDir : regionDirs) {
784        Path dd = regionDir.getPath();
785        // else its a region name, now look in region for families
786        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
787        for (FileStatus familyDir : familyDirs) {
788          cfCount++;
789          cfCountTotal++;
790          Path family = familyDir.getPath();
791          // now in family make sure only one file
792          FileStatus[] familyStatus = fs.listStatus(family);
793          if (familyStatus.length > 1) {
794            cfFrag++;
795            cfFragTotal++;
796          }
797        }
798      }
799      // compute percentage per table and store in result list
800      frags.put(CommonFSUtils.getTableName(d).getNameAsString(),
801        cfCount == 0 ? 0 : Math.round((float) cfFrag / cfCount * 100));
802    }
803    // set overall percentage for all tables
804    frags.put("-TOTAL-",
805      cfCountTotal == 0 ? 0 : Math.round((float) cfFragTotal / cfCountTotal * 100));
806    return frags;
807  }
808
809  public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
810    if (fs.exists(dst) && !fs.delete(dst, false)) {
811      throw new IOException("Can not delete " + dst);
812    }
813    if (!fs.rename(src, dst)) {
814      throw new IOException("Can not rename from " + src + " to " + dst);
815    }
816  }
817
818  /**
819   * A {@link PathFilter} that returns only regular files.
820   */
821  static class FileFilter extends AbstractFileStatusFilter {
822    private final FileSystem fs;
823
824    public FileFilter(final FileSystem fs) {
825      this.fs = fs;
826    }
827
828    @Override
829    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
830      try {
831        return isFile(fs, isDir, p);
832      } catch (IOException e) {
833        LOG.warn("Unable to verify if path={} is a regular file", p, e);
834        return false;
835      }
836    }
837  }
838
839  /**
840   * Directory filter that doesn't include any of the directories in the specified blacklist
841   */
842  public static class BlackListDirFilter extends AbstractFileStatusFilter {
843    private final FileSystem fs;
844    private List<String> blacklist;
845
846    /**
847     * Create a filter on the givem filesystem with the specified blacklist
848     * @param fs                     filesystem to filter
849     * @param directoryNameBlackList list of the names of the directories to filter. If
850     *                               <tt>null</tt>, all directories are returned
851     */
852    @SuppressWarnings("unchecked")
853    public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
854      this.fs = fs;
855      blacklist = (List<String>) (directoryNameBlackList == null
856        ? Collections.emptyList()
857        : directoryNameBlackList);
858    }
859
860    @Override
861    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
862      if (!isValidName(p.getName())) {
863        return false;
864      }
865
866      try {
867        return isDirectory(fs, isDir, p);
868      } catch (IOException e) {
869        LOG.warn("An error occurred while verifying if [{}] is a valid directory."
870          + " Returning 'not valid' and continuing.", p, e);
871        return false;
872      }
873    }
874
875    protected boolean isValidName(final String name) {
876      return !blacklist.contains(name);
877    }
878  }
879
880  /**
881   * A {@link PathFilter} that only allows directories.
882   */
883  public static class DirFilter extends BlackListDirFilter {
884
885    public DirFilter(FileSystem fs) {
886      super(fs, null);
887    }
888  }
889
890  /**
891   * A {@link PathFilter} that returns usertable directories. To get all directories use the
892   * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
893   */
894  public static class UserTableDirFilter extends BlackListDirFilter {
895    public UserTableDirFilter(FileSystem fs) {
896      super(fs, HConstants.HBASE_NON_TABLE_DIRS);
897    }
898
899    @Override
900    protected boolean isValidName(final String name) {
901      if (!super.isValidName(name)) return false;
902
903      try {
904        TableName.isLegalTableQualifierName(Bytes.toBytes(name));
905      } catch (IllegalArgumentException e) {
906        LOG.info("Invalid table name: {}", name);
907        return false;
908      }
909      return true;
910    }
911  }
912
913  public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
914    throws IOException {
915    List<Path> tableDirs = new ArrayList<>();
916    Path baseNamespaceDir = new Path(rootdir, HConstants.BASE_NAMESPACE_DIR);
917    if (fs.exists(baseNamespaceDir)) {
918      for (FileStatus status : fs.globStatus(new Path(baseNamespaceDir, "*"))) {
919        tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
920      }
921    }
922    return tableDirs;
923  }
924
925  /**
926   * @return All the table directories under <code>rootdir</code>. Ignore non table hbase folders
927   *         such as .logs, .oldlogs, .corrupt folders.
928   */
929  public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
930    throws IOException {
931    // presumes any directory under hbase.rootdir is a table
932    FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
933    List<Path> tabledirs = new ArrayList<>(dirs.length);
934    for (FileStatus dir : dirs) {
935      tabledirs.add(dir.getPath());
936    }
937    return tabledirs;
938  }
939
940  /**
941   * Filter for all dirs that don't start with '.'
942   */
943  public static class RegionDirFilter extends AbstractFileStatusFilter {
944    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
945    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
946    final FileSystem fs;
947
948    public RegionDirFilter(FileSystem fs) {
949      this.fs = fs;
950    }
951
952    @Override
953    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
954      if (!regionDirPattern.matcher(p.getName()).matches()) {
955        return false;
956      }
957
958      try {
959        return isDirectory(fs, isDir, p);
960      } catch (IOException ioe) {
961        // Maybe the file was moved or the fs was disconnected.
962        LOG.warn("Skipping file {} due to IOException", p, ioe);
963        return false;
964      }
965    }
966  }
967
968  /**
969   * Given a particular table dir, return all the regiondirs inside it, excluding files such as
970   * .tableinfo
971   * @param fs       A file system for the Path
972   * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
973   * @return List of paths to valid region directories in table dir.
974   */
975  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir)
976    throws IOException {
977    // assumes we are in a table dir.
978    List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
979    if (rds == null) {
980      return Collections.emptyList();
981    }
982    List<Path> regionDirs = new ArrayList<>(rds.size());
983    for (FileStatus rdfs : rds) {
984      Path rdPath = rdfs.getPath();
985      regionDirs.add(rdPath);
986    }
987    return regionDirs;
988  }
989
990  public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) {
991    return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region);
992  }
993
994  public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
995    return getRegionDirFromTableDir(tableDir,
996      ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
997  }
998
999  public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) {
1000    return new Path(tableDir, encodedRegionName);
1001  }
1002
1003  /**
1004   * Filter for all dirs that are legal column family names. This is generally used for colfam dirs
1005   * &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1006   */
1007  public static class FamilyDirFilter extends AbstractFileStatusFilter {
1008    final FileSystem fs;
1009
1010    public FamilyDirFilter(FileSystem fs) {
1011      this.fs = fs;
1012    }
1013
1014    @Override
1015    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1016      try {
1017        // throws IAE if invalid
1018        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(Bytes.toBytes(p.getName()));
1019      } catch (IllegalArgumentException iae) {
1020        // path name is an invalid family name and thus is excluded.
1021        return false;
1022      }
1023
1024      try {
1025        return isDirectory(fs, isDir, p);
1026      } catch (IOException ioe) {
1027        // Maybe the file was moved or the fs was disconnected.
1028        LOG.warn("Skipping file {} due to IOException", p, ioe);
1029        return false;
1030      }
1031    }
1032  }
1033
1034  /**
1035   * Given a particular region dir, return all the familydirs inside it
1036   * @param fs        A file system for the Path
1037   * @param regionDir Path to a specific region directory
1038   * @return List of paths to valid family directories in region dir.
1039   */
1040  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir)
1041    throws IOException {
1042    // assumes we are in a region dir.
1043    return getFilePaths(fs, regionDir, new FamilyDirFilter(fs));
1044  }
1045
1046  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir)
1047    throws IOException {
1048    return getFilePaths(fs, familyDir, new ReferenceFileFilter(fs));
1049  }
1050
1051  public static List<Path> getReferenceAndLinkFilePaths(final FileSystem fs, final Path familyDir)
1052    throws IOException {
1053    return getFilePaths(fs, familyDir, new ReferenceAndLinkFileFilter(fs));
1054  }
1055
1056  private static List<Path> getFilePaths(final FileSystem fs, final Path dir,
1057    final PathFilter pathFilter) throws IOException {
1058    FileStatus[] fds = fs.listStatus(dir, pathFilter);
1059    List<Path> files = new ArrayList<>(fds.length);
1060    for (FileStatus fdfs : fds) {
1061      Path fdPath = fdfs.getPath();
1062      files.add(fdPath);
1063    }
1064    return files;
1065  }
1066
1067  public static int getRegionReferenceAndLinkFileCount(final FileSystem fs, final Path p) {
1068    int result = 0;
1069    try {
1070      for (Path familyDir : getFamilyDirs(fs, p)) {
1071        result += getReferenceAndLinkFilePaths(fs, familyDir).size();
1072      }
1073    } catch (IOException e) {
1074      LOG.warn("Error Counting reference files.", e);
1075    }
1076    return result;
1077  }
1078
1079  public static class ReferenceAndLinkFileFilter implements PathFilter {
1080
1081    private final FileSystem fs;
1082
1083    public ReferenceAndLinkFileFilter(FileSystem fs) {
1084      this.fs = fs;
1085    }
1086
1087    @Override
1088    public boolean accept(Path rd) {
1089      try {
1090        // only files can be references.
1091        return !fs.getFileStatus(rd).isDirectory()
1092          && (StoreFileInfo.isReference(rd) || HFileLink.isHFileLink(rd));
1093      } catch (IOException ioe) {
1094        // Maybe the file was moved or the fs was disconnected.
1095        LOG.warn("Skipping file " + rd + " due to IOException", ioe);
1096        return false;
1097      }
1098    }
1099  }
1100
1101  /**
1102   * Filter for HFiles that excludes reference files.
1103   */
1104  public static class HFileFilter extends AbstractFileStatusFilter {
1105    final FileSystem fs;
1106
1107    public HFileFilter(FileSystem fs) {
1108      this.fs = fs;
1109    }
1110
1111    @Override
1112    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1113      if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) {
1114        return false;
1115      }
1116
1117      try {
1118        return isFile(fs, isDir, p);
1119      } catch (IOException ioe) {
1120        // Maybe the file was moved or the fs was disconnected.
1121        LOG.warn("Skipping file {} due to IOException", p, ioe);
1122        return false;
1123      }
1124    }
1125  }
1126
1127  /**
1128   * Filter for HFileLinks (StoreFiles and HFiles not included). the filter itself does not consider
1129   * if a link is file or not.
1130   */
1131  public static class HFileLinkFilter implements PathFilter {
1132
1133    @Override
1134    public boolean accept(Path p) {
1135      return HFileLink.isHFileLink(p);
1136    }
1137  }
1138
1139  public static class ReferenceFileFilter extends AbstractFileStatusFilter {
1140
1141    private final FileSystem fs;
1142
1143    public ReferenceFileFilter(FileSystem fs) {
1144      this.fs = fs;
1145    }
1146
1147    @Override
1148    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1149      if (!StoreFileInfo.isReference(p)) {
1150        return false;
1151      }
1152
1153      try {
1154        // only files can be references.
1155        return isFile(fs, isDir, p);
1156      } catch (IOException ioe) {
1157        // Maybe the file was moved or the fs was disconnected.
1158        LOG.warn("Skipping file {} due to IOException", p, ioe);
1159        return false;
1160      }
1161    }
1162  }
1163
1164  /**
1165   * Called every so-often by storefile map builder getTableStoreFilePathMap to report progress.
1166   */
1167  interface ProgressReporter {
1168    /**
1169     * @param status File or directory we are about to process.
1170     */
1171    void progress(FileStatus status);
1172  }
1173
1174  /**
1175   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
1176   * names to the full Path. <br>
1177   * Example...<br>
1178   * Key = 3944417774205889744 <br>
1179   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1180   * @param map          map to add values. If null, this method will create and populate one to
1181   *                     return
1182   * @param fs           The file system to use.
1183   * @param hbaseRootDir The root directory to scan.
1184   * @param tableName    name of the table to scan.
1185   * @return Map keyed by StoreFile name with a value of the full Path.
1186   * @throws IOException When scanning the directory fails.
1187   */
1188  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1189    final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1190    throws IOException, InterruptedException {
1191    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null,
1192      (ProgressReporter) null);
1193  }
1194
1195  /**
1196   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
1197   * names to the full Path. Note that because this method can be called on a 'live' HBase system
1198   * that we will skip files that no longer exist by the time we traverse them and similarly the
1199   * user of the result needs to consider that some entries in this map may not exist by the time
1200   * this call completes. <br>
1201   * Example...<br>
1202   * Key = 3944417774205889744 <br>
1203   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1204   * @param resultMap        map to add values. If null, this method will create and populate one to
1205   *                         return
1206   * @param fs               The file system to use.
1207   * @param hbaseRootDir     The root directory to scan.
1208   * @param tableName        name of the table to scan.
1209   * @param sfFilter         optional path filter to apply to store files
1210   * @param executor         optional executor service to parallelize this operation
1211   * @param progressReporter Instance or null; gets called every time we move to new region of
1212   *                         family dir and for each store file.
1213   * @return Map keyed by StoreFile name with a value of the full Path.
1214   * @throws IOException When scanning the directory fails.
1215   * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead.
1216   */
1217  @Deprecated
1218  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1219    final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1220    ExecutorService executor, final HbckErrorReporter progressReporter)
1221    throws IOException, InterruptedException {
1222    return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor,
1223      new ProgressReporter() {
1224        @Override
1225        public void progress(FileStatus status) {
1226          // status is not used in this implementation.
1227          progressReporter.progress();
1228        }
1229      });
1230  }
1231
1232  /**
1233   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
1234   * names to the full Path. Note that because this method can be called on a 'live' HBase system
1235   * that we will skip files that no longer exist by the time we traverse them and similarly the
1236   * user of the result needs to consider that some entries in this map may not exist by the time
1237   * this call completes. <br>
1238   * Example...<br>
1239   * Key = 3944417774205889744 <br>
1240   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1241   * @param resultMap        map to add values. If null, this method will create and populate one to
1242   *                         return
1243   * @param fs               The file system to use.
1244   * @param hbaseRootDir     The root directory to scan.
1245   * @param tableName        name of the table to scan.
1246   * @param sfFilter         optional path filter to apply to store files
1247   * @param executor         optional executor service to parallelize this operation
1248   * @param progressReporter Instance or null; gets called every time we move to new region of
1249   *                         family dir and for each store file.
1250   * @return Map keyed by StoreFile name with a value of the full Path.
1251   * @throws IOException          When scanning the directory fails.
1252   * @throws InterruptedException the thread is interrupted, either before or during the activity.
1253   */
1254  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1255    final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1256    ExecutorService executor, final ProgressReporter progressReporter)
1257    throws IOException, InterruptedException {
1258
1259    final Map<String, Path> finalResultMap =
1260      resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;
1261
1262    // only include the directory paths to tables
1263    Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
1264    // Inside a table, there are compaction.dir directories to skip. Otherwise, all else
1265    // should be regions.
1266    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
1267    final Vector<Exception> exceptions = new Vector<>();
1268
1269    try {
1270      List<FileStatus> regionDirs =
1271        FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1272      if (regionDirs == null) {
1273        return finalResultMap;
1274      }
1275
1276      final List<Future<?>> futures = new ArrayList<>(regionDirs.size());
1277
1278      for (FileStatus regionDir : regionDirs) {
1279        if (null != progressReporter) {
1280          progressReporter.progress(regionDir);
1281        }
1282        final Path dd = regionDir.getPath();
1283
1284        if (!exceptions.isEmpty()) {
1285          break;
1286        }
1287
1288        Runnable getRegionStoreFileMapCall = new Runnable() {
1289          @Override
1290          public void run() {
1291            try {
1292              HashMap<String, Path> regionStoreFileMap = new HashMap<>();
1293              List<FileStatus> familyDirs =
1294                FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
1295              if (familyDirs == null) {
1296                if (!fs.exists(dd)) {
1297                  LOG.warn("Skipping region because it no longer exists: " + dd);
1298                } else {
1299                  LOG.warn("Skipping region because it has no family dirs: " + dd);
1300                }
1301                return;
1302              }
1303              for (FileStatus familyDir : familyDirs) {
1304                if (null != progressReporter) {
1305                  progressReporter.progress(familyDir);
1306                }
1307                Path family = familyDir.getPath();
1308                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1309                  continue;
1310                }
1311                // now in family, iterate over the StoreFiles and
1312                // put in map
1313                FileStatus[] familyStatus = fs.listStatus(family);
1314                for (FileStatus sfStatus : familyStatus) {
1315                  if (null != progressReporter) {
1316                    progressReporter.progress(sfStatus);
1317                  }
1318                  Path sf = sfStatus.getPath();
1319                  if (sfFilter == null || sfFilter.accept(sf)) {
1320                    regionStoreFileMap.put(sf.getName(), sf);
1321                  }
1322                }
1323              }
1324              finalResultMap.putAll(regionStoreFileMap);
1325            } catch (Exception e) {
1326              LOG.error("Could not get region store file map for region: " + dd, e);
1327              exceptions.add(e);
1328            }
1329          }
1330        };
1331
1332        // If executor is available, submit async tasks to exec concurrently, otherwise
1333        // just do serial sync execution
1334        if (executor != null) {
1335          Future<?> future = executor.submit(getRegionStoreFileMapCall);
1336          futures.add(future);
1337        } else {
1338          FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
1339          future.run();
1340          futures.add(future);
1341        }
1342      }
1343
1344      // Ensure all pending tasks are complete (or that we run into an exception)
1345      for (Future<?> f : futures) {
1346        if (!exceptions.isEmpty()) {
1347          break;
1348        }
1349        try {
1350          f.get();
1351        } catch (ExecutionException e) {
1352          LOG.error("Unexpected exec exception!  Should've been caught already.  (Bug?)", e);
1353          // Shouldn't happen, we already logged/caught any exceptions in the Runnable
1354        }
1355      }
1356    } catch (IOException e) {
1357      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
1358      exceptions.add(e);
1359    } finally {
1360      if (!exceptions.isEmpty()) {
1361        // Just throw the first exception as an indication something bad happened
1362        // Don't need to propagate all the exceptions, we already logged them all anyway
1363        Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class);
1364        throw new IOException(exceptions.firstElement());
1365      }
1366    }
1367
1368    return finalResultMap;
1369  }
1370
1371  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1372    int result = 0;
1373    try {
1374      for (Path familyDir : getFamilyDirs(fs, p)) {
1375        result += getReferenceFilePaths(fs, familyDir).size();
1376      }
1377    } catch (IOException e) {
1378      LOG.warn("Error counting reference files", e);
1379    }
1380    return result;
1381  }
1382
1383  /**
1384   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1385   * the full Path. <br>
1386   * Example...<br>
1387   * Key = 3944417774205889744 <br>
1388   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
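   * <p>
   * A minimal usage sketch (assumes an already-configured {@code Configuration conf}; the store
   * file name shown is illustrative):
   *
   * <pre>
   * FileSystem fs = FileSystem.get(conf);
   * Path rootDir = CommonFSUtils.getRootDir(conf);
   * Map&lt;String, Path&gt; storeFiles = FSUtils.getTableStoreFilePathMap(fs, rootDir);
   * Path fullPath = storeFiles.get("3944417774205889744"); // null if no such store file
   * </pre>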
1389   * @param fs           The file system to use.
1390   * @param hbaseRootDir The root directory to scan.
1391   * @return Map keyed by StoreFile name with a value of the full Path.
1392   * @throws IOException When scanning the directory fails.
1393   */
1394  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1395    final Path hbaseRootDir) throws IOException, InterruptedException {
1396    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter) null);
1397  }
1398
1399  /**
1400   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1401   * the full Path. <br>
1402   * Example...<br>
1403   * Key = 3944417774205889744 <br>
1404   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1405   * @param fs               The file system to use.
1406   * @param hbaseRootDir     The root directory to scan.
1407   * @param sfFilter         optional path filter to apply to store files
1408   * @param executor         optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to a new region or
   *                         family dir and for each store file.
1411   * @return Map keyed by StoreFile name with a value of the full Path.
1412   * @throws IOException When scanning the directory fails.
   * @deprecated Since 2.3.0. Will be removed in hbase4. Use
   *             {@link #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)}
1415   */
1416  @Deprecated
1417  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1418    final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1419    HbckErrorReporter progressReporter) throws IOException, InterruptedException {
1420    return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor, new ProgressReporter() {
1421      @Override
1422      public void progress(FileStatus status) {
1423        // status is not used in this implementation.
1424        progressReporter.progress();
1425      }
1426    });
1427  }
1428
1429  /**
1430   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1431   * the full Path. <br>
1432   * Example...<br>
1433   * Key = 3944417774205889744 <br>
1434   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
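   * <p>
   * A minimal sketch of the parallel form (the pool size is illustrative; {@code fs} and
   * {@code rootDir} are assumed to be set up already):
   *
   * <pre>
   * ExecutorService pool = Executors.newFixedThreadPool(8);
   * try {
   *   Map&lt;String, Path&gt; storeFiles =
   *     FSUtils.getTableStoreFilePathMap(fs, rootDir, null, pool, (ProgressReporter) null);
   * } finally {
   *   pool.shutdown();
   * }
   * </pre>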
1435   * @param fs               The file system to use.
1436   * @param hbaseRootDir     The root directory to scan.
1437   * @param sfFilter         optional path filter to apply to store files
1438   * @param executor         optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to a new region or
   *                         family dir and for each store file.
1441   * @return Map keyed by StoreFile name with a value of the full Path.
1442   * @throws IOException When scanning the directory fails.
1443   */
1444  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1445    final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1446    ProgressReporter progressReporter) throws IOException, InterruptedException {
1447    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);
1448
1449    // if this method looks similar to 'getTableFragmentation' that is because
1450    // it was borrowed from it.
1451
1452    // only include the directory paths to tables
1453    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1454      getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir),
1455        sfFilter, executor, progressReporter);
1456    }
1457    return map;
1458  }
1459
1460  /**
1461   * Filters FileStatuses in an array and returns a list
1462   * @param input  An array of FileStatuses
1463   * @param filter A required filter to filter the array
1464   * @return A list of FileStatuses
1465   */
1466  public static List<FileStatus> filterFileStatuses(FileStatus[] input, FileStatusFilter filter) {
1467    if (input == null) return null;
1468    return filterFileStatuses(Iterators.forArray(input), filter);
1469  }
1470
1471  /**
1472   * Filters FileStatuses in an iterator and returns a list
1473   * @param input  An iterator of FileStatuses
   * @param filter A required filter to filter the iterator
1475   * @return A list of FileStatuses
1476   */
1477  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1478    FileStatusFilter filter) {
1479    if (input == null) return null;
1480    ArrayList<FileStatus> results = new ArrayList<>();
1481    while (input.hasNext()) {
1482      FileStatus f = input.next();
1483      if (filter.accept(f)) {
1484        results.add(f);
1485      }
1486    }
1487    return results;
1488  }
1489
1490  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
   * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException and
   * returns an empty FileStatus[], while Hadoop 2 will throw FileNotFoundException.
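   * <p>
   * A minimal sketch of the intended null handling (the directory and filter variables are
   * hypothetical):
   *
   * <pre>
   * List&lt;FileStatus&gt; families = FSUtils.listStatusWithStatusFilter(fs, regionDir, familyFilter);
   * if (families == null) {
   *   // the directory is missing, empty, or everything was filtered out
   *   return;
   * }
   * </pre>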
1494   * @param fs     file system
1495   * @param dir    directory
1496   * @param filter file status filter
1497   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
1498   */
1499  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs, final Path dir,
1500    final FileStatusFilter filter) throws IOException {
1501    FileStatus[] status = null;
1502    try {
1503      status = fs.listStatus(dir);
1504    } catch (FileNotFoundException fnfe) {
1505      LOG.trace("{} does not exist", dir);
1506      return null;
1507    }
1508
1509    if (ArrayUtils.getLength(status) == 0) {
1510      return null;
1511    }
1512
1513    if (filter == null) {
1514      return Arrays.asList(status);
1515    } else {
1516      List<FileStatus> status2 = filterFileStatuses(status, filter);
1517      if (status2 == null || status2.isEmpty()) {
1518        return null;
1519      } else {
1520        return status2;
1521      }
1522    }
1523  }
1524
1525  /**
   * Scans the root path of the file system to get the degree of locality for each region on each
   * of the servers having at least one block of that region. This is used by the tool
   * {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}.
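   * <p>
   * A minimal usage sketch ({@code regionEncodedName} is an illustrative variable; a configured
   * {@code conf} is assumed):
   *
   * <pre>
   * Map&lt;String, Map&lt;String, Float&gt;&gt; locality =
   *   FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
   * Map&lt;String, Float&gt; localityByServer = locality.get(regionEncodedName);
   * </pre>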
   * @param conf the configuration to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
1532   */
1533  public static Map<String, Map<String, Float>>
1534    getRegionDegreeLocalityMappingFromFS(final Configuration conf) throws IOException {
1535    return getRegionDegreeLocalityMappingFromFS(conf, null,
1536      conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1538  }
1539
1540  /**
   * Scans the root path of the file system to get the degree of locality for each region on each
   * of the servers having at least one block of that region.
   * @param conf           the configuration to use
   * @param desiredTable   the table you wish to scan locality for
   * @param threadPoolSize the thread pool size to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
1546   */
1547  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1548    final Configuration conf, final String desiredTable, int threadPoolSize) throws IOException {
1549    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
1550    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
1551    return regionDegreeLocalityMapping;
1552  }
1553
1554  /**
   * Scans the root path of the file system to get the degree of locality of each region on each of
   * the servers having at least one block of that region.
   * @param conf                        the configuration to use
   * @param desiredTable                the table you wish to scan locality for
   * @param threadPoolSize              the thread pool size to use
   * @param regionDegreeLocalityMapping the map into which to put the locality degree mapping, or
   *                                    null; must be a thread-safe implementation
   * @throws IOException in case of file system errors or interrupts
1561   */
1562  private static void getRegionLocalityMappingFromFS(final Configuration conf,
1563    final String desiredTable, int threadPoolSize,
1564    final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
1565    final FileSystem fs = FileSystem.get(conf);
1566    final Path rootPath = CommonFSUtils.getRootDir(conf);
1567    final long startTime = EnvironmentEdgeManager.currentTime();
1568    final Path queryPath;
1569    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1570    if (null == desiredTable) {
1571      queryPath =
1572        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1573    } else {
1574      queryPath = new Path(
1575        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1576    }
1577
1578    // reject all paths that are not appropriate
1579    PathFilter pathFilter = new PathFilter() {
1580      @Override
1581      public boolean accept(Path path) {
        // path should be a region directory name, but the glob may also match noise entries
1583        if (null == path) {
1584          return false;
1585        }
1586
1587        // no parent?
1588        Path parent = path.getParent();
1589        if (null == parent) {
1590          return false;
1591        }
1592
1593        String regionName = path.getName();
1594        if (null == regionName) {
1595          return false;
1596        }
1597
1598        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
1599          return false;
1600        }
1601        return true;
1602      }
1603    };
1604
1605    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1606
1607    if (LOG.isDebugEnabled()) {
1608      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
1609    }
1610
1611    if (null == statusList) {
1612      return;
1613    }
1614
1615    // lower the number of threads in case we have very few expected regions
1616    threadPoolSize = Math.min(threadPoolSize, statusList.length);
1617
1618    // run in multiple threads
1619    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
1620      new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").setDaemon(true)
1621        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
1622    try {
1623      // ignore all file status items that are not of interest
1624      for (FileStatus regionStatus : statusList) {
1625        if (null == regionStatus || !regionStatus.isDirectory()) {
1626          continue;
1627        }
1628
1629        final Path regionPath = regionStatus.getPath();
1630        if (null != regionPath) {
1631          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
1632        }
1633      }
1634    } finally {
1635      tpe.shutdown();
1636      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1637        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1638      try {
1639        // here we wait until TPE terminates, which is either naturally or by
1640        // exceptions in the execution of the threads
1641        while (!tpe.awaitTermination(threadWakeFrequency, TimeUnit.MILLISECONDS)) {
1642          // printing out rough estimate, so as to not introduce
1643          // AtomicInteger
1644          LOG.info("Locality checking is underway: { Scanned Regions : "
1645            + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
1646            + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
1647        }
1648      } catch (InterruptedException e) {
1649        Thread.currentThread().interrupt();
1650        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1651      }
1652    }
1653
1654    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
    LOG.info("Scanning DFS for locality info took {} ms", overhead);
1656  }
1657
1658  /**
1659   * Do our short circuit read setup. Checks buffer size to use and whether to do checksumming in
1660   * hbase or hdfs.
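   * <p>
   * A minimal sketch of typical use (enabling short-circuit reads via the standard HDFS client
   * property is an assumption about the deployment):
   *
   * <pre>
   * Configuration conf = HBaseConfiguration.create();
   * conf.setBoolean("dfs.client.read.shortcircuit", true);
   * FSUtils.setupShortCircuitRead(conf);
   * </pre>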
1661   */
1662  public static void setupShortCircuitRead(final Configuration conf) {
1663    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1664    boolean shortCircuitSkipChecksum =
1665      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1666    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1667    if (shortCircuitSkipChecksum) {
1668      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not "
1669        + "be set to true."
1670        + (useHBaseChecksum
1671          ? " HBase checksum doesn't require "
1672            + "it, see https://issues.apache.org/jira/browse/HBASE-6868."
1673          : ""));
1674      assert !shortCircuitSkipChecksum; // this will fail if assertions are on
1675    }
1676    checkShortCircuitReadBufferSize(conf);
1677  }
1678
1679  /**
1680   * Check if short circuit read buffer size is set and if not, set it to hbase value.
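   * <p>
   * For example (a sketch; the buffer size shown is arbitrary), the hbase-prefixed key lets HBase
   * choose the value without overriding an operator-set HDFS key:
   *
   * <pre>
   * conf.setInt("hbase.dfs.client.read.shortcircuit.buffer.size", 131072);
   * FSUtils.checkShortCircuitReadBufferSize(conf); // copied into the dfs key only if unset
   * </pre>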
1681   */
1682  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1683    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1684    final int notSet = -1;
1685    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
1686    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1687    int size = conf.getInt(dfsKey, notSet);
1688    // If a size is set, return -- we will use it.
1689    if (size != notSet) return;
1690    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
1691    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1692    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
1693  }
1694
1695  /**
   * Returns the DFSClient DFSHedgedReadMetrics instance, or null if it cannot be found or the
   * filesystem is not HDFS.
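   * <p>
   * A minimal usage sketch (assumes a configuration pointing at an HDFS cluster):
   *
   * <pre>
   * DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
   * if (metrics != null) {
   *   long hedgedReads = metrics.getHedgedReadOps();
   * }
   * </pre>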
1697   */
1698  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
1699    throws IOException {
1700    if (!CommonFSUtils.isHDFS(c)) {
1701      return null;
1702    }
1703    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
1704    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
1705    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
1706    final String name = "getHedgedReadMetrics";
1707    DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient();
1708    Method m;
1709    try {
1710      m = dfsclient.getClass().getDeclaredMethod(name);
1711    } catch (NoSuchMethodException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: "
        + e.getMessage());
1714      return null;
1715    } catch (SecurityException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: "
        + e.getMessage());
1718      return null;
1719    }
1720    m.setAccessible(true);
1721    try {
1722      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
1723    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
1724      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: "
1725        + e.getMessage());
1726      return null;
1727    }
1728  }
1729
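  /**
   * Recursively copies the file tree rooted at {@code src} on {@code srcFS} to {@code dst} on
   * {@code dstFS}, copying individual files in parallel on a fixed-size thread pool of
   * {@code threads} workers.
   * @return the list of destination paths that were created (directories and files)
   * @throws IOException if creating a directory or copying a file fails, or if the copy is
   *                     interrupted
   */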
1730  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1731    Configuration conf, int threads) throws IOException {
1732    ExecutorService pool = Executors.newFixedThreadPool(threads);
1733    List<Future<Void>> futures = new ArrayList<>();
1734    List<Path> traversedPaths;
1735    try {
1736      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
1737      for (Future<Void> future : futures) {
1738        future.get();
1739      }
1740    } catch (ExecutionException | InterruptedException | IOException e) {
1741      throw new IOException("Copy snapshot reference files failed", e);
1742    } finally {
1743      pool.shutdownNow();
1744    }
1745    return traversedPaths;
1746  }
1747
1748  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1749    Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
1750    List<Path> traversedPaths = new ArrayList<>();
1751    traversedPaths.add(dst);
1752    FileStatus currentFileStatus = srcFS.getFileStatus(src);
1753    if (currentFileStatus.isDirectory()) {
1754      if (!dstFS.mkdirs(dst)) {
1755        throw new IOException("Create directory failed: " + dst);
1756      }
1757      FileStatus[] subPaths = srcFS.listStatus(src);
1758      for (FileStatus subPath : subPaths) {
1759        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
1760          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
1761      }
1762    } else {
1763      Future<Void> future = pool.submit(() -> {
1764        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
1765        return null;
1766      });
1767      futures.add(future);
1768    }
1769    return traversedPaths;
1770  }
1771
  /** Returns a set containing all namenode addresses of fs */
1773  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
1774    Configuration conf) {
1775    Set<InetSocketAddress> addresses = new HashSet<>();
1776    String serviceName = fs.getCanonicalServiceName();
1777
1778    if (serviceName.startsWith("ha-hdfs")) {
1779      try {
1780        Map<String, Map<String, InetSocketAddress>> addressMap =
1781          DFSUtil.getNNServiceRpcAddressesForCluster(conf);
1782        String nameService = serviceName.substring(serviceName.indexOf(":") + 1);
1783        if (addressMap.containsKey(nameService)) {
1784          Map<String, InetSocketAddress> nnMap = addressMap.get(nameService);
1785          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
1786            InetSocketAddress addr = e2.getValue();
1787            addresses.add(addr);
1788          }
1789        }
1790      } catch (Exception e) {
1791        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
1792      }
1793    } else {
1794      URI uri = fs.getUri();
1795      int port = uri.getPort();
1796      if (port < 0) {
1797        int idx = serviceName.indexOf(':');
1798        port = Integer.parseInt(serviceName.substring(idx + 1));
1799      }
1800      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
1801      addresses.add(addr);
1802    }
1803
1804    return addresses;
1805  }
1806
  /**
   * Checks whether {@code srcFs} and {@code desFs} point at the same HDFS instance, first by
   * comparing canonical service names and, failing that, by resolving and intersecting the
   * underlying namenode addresses.
   * @param conf the Configuration of HBase
   * @return whether srcFs and desFs are on the same hdfs or not
   */
1811  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
    // getCanonicalServiceName gives both srcFs and desFs a unified format
    // that contains scheme, host and port.
1814    String srcServiceName = srcFs.getCanonicalServiceName();
1815    String desServiceName = desFs.getCanonicalServiceName();
1816
1817    if (srcServiceName == null || desServiceName == null) {
1818      return false;
1819    }
1820    if (srcServiceName.equals(desServiceName)) {
1821      return true;
1822    }
1823    if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
1824      Collection<String> internalNameServices =
1825        conf.getTrimmedStringCollection("dfs.internal.nameservices");
1826      if (!internalNameServices.isEmpty()) {
        return internalNameServices.contains(srcServiceName.split(":")[1]);
1832      }
1833    }
1834    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
1835      // If one serviceName is an HA format while the other is a non-HA format,
1836      // maybe they refer to the same FileSystem.
1837      // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
1838      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
1839      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
1840      if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
1841        return true;
1842      }
1843    }
1844
1845    return false;
1846  }
1847}