001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET;
021
022import edu.umd.cs.findbugs.annotations.CheckForNull;
023import java.io.ByteArrayInputStream;
024import java.io.DataInputStream;
025import java.io.EOFException;
026import java.io.FileNotFoundException;
027import java.io.IOException;
028import java.io.InterruptedIOException;
029import java.lang.reflect.InvocationTargetException;
030import java.lang.reflect.Method;
031import java.net.InetSocketAddress;
032import java.net.URI;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Collection;
036import java.util.Collections;
037import java.util.HashMap;
038import java.util.HashSet;
039import java.util.Iterator;
040import java.util.List;
041import java.util.Locale;
042import java.util.Map;
043import java.util.Optional;
044import java.util.Set;
045import java.util.Vector;
046import java.util.concurrent.ConcurrentHashMap;
047import java.util.concurrent.ExecutionException;
048import java.util.concurrent.ExecutorService;
049import java.util.concurrent.Executors;
050import java.util.concurrent.Future;
051import java.util.concurrent.FutureTask;
052import java.util.concurrent.ThreadPoolExecutor;
053import java.util.concurrent.TimeUnit;
054import java.util.regex.Pattern;
055import org.apache.commons.lang3.ArrayUtils;
056import org.apache.hadoop.conf.Configuration;
057import org.apache.hadoop.fs.BlockLocation;
058import org.apache.hadoop.fs.FSDataInputStream;
059import org.apache.hadoop.fs.FSDataOutputStream;
060import org.apache.hadoop.fs.FileStatus;
061import org.apache.hadoop.fs.FileSystem;
062import org.apache.hadoop.fs.FileUtil;
063import org.apache.hadoop.fs.Path;
064import org.apache.hadoop.fs.PathFilter;
065import org.apache.hadoop.fs.StorageType;
066import org.apache.hadoop.fs.permission.FsPermission;
067import org.apache.hadoop.hbase.ClusterId;
068import org.apache.hadoop.hbase.HConstants;
069import org.apache.hadoop.hbase.HDFSBlocksDistribution;
070import org.apache.hadoop.hbase.TableName;
071import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
072import org.apache.hadoop.hbase.client.RegionInfo;
073import org.apache.hadoop.hbase.client.RegionInfoBuilder;
074import org.apache.hadoop.hbase.exceptions.DeserializationException;
075import org.apache.hadoop.hbase.fs.HFileSystem;
076import org.apache.hadoop.hbase.io.HFileLink;
077import org.apache.hadoop.hbase.master.HMaster;
078import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
079import org.apache.hadoop.hdfs.DFSClient;
080import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
081import org.apache.hadoop.hdfs.DFSUtil;
082import org.apache.hadoop.hdfs.DistributedFileSystem;
083import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
084import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
085import org.apache.hadoop.hdfs.protocol.LocatedBlock;
086import org.apache.hadoop.io.IOUtils;
087import org.apache.hadoop.ipc.RemoteException;
088import org.apache.hadoop.util.Progressable;
089import org.apache.yetus.audience.InterfaceAudience;
090import org.slf4j.Logger;
091import org.slf4j.LoggerFactory;
092
093import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
094import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
095import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
096import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
097import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
098
099import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
101
102/**
103 * Utility methods for interacting with the underlying file system.
104 */
105@InterfaceAudience.Private
106public final class FSUtils {
107  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
108
109  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
110  private static final int DEFAULT_THREAD_POOLSIZE = 2;
111
112  /** Set to true on Windows platforms */
113  // currently only used in testing. TODO refactor into a test class
114  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
115
116  private FSUtils() {
117  }
118
119  /** Returns True is <code>fs</code> is instance of DistributedFileSystem n */
120  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
121    FileSystem fileSystem = fs;
122    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
123    // Check its backing fs for dfs-ness.
124    if (fs instanceof HFileSystem) {
125      fileSystem = ((HFileSystem) fs).getBackingFs();
126    }
127    return fileSystem instanceof DistributedFileSystem;
128  }
129
130  /**
131   * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
132   * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true. Does not consider
133   * schema; i.e. if schemas different but path or subpath matches, the two will equate.
134   * @param pathToSearch Path we will be trying to match. n * @return True if <code>pathTail</code>
135   *                     is tail on the path of <code>pathToSearch</code>
136   */
137  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
138    Path tailPath = pathTail;
139    String tailName;
140    Path toSearch = pathToSearch;
141    String toSearchName;
142    boolean result = false;
143
144    if (pathToSearch.depth() != pathTail.depth()) {
145      return false;
146    }
147
148    do {
149      tailName = tailPath.getName();
150      if (tailName == null || tailName.isEmpty()) {
151        result = true;
152        break;
153      }
154      toSearchName = toSearch.getName();
155      if (toSearchName == null || toSearchName.isEmpty()) {
156        break;
157      }
158      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
159      tailPath = tailPath.getParent();
160      toSearch = toSearch.getParent();
161    } while (tailName.equals(toSearchName));
162    return result;
163  }
164
165  /**
166   * Delete the region directory if exists.
167   * @return True if deleted the region directory. n
168   */
169  public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri)
170    throws IOException {
171    Path rootDir = CommonFSUtils.getRootDir(conf);
172    FileSystem fs = rootDir.getFileSystem(conf);
173    return CommonFSUtils.deleteDirectory(fs,
174      new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
175  }
176
177  /**
178   * Create the specified file on the filesystem. By default, this will:
179   * <ol>
180   * <li>overwrite the file if it exists</li>
181   * <li>apply the umask in the configuration (if it is enabled)</li>
182   * <li>use the fs configured buffer size (or 4096 if not set)</li>
183   * <li>use the configured column family replication or default replication if
184   * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li>
185   * <li>use the default block size</li>
186   * <li>not track progress</li>
187   * </ol>
188   * @param conf         configurations
189   * @param fs           {@link FileSystem} on which to write the file
190   * @param path         {@link Path} to the file to write
191   * @param perm         permissions
192   * @param favoredNodes favored data nodes
193   * @return output stream to the created file
194   * @throws IOException if the file cannot be created
195   */
196  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
197    FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
198    if (fs instanceof HFileSystem) {
199      FileSystem backingFs = ((HFileSystem) fs).getBackingFs();
200      if (backingFs instanceof DistributedFileSystem) {
201        // Try to use the favoredNodes version via reflection to allow backwards-
202        // compatibility.
203        short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION,
204          String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION)));
205        try {
206          return (FSDataOutputStream) (DistributedFileSystem.class
207            .getDeclaredMethod("create", Path.class, FsPermission.class, boolean.class, int.class,
208              short.class, long.class, Progressable.class, InetSocketAddress[].class)
209            .invoke(backingFs, path, perm, true, CommonFSUtils.getDefaultBufferSize(backingFs),
210              replication > 0 ? replication : CommonFSUtils.getDefaultReplication(backingFs, path),
211              CommonFSUtils.getDefaultBlockSize(backingFs, path), null, favoredNodes));
212        } catch (InvocationTargetException ite) {
213          // Function was properly called, but threw it's own exception.
214          throw new IOException(ite.getCause());
215        } catch (NoSuchMethodException e) {
216          LOG.debug("DFS Client does not support most favored nodes create; using default create");
217          LOG.trace("Ignoring; use default create", e);
218        } catch (IllegalArgumentException | SecurityException | IllegalAccessException e) {
219          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
220        }
221      }
222    }
223    return CommonFSUtils.create(fs, path, perm, true);
224  }
225
226  /**
227   * Checks to see if the specified file system is available
228   * @param fs filesystem
229   * @throws IOException e
230   */
231  public static void checkFileSystemAvailable(final FileSystem fs) throws IOException {
232    if (!(fs instanceof DistributedFileSystem)) {
233      return;
234    }
235    IOException exception = null;
236    DistributedFileSystem dfs = (DistributedFileSystem) fs;
237    try {
238      if (dfs.exists(new Path("/"))) {
239        return;
240      }
241    } catch (IOException e) {
242      exception = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
243    }
244    try {
245      fs.close();
246    } catch (Exception e) {
247      LOG.error("file system close failed: ", e);
248    }
249    throw new IOException("File system is not available", exception);
250  }
251
252  /**
253   * Inquire the Active NameNode's safe mode status.
254   * @param dfs A DistributedFileSystem object representing the underlying HDFS.
255   * @return whether we're in safe mode n
256   */
257  private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
258    return dfs.setSafeMode(SAFEMODE_GET, true);
259  }
260
261  /**
262   * Check whether dfs is in safemode. nn
263   */
264  public static void checkDfsSafeMode(final Configuration conf) throws IOException {
265    boolean isInSafeMode = false;
266    FileSystem fs = FileSystem.get(conf);
267    if (fs instanceof DistributedFileSystem) {
268      DistributedFileSystem dfs = (DistributedFileSystem) fs;
269      isInSafeMode = isInSafeMode(dfs);
270    }
271    if (isInSafeMode) {
272      throw new IOException("File system is in safemode, it can't be written now");
273    }
274  }
275
276  /**
277   * Verifies current version of file system
278   * @param fs      filesystem object
279   * @param rootdir root hbase directory
280   * @return null if no version file exists, version string otherwise
281   * @throws IOException              if the version file fails to open
282   * @throws DeserializationException if the version data cannot be translated into a version
283   */
284  public static String getVersion(FileSystem fs, Path rootdir)
285    throws IOException, DeserializationException {
286    final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
287    FileStatus[] status = null;
288    try {
289      // hadoop 2.0 throws FNFE if directory does not exist.
290      // hadoop 1.0 returns null if directory does not exist.
291      status = fs.listStatus(versionFile);
292    } catch (FileNotFoundException fnfe) {
293      return null;
294    }
295    if (ArrayUtils.getLength(status) == 0) {
296      return null;
297    }
298    String version = null;
299    byte[] content = new byte[(int) status[0].getLen()];
300    FSDataInputStream s = fs.open(versionFile);
301    try {
302      IOUtils.readFully(s, content, 0, content.length);
303      if (ProtobufUtil.isPBMagicPrefix(content)) {
304        version = parseVersionFrom(content);
305      } else {
306        // Presume it pre-pb format.
307        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) {
308          version = dis.readUTF();
309        }
310      }
311    } catch (EOFException eof) {
312      LOG.warn("Version file was empty, odd, will try to set it.");
313    } finally {
314      s.close();
315    }
316    return version;
317  }
318
319  /**
320   * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
321   * @param bytes The byte content of the hbase.version file
322   * @return The version found in the file as a String
323   * @throws DeserializationException if the version data cannot be translated into a version
324   */
325  static String parseVersionFrom(final byte[] bytes) throws DeserializationException {
326    ProtobufUtil.expectPBMagicPrefix(bytes);
327    int pblen = ProtobufUtil.lengthOfPBMagic();
328    FSProtos.HBaseVersionFileContent.Builder builder =
329      FSProtos.HBaseVersionFileContent.newBuilder();
330    try {
331      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
332      return builder.getVersion();
333    } catch (IOException e) {
334      // Convert
335      throw new DeserializationException(e);
336    }
337  }
338
339  /**
340   * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
341   * @param version Version to persist
342   * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a
343   *         prefix.
344   */
345  static byte[] toVersionByteArray(final String version) {
346    FSProtos.HBaseVersionFileContent.Builder builder =
347      FSProtos.HBaseVersionFileContent.newBuilder();
348    return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
349  }
350
351  /**
352   * Verifies current version of file system
353   * @param fs      file system
354   * @param rootdir root directory of HBase installation
355   * @param message if true, issues a message on System.out
356   * @throws IOException              if the version file cannot be opened
357   * @throws DeserializationException if the contents of the version file cannot be parsed
358   */
359  public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
360    throws IOException, DeserializationException {
361    checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
362  }
363
364  /**
365   * Verifies current version of file system
366   * @param fs      file system
367   * @param rootdir root directory of HBase installation
368   * @param message if true, issues a message on System.out
369   * @param wait    wait interval
370   * @param retries number of times to retry
371   * @throws IOException              if the version file cannot be opened
372   * @throws DeserializationException if the contents of the version file cannot be parsed
373   */
374  public static void checkVersion(FileSystem fs, Path rootdir, boolean message, int wait,
375    int retries) throws IOException, DeserializationException {
376    String version = getVersion(fs, rootdir);
377    String msg;
378    if (version == null) {
379      if (!metaRegionExists(fs, rootdir)) {
380        // rootDir is empty (no version file and no root region)
381        // just create new version file (HBASE-1195)
382        setVersion(fs, rootdir, wait, retries);
383        return;
384      } else {
385        msg = "hbase.version file is missing. Is your hbase.rootdir valid? "
386          + "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. "
387          + "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2";
388      }
389    } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) {
390      return;
391    } else {
392      msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version
393        + " but software requires version " + HConstants.FILE_SYSTEM_VERSION
394        + ". Consult http://hbase.apache.org/book.html for further information about "
395        + "upgrading HBase.";
396    }
397
398    // version is deprecated require migration
399    // Output on stdout so user sees it in terminal.
400    if (message) {
401      System.out.println("WARNING! " + msg);
402    }
403    throw new FileSystemVersionException(msg);
404  }
405
406  /**
407   * Sets version of file system
408   * @param fs      filesystem object
409   * @param rootdir hbase root
410   * @throws IOException e
411   */
412  public static void setVersion(FileSystem fs, Path rootdir) throws IOException {
413    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
414      HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
415  }
416
417  /**
418   * Sets version of file system
419   * @param fs      filesystem object
420   * @param rootdir hbase root
421   * @param wait    time to wait for retry
422   * @param retries number of times to retry before failing
423   * @throws IOException e
424   */
425  public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
426    throws IOException {
427    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
428  }
429
430  /**
431   * Sets version of file system
432   * @param fs      filesystem object
433   * @param rootdir hbase root directory
434   * @param version version to set
435   * @param wait    time to wait for retry
436   * @param retries number of times to retry before throwing an IOException
437   * @throws IOException e
438   */
439  public static void setVersion(FileSystem fs, Path rootdir, String version, int wait, int retries)
440    throws IOException {
441    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
442    Path tempVersionFile = new Path(rootdir,
443      HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + HConstants.VERSION_FILE_NAME);
444    while (true) {
445      try {
446        // Write the version to a temporary file
447        FSDataOutputStream s = fs.create(tempVersionFile);
448        try {
449          s.write(toVersionByteArray(version));
450          s.close();
451          s = null;
452          // Move the temp version file to its normal location. Returns false
453          // if the rename failed. Throw an IOE in that case.
454          if (!fs.rename(tempVersionFile, versionFile)) {
455            throw new IOException("Unable to move temp version file to " + versionFile);
456          }
457        } finally {
458          // Cleaning up the temporary if the rename failed would be trying
459          // too hard. We'll unconditionally create it again the next time
460          // through anyway, files are overwritten by default by create().
461
462          // Attempt to close the stream on the way out if it is still open.
463          try {
464            if (s != null) s.close();
465          } catch (IOException ignore) {
466          }
467        }
468        LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
469        return;
470      } catch (IOException e) {
471        if (retries > 0) {
472          LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
473          fs.delete(versionFile, false);
474          try {
475            if (wait > 0) {
476              Thread.sleep(wait);
477            }
478          } catch (InterruptedException ie) {
479            throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
480          }
481          retries--;
482        } else {
483          throw e;
484        }
485      }
486    }
487  }
488
489  /**
490   * Checks that a cluster ID file exists in the HBase root directory
491   * @param fs      the root directory FileSystem
492   * @param rootdir the HBase root directory in HDFS
493   * @param wait    how long to wait between retries
494   * @return <code>true</code> if the file exists, otherwise <code>false</code>
495   * @throws IOException if checking the FileSystem fails
496   */
497  public static boolean checkClusterIdExists(FileSystem fs, Path rootdir, long wait)
498    throws IOException {
499    while (true) {
500      try {
501        Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
502        return fs.exists(filePath);
503      } catch (IOException ioe) {
504        if (wait > 0L) {
505          LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe);
506          try {
507            Thread.sleep(wait);
508          } catch (InterruptedException e) {
509            Thread.currentThread().interrupt();
510            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
511          }
512        } else {
513          throw ioe;
514        }
515      }
516    }
517  }
518
519  /**
520   * Returns the value of the unique cluster ID stored for this HBase instance.
521   * @param fs      the root directory FileSystem
522   * @param rootdir the path to the HBase root directory
523   * @return the unique cluster identifier
524   * @throws IOException if reading the cluster ID file fails
525   */
526  public static ClusterId getClusterId(FileSystem fs, Path rootdir) throws IOException {
527    Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
528    ClusterId clusterId = null;
529    FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null;
530    if (status != null) {
531      int len = Ints.checkedCast(status.getLen());
532      byte[] content = new byte[len];
533      FSDataInputStream in = fs.open(idPath);
534      try {
535        in.readFully(content);
536      } catch (EOFException eof) {
537        LOG.warn("Cluster ID file {} is empty", idPath);
538      } finally {
539        in.close();
540      }
541      try {
542        clusterId = ClusterId.parseFrom(content);
543      } catch (DeserializationException e) {
544        throw new IOException("content=" + Bytes.toString(content), e);
545      }
546      // If not pb'd, make it so.
547      if (!ProtobufUtil.isPBMagicPrefix(content)) {
548        String cid = null;
549        in = fs.open(idPath);
550        try {
551          cid = in.readUTF();
552          clusterId = new ClusterId(cid);
553        } catch (EOFException eof) {
554          LOG.warn("Cluster ID file {} is empty", idPath);
555        } finally {
556          in.close();
557        }
558        rewriteAsPb(fs, rootdir, idPath, clusterId);
559      }
560      return clusterId;
561    } else {
562      LOG.warn("Cluster ID file does not exist at {}", idPath);
563    }
564    return clusterId;
565  }
566
567  /**
568   * nn
569   */
570  private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
571    final ClusterId cid) throws IOException {
572    // Rewrite the file as pb. Move aside the old one first, write new
573    // then delete the moved-aside file.
574    Path movedAsideName = new Path(p + "." + EnvironmentEdgeManager.currentTime());
575    if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
576    setClusterId(fs, rootdir, cid, 100);
577    if (!fs.delete(movedAsideName, false)) {
578      throw new IOException("Failed delete of " + movedAsideName);
579    }
580    LOG.debug("Rewrote the hbase.id file as pb");
581  }
582
583  /**
584   * Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root
585   * directory. If any operations on the ID file fails, and {@code wait} is a positive value, the
586   * method will retry to produce the ID file until the thread is forcibly interrupted.
587   * @param fs        the root directory FileSystem
588   * @param rootdir   the path to the HBase root directory
589   * @param clusterId the unique identifier to store
590   * @param wait      how long (in milliseconds) to wait between retries
591   * @throws IOException if writing to the FileSystem fails and no wait value
592   */
593  public static void setClusterId(final FileSystem fs, final Path rootdir,
594    final ClusterId clusterId, final long wait) throws IOException {
595
596    final Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
597    final Path tempDir = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY);
598    final Path tempIdFile = new Path(tempDir, HConstants.CLUSTER_ID_FILE_NAME);
599
600    LOG.debug("Create cluster ID file [{}] with ID: {}", idFile, clusterId);
601
602    while (true) {
603      Optional<IOException> failure = Optional.empty();
604
605      LOG.debug("Write the cluster ID file to a temporary location: {}", tempIdFile);
606      try (FSDataOutputStream s = fs.create(tempIdFile)) {
607        s.write(clusterId.toByteArray());
608      } catch (IOException ioe) {
609        failure = Optional.of(ioe);
610      }
611
612      if (!failure.isPresent()) {
613        try {
614          LOG.debug("Move the temporary cluster ID file to its target location [{}]:[{}]",
615            tempIdFile, idFile);
616
617          if (!fs.rename(tempIdFile, idFile)) {
618            failure =
619              Optional.of(new IOException("Unable to move temp cluster ID file to " + idFile));
620          }
621        } catch (IOException ioe) {
622          failure = Optional.of(ioe);
623        }
624      }
625
626      if (failure.isPresent()) {
627        final IOException cause = failure.get();
628        if (wait > 0L) {
629          LOG.warn("Unable to create cluster ID file in {}, retrying in {}ms", rootdir, wait,
630            cause);
631          try {
632            Thread.sleep(wait);
633          } catch (InterruptedException e) {
634            Thread.currentThread().interrupt();
635            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
636          }
637          continue;
638        } else {
639          throw cause;
640        }
641      } else {
642        return;
643      }
644    }
645  }
646
647  /**
648   * If DFS, check safe mode and if so, wait until we clear it.
649   * @param conf configuration
650   * @param wait Sleep between retries
651   * @throws IOException e
652   */
653  public static void waitOnSafeMode(final Configuration conf, final long wait) throws IOException {
654    FileSystem fs = FileSystem.get(conf);
655    if (!(fs instanceof DistributedFileSystem)) return;
656    DistributedFileSystem dfs = (DistributedFileSystem) fs;
657    // Make sure dfs is not in safe mode
658    while (isInSafeMode(dfs)) {
659      LOG.info("Waiting for dfs to exit safe mode...");
660      try {
661        Thread.sleep(wait);
662      } catch (InterruptedException e) {
663        Thread.currentThread().interrupt();
664        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
665      }
666    }
667  }
668
669  /**
670   * Checks if meta region exists
671   * @param fs      file system
672   * @param rootDir root directory of HBase installation
673   * @return true if exists
674   */
675  public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
676    Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
677    return fs.exists(metaRegionDir);
678  }
679
680  /**
681   * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams are
682   * backed by a series of LocatedBlocks, which are fetched periodically from the namenode. This
683   * method retrieves those blocks from the input stream and uses them to calculate
684   * HDFSBlockDistribution. The underlying method in DFSInputStream does attempt to use locally
685   * cached blocks, but may hit the namenode if the cache is determined to be incomplete. The method
686   * also involves making copies of all LocatedBlocks rather than return the underlying blocks
687   * themselves.
688   */
689  static public HDFSBlocksDistribution
690    computeHDFSBlocksDistribution(HdfsDataInputStream inputStream) throws IOException {
691    List<LocatedBlock> blocks = inputStream.getAllBlocks();
692    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
693    for (LocatedBlock block : blocks) {
694      String[] hosts = getHostsForLocations(block);
695      long len = block.getBlockSize();
696      StorageType[] storageTypes = block.getStorageTypes();
697      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
698    }
699    return blocksDistribution;
700  }
701
702  private static String[] getHostsForLocations(LocatedBlock block) {
703    DatanodeInfo[] locations = block.getLocations();
704    String[] hosts = new String[locations.length];
705    for (int i = 0; i < hosts.length; i++) {
706      hosts[i] = locations[i].getHostName();
707    }
708    return hosts;
709  }
710
711  /**
712   * Compute HDFS blocks distribution of a given file, or a portion of the file
713   * @param fs     file system
714   * @param status file status of the file
715   * @param start  start position of the portion
716   * @param length length of the portion
717   * @return The HDFS blocks distribution
718   */
719  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs,
720    FileStatus status, long start, long length) throws IOException {
721    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
722    BlockLocation[] blockLocations = fs.getFileBlockLocations(status, start, length);
723    addToHDFSBlocksDistribution(blocksDistribution, blockLocations);
724    return blocksDistribution;
725  }
726
727  /**
728   * Update blocksDistribution with blockLocations
729   * @param blocksDistribution the hdfs blocks distribution
730   * @param blockLocations     an array containing block location
731   */
732  static public void addToHDFSBlocksDistribution(HDFSBlocksDistribution blocksDistribution,
733    BlockLocation[] blockLocations) throws IOException {
734    for (BlockLocation bl : blockLocations) {
735      String[] hosts = bl.getHosts();
736      long len = bl.getLength();
737      StorageType[] storageTypes = bl.getStorageTypes();
738      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
739    }
740  }
741
742  // TODO move this method OUT of FSUtils. No dependencies to HMaster
743  /**
744   * Returns the total overall fragmentation percentage. Includes hbase:meta and -ROOT- as well.
745   * @param master The master defining the HBase root and file system
746   * @return A map for each table and its percentage (never null)
747   * @throws IOException When scanning the directory fails
748   */
749  public static int getTotalTableFragmentation(final HMaster master) throws IOException {
750    Map<String, Integer> map = getTableFragmentation(master);
751    return map.isEmpty() ? -1 : map.get("-TOTAL-");
752  }
753
754  /**
755   * Runs through the HBase rootdir and checks how many stores for each table have more than one
756   * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is
757   * stored under the special key "-TOTAL-".
758   * @param master The master defining the HBase root and file system.
759   * @return A map for each table and its percentage (never null).
760   * @throws IOException When scanning the directory fails.
761   */
762  public static Map<String, Integer> getTableFragmentation(final HMaster master)
763    throws IOException {
764    Path path = CommonFSUtils.getRootDir(master.getConfiguration());
765    // since HMaster.getFileSystem() is package private
766    FileSystem fs = path.getFileSystem(master.getConfiguration());
767    return getTableFragmentation(fs, path);
768  }
769
770  /**
771   * Runs through the HBase rootdir and checks how many stores for each table have more than one
772   * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is
773   * stored under the special key "-TOTAL-".
774   * @param fs           The file system to use
775   * @param hbaseRootDir The root directory to scan
776   * @return A map for each table and its percentage (never null)
777   * @throws IOException When scanning the directory fails
778   */
779  public static Map<String, Integer> getTableFragmentation(final FileSystem fs,
780    final Path hbaseRootDir) throws IOException {
781    Map<String, Integer> frags = new HashMap<>();
782    int cfCountTotal = 0;
783    int cfFragTotal = 0;
784    PathFilter regionFilter = new RegionDirFilter(fs);
785    PathFilter familyFilter = new FamilyDirFilter(fs);
786    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
787    for (Path d : tableDirs) {
788      int cfCount = 0;
789      int cfFrag = 0;
790      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
791      for (FileStatus regionDir : regionDirs) {
792        Path dd = regionDir.getPath();
793        // else its a region name, now look in region for families
794        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
795        for (FileStatus familyDir : familyDirs) {
796          cfCount++;
797          cfCountTotal++;
798          Path family = familyDir.getPath();
799          // now in family make sure only one file
800          FileStatus[] familyStatus = fs.listStatus(family);
801          if (familyStatus.length > 1) {
802            cfFrag++;
803            cfFragTotal++;
804          }
805        }
806      }
807      // compute percentage per table and store in result list
808      frags.put(CommonFSUtils.getTableName(d).getNameAsString(),
809        cfCount == 0 ? 0 : Math.round((float) cfFrag / cfCount * 100));
810    }
811    // set overall percentage for all tables
812    frags.put("-TOTAL-",
813      cfCountTotal == 0 ? 0 : Math.round((float) cfFragTotal / cfCountTotal * 100));
814    return frags;
815  }
816
817  public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
818    if (fs.exists(dst) && !fs.delete(dst, false)) {
819      throw new IOException("Can not delete " + dst);
820    }
821    if (!fs.rename(src, dst)) {
822      throw new IOException("Can not rename from " + src + " to " + dst);
823    }
824  }
825
826  /**
827   * A {@link PathFilter} that returns only regular files.
828   */
829  static class FileFilter extends AbstractFileStatusFilter {
830    private final FileSystem fs;
831
832    public FileFilter(final FileSystem fs) {
833      this.fs = fs;
834    }
835
836    @Override
837    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
838      try {
839        return isFile(fs, isDir, p);
840      } catch (IOException e) {
841        LOG.warn("Unable to verify if path={} is a regular file", p, e);
842        return false;
843      }
844    }
845  }
846
847  /**
848   * Directory filter that doesn't include any of the directories in the specified blacklist
849   */
850  public static class BlackListDirFilter extends AbstractFileStatusFilter {
851    private final FileSystem fs;
852    private List<String> blacklist;
853
854    /**
855     * Create a filter on the givem filesystem with the specified blacklist
856     * @param fs                     filesystem to filter
857     * @param directoryNameBlackList list of the names of the directories to filter. If
858     *                               <tt>null</tt>, all directories are returned
859     */
860    @SuppressWarnings("unchecked")
861    public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
862      this.fs = fs;
863      blacklist = (List<String>) (directoryNameBlackList == null
864        ? Collections.emptyList()
865        : directoryNameBlackList);
866    }
867
868    @Override
869    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
870      if (!isValidName(p.getName())) {
871        return false;
872      }
873
874      try {
875        return isDirectory(fs, isDir, p);
876      } catch (IOException e) {
877        LOG.warn("An error occurred while verifying if [{}] is a valid directory."
878          + " Returning 'not valid' and continuing.", p, e);
879        return false;
880      }
881    }
882
883    protected boolean isValidName(final String name) {
884      return !blacklist.contains(name);
885    }
886  }
887
888  /**
889   * A {@link PathFilter} that only allows directories.
890   */
891  public static class DirFilter extends BlackListDirFilter {
892
893    public DirFilter(FileSystem fs) {
894      super(fs, null);
895    }
896  }
897
898  /**
899   * A {@link PathFilter} that returns usertable directories. To get all directories use the
900   * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
901   */
902  public static class UserTableDirFilter extends BlackListDirFilter {
903    public UserTableDirFilter(FileSystem fs) {
904      super(fs, HConstants.HBASE_NON_TABLE_DIRS);
905    }
906
907    @Override
908    protected boolean isValidName(final String name) {
909      if (!super.isValidName(name)) return false;
910
911      try {
912        TableName.isLegalTableQualifierName(Bytes.toBytes(name));
913      } catch (IllegalArgumentException e) {
914        LOG.info("Invalid table name: {}", name);
915        return false;
916      }
917      return true;
918    }
919  }
920
921  public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
922    throws IOException {
923    List<Path> tableDirs = new ArrayList<>();
924    Path baseNamespaceDir = new Path(rootdir, HConstants.BASE_NAMESPACE_DIR);
925    if (fs.exists(baseNamespaceDir)) {
926      for (FileStatus status : fs.globStatus(new Path(baseNamespaceDir, "*"))) {
927        tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
928      }
929    }
930    return tableDirs;
931  }
932
933  /**
934   * nn * @return All the table directories under <code>rootdir</code>. Ignore non table hbase
935   * folders such as .logs, .oldlogs, .corrupt folders. n
936   */
937  public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
938    throws IOException {
939    // presumes any directory under hbase.rootdir is a table
940    FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
941    List<Path> tabledirs = new ArrayList<>(dirs.length);
942    for (FileStatus dir : dirs) {
943      tabledirs.add(dir.getPath());
944    }
945    return tabledirs;
946  }
947
948  /**
949   * Filter for all dirs that don't start with '.'
950   */
951  public static class RegionDirFilter extends AbstractFileStatusFilter {
952    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
953    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
954    final FileSystem fs;
955
956    public RegionDirFilter(FileSystem fs) {
957      this.fs = fs;
958    }
959
960    @Override
961    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
962      if (!regionDirPattern.matcher(p.getName()).matches()) {
963        return false;
964      }
965
966      try {
967        return isDirectory(fs, isDir, p);
968      } catch (IOException ioe) {
969        // Maybe the file was moved or the fs was disconnected.
970        LOG.warn("Skipping file {} due to IOException", p, ioe);
971        return false;
972      }
973    }
974  }
975
976  /**
977   * Given a particular table dir, return all the regiondirs inside it, excluding files such as
978   * .tableinfo
979   * @param fs       A file system for the Path
980   * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
981   * @return List of paths to valid region directories in table dir. n
982   */
983  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir)
984    throws IOException {
985    // assumes we are in a table dir.
986    List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
987    if (rds == null) {
988      return Collections.emptyList();
989    }
990    List<Path> regionDirs = new ArrayList<>(rds.size());
991    for (FileStatus rdfs : rds) {
992      Path rdPath = rdfs.getPath();
993      regionDirs.add(rdPath);
994    }
995    return regionDirs;
996  }
997
998  public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) {
999    return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region);
1000  }
1001
1002  public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
1003    return getRegionDirFromTableDir(tableDir,
1004      ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
1005  }
1006
1007  public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) {
1008    return new Path(tableDir, encodedRegionName);
1009  }
1010
1011  /**
1012   * Filter for all dirs that are legal column family names. This is generally used for colfam dirs
1013   * &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1014   */
1015  public static class FamilyDirFilter extends AbstractFileStatusFilter {
1016    final FileSystem fs;
1017
1018    public FamilyDirFilter(FileSystem fs) {
1019      this.fs = fs;
1020    }
1021
1022    @Override
1023    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1024      try {
1025        // throws IAE if invalid
1026        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(Bytes.toBytes(p.getName()));
1027      } catch (IllegalArgumentException iae) {
1028        // path name is an invalid family name and thus is excluded.
1029        return false;
1030      }
1031
1032      try {
1033        return isDirectory(fs, isDir, p);
1034      } catch (IOException ioe) {
1035        // Maybe the file was moved or the fs was disconnected.
1036        LOG.warn("Skipping file {} due to IOException", p, ioe);
1037        return false;
1038      }
1039    }
1040  }
1041
1042  /**
1043   * Given a particular region dir, return all the familydirs inside it
1044   * @param fs        A file system for the Path
1045   * @param regionDir Path to a specific region directory
1046   * @return List of paths to valid family directories in region dir. n
1047   */
1048  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir)
1049    throws IOException {
1050    // assumes we are in a region dir.
1051    return getFilePaths(fs, regionDir, new FamilyDirFilter(fs));
1052  }
1053
1054  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir)
1055    throws IOException {
1056    return getFilePaths(fs, familyDir, new ReferenceFileFilter(fs));
1057  }
1058
1059  public static List<Path> getReferenceAndLinkFilePaths(final FileSystem fs, final Path familyDir)
1060    throws IOException {
1061    return getFilePaths(fs, familyDir, new ReferenceAndLinkFileFilter(fs));
1062  }
1063
1064  private static List<Path> getFilePaths(final FileSystem fs, final Path dir,
1065    final PathFilter pathFilter) throws IOException {
1066    FileStatus[] fds = fs.listStatus(dir, pathFilter);
1067    List<Path> files = new ArrayList<>(fds.length);
1068    for (FileStatus fdfs : fds) {
1069      Path fdPath = fdfs.getPath();
1070      files.add(fdPath);
1071    }
1072    return files;
1073  }
1074
1075  public static int getRegionReferenceAndLinkFileCount(final FileSystem fs, final Path p) {
1076    int result = 0;
1077    try {
1078      for (Path familyDir : getFamilyDirs(fs, p)) {
1079        result += getReferenceAndLinkFilePaths(fs, familyDir).size();
1080      }
1081    } catch (IOException e) {
1082      LOG.warn("Error Counting reference files.", e);
1083    }
1084    return result;
1085  }
1086
1087  public static class ReferenceAndLinkFileFilter implements PathFilter {
1088
1089    private final FileSystem fs;
1090
1091    public ReferenceAndLinkFileFilter(FileSystem fs) {
1092      this.fs = fs;
1093    }
1094
1095    @Override
1096    public boolean accept(Path rd) {
1097      try {
1098        // only files can be references.
1099        return !fs.getFileStatus(rd).isDirectory()
1100          && (StoreFileInfo.isReference(rd) || HFileLink.isHFileLink(rd));
1101      } catch (IOException ioe) {
1102        // Maybe the file was moved or the fs was disconnected.
1103        LOG.warn("Skipping file " + rd + " due to IOException", ioe);
1104        return false;
1105      }
1106    }
1107  }
1108
1109  /**
1110   * Filter for HFiles that excludes reference files.
1111   */
1112  public static class HFileFilter extends AbstractFileStatusFilter {
1113    final FileSystem fs;
1114
1115    public HFileFilter(FileSystem fs) {
1116      this.fs = fs;
1117    }
1118
1119    @Override
1120    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1121      if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) {
1122        return false;
1123      }
1124
1125      try {
1126        return isFile(fs, isDir, p);
1127      } catch (IOException ioe) {
1128        // Maybe the file was moved or the fs was disconnected.
1129        LOG.warn("Skipping file {} due to IOException", p, ioe);
1130        return false;
1131      }
1132    }
1133  }
1134
1135  /**
1136   * Filter for HFileLinks (StoreFiles and HFiles not included). the filter itself does not consider
1137   * if a link is file or not.
1138   */
1139  public static class HFileLinkFilter implements PathFilter {
1140
1141    @Override
1142    public boolean accept(Path p) {
1143      return HFileLink.isHFileLink(p);
1144    }
1145  }
1146
1147  public static class ReferenceFileFilter extends AbstractFileStatusFilter {
1148
1149    private final FileSystem fs;
1150
1151    public ReferenceFileFilter(FileSystem fs) {
1152      this.fs = fs;
1153    }
1154
1155    @Override
1156    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1157      if (!StoreFileInfo.isReference(p)) {
1158        return false;
1159      }
1160
1161      try {
1162        // only files can be references.
1163        return isFile(fs, isDir, p);
1164      } catch (IOException ioe) {
1165        // Maybe the file was moved or the fs was disconnected.
1166        LOG.warn("Skipping file {} due to IOException", p, ioe);
1167        return false;
1168      }
1169    }
1170  }
1171
1172  /**
1173   * Called every so-often by storefile map builder getTableStoreFilePathMap to report progress.
1174   */
1175  interface ProgressReporter {
1176    /**
1177     * @param status File or directory we are about to process.
1178     */
1179    void progress(FileStatus status);
1180  }
1181
1182  /**
1183   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
1184   * names to the full Path. <br>
1185   * Example...<br>
1186   * Key = 3944417774205889744 <br>
1187   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1188   * @param map          map to add values. If null, this method will create and populate one to
1189   *                     return
1190   * @param fs           The file system to use.
1191   * @param hbaseRootDir The root directory to scan.
1192   * @param tableName    name of the table to scan.
1193   * @return Map keyed by StoreFile name with a value of the full Path.
1194   * @throws IOException When scanning the directory fails. n
1195   */
1196  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1197    final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1198    throws IOException, InterruptedException {
1199    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null,
1200      (ProgressReporter) null);
1201  }
1202
1203  /**
1204   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
1205   * names to the full Path. Note that because this method can be called on a 'live' HBase system
1206   * that we will skip files that no longer exist by the time we traverse them and similarly the
1207   * user of the result needs to consider that some entries in this map may not exist by the time
1208   * this call completes. <br>
1209   * Example...<br>
1210   * Key = 3944417774205889744 <br>
1211   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1212   * @param resultMap        map to add values. If null, this method will create and populate one to
1213   *                         return
1214   * @param fs               The file system to use.
1215   * @param hbaseRootDir     The root directory to scan.
1216   * @param tableName        name of the table to scan.
1217   * @param sfFilter         optional path filter to apply to store files
1218   * @param executor         optional executor service to parallelize this operation
1219   * @param progressReporter Instance or null; gets called every time we move to new region of
1220   *                         family dir and for each store file.
1221   * @return Map keyed by StoreFile name with a value of the full Path.
1222   * @throws IOException When scanning the directory fails.
1223   * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead.
1224   */
1225  @Deprecated
1226  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1227    final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1228    ExecutorService executor, final HbckErrorReporter progressReporter)
1229    throws IOException, InterruptedException {
1230    return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor,
1231      new ProgressReporter() {
1232        @Override
1233        public void progress(FileStatus status) {
1234          // status is not used in this implementation.
1235          progressReporter.progress();
1236        }
1237      });
1238  }
1239
1240  /**
1241   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
1242   * names to the full Path. Note that because this method can be called on a 'live' HBase system
1243   * that we will skip files that no longer exist by the time we traverse them and similarly the
1244   * user of the result needs to consider that some entries in this map may not exist by the time
1245   * this call completes. <br>
1246   * Example...<br>
1247   * Key = 3944417774205889744 <br>
1248   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1249   * @param resultMap        map to add values. If null, this method will create and populate one to
1250   *                         return
1251   * @param fs               The file system to use.
1252   * @param hbaseRootDir     The root directory to scan.
1253   * @param tableName        name of the table to scan.
1254   * @param sfFilter         optional path filter to apply to store files
1255   * @param executor         optional executor service to parallelize this operation
1256   * @param progressReporter Instance or null; gets called every time we move to new region of
1257   *                         family dir and for each store file.
1258   * @return Map keyed by StoreFile name with a value of the full Path.
1259   * @throws IOException          When scanning the directory fails.
1260   * @throws InterruptedException the thread is interrupted, either before or during the activity.
1261   */
1262  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1263    final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1264    ExecutorService executor, final ProgressReporter progressReporter)
1265    throws IOException, InterruptedException {
1266
1267    final Map<String, Path> finalResultMap =
1268      resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;
1269
1270    // only include the directory paths to tables
1271    Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
1272    // Inside a table, there are compaction.dir directories to skip. Otherwise, all else
1273    // should be regions.
1274    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
1275    final Vector<Exception> exceptions = new Vector<>();
1276
1277    try {
1278      List<FileStatus> regionDirs =
1279        FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1280      if (regionDirs == null) {
1281        return finalResultMap;
1282      }
1283
1284      final List<Future<?>> futures = new ArrayList<>(regionDirs.size());
1285
1286      for (FileStatus regionDir : regionDirs) {
1287        if (null != progressReporter) {
1288          progressReporter.progress(regionDir);
1289        }
1290        final Path dd = regionDir.getPath();
1291
1292        if (!exceptions.isEmpty()) {
1293          break;
1294        }
1295
1296        Runnable getRegionStoreFileMapCall = new Runnable() {
1297          @Override
1298          public void run() {
1299            try {
1300              HashMap<String, Path> regionStoreFileMap = new HashMap<>();
1301              List<FileStatus> familyDirs =
1302                FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
1303              if (familyDirs == null) {
1304                if (!fs.exists(dd)) {
1305                  LOG.warn("Skipping region because it no longer exists: " + dd);
1306                } else {
1307                  LOG.warn("Skipping region because it has no family dirs: " + dd);
1308                }
1309                return;
1310              }
1311              for (FileStatus familyDir : familyDirs) {
1312                if (null != progressReporter) {
1313                  progressReporter.progress(familyDir);
1314                }
1315                Path family = familyDir.getPath();
1316                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1317                  continue;
1318                }
1319                // now in family, iterate over the StoreFiles and
1320                // put in map
1321                FileStatus[] familyStatus = fs.listStatus(family);
1322                for (FileStatus sfStatus : familyStatus) {
1323                  if (null != progressReporter) {
1324                    progressReporter.progress(sfStatus);
1325                  }
1326                  Path sf = sfStatus.getPath();
1327                  if (sfFilter == null || sfFilter.accept(sf)) {
1328                    regionStoreFileMap.put(sf.getName(), sf);
1329                  }
1330                }
1331              }
1332              finalResultMap.putAll(regionStoreFileMap);
1333            } catch (Exception e) {
1334              LOG.error("Could not get region store file map for region: " + dd, e);
1335              exceptions.add(e);
1336            }
1337          }
1338        };
1339
1340        // If executor is available, submit async tasks to exec concurrently, otherwise
1341        // just do serial sync execution
1342        if (executor != null) {
1343          Future<?> future = executor.submit(getRegionStoreFileMapCall);
1344          futures.add(future);
1345        } else {
1346          FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
1347          future.run();
1348          futures.add(future);
1349        }
1350      }
1351
1352      // Ensure all pending tasks are complete (or that we run into an exception)
1353      for (Future<?> f : futures) {
1354        if (!exceptions.isEmpty()) {
1355          break;
1356        }
1357        try {
1358          f.get();
1359        } catch (ExecutionException e) {
1360          LOG.error("Unexpected exec exception!  Should've been caught already.  (Bug?)", e);
1361          // Shouldn't happen, we already logged/caught any exceptions in the Runnable
1362        }
1363      }
1364    } catch (IOException e) {
1365      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
1366      exceptions.add(e);
1367    } finally {
1368      if (!exceptions.isEmpty()) {
1369        // Just throw the first exception as an indication something bad happened
1370        // Don't need to propagate all the exceptions, we already logged them all anyway
1371        Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class);
1372        throw new IOException(exceptions.firstElement());
1373      }
1374    }
1375
1376    return finalResultMap;
1377  }
1378
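  /**
   * Counts the reference files under all family directories of the given region directory. Any
   * IOException raised while listing is logged and the count accumulated so far is returned.
   * @param fs the filesystem holding the region
   * @param p  the region directory to scan
   * @return the number of reference files found
   */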
1379  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1380    int result = 0;
1381    try {
1382      for (Path familyDir : getFamilyDirs(fs, p)) {
1383        result += getReferenceFilePaths(fs, familyDir).size();
1384      }
1385    } catch (IOException e) {
1386      LOG.warn("Error counting reference files", e);
1387    }
1388    return result;
1389  }
1390
1391  /**
1392   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1393   * the full Path. <br>
1394   * Example...<br>
1395   * Key = 3944417774205889744 <br>
1396   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1397   * @param fs           The file system to use.
1398   * @param hbaseRootDir The root directory to scan.
1399   * @return Map keyed by StoreFile name with a value of the full Path.
1400   * @throws IOException When scanning the directory fails.
1401   */
1402  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1403    final Path hbaseRootDir) throws IOException, InterruptedException {
1404    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter) null);
1405  }
1406
1407  /**
1408   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1409   * the full Path. <br>
1410   * Example...<br>
1411   * Key = 3944417774205889744 <br>
1412   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1413   * @param fs               The file system to use.
1414   * @param hbaseRootDir     The root directory to scan.
1415   * @param sfFilter         optional path filter to apply to store files
1416   * @param executor         optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to a new region or
   *                         family dir and for each store file.
1419   * @return Map keyed by StoreFile name with a value of the full Path.
1420   * @throws IOException When scanning the directory fails.
   * @deprecated Since 2.3.0. Will be removed in hbase4. Use
   *             {@link #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)}
   *             instead.
1423   */
1424  @Deprecated
1425  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1426    final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1427    HbckErrorReporter progressReporter) throws IOException, InterruptedException {
1428    return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor, new ProgressReporter() {
1429      @Override
1430      public void progress(FileStatus status) {
1431        // status is not used in this implementation.
1432        progressReporter.progress();
1433      }
1434    });
1435  }
1436
1437  /**
1438   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1439   * the full Path. <br>
1440   * Example...<br>
1441   * Key = 3944417774205889744 <br>
1442   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
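   * <p>
   * A minimal usage sketch; the executor, filter and logging here are illustrative, not
   * prescriptive:
   *
   * <pre>
   * FileSystem fs = FileSystem.get(conf);
   * Path rootDir = CommonFSUtils.getRootDir(conf);
   * ExecutorService pool = Executors.newFixedThreadPool(4);
   * try {
   *   Map&lt;String, Path&gt; storeFiles = FSUtils.getTableStoreFilePathMap(fs, rootDir,
   *     p -&gt; !p.getName().startsWith("."), pool, (ProgressReporter) null);
   *   storeFiles.forEach((name, path) -&gt; LOG.debug("{} =&gt; {}", name, path));
   * } finally {
   *   pool.shutdown();
   * }
   * </pre>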
1443   * @param fs               The file system to use.
1444   * @param hbaseRootDir     The root directory to scan.
1445   * @param sfFilter         optional path filter to apply to store files
1446   * @param executor         optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to a new region or
   *                         family dir and for each store file.
1449   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException          When scanning the directory fails.
   * @throws InterruptedException When the scan is interrupted.
1451   */
1452  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1453    final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1454    ProgressReporter progressReporter) throws IOException, InterruptedException {
1455    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);
1456
1457    // if this method looks similar to 'getTableFragmentation' that is because
1458    // it was borrowed from it.
1459
1460    // only include the directory paths to tables
1461    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1462      getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir),
1463        sfFilter, executor, progressReporter);
1464    }
1465    return map;
1466  }
1467
1468  /**
1469   * Filters FileStatuses in an array and returns a list
1470   * @param input  An array of FileStatuses
1471   * @param filter A required filter to filter the array
1472   * @return A list of FileStatuses
1473   */
1474  public static List<FileStatus> filterFileStatuses(FileStatus[] input, FileStatusFilter filter) {
1475    if (input == null) return null;
1476    return filterFileStatuses(Iterators.forArray(input), filter);
1477  }
1478
1479  /**
1480   * Filters FileStatuses in an iterator and returns a list
1481   * @param input  An iterator of FileStatuses
   * @param filter A required filter to filter the iterator
1483   * @return A list of FileStatuses
1484   */
1485  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1486    FileStatusFilter filter) {
1487    if (input == null) return null;
1488    ArrayList<FileStatus> results = new ArrayList<>();
1489    while (input.hasNext()) {
1490      FileStatus f = input.next();
1491      if (filter.accept(f)) {
1492        results.add(f);
1493      }
1494    }
1495    return results;
1496  }
1497
1498  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
   * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException but
   * returns an empty FileStatus[], while Hadoop 2 will throw a FileNotFoundException.
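   * <p>
   * Callers are expected to handle the null return. A small usage sketch (not taken from an
   * existing caller):
   *
   * <pre>
   * List&lt;FileStatus&gt; dirs =
   *   FSUtils.listStatusWithStatusFilter(fs, dir, status -&gt; status.isDirectory());
   * if (dirs == null) {
   *   return; // directory is missing, empty, or nothing matched the filter
   * }
   * </pre>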
1502   * @param fs     file system
1503   * @param dir    directory
1504   * @param filter file status filter
1505   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
1506   */
1507  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs, final Path dir,
1508    final FileStatusFilter filter) throws IOException {
1509    FileStatus[] status = null;
1510    try {
1511      status = fs.listStatus(dir);
1512    } catch (FileNotFoundException fnfe) {
1513      LOG.trace("{} does not exist", dir);
1514      return null;
1515    }
1516
1517    if (ArrayUtils.getLength(status) == 0) {
1518      return null;
1519    }
1520
1521    if (filter == null) {
1522      return Arrays.asList(status);
1523    } else {
1524      List<FileStatus> status2 = filterFileStatuses(status, filter);
1525      if (status2 == null || status2.isEmpty()) {
1526        return null;
1527      } else {
1528        return status2;
1529      }
1530    }
1531  }
1532
1533  /**
   * This function is to scan the root path of the file system to get the degree of locality for
   * each region on each of the servers having at least one block of that region. This is used by
   * the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}.
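   * <p>
   * A sketch of how the returned structure might be consumed (names are illustrative):
   *
   * <pre>
   * Map&lt;String, Map&lt;String, Float&gt;&gt; locality =
   *   FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
   * locality.forEach((encodedRegionName, perHost) -&gt; perHost.forEach(
   *   (host, fraction) -&gt; LOG.info("{} on {}: {}", encodedRegionName, host, fraction)));
   * </pre>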
   * @param conf the configuration to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
1540   */
1541  public static Map<String, Map<String, Float>>
1542    getRegionDegreeLocalityMappingFromFS(final Configuration conf) throws IOException {
1543    return getRegionDegreeLocalityMappingFromFS(conf, null,
1544      conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1545
1546  }
1547
1548  /**
   * This function is to scan the root path of the file system to get the degree of locality for
   * each region on each of the servers having at least one block of that region.
   * @param conf           the configuration to use
   * @param desiredTable   the table you wish to scan locality for
   * @param threadPoolSize the thread pool size to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
1555   */
1556  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1557    final Configuration conf, final String desiredTable, int threadPoolSize) throws IOException {
1558    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
1559    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
1560    return regionDegreeLocalityMapping;
1561  }
1562
1563  /**
   * This function is to scan the root path of the file system to get either the mapping between
   * the region name and its best locality region server or the degree of locality of each region
   * on each of the servers having at least one block of that region. The output map parameters are
   * both optional.
   * @param conf                        the configuration to use
   * @param desiredTable                the table you wish to scan locality for
   * @param threadPoolSize              the thread pool size to use
   * @param regionDegreeLocalityMapping the map into which to put the locality degree mapping or
   *                                    null, must be a thread-safe implementation
   * @throws IOException in case of file system errors or interrupts
1570   */
1571  private static void getRegionLocalityMappingFromFS(final Configuration conf,
1572    final String desiredTable, int threadPoolSize,
1573    final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
1574    final FileSystem fs = FileSystem.get(conf);
1575    final Path rootPath = CommonFSUtils.getRootDir(conf);
1576    final long startTime = EnvironmentEdgeManager.currentTime();
1577    final Path queryPath;
1578    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1579    if (null == desiredTable) {
1580      queryPath =
1581        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1582    } else {
1583      queryPath = new Path(
1584        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1585    }
1586
1587    // reject all paths that are not appropriate
1588    PathFilter pathFilter = new PathFilter() {
1589      @Override
1590      public boolean accept(Path path) {
        // this is the region name; the glob may pick up some noise paths, so validate it below
1592        if (null == path) {
1593          return false;
1594        }
1595
1596        // no parent?
1597        Path parent = path.getParent();
1598        if (null == parent) {
1599          return false;
1600        }
1601
1602        String regionName = path.getName();
1603        if (null == regionName) {
1604          return false;
1605        }
1606
1607        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
1608          return false;
1609        }
1610        return true;
1611      }
1612    };
1613
1614    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1615
1616    if (LOG.isDebugEnabled()) {
1617      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
1618    }
1619
1620    if (null == statusList) {
1621      return;
1622    }
1623
1624    // lower the number of threads in case we have very few expected regions
1625    threadPoolSize = Math.min(threadPoolSize, statusList.length);
1626
1627    // run in multiple threads
1628    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
1629      new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").setDaemon(true)
1630        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
1631    try {
1632      // ignore all file status items that are not of interest
1633      for (FileStatus regionStatus : statusList) {
1634        if (null == regionStatus || !regionStatus.isDirectory()) {
1635          continue;
1636        }
1637
1638        final Path regionPath = regionStatus.getPath();
1639        if (null != regionPath) {
1640          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
1641        }
1642      }
1643    } finally {
1644      tpe.shutdown();
1645      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1646        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1647      try {
1648        // here we wait until TPE terminates, which is either naturally or by
1649        // exceptions in the execution of the threads
1650        while (!tpe.awaitTermination(threadWakeFrequency, TimeUnit.MILLISECONDS)) {
1651          // printing out rough estimate, so as to not introduce
1652          // AtomicInteger
1653          LOG.info("Locality checking is underway: { Scanned Regions : "
1654            + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
1655            + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
1656        }
1657      } catch (InterruptedException e) {
1658        Thread.currentThread().interrupt();
1659        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1660      }
1661    }
1662
1663    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
1664    LOG.info("Scan DFS for locality info takes {}ms", overhead);
1665  }
1666
1667  /**
   * Do our short circuit read setup. Checks buffer size to use and whether to do checksumming in
   * hbase or hdfs.
   * @param conf the configuration to check and update
1670   */
1671  public static void setupShortCircuitRead(final Configuration conf) {
1672    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1673    boolean shortCircuitSkipChecksum =
1674      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1675    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1676    if (shortCircuitSkipChecksum) {
1677      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not "
1678        + "be set to true."
1679        + (useHBaseChecksum
1680          ? " HBase checksum doesn't require "
1681            + "it, see https://issues.apache.org/jira/browse/HBASE-6868."
1682          : ""));
1683      assert !shortCircuitSkipChecksum; // this will fail if assertions are on
1684    }
1685    checkShortCircuitReadBufferSize(conf);
1686  }
1687
1688  /**
   * Check if short circuit read buffer size is set and if not, set it to hbase value.
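   * <p>
   * For example, to have HBase seed a 128KB buffer size when the HDFS key is unset (the value here
   * is illustrative only):
   *
   * <pre>
   * conf.setInt("hbase.dfs.client.read.shortcircuit.buffer.size", 131072);
   * FSUtils.checkShortCircuitReadBufferSize(conf);
   * // "dfs.client.read.shortcircuit.buffer.size" is now 131072 unless it was already set
   * </pre>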
   * @param conf the configuration to check and possibly update
   */
1691  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1692    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1693    final int notSet = -1;
1694    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
1695    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1696    int size = conf.getInt(dfsKey, notSet);
1697    // If a size is set, return -- we will use it.
1698    if (size != notSet) return;
1699    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
1700    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1701    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
1702  }
1703
1704  /**
   * Fetch the {@link DFSHedgedReadMetrics} instance shared by this filesystem's DFSClients. The
   * accessor on DFSClient is package private, so it is invoked via reflection.
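   * <p>
   * A small sketch of how the metrics might be inspected (assumes the root filesystem is HDFS):
   *
   * <pre>
   * DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
   * if (metrics != null) {
   *   LOG.info("Hedged read ops so far: {}", metrics.getHedgedReadOps());
   * }
   * </pre>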
   * @param c the configuration for the filesystem to query
   * @return The DFSClient DFSHedgedReadMetrics instance or null if it can't be found or not on
   *         hdfs.
   * @throws IOException if the filesystem cannot be obtained
   */
1708  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
1709    throws IOException {
1710    if (!CommonFSUtils.isHDFS(c)) {
1711      return null;
1712    }
1713    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
1714    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
1715    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
1716    final String name = "getHedgedReadMetrics";
1717    DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient();
1718    Method m;
1719    try {
1720      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException | SecurityException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: "
        + e.getMessage());
      return null;
    }
1730    m.setAccessible(true);
1731    try {
1732      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
1733    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
1734      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: "
1735        + e.getMessage());
1736      return null;
1737    }
1738  }
1739
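  /**
   * Recursively copies {@code src} on {@code srcFS} to {@code dst} on {@code dstFS}. Directories
   * are created synchronously while individual file copies are submitted to a fixed thread pool of
   * the given size and awaited before returning.
   * @param srcFS   source filesystem
   * @param src     source path
   * @param dstFS   destination filesystem
   * @param dst     destination path
   * @param conf    configuration used for the copy
   * @param threads number of copy threads
   * @return the list of destination paths that were traversed (directories created and files
   *         copied)
   * @throws IOException if any copy fails or the operation is interrupted
   */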
1740  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1741    Configuration conf, int threads) throws IOException {
1742    ExecutorService pool = Executors.newFixedThreadPool(threads);
1743    List<Future<Void>> futures = new ArrayList<>();
1744    List<Path> traversedPaths;
1745    try {
1746      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
1747      for (Future<Void> future : futures) {
1748        future.get();
1749      }
1750    } catch (ExecutionException | InterruptedException | IOException e) {
1751      throw new IOException("Copy snapshot reference files failed", e);
1752    } finally {
1753      pool.shutdownNow();
1754    }
1755    return traversedPaths;
1756  }
1757
1758  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1759    Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
1760    List<Path> traversedPaths = new ArrayList<>();
1761    traversedPaths.add(dst);
1762    FileStatus currentFileStatus = srcFS.getFileStatus(src);
1763    if (currentFileStatus.isDirectory()) {
1764      if (!dstFS.mkdirs(dst)) {
1765        throw new IOException("Create directory failed: " + dst);
1766      }
1767      FileStatus[] subPaths = srcFS.listStatus(src);
1768      for (FileStatus subPath : subPaths) {
1769        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
1770          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
1771      }
1772    } else {
1773      Future<Void> future = pool.submit(() -> {
1774        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
1775        return null;
1776      });
1777      futures.add(future);
1778    }
1779    return traversedPaths;
1780  }
1781
  /** Returns a set containing all namenode addresses of fs. */
1783  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
1784    Configuration conf) {
1785    Set<InetSocketAddress> addresses = new HashSet<>();
1786    String serviceName = fs.getCanonicalServiceName();
1787
1788    if (serviceName.startsWith("ha-hdfs")) {
1789      try {
1790        Map<String, Map<String, InetSocketAddress>> addressMap =
1791          DFSUtil.getNNServiceRpcAddressesForCluster(conf);
1792        String nameService = serviceName.substring(serviceName.indexOf(":") + 1);
1793        if (addressMap.containsKey(nameService)) {
1794          Map<String, InetSocketAddress> nnMap = addressMap.get(nameService);
1795          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
1796            InetSocketAddress addr = e2.getValue();
1797            addresses.add(addr);
1798          }
1799        }
1800      } catch (Exception e) {
1801        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
1802      }
1803    } else {
1804      URI uri = fs.getUri();
1805      int port = uri.getPort();
1806      if (port < 0) {
1807        int idx = serviceName.indexOf(':');
1808        port = Integer.parseInt(serviceName.substring(idx + 1));
1809      }
1810      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
1811      addresses.add(addr);
1812    }
1813
1814    return addresses;
1815  }
1816
1817  /**
   * @param conf  the Configuration of HBase
   * @param srcFs the source FileSystem
   * @param desFs the destination FileSystem
   * @return Whether srcFs and desFs are on the same hdfs or not
1820   */
1821  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
    // Using getCanonicalServiceName, we can make sure both srcFs and desFs
    // present a unified format which contains scheme, host and port.
1824    String srcServiceName = srcFs.getCanonicalServiceName();
1825    String desServiceName = desFs.getCanonicalServiceName();
1826
1827    if (srcServiceName == null || desServiceName == null) {
1828      return false;
1829    }
1830    if (srcServiceName.equals(desServiceName)) {
1831      return true;
1832    }
1833    if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
1834      Collection<String> internalNameServices =
1835        conf.getTrimmedStringCollection("dfs.internal.nameservices");
      if (!internalNameServices.isEmpty()) {
        return internalNameServices.contains(srcServiceName.split(":")[1]);
      }
1843    }
1844    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
1845      // If one serviceName is an HA format while the other is a non-HA format,
1846      // maybe they refer to the same FileSystem.
1847      // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
1848      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
1849      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
      if (!Sets.intersection(srcAddrs, desAddrs).isEmpty()) {
1851        return true;
1852      }
1853    }
1854
1855    return false;
1856  }
1857}