001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
021import edu.umd.cs.findbugs.annotations.CheckForNull;
022import java.io.ByteArrayInputStream;
023import java.io.DataInputStream;
024import java.io.EOFException;
025import java.io.FileNotFoundException;
026import java.io.IOException;
027import java.io.InputStream;
028import java.io.InterruptedIOException;
029import java.lang.reflect.InvocationTargetException;
030import java.lang.reflect.Method;
031import java.net.InetSocketAddress;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.Collections;
035import java.util.HashMap;
036import java.util.Iterator;
037import java.util.LinkedList;
038import java.util.List;
039import java.util.Locale;
040import java.util.Map;
041import java.util.Vector;
042import java.util.concurrent.ConcurrentHashMap;
043import java.util.concurrent.ExecutionException;
044import java.util.concurrent.ExecutorService;
045import java.util.concurrent.Executors;
046import java.util.concurrent.Future;
047import java.util.concurrent.FutureTask;
048import java.util.concurrent.ThreadPoolExecutor;
049import java.util.concurrent.TimeUnit;
050import java.util.regex.Pattern;
051
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.fs.BlockLocation;
054import org.apache.hadoop.fs.FSDataInputStream;
055import org.apache.hadoop.fs.FSDataOutputStream;
056import org.apache.hadoop.fs.FileStatus;
057import org.apache.hadoop.fs.FileSystem;
058import org.apache.hadoop.fs.FileUtil;
059import org.apache.hadoop.fs.Path;
060import org.apache.hadoop.fs.PathFilter;
061import org.apache.hadoop.fs.permission.FsAction;
062import org.apache.hadoop.fs.permission.FsPermission;
063import org.apache.hadoop.hbase.ClusterId;
064import org.apache.hadoop.hbase.HColumnDescriptor;
065import org.apache.hadoop.hbase.HConstants;
066import org.apache.hadoop.hbase.HDFSBlocksDistribution;
067import org.apache.hadoop.hbase.HRegionInfo;
068import org.apache.hadoop.hbase.TableName;
069import org.apache.hadoop.hbase.client.RegionInfo;
070import org.apache.hadoop.hbase.client.RegionInfoBuilder;
071import org.apache.hadoop.hbase.exceptions.DeserializationException;
072import org.apache.hadoop.hbase.fs.HFileSystem;
073import org.apache.hadoop.hbase.io.HFileLink;
074import org.apache.hadoop.hbase.master.HMaster;
075import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
076import org.apache.hadoop.hbase.security.AccessDeniedException;
077import org.apache.hadoop.hdfs.DFSClient;
078import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
079import org.apache.hadoop.hdfs.DistributedFileSystem;
080import org.apache.hadoop.hdfs.protocol.HdfsConstants;
081import org.apache.hadoop.io.IOUtils;
082import org.apache.hadoop.ipc.RemoteException;
083import org.apache.hadoop.security.UserGroupInformation;
084import org.apache.hadoop.util.Progressable;
085import org.apache.hadoop.util.ReflectionUtils;
086import org.apache.hadoop.util.StringUtils;
087import org.apache.yetus.audience.InterfaceAudience;
088import org.slf4j.Logger;
089import org.slf4j.LoggerFactory;
090
091import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
092import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
093import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
094import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
095
096import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
098
099/**
100 * Utility methods for interacting with the underlying file system.
101 */
102@InterfaceAudience.Private
103public abstract class FSUtils extends CommonFSUtils {
104  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
105
106  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
107  private static final int DEFAULT_THREAD_POOLSIZE = 2;
108
109  /** Set to true on Windows platforms */
110  @VisibleForTesting // currently only used in testing. TODO refactor into a test class
111  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
112
113  protected FSUtils() {
114    super();
115  }
116
  /**
   * @return True if <code>fs</code> is an instance of DistributedFileSystem
   * @throws IOException
   */
121  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
122    FileSystem fileSystem = fs;
123    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
124    // Check its backing fs for dfs-ness.
125    if (fs instanceof HFileSystem) {
126      fileSystem = ((HFileSystem)fs).getBackingFs();
127    }
128    return fileSystem instanceof DistributedFileSystem;
129  }
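
  // A minimal usage sketch (hypothetical caller; assumes an existing Configuration named 'conf'):
  //
  //   FileSystem fs = FileSystem.get(conf);
  //   if (FSUtils.isDistributedFileSystem(fs)) {
  //     // safe to rely on HDFS-specific behaviour such as favored nodes or hedged reads
  //   }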
130
  /**
   * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
   * '/a/b/c' part. If you passed in 'hdfs://a/b/c' and '/a/b/c', it would return true.  Does not
   * consider the scheme; i.e. if the schemes differ but the path or a subpath matches, the two
   * will equate.
   * @param pathToSearch Path we will be trying to match.
   * @param pathTail the tail to match against the end of <code>pathToSearch</code>
   * @return True if <code>pathTail</code> is a tail of the path of <code>pathToSearch</code>
   */
139  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
140    if (pathToSearch.depth() != pathTail.depth()) return false;
141    Path tailPath = pathTail;
142    String tailName;
143    Path toSearch = pathToSearch;
144    String toSearchName;
145    boolean result = false;
146    do {
147      tailName = tailPath.getName();
148      if (tailName == null || tailName.length() <= 0) {
149        result = true;
150        break;
151      }
152      toSearchName = toSearch.getName();
153      if (toSearchName == null || toSearchName.length() <= 0) break;
154      // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
155      tailPath = tailPath.getParent();
156      toSearch = toSearch.getParent();
157    } while(tailName.equals(toSearchName));
158    return result;
159  }
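
  // Usage sketch with hypothetical paths, illustrating that the scheme is ignored but the depth
  // must match:
  //
  //   FSUtils.isMatchingTail(new Path("hdfs://a/b/c"), new Path("/a/b/c")); // true
  //   FSUtils.isMatchingTail(new Path("hdfs://a/b/c"), new Path("b/c"));    // false, depths differ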
160
161  public static FSUtils getInstance(FileSystem fs, Configuration conf) {
162    String scheme = fs.getUri().getScheme();
163    if (scheme == null) {
164      LOG.warn("Could not find scheme for uri " +
165          fs.getUri() + ", default to hdfs");
166      scheme = "hdfs";
167    }
168    Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
169        scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
170    FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
171    return fsUtils;
172  }
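
  // A minimal usage sketch ('conf', 'fs' and 'walPath' are assumed to exist; the configuration key
  // "hbase.fsutil.<scheme>.impl" is the one read above):
  //
  //   FSUtils fsUtils = FSUtils.getInstance(fs, conf); // FSHDFSUtils by default for hdfs://
  //   fsUtils.recoverFileLease(fs, walPath, conf, null);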
173
  /**
   * Delete the region directory if it exists.
   * @param conf configuration
   * @param hri the region whose directory is to be deleted
   * @return True if deleted the region directory.
   * @throws IOException
   */
181  public static boolean deleteRegionDir(final Configuration conf, final HRegionInfo hri)
182  throws IOException {
183    Path rootDir = getRootDir(conf);
184    FileSystem fs = rootDir.getFileSystem(conf);
185    return deleteDirectory(fs,
186      new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
187  }
188
  /**
   * Create the specified file on the filesystem. By default, this will:
   * <ol>
   * <li>overwrite the file if it exists</li>
   * <li>apply the umask in the configuration (if it is enabled)</li>
   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the configured column family replication or default replication if
   * {@link HColumnDescriptor#DEFAULT_DFS_REPLICATION}</li>
   * <li>use the default block size</li>
   * <li>not track progress</li>
   * </ol>
   * @param conf configurations
   * @param fs {@link FileSystem} on which to write the file
   * @param path {@link Path} to the file to write
   * @param perm permissions
   * @param favoredNodes favored data nodes for the new file, or null if none
   * @return output stream to the created file
   * @throws IOException if the file cannot be created
   */
208  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
209      FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
210    if (fs instanceof HFileSystem) {
211      FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
212      if (backingFs instanceof DistributedFileSystem) {
213        // Try to use the favoredNodes version via reflection to allow backwards-
214        // compatibility.
215        short replication = Short.parseShort(conf.get(HColumnDescriptor.DFS_REPLICATION,
216          String.valueOf(HColumnDescriptor.DEFAULT_DFS_REPLICATION)));
217        try {
218          return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create",
219            Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class,
220            Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true,
221            getDefaultBufferSize(backingFs),
222            replication > 0 ? replication : getDefaultReplication(backingFs, path),
223            getDefaultBlockSize(backingFs, path), null, favoredNodes));
224        } catch (InvocationTargetException ite) {
          // Function was properly called, but threw its own exception.
226          throw new IOException(ite.getCause());
227        } catch (NoSuchMethodException e) {
228          LOG.debug("DFS Client does not support most favored nodes create; using default create");
229          if (LOG.isTraceEnabled()) LOG.trace("Ignoring; use default create", e);
230        } catch (IllegalArgumentException e) {
231          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
232        } catch (SecurityException e) {
233          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
234        } catch (IllegalAccessException e) {
235          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
236        }
237      }
238    }
239    return create(fs, path, perm, true);
240  }
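
  // Usage sketch with a hypothetical path and favored node; 'conf' and an HFileSystem 'fs' are
  // assumed to exist:
  //
  //   InetSocketAddress[] favoredNodes =
  //       new InetSocketAddress[] { new InetSocketAddress("datanode-1.example.com", 50010) };
  //   try (FSDataOutputStream out = FSUtils.create(conf, fs, new Path("/hbase/.tmp/example"),
  //       FsPermission.getFileDefault(), favoredNodes)) {
  //     out.write(Bytes.toBytes("example"));
  //   }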
241
242  /**
243   * Checks to see if the specified file system is available
244   *
245   * @param fs filesystem
246   * @throws IOException e
247   */
248  public static void checkFileSystemAvailable(final FileSystem fs)
249  throws IOException {
250    if (!(fs instanceof DistributedFileSystem)) {
251      return;
252    }
253    IOException exception = null;
254    DistributedFileSystem dfs = (DistributedFileSystem) fs;
255    try {
256      if (dfs.exists(new Path("/"))) {
257        return;
258      }
259    } catch (IOException e) {
260      exception = e instanceof RemoteException ?
261              ((RemoteException)e).unwrapRemoteException() : e;
262    }
263    try {
264      fs.close();
265    } catch (Exception e) {
266      LOG.error("file system close failed: ", e);
267    }
268    throw new IOException("File system is not available", exception);
269  }
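
  // Usage sketch (assumes an existing Configuration 'conf'); non-HDFS filesystems pass trivially:
  //
  //   FileSystem fs = FileSystem.get(conf);
  //   FSUtils.checkFileSystemAvailable(fs); // throws IOException if HDFS is unreachable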
270
271  /**
272   * We use reflection because {@link DistributedFileSystem#setSafeMode(
273   * HdfsConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1
274   *
275   * @param dfs
276   * @return whether we're in safe mode
277   * @throws IOException
278   */
279  private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
280    boolean inSafeMode = false;
281    try {
282      Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
283          org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class});
284      inSafeMode = (Boolean) m.invoke(dfs,
285        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true);
286    } catch (Exception e) {
287      if (e instanceof IOException) throw (IOException) e;
288
289      // Check whether dfs is on safemode.
290      inSafeMode = dfs.setSafeMode(
291        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET);
292    }
293    return inSafeMode;
294  }
295
  /**
   * Check whether dfs is in safemode.
   * @param conf configuration to use
   * @throws IOException if dfs is in safemode, or if checking safemode fails
   */
301  public static void checkDfsSafeMode(final Configuration conf)
302  throws IOException {
303    boolean isInSafeMode = false;
304    FileSystem fs = FileSystem.get(conf);
305    if (fs instanceof DistributedFileSystem) {
306      DistributedFileSystem dfs = (DistributedFileSystem)fs;
307      isInSafeMode = isInSafeMode(dfs);
308    }
309    if (isInSafeMode) {
310      throw new IOException("File system is in safemode, it can't be written now");
311    }
312  }
313
314  /**
315   * Verifies current version of file system
316   *
317   * @param fs filesystem object
318   * @param rootdir root hbase directory
319   * @return null if no version file exists, version string otherwise.
320   * @throws IOException e
321   * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
322   */
323  public static String getVersion(FileSystem fs, Path rootdir)
324  throws IOException, DeserializationException {
325    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
326    FileStatus[] status = null;
327    try {
328      // hadoop 2.0 throws FNFE if directory does not exist.
329      // hadoop 1.0 returns null if directory does not exist.
330      status = fs.listStatus(versionFile);
331    } catch (FileNotFoundException fnfe) {
332      return null;
333    }
334    if (status == null || status.length == 0) return null;
335    String version = null;
336    byte [] content = new byte [(int)status[0].getLen()];
337    FSDataInputStream s = fs.open(versionFile);
338    try {
339      IOUtils.readFully(s, content, 0, content.length);
340      if (ProtobufUtil.isPBMagicPrefix(content)) {
341        version = parseVersionFrom(content);
342      } else {
343        // Presume it pre-pb format.
344        InputStream is = new ByteArrayInputStream(content);
345        DataInputStream dis = new DataInputStream(is);
346        try {
347          version = dis.readUTF();
348        } finally {
349          dis.close();
350        }
351      }
352    } catch (EOFException eof) {
353      LOG.warn("Version file was empty, odd, will try to set it.");
354    } finally {
355      s.close();
356    }
357    return version;
358  }
359
360  /**
361   * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
362   * @param bytes The byte content of the hbase.version file.
363   * @return The version found in the file as a String.
364   * @throws DeserializationException
365   */
366  static String parseVersionFrom(final byte [] bytes)
367  throws DeserializationException {
368    ProtobufUtil.expectPBMagicPrefix(bytes);
369    int pblen = ProtobufUtil.lengthOfPBMagic();
370    FSProtos.HBaseVersionFileContent.Builder builder =
371      FSProtos.HBaseVersionFileContent.newBuilder();
372    try {
373      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
374      return builder.getVersion();
375    } catch (IOException e) {
376      // Convert
377      throw new DeserializationException(e);
378    }
379  }
380
381  /**
382   * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
383   * @param version Version to persist
384   * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
385   */
386  static byte [] toVersionByteArray(final String version) {
387    FSProtos.HBaseVersionFileContent.Builder builder =
388      FSProtos.HBaseVersionFileContent.newBuilder();
389    return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
390  }
391
392  /**
393   * Verifies current version of file system
394   *
395   * @param fs file system
396   * @param rootdir root directory of HBase installation
397   * @param message if true, issues a message on System.out
398   *
399   * @throws IOException e
400   * @throws DeserializationException
401   */
402  public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
403  throws IOException, DeserializationException {
404    checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
405  }
406
407  /**
408   * Verifies current version of file system
409   *
410   * @param fs file system
411   * @param rootdir root directory of HBase installation
412   * @param message if true, issues a message on System.out
413   * @param wait wait interval
414   * @param retries number of times to retry
415   *
416   * @throws IOException e
417   * @throws DeserializationException
418   */
419  public static void checkVersion(FileSystem fs, Path rootdir,
420      boolean message, int wait, int retries)
421  throws IOException, DeserializationException {
422    String version = getVersion(fs, rootdir);
423    String msg;
424    if (version == null) {
425      if (!metaRegionExists(fs, rootdir)) {
426        // rootDir is empty (no version file and no root region)
427        // just create new version file (HBASE-1195)
428        setVersion(fs, rootdir, wait, retries);
429        return;
430      } else {
431        msg = "hbase.version file is missing. Is your hbase.rootdir valid? " +
432            "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " +
433            "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2";
434      }
435    } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) {
436      return;
437    } else {
438      msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version +
439          " but software requires version " + HConstants.FILE_SYSTEM_VERSION +
440          ". Consult http://hbase.apache.org/book.html for further information about " +
441          "upgrading HBase.";
442    }
443
    // version is deprecated; require migration
445    // Output on stdout so user sees it in terminal.
446    if (message) {
447      System.out.println("WARNING! " + msg);
448    }
449    throw new FileSystemVersionException(msg);
450  }
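
  // Startup-style usage sketch ('conf' assumed to exist): creates the version file on a fresh
  // rootdir, returns silently when the versions match, and throws FileSystemVersionException
  // otherwise.
  //
  //   Path rootDir = FSUtils.getRootDir(conf);
  //   FileSystem fs = rootDir.getFileSystem(conf);
  //   FSUtils.checkVersion(fs, rootDir, true, 10 * 1000, 3); // wait 10s between up to 3 retries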
451
452  /**
453   * Sets version of file system
454   *
455   * @param fs filesystem object
456   * @param rootdir hbase root
457   * @throws IOException e
458   */
459  public static void setVersion(FileSystem fs, Path rootdir)
460  throws IOException {
461    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
462      HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
463  }
464
465  /**
466   * Sets version of file system
467   *
468   * @param fs filesystem object
469   * @param rootdir hbase root
470   * @param wait time to wait for retry
471   * @param retries number of times to retry before failing
472   * @throws IOException e
473   */
474  public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
475  throws IOException {
476    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
477  }
478
479
480  /**
481   * Sets version of file system
482   *
483   * @param fs filesystem object
484   * @param rootdir hbase root directory
485   * @param version version to set
486   * @param wait time to wait for retry
487   * @param retries number of times to retry before throwing an IOException
488   * @throws IOException e
489   */
490  public static void setVersion(FileSystem fs, Path rootdir, String version,
491      int wait, int retries) throws IOException {
492    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
493    Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
494      HConstants.VERSION_FILE_NAME);
495    while (true) {
496      try {
497        // Write the version to a temporary file
498        FSDataOutputStream s = fs.create(tempVersionFile);
499        try {
500          s.write(toVersionByteArray(version));
501          s.close();
502          s = null;
503          // Move the temp version file to its normal location. Returns false
504          // if the rename failed. Throw an IOE in that case.
505          if (!fs.rename(tempVersionFile, versionFile)) {
506            throw new IOException("Unable to move temp version file to " + versionFile);
507          }
508        } finally {
509          // Cleaning up the temporary if the rename failed would be trying
510          // too hard. We'll unconditionally create it again the next time
511          // through anyway, files are overwritten by default by create().
512
513          // Attempt to close the stream on the way out if it is still open.
514          try {
515            if (s != null) s.close();
516          } catch (IOException ignore) { }
517        }
518        LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
519        return;
520      } catch (IOException e) {
521        if (retries > 0) {
522          LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
523          fs.delete(versionFile, false);
524          try {
525            if (wait > 0) {
526              Thread.sleep(wait);
527            }
528          } catch (InterruptedException ie) {
529            throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
530          }
531          retries--;
532        } else {
533          throw e;
534        }
535      }
536    }
537  }
538
539  /**
540   * Checks that a cluster ID file exists in the HBase root directory
541   * @param fs the root directory FileSystem
542   * @param rootdir the HBase root directory in HDFS
543   * @param wait how long to wait between retries
544   * @return <code>true</code> if the file exists, otherwise <code>false</code>
545   * @throws IOException if checking the FileSystem fails
546   */
547  public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
548      int wait) throws IOException {
549    while (true) {
550      try {
551        Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
552        return fs.exists(filePath);
553      } catch (IOException ioe) {
554        if (wait > 0) {
555          LOG.warn("Unable to check cluster ID file in " + rootdir.toString() +
556              ", retrying in "+wait+"msec: "+StringUtils.stringifyException(ioe));
557          try {
558            Thread.sleep(wait);
559          } catch (InterruptedException e) {
560            throw (InterruptedIOException)new InterruptedIOException().initCause(e);
561          }
562        } else {
563          throw ioe;
564        }
565      }
566    }
567  }
568
569  /**
570   * Returns the value of the unique cluster ID stored for this HBase instance.
571   * @param fs the root directory FileSystem
572   * @param rootdir the path to the HBase root directory
573   * @return the unique cluster identifier
574   * @throws IOException if reading the cluster ID file fails
575   */
576  public static ClusterId getClusterId(FileSystem fs, Path rootdir)
577  throws IOException {
578    Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
579    ClusterId clusterId = null;
580    FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath):  null;
581    if (status != null) {
582      int len = Ints.checkedCast(status.getLen());
583      byte [] content = new byte[len];
584      FSDataInputStream in = fs.open(idPath);
585      try {
586        in.readFully(content);
587      } catch (EOFException eof) {
588        LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
589      } finally{
590        in.close();
591      }
592      try {
593        clusterId = ClusterId.parseFrom(content);
594      } catch (DeserializationException e) {
595        throw new IOException("content=" + Bytes.toString(content), e);
596      }
597      // If not pb'd, make it so.
598      if (!ProtobufUtil.isPBMagicPrefix(content)) {
599        String cid = null;
600        in = fs.open(idPath);
601        try {
602          cid = in.readUTF();
603          clusterId = new ClusterId(cid);
604        } catch (EOFException eof) {
605          LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
606        } finally {
607          in.close();
608        }
609        rewriteAsPb(fs, rootdir, idPath, clusterId);
610      }
611      return clusterId;
612    } else {
613      LOG.warn("Cluster ID file does not exist at " + idPath.toString());
614    }
615    return clusterId;
616  }
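
  // Usage sketch ('conf' assumed to exist); reads the hbase.id file under the root directory and
  // returns null if it does not exist:
  //
  //   Path rootDir = FSUtils.getRootDir(conf);
  //   FileSystem fs = rootDir.getFileSystem(conf);
  //   ClusterId clusterId = FSUtils.getClusterId(fs, rootDir);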
617
  /**
   * Rewrite the cluster ID file at <code>p</code> as protobuf.
   * @param fs filesystem
   * @param rootdir hbase root directory
   * @param p path of the existing cluster ID file
   * @param cid cluster ID to write back out
   * @throws IOException if the rewrite fails
   */
622  private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
623      final ClusterId cid)
624  throws IOException {
625    // Rewrite the file as pb.  Move aside the old one first, write new
626    // then delete the moved-aside file.
627    Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
628    if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
629    setClusterId(fs, rootdir, cid, 100);
630    if (!fs.delete(movedAsideName, false)) {
631      throw new IOException("Failed delete of " + movedAsideName);
632    }
633    LOG.debug("Rewrote the hbase.id file as pb");
634  }
635
636  /**
637   * Writes a new unique identifier for this cluster to the "hbase.id" file
638   * in the HBase root directory
639   * @param fs the root directory FileSystem
640   * @param rootdir the path to the HBase root directory
641   * @param clusterId the unique identifier to store
642   * @param wait how long (in milliseconds) to wait between retries
   * @throws IOException if writing to the FileSystem fails and <code>wait</code> is not positive
644   */
645  public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
646      int wait) throws IOException {
647    while (true) {
648      try {
649        Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
650        Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
651          Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
652        // Write the id file to a temporary location
653        FSDataOutputStream s = fs.create(tempIdFile);
654        try {
655          s.write(clusterId.toByteArray());
656          s.close();
657          s = null;
658          // Move the temporary file to its normal location. Throw an IOE if
659          // the rename failed
660          if (!fs.rename(tempIdFile, idFile)) {
661            throw new IOException("Unable to move temp version file to " + idFile);
662          }
663        } finally {
664          // Attempt to close the stream if still open on the way out
665          try {
666            if (s != null) s.close();
667          } catch (IOException ignore) { }
668        }
669        if (LOG.isDebugEnabled()) {
670          LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
671        }
672        return;
673      } catch (IOException ioe) {
674        if (wait > 0) {
675          LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
676              ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
677          try {
678            Thread.sleep(wait);
679          } catch (InterruptedException e) {
680            throw (InterruptedIOException)new InterruptedIOException().initCause(e);
681          }
682        } else {
683          throw ioe;
684        }
685      }
686    }
687  }
688
689  /**
   * If DFS, check whether it is in safe mode and, if so, wait until it is cleared.
691   * @param conf configuration
692   * @param wait Sleep between retries
693   * @throws IOException e
694   */
695  public static void waitOnSafeMode(final Configuration conf,
696    final long wait)
697  throws IOException {
698    FileSystem fs = FileSystem.get(conf);
699    if (!(fs instanceof DistributedFileSystem)) return;
700    DistributedFileSystem dfs = (DistributedFileSystem)fs;
701    // Make sure dfs is not in safe mode
702    while (isInSafeMode(dfs)) {
703      LOG.info("Waiting for dfs to exit safe mode...");
704      try {
705        Thread.sleep(wait);
706      } catch (InterruptedException e) {
707        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
708      }
709    }
710  }
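
  // Usage sketch ('conf' assumed to exist); blocks until HDFS leaves safe mode, checking every
  // 10 seconds:
  //
  //   FSUtils.waitOnSafeMode(conf, 10 * 1000L);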
711
712  /**
713   * Checks if meta region exists
714   * @param fs file system
715   * @param rootDir root directory of HBase installation
716   * @return true if exists
717   */
718  public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
719    Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
720    return fs.exists(metaRegionDir);
721  }
722
723  /**
724   * Compute HDFS blocks distribution of a given file, or a portion of the file
725   * @param fs file system
726   * @param status file status of the file
727   * @param start start position of the portion
728   * @param length length of the portion
729   * @return The HDFS blocks distribution
730   */
731  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
732    final FileSystem fs, FileStatus status, long start, long length)
733    throws IOException {
734    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
735    BlockLocation [] blockLocations =
736      fs.getFileBlockLocations(status, start, length);
737    for(BlockLocation bl : blockLocations) {
738      String [] hosts = bl.getHosts();
739      long len = bl.getLength();
740      blocksDistribution.addHostsAndBlockWeight(hosts, len);
741    }
742
743    return blocksDistribution;
744  }
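
  // Usage sketch with a hypothetical store file path and host name ('fs' assumed to exist);
  // computes how local a file's blocks are to a given host:
  //
  //   FileStatus status = fs.getFileStatus(storeFilePath);
  //   HDFSBlocksDistribution dist =
  //       FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
  //   float locality = dist.getBlockLocalityIndex("regionserver-1.example.com");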
745
746  /**
747   * Update blocksDistribution with blockLocations
748   * @param blocksDistribution the hdfs blocks distribution
   * @param blockLocations an array containing block locations
750   */
751  static public void addToHDFSBlocksDistribution(
752      HDFSBlocksDistribution blocksDistribution, BlockLocation[] blockLocations)
753      throws IOException {
754    for (BlockLocation bl : blockLocations) {
755      String[] hosts = bl.getHosts();
756      long len = bl.getLength();
757      blocksDistribution.addHostsAndBlockWeight(hosts, len);
758    }
759  }
760
  // TODO move this method OUT of FSUtils. No dependencies on HMaster
  /**
   * Returns the total overall fragmentation percentage. Includes hbase:meta and
   * -ROOT- as well.
   *
   * @param master  The master defining the HBase root and file system.
   * @return The total fragmentation percentage, or -1 if it could not be determined.
   * @throws IOException When scanning the directory fails.
   */
770  public static int getTotalTableFragmentation(final HMaster master)
771  throws IOException {
772    Map<String, Integer> map = getTableFragmentation(master);
773    return map != null && map.size() > 0 ? map.get("-TOTAL-") : -1;
774  }
775
776  /**
777   * Runs through the HBase rootdir and checks how many stores for each table
778   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
779   * percentage across all tables is stored under the special key "-TOTAL-".
780   *
781   * @param master  The master defining the HBase root and file system.
782   * @return A map for each table and its percentage.
783   *
784   * @throws IOException When scanning the directory fails.
785   */
786  public static Map<String, Integer> getTableFragmentation(
787    final HMaster master)
788  throws IOException {
789    Path path = getRootDir(master.getConfiguration());
790    // since HMaster.getFileSystem() is package private
791    FileSystem fs = path.getFileSystem(master.getConfiguration());
792    return getTableFragmentation(fs, path);
793  }
794
795  /**
796   * Runs through the HBase rootdir and checks how many stores for each table
797   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
798   * percentage across all tables is stored under the special key "-TOTAL-".
799   *
800   * @param fs  The file system to use.
801   * @param hbaseRootDir  The root directory to scan.
802   * @return A map for each table and its percentage.
803   * @throws IOException When scanning the directory fails.
804   */
805  public static Map<String, Integer> getTableFragmentation(
806    final FileSystem fs, final Path hbaseRootDir)
807  throws IOException {
808    Map<String, Integer> frags = new HashMap<>();
809    int cfCountTotal = 0;
810    int cfFragTotal = 0;
811    PathFilter regionFilter = new RegionDirFilter(fs);
812    PathFilter familyFilter = new FamilyDirFilter(fs);
813    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
814    for (Path d : tableDirs) {
815      int cfCount = 0;
816      int cfFrag = 0;
817      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
818      for (FileStatus regionDir : regionDirs) {
819        Path dd = regionDir.getPath();
820        // else its a region name, now look in region for families
821        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
822        for (FileStatus familyDir : familyDirs) {
823          cfCount++;
824          cfCountTotal++;
825          Path family = familyDir.getPath();
826          // now in family make sure only one file
827          FileStatus[] familyStatus = fs.listStatus(family);
828          if (familyStatus.length > 1) {
829            cfFrag++;
830            cfFragTotal++;
831          }
832        }
833      }
834      // compute percentage per table and store in result list
835      frags.put(FSUtils.getTableName(d).getNameAsString(),
836        cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
837    }
838    // set overall percentage for all tables
839    frags.put("-TOTAL-",
840      cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
841    return frags;
842  }
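
  // Usage sketch ('conf' assumed to exist); the overall percentage is stored under the special
  // "-TOTAL-" key:
  //
  //   Path rootDir = FSUtils.getRootDir(conf);
  //   FileSystem fs = rootDir.getFileSystem(conf);
  //   Map<String, Integer> frags = FSUtils.getTableFragmentation(fs, rootDir);
  //   int overall = frags.getOrDefault("-TOTAL-", -1);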
843
844  public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
845    if (fs.exists(dst) && !fs.delete(dst, false)) {
846      throw new IOException("Can not delete " + dst);
847    }
848    if (!fs.rename(src, dst)) {
849      throw new IOException("Can not rename from " + src + " to " + dst);
850    }
851  }
852
853  /**
854   * A {@link PathFilter} that returns only regular files.
855   */
856  static class FileFilter extends AbstractFileStatusFilter {
857    private final FileSystem fs;
858
859    public FileFilter(final FileSystem fs) {
860      this.fs = fs;
861    }
862
863    @Override
864    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
865      try {
866        return isFile(fs, isDir, p);
867      } catch (IOException e) {
868        LOG.warn("unable to verify if path=" + p + " is a regular file", e);
869        return false;
870      }
871    }
872  }
873
874  /**
875   * Directory filter that doesn't include any of the directories in the specified blacklist
876   */
877  public static class BlackListDirFilter extends AbstractFileStatusFilter {
878    private final FileSystem fs;
879    private List<String> blacklist;
880
881    /**
     * Create a filter on the given filesystem with the specified blacklist
883     * @param fs filesystem to filter
884     * @param directoryNameBlackList list of the names of the directories to filter. If
885     *          <tt>null</tt>, all directories are returned
886     */
887    @SuppressWarnings("unchecked")
888    public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
889      this.fs = fs;
890      blacklist =
891        (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
892          : directoryNameBlackList);
893    }
894
895    @Override
896    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
897      if (!isValidName(p.getName())) {
898        return false;
899      }
900
901      try {
902        return isDirectory(fs, isDir, p);
903      } catch (IOException e) {
904        LOG.warn("An error occurred while verifying if [" + p.toString()
905            + "] is a valid directory. Returning 'not valid' and continuing.", e);
906        return false;
907      }
908    }
909
910    protected boolean isValidName(final String name) {
911      return !blacklist.contains(name);
912    }
913  }
914
915  /**
916   * A {@link PathFilter} that only allows directories.
917   */
918  public static class DirFilter extends BlackListDirFilter {
919
920    public DirFilter(FileSystem fs) {
921      super(fs, null);
922    }
923  }
924
925  /**
   * A {@link PathFilter} that returns user table directories. To get all directories use the
927   * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
928   */
929  public static class UserTableDirFilter extends BlackListDirFilter {
930    public UserTableDirFilter(FileSystem fs) {
931      super(fs, HConstants.HBASE_NON_TABLE_DIRS);
932    }
933
934    @Override
935    protected boolean isValidName(final String name) {
936      if (!super.isValidName(name))
937        return false;
938
939      try {
940        TableName.isLegalTableQualifierName(Bytes.toBytes(name));
941      } catch (IllegalArgumentException e) {
942        LOG.info("INVALID NAME " + name);
943        return false;
944      }
945      return true;
946    }
947  }
948
949  public void recoverFileLease(final FileSystem fs, final Path p, Configuration conf)
950      throws IOException {
951    recoverFileLease(fs, p, conf, null);
952  }
953
954  /**
   * Recover file lease. Used when a file is suspected to
   * have been left open by another process.
957   * @param fs FileSystem handle
958   * @param p Path of file to recover lease
959   * @param conf Configuration handle
960   * @throws IOException
961   */
962  public abstract void recoverFileLease(final FileSystem fs, final Path p,
963      Configuration conf, CancelableProgressable reporter) throws IOException;
964
965  public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
966      throws IOException {
967    List<Path> tableDirs = new LinkedList<>();
968
969    for(FileStatus status :
970        fs.globStatus(new Path(rootdir,
971            new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
972      tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
973    }
974    return tableDirs;
975  }
976
  /**
   * @param fs filesystem to use
   * @param rootdir directory to scan
   * @return All the table directories under <code>rootdir</code>. Ignores non-table hbase
   *   folders such as .logs, .oldlogs and .corrupt.
   * @throws IOException
   */
984  public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
985      throws IOException {
986    // presumes any directory under hbase.rootdir is a table
987    FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
988    List<Path> tabledirs = new ArrayList<>(dirs.length);
989    for (FileStatus dir: dirs) {
990      tabledirs.add(dir.getPath());
991    }
992    return tabledirs;
993  }
994
  /**
   * Filter for all dirs that look like region directories (hex-encoded region names).
   */
998  public static class RegionDirFilter extends AbstractFileStatusFilter {
999    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
1000    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
1001    final FileSystem fs;
1002
1003    public RegionDirFilter(FileSystem fs) {
1004      this.fs = fs;
1005    }
1006
1007    @Override
1008    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1009      if (!regionDirPattern.matcher(p.getName()).matches()) {
1010        return false;
1011      }
1012
1013      try {
1014        return isDirectory(fs, isDir, p);
1015      } catch (IOException ioe) {
1016        // Maybe the file was moved or the fs was disconnected.
1017        LOG.warn("Skipping file " + p +" due to IOException", ioe);
1018        return false;
1019      }
1020    }
1021  }
1022
1023  /**
1024   * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1025   * .tableinfo
1026   * @param fs A file system for the Path
1027   * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
1028   * @return List of paths to valid region directories in table dir.
1029   * @throws IOException
1030   */
1031  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1032    // assumes we are in a table dir.
1033    List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1034    if (rds == null) {
1035      return Collections.emptyList();
1036    }
1037    List<Path> regionDirs = new ArrayList<>(rds.size());
1038    for (FileStatus rdfs: rds) {
1039      Path rdPath = rdfs.getPath();
1040      regionDirs.add(rdPath);
1041    }
1042    return regionDirs;
1043  }
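
  // Usage sketch with a hypothetical table name ('conf' assumed to exist); walks
  // rootdir -> table dir -> region dirs -> family dirs:
  //
  //   Path rootDir = FSUtils.getRootDir(conf);
  //   FileSystem fs = rootDir.getFileSystem(conf);
  //   Path tableDir = FSUtils.getTableDir(rootDir, TableName.valueOf("example_table"));
  //   for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
  //     List<Path> familyDirs = FSUtils.getFamilyDirs(fs, regionDir);
  //   }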
1044
1045  public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) {
1046    return getRegionDirFromTableDir(getTableDir(rootDir, region.getTable()), region);
1047  }
1048
1049  public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
1050    return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
1051  }
1052
1053  /**
1054   * Filter for all dirs that are legal column family names.  This is generally used for colfam
1055   * dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1056   */
1057  public static class FamilyDirFilter extends AbstractFileStatusFilter {
1058    final FileSystem fs;
1059
1060    public FamilyDirFilter(FileSystem fs) {
1061      this.fs = fs;
1062    }
1063
1064    @Override
1065    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1066      try {
1067        // throws IAE if invalid
1068        HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName()));
1069      } catch (IllegalArgumentException iae) {
1070        // path name is an invalid family name and thus is excluded.
1071        return false;
1072      }
1073
1074      try {
1075        return isDirectory(fs, isDir, p);
1076      } catch (IOException ioe) {
1077        // Maybe the file was moved or the fs was disconnected.
1078        LOG.warn("Skipping file " + p +" due to IOException", ioe);
1079        return false;
1080      }
1081    }
1082  }
1083
1084  /**
1085   * Given a particular region dir, return all the familydirs inside it
1086   *
1087   * @param fs A file system for the Path
1088   * @param regionDir Path to a specific region directory
1089   * @return List of paths to valid family directories in region dir.
1090   * @throws IOException
1091   */
1092  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1093    // assumes we are in a region dir.
1094    FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1095    List<Path> familyDirs = new ArrayList<>(fds.length);
1096    for (FileStatus fdfs: fds) {
1097      Path fdPath = fdfs.getPath();
1098      familyDirs.add(fdPath);
1099    }
1100    return familyDirs;
1101  }
1102
1103  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
1104    List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs));
1105    if (fds == null) {
1106      return Collections.emptyList();
1107    }
1108    List<Path> referenceFiles = new ArrayList<>(fds.size());
1109    for (FileStatus fdfs: fds) {
1110      Path fdPath = fdfs.getPath();
1111      referenceFiles.add(fdPath);
1112    }
1113    return referenceFiles;
1114  }
1115
1116  /**
1117   * Filter for HFiles that excludes reference files.
1118   */
1119  public static class HFileFilter extends AbstractFileStatusFilter {
1120    final FileSystem fs;
1121
1122    public HFileFilter(FileSystem fs) {
1123      this.fs = fs;
1124    }
1125
1126    @Override
1127    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1128      if (!StoreFileInfo.isHFile(p)) {
1129        return false;
1130      }
1131
1132      try {
1133        return isFile(fs, isDir, p);
1134      } catch (IOException ioe) {
1135        // Maybe the file was moved or the fs was disconnected.
1136        LOG.warn("Skipping file " + p +" due to IOException", ioe);
1137        return false;
1138      }
1139    }
1140  }
1141
  /**
   * Filter for HFileLinks (StoreFiles and HFiles not included).
   * The filter itself does not consider whether a link is a file or not.
   */
1146  public static class HFileLinkFilter implements PathFilter {
1147
1148    @Override
1149    public boolean accept(Path p) {
1150      return HFileLink.isHFileLink(p);
1151    }
1152  }
1153
1154  public static class ReferenceFileFilter extends AbstractFileStatusFilter {
1155
1156    private final FileSystem fs;
1157
1158    public ReferenceFileFilter(FileSystem fs) {
1159      this.fs = fs;
1160    }
1161
1162    @Override
1163    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1164      if (!StoreFileInfo.isReference(p)) {
1165        return false;
1166      }
1167
1168      try {
1169        // only files can be references.
1170        return isFile(fs, isDir, p);
1171      } catch (IOException ioe) {
1172        // Maybe the file was moved or the fs was disconnected.
1173        LOG.warn("Skipping file " + p +" due to IOException", ioe);
1174        return false;
1175      }
1176    }
1177  }
1178
1179  /**
   * Called every so often by the storefile map builder getTableStoreFilePathMap to
1181   * report progress.
1182   */
1183  interface ProgressReporter {
1184    /**
1185     * @param status File or directory we are about to process.
1186     */
1187    void progress(FileStatus status);
1188  }
1189
1190  /**
1191   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1192   * table StoreFile names to the full Path.
1193   * <br>
1194   * Example...<br>
1195   * Key = 3944417774205889744  <br>
1196   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1197   *
1198   * @param map map to add values.  If null, this method will create and populate one to return
1199   * @param fs  The file system to use.
1200   * @param hbaseRootDir  The root directory to scan.
1201   * @param tableName name of the table to scan.
1202   * @return Map keyed by StoreFile name with a value of the full Path.
1203   * @throws IOException When scanning the directory fails.
1204   * @throws InterruptedException
1205   */
1206  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1207  final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1208  throws IOException, InterruptedException {
1209    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null,
1210        (ProgressReporter)null);
1211  }
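
  // Usage sketch with a hypothetical table name ('conf' assumed to exist); builds the reverse
  // lookup map of store file name to full Path for a single table:
  //
  //   Path rootDir = FSUtils.getRootDir(conf);
  //   FileSystem fs = rootDir.getFileSystem(conf);
  //   Map<String, Path> storeFiles =
  //       FSUtils.getTableStoreFilePathMap(null, fs, rootDir, TableName.valueOf("example_table"));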
1212
1213  /**
1214   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
   * table StoreFile names to the full Path.  Note that because this method can be called
   * on a 'live' HBase system, we will skip files that no longer exist by the time
   * we traverse them, and similarly the user of the result needs to consider that some
   * entries in this map may not exist by the time this call completes.
1219   * <br>
1220   * Example...<br>
1221   * Key = 3944417774205889744  <br>
1222   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1223   *
1224   * @param resultMap map to add values.  If null, this method will create and populate one to return
1225   * @param fs  The file system to use.
1226   * @param hbaseRootDir  The root directory to scan.
1227   * @param tableName name of the table to scan.
1228   * @param sfFilter optional path filter to apply to store files
1229   * @param executor optional executor service to parallelize this operation
1230   * @param progressReporter Instance or null; gets called every time we move to new region of
1231   *   family dir and for each store file.
1232   * @return Map keyed by StoreFile name with a value of the full Path.
1233   * @throws IOException When scanning the directory fails.
1234   * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead.
1235   */
1236  @Deprecated
1237  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1238      final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1239      ExecutorService executor, final HbckErrorReporter progressReporter)
1240      throws IOException, InterruptedException {
1241    return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor,
1242        new ProgressReporter() {
1243          @Override
1244          public void progress(FileStatus status) {
1245            // status is not used in this implementation.
1246            progressReporter.progress();
1247          }
1248        });
1249  }
1250
1251  /*
1252   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
   * table StoreFile names to the full Path.  Note that because this method can be called
   * on a 'live' HBase system, we will skip files that no longer exist by the time
   * we traverse them, and similarly the user of the result needs to consider that some
   * entries in this map may not exist by the time this call completes.
1257   * <br>
1258   * Example...<br>
1259   * Key = 3944417774205889744  <br>
1260   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1261   *
1262   * @param resultMap map to add values.  If null, this method will create and populate one
1263   *   to return
1264   * @param fs  The file system to use.
1265   * @param hbaseRootDir  The root directory to scan.
1266   * @param tableName name of the table to scan.
1267   * @param sfFilter optional path filter to apply to store files
1268   * @param executor optional executor service to parallelize this operation
1269   * @param progressReporter Instance or null; gets called every time we move to new region of
1270   *   family dir and for each store file.
1271   * @return Map keyed by StoreFile name with a value of the full Path.
1272   * @throws IOException When scanning the directory fails.
1273   * @throws InterruptedException
1274   */
1275  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1276      final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1277      ExecutorService executor, final ProgressReporter progressReporter)
1278    throws IOException, InterruptedException {
1279
1280    final Map<String, Path> finalResultMap =
1281        resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;
1282
1283    // only include the directory paths to tables
1284    Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
1285    // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
1286    // should be regions.
1287    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
1288    final Vector<Exception> exceptions = new Vector<>();
1289
1290    try {
1291      List<FileStatus> regionDirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1292      if (regionDirs == null) {
1293        return finalResultMap;
1294      }
1295
1296      final List<Future<?>> futures = new ArrayList<>(regionDirs.size());
1297
1298      for (FileStatus regionDir : regionDirs) {
1299        if (null != progressReporter) {
1300          progressReporter.progress(regionDir);
1301        }
1302        final Path dd = regionDir.getPath();
1303
1304        if (!exceptions.isEmpty()) {
1305          break;
1306        }
1307
1308        Runnable getRegionStoreFileMapCall = new Runnable() {
1309          @Override
1310          public void run() {
1311            try {
1312              HashMap<String,Path> regionStoreFileMap = new HashMap<>();
1313              List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
1314              if (familyDirs == null) {
1315                if (!fs.exists(dd)) {
1316                  LOG.warn("Skipping region because it no longer exists: " + dd);
1317                } else {
1318                  LOG.warn("Skipping region because it has no family dirs: " + dd);
1319                }
1320                return;
1321              }
1322              for (FileStatus familyDir : familyDirs) {
1323                if (null != progressReporter) {
1324                  progressReporter.progress(familyDir);
1325                }
1326                Path family = familyDir.getPath();
1327                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1328                  continue;
1329                }
1330                // now in family, iterate over the StoreFiles and
1331                // put in map
1332                FileStatus[] familyStatus = fs.listStatus(family);
1333                for (FileStatus sfStatus : familyStatus) {
1334                  if (null != progressReporter) {
1335                    progressReporter.progress(sfStatus);
1336                  }
1337                  Path sf = sfStatus.getPath();
1338                  if (sfFilter == null || sfFilter.accept(sf)) {
1339                    regionStoreFileMap.put( sf.getName(), sf);
1340                  }
1341                }
1342              }
1343              finalResultMap.putAll(regionStoreFileMap);
1344            } catch (Exception e) {
1345              LOG.error("Could not get region store file map for region: " + dd, e);
1346              exceptions.add(e);
1347            }
1348          }
1349        };
1350
1351        // If executor is available, submit async tasks to exec concurrently, otherwise
1352        // just do serial sync execution
1353        if (executor != null) {
1354          Future<?> future = executor.submit(getRegionStoreFileMapCall);
1355          futures.add(future);
1356        } else {
1357          FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
1358          future.run();
1359          futures.add(future);
1360        }
1361      }
1362
1363      // Ensure all pending tasks are complete (or that we run into an exception)
1364      for (Future<?> f : futures) {
1365        if (!exceptions.isEmpty()) {
1366          break;
1367        }
1368        try {
1369          f.get();
1370        } catch (ExecutionException e) {
1371          LOG.error("Unexpected exec exception!  Should've been caught already.  (Bug?)", e);
1372          // Shouldn't happen, we already logged/caught any exceptions in the Runnable
1373        }
1374      }
1375    } catch (IOException e) {
1376      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
1377      exceptions.add(e);
1378    } finally {
1379      if (!exceptions.isEmpty()) {
1380        // Just throw the first exception as an indication something bad happened
1381        // Don't need to propagate all the exceptions, we already logged them all anyway
1382        Throwables.propagateIfInstanceOf(exceptions.firstElement(), IOException.class);
1383        throw Throwables.propagate(exceptions.firstElement());
1384      }
1385    }
1386
1387    return finalResultMap;
1388  }
1389
1390  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1391    int result = 0;
1392    try {
1393      for (Path familyDir:getFamilyDirs(fs, p)){
1394        result += getReferenceFilePaths(fs, familyDir).size();
1395      }
1396    } catch (IOException e) {
      LOG.warn("Error counting reference files.", e);
1398    }
1399    return result;
1400  }
1401
1402  /**
1403   * Runs through the HBase rootdir and creates a reverse lookup map for
1404   * table StoreFile names to the full Path.
1405   * <br>
1406   * Example...<br>
1407   * Key = 3944417774205889744  <br>
1408   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1409   *
1410   * @param fs  The file system to use.
1411   * @param hbaseRootDir  The root directory to scan.
1412   * @return Map keyed by StoreFile name with a value of the full Path.
1413   * @throws IOException When scanning the directory fails.
1414   */
1415  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1416      final Path hbaseRootDir)
1417  throws IOException, InterruptedException {
1418    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter)null);
1419  }
1420
1421  /**
1422   * Runs through the HBase rootdir and creates a reverse lookup map for
1423   * table StoreFile names to the full Path.
1424   * <br>
1425   * Example...<br>
1426   * Key = 3944417774205889744  <br>
1427   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1428   *
1429   * @param fs  The file system to use.
1430   * @param hbaseRootDir  The root directory to scan.
1431   * @param sfFilter optional path filter to apply to store files
1432   * @param executor optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to a new region or
   *   family dir and for each store file.
1435   * @return Map keyed by StoreFile name with a value of the full Path.
1436   * @throws IOException When scanning the directory fails.
   * @deprecated Since 2.3.0. Will be removed in HBase 4. Use {@link
   *   #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)}
   *   instead.
1439   */
1440  @Deprecated
1441  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
1442      final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
1443      HbckErrorReporter progressReporter)
1444    throws IOException, InterruptedException {
1445    return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor,
1446        new ProgressReporter() {
1447          @Override
1448          public void progress(FileStatus status) {
1449            // status is not used in this implementation.
1450            progressReporter.progress();
1451          }
1452        });
1453  }
1454
1455  /**
1456   * Runs through the HBase rootdir and creates a reverse lookup map for
1457   * table StoreFile names to the full Path.
1458   * <br>
1459   * Example...<br>
1460   * Key = 3944417774205889744  <br>
1461   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
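   * <p>
   * A hedged sketch of a parallel scan; the pool size and the null filter are illustrative
   * choices, and the {@code ProgressReporter} cast mirrors how the no-arg overload disambiguates:
   * <pre>{@code
   * ExecutorService pool = Executors.newFixedThreadPool(8);
   * try {
   *   Map<String, Path> nameToPath =
   *       getTableStoreFilePathMap(fs, rootDir, null, pool, (ProgressReporter) null);
   * } finally {
   *   pool.shutdown();
   * }
   * }</pre>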
1462   *
1463   * @param fs  The file system to use.
1464   * @param hbaseRootDir  The root directory to scan.
1465   * @param sfFilter optional path filter to apply to store files
1466   * @param executor optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to a new region or
   *   family dir and for each store file.
1469   * @return Map keyed by StoreFile name with a value of the full Path.
1470   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException if interrupted while waiting for asynchronous tasks to complete
1472   */
1473  public static Map<String, Path> getTableStoreFilePathMap(
1474    final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter,
1475        ExecutorService executor, ProgressReporter progressReporter)
1476  throws IOException, InterruptedException {
1477    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);
1478
1479    // if this method looks similar to 'getTableFragmentation' that is because
1480    // it was borrowed from it.
1481
1482    // only include the directory paths to tables
1483    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1484      getTableStoreFilePathMap(map, fs, hbaseRootDir,
1485          FSUtils.getTableName(tableDir), sfFilter, executor, progressReporter);
1486    }
1487    return map;
1488  }
1489
1490  /**
1491   * Filters FileStatuses in an array and returns a list
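   * <p>
   * For illustration, assuming {@code FileStatusFilter} can be written as a lambda (it exposes a
   * single {@code accept(FileStatus)} method), directories could be kept like so:
   * <pre>{@code
   * List<FileStatus> dirs = filterFileStatuses(fs.listStatus(dir), status -> status.isDirectory());
   * }</pre>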
1492   *
1493   * @param input   An array of FileStatuses
1494   * @param filter  A required filter to filter the array
1495   * @return        A list of FileStatuses
1496   */
1497  public static List<FileStatus> filterFileStatuses(FileStatus[] input,
1498      FileStatusFilter filter) {
1499    if (input == null) return null;
1500    return filterFileStatuses(Iterators.forArray(input), filter);
1501  }
1502
1503  /**
1504   * Filters FileStatuses in an iterator and returns a list
1505   *
1506   * @param input   An iterator of FileStatuses
   * @param filter  A required filter to filter the iterator
1508   * @return        A list of FileStatuses
1509   */
1510  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1511      FileStatusFilter filter) {
1512    if (input == null) return null;
1513    ArrayList<FileStatus> results = new ArrayList<>();
1514    while (input.hasNext()) {
1515      FileStatus f = input.next();
1516      if (filter.accept(f)) {
1517        results.add(f);
1518      }
1519    }
1520    return results;
1521  }
1522
1523  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
   * This accommodates differences between hadoop versions, where hadoop 1
   * does not throw a FileNotFoundException but returns an empty FileStatus[],
   * while Hadoop 2 will throw a FileNotFoundException.
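   * <p>
   * Because a missing or empty directory yields {@code null} rather than an empty list, callers
   * should guard for it; a minimal sketch (again treating {@code FileStatusFilter} as
   * lambda-friendly):
   * <pre>{@code
   * List<FileStatus> files = listStatusWithStatusFilter(fs, dir, f -> f.isFile());
   * if (files != null) {
   *   for (FileStatus f : files) {
   *     // process f
   *   }
   * }
   * }</pre>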
1528   *
1529   * @param fs file system
1530   * @param dir directory
1531   * @param filter file status filter
1532   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
1533   */
1534  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs,
1535      final Path dir, final FileStatusFilter filter) throws IOException {
1536    FileStatus [] status = null;
1537    try {
1538      status = fs.listStatus(dir);
1539    } catch (FileNotFoundException fnfe) {
1540      // if directory doesn't exist, return null
1541      if (LOG.isTraceEnabled()) {
1542        LOG.trace(dir + " doesn't exist");
1543      }
1544    }
1545
1546    if (status == null || status.length < 1)  {
1547      return null;
1548    }
1549
1550    if (filter == null) {
1551      return Arrays.asList(status);
1552    } else {
1553      List<FileStatus> status2 = filterFileStatuses(status, filter);
1554      if (status2 == null || status2.isEmpty()) {
1555        return null;
1556      } else {
1557        return status2;
1558      }
1559    }
1560  }
1561
1562  /**
1563   * Throw an exception if an action is not permitted by a user on a file.
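   * <p>
   * A hedged usage sketch (the path below is illustrative):
   * <pre>{@code
   * UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
   * FileStatus status = fs.getFileStatus(new Path("/hbase/data"));
   * checkAccess(ugi, status, FsAction.READ); // throws AccessDeniedException if not readable
   * }</pre>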
1564   *
1565   * @param ugi
1566   *          the user
1567   * @param file
1568   *          the file
1569   * @param action
1570   *          the action
1571   */
1572  public static void checkAccess(UserGroupInformation ugi, FileStatus file,
1573      FsAction action) throws AccessDeniedException {
1574    if (ugi.getShortUserName().equals(file.getOwner())) {
1575      if (file.getPermission().getUserAction().implies(action)) {
1576        return;
1577      }
1578    } else if (contains(ugi.getGroupNames(), file.getGroup())) {
1579      if (file.getPermission().getGroupAction().implies(action)) {
1580        return;
1581      }
1582    } else if (file.getPermission().getOtherAction().implies(action)) {
1583      return;
1584    }
1585    throw new AccessDeniedException("Permission denied:" + " action=" + action
1586        + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
1587  }
1588
1589  private static boolean contains(String[] groups, String user) {
1590    for (String group : groups) {
1591      if (group.equals(user)) {
1592        return true;
1593      }
1594    }
1595    return false;
1596  }
1597
1598  /**
   * Scans the root path of the file system to get the degree of locality for each region
   * on each of the servers having at least one block of that region.
   * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}.
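   * <p>
   * The result maps an encoded region name to per-server locality fractions; a read-out
   * sketch (the logging is illustrative):
   * <pre>{@code
   * Map<String, Map<String, Float>> locality = getRegionDegreeLocalityMappingFromFS(conf);
   * for (Map.Entry<String, Map<String, Float>> region : locality.entrySet()) {
   *   for (Map.Entry<String, Float> server : region.getValue().entrySet()) {
   *     LOG.info("region=" + region.getKey() + " host=" + server.getKey()
   *         + " locality=" + server.getValue());
   *   }
   * }
   * }</pre>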
1603   *
1604   * @param conf
1605   *          the configuration to use
1606   * @return the mapping from region encoded name to a map of server names to
1607   *           locality fraction
1608   * @throws IOException
1609   *           in case of file system errors or interrupts
1610   */
1611  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1612      final Configuration conf) throws IOException {
1613    return getRegionDegreeLocalityMappingFromFS(
1614        conf, null,
1615        conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1616
1617  }
1618
1619  /**
   * Scans the root path of the file system to get the degree of locality for each region
   * on each of the servers having at least one block of that region.
1623   *
1624   * @param conf
1625   *          the configuration to use
1626   * @param desiredTable
1627   *          the table you wish to scan locality for
1628   * @param threadPoolSize
1629   *          the thread pool size to use
1630   * @return the mapping from region encoded name to a map of server names to
1631   *           locality fraction
1632   * @throws IOException
1633   *           in case of file system errors or interrupts
1634   */
1635  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1636      final Configuration conf, final String desiredTable, int threadPoolSize)
1637      throws IOException {
1638    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
1639    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
1640    return regionDegreeLocalityMapping;
1641  }
1642
1643  /**
   * Scans the root path of the file system to get the degree of locality of each region
   * on each of the servers having at least one block of that region. The output map
   * parameter is optional and, if supplied, must be thread-safe.
1648   *
1649   * @param conf
1650   *          the configuration to use
1651   * @param desiredTable
1652   *          the table you wish to scan locality for
1653   * @param threadPoolSize
1654   *          the thread pool size to use
1655   * @param regionDegreeLocalityMapping
1656   *          the map into which to put the locality degree mapping or null,
1657   *          must be a thread-safe implementation
1658   * @throws IOException
1659   *           in case of file system errors or interrupts
1660   */
1661  private static void getRegionLocalityMappingFromFS(final Configuration conf,
1662      final String desiredTable, int threadPoolSize,
1663      final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
1664    final FileSystem fs =  FileSystem.get(conf);
1665    final Path rootPath = FSUtils.getRootDir(conf);
1666    final long startTime = EnvironmentEdgeManager.currentTime();
1667    final Path queryPath;
1668    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1669    if (null == desiredTable) {
1670      queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1671    } else {
1672      queryPath = new Path(FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1673    }
1674
1675    // reject all paths that are not appropriate
1676    PathFilter pathFilter = new PathFilter() {
1677      @Override
1678      public boolean accept(Path path) {
1679        // this is the region name; it may get some noise data
1680        if (null == path) {
1681          return false;
1682        }
1683
1684        // no parent?
1685        Path parent = path.getParent();
1686        if (null == parent) {
1687          return false;
1688        }
1689
1690        String regionName = path.getName();
1691        if (null == regionName) {
1692          return false;
1693        }
1694
1695        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
1696          return false;
1697        }
1698        return true;
1699      }
1700    };
1701
1702    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1703
1704    if (LOG.isDebugEnabled()) {
1705      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
1706    }
1707
1708    if (null == statusList) {
1709      return;
1710    }
1711
1712    // lower the number of threads in case we have very few expected regions
1713    threadPoolSize = Math.min(threadPoolSize, statusList.length);
1714
1715    // run in multiple threads
1716    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
1717      Threads.newDaemonThreadFactory("FSRegionQuery"));
1718    try {
1719      // ignore all file status items that are not of interest
1720      for (FileStatus regionStatus : statusList) {
1721        if (null == regionStatus || !regionStatus.isDirectory()) {
1722          continue;
1723        }
1724
1725        final Path regionPath = regionStatus.getPath();
1726        if (null != regionPath) {
1727          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
1728        }
1729      }
1730    } finally {
1731      tpe.shutdown();
1732      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1733        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1734      try {
1735        // here we wait until TPE terminates, which is either naturally or by
1736        // exceptions in the execution of the threads
1737        while (!tpe.awaitTermination(threadWakeFrequency,
1738            TimeUnit.MILLISECONDS)) {
1739          // printing out rough estimate, so as to not introduce
1740          // AtomicInteger
1741          LOG.info("Locality checking is underway: { Scanned Regions : "
1742              + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
1743              + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
1744        }
1745      } catch (InterruptedException e) {
1746        Thread.currentThread().interrupt();
1747        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1748      }
1749    }
1750
1751    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
1752    LOG.info("Scan DFS for locality info takes {}ms", overhead);
1753  }
1754
1755  /**
1756   * Do our short circuit read setup.
1757   * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
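   * <p>
   * A hedged sketch of typical wiring; enabling short-circuit reads via the standard HDFS client
   * key is an assumption about the deployment, not something this method does:
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * conf.setBoolean("dfs.client.read.shortcircuit", true);
   * setupShortCircuitRead(conf); // warns on skip-checksum misconfig, sizes the SCR buffer
   * }</pre>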
   * @param conf the configuration to check and update
1759   */
1760  public static void setupShortCircuitRead(final Configuration conf) {
1761    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1762    boolean shortCircuitSkipChecksum =
1763      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1764    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1765    if (shortCircuitSkipChecksum) {
1766      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
1767        "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
1768        "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
1769      assert !shortCircuitSkipChecksum; //this will fail if assertions are on
1770    }
1771    checkShortCircuitReadBufferSize(conf);
1772  }
1773
1774  /**
1775   * Check if short circuit read buffer size is set and if not, set it to hbase value.
   * @param conf the configuration to check and update if the buffer size is unset
1777   */
1778  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1779    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1780    final int notSet = -1;
1781    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
1782    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1783    int size = conf.getInt(dfsKey, notSet);
1784    // If a size is set, return -- we will use it.
1785    if (size != notSet) return;
1786    // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
1787    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1788    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
1789  }
1790
1791  /**
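   * Returns the DFSClient's shared {@link DFSHedgedReadMetrics} via reflection, since the
   * accessor is package-private in HDFS.
   * <p>
   * A hedged usage sketch; the metric accessors shown are assumed from the HDFS
   * {@code DFSHedgedReadMetrics} API and may differ across Hadoop versions:
   * <pre>{@code
   * DFSHedgedReadMetrics metrics = getDFSHedgedReadMetrics(conf);
   * if (metrics != null) {
   *   long ops = metrics.getHedgedReadOps();   // assumed accessor
   *   long wins = metrics.getHedgedReadWins(); // assumed accessor
   * }
   * }</pre>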
   * @param c the configuration to use
   * @return The DFSClient DFSHedgedReadMetrics instance or null if it can't be found or we are
   *   not on hdfs.
   * @throws IOException if the filesystem or DFSClient cannot be obtained
1795   */
1796  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
1797      throws IOException {
1798    if (!isHDFS(c)) return null;
1799    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
1800    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
1801    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
1802    final String name = "getHedgedReadMetrics";
1803    DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
1804    Method m;
    try {
      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException | SecurityException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
          e.getMessage());
      return null;
    }
1816    m.setAccessible(true);
    try {
      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
          e.getMessage());
      return null;
    }
1832  }
1833
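  /**
   * Recursively copies {@code src} on {@code srcFS} to {@code dst} on {@code dstFS}.
   * Directories are created synchronously while individual file copies are submitted to a
   * fixed-size thread pool and awaited before returning.
   * @param srcFS source filesystem
   * @param src source path
   * @param dstFS destination filesystem
   * @param dst destination path
   * @param conf configuration used for the per-file copies
   * @param threads number of copier threads
   * @return the list of destination paths created (directories and files)
   * @throws IOException if any copy fails or the wait for completion is interrupted
   */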
1834  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1835      Configuration conf, int threads) throws IOException {
1836    ExecutorService pool = Executors.newFixedThreadPool(threads);
1837    List<Future<Void>> futures = new ArrayList<>();
1838    List<Path> traversedPaths;
1839    try {
1840      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
1841      for (Future<Void> future : futures) {
1842        future.get();
1843      }
1844    } catch (ExecutionException | InterruptedException | IOException e) {
1845      throw new IOException("copy snapshot reference files failed", e);
1846    } finally {
1847      pool.shutdownNow();
1848    }
1849    return traversedPaths;
1850  }
1851
1852  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
1853      Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
1854    List<Path> traversedPaths = new ArrayList<>();
1855    traversedPaths.add(dst);
1856    FileStatus currentFileStatus = srcFS.getFileStatus(src);
1857    if (currentFileStatus.isDirectory()) {
1858      if (!dstFS.mkdirs(dst)) {
1859        throw new IOException("create dir failed: " + dst);
1860      }
1861      FileStatus[] subPaths = srcFS.listStatus(src);
1862      for (FileStatus subPath : subPaths) {
1863        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
1864          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
1865      }
1866    } else {
1867      Future<Void> future = pool.submit(() -> {
1868        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
1869        return null;
1870      });
1871      futures.add(future);
1872    }
1873    return traversedPaths;
1874  }
1875}