001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
021import edu.umd.cs.findbugs.annotations.CheckForNull;
022import java.io.ByteArrayInputStream;
023import java.io.DataInputStream;
024import java.io.EOFException;
025import java.io.FileNotFoundException;
026import java.io.IOException;
027import java.io.InterruptedIOException;
028import java.lang.reflect.InvocationTargetException;
029import java.lang.reflect.Method;
030import java.net.InetSocketAddress;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.HashMap;
035import java.util.Iterator;
036import java.util.List;
037import java.util.Locale;
038import java.util.Map;
039import java.util.Vector;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.ExecutionException;
042import java.util.concurrent.ExecutorService;
043import java.util.concurrent.Executors;
044import java.util.concurrent.Future;
045import java.util.concurrent.FutureTask;
046import java.util.concurrent.ThreadPoolExecutor;
047import java.util.concurrent.TimeUnit;
048import java.util.regex.Pattern;
049
050import org.apache.commons.lang3.ArrayUtils;
051import org.apache.hadoop.conf.Configuration;
052import org.apache.hadoop.fs.BlockLocation;
053import org.apache.hadoop.fs.FSDataInputStream;
054import org.apache.hadoop.fs.FSDataOutputStream;
055import org.apache.hadoop.fs.FileStatus;
056import org.apache.hadoop.fs.FileSystem;
057import org.apache.hadoop.fs.FileUtil;
058import org.apache.hadoop.fs.Path;
059import org.apache.hadoop.fs.PathFilter;
060import org.apache.hadoop.fs.permission.FsAction;
061import org.apache.hadoop.fs.permission.FsPermission;
062import org.apache.hadoop.hbase.ClusterId;
063import org.apache.hadoop.hbase.HColumnDescriptor;
064import org.apache.hadoop.hbase.HConstants;
065import org.apache.hadoop.hbase.HDFSBlocksDistribution;
066import org.apache.hadoop.hbase.HRegionInfo;
067import org.apache.hadoop.hbase.TableName;
068import org.apache.hadoop.hbase.client.RegionInfo;
069import org.apache.hadoop.hbase.client.RegionInfoBuilder;
070import org.apache.hadoop.hbase.exceptions.DeserializationException;
071import org.apache.hadoop.hbase.fs.HFileSystem;
072import org.apache.hadoop.hbase.io.HFileLink;
073import org.apache.hadoop.hbase.master.HMaster;
074import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
075import org.apache.hadoop.hbase.security.AccessDeniedException;
076import org.apache.hadoop.hdfs.DFSClient;
077import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
078import org.apache.hadoop.hdfs.DistributedFileSystem;
079import org.apache.hadoop.hdfs.protocol.HdfsConstants;
080import org.apache.hadoop.io.IOUtils;
081import org.apache.hadoop.ipc.RemoteException;
082import org.apache.hadoop.security.UserGroupInformation;
083import org.apache.hadoop.util.Progressable;
084import org.apache.hadoop.util.ReflectionUtils;
085import org.apache.hadoop.util.StringUtils;
086import org.apache.yetus.audience.InterfaceAudience;
087import org.slf4j.Logger;
088import org.slf4j.LoggerFactory;
089
090import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
091import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
092import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
093import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
094
095import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
097
098/**
099 * Utility methods for interacting with the underlying file system.
100 */
101@InterfaceAudience.Private
102public abstract class FSUtils extends CommonFSUtils {
103  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
104
105  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
106  private static final int DEFAULT_THREAD_POOLSIZE = 2;
107
108  /** Set to true on Windows platforms */
109  @VisibleForTesting // currently only used in testing. TODO refactor into a test class
110  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
111
112  protected FSUtils() {
113    super();
114  }
115
116  /**
   * @return True if <code>fs</code> is an instance of DistributedFileSystem
118   * @throws IOException
119   */
120  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
121    FileSystem fileSystem = fs;
122    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
123    // Check its backing fs for dfs-ness.
124    if (fs instanceof HFileSystem) {
125      fileSystem = ((HFileSystem)fs).getBackingFs();
126    }
127    return fileSystem instanceof DistributedFileSystem;
128  }
129
  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. Does not consider the scheme; i.e. if the schemes differ but the path or
   * subpath matches, the two equate. Paths of differing depth never match.
   * @param pathToSearch Path we will be trying to match.
   * @param pathTail the tail we expect to find at the end of <code>pathToSearch</code>
   * @return True if <code>pathTail</code> is the tail of the path of <code>pathToSearch</code>
   */
138  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
139    Path tailPath = pathTail;
140    String tailName;
141    Path toSearch = pathToSearch;
142    String toSearchName;
143    boolean result = false;
144
145    if (pathToSearch.depth() != pathTail.depth()) {
146      return false;
147    }
148
149    do {
150      tailName = tailPath.getName();
151      if (tailName == null || tailName.isEmpty()) {
152        result = true;
153        break;
154      }
155      toSearchName = toSearch.getName();
156      if (toSearchName == null || toSearchName.isEmpty()) {
157        break;
158      }
159      // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
160      tailPath = tailPath.getParent();
161      toSearch = toSearch.getParent();
162    } while(tailName.equals(toSearchName));
163    return result;
164  }
165
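  /**
   * Returns the scheme-specific FSUtils implementation for the given filesystem, as configured
   * via the "hbase.fsutil.&lt;scheme&gt;.impl" property (defaults to the HDFS implementation).
   * A minimal usage sketch, assuming an HDFS-backed root directory:
   * <pre>
   * FileSystem fs = FSUtils.getRootDir(conf).getFileSystem(conf);
   * FSUtils fsUtils = FSUtils.getInstance(fs, conf);
   * </pre>
   */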
166  public static FSUtils getInstance(FileSystem fs, Configuration conf) {
167    String scheme = fs.getUri().getScheme();
168    if (scheme == null) {
169      LOG.warn("Could not find scheme for uri " +
170          fs.getUri() + ", default to hdfs");
171      scheme = "hdfs";
172    }
173    Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
174        scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
175    FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
176    return fsUtils;
177  }
178
  /**
   * Delete the region directory if it exists.
   * @param conf configuration used to locate the HBase root directory
   * @param hri the region whose directory is to be deleted
   * @return True if the region directory was deleted.
   * @throws IOException if accessing the filesystem fails
   */
186  public static boolean deleteRegionDir(final Configuration conf, final HRegionInfo hri)
187  throws IOException {
188    Path rootDir = getRootDir(conf);
189    FileSystem fs = rootDir.getFileSystem(conf);
190    return deleteDirectory(fs,
191      new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
192  }
193
  /**
195   * Create the specified file on the filesystem. By default, this will:
196   * <ol>
197   * <li>overwrite the file if it exists</li>
198   * <li>apply the umask in the configuration (if it is enabled)</li>
199   * <li>use the fs configured buffer size (or 4096 if not set)</li>
200   * <li>use the configured column family replication or default replication if
201   * {@link HColumnDescriptor#DEFAULT_DFS_REPLICATION}</li>
202   * <li>use the default block size</li>
203   * <li>not track progress</li>
204   * </ol>
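   * <p>A minimal usage sketch; the path and favored node below are hypothetical:
   * <pre>
   * InetSocketAddress[] favoredNodes = { new InetSocketAddress("datanode1.example.com", 50010) };
   * try (FSDataOutputStream out = FSUtils.create(conf, fs, new Path("/hbase/example"),
   *     FsPermission.getFileDefault(), favoredNodes)) {
   *   out.write(Bytes.toBytes("example"));
   * }
   * </pre>
   *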
205   * @param conf configurations
206   * @param fs {@link FileSystem} on which to write the file
207   * @param path {@link Path} to the file to write
208   * @param perm permissions
   * @param favoredNodes an array of favored DataNodes for block placement, or possibly null
210   * @return output stream to the created file
211   * @throws IOException if the file cannot be created
212   */
213  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
214      FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
215    if (fs instanceof HFileSystem) {
216      FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
217      if (backingFs instanceof DistributedFileSystem) {
218        // Try to use the favoredNodes version via reflection to allow backwards-
219        // compatibility.
220        short replication = Short.parseShort(conf.get(HColumnDescriptor.DFS_REPLICATION,
221          String.valueOf(HColumnDescriptor.DEFAULT_DFS_REPLICATION)));
222        try {
223          return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create",
224            Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class,
225            Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true,
226            getDefaultBufferSize(backingFs),
227            replication > 0 ? replication : getDefaultReplication(backingFs, path),
228            getDefaultBlockSize(backingFs, path), null, favoredNodes));
229        } catch (InvocationTargetException ite) {
          // Function was properly called, but threw its own exception.
231          throw new IOException(ite.getCause());
232        } catch (NoSuchMethodException e) {
233          LOG.debug("DFS Client does not support most favored nodes create; using default create");
234          LOG.trace("Ignoring; use default create", e);
235        } catch (IllegalArgumentException | SecurityException |  IllegalAccessException e) {
236          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
237        }
238      }
239    }
240    return create(fs, path, perm, true);
241  }
242
243  /**
244   * Checks to see if the specified file system is available
245   *
246   * @param fs filesystem
247   * @throws IOException e
248   */
249  public static void checkFileSystemAvailable(final FileSystem fs)
250  throws IOException {
251    if (!(fs instanceof DistributedFileSystem)) {
252      return;
253    }
254    IOException exception = null;
255    DistributedFileSystem dfs = (DistributedFileSystem) fs;
256    try {
257      if (dfs.exists(new Path("/"))) {
258        return;
259      }
260    } catch (IOException e) {
261      exception = e instanceof RemoteException ?
262              ((RemoteException)e).unwrapRemoteException() : e;
263    }
264    try {
265      fs.close();
266    } catch (Exception e) {
267      LOG.error("file system close failed: ", e);
268    }
269    IOException io = new IOException("File system is not available");
270    io.initCause(exception);
271    throw io;
272  }
273
  /**
   * We use reflection because
   * {@link DistributedFileSystem#setSafeMode(HdfsConstants.SafeModeAction, boolean)}
   * is not in hadoop 1.1
   *
   * @param dfs the DistributedFileSystem to check
   * @return whether we're in safe mode
   * @throws IOException if the safe mode check fails
   */
282  private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
283    boolean inSafeMode = false;
284    try {
285      Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
286          org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class});
287      inSafeMode = (Boolean) m.invoke(dfs,
288        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true);
289    } catch (Exception e) {
290      if (e instanceof IOException) throw (IOException) e;
291
292      // Check whether dfs is on safemode.
293      inSafeMode = dfs.setSafeMode(
294        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET);
295    }
296    return inSafeMode;
297  }
298
  /**
   * Check whether dfs is in safemode.
   * @param conf configuration used to obtain the FileSystem
   * @throws IOException if dfs is in safemode, or if the check itself fails
   */
304  public static void checkDfsSafeMode(final Configuration conf)
305  throws IOException {
306    boolean isInSafeMode = false;
307    FileSystem fs = FileSystem.get(conf);
308    if (fs instanceof DistributedFileSystem) {
309      DistributedFileSystem dfs = (DistributedFileSystem)fs;
310      isInSafeMode = isInSafeMode(dfs);
311    }
312    if (isInSafeMode) {
313      throw new IOException("File system is in safemode, it can't be written now");
314    }
315  }
316
317  /**
   * Returns the current version of the file system.
319   *
320   * @param fs filesystem object
321   * @param rootdir root hbase directory
322   * @return null if no version file exists, version string otherwise
323   * @throws IOException if the version file fails to open
324   * @throws DeserializationException if the version data cannot be translated into a version
325   */
326  public static String getVersion(FileSystem fs, Path rootdir)
327  throws IOException, DeserializationException {
328    final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
329    FileStatus[] status = null;
330    try {
331      // hadoop 2.0 throws FNFE if directory does not exist.
332      // hadoop 1.0 returns null if directory does not exist.
333      status = fs.listStatus(versionFile);
334    } catch (FileNotFoundException fnfe) {
335      return null;
336    }
337    if (ArrayUtils.getLength(status) == 0) {
338      return null;
339    }
340    String version = null;
341    byte [] content = new byte [(int)status[0].getLen()];
342    FSDataInputStream s = fs.open(versionFile);
343    try {
344      IOUtils.readFully(s, content, 0, content.length);
345      if (ProtobufUtil.isPBMagicPrefix(content)) {
346        version = parseVersionFrom(content);
347      } else {
        // Presume it is in pre-pb format.
349        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) {
350          version = dis.readUTF();
351        }
352      }
353    } catch (EOFException eof) {
354      LOG.warn("Version file was empty, odd, will try to set it.");
355    } finally {
356      s.close();
357    }
358    return version;
359  }
360
361  /**
362   * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
363   * @param bytes The byte content of the hbase.version file
364   * @return The version found in the file as a String
365   * @throws DeserializationException if the version data cannot be translated into a version
366   */
367  static String parseVersionFrom(final byte [] bytes)
368  throws DeserializationException {
369    ProtobufUtil.expectPBMagicPrefix(bytes);
370    int pblen = ProtobufUtil.lengthOfPBMagic();
371    FSProtos.HBaseVersionFileContent.Builder builder =
372      FSProtos.HBaseVersionFileContent.newBuilder();
373    try {
374      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
375      return builder.getVersion();
376    } catch (IOException e) {
377      // Convert
378      throw new DeserializationException(e);
379    }
380  }
381
382  /**
383   * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
384   * @param version Version to persist
385   * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
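   * <p>For example, <code>parseVersionFrom(toVersionByteArray("8"))</code> yields <code>"8"</code>.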
386   */
387  static byte [] toVersionByteArray(final String version) {
388    FSProtos.HBaseVersionFileContent.Builder builder =
389      FSProtos.HBaseVersionFileContent.newBuilder();
390    return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
391  }
392
393  /**
394   * Verifies current version of file system
395   *
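   * <p>A typical startup-time usage sketch:
   * <pre>
   * Path rootDir = FSUtils.getRootDir(conf);
   * FileSystem fs = rootDir.getFileSystem(conf);
   * // Throws FileSystemVersionException if the on-disk layout needs migration.
   * FSUtils.checkVersion(fs, rootDir, true);
   * </pre>
   *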
396   * @param fs file system
397   * @param rootdir root directory of HBase installation
398   * @param message if true, issues a message on System.out
399   * @throws IOException if the version file cannot be opened
400   * @throws DeserializationException if the contents of the version file cannot be parsed
401   */
402  public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
403  throws IOException, DeserializationException {
404    checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
405  }
406
407  /**
408   * Verifies current version of file system
409   *
410   * @param fs file system
411   * @param rootdir root directory of HBase installation
412   * @param message if true, issues a message on System.out
413   * @param wait wait interval
414   * @param retries number of times to retry
415   *
416   * @throws IOException if the version file cannot be opened
417   * @throws DeserializationException if the contents of the version file cannot be parsed
418   */
419  public static void checkVersion(FileSystem fs, Path rootdir,
420      boolean message, int wait, int retries)
421  throws IOException, DeserializationException {
422    String version = getVersion(fs, rootdir);
423    String msg;
424    if (version == null) {
425      if (!metaRegionExists(fs, rootdir)) {
426        // rootDir is empty (no version file and no root region)
427        // just create new version file (HBASE-1195)
428        setVersion(fs, rootdir, wait, retries);
429        return;
430      } else {
431        msg = "hbase.version file is missing. Is your hbase.rootdir valid? " +
432            "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " +
433            "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2";
434      }
435    } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) {
436      return;
437    } else {
438      msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version +
439          " but software requires version " + HConstants.FILE_SYSTEM_VERSION +
440          ". Consult http://hbase.apache.org/book.html for further information about " +
441          "upgrading HBase.";
442    }
443
    // The version is out of date and requires migration.
445    // Output on stdout so user sees it in terminal.
446    if (message) {
447      System.out.println("WARNING! " + msg);
448    }
449    throw new FileSystemVersionException(msg);
450  }
451
452  /**
453   * Sets version of file system
454   *
455   * @param fs filesystem object
456   * @param rootdir hbase root
457   * @throws IOException e
458   */
459  public static void setVersion(FileSystem fs, Path rootdir)
460  throws IOException {
461    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
462      HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
463  }
464
465  /**
466   * Sets version of file system
467   *
468   * @param fs filesystem object
469   * @param rootdir hbase root
470   * @param wait time to wait for retry
471   * @param retries number of times to retry before failing
472   * @throws IOException e
473   */
474  public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
475  throws IOException {
476    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
477  }
478
479
480  /**
481   * Sets version of file system
482   *
483   * @param fs filesystem object
484   * @param rootdir hbase root directory
485   * @param version version to set
486   * @param wait time to wait for retry
487   * @param retries number of times to retry before throwing an IOException
488   * @throws IOException e
489   */
490  public static void setVersion(FileSystem fs, Path rootdir, String version,
491      int wait, int retries) throws IOException {
492    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
493    Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
494      HConstants.VERSION_FILE_NAME);
495    while (true) {
496      try {
497        // Write the version to a temporary file
498        FSDataOutputStream s = fs.create(tempVersionFile);
499        try {
500          s.write(toVersionByteArray(version));
501          s.close();
502          s = null;
503          // Move the temp version file to its normal location. Returns false
504          // if the rename failed. Throw an IOE in that case.
505          if (!fs.rename(tempVersionFile, versionFile)) {
506            throw new IOException("Unable to move temp version file to " + versionFile);
507          }
508        } finally {
          // Cleaning up the temporary file if the rename failed would be trying
          // too hard. We'll unconditionally create it again the next time
          // through anyway; files are overwritten by default by create().
512
513          // Attempt to close the stream on the way out if it is still open.
514          try {
515            if (s != null) s.close();
516          } catch (IOException ignore) { }
517        }
518        LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
519        return;
520      } catch (IOException e) {
521        if (retries > 0) {
522          LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
523          fs.delete(versionFile, false);
524          try {
525            if (wait > 0) {
526              Thread.sleep(wait);
527            }
528          } catch (InterruptedException ie) {
529            throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
530          }
531          retries--;
532        } else {
533          throw e;
534        }
535      }
536    }
537  }
538
539  /**
540   * Checks that a cluster ID file exists in the HBase root directory
541   * @param fs the root directory FileSystem
542   * @param rootdir the HBase root directory in HDFS
543   * @param wait how long to wait between retries
544   * @return <code>true</code> if the file exists, otherwise <code>false</code>
545   * @throws IOException if checking the FileSystem fails
546   */
547  public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
548      long wait) throws IOException {
549    while (true) {
550      try {
551        Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
552        return fs.exists(filePath);
553      } catch (IOException ioe) {
554        if (wait > 0L) {
555          LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe);
556          try {
557            Thread.sleep(wait);
558          } catch (InterruptedException e) {
559            Thread.currentThread().interrupt();
560            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
561          }
562        } else {
563          throw ioe;
564        }
565      }
566    }
567  }
568
569  /**
570   * Returns the value of the unique cluster ID stored for this HBase instance.
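   * <p>A minimal usage sketch:
   * <pre>
   * Path rootDir = FSUtils.getRootDir(conf);
   * ClusterId clusterId = FSUtils.getClusterId(rootDir.getFileSystem(conf), rootDir);
   * </pre>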
571   * @param fs the root directory FileSystem
572   * @param rootdir the path to the HBase root directory
573   * @return the unique cluster identifier
574   * @throws IOException if reading the cluster ID file fails
575   */
576  public static ClusterId getClusterId(FileSystem fs, Path rootdir)
577  throws IOException {
578    Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
579    ClusterId clusterId = null;
    FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null;
581    if (status != null) {
582      int len = Ints.checkedCast(status.getLen());
583      byte [] content = new byte[len];
584      FSDataInputStream in = fs.open(idPath);
585      try {
586        in.readFully(content);
587      } catch (EOFException eof) {
588        LOG.warn("Cluster ID file {} is empty", idPath);
      } finally {
590        in.close();
591      }
592      try {
593        clusterId = ClusterId.parseFrom(content);
594      } catch (DeserializationException e) {
595        throw new IOException("content=" + Bytes.toString(content), e);
596      }
597      // If not pb'd, make it so.
598      if (!ProtobufUtil.isPBMagicPrefix(content)) {
599        String cid = null;
600        in = fs.open(idPath);
601        try {
602          cid = in.readUTF();
603          clusterId = new ClusterId(cid);
604        } catch (EOFException eof) {
605          LOG.warn("Cluster ID file {} is empty", idPath);
606        } finally {
607          in.close();
608        }
609        rewriteAsPb(fs, rootdir, idPath, clusterId);
610      }
611      return clusterId;
612    } else {
613      LOG.warn("Cluster ID file does not exist at {}", idPath);
614    }
615    return clusterId;
616  }
617
  /**
   * Rewrite the cluster ID file at <code>p</code> as a protobuf-serialized file.
   * @param cid the cluster ID to write
   * @throws IOException if the rewrite fails
   */
622  private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
623      final ClusterId cid)
624  throws IOException {
625    // Rewrite the file as pb.  Move aside the old one first, write new
626    // then delete the moved-aside file.
627    Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
628    if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
629    setClusterId(fs, rootdir, cid, 100);
630    if (!fs.delete(movedAsideName, false)) {
631      throw new IOException("Failed delete of " + movedAsideName);
632    }
633    LOG.debug("Rewrote the hbase.id file as pb");
634  }
635
636  /**
637   * Writes a new unique identifier for this cluster to the "hbase.id" file
638   * in the HBase root directory
639   * @param fs the root directory FileSystem
640   * @param rootdir the path to the HBase root directory
641   * @param clusterId the unique identifier to store
642   * @param wait how long (in milliseconds) to wait between retries
   * @throws IOException if writing to the FileSystem fails and <code>wait</code> is not
   *   positive, so no retry is attempted
644   */
645  public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
646      int wait) throws IOException {
647    while (true) {
648      try {
649        Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
650        Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
651          Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
652        // Write the id file to a temporary location
653        FSDataOutputStream s = fs.create(tempIdFile);
654        try {
655          s.write(clusterId.toByteArray());
656          s.close();
657          s = null;
658          // Move the temporary file to its normal location. Throw an IOE if
659          // the rename failed
660          if (!fs.rename(tempIdFile, idFile)) {
            throw new IOException("Unable to move temp cluster ID file to " + idFile);
662          }
663        } finally {
664          // Attempt to close the stream if still open on the way out
665          try {
666            if (s != null) s.close();
667          } catch (IOException ignore) { }
668        }
669        if (LOG.isDebugEnabled()) {
670          LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
671        }
672        return;
673      } catch (IOException ioe) {
674        if (wait > 0) {
675          LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
676              ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
677          try {
678            Thread.sleep(wait);
679          } catch (InterruptedException e) {
680            throw (InterruptedIOException)new InterruptedIOException().initCause(e);
681          }
682        } else {
683          throw ioe;
684        }
685      }
686    }
687  }
688
689  /**
690   * If DFS, check safe mode and if so, wait until we clear it.
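   * <p>Usage sketch; the wait interval here is illustrative:
   * <pre>
   * // Block until HDFS leaves safe mode, re-checking every 10 seconds.
   * FSUtils.waitOnSafeMode(conf, 10 * 1000L);
   * </pre>
   *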
691   * @param conf configuration
692   * @param wait Sleep between retries
693   * @throws IOException e
694   */
695  public static void waitOnSafeMode(final Configuration conf,
696    final long wait)
697  throws IOException {
698    FileSystem fs = FileSystem.get(conf);
699    if (!(fs instanceof DistributedFileSystem)) return;
700    DistributedFileSystem dfs = (DistributedFileSystem)fs;
701    // Make sure dfs is not in safe mode
702    while (isInSafeMode(dfs)) {
703      LOG.info("Waiting for dfs to exit safe mode...");
704      try {
705        Thread.sleep(wait);
706      } catch (InterruptedException e) {
707        Thread.currentThread().interrupt();
708        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
709      }
710    }
711  }
712
713  /**
714   * Checks if meta region exists
715   * @param fs file system
716   * @param rootDir root directory of HBase installation
717   * @return true if exists
718   */
719  public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
720    Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
721    return fs.exists(metaRegionDir);
722  }
723
724  /**
725   * Compute HDFS blocks distribution of a given file, or a portion of the file
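   * <p>For example, to compute the distribution of an entire file (a sketch):
   * <pre>
   * FileStatus status = fs.getFileStatus(path);
   * HDFSBlocksDistribution dist =
   *     FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
   * </pre>
   *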
726   * @param fs file system
727   * @param status file status of the file
728   * @param start start position of the portion
729   * @param length length of the portion
730   * @return The HDFS blocks distribution
731   */
732  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
733    final FileSystem fs, FileStatus status, long start, long length)
734    throws IOException {
735    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
736    BlockLocation [] blockLocations =
737      fs.getFileBlockLocations(status, start, length);
738    for(BlockLocation bl : blockLocations) {
739      String [] hosts = bl.getHosts();
740      long len = bl.getLength();
741      blocksDistribution.addHostsAndBlockWeight(hosts, len);
742    }
743
744    return blocksDistribution;
745  }
746
747  /**
748   * Update blocksDistribution with blockLocations
749   * @param blocksDistribution the hdfs blocks distribution
750   * @param blockLocations an array containing block location
751   */
752  static public void addToHDFSBlocksDistribution(
753      HDFSBlocksDistribution blocksDistribution, BlockLocation[] blockLocations)
754      throws IOException {
755    for (BlockLocation bl : blockLocations) {
756      String[] hosts = bl.getHosts();
757      long len = bl.getLength();
758      blocksDistribution.addHostsAndBlockWeight(hosts, len);
759    }
760  }
761
762  // TODO move this method OUT of FSUtils. No dependencies to HMaster
763  /**
764   * Returns the total overall fragmentation percentage. Includes hbase:meta and
765   * -ROOT- as well.
766   *
767   * @param master  The master defining the HBase root and file system
   * @return the overall fragmentation percentage across all tables, or -1 if it could not
   *   be determined
769   * @throws IOException When scanning the directory fails
770   */
771  public static int getTotalTableFragmentation(final HMaster master)
772  throws IOException {
773    Map<String, Integer> map = getTableFragmentation(master);
774    return map.isEmpty() ? -1 :  map.get("-TOTAL-");
775  }
776
777  /**
778   * Runs through the HBase rootdir and checks how many stores for each table
779   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
780   * percentage across all tables is stored under the special key "-TOTAL-".
781   *
782   * @param master  The master defining the HBase root and file system.
783   * @return A map for each table and its percentage (never null).
784   *
785   * @throws IOException When scanning the directory fails.
786   */
787  public static Map<String, Integer> getTableFragmentation(
788    final HMaster master)
789  throws IOException {
790    Path path = getRootDir(master.getConfiguration());
791    // since HMaster.getFileSystem() is package private
792    FileSystem fs = path.getFileSystem(master.getConfiguration());
793    return getTableFragmentation(fs, path);
794  }
795
796  /**
797   * Runs through the HBase rootdir and checks how many stores for each table
798   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
799   * percentage across all tables is stored under the special key "-TOTAL-".
800   *
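   * <p>Usage sketch; the overall percentage is stored under the special "-TOTAL-" key:
   * <pre>
   * Map&lt;String, Integer&gt; frags = FSUtils.getTableFragmentation(fs, FSUtils.getRootDir(conf));
   * int overallPercent = frags.get("-TOTAL-");
   * </pre>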
801   * @param fs  The file system to use
802   * @param hbaseRootDir  The root directory to scan
803   * @return A map for each table and its percentage (never null)
804   * @throws IOException When scanning the directory fails
805   */
806  public static Map<String, Integer> getTableFragmentation(
807    final FileSystem fs, final Path hbaseRootDir)
808  throws IOException {
809    Map<String, Integer> frags = new HashMap<>();
810    int cfCountTotal = 0;
811    int cfFragTotal = 0;
812    PathFilter regionFilter = new RegionDirFilter(fs);
813    PathFilter familyFilter = new FamilyDirFilter(fs);
814    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
815    for (Path d : tableDirs) {
816      int cfCount = 0;
817      int cfFrag = 0;
818      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
819      for (FileStatus regionDir : regionDirs) {
820        Path dd = regionDir.getPath();
        // this is a region directory; now look inside it for families
822        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
823        for (FileStatus familyDir : familyDirs) {
824          cfCount++;
825          cfCountTotal++;
826          Path family = familyDir.getPath();
827          // now in family make sure only one file
828          FileStatus[] familyStatus = fs.listStatus(family);
829          if (familyStatus.length > 1) {
830            cfFrag++;
831            cfFragTotal++;
832          }
833        }
834      }
835      // compute percentage per table and store in result list
836      frags.put(FSUtils.getTableName(d).getNameAsString(),
837        cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
838    }
839    // set overall percentage for all tables
840    frags.put("-TOTAL-",
841      cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
842    return frags;
843  }
844
845  /**
846   * A {@link PathFilter} that returns only regular files.
847   */
848  static class FileFilter extends AbstractFileStatusFilter {
849    private final FileSystem fs;
850
851    public FileFilter(final FileSystem fs) {
852      this.fs = fs;
853    }
854
855    @Override
856    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
857      try {
858        return isFile(fs, isDir, p);
859      } catch (IOException e) {
860        LOG.warn("Unable to verify if path={} is a regular file", p, e);
861        return false;
862      }
863    }
864  }
865
866  /**
867   * Directory filter that doesn't include any of the directories in the specified blacklist
868   */
869  public static class BlackListDirFilter extends AbstractFileStatusFilter {
870    private final FileSystem fs;
871    private List<String> blacklist;
872
873    /**
     * Create a filter on the given filesystem with the specified blacklist
875     * @param fs filesystem to filter
876     * @param directoryNameBlackList list of the names of the directories to filter. If
877     *          <tt>null</tt>, all directories are returned
878     */
879    @SuppressWarnings("unchecked")
880    public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
881      this.fs = fs;
882      blacklist =
883        (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
884          : directoryNameBlackList);
885    }
886
887    @Override
888    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
889      if (!isValidName(p.getName())) {
890        return false;
891      }
892
893      try {
894        return isDirectory(fs, isDir, p);
895      } catch (IOException e) {
896        LOG.warn("An error occurred while verifying if [{}] is a valid directory."
897            + " Returning 'not valid' and continuing.", p, e);
898        return false;
899      }
900    }
901
902    protected boolean isValidName(final String name) {
903      return !blacklist.contains(name);
904    }
905  }
906
907  /**
908   * A {@link PathFilter} that only allows directories.
909   */
910  public static class DirFilter extends BlackListDirFilter {
911
912    public DirFilter(FileSystem fs) {
913      super(fs, null);
914    }
915  }
916
917  /**
918   * A {@link PathFilter} that returns usertable directories. To get all directories use the
919   * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
920   */
921  public static class UserTableDirFilter extends BlackListDirFilter {
922    public UserTableDirFilter(FileSystem fs) {
923      super(fs, HConstants.HBASE_NON_TABLE_DIRS);
924    }
925
926    @Override
927    protected boolean isValidName(final String name) {
928      if (!super.isValidName(name))
929        return false;
930
931      try {
932        TableName.isLegalTableQualifierName(Bytes.toBytes(name));
933      } catch (IllegalArgumentException e) {
934        LOG.info("Invalid table name: {}", name);
935        return false;
936      }
937      return true;
938    }
939  }
940
  /**
   * Recover file lease. Used when a file might have been left open by another process.
   * @param fs FileSystem handle
   * @param p Path of the file whose lease is to be recovered
   * @param conf Configuration handle
   * @param reporter reporter used to report progress while recovering the lease
   * @throws IOException if lease recovery fails
   */
949  public abstract void recoverFileLease(final FileSystem fs, final Path p,
950      Configuration conf, CancelableProgressable reporter) throws IOException;
951
952  public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
953      throws IOException {
954    List<Path> tableDirs = new ArrayList<>();
955
956    for (FileStatus status : fs
957        .globStatus(new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
958      tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
959    }
960    return tableDirs;
961  }
962
  /**
   * @param fs the FileSystem to use
   * @param rootdir the directory to scan for table directories
   * @return All the table directories under <code>rootdir</code>. Ignores non-table HBase
   *   folders such as .logs, .oldlogs and .corrupt.
   * @throws IOException if listing the directory fails
   */
970  public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
971      throws IOException {
972    // presumes any directory under hbase.rootdir is a table
973    FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
974    List<Path> tabledirs = new ArrayList<>(dirs.length);
975    for (FileStatus dir: dirs) {
976      tabledirs.add(dir.getPath());
977    }
978    return tabledirs;
979  }
980
981  /**
   * Filter that accepts only region directories: hex-style encoded region names and older
   * numeric region dir names.
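   * <p>Usage sketch:
   * <pre>
   * FileStatus[] regionDirs = fs.listStatus(tableDir, new FSUtils.RegionDirFilter(fs));
   * </pre>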
983   */
984  public static class RegionDirFilter extends AbstractFileStatusFilter {
985    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
986    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
987    final FileSystem fs;
988
989    public RegionDirFilter(FileSystem fs) {
990      this.fs = fs;
991    }
992
993    @Override
994    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
995      if (!regionDirPattern.matcher(p.getName()).matches()) {
996        return false;
997      }
998
999      try {
1000        return isDirectory(fs, isDir, p);
1001      } catch (IOException ioe) {
1002        // Maybe the file was moved or the fs was disconnected.
1003        LOG.warn("Skipping file {} due to IOException", p, ioe);
1004        return false;
1005      }
1006    }
1007  }
1008
1009  /**
1010   * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1011   * .tableinfo
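   * <p>Usage sketch:
   * <pre>
   * List&lt;Path&gt; regions = FSUtils.getRegionDirs(fs, FSUtils.getTableDir(rootDir, tableName));
   * </pre>
   *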
1012   * @param fs A file system for the Path
1013   * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
1014   * @return List of paths to valid region directories in table dir.
1015   * @throws IOException
1016   */
1017  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1018    // assumes we are in a table dir.
1019    List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1020    if (rds == null) {
1021      return Collections.emptyList();
1022    }
1023    List<Path> regionDirs = new ArrayList<>(rds.size());
1024    for (FileStatus rdfs: rds) {
1025      Path rdPath = rdfs.getPath();
1026      regionDirs.add(rdPath);
1027    }
1028    return regionDirs;
1029  }
1030
1031  public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) {
1032    return getRegionDirFromTableDir(getTableDir(rootDir, region.getTable()), region);
1033  }
1034
1035  public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
1036    return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
1037  }
1038
1039  /**
1040   * Filter for all dirs that are legal column family names.  This is generally used for colfam
1041   * dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1042   */
1043  public static class FamilyDirFilter extends AbstractFileStatusFilter {
1044    final FileSystem fs;
1045
1046    public FamilyDirFilter(FileSystem fs) {
1047      this.fs = fs;
1048    }
1049
1050    @Override
1051    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1052      try {
1053        // throws IAE if invalid
1054        HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName()));
1055      } catch (IllegalArgumentException iae) {
1056        // path name is an invalid family name and thus is excluded.
1057        return false;
1058      }
1059
1060      try {
1061        return isDirectory(fs, isDir, p);
1062      } catch (IOException ioe) {
1063        // Maybe the file was moved or the fs was disconnected.
1064        LOG.warn("Skipping file {} due to IOException", p, ioe);
1065        return false;
1066      }
1067    }
1068  }
1069
1070  /**
1071   * Given a particular region dir, return all the familydirs inside it
1072   *
1073   * @param fs A file system for the Path
1074   * @param regionDir Path to a specific region directory
1075   * @return List of paths to valid family directories in region dir.
1076   * @throws IOException
1077   */
1078  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1079    // assumes we are in a region dir.
1080    FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1081    List<Path> familyDirs = new ArrayList<>(fds.length);
1082    for (FileStatus fdfs: fds) {
1083      Path fdPath = fdfs.getPath();
1084      familyDirs.add(fdPath);
1085    }
1086    return familyDirs;
1087  }
1088
1089  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
1090    List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs));
1091    if (fds == null) {
1092      return Collections.emptyList();
1093    }
1094    List<Path> referenceFiles = new ArrayList<>(fds.size());
1095    for (FileStatus fdfs: fds) {
1096      Path fdPath = fdfs.getPath();
1097      referenceFiles.add(fdPath);
1098    }
1099    return referenceFiles;
1100  }
1101
1102  /**
1103   * Filter for HFiles that excludes reference files.
1104   */
1105  public static class HFileFilter extends AbstractFileStatusFilter {
1106    final FileSystem fs;
1107
1108    public HFileFilter(FileSystem fs) {
1109      this.fs = fs;
1110    }
1111
1112    @Override
1113    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1114      if (!StoreFileInfo.isHFile(p)) {
1115        return false;
1116      }
1117
1118      try {
1119        return isFile(fs, isDir, p);
1120      } catch (IOException ioe) {
1121        // Maybe the file was moved or the fs was disconnected.
1122        LOG.warn("Skipping file {} due to IOException", p, ioe);
1123        return false;
1124      }
1125    }
1126  }
1127
1128  /**
   * Filter for HFileLinks (StoreFiles and HFiles not included).
   * The filter itself does not consider whether a link points to a file or a directory.
1131   */
1132  public static class HFileLinkFilter implements PathFilter {
1133
1134    @Override
1135    public boolean accept(Path p) {
1136      return HFileLink.isHFileLink(p);
1137    }
1138  }
1139
1140  public static class ReferenceFileFilter extends AbstractFileStatusFilter {
1141
1142    private final FileSystem fs;
1143
1144    public ReferenceFileFilter(FileSystem fs) {
1145      this.fs = fs;
1146    }
1147
1148    @Override
1149    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1150      if (!StoreFileInfo.isReference(p)) {
1151        return false;
1152      }
1153
1154      try {
1155        // only files can be references.
1156        return isFile(fs, isDir, p);
1157      } catch (IOException ioe) {
1158        // Maybe the file was moved or the fs was disconnected.
1159        LOG.warn("Skipping file {} due to IOException", p, ioe);
1160        return false;
1161      }
1162    }
1163  }
1164
1165  /**
   * Called periodically by the storefile map builder getTableStoreFilePathMap to
   * report progress.
1168   */
1169  interface ProgressReporter {
1170    /**
1171     * @param status File or directory we are about to process.
1172     */
1173    void progress(FileStatus status);
1174  }
1175
1176  /**
1177   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1178   * table StoreFile names to the full Path.
1179   * <br>
1180   * Example...<br>
1181   * Key = 3944417774205889744  <br>
1182   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1183   *
1184   * @param map map to add values.  If null, this method will create and populate one to return
1185   * @param fs  The file system to use.
1186   * @param hbaseRootDir  The root directory to scan.
1187   * @param tableName name of the table to scan.
1188   * @return Map keyed by StoreFile name with a value of the full Path.
1189   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException the thread is interrupted, either before or during the activity.
1191   */
1192  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1193  final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1194  throws IOException, InterruptedException {
1195    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null,
1196        (ProgressReporter)null);
1197  }
1198
1199  /**
   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
   * table StoreFile names to the full Path.  Note that because this method can be called
   * on a 'live' HBase system, we will skip files that no longer exist by the time
   * we traverse them; similarly, the user of the result needs to consider that some
   * entries in this map may not exist by the time this call completes.
1205   * <br>
1206   * Example...<br>
1207   * Key = 3944417774205889744  <br>
1208   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1209   *
1210   * @param resultMap map to add values.  If null, this method will create and populate one to return
1211   * @param fs  The file system to use.
1212   * @param hbaseRootDir  The root directory to scan.
1213   * @param tableName name of the table to scan.
1214   * @param sfFilter optional path filter to apply to store files
1215   * @param executor optional executor service to parallelize this operation
1216   * @param progressReporter Instance or null; gets called every time we move to new region of
1217   *   family dir and for each store file.
1218   * @return Map keyed by StoreFile name with a value of the full Path.
1219   * @throws IOException When scanning the directory fails.
1220   * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead.
1221   */
1222  @Deprecated
1223  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1224      final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1225      ExecutorService executor, final HbckErrorReporter progressReporter)
1226      throws IOException, InterruptedException {
1227    return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor,
1228        new ProgressReporter() {
1229          @Override
1230          public void progress(FileStatus status) {
1231            // status is not used in this implementation.
1232            progressReporter.progress();
1233          }
1234        });
1235  }
1236
1237  /**
   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
   * table StoreFile names to the full Path.  Note that because this method can be called
   * on a 'live' HBase system, we will skip files that no longer exist by the time
   * we traverse them; similarly, the user of the result needs to consider that some
   * entries in this map may not exist by the time this call completes.
1243   * <br>
1244   * Example...<br>
1245   * Key = 3944417774205889744  <br>
1246   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1247   *
1248   * @param resultMap map to add values.  If null, this method will create and populate one
1249   *   to return
1250   * @param fs  The file system to use.
1251   * @param hbaseRootDir  The root directory to scan.
1252   * @param tableName name of the table to scan.
1253   * @param sfFilter optional path filter to apply to store files
1254   * @param executor optional executor service to parallelize this operation
1255   * @param progressReporter Instance or null; gets called every time we move to new region of
1256   *   family dir and for each store file.
1257   * @return Map keyed by StoreFile name with a value of the full Path.
1258   * @throws IOException When scanning the directory fails.
1259   * @throws InterruptedException the thread is interrupted, either before or during the activity.
1260   */
1261  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1262      final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1263      ExecutorService executor, final ProgressReporter progressReporter)
1264    throws IOException, InterruptedException {
1265
1266    final Map<String, Path> finalResultMap =
1267        resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;
1268
1269    // only include the directory paths to tables
1270    Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
1271    // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
1272    // should be regions.
1273    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
1274    final Vector<Exception> exceptions = new Vector<>();
1275
1276    try {
1277      List<FileStatus> regionDirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1278      if (regionDirs == null) {
1279        return finalResultMap;
1280      }
1281
1282      final List<Future<?>> futures = new ArrayList<>(regionDirs.size());
1283
1284      for (FileStatus regionDir : regionDirs) {
1285        if (null != progressReporter) {
1286          progressReporter.progress(regionDir);
1287        }
1288        final Path dd = regionDir.getPath();
1289
1290        if (!exceptions.isEmpty()) {
1291          break;
1292        }
1293
1294        Runnable getRegionStoreFileMapCall = new Runnable() {
1295          @Override
1296          public void run() {
1297            try {
1298              HashMap<String,Path> regionStoreFileMap = new HashMap<>();
1299              List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
1300              if (familyDirs == null) {
1301                if (!fs.exists(dd)) {
1302                  LOG.warn("Skipping region because it no longer exists: " + dd);
1303                } else {
1304                  LOG.warn("Skipping region because it has no family dirs: " + dd);
1305                }
1306                return;
1307              }
1308              for (FileStatus familyDir : familyDirs) {
1309                if (null != progressReporter) {
1310                  progressReporter.progress(familyDir);
1311                }
1312                Path family = familyDir.getPath();
1313                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1314                  continue;
1315                }
1316                // now in family, iterate over the StoreFiles and
1317                // put in map
1318                FileStatus[] familyStatus = fs.listStatus(family);
1319                for (FileStatus sfStatus : familyStatus) {
1320                  if (null != progressReporter) {
1321                    progressReporter.progress(sfStatus);
1322                  }
1323                  Path sf = sfStatus.getPath();
1324                  if (sfFilter == null || sfFilter.accept(sf)) {
                    regionStoreFileMap.put(sf.getName(), sf);
1326                  }
1327                }
1328              }
1329              finalResultMap.putAll(regionStoreFileMap);
1330            } catch (Exception e) {
1331              LOG.error("Could not get region store file map for region: " + dd, e);
1332              exceptions.add(e);
1333            }
1334          }
1335        };
1336
1337        // If executor is available, submit async tasks to exec concurrently, otherwise
1338        // just do serial sync execution
1339        if (executor != null) {
1340          Future<?> future = executor.submit(getRegionStoreFileMapCall);
1341          futures.add(future);
1342        } else {
1343          FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
1344          future.run();
1345          futures.add(future);
1346        }
1347      }
1348
1349      // Ensure all pending tasks are complete (or that we run into an exception)
1350      for (Future<?> f : futures) {
1351        if (!exceptions.isEmpty()) {
1352          break;
1353        }
1354        try {
1355          f.get();
1356        } catch (ExecutionException e) {
1357          LOG.error("Unexpected exec exception!  Should've been caught already.  (Bug?)", e);
1358          // Shouldn't happen, we already logged/caught any exceptions in the Runnable
1359        }
1360      }
1361    } catch (IOException e) {
1362      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
1363      exceptions.add(e);
1364    } finally {
1365      if (!exceptions.isEmpty()) {
1366        // Just throw the first exception as an indication something bad happened
1367        // Don't need to propagate all the exceptions, we already logged them all anyway
1368        Throwables.propagateIfInstanceOf(exceptions.firstElement(), IOException.class);
1369        throw Throwables.propagate(exceptions.firstElement());
1370      }
1371    }
1372
1373    return finalResultMap;
1374  }
1375
1376  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1377    int result = 0;
1378    try {
1379      for (Path familyDir:getFamilyDirs(fs, p)){
1380        result += getReferenceFilePaths(fs, familyDir).size();
1381      }
1382    } catch (IOException e) {
1383      LOG.warn("Error counting reference files", e);
1384    }
1385    return result;
1386  }
1387
1388  /**
1389   * Runs through the HBase rootdir and creates a reverse lookup map for
1390   * table StoreFile names to the full Path.
1391   * <br>
1392   * Example...<br>
1393   * Key = 3944417774205889744  <br>
1394   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1395   *
1396   * @param fs  The file system to use.
1397   * @param hbaseRootDir  The root directory to scan.
1398   * @return Map keyed by StoreFile name with a value of the full Path.
1399   * @throws IOException When scanning the directory fails.
1400   */
1401  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1402      final Path hbaseRootDir)
1403  throws IOException, InterruptedException {
1404    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter)null);
1405  }
1406
1407  /**
1408   * Runs through the HBase rootdir and creates a reverse lookup map for
1409   * table StoreFile names to the full Path.
1410   * <br>
1411   * Example...<br>
1412   * Key = 3944417774205889744  <br>
1413   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1414   *
1415   * @param fs  The file system to use.
1416   * @param hbaseRootDir  The root directory to scan.
1417   * @param sfFilter optional path filter to apply to store files
1418   * @param executor optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to a new region or
   *   family dir and for each store file.
1421   * @return Map keyed by StoreFile name with a value of the full Path.
1422   * @throws IOException When scanning the directory fails.
   * @deprecated Since 2.3.0. Will be removed in hbase4. Use {@link
1424   *   #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)}
1425   */
1426  @Deprecated
1427  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1428      final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1429      HbckErrorReporter progressReporter)
1430    throws IOException, InterruptedException {
1431    return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor,
1432        new ProgressReporter() {
1433          @Override
1434          public void progress(FileStatus status) {
1435            // status is not used in this implementation.
1436            progressReporter.progress();
1437          }
1438        });
1439  }
1440
1441  /**
1442   * Runs through the HBase rootdir and creates a reverse lookup map for
1443   * table StoreFile names to the full Path.
1444   * <br>
1445   * Example...<br>
1446   * Key = 3944417774205889744  <br>
1447   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
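   * <p>
   * A sketch of parallel usage (illustrative only; the pool size is arbitrary and the null
   * arguments simply skip store file filtering and progress reporting):
   * <pre>{@code
   * ExecutorService pool = Executors.newFixedThreadPool(8);
   * try {
   *   Map<String, Path> storeFiles =
   *       getTableStoreFilePathMap(fs, rootDir, null, pool, (ProgressReporter) null);
   * } finally {
   *   pool.shutdown();
   * }
   * }</pre>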
1448   *
1449   * @param fs  The file system to use.
1450   * @param hbaseRootDir  The root directory to scan.
1451   * @param sfFilter optional path filter to apply to store files
1452   * @param executor optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to a new region or
   *   family dir and for each store file.
1455   * @return Map keyed by StoreFile name with a value of the full Path.
1456   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException if the calling thread is interrupted while waiting for tasks
   *   to complete
1458   */
1459  public static Map<String, Path> getTableStoreFilePathMap(
1460    final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter,
1461        ExecutorService executor, ProgressReporter progressReporter)
1462  throws IOException, InterruptedException {
1463    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);
1464
1465    // if this method looks similar to 'getTableFragmentation' that is because
1466    // it was borrowed from it.
1467
1468    // only include the directory paths to tables
1469    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1470      getTableStoreFilePathMap(map, fs, hbaseRootDir,
1471          FSUtils.getTableName(tableDir), sfFilter, executor, progressReporter);
1472    }
1473    return map;
1474  }
1475
1476  /**
1477   * Filters FileStatuses in an array and returns a list
1478   *
1479   * @param input   An array of FileStatuses
1480   * @param filter  A required filter to filter the array
1481   * @return        A list of FileStatuses
1482   */
1483  public static List<FileStatus> filterFileStatuses(FileStatus[] input,
1484      FileStatusFilter filter) {
1485    if (input == null) return null;
1486    return filterFileStatuses(Iterators.forArray(input), filter);
1487  }
1488
1489  /**
1490   * Filters FileStatuses in an iterator and returns a list
1491   *
1492   * @param input   An iterator of FileStatuses
   * @param filter  A required filter to filter the iterator
1494   * @return        A list of FileStatuses
1495   */
1496  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1497      FileStatusFilter filter) {
1498    if (input == null) return null;
1499    ArrayList<FileStatus> results = new ArrayList<>();
1500    while (input.hasNext()) {
1501      FileStatus f = input.next();
1502      if (filter.accept(f)) {
1503        results.add(f);
1504      }
1505    }
1506    return results;
1507  }
1508
1509  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
   * This accommodates differences between hadoop versions, where hadoop 1
   * does not throw a FileNotFoundException and returns an empty FileStatus[],
   * while Hadoop 2 will throw FileNotFoundException.
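   * <p>
   * Illustrative usage (the directory-only filter is just an example, given as a lambda; callers
   * must handle the null return):
   * <pre>{@code
   * List<FileStatus> dirs = listStatusWithStatusFilter(fs, dir, status -> status.isDirectory());
   * if (dirs == null) {
   *   // dir is missing, empty, or nothing matched the filter
   * }
   * }</pre>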
1514   *
1515   * @param fs file system
1516   * @param dir directory
1517   * @param filter file status filter
   * @return null if dir is empty, doesn't exist, or nothing passes the filter; otherwise the
   *   list of matching FileStatus
1519   */
1520  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs,
1521      final Path dir, final FileStatusFilter filter) throws IOException {
    FileStatus[] status = null;
1523    try {
1524      status = fs.listStatus(dir);
1525    } catch (FileNotFoundException fnfe) {
1526      LOG.trace("{} does not exist", dir);
1527      return null;
1528    }
1529
    if (ArrayUtils.getLength(status) == 0) {
1531      return null;
1532    }
1533
1534    if (filter == null) {
1535      return Arrays.asList(status);
1536    } else {
1537      List<FileStatus> status2 = filterFileStatuses(status, filter);
1538      if (status2 == null || status2.isEmpty()) {
1539        return null;
1540      } else {
1541        return status2;
1542      }
1543    }
1544  }
1545
1546  /**
1547   * Throw an exception if an action is not permitted by a user on a file.
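   * <p>
   * Illustrative usage (assumes {@code fs} and {@code path} are already resolved):
   * <pre>{@code
   * FileStatus status = fs.getFileStatus(path);
   * checkAccess(UserGroupInformation.getCurrentUser(), status, FsAction.READ);
   * }</pre>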
1548   *
1549   * @param ugi
1550   *          the user
1551   * @param file
1552   *          the file
1553   * @param action
1554   *          the action
1555   */
1556  public static void checkAccess(UserGroupInformation ugi, FileStatus file,
1557      FsAction action) throws AccessDeniedException {
1558    if (ugi.getShortUserName().equals(file.getOwner())) {
1559      if (file.getPermission().getUserAction().implies(action)) {
1560        return;
1561      }
1562    } else if (ArrayUtils.contains(ugi.getGroupNames(), file.getGroup())) {
1563      if (file.getPermission().getGroupAction().implies(action)) {
1564        return;
1565      }
1566    } else if (file.getPermission().getOtherAction().implies(action)) {
1567      return;
1568    }
1569    throw new AccessDeniedException("Permission denied:" + " action=" + action
1570        + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
1571  }
1572
1573  /**
   * Scans the root path of the file system to get the degree of locality for each region on each
   * of the servers having at least one block of that region.
   * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}.
1578   *
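   * Illustrative usage ({@code regionEncodedName} and {@code serverName} are placeholder
   * variables):
   * <pre>{@code
   * Map<String, Map<String, Float>> locality = getRegionDegreeLocalityMappingFromFS(conf);
   * Float fraction = locality.get(regionEncodedName).get(serverName);
   * }</pre>
   *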
1579   * @param conf
1580   *          the configuration to use
1581   * @return the mapping from region encoded name to a map of server names to
1582   *           locality fraction
1583   * @throws IOException
1584   *           in case of file system errors or interrupts
1585   */
1586  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1587      final Configuration conf) throws IOException {
1588    return getRegionDegreeLocalityMappingFromFS(
1589        conf, null,
1590        conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1591
1592  }
1593
1594  /**
   * Scans the root path of the file system to get the degree of locality for each region on each
   * of the servers having at least one block of that region.
1598   *
1599   * @param conf
1600   *          the configuration to use
1601   * @param desiredTable
1602   *          the table you wish to scan locality for
1603   * @param threadPoolSize
1604   *          the thread pool size to use
1605   * @return the mapping from region encoded name to a map of server names to
1606   *           locality fraction
1607   * @throws IOException
1608   *           in case of file system errors or interrupts
1609   */
1610  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1611      final Configuration conf, final String desiredTable, int threadPoolSize)
1612      throws IOException {
1613    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
1614    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
1615    return regionDegreeLocalityMapping;
1616  }
1617
1618  /**
   * Scans the root path of the file system to get the degree of locality of each region on each
   * of the servers having at least one block of that region. The output map parameter is
   * optional.
1623   *
1624   * @param conf
1625   *          the configuration to use
1626   * @param desiredTable
1627   *          the table you wish to scan locality for
1628   * @param threadPoolSize
1629   *          the thread pool size to use
1630   * @param regionDegreeLocalityMapping
1631   *          the map into which to put the locality degree mapping or null,
1632   *          must be a thread-safe implementation
1633   * @throws IOException
1634   *           in case of file system errors or interrupts
1635   */
1636  private static void getRegionLocalityMappingFromFS(final Configuration conf,
1637      final String desiredTable, int threadPoolSize,
1638      final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
    final FileSystem fs = FileSystem.get(conf);
1640    final Path rootPath = FSUtils.getRootDir(conf);
1641    final long startTime = EnvironmentEdgeManager.currentTime();
1642    final Path queryPath;
1643    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1644    if (null == desiredTable) {
1645      queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1646    } else {
1647      queryPath = new Path(FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1648    }
1649
1650    // reject all paths that are not appropriate
1651    PathFilter pathFilter = new PathFilter() {
1652      @Override
1653      public boolean accept(Path path) {
        // the last path component should be the region name; the glob may pick up some noise data
1655        if (null == path) {
1656          return false;
1657        }
1658
1659        // no parent?
1660        Path parent = path.getParent();
1661        if (null == parent) {
1662          return false;
1663        }
1664
1665        String regionName = path.getName();
1666        if (null == regionName) {
1667          return false;
1668        }
1669
1670        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
1671          return false;
1672        }
1673        return true;
1674      }
1675    };
1676
1677    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1678
1679    if (LOG.isDebugEnabled()) {
1680      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
1681    }
1682
1683    if (null == statusList) {
1684      return;
1685    }
1686
1687    // lower the number of threads in case we have very few expected regions
1688    threadPoolSize = Math.min(threadPoolSize, statusList.length);
1689
1690    // run in multiple threads
1691    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
1692      Threads.newDaemonThreadFactory("FSRegionQuery"));
1693    try {
1694      // ignore all file status items that are not of interest
1695      for (FileStatus regionStatus : statusList) {
1696        if (null == regionStatus || !regionStatus.isDirectory()) {
1697          continue;
1698        }
1699
1700        final Path regionPath = regionStatus.getPath();
1701        if (null != regionPath) {
1702          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
1703        }
1704      }
1705    } finally {
1706      tpe.shutdown();
1707      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1708        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1709      try {
        // here we wait until the TPE terminates, either naturally or due to
        // exceptions thrown in the executing threads
1712        while (!tpe.awaitTermination(threadWakeFrequency,
1713            TimeUnit.MILLISECONDS)) {
1714          // printing out rough estimate, so as to not introduce
1715          // AtomicInteger
1716          LOG.info("Locality checking is underway: { Scanned Regions : "
1717              + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
1718              + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
1719        }
1720      } catch (InterruptedException e) {
1721        Thread.currentThread().interrupt();
1722        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1723      }
1724    }
1725
1726    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
    LOG.info("Scanning DFS for locality info took {}ms", overhead);
1728  }
1729
1730  /**
1731   * Do our short circuit read setup.
1732   * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
   * @param conf the configuration to check and, if needed, update
1734   */
1735  public static void setupShortCircuitRead(final Configuration conf) {
1736    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1737    boolean shortCircuitSkipChecksum =
1738      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1739    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1740    if (shortCircuitSkipChecksum) {
1741      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
1742        "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
1743        "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
1744      assert !shortCircuitSkipChecksum; //this will fail if assertions are on
1745    }
1746    checkShortCircuitReadBufferSize(conf);
1747  }
1748
1749  /**
1750   * Check if short circuit read buffer size is set and if not, set it to hbase value.
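   * <p>
   * The HBase-side default can itself be overridden via the hbase-prefixed key (the value below
   * is illustrative):
   * <pre>{@code
   * conf.setInt("hbase.dfs.client.read.shortcircuit.buffer.size", 2 * 64 * 1024);
   * }</pre>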
   * @param conf the configuration to check and update if the buffer size is not yet set
1752   */
1753  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1754    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1755    final int notSet = -1;
1756    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
1757    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1758    int size = conf.getInt(dfsKey, notSet);
1759    // If a size is set, return -- we will use it.
1760    if (size != notSet) return;
1761    // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
1762    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1763    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
1764  }
1765
1766  /**
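   * Returns the {@link DFSHedgedReadMetrics} instance shared by the DFSClients of the given
   * filesystem, fetched via reflection because the getter is package private.
   * <p>
   * Illustrative usage (the getter shown on the metrics object is assumed from the HDFS client
   * API):
   * <pre>{@code
   * DFSHedgedReadMetrics metrics = getDFSHedgedReadMetrics(conf);
   * if (metrics != null) {
   *   LOG.info("hedged read ops=" + metrics.getHedgedReadOps());
   * }
   * }</pre>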
   * @param c the configuration used to resolve the filesystem
   * @return The DFSClient DFSHedgedReadMetrics instance, or null if it can't be found or we are
   *   not on hdfs.
   * @throws IOException if the filesystem cannot be obtained
1770   */
1771  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
1772      throws IOException {
1773    if (!isHDFS(c)) return null;
1774    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
1775    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
1776    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
1777    final String name = "getHedgedReadMetrics";
1778    DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
1779    Method m;
1780    try {
1781      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException | SecurityException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
          e.getMessage());
      return null;
    }
1791    m.setAccessible(true);
1792    try {
1793      return (DFSHedgedReadMetrics)m.invoke(dfsclient);
1794    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
1795      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
1796          e.getMessage());
1797      return null;
1798    }
1799  }
1800
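  /**
   * Recursively copies {@code src} on {@code srcFS} to {@code dst} on {@code dstFS}. Directories
   * are created synchronously during traversal, while individual file copies are submitted to a
   * fixed-size thread pool of {@code threads} workers; all copies are awaited before returning
   * the list of destination paths that were created or copied.
   * <p>
   * A minimal usage sketch (paths and the thread count are illustrative):
   * <pre>{@code
   * List<Path> copied = copyFilesParallel(srcFs, srcDir, dstFs, dstDir, conf, 4);
   * }</pre>
   */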
1801  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1802      Configuration conf, int threads) throws IOException {
1803    ExecutorService pool = Executors.newFixedThreadPool(threads);
1804    List<Future<Void>> futures = new ArrayList<>();
1805    List<Path> traversedPaths;
1806    try {
1807      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
1808      for (Future<Void> future : futures) {
1809        future.get();
1810      }
1811    } catch (ExecutionException | InterruptedException | IOException e) {
1812      throw new IOException("Copy snapshot reference files failed", e);
1813    } finally {
1814      pool.shutdownNow();
1815    }
1816    return traversedPaths;
1817  }
1818
1819  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1820      Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
1821    List<Path> traversedPaths = new ArrayList<>();
1822    traversedPaths.add(dst);
1823    FileStatus currentFileStatus = srcFS.getFileStatus(src);
1824    if (currentFileStatus.isDirectory()) {
1825      if (!dstFS.mkdirs(dst)) {
1826        throw new IOException("Create directory failed: " + dst);
1827      }
1828      FileStatus[] subPaths = srcFS.listStatus(src);
1829      for (FileStatus subPath : subPaths) {
1830        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
1831          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
1832      }
1833    } else {
1834      Future<Void> future = pool.submit(() -> {
1835        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
1836        return null;
1837      });
1838      futures.add(future);
1839    }
1840    return traversedPaths;
1841  }
1842}