/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET;

import edu.umd.cs.findbugs.annotations.CheckForNull;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Progressable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;

/**
 * Utility methods for interacting with the underlying file system.
 */
@InterfaceAudience.Private
public final class FSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);

  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
  private static final int DEFAULT_THREAD_POOLSIZE = 2;

  /** Set to true on Windows platforms */
  // currently only used in testing. TODO refactor into a test class
  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");

  private FSUtils() {
  }

  /**
   * @return True if <code>fs</code> is an instance of DistributedFileSystem
   */
  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
    FileSystem fileSystem = fs;
    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
    // Check its backing fs for dfs-ness.
    if (fs instanceof HFileSystem) {
      fileSystem = ((HFileSystem) fs).getBackingFs();
    }
    return fileSystem instanceof DistributedFileSystem;
  }

  /**
   * Compare the path component of a Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
   * consider the scheme; i.e. if schemes differ but the path or subpath matches, the two will
   * equate.
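   * <p>
   * An illustrative sketch (the namenode address and paths below are made up for the example;
   * scheme and authority are ignored, only the path components are compared):
   *
   * <pre>{@code
   * FSUtils.isMatchingTail(new Path("hdfs://nn:8020/a/b/c"), new Path("/a/b/c")); // true
   * FSUtils.isMatchingTail(new Path("hdfs://nn:8020/a/b/c"), new Path("/a/b"));   // false, depths differ
   * }</pre>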
   * @param pathToSearch Path we will be trying to match
   * @param pathTail     Path whose components we try to match against the tail of
   *                     <code>pathToSearch</code>
   * @return True if <code>pathTail</code> is a tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
    Path tailPath = pathTail;
    String tailName;
    Path toSearch = pathToSearch;
    String toSearchName;
    boolean result = false;

    if (pathToSearch.depth() != pathTail.depth()) {
      return false;
    }

    do {
      tailName = tailPath.getName();
      if (tailName == null || tailName.isEmpty()) {
        result = true;
        break;
      }
      toSearchName = toSearch.getName();
      if (toSearchName == null || toSearchName.isEmpty()) {
        break;
      }
      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
      tailPath = tailPath.getParent();
      toSearch = toSearch.getParent();
    } while (tailName.equals(toSearchName));
    return result;
  }

  /**
   * Delete the region directory if it exists.
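   * <p>
   * Illustrative sketch (assumes a {@link Configuration} and a {@link RegionInfo} are at hand;
   * they are not defined here):
   *
   * <pre>{@code
   * if (FSUtils.deleteRegionDir(conf, regionInfo)) {
   *   // the region directory existed and has been removed
   * }
   * }</pre>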
   * @return True if the region directory was deleted.
   */
  public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri)
    throws IOException {
    Path rootDir = CommonFSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);
    return CommonFSUtils.deleteDirectory(fs,
      new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
  }

  /**
   * Create the specified file on the filesystem. By default, this will:
   * <ol>
   * <li>overwrite the file if it exists</li>
   * <li>apply the umask in the configuration (if it is enabled)</li>
   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the configured column family replication or default replication if
   * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li>
   * <li>use the default block size</li>
   * <li>not track progress</li>
   * </ol>
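   * <p>
   * Illustrative sketch; the path, datanode address, {@code perm} and {@code payload} variables
   * are made up for the example. On filesystems that are not HDFS-backed the favored nodes hint
   * is simply ignored and the default create path is used:
   *
   * <pre>{@code
   * InetSocketAddress[] favoredNodes =
   *   new InetSocketAddress[] { new InetSocketAddress("datanode-1.example.com", 50010) };
   * try (FSDataOutputStream out = FSUtils.create(conf, fs,
   *   new Path("/hbase/data/default/t1/1588230740/cf/storefile"), perm, favoredNodes)) {
   *   out.write(payload);
   * }
   * }</pre>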
   * @param conf         configuration
   * @param fs           {@link FileSystem} on which to write the file
   * @param path         {@link Path} to the file to write
   * @param perm         permissions
   * @param favoredNodes favored data nodes
   * @return output stream to the created file
   * @throws IOException if the file cannot be created
   */
  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
    FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
    if (fs instanceof HFileSystem) {
      FileSystem backingFs = ((HFileSystem) fs).getBackingFs();
      if (backingFs instanceof DistributedFileSystem) {
        // Try to use the favoredNodes version via reflection to allow backwards-
        // compatibility.
        short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION,
          String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION)));
        try {
          return (FSDataOutputStream) (DistributedFileSystem.class
            .getDeclaredMethod("create", Path.class, FsPermission.class, boolean.class, int.class,
              short.class, long.class, Progressable.class, InetSocketAddress[].class)
            .invoke(backingFs, path, perm, true, CommonFSUtils.getDefaultBufferSize(backingFs),
              replication > 0 ? replication : CommonFSUtils.getDefaultReplication(backingFs, path),
              CommonFSUtils.getDefaultBlockSize(backingFs, path), null, favoredNodes));
        } catch (InvocationTargetException ite) {
          // Function was properly called, but threw its own exception.
          throw new IOException(ite.getCause());
        } catch (NoSuchMethodException e) {
          LOG.debug("DFS Client does not support most favored nodes create; using default create");
          LOG.trace("Ignoring; use default create", e);
        } catch (IllegalArgumentException | SecurityException | IllegalAccessException e) {
          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
        }
      }
    }
    return CommonFSUtils.create(fs, path, perm, true);
  }

  /**
   * Checks to see if the specified file system is available
   * @param fs filesystem
   * @throws IOException e
   */
  public static void checkFileSystemAvailable(final FileSystem fs) throws IOException {
    if (!(fs instanceof DistributedFileSystem)) {
      return;
    }
    IOException exception = null;
    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    try {
      if (dfs.exists(new Path("/"))) {
        return;
      }
    } catch (IOException e) {
      exception = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
    }
    try {
      fs.close();
    } catch (Exception e) {
      LOG.error("file system close failed: ", e);
    }
    throw new IOException("File system is not available", exception);
  }

  /**
   * Inquire the Active NameNode's safe mode status.
   * @param dfs A DistributedFileSystem object representing the underlying HDFS.
   * @return whether we're in safe mode
   */
  private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
    return dfs.setSafeMode(SAFEMODE_GET, true);
  }

  /**
   * Check whether dfs is in safemode.
   * @param conf configuration used to obtain the FileSystem
   * @throws IOException if dfs is in safemode, or if the safemode check itself fails
   */
  public static void checkDfsSafeMode(final Configuration conf) throws IOException {
    boolean isInSafeMode = false;
    FileSystem fs = FileSystem.get(conf);
    if (fs instanceof DistributedFileSystem) {
      DistributedFileSystem dfs = (DistributedFileSystem) fs;
      isInSafeMode = isInSafeMode(dfs);
    }
    if (isInSafeMode) {
      throw new IOException("File system is in safemode, it can't be written now");
    }
  }

  /**
   * Verifies current version of file system
   * @param fs      filesystem object
   * @param rootdir root hbase directory
   * @return null if no version file exists, version string otherwise
   * @throws IOException              if the version file fails to open
   * @throws DeserializationException if the version data cannot be translated into a version
   */
  public static String getVersion(FileSystem fs, Path rootdir)
    throws IOException, DeserializationException {
    final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
    FileStatus[] status = null;
    try {
      // hadoop 2.0 throws FNFE if directory does not exist.
      // hadoop 1.0 returns null if directory does not exist.
      status = fs.listStatus(versionFile);
    } catch (FileNotFoundException fnfe) {
      return null;
    }
    if (ArrayUtils.getLength(status) == 0) {
      return null;
    }
    String version = null;
    byte[] content = new byte[(int) status[0].getLen()];
    FSDataInputStream s = fs.open(versionFile);
    try {
      IOUtils.readFully(s, content, 0, content.length);
      if (ProtobufUtil.isPBMagicPrefix(content)) {
        version = parseVersionFrom(content);
      } else {
        // Presume it is in the pre-pb format.
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) {
          version = dis.readUTF();
        }
      }
    } catch (EOFException eof) {
      LOG.warn("Version file was empty, odd, will try to set it.");
    } finally {
      s.close();
    }
    return version;
  }

  /**
   * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
   * @param bytes The byte content of the hbase.version file
   * @return The version found in the file as a String
   * @throws DeserializationException if the version data cannot be translated into a version
   */
  static String parseVersionFrom(final byte[] bytes) throws DeserializationException {
    ProtobufUtil.expectPBMagicPrefix(bytes);
    int pblen = ProtobufUtil.lengthOfPBMagic();
    FSProtos.HBaseVersionFileContent.Builder builder =
      FSProtos.HBaseVersionFileContent.newBuilder();
    try {
      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
      return builder.getVersion();
    } catch (IOException e) {
      // Convert
      throw new DeserializationException(e);
    }
  }

  /**
   * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
   * @param version Version to persist
   * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a
   *         prefix.
   */
  static byte[] toVersionByteArray(final String version) {
    FSProtos.HBaseVersionFileContent.Builder builder =
      FSProtos.HBaseVersionFileContent.newBuilder();
    return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
  }

  /**
   * Verifies current version of file system
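   * <p>
   * Illustrative pre-flight sketch (assumes a {@link Configuration} named {@code conf} is at
   * hand; the variable is not defined here):
   *
   * <pre>{@code
   * Path rootdir = CommonFSUtils.getRootDir(conf);
   * FileSystem fs = rootdir.getFileSystem(conf);
   * // Throws FileSystemVersionException if the layout needs migration.
   * FSUtils.checkVersion(fs, rootdir, true);
   * }</pre>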
   * @param fs      file system
   * @param rootdir root directory of HBase installation
   * @param message if true, issues a message on System.out
   * @throws IOException              if the version file cannot be opened
   * @throws DeserializationException if the contents of the version file cannot be parsed
   */
  public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
    throws IOException, DeserializationException {
    checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
  }

  /**
   * Verifies current version of file system
   * @param fs      file system
   * @param rootdir root directory of HBase installation
   * @param message if true, issues a message on System.out
   * @param wait    wait interval
   * @param retries number of times to retry
   * @throws IOException              if the version file cannot be opened
   * @throws DeserializationException if the contents of the version file cannot be parsed
   */
  public static void checkVersion(FileSystem fs, Path rootdir, boolean message, int wait,
    int retries) throws IOException, DeserializationException {
    String version = getVersion(fs, rootdir);
    String msg;
    if (version == null) {
      if (!metaRegionExists(fs, rootdir)) {
        // rootDir is empty (no version file and no root region)
        // just create new version file (HBASE-1195)
        setVersion(fs, rootdir, wait, retries);
        return;
      } else {
        msg = "hbase.version file is missing. Is your hbase.rootdir valid? "
          + "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. "
          + "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2";
      }
    } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) {
      return;
    } else {
      msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version
        + " but software requires version " + HConstants.FILE_SYSTEM_VERSION
        + ". Consult http://hbase.apache.org/book.html for further information about "
        + "upgrading HBase.";
    }

    // The on-disk version is deprecated and requires migration.
    // Output on stdout so user sees it in terminal.
    if (message) {
      System.out.println("WARNING! " + msg);
    }
    throw new FileSystemVersionException(msg);
  }

  /**
   * Sets version of file system
   * @param fs      filesystem object
   * @param rootdir hbase root
   * @throws IOException e
   */
  public static void setVersion(FileSystem fs, Path rootdir) throws IOException {
    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
      HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
  }

  /**
   * Sets version of file system
   * @param fs      filesystem object
   * @param rootdir hbase root
   * @param wait    time to wait for retry
   * @param retries number of times to retry before failing
   * @throws IOException e
   */
  public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
    throws IOException {
    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
  }

  /**
   * Sets version of file system
   * @param fs      filesystem object
   * @param rootdir hbase root directory
   * @param version version to set
   * @param wait    time to wait for retry
   * @param retries number of times to retry before throwing an IOException
   * @throws IOException e
   */
  public static void setVersion(FileSystem fs, Path rootdir, String version, int wait, int retries)
    throws IOException {
    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
    Path tempVersionFile = new Path(rootdir,
      HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + HConstants.VERSION_FILE_NAME);
    while (true) {
      try {
        // Write the version to a temporary file
        FSDataOutputStream s = fs.create(tempVersionFile);
        try {
          s.write(toVersionByteArray(version));
          s.close();
          s = null;
          // Move the temp version file to its normal location. Returns false
          // if the rename failed. Throw an IOE in that case.
          if (!fs.rename(tempVersionFile, versionFile)) {
            throw new IOException("Unable to move temp version file to " + versionFile);
          }
        } finally {
          // Cleaning up the temporary file if the rename failed would be trying
          // too hard. We'll unconditionally create it again the next time
          // through anyway; files are overwritten by default by create().

          // Attempt to close the stream on the way out if it is still open.
          try {
            if (s != null) s.close();
          } catch (IOException ignore) {
          }
        }
        LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
        return;
      } catch (IOException e) {
        if (retries > 0) {
          LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
          fs.delete(versionFile, false);
          try {
            if (wait > 0) {
              Thread.sleep(wait);
            }
          } catch (InterruptedException ie) {
            throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
          }
          retries--;
        } else {
          throw e;
        }
      }
    }
  }

  /**
   * Checks that a cluster ID file exists in the HBase root directory
   * @param fs      the root directory FileSystem
   * @param rootdir the HBase root directory in HDFS
   * @param wait    how long to wait between retries
   * @return <code>true</code> if the file exists, otherwise <code>false</code>
   * @throws IOException if checking the FileSystem fails
   */
  public static boolean checkClusterIdExists(FileSystem fs, Path rootdir, long wait)
    throws IOException {
    while (true) {
      try {
        Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
        return fs.exists(filePath);
      } catch (IOException ioe) {
        if (wait > 0L) {
          LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe);
          try {
            Thread.sleep(wait);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
          }
        } else {
          throw ioe;
        }
      }
    }
  }

  /**
   * Returns the value of the unique cluster ID stored for this HBase instance.
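   * <p>
   * Illustrative sketch (assumes {@code fs} and {@code conf} are at hand; they are not defined
   * here):
   *
   * <pre>{@code
   * ClusterId id = FSUtils.getClusterId(fs, CommonFSUtils.getRootDir(conf));
   * // id is null if the hbase.id file does not exist yet
   * }</pre>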
   * @param fs      the root directory FileSystem
   * @param rootdir the path to the HBase root directory
   * @return the unique cluster identifier
   * @throws IOException if reading the cluster ID file fails
   */
  public static ClusterId getClusterId(FileSystem fs, Path rootdir) throws IOException {
    Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
    ClusterId clusterId = null;
    FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null;
    if (status != null) {
      int len = Ints.checkedCast(status.getLen());
      byte[] content = new byte[len];
      FSDataInputStream in = fs.open(idPath);
      try {
        in.readFully(content);
      } catch (EOFException eof) {
        LOG.warn("Cluster ID file {} is empty", idPath);
      } finally {
        in.close();
      }
      try {
        clusterId = ClusterId.parseFrom(content);
      } catch (DeserializationException e) {
        throw new IOException("content=" + Bytes.toString(content), e);
      }
      // If not pb'd, make it so.
      if (!ProtobufUtil.isPBMagicPrefix(content)) {
        String cid = null;
        in = fs.open(idPath);
        try {
          cid = in.readUTF();
          clusterId = new ClusterId(cid);
        } catch (EOFException eof) {
          LOG.warn("Cluster ID file {} is empty", idPath);
        } finally {
          in.close();
        }
        rewriteAsPb(fs, rootdir, idPath, clusterId);
      }
      return clusterId;
    } else {
      LOG.warn("Cluster ID file does not exist at {}", idPath);
    }
    return clusterId;
  }

  /**
   * Rewrite the cluster ID file at <code>p</code> as protobuf content.
   */
  private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
    final ClusterId cid) throws IOException {
    // Rewrite the file as pb. Move aside the old one first, write new
    // then delete the moved-aside file.
    Path movedAsideName = new Path(p + "." + EnvironmentEdgeManager.currentTime());
    if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
    setClusterId(fs, rootdir, cid, 100);
    if (!fs.delete(movedAsideName, false)) {
      throw new IOException("Failed delete of " + movedAsideName);
    }
    LOG.debug("Rewrote the hbase.id file as pb");
  }

  /**
   * Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root
   * directory. If any operation on the ID file fails, and {@code wait} is a positive value, the
   * method will retry to produce the ID file until the thread is forcibly interrupted.
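   * <p>
   * Illustrative sketch; the 100 ms retry interval is an arbitrary choice for the example:
   *
   * <pre>{@code
   * FSUtils.setClusterId(fs, rootdir, new ClusterId(), 100);
   * }</pre>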
   * @param fs        the root directory FileSystem
   * @param rootdir   the path to the HBase root directory
   * @param clusterId the unique identifier to store
   * @param wait      how long (in milliseconds) to wait between retries
   * @throws IOException if writing to the FileSystem fails and there is no wait value
   */
  public static void setClusterId(final FileSystem fs, final Path rootdir,
    final ClusterId clusterId, final long wait) throws IOException {

    final Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
    final Path tempDir = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY);
    final Path tempIdFile = new Path(tempDir, HConstants.CLUSTER_ID_FILE_NAME);

    LOG.debug("Create cluster ID file [{}] with ID: {}", idFile, clusterId);

    while (true) {
      Optional<IOException> failure = Optional.empty();

      LOG.debug("Write the cluster ID file to a temporary location: {}", tempIdFile);
      try (FSDataOutputStream s = fs.create(tempIdFile)) {
        s.write(clusterId.toByteArray());
      } catch (IOException ioe) {
        failure = Optional.of(ioe);
      }

      if (!failure.isPresent()) {
        try {
          LOG.debug("Move the temporary cluster ID file to its target location [{}]:[{}]",
            tempIdFile, idFile);

          if (!fs.rename(tempIdFile, idFile)) {
            failure =
              Optional.of(new IOException("Unable to move temp cluster ID file to " + idFile));
          }
        } catch (IOException ioe) {
          failure = Optional.of(ioe);
        }
      }

      if (failure.isPresent()) {
        final IOException cause = failure.get();
        if (wait > 0L) {
          LOG.warn("Unable to create cluster ID file in {}, retrying in {}ms", rootdir, wait,
            cause);
          try {
            Thread.sleep(wait);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
          }
          continue;
        } else {
          throw cause;
        }
      } else {
        return;
      }
    }
  }

  /**
   * If DFS, check safe mode and if so, wait until we clear it.
   * @param conf configuration
   * @param wait Sleep between retries
   * @throws IOException e
   */
  public static void waitOnSafeMode(final Configuration conf, final long wait) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    if (!(fs instanceof DistributedFileSystem)) return;
    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    // Make sure dfs is not in safe mode
    while (isInSafeMode(dfs)) {
      LOG.info("Waiting for dfs to exit safe mode...");
      try {
        Thread.sleep(wait);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
    }
  }

  /**
   * Checks if meta region exists
   * @param fs      file system
   * @param rootDir root directory of HBase installation
   * @return true if exists
   */
  public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
    Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
    return fs.exists(metaRegionDir);
  }

  /**
   * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams are
   * backed by a series of LocatedBlocks, which are fetched periodically from the namenode. This
   * method retrieves those blocks from the input stream and uses them to calculate an
   * HDFSBlocksDistribution. The underlying method in DFSInputStream does attempt to use locally
   * cached blocks, but may hit the namenode if the cache is determined to be incomplete. The
   * method also involves making copies of all LocatedBlocks rather than returning the underlying
   * blocks themselves.
   */
  public static HDFSBlocksDistribution
    computeHDFSBlocksDistribution(HdfsDataInputStream inputStream) throws IOException {
    List<LocatedBlock> blocks = inputStream.getAllBlocks();
    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
    for (LocatedBlock block : blocks) {
      String[] hosts = getHostsForLocations(block);
      long len = block.getBlockSize();
      StorageType[] storageTypes = block.getStorageTypes();
      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
    }
    return blocksDistribution;
  }

  private static String[] getHostsForLocations(LocatedBlock block) {
    DatanodeInfo[] locations = block.getLocations();
    String[] hosts = new String[locations.length];
    for (int i = 0; i < hosts.length; i++) {
      hosts[i] = locations[i].getHostName();
    }
    return hosts;
  }

  /**
   * Compute HDFS blocks distribution of a given file, or a portion of the file
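   * <p>
   * Illustrative sketch that computes the locality index of one file for one host; the store
   * file path and host name below are made up for the example:
   *
   * <pre>{@code
   * FileStatus status = fs.getFileStatus(storeFilePath);
   * HDFSBlocksDistribution distribution =
   *   FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
   * float locality = distribution.getBlockLocalityIndex("regionserver-1.example.com");
   * }</pre>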
   * @param fs     file system
   * @param status file status of the file
   * @param start  start position of the portion
   * @param length length of the portion
   * @return The HDFS blocks distribution
   */
  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs,
    FileStatus status, long start, long length) throws IOException {
    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
    BlockLocation[] blockLocations = fs.getFileBlockLocations(status, start, length);
    addToHDFSBlocksDistribution(blocksDistribution, blockLocations);
    return blocksDistribution;
  }

  /**
   * Update blocksDistribution with blockLocations
   * @param blocksDistribution the hdfs blocks distribution
   * @param blockLocations     an array containing block locations
   */
  public static void addToHDFSBlocksDistribution(HDFSBlocksDistribution blocksDistribution,
    BlockLocation[] blockLocations) throws IOException {
    for (BlockLocation bl : blockLocations) {
      String[] hosts = bl.getHosts();
      long len = bl.getLength();
      StorageType[] storageTypes = bl.getStorageTypes();
      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
    }
  }

  // TODO move this method OUT of FSUtils. No dependencies to HMaster
  /**
   * Returns the total overall fragmentation percentage. Includes hbase:meta and -ROOT- as well.
   * @param master The master defining the HBase root and file system
   * @return The total fragmentation percentage across all tables, or -1 if no table directories
   *         were found
   * @throws IOException When scanning the directory fails
   */
  public static int getTotalTableFragmentation(final HMaster master) throws IOException {
    Map<String, Integer> map = getTableFragmentation(master);
    return map.isEmpty() ? -1 : map.get("-TOTAL-");
  }

  /**
   * Runs through the HBase rootdir and checks how many stores for each table have more than one
   * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is
   * stored under the special key "-TOTAL-".
   * @param master The master defining the HBase root and file system.
   * @return A map for each table and its percentage (never null).
   * @throws IOException When scanning the directory fails.
   */
  public static Map<String, Integer> getTableFragmentation(final HMaster master)
    throws IOException {
    Path path = CommonFSUtils.getRootDir(master.getConfiguration());
    // since HMaster.getFileSystem() is package private
    FileSystem fs = path.getFileSystem(master.getConfiguration());
    return getTableFragmentation(fs, path);
  }

  /**
   * Runs through the HBase rootdir and checks how many stores for each table have more than one
   * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is
   * stored under the special key "-TOTAL-".
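   * <p>
   * Illustrative sketch (assumes {@code fs} and {@code rootDir} are at hand):
   *
   * <pre>{@code
   * Map<String, Integer> frags = FSUtils.getTableFragmentation(fs, rootDir);
   * int overall = frags.get("-TOTAL-"); // percentage across all tables
   * }</pre>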
   * @param fs           The file system to use
   * @param hbaseRootDir The root directory to scan
   * @return A map for each table and its percentage (never null)
   * @throws IOException When scanning the directory fails
   */
  public static Map<String, Integer> getTableFragmentation(final FileSystem fs,
    final Path hbaseRootDir) throws IOException {
    Map<String, Integer> frags = new HashMap<>();
    int cfCountTotal = 0;
    int cfFragTotal = 0;
    PathFilter regionFilter = new RegionDirFilter(fs);
    PathFilter familyFilter = new FamilyDirFilter(fs);
    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
    for (Path d : tableDirs) {
      int cfCount = 0;
      int cfFrag = 0;
      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
      for (FileStatus regionDir : regionDirs) {
        Path dd = regionDir.getPath();
        // this is a region dir; now look in the region for families
        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
        for (FileStatus familyDir : familyDirs) {
          cfCount++;
          cfCountTotal++;
          Path family = familyDir.getPath();
          // now in family make sure only one file
          FileStatus[] familyStatus = fs.listStatus(family);
          if (familyStatus.length > 1) {
            cfFrag++;
            cfFragTotal++;
          }
        }
      }
      // compute percentage per table and store in result list
      frags.put(CommonFSUtils.getTableName(d).getNameAsString(),
        cfCount == 0 ? 0 : Math.round((float) cfFrag / cfCount * 100));
    }
    // set overall percentage for all tables
    frags.put("-TOTAL-",
      cfCountTotal == 0 ? 0 : Math.round((float) cfFragTotal / cfCountTotal * 100));
    return frags;
  }

  public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
    if (fs.exists(dst) && !fs.delete(dst, false)) {
      throw new IOException("Can not delete " + dst);
    }
    if (!fs.rename(src, dst)) {
      throw new IOException("Can not rename from " + src + " to " + dst);
    }
  }

  /**
   * A {@link PathFilter} that returns only regular files.
   */
  static class FileFilter extends AbstractFileStatusFilter {
    private final FileSystem fs;

    public FileFilter(final FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      try {
        return isFile(fs, isDir, p);
      } catch (IOException e) {
        LOG.warn("Unable to verify if path={} is a regular file", p, e);
        return false;
      }
    }
  }

  /**
   * Directory filter that doesn't include any of the directories in the specified blacklist
   */
  public static class BlackListDirFilter extends AbstractFileStatusFilter {
    private final FileSystem fs;
    private List<String> blacklist;

    /**
     * Create a filter on the given filesystem with the specified blacklist
     * @param fs                     filesystem to filter
     * @param directoryNameBlackList list of the names of the directories to filter. If
     *                               <tt>null</tt>, all directories are returned
     */
    @SuppressWarnings("unchecked")
    public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
      this.fs = fs;
      blacklist = (List<String>) (directoryNameBlackList == null
        ? Collections.emptyList()
        : directoryNameBlackList);
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      if (!isValidName(p.getName())) {
        return false;
      }

      try {
        return isDirectory(fs, isDir, p);
      } catch (IOException e) {
        LOG.warn("An error occurred while verifying if [{}] is a valid directory."
          + " Returning 'not valid' and continuing.", p, e);
        return false;
      }
    }

    protected boolean isValidName(final String name) {
      return !blacklist.contains(name);
    }
  }

  /**
   * A {@link PathFilter} that only allows directories.
   */
  public static class DirFilter extends BlackListDirFilter {

    public DirFilter(FileSystem fs) {
      super(fs, null);
    }
  }

  /**
   * A {@link PathFilter} that returns usertable directories. To get all directories use the
   * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
   */
  public static class UserTableDirFilter extends BlackListDirFilter {
    public UserTableDirFilter(FileSystem fs) {
      super(fs, HConstants.HBASE_NON_TABLE_DIRS);
    }

    @Override
    protected boolean isValidName(final String name) {
      if (!super.isValidName(name)) return false;

      try {
        TableName.isLegalTableQualifierName(Bytes.toBytes(name));
      } catch (IllegalArgumentException e) {
        LOG.info("Invalid table name: {}", name);
        return false;
      }
      return true;
    }
  }

  public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
    throws IOException {
    List<Path> tableDirs = new ArrayList<>();
    Path baseNamespaceDir = new Path(rootdir, HConstants.BASE_NAMESPACE_DIR);
    if (fs.exists(baseNamespaceDir)) {
      for (FileStatus status : fs.globStatus(new Path(baseNamespaceDir, "*"))) {
        tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
      }
    }
    return tableDirs;
  }

  /**
   * @param fs      the file system to use
   * @param rootdir the directory to list
   * @return All the table directories under <code>rootdir</code>, ignoring non-table hbase
   *         folders such as the .logs, .oldlogs and .corrupt folders.
   * @throws IOException if listing the directory fails
   */
  public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
    throws IOException {
    // presumes any directory under hbase.rootdir is a table
    FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
    List<Path> tabledirs = new ArrayList<>(dirs.length);
    for (FileStatus dir : dirs) {
      tabledirs.add(dir.getPath());
    }
    return tabledirs;
  }

  /**
   * Filter for region directories. Only directory names that look like region names (and so, in
   * particular, never names that start with '.') are accepted.
   */
  public static class RegionDirFilter extends AbstractFileStatusFilter {
    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
    public static final Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
    final FileSystem fs;

    public RegionDirFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      if (!regionDirPattern.matcher(p.getName()).matches()) {
        return false;
      }

      try {
        return isDirectory(fs, isDir, p);
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file {} due to IOException", p, ioe);
        return false;
      }
    }
  }

  /**
   * Given a particular table dir, return all the regiondirs inside it, excluding files such as
   * .tableinfo
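   * <p>
   * Illustrative sketch that walks a table's regions and their column family directories
   * (assumes {@code fs}, {@code rootDir} and {@code tableName} are at hand):
   *
   * <pre>{@code
   * Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
   * for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
   *   for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
   *     // inspect the store files under familyDir
   *   }
   * }
   * }</pre>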
   * @param fs       A file system for the Path
   * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
   * @return List of paths to valid region directories in table dir.
   */
  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir)
    throws IOException {
    // assumes we are in a table dir.
    List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
    if (rds == null) {
      return Collections.emptyList();
    }
    List<Path> regionDirs = new ArrayList<>(rds.size());
    for (FileStatus rdfs : rds) {
      Path rdPath = rdfs.getPath();
      regionDirs.add(rdPath);
    }
    return regionDirs;
  }

  public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) {
    return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region);
  }

  public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
    return getRegionDirFromTableDir(tableDir,
      ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
  }

  public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) {
    return new Path(tableDir, encodedRegionName);
  }

  /**
   * Filter for all dirs that are legal column family names. This is generally used for colfam dirs
   * &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
   */
  public static class FamilyDirFilter extends AbstractFileStatusFilter {
    final FileSystem fs;

    public FamilyDirFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      try {
        // throws IAE if invalid
        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(Bytes.toBytes(p.getName()));
      } catch (IllegalArgumentException iae) {
        // path name is an invalid family name and thus is excluded.
        return false;
      }

      try {
        return isDirectory(fs, isDir, p);
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file {} due to IOException", p, ioe);
        return false;
      }
    }
  }

  /**
   * Given a particular region dir, return all the familydirs inside it
   * @param fs        A file system for the Path
   * @param regionDir Path to a specific region directory
   * @return List of paths to valid family directories in region dir.
   */
  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir)
    throws IOException {
    // assumes we are in a region dir.
    return getFilePaths(fs, regionDir, new FamilyDirFilter(fs));
  }

  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir)
    throws IOException {
    return getFilePaths(fs, familyDir, new ReferenceFileFilter(fs));
  }

  public static List<Path> getReferenceAndLinkFilePaths(final FileSystem fs, final Path familyDir)
    throws IOException {
    return getFilePaths(fs, familyDir, new ReferenceAndLinkFileFilter(fs));
  }

  private static List<Path> getFilePaths(final FileSystem fs, final Path dir,
    final PathFilter pathFilter) throws IOException {
    FileStatus[] fds = fs.listStatus(dir, pathFilter);
    List<Path> files = new ArrayList<>(fds.length);
    for (FileStatus fdfs : fds) {
      Path fdPath = fdfs.getPath();
      files.add(fdPath);
    }
    return files;
  }

  public static int getRegionReferenceAndLinkFileCount(final FileSystem fs, final Path p) {
    int result = 0;
    try {
      for (Path familyDir : getFamilyDirs(fs, p)) {
        result += getReferenceAndLinkFilePaths(fs, familyDir).size();
      }
    } catch (IOException e) {
      LOG.warn("Error counting reference files.", e);
    }
    return result;
  }

  public static class ReferenceAndLinkFileFilter implements PathFilter {

    private final FileSystem fs;

    public ReferenceAndLinkFileFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    public boolean accept(Path rd) {
      try {
        // only files can be references.
        return !fs.getFileStatus(rd).isDirectory()
          && (StoreFileInfo.isReference(rd) || HFileLink.isHFileLink(rd));
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file " + rd + " due to IOException", ioe);
        return false;
      }
    }
  }

  /**
   * Filter for HFiles that excludes reference files.
   */
  public static class HFileFilter extends AbstractFileStatusFilter {
    final FileSystem fs;

    public HFileFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) {
        return false;
      }

      try {
        return isFile(fs, isDir, p);
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file {} due to IOException", p, ioe);
        return false;
      }
    }
  }

  /**
   * Filter for HFileLinks (StoreFiles and HFiles not included). The filter itself does not check
   * whether a link points to a file or not.
   */
  public static class HFileLinkFilter implements PathFilter {

    @Override
    public boolean accept(Path p) {
      return HFileLink.isHFileLink(p);
    }
  }

  public static class ReferenceFileFilter extends AbstractFileStatusFilter {

    private final FileSystem fs;

    public ReferenceFileFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      if (!StoreFileInfo.isReference(p)) {
        return false;
      }

      try {
        // only files can be references.
        return isFile(fs, isDir, p);
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file {} due to IOException", p, ioe);
        return false;
      }
    }
  }

  /**
   * Called every so often by the storefile map builder getTableStoreFilePathMap to report
   * progress.
   */
  interface ProgressReporter {
    /**
     * @param status File or directory we are about to process.
     */
    void progress(FileStatus status);
  }

  /**
   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
   * names to the full Path. <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   * @param map          map to add values. If null, this method will create and populate one to
   *                     return
   * @param fs           The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @param tableName    name of the table to scan.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException          When scanning the directory fails.
   * @throws InterruptedException if the thread is interrupted, either before or during the
   *                              activity.
   */
  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
    final FileSystem fs, final Path hbaseRootDir, TableName tableName)
    throws IOException, InterruptedException {
    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null,
      (ProgressReporter) null);
  }

  /**
   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
   * names to the full Path. Note that because this method can be called on a 'live' HBase system,
   * we will skip files that no longer exist by the time we traverse them, and similarly the user
   * of the result needs to consider that some entries in this map may not exist by the time this
   * call completes. <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   * @param resultMap        map to add values. If null, this method will create and populate one
   *                         to return
   * @param fs               The file system to use.
   * @param hbaseRootDir     The root directory to scan.
   * @param tableName        name of the table to scan.
   * @param sfFilter         optional path filter to apply to store files
   * @param executor         optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to a new region or
   *                         family dir and for each store file.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   * @deprecated Since 2.3.0. For removal in hbase4. Use the ProgressReporter override instead.
   */
  @Deprecated
  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
    final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
    ExecutorService executor, final HbckErrorReporter progressReporter)
    throws IOException, InterruptedException {
    return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor,
      new ProgressReporter() {
        @Override
        public void progress(FileStatus status) {
          // status is not used in this implementation.
          progressReporter.progress();
        }
      });
  }

  /**
   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
   * names to the full Path. Note that because this method can be called on a 'live' HBase system,
   * we will skip files that no longer exist by the time we traverse them, and similarly the user
   * of the result needs to consider that some entries in this map may not exist by the time this
   * call completes. <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
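   * <p>
   * Illustrative sketch; the pool size of 4 is an arbitrary choice for the example, and
   * {@code fs}, {@code rootDir} and {@code tableName} are assumed to be at hand:
   *
   * <pre>{@code
   * ExecutorService pool = Executors.newFixedThreadPool(4);
   * try {
   *   Map<String, Path> storeFiles = FSUtils.getTableStoreFilePathMap(null, fs, rootDir,
   *     tableName, null, pool, status -> {
   *       // called for each region, family and store file that is visited
   *     });
   * } finally {
   *   pool.shutdown();
   * }
   * }</pre>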
   * @param resultMap        map to add values. If null, this method will create and populate one
   *                         to return
   * @param fs               The file system to use.
   * @param hbaseRootDir     The root directory to scan.
   * @param tableName        name of the table to scan.
   * @param sfFilter         optional path filter to apply to store files
   * @param executor         optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to a new region or
   *                         family dir and for each store file.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException          When scanning the directory fails.
   * @throws InterruptedException if the thread is interrupted, either before or during the
   *                              activity.
   */
  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
    final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
    ExecutorService executor, final ProgressReporter progressReporter)
    throws IOException, InterruptedException {

    final Map<String, Path> finalResultMap =
      resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;

    // only include the directory paths to tables
    Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
    // Inside a table, there are compaction.dir directories to skip. Otherwise, all else
    // should be regions.
    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
    final Vector<Exception> exceptions = new Vector<>();

    try {
      List<FileStatus> regionDirs =
        FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
      if (regionDirs == null) {
        return finalResultMap;
      }

      final List<Future<?>> futures = new ArrayList<>(regionDirs.size());

      for (FileStatus regionDir : regionDirs) {
        if (null != progressReporter) {
          progressReporter.progress(regionDir);
        }
        final Path dd = regionDir.getPath();

        if (!exceptions.isEmpty()) {
          break;
        }

        Runnable getRegionStoreFileMapCall = new Runnable() {
          @Override
          public void run() {
            try {
              HashMap<String, Path> regionStoreFileMap = new HashMap<>();
              List<FileStatus> familyDirs =
                FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
              if (familyDirs == null) {
                if (!fs.exists(dd)) {
                  LOG.warn("Skipping region because it no longer exists: " + dd);
                } else {
                  LOG.warn("Skipping region because it has no family dirs: " + dd);
                }
                return;
              }
              for (FileStatus familyDir : familyDirs) {
                if (null != progressReporter) {
                  progressReporter.progress(familyDir);
                }
                Path family = familyDir.getPath();
                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
                  continue;
                }
                // now in family, iterate over the StoreFiles and
                // put in map
                FileStatus[] familyStatus = fs.listStatus(family);
                for (FileStatus sfStatus : familyStatus) {
                  if (null != progressReporter) {
                    progressReporter.progress(sfStatus);
                  }
                  Path sf = sfStatus.getPath();
                  if (sfFilter == null || sfFilter.accept(sf)) {
                    regionStoreFileMap.put(sf.getName(), sf);
                  }
                }
              }
              finalResultMap.putAll(regionStoreFileMap);
            } catch (Exception e) {
              LOG.error("Could not get region store file map for region: " + dd, e);
              exceptions.add(e);
            }
          }
        };

        // If executor is available, submit async tasks to exec concurrently, otherwise
        // just do serial sync execution
        if (executor != null) {
          Future<?> future = executor.submit(getRegionStoreFileMapCall);
          futures.add(future);
        } else {
          FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
          future.run();
          futures.add(future);
        }
      }

      // Ensure all pending tasks are complete (or that we run into an exception)
      for (Future<?> f : futures) {
        if (!exceptions.isEmpty()) {
          break;
        }
        try {
          f.get();
        } catch (ExecutionException e) {
          LOG.error("Unexpected exec exception!  Should've been caught already.  (Bug?)", e);
          // Shouldn't happen, we already logged/caught any exceptions in the Runnable
        }
      }
    } catch (IOException e) {
      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
      exceptions.add(e);
    } finally {
      if (!exceptions.isEmpty()) {
        // Just throw the first exception as an indication something bad happened
        // Don't need to propagate all the exceptions, we already logged them all anyway
        Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class);
        throw new IOException(exceptions.firstElement());
      }
    }

    return finalResultMap;
  }
1380
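  /**
   * Count the reference files under all family directories of the given region directory. If
   * listing a directory fails, the error is logged and the count accumulated so far is returned.
   * @param fs the filesystem to use
   * @param p  the region directory to inspect
   * @return the number of reference files found
   */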
1381  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1382    int result = 0;
1383    try {
1384      for (Path familyDir : getFamilyDirs(fs, p)) {
1385        result += getReferenceFilePaths(fs, familyDir).size();
1386      }
1387    } catch (IOException e) {
1388      LOG.warn("Error counting reference files", e);
1389    }
1390    return result;
1391  }
1392
1393  /**
1394   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1395   * the full Path. <br>
1396   * Example...<br>
1397   * Key = 3944417774205889744 <br>
1398   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
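   * <p>
   * A minimal usage sketch (illustrative only; the {@code conf} handle and variable names are
   * assumptions made for the example):
   *
   * <pre>{@code
   * FileSystem fs = FileSystem.get(conf);
   * Path rootDir = CommonFSUtils.getRootDir(conf);
   * Map<String, Path> storeFiles = FSUtils.getTableStoreFilePathMap(fs, rootDir);
   * Path storeFilePath = storeFiles.get("3944417774205889744");
   * }</pre>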
1399   * @param fs           The file system to use.
1400   * @param hbaseRootDir The root directory to scan.
1401   * @return Map keyed by StoreFile name with a value of the full Path.
1402   * @throws IOException When scanning the directory fails.
1403   */
1404  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1405    final Path hbaseRootDir) throws IOException, InterruptedException {
1406    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter) null);
1407  }
1408
1409  /**
1410   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1411   * the full Path. <br>
1412   * Example...<br>
1413   * Key = 3944417774205889744 <br>
1414   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1415   * @param fs               The file system to use.
1416   * @param hbaseRootDir     The root directory to scan.
1417   * @param sfFilter         optional path filter to apply to store files
1418   * @param executor         optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to a new region or
   *                         family dir and for each store file.
1421   * @return Map keyed by StoreFile name with a value of the full Path.
1422   * @throws IOException When scanning the directory fails.
   * @deprecated Since 2.3.0. Will be removed in hbase4. Use
   *             {@link #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)}
   *             instead.
1425   */
1426  @Deprecated
1427  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1428    final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1429    HbckErrorReporter progressReporter) throws IOException, InterruptedException {
1430    return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor, new ProgressReporter() {
1431      @Override
1432      public void progress(FileStatus status) {
1433        // status is not used in this implementation.
1434        progressReporter.progress();
1435      }
1436    });
1437  }
1438
1439  /**
1440   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1441   * the full Path. <br>
1442   * Example...<br>
1443   * Key = 3944417774205889744 <br>
1444   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
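   * <p>
   * An illustrative sketch of a parallel scan (the pool size, its shutdown handling and the
   * variable names are assumptions, not requirements of this method):
   *
   * <pre>{@code
   * ExecutorService pool = Executors.newFixedThreadPool(4);
   * try {
   *   Map<String, Path> storeFiles =
   *     FSUtils.getTableStoreFilePathMap(fs, rootDir, null, pool, (ProgressReporter) null);
   * } finally {
   *   pool.shutdown();
   * }
   * }</pre>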
1445   * @param fs               The file system to use.
1446   * @param hbaseRootDir     The root directory to scan.
1447   * @param sfFilter         optional path filter to apply to store files
1448   * @param executor         optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to a new region or
   *                         family dir and for each store file.
1451   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
1453   */
1454  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1455    final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1456    ProgressReporter progressReporter) throws IOException, InterruptedException {
1457    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);
1458
1459    // if this method looks similar to 'getTableFragmentation' that is because
1460    // it was borrowed from it.
1461
1462    // only include the directory paths to tables
1463    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1464      getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir),
1465        sfFilter, executor, progressReporter);
1466    }
1467    return map;
1468  }
1469
1470  /**
1471   * Filters FileStatuses in an array and returns a list
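   * <p>
   * For example, keeping only directories (an illustrative sketch; the lambda filter is an
   * assumption made for the example):
   *
   * <pre>{@code
   * FileStatus[] statuses = fs.listStatus(dir);
   * List<FileStatus> dirs =
   *   FSUtils.filterFileStatuses(statuses, status -> status.isDirectory());
   * }</pre>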
1472   * @param input  An array of FileStatuses
1473   * @param filter A required filter to filter the array
1474   * @return A list of FileStatuses
1475   */
1476  public static List<FileStatus> filterFileStatuses(FileStatus[] input, FileStatusFilter filter) {
1477    if (input == null) return null;
1478    return filterFileStatuses(Iterators.forArray(input), filter);
1479  }
1480
1481  /**
1482   * Filters FileStatuses in an iterator and returns a list
1483   * @param input  An iterator of FileStatuses
   * @param filter A required filter to filter the iterator
1485   * @return A list of FileStatuses
1486   */
1487  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1488    FileStatusFilter filter) {
1489    if (input == null) return null;
1490    ArrayList<FileStatus> results = new ArrayList<>();
1491    while (input.hasNext()) {
1492      FileStatus f = input.next();
1493      if (filter.accept(f)) {
1494        results.add(f);
1495      }
1496    }
1497    return results;
1498  }
1499
1500  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
   * differences between Hadoop versions: Hadoop 1 does not throw a FileNotFoundException and
   * instead returns an empty FileStatus[], while Hadoop 2 throws FileNotFoundException.
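   * <p>
   * Typical use checks for null rather than an empty list (an illustrative sketch; the directory
   * and filter are assumptions made for the example):
   *
   * <pre>{@code
   * List<FileStatus> regionDirs =
   *   FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
   * if (regionDirs == null) {
   *   return; // directory is empty or does not exist
   * }
   * }</pre>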
1504   * @param fs     file system
1505   * @param dir    directory
1506   * @param filter file status filter
1507   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
1508   */
1509  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs, final Path dir,
1510    final FileStatusFilter filter) throws IOException {
1511    FileStatus[] status = null;
1512    try {
1513      status = fs.listStatus(dir);
1514    } catch (FileNotFoundException fnfe) {
1515      LOG.trace("{} does not exist", dir);
1516      return null;
1517    }
1518
1519    if (ArrayUtils.getLength(status) == 0) {
1520      return null;
1521    }
1522
1523    if (filter == null) {
1524      return Arrays.asList(status);
1525    } else {
1526      List<FileStatus> status2 = filterFileStatuses(status, filter);
1527      if (status2 == null || status2.isEmpty()) {
1528        return null;
1529      } else {
1530        return status2;
1531      }
1532    }
1533  }
1534
1535  /**
   * Scans the root path of the file system to get the degree of locality for each region on each
   * of the servers having at least one block of that region. This is used by the tool
   * {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}.
   * @param conf the configuration to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
1542   */
1543  public static Map<String, Map<String, Float>>
1544    getRegionDegreeLocalityMappingFromFS(final Configuration conf) throws IOException {
1545    return getRegionDegreeLocalityMappingFromFS(conf, null,
1546      conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1547
1548  }
1549
1550  /**
   * Scans the root path of the file system to get the degree of locality for each region on each
   * of the servers having at least one block of that region.
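   * <p>
   * For example (an illustrative sketch; the table name, pool size and variable names are
   * assumptions made for the example):
   *
   * <pre>{@code
   * Map<String, Map<String, Float>> localityByRegion =
   *   FSUtils.getRegionDegreeLocalityMappingFromFS(conf, "my_table", 8);
   * Map<String, Float> serverToLocality = localityByRegion.get(regionEncodedName);
   * }</pre>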
   * @param conf           the configuration to use
   * @param desiredTable   the table to scan locality for, or null to scan all tables
   * @param threadPoolSize the thread pool size to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
1557   */
1558  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1559    final Configuration conf, final String desiredTable, int threadPoolSize) throws IOException {
1560    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
1561    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
1562    return regionDegreeLocalityMapping;
1563  }
1564
1565  /**
   * Scans the root path of the file system to get the degree of locality of each region on each of
   * the servers having at least one block of that region.
   * @param conf                        the configuration to use
   * @param desiredTable                the table to scan locality for, or null to scan all tables
   * @param threadPoolSize              the thread pool size to use
   * @param regionDegreeLocalityMapping the map into which to put the locality degree mapping, or
   *                                    null; must be a thread-safe implementation
   * @throws IOException in case of file system errors or interrupts
1572   */
1573  private static void getRegionLocalityMappingFromFS(final Configuration conf,
1574    final String desiredTable, int threadPoolSize,
1575    final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
1576    final FileSystem fs = FileSystem.get(conf);
1577    final Path rootPath = CommonFSUtils.getRootDir(conf);
1578    final long startTime = EnvironmentEdgeManager.currentTime();
1579    final Path queryPath;
1580    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1581    if (null == desiredTable) {
1582      queryPath =
1583        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1584    } else {
1585      queryPath = new Path(
1586        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1587    }
1588
1589    // reject all paths that are not appropriate
1590    PathFilter pathFilter = new PathFilter() {
1591      @Override
1592      public boolean accept(Path path) {
        // the last path component should be the region name; the glob may also match noise data
1594        if (null == path) {
1595          return false;
1596        }
1597
1598        // no parent?
1599        Path parent = path.getParent();
1600        if (null == parent) {
1601          return false;
1602        }
1603
1604        String regionName = path.getName();
1605        if (null == regionName) {
1606          return false;
1607        }
1608
1609        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
1610          return false;
1611        }
1612        return true;
1613      }
1614    };
1615
1616    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1617
1618    if (LOG.isDebugEnabled()) {
1619      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
1620    }
1621
1622    if (null == statusList) {
1623      return;
1624    }
1625
1626    // lower the number of threads in case we have very few expected regions
1627    threadPoolSize = Math.min(threadPoolSize, statusList.length);
1628
1629    // run in multiple threads
1630    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
1631      new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").setDaemon(true)
1632        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
1633    try {
1634      // ignore all file status items that are not of interest
1635      for (FileStatus regionStatus : statusList) {
1636        if (null == regionStatus || !regionStatus.isDirectory()) {
1637          continue;
1638        }
1639
1640        final Path regionPath = regionStatus.getPath();
1641        if (null != regionPath) {
1642          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
1643        }
1644      }
1645    } finally {
1646      tpe.shutdown();
1647      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1648        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1649      try {
1650        // here we wait until TPE terminates, which is either naturally or by
1651        // exceptions in the execution of the threads
1652        while (!tpe.awaitTermination(threadWakeFrequency, TimeUnit.MILLISECONDS)) {
1653          // printing out rough estimate, so as to not introduce
1654          // AtomicInteger
1655          LOG.info("Locality checking is underway: { Scanned Regions : "
1656            + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
1657            + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
1658        }
1659      } catch (InterruptedException e) {
1660        Thread.currentThread().interrupt();
1661        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1662      }
1663    }
1664
    long elapsed = EnvironmentEdgeManager.currentTime() - startTime;
    LOG.info("Scanning DFS for locality info took {} ms", elapsed);
1667  }
1668
1669  /**
   * Do our short circuit read setup. Checks the buffer size to use and whether to do checksumming
   * in hbase or hdfs.
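   * <p>
   * For example (illustrative only; the buffer size shown is an assumption, not a recommendation):
   *
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * conf.setInt("hbase.dfs.client.read.shortcircuit.buffer.size", 2 * 64 * 1024);
   * FSUtils.setupShortCircuitRead(conf);
   * }</pre>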
1672   */
1673  public static void setupShortCircuitRead(final Configuration conf) {
1674    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1675    boolean shortCircuitSkipChecksum =
1676      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1677    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1678    if (shortCircuitSkipChecksum) {
1679      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not "
1680        + "be set to true."
1681        + (useHBaseChecksum
1682          ? " HBase checksum doesn't require "
1683            + "it, see https://issues.apache.org/jira/browse/HBASE-6868."
1684          : ""));
1685      assert !shortCircuitSkipChecksum; // this will fail if assertions are on
1686    }
1687    checkShortCircuitReadBufferSize(conf);
1688  }
1689
1690  /**
   * Check if short circuit read buffer size is set and if not, set it to hbase value.
1692   */
1693  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1694    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1695    final int notSet = -1;
1696    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
1697    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1698    int size = conf.getInt(dfsKey, notSet);
1699    // If a size is set, return -- we will use it.
1700    if (size != notSet) return;
1701    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
1702    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1703    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
1704  }
1705
1706  /**
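   * Fetch the {@link DFSHedgedReadMetrics} instance shared by the {@link DFSClient}s of this
   * process, if the configured filesystem is HDFS.
   * <p>
   * A minimal sketch of reading the metrics (illustrative only; the SLF4J-style logging is an
   * assumption made for the example):
   *
   * <pre>{@code
   * DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
   * if (metrics != null) {
   *   LOG.info("hedged reads={}, wins={}", metrics.getHedgedReadOps(),
   *     metrics.getHedgedReadWins());
   * }
   * }</pre>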
   * @param c the configuration used to obtain the filesystem
   * @return The DFSClient DFSHedgedReadMetrics instance, or null if it can't be found or the
   *         filesystem is not hdfs
   * @throws IOException if obtaining the filesystem fails
1709   */
1710  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
1711    throws IOException {
1712    if (!CommonFSUtils.isHDFS(c)) {
1713      return null;
1714    }
1715    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
1716    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
1717    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
1718    final String name = "getHedgedReadMetrics";
1719    DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient();
1720    Method m;
1721    try {
1722      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException | SecurityException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: "
        + e.getMessage());
      return null;
    }
1732    m.setAccessible(true);
1733    try {
1734      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
1735    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
1736      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: "
1737        + e.getMessage());
1738      return null;
1739    }
1740  }
1741
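  /**
   * Recursively copy the directory tree rooted at {@code src} on {@code srcFS} to {@code dst} on
   * {@code dstFS}, copying individual files in parallel on a pool of {@code threads} threads.
   * Blocks until all copies complete.
   * <p>
   * A minimal usage sketch (illustrative only; the filesystems, paths and thread count are
   * assumptions made for the example):
   *
   * <pre>{@code
   * List<Path> copiedPaths = FSUtils.copyFilesParallel(srcFs, new Path("/hbase/archive"),
   *   dstFs, new Path("/backup"), conf, 8);
   * }</pre>
   *
   * @return the destination paths that were created, both directories and files
   * @throws IOException if creating a directory or copying a file fails
   */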
1742  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1743    Configuration conf, int threads) throws IOException {
1744    ExecutorService pool = Executors.newFixedThreadPool(threads);
1745    List<Future<Void>> futures = new ArrayList<>();
1746    List<Path> traversedPaths;
1747    try {
1748      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
1749      for (Future<Void> future : futures) {
1750        future.get();
1751      }
1752    } catch (ExecutionException | InterruptedException | IOException e) {
1753      throw new IOException("Copy snapshot reference files failed", e);
1754    } finally {
1755      pool.shutdownNow();
1756    }
1757    return traversedPaths;
1758  }
1759
1760  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1761    Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
1762    List<Path> traversedPaths = new ArrayList<>();
1763    traversedPaths.add(dst);
1764    FileStatus currentFileStatus = srcFS.getFileStatus(src);
1765    if (currentFileStatus.isDirectory()) {
1766      if (!dstFS.mkdirs(dst)) {
1767        throw new IOException("Create directory failed: " + dst);
1768      }
1769      FileStatus[] subPaths = srcFS.listStatus(src);
1770      for (FileStatus subPath : subPaths) {
1771        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
1772          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
1773      }
1774    } else {
1775      Future<Void> future = pool.submit(() -> {
1776        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
1777        return null;
1778      });
1779      futures.add(future);
1780    }
1781    return traversedPaths;
1782  }
1783
1784  /**
   * @param fs   the DistributedFileSystem to get the namenode addresses for
   * @param conf the configuration to use when resolving HA nameservice addresses
   * @return A set containing all namenode addresses of fs
1786   */
1787  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
1788    Configuration conf) {
1789    Set<InetSocketAddress> addresses = new HashSet<>();
1790    String serviceName = fs.getCanonicalServiceName();
1791
1792    if (serviceName.startsWith("ha-hdfs")) {
1793      try {
1794        Map<String, Map<String, InetSocketAddress>> addressMap =
1795          DFSUtil.getNNServiceRpcAddressesForCluster(conf);
1796        String nameService = serviceName.substring(serviceName.indexOf(":") + 1);
1797        if (addressMap.containsKey(nameService)) {
1798          Map<String, InetSocketAddress> nnMap = addressMap.get(nameService);
1799          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
1800            InetSocketAddress addr = e2.getValue();
1801            addresses.add(addr);
1802          }
1803        }
1804      } catch (Exception e) {
1805        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
1806      }
1807    } else {
1808      URI uri = fs.getUri();
1809      int port = uri.getPort();
1810      if (port < 0) {
1811        int idx = serviceName.indexOf(':');
1812        port = Integer.parseInt(serviceName.substring(idx + 1));
1813      }
1814      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
1815      addresses.add(addr);
1816    }
1817
1818    return addresses;
1819  }
1820
1821  /**
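   * Check whether {@code srcFs} and {@code desFs} point at the same HDFS instance, taking HA
   * nameservice configurations into account.
   * <p>
   * For example (an illustrative sketch; the URIs and variable names are assumptions made for the
   * example):
   *
   * <pre>{@code
   * FileSystem srcFs = new Path("hdfs://cluster1/hbase").getFileSystem(conf);
   * FileSystem backupFs = new Path("hdfs://cluster2/backup").getFileSystem(conf);
   * if (!FSUtils.isSameHdfs(conf, srcFs, backupFs)) {
   *   // the copy crosses clusters, so favor a distributed copy
   * }
   * }</pre>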
   * @param conf  the Configuration of HBase
   * @param srcFs the source FileSystem
   * @param desFs the destination FileSystem
   * @return whether srcFs and desFs are on the same hdfs or not
1824   */
1825  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
1826    // By getCanonicalServiceName, we could make sure both srcFs and desFs
1827    // show a unified format which contains scheme, host and port.
1828    String srcServiceName = srcFs.getCanonicalServiceName();
1829    String desServiceName = desFs.getCanonicalServiceName();
1830
1831    if (srcServiceName == null || desServiceName == null) {
1832      return false;
1833    }
1834    if (srcServiceName.equals(desServiceName)) {
1835      return true;
1836    }
1837    if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
1838      Collection<String> internalNameServices =
1839        conf.getTrimmedStringCollection("dfs.internal.nameservices");
1840      if (!internalNameServices.isEmpty()) {
1841        if (internalNameServices.contains(srcServiceName.split(":")[1])) {
1842          return true;
1843        } else {
1844          return false;
1845        }
1846      }
1847    }
1848    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
1849      // If one serviceName is an HA format while the other is a non-HA format,
1850      // maybe they refer to the same FileSystem.
1851      // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
1852      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
1853      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
1854      if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
1855        return true;
1856      }
1857    }
1858
1859    return false;
1860  }
1861}