001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
021import edu.umd.cs.findbugs.annotations.CheckForNull;
022import java.io.ByteArrayInputStream;
023import java.io.DataInputStream;
024import java.io.EOFException;
025import java.io.FileNotFoundException;
026import java.io.IOException;
027import java.io.InterruptedIOException;
028import java.lang.reflect.InvocationTargetException;
029import java.lang.reflect.Method;
030import java.net.InetSocketAddress;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.HashMap;
035import java.util.Iterator;
036import java.util.List;
037import java.util.Locale;
038import java.util.Map;
039import java.util.Vector;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.ExecutionException;
042import java.util.concurrent.ExecutorService;
043import java.util.concurrent.Executors;
044import java.util.concurrent.Future;
045import java.util.concurrent.FutureTask;
046import java.util.concurrent.ThreadPoolExecutor;
047import java.util.concurrent.TimeUnit;
048import java.util.regex.Pattern;
049
050import org.apache.commons.lang3.ArrayUtils;
051import org.apache.hadoop.conf.Configuration;
052import org.apache.hadoop.fs.BlockLocation;
053import org.apache.hadoop.fs.FSDataInputStream;
054import org.apache.hadoop.fs.FSDataOutputStream;
055import org.apache.hadoop.fs.FileStatus;
056import org.apache.hadoop.fs.FileSystem;
057import org.apache.hadoop.fs.FileUtil;
058import org.apache.hadoop.fs.Path;
059import org.apache.hadoop.fs.PathFilter;
060import org.apache.hadoop.fs.permission.FsAction;
061import org.apache.hadoop.fs.permission.FsPermission;
062import org.apache.hadoop.hbase.ClusterId;
063import org.apache.hadoop.hbase.HColumnDescriptor;
064import org.apache.hadoop.hbase.HConstants;
065import org.apache.hadoop.hbase.HDFSBlocksDistribution;
066import org.apache.hadoop.hbase.HRegionInfo;
067import org.apache.hadoop.hbase.TableName;
068import org.apache.hadoop.hbase.client.RegionInfo;
069import org.apache.hadoop.hbase.client.RegionInfoBuilder;
070import org.apache.hadoop.hbase.exceptions.DeserializationException;
071import org.apache.hadoop.hbase.fs.HFileSystem;
072import org.apache.hadoop.hbase.io.HFileLink;
073import org.apache.hadoop.hbase.master.HMaster;
074import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
075import org.apache.hadoop.hbase.security.AccessDeniedException;
076import org.apache.hadoop.hdfs.DFSClient;
077import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
078import org.apache.hadoop.hdfs.DistributedFileSystem;
079import org.apache.hadoop.hdfs.protocol.HdfsConstants;
080import org.apache.hadoop.io.IOUtils;
081import org.apache.hadoop.ipc.RemoteException;
082import org.apache.hadoop.security.UserGroupInformation;
083import org.apache.hadoop.util.Progressable;
084import org.apache.hadoop.util.ReflectionUtils;
085import org.apache.hadoop.util.StringUtils;
086import org.apache.yetus.audience.InterfaceAudience;
087import org.slf4j.Logger;
088import org.slf4j.LoggerFactory;
089
090import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
091import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
092import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
093import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
094
095import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
097
098/**
099 * Utility methods for interacting with the underlying file system.
100 */
101@InterfaceAudience.Private
102public abstract class FSUtils extends CommonFSUtils {
103  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
104
105  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
106  private static final int DEFAULT_THREAD_POOLSIZE = 2;
107
108  /** Set to true on Windows platforms */
109  @VisibleForTesting // currently only used in testing. TODO refactor into a test class
110  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
111
112  protected FSUtils() {
113    super();
114  }
115
116  /**
   * @return True if <code>fs</code> is an instance of DistributedFileSystem
118   * @throws IOException
119   */
120  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
121    FileSystem fileSystem = fs;
122    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
123    // Check its backing fs for dfs-ness.
124    if (fs instanceof HFileSystem) {
125      fileSystem = ((HFileSystem)fs).getBackingFs();
126    }
127    return fileSystem instanceof DistributedFileSystem;
128  }
129
130  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
   * '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not consider
   * the scheme; i.e. if the schemes differ but the path or a subpath matches, the two are considered
   * equal. Note that the two paths must have the same depth.
   * @param pathToSearch Path we will be trying to match.
   * @param pathTail Path whose components should form the tail of <code>pathToSearch</code>.
136   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
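   * <p>Illustrative sketch (the paths are hypothetical):
   * <pre>{@code
   * // Scheme and authority are ignored; only path components of equal depth are compared.
   * FSUtils.isMatchingTail(new Path("hdfs://namenode:8020/hbase/data"), new Path("/hbase/data")); // true
   * FSUtils.isMatchingTail(new Path("/hbase/data"), new Path("/other/data")); // false
   * }</pre>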
137   */
138  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
139    Path tailPath = pathTail;
140    String tailName;
141    Path toSearch = pathToSearch;
142    String toSearchName;
143    boolean result = false;
144
145    if (pathToSearch.depth() != pathTail.depth()) {
146      return false;
147    }
148
149    do {
150      tailName = tailPath.getName();
151      if (tailName == null || tailName.isEmpty()) {
152        result = true;
153        break;
154      }
155      toSearchName = toSearch.getName();
156      if (toSearchName == null || toSearchName.isEmpty()) {
157        break;
158      }
159      // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
160      tailPath = tailPath.getParent();
161      toSearch = toSearch.getParent();
162    } while(tailName.equals(toSearchName));
163    return result;
164  }
165
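  /**
   * Returns an {@link FSUtils} implementation chosen by the scheme of the filesystem's URI. The
   * implementation class is looked up under the configuration key
   * <code>hbase.fsutil.&lt;scheme&gt;.impl</code> and defaults to the HDFS implementation.
   * <p>Illustrative sketch (the WAL path variable is hypothetical):
   * <pre>{@code
   * FileSystem fs = FileSystem.get(conf);
   * FSUtils fsUtils = FSUtils.getInstance(fs, conf);
   * fsUtils.recoverFileLease(fs, walPath, conf, null);
   * }</pre>
   */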
166  public static FSUtils getInstance(FileSystem fs, Configuration conf) {
167    String scheme = fs.getUri().getScheme();
168    if (scheme == null) {
169      LOG.warn("Could not find scheme for uri " +
170          fs.getUri() + ", default to hdfs");
171      scheme = "hdfs";
172    }
173    Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
174        scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
175    FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
176    return fsUtils;
177  }
178
179  /**
   * Delete the region directory if it exists.
   * @param conf the configuration used to locate the HBase root directory
   * @param hri the region whose directory should be deleted
   * @return True if the region directory was deleted.
   * @throws IOException if the delete fails
185   */
186  public static boolean deleteRegionDir(final Configuration conf, final HRegionInfo hri)
187  throws IOException {
188    Path rootDir = getRootDir(conf);
189    FileSystem fs = rootDir.getFileSystem(conf);
190    return deleteDirectory(fs,
191      new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
192  }
193
  /**
195   * Create the specified file on the filesystem. By default, this will:
196   * <ol>
197   * <li>overwrite the file if it exists</li>
198   * <li>apply the umask in the configuration (if it is enabled)</li>
199   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the configured column family replication, or the filesystem default replication if
   * set to {@link HColumnDescriptor#DEFAULT_DFS_REPLICATION}</li>
202   * <li>use the default block size</li>
203   * <li>not track progress</li>
204   * </ol>
205   * @param conf configurations
206   * @param fs {@link FileSystem} on which to write the file
207   * @param path {@link Path} to the file to write
208   * @param perm permissions
   * @param favoredNodes favored DataNodes for the blocks of the new file, if supported
210   * @return output stream to the created file
211   * @throws IOException if the file cannot be created
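   * <p>Illustrative call (the path, payload and favored-node address are hypothetical):
   * <pre>{@code
   * InetSocketAddress[] favored = { new InetSocketAddress("datanode-1", 50010) };
   * try (FSDataOutputStream out = FSUtils.create(conf, fs, new Path("/hbase/.tmp/example"),
   *     FsPermission.getFileDefault(), favored)) {
   *   out.write(Bytes.toBytes("example"));
   * }
   * }</pre>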
212   */
213  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
214      FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
215    if (fs instanceof HFileSystem) {
216      FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
217      if (backingFs instanceof DistributedFileSystem) {
218        // Try to use the favoredNodes version via reflection to allow backwards-
219        // compatibility.
220        short replication = Short.parseShort(conf.get(HColumnDescriptor.DFS_REPLICATION,
221          String.valueOf(HColumnDescriptor.DEFAULT_DFS_REPLICATION)));
222        try {
223          return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create",
224            Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class,
225            Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true,
226            getDefaultBufferSize(backingFs),
227            replication > 0 ? replication : getDefaultReplication(backingFs, path),
228            getDefaultBlockSize(backingFs, path), null, favoredNodes));
229        } catch (InvocationTargetException ite) {
          // Function was properly called, but threw its own exception.
231          throw new IOException(ite.getCause());
232        } catch (NoSuchMethodException e) {
233          LOG.debug("DFS Client does not support most favored nodes create; using default create");
234          LOG.trace("Ignoring; use default create", e);
235        } catch (IllegalArgumentException | SecurityException |  IllegalAccessException e) {
236          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
237        }
238      }
239    }
240    return create(fs, path, perm, true);
241  }
242
243  /**
244   * Checks to see if the specified file system is available
245   *
246   * @param fs filesystem
   * @throws IOException if the file system is not available
248   */
249  public static void checkFileSystemAvailable(final FileSystem fs)
250  throws IOException {
251    if (!(fs instanceof DistributedFileSystem)) {
252      return;
253    }
254    IOException exception = null;
255    DistributedFileSystem dfs = (DistributedFileSystem) fs;
256    try {
257      if (dfs.exists(new Path("/"))) {
258        return;
259      }
260    } catch (IOException e) {
261      exception = e instanceof RemoteException ?
262              ((RemoteException)e).unwrapRemoteException() : e;
263    }
264    try {
265      fs.close();
266    } catch (Exception e) {
267      LOG.error("file system close failed: ", e);
268    }
269    throw new IOException("File system is not available", exception);
270  }
271
272  /**
273   * We use reflection because {@link DistributedFileSystem#setSafeMode(
274   * HdfsConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1
275   *
   * @param dfs the DistributedFileSystem to check
277   * @return whether we're in safe mode
278   * @throws IOException
279   */
280  private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
281    boolean inSafeMode = false;
282    try {
283      Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
284          org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class});
285      inSafeMode = (Boolean) m.invoke(dfs,
286        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true);
287    } catch (Exception e) {
288      if (e instanceof IOException) throw (IOException) e;
289
290      // Check whether dfs is on safemode.
291      inSafeMode = dfs.setSafeMode(
292        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET);
293    }
294    return inSafeMode;
295  }
296
297  /**
   * Check whether dfs is in safe mode; if it is, an exception is thrown.
   * @param conf the configuration used to obtain the FileSystem
   * @throws IOException if the file system is in safe mode
301   */
302  public static void checkDfsSafeMode(final Configuration conf)
303  throws IOException {
304    boolean isInSafeMode = false;
305    FileSystem fs = FileSystem.get(conf);
306    if (fs instanceof DistributedFileSystem) {
307      DistributedFileSystem dfs = (DistributedFileSystem)fs;
308      isInSafeMode = isInSafeMode(dfs);
309    }
310    if (isInSafeMode) {
311      throw new IOException("File system is in safemode, it can't be written now");
312    }
313  }
314
315  /**
   * Reads the current version of the file system from the ${HBASE_ROOTDIR}/hbase.version file
317   *
318   * @param fs filesystem object
319   * @param rootdir root hbase directory
320   * @return null if no version file exists, version string otherwise
321   * @throws IOException if the version file fails to open
322   * @throws DeserializationException if the version data cannot be translated into a version
323   */
324  public static String getVersion(FileSystem fs, Path rootdir)
325  throws IOException, DeserializationException {
326    final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
327    FileStatus[] status = null;
328    try {
329      // hadoop 2.0 throws FNFE if directory does not exist.
330      // hadoop 1.0 returns null if directory does not exist.
331      status = fs.listStatus(versionFile);
332    } catch (FileNotFoundException fnfe) {
333      return null;
334    }
335    if (ArrayUtils.getLength(status) == 0) {
336      return null;
337    }
338    String version = null;
339    byte [] content = new byte [(int)status[0].getLen()];
340    FSDataInputStream s = fs.open(versionFile);
341    try {
342      IOUtils.readFully(s, content, 0, content.length);
343      if (ProtobufUtil.isPBMagicPrefix(content)) {
344        version = parseVersionFrom(content);
345      } else {
346        // Presume it pre-pb format.
347        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) {
348          version = dis.readUTF();
349        }
350      }
351    } catch (EOFException eof) {
352      LOG.warn("Version file was empty, odd, will try to set it.");
353    } finally {
354      s.close();
355    }
356    return version;
357  }
358
359  /**
360   * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
361   * @param bytes The byte content of the hbase.version file
362   * @return The version found in the file as a String
363   * @throws DeserializationException if the version data cannot be translated into a version
364   */
365  static String parseVersionFrom(final byte [] bytes)
366  throws DeserializationException {
367    ProtobufUtil.expectPBMagicPrefix(bytes);
368    int pblen = ProtobufUtil.lengthOfPBMagic();
369    FSProtos.HBaseVersionFileContent.Builder builder =
370      FSProtos.HBaseVersionFileContent.newBuilder();
371    try {
372      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
373      return builder.getVersion();
374    } catch (IOException e) {
375      // Convert
376      throw new DeserializationException(e);
377    }
378  }
379
380  /**
381   * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
382   * @param version Version to persist
383   * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
384   */
385  static byte [] toVersionByteArray(final String version) {
386    FSProtos.HBaseVersionFileContent.Builder builder =
387      FSProtos.HBaseVersionFileContent.newBuilder();
388    return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
389  }
390
391  /**
392   * Verifies current version of file system
393   *
394   * @param fs file system
395   * @param rootdir root directory of HBase installation
396   * @param message if true, issues a message on System.out
397   * @throws IOException if the version file cannot be opened
398   * @throws DeserializationException if the contents of the version file cannot be parsed
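   * <p>Typical startup usage (a sketch):
   * <pre>{@code
   * Path rootDir = FSUtils.getRootDir(conf);
   * FileSystem fs = rootDir.getFileSystem(conf);
   * FSUtils.checkVersion(fs, rootDir, true); // throws FileSystemVersionException on mismatch
   * }</pre>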
399   */
400  public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
401  throws IOException, DeserializationException {
402    checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
403  }
404
405  /**
406   * Verifies current version of file system
407   *
408   * @param fs file system
409   * @param rootdir root directory of HBase installation
410   * @param message if true, issues a message on System.out
   * @param wait wait interval, in milliseconds, between retries
412   * @param retries number of times to retry
413   *
414   * @throws IOException if the version file cannot be opened
415   * @throws DeserializationException if the contents of the version file cannot be parsed
416   */
417  public static void checkVersion(FileSystem fs, Path rootdir,
418      boolean message, int wait, int retries)
419  throws IOException, DeserializationException {
420    String version = getVersion(fs, rootdir);
421    String msg;
422    if (version == null) {
423      if (!metaRegionExists(fs, rootdir)) {
424        // rootDir is empty (no version file and no root region)
425        // just create new version file (HBASE-1195)
426        setVersion(fs, rootdir, wait, retries);
427        return;
428      } else {
429        msg = "hbase.version file is missing. Is your hbase.rootdir valid? " +
430            "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " +
431            "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2";
432      }
433    } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) {
434      return;
435    } else {
436      msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version +
437          " but software requires version " + HConstants.FILE_SYSTEM_VERSION +
438          ". Consult http://hbase.apache.org/book.html for further information about " +
439          "upgrading HBase.";
440    }
441
    // Version is deprecated; migration is required.
443    // Output on stdout so user sees it in terminal.
444    if (message) {
445      System.out.println("WARNING! " + msg);
446    }
447    throw new FileSystemVersionException(msg);
448  }
449
450  /**
451   * Sets version of file system
452   *
453   * @param fs filesystem object
454   * @param rootdir hbase root
455   * @throws IOException e
456   */
457  public static void setVersion(FileSystem fs, Path rootdir)
458  throws IOException {
459    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
460      HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
461  }
462
463  /**
464   * Sets version of file system
465   *
466   * @param fs filesystem object
467   * @param rootdir hbase root
   * @param wait time in milliseconds to wait between retries
469   * @param retries number of times to retry before failing
470   * @throws IOException e
471   */
472  public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
473  throws IOException {
474    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
475  }
476
478  /**
479   * Sets version of file system
480   *
481   * @param fs filesystem object
482   * @param rootdir hbase root directory
483   * @param version version to set
   * @param wait time in milliseconds to wait between retries
485   * @param retries number of times to retry before throwing an IOException
486   * @throws IOException e
487   */
488  public static void setVersion(FileSystem fs, Path rootdir, String version,
489      int wait, int retries) throws IOException {
490    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
491    Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
492      HConstants.VERSION_FILE_NAME);
493    while (true) {
494      try {
495        // Write the version to a temporary file
496        FSDataOutputStream s = fs.create(tempVersionFile);
497        try {
498          s.write(toVersionByteArray(version));
499          s.close();
500          s = null;
501          // Move the temp version file to its normal location. Returns false
502          // if the rename failed. Throw an IOE in that case.
503          if (!fs.rename(tempVersionFile, versionFile)) {
504            throw new IOException("Unable to move temp version file to " + versionFile);
505          }
506        } finally {
507          // Cleaning up the temporary if the rename failed would be trying
508          // too hard. We'll unconditionally create it again the next time
509          // through anyway, files are overwritten by default by create().
510
511          // Attempt to close the stream on the way out if it is still open.
512          try {
513            if (s != null) s.close();
514          } catch (IOException ignore) { }
515        }
516        LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
517        return;
518      } catch (IOException e) {
519        if (retries > 0) {
520          LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
521          fs.delete(versionFile, false);
522          try {
523            if (wait > 0) {
524              Thread.sleep(wait);
525            }
526          } catch (InterruptedException ie) {
527            throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
528          }
529          retries--;
530        } else {
531          throw e;
532        }
533      }
534    }
535  }
536
537  /**
538   * Checks that a cluster ID file exists in the HBase root directory
539   * @param fs the root directory FileSystem
540   * @param rootdir the HBase root directory in HDFS
541   * @param wait how long to wait between retries
542   * @return <code>true</code> if the file exists, otherwise <code>false</code>
543   * @throws IOException if checking the FileSystem fails
544   */
545  public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
546      long wait) throws IOException {
547    while (true) {
548      try {
549        Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
550        return fs.exists(filePath);
551      } catch (IOException ioe) {
552        if (wait > 0L) {
553          LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe);
554          try {
555            Thread.sleep(wait);
556          } catch (InterruptedException e) {
557            Thread.currentThread().interrupt();
558            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
559          }
560        } else {
561          throw ioe;
562        }
563      }
564    }
565  }
566
567  /**
568   * Returns the value of the unique cluster ID stored for this HBase instance.
569   * @param fs the root directory FileSystem
570   * @param rootdir the path to the HBase root directory
571   * @return the unique cluster identifier
572   * @throws IOException if reading the cluster ID file fails
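   * <p>Illustrative sketch; the result may be <code>null</code> if the cluster ID file does not exist:
   * <pre>{@code
   * Path rootDir = FSUtils.getRootDir(conf);
   * FileSystem fs = rootDir.getFileSystem(conf);
   * ClusterId clusterId = FSUtils.getClusterId(fs, rootDir);
   * }</pre>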
573   */
574  public static ClusterId getClusterId(FileSystem fs, Path rootdir)
575  throws IOException {
576    Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
577    ClusterId clusterId = null;
578    FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath):  null;
579    if (status != null) {
580      int len = Ints.checkedCast(status.getLen());
581      byte [] content = new byte[len];
582      FSDataInputStream in = fs.open(idPath);
583      try {
584        in.readFully(content);
585      } catch (EOFException eof) {
586        LOG.warn("Cluster ID file {} is empty", idPath);
587      } finally{
588        in.close();
589      }
590      try {
591        clusterId = ClusterId.parseFrom(content);
592      } catch (DeserializationException e) {
593        throw new IOException("content=" + Bytes.toString(content), e);
594      }
595      // If not pb'd, make it so.
596      if (!ProtobufUtil.isPBMagicPrefix(content)) {
597        String cid = null;
598        in = fs.open(idPath);
599        try {
600          cid = in.readUTF();
601          clusterId = new ClusterId(cid);
602        } catch (EOFException eof) {
603          LOG.warn("Cluster ID file {} is empty", idPath);
604        } finally {
605          in.close();
606        }
607        rewriteAsPb(fs, rootdir, idPath, clusterId);
608      }
609      return clusterId;
610    } else {
611      LOG.warn("Cluster ID file does not exist at {}", idPath);
612    }
613    return clusterId;
614  }
615
616  /**
   * Rewrite the cluster ID file at <code>p</code> in protobuf format, moving the old file aside
   * first and deleting it after the rewrite succeeds.
   * @param cid the cluster ID to write
   * @throws IOException if the rename, rewrite or delete fails
619   */
620  private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
621      final ClusterId cid)
622  throws IOException {
623    // Rewrite the file as pb.  Move aside the old one first, write new
624    // then delete the moved-aside file.
625    Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
626    if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
627    setClusterId(fs, rootdir, cid, 100);
628    if (!fs.delete(movedAsideName, false)) {
629      throw new IOException("Failed delete of " + movedAsideName);
630    }
631    LOG.debug("Rewrote the hbase.id file as pb");
632  }
633
634  /**
635   * Writes a new unique identifier for this cluster to the "hbase.id" file
636   * in the HBase root directory
637   * @param fs the root directory FileSystem
638   * @param rootdir the path to the HBase root directory
639   * @param clusterId the unique identifier to store
640   * @param wait how long (in milliseconds) to wait between retries
   * @throws IOException if writing to the FileSystem fails and no retry wait was specified
642   */
643  public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
644      int wait) throws IOException {
645    while (true) {
646      try {
647        Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
648        Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
649          Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
650        // Write the id file to a temporary location
651        FSDataOutputStream s = fs.create(tempIdFile);
652        try {
653          s.write(clusterId.toByteArray());
654          s.close();
655          s = null;
656          // Move the temporary file to its normal location. Throw an IOE if
657          // the rename failed
658          if (!fs.rename(tempIdFile, idFile)) {
            throw new IOException("Unable to move temp cluster ID file to " + idFile);
660          }
661        } finally {
662          // Attempt to close the stream if still open on the way out
663          try {
664            if (s != null) s.close();
665          } catch (IOException ignore) { }
666        }
667        if (LOG.isDebugEnabled()) {
668          LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
669        }
670        return;
671      } catch (IOException ioe) {
672        if (wait > 0) {
673          LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
674              ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
675          try {
676            Thread.sleep(wait);
677          } catch (InterruptedException e) {
678            throw (InterruptedIOException)new InterruptedIOException().initCause(e);
679          }
680        } else {
681          throw ioe;
682        }
683      }
684    }
685  }
686
687  /**
688   * If DFS, check safe mode and if so, wait until we clear it.
689   * @param conf configuration
   * @param wait time in milliseconds to sleep between retries
691   * @throws IOException e
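   * <p>Illustrative sketch:
   * <pre>{@code
   * // Block until HDFS leaves safe mode, polling every 10 seconds.
   * FSUtils.waitOnSafeMode(conf, 10 * 1000L);
   * }</pre>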
692   */
693  public static void waitOnSafeMode(final Configuration conf,
694    final long wait)
695  throws IOException {
696    FileSystem fs = FileSystem.get(conf);
697    if (!(fs instanceof DistributedFileSystem)) return;
698    DistributedFileSystem dfs = (DistributedFileSystem)fs;
699    // Make sure dfs is not in safe mode
700    while (isInSafeMode(dfs)) {
701      LOG.info("Waiting for dfs to exit safe mode...");
702      try {
703        Thread.sleep(wait);
704      } catch (InterruptedException e) {
705        Thread.currentThread().interrupt();
706        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
707      }
708    }
709  }
710
711  /**
712   * Checks if meta region exists
713   * @param fs file system
714   * @param rootDir root directory of HBase installation
715   * @return true if exists
716   */
717  public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
718    Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
719    return fs.exists(metaRegionDir);
720  }
721
722  /**
723   * Compute HDFS blocks distribution of a given file, or a portion of the file
724   * @param fs file system
725   * @param status file status of the file
726   * @param start start position of the portion
727   * @param length length of the portion
728   * @return The HDFS blocks distribution
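   * <p>Illustrative sketch (the store file path and hostname are hypothetical):
   * <pre>{@code
   * FileStatus status = fs.getFileStatus(storeFilePath);
   * HDFSBlocksDistribution dist =
   *     FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
   * float locality = dist.getBlockLocalityIndex("datanode-1.example.com");
   * }</pre>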
729   */
730  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
731    final FileSystem fs, FileStatus status, long start, long length)
732    throws IOException {
733    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
734    BlockLocation [] blockLocations =
735      fs.getFileBlockLocations(status, start, length);
736    for(BlockLocation bl : blockLocations) {
737      String [] hosts = bl.getHosts();
738      long len = bl.getLength();
739      blocksDistribution.addHostsAndBlockWeight(hosts, len);
740    }
741
742    return blocksDistribution;
743  }
744
745  /**
746   * Update blocksDistribution with blockLocations
747   * @param blocksDistribution the hdfs blocks distribution
748   * @param blockLocations an array containing block location
749   */
750  static public void addToHDFSBlocksDistribution(
751      HDFSBlocksDistribution blocksDistribution, BlockLocation[] blockLocations)
752      throws IOException {
753    for (BlockLocation bl : blockLocations) {
754      String[] hosts = bl.getHosts();
755      long len = bl.getLength();
756      blocksDistribution.addHostsAndBlockWeight(hosts, len);
757    }
758  }
759
760  // TODO move this method OUT of FSUtils. No dependencies to HMaster
761  /**
762   * Returns the total overall fragmentation percentage. Includes hbase:meta and
763   * -ROOT- as well.
764   *
765   * @param master  The master defining the HBase root and file system
   * @return The overall fragmentation percentage across all tables, or -1 if no tables are found
767   * @throws IOException When scanning the directory fails
768   */
769  public static int getTotalTableFragmentation(final HMaster master)
770  throws IOException {
771    Map<String, Integer> map = getTableFragmentation(master);
772    return map.isEmpty() ? -1 :  map.get("-TOTAL-");
773  }
774
775  /**
776   * Runs through the HBase rootdir and checks how many stores for each table
777   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
778   * percentage across all tables is stored under the special key "-TOTAL-".
779   *
780   * @param master  The master defining the HBase root and file system.
781   * @return A map for each table and its percentage (never null).
782   *
783   * @throws IOException When scanning the directory fails.
784   */
785  public static Map<String, Integer> getTableFragmentation(
786    final HMaster master)
787  throws IOException {
788    Path path = getRootDir(master.getConfiguration());
789    // since HMaster.getFileSystem() is package private
790    FileSystem fs = path.getFileSystem(master.getConfiguration());
791    return getTableFragmentation(fs, path);
792  }
793
794  /**
795   * Runs through the HBase rootdir and checks how many stores for each table
796   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
797   * percentage across all tables is stored under the special key "-TOTAL-".
798   *
799   * @param fs  The file system to use
800   * @param hbaseRootDir  The root directory to scan
801   * @return A map for each table and its percentage (never null)
802   * @throws IOException When scanning the directory fails
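   * <p>Illustrative sketch:
   * <pre>{@code
   * Map<String, Integer> frags = FSUtils.getTableFragmentation(fs, hbaseRootDir);
   * int overall = frags.get("-TOTAL-"); // fragmentation percentage across all tables
   * }</pre>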
803   */
804  public static Map<String, Integer> getTableFragmentation(
805    final FileSystem fs, final Path hbaseRootDir)
806  throws IOException {
807    Map<String, Integer> frags = new HashMap<>();
808    int cfCountTotal = 0;
809    int cfFragTotal = 0;
810    PathFilter regionFilter = new RegionDirFilter(fs);
811    PathFilter familyFilter = new FamilyDirFilter(fs);
812    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
813    for (Path d : tableDirs) {
814      int cfCount = 0;
815      int cfFrag = 0;
816      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
817      for (FileStatus regionDir : regionDirs) {
818        Path dd = regionDir.getPath();
        // else it is a region name; now look in the region for families
820        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
821        for (FileStatus familyDir : familyDirs) {
822          cfCount++;
823          cfCountTotal++;
824          Path family = familyDir.getPath();
825          // now in family make sure only one file
826          FileStatus[] familyStatus = fs.listStatus(family);
827          if (familyStatus.length > 1) {
828            cfFrag++;
829            cfFragTotal++;
830          }
831        }
832      }
833      // compute percentage per table and store in result list
834      frags.put(FSUtils.getTableName(d).getNameAsString(),
835        cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
836    }
837    // set overall percentage for all tables
838    frags.put("-TOTAL-",
839      cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
840    return frags;
841  }
842
843  public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
844    if (fs.exists(dst) && !fs.delete(dst, false)) {
845      throw new IOException("Can not delete " + dst);
846    }
847    if (!fs.rename(src, dst)) {
848      throw new IOException("Can not rename from " + src + " to " + dst);
849    }
850  }
851
852  /**
853   * A {@link PathFilter} that returns only regular files.
854   */
855  static class FileFilter extends AbstractFileStatusFilter {
856    private final FileSystem fs;
857
858    public FileFilter(final FileSystem fs) {
859      this.fs = fs;
860    }
861
862    @Override
863    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
864      try {
865        return isFile(fs, isDir, p);
866      } catch (IOException e) {
867        LOG.warn("Unable to verify if path={} is a regular file", p, e);
868        return false;
869      }
870    }
871  }
872
873  /**
874   * Directory filter that doesn't include any of the directories in the specified blacklist
875   */
876  public static class BlackListDirFilter extends AbstractFileStatusFilter {
877    private final FileSystem fs;
878    private List<String> blacklist;
879
880    /**
     * Create a filter on the given filesystem with the specified blacklist
882     * @param fs filesystem to filter
883     * @param directoryNameBlackList list of the names of the directories to filter. If
884     *          <tt>null</tt>, all directories are returned
885     */
886    @SuppressWarnings("unchecked")
887    public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
888      this.fs = fs;
889      blacklist =
890        (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
891          : directoryNameBlackList);
892    }
893
894    @Override
895    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
896      if (!isValidName(p.getName())) {
897        return false;
898      }
899
900      try {
901        return isDirectory(fs, isDir, p);
902      } catch (IOException e) {
903        LOG.warn("An error occurred while verifying if [{}] is a valid directory."
904            + " Returning 'not valid' and continuing.", p, e);
905        return false;
906      }
907    }
908
909    protected boolean isValidName(final String name) {
910      return !blacklist.contains(name);
911    }
912  }
913
914  /**
915   * A {@link PathFilter} that only allows directories.
916   */
917  public static class DirFilter extends BlackListDirFilter {
918
919    public DirFilter(FileSystem fs) {
920      super(fs, null);
921    }
922  }
923
924  /**
925   * A {@link PathFilter} that returns usertable directories. To get all directories use the
926   * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
927   */
928  public static class UserTableDirFilter extends BlackListDirFilter {
929    public UserTableDirFilter(FileSystem fs) {
930      super(fs, HConstants.HBASE_NON_TABLE_DIRS);
931    }
932
933    @Override
934    protected boolean isValidName(final String name) {
935      if (!super.isValidName(name))
936        return false;
937
938      try {
939        TableName.isLegalTableQualifierName(Bytes.toBytes(name));
940      } catch (IllegalArgumentException e) {
941        LOG.info("Invalid table name: {}", name);
942        return false;
943      }
944      return true;
945    }
946  }
947
948  public void recoverFileLease(final FileSystem fs, final Path p, Configuration conf)
949      throws IOException {
950    recoverFileLease(fs, p, conf, null);
951  }
952
953  /**
   * Recover file lease. Used when a file is suspected to have been left open by another process.
   * @param fs FileSystem handle
   * @param p Path of the file whose lease should be recovered
   * @param conf Configuration handle
   * @param reporter progress reporter, may be null
   * @throws IOException if lease recovery fails
960   */
961  public abstract void recoverFileLease(final FileSystem fs, final Path p,
962      Configuration conf, CancelableProgressable reporter) throws IOException;
963
964  public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
965      throws IOException {
966    List<Path> tableDirs = new ArrayList<>();
967
968    for (FileStatus status : fs
969        .globStatus(new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
970      tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
971    }
972    return tableDirs;
973  }
974
975  /**
   * @param fs the FileSystem to use
   * @param rootdir the directory whose table directories should be listed
   * @return All the table directories under <code>rootdir</code>. Ignores non-table hbase folders
   * such as .logs, .oldlogs and .corrupt.
980   * @throws IOException
981   */
982  public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
983      throws IOException {
984    // presumes any directory under hbase.rootdir is a table
985    FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
986    List<Path> tabledirs = new ArrayList<>(dirs.length);
987    for (FileStatus dir: dirs) {
988      tabledirs.add(dir.getPath());
989    }
990    return tabledirs;
991  }
992
993  /**
   * Filter for region directories. Accepts directory names made up only of hex characters,
   * covering 0.90+ style hex region names and older numeric region names.
995   */
996  public static class RegionDirFilter extends AbstractFileStatusFilter {
997    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
998    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
999    final FileSystem fs;
1000
1001    public RegionDirFilter(FileSystem fs) {
1002      this.fs = fs;
1003    }
1004
1005    @Override
1006    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1007      if (!regionDirPattern.matcher(p.getName()).matches()) {
1008        return false;
1009      }
1010
1011      try {
1012        return isDirectory(fs, isDir, p);
1013      } catch (IOException ioe) {
1014        // Maybe the file was moved or the fs was disconnected.
1015        LOG.warn("Skipping file {} due to IOException", p, ioe);
1016        return false;
1017      }
1018    }
1019  }
1020
1021  /**
1022   * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1023   * .tableinfo
1024   * @param fs A file system for the Path
1025   * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
1026   * @return List of paths to valid region directories in table dir.
1027   * @throws IOException
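   * <p>Illustrative sketch (the table name is hypothetical):
   * <pre>{@code
   * Path tableDir = FSUtils.getTableDir(rootDir, TableName.valueOf("t1"));
   * for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
   *   int referenceFiles = FSUtils.getRegionReferenceFileCount(fs, regionDir);
   * }
   * }</pre>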
1028   */
1029  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1030    // assumes we are in a table dir.
1031    List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1032    if (rds == null) {
1033      return Collections.emptyList();
1034    }
1035    List<Path> regionDirs = new ArrayList<>(rds.size());
1036    for (FileStatus rdfs: rds) {
1037      Path rdPath = rdfs.getPath();
1038      regionDirs.add(rdPath);
1039    }
1040    return regionDirs;
1041  }
1042
1043  public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) {
1044    return getRegionDirFromTableDir(getTableDir(rootDir, region.getTable()), region);
1045  }
1046
1047  public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
1048    return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
1049  }
1050
1051  /**
1052   * Filter for all dirs that are legal column family names.  This is generally used for colfam
1053   * dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1054   */
1055  public static class FamilyDirFilter extends AbstractFileStatusFilter {
1056    final FileSystem fs;
1057
1058    public FamilyDirFilter(FileSystem fs) {
1059      this.fs = fs;
1060    }
1061
1062    @Override
1063    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1064      try {
1065        // throws IAE if invalid
1066        HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName()));
1067      } catch (IllegalArgumentException iae) {
1068        // path name is an invalid family name and thus is excluded.
1069        return false;
1070      }
1071
1072      try {
1073        return isDirectory(fs, isDir, p);
1074      } catch (IOException ioe) {
1075        // Maybe the file was moved or the fs was disconnected.
1076        LOG.warn("Skipping file {} due to IOException", p, ioe);
1077        return false;
1078      }
1079    }
1080  }
1081
1082  /**
1083   * Given a particular region dir, return all the familydirs inside it
1084   *
1085   * @param fs A file system for the Path
1086   * @param regionDir Path to a specific region directory
1087   * @return List of paths to valid family directories in region dir.
1088   * @throws IOException
1089   */
1090  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1091    // assumes we are in a region dir.
1092    FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1093    List<Path> familyDirs = new ArrayList<>(fds.length);
1094    for (FileStatus fdfs: fds) {
1095      Path fdPath = fdfs.getPath();
1096      familyDirs.add(fdPath);
1097    }
1098    return familyDirs;
1099  }
1100
1101  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
1102    List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs));
1103    if (fds == null) {
1104      return Collections.emptyList();
1105    }
1106    List<Path> referenceFiles = new ArrayList<>(fds.size());
1107    for (FileStatus fdfs: fds) {
1108      Path fdPath = fdfs.getPath();
1109      referenceFiles.add(fdPath);
1110    }
1111    return referenceFiles;
1112  }
1113
1114  /**
1115   * Filter for HFiles that excludes reference files.
1116   */
1117  public static class HFileFilter extends AbstractFileStatusFilter {
1118    final FileSystem fs;
1119
1120    public HFileFilter(FileSystem fs) {
1121      this.fs = fs;
1122    }
1123
1124    @Override
1125    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1126      if (!StoreFileInfo.isHFile(p)) {
1127        return false;
1128      }
1129
1130      try {
1131        return isFile(fs, isDir, p);
1132      } catch (IOException ioe) {
1133        // Maybe the file was moved or the fs was disconnected.
1134        LOG.warn("Skipping file {} due to IOException", p, ioe);
1135        return false;
1136      }
1137    }
1138  }
1139
1140  /**
1141   * Filter for HFileLinks (StoreFiles and HFiles not included).
   * The filter itself does not consider whether a link is a file or not.
1143   */
1144  public static class HFileLinkFilter implements PathFilter {
1145
1146    @Override
1147    public boolean accept(Path p) {
1148      return HFileLink.isHFileLink(p);
1149    }
1150  }
1151
1152  public static class ReferenceFileFilter extends AbstractFileStatusFilter {
1153
1154    private final FileSystem fs;
1155
1156    public ReferenceFileFilter(FileSystem fs) {
1157      this.fs = fs;
1158    }
1159
1160    @Override
1161    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1162      if (!StoreFileInfo.isReference(p)) {
1163        return false;
1164      }
1165
1166      try {
1167        // only files can be references.
1168        return isFile(fs, isDir, p);
1169      } catch (IOException ioe) {
1170        // Maybe the file was moved or the fs was disconnected.
1171        LOG.warn("Skipping file {} due to IOException", p, ioe);
1172        return false;
1173      }
1174    }
1175  }
1176
1177  /**
1178   * Called every so-often by storefile map builder getTableStoreFilePathMap to
1179   * report progress.
1180   */
1181  interface ProgressReporter {
1182    /**
1183     * @param status File or directory we are about to process.
1184     */
1185    void progress(FileStatus status);
1186  }
1187
1188  /**
1189   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1190   * table StoreFile names to the full Path.
1191   * <br>
1192   * Example...<br>
1193   * Key = 3944417774205889744  <br>
1194   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1195   *
1196   * @param map map to add values.  If null, this method will create and populate one to return
1197   * @param fs  The file system to use.
1198   * @param hbaseRootDir  The root directory to scan.
1199   * @param tableName name of the table to scan.
1200   * @return Map keyed by StoreFile name with a value of the full Path.
1201   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException if the thread is interrupted while scanning
1203   */
1204  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1205  final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1206  throws IOException, InterruptedException {
1207    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null,
1208        (ProgressReporter)null);
1209  }
1210
1211  /**
1212   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
   * table StoreFile names to the full Path.  Note that because this method can be called on a
   * 'live' HBase system, files that no longer exist by the time we traverse them are skipped;
   * similarly, the caller should expect that some entries in the returned map may no longer
   * exist by the time this call completes.
1217   * <br>
1218   * Example...<br>
1219   * Key = 3944417774205889744  <br>
1220   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1221   *
1222   * @param resultMap map to add values.  If null, this method will create and populate one to return
1223   * @param fs  The file system to use.
1224   * @param hbaseRootDir  The root directory to scan.
1225   * @param tableName name of the table to scan.
1226   * @param sfFilter optional path filter to apply to store files
1227   * @param executor optional executor service to parallelize this operation
1228   * @param progressReporter Instance or null; gets called every time we move to new region of
1229   *   family dir and for each store file.
1230   * @return Map keyed by StoreFile name with a value of the full Path.
1231   * @throws IOException When scanning the directory fails.
1232   * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead.
1233   */
1234  @Deprecated
1235  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1236      final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1237      ExecutorService executor, final HbckErrorReporter progressReporter)
1238      throws IOException, InterruptedException {
1239    return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor,
1240        new ProgressReporter() {
1241          @Override
1242          public void progress(FileStatus status) {
1243            // status is not used in this implementation.
1244            progressReporter.progress();
1245          }
1246        });
1247  }
1248
1249  /**
1250   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
   * table StoreFile names to the full Path.  Note that because this method can be called on a
   * 'live' HBase system, files that no longer exist by the time we traverse them are skipped;
   * similarly, the caller should expect that some entries in the returned map may no longer
   * exist by the time this call completes.
1255   * <br>
1256   * Example...<br>
1257   * Key = 3944417774205889744  <br>
1258   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1259   *
1260   * @param resultMap map to add values.  If null, this method will create and populate one
1261   *   to return
1262   * @param fs  The file system to use.
1263   * @param hbaseRootDir  The root directory to scan.
1264   * @param tableName name of the table to scan.
1265   * @param sfFilter optional path filter to apply to store files
1266   * @param executor optional executor service to parallelize this operation
1267   * @param progressReporter Instance or null; gets called every time we move to new region of
1268   *   family dir and for each store file.
1269   * @return Map keyed by StoreFile name with a value of the full Path.
1270   * @throws IOException When scanning the directory fails.
1271   * @throws InterruptedException the thread is interrupted, either before or during the activity.
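   * <p>Illustrative sketch (the executor and reporter are optional and may be null; the table
   * name is hypothetical):
   * <pre>{@code
   * Map<String, Path> storeFiles = FSUtils.getTableStoreFilePathMap(
   *     null, fs, hbaseRootDir, TableName.valueOf("t1"), null, null, (ProgressReporter) null);
   * Path storeFilePath = storeFiles.get("3944417774205889744");
   * }</pre>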
1272   */
1273  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1274      final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1275      ExecutorService executor, final ProgressReporter progressReporter)
1276    throws IOException, InterruptedException {
1277
1278    final Map<String, Path> finalResultMap =
1279        resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;
1280
1281    // only include the directory paths to tables
1282    Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
1283    // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
1284    // should be regions.
1285    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
1286    final Vector<Exception> exceptions = new Vector<>();
1287
1288    try {
1289      List<FileStatus> regionDirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1290      if (regionDirs == null) {
1291        return finalResultMap;
1292      }
1293
1294      final List<Future<?>> futures = new ArrayList<>(regionDirs.size());
1295
1296      for (FileStatus regionDir : regionDirs) {
1297        if (null != progressReporter) {
1298          progressReporter.progress(regionDir);
1299        }
1300        final Path dd = regionDir.getPath();
1301
1302        if (!exceptions.isEmpty()) {
1303          break;
1304        }
1305
1306        Runnable getRegionStoreFileMapCall = new Runnable() {
1307          @Override
1308          public void run() {
1309            try {
1310              HashMap<String,Path> regionStoreFileMap = new HashMap<>();
1311              List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
1312              if (familyDirs == null) {
1313                if (!fs.exists(dd)) {
1314                  LOG.warn("Skipping region because it no longer exists: " + dd);
1315                } else {
1316                  LOG.warn("Skipping region because it has no family dirs: " + dd);
1317                }
1318                return;
1319              }
1320              for (FileStatus familyDir : familyDirs) {
1321                if (null != progressReporter) {
1322                  progressReporter.progress(familyDir);
1323                }
1324                Path family = familyDir.getPath();
1325                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1326                  continue;
1327                }
1328                // now in family, iterate over the StoreFiles and
1329                // put in map
1330                FileStatus[] familyStatus = fs.listStatus(family);
1331                for (FileStatus sfStatus : familyStatus) {
1332                  if (null != progressReporter) {
1333                    progressReporter.progress(sfStatus);
1334                  }
1335                  Path sf = sfStatus.getPath();
1336                  if (sfFilter == null || sfFilter.accept(sf)) {
                    regionStoreFileMap.put(sf.getName(), sf);
1338                  }
1339                }
1340              }
1341              finalResultMap.putAll(regionStoreFileMap);
1342            } catch (Exception e) {
1343              LOG.error("Could not get region store file map for region: " + dd, e);
1344              exceptions.add(e);
1345            }
1346          }
1347        };
1348
1349        // If executor is available, submit async tasks to exec concurrently, otherwise
1350        // just do serial sync execution
1351        if (executor != null) {
1352          Future<?> future = executor.submit(getRegionStoreFileMapCall);
1353          futures.add(future);
1354        } else {
1355          FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
1356          future.run();
1357          futures.add(future);
1358        }
1359      }
1360
1361      // Ensure all pending tasks are complete (or that we run into an exception)
1362      for (Future<?> f : futures) {
1363        if (!exceptions.isEmpty()) {
1364          break;
1365        }
1366        try {
1367          f.get();
1368        } catch (ExecutionException e) {
1369          LOG.error("Unexpected exec exception!  Should've been caught already.  (Bug?)", e);
1370          // Shouldn't happen, we already logged/caught any exceptions in the Runnable
1371        }
1372      }
1373    } catch (IOException e) {
1374      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
1375      exceptions.add(e);
1376    } finally {
1377      if (!exceptions.isEmpty()) {
1378        // Just throw the first exception as an indication something bad happened
1379        // Don't need to propagate all the exceptions, we already logged them all anyway
1380        Throwables.propagateIfInstanceOf(exceptions.firstElement(), IOException.class);
1381        throw Throwables.propagate(exceptions.firstElement());
1382      }
1383    }
1384
1385    return finalResultMap;
1386  }
1387
1388  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1389    int result = 0;
1390    try {
1391      for (Path familyDir:getFamilyDirs(fs, p)){
1392        result += getReferenceFilePaths(fs, familyDir).size();
1393      }
1394    } catch (IOException e) {
1395      LOG.warn("Error counting reference files", e);
1396    }
1397    return result;
1398  }
1399
1400  /**
1401   * Runs through the HBase rootdir and creates a reverse lookup map for
1402   * table StoreFile names to the full Path.
1403   * <br>
1404   * Example...<br>
1405   * Key = 3944417774205889744  <br>
1406   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1407   *
1408   * @param fs  The file system to use.
1409   * @param hbaseRootDir  The root directory to scan.
1410   * @return Map keyed by StoreFile name with a value of the full Path.
1411   * @throws IOException When scanning the directory fails.
1412   */
1413  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1414      final Path hbaseRootDir)
1415  throws IOException, InterruptedException {
1416    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter)null);
1417  }
1418
1419  /**
1420   * Runs through the HBase rootdir and creates a reverse lookup map for
1421   * table StoreFile names to the full Path.
1422   * <br>
1423   * Example...<br>
1424   * Key = 3944417774205889744  <br>
1425   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1426   *
1427   * @param fs  The file system to use.
1428   * @param hbaseRootDir  The root directory to scan.
1429   * @param sfFilter optional path filter to apply to store files
1430   * @param executor optional executor service to parallelize this operation
1431   * @param progressReporter Instance or null; gets called every time we move to a new region or
1432   *   family dir and for each store file.
1433   * @return Map keyed by StoreFile name with a value of the full Path.
1434   * @throws IOException When scanning the directory fails.
1435   * @deprecated Since 2.3.0. Will be removed in HBase 4. Use {@link
1436   *   #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)}
1437   */
1438  @Deprecated
1439  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1440      final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1441      HbckErrorReporter progressReporter)
1442    throws IOException, InterruptedException {
1443    return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor,
1444        new ProgressReporter() {
1445          @Override
1446          public void progress(FileStatus status) {
1447            // status is not used in this implementation.
1448            progressReporter.progress();
1449          }
1450        });
1451  }
1452
1453  /**
1454   * Runs through the HBase rootdir and creates a reverse lookup map for
1455   * table StoreFile names to the full Path.
1456   * <br>
1457   * Example...<br>
1458   * Key = 3944417774205889744  <br>
1459   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
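   * <p>
   * For example, to scan with a small thread pool and no store file filter (a sketch; pool sizing
   * and error handling are up to the caller):
   * <pre>{@code
   * ExecutorService pool = Executors.newFixedThreadPool(4);
   * try {
   *   Map<String, Path> storeFiles =
   *       FSUtils.getTableStoreFilePathMap(fs, rootDir, null, pool, (ProgressReporter) null);
   * } finally {
   *   pool.shutdown();
   * }
   * }</pre>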
1460   *
1461   * @param fs  The file system to use.
1462   * @param hbaseRootDir  The root directory to scan.
1463   * @param sfFilter optional path filter to apply to store files
1464   * @param executor optional executor service to parallelize this operation
1465   * @param progressReporter Instance or null; gets called every time we move to a new region or
1466   *   family dir and for each store file.
1467   * @return Map keyed by StoreFile name with a value of the full Path.
1468   * @throws IOException When scanning the directory fails.
1469   * @throws InterruptedException if the scan is interrupted while waiting for executor tasks to complete.
1470   */
1471  public static Map<String, Path> getTableStoreFilePathMap(
1472    final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter,
1473        ExecutorService executor, ProgressReporter progressReporter)
1474  throws IOException, InterruptedException {
1475    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);
1476
1477    // if this method looks similar to 'getTableFragmentation' that is because
1478    // it was borrowed from it.
1479
1480    // only include the directory paths to tables
1481    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1482      getTableStoreFilePathMap(map, fs, hbaseRootDir,
1483          FSUtils.getTableName(tableDir), sfFilter, executor, progressReporter);
1484    }
1485    return map;
1486  }
1487
1488  /**
1489   * Filters FileStatuses in an array and returns a list
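   * <p>
   * For example, keeping only directories (a sketch; {@code fs} and {@code dir} are assumed to
   * exist, and the filter is written as a method reference against the single-method
   * {@code FileStatusFilter} interface):
   * <pre>{@code
   * FileStatus[] statuses = fs.listStatus(dir);
   * List<FileStatus> dirsOnly = FSUtils.filterFileStatuses(statuses, FileStatus::isDirectory);
   * }</pre>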
1490   *
1491   * @param input   An array of FileStatuses
1492   * @param filter  A required filter to apply to the array
1493   * @return        A list of the FileStatuses accepted by the filter, or null if the input is null
1494   */
1495  public static List<FileStatus> filterFileStatuses(FileStatus[] input,
1496      FileStatusFilter filter) {
1497    if (input == null) return null;
1498    return filterFileStatuses(Iterators.forArray(input), filter);
1499  }
1500
1501  /**
1502   * Filters FileStatuses in an iterator and returns a list
1503   *
1504   * @param input   An iterator of FileStatuses
1505   * @param filter  A required filter to apply to the iterator
1506   * @return        A list of the FileStatuses accepted by the filter, or null if the input is null
1507   */
1508  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1509      FileStatusFilter filter) {
1510    if (input == null) return null;
1511    ArrayList<FileStatus> results = new ArrayList<>();
1512    while (input.hasNext()) {
1513      FileStatus f = input.next();
1514      if (filter.accept(f)) {
1515        results.add(f);
1516      }
1517    }
1518    return results;
1519  }
1520
1521  /**
1522   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1523   * This accommodates differences between Hadoop versions: Hadoop 1 does not
1524   * throw a FileNotFoundException and instead returns an empty FileStatus[],
1525   * while Hadoop 2 will throw a FileNotFoundException.
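   * <p>
   * A typical call that keeps only directories and treats a missing or empty directory as
   * "nothing to do" (a sketch; {@code tableDir} is assumed):
   * <pre>{@code
   * List<FileStatus> dirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, FileStatus::isDirectory);
   * if (dirs == null) {
   *   return; // directory missing, empty, or nothing passed the filter
   * }
   * }</pre>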
1526   *
1527   * @param fs file system
1528   * @param dir directory
1529   * @param filter file status filter
1530   * @return null if dir is empty, doesn't exist, or nothing passes the filter; otherwise the FileStatus list
1531   */
1532  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs,
1533      final Path dir, final FileStatusFilter filter) throws IOException {
1534    FileStatus [] status = null;
1535    try {
1536      status = fs.listStatus(dir);
1537    } catch (FileNotFoundException fnfe) {
1538      LOG.trace("{} does not exist", dir);
1539      return null;
1540    }
1541
1542    if (ArrayUtils.getLength(status) == 0)  {
1543      return null;
1544    }
1545
1546    if (filter == null) {
1547      return Arrays.asList(status);
1548    } else {
1549      List<FileStatus> status2 = filterFileStatuses(status, filter);
1550      if (status2 == null || status2.isEmpty()) {
1551        return null;
1552      } else {
1553        return status2;
1554      }
1555    }
1556  }
1557
1558  /**
1559   * Throw an exception if an action is not permitted by a user on a file.
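   * <p>
   * For example, verifying write access before touching a file (a sketch; {@code fs} and
   * {@code path} are assumed):
   * <pre>{@code
   * FileStatus status = fs.getFileStatus(path);
   * FSUtils.checkAccess(UserGroupInformation.getCurrentUser(), status, FsAction.WRITE);
   * }</pre>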
1560   *
1561   * @param ugi
1562   *          the user
1563   * @param file
1564   *          the file
1565   * @param action
1566   *          the action
1567   */
1568  public static void checkAccess(UserGroupInformation ugi, FileStatus file,
1569      FsAction action) throws AccessDeniedException {
1570    if (ugi.getShortUserName().equals(file.getOwner())) {
1571      if (file.getPermission().getUserAction().implies(action)) {
1572        return;
1573      }
1574    } else if (ArrayUtils.contains(ugi.getGroupNames(), file.getGroup())) {
1575      if (file.getPermission().getGroupAction().implies(action)) {
1576        return;
1577      }
1578    } else if (file.getPermission().getOtherAction().implies(action)) {
1579      return;
1580    }
1581    throw new AccessDeniedException("Permission denied:" + " action=" + action
1582        + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
1583  }
1584
1585  /**
1586   * This function is to scan the root path of the file system to get the
1587   * degree of locality for each region on each of the servers having at least
1588   * one block of that region.
1589   * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}
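   * <p>
   * For example (a sketch; {@code regionEncodedName} is any region encoded name of interest):
   * <pre>{@code
   * Map<String, Map<String, Float>> locality =
   *     FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
   * Map<String, Float> serverToLocality = locality.get(regionEncodedName);
   * }</pre>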
1590   *
1591   * @param conf
1592   *          the configuration to use
1593   * @return the mapping from region encoded name to a map of server names to
1594   *           locality fraction
1595   * @throws IOException
1596   *           in case of file system errors or interrupts
1597   */
1598  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1599      final Configuration conf) throws IOException {
1600    return getRegionDegreeLocalityMappingFromFS(
1601        conf, null,
1602        conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1603
1604  }
1605
1606  /**
1607   * This function is to scan the root path of the file system to get the
1608   * degree of locality for each region on each of the servers having at least
1609   * one block of that region.
1610   *
1611   * @param conf
1612   *          the configuration to use
1613   * @param desiredTable
1614   *          the table you wish to scan locality for
1615   * @param threadPoolSize
1616   *          the thread pool size to use
1617   * @return the mapping from region encoded name to a map of server names to
1618   *           locality fraction
1619   * @throws IOException
1620   *           in case of file system errors or interrupts
1621   */
1622  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1623      final Configuration conf, final String desiredTable, int threadPoolSize)
1624      throws IOException {
1625    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
1626    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
1627    return regionDegreeLocalityMapping;
1628  }
1629
1630  /**
1631   * This function is to scan the root path of the file system to get the
1632   * degree of locality of each region on each of the servers having at least
1633   * one block of that region. The output map parameter may be null; when
1634   * supplied it must be a thread-safe implementation.
1635   *
1636   * @param conf
1637   *          the configuration to use
1638   * @param desiredTable
1639   *          the table you wish to scan locality for
1640   * @param threadPoolSize
1641   *          the thread pool size to use
1642   * @param regionDegreeLocalityMapping
1643   *          the map into which to put the locality degree mapping or null,
1644   *          must be a thread-safe implementation
1645   * @throws IOException
1646   *           in case of file system errors or interrupts
1647   */
1648  private static void getRegionLocalityMappingFromFS(final Configuration conf,
1649      final String desiredTable, int threadPoolSize,
1650      final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
1651    final FileSystem fs =  FileSystem.get(conf);
1652    final Path rootPath = FSUtils.getRootDir(conf);
1653    final long startTime = EnvironmentEdgeManager.currentTime();
1654    final Path queryPath;
1655    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1656    if (null == desiredTable) {
1657      queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1658    } else {
1659      queryPath = new Path(FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1660    }
1661
1662    // reject all paths that are not appropriate
1663    PathFilter pathFilter = new PathFilter() {
1664      @Override
1665      public boolean accept(Path path) {
1666        // the last path component should be a region encoded name; the glob may also match noise entries
1667        if (null == path) {
1668          return false;
1669        }
1670
1671        // no parent?
1672        Path parent = path.getParent();
1673        if (null == parent) {
1674          return false;
1675        }
1676
1677        String regionName = path.getName();
1678        if (null == regionName) {
1679          return false;
1680        }
1681
1682        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
1683          return false;
1684        }
1685        return true;
1686      }
1687    };
1688
1689    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1690
1691    if (LOG.isDebugEnabled()) {
1692      LOG.debug("Query Path: {}; matched files: {}", queryPath, Arrays.toString(statusList));
1693    }
1694
1695    if (null == statusList) {
1696      return;
1697    }
1698
1699    // lower the number of threads in case we have very few expected regions
1700    threadPoolSize = Math.min(threadPoolSize, statusList.length);
1701
1702    // run in multiple threads
1703    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
1704      Threads.newDaemonThreadFactory("FSRegionQuery"));
1705    try {
1706      // ignore all file status items that are not of interest
1707      for (FileStatus regionStatus : statusList) {
1708        if (null == regionStatus || !regionStatus.isDirectory()) {
1709          continue;
1710        }
1711
1712        final Path regionPath = regionStatus.getPath();
1713        if (null != regionPath) {
1714          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
1715        }
1716      }
1717    } finally {
1718      tpe.shutdown();
1719      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1720        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1721      try {
1722        // here we wait until TPE terminates, which is either naturally or by
1723        // exceptions in the execution of the threads
1724        while (!tpe.awaitTermination(threadWakeFrequency,
1725            TimeUnit.MILLISECONDS)) {
1726          // printing out rough estimate, so as to not introduce
1727          // AtomicInteger
1728          LOG.info("Locality checking is underway: { Scanned Regions : "
1729              + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
1730              + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
1731        }
1732      } catch (InterruptedException e) {
1733        Thread.currentThread().interrupt();
1734        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1735      }
1736    }
1737
1738    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
1739    LOG.info("Scanning DFS for locality info took {}ms", overhead);
1740  }
1741
1742  /**
1743   * Do our short circuit read setup.
1744   * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
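   * <p>
   * A typical setup leaves the skip-checksum flag alone and relies on HBase checksums (a sketch;
   * the short-circuit read itself is enabled via standard HDFS client configuration):
   * <pre>{@code
   * conf.setBoolean("dfs.client.read.shortcircuit", true);
   * conf.setBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
   * FSUtils.setupShortCircuitRead(conf);
   * }</pre>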
1745   * @param conf the configuration to check and update
1746   */
1747  public static void setupShortCircuitRead(final Configuration conf) {
1748    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1749    boolean shortCircuitSkipChecksum =
1750      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1751    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1752    if (shortCircuitSkipChecksum) {
1753      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
1754        "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
1755        "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
1756      assert !shortCircuitSkipChecksum; //this will fail if assertions are on
1757    }
1758    checkShortCircuitReadBufferSize(conf);
1759  }
1760
1761  /**
1762   * Check if short circuit read buffer size is set and if not, set it to hbase value.
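   * <p>
   * For example, on a fresh configuration this seeds the HDFS key with the HBase default
   * (a sketch):
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * FSUtils.checkShortCircuitReadBufferSize(conf);
   * int size = conf.getInt("dfs.client.read.shortcircuit.buffer.size", -1);
   * }</pre>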
1763   * @param conf the configuration to check and update
1764   */
1765  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1766    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1767    final int notSet = -1;
1768    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
1769    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1770    int size = conf.getInt(dfsKey, notSet);
1771    // If a size is set, return -- we will use it.
1772    if (size != notSet) return;
1773    // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
1774    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1775    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
1776  }
1777
1778  /**
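   * Returns the {@link DFSHedgedReadMetrics} kept by the DFSClient backing the configured
   * filesystem. The accessor on DFSClient is package private, so it is located and invoked via
   * reflection.
   * <p>
   * A usage sketch (assuming the cluster is on HDFS; the getter name on
   * {@link DFSHedgedReadMetrics} reflects current Hadoop and may differ across versions):
   * <pre>{@code
   * DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
   * if (metrics != null) {
   *   long hedgedReadOps = metrics.getHedgedReadOps();
   * }
   * }</pre>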
1779   * @param c the configuration to use to locate the filesystem
1780   * @return The DFSClient DFSHedgedReadMetrics instance, or null if it can't be found or the filesystem is not HDFS.
1781   * @throws IOException
1782   */
1783  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
1784      throws IOException {
1785    if (!isHDFS(c)) return null;
1786    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
1787    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
1788    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
1789    final String name = "getHedgedReadMetrics";
1790    DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
1791    Method m;
1792    try {
1793      m = dfsclient.getClass().getDeclaredMethod(name);
1794    } catch (NoSuchMethodException e) {
1795      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
1796          e.getMessage());
1797      return null;
1798    } catch (SecurityException e) {
1799      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
1800          e.getMessage());
1801      return null;
1802    }
1803    m.setAccessible(true);
1804    try {
1805      return (DFSHedgedReadMetrics)m.invoke(dfsclient);
1806    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
1807      LOG.warn("Failed to invoke method " + name + " on dfsclient; no hedged read metrics: " +
1808          e.getMessage());
1809      return null;
1810    }
1811  }
1812
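  /**
   * Recursively copies {@code src} on {@code srcFS} to {@code dst} on {@code dstFS}. Directories
   * are created synchronously while individual file copies are submitted to a fixed-size thread
   * pool and awaited before returning.
   * <p>
   * For example (a sketch; source and destination filesystems and paths are assumed to exist):
   * <pre>{@code
   * List<Path> copied = FSUtils.copyFilesParallel(srcFs, srcDir, dstFs, dstDir, conf, 8);
   * }</pre>
   * @return the list of destination paths visited, both directories and files
   * @throws IOException if creating a directory or any submitted file copy fails
   */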
1813  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1814      Configuration conf, int threads) throws IOException {
1815    ExecutorService pool = Executors.newFixedThreadPool(threads);
1816    List<Future<Void>> futures = new ArrayList<>();
1817    List<Path> traversedPaths;
1818    try {
1819      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
1820      for (Future<Void> future : futures) {
1821        future.get();
1822      }
1823    } catch (ExecutionException | InterruptedException | IOException e) {
1824      throw new IOException("Copy snapshot reference files failed", e);
1825    } finally {
1826      pool.shutdownNow();
1827    }
1828    return traversedPaths;
1829  }
1830
1831  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1832      Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
1833    List<Path> traversedPaths = new ArrayList<>();
1834    traversedPaths.add(dst);
1835    FileStatus currentFileStatus = srcFS.getFileStatus(src);
1836    if (currentFileStatus.isDirectory()) {
1837      if (!dstFS.mkdirs(dst)) {
1838        throw new IOException("Create directory failed: " + dst);
1839      }
1840      FileStatus[] subPaths = srcFS.listStatus(src);
1841      for (FileStatus subPath : subPaths) {
1842        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
1843          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
1844      }
1845    } else {
1846      Future<Void> future = pool.submit(() -> {
1847        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
1848        return null;
1849      });
1850      futures.add(future);
1851    }
1852    return traversedPaths;
1853  }
1854}