001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
021import edu.umd.cs.findbugs.annotations.CheckForNull;
022import java.io.ByteArrayInputStream;
023import java.io.DataInputStream;
024import java.io.EOFException;
025import java.io.FileNotFoundException;
026import java.io.IOException;
027import java.io.InterruptedIOException;
028import java.lang.reflect.InvocationTargetException;
029import java.lang.reflect.Method;
030import java.net.InetSocketAddress;
031import java.net.URI;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.Collection;
035import java.util.Collections;
036import java.util.HashMap;
037import java.util.HashSet;
038import java.util.Iterator;
039import java.util.List;
040import java.util.Locale;
041import java.util.Map;
042import java.util.Optional;
043import java.util.Set;
044import java.util.Vector;
045import java.util.concurrent.ConcurrentHashMap;
046import java.util.concurrent.ExecutionException;
047import java.util.concurrent.ExecutorService;
048import java.util.concurrent.Executors;
049import java.util.concurrent.Future;
050import java.util.concurrent.FutureTask;
051import java.util.concurrent.ThreadPoolExecutor;
052import java.util.concurrent.TimeUnit;
053import java.util.regex.Pattern;
054import org.apache.commons.lang3.ArrayUtils;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.fs.BlockLocation;
057import org.apache.hadoop.fs.FSDataInputStream;
058import org.apache.hadoop.fs.FSDataOutputStream;
059import org.apache.hadoop.fs.FileStatus;
060import org.apache.hadoop.fs.FileSystem;
061import org.apache.hadoop.fs.FileUtil;
062import org.apache.hadoop.fs.Path;
063import org.apache.hadoop.fs.PathFilter;
064import org.apache.hadoop.fs.permission.FsPermission;
065import org.apache.hadoop.hbase.ClusterId;
066import org.apache.hadoop.hbase.HConstants;
067import org.apache.hadoop.hbase.HDFSBlocksDistribution;
068import org.apache.hadoop.hbase.TableName;
069import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
070import org.apache.hadoop.hbase.client.RegionInfo;
071import org.apache.hadoop.hbase.client.RegionInfoBuilder;
072import org.apache.hadoop.hbase.exceptions.DeserializationException;
073import org.apache.hadoop.hbase.fs.HFileSystem;
074import org.apache.hadoop.hbase.io.HFileLink;
075import org.apache.hadoop.hbase.master.HMaster;
076import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
077import org.apache.hadoop.hdfs.DFSClient;
078import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
079import org.apache.hadoop.hdfs.DFSUtil;
080import org.apache.hadoop.hdfs.DistributedFileSystem;
081import org.apache.hadoop.hdfs.protocol.HdfsConstants;
082import org.apache.hadoop.io.IOUtils;
083import org.apache.hadoop.ipc.RemoteException;
084import org.apache.hadoop.util.Progressable;
085import org.apache.yetus.audience.InterfaceAudience;
086import org.slf4j.Logger;
087import org.slf4j.LoggerFactory;
088
089import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
090import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
091import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
092import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
093import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
094
095import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
097
098/**
099 * Utility methods for interacting with the underlying file system.
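 * <p>
 * A minimal usage sketch (the configuration and root directory handling are illustrative, not
 * prescriptive):
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * Path rootDir = CommonFSUtils.getRootDir(conf);
 * FileSystem fs = rootDir.getFileSystem(conf);
 * // Fails with FileSystemVersionException if the on-disk layout version is not current.
 * FSUtils.checkVersion(fs, rootDir, true);
 * }</pre>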
100 */
101@InterfaceAudience.Private
102public final class FSUtils {
103  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
104
105  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
106  private static final int DEFAULT_THREAD_POOLSIZE = 2;
107
108  /** Set to true on Windows platforms */
109  @VisibleForTesting // currently only used in testing. TODO refactor into a test class
110  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
111
112  private FSUtils() {
113  }
114
115  /**
   * @return True if <code>fs</code> is an instance of DistributedFileSystem
117   * @throws IOException
118   */
119  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
120    FileSystem fileSystem = fs;
121    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
122    // Check its backing fs for dfs-ness.
123    if (fs instanceof HFileSystem) {
124      fileSystem = ((HFileSystem)fs).getBackingFs();
125    }
126    return fileSystem instanceof DistributedFileSystem;
127  }
128
129  /**
   * Compares the path component of the Path URIs; e.g. given hdfs://a/b/c and /a/b/c, it compares
   * the '/a/b/c' parts. The scheme is not considered; i.e. if the schemes differ but the path or
   * a subpath matches, the two equate. Paths of different depth never match.
   * @param pathToSearch Path we will be trying to match.
   * @param pathTail the tail we expect to find at the end of <code>pathToSearch</code>
   * @return True if <code>pathTail</code> is a tail of the path of <code>pathToSearch</code>
136   */
137  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
138    Path tailPath = pathTail;
139    String tailName;
140    Path toSearch = pathToSearch;
141    String toSearchName;
142    boolean result = false;
143
144    if (pathToSearch.depth() != pathTail.depth()) {
145      return false;
146    }
147
148    do {
149      tailName = tailPath.getName();
150      if (tailName == null || tailName.isEmpty()) {
151        result = true;
152        break;
153      }
154      toSearchName = toSearch.getName();
155      if (toSearchName == null || toSearchName.isEmpty()) {
156        break;
157      }
158      // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
159      tailPath = tailPath.getParent();
160      toSearch = toSearch.getParent();
161    } while(tailName.equals(toSearchName));
162    return result;
163  }
164
165  /**
   * Delete the region directory if it exists.
   * @return True if the region directory was deleted.
168   * @throws IOException
169   */
170  public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri)
171    throws IOException {
172    Path rootDir = CommonFSUtils.getRootDir(conf);
173    FileSystem fs = rootDir.getFileSystem(conf);
174    return CommonFSUtils.deleteDirectory(fs,
175      new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
176  }
177
  /**
179   * Create the specified file on the filesystem. By default, this will:
180   * <ol>
181   * <li>overwrite the file if it exists</li>
182   * <li>apply the umask in the configuration (if it is enabled)</li>
183   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the configured column family replication, or the filesystem default replication if
   * the configured value is {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li>
186   * <li>use the default block size</li>
187   * <li>not track progress</li>
188   * </ol>
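   * <p>
   * Illustrative call (the path, permission, and favored nodes are placeholders):
   * <pre>{@code
   * InetSocketAddress[] favoredNodes = ...; // e.g. hosts of the region's replicas
   * FSDataOutputStream out =
   *   FSUtils.create(conf, fs, new Path("/hbase/.tmp/example"), FsPermission.getFileDefault(),
   *     favoredNodes);
   * }</pre>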
189   * @param conf configurations
190   * @param fs {@link FileSystem} on which to write the file
191   * @param path {@link Path} to the file to write
192   * @param perm permissions
193   * @param favoredNodes favored data nodes
194   * @return output stream to the created file
195   * @throws IOException if the file cannot be created
196   */
197  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
198    FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
199    if (fs instanceof HFileSystem) {
200      FileSystem backingFs = ((HFileSystem) fs).getBackingFs();
201      if (backingFs instanceof DistributedFileSystem) {
202        // Try to use the favoredNodes version via reflection to allow backwards-
203        // compatibility.
204        short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION,
205          String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION)));
206        try {
207          return (FSDataOutputStream) (DistributedFileSystem.class
208            .getDeclaredMethod("create", Path.class, FsPermission.class, boolean.class, int.class,
209              short.class, long.class, Progressable.class, InetSocketAddress[].class)
210            .invoke(backingFs, path, perm, true, CommonFSUtils.getDefaultBufferSize(backingFs),
211              replication > 0 ? replication : CommonFSUtils.getDefaultReplication(backingFs, path),
212              CommonFSUtils.getDefaultBlockSize(backingFs, path), null, favoredNodes));
213        } catch (InvocationTargetException ite) {
          // Function was properly called, but threw its own exception.
215          throw new IOException(ite.getCause());
216        } catch (NoSuchMethodException e) {
217          LOG.debug("DFS Client does not support most favored nodes create; using default create");
218          LOG.trace("Ignoring; use default create", e);
219        } catch (IllegalArgumentException | SecurityException | IllegalAccessException e) {
220          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
221        }
222      }
223    }
224    return CommonFSUtils.create(fs, path, perm, true);
225  }
226
227  /**
228   * Checks to see if the specified file system is available
229   *
230   * @param fs filesystem
231   * @throws IOException e
232   */
233  public static void checkFileSystemAvailable(final FileSystem fs)
234  throws IOException {
235    if (!(fs instanceof DistributedFileSystem)) {
236      return;
237    }
238    IOException exception = null;
239    DistributedFileSystem dfs = (DistributedFileSystem) fs;
240    try {
241      if (dfs.exists(new Path("/"))) {
242        return;
243      }
244    } catch (IOException e) {
245      exception = e instanceof RemoteException ?
246              ((RemoteException)e).unwrapRemoteException() : e;
247    }
248    try {
249      fs.close();
250    } catch (Exception e) {
251      LOG.error("file system close failed: ", e);
252    }
253    throw new IOException("File system is not available", exception);
254  }
255
256  /**
   * We use reflection because
   * {@link DistributedFileSystem#setSafeMode(HdfsConstants.SafeModeAction, boolean)} is not in
   * hadoop 1.1.
   *
   * @param dfs the DistributedFileSystem to check
   * @return whether we're in safe mode
262   * @throws IOException
263   */
264  private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
265    boolean inSafeMode = false;
266    try {
267      Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
268          org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class});
269      inSafeMode = (Boolean) m.invoke(dfs,
270        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true);
271    } catch (Exception e) {
272      if (e instanceof IOException) throw (IOException) e;
273
274      // Check whether dfs is on safemode.
275      inSafeMode = dfs.setSafeMode(
276        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET);
277    }
278    return inSafeMode;
279  }
280
281  /**
282   * Check whether dfs is in safemode.
   * @param conf configuration used to obtain the filesystem
   * @throws IOException if dfs is in safemode, or if the safemode check itself fails
285   */
286  public static void checkDfsSafeMode(final Configuration conf)
287  throws IOException {
288    boolean isInSafeMode = false;
289    FileSystem fs = FileSystem.get(conf);
290    if (fs instanceof DistributedFileSystem) {
291      DistributedFileSystem dfs = (DistributedFileSystem)fs;
292      isInSafeMode = isInSafeMode(dfs);
293    }
294    if (isInSafeMode) {
295      throw new IOException("File system is in safemode, it can't be written now");
296    }
297  }
298
299  /**
300   * Verifies current version of file system
301   *
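   * <p>
   * For example (illustrative):
   * <pre>{@code
   * String version = FSUtils.getVersion(fs, rootDir);
   * if (version == null) {
   *   // no hbase.version file present yet
   * }
   * }</pre>
   *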
302   * @param fs filesystem object
303   * @param rootdir root hbase directory
304   * @return null if no version file exists, version string otherwise
305   * @throws IOException if the version file fails to open
306   * @throws DeserializationException if the version data cannot be translated into a version
307   */
308  public static String getVersion(FileSystem fs, Path rootdir)
309  throws IOException, DeserializationException {
310    final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
311    FileStatus[] status = null;
312    try {
313      // hadoop 2.0 throws FNFE if directory does not exist.
314      // hadoop 1.0 returns null if directory does not exist.
315      status = fs.listStatus(versionFile);
316    } catch (FileNotFoundException fnfe) {
317      return null;
318    }
319    if (ArrayUtils.getLength(status) == 0) {
320      return null;
321    }
322    String version = null;
323    byte [] content = new byte [(int)status[0].getLen()];
324    FSDataInputStream s = fs.open(versionFile);
325    try {
326      IOUtils.readFully(s, content, 0, content.length);
327      if (ProtobufUtil.isPBMagicPrefix(content)) {
328        version = parseVersionFrom(content);
329      } else {
        // Presume it is in pre-pb format.
331        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) {
332          version = dis.readUTF();
333        }
334      }
335    } catch (EOFException eof) {
336      LOG.warn("Version file was empty, odd, will try to set it.");
337    } finally {
338      s.close();
339    }
340    return version;
341  }
342
343  /**
344   * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
345   * @param bytes The byte content of the hbase.version file
346   * @return The version found in the file as a String
347   * @throws DeserializationException if the version data cannot be translated into a version
348   */
349  static String parseVersionFrom(final byte [] bytes)
350  throws DeserializationException {
351    ProtobufUtil.expectPBMagicPrefix(bytes);
352    int pblen = ProtobufUtil.lengthOfPBMagic();
353    FSProtos.HBaseVersionFileContent.Builder builder =
354      FSProtos.HBaseVersionFileContent.newBuilder();
355    try {
356      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
357      return builder.getVersion();
358    } catch (IOException e) {
359      // Convert
360      throw new DeserializationException(e);
361    }
362  }
363
364  /**
365   * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
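   * <p>
   * The bytes produced here can be read back with {@link #parseVersionFrom(byte[])}; e.g.
   * (illustrative):
   * <pre>{@code
   * byte[] content = toVersionByteArray("8");
   * String version = parseVersionFrom(content); // "8"
   * }</pre>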
366   * @param version Version to persist
367   * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
368   */
369  static byte [] toVersionByteArray(final String version) {
370    FSProtos.HBaseVersionFileContent.Builder builder =
371      FSProtos.HBaseVersionFileContent.newBuilder();
372    return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
373  }
374
375  /**
376   * Verifies current version of file system
377   *
378   * @param fs file system
379   * @param rootdir root directory of HBase installation
380   * @param message if true, issues a message on System.out
381   * @throws IOException if the version file cannot be opened
382   * @throws DeserializationException if the contents of the version file cannot be parsed
383   */
384  public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
385  throws IOException, DeserializationException {
386    checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
387  }
388
389  /**
390   * Verifies current version of file system
391   *
392   * @param fs file system
393   * @param rootdir root directory of HBase installation
394   * @param message if true, issues a message on System.out
395   * @param wait wait interval
396   * @param retries number of times to retry
397   *
398   * @throws IOException if the version file cannot be opened
399   * @throws DeserializationException if the contents of the version file cannot be parsed
400   */
401  public static void checkVersion(FileSystem fs, Path rootdir,
402      boolean message, int wait, int retries)
403  throws IOException, DeserializationException {
404    String version = getVersion(fs, rootdir);
405    String msg;
406    if (version == null) {
407      if (!metaRegionExists(fs, rootdir)) {
408        // rootDir is empty (no version file and no root region)
409        // just create new version file (HBASE-1195)
410        setVersion(fs, rootdir, wait, retries);
411        return;
412      } else {
413        msg = "hbase.version file is missing. Is your hbase.rootdir valid? " +
414            "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " +
415            "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2";
416      }
417    } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) {
418      return;
419    } else {
420      msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version +
421          " but software requires version " + HConstants.FILE_SYSTEM_VERSION +
422          ". Consult http://hbase.apache.org/book.html for further information about " +
423          "upgrading HBase.";
424    }
425
    // Version is deprecated; migration is required.
427    // Output on stdout so user sees it in terminal.
428    if (message) {
429      System.out.println("WARNING! " + msg);
430    }
431    throw new FileSystemVersionException(msg);
432  }
433
434  /**
435   * Sets version of file system
436   *
437   * @param fs filesystem object
438   * @param rootdir hbase root
439   * @throws IOException e
440   */
441  public static void setVersion(FileSystem fs, Path rootdir)
442  throws IOException {
443    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
444      HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
445  }
446
447  /**
448   * Sets version of file system
449   *
450   * @param fs filesystem object
451   * @param rootdir hbase root
452   * @param wait time to wait for retry
453   * @param retries number of times to retry before failing
454   * @throws IOException e
455   */
456  public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
457  throws IOException {
458    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
459  }
460
461
462  /**
463   * Sets version of file system
464   *
465   * @param fs filesystem object
466   * @param rootdir hbase root directory
467   * @param version version to set
468   * @param wait time to wait for retry
469   * @param retries number of times to retry before throwing an IOException
470   * @throws IOException e
471   */
472  public static void setVersion(FileSystem fs, Path rootdir, String version,
473      int wait, int retries) throws IOException {
474    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
475    Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
476      HConstants.VERSION_FILE_NAME);
477    while (true) {
478      try {
479        // Write the version to a temporary file
480        FSDataOutputStream s = fs.create(tempVersionFile);
481        try {
482          s.write(toVersionByteArray(version));
483          s.close();
484          s = null;
485          // Move the temp version file to its normal location. Returns false
486          // if the rename failed. Throw an IOE in that case.
487          if (!fs.rename(tempVersionFile, versionFile)) {
488            throw new IOException("Unable to move temp version file to " + versionFile);
489          }
490        } finally {
          // Cleaning up the temporary file if the rename failed would be trying
          // too hard. We'll unconditionally create it again the next time
          // through anyway; files are overwritten by default by create().
494
495          // Attempt to close the stream on the way out if it is still open.
496          try {
497            if (s != null) s.close();
498          } catch (IOException ignore) { }
499        }
500        LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
501        return;
502      } catch (IOException e) {
503        if (retries > 0) {
504          LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
505          fs.delete(versionFile, false);
506          try {
507            if (wait > 0) {
508              Thread.sleep(wait);
509            }
510          } catch (InterruptedException ie) {
511            throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
512          }
513          retries--;
514        } else {
515          throw e;
516        }
517      }
518    }
519  }
520
521  /**
522   * Checks that a cluster ID file exists in the HBase root directory
523   * @param fs the root directory FileSystem
524   * @param rootdir the HBase root directory in HDFS
525   * @param wait how long to wait between retries
526   * @return <code>true</code> if the file exists, otherwise <code>false</code>
527   * @throws IOException if checking the FileSystem fails
528   */
529  public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
530      long wait) throws IOException {
531    while (true) {
532      try {
533        Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
534        return fs.exists(filePath);
535      } catch (IOException ioe) {
536        if (wait > 0L) {
537          LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe);
538          try {
539            Thread.sleep(wait);
540          } catch (InterruptedException e) {
541            Thread.currentThread().interrupt();
542            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
543          }
544        } else {
545          throw ioe;
546        }
547      }
548    }
549  }
550
551  /**
552   * Returns the value of the unique cluster ID stored for this HBase instance.
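   * <p>
   * Example (illustrative):
   * <pre>{@code
   * ClusterId clusterId = FSUtils.getClusterId(fs, rootDir);
   * if (clusterId == null) {
   *   // no hbase.id file exists under the root directory yet
   * }
   * }</pre>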
553   * @param fs the root directory FileSystem
554   * @param rootdir the path to the HBase root directory
555   * @return the unique cluster identifier
556   * @throws IOException if reading the cluster ID file fails
557   */
558  public static ClusterId getClusterId(FileSystem fs, Path rootdir)
559  throws IOException {
560    Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
561    ClusterId clusterId = null;
    FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null;
563    if (status != null) {
564      int len = Ints.checkedCast(status.getLen());
565      byte [] content = new byte[len];
566      FSDataInputStream in = fs.open(idPath);
567      try {
568        in.readFully(content);
569      } catch (EOFException eof) {
570        LOG.warn("Cluster ID file {} is empty", idPath);
      } finally {
572        in.close();
573      }
574      try {
575        clusterId = ClusterId.parseFrom(content);
576      } catch (DeserializationException e) {
577        throw new IOException("content=" + Bytes.toString(content), e);
578      }
579      // If not pb'd, make it so.
580      if (!ProtobufUtil.isPBMagicPrefix(content)) {
581        String cid = null;
582        in = fs.open(idPath);
583        try {
584          cid = in.readUTF();
585          clusterId = new ClusterId(cid);
586        } catch (EOFException eof) {
587          LOG.warn("Cluster ID file {} is empty", idPath);
588        } finally {
589          in.close();
590        }
591        rewriteAsPb(fs, rootdir, idPath, clusterId);
592      }
593      return clusterId;
594    } else {
595      LOG.warn("Cluster ID file does not exist at {}", idPath);
596    }
597    return clusterId;
598  }
599
  /**
   * Rewrite the cluster ID file at <code>p</code> in protobuf format.
   * @param cid cluster ID to write out
   * @throws IOException if moving aside the old file, writing the new file, or deleting the
   *   moved-aside file fails
   */
604  private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
605      final ClusterId cid)
606  throws IOException {
607    // Rewrite the file as pb.  Move aside the old one first, write new
608    // then delete the moved-aside file.
609    Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
610    if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
611    setClusterId(fs, rootdir, cid, 100);
612    if (!fs.delete(movedAsideName, false)) {
613      throw new IOException("Failed delete of " + movedAsideName);
614    }
615    LOG.debug("Rewrote the hbase.id file as pb");
616  }
617
618  /**
619   * Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root
   * directory. If any operation on the ID file fails, and {@code wait} is a positive value, the
   * method will retry producing the ID file until the thread is forcibly interrupted.
622   *
623   * @param fs the root directory FileSystem
624   * @param rootdir the path to the HBase root directory
625   * @param clusterId the unique identifier to store
626   * @param wait how long (in milliseconds) to wait between retries
   * @throws IOException if writing to the FileSystem fails and {@code wait} is not positive
628   */
629  public static void setClusterId(final FileSystem fs, final Path rootdir,
630      final ClusterId clusterId, final long wait) throws IOException {
631
632    final Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
633    final Path tempDir = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY);
634    final Path tempIdFile = new Path(tempDir, HConstants.CLUSTER_ID_FILE_NAME);
635
636    LOG.debug("Create cluster ID file [{}] with ID: {}", idFile, clusterId);
637
638    while (true) {
639      Optional<IOException> failure = Optional.empty();
640
641      LOG.debug("Write the cluster ID file to a temporary location: {}", tempIdFile);
642      try (FSDataOutputStream s = fs.create(tempIdFile)) {
643        s.write(clusterId.toByteArray());
644      } catch (IOException ioe) {
645        failure = Optional.of(ioe);
646      }
647
648      if (!failure.isPresent()) {
649        try {
650          LOG.debug("Move the temporary cluster ID file to its target location [{}]:[{}]",
651            tempIdFile, idFile);
652
653          if (!fs.rename(tempIdFile, idFile)) {
654            failure =
655                Optional.of(new IOException("Unable to move temp cluster ID file to " + idFile));
656          }
657        } catch (IOException ioe) {
658          failure = Optional.of(ioe);
659        }
660      }
661
662      if (failure.isPresent()) {
663        final IOException cause = failure.get();
664        if (wait > 0L) {
665          LOG.warn("Unable to create cluster ID file in {}, retrying in {}ms", rootdir, wait,
666            cause);
667          try {
668            Thread.sleep(wait);
669          } catch (InterruptedException e) {
670            Thread.currentThread().interrupt();
671            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
672          }
673          continue;
674        } else {
675          throw cause;
676        }
677      } else {
678        return;
679      }
680    }
681  }
682
683  /**
684   * If DFS, check safe mode and if so, wait until we clear it.
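   * <p>
   * For example (the poll interval is arbitrary):
   * <pre>{@code
   * // Block until HDFS leaves safe mode, re-checking every 10 seconds.
   * FSUtils.waitOnSafeMode(conf, 10 * 1000L);
   * }</pre>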
685   * @param conf configuration
686   * @param wait Sleep between retries
687   * @throws IOException e
688   */
689  public static void waitOnSafeMode(final Configuration conf,
690    final long wait)
691  throws IOException {
692    FileSystem fs = FileSystem.get(conf);
693    if (!(fs instanceof DistributedFileSystem)) return;
694    DistributedFileSystem dfs = (DistributedFileSystem)fs;
695    // Make sure dfs is not in safe mode
696    while (isInSafeMode(dfs)) {
697      LOG.info("Waiting for dfs to exit safe mode...");
698      try {
699        Thread.sleep(wait);
700      } catch (InterruptedException e) {
701        Thread.currentThread().interrupt();
702        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
703      }
704    }
705  }
706
707  /**
708   * Checks if meta region exists
709   * @param fs file system
710   * @param rootDir root directory of HBase installation
711   * @return true if exists
712   */
713  public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
714    Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
715    return fs.exists(metaRegionDir);
716  }
717
718  /**
719   * Compute HDFS blocks distribution of a given file, or a portion of the file
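   * <p>
   * A typical (illustrative) use is computing the locality of a whole store file:
   * <pre>{@code
   * FileStatus status = fs.getFileStatus(storeFilePath);
   * HDFSBlocksDistribution distribution =
   *   FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
   * float locality = distribution.getBlockLocalityIndex(localHostName);
   * }</pre>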
720   * @param fs file system
721   * @param status file status of the file
722   * @param start start position of the portion
723   * @param length length of the portion
724   * @return The HDFS blocks distribution
725   */
726  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
727    final FileSystem fs, FileStatus status, long start, long length)
728    throws IOException {
729    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
730    BlockLocation [] blockLocations =
731      fs.getFileBlockLocations(status, start, length);
732    for(BlockLocation bl : blockLocations) {
733      String [] hosts = bl.getHosts();
734      long len = bl.getLength();
735      blocksDistribution.addHostsAndBlockWeight(hosts, len);
736    }
737
738    return blocksDistribution;
739  }
740
741  /**
742   * Update blocksDistribution with blockLocations
743   * @param blocksDistribution the hdfs blocks distribution
   * @param blockLocations an array containing block locations
745   */
746  static public void addToHDFSBlocksDistribution(
747      HDFSBlocksDistribution blocksDistribution, BlockLocation[] blockLocations)
748      throws IOException {
749    for (BlockLocation bl : blockLocations) {
750      String[] hosts = bl.getHosts();
751      long len = bl.getLength();
752      blocksDistribution.addHostsAndBlockWeight(hosts, len);
753    }
754  }
755
  // TODO move this method OUT of FSUtils. FSUtils should have no dependency on HMaster.
757  /**
758   * Returns the total overall fragmentation percentage. Includes hbase:meta and
759   * -ROOT- as well.
760   *
761   * @param master  The master defining the HBase root and file system
   * @return The total fragmentation percentage across all tables, or -1 if it cannot be computed
763   * @throws IOException When scanning the directory fails
764   */
765  public static int getTotalTableFragmentation(final HMaster master)
766  throws IOException {
767    Map<String, Integer> map = getTableFragmentation(master);
768    return map.isEmpty() ? -1 :  map.get("-TOTAL-");
769  }
770
771  /**
772   * Runs through the HBase rootdir and checks how many stores for each table
773   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
774   * percentage across all tables is stored under the special key "-TOTAL-".
775   *
776   * @param master  The master defining the HBase root and file system.
777   * @return A map for each table and its percentage (never null).
778   *
779   * @throws IOException When scanning the directory fails.
780   */
781  public static Map<String, Integer> getTableFragmentation(final HMaster master)
782    throws IOException {
783    Path path = CommonFSUtils.getRootDir(master.getConfiguration());
784    // since HMaster.getFileSystem() is package private
785    FileSystem fs = path.getFileSystem(master.getConfiguration());
786    return getTableFragmentation(fs, path);
787  }
788
789  /**
790   * Runs through the HBase rootdir and checks how many stores for each table
791   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
792   * percentage across all tables is stored under the special key "-TOTAL-".
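   * <p>
   * Example of reading back the overall value (illustrative):
   * <pre>{@code
   * Map<String, Integer> frags = FSUtils.getTableFragmentation(fs, rootDir);
   * int overallPercent = frags.getOrDefault("-TOTAL-", 0);
   * }</pre>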
793   *
794   * @param fs  The file system to use
795   * @param hbaseRootDir  The root directory to scan
796   * @return A map for each table and its percentage (never null)
797   * @throws IOException When scanning the directory fails
798   */
799  public static Map<String, Integer> getTableFragmentation(
800    final FileSystem fs, final Path hbaseRootDir)
801  throws IOException {
802    Map<String, Integer> frags = new HashMap<>();
803    int cfCountTotal = 0;
804    int cfFragTotal = 0;
805    PathFilter regionFilter = new RegionDirFilter(fs);
806    PathFilter familyFilter = new FamilyDirFilter(fs);
807    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
808    for (Path d : tableDirs) {
809      int cfCount = 0;
810      int cfFrag = 0;
811      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
812      for (FileStatus regionDir : regionDirs) {
813        Path dd = regionDir.getPath();
        // this is a region directory; now look inside it for column families
815        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
816        for (FileStatus familyDir : familyDirs) {
817          cfCount++;
818          cfCountTotal++;
819          Path family = familyDir.getPath();
820          // now in family make sure only one file
821          FileStatus[] familyStatus = fs.listStatus(family);
822          if (familyStatus.length > 1) {
823            cfFrag++;
824            cfFragTotal++;
825          }
826        }
827      }
828      // compute percentage per table and store in result list
829      frags.put(CommonFSUtils.getTableName(d).getNameAsString(),
830        cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
831    }
832    // set overall percentage for all tables
833    frags.put("-TOTAL-",
834      cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
835    return frags;
836  }
837
838  public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
839    if (fs.exists(dst) && !fs.delete(dst, false)) {
840      throw new IOException("Can not delete " + dst);
841    }
842    if (!fs.rename(src, dst)) {
843      throw new IOException("Can not rename from " + src + " to " + dst);
844    }
845  }
846
847  /**
848   * A {@link PathFilter} that returns only regular files.
849   */
850  static class FileFilter extends AbstractFileStatusFilter {
851    private final FileSystem fs;
852
853    public FileFilter(final FileSystem fs) {
854      this.fs = fs;
855    }
856
857    @Override
858    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
859      try {
860        return isFile(fs, isDir, p);
861      } catch (IOException e) {
862        LOG.warn("Unable to verify if path={} is a regular file", p, e);
863        return false;
864      }
865    }
866  }
867
868  /**
869   * Directory filter that doesn't include any of the directories in the specified blacklist
870   */
871  public static class BlackListDirFilter extends AbstractFileStatusFilter {
872    private final FileSystem fs;
873    private List<String> blacklist;
874
875    /**
     * Create a filter on the given filesystem with the specified blacklist
877     * @param fs filesystem to filter
878     * @param directoryNameBlackList list of the names of the directories to filter. If
879     *          <tt>null</tt>, all directories are returned
880     */
881    @SuppressWarnings("unchecked")
882    public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
883      this.fs = fs;
884      blacklist =
885        (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
886          : directoryNameBlackList);
887    }
888
889    @Override
890    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
891      if (!isValidName(p.getName())) {
892        return false;
893      }
894
895      try {
896        return isDirectory(fs, isDir, p);
897      } catch (IOException e) {
898        LOG.warn("An error occurred while verifying if [{}] is a valid directory."
899            + " Returning 'not valid' and continuing.", p, e);
900        return false;
901      }
902    }
903
904    protected boolean isValidName(final String name) {
905      return !blacklist.contains(name);
906    }
907  }
908
909  /**
910   * A {@link PathFilter} that only allows directories.
911   */
912  public static class DirFilter extends BlackListDirFilter {
913
914    public DirFilter(FileSystem fs) {
915      super(fs, null);
916    }
917  }
918
919  /**
920   * A {@link PathFilter} that returns usertable directories. To get all directories use the
921   * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
922   */
923  public static class UserTableDirFilter extends BlackListDirFilter {
924    public UserTableDirFilter(FileSystem fs) {
925      super(fs, HConstants.HBASE_NON_TABLE_DIRS);
926    }
927
928    @Override
929    protected boolean isValidName(final String name) {
930      if (!super.isValidName(name))
931        return false;
932
933      try {
934        TableName.isLegalTableQualifierName(Bytes.toBytes(name));
935      } catch (IllegalArgumentException e) {
936        LOG.info("Invalid table name: {}", name);
937        return false;
938      }
939      return true;
940    }
941  }
942
943  public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
944      throws IOException {
945    List<Path> tableDirs = new ArrayList<>();
946
947    for (FileStatus status : fs
948        .globStatus(new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
949      tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
950    }
951    return tableDirs;
952  }
953
954  /**
   * @param fs filesystem to use
   * @param rootdir directory to scan for table directories
   * @return All the table directories under <code>rootdir</code>. Ignores non-table HBase folders
   *   such as .logs, .oldlogs, and .corrupt.
959   * @throws IOException
960   */
961  public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
962      throws IOException {
963    // presumes any directory under hbase.rootdir is a table
964    FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
965    List<Path> tabledirs = new ArrayList<>(dirs.length);
966    for (FileStatus dir: dirs) {
967      tabledirs.add(dir.getPath());
968    }
969    return tabledirs;
970  }
971
972  /**
   * Filter for all dirs whose names are legal region directory names.
974   */
975  public static class RegionDirFilter extends AbstractFileStatusFilter {
976    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
977    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
978    final FileSystem fs;
979
980    public RegionDirFilter(FileSystem fs) {
981      this.fs = fs;
982    }
983
984    @Override
985    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
986      if (!regionDirPattern.matcher(p.getName()).matches()) {
987        return false;
988      }
989
990      try {
991        return isDirectory(fs, isDir, p);
992      } catch (IOException ioe) {
993        // Maybe the file was moved or the fs was disconnected.
994        LOG.warn("Skipping file {} due to IOException", p, ioe);
995        return false;
996      }
997    }
998  }
999
1000  /**
   * Given a particular table dir, return all the region dirs inside it, excluding files such as
1002   * .tableinfo
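   * <p>
   * Example (illustrative):
   * <pre>{@code
   * Path tableDir = CommonFSUtils.getTableDir(rootDir, TableName.valueOf("exampleTable"));
   * List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
   * }</pre>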
1003   * @param fs A file system for the Path
1004   * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
1005   * @return List of paths to valid region directories in table dir.
1006   * @throws IOException
1007   */
1008  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1009    // assumes we are in a table dir.
1010    List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1011    if (rds == null) {
1012      return Collections.emptyList();
1013    }
1014    List<Path> regionDirs = new ArrayList<>(rds.size());
1015    for (FileStatus rdfs: rds) {
1016      Path rdPath = rdfs.getPath();
1017      regionDirs.add(rdPath);
1018    }
1019    return regionDirs;
1020  }
1021
1022  public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) {
1023    return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region);
1024  }
1025
1026  public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
1027    return getRegionDirFromTableDir(tableDir,
1028        ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
1029  }
1030
1031  public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) {
1032    return new Path(tableDir, encodedRegionName);
1033  }
1034
1035  /**
1036   * Filter for all dirs that are legal column family names.  This is generally used for colfam
1037   * dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1038   */
1039  public static class FamilyDirFilter extends AbstractFileStatusFilter {
1040    final FileSystem fs;
1041
1042    public FamilyDirFilter(FileSystem fs) {
1043      this.fs = fs;
1044    }
1045
1046    @Override
1047    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1048      try {
1049        // throws IAE if invalid
1050        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(Bytes.toBytes(p.getName()));
1051      } catch (IllegalArgumentException iae) {
1052        // path name is an invalid family name and thus is excluded.
1053        return false;
1054      }
1055
1056      try {
1057        return isDirectory(fs, isDir, p);
1058      } catch (IOException ioe) {
1059        // Maybe the file was moved or the fs was disconnected.
1060        LOG.warn("Skipping file {} due to IOException", p, ioe);
1061        return false;
1062      }
1063    }
1064  }
1065
1066  /**
   * Given a particular region dir, return all the family dirs inside it
1068   *
1069   * @param fs A file system for the Path
1070   * @param regionDir Path to a specific region directory
1071   * @return List of paths to valid family directories in region dir.
1072   * @throws IOException
1073   */
1074  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1075    // assumes we are in a region dir.
1076    FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1077    List<Path> familyDirs = new ArrayList<>(fds.length);
1078    for (FileStatus fdfs: fds) {
1079      Path fdPath = fdfs.getPath();
1080      familyDirs.add(fdPath);
1081    }
1082    return familyDirs;
1083  }
1084
1085  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
1086    List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs));
1087    if (fds == null) {
1088      return Collections.emptyList();
1089    }
1090    List<Path> referenceFiles = new ArrayList<>(fds.size());
1091    for (FileStatus fdfs: fds) {
1092      Path fdPath = fdfs.getPath();
1093      referenceFiles.add(fdPath);
1094    }
1095    return referenceFiles;
1096  }
1097
1098  /**
1099   * Filter for HFiles that excludes reference files.
1100   */
1101  public static class HFileFilter extends AbstractFileStatusFilter {
1102    final FileSystem fs;
1103
1104    public HFileFilter(FileSystem fs) {
1105      this.fs = fs;
1106    }
1107
1108    @Override
1109    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1110      if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) {
1111        return false;
1112      }
1113
1114      try {
1115        return isFile(fs, isDir, p);
1116      } catch (IOException ioe) {
1117        // Maybe the file was moved or the fs was disconnected.
1118        LOG.warn("Skipping file {} due to IOException", p, ioe);
1119        return false;
1120      }
1121    }
1122  }
1123
1124  /**
1125   * Filter for HFileLinks (StoreFiles and HFiles not included).
   * The filter itself does not consider whether a link is a file or not.
1127   */
1128  public static class HFileLinkFilter implements PathFilter {
1129
1130    @Override
1131    public boolean accept(Path p) {
1132      return HFileLink.isHFileLink(p);
1133    }
1134  }
1135
1136  public static class ReferenceFileFilter extends AbstractFileStatusFilter {
1137
1138    private final FileSystem fs;
1139
1140    public ReferenceFileFilter(FileSystem fs) {
1141      this.fs = fs;
1142    }
1143
1144    @Override
1145    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1146      if (!StoreFileInfo.isReference(p)) {
1147        return false;
1148      }
1149
1150      try {
1151        // only files can be references.
1152        return isFile(fs, isDir, p);
1153      } catch (IOException ioe) {
1154        // Maybe the file was moved or the fs was disconnected.
1155        LOG.warn("Skipping file {} due to IOException", p, ioe);
1156        return false;
1157      }
1158    }
1159  }
1160
1161  /**
   * Called every so often by the storefile map builder getTableStoreFilePathMap to
1163   * report progress.
1164   */
1165  interface ProgressReporter {
1166    /**
1167     * @param status File or directory we are about to process.
1168     */
1169    void progress(FileStatus status);
1170  }
1171
1172  /**
1173   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1174   * table StoreFile names to the full Path.
1175   * <br>
1176   * Example...<br>
1177   * Key = 3944417774205889744  <br>
1178   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1179   *
1180   * @param map map to add values.  If null, this method will create and populate one to return
1181   * @param fs  The file system to use.
1182   * @param hbaseRootDir  The root directory to scan.
1183   * @param tableName name of the table to scan.
1184   * @return Map keyed by StoreFile name with a value of the full Path.
1185   * @throws IOException When scanning the directory fails.
1186   * @throws InterruptedException
1187   */
1188  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1189  final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1190  throws IOException, InterruptedException {
1191    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null,
1192        (ProgressReporter)null);
1193  }
1194
1195  /**
1196   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1197   * table StoreFile names to the full Path.  Note that because this method can be called
   * on a 'live' HBase system, we will skip files that no longer exist by the time
   * we traverse them; similarly, the user of the result needs to consider that some
   * entries in this map may no longer exist by the time this call completes.
1201   * <br>
1202   * Example...<br>
1203   * Key = 3944417774205889744  <br>
1204   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1205   *
1206   * @param resultMap map to add values.  If null, this method will create and populate one to return
1207   * @param fs  The file system to use.
1208   * @param hbaseRootDir  The root directory to scan.
1209   * @param tableName name of the table to scan.
1210   * @param sfFilter optional path filter to apply to store files
1211   * @param executor optional executor service to parallelize this operation
1212   * @param progressReporter Instance or null; gets called every time we move to new region of
1213   *   family dir and for each store file.
1214   * @return Map keyed by StoreFile name with a value of the full Path.
1215   * @throws IOException When scanning the directory fails.
1216   * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead.
1217   */
1218  @Deprecated
1219  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1220      final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1221      ExecutorService executor, final HbckErrorReporter progressReporter)
1222      throws IOException, InterruptedException {
1223    return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor,
1224        new ProgressReporter() {
1225          @Override
1226          public void progress(FileStatus status) {
1227            // status is not used in this implementation.
1228            progressReporter.progress();
1229          }
1230        });
1231  }
1232
1233  /**
1234   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1235   * table StoreFile names to the full Path.  Note that because this method can be called
   * on a 'live' HBase system, we will skip files that no longer exist by the time
   * we traverse them; similarly, the user of the result needs to consider that some
   * entries in this map may no longer exist by the time this call completes.
1239   * <br>
1240   * Example...<br>
1241   * Key = 3944417774205889744  <br>
1242   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1243   *
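   * <p>
   * Illustrative invocation with a bounded thread pool (the pool size and table name are
   * arbitrary):
   * <pre>{@code
   * ExecutorService pool = Executors.newFixedThreadPool(8);
   * try {
   *   Map<String, Path> storeFiles = FSUtils.getTableStoreFilePathMap(null, fs, rootDir,
   *     TableName.valueOf("exampleTable"), null, pool,
   *     status -> LOG.debug("Visited {}", status.getPath()));
   * } finally {
   *   pool.shutdown();
   * }
   * }</pre>
   *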
1244   * @param resultMap map to add values.  If null, this method will create and populate one
1245   *   to return
1246   * @param fs  The file system to use.
1247   * @param hbaseRootDir  The root directory to scan.
1248   * @param tableName name of the table to scan.
1249   * @param sfFilter optional path filter to apply to store files
1250   * @param executor optional executor service to parallelize this operation
1251   * @param progressReporter Instance or null; gets called every time we move to new region of
1252   *   family dir and for each store file.
1253   * @return Map keyed by StoreFile name with a value of the full Path.
1254   * @throws IOException When scanning the directory fails.
1255   * @throws InterruptedException the thread is interrupted, either before or during the activity.
1256   */
1257  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1258      final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1259      ExecutorService executor, final ProgressReporter progressReporter)
1260    throws IOException, InterruptedException {
1261
1262    final Map<String, Path> finalResultMap =
1263        resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;
1264
1265    // only include the directory paths to tables
1266    Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
1267    // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
1268    // should be regions.
1269    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
1270    final Vector<Exception> exceptions = new Vector<>();
1271
1272    try {
1273      List<FileStatus> regionDirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1274      if (regionDirs == null) {
1275        return finalResultMap;
1276      }
1277
1278      final List<Future<?>> futures = new ArrayList<>(regionDirs.size());
1279
1280      for (FileStatus regionDir : regionDirs) {
1281        if (null != progressReporter) {
1282          progressReporter.progress(regionDir);
1283        }
1284        final Path dd = regionDir.getPath();
1285
1286        if (!exceptions.isEmpty()) {
1287          break;
1288        }
1289
1290        Runnable getRegionStoreFileMapCall = new Runnable() {
1291          @Override
1292          public void run() {
1293            try {
1294              HashMap<String,Path> regionStoreFileMap = new HashMap<>();
1295              List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
1296              if (familyDirs == null) {
1297                if (!fs.exists(dd)) {
1298                  LOG.warn("Skipping region because it no longer exists: " + dd);
1299                } else {
1300                  LOG.warn("Skipping region because it has no family dirs: " + dd);
1301                }
1302                return;
1303              }
1304              for (FileStatus familyDir : familyDirs) {
1305                if (null != progressReporter) {
1306                  progressReporter.progress(familyDir);
1307                }
1308                Path family = familyDir.getPath();
1309                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1310                  continue;
1311                }
1312                // now in family, iterate over the StoreFiles and
1313                // put in map
1314                FileStatus[] familyStatus = fs.listStatus(family);
1315                for (FileStatus sfStatus : familyStatus) {
1316                  if (null != progressReporter) {
1317                    progressReporter.progress(sfStatus);
1318                  }
1319                  Path sf = sfStatus.getPath();
1320                  if (sfFilter == null || sfFilter.accept(sf)) {
1321                    regionStoreFileMap.put( sf.getName(), sf);
1322                  }
1323                }
1324              }
1325              finalResultMap.putAll(regionStoreFileMap);
1326            } catch (Exception e) {
1327              LOG.error("Could not get region store file map for region: " + dd, e);
1328              exceptions.add(e);
1329            }
1330          }
1331        };
1332
1333        // If executor is available, submit async tasks to exec concurrently, otherwise
1334        // just do serial sync execution
1335        if (executor != null) {
1336          Future<?> future = executor.submit(getRegionStoreFileMapCall);
1337          futures.add(future);
1338        } else {
1339          FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
1340          future.run();
1341          futures.add(future);
1342        }
1343      }
1344
1345      // Ensure all pending tasks are complete (or that we run into an exception)
1346      for (Future<?> f : futures) {
1347        if (!exceptions.isEmpty()) {
1348          break;
1349        }
1350        try {
1351          f.get();
1352        } catch (ExecutionException e) {
1353          LOG.error("Unexpected exec exception!  Should've been caught already.  (Bug?)", e);
1354          // Shouldn't happen, we already logged/caught any exceptions in the Runnable
1355        }
1356      }
1357    } catch (IOException e) {
1358      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
1359      exceptions.add(e);
1360    } finally {
1361      if (!exceptions.isEmpty()) {
1362        // Just throw the first exception as an indication something bad happened
1363        // Don't need to propagate all the exceptions, we already logged them all anyway
1364        Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class);
1365        throw new IOException(exceptions.firstElement());
1366      }
1367    }
1368
1369    return finalResultMap;
1370  }
1371
1372  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1373    int result = 0;
1374    try {
1375      for (Path familyDir:getFamilyDirs(fs, p)){
1376        result += getReferenceFilePaths(fs, familyDir).size();
1377      }
1378    } catch (IOException e) {
1379      LOG.warn("Error counting reference files", e);
1380    }
1381    return result;
1382  }
1383
1384  /**
1385   * Runs through the HBase rootdir and creates a reverse lookup map for
1386   * table StoreFile names to the full Path.
1387   * <br>
1388   * Example...<br>
1389   * Key = 3944417774205889744  <br>
1390   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1391   *
1392   * @param fs  The file system to use.
1393   * @param hbaseRootDir  The root directory to scan.
1394   * @return Map keyed by StoreFile name with a value of the full Path.
1395   * @throws IOException When scanning the directory fails.
1396   */
1397  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1398      final Path hbaseRootDir)
1399  throws IOException, InterruptedException {
1400    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter)null);
1401  }
1402
1403  /**
1404   * Runs through the HBase rootdir and creates a reverse lookup map for
1405   * table StoreFile names to the full Path.
1406   * <br>
1407   * Example...<br>
1408   * Key = 3944417774205889744  <br>
1409   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1410   *
1411   * @param fs  The file system to use.
1412   * @param hbaseRootDir  The root directory to scan.
1413   * @param sfFilter optional path filter to apply to store files
1414   * @param executor optional executor service to parallelize this operation
1415   * @param progressReporter Instance or null; gets called every time we move to a new region
1416   *   or family dir and for each store file.
1417   * @return Map keyed by StoreFile name with a value of the full Path.
1418   * @throws IOException When scanning the directory fails.
1419   * @deprecated Since 2.3.0. Will be removed in HBase 4. Use {@link
1420   *   #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)}
1421   */
1422  @Deprecated
1423  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1424      final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1425      HbckErrorReporter progressReporter)
1426    throws IOException, InterruptedException {
1427    return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor,
1428        new ProgressReporter() {
1429          @Override
1430          public void progress(FileStatus status) {
1431            // status is not used in this implementation.
1432            progressReporter.progress();
1433          }
1434        });
1435  }
1436
1437  /**
1438   * Runs through the HBase rootdir and creates a reverse lookup map for
1439   * table StoreFile names to the full Path.
1440   * <br>
1441   * Example...<br>
1442   * Key = 3944417774205889744  <br>
1443   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1444   *
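   * <p>
   * A sketch of a parallel scan with a store file filter; the pool size, the reference-file
   * filter, and the {@code fs}, {@code rootDir} and {@code LOG} names below are illustrative
   * assumptions:
   * <pre>{@code
   * ExecutorService pool = Executors.newFixedThreadPool(8);
   * try {
   *   Map<String, Path> refFiles = FSUtils.getTableStoreFilePathMap(fs, rootDir,
   *     path -> StoreFileInfo.isReference(path), pool,
   *     status -> LOG.trace("scanned {}", status.getPath()));
   * } finally {
   *   pool.shutdown();
   * }
   * }</pre>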
1445   * @param fs  The file system to use.
1446   * @param hbaseRootDir  The root directory to scan.
1447   * @param sfFilter optional path filter to apply to store files
1448   * @param executor optional executor service to parallelize this operation
1449   * @param progressReporter Instance or null; gets called every time we move to a new region
1450   *   or family dir and for each store file.
1451   * @return Map keyed by StoreFile name with a value of the full Path.
1452   * @throws IOException When scanning the directory fails.
1453   * @throws InterruptedException If the scan is interrupted.
1454   */
1455  public static Map<String, Path> getTableStoreFilePathMap(
1456    final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter,
1457        ExecutorService executor, ProgressReporter progressReporter)
1458  throws IOException, InterruptedException {
1459    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);
1460
1461    // if this method looks similar to 'getTableFragmentation' that is because
1462    // it was borrowed from it.
1463
1464    // only include the directory paths to tables
1465    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1466      getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir),
1467        sfFilter, executor, progressReporter);
1468    }
1469    return map;
1470  }
1471
1472  /**
1473   * Filters FileStatuses in an array and returns a list
1474   *
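   * <p>
   * For example, to keep only the directories from a listing (a minimal sketch; {@code fs} and
   * {@code dir} are assumed to exist):
   * <pre>{@code
   * List<FileStatus> dirs = FSUtils.filterFileStatuses(fs.listStatus(dir), FileStatus::isDirectory);
   * }</pre>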
1475   * @param input   An array of FileStatuses
1476   * @param filter  A required filter to filter the array
1477   * @return        A list of FileStatuses
1478   */
1479  public static List<FileStatus> filterFileStatuses(FileStatus[] input,
1480      FileStatusFilter filter) {
1481    if (input == null) return null;
1482    return filterFileStatuses(Iterators.forArray(input), filter);
1483  }
1484
1485  /**
1486   * Filters FileStatuses in an iterator and returns a list
1487   *
1488   * @param input   An iterator of FileStatuses
1489   * @param filter  A required filter to filter the iterator
1490   * @return        A list of FileStatuses
1491   */
1492  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1493      FileStatusFilter filter) {
1494    if (input == null) return null;
1495    ArrayList<FileStatus> results = new ArrayList<>();
1496    while (input.hasNext()) {
1497      FileStatus f = input.next();
1498      if (filter.accept(f)) {
1499        results.add(f);
1500      }
1501    }
1502    return results;
1503  }
1504
1505  /**
1506   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1507   * This accommodates differences between Hadoop versions, where Hadoop 1
1508   * does not throw a FileNotFoundException and returns an empty FileStatus[],
1509   * while Hadoop 2 will throw a FileNotFoundException.
1510   *
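   * <p>
   * A minimal sketch; note that null, not an empty list, is returned when nothing matches
   * ({@code fs} and {@code tableDir} are assumed to exist):
   * <pre>{@code
   * List<FileStatus> regionDirs =
   *   FSUtils.listStatusWithStatusFilter(fs, tableDir, status -> status.isDirectory());
   * if (regionDirs == null) {
   *   // directory missing, empty, or nothing passed the filter
   * }
   * }</pre>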
1511   * @param fs file system
1512   * @param dir directory
1513   * @param filter file status filter
1514   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
1515   */
1516  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs,
1517      final Path dir, final FileStatusFilter filter) throws IOException {
1518    FileStatus[] status = null;
1519    try {
1520      status = fs.listStatus(dir);
1521    } catch (FileNotFoundException fnfe) {
1522      LOG.trace("{} does not exist", dir);
1523      return null;
1524    }
1525
1526    if (ArrayUtils.getLength(status) == 0) {
1527      return null;
1528    }
1529
1530    if (filter == null) {
1531      return Arrays.asList(status);
1532    } else {
1533      List<FileStatus> status2 = filterFileStatuses(status, filter);
1534      if (status2 == null || status2.isEmpty()) {
1535        return null;
1536      } else {
1537        return status2;
1538      }
1539    }
1540  }
1541
1542  /**
1543   * Scans the root path of the file system to get the degree of locality for
1544   * each region on each of the servers having at least one block of that
1545   * region.
1546   * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}
1547   *
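   * <p>
   * A minimal sketch of reading the result; the {@code regionEncodedName} variable below is a
   * placeholder:
   * <pre>{@code
   * Map<String, Map<String, Float>> locality = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
   * Map<String, Float> localityPerServer = locality.get(regionEncodedName);
   * }</pre>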
1548   * @param conf
1549   *          the configuration to use
1550   * @return the mapping from region encoded name to a map of server names to
1551   *           locality fraction
1552   * @throws IOException
1553   *           in case of file system errors or interrupts
1554   */
1555  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1556      final Configuration conf) throws IOException {
1557    return getRegionDegreeLocalityMappingFromFS(
1558        conf, null,
1559        conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1560
1561  }
1562
1563  /**
1564   * Scans the root path of the file system to get the degree of locality for
1565   * each region on each of the servers having at least one block of that
1566   * region.
1567   *
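   * <p>
   * For example, to check locality of a single table on a small pool (the table name and pool
   * size below are illustrative):
   * <pre>{@code
   * Map<String, Map<String, Float>> locality =
   *   FSUtils.getRegionDegreeLocalityMappingFromFS(conf, "usertable", 4);
   * }</pre>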
1568   * @param conf
1569   *          the configuration to use
1570   * @param desiredTable
1571   *          the table you wish to scan locality for
1572   * @param threadPoolSize
1573   *          the thread pool size to use
1574   * @return the mapping from region encoded name to a map of server names to
1575   *           locality fraction
1576   * @throws IOException
1577   *           in case of file system errors or interrupts
1578   */
1579  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1580      final Configuration conf, final String desiredTable, int threadPoolSize)
1581      throws IOException {
1582    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
1583    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
1584    return regionDegreeLocalityMapping;
1585  }
1586
1587  /**
1588   * Scans the root path of the file system to get either the mapping between
1589   * the region name and its best locality region server, or the degree of
1590   * locality of each region on each of the servers having at least one block
1591   * of that region. The output map parameter is optional.
1592   *
1593   * @param conf
1594   *          the configuration to use
1595   * @param desiredTable
1596   *          the table you wish to scan locality for
1597   * @param threadPoolSize
1598   *          the thread pool size to use
1599   * @param regionDegreeLocalityMapping
1600   *          the map into which to put the locality degree mapping or null,
1601   *          must be a thread-safe implementation
1602   * @throws IOException
1603   *           in case of file system errors or interrupts
1604   */
1605  private static void getRegionLocalityMappingFromFS(final Configuration conf,
1606    final String desiredTable, int threadPoolSize,
1607    final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
1608    final FileSystem fs = FileSystem.get(conf);
1609    final Path rootPath = CommonFSUtils.getRootDir(conf);
1610    final long startTime = EnvironmentEdgeManager.currentTime();
1611    final Path queryPath;
1612    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1613    if (null == desiredTable) {
1614      queryPath =
1615        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1616    } else {
1617      queryPath = new Path(
1618        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1619    }
1620
1621    // reject all paths that are not appropriate
1622    PathFilter pathFilter = new PathFilter() {
1623      @Override
1624      public boolean accept(Path path) {
1625        // this is the region name; the glob may also pick up entries that are not regions
1626        if (null == path) {
1627          return false;
1628        }
1629
1630        // no parent?
1631        Path parent = path.getParent();
1632        if (null == parent) {
1633          return false;
1634        }
1635
1636        String regionName = path.getName();
1637        if (null == regionName) {
1638          return false;
1639        }
1640
1641        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
1642          return false;
1643        }
1644        return true;
1645      }
1646    };
1647
1648    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1649
1650    if (LOG.isDebugEnabled()) {
1651      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
1652    }
1653
1654    if (null == statusList) {
1655      return;
1656    }
1657
1658    // lower the number of threads in case we have very few expected regions
1659    threadPoolSize = Math.min(threadPoolSize, statusList.length);
1660
1661    // run in multiple threads
1662    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
1663      Threads.newDaemonThreadFactory("FSRegionQuery"));
1664    try {
1665      // ignore all file status items that are not of interest
1666      for (FileStatus regionStatus : statusList) {
1667        if (null == regionStatus || !regionStatus.isDirectory()) {
1668          continue;
1669        }
1670
1671        final Path regionPath = regionStatus.getPath();
1672        if (null != regionPath) {
1673          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
1674        }
1675      }
1676    } finally {
1677      tpe.shutdown();
1678      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1679        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1680      try {
1681        // here we wait until TPE terminates, which is either naturally or by
1682        // exceptions in the execution of the threads
1683        while (!tpe.awaitTermination(threadWakeFrequency,
1684            TimeUnit.MILLISECONDS)) {
1685          // printing out rough estimate, so as to not introduce
1686          // AtomicInteger
1687          LOG.info("Locality checking is underway: { Scanned Regions : "
1688              + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
1689              + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
1690        }
1691      } catch (InterruptedException e) {
1692        Thread.currentThread().interrupt();
1693        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1694      }
1695    }
1696
1697    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
1698    LOG.info("Scan DFS for locality info took {}ms", overhead);
1699  }
1700
1701  /**
1702   * Do our short circuit read setup.
1703   * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
1704   * @param conf the configuration to check and update
1705   */
1706  public static void setupShortCircuitRead(final Configuration conf) {
1707    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1708    boolean shortCircuitSkipChecksum =
1709      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1710    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1711    if (shortCircuitSkipChecksum) {
1712      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
1713        "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
1714        "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
1715      assert !shortCircuitSkipChecksum; //this will fail if assertions are on
1716    }
1717    checkShortCircuitReadBufferSize(conf);
1718  }
1719
1720  /**
1721   * Check if short circuit read buffer size is set and if not, set it to hbase value.
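   * <p>
   * A minimal sketch of how this interacts with the configuration (the key names are the ones
   * used in the method body below):
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * FSUtils.checkShortCircuitReadBufferSize(conf);
   * // now set, either to the pre-existing dfs value or to the hbase default of 2 * blocksize
   * int bufferSize = conf.getInt("dfs.client.read.shortcircuit.buffer.size", -1);
   * }</pre>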
1722   * @param conf the configuration to check and update
1723   */
1724  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1725    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1726    final int notSet = -1;
1727    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in Hadoop 2
1728    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1729    int size = conf.getInt(dfsKey, notSet);
1730    // If a size is set, return -- we will use it.
1731    if (size != notSet) return;
1732    // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
1733    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1734    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
1735  }
1736
1737  /**
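   * Returns the DFSHedgedReadMetrics instance shared by the DFSClients of the filesystem
   * configured in {@code c}. The accessor on DFSClient is package private, so it is looked up
   * and invoked reflectively.
   * <p>
   * A minimal sketch of polling the counters; the getter names are those exposed by
   * {@link DFSHedgedReadMetrics}:
   * <pre>{@code
   * DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
   * if (metrics != null) {
   *   long hedgedReads = metrics.getHedgedReadOps();
   *   long hedgedReadWins = metrics.getHedgedReadWins();
   * }
   * }</pre>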
1738   * @param c the configuration to use
1739   * @return The DFSClient DFSHedgedReadMetrics instance or null if it can't be found or not on hdfs.
1740   * @throws IOException if the underlying filesystem cannot be obtained
1741   */
1742  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
1743      throws IOException {
1744    if (!CommonFSUtils.isHDFS(c)) {
1745      return null;
1746    }
1747    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
1748    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
1749    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
1750    final String name = "getHedgedReadMetrics";
1751    DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
1752    Method m;
1753    try {
1754      m = dfsclient.getClass().getDeclaredMethod(name);
1755    } catch (NoSuchMethodException e) {
1756      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
1757          e.getMessage());
1758      return null;
1759    } catch (SecurityException e) {
1760      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
1761          e.getMessage());
1762      return null;
1763    }
1764    m.setAccessible(true);
1765    try {
1766      return (DFSHedgedReadMetrics)m.invoke(dfsclient);
1767    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
1768      LOG.warn("Failed to invoke method " + name + " on dfsclient; no hedged read metrics: " +
1769          e.getMessage());
1770      return null;
1771    }
1772  }
1773
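  /**
   * Recursively copies {@code src} on {@code srcFS} to {@code dst} on {@code dstFS}. Directories
   * are created inline while individual file copies are submitted to a fixed-size thread pool
   * and awaited before returning.
   * @param srcFS the source file system
   * @param src the source path
   * @param dstFS the destination file system
   * @param dst the destination path
   * @param conf the configuration to use for the copies
   * @param threads the number of copy threads
   * @return the list of destination paths that were created or copied to
   * @throws IOException if any copy fails or the copy is interrupted
   */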
1774  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1775      Configuration conf, int threads) throws IOException {
1776    ExecutorService pool = Executors.newFixedThreadPool(threads);
1777    List<Future<Void>> futures = new ArrayList<>();
1778    List<Path> traversedPaths;
1779    try {
1780      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
1781      for (Future<Void> future : futures) {
1782        future.get();
1783      }
1784    } catch (ExecutionException | InterruptedException | IOException e) {
1785      throw new IOException("Copy snapshot reference files failed", e);
1786    } finally {
1787      pool.shutdownNow();
1788    }
1789    return traversedPaths;
1790  }
1791
1792  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1793      Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
1794    List<Path> traversedPaths = new ArrayList<>();
1795    traversedPaths.add(dst);
1796    FileStatus currentFileStatus = srcFS.getFileStatus(src);
1797    if (currentFileStatus.isDirectory()) {
1798      if (!dstFS.mkdirs(dst)) {
1799        throw new IOException("Create directory failed: " + dst);
1800      }
1801      FileStatus[] subPaths = srcFS.listStatus(src);
1802      for (FileStatus subPath : subPaths) {
1803        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
1804          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
1805      }
1806    } else {
1807      Future<Void> future = pool.submit(() -> {
1808        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
1809        return null;
1810      });
1811      futures.add(future);
1812    }
1813    return traversedPaths;
1814  }
1815
1816  /**
1817   * @return A set containing all namenode addresses of fs
1818   */
1819  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
1820    Configuration conf) {
1821    Set<InetSocketAddress> addresses = new HashSet<>();
1822    String serviceName = fs.getCanonicalServiceName();
1823
1824    if (serviceName.startsWith("ha-hdfs")) {
1825      try {
1826        Map<String, Map<String, InetSocketAddress>> addressMap =
1827          DFSUtil.getNNServiceRpcAddressesForCluster(conf);
1828        String nameService = serviceName.substring(serviceName.indexOf(":") + 1);
1829        if (addressMap.containsKey(nameService)) {
1830          Map<String, InetSocketAddress> nnMap = addressMap.get(nameService);
1831          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
1832            InetSocketAddress addr = e2.getValue();
1833            addresses.add(addr);
1834          }
1835        }
1836      } catch (Exception e) {
1837        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
1838      }
1839    } else {
1840      URI uri = fs.getUri();
1841      int port = uri.getPort();
1842      if (port < 0) {
1843        int idx = serviceName.indexOf(':');
1844        port = Integer.parseInt(serviceName.substring(idx + 1));
1845      }
1846      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
1847      addresses.add(addr);
1848    }
1849
1850    return addresses;
1851  }
1852
1853  /**
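   * Checks whether {@code srcFs} and {@code desFs} point at the same HDFS instance. Canonical
   * service names are compared first; if one side uses an HA nameservice and the other does
   * not, the namenode addresses of both sides are compared instead.
   * <p>
   * A minimal sketch (the URIs below are illustrative):
   * <pre>{@code
   * FileSystem srcFs = new Path("hdfs://nn1.example.com:8020/hbase").getFileSystem(conf);
   * FileSystem desFs = new Path("hdfs://backupcluster/hbase").getFileSystem(conf);
   * boolean same = FSUtils.isSameHdfs(conf, srcFs, desFs);
   * }</pre>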
1854   * @param conf the Configuration of HBase
1855   * @return Whether srcFs and desFs are on the same hdfs or not
1856   */
1857  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
1858    // By getCanonicalServiceName, we could make sure both srcFs and desFs
1859    // show a unified format which contains scheme, host and port.
1860    String srcServiceName = srcFs.getCanonicalServiceName();
1861    String desServiceName = desFs.getCanonicalServiceName();
1862
1863    if (srcServiceName == null || desServiceName == null) {
1864      return false;
1865    }
1866    if (srcServiceName.equals(desServiceName)) {
1867      return true;
1868    }
1869    if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
1870      Collection<String> internalNameServices =
1871        conf.getTrimmedStringCollection("dfs.internal.nameservices");
1872      if (!internalNameServices.isEmpty()) {
1873        if (internalNameServices.contains(srcServiceName.split(":")[1])) {
1874          return true;
1875        } else {
1876          return false;
1877        }
1878      }
1879    }
1880    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
1881      // If one serviceName is an HA format while the other is a non-HA format,
1882      // maybe they refer to the same FileSystem.
1883      // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
1884      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
1885      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
1886      if (!Sets.intersection(srcAddrs, desAddrs).isEmpty()) {
1887        return true;
1888      }
1889    }
1890
1891    return false;
1892  }
1893}