001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET;
021
022import edu.umd.cs.findbugs.annotations.CheckForNull;
023import java.io.ByteArrayInputStream;
024import java.io.DataInputStream;
025import java.io.EOFException;
026import java.io.FileNotFoundException;
027import java.io.IOException;
028import java.io.InterruptedIOException;
029import java.lang.reflect.InvocationTargetException;
030import java.lang.reflect.Method;
031import java.net.InetSocketAddress;
032import java.net.URI;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Collection;
036import java.util.Collections;
037import java.util.HashMap;
038import java.util.HashSet;
039import java.util.Iterator;
040import java.util.List;
041import java.util.Locale;
042import java.util.Map;
043import java.util.Optional;
044import java.util.Set;
045import java.util.Vector;
046import java.util.concurrent.ConcurrentHashMap;
047import java.util.concurrent.ExecutionException;
048import java.util.concurrent.ExecutorService;
049import java.util.concurrent.Executors;
050import java.util.concurrent.Future;
051import java.util.concurrent.FutureTask;
052import java.util.concurrent.ThreadPoolExecutor;
053import java.util.concurrent.TimeUnit;
054import java.util.regex.Pattern;
055import org.apache.commons.lang3.ArrayUtils;
056import org.apache.hadoop.conf.Configuration;
057import org.apache.hadoop.fs.BlockLocation;
058import org.apache.hadoop.fs.FSDataInputStream;
059import org.apache.hadoop.fs.FSDataOutputStream;
060import org.apache.hadoop.fs.FileStatus;
061import org.apache.hadoop.fs.FileSystem;
062import org.apache.hadoop.fs.FileUtil;
063import org.apache.hadoop.fs.Path;
064import org.apache.hadoop.fs.PathFilter;
065import org.apache.hadoop.fs.StorageType;
066import org.apache.hadoop.fs.permission.FsPermission;
067import org.apache.hadoop.hbase.ClusterId;
068import org.apache.hadoop.hbase.HConstants;
069import org.apache.hadoop.hbase.HDFSBlocksDistribution;
070import org.apache.hadoop.hbase.TableName;
071import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
072import org.apache.hadoop.hbase.client.RegionInfo;
073import org.apache.hadoop.hbase.client.RegionInfoBuilder;
074import org.apache.hadoop.hbase.exceptions.DeserializationException;
075import org.apache.hadoop.hbase.fs.HFileSystem;
076import org.apache.hadoop.hbase.io.HFileLink;
077import org.apache.hadoop.hbase.master.HMaster;
078import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
079import org.apache.hadoop.hdfs.DFSClient;
080import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
081import org.apache.hadoop.hdfs.DFSUtil;
082import org.apache.hadoop.hdfs.DistributedFileSystem;
083import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
084import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
085import org.apache.hadoop.hdfs.protocol.LocatedBlock;
086import org.apache.hadoop.io.IOUtils;
087import org.apache.hadoop.ipc.RemoteException;
088import org.apache.yetus.audience.InterfaceAudience;
089import org.slf4j.Logger;
090import org.slf4j.LoggerFactory;
091
092import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
093import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
094import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
095import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
096import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
097
098import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
099import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
100
101/**
102 * Utility methods for interacting with the underlying file system.
103 */
104@InterfaceAudience.Private
105public final class FSUtils {
106  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
107
108  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
109  private static final int DEFAULT_THREAD_POOLSIZE = 2;
110
111  /** Set to true on Windows platforms */
112  // currently only used in testing. TODO refactor into a test class
113  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
114
115  private FSUtils() {
116  }
117
118  /** Returns True is <code>fs</code> is instance of DistributedFileSystem n */
119  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
120    FileSystem fileSystem = fs;
121    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
122    // Check its backing fs for dfs-ness.
123    if (fs instanceof HFileSystem) {
124      fileSystem = ((HFileSystem) fs).getBackingFs();
125    }
126    return fileSystem instanceof DistributedFileSystem;
127  }
128
129  /**
130   * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
131   * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true. Does not consider
132   * schema; i.e. if schemas different but path or subpath matches, the two will equate.
133   * @param pathToSearch Path we will be trying to match.
134   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
135   */
136  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
137    Path tailPath = pathTail;
138    String tailName;
139    Path toSearch = pathToSearch;
140    String toSearchName;
141    boolean result = false;
142
143    if (pathToSearch.depth() != pathTail.depth()) {
144      return false;
145    }
146
147    do {
148      tailName = tailPath.getName();
149      if (tailName == null || tailName.isEmpty()) {
150        result = true;
151        break;
152      }
153      toSearchName = toSearch.getName();
154      if (toSearchName == null || toSearchName.isEmpty()) {
155        break;
156      }
157      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
158      tailPath = tailPath.getParent();
159      toSearch = toSearch.getParent();
160    } while (tailName.equals(toSearchName));
161    return result;
162  }
163
164  /**
165   * Delete the region directory if exists.
166   * @return True if deleted the region directory.
167   */
168  public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri)
169    throws IOException {
170    Path rootDir = CommonFSUtils.getRootDir(conf);
171    FileSystem fs = rootDir.getFileSystem(conf);
172    return CommonFSUtils.deleteDirectory(fs,
173      new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
174  }
175
176  /**
177   * Create the specified file on the filesystem. By default, this will:
178   * <ol>
179   * <li>overwrite the file if it exists</li>
180   * <li>apply the umask in the configuration (if it is enabled)</li>
181   * <li>use the fs configured buffer size (or 4096 if not set)</li>
182   * <li>use the configured column family replication or default replication if
183   * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li>
184   * <li>use the default block size</li>
185   * <li>not track progress</li>
186   * </ol>
187   * @param conf         configurations
188   * @param fs           {@link FileSystem} on which to write the file
189   * @param path         {@link Path} to the file to write
190   * @param perm         permissions
191   * @param favoredNodes favored data nodes
192   * @return output stream to the created file
193   * @throws IOException if the file cannot be created
194   */
195  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
196    FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
197    if (fs instanceof HFileSystem) {
198      FileSystem backingFs = ((HFileSystem) fs).getBackingFs();
199      if (backingFs instanceof DistributedFileSystem) {
200        short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION,
201          String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION)));
202        DistributedFileSystem.HdfsDataOutputStreamBuilder builder =
203          ((DistributedFileSystem) backingFs).createFile(path).recursive().permission(perm)
204            .create();
205        if (favoredNodes != null) {
206          builder.favoredNodes(favoredNodes);
207        }
208        if (replication > 0) {
209          builder.replication(replication);
210        }
211        return builder.build();
212      }
213
214    }
215    return CommonFSUtils.create(fs, path, perm, true);
216  }
217
218  /**
219   * Checks to see if the specified file system is available
220   * @param fs filesystem
221   * @throws IOException e
222   */
223  public static void checkFileSystemAvailable(final FileSystem fs) throws IOException {
224    if (!(fs instanceof DistributedFileSystem)) {
225      return;
226    }
227    IOException exception = null;
228    DistributedFileSystem dfs = (DistributedFileSystem) fs;
229    try {
230      if (dfs.exists(new Path("/"))) {
231        return;
232      }
233    } catch (IOException e) {
234      exception = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
235    }
236    try {
237      fs.close();
238    } catch (Exception e) {
239      LOG.error("file system close failed: ", e);
240    }
241    throw new IOException("File system is not available", exception);
242  }
243
244  /**
245   * Inquire the Active NameNode's safe mode status.
246   * @param dfs A DistributedFileSystem object representing the underlying HDFS.
247   * @return whether we're in safe mode
248   */
249  private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
250    return dfs.setSafeMode(SAFEMODE_GET, true);
251  }
252
253  /**
254   * Check whether dfs is in safemode.
255   */
256  public static void checkDfsSafeMode(final Configuration conf) throws IOException {
257    boolean isInSafeMode = false;
258    FileSystem fs = FileSystem.get(conf);
259    if (fs instanceof DistributedFileSystem) {
260      DistributedFileSystem dfs = (DistributedFileSystem) fs;
261      isInSafeMode = isInSafeMode(dfs);
262    }
263    if (isInSafeMode) {
264      throw new IOException("File system is in safemode, it can't be written now");
265    }
266  }
267
268  /**
269   * Verifies current version of file system
270   * @param fs      filesystem object
271   * @param rootdir root hbase directory
272   * @return null if no version file exists, version string otherwise
273   * @throws IOException              if the version file fails to open
274   * @throws DeserializationException if the version data cannot be translated into a version
275   */
276  public static String getVersion(FileSystem fs, Path rootdir)
277    throws IOException, DeserializationException {
278    final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
279    FileStatus[] status = null;
280    try {
281      // hadoop 2.0 throws FNFE if directory does not exist.
282      // hadoop 1.0 returns null if directory does not exist.
283      status = fs.listStatus(versionFile);
284    } catch (FileNotFoundException fnfe) {
285      return null;
286    }
287    if (ArrayUtils.getLength(status) == 0) {
288      return null;
289    }
290    String version = null;
291    byte[] content = new byte[(int) status[0].getLen()];
292    FSDataInputStream s = fs.open(versionFile);
293    try {
294      IOUtils.readFully(s, content, 0, content.length);
295      if (ProtobufUtil.isPBMagicPrefix(content)) {
296        version = parseVersionFrom(content);
297      } else {
298        // Presume it pre-pb format.
299        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) {
300          version = dis.readUTF();
301        }
302      }
303    } catch (EOFException eof) {
304      LOG.warn("Version file was empty, odd, will try to set it.");
305    } finally {
306      s.close();
307    }
308    return version;
309  }
310
311  /**
312   * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
313   * @param bytes The byte content of the hbase.version file
314   * @return The version found in the file as a String
315   * @throws DeserializationException if the version data cannot be translated into a version
316   */
317  static String parseVersionFrom(final byte[] bytes) throws DeserializationException {
318    ProtobufUtil.expectPBMagicPrefix(bytes);
319    int pblen = ProtobufUtil.lengthOfPBMagic();
320    FSProtos.HBaseVersionFileContent.Builder builder =
321      FSProtos.HBaseVersionFileContent.newBuilder();
322    try {
323      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
324      return builder.getVersion();
325    } catch (IOException e) {
326      // Convert
327      throw new DeserializationException(e);
328    }
329  }
330
331  /**
332   * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
333   * @param version Version to persist
334   * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a
335   *         prefix.
336   */
337  static byte[] toVersionByteArray(final String version) {
338    FSProtos.HBaseVersionFileContent.Builder builder =
339      FSProtos.HBaseVersionFileContent.newBuilder();
340    return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
341  }
342
343  /**
344   * Verifies current version of file system
345   * @param fs      file system
346   * @param rootdir root directory of HBase installation
347   * @param message if true, issues a message on System.out
348   * @throws IOException              if the version file cannot be opened
349   * @throws DeserializationException if the contents of the version file cannot be parsed
350   */
351  public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
352    throws IOException, DeserializationException {
353    checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
354  }
355
356  /**
357   * Verifies current version of file system
358   * @param fs      file system
359   * @param rootdir root directory of HBase installation
360   * @param message if true, issues a message on System.out
361   * @param wait    wait interval
362   * @param retries number of times to retry
363   * @throws IOException              if the version file cannot be opened
364   * @throws DeserializationException if the contents of the version file cannot be parsed
365   */
366  public static void checkVersion(FileSystem fs, Path rootdir, boolean message, int wait,
367    int retries) throws IOException, DeserializationException {
368    String version = getVersion(fs, rootdir);
369    String msg;
370    if (version == null) {
371      if (!metaRegionExists(fs, rootdir)) {
372        // rootDir is empty (no version file and no root region)
373        // just create new version file (HBASE-1195)
374        setVersion(fs, rootdir, wait, retries);
375        return;
376      } else {
377        msg = "hbase.version file is missing. Is your hbase.rootdir valid? "
378          + "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. "
379          + "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2";
380      }
381    } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) {
382      return;
383    } else {
384      msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version
385        + " but software requires version " + HConstants.FILE_SYSTEM_VERSION
386        + ". Consult http://hbase.apache.org/book.html for further information about "
387        + "upgrading HBase.";
388    }
389
390    // version is deprecated require migration
391    // Output on stdout so user sees it in terminal.
392    if (message) {
393      System.out.println("WARNING! " + msg);
394    }
395    throw new FileSystemVersionException(msg);
396  }
397
398  /**
399   * Sets version of file system
400   * @param fs      filesystem object
401   * @param rootdir hbase root
402   * @throws IOException e
403   */
404  public static void setVersion(FileSystem fs, Path rootdir) throws IOException {
405    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
406      HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
407  }
408
409  /**
410   * Sets version of file system
411   * @param fs      filesystem object
412   * @param rootdir hbase root
413   * @param wait    time to wait for retry
414   * @param retries number of times to retry before failing
415   * @throws IOException e
416   */
417  public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
418    throws IOException {
419    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
420  }
421
422  /**
423   * Sets version of file system
424   * @param fs      filesystem object
425   * @param rootdir hbase root directory
426   * @param version version to set
427   * @param wait    time to wait for retry
428   * @param retries number of times to retry before throwing an IOException
429   * @throws IOException e
430   */
431  public static void setVersion(FileSystem fs, Path rootdir, String version, int wait, int retries)
432    throws IOException {
433    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
434    Path tempVersionFile = new Path(rootdir,
435      HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + HConstants.VERSION_FILE_NAME);
436    while (true) {
437      try {
438        // Write the version to a temporary file
439        FSDataOutputStream s = fs.create(tempVersionFile);
440        try {
441          s.write(toVersionByteArray(version));
442          s.close();
443          s = null;
444          // Move the temp version file to its normal location. Returns false
445          // if the rename failed. Throw an IOE in that case.
446          if (!fs.rename(tempVersionFile, versionFile)) {
447            throw new IOException("Unable to move temp version file to " + versionFile);
448          }
449        } finally {
450          // Cleaning up the temporary if the rename failed would be trying
451          // too hard. We'll unconditionally create it again the next time
452          // through anyway, files are overwritten by default by create().
453
454          // Attempt to close the stream on the way out if it is still open.
455          try {
456            if (s != null) s.close();
457          } catch (IOException ignore) {
458          }
459        }
460        LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
461        return;
462      } catch (IOException e) {
463        if (retries > 0) {
464          LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
465          fs.delete(versionFile, false);
466          try {
467            if (wait > 0) {
468              Thread.sleep(wait);
469            }
470          } catch (InterruptedException ie) {
471            throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
472          }
473          retries--;
474        } else {
475          throw e;
476        }
477      }
478    }
479  }
480
481  /**
482   * Checks that a cluster ID file exists in the HBase root directory
483   * @param fs      the root directory FileSystem
484   * @param rootdir the HBase root directory in HDFS
485   * @param wait    how long to wait between retries
486   * @return <code>true</code> if the file exists, otherwise <code>false</code>
487   * @throws IOException if checking the FileSystem fails
488   */
489  public static boolean checkClusterIdExists(FileSystem fs, Path rootdir, long wait)
490    throws IOException {
491    while (true) {
492      try {
493        Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
494        return fs.exists(filePath);
495      } catch (IOException ioe) {
496        if (wait > 0L) {
497          LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe);
498          try {
499            Thread.sleep(wait);
500          } catch (InterruptedException e) {
501            Thread.currentThread().interrupt();
502            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
503          }
504        } else {
505          throw ioe;
506        }
507      }
508    }
509  }
510
511  /**
512   * Returns the value of the unique cluster ID stored for this HBase instance.
513   * @param fs      the root directory FileSystem
514   * @param rootdir the path to the HBase root directory
515   * @return the unique cluster identifier
516   * @throws IOException if reading the cluster ID file fails
517   */
518  public static ClusterId getClusterId(FileSystem fs, Path rootdir) throws IOException {
519    Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
520    ClusterId clusterId = null;
521    FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null;
522    if (status != null) {
523      int len = Ints.checkedCast(status.getLen());
524      byte[] content = new byte[len];
525      FSDataInputStream in = fs.open(idPath);
526      try {
527        in.readFully(content);
528      } catch (EOFException eof) {
529        LOG.warn("Cluster ID file {} is empty", idPath);
530      } finally {
531        in.close();
532      }
533      try {
534        clusterId = ClusterId.parseFrom(content);
535      } catch (DeserializationException e) {
536        throw new IOException("content=" + Bytes.toString(content), e);
537      }
538      // If not pb'd, make it so.
539      if (!ProtobufUtil.isPBMagicPrefix(content)) {
540        String cid = null;
541        in = fs.open(idPath);
542        try {
543          cid = in.readUTF();
544          clusterId = new ClusterId(cid);
545        } catch (EOFException eof) {
546          LOG.warn("Cluster ID file {} is empty", idPath);
547        } finally {
548          in.close();
549        }
550        rewriteAsPb(fs, rootdir, idPath, clusterId);
551      }
552      return clusterId;
553    } else {
554      LOG.warn("Cluster ID file does not exist at {}", idPath);
555    }
556    return clusterId;
557  }
558
559  /**
560   *   */
561  private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
562    final ClusterId cid) throws IOException {
563    // Rewrite the file as pb. Move aside the old one first, write new
564    // then delete the moved-aside file.
565    Path movedAsideName = new Path(p + "." + EnvironmentEdgeManager.currentTime());
566    if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
567    setClusterId(fs, rootdir, cid, 100);
568    if (!fs.delete(movedAsideName, false)) {
569      throw new IOException("Failed delete of " + movedAsideName);
570    }
571    LOG.debug("Rewrote the hbase.id file as pb");
572  }
573
574  /**
575   * Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root
576   * directory. If any operations on the ID file fails, and {@code wait} is a positive value, the
577   * method will retry to produce the ID file until the thread is forcibly interrupted.
578   * @param fs        the root directory FileSystem
579   * @param rootdir   the path to the HBase root directory
580   * @param clusterId the unique identifier to store
581   * @param wait      how long (in milliseconds) to wait between retries
582   * @throws IOException if writing to the FileSystem fails and no wait value
583   */
584  public static void setClusterId(final FileSystem fs, final Path rootdir,
585    final ClusterId clusterId, final long wait) throws IOException {
586
587    final Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
588    final Path tempDir = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY);
589    final Path tempIdFile = new Path(tempDir, HConstants.CLUSTER_ID_FILE_NAME);
590
591    LOG.debug("Create cluster ID file [{}] with ID: {}", idFile, clusterId);
592
593    while (true) {
594      Optional<IOException> failure = Optional.empty();
595
596      LOG.debug("Write the cluster ID file to a temporary location: {}", tempIdFile);
597      try (FSDataOutputStream s = fs.create(tempIdFile)) {
598        s.write(clusterId.toByteArray());
599      } catch (IOException ioe) {
600        failure = Optional.of(ioe);
601      }
602
603      if (!failure.isPresent()) {
604        try {
605          LOG.debug("Move the temporary cluster ID file to its target location [{}]:[{}]",
606            tempIdFile, idFile);
607
608          if (!fs.rename(tempIdFile, idFile)) {
609            failure =
610              Optional.of(new IOException("Unable to move temp cluster ID file to " + idFile));
611          }
612        } catch (IOException ioe) {
613          failure = Optional.of(ioe);
614        }
615      }
616
617      if (failure.isPresent()) {
618        final IOException cause = failure.get();
619        if (wait > 0L) {
620          LOG.warn("Unable to create cluster ID file in {}, retrying in {}ms", rootdir, wait,
621            cause);
622          try {
623            Thread.sleep(wait);
624          } catch (InterruptedException e) {
625            Thread.currentThread().interrupt();
626            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
627          }
628          continue;
629        } else {
630          throw cause;
631        }
632      } else {
633        return;
634      }
635    }
636  }
637
638  /**
639   * If DFS, check safe mode and if so, wait until we clear it.
640   * @param conf configuration
641   * @param wait Sleep between retries
642   * @throws IOException e
643   */
644  public static void waitOnSafeMode(final Configuration conf, final long wait) throws IOException {
645    FileSystem fs = FileSystem.get(conf);
646    if (!(fs instanceof DistributedFileSystem)) return;
647    DistributedFileSystem dfs = (DistributedFileSystem) fs;
648    // Make sure dfs is not in safe mode
649    while (isInSafeMode(dfs)) {
650      LOG.info("Waiting for dfs to exit safe mode...");
651      try {
652        Thread.sleep(wait);
653      } catch (InterruptedException e) {
654        Thread.currentThread().interrupt();
655        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
656      }
657    }
658  }
659
660  /**
661   * Checks if meta region exists
662   * @param fs      file system
663   * @param rootDir root directory of HBase installation
664   * @return true if exists
665   */
666  public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
667    Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
668    return fs.exists(metaRegionDir);
669  }
670
671  /**
672   * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams are
673   * backed by a series of LocatedBlocks, which are fetched periodically from the namenode. This
674   * method retrieves those blocks from the input stream and uses them to calculate
675   * HDFSBlockDistribution. The underlying method in DFSInputStream does attempt to use locally
676   * cached blocks, but may hit the namenode if the cache is determined to be incomplete. The method
677   * also involves making copies of all LocatedBlocks rather than return the underlying blocks
678   * themselves.
679   */
680  static public HDFSBlocksDistribution
681    computeHDFSBlocksDistribution(HdfsDataInputStream inputStream) throws IOException {
682    List<LocatedBlock> blocks = inputStream.getAllBlocks();
683    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
684    for (LocatedBlock block : blocks) {
685      String[] hosts = getHostsForLocations(block);
686      long len = block.getBlockSize();
687      StorageType[] storageTypes = block.getStorageTypes();
688      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
689    }
690    return blocksDistribution;
691  }
692
693  private static String[] getHostsForLocations(LocatedBlock block) {
694    DatanodeInfo[] locations = block.getLocations();
695    String[] hosts = new String[locations.length];
696    for (int i = 0; i < hosts.length; i++) {
697      hosts[i] = locations[i].getHostName();
698    }
699    return hosts;
700  }
701
702  /**
703   * Compute HDFS blocks distribution of a given file, or a portion of the file
704   * @param fs     file system
705   * @param status file status of the file
706   * @param start  start position of the portion
707   * @param length length of the portion
708   * @return The HDFS blocks distribution
709   */
710  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs,
711    FileStatus status, long start, long length) throws IOException {
712    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
713    BlockLocation[] blockLocations = fs.getFileBlockLocations(status, start, length);
714    addToHDFSBlocksDistribution(blocksDistribution, blockLocations);
715    return blocksDistribution;
716  }
717
718  /**
719   * Update blocksDistribution with blockLocations
720   * @param blocksDistribution the hdfs blocks distribution
721   * @param blockLocations     an array containing block location
722   */
723  static public void addToHDFSBlocksDistribution(HDFSBlocksDistribution blocksDistribution,
724    BlockLocation[] blockLocations) throws IOException {
725    for (BlockLocation bl : blockLocations) {
726      String[] hosts = bl.getHosts();
727      long len = bl.getLength();
728      StorageType[] storageTypes = bl.getStorageTypes();
729      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
730    }
731  }
732
733  // TODO move this method OUT of FSUtils. No dependencies to HMaster
734  /**
735   * Returns the total overall fragmentation percentage. Includes hbase:meta and -ROOT- as well.
736   * @param master The master defining the HBase root and file system
737   * @return A map for each table and its percentage (never null)
738   * @throws IOException When scanning the directory fails
739   */
740  public static int getTotalTableFragmentation(final HMaster master) throws IOException {
741    Map<String, Integer> map = getTableFragmentation(master);
742    return map.isEmpty() ? -1 : map.get("-TOTAL-");
743  }
744
745  /**
746   * Runs through the HBase rootdir and checks how many stores for each table have more than one
747   * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is
748   * stored under the special key "-TOTAL-".
749   * @param master The master defining the HBase root and file system.
750   * @return A map for each table and its percentage (never null).
751   * @throws IOException When scanning the directory fails.
752   */
753  public static Map<String, Integer> getTableFragmentation(final HMaster master)
754    throws IOException {
755    Path path = CommonFSUtils.getRootDir(master.getConfiguration());
756    // since HMaster.getFileSystem() is package private
757    FileSystem fs = path.getFileSystem(master.getConfiguration());
758    return getTableFragmentation(fs, path);
759  }
760
761  /**
762   * Runs through the HBase rootdir and checks how many stores for each table have more than one
763   * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is
764   * stored under the special key "-TOTAL-".
765   * @param fs           The file system to use
766   * @param hbaseRootDir The root directory to scan
767   * @return A map for each table and its percentage (never null)
768   * @throws IOException When scanning the directory fails
769   */
770  public static Map<String, Integer> getTableFragmentation(final FileSystem fs,
771    final Path hbaseRootDir) throws IOException {
772    Map<String, Integer> frags = new HashMap<>();
773    int cfCountTotal = 0;
774    int cfFragTotal = 0;
775    PathFilter regionFilter = new RegionDirFilter(fs);
776    PathFilter familyFilter = new FamilyDirFilter(fs);
777    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
778    for (Path d : tableDirs) {
779      int cfCount = 0;
780      int cfFrag = 0;
781      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
782      for (FileStatus regionDir : regionDirs) {
783        Path dd = regionDir.getPath();
784        // else its a region name, now look in region for families
785        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
786        for (FileStatus familyDir : familyDirs) {
787          cfCount++;
788          cfCountTotal++;
789          Path family = familyDir.getPath();
790          // now in family make sure only one file
791          FileStatus[] familyStatus = fs.listStatus(family);
792          if (familyStatus.length > 1) {
793            cfFrag++;
794            cfFragTotal++;
795          }
796        }
797      }
798      // compute percentage per table and store in result list
799      frags.put(CommonFSUtils.getTableName(d).getNameAsString(),
800        cfCount == 0 ? 0 : Math.round((float) cfFrag / cfCount * 100));
801    }
802    // set overall percentage for all tables
803    frags.put("-TOTAL-",
804      cfCountTotal == 0 ? 0 : Math.round((float) cfFragTotal / cfCountTotal * 100));
805    return frags;
806  }
807
808  public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
809    if (fs.exists(dst) && !fs.delete(dst, false)) {
810      throw new IOException("Can not delete " + dst);
811    }
812    if (!fs.rename(src, dst)) {
813      throw new IOException("Can not rename from " + src + " to " + dst);
814    }
815  }
816
817  /**
818   * A {@link PathFilter} that returns only regular files.
819   */
820  static class FileFilter extends AbstractFileStatusFilter {
821    private final FileSystem fs;
822
823    public FileFilter(final FileSystem fs) {
824      this.fs = fs;
825    }
826
827    @Override
828    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
829      try {
830        return isFile(fs, isDir, p);
831      } catch (IOException e) {
832        LOG.warn("Unable to verify if path={} is a regular file", p, e);
833        return false;
834      }
835    }
836  }
837
838  /**
839   * Directory filter that doesn't include any of the directories in the specified blacklist
840   */
841  public static class BlackListDirFilter extends AbstractFileStatusFilter {
842    private final FileSystem fs;
843    private List<String> blacklist;
844
845    /**
846     * Create a filter on the givem filesystem with the specified blacklist
847     * @param fs                     filesystem to filter
848     * @param directoryNameBlackList list of the names of the directories to filter. If
849     *                               <tt>null</tt>, all directories are returned
850     */
851    @SuppressWarnings("unchecked")
852    public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
853      this.fs = fs;
854      blacklist = (List<String>) (directoryNameBlackList == null
855        ? Collections.emptyList()
856        : directoryNameBlackList);
857    }
858
859    @Override
860    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
861      if (!isValidName(p.getName())) {
862        return false;
863      }
864
865      try {
866        return isDirectory(fs, isDir, p);
867      } catch (IOException e) {
868        LOG.warn("An error occurred while verifying if [{}] is a valid directory."
869          + " Returning 'not valid' and continuing.", p, e);
870        return false;
871      }
872    }
873
874    protected boolean isValidName(final String name) {
875      return !blacklist.contains(name);
876    }
877  }
878
879  /**
880   * A {@link PathFilter} that only allows directories.
881   */
882  public static class DirFilter extends BlackListDirFilter {
883
884    public DirFilter(FileSystem fs) {
885      super(fs, null);
886    }
887  }
888
889  /**
890   * A {@link PathFilter} that returns usertable directories. To get all directories use the
891   * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
892   */
893  public static class UserTableDirFilter extends BlackListDirFilter {
894    public UserTableDirFilter(FileSystem fs) {
895      super(fs, HConstants.HBASE_NON_TABLE_DIRS);
896    }
897
898    @Override
899    protected boolean isValidName(final String name) {
900      if (!super.isValidName(name)) return false;
901
902      try {
903        TableName.isLegalTableQualifierName(Bytes.toBytes(name));
904      } catch (IllegalArgumentException e) {
905        LOG.info("Invalid table name: {}", name);
906        return false;
907      }
908      return true;
909    }
910  }
911
912  public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
913    throws IOException {
914    List<Path> tableDirs = new ArrayList<>();
915    Path baseNamespaceDir = new Path(rootdir, HConstants.BASE_NAMESPACE_DIR);
916    if (fs.exists(baseNamespaceDir)) {
917      for (FileStatus status : fs.globStatus(new Path(baseNamespaceDir, "*"))) {
918        tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
919      }
920    }
921    return tableDirs;
922  }
923
924  /**
925   * @return All the table directories under <code>rootdir</code>. Ignore non table hbase folders
926   *         such as .logs, .oldlogs, .corrupt folders.
927   */
928  public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
929    throws IOException {
930    // presumes any directory under hbase.rootdir is a table
931    FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
932    List<Path> tabledirs = new ArrayList<>(dirs.length);
933    for (FileStatus dir : dirs) {
934      tabledirs.add(dir.getPath());
935    }
936    return tabledirs;
937  }
938
939  /**
940   * Filter for all dirs that don't start with '.'
941   */
942  public static class RegionDirFilter extends AbstractFileStatusFilter {
943    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
944    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
945    final FileSystem fs;
946
947    public RegionDirFilter(FileSystem fs) {
948      this.fs = fs;
949    }
950
951    @Override
952    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
953      if (!regionDirPattern.matcher(p.getName()).matches()) {
954        return false;
955      }
956
957      try {
958        return isDirectory(fs, isDir, p);
959      } catch (IOException ioe) {
960        // Maybe the file was moved or the fs was disconnected.
961        LOG.warn("Skipping file {} due to IOException", p, ioe);
962        return false;
963      }
964    }
965  }
966
967  /**
968   * Given a particular table dir, return all the regiondirs inside it, excluding files such as
969   * .tableinfo
970   * @param fs       A file system for the Path
971   * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
972   * @return List of paths to valid region directories in table dir.
973   */
974  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir)
975    throws IOException {
976    // assumes we are in a table dir.
977    List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
978    if (rds == null) {
979      return Collections.emptyList();
980    }
981    List<Path> regionDirs = new ArrayList<>(rds.size());
982    for (FileStatus rdfs : rds) {
983      Path rdPath = rdfs.getPath();
984      regionDirs.add(rdPath);
985    }
986    return regionDirs;
987  }
988
989  public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) {
990    return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region);
991  }
992
993  public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
994    return getRegionDirFromTableDir(tableDir,
995      ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
996  }
997
998  public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) {
999    return new Path(tableDir, encodedRegionName);
1000  }
1001
1002  /**
1003   * Filter for all dirs that are legal column family names. This is generally used for colfam dirs
1004   * &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1005   */
1006  public static class FamilyDirFilter extends AbstractFileStatusFilter {
1007    final FileSystem fs;
1008
1009    public FamilyDirFilter(FileSystem fs) {
1010      this.fs = fs;
1011    }
1012
1013    @Override
1014    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1015      try {
1016        // throws IAE if invalid
1017        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(Bytes.toBytes(p.getName()));
1018      } catch (IllegalArgumentException iae) {
1019        // path name is an invalid family name and thus is excluded.
1020        return false;
1021      }
1022
1023      try {
1024        return isDirectory(fs, isDir, p);
1025      } catch (IOException ioe) {
1026        // Maybe the file was moved or the fs was disconnected.
1027        LOG.warn("Skipping file {} due to IOException", p, ioe);
1028        return false;
1029      }
1030    }
1031  }
1032
1033  /**
1034   * Given a particular region dir, return all the familydirs inside it
1035   * @param fs        A file system for the Path
1036   * @param regionDir Path to a specific region directory
1037   * @return List of paths to valid family directories in region dir.
1038   */
1039  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir)
1040    throws IOException {
1041    // assumes we are in a region dir.
1042    return getFilePaths(fs, regionDir, new FamilyDirFilter(fs));
1043  }
1044
1045  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir)
1046    throws IOException {
1047    return getFilePaths(fs, familyDir, new ReferenceFileFilter(fs));
1048  }
1049
1050  public static List<Path> getReferenceAndLinkFilePaths(final FileSystem fs, final Path familyDir)
1051    throws IOException {
1052    return getFilePaths(fs, familyDir, new ReferenceAndLinkFileFilter(fs));
1053  }
1054
1055  private static List<Path> getFilePaths(final FileSystem fs, final Path dir,
1056    final PathFilter pathFilter) throws IOException {
1057    FileStatus[] fds = fs.listStatus(dir, pathFilter);
1058    List<Path> files = new ArrayList<>(fds.length);
1059    for (FileStatus fdfs : fds) {
1060      Path fdPath = fdfs.getPath();
1061      files.add(fdPath);
1062    }
1063    return files;
1064  }
1065
1066  public static int getRegionReferenceAndLinkFileCount(final FileSystem fs, final Path p) {
1067    int result = 0;
1068    try {
1069      for (Path familyDir : getFamilyDirs(fs, p)) {
1070        result += getReferenceAndLinkFilePaths(fs, familyDir).size();
1071      }
1072    } catch (IOException e) {
1073      LOG.warn("Error Counting reference files.", e);
1074    }
1075    return result;
1076  }
1077
1078  public static class ReferenceAndLinkFileFilter implements PathFilter {
1079
1080    private final FileSystem fs;
1081
1082    public ReferenceAndLinkFileFilter(FileSystem fs) {
1083      this.fs = fs;
1084    }
1085
1086    @Override
1087    public boolean accept(Path rd) {
1088      try {
1089        // only files can be references.
1090        return !fs.getFileStatus(rd).isDirectory()
1091          && (StoreFileInfo.isReference(rd) || HFileLink.isHFileLink(rd));
1092      } catch (IOException ioe) {
1093        // Maybe the file was moved or the fs was disconnected.
1094        LOG.warn("Skipping file " + rd + " due to IOException", ioe);
1095        return false;
1096      }
1097    }
1098  }
1099
1100  /**
1101   * Filter for HFiles that excludes reference files.
1102   */
1103  public static class HFileFilter extends AbstractFileStatusFilter {
1104    final FileSystem fs;
1105
1106    public HFileFilter(FileSystem fs) {
1107      this.fs = fs;
1108    }
1109
1110    @Override
1111    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1112      if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) {
1113        return false;
1114      }
1115
1116      try {
1117        return isFile(fs, isDir, p);
1118      } catch (IOException ioe) {
1119        // Maybe the file was moved or the fs was disconnected.
1120        LOG.warn("Skipping file {} due to IOException", p, ioe);
1121        return false;
1122      }
1123    }
1124  }
1125
1126  /**
1127   * Filter for HFileLinks (StoreFiles and HFiles not included). the filter itself does not consider
1128   * if a link is file or not.
1129   */
1130  public static class HFileLinkFilter implements PathFilter {
1131
1132    @Override
1133    public boolean accept(Path p) {
1134      return HFileLink.isHFileLink(p);
1135    }
1136  }
1137
1138  public static class ReferenceFileFilter extends AbstractFileStatusFilter {
1139
1140    private final FileSystem fs;
1141
1142    public ReferenceFileFilter(FileSystem fs) {
1143      this.fs = fs;
1144    }
1145
1146    @Override
1147    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1148      if (!StoreFileInfo.isReference(p)) {
1149        return false;
1150      }
1151
1152      try {
1153        // only files can be references.
1154        return isFile(fs, isDir, p);
1155      } catch (IOException ioe) {
1156        // Maybe the file was moved or the fs was disconnected.
1157        LOG.warn("Skipping file {} due to IOException", p, ioe);
1158        return false;
1159      }
1160    }
1161  }
1162
1163  /**
1164   * Called every so-often by storefile map builder getTableStoreFilePathMap to report progress.
1165   */
1166  interface ProgressReporter {
1167    /**
1168     * @param status File or directory we are about to process.
1169     */
1170    void progress(FileStatus status);
1171  }
1172
1173  /**
1174   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
1175   * names to the full Path. <br>
1176   * Example...<br>
1177   * Key = 3944417774205889744 <br>
1178   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1179   * @param map          map to add values. If null, this method will create and populate one to
1180   *                     return
1181   * @param fs           The file system to use.
1182   * @param hbaseRootDir The root directory to scan.
1183   * @param tableName    name of the table to scan.
1184   * @return Map keyed by StoreFile name with a value of the full Path.
1185   * @throws IOException When scanning the directory fails.
1186   */
1187  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1188    final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1189    throws IOException, InterruptedException {
1190    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null,
1191      (ProgressReporter) null);
1192  }
1193
1194  /**
1195   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
1196   * names to the full Path. Note that because this method can be called on a 'live' HBase system
1197   * that we will skip files that no longer exist by the time we traverse them and similarly the
1198   * user of the result needs to consider that some entries in this map may not exist by the time
1199   * this call completes. <br>
1200   * Example...<br>
1201   * Key = 3944417774205889744 <br>
1202   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1203   * @param resultMap        map to add values. If null, this method will create and populate one to
1204   *                         return
1205   * @param fs               The file system to use.
1206   * @param hbaseRootDir     The root directory to scan.
1207   * @param tableName        name of the table to scan.
1208   * @param sfFilter         optional path filter to apply to store files
1209   * @param executor         optional executor service to parallelize this operation
1210   * @param progressReporter Instance or null; gets called every time we move to new region of
1211   *                         family dir and for each store file.
1212   * @return Map keyed by StoreFile name with a value of the full Path.
1213   * @throws IOException When scanning the directory fails.
1214   * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead.
1215   */
1216  @Deprecated
1217  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1218    final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1219    ExecutorService executor, final HbckErrorReporter progressReporter)
1220    throws IOException, InterruptedException {
1221    return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor,
1222      new ProgressReporter() {
1223        @Override
1224        public void progress(FileStatus status) {
1225          // status is not used in this implementation.
1226          progressReporter.progress();
1227        }
1228      });
1229  }
1230
1231  /**
1232   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
1233   * names to the full Path. Note that because this method can be called on a 'live' HBase system
1234   * that we will skip files that no longer exist by the time we traverse them and similarly the
1235   * user of the result needs to consider that some entries in this map may not exist by the time
1236   * this call completes. <br>
1237   * Example...<br>
1238   * Key = 3944417774205889744 <br>
1239   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1240   * @param resultMap        map to add values. If null, this method will create and populate one to
1241   *                         return
1242   * @param fs               The file system to use.
1243   * @param hbaseRootDir     The root directory to scan.
1244   * @param tableName        name of the table to scan.
1245   * @param sfFilter         optional path filter to apply to store files
1246   * @param executor         optional executor service to parallelize this operation
1247   * @param progressReporter Instance or null; gets called every time we move to new region of
1248   *                         family dir and for each store file.
1249   * @return Map keyed by StoreFile name with a value of the full Path.
1250   * @throws IOException          When scanning the directory fails.
1251   * @throws InterruptedException the thread is interrupted, either before or during the activity.
1252   */
1253  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1254    final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1255    ExecutorService executor, final ProgressReporter progressReporter)
1256    throws IOException, InterruptedException {
1257
1258    final Map<String, Path> finalResultMap =
1259      resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;
1260
1261    // only include the directory paths to tables
1262    Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
1263    // Inside a table, there are compaction.dir directories to skip. Otherwise, all else
1264    // should be regions.
1265    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
1266    final Vector<Exception> exceptions = new Vector<>();
1267
1268    try {
1269      List<FileStatus> regionDirs =
1270        FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1271      if (regionDirs == null) {
1272        return finalResultMap;
1273      }
1274
1275      final List<Future<?>> futures = new ArrayList<>(regionDirs.size());
1276
1277      for (FileStatus regionDir : regionDirs) {
1278        if (null != progressReporter) {
1279          progressReporter.progress(regionDir);
1280        }
1281        final Path dd = regionDir.getPath();
1282
1283        if (!exceptions.isEmpty()) {
1284          break;
1285        }
1286
1287        Runnable getRegionStoreFileMapCall = new Runnable() {
1288          @Override
1289          public void run() {
1290            try {
1291              HashMap<String, Path> regionStoreFileMap = new HashMap<>();
1292              List<FileStatus> familyDirs =
1293                FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
1294              if (familyDirs == null) {
1295                if (!fs.exists(dd)) {
1296                  LOG.warn("Skipping region because it no longer exists: " + dd);
1297                } else {
1298                  LOG.warn("Skipping region because it has no family dirs: " + dd);
1299                }
1300                return;
1301              }
1302              for (FileStatus familyDir : familyDirs) {
1303                if (null != progressReporter) {
1304                  progressReporter.progress(familyDir);
1305                }
1306                Path family = familyDir.getPath();
1307                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1308                  continue;
1309                }
1310                // now in family, iterate over the StoreFiles and
1311                // put in map
1312                FileStatus[] familyStatus = fs.listStatus(family);
1313                for (FileStatus sfStatus : familyStatus) {
1314                  if (null != progressReporter) {
1315                    progressReporter.progress(sfStatus);
1316                  }
1317                  Path sf = sfStatus.getPath();
1318                  if (sfFilter == null || sfFilter.accept(sf)) {
1319                    regionStoreFileMap.put(sf.getName(), sf);
1320                  }
1321                }
1322              }
1323              finalResultMap.putAll(regionStoreFileMap);
1324            } catch (Exception e) {
1325              LOG.error("Could not get region store file map for region: " + dd, e);
1326              exceptions.add(e);
1327            }
1328          }
1329        };
1330
1331        // If executor is available, submit async tasks to exec concurrently, otherwise
1332        // just do serial sync execution
1333        if (executor != null) {
1334          Future<?> future = executor.submit(getRegionStoreFileMapCall);
1335          futures.add(future);
1336        } else {
1337          FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
1338          future.run();
1339          futures.add(future);
1340        }
1341      }
1342
1343      // Ensure all pending tasks are complete (or that we run into an exception)
1344      for (Future<?> f : futures) {
1345        if (!exceptions.isEmpty()) {
1346          break;
1347        }
1348        try {
1349          f.get();
1350        } catch (ExecutionException e) {
1351          LOG.error("Unexpected exec exception!  Should've been caught already.  (Bug?)", e);
1352          // Shouldn't happen, we already logged/caught any exceptions in the Runnable
1353        }
1354      }
1355    } catch (IOException e) {
1356      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
1357      exceptions.add(e);
1358    } finally {
1359      if (!exceptions.isEmpty()) {
1360        // Just throw the first exception as an indication something bad happened
1361        // Don't need to propagate all the exceptions, we already logged them all anyway
1362        Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class);
1363        throw new IOException(exceptions.firstElement());
1364      }
1365    }
1366
1367    return finalResultMap;
1368  }
1369
1370  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1371    int result = 0;
1372    try {
1373      for (Path familyDir : getFamilyDirs(fs, p)) {
1374        result += getReferenceFilePaths(fs, familyDir).size();
1375      }
1376    } catch (IOException e) {
1377      LOG.warn("Error counting reference files", e);
1378    }
1379    return result;
1380  }
1381
1382  /**
1383   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1384   * the full Path. <br>
1385   * Example...<br>
1386   * Key = 3944417774205889744 <br>
1387   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1388   * @param fs           The file system to use.
1389   * @param hbaseRootDir The root directory to scan.
1390   * @return Map keyed by StoreFile name with a value of the full Path.
1391   * @throws IOException When scanning the directory fails.
1392   */
1393  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1394    final Path hbaseRootDir) throws IOException, InterruptedException {
1395    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter) null);
1396  }
1397
1398  /**
1399   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1400   * the full Path. <br>
1401   * Example...<br>
1402   * Key = 3944417774205889744 <br>
1403   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1404   * @param fs               The file system to use.
1405   * @param hbaseRootDir     The root directory to scan.
1406   * @param sfFilter         optional path filter to apply to store files
1407   * @param executor         optional executor service to parallelize this operation
1408   * @param progressReporter Instance or null; gets called every time we move to new region of
1409   *                         family dir and for each store file.
1410   * @return Map keyed by StoreFile name with a value of the full Path.
1411   * @throws IOException When scanning the directory fails.
1412   * @deprecated Since 2.3.0. Will be removed in hbase4. Used
1413   *             {@link #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)}
1414   */
1415  @Deprecated
1416  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1417    final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1418    HbckErrorReporter progressReporter) throws IOException, InterruptedException {
1419    return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor, new ProgressReporter() {
1420      @Override
1421      public void progress(FileStatus status) {
1422        // status is not used in this implementation.
1423        progressReporter.progress();
1424      }
1425    });
1426  }
1427
1428  /**
1429   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1430   * the full Path. <br>
1431   * Example...<br>
1432   * Key = 3944417774205889744 <br>
1433   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1434   * @param fs               The file system to use.
1435   * @param hbaseRootDir     The root directory to scan.
1436   * @param sfFilter         optional path filter to apply to store files
1437   * @param executor         optional executor service to parallelize this operation
1438   * @param progressReporter Instance or null; gets called every time we move to new region of
1439   *                         family dir and for each store file.
1440   * @return Map keyed by StoreFile name with a value of the full Path.
1441   * @throws IOException When scanning the directory fails.
1442   */
1443  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1444    final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1445    ProgressReporter progressReporter) throws IOException, InterruptedException {
1446    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);
1447
1448    // if this method looks similar to 'getTableFragmentation' that is because
1449    // it was borrowed from it.
1450
1451    // only include the directory paths to tables
1452    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1453      getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir),
1454        sfFilter, executor, progressReporter);
1455    }
1456    return map;
1457  }
1458
1459  /**
1460   * Filters FileStatuses in an array and returns a list
1461   * @param input  An array of FileStatuses
1462   * @param filter A required filter to filter the array
1463   * @return A list of FileStatuses
1464   */
1465  public static List<FileStatus> filterFileStatuses(FileStatus[] input, FileStatusFilter filter) {
1466    if (input == null) return null;
1467    return filterFileStatuses(Iterators.forArray(input), filter);
1468  }
1469
1470  /**
1471   * Filters FileStatuses in an iterator and returns a list
1472   * @param input  An iterator of FileStatuses
1473   * @param filter A required filter to filter the array
1474   * @return A list of FileStatuses
1475   */
1476  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1477    FileStatusFilter filter) {
1478    if (input == null) return null;
1479    ArrayList<FileStatus> results = new ArrayList<>();
1480    while (input.hasNext()) {
1481      FileStatus f = input.next();
1482      if (filter.accept(f)) {
1483        results.add(f);
1484      }
1485    }
1486    return results;
1487  }
1488
1489  /**
1490   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal This accommodates
1491   * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException, and
1492   * return an empty FileStatus[] while Hadoop 2 will throw FileNotFoundException.
1493   * @param fs     file system
1494   * @param dir    directory
1495   * @param filter file status filter
1496   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
1497   */
1498  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs, final Path dir,
1499    final FileStatusFilter filter) throws IOException {
1500    FileStatus[] status = null;
1501    try {
1502      status = fs.listStatus(dir);
1503    } catch (FileNotFoundException fnfe) {
1504      LOG.trace("{} does not exist", dir);
1505      return null;
1506    }
1507
1508    if (ArrayUtils.getLength(status) == 0) {
1509      return null;
1510    }
1511
1512    if (filter == null) {
1513      return Arrays.asList(status);
1514    } else {
1515      List<FileStatus> status2 = filterFileStatuses(status, filter);
1516      if (status2 == null || status2.isEmpty()) {
1517        return null;
1518      } else {
1519        return status2;
1520      }
1521    }
1522  }
1523
1524  /**
1525   * This function is to scan the root path of the file system to get the degree of locality for
1526   * each region on each of the servers having at least one block of that region. This is used by
1527   * the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer} the configuration to
1528   * use
1529   * @return the mapping from region encoded name to a map of server names to locality fraction in
1530   *         case of file system errors or interrupts
1531   */
1532  public static Map<String, Map<String, Float>>
1533    getRegionDegreeLocalityMappingFromFS(final Configuration conf) throws IOException {
1534    return getRegionDegreeLocalityMappingFromFS(conf, null,
1535      conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1536
1537  }
1538
1539  /**
1540   * This function is to scan the root path of the file system to get the degree of locality for
1541   * each region on each of the servers having at least one block of that region. the configuration
1542   * to use the table you wish to scan locality for the thread pool size to use
1543   * @return the mapping from region encoded name to a map of server names to locality fraction in
1544   *         case of file system errors or interrupts
1545   */
1546  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1547    final Configuration conf, final String desiredTable, int threadPoolSize) throws IOException {
1548    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
1549    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
1550    return regionDegreeLocalityMapping;
1551  }
1552
1553  /**
1554   * This function is to scan the root path of the file system to get either the mapping between the
1555   * region name and its best locality region server or the degree of locality of each region on
1556   * each of the servers having at least one block of that region. The output map parameters are
1557   * both optional. the configuration to use the table you wish to scan locality for the thread pool
1558   * size to use the map into which to put the locality degree mapping or null, must be a
1559   * thread-safe implementation in case of file system errors or interrupts
1560   */
1561  private static void getRegionLocalityMappingFromFS(final Configuration conf,
1562    final String desiredTable, int threadPoolSize,
1563    final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
1564    final FileSystem fs = FileSystem.get(conf);
1565    final Path rootPath = CommonFSUtils.getRootDir(conf);
1566    final long startTime = EnvironmentEdgeManager.currentTime();
1567    final Path queryPath;
1568    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1569    if (null == desiredTable) {
1570      queryPath =
1571        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1572    } else {
1573      queryPath = new Path(
1574        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1575    }
1576
1577    // reject all paths that are not appropriate
1578    PathFilter pathFilter = new PathFilter() {
1579      @Override
1580      public boolean accept(Path path) {
1581        // this is the region name; it may get some noise data
1582        if (null == path) {
1583          return false;
1584        }
1585
1586        // no parent?
1587        Path parent = path.getParent();
1588        if (null == parent) {
1589          return false;
1590        }
1591
1592        String regionName = path.getName();
1593        if (null == regionName) {
1594          return false;
1595        }
1596
1597        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
1598          return false;
1599        }
1600        return true;
1601      }
1602    };
1603
1604    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1605
1606    if (LOG.isDebugEnabled()) {
1607      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
1608    }
1609
1610    if (null == statusList) {
1611      return;
1612    }
1613
1614    // lower the number of threads in case we have very few expected regions
1615    threadPoolSize = Math.min(threadPoolSize, statusList.length);
1616
1617    // run in multiple threads
1618    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
1619      new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").setDaemon(true)
1620        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
1621    try {
1622      // ignore all file status items that are not of interest
1623      for (FileStatus regionStatus : statusList) {
1624        if (null == regionStatus || !regionStatus.isDirectory()) {
1625          continue;
1626        }
1627
1628        final Path regionPath = regionStatus.getPath();
1629        if (null != regionPath) {
1630          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
1631        }
1632      }
1633    } finally {
1634      tpe.shutdown();
1635      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1636        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1637      try {
1638        // here we wait until TPE terminates, which is either naturally or by
1639        // exceptions in the execution of the threads
1640        while (!tpe.awaitTermination(threadWakeFrequency, TimeUnit.MILLISECONDS)) {
1641          // printing out rough estimate, so as to not introduce
1642          // AtomicInteger
1643          LOG.info("Locality checking is underway: { Scanned Regions : "
1644            + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
1645            + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
1646        }
1647      } catch (InterruptedException e) {
1648        Thread.currentThread().interrupt();
1649        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1650      }
1651    }
1652
1653    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
1654    LOG.info("Scan DFS for locality info takes {}ms", overhead);
1655  }
1656
1657  /**
1658   * Do our short circuit read setup. Checks buffer size to use and whether to do checksumming in
1659   * hbase or hdfs.
1660   */
1661  public static void setupShortCircuitRead(final Configuration conf) {
1662    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1663    boolean shortCircuitSkipChecksum =
1664      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1665    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1666    if (shortCircuitSkipChecksum) {
1667      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not "
1668        + "be set to true."
1669        + (useHBaseChecksum
1670          ? " HBase checksum doesn't require "
1671            + "it, see https://issues.apache.org/jira/browse/HBASE-6868."
1672          : ""));
1673      assert !shortCircuitSkipChecksum; // this will fail if assertions are on
1674    }
1675    checkShortCircuitReadBufferSize(conf);
1676  }
1677
1678  /**
1679   * Check if short circuit read buffer size is set and if not, set it to hbase value.
1680   */
1681  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1682    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1683    final int notSet = -1;
1684    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
1685    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1686    int size = conf.getInt(dfsKey, notSet);
1687    // If a size is set, return -- we will use it.
1688    if (size != notSet) return;
1689    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
1690    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1691    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
1692  }
1693
1694  /**
1695   * Returns The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs.
1696   */
1697  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
1698    throws IOException {
1699    if (!CommonFSUtils.isHDFS(c)) {
1700      return null;
1701    }
1702    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
1703    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
1704    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
1705    final String name = "getHedgedReadMetrics";
1706    DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient();
1707    Method m;
1708    try {
1709      m = dfsclient.getClass().getDeclaredMethod(name);
1710    } catch (NoSuchMethodException e) {
1711      LOG.warn(
1712        "Failed find method " + name + " in dfsclient; no hedged read metrics: " + e.getMessage());
1713      return null;
1714    } catch (SecurityException e) {
1715      LOG.warn(
1716        "Failed find method " + name + " in dfsclient; no hedged read metrics: " + e.getMessage());
1717      return null;
1718    }
1719    m.setAccessible(true);
1720    try {
1721      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
1722    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
1723      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: "
1724        + e.getMessage());
1725      return null;
1726    }
1727  }
1728
1729  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1730    Configuration conf, int threads) throws IOException {
1731    ExecutorService pool = Executors.newFixedThreadPool(threads);
1732    List<Future<Void>> futures = new ArrayList<>();
1733    List<Path> traversedPaths;
1734    try {
1735      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
1736      for (Future<Void> future : futures) {
1737        future.get();
1738      }
1739    } catch (ExecutionException | InterruptedException | IOException e) {
1740      throw new IOException("Copy snapshot reference files failed", e);
1741    } finally {
1742      pool.shutdownNow();
1743    }
1744    return traversedPaths;
1745  }
1746
1747  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1748    Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
1749    List<Path> traversedPaths = new ArrayList<>();
1750    traversedPaths.add(dst);
1751    FileStatus currentFileStatus = srcFS.getFileStatus(src);
1752    if (currentFileStatus.isDirectory()) {
1753      if (!dstFS.mkdirs(dst)) {
1754        throw new IOException("Create directory failed: " + dst);
1755      }
1756      FileStatus[] subPaths = srcFS.listStatus(src);
1757      for (FileStatus subPath : subPaths) {
1758        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
1759          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
1760      }
1761    } else {
1762      Future<Void> future = pool.submit(() -> {
1763        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
1764        return null;
1765      });
1766      futures.add(future);
1767    }
1768    return traversedPaths;
1769  }
1770
1771  /** Returns A set containing all namenode addresses of fs */
1772  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
1773    Configuration conf) {
1774    Set<InetSocketAddress> addresses = new HashSet<>();
1775    String serviceName = fs.getCanonicalServiceName();
1776
1777    if (serviceName.startsWith("ha-hdfs")) {
1778      try {
1779        Map<String, Map<String, InetSocketAddress>> addressMap =
1780          DFSUtil.getNNServiceRpcAddressesForCluster(conf);
1781        String nameService = serviceName.substring(serviceName.indexOf(":") + 1);
1782        if (addressMap.containsKey(nameService)) {
1783          Map<String, InetSocketAddress> nnMap = addressMap.get(nameService);
1784          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
1785            InetSocketAddress addr = e2.getValue();
1786            addresses.add(addr);
1787          }
1788        }
1789      } catch (Exception e) {
1790        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
1791      }
1792    } else {
1793      URI uri = fs.getUri();
1794      int port = uri.getPort();
1795      if (port < 0) {
1796        int idx = serviceName.indexOf(':');
1797        port = Integer.parseInt(serviceName.substring(idx + 1));
1798      }
1799      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
1800      addresses.add(addr);
1801    }
1802
1803    return addresses;
1804  }
1805
1806  /**
1807   * @param conf the Configuration of HBase
1808   * @return Whether srcFs and desFs are on same hdfs or not
1809   */
1810  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
1811    // By getCanonicalServiceName, we could make sure both srcFs and desFs
1812    // show a unified format which contains scheme, host and port.
1813    String srcServiceName = srcFs.getCanonicalServiceName();
1814    String desServiceName = desFs.getCanonicalServiceName();
1815
1816    if (srcServiceName == null || desServiceName == null) {
1817      return false;
1818    }
1819    if (srcServiceName.equals(desServiceName)) {
1820      return true;
1821    }
1822    if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
1823      Collection<String> internalNameServices =
1824        conf.getTrimmedStringCollection("dfs.internal.nameservices");
1825      if (!internalNameServices.isEmpty()) {
1826        if (internalNameServices.contains(srcServiceName.split(":")[1])) {
1827          return true;
1828        } else {
1829          return false;
1830        }
1831      }
1832    }
1833    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
1834      // If one serviceName is an HA format while the other is a non-HA format,
1835      // maybe they refer to the same FileSystem.
1836      // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
1837      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
1838      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
1839      if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
1840        return true;
1841      }
1842    }
1843
1844    return false;
1845  }
1846}