001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
021import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET;
022
023import edu.umd.cs.findbugs.annotations.CheckForNull;
024import java.io.ByteArrayInputStream;
025import java.io.DataInputStream;
026import java.io.EOFException;
027import java.io.FileNotFoundException;
028import java.io.IOException;
029import java.io.InterruptedIOException;
030import java.lang.reflect.InvocationTargetException;
031import java.lang.reflect.Method;
032import java.net.InetSocketAddress;
033import java.net.URI;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collection;
037import java.util.Collections;
038import java.util.HashMap;
039import java.util.HashSet;
040import java.util.Iterator;
041import java.util.List;
042import java.util.Locale;
043import java.util.Map;
044import java.util.Optional;
045import java.util.Set;
046import java.util.Vector;
047import java.util.concurrent.ConcurrentHashMap;
048import java.util.concurrent.ExecutionException;
049import java.util.concurrent.ExecutorService;
050import java.util.concurrent.Executors;
051import java.util.concurrent.Future;
052import java.util.concurrent.FutureTask;
053import java.util.concurrent.ThreadPoolExecutor;
054import java.util.concurrent.TimeUnit;
055import java.util.regex.Pattern;
056import org.apache.commons.lang3.ArrayUtils;
057import org.apache.hadoop.conf.Configuration;
058import org.apache.hadoop.fs.BlockLocation;
059import org.apache.hadoop.fs.FSDataInputStream;
060import org.apache.hadoop.fs.FSDataOutputStream;
061import org.apache.hadoop.fs.FileStatus;
062import org.apache.hadoop.fs.FileSystem;
063import org.apache.hadoop.fs.FileUtil;
064import org.apache.hadoop.fs.Path;
065import org.apache.hadoop.fs.PathFilter;
066import org.apache.hadoop.fs.StorageType;
067import org.apache.hadoop.fs.permission.FsPermission;
068import org.apache.hadoop.hbase.ClusterId;
069import org.apache.hadoop.hbase.HConstants;
070import org.apache.hadoop.hbase.HDFSBlocksDistribution;
071import org.apache.hadoop.hbase.TableName;
072import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
073import org.apache.hadoop.hbase.client.RegionInfo;
074import org.apache.hadoop.hbase.client.RegionInfoBuilder;
075import org.apache.hadoop.hbase.exceptions.DeserializationException;
076import org.apache.hadoop.hbase.fs.HFileSystem;
077import org.apache.hadoop.hbase.io.HFileLink;
078import org.apache.hadoop.hbase.master.HMaster;
079import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
080import org.apache.hadoop.hdfs.DFSClient;
081import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
082import org.apache.hadoop.hdfs.DFSUtil;
083import org.apache.hadoop.hdfs.DistributedFileSystem;
084import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
085import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
086import org.apache.hadoop.hdfs.protocol.LocatedBlock;
087import org.apache.hadoop.io.IOUtils;
088import org.apache.hadoop.ipc.RemoteException;
089import org.apache.yetus.audience.InterfaceAudience;
090import org.slf4j.Logger;
091import org.slf4j.LoggerFactory;
092
093import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
094import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
095import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
096import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
097import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
098
099import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
101
102/**
103 * Utility methods for interacting with the underlying file system.
104 */
105@InterfaceAudience.Private
106public final class FSUtils {
107  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
108
109  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
110  private static final int DEFAULT_THREAD_POOLSIZE = 2;
111
112  /** Set to true on Windows platforms */
113  // currently only used in testing. TODO refactor into a test class
114  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
115
116  private static Class<?> safeModeClazz = null;
117  private static Class<?> safeModeActionClazz = null;
118  private static Object safeModeGet = null;
  static {
120    try {
121      safeModeClazz = Class.forName("org.apache.hadoop.fs.SafeMode");
122      safeModeActionClazz = Class.forName("org.apache.hadoop.fs.SafeModeAction");
123      safeModeGet = safeModeClazz.getField("SAFEMODE_GET").get(null);
124    } catch (ClassNotFoundException | NoSuchFieldException e) {
125      LOG.debug("SafeMode interface not in the classpath, this means Hadoop 3.3.5 or below.");
126    } catch (IllegalAccessException e) {
127      LOG.error("SafeModeAction.SAFEMODE_GET is not accessible. "
128        + "Unexpected Hadoop version or messy classpath?", e);
129      throw new RuntimeException(e);
130    }
131  }
132
133  private FSUtils() {
134  }
135
  /** Returns true if <code>fs</code> is an instance of DistributedFileSystem. */
137  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
138    FileSystem fileSystem = fs;
139    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
140    // Check its backing fs for dfs-ness.
141    if (fs instanceof HFileSystem) {
142      fileSystem = ((HFileSystem) fs).getBackingFs();
143    }
144    return fileSystem instanceof DistributedFileSystem;
145  }
146
147  /**
   * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
   * '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
   * consider the scheme; i.e. if the schemes differ but the path or a subpath matches, the two
   * will equate.
   * @param pathToSearch Path we will be trying to match.
   * @param pathTail     Tail of the path to match against the end of <code>pathToSearch</code>.
152   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
153   */
154  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
155    Path tailPath = pathTail;
156    String tailName;
157    Path toSearch = pathToSearch;
158    String toSearchName;
159    boolean result = false;
160
161    if (pathToSearch.depth() != pathTail.depth()) {
162      return false;
163    }
164
165    do {
166      tailName = tailPath.getName();
167      if (tailName == null || tailName.isEmpty()) {
168        result = true;
169        break;
170      }
171      toSearchName = toSearch.getName();
172      if (toSearchName == null || toSearchName.isEmpty()) {
173        break;
174      }
175      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
176      tailPath = tailPath.getParent();
177      toSearch = toSearch.getParent();
178    } while (tailName.equals(toSearchName));
179    return result;
180  }
181
182  /**
183   * Delete the region directory if exists.
184   * @return True if deleted the region directory.
185   */
186  public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri)
187    throws IOException {
188    Path rootDir = CommonFSUtils.getRootDir(conf);
189    FileSystem fs = rootDir.getFileSystem(conf);
190    return CommonFSUtils.deleteDirectory(fs,
191      new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
192  }
193
194  /**
195   * Create the specified file on the filesystem. By default, this will:
196   * <ol>
197   * <li>overwrite the file if it exists</li>
198   * <li>apply the umask in the configuration (if it is enabled)</li>
199   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the column family replication configured via
   * {@link ColumnFamilyDescriptorBuilder#DFS_REPLICATION}, or the filesystem default replication
   * when that value is {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li>
202   * <li>use the default block size</li>
203   * <li>not track progress</li>
204   * </ol>
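   * <p>
   * A minimal usage sketch (the {@code conf} and path values below are illustrative assumptions,
   * not values defined by this class):
   * </p>
   *
   * <pre>
   * FileSystem fs = FileSystem.get(conf);
   * Path path = new Path("/hbase/.tmp/example");
   * try (FSDataOutputStream out =
   *   FSUtils.create(conf, fs, path, FsPermission.getFileDefault(), null)) {
   *   out.write(Bytes.toBytes("example"));
   * }
   * </pre>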
205   * @param conf         configurations
206   * @param fs           {@link FileSystem} on which to write the file
207   * @param path         {@link Path} to the file to write
208   * @param perm         permissions
209   * @param favoredNodes favored data nodes
210   * @return output stream to the created file
211   * @throws IOException if the file cannot be created
212   */
213  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
214    FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
215    return create(conf, fs, path, perm, favoredNodes, true);
216  }
217
218  /**
219   * Create the specified file on the filesystem. By default, this will:
220   * <ol>
221   * <li>overwrite the file if it exists</li>
222   * <li>apply the umask in the configuration (if it is enabled)</li>
223   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the column family replication configured via
   * {@link ColumnFamilyDescriptorBuilder#DFS_REPLICATION}, or the filesystem default replication
   * when that value is {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li>
226   * <li>use the default block size</li>
227   * <li>not track progress</li>
228   * </ol>
229   * @param conf              configurations
230   * @param fs                {@link FileSystem} on which to write the file
231   * @param path              {@link Path} to the file to write
232   * @param perm              permissions
233   * @param favoredNodes      favored data nodes
234   * @param isRecursiveCreate recursively create parent directories
235   * @return output stream to the created file
236   * @throws IOException if the file cannot be created
237   */
238  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
239    FsPermission perm, InetSocketAddress[] favoredNodes, boolean isRecursiveCreate)
240    throws IOException {
241    if (fs instanceof HFileSystem) {
242      FileSystem backingFs = ((HFileSystem) fs).getBackingFs();
243      if (backingFs instanceof DistributedFileSystem) {
244        short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION,
245          String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION)));
246        DistributedFileSystem.HdfsDataOutputStreamBuilder builder =
247          ((DistributedFileSystem) backingFs).createFile(path).recursive().permission(perm)
248            .create();
249        if (favoredNodes != null) {
250          builder.favoredNodes(favoredNodes);
251        }
252        if (replication > 0) {
253          builder.replication(replication);
254        }
255        return builder.build();
256      }
257
258    }
259    return CommonFSUtils.create(fs, path, perm, true, isRecursiveCreate);
260  }
261
262  /**
263   * Checks to see if the specified file system is available
264   * @param fs filesystem
265   * @throws IOException e
266   */
267  public static void checkFileSystemAvailable(final FileSystem fs) throws IOException {
268    if (!(fs instanceof DistributedFileSystem)) {
269      return;
270    }
271    IOException exception = null;
272    DistributedFileSystem dfs = (DistributedFileSystem) fs;
273    try {
274      if (dfs.exists(new Path("/"))) {
275        return;
276      }
277    } catch (IOException e) {
278      exception = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
279    }
280    try {
281      fs.close();
282    } catch (Exception e) {
283      LOG.error("file system close failed: ", e);
284    }
285    throw new IOException("File system is not available", exception);
286  }
287
288  /**
289   * Inquire the Active NameNode's safe mode status.
   * @param dfs A FileSystem object, typically a DistributedFileSystem or a FileSystem implementing
   *            Hadoop's SafeMode interface.
291   * @return whether we're in safe mode
292   */
293  private static boolean isInSafeMode(FileSystem dfs) throws IOException {
294    if (isDistributedFileSystem(dfs)) {
295      return ((DistributedFileSystem) dfs).setSafeMode(SAFEMODE_GET, true);
296    } else {
297      try {
298        Object ret = dfs.getClass()
          .getMethod("setSafeMode", new Class[] { safeModeActionClazz, boolean.class })
300          .invoke(dfs, safeModeGet, true);
301        return (Boolean) ret;
302      } catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
303        LOG.error("The file system does not support setSafeMode(). Abort.", e);
304        throw new RuntimeException(e);
305      }
306    }
307  }
308
309  /**
310   * Check whether dfs is in safemode.
311   */
312  public static void checkDfsSafeMode(final Configuration conf) throws IOException {
313    boolean isInSafeMode = false;
314    FileSystem fs = FileSystem.get(conf);
315    if (supportSafeMode(fs)) {
316      isInSafeMode = isInSafeMode(fs);
317    }
318    if (isInSafeMode) {
319      throw new IOException("File system is in safemode, it can't be written now");
320    }
321  }
322
323  /**
   * Fetches the current version of the file system from the hbase.version file.
325   * @param fs      filesystem object
326   * @param rootdir root hbase directory
327   * @return null if no version file exists, version string otherwise
328   * @throws IOException              if the version file fails to open
329   * @throws DeserializationException if the version data cannot be translated into a version
330   */
331  public static String getVersion(FileSystem fs, Path rootdir)
332    throws IOException, DeserializationException {
333    final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
334    FileStatus[] status = null;
335    try {
336      // hadoop 2.0 throws FNFE if directory does not exist.
337      // hadoop 1.0 returns null if directory does not exist.
338      status = fs.listStatus(versionFile);
339    } catch (FileNotFoundException fnfe) {
340      return null;
341    }
342    if (ArrayUtils.getLength(status) == 0) {
343      return null;
344    }
345    String version = null;
346    byte[] content = new byte[(int) status[0].getLen()];
347    FSDataInputStream s = fs.open(versionFile);
348    try {
349      IOUtils.readFully(s, content, 0, content.length);
350      if (ProtobufUtil.isPBMagicPrefix(content)) {
351        version = parseVersionFrom(content);
352      } else {
        // Presume it is in pre-pb format.
354        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) {
355          version = dis.readUTF();
356        }
357      }
358    } catch (EOFException eof) {
359      LOG.warn("Version file was empty, odd, will try to set it.");
360    } finally {
361      s.close();
362    }
363    return version;
364  }
365
366  /**
367   * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
368   * @param bytes The byte content of the hbase.version file
369   * @return The version found in the file as a String
370   * @throws DeserializationException if the version data cannot be translated into a version
371   */
372  static String parseVersionFrom(final byte[] bytes) throws DeserializationException {
373    ProtobufUtil.expectPBMagicPrefix(bytes);
374    int pblen = ProtobufUtil.lengthOfPBMagic();
375    FSProtos.HBaseVersionFileContent.Builder builder =
376      FSProtos.HBaseVersionFileContent.newBuilder();
377    try {
378      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
379      return builder.getVersion();
380    } catch (IOException e) {
381      // Convert
382      throw new DeserializationException(e);
383    }
384  }
385
386  /**
387   * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
388   * @param version Version to persist
389   * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a
390   *         prefix.
391   */
392  static byte[] toVersionByteArray(final String version) {
393    FSProtos.HBaseVersionFileContent.Builder builder =
394      FSProtos.HBaseVersionFileContent.newBuilder();
395    return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
396  }
397
398  /**
399   * Verifies current version of file system
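   * <p>
   * A minimal usage sketch (assumes {@code conf} points at an existing hbase.rootdir):
   * </p>
   *
   * <pre>
   * Path rootDir = CommonFSUtils.getRootDir(conf);
   * FileSystem fs = rootDir.getFileSystem(conf);
   * FSUtils.checkVersion(fs, rootDir, true);
   * </pre>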
400   * @param fs      file system
401   * @param rootdir root directory of HBase installation
402   * @param message if true, issues a message on System.out
403   * @throws IOException              if the version file cannot be opened
404   * @throws DeserializationException if the contents of the version file cannot be parsed
405   */
406  public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
407    throws IOException, DeserializationException {
408    checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
409  }
410
411  /**
412   * Verifies current version of file system
413   * @param fs      file system
414   * @param rootdir root directory of HBase installation
415   * @param message if true, issues a message on System.out
416   * @param wait    wait interval
417   * @param retries number of times to retry
418   * @throws IOException              if the version file cannot be opened
419   * @throws DeserializationException if the contents of the version file cannot be parsed
420   */
421  public static void checkVersion(FileSystem fs, Path rootdir, boolean message, int wait,
422    int retries) throws IOException, DeserializationException {
423    String version = getVersion(fs, rootdir);
424    String msg;
425    if (version == null) {
426      if (!metaRegionExists(fs, rootdir)) {
427        // rootDir is empty (no version file and no root region)
428        // just create new version file (HBASE-1195)
429        setVersion(fs, rootdir, wait, retries);
430        return;
431      } else {
432        msg = "hbase.version file is missing. Is your hbase.rootdir valid? "
433          + "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. "
434          + "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2";
435      }
436    } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) {
437      return;
438    } else {
439      msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version
440        + " but software requires version " + HConstants.FILE_SYSTEM_VERSION
441        + ". Consult http://hbase.apache.org/book.html for further information about "
442        + "upgrading HBase.";
443    }
444
    // Version is deprecated and requires migration.
446    // Output on stdout so user sees it in terminal.
447    if (message) {
448      System.out.println("WARNING! " + msg);
449    }
450    throw new FileSystemVersionException(msg);
451  }
452
453  /**
454   * Sets version of file system
455   * @param fs      filesystem object
456   * @param rootdir hbase root
457   * @throws IOException e
458   */
459  public static void setVersion(FileSystem fs, Path rootdir) throws IOException {
460    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
461      HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
462  }
463
464  /**
465   * Sets version of file system
466   * @param fs      filesystem object
467   * @param rootdir hbase root
468   * @param wait    time to wait for retry
469   * @param retries number of times to retry before failing
470   * @throws IOException e
471   */
472  public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
473    throws IOException {
474    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
475  }
476
477  /**
478   * Sets version of file system
479   * @param fs      filesystem object
480   * @param rootdir hbase root directory
481   * @param version version to set
482   * @param wait    time to wait for retry
483   * @param retries number of times to retry before throwing an IOException
484   * @throws IOException e
485   */
486  public static void setVersion(FileSystem fs, Path rootdir, String version, int wait, int retries)
487    throws IOException {
488    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
489    Path tempVersionFile = new Path(rootdir,
490      HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + HConstants.VERSION_FILE_NAME);
491    while (true) {
492      try {
493        // Write the version to a temporary file
494        FSDataOutputStream s = fs.create(tempVersionFile);
495        try {
496          s.write(toVersionByteArray(version));
497          s.close();
498          s = null;
499          // Move the temp version file to its normal location. Returns false
500          // if the rename failed. Throw an IOE in that case.
501          if (!fs.rename(tempVersionFile, versionFile)) {
502            throw new IOException("Unable to move temp version file to " + versionFile);
503          }
504        } finally {
          // Cleaning up the temporary file if the rename failed would be trying
          // too hard. We'll unconditionally create it again the next time
          // through anyway; files are overwritten by default by create().
508
509          // Attempt to close the stream on the way out if it is still open.
510          try {
511            if (s != null) s.close();
512          } catch (IOException ignore) {
513          }
514        }
515        LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
516        return;
517      } catch (IOException e) {
518        if (retries > 0) {
519          LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
520          fs.delete(versionFile, false);
521          try {
522            if (wait > 0) {
523              Thread.sleep(wait);
524            }
525          } catch (InterruptedException ie) {
526            throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
527          }
528          retries--;
529        } else {
530          throw e;
531        }
532      }
533    }
534  }
535
536  /**
537   * Checks that a cluster ID file exists in the HBase root directory
538   * @param fs      the root directory FileSystem
539   * @param rootdir the HBase root directory in HDFS
540   * @param wait    how long to wait between retries
541   * @return <code>true</code> if the file exists, otherwise <code>false</code>
542   * @throws IOException if checking the FileSystem fails
543   */
544  public static boolean checkClusterIdExists(FileSystem fs, Path rootdir, long wait)
545    throws IOException {
546    while (true) {
547      try {
548        Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
549        return fs.exists(filePath);
550      } catch (IOException ioe) {
551        if (wait > 0L) {
552          LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe);
553          try {
554            Thread.sleep(wait);
555          } catch (InterruptedException e) {
556            Thread.currentThread().interrupt();
557            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
558          }
559        } else {
560          throw ioe;
561        }
562      }
563    }
564  }
565
566  /**
567   * Returns the value of the unique cluster ID stored for this HBase instance.
568   * @param fs      the root directory FileSystem
569   * @param rootdir the path to the HBase root directory
570   * @return the unique cluster identifier
571   * @throws IOException if reading the cluster ID file fails
572   */
573  public static ClusterId getClusterId(FileSystem fs, Path rootdir) throws IOException {
574    Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
575    ClusterId clusterId = null;
576    FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null;
577    if (status != null) {
578      int len = Ints.checkedCast(status.getLen());
579      byte[] content = new byte[len];
580      FSDataInputStream in = fs.open(idPath);
581      try {
582        in.readFully(content);
583      } catch (EOFException eof) {
584        LOG.warn("Cluster ID file {} is empty", idPath);
585      } finally {
586        in.close();
587      }
588      try {
589        clusterId = ClusterId.parseFrom(content);
590      } catch (DeserializationException e) {
591        throw new IOException("content=" + Bytes.toString(content), e);
592      }
593      // If not pb'd, make it so.
594      if (!ProtobufUtil.isPBMagicPrefix(content)) {
595        String cid = null;
596        in = fs.open(idPath);
597        try {
598          cid = in.readUTF();
599          clusterId = new ClusterId(cid);
600        } catch (EOFException eof) {
601          LOG.warn("Cluster ID file {} is empty", idPath);
602        } finally {
603          in.close();
604        }
605        rewriteAsPb(fs, rootdir, idPath, clusterId);
606      }
607      return clusterId;
608    } else {
609      LOG.warn("Cluster ID file does not exist at {}", idPath);
610    }
611    return clusterId;
612  }
613
  /**
   * Rewrite the cluster ID file at <code>p</code> in protobuf format, replacing the old
   * plain-text file.
   */
616  private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
617    final ClusterId cid) throws IOException {
618    // Rewrite the file as pb. Move aside the old one first, write new
619    // then delete the moved-aside file.
620    Path movedAsideName = new Path(p + "." + EnvironmentEdgeManager.currentTime());
621    if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
622    setClusterId(fs, rootdir, cid, 100);
623    if (!fs.delete(movedAsideName, false)) {
624      throw new IOException("Failed delete of " + movedAsideName);
625    }
626    LOG.debug("Rewrote the hbase.id file as pb");
627  }
628
629  /**
630   * Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root
   * directory. If any operation on the ID file fails and {@code wait} is a positive value, the
   * method will retry until the ID file is produced or the thread is forcibly interrupted.
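   * <p>
   * A minimal usage sketch (the 10 second retry interval is an arbitrary illustration):
   * </p>
   *
   * <pre>
   * FSUtils.setClusterId(fs, rootDir, new ClusterId(), 10 * 1000L);
   * </pre>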
633   * @param fs        the root directory FileSystem
634   * @param rootdir   the path to the HBase root directory
635   * @param clusterId the unique identifier to store
636   * @param wait      how long (in milliseconds) to wait between retries
   * @throws IOException if writing to the FileSystem fails and no positive wait value was given
638   */
639  public static void setClusterId(final FileSystem fs, final Path rootdir,
640    final ClusterId clusterId, final long wait) throws IOException {
641
642    final Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
643    final Path tempDir = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY);
644    final Path tempIdFile = new Path(tempDir, HConstants.CLUSTER_ID_FILE_NAME);
645
646    LOG.debug("Create cluster ID file [{}] with ID: {}", idFile, clusterId);
647
648    while (true) {
649      Optional<IOException> failure = Optional.empty();
650
651      LOG.debug("Write the cluster ID file to a temporary location: {}", tempIdFile);
652      try (FSDataOutputStream s = fs.create(tempIdFile)) {
653        s.write(clusterId.toByteArray());
654      } catch (IOException ioe) {
655        failure = Optional.of(ioe);
656      }
657
658      if (!failure.isPresent()) {
659        try {
660          LOG.debug("Move the temporary cluster ID file to its target location [{}]:[{}]",
661            tempIdFile, idFile);
662
663          if (!fs.rename(tempIdFile, idFile)) {
664            failure =
665              Optional.of(new IOException("Unable to move temp cluster ID file to " + idFile));
666          }
667        } catch (IOException ioe) {
668          failure = Optional.of(ioe);
669        }
670      }
671
672      if (failure.isPresent()) {
673        final IOException cause = failure.get();
674        if (wait > 0L) {
675          LOG.warn("Unable to create cluster ID file in {}, retrying in {}ms", rootdir, wait,
676            cause);
677          try {
678            Thread.sleep(wait);
679          } catch (InterruptedException e) {
680            Thread.currentThread().interrupt();
681            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
682          }
683          continue;
684        } else {
685          throw cause;
686        }
687      } else {
688        return;
689      }
690    }
691  }
692
693  /**
694   * If DFS, check safe mode and if so, wait until we clear it.
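   * <p>
   * A minimal usage sketch (assumes {@code conf} points at the cluster's file system; the 10
   * second retry interval is an arbitrary illustration):
   * </p>
   *
   * <pre>
   * FSUtils.checkDfsSafeMode(conf); // throws if currently in safe mode
   * FSUtils.waitOnSafeMode(conf, 10 * 1000L); // or block until safe mode clears
   * </pre>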
695   * @param conf configuration
696   * @param wait Sleep between retries
697   * @throws IOException e
698   */
699  public static void waitOnSafeMode(final Configuration conf, final long wait) throws IOException {
700    FileSystem fs = FileSystem.get(conf);
701    if (!supportSafeMode(fs)) {
702      return;
703    }
704    // Make sure dfs is not in safe mode
705    while (isInSafeMode(fs)) {
706      LOG.info("Waiting for dfs to exit safe mode...");
707      try {
708        Thread.sleep(wait);
709      } catch (InterruptedException e) {
710        Thread.currentThread().interrupt();
711        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
712      }
713    }
714  }
715
716  public static boolean supportSafeMode(FileSystem fs) {
717    // return true if HDFS.
718    if (fs instanceof DistributedFileSystem) {
719      return true;
720    }
721    // return true if the file system implements SafeMode interface.
722    if (safeModeClazz != null) {
723      return (safeModeClazz.isAssignableFrom(fs.getClass()));
724    }
725    // return false if the file system is not HDFS and does not implement SafeMode interface.
726    return false;
727  }
728
729  /**
730   * Checks if meta region exists
731   * @param fs      file system
732   * @param rootDir root directory of HBase installation
733   * @return true if exists
734   */
735  public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
736    Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
737    return fs.exists(metaRegionDir);
738  }
739
740  /**
741   * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams are
742   * backed by a series of LocatedBlocks, which are fetched periodically from the namenode. This
743   * method retrieves those blocks from the input stream and uses them to calculate
   * HDFSBlocksDistribution. The underlying method in DFSInputStream does attempt to use locally
   * cached blocks, but may hit the namenode if the cache is determined to be incomplete. The method
   * also makes copies of all LocatedBlocks rather than returning the underlying blocks
747   * themselves.
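   * <p>
   * A minimal usage sketch (assumes {@code path} refers to a file on an HDFS-backed file system;
   * the hostname is an illustrative assumption):
   * </p>
   *
   * <pre>
   * try (FSDataInputStream in = fs.open(path)) {
   *   if (in instanceof HdfsDataInputStream) {
   *     HDFSBlocksDistribution dist =
   *       FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) in);
   *     float locality = dist.getBlockLocalityIndex("datanode-host.example.com");
   *   }
   * }
   * </pre>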
748   */
749  static public HDFSBlocksDistribution
750    computeHDFSBlocksDistribution(HdfsDataInputStream inputStream) throws IOException {
751    List<LocatedBlock> blocks = inputStream.getAllBlocks();
752    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
753    for (LocatedBlock block : blocks) {
754      String[] hosts = getHostsForLocations(block);
755      long len = block.getBlockSize();
756      StorageType[] storageTypes = block.getStorageTypes();
757      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
758    }
759    return blocksDistribution;
760  }
761
762  private static String[] getHostsForLocations(LocatedBlock block) {
763    DatanodeInfo[] locations = getLocatedBlockLocations(block);
764    String[] hosts = new String[locations.length];
765    for (int i = 0; i < hosts.length; i++) {
766      hosts[i] = locations[i].getHostName();
767    }
768    return hosts;
769  }
770
771  /**
772   * Compute HDFS blocks distribution of a given file, or a portion of the file
773   * @param fs     file system
774   * @param status file status of the file
775   * @param start  start position of the portion
776   * @param length length of the portion
777   * @return The HDFS blocks distribution
778   */
779  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs,
780    FileStatus status, long start, long length) throws IOException {
781    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
782    BlockLocation[] blockLocations = fs.getFileBlockLocations(status, start, length);
783    addToHDFSBlocksDistribution(blocksDistribution, blockLocations);
784    return blocksDistribution;
785  }
786
787  /**
788   * Update blocksDistribution with blockLocations
789   * @param blocksDistribution the hdfs blocks distribution
   * @param blockLocations     an array of block locations
791   */
792  static public void addToHDFSBlocksDistribution(HDFSBlocksDistribution blocksDistribution,
793    BlockLocation[] blockLocations) throws IOException {
794    for (BlockLocation bl : blockLocations) {
795      String[] hosts = bl.getHosts();
796      long len = bl.getLength();
797      StorageType[] storageTypes = bl.getStorageTypes();
798      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
799    }
800  }
801
  // TODO move this method OUT of FSUtils. It should have no dependencies on HMaster.
803  /**
804   * Returns the total overall fragmentation percentage. Includes hbase:meta and -ROOT- as well.
805   * @param master The master defining the HBase root and file system
   * @return The total fragmentation percentage across all tables, or -1 if no tables were found
807   * @throws IOException When scanning the directory fails
808   */
809  public static int getTotalTableFragmentation(final HMaster master) throws IOException {
810    Map<String, Integer> map = getTableFragmentation(master);
811    return map.isEmpty() ? -1 : map.get("-TOTAL-");
812  }
813
814  /**
815   * Runs through the HBase rootdir and checks how many stores for each table have more than one
816   * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is
817   * stored under the special key "-TOTAL-".
818   * @param master The master defining the HBase root and file system.
819   * @return A map for each table and its percentage (never null).
820   * @throws IOException When scanning the directory fails.
821   */
822  public static Map<String, Integer> getTableFragmentation(final HMaster master)
823    throws IOException {
824    Path path = CommonFSUtils.getRootDir(master.getConfiguration());
825    // since HMaster.getFileSystem() is package private
826    FileSystem fs = path.getFileSystem(master.getConfiguration());
827    return getTableFragmentation(fs, path);
828  }
829
830  /**
831   * Runs through the HBase rootdir and checks how many stores for each table have more than one
832   * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is
833   * stored under the special key "-TOTAL-".
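   * <p>
   * A minimal usage sketch (assumes {@code conf} points at a valid hbase.rootdir):
   * </p>
   *
   * <pre>
   * Path rootDir = CommonFSUtils.getRootDir(conf);
   * FileSystem fs = rootDir.getFileSystem(conf);
   * Map&lt;String, Integer&gt; frags = FSUtils.getTableFragmentation(fs, rootDir);
   * int overall = frags.get("-TOTAL-");
   * </pre>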
834   * @param fs           The file system to use
835   * @param hbaseRootDir The root directory to scan
836   * @return A map for each table and its percentage (never null)
837   * @throws IOException When scanning the directory fails
838   */
839  public static Map<String, Integer> getTableFragmentation(final FileSystem fs,
840    final Path hbaseRootDir) throws IOException {
841    Map<String, Integer> frags = new HashMap<>();
842    int cfCountTotal = 0;
843    int cfFragTotal = 0;
844    PathFilter regionFilter = new RegionDirFilter(fs);
845    PathFilter familyFilter = new FamilyDirFilter(fs);
846    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
847    for (Path d : tableDirs) {
848      int cfCount = 0;
849      int cfFrag = 0;
850      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
851      for (FileStatus regionDir : regionDirs) {
852        Path dd = regionDir.getPath();
        // else it's a region name; now look in the region for families
854        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
855        for (FileStatus familyDir : familyDirs) {
856          cfCount++;
857          cfCountTotal++;
858          Path family = familyDir.getPath();
859          // now in family make sure only one file
860          FileStatus[] familyStatus = fs.listStatus(family);
861          if (familyStatus.length > 1) {
862            cfFrag++;
863            cfFragTotal++;
864          }
865        }
866      }
867      // compute percentage per table and store in result list
868      frags.put(CommonFSUtils.getTableName(d).getNameAsString(),
869        cfCount == 0 ? 0 : Math.round((float) cfFrag / cfCount * 100));
870    }
871    // set overall percentage for all tables
872    frags.put("-TOTAL-",
873      cfCountTotal == 0 ? 0 : Math.round((float) cfFragTotal / cfCountTotal * 100));
874    return frags;
875  }
876
877  public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
878    if (fs.exists(dst) && !fs.delete(dst, false)) {
879      throw new IOException("Can not delete " + dst);
880    }
881    if (!fs.rename(src, dst)) {
882      throw new IOException("Can not rename from " + src + " to " + dst);
883    }
884  }
885
886  /**
887   * A {@link PathFilter} that returns only regular files.
888   */
889  static class FileFilter extends AbstractFileStatusFilter {
890    private final FileSystem fs;
891
892    public FileFilter(final FileSystem fs) {
893      this.fs = fs;
894    }
895
896    @Override
897    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
898      try {
899        return isFile(fs, isDir, p);
900      } catch (IOException e) {
901        LOG.warn("Unable to verify if path={} is a regular file", p, e);
902        return false;
903      }
904    }
905  }
906
907  /**
908   * Directory filter that doesn't include any of the directories in the specified blacklist
909   */
910  public static class BlackListDirFilter extends AbstractFileStatusFilter {
911    private final FileSystem fs;
912    private List<String> blacklist;
913
914    /**
     * Create a filter on the given filesystem with the specified blacklist
916     * @param fs                     filesystem to filter
917     * @param directoryNameBlackList list of the names of the directories to filter. If
918     *                               <tt>null</tt>, all directories are returned
919     */
920    @SuppressWarnings("unchecked")
921    public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
922      this.fs = fs;
923      blacklist = (List<String>) (directoryNameBlackList == null
924        ? Collections.emptyList()
925        : directoryNameBlackList);
926    }
927
928    @Override
929    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
930      if (!isValidName(p.getName())) {
931        return false;
932      }
933
934      try {
935        return isDirectory(fs, isDir, p);
936      } catch (IOException e) {
937        LOG.warn("An error occurred while verifying if [{}] is a valid directory."
938          + " Returning 'not valid' and continuing.", p, e);
939        return false;
940      }
941    }
942
943    protected boolean isValidName(final String name) {
944      return !blacklist.contains(name);
945    }
946  }
947
948  /**
949   * A {@link PathFilter} that only allows directories.
950   */
951  public static class DirFilter extends BlackListDirFilter {
952
953    public DirFilter(FileSystem fs) {
954      super(fs, null);
955    }
956  }
957
958  /**
959   * A {@link PathFilter} that returns usertable directories. To get all directories use the
960   * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
961   */
962  public static class UserTableDirFilter extends BlackListDirFilter {
963    public UserTableDirFilter(FileSystem fs) {
964      super(fs, HConstants.HBASE_NON_TABLE_DIRS);
965    }
966
967    @Override
968    protected boolean isValidName(final String name) {
969      if (!super.isValidName(name)) return false;
970
971      try {
972        TableName.isLegalTableQualifierName(Bytes.toBytes(name));
973      } catch (IllegalArgumentException e) {
974        LOG.info("Invalid table name: {}", name);
975        return false;
976      }
977      return true;
978    }
979  }
980
981  public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
982    throws IOException {
983    List<Path> tableDirs = new ArrayList<>();
984    Path baseNamespaceDir = new Path(rootdir, HConstants.BASE_NAMESPACE_DIR);
985    if (fs.exists(baseNamespaceDir)) {
986      for (FileStatus status : fs.globStatus(new Path(baseNamespaceDir, "*"))) {
987        tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
988      }
989    }
990    return tableDirs;
991  }
992
993  /**
   * @return All the table directories under <code>rootdir</code>, ignoring non-table hbase
   *         folders such as .logs, .oldlogs and .corrupt.
996   */
997  public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
998    throws IOException {
999    // presumes any directory under hbase.rootdir is a table
1000    FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
1001    List<Path> tabledirs = new ArrayList<>(dirs.length);
1002    for (FileStatus dir : dirs) {
1003      tabledirs.add(dir.getPath());
1004    }
1005    return tabledirs;
1006  }
1007
1008  /**
   * Filter that accepts region directories, i.e. directories whose names look like encoded region
   * names.
1010   */
1011  public static class RegionDirFilter extends AbstractFileStatusFilter {
1012    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
1013    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
1014    final FileSystem fs;
1015
1016    public RegionDirFilter(FileSystem fs) {
1017      this.fs = fs;
1018    }
1019
1020    @Override
1021    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1022      if (!regionDirPattern.matcher(p.getName()).matches()) {
1023        return false;
1024      }
1025
1026      try {
1027        return isDirectory(fs, isDir, p);
1028      } catch (IOException ioe) {
1029        // Maybe the file was moved or the fs was disconnected.
1030        LOG.warn("Skipping file {} due to IOException", p, ioe);
1031        return false;
1032      }
1033    }
1034  }
1035
1036  /**
1037   * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1038   * .tableinfo
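   * <p>
   * A minimal usage sketch (the table name below is an illustrative assumption):
   * </p>
   *
   * <pre>
   * Path tableDir = CommonFSUtils.getTableDir(rootDir, TableName.valueOf("exampleTable"));
   * List&lt;Path&gt; regionDirs = FSUtils.getRegionDirs(fs, tableDir);
   * </pre>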
1039   * @param fs       A file system for the Path
1040   * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
1041   * @return List of paths to valid region directories in table dir.
1042   */
1043  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir)
1044    throws IOException {
1045    // assumes we are in a table dir.
1046    List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1047    if (rds == null) {
1048      return Collections.emptyList();
1049    }
1050    List<Path> regionDirs = new ArrayList<>(rds.size());
1051    for (FileStatus rdfs : rds) {
1052      Path rdPath = rdfs.getPath();
1053      regionDirs.add(rdPath);
1054    }
1055    return regionDirs;
1056  }
1057
1058  public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) {
1059    return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region);
1060  }
1061
1062  public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
1063    return getRegionDirFromTableDir(tableDir,
1064      ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
1065  }
1066
1067  public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) {
1068    return new Path(tableDir, encodedRegionName);
1069  }
1070
1071  /**
1072   * Filter for all dirs that are legal column family names. This is generally used for colfam dirs
1073   * &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1074   */
1075  public static class FamilyDirFilter extends AbstractFileStatusFilter {
1076    final FileSystem fs;
1077
1078    public FamilyDirFilter(FileSystem fs) {
1079      this.fs = fs;
1080    }
1081
1082    @Override
1083    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1084      try {
1085        // throws IAE if invalid
1086        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(Bytes.toBytes(p.getName()));
1087      } catch (IllegalArgumentException iae) {
1088        // path name is an invalid family name and thus is excluded.
1089        return false;
1090      }
1091
1092      try {
1093        return isDirectory(fs, isDir, p);
1094      } catch (IOException ioe) {
1095        // Maybe the file was moved or the fs was disconnected.
1096        LOG.warn("Skipping file {} due to IOException", p, ioe);
1097        return false;
1098      }
1099    }
1100  }
1101
1102  /**
1103   * Given a particular region dir, return all the familydirs inside it
1104   * @param fs        A file system for the Path
1105   * @param regionDir Path to a specific region directory
1106   * @return List of paths to valid family directories in region dir.
1107   */
1108  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir)
1109    throws IOException {
1110    // assumes we are in a region dir.
1111    return getFilePaths(fs, regionDir, new FamilyDirFilter(fs));
1112  }
1113
1114  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir)
1115    throws IOException {
1116    return getFilePaths(fs, familyDir, new ReferenceFileFilter(fs));
1117  }
1118
1119  public static List<Path> getReferenceAndLinkFilePaths(final FileSystem fs, final Path familyDir)
1120    throws IOException {
1121    return getFilePaths(fs, familyDir, new ReferenceAndLinkFileFilter(fs));
1122  }
1123
1124  private static List<Path> getFilePaths(final FileSystem fs, final Path dir,
1125    final PathFilter pathFilter) throws IOException {
1126    FileStatus[] fds = fs.listStatus(dir, pathFilter);
1127    List<Path> files = new ArrayList<>(fds.length);
1128    for (FileStatus fdfs : fds) {
1129      Path fdPath = fdfs.getPath();
1130      files.add(fdPath);
1131    }
1132    return files;
1133  }
1134
1135  public static int getRegionReferenceAndLinkFileCount(final FileSystem fs, final Path p) {
1136    int result = 0;
1137    try {
1138      for (Path familyDir : getFamilyDirs(fs, p)) {
1139        result += getReferenceAndLinkFilePaths(fs, familyDir).size();
1140      }
1141    } catch (IOException e) {
      LOG.warn("Error counting reference and link files.", e);
1143    }
1144    return result;
1145  }
1146
1147  public static class ReferenceAndLinkFileFilter implements PathFilter {
1148
1149    private final FileSystem fs;
1150
1151    public ReferenceAndLinkFileFilter(FileSystem fs) {
1152      this.fs = fs;
1153    }
1154
1155    @Override
1156    public boolean accept(Path rd) {
1157      try {
1158        // only files can be references.
1159        return !fs.getFileStatus(rd).isDirectory()
1160          && (StoreFileInfo.isReference(rd) || HFileLink.isHFileLink(rd));
1161      } catch (IOException ioe) {
1162        // Maybe the file was moved or the fs was disconnected.
1163        LOG.warn("Skipping file " + rd + " due to IOException", ioe);
1164        return false;
1165      }
1166    }
1167  }
1168
1169  /**
1170   * Filter for HFiles that excludes reference files.
1171   */
1172  public static class HFileFilter extends AbstractFileStatusFilter {
1173    final FileSystem fs;
1174
1175    public HFileFilter(FileSystem fs) {
1176      this.fs = fs;
1177    }
1178
1179    @Override
1180    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1181      if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) {
1182        return false;
1183      }
1184
1185      try {
1186        return isFile(fs, isDir, p);
1187      } catch (IOException ioe) {
1188        // Maybe the file was moved or the fs was disconnected.
1189        LOG.warn("Skipping file {} due to IOException", p, ioe);
1190        return false;
1191      }
1192    }
1193  }
1194
1195  /**
   * Filter for HFileLinks (StoreFiles and HFiles not included). The filter itself does not
   * consider whether a link is a file or not.
1198   */
1199  public static class HFileLinkFilter implements PathFilter {
1200
1201    @Override
1202    public boolean accept(Path p) {
1203      return HFileLink.isHFileLink(p);
1204    }
1205  }
1206
1207  public static class ReferenceFileFilter extends AbstractFileStatusFilter {
1208
1209    private final FileSystem fs;
1210
1211    public ReferenceFileFilter(FileSystem fs) {
1212      this.fs = fs;
1213    }
1214
1215    @Override
1216    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1217      if (!StoreFileInfo.isReference(p)) {
1218        return false;
1219      }
1220
1221      try {
1222        // only files can be references.
1223        return isFile(fs, isDir, p);
1224      } catch (IOException ioe) {
1225        // Maybe the file was moved or the fs was disconnected.
1226        LOG.warn("Skipping file {} due to IOException", p, ioe);
1227        return false;
1228      }
1229    }
1230  }
1231
1232  /**
   * Called every so often by the storefile map builder getTableStoreFilePathMap to report
   * progress.
1234   */
1235  interface ProgressReporter {
1236    /**
1237     * @param status File or directory we are about to process.
1238     */
1239    void progress(FileStatus status);
1240  }
1241
1242  /**
1243   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
1244   * names to the full Path. <br>
1245   * Example...<br>
1246   * Key = 3944417774205889744 <br>
1247   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
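   * <p>
   * A minimal usage sketch (the table name below is an illustrative assumption):
   * </p>
   *
   * <pre>
   * Map&lt;String, Path&gt; storeFiles =
   *   FSUtils.getTableStoreFilePathMap(null, fs, rootDir, TableName.valueOf("exampleTable"));
   * </pre>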
1248   * @param map          map to add values. If null, this method will create and populate one to
1249   *                     return
1250   * @param fs           The file system to use.
1251   * @param hbaseRootDir The root directory to scan.
1252   * @param tableName    name of the table to scan.
1253   * @return Map keyed by StoreFile name with a value of the full Path.
1254   * @throws IOException When scanning the directory fails.
1255   */
1256  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1257    final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1258    throws IOException, InterruptedException {
1259    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null,
1260      (ProgressReporter) null);
1261  }
1262
1263  /**
1264   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
   * names to the full Path. Note that because this method can be called on a 'live' HBase system,
   * we will skip files that no longer exist by the time we traverse them, and similarly the
1267   * user of the result needs to consider that some entries in this map may not exist by the time
1268   * this call completes. <br>
1269   * Example...<br>
1270   * Key = 3944417774205889744 <br>
1271   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1272   * @param resultMap        map to add values. If null, this method will create and populate one to
1273   *                         return
1274   * @param fs               The file system to use.
1275   * @param hbaseRootDir     The root directory to scan.
1276   * @param tableName        name of the table to scan.
1277   * @param sfFilter         optional path filter to apply to store files
1278   * @param executor         optional executor service to parallelize this operation
1279   * @param progressReporter Instance or null; gets called every time we move to new region of
1280   *                         family dir and for each store file.
1281   * @return Map keyed by StoreFile name with a value of the full Path.
1282   * @throws IOException When scanning the directory fails.
1283   * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead.
1284   */
1285  @Deprecated
1286  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1287    final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1288    ExecutorService executor, final HbckErrorReporter progressReporter)
1289    throws IOException, InterruptedException {
1290    return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor,
1291      new ProgressReporter() {
1292        @Override
1293        public void progress(FileStatus status) {
1294          // status is not used in this implementation.
1295          progressReporter.progress();
1296        }
1297      });
1298  }
1299
1300  /**
1301   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile
   * names to the full Path. Note that because this method can be called on a 'live' HBase system,
   * we will skip files that no longer exist by the time we traverse them, and similarly the
1304   * user of the result needs to consider that some entries in this map may not exist by the time
1305   * this call completes. <br>
1306   * Example...<br>
1307   * Key = 3944417774205889744 <br>
1308   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1309   * @param resultMap        map to add values. If null, this method will create and populate one to
1310   *                         return
1311   * @param fs               The file system to use.
1312   * @param hbaseRootDir     The root directory to scan.
1313   * @param tableName        name of the table to scan.
1314   * @param sfFilter         optional path filter to apply to store files
1315   * @param executor         optional executor service to parallelize this operation
1316   * @param progressReporter Instance or null; gets called every time we move to new region of
1317   *                         family dir and for each store file.
1318   * @return Map keyed by StoreFile name with a value of the full Path.
1319   * @throws IOException          When scanning the directory fails.
1320   * @throws InterruptedException the thread is interrupted, either before or during the activity.
1321   */
1322  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1323    final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1324    ExecutorService executor, final ProgressReporter progressReporter)
1325    throws IOException, InterruptedException {
1326
1327    final Map<String, Path> finalResultMap =
1328      resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;
1329
1330    // only include the directory paths to tables
1331    Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
1332    // Inside a table, there are compaction.dir directories to skip. Otherwise, all else
1333    // should be regions.
1334    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
1335    final Vector<Exception> exceptions = new Vector<>();
1336
1337    try {
1338      List<FileStatus> regionDirs =
1339        FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1340      if (regionDirs == null) {
1341        return finalResultMap;
1342      }
1343
1344      final List<Future<?>> futures = new ArrayList<>(regionDirs.size());
1345
1346      for (FileStatus regionDir : regionDirs) {
1347        if (null != progressReporter) {
1348          progressReporter.progress(regionDir);
1349        }
1350        final Path dd = regionDir.getPath();
1351
1352        if (!exceptions.isEmpty()) {
1353          break;
1354        }
1355
1356        Runnable getRegionStoreFileMapCall = new Runnable() {
1357          @Override
1358          public void run() {
1359            try {
1360              HashMap<String, Path> regionStoreFileMap = new HashMap<>();
1361              List<FileStatus> familyDirs =
1362                FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
1363              if (familyDirs == null) {
1364                if (!fs.exists(dd)) {
1365                  LOG.warn("Skipping region because it no longer exists: " + dd);
1366                } else {
1367                  LOG.warn("Skipping region because it has no family dirs: " + dd);
1368                }
1369                return;
1370              }
1371              for (FileStatus familyDir : familyDirs) {
1372                if (null != progressReporter) {
1373                  progressReporter.progress(familyDir);
1374                }
1375                Path family = familyDir.getPath();
1376                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1377                  continue;
1378                }
1379                // now in family, iterate over the StoreFiles and
1380                // put in map
1381                FileStatus[] familyStatus = fs.listStatus(family);
1382                for (FileStatus sfStatus : familyStatus) {
1383                  if (null != progressReporter) {
1384                    progressReporter.progress(sfStatus);
1385                  }
1386                  Path sf = sfStatus.getPath();
1387                  if (sfFilter == null || sfFilter.accept(sf)) {
1388                    regionStoreFileMap.put(sf.getName(), sf);
1389                  }
1390                }
1391              }
1392              finalResultMap.putAll(regionStoreFileMap);
1393            } catch (Exception e) {
1394              LOG.error("Could not get region store file map for region: " + dd, e);
1395              exceptions.add(e);
1396            }
1397          }
1398        };
1399
1400        // If executor is available, submit async tasks to exec concurrently, otherwise
1401        // just do serial sync execution
1402        if (executor != null) {
1403          Future<?> future = executor.submit(getRegionStoreFileMapCall);
1404          futures.add(future);
1405        } else {
1406          FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
1407          future.run();
1408          futures.add(future);
1409        }
1410      }
1411
1412      // Ensure all pending tasks are complete (or that we run into an exception)
1413      for (Future<?> f : futures) {
1414        if (!exceptions.isEmpty()) {
1415          break;
1416        }
1417        try {
1418          f.get();
1419        } catch (ExecutionException e) {
1420          LOG.error("Unexpected exec exception!  Should've been caught already.  (Bug?)", e);
1421          // Shouldn't happen, we already logged/caught any exceptions in the Runnable
1422        }
1423      }
1424    } catch (IOException e) {
1425      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
1426      exceptions.add(e);
1427    } finally {
1428      if (!exceptions.isEmpty()) {
1429        // Just throw the first exception as an indication something bad happened
1430        // Don't need to propagate all the exceptions, we already logged them all anyway
1431        Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class);
1432        throw new IOException(exceptions.firstElement());
1433      }
1434    }
1435
1436    return finalResultMap;
1437  }
1438
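  /**
   * Counts the reference files under the family dirs of the region dir {@code p}. IO errors are
   * logged and the count accumulated so far is returned, so treat the result as best-effort.
   */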
1439  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1440    int result = 0;
1441    try {
1442      for (Path familyDir : getFamilyDirs(fs, p)) {
1443        result += getReferenceFilePaths(fs, familyDir).size();
1444      }
1445    } catch (IOException e) {
1446      LOG.warn("Error counting reference files", e);
1447    }
1448    return result;
1449  }
1450
1451  /**
1452   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1453   * the full Path. <br>
1454   * Example...<br>
1455   * Key = 3944417774205889744 <br>
1456   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1457   * @param fs           The file system to use.
1458   * @param hbaseRootDir The root directory to scan.
1459   * @return Map keyed by StoreFile name with a value of the full Path.
1460   * @throws IOException When scanning the directory fails.
1461   */
1462  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1463    final Path hbaseRootDir) throws IOException, InterruptedException {
1464    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter) null);
1465  }
1466
1467  /**
1468   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1469   * the full Path. <br>
1470   * Example...<br>
1471   * Key = 3944417774205889744 <br>
1472   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1473   * @param fs               The file system to use.
1474   * @param hbaseRootDir     The root directory to scan.
1475   * @param sfFilter         optional path filter to apply to store files
1476   * @param executor         optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to a new region or
   *                         family dir and for each store file.
1479   * @return Map keyed by StoreFile name with a value of the full Path.
1480   * @throws IOException When scanning the directory fails.
   * @deprecated Since 2.3.0. Will be removed in hbase4. Use
1482   *             {@link #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)}
1483   */
1484  @Deprecated
1485  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1486    final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1487    HbckErrorReporter progressReporter) throws IOException, InterruptedException {
1488    return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor, new ProgressReporter() {
1489      @Override
1490      public void progress(FileStatus status) {
1491        // status is not used in this implementation.
1492        progressReporter.progress();
1493      }
1494    });
1495  }
1496
1497  /**
1498   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
1499   * the full Path. <br>
1500   * Example...<br>
1501   * Key = 3944417774205889744 <br>
1502   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1503   * @param fs               The file system to use.
1504   * @param hbaseRootDir     The root directory to scan.
1505   * @param sfFilter         optional path filter to apply to store files
1506   * @param executor         optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to a new region or
   *                         family dir and for each store file.
1509   * @return Map keyed by StoreFile name with a value of the full Path.
1510   * @throws IOException When scanning the directory fails.
1511   */
1512  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1513    final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1514    ProgressReporter progressReporter) throws IOException, InterruptedException {
1515    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);
1516
1517    // if this method looks similar to 'getTableFragmentation' that is because
1518    // it was borrowed from it.
1519
1520    // only include the directory paths to tables
1521    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1522      getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir),
1523        sfFilter, executor, progressReporter);
1524    }
1525    return map;
1526  }
1527
1528  /**
1529   * Filters FileStatuses in an array and returns a list
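   * <p>
   * For example (the directory and filter choice here are illustrative only):
   *
   * <pre>{@code
   * List<FileStatus> familyDirs =
   *   FSUtils.filterFileStatuses(fs.listStatus(regionDir), new FamilyDirFilter(fs));
   * }</pre>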
1530   * @param input  An array of FileStatuses
1531   * @param filter A required filter to filter the array
1532   * @return A list of FileStatuses
1533   */
1534  public static List<FileStatus> filterFileStatuses(FileStatus[] input, FileStatusFilter filter) {
1535    if (input == null) return null;
1536    return filterFileStatuses(Iterators.forArray(input), filter);
1537  }
1538
1539  /**
1540   * Filters FileStatuses in an iterator and returns a list
1541   * @param input  An iterator of FileStatuses
   * @param filter A required filter to filter the iterator
1543   * @return A list of FileStatuses
1544   */
1545  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1546    FileStatusFilter filter) {
1547    if (input == null) return null;
1548    ArrayList<FileStatus> results = new ArrayList<>();
1549    while (input.hasNext()) {
1550      FileStatus f = input.next();
1551      if (filter.accept(f)) {
1552        results.add(f);
1553      }
1554    }
1555    return results;
1556  }
1557
1558  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
   * differences between hadoop versions, where Hadoop 1 does not throw a FileNotFoundException
   * but returns an empty FileStatus[], while Hadoop 2 will throw FileNotFoundException.
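   * <p>
   * Callers are expected to handle the null return, e.g. (mirroring how this class uses it):
   *
   * <pre>{@code
   * List<FileStatus> regionDirs =
   *   FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
   * if (regionDirs == null) {
   *   return; // directory is empty, missing, or nothing passed the filter
   * }
   * }</pre>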
1562   * @param fs     file system
1563   * @param dir    directory
1564   * @param filter file status filter
1565   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
1566   */
1567  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs, final Path dir,
1568    final FileStatusFilter filter) throws IOException {
1569    FileStatus[] status = null;
1570    try {
1571      status = fs.listStatus(dir);
1572    } catch (FileNotFoundException fnfe) {
1573      LOG.trace("{} does not exist", dir);
1574      return null;
1575    }
1576
1577    if (ArrayUtils.getLength(status) == 0) {
1578      return null;
1579    }
1580
1581    if (filter == null) {
1582      return Arrays.asList(status);
1583    } else {
1584      List<FileStatus> status2 = filterFileStatuses(status, filter);
1585      if (status2 == null || status2.isEmpty()) {
1586        return null;
1587      } else {
1588        return status2;
1589      }
1590    }
1591  }
1592
1593  /**
   * Scans the root path of the file system to get the degree of locality for each region on each
   * of the servers having at least one block of that region. This is used by the tool
   * {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}.
   * @param conf the configuration to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
1600   */
1601  public static Map<String, Map<String, Float>>
1602    getRegionDegreeLocalityMappingFromFS(final Configuration conf) throws IOException {
1603    return getRegionDegreeLocalityMappingFromFS(conf, null,
1604      conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1605
1606  }
1607
1608  /**
   * Scans the root path of the file system to get the degree of locality for each region on each
   * of the servers having at least one block of that region.
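   * <p>
   * A rough usage sketch (table name, pool size, and the sample values are illustrative only):
   *
   * <pre>{@code
   * Map<String, Map<String, Float>> locality =
   *   FSUtils.getRegionDegreeLocalityMappingFromFS(conf, "exampleTable", 8);
   * // e.g. locality.get("70236052") might map "datanode1.example.com" to 0.85f
   * }</pre>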
   * @param conf           the configuration to use
   * @param desiredTable   the table you wish to scan locality for, or null for all tables
   * @param threadPoolSize the thread pool size to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
1614   */
1615  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1616    final Configuration conf, final String desiredTable, int threadPoolSize) throws IOException {
1617    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
1618    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
1619    return regionDegreeLocalityMapping;
1620  }
1621
1622  /**
   * Scans the root path of the file system to get the degree of locality of each region on each
   * of the servers having at least one block of that region.
   * @param conf                        the configuration to use
   * @param desiredTable                the table you wish to scan locality for, or null for all
   *                                    tables
   * @param threadPoolSize              the thread pool size to use
   * @param regionDegreeLocalityMapping the map into which to put the locality degree mapping, or
   *                                    null; must be a thread-safe implementation
   * @throws IOException in case of file system errors or interrupts
1629   */
1630  private static void getRegionLocalityMappingFromFS(final Configuration conf,
1631    final String desiredTable, int threadPoolSize,
1632    final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
1633    final FileSystem fs = FileSystem.get(conf);
1634    final Path rootPath = CommonFSUtils.getRootDir(conf);
1635    final long startTime = EnvironmentEdgeManager.currentTime();
1636    final Path queryPath;
1637    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1638    if (null == desiredTable) {
1639      queryPath =
1640        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1641    } else {
1642      queryPath = new Path(
1643        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1644    }
1645
1646    // reject all paths that are not appropriate
1647    PathFilter pathFilter = new PathFilter() {
1648      @Override
1649      public boolean accept(Path path) {
        // the path name should be the region encoded name; the glob may also pick up noise
1651        if (null == path) {
1652          return false;
1653        }
1654
1655        // no parent?
1656        Path parent = path.getParent();
1657        if (null == parent) {
1658          return false;
1659        }
1660
1661        String regionName = path.getName();
1662        if (null == regionName) {
1663          return false;
1664        }
1665
1666        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
1667          return false;
1668        }
1669        return true;
1670      }
1671    };
1672
1673    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1674
1675    if (LOG.isDebugEnabled()) {
1676      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
1677    }
1678
1679    if (null == statusList) {
1680      return;
1681    }
1682
1683    // lower the number of threads in case we have very few expected regions
1684    threadPoolSize = Math.min(threadPoolSize, statusList.length);
1685
1686    // run in multiple threads
1687    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
1688      new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").setDaemon(true)
1689        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
1690    try {
1691      // ignore all file status items that are not of interest
1692      for (FileStatus regionStatus : statusList) {
1693        if (null == regionStatus || !regionStatus.isDirectory()) {
1694          continue;
1695        }
1696
1697        final Path regionPath = regionStatus.getPath();
1698        if (null != regionPath) {
1699          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
1700        }
1701      }
1702    } finally {
1703      tpe.shutdown();
1704      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1705        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1706      try {
1707        // here we wait until TPE terminates, which is either naturally or by
1708        // exceptions in the execution of the threads
1709        while (!tpe.awaitTermination(threadWakeFrequency, TimeUnit.MILLISECONDS)) {
1710          // printing out rough estimate, so as to not introduce
1711          // AtomicInteger
1712          LOG.info("Locality checking is underway: { Scanned Regions : "
1713            + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
1714            + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
1715        }
1716      } catch (InterruptedException e) {
1717        Thread.currentThread().interrupt();
1718        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1719      }
1720    }
1721
1722    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
1723    LOG.info("Scan DFS for locality info takes {}ms", overhead);
1724  }
1725
1726  /**
1727   * Do our short circuit read setup. Checks buffer size to use and whether to do checksumming in
1728   * hbase or hdfs.
1729   */
1730  public static void setupShortCircuitRead(final Configuration conf) {
1731    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1732    boolean shortCircuitSkipChecksum =
1733      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1734    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1735    if (shortCircuitSkipChecksum) {
1736      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not "
1737        + "be set to true."
1738        + (useHBaseChecksum
1739          ? " HBase checksum doesn't require "
1740            + "it, see https://issues.apache.org/jira/browse/HBASE-6868."
1741          : ""));
1742      assert !shortCircuitSkipChecksum; // this will fail if assertions are on
1743    }
1744    checkShortCircuitReadBufferSize(conf);
1745  }
1746
1747  /**
1748   * Check if short circuit read buffer size is set and if not, set it to hbase value.
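   * <p>
   * For example (the 128KB value below is illustrative), an HBase-side override is picked up
   * only when the DFS-level key itself is unset:
   *
   * <pre>{@code
   * conf.setInt("hbase.dfs.client.read.shortcircuit.buffer.size", 131072);
   * FSUtils.checkShortCircuitReadBufferSize(conf);
   * // "dfs.client.read.shortcircuit.buffer.size" is now 131072 unless it was already set
   * }</pre>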
1749   */
1750  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1751    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1752    final int notSet = -1;
1753    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
1754    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1755    int size = conf.getInt(dfsKey, notSet);
1756    // If a size is set, return -- we will use it.
1757    if (size != notSet) return;
1758    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
1759    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1760    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
1761  }
1762
1763  /**
   * Returns the DFSClient DFSHedgedReadMetrics instance, or null if not found or not on hdfs.
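   * <p>
   * Illustrative use, assuming the usual DFSHedgedReadMetrics getters are available:
   *
   * <pre>{@code
   * DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
   * if (metrics != null) {
   *   long hedgedReads = metrics.getHedgedReadOps();
   * }
   * }</pre>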
1765   */
1766  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
1767    throws IOException {
1768    if (!CommonFSUtils.isHDFS(c)) {
1769      return null;
1770    }
1771    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
1772    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
1773    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
1774    final String name = "getHedgedReadMetrics";
1775    DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient();
1776    Method m;
1777    try {
1778      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException | SecurityException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: "
        + e.getMessage());
      return null;
    }
1788    m.setAccessible(true);
1789    try {
1790      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
1791    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
1792      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: "
1793        + e.getMessage());
1794      return null;
1795    }
1796  }
1797
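  /**
   * Copies the file or directory tree rooted at {@code src} on {@code srcFS} to {@code dst} on
   * {@code dstFS}, copying regular files in parallel on a pool of {@code threads} threads.
   * @return the list of destination paths (directories and files) produced by the copy
   * @throws IOException if directory creation or any of the copy tasks fail
   */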
1798  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1799    Configuration conf, int threads) throws IOException {
1800    ExecutorService pool = Executors.newFixedThreadPool(threads);
1801    List<Future<Void>> futures = new ArrayList<>();
1802    List<Path> traversedPaths;
1803    try {
1804      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
1805      for (Future<Void> future : futures) {
1806        future.get();
1807      }
1808    } catch (ExecutionException | InterruptedException | IOException e) {
1809      throw new IOException("Copy snapshot reference files failed", e);
1810    } finally {
1811      pool.shutdownNow();
1812    }
1813    return traversedPaths;
1814  }
1815
1816  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1817    Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
1818    List<Path> traversedPaths = new ArrayList<>();
1819    traversedPaths.add(dst);
1820    FileStatus currentFileStatus = srcFS.getFileStatus(src);
1821    if (currentFileStatus.isDirectory()) {
1822      if (!dstFS.mkdirs(dst)) {
1823        throw new IOException("Create directory failed: " + dst);
1824      }
1825      FileStatus[] subPaths = srcFS.listStatus(src);
1826      for (FileStatus subPath : subPaths) {
1827        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
1828          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
1829      }
1830    } else {
1831      Future<Void> future = pool.submit(() -> {
1832        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
1833        return null;
1834      });
1835      futures.add(future);
1836    }
1837    return traversedPaths;
1838  }
1839
  /** Returns a set containing all namenode addresses of fs */
1841  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
1842    Configuration conf) {
1843    Set<InetSocketAddress> addresses = new HashSet<>();
1844    String serviceName = fs.getCanonicalServiceName();
1845
1846    if (serviceName.startsWith("ha-hdfs")) {
1847      try {
1848        Map<String, Map<String, InetSocketAddress>> addressMap =
1849          DFSUtil.getNNServiceRpcAddressesForCluster(conf);
1850        String nameService = serviceName.substring(serviceName.indexOf(":") + 1);
1851        if (addressMap.containsKey(nameService)) {
1852          Map<String, InetSocketAddress> nnMap = addressMap.get(nameService);
1853          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
1854            InetSocketAddress addr = e2.getValue();
1855            addresses.add(addr);
1856          }
1857        }
1858      } catch (Exception e) {
1859        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
1860      }
1861    } else {
1862      URI uri = fs.getUri();
1863      int port = uri.getPort();
1864      if (port < 0) {
1865        int idx = serviceName.indexOf(':');
1866        port = Integer.parseInt(serviceName.substring(idx + 1));
1867      }
1868      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
1869      addresses.add(addr);
1870    }
1871
1872    return addresses;
1873  }
1874
1875  /**
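   * Checks whether two FileSystem instances refer to the same HDFS.
   * <p>
   * Illustrative use (the fallback noted in the comment is just an example):
   *
   * <pre>{@code
   * if (!FSUtils.isSameHdfs(conf, srcFs, desFs)) {
   *   // cannot assume a cheap same-filesystem move; fall back to copying the data
   * }
   * }</pre>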
   * @param conf  the Configuration of HBase
   * @param srcFs the source FileSystem
   * @param desFs the destination FileSystem
   * @return whether srcFs and desFs point to the same hdfs or not
1878   */
1879  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
1880    // By getCanonicalServiceName, we could make sure both srcFs and desFs
1881    // show a unified format which contains scheme, host and port.
1882    String srcServiceName = srcFs.getCanonicalServiceName();
1883    String desServiceName = desFs.getCanonicalServiceName();
1884
1885    if (srcServiceName == null || desServiceName == null) {
1886      return false;
1887    }
1888    if (srcServiceName.equals(desServiceName)) {
1889      return true;
1890    }
1891    if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
1892      Collection<String> internalNameServices =
1893        conf.getTrimmedStringCollection("dfs.internal.nameservices");
1894      if (!internalNameServices.isEmpty()) {
        return internalNameServices.contains(srcServiceName.split(":")[1]);
1900      }
1901    }
1902    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
1903      // If one serviceName is an HA format while the other is a non-HA format,
1904      // maybe they refer to the same FileSystem.
1905      // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
1906      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
1907      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
1908      if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
1909        return true;
1910      }
1911    }
1912
1913    return false;
1914  }
1915}