001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
021import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET;
022
023import edu.umd.cs.findbugs.annotations.CheckForNull;
024import java.io.ByteArrayInputStream;
025import java.io.DataInputStream;
026import java.io.EOFException;
027import java.io.FileNotFoundException;
028import java.io.IOException;
029import java.io.InterruptedIOException;
030import java.lang.reflect.InvocationTargetException;
031import java.lang.reflect.Method;
032import java.net.InetSocketAddress;
033import java.net.URI;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collection;
037import java.util.Collections;
038import java.util.HashMap;
039import java.util.HashSet;
040import java.util.Iterator;
041import java.util.List;
042import java.util.Locale;
043import java.util.Map;
044import java.util.Optional;
045import java.util.Set;
046import java.util.Vector;
047import java.util.concurrent.ConcurrentHashMap;
048import java.util.concurrent.ExecutionException;
049import java.util.concurrent.ExecutorService;
050import java.util.concurrent.Executors;
051import java.util.concurrent.Future;
052import java.util.concurrent.FutureTask;
053import java.util.concurrent.ThreadPoolExecutor;
054import java.util.concurrent.TimeUnit;
055import java.util.regex.Pattern;
056import org.apache.commons.lang3.ArrayUtils;
057import org.apache.hadoop.conf.Configuration;
058import org.apache.hadoop.fs.BlockLocation;
059import org.apache.hadoop.fs.FSDataInputStream;
060import org.apache.hadoop.fs.FSDataOutputStream;
061import org.apache.hadoop.fs.FileStatus;
062import org.apache.hadoop.fs.FileSystem;
063import org.apache.hadoop.fs.FileUtil;
064import org.apache.hadoop.fs.Path;
065import org.apache.hadoop.fs.PathFilter;
066import org.apache.hadoop.fs.StorageType;
067import org.apache.hadoop.fs.permission.FsPermission;
068import org.apache.hadoop.hbase.ClusterId;
069import org.apache.hadoop.hbase.HConstants;
070import org.apache.hadoop.hbase.HDFSBlocksDistribution;
071import org.apache.hadoop.hbase.TableName;
072import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
073import org.apache.hadoop.hbase.client.RegionInfo;
074import org.apache.hadoop.hbase.client.RegionInfoBuilder;
075import org.apache.hadoop.hbase.exceptions.DeserializationException;
076import org.apache.hadoop.hbase.fs.HFileSystem;
077import org.apache.hadoop.hbase.io.HFileLink;
078import org.apache.hadoop.hbase.master.HMaster;
079import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
080import org.apache.hadoop.hdfs.DFSClient;
081import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
082import org.apache.hadoop.hdfs.DFSUtil;
083import org.apache.hadoop.hdfs.DistributedFileSystem;
084import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
085import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
086import org.apache.hadoop.hdfs.protocol.LocatedBlock;
087import org.apache.hadoop.io.IOUtils;
088import org.apache.hadoop.ipc.RemoteException;
089import org.apache.yetus.audience.InterfaceAudience;
090import org.slf4j.Logger;
091import org.slf4j.LoggerFactory;
092
093import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
094import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
095import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
096import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
097import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
098
099import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
101
102/**
103 * Utility methods for interacting with the underlying file system.
104 */
105@InterfaceAudience.Private
106public final class FSUtils {
107  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
108
109  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
110  private static final int DEFAULT_THREAD_POOLSIZE = 2;
111
112  /** Set to true on Windows platforms */
113  // currently only used in testing. TODO refactor into a test class
114  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
115
116  private static Class<?> safeModeClazz = null;
117  private static Class<?> safeModeActionClazz = null;
118  private static Object safeModeGet = null;
  static {
120    try {
121      safeModeClazz = Class.forName("org.apache.hadoop.fs.SafeMode");
122      safeModeActionClazz = Class.forName("org.apache.hadoop.fs.SafeModeAction");
      safeModeGet = safeModeActionClazz.getField("SAFEMODE_GET").get(null);
124    } catch (ClassNotFoundException | NoSuchFieldException e) {
125      LOG.debug("SafeMode interface not in the classpath, this means Hadoop 3.3.5 or below.");
126    } catch (IllegalAccessException e) {
127      LOG.error("SafeModeAction.SAFEMODE_GET is not accessible. "
128        + "Unexpected Hadoop version or messy classpath?", e);
129      throw new RuntimeException(e);
130    }
131  }
132
133  private FSUtils() {
134  }
135
  /** Returns true if <code>fs</code> is an instance of DistributedFileSystem. */
137  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
138    FileSystem fileSystem = fs;
139    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
140    // Check its backing fs for dfs-ness.
141    if (fs instanceof HFileSystem) {
142      fileSystem = ((HFileSystem) fs).getBackingFs();
143    }
144    return fileSystem instanceof DistributedFileSystem;
145  }
146
147  /**
   * Compare the path component of the Path URIs; e.g. for hdfs://a/b/c and /a/b/c, it compares the
   * '/a/b/c' part. The two paths must have the same depth. Does not consider the scheme; i.e. if
   * the schemes differ but the path components match, the two are considered equal.
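   * <p>
   * Illustrative sketch (hypothetical paths): both paths below have the same depth and the same
   * trailing components, so they match even though only one of them carries a scheme.
   *
   * <pre>
   * Path toSearch = new Path("hdfs://namenode:8020/hbase/data/default/t1");
   * Path tail = new Path("/hbase/data/default/t1");
   * boolean matches = FSUtils.isMatchingTail(toSearch, tail); // true
   * </pre>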
   * @param pathToSearch Path we will be trying to match.
   * @param pathTail     Path tail to look for at the end of <code>pathToSearch</code>.
   * @return True if <code>pathTail</code> is a tail of the path of <code>pathToSearch</code>
153   */
154  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
155    Path tailPath = pathTail;
156    String tailName;
157    Path toSearch = pathToSearch;
158    String toSearchName;
159    boolean result = false;
160
161    if (pathToSearch.depth() != pathTail.depth()) {
162      return false;
163    }
164
165    do {
166      tailName = tailPath.getName();
167      if (tailName == null || tailName.isEmpty()) {
168        result = true;
169        break;
170      }
171      toSearchName = toSearch.getName();
172      if (toSearchName == null || toSearchName.isEmpty()) {
173        break;
174      }
175      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
176      tailPath = tailPath.getParent();
177      toSearch = toSearch.getParent();
178    } while (tailName.equals(toSearchName));
179    return result;
180  }
181
182  /**
183   * Delete the region directory if exists.
184   * @return True if deleted the region directory.
185   */
186  public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri)
187    throws IOException {
188    Path rootDir = CommonFSUtils.getRootDir(conf);
189    FileSystem fs = rootDir.getFileSystem(conf);
190    return CommonFSUtils.deleteDirectory(fs,
191      new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
192  }
193
194  /**
195   * Create the specified file on the filesystem. By default, this will:
196   * <ol>
197   * <li>overwrite the file if it exists</li>
198   * <li>apply the umask in the configuration (if it is enabled)</li>
199   * <li>use the fs configured buffer size (or 4096 if not set)</li>
200   * <li>use the configured column family replication or default replication if
201   * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li>
202   * <li>use the default block size</li>
203   * <li>not track progress</li>
204   * </ol>
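   * <p>
   * Minimal usage sketch (the configuration, path and payload below are hypothetical; passing
   * <code>null</code> favored nodes is allowed):
   *
   * <pre>
   * FileSystem fs = FileSystem.get(conf);
   * Path path = new Path("/hbase/data/default/t1/1588230740/cf/examplefile");
   * FsPermission perm = FsPermission.getFileDefault();
   * try (FSDataOutputStream out = FSUtils.create(conf, fs, path, perm, null)) {
   *   out.write(Bytes.toBytes("example payload"));
   * }
   * </pre>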
205   * @param conf         configurations
206   * @param fs           {@link FileSystem} on which to write the file
207   * @param path         {@link Path} to the file to write
208   * @param perm         permissions
209   * @param favoredNodes favored data nodes
210   * @return output stream to the created file
211   * @throws IOException if the file cannot be created
212   */
213  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
214    FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
215    if (fs instanceof HFileSystem) {
216      FileSystem backingFs = ((HFileSystem) fs).getBackingFs();
217      if (backingFs instanceof DistributedFileSystem) {
218        short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION,
219          String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION)));
220        DistributedFileSystem.HdfsDataOutputStreamBuilder builder =
221          ((DistributedFileSystem) backingFs).createFile(path).recursive().permission(perm)
222            .create();
223        if (favoredNodes != null) {
224          builder.favoredNodes(favoredNodes);
225        }
226        if (replication > 0) {
227          builder.replication(replication);
228        }
229        return builder.build();
230      }
231
232    }
233    return CommonFSUtils.create(fs, path, perm, true);
234  }
235
236  /**
237   * Checks to see if the specified file system is available
238   * @param fs filesystem
239   * @throws IOException e
240   */
241  public static void checkFileSystemAvailable(final FileSystem fs) throws IOException {
242    if (!(fs instanceof DistributedFileSystem)) {
243      return;
244    }
245    IOException exception = null;
246    DistributedFileSystem dfs = (DistributedFileSystem) fs;
247    try {
248      if (dfs.exists(new Path("/"))) {
249        return;
250      }
251    } catch (IOException e) {
252      exception = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
253    }
254    try {
255      fs.close();
256    } catch (Exception e) {
257      LOG.error("file system close failed: ", e);
258    }
259    throw new IOException("File system is not available", exception);
260  }
261
262  /**
263   * Inquire the Active NameNode's safe mode status.
   * @param dfs A FileSystem object representing the underlying HDFS; reflection is used when it
   *            is not a DistributedFileSystem.
265   * @return whether we're in safe mode
266   */
267  private static boolean isInSafeMode(FileSystem dfs) throws IOException {
268    if (isDistributedFileSystem(dfs)) {
269      return ((DistributedFileSystem) dfs).setSafeMode(SAFEMODE_GET, true);
270    } else {
271      try {
272        Object ret = dfs.getClass()
          .getMethod("setSafeMode", new Class[] { safeModeActionClazz, boolean.class })
274          .invoke(dfs, safeModeGet, true);
275        return (Boolean) ret;
276      } catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
277        LOG.error("The file system does not support setSafeMode(). Abort.", e);
278        throw new RuntimeException(e);
279      }
280    }
281  }
282
283  /**
284   * Check whether dfs is in safemode.
285   */
286  public static void checkDfsSafeMode(final Configuration conf) throws IOException {
287    boolean isInSafeMode = false;
288    FileSystem fs = FileSystem.get(conf);
289    if (supportSafeMode(fs)) {
290      isInSafeMode = isInSafeMode(fs);
291    }
292    if (isInSafeMode) {
293      throw new IOException("File system is in safemode, it can't be written now");
294    }
295  }
296
297  /**
298   * Verifies current version of file system
299   * @param fs      filesystem object
300   * @param rootdir root hbase directory
301   * @return null if no version file exists, version string otherwise
302   * @throws IOException              if the version file fails to open
303   * @throws DeserializationException if the version data cannot be translated into a version
304   */
305  public static String getVersion(FileSystem fs, Path rootdir)
306    throws IOException, DeserializationException {
307    final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
308    FileStatus[] status = null;
309    try {
310      // hadoop 2.0 throws FNFE if directory does not exist.
311      // hadoop 1.0 returns null if directory does not exist.
312      status = fs.listStatus(versionFile);
313    } catch (FileNotFoundException fnfe) {
314      return null;
315    }
316    if (ArrayUtils.getLength(status) == 0) {
317      return null;
318    }
319    String version = null;
320    byte[] content = new byte[(int) status[0].getLen()];
321    FSDataInputStream s = fs.open(versionFile);
322    try {
323      IOUtils.readFully(s, content, 0, content.length);
324      if (ProtobufUtil.isPBMagicPrefix(content)) {
325        version = parseVersionFrom(content);
326      } else {
327        // Presume it pre-pb format.
328        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) {
329          version = dis.readUTF();
330        }
331      }
332    } catch (EOFException eof) {
333      LOG.warn("Version file was empty, odd, will try to set it.");
334    } finally {
335      s.close();
336    }
337    return version;
338  }
339
340  /**
341   * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
342   * @param bytes The byte content of the hbase.version file
343   * @return The version found in the file as a String
344   * @throws DeserializationException if the version data cannot be translated into a version
345   */
346  static String parseVersionFrom(final byte[] bytes) throws DeserializationException {
347    ProtobufUtil.expectPBMagicPrefix(bytes);
348    int pblen = ProtobufUtil.lengthOfPBMagic();
349    FSProtos.HBaseVersionFileContent.Builder builder =
350      FSProtos.HBaseVersionFileContent.newBuilder();
351    try {
352      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
353      return builder.getVersion();
354    } catch (IOException e) {
355      // Convert
356      throw new DeserializationException(e);
357    }
358  }
359
360  /**
361   * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
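   * <p>
   * Illustrative round trip with {@link #parseVersionFrom(byte[])}, usable from within this
   * package (the version string is arbitrary):
   *
   * <pre>
   * byte[] content = FSUtils.toVersionByteArray("8");
   * String version = FSUtils.parseVersionFrom(content); // "8"
   * </pre>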
362   * @param version Version to persist
363   * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a
364   *         prefix.
365   */
366  static byte[] toVersionByteArray(final String version) {
367    FSProtos.HBaseVersionFileContent.Builder builder =
368      FSProtos.HBaseVersionFileContent.newBuilder();
369    return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
370  }
371
372  /**
373   * Verifies current version of file system
374   * @param fs      file system
375   * @param rootdir root directory of HBase installation
376   * @param message if true, issues a message on System.out
377   * @throws IOException              if the version file cannot be opened
378   * @throws DeserializationException if the contents of the version file cannot be parsed
379   */
380  public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
381    throws IOException, DeserializationException {
382    checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
383  }
384
385  /**
386   * Verifies current version of file system
387   * @param fs      file system
388   * @param rootdir root directory of HBase installation
389   * @param message if true, issues a message on System.out
390   * @param wait    wait interval
391   * @param retries number of times to retry
392   * @throws IOException              if the version file cannot be opened
393   * @throws DeserializationException if the contents of the version file cannot be parsed
394   */
395  public static void checkVersion(FileSystem fs, Path rootdir, boolean message, int wait,
396    int retries) throws IOException, DeserializationException {
397    String version = getVersion(fs, rootdir);
398    String msg;
399    if (version == null) {
400      if (!metaRegionExists(fs, rootdir)) {
401        // rootDir is empty (no version file and no root region)
402        // just create new version file (HBASE-1195)
403        setVersion(fs, rootdir, wait, retries);
404        return;
405      } else {
406        msg = "hbase.version file is missing. Is your hbase.rootdir valid? "
407          + "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. "
408          + "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2";
409      }
410    } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) {
411      return;
412    } else {
413      msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version
414        + " but software requires version " + HConstants.FILE_SYSTEM_VERSION
415        + ". Consult http://hbase.apache.org/book.html for further information about "
416        + "upgrading HBase.";
417    }
418
    // The version is deprecated and requires migration.
420    // Output on stdout so user sees it in terminal.
421    if (message) {
422      System.out.println("WARNING! " + msg);
423    }
424    throw new FileSystemVersionException(msg);
425  }
426
427  /**
428   * Sets version of file system
429   * @param fs      filesystem object
430   * @param rootdir hbase root
431   * @throws IOException e
432   */
433  public static void setVersion(FileSystem fs, Path rootdir) throws IOException {
434    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
435      HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
436  }
437
438  /**
439   * Sets version of file system
440   * @param fs      filesystem object
441   * @param rootdir hbase root
442   * @param wait    time to wait for retry
443   * @param retries number of times to retry before failing
444   * @throws IOException e
445   */
446  public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
447    throws IOException {
448    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
449  }
450
451  /**
452   * Sets version of file system
453   * @param fs      filesystem object
454   * @param rootdir hbase root directory
455   * @param version version to set
456   * @param wait    time to wait for retry
457   * @param retries number of times to retry before throwing an IOException
458   * @throws IOException e
459   */
460  public static void setVersion(FileSystem fs, Path rootdir, String version, int wait, int retries)
461    throws IOException {
462    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
463    Path tempVersionFile = new Path(rootdir,
464      HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + HConstants.VERSION_FILE_NAME);
465    while (true) {
466      try {
467        // Write the version to a temporary file
468        FSDataOutputStream s = fs.create(tempVersionFile);
469        try {
470          s.write(toVersionByteArray(version));
471          s.close();
472          s = null;
473          // Move the temp version file to its normal location. Returns false
474          // if the rename failed. Throw an IOE in that case.
475          if (!fs.rename(tempVersionFile, versionFile)) {
476            throw new IOException("Unable to move temp version file to " + versionFile);
477          }
478        } finally {
479          // Cleaning up the temporary if the rename failed would be trying
480          // too hard. We'll unconditionally create it again the next time
481          // through anyway, files are overwritten by default by create().
482
483          // Attempt to close the stream on the way out if it is still open.
484          try {
485            if (s != null) s.close();
486          } catch (IOException ignore) {
487          }
488        }
489        LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
490        return;
491      } catch (IOException e) {
492        if (retries > 0) {
493          LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
494          fs.delete(versionFile, false);
495          try {
496            if (wait > 0) {
497              Thread.sleep(wait);
498            }
499          } catch (InterruptedException ie) {
500            throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
501          }
502          retries--;
503        } else {
504          throw e;
505        }
506      }
507    }
508  }
509
510  /**
511   * Checks that a cluster ID file exists in the HBase root directory
512   * @param fs      the root directory FileSystem
513   * @param rootdir the HBase root directory in HDFS
514   * @param wait    how long to wait between retries
515   * @return <code>true</code> if the file exists, otherwise <code>false</code>
516   * @throws IOException if checking the FileSystem fails
517   */
518  public static boolean checkClusterIdExists(FileSystem fs, Path rootdir, long wait)
519    throws IOException {
520    while (true) {
521      try {
522        Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
523        return fs.exists(filePath);
524      } catch (IOException ioe) {
525        if (wait > 0L) {
526          LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe);
527          try {
528            Thread.sleep(wait);
529          } catch (InterruptedException e) {
530            Thread.currentThread().interrupt();
531            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
532          }
533        } else {
534          throw ioe;
535        }
536      }
537    }
538  }
539
540  /**
541   * Returns the value of the unique cluster ID stored for this HBase instance.
542   * @param fs      the root directory FileSystem
543   * @param rootdir the path to the HBase root directory
544   * @return the unique cluster identifier
545   * @throws IOException if reading the cluster ID file fails
546   */
547  public static ClusterId getClusterId(FileSystem fs, Path rootdir) throws IOException {
548    Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
549    ClusterId clusterId = null;
550    FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null;
551    if (status != null) {
552      int len = Ints.checkedCast(status.getLen());
553      byte[] content = new byte[len];
554      FSDataInputStream in = fs.open(idPath);
555      try {
556        in.readFully(content);
557      } catch (EOFException eof) {
558        LOG.warn("Cluster ID file {} is empty", idPath);
559      } finally {
560        in.close();
561      }
562      try {
563        clusterId = ClusterId.parseFrom(content);
564      } catch (DeserializationException e) {
565        throw new IOException("content=" + Bytes.toString(content), e);
566      }
567      // If not pb'd, make it so.
568      if (!ProtobufUtil.isPBMagicPrefix(content)) {
569        String cid = null;
570        in = fs.open(idPath);
571        try {
572          cid = in.readUTF();
573          clusterId = new ClusterId(cid);
574        } catch (EOFException eof) {
575          LOG.warn("Cluster ID file {} is empty", idPath);
576        } finally {
577          in.close();
578        }
579        rewriteAsPb(fs, rootdir, idPath, clusterId);
580      }
581      return clusterId;
582    } else {
583      LOG.warn("Cluster ID file does not exist at {}", idPath);
584    }
585    return clusterId;
586  }
587
  /**
   * Rewrite the cluster ID file at <code>p</code> in protobuf format, replacing the legacy
   * plain-text file.
   */
590  private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
591    final ClusterId cid) throws IOException {
592    // Rewrite the file as pb. Move aside the old one first, write new
593    // then delete the moved-aside file.
594    Path movedAsideName = new Path(p + "." + EnvironmentEdgeManager.currentTime());
595    if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
596    setClusterId(fs, rootdir, cid, 100);
597    if (!fs.delete(movedAsideName, false)) {
598      throw new IOException("Failed delete of " + movedAsideName);
599    }
600    LOG.debug("Rewrote the hbase.id file as pb");
601  }
602
603  /**
604   * Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root
   * directory. If any operation on the ID file fails, and {@code wait} is a positive value, the
606   * method will retry to produce the ID file until the thread is forcibly interrupted.
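   * <p>
   * Illustrative sketch (the configuration and the one second retry interval are hypothetical):
   *
   * <pre>
   * Path rootDir = CommonFSUtils.getRootDir(conf);
   * FileSystem fs = rootDir.getFileSystem(conf);
   * FSUtils.setClusterId(fs, rootDir, new ClusterId(), 1000L);
   * </pre>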
607   * @param fs        the root directory FileSystem
608   * @param rootdir   the path to the HBase root directory
609   * @param clusterId the unique identifier to store
610   * @param wait      how long (in milliseconds) to wait between retries
   * @throws IOException if writing to the FileSystem fails and {@code wait} is not a positive value
612   */
613  public static void setClusterId(final FileSystem fs, final Path rootdir,
614    final ClusterId clusterId, final long wait) throws IOException {
615
616    final Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
617    final Path tempDir = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY);
618    final Path tempIdFile = new Path(tempDir, HConstants.CLUSTER_ID_FILE_NAME);
619
620    LOG.debug("Create cluster ID file [{}] with ID: {}", idFile, clusterId);
621
622    while (true) {
623      Optional<IOException> failure = Optional.empty();
624
625      LOG.debug("Write the cluster ID file to a temporary location: {}", tempIdFile);
626      try (FSDataOutputStream s = fs.create(tempIdFile)) {
627        s.write(clusterId.toByteArray());
628      } catch (IOException ioe) {
629        failure = Optional.of(ioe);
630      }
631
632      if (!failure.isPresent()) {
633        try {
634          LOG.debug("Move the temporary cluster ID file to its target location [{}]:[{}]",
635            tempIdFile, idFile);
636
637          if (!fs.rename(tempIdFile, idFile)) {
638            failure =
639              Optional.of(new IOException("Unable to move temp cluster ID file to " + idFile));
640          }
641        } catch (IOException ioe) {
642          failure = Optional.of(ioe);
643        }
644      }
645
646      if (failure.isPresent()) {
647        final IOException cause = failure.get();
648        if (wait > 0L) {
649          LOG.warn("Unable to create cluster ID file in {}, retrying in {}ms", rootdir, wait,
650            cause);
651          try {
652            Thread.sleep(wait);
653          } catch (InterruptedException e) {
654            Thread.currentThread().interrupt();
655            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
656          }
657          continue;
658        } else {
659          throw cause;
660        }
661      } else {
662        return;
663      }
664    }
665  }
666
667  /**
668   * If DFS, check safe mode and if so, wait until we clear it.
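   * <p>
   * Typical call, polling every ten seconds (the interval is arbitrary):
   *
   * <pre>
   * FSUtils.waitOnSafeMode(conf, 10 * 1000L);
   * </pre>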
669   * @param conf configuration
670   * @param wait Sleep between retries
671   * @throws IOException e
672   */
673  public static void waitOnSafeMode(final Configuration conf, final long wait) throws IOException {
674    FileSystem fs = FileSystem.get(conf);
675    if (!supportSafeMode(fs)) {
676      return;
677    }
678    // Make sure dfs is not in safe mode
679    while (isInSafeMode(fs)) {
680      LOG.info("Waiting for dfs to exit safe mode...");
681      try {
682        Thread.sleep(wait);
683      } catch (InterruptedException e) {
684        Thread.currentThread().interrupt();
685        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
686      }
687    }
688  }
689
690  public static boolean supportSafeMode(FileSystem fs) {
691    // return true if HDFS.
692    if (fs instanceof DistributedFileSystem) {
693      return true;
694    }
695    // return true if the file system implements SafeMode interface.
696    if (safeModeClazz != null) {
697      return (safeModeClazz.isAssignableFrom(fs.getClass()));
698    }
699    // return false if the file system is not HDFS and does not implement SafeMode interface.
700    return false;
701  }
702
703  /**
704   * Checks if meta region exists
705   * @param fs      file system
706   * @param rootDir root directory of HBase installation
707   * @return true if exists
708   */
709  public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
710    Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
711    return fs.exists(metaRegionDir);
712  }
713
714  /**
715   * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams are
716   * backed by a series of LocatedBlocks, which are fetched periodically from the namenode. This
717   * method retrieves those blocks from the input stream and uses them to calculate
   * HDFSBlocksDistribution. The underlying method in DFSInputStream does attempt to use locally
   * cached blocks, but may hit the namenode if the cache is determined to be incomplete. The method
   * also involves making copies of all LocatedBlocks rather than returning the underlying blocks
721   * themselves.
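   * <p>
   * Illustrative sketch (the store file path is hypothetical); streams opened through an
   * HDFS-backed {@link FileSystem} are {@link HdfsDataInputStream} instances:
   *
   * <pre>
   * Path storeFile = new Path("/hbase/data/default/t1/1588230740/cf/examplefile");
   * try (FSDataInputStream in = fs.open(storeFile)) {
   *   if (in instanceof HdfsDataInputStream) {
   *     HDFSBlocksDistribution dist =
   *       FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) in);
   *     LOG.debug("Total unique block weight: {}", dist.getUniqueBlocksTotalWeight());
   *   }
   * }
   * </pre>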
722   */
723  static public HDFSBlocksDistribution
724    computeHDFSBlocksDistribution(HdfsDataInputStream inputStream) throws IOException {
725    List<LocatedBlock> blocks = inputStream.getAllBlocks();
726    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
727    for (LocatedBlock block : blocks) {
728      String[] hosts = getHostsForLocations(block);
729      long len = block.getBlockSize();
730      StorageType[] storageTypes = block.getStorageTypes();
731      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
732    }
733    return blocksDistribution;
734  }
735
736  private static String[] getHostsForLocations(LocatedBlock block) {
737    DatanodeInfo[] locations = getLocatedBlockLocations(block);
738    String[] hosts = new String[locations.length];
739    for (int i = 0; i < hosts.length; i++) {
740      hosts[i] = locations[i].getHostName();
741    }
742    return hosts;
743  }
744
745  /**
746   * Compute HDFS blocks distribution of a given file, or a portion of the file
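   * <p>
   * Illustrative locality sketch for a whole file (the store file path and host name are
   * hypothetical):
   *
   * <pre>
   * FileStatus status = fs.getFileStatus(storeFilePath);
   * HDFSBlocksDistribution dist =
   *   FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
   * float locality = dist.getBlockLocalityIndex("regionserver-1.example.com");
   * </pre>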
747   * @param fs     file system
748   * @param status file status of the file
749   * @param start  start position of the portion
750   * @param length length of the portion
751   * @return The HDFS blocks distribution
752   */
753  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs,
754    FileStatus status, long start, long length) throws IOException {
755    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
756    BlockLocation[] blockLocations = fs.getFileBlockLocations(status, start, length);
757    addToHDFSBlocksDistribution(blocksDistribution, blockLocations);
758    return blocksDistribution;
759  }
760
761  /**
762   * Update blocksDistribution with blockLocations
763   * @param blocksDistribution the hdfs blocks distribution
764   * @param blockLocations     an array containing block location
765   */
766  static public void addToHDFSBlocksDistribution(HDFSBlocksDistribution blocksDistribution,
767    BlockLocation[] blockLocations) throws IOException {
768    for (BlockLocation bl : blockLocations) {
769      String[] hosts = bl.getHosts();
770      long len = bl.getLength();
771      StorageType[] storageTypes = bl.getStorageTypes();
772      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
773    }
774  }
775
776  // TODO move this method OUT of FSUtils. No dependencies to HMaster
777  /**
778   * Returns the total overall fragmentation percentage. Includes hbase:meta and -ROOT- as well.
779   * @param master The master defining the HBase root and file system
   * @return The overall fragmentation percentage, or -1 if it could not be determined
781   * @throws IOException When scanning the directory fails
782   */
783  public static int getTotalTableFragmentation(final HMaster master) throws IOException {
784    Map<String, Integer> map = getTableFragmentation(master);
785    return map.isEmpty() ? -1 : map.get("-TOTAL-");
786  }
787
788  /**
789   * Runs through the HBase rootdir and checks how many stores for each table have more than one
790   * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is
791   * stored under the special key "-TOTAL-".
792   * @param master The master defining the HBase root and file system.
793   * @return A map for each table and its percentage (never null).
794   * @throws IOException When scanning the directory fails.
795   */
796  public static Map<String, Integer> getTableFragmentation(final HMaster master)
797    throws IOException {
798    Path path = CommonFSUtils.getRootDir(master.getConfiguration());
799    // since HMaster.getFileSystem() is package private
800    FileSystem fs = path.getFileSystem(master.getConfiguration());
801    return getTableFragmentation(fs, path);
802  }
803
804  /**
805   * Runs through the HBase rootdir and checks how many stores for each table have more than one
806   * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is
807   * stored under the special key "-TOTAL-".
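   * <p>
   * Illustrative sketch of consuming the result (root directory resolution as in the
   * {@link #getTableFragmentation(HMaster)} overload):
   *
   * <pre>
   * Path rootDir = CommonFSUtils.getRootDir(conf);
   * FileSystem fs = rootDir.getFileSystem(conf);
   * Map&lt;String, Integer&gt; frags = FSUtils.getTableFragmentation(fs, rootDir);
   * int overallPercent = frags.get("-TOTAL-");
   * </pre>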
808   * @param fs           The file system to use
809   * @param hbaseRootDir The root directory to scan
810   * @return A map for each table and its percentage (never null)
811   * @throws IOException When scanning the directory fails
812   */
813  public static Map<String, Integer> getTableFragmentation(final FileSystem fs,
814    final Path hbaseRootDir) throws IOException {
815    Map<String, Integer> frags = new HashMap<>();
816    int cfCountTotal = 0;
817    int cfFragTotal = 0;
818    PathFilter regionFilter = new RegionDirFilter(fs);
819    PathFilter familyFilter = new FamilyDirFilter(fs);
820    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
821    for (Path d : tableDirs) {
822      int cfCount = 0;
823      int cfFrag = 0;
824      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
825      for (FileStatus regionDir : regionDirs) {
826        Path dd = regionDir.getPath();
        // else it's a region name; now look in the region for families
828        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
829        for (FileStatus familyDir : familyDirs) {
830          cfCount++;
831          cfCountTotal++;
832          Path family = familyDir.getPath();
833          // now in family make sure only one file
834          FileStatus[] familyStatus = fs.listStatus(family);
835          if (familyStatus.length > 1) {
836            cfFrag++;
837            cfFragTotal++;
838          }
839        }
840      }
841      // compute percentage per table and store in result list
842      frags.put(CommonFSUtils.getTableName(d).getNameAsString(),
843        cfCount == 0 ? 0 : Math.round((float) cfFrag / cfCount * 100));
844    }
845    // set overall percentage for all tables
846    frags.put("-TOTAL-",
847      cfCountTotal == 0 ? 0 : Math.round((float) cfFragTotal / cfCountTotal * 100));
848    return frags;
849  }
850
851  public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
852    if (fs.exists(dst) && !fs.delete(dst, false)) {
853      throw new IOException("Can not delete " + dst);
854    }
855    if (!fs.rename(src, dst)) {
856      throw new IOException("Can not rename from " + src + " to " + dst);
857    }
858  }
859
860  /**
861   * A {@link PathFilter} that returns only regular files.
862   */
863  static class FileFilter extends AbstractFileStatusFilter {
864    private final FileSystem fs;
865
866    public FileFilter(final FileSystem fs) {
867      this.fs = fs;
868    }
869
870    @Override
871    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
872      try {
873        return isFile(fs, isDir, p);
874      } catch (IOException e) {
875        LOG.warn("Unable to verify if path={} is a regular file", p, e);
876        return false;
877      }
878    }
879  }
880
881  /**
882   * Directory filter that doesn't include any of the directories in the specified blacklist
883   */
884  public static class BlackListDirFilter extends AbstractFileStatusFilter {
885    private final FileSystem fs;
886    private List<String> blacklist;
887
888    /**
     * Create a filter on the given filesystem with the specified blacklist
890     * @param fs                     filesystem to filter
891     * @param directoryNameBlackList list of the names of the directories to filter. If
892     *                               <tt>null</tt>, all directories are returned
893     */
894    @SuppressWarnings("unchecked")
895    public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
896      this.fs = fs;
897      blacklist = (List<String>) (directoryNameBlackList == null
898        ? Collections.emptyList()
899        : directoryNameBlackList);
900    }
901
902    @Override
903    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
904      if (!isValidName(p.getName())) {
905        return false;
906      }
907
908      try {
909        return isDirectory(fs, isDir, p);
910      } catch (IOException e) {
911        LOG.warn("An error occurred while verifying if [{}] is a valid directory."
912          + " Returning 'not valid' and continuing.", p, e);
913        return false;
914      }
915    }
916
917    protected boolean isValidName(final String name) {
918      return !blacklist.contains(name);
919    }
920  }
921
922  /**
923   * A {@link PathFilter} that only allows directories.
924   */
925  public static class DirFilter extends BlackListDirFilter {
926
927    public DirFilter(FileSystem fs) {
928      super(fs, null);
929    }
930  }
931
932  /**
933   * A {@link PathFilter} that returns usertable directories. To get all directories use the
934   * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
935   */
936  public static class UserTableDirFilter extends BlackListDirFilter {
937    public UserTableDirFilter(FileSystem fs) {
938      super(fs, HConstants.HBASE_NON_TABLE_DIRS);
939    }
940
941    @Override
942    protected boolean isValidName(final String name) {
943      if (!super.isValidName(name)) return false;
944
945      try {
946        TableName.isLegalTableQualifierName(Bytes.toBytes(name));
947      } catch (IllegalArgumentException e) {
948        LOG.info("Invalid table name: {}", name);
949        return false;
950      }
951      return true;
952    }
953  }
954
955  public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
956    throws IOException {
957    List<Path> tableDirs = new ArrayList<>();
958    Path baseNamespaceDir = new Path(rootdir, HConstants.BASE_NAMESPACE_DIR);
959    if (fs.exists(baseNamespaceDir)) {
960      for (FileStatus status : fs.globStatus(new Path(baseNamespaceDir, "*"))) {
961        tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
962      }
963    }
964    return tableDirs;
965  }
966
967  /**
968   * @return All the table directories under <code>rootdir</code>. Ignore non table hbase folders
969   *         such as .logs, .oldlogs, .corrupt folders.
970   */
971  public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
972    throws IOException {
973    // presumes any directory under hbase.rootdir is a table
974    FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
975    List<Path> tabledirs = new ArrayList<>(dirs.length);
976    for (FileStatus dir : dirs) {
977      tabledirs.add(dir.getPath());
978    }
979    return tabledirs;
980  }
981
982  /**
   * Filter for region directories, i.e. directories whose names are hex-encoded region names.
984   */
985  public static class RegionDirFilter extends AbstractFileStatusFilter {
986    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
987    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
988    final FileSystem fs;
989
990    public RegionDirFilter(FileSystem fs) {
991      this.fs = fs;
992    }
993
994    @Override
995    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
996      if (!regionDirPattern.matcher(p.getName()).matches()) {
997        return false;
998      }
999
1000      try {
1001        return isDirectory(fs, isDir, p);
1002      } catch (IOException ioe) {
1003        // Maybe the file was moved or the fs was disconnected.
1004        LOG.warn("Skipping file {} due to IOException", p, ioe);
1005        return false;
1006      }
1007    }
1008  }
1009
1010  /**
1011   * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1012   * .tableinfo
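   * <p>
   * Illustrative sketch (the table name is hypothetical):
   *
   * <pre>
   * Path rootDir = CommonFSUtils.getRootDir(conf);
   * Path tableDir = CommonFSUtils.getTableDir(rootDir, TableName.valueOf("t1"));
   * List&lt;Path&gt; regionDirs = FSUtils.getRegionDirs(fs, tableDir);
   * </pre>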
1013   * @param fs       A file system for the Path
1014   * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
1015   * @return List of paths to valid region directories in table dir.
1016   */
1017  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir)
1018    throws IOException {
1019    // assumes we are in a table dir.
1020    List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1021    if (rds == null) {
1022      return Collections.emptyList();
1023    }
1024    List<Path> regionDirs = new ArrayList<>(rds.size());
1025    for (FileStatus rdfs : rds) {
1026      Path rdPath = rdfs.getPath();
1027      regionDirs.add(rdPath);
1028    }
1029    return regionDirs;
1030  }
1031
1032  public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) {
1033    return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region);
1034  }
1035
1036  public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
1037    return getRegionDirFromTableDir(tableDir,
1038      ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
1039  }
1040
1041  public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) {
1042    return new Path(tableDir, encodedRegionName);
1043  }
1044
1045  /**
1046   * Filter for all dirs that are legal column family names. This is generally used for colfam dirs
1047   * &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1048   */
1049  public static class FamilyDirFilter extends AbstractFileStatusFilter {
1050    final FileSystem fs;
1051
1052    public FamilyDirFilter(FileSystem fs) {
1053      this.fs = fs;
1054    }
1055
1056    @Override
1057    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1058      try {
1059        // throws IAE if invalid
1060        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(Bytes.toBytes(p.getName()));
1061      } catch (IllegalArgumentException iae) {
1062        // path name is an invalid family name and thus is excluded.
1063        return false;
1064      }
1065
1066      try {
1067        return isDirectory(fs, isDir, p);
1068      } catch (IOException ioe) {
1069        // Maybe the file was moved or the fs was disconnected.
1070        LOG.warn("Skipping file {} due to IOException", p, ioe);
1071        return false;
1072      }
1073    }
1074  }
1075
1076  /**
1077   * Given a particular region dir, return all the familydirs inside it
1078   * @param fs        A file system for the Path
1079   * @param regionDir Path to a specific region directory
1080   * @return List of paths to valid family directories in region dir.
1081   */
1082  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir)
1083    throws IOException {
1084    // assumes we are in a region dir.
1085    return getFilePaths(fs, regionDir, new FamilyDirFilter(fs));
1086  }
1087
1088  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir)
1089    throws IOException {
1090    return getFilePaths(fs, familyDir, new ReferenceFileFilter(fs));
1091  }
1092
1093  public static List<Path> getReferenceAndLinkFilePaths(final FileSystem fs, final Path familyDir)
1094    throws IOException {
1095    return getFilePaths(fs, familyDir, new ReferenceAndLinkFileFilter(fs));
1096  }
1097
1098  private static List<Path> getFilePaths(final FileSystem fs, final Path dir,
1099    final PathFilter pathFilter) throws IOException {
1100    FileStatus[] fds = fs.listStatus(dir, pathFilter);
1101    List<Path> files = new ArrayList<>(fds.length);
1102    for (FileStatus fdfs : fds) {
1103      Path fdPath = fdfs.getPath();
1104      files.add(fdPath);
1105    }
1106    return files;
1107  }
1108
1109  public static int getRegionReferenceAndLinkFileCount(final FileSystem fs, final Path p) {
1110    int result = 0;
1111    try {
1112      for (Path familyDir : getFamilyDirs(fs, p)) {
1113        result += getReferenceAndLinkFilePaths(fs, familyDir).size();
1114      }
1115    } catch (IOException e) {
1116      LOG.warn("Error Counting reference files.", e);
1117    }
1118    return result;
1119  }
1120
1121  public static class ReferenceAndLinkFileFilter implements PathFilter {
1122
1123    private final FileSystem fs;
1124
1125    public ReferenceAndLinkFileFilter(FileSystem fs) {
1126      this.fs = fs;
1127    }
1128
1129    @Override
1130    public boolean accept(Path rd) {
1131      try {
1132        // only files can be references.
1133        return !fs.getFileStatus(rd).isDirectory()
1134          && (StoreFileInfo.isReference(rd) || HFileLink.isHFileLink(rd));
1135      } catch (IOException ioe) {
1136        // Maybe the file was moved or the fs was disconnected.
1137        LOG.warn("Skipping file " + rd + " due to IOException", ioe);
1138        return false;
1139      }
1140    }
1141  }
1142
1143  /**
1144   * Filter for HFiles that excludes reference files.
1145   */
1146  public static class HFileFilter extends AbstractFileStatusFilter {
1147    final FileSystem fs;
1148
1149    public HFileFilter(FileSystem fs) {
1150      this.fs = fs;
1151    }
1152
1153    @Override
1154    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1155      if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) {
1156        return false;
1157      }
1158
1159      try {
1160        return isFile(fs, isDir, p);
1161      } catch (IOException ioe) {
1162        // Maybe the file was moved or the fs was disconnected.
1163        LOG.warn("Skipping file {} due to IOException", p, ioe);
1164        return false;
1165      }
1166    }
1167  }
1168
1169  /**
   * Filter for HFileLinks (StoreFiles and HFiles not included). The filter itself does not
   * consider whether a link is a file or not.
1172   */
1173  public static class HFileLinkFilter implements PathFilter {
1174
1175    @Override
1176    public boolean accept(Path p) {
1177      return HFileLink.isHFileLink(p);
1178    }
1179  }
1180
1181  public static class ReferenceFileFilter extends AbstractFileStatusFilter {
1182
1183    private final FileSystem fs;
1184
1185    public ReferenceFileFilter(FileSystem fs) {
1186      this.fs = fs;
1187    }
1188
1189    @Override
1190    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1191      if (!StoreFileInfo.isReference(p)) {
1192        return false;
1193      }
1194
1195      try {
1196        // only files can be references.
1197        return isFile(fs, isDir, p);
1198      } catch (IOException ioe) {
1199        // Maybe the file was moved or the fs was disconnected.
1200        LOG.warn("Skipping file {} due to IOException", p, ioe);
1201        return false;
1202      }
1203    }
1204  }
1205
1206  /**
1207   * Called every so-often by storefile map builder getTableStoreFilePathMap to report progress.
1208   */
1209  interface ProgressReporter {
1210    /**
1211     * @param status File or directory we are about to process.
1212     */
1213    void progress(FileStatus status);
1214  }
1215
1216  /**
1217   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
1218   * names to the full Path. <br>
1219   * Example...<br>
1220   * Key = 3944417774205889744 <br>
1221   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
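   * <p>
   * Illustrative sketch (the table name is hypothetical; a fresh map is created and returned
   * because the first argument is <code>null</code>):
   *
   * <pre>
   * Map&lt;String, Path&gt; storeFiles =
   *   FSUtils.getTableStoreFilePathMap(null, fs, rootDir, TableName.valueOf("t1"));
   * </pre>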
1222   * @param map          map to add values. If null, this method will create and populate one to
1223   *                     return
1224   * @param fs           The file system to use.
1225   * @param hbaseRootDir The root directory to scan.
1226   * @param tableName    name of the table to scan.
1227   * @return Map keyed by StoreFile name with a value of the full Path.
1228   * @throws IOException When scanning the directory fails.
1229   */
1230  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1231    final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1232    throws IOException, InterruptedException {
1233    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null,
1234      (ProgressReporter) null);
1235  }
1236
1237  /**
1238   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
   * names to the full Path. Note that because this method can be called on a 'live' HBase system,
   * we will skip files that no longer exist by the time we traverse them, and similarly the
1241   * user of the result needs to consider that some entries in this map may not exist by the time
1242   * this call completes. <br>
1243   * Example...<br>
1244   * Key = 3944417774205889744 <br>
1245   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1246   * @param resultMap        map to add values. If null, this method will create and populate one to
1247   *                         return
1248   * @param fs               The file system to use.
1249   * @param hbaseRootDir     The root directory to scan.
1250   * @param tableName        name of the table to scan.
1251   * @param sfFilter         optional path filter to apply to store files
1252   * @param executor         optional executor service to parallelize this operation
1253   * @param progressReporter Instance or null; gets called every time we move to new region of
1254   *                         family dir and for each store file.
1255   * @return Map keyed by StoreFile name with a value of the full Path.
1256   * @throws IOException When scanning the directory fails.
1257   * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead.
1258   */
1259  @Deprecated
1260  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1261    final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1262    ExecutorService executor, final HbckErrorReporter progressReporter)
1263    throws IOException, InterruptedException {
1264    return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor,
1265      new ProgressReporter() {
1266        @Override
1267        public void progress(FileStatus status) {
1268          // status is not used in this implementation.
1269          progressReporter.progress();
1270        }
1271      });
1272  }
1273
1274  /**
1275   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
   * names to the full Path. Note that because this method can be called on a 'live' HBase system,
   * we will skip files that no longer exist by the time we traverse them, and similarly the
1278   * user of the result needs to consider that some entries in this map may not exist by the time
1279   * this call completes. <br>
1280   * Example...<br>
1281   * Key = 3944417774205889744 <br>
1282   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
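   * <p>
   * Illustrative sketch using a small thread pool and no progress reporting (the pool size and
   * table name are hypothetical):
   *
   * <pre>
   * ExecutorService pool = Executors.newFixedThreadPool(4);
   * try {
   *   Map&lt;String, Path&gt; storeFiles = FSUtils.getTableStoreFilePathMap(null, fs, rootDir,
   *     TableName.valueOf("t1"), null, pool, (ProgressReporter) null);
   * } finally {
   *   pool.shutdown();
   * }
   * </pre>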
1283   * @param resultMap        map to add values. If null, this method will create and populate one to
1284   *                         return
1285   * @param fs               The file system to use.
1286   * @param hbaseRootDir     The root directory to scan.
1287   * @param tableName        name of the table to scan.
1288   * @param sfFilter         optional path filter to apply to store files
1289   * @param executor         optional executor service to parallelize this operation
1290   * @param progressReporter Instance or null; gets called every time we move to new region of
1291   *                         family dir and for each store file.
1292   * @return Map keyed by StoreFile name with a value of the full Path.
1293   * @throws IOException          When scanning the directory fails.
1294   * @throws InterruptedException the thread is interrupted, either before or during the activity.
1295   */
1296  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1297    final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1298    ExecutorService executor, final ProgressReporter progressReporter)
1299    throws IOException, InterruptedException {
1300
1301    final Map<String, Path> finalResultMap =
1302      resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;
1303
1304    // only include the directory paths to tables
1305    Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
1306    // Inside a table, there are compaction.dir directories to skip. Otherwise, all else
1307    // should be regions.
1308    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
1309    final Vector<Exception> exceptions = new Vector<>();
1310
1311    try {
1312      List<FileStatus> regionDirs =
1313        FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1314      if (regionDirs == null) {
1315        return finalResultMap;
1316      }
1317
1318      final List<Future<?>> futures = new ArrayList<>(regionDirs.size());
1319
1320      for (FileStatus regionDir : regionDirs) {
1321        if (null != progressReporter) {
1322          progressReporter.progress(regionDir);
1323        }
1324        final Path dd = regionDir.getPath();
1325
1326        if (!exceptions.isEmpty()) {
1327          break;
1328        }
1329
1330        Runnable getRegionStoreFileMapCall = new Runnable() {
1331          @Override
1332          public void run() {
1333            try {
1334              HashMap<String, Path> regionStoreFileMap = new HashMap<>();
1335              List<FileStatus> familyDirs =
1336                FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
1337              if (familyDirs == null) {
1338                if (!fs.exists(dd)) {
1339                  LOG.warn("Skipping region because it no longer exists: " + dd);
1340                } else {
1341                  LOG.warn("Skipping region because it has no family dirs: " + dd);
1342                }
1343                return;
1344              }
1345              for (FileStatus familyDir : familyDirs) {
1346                if (null != progressReporter) {
1347                  progressReporter.progress(familyDir);
1348                }
1349                Path family = familyDir.getPath();
1350                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1351                  continue;
1352                }
1353                // now in family, iterate over the StoreFiles and
1354                // put in map
1355                FileStatus[] familyStatus = fs.listStatus(family);
1356                for (FileStatus sfStatus : familyStatus) {
1357                  if (null != progressReporter) {
1358                    progressReporter.progress(sfStatus);
1359                  }
1360                  Path sf = sfStatus.getPath();
1361                  if (sfFilter == null || sfFilter.accept(sf)) {
1362                    regionStoreFileMap.put(sf.getName(), sf);
1363                  }
1364                }
1365              }
1366              finalResultMap.putAll(regionStoreFileMap);
1367            } catch (Exception e) {
1368              LOG.error("Could not get region store file map for region: " + dd, e);
1369              exceptions.add(e);
1370            }
1371          }
1372        };
1373
1374        // If executor is available, submit async tasks to exec concurrently, otherwise
1375        // just do serial sync execution
1376        if (executor != null) {
1377          Future<?> future = executor.submit(getRegionStoreFileMapCall);
1378          futures.add(future);
1379        } else {
1380          FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
1381          future.run();
1382          futures.add(future);
1383        }
1384      }
1385
1386      // Ensure all pending tasks are complete (or that we run into an exception)
1387      for (Future<?> f : futures) {
1388        if (!exceptions.isEmpty()) {
1389          break;
1390        }
1391        try {
1392          f.get();
1393        } catch (ExecutionException e) {
1394          LOG.error("Unexpected exec exception!  Should've been caught already.  (Bug?)", e);
1395          // Shouldn't happen, we already logged/caught any exceptions in the Runnable
1396        }
1397      }
1398    } catch (IOException e) {
1399      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
1400      exceptions.add(e);
1401    } finally {
1402      if (!exceptions.isEmpty()) {
1403        // Just throw the first exception as an indication something bad happened
1404        // Don't need to propagate all the exceptions, we already logged them all anyway
1405        Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class);
1406        throw new IOException(exceptions.firstElement());
1407      }
1408    }
1409
1410    return finalResultMap;
1411  }
1412
1413  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1414    int result = 0;
1415    try {
1416      for (Path familyDir : getFamilyDirs(fs, p)) {
1417        result += getReferenceFilePaths(fs, familyDir).size();
1418      }
1419    } catch (IOException e) {
1420      LOG.warn("Error counting reference files", e);
1421    }
1422    return result;
1423  }
1424
1425  /**
1426   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1427   * the full Path. <br>
1428   * Example...<br>
1429   * Key = 3944417774205889744 <br>
1430   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
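   * <p>
   * A usage sketch (illustrative only; {@code conf} is assumed to be the cluster configuration):
   * <pre>{@code
   * Path rootDir = CommonFSUtils.getRootDir(conf);
   * FileSystem fs = rootDir.getFileSystem(conf);
   * Map<String, Path> storeFiles = FSUtils.getTableStoreFilePathMap(fs, rootDir);
   * Path where = storeFiles.get("3944417774205889744");
   * }</pre>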
1431   * @param fs           The file system to use.
1432   * @param hbaseRootDir The root directory to scan.
1433   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException          When scanning the directory fails.
   * @throws InterruptedException the thread is interrupted, either before or during the activity.
1435   */
1436  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1437    final Path hbaseRootDir) throws IOException, InterruptedException {
1438    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter) null);
1439  }
1440
1441  /**
1442   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1443   * the full Path. <br>
1444   * Example...<br>
1445   * Key = 3944417774205889744 <br>
1446   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1447   * @param fs               The file system to use.
1448   * @param hbaseRootDir     The root directory to scan.
1449   * @param sfFilter         optional path filter to apply to store files
1450   * @param executor         optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called each time we move to a new region or
1452   *                         family dir and for each store file.
1453   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException          When scanning the directory fails.
   * @throws InterruptedException the thread is interrupted, either before or during the activity.
   * @deprecated Since 2.3.0. Will be removed in hbase4. Use
   *             {@link #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)}
   *             instead.
1457   */
1458  @Deprecated
1459  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1460    final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1461    HbckErrorReporter progressReporter) throws IOException, InterruptedException {
1462    return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor, new ProgressReporter() {
1463      @Override
1464      public void progress(FileStatus status) {
1465        // status is not used in this implementation.
1466        progressReporter.progress();
1467      }
1468    });
1469  }
1470
1471  /**
1472   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1473   * the full Path. <br>
1474   * Example...<br>
1475   * Key = 3944417774205889744 <br>
1476   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
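   * <p>
   * A sketch of a parallel scan (illustrative only; the pool size is arbitrary, and {@code fs} and
   * {@code rootDir} are assumed to be set up by the caller):
   * <pre>{@code
   * ExecutorService pool = Executors.newFixedThreadPool(8);
   * try {
   *   Map<String, Path> storeFiles =
   *     FSUtils.getTableStoreFilePathMap(fs, rootDir, null, pool, (ProgressReporter) null);
   * } finally {
   *   pool.shutdown();
   * }
   * }</pre>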
1477   * @param fs               The file system to use.
1478   * @param hbaseRootDir     The root directory to scan.
1479   * @param sfFilter         optional path filter to apply to store files
1480   * @param executor         optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called each time we move to a new region or
1482   *                         family dir and for each store file.
1483   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException          When scanning the directory fails.
   * @throws InterruptedException the thread is interrupted, either before or during the activity.
1485   */
1486  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1487    final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1488    ProgressReporter progressReporter) throws IOException, InterruptedException {
1489    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);
1490
1491    // if this method looks similar to 'getTableFragmentation' that is because
1492    // it was borrowed from it.
1493
1494    // only include the directory paths to tables
1495    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1496      getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir),
1497        sfFilter, executor, progressReporter);
1498    }
1499    return map;
1500  }
1501
1502  /**
1503   * Filters FileStatuses in an array and returns a list
1504   * @param input  An array of FileStatuses
1505   * @param filter A required filter to filter the array
1506   * @return A list of FileStatuses
1507   */
1508  public static List<FileStatus> filterFileStatuses(FileStatus[] input, FileStatusFilter filter) {
1509    if (input == null) return null;
1510    return filterFileStatuses(Iterators.forArray(input), filter);
1511  }
1512
1513  /**
1514   * Filters FileStatuses in an iterator and returns a list
1515   * @param input  An iterator of FileStatuses
   * @param filter A required filter to filter the iterator
1517   * @return A list of FileStatuses
1518   */
1519  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1520    FileStatusFilter filter) {
1521    if (input == null) return null;
1522    ArrayList<FileStatus> results = new ArrayList<>();
1523    while (input.hasNext()) {
1524      FileStatus f = input.next();
1525      if (filter.accept(f)) {
1526        results.add(f);
1527      }
1528    }
1529    return results;
1530  }
1531
1532  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
   * differences between Hadoop versions, where Hadoop 1 does not throw a FileNotFoundException but
   * returns an empty FileStatus[], while Hadoop 2 will throw a FileNotFoundException.
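   * <p>
   * A minimal usage sketch (illustrative only; {@code fs} and {@code dir} are assumed to be set up
   * by the caller):
   * <pre>{@code
   * List<FileStatus> dirs =
   *   FSUtils.listStatusWithStatusFilter(fs, dir, status -> status.isDirectory());
   * if (dirs == null) {
   *   // dir is missing, is empty, or nothing matched the filter
   * }
   * }</pre>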
1536   * @param fs     file system
1537   * @param dir    directory
1538   * @param filter file status filter
   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
   * @throws IOException if listing the directory fails for a reason other than FileNotFoundException
1540   */
1541  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs, final Path dir,
1542    final FileStatusFilter filter) throws IOException {
1543    FileStatus[] status = null;
1544    try {
1545      status = fs.listStatus(dir);
1546    } catch (FileNotFoundException fnfe) {
1547      LOG.trace("{} does not exist", dir);
1548      return null;
1549    }
1550
1551    if (ArrayUtils.getLength(status) == 0) {
1552      return null;
1553    }
1554
1555    if (filter == null) {
1556      return Arrays.asList(status);
1557    } else {
1558      List<FileStatus> status2 = filterFileStatuses(status, filter);
1559      if (status2 == null || status2.isEmpty()) {
1560        return null;
1561      } else {
1562        return status2;
1563      }
1564    }
1565  }
1566
1567  /**
1568   * This function is to scan the root path of the file system to get the degree of locality for
1569   * each region on each of the servers having at least one block of that region. This is used by
   * the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}.
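   * <p>
   * A usage sketch (illustrative only):
   * <pre>{@code
   * Map<String, Map<String, Float>> locality = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
   * locality.forEach((encodedRegionName, perServer) ->
   *   perServer.forEach((server, fraction) ->
   *     LOG.info("{} hosts {} with locality {}", server, encodedRegionName, fraction)));
   * }</pre>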
   * @param conf the configuration to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
1574   */
1575  public static Map<String, Map<String, Float>>
1576    getRegionDegreeLocalityMappingFromFS(final Configuration conf) throws IOException {
1577    return getRegionDegreeLocalityMappingFromFS(conf, null,
1578      conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1579
1580  }
1581
1582  /**
1583   * This function is to scan the root path of the file system to get the degree of locality for
   * each region on each of the servers having at least one block of that region.
   * @param conf           the configuration to use
   * @param desiredTable   the table you wish to scan locality for
   * @param threadPoolSize the thread pool size to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
1588   */
1589  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1590    final Configuration conf, final String desiredTable, int threadPoolSize) throws IOException {
1591    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
1592    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
1593    return regionDegreeLocalityMapping;
1594  }
1595
1596  /**
   * This function is to scan the root path of the file system to get the degree of locality of
   * each region on each of the servers having at least one block of that region.
   * @param conf                        the configuration to use
   * @param desiredTable                the table you wish to scan locality for, or null for all
   *                                    tables
   * @param threadPoolSize              the thread pool size to use
   * @param regionDegreeLocalityMapping the map into which to put the locality degree mapping; must
   *                                    be a thread-safe implementation
   * @throws IOException in case of file system errors or interrupts
1603   */
1604  private static void getRegionLocalityMappingFromFS(final Configuration conf,
1605    final String desiredTable, int threadPoolSize,
1606    final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
1607    final FileSystem fs = FileSystem.get(conf);
1608    final Path rootPath = CommonFSUtils.getRootDir(conf);
1609    final long startTime = EnvironmentEdgeManager.currentTime();
1610    final Path queryPath;
1611    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1612    if (null == desiredTable) {
1613      queryPath =
1614        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1615    } else {
1616      queryPath = new Path(
1617        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1618    }
1619
1620    // reject all paths that are not appropriate
1621    PathFilter pathFilter = new PathFilter() {
1622      @Override
1623      public boolean accept(Path path) {
1624        // this is the region name; it may get some noise data
1625        if (null == path) {
1626          return false;
1627        }
1628
1629        // no parent?
1630        Path parent = path.getParent();
1631        if (null == parent) {
1632          return false;
1633        }
1634
1635        String regionName = path.getName();
1636        if (null == regionName) {
1637          return false;
1638        }
1639
1640        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
1641          return false;
1642        }
1643        return true;
1644      }
1645    };
1646
1647    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1648
1649    if (LOG.isDebugEnabled()) {
1650      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
1651    }
1652
1653    if (null == statusList) {
1654      return;
1655    }
1656
1657    // lower the number of threads in case we have very few expected regions
1658    threadPoolSize = Math.min(threadPoolSize, statusList.length);
1659
1660    // run in multiple threads
1661    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
1662      new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").setDaemon(true)
1663        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
1664    try {
1665      // ignore all file status items that are not of interest
1666      for (FileStatus regionStatus : statusList) {
1667        if (null == regionStatus || !regionStatus.isDirectory()) {
1668          continue;
1669        }
1670
1671        final Path regionPath = regionStatus.getPath();
1672        if (null != regionPath) {
1673          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
1674        }
1675      }
1676    } finally {
1677      tpe.shutdown();
1678      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1679        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1680      try {
1681        // here we wait until TPE terminates, which is either naturally or by
1682        // exceptions in the execution of the threads
1683        while (!tpe.awaitTermination(threadWakeFrequency, TimeUnit.MILLISECONDS)) {
1684          // printing out rough estimate, so as to not introduce
1685          // AtomicInteger
1686          LOG.info("Locality checking is underway: { Scanned Regions : "
1687            + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
1688            + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
1689        }
1690      } catch (InterruptedException e) {
1691        Thread.currentThread().interrupt();
1692        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1693      }
1694    }
1695
1696    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
    LOG.info("Scanning DFS for locality info took {}ms", overhead);
1698  }
1699
1700  /**
1701   * Do our short circuit read setup. Checks buffer size to use and whether to do checksumming in
1702   * hbase or hdfs.
1703   */
1704  public static void setupShortCircuitRead(final Configuration conf) {
1705    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1706    boolean shortCircuitSkipChecksum =
1707      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1708    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1709    if (shortCircuitSkipChecksum) {
1710      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not "
1711        + "be set to true."
1712        + (useHBaseChecksum
1713          ? " HBase checksum doesn't require "
1714            + "it, see https://issues.apache.org/jira/browse/HBASE-6868."
1715          : ""));
1716      assert !shortCircuitSkipChecksum; // this will fail if assertions are on
1717    }
1718    checkShortCircuitReadBufferSize(conf);
1719  }
1720
1721  /**
1722   * Check if short circuit read buffer size is set and if not, set it to hbase value.
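   * <p>
   * For example (illustrative only), an HBase-scoped override is propagated to the DFS client key
   * like this:
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * conf.setInt("hbase.dfs.client.read.shortcircuit.buffer.size", 128 * 1024);
   * FSUtils.checkShortCircuitReadBufferSize(conf);
   * // "dfs.client.read.shortcircuit.buffer.size" is now "131072" unless it was already set
   * }</pre>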
1723   */
1724  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1725    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1726    final int notSet = -1;
1727    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
1728    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1729    int size = conf.getInt(dfsKey, notSet);
1730    // If a size is set, return -- we will use it.
1731    if (size != notSet) return;
1732    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
1733    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1734    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
1735  }
1736
1737  /**
   * Returns the DFSClient DFSHedgedReadMetrics instance, or null if it can't be found or we are
   * not on HDFS.
1739   */
1740  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
1741    throws IOException {
1742    if (!CommonFSUtils.isHDFS(c)) {
1743      return null;
1744    }
1745    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
1746    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
1747    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
1748    final String name = "getHedgedReadMetrics";
1749    DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient();
1750    Method m;
1751    try {
1752      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException | SecurityException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: "
        + e.getMessage());
      return null;
    }
1762    m.setAccessible(true);
1763    try {
1764      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
1765    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
1766      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: "
1767        + e.getMessage());
1768      return null;
1769    }
1770  }
1771
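  /**
   * Recursively copies {@code src} on {@code srcFS} to {@code dst} on {@code dstFS}, creating the
   * directory tree synchronously and copying individual files in parallel on a fixed-size thread
   * pool.
   * @param srcFS   source file system
   * @param src     source path (file or directory)
   * @param dstFS   destination file system
   * @param dst     destination path
   * @param conf    configuration passed to the per-file copy
   * @param threads number of copy threads
   * @return the list of destination paths visited, including {@code dst} itself
   * @throws IOException if creating a directory or copying a file fails, or if the copy is
   *                     interrupted
   */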
1772  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1773    Configuration conf, int threads) throws IOException {
1774    ExecutorService pool = Executors.newFixedThreadPool(threads);
1775    List<Future<Void>> futures = new ArrayList<>();
1776    List<Path> traversedPaths;
1777    try {
1778      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
1779      for (Future<Void> future : futures) {
1780        future.get();
1781      }
1782    } catch (ExecutionException | InterruptedException | IOException e) {
1783      throw new IOException("Copy snapshot reference files failed", e);
1784    } finally {
1785      pool.shutdownNow();
1786    }
1787    return traversedPaths;
1788  }
1789
1790  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1791    Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
1792    List<Path> traversedPaths = new ArrayList<>();
1793    traversedPaths.add(dst);
1794    FileStatus currentFileStatus = srcFS.getFileStatus(src);
1795    if (currentFileStatus.isDirectory()) {
1796      if (!dstFS.mkdirs(dst)) {
1797        throw new IOException("Create directory failed: " + dst);
1798      }
1799      FileStatus[] subPaths = srcFS.listStatus(src);
1800      for (FileStatus subPath : subPaths) {
1801        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
1802          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
1803      }
1804    } else {
1805      Future<Void> future = pool.submit(() -> {
1806        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
1807        return null;
1808      });
1809      futures.add(future);
1810    }
1811    return traversedPaths;
1812  }
1813
  /** Returns a set containing all NameNode addresses of the given fs */
1815  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
1816    Configuration conf) {
1817    Set<InetSocketAddress> addresses = new HashSet<>();
1818    String serviceName = fs.getCanonicalServiceName();
1819
1820    if (serviceName.startsWith("ha-hdfs")) {
1821      try {
1822        Map<String, Map<String, InetSocketAddress>> addressMap =
1823          DFSUtil.getNNServiceRpcAddressesForCluster(conf);
1824        String nameService = serviceName.substring(serviceName.indexOf(":") + 1);
1825        if (addressMap.containsKey(nameService)) {
1826          Map<String, InetSocketAddress> nnMap = addressMap.get(nameService);
1827          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
1828            InetSocketAddress addr = e2.getValue();
1829            addresses.add(addr);
1830          }
1831        }
1832      } catch (Exception e) {
1833        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
1834      }
1835    } else {
1836      URI uri = fs.getUri();
1837      int port = uri.getPort();
1838      if (port < 0) {
1839        int idx = serviceName.indexOf(':');
1840        port = Integer.parseInt(serviceName.substring(idx + 1));
1841      }
1842      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
1843      addresses.add(addr);
1844    }
1845
1846    return addresses;
1847  }
1848
1849  /**
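   * Checks whether the given file systems point at the same underlying HDFS instance, comparing
   * canonical service names and, if needed, the NameNode addresses of both sides.
   * <p>
   * A usage sketch (illustrative only; the backup cluster URI is a made-up example):
   * <pre>{@code
   * FileSystem srcFs = CommonFSUtils.getRootDir(conf).getFileSystem(conf);
   * FileSystem desFs = new Path("hdfs://backup-cluster:8020/").getFileSystem(conf);
   * if (!FSUtils.isSameHdfs(conf, srcFs, desFs)) {
   *   // the transfer crosses clusters, so an in-cluster rename/copy shortcut does not apply
   * }
   * }</pre>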
   * @param conf  the Configuration of HBase
   * @param srcFs the source FileSystem
   * @param desFs the destination FileSystem
   * @return Whether srcFs and desFs are on the same HDFS or not
1852   */
1853  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
1854    // By getCanonicalServiceName, we could make sure both srcFs and desFs
    // getCanonicalServiceName gives both srcFs and desFs a unified format that
    // contains scheme, host and port.
1857    String desServiceName = desFs.getCanonicalServiceName();
1858
1859    if (srcServiceName == null || desServiceName == null) {
1860      return false;
1861    }
1862    if (srcServiceName.equals(desServiceName)) {
1863      return true;
1864    }
1865    if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
1866      Collection<String> internalNameServices =
1867        conf.getTrimmedStringCollection("dfs.internal.nameservices");
1868      if (!internalNameServices.isEmpty()) {
        return internalNameServices.contains(srcServiceName.split(":")[1]);
1874      }
1875    }
1876    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
1877      // If one serviceName is an HA format while the other is a non-HA format,
1878      // maybe they refer to the same FileSystem.
1879      // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
1880      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
1881      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
1882      if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
1883        return true;
1884      }
1885    }
1886
1887    return false;
1888  }
1889}