001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET;
021
022import edu.umd.cs.findbugs.annotations.CheckForNull;
023import java.io.ByteArrayInputStream;
024import java.io.DataInputStream;
025import java.io.EOFException;
026import java.io.FileNotFoundException;
027import java.io.IOException;
028import java.io.InterruptedIOException;
029import java.lang.reflect.InvocationTargetException;
030import java.lang.reflect.Method;
031import java.net.InetSocketAddress;
032import java.net.URI;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Collection;
036import java.util.Collections;
037import java.util.HashMap;
038import java.util.HashSet;
039import java.util.Iterator;
040import java.util.List;
041import java.util.Locale;
042import java.util.Map;
043import java.util.Set;
044import java.util.Vector;
045import java.util.concurrent.ConcurrentHashMap;
046import java.util.concurrent.ExecutionException;
047import java.util.concurrent.ExecutorService;
048import java.util.concurrent.Executors;
049import java.util.concurrent.Future;
050import java.util.concurrent.FutureTask;
051import java.util.concurrent.ThreadPoolExecutor;
052import java.util.concurrent.TimeUnit;
053import java.util.regex.Pattern;
054import org.apache.commons.lang3.ArrayUtils;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.fs.BlockLocation;
057import org.apache.hadoop.fs.FSDataInputStream;
058import org.apache.hadoop.fs.FSDataOutputStream;
059import org.apache.hadoop.fs.FileStatus;
060import org.apache.hadoop.fs.FileSystem;
061import org.apache.hadoop.fs.FileUtil;
062import org.apache.hadoop.fs.Path;
063import org.apache.hadoop.fs.PathFilter;
064import org.apache.hadoop.fs.StorageType;
065import org.apache.hadoop.fs.permission.FsPermission;
066import org.apache.hadoop.hbase.ClusterId;
067import org.apache.hadoop.hbase.HColumnDescriptor;
068import org.apache.hadoop.hbase.HConstants;
069import org.apache.hadoop.hbase.HDFSBlocksDistribution;
070import org.apache.hadoop.hbase.TableName;
071import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
072import org.apache.hadoop.hbase.client.RegionInfo;
073import org.apache.hadoop.hbase.client.RegionInfoBuilder;
074import org.apache.hadoop.hbase.exceptions.DeserializationException;
075import org.apache.hadoop.hbase.fs.HFileSystem;
076import org.apache.hadoop.hbase.io.HFileLink;
077import org.apache.hadoop.hbase.master.HMaster;
078import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
079import org.apache.hadoop.hdfs.DFSClient;
080import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
081import org.apache.hadoop.hdfs.DFSUtil;
082import org.apache.hadoop.hdfs.DistributedFileSystem;
083import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
084import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
085import org.apache.hadoop.hdfs.protocol.LocatedBlock;
086import org.apache.hadoop.io.IOUtils;
087import org.apache.hadoop.ipc.RemoteException;
088import org.apache.hadoop.util.Progressable;
089import org.apache.hadoop.util.StringUtils;
090import org.apache.yetus.audience.InterfaceAudience;
091import org.slf4j.Logger;
092import org.slf4j.LoggerFactory;
093
094import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
095import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
096import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
097import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
098import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
099
100import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
101import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
102
103/**
104 * Utility methods for interacting with the underlying file system.
105 */
106@InterfaceAudience.Private
107public final class FSUtils {
108  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
109
110  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
111  private static final int DEFAULT_THREAD_POOLSIZE = 2;
112
113  /** Set to true on Windows platforms */
114  // currently only used in testing. TODO refactor into a test class
115  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
116
117  private FSUtils() {
118  }
119
120  /** Returns True is <code>fs</code> is instance of DistributedFileSystem n */
121  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
122    FileSystem fileSystem = fs;
123    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
124    // Check its backing fs for dfs-ness.
125    if (fs instanceof HFileSystem) {
126      fileSystem = ((HFileSystem) fs).getBackingFs();
127    }
128    return fileSystem instanceof DistributedFileSystem;
129  }
130
131  /**
132   * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
133   * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true. Does not consider
134   * schema; i.e. if schemas different but path or subpath matches, the two will equate.
135   * @param pathToSearch Path we will be trying to match. n * @return True if <code>pathTail</code>
136   *                     is tail on the path of <code>pathToSearch</code>
137   */
138  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
139    Path tailPath = pathTail;
140    String tailName;
141    Path toSearch = pathToSearch;
142    String toSearchName;
143    boolean result = false;
144
145    if (pathToSearch.depth() != pathTail.depth()) {
146      return false;
147    }
148
149    do {
150      tailName = tailPath.getName();
151      if (tailName == null || tailName.isEmpty()) {
152        result = true;
153        break;
154      }
155      toSearchName = toSearch.getName();
156      if (toSearchName == null || toSearchName.isEmpty()) {
157        break;
158      }
159      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
160      tailPath = tailPath.getParent();
161      toSearch = toSearch.getParent();
162    } while (tailName.equals(toSearchName));
163    return result;
164  }
165
166  /**
167   * Delete the region directory if exists.
168   * @return True if deleted the region directory. n
169   */
170  public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri)
171    throws IOException {
172    Path rootDir = CommonFSUtils.getRootDir(conf);
173    FileSystem fs = rootDir.getFileSystem(conf);
174    return CommonFSUtils.deleteDirectory(fs,
175      new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
176  }
177
178  /**
179   * Create the specified file on the filesystem. By default, this will:
180   * <ol>
181   * <li>overwrite the file if it exists</li>
182   * <li>apply the umask in the configuration (if it is enabled)</li>
183   * <li>use the fs configured buffer size (or 4096 if not set)</li>
184   * <li>use the configured column family replication or default replication if
185   * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li>
186   * <li>use the default block size</li>
187   * <li>not track progress</li>
188   * </ol>
189   * @param conf         configurations
190   * @param fs           {@link FileSystem} on which to write the file
191   * @param path         {@link Path} to the file to write
192   * @param perm         permissions
193   * @param favoredNodes favored data nodes
194   * @return output stream to the created file
195   * @throws IOException if the file cannot be created
196   */
197  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
198    FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
199    if (fs instanceof HFileSystem) {
200      FileSystem backingFs = ((HFileSystem) fs).getBackingFs();
201      if (backingFs instanceof DistributedFileSystem) {
202        // Try to use the favoredNodes version via reflection to allow backwards-
203        // compatibility.
204        short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION,
205          String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION)));
206        try {
207          return (FSDataOutputStream) (DistributedFileSystem.class
208            .getDeclaredMethod("create", Path.class, FsPermission.class, boolean.class, int.class,
209              short.class, long.class, Progressable.class, InetSocketAddress[].class)
210            .invoke(backingFs, path, perm, true, CommonFSUtils.getDefaultBufferSize(backingFs),
211              replication > 0 ? replication : CommonFSUtils.getDefaultReplication(backingFs, path),
212              CommonFSUtils.getDefaultBlockSize(backingFs, path), null, favoredNodes));
213        } catch (InvocationTargetException ite) {
214          // Function was properly called, but threw it's own exception.
215          throw new IOException(ite.getCause());
216        } catch (NoSuchMethodException e) {
217          LOG.debug("DFS Client does not support most favored nodes create; using default create");
218          LOG.trace("Ignoring; use default create", e);
219        } catch (IllegalArgumentException | SecurityException | IllegalAccessException e) {
220          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
221        }
222      }
223    }
224    return CommonFSUtils.create(fs, path, perm, true);
225  }
226
227  /**
228   * Checks to see if the specified file system is available
229   * @param fs filesystem
230   * @throws IOException e
231   */
232  public static void checkFileSystemAvailable(final FileSystem fs) throws IOException {
233    if (!(fs instanceof DistributedFileSystem)) {
234      return;
235    }
236    IOException exception = null;
237    DistributedFileSystem dfs = (DistributedFileSystem) fs;
238    try {
239      if (dfs.exists(new Path("/"))) {
240        return;
241      }
242    } catch (IOException e) {
243      exception = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
244    }
245    try {
246      fs.close();
247    } catch (Exception e) {
248      LOG.error("file system close failed: ", e);
249    }
250    throw new IOException("File system is not available", exception);
251  }
252
253  /**
254   * Inquire the Active NameNode's safe mode status.
255   * @param dfs A DistributedFileSystem object representing the underlying HDFS.
256   * @return whether we're in safe mode n
257   */
258  private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
259    return dfs.setSafeMode(SAFEMODE_GET, true);
260  }
261
262  /**
263   * Check whether dfs is in safemode. nn
264   */
265  public static void checkDfsSafeMode(final Configuration conf) throws IOException {
266    boolean isInSafeMode = false;
267    FileSystem fs = FileSystem.get(conf);
268    if (fs instanceof DistributedFileSystem) {
269      DistributedFileSystem dfs = (DistributedFileSystem) fs;
270      isInSafeMode = isInSafeMode(dfs);
271    }
272    if (isInSafeMode) {
273      throw new IOException("File system is in safemode, it can't be written now");
274    }
275  }
276
277  /**
278   * Verifies current version of file system
279   * @param fs      filesystem object
280   * @param rootdir root hbase directory
281   * @return null if no version file exists, version string otherwise
282   * @throws IOException              if the version file fails to open
283   * @throws DeserializationException if the version data cannot be translated into a version
284   */
285  public static String getVersion(FileSystem fs, Path rootdir)
286    throws IOException, DeserializationException {
287    final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
288    FileStatus[] status = null;
289    try {
290      // hadoop 2.0 throws FNFE if directory does not exist.
291      // hadoop 1.0 returns null if directory does not exist.
292      status = fs.listStatus(versionFile);
293    } catch (FileNotFoundException fnfe) {
294      return null;
295    }
296    if (ArrayUtils.getLength(status) == 0) {
297      return null;
298    }
299    String version = null;
300    byte[] content = new byte[(int) status[0].getLen()];
301    FSDataInputStream s = fs.open(versionFile);
302    try {
303      IOUtils.readFully(s, content, 0, content.length);
304      if (ProtobufUtil.isPBMagicPrefix(content)) {
305        version = parseVersionFrom(content);
306      } else {
307        // Presume it pre-pb format.
308        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) {
309          version = dis.readUTF();
310        }
311      }
312    } catch (EOFException eof) {
313      LOG.warn("Version file was empty, odd, will try to set it.");
314    } finally {
315      s.close();
316    }
317    return version;
318  }
319
320  /**
321   * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
322   * @param bytes The byte content of the hbase.version file
323   * @return The version found in the file as a String
324   * @throws DeserializationException if the version data cannot be translated into a version
325   */
326  static String parseVersionFrom(final byte[] bytes) throws DeserializationException {
327    ProtobufUtil.expectPBMagicPrefix(bytes);
328    int pblen = ProtobufUtil.lengthOfPBMagic();
329    FSProtos.HBaseVersionFileContent.Builder builder =
330      FSProtos.HBaseVersionFileContent.newBuilder();
331    try {
332      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
333      return builder.getVersion();
334    } catch (IOException e) {
335      // Convert
336      throw new DeserializationException(e);
337    }
338  }
339
340  /**
341   * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
342   * @param version Version to persist
343   * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a
344   *         prefix.
345   */
346  static byte[] toVersionByteArray(final String version) {
347    FSProtos.HBaseVersionFileContent.Builder builder =
348      FSProtos.HBaseVersionFileContent.newBuilder();
349    return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
350  }
351
352  /**
353   * Verifies current version of file system
354   * @param fs      file system
355   * @param rootdir root directory of HBase installation
356   * @param message if true, issues a message on System.out
357   * @throws IOException              if the version file cannot be opened
358   * @throws DeserializationException if the contents of the version file cannot be parsed
359   */
360  public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
361    throws IOException, DeserializationException {
362    checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
363  }
364
365  /**
366   * Verifies current version of file system
367   * @param fs      file system
368   * @param rootdir root directory of HBase installation
369   * @param message if true, issues a message on System.out
370   * @param wait    wait interval
371   * @param retries number of times to retry
372   * @throws IOException              if the version file cannot be opened
373   * @throws DeserializationException if the contents of the version file cannot be parsed
374   */
375  public static void checkVersion(FileSystem fs, Path rootdir, boolean message, int wait,
376    int retries) throws IOException, DeserializationException {
377    String version = getVersion(fs, rootdir);
378    String msg;
379    if (version == null) {
380      if (!metaRegionExists(fs, rootdir)) {
381        // rootDir is empty (no version file and no root region)
382        // just create new version file (HBASE-1195)
383        setVersion(fs, rootdir, wait, retries);
384        return;
385      } else {
386        msg = "hbase.version file is missing. Is your hbase.rootdir valid? "
387          + "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. "
388          + "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2";
389      }
390    } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) {
391      return;
392    } else {
393      msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version
394        + " but software requires version " + HConstants.FILE_SYSTEM_VERSION
395        + ". Consult http://hbase.apache.org/book.html for further information about "
396        + "upgrading HBase.";
397    }
398
399    // version is deprecated require migration
400    // Output on stdout so user sees it in terminal.
401    if (message) {
402      System.out.println("WARNING! " + msg);
403    }
404    throw new FileSystemVersionException(msg);
405  }
406
407  /**
408   * Sets version of file system
409   * @param fs      filesystem object
410   * @param rootdir hbase root
411   * @throws IOException e
412   */
413  public static void setVersion(FileSystem fs, Path rootdir) throws IOException {
414    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
415      HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
416  }
417
418  /**
419   * Sets version of file system
420   * @param fs      filesystem object
421   * @param rootdir hbase root
422   * @param wait    time to wait for retry
423   * @param retries number of times to retry before failing
424   * @throws IOException e
425   */
426  public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
427    throws IOException {
428    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
429  }
430
431  /**
432   * Sets version of file system
433   * @param fs      filesystem object
434   * @param rootdir hbase root directory
435   * @param version version to set
436   * @param wait    time to wait for retry
437   * @param retries number of times to retry before throwing an IOException
438   * @throws IOException e
439   */
440  public static void setVersion(FileSystem fs, Path rootdir, String version, int wait, int retries)
441    throws IOException {
442    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
443    Path tempVersionFile = new Path(rootdir,
444      HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + HConstants.VERSION_FILE_NAME);
445    while (true) {
446      try {
447        // Write the version to a temporary file
448        FSDataOutputStream s = fs.create(tempVersionFile);
449        try {
450          s.write(toVersionByteArray(version));
451          s.close();
452          s = null;
453          // Move the temp version file to its normal location. Returns false
454          // if the rename failed. Throw an IOE in that case.
455          if (!fs.rename(tempVersionFile, versionFile)) {
456            throw new IOException("Unable to move temp version file to " + versionFile);
457          }
458        } finally {
459          // Cleaning up the temporary if the rename failed would be trying
460          // too hard. We'll unconditionally create it again the next time
461          // through anyway, files are overwritten by default by create().
462
463          // Attempt to close the stream on the way out if it is still open.
464          try {
465            if (s != null) s.close();
466          } catch (IOException ignore) {
467          }
468        }
469        LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
470        return;
471      } catch (IOException e) {
472        if (retries > 0) {
473          LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
474          fs.delete(versionFile, false);
475          try {
476            if (wait > 0) {
477              Thread.sleep(wait);
478            }
479          } catch (InterruptedException ie) {
480            throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
481          }
482          retries--;
483        } else {
484          throw e;
485        }
486      }
487    }
488  }
489
490  /**
491   * Checks that a cluster ID file exists in the HBase root directory
492   * @param fs      the root directory FileSystem
493   * @param rootdir the HBase root directory in HDFS
494   * @param wait    how long to wait between retries
495   * @return <code>true</code> if the file exists, otherwise <code>false</code>
496   * @throws IOException if checking the FileSystem fails
497   */
498  public static boolean checkClusterIdExists(FileSystem fs, Path rootdir, long wait)
499    throws IOException {
500    while (true) {
501      try {
502        Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
503        return fs.exists(filePath);
504      } catch (IOException ioe) {
505        if (wait > 0L) {
506          LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe);
507          try {
508            Thread.sleep(wait);
509          } catch (InterruptedException e) {
510            Thread.currentThread().interrupt();
511            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
512          }
513        } else {
514          throw ioe;
515        }
516      }
517    }
518  }
519
520  /**
521   * Returns the value of the unique cluster ID stored for this HBase instance.
522   * @param fs      the root directory FileSystem
523   * @param rootdir the path to the HBase root directory
524   * @return the unique cluster identifier
525   * @throws IOException if reading the cluster ID file fails
526   */
527  public static ClusterId getClusterId(FileSystem fs, Path rootdir) throws IOException {
528    Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
529    ClusterId clusterId = null;
530    FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null;
531    if (status != null) {
532      int len = Ints.checkedCast(status.getLen());
533      byte[] content = new byte[len];
534      FSDataInputStream in = fs.open(idPath);
535      try {
536        in.readFully(content);
537      } catch (EOFException eof) {
538        LOG.warn("Cluster ID file {} is empty", idPath);
539      } finally {
540        in.close();
541      }
542      try {
543        clusterId = ClusterId.parseFrom(content);
544      } catch (DeserializationException e) {
545        throw new IOException("content=" + Bytes.toString(content), e);
546      }
547      // If not pb'd, make it so.
548      if (!ProtobufUtil.isPBMagicPrefix(content)) {
549        String cid = null;
550        in = fs.open(idPath);
551        try {
552          cid = in.readUTF();
553          clusterId = new ClusterId(cid);
554        } catch (EOFException eof) {
555          LOG.warn("Cluster ID file {} is empty", idPath);
556        } finally {
557          in.close();
558        }
559        rewriteAsPb(fs, rootdir, idPath, clusterId);
560      }
561      return clusterId;
562    } else {
563      LOG.warn("Cluster ID file does not exist at {}", idPath);
564    }
565    return clusterId;
566  }
567
568  /**
569   * nn
570   */
571  private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
572    final ClusterId cid) throws IOException {
573    // Rewrite the file as pb. Move aside the old one first, write new
574    // then delete the moved-aside file.
575    Path movedAsideName = new Path(p + "." + EnvironmentEdgeManager.currentTime());
576    if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
577    setClusterId(fs, rootdir, cid, 100);
578    if (!fs.delete(movedAsideName, false)) {
579      throw new IOException("Failed delete of " + movedAsideName);
580    }
581    LOG.debug("Rewrote the hbase.id file as pb");
582  }
583
584  /**
585   * Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root
586   * directory
587   * @param fs        the root directory FileSystem
588   * @param rootdir   the path to the HBase root directory
589   * @param clusterId the unique identifier to store
590   * @param wait      how long (in milliseconds) to wait between retries
591   * @throws IOException if writing to the FileSystem fails and no wait value
592   */
593  public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId, int wait)
594    throws IOException {
595    while (true) {
596      try {
597        Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
598        Path tempIdFile = new Path(rootdir,
599          HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
600        // Write the id file to a temporary location
601        FSDataOutputStream s = fs.create(tempIdFile);
602        try {
603          s.write(clusterId.toByteArray());
604          s.close();
605          s = null;
606          // Move the temporary file to its normal location. Throw an IOE if
607          // the rename failed
608          if (!fs.rename(tempIdFile, idFile)) {
609            throw new IOException("Unable to move temp version file to " + idFile);
610          }
611        } finally {
612          // Attempt to close the stream if still open on the way out
613          try {
614            if (s != null) s.close();
615          } catch (IOException ignore) {
616          }
617        }
618        if (LOG.isDebugEnabled()) {
619          LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
620        }
621        return;
622      } catch (IOException ioe) {
623        if (wait > 0) {
624          LOG.warn("Unable to create cluster ID file in " + rootdir.toString() + ", retrying in "
625            + wait + "msec: " + StringUtils.stringifyException(ioe));
626          try {
627            Thread.sleep(wait);
628          } catch (InterruptedException e) {
629            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
630          }
631        } else {
632          throw ioe;
633        }
634      }
635    }
636  }
637
638  /**
639   * If DFS, check safe mode and if so, wait until we clear it.
640   * @param conf configuration
641   * @param wait Sleep between retries
642   * @throws IOException e
643   */
644  public static void waitOnSafeMode(final Configuration conf, final long wait) throws IOException {
645    FileSystem fs = FileSystem.get(conf);
646    if (!(fs instanceof DistributedFileSystem)) return;
647    DistributedFileSystem dfs = (DistributedFileSystem) fs;
648    // Make sure dfs is not in safe mode
649    while (isInSafeMode(dfs)) {
650      LOG.info("Waiting for dfs to exit safe mode...");
651      try {
652        Thread.sleep(wait);
653      } catch (InterruptedException e) {
654        Thread.currentThread().interrupt();
655        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
656      }
657    }
658  }
659
660  /**
661   * Checks if meta region exists
662   * @param fs      file system
663   * @param rootDir root directory of HBase installation
664   * @return true if exists
665   */
666  public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
667    Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
668    return fs.exists(metaRegionDir);
669  }
670
671  /**
672   * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams are
673   * backed by a series of LocatedBlocks, which are fetched periodically from the namenode. This
674   * method retrieves those blocks from the input stream and uses them to calculate
675   * HDFSBlockDistribution. The underlying method in DFSInputStream does attempt to use locally
676   * cached blocks, but may hit the namenode if the cache is determined to be incomplete. The method
677   * also involves making copies of all LocatedBlocks rather than return the underlying blocks
678   * themselves.
679   */
680  public static HDFSBlocksDistribution
681    computeHDFSBlocksDistribution(HdfsDataInputStream inputStream) throws IOException {
682    List<LocatedBlock> blocks = inputStream.getAllBlocks();
683    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
684    for (LocatedBlock block : blocks) {
685      String[] hosts = getHostsForLocations(block);
686      long len = block.getBlockSize();
687      StorageType[] storageTypes = block.getStorageTypes();
688      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
689    }
690    return blocksDistribution;
691  }
692
693  private static String[] getHostsForLocations(LocatedBlock block) {
694    DatanodeInfo[] locations = block.getLocations();
695    String[] hosts = new String[locations.length];
696    for (int i = 0; i < hosts.length; i++) {
697      hosts[i] = locations[i].getHostName();
698    }
699    return hosts;
700  }
701
702  /**
703   * Compute HDFS blocks distribution of a given file, or a portion of the file
704   * @param fs     file system
705   * @param status file status of the file
706   * @param start  start position of the portion
707   * @param length length of the portion
708   * @return The HDFS blocks distribution
709   */
710  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs,
711    FileStatus status, long start, long length) throws IOException {
712    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
713    BlockLocation[] blockLocations = fs.getFileBlockLocations(status, start, length);
714    addToHDFSBlocksDistribution(blocksDistribution, blockLocations);
715    return blocksDistribution;
716  }
717
718  /**
719   * Update blocksDistribution with blockLocations
720   * @param blocksDistribution the hdfs blocks distribution
721   * @param blockLocations     an array containing block location
722   */
723  static public void addToHDFSBlocksDistribution(HDFSBlocksDistribution blocksDistribution,
724    BlockLocation[] blockLocations) throws IOException {
725    for (BlockLocation bl : blockLocations) {
726      String[] hosts = bl.getHosts();
727      long len = bl.getLength();
728      StorageType[] storageTypes = bl.getStorageTypes();
729      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
730    }
731  }
732
733  // TODO move this method OUT of FSUtils. No dependencies to HMaster
734  /**
735   * Returns the total overall fragmentation percentage. Includes hbase:meta and -ROOT- as well.
736   * @param master The master defining the HBase root and file system
737   * @return A map for each table and its percentage (never null)
738   * @throws IOException When scanning the directory fails
739   */
740  public static int getTotalTableFragmentation(final HMaster master) throws IOException {
741    Map<String, Integer> map = getTableFragmentation(master);
742    return map.isEmpty() ? -1 : map.get("-TOTAL-");
743  }
744
745  /**
746   * Runs through the HBase rootdir and checks how many stores for each table have more than one
747   * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is
748   * stored under the special key "-TOTAL-".
749   * @param master The master defining the HBase root and file system.
750   * @return A map for each table and its percentage (never null).
751   * @throws IOException When scanning the directory fails.
752   */
753  public static Map<String, Integer> getTableFragmentation(final HMaster master)
754    throws IOException {
755    Path path = CommonFSUtils.getRootDir(master.getConfiguration());
756    // since HMaster.getFileSystem() is package private
757    FileSystem fs = path.getFileSystem(master.getConfiguration());
758    return getTableFragmentation(fs, path);
759  }
760
761  /**
762   * Runs through the HBase rootdir and checks how many stores for each table have more than one
763   * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is
764   * stored under the special key "-TOTAL-".
765   * @param fs           The file system to use
766   * @param hbaseRootDir The root directory to scan
767   * @return A map for each table and its percentage (never null)
768   * @throws IOException When scanning the directory fails
769   */
770  public static Map<String, Integer> getTableFragmentation(final FileSystem fs,
771    final Path hbaseRootDir) throws IOException {
772    Map<String, Integer> frags = new HashMap<>();
773    int cfCountTotal = 0;
774    int cfFragTotal = 0;
775    PathFilter regionFilter = new RegionDirFilter(fs);
776    PathFilter familyFilter = new FamilyDirFilter(fs);
777    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
778    for (Path d : tableDirs) {
779      int cfCount = 0;
780      int cfFrag = 0;
781      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
782      for (FileStatus regionDir : regionDirs) {
783        Path dd = regionDir.getPath();
784        // else its a region name, now look in region for families
785        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
786        for (FileStatus familyDir : familyDirs) {
787          cfCount++;
788          cfCountTotal++;
789          Path family = familyDir.getPath();
790          // now in family make sure only one file
791          FileStatus[] familyStatus = fs.listStatus(family);
792          if (familyStatus.length > 1) {
793            cfFrag++;
794            cfFragTotal++;
795          }
796        }
797      }
798      // compute percentage per table and store in result list
799      frags.put(CommonFSUtils.getTableName(d).getNameAsString(),
800        cfCount == 0 ? 0 : Math.round((float) cfFrag / cfCount * 100));
801    }
802    // set overall percentage for all tables
803    frags.put("-TOTAL-",
804      cfCountTotal == 0 ? 0 : Math.round((float) cfFragTotal / cfCountTotal * 100));
805    return frags;
806  }
807
808  /**
809   * A {@link PathFilter} that returns only regular files.
810   */
811  static class FileFilter extends AbstractFileStatusFilter {
812    private final FileSystem fs;
813
814    public FileFilter(final FileSystem fs) {
815      this.fs = fs;
816    }
817
818    @Override
819    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
820      try {
821        return isFile(fs, isDir, p);
822      } catch (IOException e) {
823        LOG.warn("Unable to verify if path={} is a regular file", p, e);
824        return false;
825      }
826    }
827  }
828
829  /**
830   * Directory filter that doesn't include any of the directories in the specified blacklist
831   */
832  public static class BlackListDirFilter extends AbstractFileStatusFilter {
833    private final FileSystem fs;
834    private List<String> blacklist;
835
836    /**
837     * Create a filter on the givem filesystem with the specified blacklist
838     * @param fs                     filesystem to filter
839     * @param directoryNameBlackList list of the names of the directories to filter. If
840     *                               <tt>null</tt>, all directories are returned
841     */
842    @SuppressWarnings("unchecked")
843    public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
844      this.fs = fs;
845      blacklist = (List<String>) (directoryNameBlackList == null
846        ? Collections.emptyList()
847        : directoryNameBlackList);
848    }
849
850    @Override
851    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
852      if (!isValidName(p.getName())) {
853        return false;
854      }
855
856      try {
857        return isDirectory(fs, isDir, p);
858      } catch (IOException e) {
859        LOG.warn("An error occurred while verifying if [{}] is a valid directory."
860          + " Returning 'not valid' and continuing.", p, e);
861        return false;
862      }
863    }
864
865    protected boolean isValidName(final String name) {
866      return !blacklist.contains(name);
867    }
868  }
869
870  /**
871   * A {@link PathFilter} that only allows directories.
872   */
873  public static class DirFilter extends BlackListDirFilter {
874
875    public DirFilter(FileSystem fs) {
876      super(fs, null);
877    }
878  }
879
880  /**
881   * A {@link PathFilter} that returns usertable directories. To get all directories use the
882   * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
883   */
884  public static class UserTableDirFilter extends BlackListDirFilter {
885    public UserTableDirFilter(FileSystem fs) {
886      super(fs, HConstants.HBASE_NON_TABLE_DIRS);
887    }
888
889    @Override
890    protected boolean isValidName(final String name) {
891      if (!super.isValidName(name)) return false;
892
893      try {
894        TableName.isLegalTableQualifierName(Bytes.toBytes(name));
895      } catch (IllegalArgumentException e) {
896        LOG.info("Invalid table name: {}", name);
897        return false;
898      }
899      return true;
900    }
901  }
902
903  public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
904    throws IOException {
905    List<Path> tableDirs = new ArrayList<>();
906    Path baseNamespaceDir = new Path(rootdir, HConstants.BASE_NAMESPACE_DIR);
907    if (fs.exists(baseNamespaceDir)) {
908      for (FileStatus status : fs.globStatus(new Path(baseNamespaceDir, "*"))) {
909        tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
910      }
911    }
912    return tableDirs;
913  }
914
915  /**
916   * nn * @return All the table directories under <code>rootdir</code>. Ignore non table hbase
917   * folders such as .logs, .oldlogs, .corrupt folders. n
918   */
919  public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
920    throws IOException {
921    // presumes any directory under hbase.rootdir is a table
922    FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
923    List<Path> tabledirs = new ArrayList<>(dirs.length);
924    for (FileStatus dir : dirs) {
925      tabledirs.add(dir.getPath());
926    }
927    return tabledirs;
928  }
929
930  /**
931   * Filter for all dirs that don't start with '.'
932   */
933  public static class RegionDirFilter extends AbstractFileStatusFilter {
934    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
935    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
936    final FileSystem fs;
937
938    public RegionDirFilter(FileSystem fs) {
939      this.fs = fs;
940    }
941
942    @Override
943    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
944      if (!regionDirPattern.matcher(p.getName()).matches()) {
945        return false;
946      }
947
948      try {
949        return isDirectory(fs, isDir, p);
950      } catch (IOException ioe) {
951        // Maybe the file was moved or the fs was disconnected.
952        LOG.warn("Skipping file {} due to IOException", p, ioe);
953        return false;
954      }
955    }
956  }
957
958  /**
959   * Given a particular table dir, return all the regiondirs inside it, excluding files such as
960   * .tableinfo
961   * @param fs       A file system for the Path
962   * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
963   * @return List of paths to valid region directories in table dir. n
964   */
965  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir)
966    throws IOException {
967    // assumes we are in a table dir.
968    List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
969    if (rds == null) {
970      return Collections.emptyList();
971    }
972    List<Path> regionDirs = new ArrayList<>(rds.size());
973    for (FileStatus rdfs : rds) {
974      Path rdPath = rdfs.getPath();
975      regionDirs.add(rdPath);
976    }
977    return regionDirs;
978  }
979
980  public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) {
981    return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region);
982  }
983
984  public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
985    return getRegionDirFromTableDir(tableDir,
986      ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
987  }
988
989  public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) {
990    return new Path(tableDir, encodedRegionName);
991  }
992
993  /**
994   * Filter for all dirs that are legal column family names. This is generally used for colfam dirs
995   * &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
996   */
997  public static class FamilyDirFilter extends AbstractFileStatusFilter {
998    final FileSystem fs;
999
1000    public FamilyDirFilter(FileSystem fs) {
1001      this.fs = fs;
1002    }
1003
1004    @Override
1005    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1006      try {
1007        // throws IAE if invalid
1008        HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName()));
1009      } catch (IllegalArgumentException iae) {
1010        // path name is an invalid family name and thus is excluded.
1011        return false;
1012      }
1013
1014      try {
1015        return isDirectory(fs, isDir, p);
1016      } catch (IOException ioe) {
1017        // Maybe the file was moved or the fs was disconnected.
1018        LOG.warn("Skipping file {} due to IOException", p, ioe);
1019        return false;
1020      }
1021    }
1022  }
1023
1024  /**
1025   * Given a particular region dir, return all the familydirs inside it
1026   * @param fs        A file system for the Path
1027   * @param regionDir Path to a specific region directory
1028   * @return List of paths to valid family directories in region dir. n
1029   */
1030  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir)
1031    throws IOException {
1032    // assumes we are in a region dir.
1033    return getFilePaths(fs, regionDir, new FamilyDirFilter(fs));
1034  }
1035
1036  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir)
1037    throws IOException {
1038    return getFilePaths(fs, familyDir, new ReferenceFileFilter(fs));
1039  }
1040
1041  public static List<Path> getReferenceAndLinkFilePaths(final FileSystem fs, final Path familyDir)
1042    throws IOException {
1043    return getFilePaths(fs, familyDir, new ReferenceAndLinkFileFilter(fs));
1044  }
1045
1046  private static List<Path> getFilePaths(final FileSystem fs, final Path dir,
1047    final PathFilter pathFilter) throws IOException {
1048    FileStatus[] fds = fs.listStatus(dir, pathFilter);
1049    List<Path> files = new ArrayList<>(fds.length);
1050    for (FileStatus fdfs : fds) {
1051      Path fdPath = fdfs.getPath();
1052      files.add(fdPath);
1053    }
1054    return files;
1055  }
1056
1057  public static int getRegionReferenceAndLinkFileCount(final FileSystem fs, final Path p) {
1058    int result = 0;
1059    try {
1060      for (Path familyDir : getFamilyDirs(fs, p)) {
1061        result += getReferenceAndLinkFilePaths(fs, familyDir).size();
1062      }
1063    } catch (IOException e) {
1064      LOG.warn("Error Counting reference files.", e);
1065    }
1066    return result;
1067  }
1068
1069  public static class ReferenceAndLinkFileFilter implements PathFilter {
1070
1071    private final FileSystem fs;
1072
1073    public ReferenceAndLinkFileFilter(FileSystem fs) {
1074      this.fs = fs;
1075    }
1076
1077    @Override
1078    public boolean accept(Path rd) {
1079      try {
1080        // only files can be references.
1081        return !fs.getFileStatus(rd).isDirectory()
1082          && (StoreFileInfo.isReference(rd) || HFileLink.isHFileLink(rd));
1083      } catch (IOException ioe) {
1084        // Maybe the file was moved or the fs was disconnected.
1085        LOG.warn("Skipping file " + rd + " due to IOException", ioe);
1086        return false;
1087      }
1088    }
1089  }
1090
1091  /**
1092   * Filter for HFiles that excludes reference files.
1093   */
1094  public static class HFileFilter extends AbstractFileStatusFilter {
1095    final FileSystem fs;
1096
1097    public HFileFilter(FileSystem fs) {
1098      this.fs = fs;
1099    }
1100
1101    @Override
1102    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1103      if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) {
1104        return false;
1105      }
1106
1107      try {
1108        return isFile(fs, isDir, p);
1109      } catch (IOException ioe) {
1110        // Maybe the file was moved or the fs was disconnected.
1111        LOG.warn("Skipping file {} due to IOException", p, ioe);
1112        return false;
1113      }
1114    }
1115  }
1116
1117  /**
1118   * Filter for HFileLinks (StoreFiles and HFiles not included). the filter itself does not consider
1119   * if a link is file or not.
1120   */
1121  public static class HFileLinkFilter implements PathFilter {
1122
1123    @Override
1124    public boolean accept(Path p) {
1125      return HFileLink.isHFileLink(p);
1126    }
1127  }
1128
1129  public static class ReferenceFileFilter extends AbstractFileStatusFilter {
1130
1131    private final FileSystem fs;
1132
1133    public ReferenceFileFilter(FileSystem fs) {
1134      this.fs = fs;
1135    }
1136
1137    @Override
1138    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1139      if (!StoreFileInfo.isReference(p)) {
1140        return false;
1141      }
1142
1143      try {
1144        // only files can be references.
1145        return isFile(fs, isDir, p);
1146      } catch (IOException ioe) {
1147        // Maybe the file was moved or the fs was disconnected.
1148        LOG.warn("Skipping file {} due to IOException", p, ioe);
1149        return false;
1150      }
1151    }
1152  }
1153
1154  /**
1155   * Called every so-often by storefile map builder getTableStoreFilePathMap to report progress.
1156   */
1157  interface ProgressReporter {
1158    /**
1159     * @param status File or directory we are about to process.
1160     */
1161    void progress(FileStatus status);
1162  }
1163
1164  /**
1165   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
1166   * names to the full Path. <br>
1167   * Example...<br>
1168   * Key = 3944417774205889744 <br>
1169   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1170   * @param map          map to add values. If null, this method will create and populate one to
1171   *                     return
1172   * @param fs           The file system to use.
1173   * @param hbaseRootDir The root directory to scan.
1174   * @param tableName    name of the table to scan.
1175   * @return Map keyed by StoreFile name with a value of the full Path.
1176   * @throws IOException When scanning the directory fails. n
1177   */
1178  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1179    final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1180    throws IOException, InterruptedException {
1181    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null,
1182      (ProgressReporter) null);
1183  }
1184
1185  /**
1186   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
1187   * names to the full Path. Note that because this method can be called on a 'live' HBase system
1188   * that we will skip files that no longer exist by the time we traverse them and similarly the
1189   * user of the result needs to consider that some entries in this map may not exist by the time
1190   * this call completes. <br>
1191   * Example...<br>
1192   * Key = 3944417774205889744 <br>
1193   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1194   * @param resultMap        map to add values. If null, this method will create and populate one to
1195   *                         return
1196   * @param fs               The file system to use.
1197   * @param hbaseRootDir     The root directory to scan.
1198   * @param tableName        name of the table to scan.
1199   * @param sfFilter         optional path filter to apply to store files
1200   * @param executor         optional executor service to parallelize this operation
1201   * @param progressReporter Instance or null; gets called every time we move to new region of
1202   *                         family dir and for each store file.
1203   * @return Map keyed by StoreFile name with a value of the full Path.
1204   * @throws IOException When scanning the directory fails.
1205   * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead.
1206   */
1207  @Deprecated
1208  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1209    final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1210    ExecutorService executor, final HbckErrorReporter progressReporter)
1211    throws IOException, InterruptedException {
1212    return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor,
1213      new ProgressReporter() {
1214        @Override
1215        public void progress(FileStatus status) {
1216          // status is not used in this implementation.
1217          progressReporter.progress();
1218        }
1219      });
1220  }
1221
1222  /**
1223   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
1224   * names to the full Path. Note that because this method can be called on a 'live' HBase system
1225   * that we will skip files that no longer exist by the time we traverse them and similarly the
1226   * user of the result needs to consider that some entries in this map may not exist by the time
1227   * this call completes. <br>
1228   * Example...<br>
1229   * Key = 3944417774205889744 <br>
1230   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1231   * @param resultMap        map to add values. If null, this method will create and populate one to
1232   *                         return
1233   * @param fs               The file system to use.
1234   * @param hbaseRootDir     The root directory to scan.
1235   * @param tableName        name of the table to scan.
1236   * @param sfFilter         optional path filter to apply to store files
1237   * @param executor         optional executor service to parallelize this operation
1238   * @param progressReporter Instance or null; gets called every time we move to new region of
1239   *                         family dir and for each store file.
1240   * @return Map keyed by StoreFile name with a value of the full Path.
1241   * @throws IOException          When scanning the directory fails.
1242   * @throws InterruptedException the thread is interrupted, either before or during the activity.
1243   */
1244  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1245    final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1246    ExecutorService executor, final ProgressReporter progressReporter)
1247    throws IOException, InterruptedException {
1248
1249    final Map<String, Path> finalResultMap =
1250      resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;
1251
1252    // only include the directory paths to tables
1253    Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
1254    // Inside a table, there are compaction.dir directories to skip. Otherwise, all else
1255    // should be regions.
1256    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
1257    final Vector<Exception> exceptions = new Vector<>();
1258
1259    try {
1260      List<FileStatus> regionDirs =
1261        FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1262      if (regionDirs == null) {
1263        return finalResultMap;
1264      }
1265
1266      final List<Future<?>> futures = new ArrayList<>(regionDirs.size());
1267
1268      for (FileStatus regionDir : regionDirs) {
1269        if (null != progressReporter) {
1270          progressReporter.progress(regionDir);
1271        }
1272        final Path dd = regionDir.getPath();
1273
1274        if (!exceptions.isEmpty()) {
1275          break;
1276        }
1277
1278        Runnable getRegionStoreFileMapCall = new Runnable() {
1279          @Override
1280          public void run() {
1281            try {
1282              HashMap<String, Path> regionStoreFileMap = new HashMap<>();
1283              List<FileStatus> familyDirs =
1284                FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
1285              if (familyDirs == null) {
1286                if (!fs.exists(dd)) {
1287                  LOG.warn("Skipping region because it no longer exists: " + dd);
1288                } else {
1289                  LOG.warn("Skipping region because it has no family dirs: " + dd);
1290                }
1291                return;
1292              }
1293              for (FileStatus familyDir : familyDirs) {
1294                if (null != progressReporter) {
1295                  progressReporter.progress(familyDir);
1296                }
1297                Path family = familyDir.getPath();
1298                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1299                  continue;
1300                }
1301                // now in family, iterate over the StoreFiles and
1302                // put in map
1303                FileStatus[] familyStatus = fs.listStatus(family);
1304                for (FileStatus sfStatus : familyStatus) {
1305                  if (null != progressReporter) {
1306                    progressReporter.progress(sfStatus);
1307                  }
1308                  Path sf = sfStatus.getPath();
1309                  if (sfFilter == null || sfFilter.accept(sf)) {
1310                    regionStoreFileMap.put(sf.getName(), sf);
1311                  }
1312                }
1313              }
1314              finalResultMap.putAll(regionStoreFileMap);
1315            } catch (Exception e) {
1316              LOG.error("Could not get region store file map for region: " + dd, e);
1317              exceptions.add(e);
1318            }
1319          }
1320        };
1321
1322        // If executor is available, submit async tasks to exec concurrently, otherwise
1323        // just do serial sync execution
1324        if (executor != null) {
1325          Future<?> future = executor.submit(getRegionStoreFileMapCall);
1326          futures.add(future);
1327        } else {
1328          FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
1329          future.run();
1330          futures.add(future);
1331        }
1332      }
1333
1334      // Ensure all pending tasks are complete (or that we run into an exception)
1335      for (Future<?> f : futures) {
1336        if (!exceptions.isEmpty()) {
1337          break;
1338        }
1339        try {
1340          f.get();
1341        } catch (ExecutionException e) {
1342          LOG.error("Unexpected exec exception!  Should've been caught already.  (Bug?)", e);
1343          // Shouldn't happen, we already logged/caught any exceptions in the Runnable
1344        }
1345      }
1346    } catch (IOException e) {
1347      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
1348      exceptions.add(e);
1349    } finally {
1350      if (!exceptions.isEmpty()) {
1351        // Just throw the first exception as an indication something bad happened
1352        // Don't need to propagate all the exceptions, we already logged them all anyway
1353        Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class);
1354        throw new IOException(exceptions.firstElement());
1355      }
1356    }
1357
1358    return finalResultMap;
1359  }
1360
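  /**
   * Counts the reference files under all family directories of the given region directory.
   * IOExceptions encountered while listing are logged and swallowed, so the result may be a
   * partial count when errors occur.
   * @param fs the file system to use
   * @param p  the region directory to scan
   * @return the number of reference files found under the region directory
   */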
1361  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1362    int result = 0;
1363    try {
1364      for (Path familyDir : getFamilyDirs(fs, p)) {
1365        result += getReferenceFilePaths(fs, familyDir).size();
1366      }
1367    } catch (IOException e) {
1368      LOG.warn("Error counting reference files", e);
1369    }
1370    return result;
1371  }
1372
1373  /**
1374   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1375   * the full Path. <br>
1376   * Example...<br>
1377   * Key = 3944417774205889744 <br>
1378   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
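   * <p>
   * A minimal usage sketch, assuming a {@code Configuration conf} is in scope:
   *
   * <pre>{@code
   * FileSystem fs = FileSystem.get(conf);
   * Path rootDir = CommonFSUtils.getRootDir(conf);
   * Map<String, Path> storeFiles = FSUtils.getTableStoreFilePathMap(fs, rootDir);
   * // Look up the full path for a store file by its name (the key shown above is an example).
   * Path p = storeFiles.get("3944417774205889744");
   * }</pre>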
1379   * @param fs           The file system to use.
1380   * @param hbaseRootDir The root directory to scan.
1381   * @return Map keyed by StoreFile name with a value of the full Path.
1382   * @throws IOException When scanning the directory fails.
1383   */
1384  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1385    final Path hbaseRootDir) throws IOException, InterruptedException {
1386    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter) null);
1387  }
1388
1389  /**
1390   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1391   * the full Path. <br>
1392   * Example...<br>
1393   * Key = 3944417774205889744 <br>
1394   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1395   * @param fs               The file system to use.
1396   * @param hbaseRootDir     The root directory to scan.
1397   * @param sfFilter         optional path filter to apply to store files
1398   * @param executor         optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to a new region or
   *                         family dir and for each store file.
1401   * @return Map keyed by StoreFile name with a value of the full Path.
1402   * @throws IOException When scanning the directory fails.
   * @deprecated Since 2.3.0. Will be removed in hbase4. Use
   *             {@link #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)}
   *             instead.
1405   */
1406  @Deprecated
1407  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1408    final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1409    HbckErrorReporter progressReporter) throws IOException, InterruptedException {
1410    return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor, new ProgressReporter() {
1411      @Override
1412      public void progress(FileStatus status) {
1413        // status is not used in this implementation.
1414        progressReporter.progress();
1415      }
1416    });
1417  }
1418
1419  /**
1420   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1421   * the full Path. <br>
1422   * Example...<br>
1423   * Key = 3944417774205889744 <br>
1424   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
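   * <p>
   * A minimal usage sketch, assuming a {@code Configuration conf} is in scope; the pool size is
   * arbitrary, and a null filter/reporter means "accept everything" and "no progress reporting":
   *
   * <pre>{@code
   * FileSystem fs = FileSystem.get(conf);
   * Path rootDir = CommonFSUtils.getRootDir(conf);
   * ExecutorService pool = Executors.newFixedThreadPool(8);
   * try {
   *   Map<String, Path> storeFiles =
   *     FSUtils.getTableStoreFilePathMap(fs, rootDir, null, pool, (ProgressReporter) null);
   * } finally {
   *   pool.shutdown();
   * }
   * }</pre>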
1425   * @param fs               The file system to use.
1426   * @param hbaseRootDir     The root directory to scan.
1427   * @param sfFilter         optional path filter to apply to store files
1428   * @param executor         optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to a new region or
   *                         family dir and for each store file.
1431   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException          When scanning the directory fails.
   * @throws InterruptedException When the operation is interrupted.
1433   */
1434  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1435    final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1436    ProgressReporter progressReporter) throws IOException, InterruptedException {
1437    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);
1438
1439    // if this method looks similar to 'getTableFragmentation' that is because
1440    // it was borrowed from it.
1441
1442    // only include the directory paths to tables
1443    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1444      getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir),
1445        sfFilter, executor, progressReporter);
1446    }
1447    return map;
1448  }
1449
1450  /**
1451   * Filters FileStatuses in an array and returns a list
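   * <p>
   * Illustrative sketch, keeping only directories:
   *
   * <pre>{@code
   * FileStatus[] statuses = fs.listStatus(dir);
   * List<FileStatus> dirs = FSUtils.filterFileStatuses(statuses, status -> status.isDirectory());
   * }</pre>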
1452   * @param input  An array of FileStatuses
1453   * @param filter A required filter to filter the array
1454   * @return A list of FileStatuses
1455   */
1456  public static List<FileStatus> filterFileStatuses(FileStatus[] input, FileStatusFilter filter) {
1457    if (input == null) return null;
1458    return filterFileStatuses(Iterators.forArray(input), filter);
1459  }
1460
1461  /**
1462   * Filters FileStatuses in an iterator and returns a list
1463   * @param input  An iterator of FileStatuses
   * @param filter A required filter to filter the iterator
1465   * @return A list of FileStatuses
1466   */
1467  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1468    FileStatusFilter filter) {
1469    if (input == null) return null;
1470    ArrayList<FileStatus> results = new ArrayList<>();
1471    while (input.hasNext()) {
1472      FileStatus f = input.next();
1473      if (filter.accept(f)) {
1474        results.add(f);
1475      }
1476    }
1477    return results;
1478  }
1479
1480  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
   * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException and
   * instead returns an empty FileStatus[], while Hadoop 2 will throw a FileNotFoundException.
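   * <p>
   * Illustrative sketch (the directory below is hypothetical), filtering for sub-directories:
   *
   * <pre>{@code
   * List<FileStatus> dirs =
   *   FSUtils.listStatusWithStatusFilter(fs, regionDir, status -> status.isDirectory());
   * if (dirs == null) {
   *   // regionDir is missing, empty, or nothing matched the filter
   * }
   * }</pre>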
1484   * @param fs     file system
1485   * @param dir    directory
1486   * @param filter file status filter
1487   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
1488   */
1489  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs, final Path dir,
1490    final FileStatusFilter filter) throws IOException {
1491    FileStatus[] status = null;
1492    try {
1493      status = fs.listStatus(dir);
1494    } catch (FileNotFoundException fnfe) {
1495      LOG.trace("{} does not exist", dir);
1496      return null;
1497    }
1498
1499    if (ArrayUtils.getLength(status) == 0) {
1500      return null;
1501    }
1502
1503    if (filter == null) {
1504      return Arrays.asList(status);
1505    } else {
1506      List<FileStatus> status2 = filterFileStatuses(status, filter);
1507      if (status2 == null || status2.isEmpty()) {
1508        return null;
1509      } else {
1510        return status2;
1511      }
1512    }
1513  }
1514
1515  /**
1516   * This function is to scan the root path of the file system to get the degree of locality for
1517   * each region on each of the servers having at least one block of that region. This is used by
   * the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}.
   * @param conf the configuration to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
1522   */
1523  public static Map<String, Map<String, Float>>
1524    getRegionDegreeLocalityMappingFromFS(final Configuration conf) throws IOException {
1525    return getRegionDegreeLocalityMappingFromFS(conf, null,
1526      conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1527
1528  }
1529
1530  /**
1531   * This function is to scan the root path of the file system to get the degree of locality for
   * each region on each of the servers having at least one block of that region.
   * @param conf           the configuration to use
   * @param desiredTable   the table you wish to scan locality for
   * @param threadPoolSize the thread pool size to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
1537   */
1538  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1539    final Configuration conf, final String desiredTable, int threadPoolSize) throws IOException {
1540    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
1541    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
1542    return regionDegreeLocalityMapping;
1543  }
1544
1545  /**
1546   * This function is to scan the root path of the file system to get either the mapping between the
1547   * region name and its best locality region server or the degree of locality of each region on
   * each of the servers having at least one block of that region. The output map parameter is
   * optional.
   * @param conf                        the configuration to use
   * @param desiredTable                the table you wish to scan locality for
   * @param threadPoolSize              the thread pool size to use
   * @param regionDegreeLocalityMapping the map into which to put the locality degree mapping or
   *                                    null, must be a thread-safe implementation
   * @throws IOException in case of file system errors or interrupts
1552   */
1553  private static void getRegionLocalityMappingFromFS(final Configuration conf,
1554    final String desiredTable, int threadPoolSize,
1555    final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
1556    final FileSystem fs = FileSystem.get(conf);
1557    final Path rootPath = CommonFSUtils.getRootDir(conf);
1558    final long startTime = EnvironmentEdgeManager.currentTime();
1559    final Path queryPath;
1560    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1561    if (null == desiredTable) {
1562      queryPath =
1563        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1564    } else {
1565      queryPath = new Path(
1566        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1567    }
1568
1569    // reject all paths that are not appropriate
1570    PathFilter pathFilter = new PathFilter() {
1571      @Override
1572      public boolean accept(Path path) {
1573        // this is the region name; it may get some noise data
1574        if (null == path) {
1575          return false;
1576        }
1577
1578        // no parent?
1579        Path parent = path.getParent();
1580        if (null == parent) {
1581          return false;
1582        }
1583
1584        String regionName = path.getName();
1585        if (null == regionName) {
1586          return false;
1587        }
1588
1589        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
1590          return false;
1591        }
1592        return true;
1593      }
1594    };
1595
1596    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1597
1598    if (LOG.isDebugEnabled()) {
1599      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
1600    }
1601
1602    if (null == statusList) {
1603      return;
1604    }
1605
1606    // lower the number of threads in case we have very few expected regions
1607    threadPoolSize = Math.min(threadPoolSize, statusList.length);
1608
1609    // run in multiple threads
1610    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
1611      new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").setDaemon(true)
1612        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
1613    try {
1614      // ignore all file status items that are not of interest
1615      for (FileStatus regionStatus : statusList) {
1616        if (null == regionStatus || !regionStatus.isDirectory()) {
1617          continue;
1618        }
1619
1620        final Path regionPath = regionStatus.getPath();
1621        if (null != regionPath) {
1622          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
1623        }
1624      }
1625    } finally {
1626      tpe.shutdown();
1627      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1628        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1629      try {
1630        // here we wait until TPE terminates, which is either naturally or by
1631        // exceptions in the execution of the threads
1632        while (!tpe.awaitTermination(threadWakeFrequency, TimeUnit.MILLISECONDS)) {
1633          // printing out rough estimate, so as to not introduce
1634          // AtomicInteger
1635          LOG.info("Locality checking is underway: { Scanned Regions : "
1636            + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
1637            + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
1638        }
1639      } catch (InterruptedException e) {
1640        Thread.currentThread().interrupt();
1641        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1642      }
1643    }
1644
1645    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
1646    LOG.info("Scan DFS for locality info takes {}ms", overhead);
1647  }
1648
1649  /**
   * Do our short circuit read setup. Checks buffer size to use and whether to do checksumming in
   * hbase or hdfs.
   * @param conf the configuration to check and update
1652   */
1653  public static void setupShortCircuitRead(final Configuration conf) {
1654    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1655    boolean shortCircuitSkipChecksum =
1656      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1657    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1658    if (shortCircuitSkipChecksum) {
1659      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not "
1660        + "be set to true."
1661        + (useHBaseChecksum
1662          ? " HBase checksum doesn't require "
1663            + "it, see https://issues.apache.org/jira/browse/HBASE-6868."
1664          : ""));
1665      assert !shortCircuitSkipChecksum; // this will fail if assertions are on
1666    }
1667    checkShortCircuitReadBufferSize(conf);
1668  }
1669
1670  /**
   * Check if short circuit read buffer size is set and if not, set it to hbase value.
   * @param conf the configuration to check and update
1672   */
1673  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1674    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1675    final int notSet = -1;
1676    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
1677    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1678    int size = conf.getInt(dfsKey, notSet);
1679    // If a size is set, return -- we will use it.
1680    if (size != notSet) return;
1681    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
1682    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1683    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
1684  }
1685
1686  /**
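   * Returns the shared {@link DFSHedgedReadMetrics} instance kept by the wrapped DFSClient. The
   * accessor is package private, so it is fetched via reflection.
   * <p>
   * A minimal usage sketch, assuming a {@code Configuration conf} is in scope:
   *
   * <pre>{@code
   * DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
   * if (metrics != null) {
   *   long hedgedReadOps = metrics.getHedgedReadOps();
   * }
   * }</pre>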
   * @param c the Configuration to use to obtain the FileSystem
   * @return The DFSClient DFSHedgedReadMetrics instance or null if it can't be found or we are not
   *         on hdfs.
   * @throws IOException if the FileSystem cannot be obtained
1689   */
1690  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
1691    throws IOException {
1692    if (!CommonFSUtils.isHDFS(c)) {
1693      return null;
1694    }
1695    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
1696    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
1697    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
1698    final String name = "getHedgedReadMetrics";
1699    DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient();
1700    Method m;
1701    try {
1702      m = dfsclient.getClass().getDeclaredMethod(name);
1703    } catch (NoSuchMethodException e) {
1704      LOG.warn(
1705        "Failed find method " + name + " in dfsclient; no hedged read metrics: " + e.getMessage());
1706      return null;
1707    } catch (SecurityException e) {
1708      LOG.warn(
1709        "Failed find method " + name + " in dfsclient; no hedged read metrics: " + e.getMessage());
1710      return null;
1711    }
1712    m.setAccessible(true);
1713    try {
1714      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
1715    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
1716      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: "
1717        + e.getMessage());
1718      return null;
1719    }
1720  }
1721
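  /**
   * Copies the file or directory tree rooted at {@code src} on {@code srcFS} to {@code dst} on
   * {@code dstFS}, copying regular files in parallel on a fixed-size thread pool.
   * <p>
   * A minimal usage sketch (the paths below are hypothetical):
   *
   * <pre>{@code
   * List<Path> copied = FSUtils.copyFilesParallel(srcFs, new Path("/hbase/.hbase-snapshot/s1"),
   *   dstFs, new Path("/backup/s1"), conf, 4);
   * }</pre>
   *
   * @param srcFS   source file system
   * @param src     source path to copy from
   * @param dstFS   destination file system
   * @param dst     destination path to copy to
   * @param conf    configuration used for the per-file copies
   * @param threads number of copy threads to use
   * @return the list of destination paths (directories and files) visited during the copy
   * @throws IOException if any copy task fails or is interrupted
   */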
1722  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1723    Configuration conf, int threads) throws IOException {
1724    ExecutorService pool = Executors.newFixedThreadPool(threads);
1725    List<Future<Void>> futures = new ArrayList<>();
1726    List<Path> traversedPaths;
1727    try {
1728      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
1729      for (Future<Void> future : futures) {
1730        future.get();
1731      }
1732    } catch (ExecutionException | InterruptedException | IOException e) {
1733      throw new IOException("Copy snapshot reference files failed", e);
1734    } finally {
1735      pool.shutdownNow();
1736    }
1737    return traversedPaths;
1738  }
1739
1740  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1741    Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
1742    List<Path> traversedPaths = new ArrayList<>();
1743    traversedPaths.add(dst);
1744    FileStatus currentFileStatus = srcFS.getFileStatus(src);
1745    if (currentFileStatus.isDirectory()) {
1746      if (!dstFS.mkdirs(dst)) {
1747        throw new IOException("Create directory failed: " + dst);
1748      }
1749      FileStatus[] subPaths = srcFS.listStatus(src);
1750      for (FileStatus subPath : subPaths) {
1751        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
1752          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
1753      }
1754    } else {
1755      Future<Void> future = pool.submit(() -> {
1756        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
1757        return null;
1758      });
1759      futures.add(future);
1760    }
1761    return traversedPaths;
1762  }
1763
  /** Returns a set containing all namenode addresses of the given file system */
1765  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
1766    Configuration conf) {
1767    Set<InetSocketAddress> addresses = new HashSet<>();
1768    String serviceName = fs.getCanonicalServiceName();
1769
1770    if (serviceName.startsWith("ha-hdfs")) {
1771      try {
1772        Map<String, Map<String, InetSocketAddress>> addressMap =
1773          DFSUtil.getNNServiceRpcAddressesForCluster(conf);
1774        String nameService = serviceName.substring(serviceName.indexOf(":") + 1);
1775        if (addressMap.containsKey(nameService)) {
1776          Map<String, InetSocketAddress> nnMap = addressMap.get(nameService);
1777          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
1778            InetSocketAddress addr = e2.getValue();
1779            addresses.add(addr);
1780          }
1781        }
1782      } catch (Exception e) {
1783        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
1784      }
1785    } else {
1786      URI uri = fs.getUri();
1787      int port = uri.getPort();
1788      if (port < 0) {
1789        int idx = serviceName.indexOf(':');
1790        port = Integer.parseInt(serviceName.substring(idx + 1));
1791      }
1792      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
1793      addresses.add(addr);
1794    }
1795
1796    return addresses;
1797  }
1798
1799  /**
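   * Checks whether the given file systems point at the same HDFS instance, by comparing canonical
   * service names and, for mixed HA/non-HA deployments, the resolved namenode addresses.
   * <p>
   * A minimal usage sketch (the backup cluster URI is hypothetical):
   *
   * <pre>{@code
   * FileSystem srcFs = FileSystem.get(conf);
   * FileSystem desFs = FileSystem.get(URI.create("hdfs://backup-cluster:8020"), conf);
   * boolean sameCluster = FSUtils.isSameHdfs(conf, srcFs, desFs);
   * }</pre>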
   * @param conf  the Configuration of HBase
   * @param srcFs the source FileSystem
   * @param desFs the destination FileSystem
   * @return Whether srcFs and desFs are on the same hdfs or not
1802   */
1803  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
1804    // By getCanonicalServiceName, we could make sure both srcFs and desFs
1805    // show a unified format which contains scheme, host and port.
1806    String srcServiceName = srcFs.getCanonicalServiceName();
1807    String desServiceName = desFs.getCanonicalServiceName();
1808
1809    if (srcServiceName == null || desServiceName == null) {
1810      return false;
1811    }
1812    if (srcServiceName.equals(desServiceName)) {
1813      return true;
1814    }
1815    if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
1816      Collection<String> internalNameServices =
1817        conf.getTrimmedStringCollection("dfs.internal.nameservices");
1818      if (!internalNameServices.isEmpty()) {
1819        if (internalNameServices.contains(srcServiceName.split(":")[1])) {
1820          return true;
1821        } else {
1822          return false;
1823        }
1824      }
1825    }
1826    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
1827      // If one serviceName is an HA format while the other is a non-HA format,
1828      // maybe they refer to the same FileSystem.
1829      // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
1830      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
1831      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
1832      if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
1833        return true;
1834      }
1835    }
1836
1837    return false;
1838  }
1839}