001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
021import edu.umd.cs.findbugs.annotations.CheckForNull;
022import java.io.ByteArrayInputStream;
023import java.io.DataInputStream;
024import java.io.EOFException;
025import java.io.FileNotFoundException;
026import java.io.IOException;
027import java.io.InputStream;
028import java.io.InterruptedIOException;
029import java.lang.reflect.InvocationTargetException;
030import java.lang.reflect.Method;
031import java.net.InetSocketAddress;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.Collections;
035import java.util.HashMap;
036import java.util.Iterator;
037import java.util.LinkedList;
038import java.util.List;
039import java.util.Locale;
040import java.util.Map;
041import java.util.Vector;
042import java.util.concurrent.ArrayBlockingQueue;
043import java.util.concurrent.ConcurrentHashMap;
044import java.util.concurrent.ExecutionException;
045import java.util.concurrent.ExecutorService;
046import java.util.concurrent.Executors;
047import java.util.concurrent.Future;
048import java.util.concurrent.FutureTask;
049import java.util.concurrent.ThreadPoolExecutor;
050import java.util.concurrent.TimeUnit;
051import java.util.regex.Pattern;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.fs.BlockLocation;
054import org.apache.hadoop.fs.FSDataInputStream;
055import org.apache.hadoop.fs.FSDataOutputStream;
056import org.apache.hadoop.fs.FileStatus;
057import org.apache.hadoop.fs.FileSystem;
058import org.apache.hadoop.fs.FileUtil;
059import org.apache.hadoop.fs.Path;
060import org.apache.hadoop.fs.PathFilter;
061import org.apache.hadoop.fs.permission.FsAction;
062import org.apache.hadoop.fs.permission.FsPermission;
063import org.apache.hadoop.hbase.ClusterId;
064import org.apache.hadoop.hbase.HColumnDescriptor;
065import org.apache.hadoop.hbase.HConstants;
066import org.apache.hadoop.hbase.HDFSBlocksDistribution;
067import org.apache.hadoop.hbase.HRegionInfo;
068import org.apache.hadoop.hbase.TableName;
069import org.apache.hadoop.hbase.client.RegionInfo;
070import org.apache.hadoop.hbase.client.RegionInfoBuilder;
071import org.apache.hadoop.hbase.exceptions.DeserializationException;
072import org.apache.hadoop.hbase.fs.HFileSystem;
073import org.apache.hadoop.hbase.io.HFileLink;
074import org.apache.hadoop.hbase.master.HMaster;
075import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
076import org.apache.hadoop.hbase.security.AccessDeniedException;
077import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
078import org.apache.hadoop.hdfs.DFSClient;
079import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
080import org.apache.hadoop.hdfs.DistributedFileSystem;
081import org.apache.hadoop.hdfs.protocol.HdfsConstants;
082import org.apache.hadoop.io.IOUtils;
083import org.apache.hadoop.ipc.RemoteException;
084import org.apache.hadoop.security.UserGroupInformation;
085import org.apache.hadoop.util.Progressable;
086import org.apache.hadoop.util.ReflectionUtils;
087import org.apache.hadoop.util.StringUtils;
088import org.apache.yetus.audience.InterfaceAudience;
089import org.slf4j.Logger;
090import org.slf4j.LoggerFactory;
091
092import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
093import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
094import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
095import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
096
097import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
098import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
099
100/**
101 * Utility methods for interacting with the underlying file system.
102 */
103@InterfaceAudience.Private
104public abstract class FSUtils extends CommonFSUtils {
105  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
106
107  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
108  private static final int DEFAULT_THREAD_POOLSIZE = 2;
109
110  /** Set to true on Windows platforms */
111  @VisibleForTesting // currently only used in testing. TODO refactor into a test class
112  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
113
114  protected FSUtils() {
115    super();
116  }
117
118  /**
   * @return True if <code>fs</code> is an instance of DistributedFileSystem
   * @throws IOException if checking the backing FileSystem fails
121   */
122  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
123    FileSystem fileSystem = fs;
124    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
125    // Check its backing fs for dfs-ness.
126    if (fs instanceof HFileSystem) {
127      fileSystem = ((HFileSystem)fs).getBackingFs();
128    }
129    return fileSystem instanceof DistributedFileSystem;
130  }
131
132  /**
   * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
   * '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true.  Does not
   * consider the scheme; i.e. if the schemes differ but the path or a subpath matches, the two
   * will equate.
   * @param pathToSearch Path we will be trying to match.
   * @param pathTail the tail of the path to look for at the end of <code>pathToSearch</code>
   * @return True if <code>pathTail</code> is the tail of the path of <code>pathToSearch</code>
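   * <p>Illustrative sketch; the paths below are hypothetical:
   * <pre>{@code
   * // Both paths have the same depth and their trailing components match, so this returns true:
   * boolean match = FSUtils.isMatchingTail(
   *   new Path("hdfs://namenode/hbase/data/default/t1"), new Path("/hbase/data/default/t1"));
   * }</pre>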
139   */
140  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
141    if (pathToSearch.depth() != pathTail.depth()) return false;
142    Path tailPath = pathTail;
143    String tailName;
144    Path toSearch = pathToSearch;
145    String toSearchName;
146    boolean result = false;
147    do {
148      tailName = tailPath.getName();
149      if (tailName == null || tailName.length() <= 0) {
150        result = true;
151        break;
152      }
153      toSearchName = toSearch.getName();
154      if (toSearchName == null || toSearchName.length() <= 0) break;
155      // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
156      tailPath = tailPath.getParent();
157      toSearch = toSearch.getParent();
158    } while(tailName.equals(toSearchName));
159    return result;
160  }
161
162  public static FSUtils getInstance(FileSystem fs, Configuration conf) {
163    String scheme = fs.getUri().getScheme();
164    if (scheme == null) {
165      LOG.warn("Could not find scheme for uri " +
166          fs.getUri() + ", default to hdfs");
167      scheme = "hdfs";
168    }
169    Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
170        scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
171    FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
172    return fsUtils;
173  }
174
175  /**
   * Delete the region directory if it exists.
   * @param conf the {@link Configuration} to use
   * @param hri the region whose directory will be deleted
   * @return True if the region directory was deleted.
   * @throws IOException if deleting the directory fails
181   */
182  public static boolean deleteRegionDir(final Configuration conf, final HRegionInfo hri)
183  throws IOException {
184    Path rootDir = getRootDir(conf);
185    FileSystem fs = rootDir.getFileSystem(conf);
186    return deleteDirectory(fs,
187      new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
188  }
189
  /**
191   * Create the specified file on the filesystem. By default, this will:
192   * <ol>
193   * <li>overwrite the file if it exists</li>
194   * <li>apply the umask in the configuration (if it is enabled)</li>
195   * <li>use the fs configured buffer size (or 4096 if not set)</li>
196   * <li>use the configured column family replication or default replication if
197   * {@link HColumnDescriptor#DEFAULT_DFS_REPLICATION}</li>
198   * <li>use the default block size</li>
199   * <li>not track progress</li>
200   * </ol>
201   * @param conf configurations
202   * @param fs {@link FileSystem} on which to write the file
203   * @param path {@link Path} to the file to write
204   * @param perm permissions
   * @param favoredNodes favored DataNodes for the new file's blocks; may be <code>null</code>
206   * @return output stream to the created file
207   * @throws IOException if the file cannot be created
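   * <p>Illustrative sketch; the DataNode address below is hypothetical:
   * <pre>{@code
   * InetSocketAddress[] favoredNodes =
   *   new InetSocketAddress[] { new InetSocketAddress("datanode1.example.com", 50010) };
   * FSDataOutputStream out =
   *   FSUtils.create(conf, fs, path, FsPermission.getFileDefault(), favoredNodes);
   * }</pre>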
208   */
209  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
210      FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
211    if (fs instanceof HFileSystem) {
212      FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
213      if (backingFs instanceof DistributedFileSystem) {
214        // Try to use the favoredNodes version via reflection to allow backwards-
215        // compatibility.
216        short replication = Short.parseShort(conf.get(HColumnDescriptor.DFS_REPLICATION,
217          String.valueOf(HColumnDescriptor.DEFAULT_DFS_REPLICATION)));
218        try {
219          return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create",
220            Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class,
221            Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true,
222            getDefaultBufferSize(backingFs),
223            replication > 0 ? replication : getDefaultReplication(backingFs, path),
224            getDefaultBlockSize(backingFs, path), null, favoredNodes));
225        } catch (InvocationTargetException ite) {
          // Function was properly called, but threw its own exception.
227          throw new IOException(ite.getCause());
228        } catch (NoSuchMethodException e) {
229          LOG.debug("DFS Client does not support most favored nodes create; using default create");
230          if (LOG.isTraceEnabled()) LOG.trace("Ignoring; use default create", e);
        } catch (IllegalArgumentException | SecurityException | IllegalAccessException e) {
          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
        }
238      }
239    }
240    return create(fs, path, perm, true);
241  }
242
243  /**
244   * Checks to see if the specified file system is available
245   *
246   * @param fs filesystem
247   * @throws IOException e
248   */
249  public static void checkFileSystemAvailable(final FileSystem fs)
250  throws IOException {
251    if (!(fs instanceof DistributedFileSystem)) {
252      return;
253    }
254    IOException exception = null;
255    DistributedFileSystem dfs = (DistributedFileSystem) fs;
256    try {
257      if (dfs.exists(new Path("/"))) {
258        return;
259      }
260    } catch (IOException e) {
261      exception = e instanceof RemoteException ?
262              ((RemoteException)e).unwrapRemoteException() : e;
263    }
264    try {
265      fs.close();
266    } catch (Exception e) {
267      LOG.error("file system close failed: ", e);
268    }
269    IOException io = new IOException("File system is not available");
270    io.initCause(exception);
271    throw io;
272  }
273
274  /**
   * We use reflection because {@link DistributedFileSystem#setSafeMode(
   * HdfsConstants.SafeModeAction action, boolean isChecked)} is not in Hadoop 1.1
   *
   * @param dfs the DistributedFileSystem to query
   * @return whether we're in safe mode
   * @throws IOException if the safe mode query fails
281   */
282  private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
283    boolean inSafeMode = false;
284    try {
285      Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
286          org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class});
287      inSafeMode = (Boolean) m.invoke(dfs,
288        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true);
289    } catch (Exception e) {
290      if (e instanceof IOException) throw (IOException) e;
291
292      // Check whether dfs is on safemode.
293      inSafeMode = dfs.setSafeMode(
294        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET);
295    }
296    return inSafeMode;
297  }
298
299  /**
   * Check whether dfs is in safemode.
   * @param conf the {@link Configuration} to use
   * @throws IOException if dfs is in safemode, or if the check fails
303   */
304  public static void checkDfsSafeMode(final Configuration conf)
305  throws IOException {
306    boolean isInSafeMode = false;
307    FileSystem fs = FileSystem.get(conf);
308    if (fs instanceof DistributedFileSystem) {
309      DistributedFileSystem dfs = (DistributedFileSystem)fs;
310      isInSafeMode = isInSafeMode(dfs);
311    }
312    if (isInSafeMode) {
313      throw new IOException("File system is in safemode, it can't be written now");
314    }
315  }
316
317  /**
318   * Verifies current version of file system
319   *
320   * @param fs filesystem object
321   * @param rootdir root hbase directory
322   * @return null if no version file exists, version string otherwise.
323   * @throws IOException e
   * @throws DeserializationException if the version file content cannot be parsed
325   */
326  public static String getVersion(FileSystem fs, Path rootdir)
327  throws IOException, DeserializationException {
328    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
329    FileStatus[] status = null;
330    try {
331      // hadoop 2.0 throws FNFE if directory does not exist.
332      // hadoop 1.0 returns null if directory does not exist.
333      status = fs.listStatus(versionFile);
334    } catch (FileNotFoundException fnfe) {
335      return null;
336    }
337    if (status == null || status.length == 0) return null;
338    String version = null;
339    byte [] content = new byte [(int)status[0].getLen()];
340    FSDataInputStream s = fs.open(versionFile);
341    try {
342      IOUtils.readFully(s, content, 0, content.length);
343      if (ProtobufUtil.isPBMagicPrefix(content)) {
344        version = parseVersionFrom(content);
345      } else {
346        // Presume it pre-pb format.
347        InputStream is = new ByteArrayInputStream(content);
348        DataInputStream dis = new DataInputStream(is);
349        try {
350          version = dis.readUTF();
351        } finally {
352          dis.close();
353        }
354      }
355    } catch (EOFException eof) {
356      LOG.warn("Version file was empty, odd, will try to set it.");
357    } finally {
358      s.close();
359    }
360    return version;
361  }
362
363  /**
364   * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
365   * @param bytes The byte content of the hbase.version file.
366   * @return The version found in the file as a String.
   * @throws DeserializationException if the version content cannot be parsed
368   */
369  static String parseVersionFrom(final byte [] bytes)
370  throws DeserializationException {
371    ProtobufUtil.expectPBMagicPrefix(bytes);
372    int pblen = ProtobufUtil.lengthOfPBMagic();
373    FSProtos.HBaseVersionFileContent.Builder builder =
374      FSProtos.HBaseVersionFileContent.newBuilder();
375    try {
376      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
377      return builder.getVersion();
378    } catch (IOException e) {
379      // Convert
380      throw new DeserializationException(e);
381    }
382  }
383
384  /**
385   * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
386   * @param version Version to persist
387   * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
388   */
389  static byte [] toVersionByteArray(final String version) {
390    FSProtos.HBaseVersionFileContent.Builder builder =
391      FSProtos.HBaseVersionFileContent.newBuilder();
392    return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
393  }
394
395  /**
396   * Verifies current version of file system
397   *
398   * @param fs file system
399   * @param rootdir root directory of HBase installation
400   * @param message if true, issues a message on System.out
401   *
402   * @throws IOException e
   * @throws DeserializationException if the version file content cannot be parsed
404   */
405  public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
406  throws IOException, DeserializationException {
407    checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
408  }
409
410  /**
411   * Verifies current version of file system
412   *
413   * @param fs file system
414   * @param rootdir root directory of HBase installation
415   * @param message if true, issues a message on System.out
416   * @param wait wait interval
417   * @param retries number of times to retry
418   *
419   * @throws IOException e
   * @throws DeserializationException if the version file content cannot be parsed
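   * <p>Illustrative sketch, assuming a one second wait between three attempts:
   * <pre>{@code
   * Path rootDir = FSUtils.getRootDir(conf);
   * FileSystem fs = rootDir.getFileSystem(conf);
   * FSUtils.checkVersion(fs, rootDir, true, 1000, 3);
   * }</pre>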
421   */
422  public static void checkVersion(FileSystem fs, Path rootdir,
423      boolean message, int wait, int retries)
424  throws IOException, DeserializationException {
425    String version = getVersion(fs, rootdir);
426    String msg;
427    if (version == null) {
428      if (!metaRegionExists(fs, rootdir)) {
429        // rootDir is empty (no version file and no root region)
430        // just create new version file (HBASE-1195)
431        setVersion(fs, rootdir, wait, retries);
432        return;
433      } else {
434        msg = "hbase.version file is missing. Is your hbase.rootdir valid? " +
435            "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " +
436            "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2";
437      }
438    } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) {
439      return;
440    } else {
441      msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version +
442          " but software requires version " + HConstants.FILE_SYSTEM_VERSION +
443          ". Consult http://hbase.apache.org/book.html for further information about " +
444          "upgrading HBase.";
445    }
446
    // Version is deprecated and requires migration.
448    // Output on stdout so user sees it in terminal.
449    if (message) {
450      System.out.println("WARNING! " + msg);
451    }
452    throw new FileSystemVersionException(msg);
453  }
454
455  /**
456   * Sets version of file system
457   *
458   * @param fs filesystem object
459   * @param rootdir hbase root
460   * @throws IOException e
461   */
462  public static void setVersion(FileSystem fs, Path rootdir)
463  throws IOException {
464    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
465      HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
466  }
467
468  /**
469   * Sets version of file system
470   *
471   * @param fs filesystem object
472   * @param rootdir hbase root
473   * @param wait time to wait for retry
474   * @param retries number of times to retry before failing
475   * @throws IOException e
476   */
477  public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
478  throws IOException {
479    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
480  }
481
482
483  /**
484   * Sets version of file system
485   *
486   * @param fs filesystem object
487   * @param rootdir hbase root directory
488   * @param version version to set
489   * @param wait time to wait for retry
490   * @param retries number of times to retry before throwing an IOException
491   * @throws IOException e
492   */
493  public static void setVersion(FileSystem fs, Path rootdir, String version,
494      int wait, int retries) throws IOException {
495    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
496    Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
497      HConstants.VERSION_FILE_NAME);
498    while (true) {
499      try {
500        // Write the version to a temporary file
501        FSDataOutputStream s = fs.create(tempVersionFile);
502        try {
503          s.write(toVersionByteArray(version));
504          s.close();
505          s = null;
506          // Move the temp version file to its normal location. Returns false
507          // if the rename failed. Throw an IOE in that case.
508          if (!fs.rename(tempVersionFile, versionFile)) {
509            throw new IOException("Unable to move temp version file to " + versionFile);
510          }
511        } finally {
          // Cleaning up the temporary file if the rename failed would be trying
          // too hard. We'll unconditionally create it again the next time
          // through anyway; files are overwritten by default by create().
515
516          // Attempt to close the stream on the way out if it is still open.
517          try {
518            if (s != null) s.close();
519          } catch (IOException ignore) { }
520        }
521        LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
522        return;
523      } catch (IOException e) {
524        if (retries > 0) {
525          LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
526          fs.delete(versionFile, false);
527          try {
528            if (wait > 0) {
529              Thread.sleep(wait);
530            }
531          } catch (InterruptedException ie) {
532            throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
533          }
534          retries--;
535        } else {
536          throw e;
537        }
538      }
539    }
540  }
541
542  /**
543   * Checks that a cluster ID file exists in the HBase root directory
544   * @param fs the root directory FileSystem
545   * @param rootdir the HBase root directory in HDFS
546   * @param wait how long to wait between retries
547   * @return <code>true</code> if the file exists, otherwise <code>false</code>
548   * @throws IOException if checking the FileSystem fails
549   */
550  public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
551      int wait) throws IOException {
552    while (true) {
553      try {
554        Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
555        return fs.exists(filePath);
556      } catch (IOException ioe) {
557        if (wait > 0) {
558          LOG.warn("Unable to check cluster ID file in " + rootdir.toString() +
559              ", retrying in "+wait+"msec: "+StringUtils.stringifyException(ioe));
560          try {
561            Thread.sleep(wait);
562          } catch (InterruptedException e) {
563            throw (InterruptedIOException)new InterruptedIOException().initCause(e);
564          }
565        } else {
566          throw ioe;
567        }
568      }
569    }
570  }
571
572  /**
573   * Returns the value of the unique cluster ID stored for this HBase instance.
574   * @param fs the root directory FileSystem
575   * @param rootdir the path to the HBase root directory
576   * @return the unique cluster identifier
577   * @throws IOException if reading the cluster ID file fails
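   * <p>Illustrative sketch:
   * <pre>{@code
   * Path rootDir = FSUtils.getRootDir(conf);
   * FileSystem fs = rootDir.getFileSystem(conf);
   * ClusterId clusterId = FSUtils.getClusterId(fs, rootDir);
   * }</pre>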
578   */
579  public static ClusterId getClusterId(FileSystem fs, Path rootdir)
580  throws IOException {
581    Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
582    ClusterId clusterId = null;
583    FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath):  null;
584    if (status != null) {
585      int len = Ints.checkedCast(status.getLen());
586      byte [] content = new byte[len];
587      FSDataInputStream in = fs.open(idPath);
588      try {
589        in.readFully(content);
590      } catch (EOFException eof) {
591        LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
592      } finally{
593        in.close();
594      }
595      try {
596        clusterId = ClusterId.parseFrom(content);
597      } catch (DeserializationException e) {
598        throw new IOException("content=" + Bytes.toString(content), e);
599      }
600      // If not pb'd, make it so.
601      if (!ProtobufUtil.isPBMagicPrefix(content)) {
602        String cid = null;
603        in = fs.open(idPath);
604        try {
605          cid = in.readUTF();
606          clusterId = new ClusterId(cid);
607        } catch (EOFException eof) {
608          LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
609        } finally {
610          in.close();
611        }
612        rewriteAsPb(fs, rootdir, idPath, clusterId);
613      }
614      return clusterId;
615    } else {
616      LOG.warn("Cluster ID file does not exist at " + idPath.toString());
617    }
618    return clusterId;
619  }
620
621  /**
   * Rewrite the cluster ID file at <code>p</code> as protobuf.
   * @param fs the FileSystem to use
   * @param rootdir the HBase root directory
   * @param p path of the existing, non-pb cluster ID file
   * @param cid the cluster ID to write
   * @throws IOException if the rewrite fails
624   */
625  private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
626      final ClusterId cid)
627  throws IOException {
628    // Rewrite the file as pb.  Move aside the old one first, write new
629    // then delete the moved-aside file.
630    Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
631    if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
632    setClusterId(fs, rootdir, cid, 100);
633    if (!fs.delete(movedAsideName, false)) {
634      throw new IOException("Failed delete of " + movedAsideName);
635    }
636    LOG.debug("Rewrote the hbase.id file as pb");
637  }
638
639  /**
640   * Writes a new unique identifier for this cluster to the "hbase.id" file
641   * in the HBase root directory
642   * @param fs the root directory FileSystem
643   * @param rootdir the path to the HBase root directory
644   * @param clusterId the unique identifier to store
645   * @param wait how long (in milliseconds) to wait between retries
   * @throws IOException if writing to the FileSystem fails and no wait value was given for retries
647   */
648  public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
649      int wait) throws IOException {
650    while (true) {
651      try {
652        Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
653        Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
654          Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
655        // Write the id file to a temporary location
656        FSDataOutputStream s = fs.create(tempIdFile);
657        try {
658          s.write(clusterId.toByteArray());
659          s.close();
660          s = null;
661          // Move the temporary file to its normal location. Throw an IOE if
662          // the rename failed
663          if (!fs.rename(tempIdFile, idFile)) {
664            throw new IOException("Unable to move temp version file to " + idFile);
665          }
666        } finally {
667          // Attempt to close the stream if still open on the way out
668          try {
669            if (s != null) s.close();
670          } catch (IOException ignore) { }
671        }
672        if (LOG.isDebugEnabled()) {
673          LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
674        }
675        return;
676      } catch (IOException ioe) {
677        if (wait > 0) {
678          LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
679              ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
680          try {
681            Thread.sleep(wait);
682          } catch (InterruptedException e) {
683            throw (InterruptedIOException)new InterruptedIOException().initCause(e);
684          }
685        } else {
686          throw ioe;
687        }
688      }
689    }
690  }
691
692  /**
693   * If DFS, check safe mode and if so, wait until we clear it.
694   * @param conf configuration
695   * @param wait Sleep between retries
696   * @throws IOException e
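   * <p>Illustrative sketch; the ten second polling interval is an assumption, not a default:
   * <pre>{@code
   * FSUtils.waitOnSafeMode(conf, 10 * 1000L);
   * }</pre>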
697   */
698  public static void waitOnSafeMode(final Configuration conf,
699    final long wait)
700  throws IOException {
701    FileSystem fs = FileSystem.get(conf);
702    if (!(fs instanceof DistributedFileSystem)) return;
703    DistributedFileSystem dfs = (DistributedFileSystem)fs;
704    // Make sure dfs is not in safe mode
705    while (isInSafeMode(dfs)) {
706      LOG.info("Waiting for dfs to exit safe mode...");
707      try {
708        Thread.sleep(wait);
709      } catch (InterruptedException e) {
710        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
711      }
712    }
713  }
714
715  /**
716   * Checks if meta region exists
717   * @param fs file system
718   * @param rootDir root directory of HBase installation
719   * @return true if exists
720   */
721  public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
722    Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
723    return fs.exists(metaRegionDir);
724  }
725
726  /**
727   * Compute HDFS blocks distribution of a given file, or a portion of the file
728   * @param fs file system
729   * @param status file status of the file
730   * @param start start position of the portion
731   * @param length length of the portion
732   * @return The HDFS blocks distribution
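   * <p>Illustrative sketch; the store file path and the 128MB length are hypothetical:
   * <pre>{@code
   * FileStatus status = fs.getFileStatus(storeFilePath);
   * HDFSBlocksDistribution distribution =
   *   FSUtils.computeHDFSBlocksDistribution(fs, status, 0, 128L * 1024 * 1024);
   * }</pre>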
733   */
734  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
735    final FileSystem fs, FileStatus status, long start, long length)
736    throws IOException {
737    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
738    BlockLocation [] blockLocations =
739      fs.getFileBlockLocations(status, start, length);
740    for(BlockLocation bl : blockLocations) {
741      String [] hosts = bl.getHosts();
742      long len = bl.getLength();
743      blocksDistribution.addHostsAndBlockWeight(hosts, len);
744    }
745
746    return blocksDistribution;
747  }
748
749  /**
750   * Update blocksDistribution with blockLocations
751   * @param blocksDistribution the hdfs blocks distribution
   * @param blockLocations an array of block locations
753   */
754  static public void addToHDFSBlocksDistribution(
755      HDFSBlocksDistribution blocksDistribution, BlockLocation[] blockLocations)
756      throws IOException {
757    for (BlockLocation bl : blockLocations) {
758      String[] hosts = bl.getHosts();
759      long len = bl.getLength();
760      blocksDistribution.addHostsAndBlockWeight(hosts, len);
761    }
762  }
763
  // TODO move this method OUT of FSUtils. No dependencies on HMaster
765  /**
766   * Returns the total overall fragmentation percentage. Includes hbase:meta and
767   * -ROOT- as well.
768   *
769   * @param master  The master defining the HBase root and file system.
   * @return The overall fragmentation percentage, or -1 if it cannot be determined.
771   * @throws IOException When scanning the directory fails.
772   */
773  public static int getTotalTableFragmentation(final HMaster master)
774  throws IOException {
775    Map<String, Integer> map = getTableFragmentation(master);
776    return map != null && map.size() > 0 ? map.get("-TOTAL-") : -1;
777  }
778
779  /**
780   * Runs through the HBase rootdir and checks how many stores for each table
781   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
782   * percentage across all tables is stored under the special key "-TOTAL-".
783   *
784   * @param master  The master defining the HBase root and file system.
785   * @return A map for each table and its percentage.
786   *
787   * @throws IOException When scanning the directory fails.
788   */
789  public static Map<String, Integer> getTableFragmentation(
790    final HMaster master)
791  throws IOException {
792    Path path = getRootDir(master.getConfiguration());
793    // since HMaster.getFileSystem() is package private
794    FileSystem fs = path.getFileSystem(master.getConfiguration());
795    return getTableFragmentation(fs, path);
796  }
797
798  /**
799   * Runs through the HBase rootdir and checks how many stores for each table
800   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
801   * percentage across all tables is stored under the special key "-TOTAL-".
802   *
803   * @param fs  The file system to use.
804   * @param hbaseRootDir  The root directory to scan.
805   * @return A map for each table and its percentage.
806   * @throws IOException When scanning the directory fails.
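   * <p>Illustrative sketch:
   * <pre>{@code
   * Map<String, Integer> frags = FSUtils.getTableFragmentation(fs, rootDir);
   * int overallPercent = frags.get("-TOTAL-");
   * }</pre>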
807   */
808  public static Map<String, Integer> getTableFragmentation(
809    final FileSystem fs, final Path hbaseRootDir)
810  throws IOException {
811    Map<String, Integer> frags = new HashMap<>();
812    int cfCountTotal = 0;
813    int cfFragTotal = 0;
814    PathFilter regionFilter = new RegionDirFilter(fs);
815    PathFilter familyFilter = new FamilyDirFilter(fs);
816    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
817    for (Path d : tableDirs) {
818      int cfCount = 0;
819      int cfFrag = 0;
820      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
821      for (FileStatus regionDir : regionDirs) {
822        Path dd = regionDir.getPath();
        // this is a region directory; now look in the region for families
824        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
825        for (FileStatus familyDir : familyDirs) {
826          cfCount++;
827          cfCountTotal++;
828          Path family = familyDir.getPath();
829          // now in family make sure only one file
830          FileStatus[] familyStatus = fs.listStatus(family);
831          if (familyStatus.length > 1) {
832            cfFrag++;
833            cfFragTotal++;
834          }
835        }
836      }
837      // compute percentage per table and store in result list
838      frags.put(FSUtils.getTableName(d).getNameAsString(),
839        cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
840    }
841    // set overall percentage for all tables
842    frags.put("-TOTAL-",
843      cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
844    return frags;
845  }
846
847  public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
848    if (fs.exists(dst) && !fs.delete(dst, false)) {
849      throw new IOException("Can not delete " + dst);
850    }
851    if (!fs.rename(src, dst)) {
852      throw new IOException("Can not rename from " + src + " to " + dst);
853    }
854  }
855
856  /**
857   * A {@link PathFilter} that returns only regular files.
858   */
859  static class FileFilter extends AbstractFileStatusFilter {
860    private final FileSystem fs;
861
862    public FileFilter(final FileSystem fs) {
863      this.fs = fs;
864    }
865
866    @Override
867    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
868      try {
869        return isFile(fs, isDir, p);
870      } catch (IOException e) {
871        LOG.warn("unable to verify if path=" + p + " is a regular file", e);
872        return false;
873      }
874    }
875  }
876
877  /**
878   * Directory filter that doesn't include any of the directories in the specified blacklist
879   */
880  public static class BlackListDirFilter extends AbstractFileStatusFilter {
881    private final FileSystem fs;
882    private List<String> blacklist;
883
884    /**
     * Create a filter on the given filesystem with the specified blacklist
886     * @param fs filesystem to filter
887     * @param directoryNameBlackList list of the names of the directories to filter. If
888     *          <tt>null</tt>, all directories are returned
889     */
890    @SuppressWarnings("unchecked")
891    public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
892      this.fs = fs;
893      blacklist =
894        (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
895          : directoryNameBlackList);
896    }
897
898    @Override
899    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
900      if (!isValidName(p.getName())) {
901        return false;
902      }
903
904      try {
905        return isDirectory(fs, isDir, p);
906      } catch (IOException e) {
907        LOG.warn("An error occurred while verifying if [" + p.toString()
908            + "] is a valid directory. Returning 'not valid' and continuing.", e);
909        return false;
910      }
911    }
912
913    protected boolean isValidName(final String name) {
914      return !blacklist.contains(name);
915    }
916  }
917
918  /**
919   * A {@link PathFilter} that only allows directories.
920   */
921  public static class DirFilter extends BlackListDirFilter {
922
923    public DirFilter(FileSystem fs) {
924      super(fs, null);
925    }
926  }
927
928  /**
929   * A {@link PathFilter} that returns usertable directories. To get all directories use the
930   * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
931   */
932  public static class UserTableDirFilter extends BlackListDirFilter {
933    public UserTableDirFilter(FileSystem fs) {
934      super(fs, HConstants.HBASE_NON_TABLE_DIRS);
935    }
936
937    @Override
938    protected boolean isValidName(final String name) {
939      if (!super.isValidName(name))
940        return false;
941
942      try {
943        TableName.isLegalTableQualifierName(Bytes.toBytes(name));
944      } catch (IllegalArgumentException e) {
945        LOG.info("INVALID NAME " + name);
946        return false;
947      }
948      return true;
949    }
950  }
951
952  public void recoverFileLease(final FileSystem fs, final Path p, Configuration conf)
953      throws IOException {
954    recoverFileLease(fs, p, conf, null);
955  }
956
957  /**
   * Recover file lease. Used when a file might have been left open
   * by another process.
   * @param fs FileSystem handle
   * @param p Path of file to recover lease
   * @param conf Configuration handle
   * @param reporter progress reporter used during lease recovery; may be <code>null</code>
   * @throws IOException if lease recovery fails
964   */
965  public abstract void recoverFileLease(final FileSystem fs, final Path p,
966      Configuration conf, CancelableProgressable reporter) throws IOException;
967
968  public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
969      throws IOException {
970    List<Path> tableDirs = new LinkedList<>();
971
972    for(FileStatus status :
973        fs.globStatus(new Path(rootdir,
974            new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
975      tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
976    }
977    return tableDirs;
978  }
979
980  /**
   * @param fs the FileSystem to use
   * @param rootdir the directory to scan
   * @return All the table directories under <code>rootdir</code>. Ignores non-table
   *   directories such as .logs, .oldlogs, and .corrupt.
   * @throws IOException if listing the directory fails
986   */
987  public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
988      throws IOException {
989    // presumes any directory under hbase.rootdir is a table
990    FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
991    List<Path> tabledirs = new ArrayList<>(dirs.length);
992    for (FileStatus dir: dirs) {
993      tabledirs.add(dir.getPath());
994    }
995    return tabledirs;
996  }
997
998  /**
   * Filter for all dirs that are region directories (0.90+ style hex names and older numeric names).
1000   */
1001  public static class RegionDirFilter extends AbstractFileStatusFilter {
1002    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
1003    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
1004    final FileSystem fs;
1005
1006    public RegionDirFilter(FileSystem fs) {
1007      this.fs = fs;
1008    }
1009
1010    @Override
1011    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1012      if (!regionDirPattern.matcher(p.getName()).matches()) {
1013        return false;
1014      }
1015
1016      try {
1017        return isDirectory(fs, isDir, p);
1018      } catch (IOException ioe) {
1019        // Maybe the file was moved or the fs was disconnected.
1020        LOG.warn("Skipping file " + p +" due to IOException", ioe);
1021        return false;
1022      }
1023    }
1024  }
1025
1026  /**
1027   * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1028   * .tableinfo
1029   * @param fs A file system for the Path
1030   * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
1031   * @return List of paths to valid region directories in table dir.
   * @throws IOException if listing the table directory fails
1033   */
1034  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1035    // assumes we are in a table dir.
1036    List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1037    if (rds == null) {
1038      return Collections.emptyList();
1039    }
1040    List<Path> regionDirs = new ArrayList<>(rds.size());
1041    for (FileStatus rdfs: rds) {
1042      Path rdPath = rdfs.getPath();
1043      regionDirs.add(rdPath);
1044    }
1045    return regionDirs;
1046  }
1047
1048  public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) {
1049    return getRegionDirFromTableDir(getTableDir(rootDir, region.getTable()), region);
1050  }
1051
1052  public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
1053    return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
1054  }
1055
1056  /**
1057   * Filter for all dirs that are legal column family names.  This is generally used for colfam
1058   * dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1059   */
1060  public static class FamilyDirFilter extends AbstractFileStatusFilter {
1061    final FileSystem fs;
1062
1063    public FamilyDirFilter(FileSystem fs) {
1064      this.fs = fs;
1065    }
1066
1067    @Override
1068    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1069      try {
1070        // throws IAE if invalid
1071        HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName()));
1072      } catch (IllegalArgumentException iae) {
1073        // path name is an invalid family name and thus is excluded.
1074        return false;
1075      }
1076
1077      try {
1078        return isDirectory(fs, isDir, p);
1079      } catch (IOException ioe) {
1080        // Maybe the file was moved or the fs was disconnected.
1081        LOG.warn("Skipping file " + p +" due to IOException", ioe);
1082        return false;
1083      }
1084    }
1085  }
1086
1087  /**
1088   * Given a particular region dir, return all the familydirs inside it
1089   *
1090   * @param fs A file system for the Path
1091   * @param regionDir Path to a specific region directory
1092   * @return List of paths to valid family directories in region dir.
   * @throws IOException if listing the region directory fails
1094   */
1095  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1096    // assumes we are in a region dir.
1097    FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1098    List<Path> familyDirs = new ArrayList<>(fds.length);
1099    for (FileStatus fdfs: fds) {
1100      Path fdPath = fdfs.getPath();
1101      familyDirs.add(fdPath);
1102    }
1103    return familyDirs;
1104  }
1105
1106  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
1107    List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs));
1108    if (fds == null) {
1109      return Collections.emptyList();
1110    }
1111    List<Path> referenceFiles = new ArrayList<>(fds.size());
1112    for (FileStatus fdfs: fds) {
1113      Path fdPath = fdfs.getPath();
1114      referenceFiles.add(fdPath);
1115    }
1116    return referenceFiles;
1117  }
1118
1119  /**
1120   * Filter for HFiles that excludes reference files.
1121   */
1122  public static class HFileFilter extends AbstractFileStatusFilter {
1123    final FileSystem fs;
1124
1125    public HFileFilter(FileSystem fs) {
1126      this.fs = fs;
1127    }
1128
1129    @Override
1130    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1131      if (!StoreFileInfo.isHFile(p)) {
1132        return false;
1133      }
1134
1135      try {
1136        return isFile(fs, isDir, p);
1137      } catch (IOException ioe) {
1138        // Maybe the file was moved or the fs was disconnected.
1139        LOG.warn("Skipping file " + p +" due to IOException", ioe);
1140        return false;
1141      }
1142    }
1143  }
1144
1145  /**
1146   * Filter for HFileLinks (StoreFiles and HFiles not included).
   * The filter itself does not consider whether a link refers to a file or not.
1148   */
1149  public static class HFileLinkFilter implements PathFilter {
1150
1151    @Override
1152    public boolean accept(Path p) {
1153      return HFileLink.isHFileLink(p);
1154    }
1155  }
1156
1157  public static class ReferenceFileFilter extends AbstractFileStatusFilter {
1158
1159    private final FileSystem fs;
1160
1161    public ReferenceFileFilter(FileSystem fs) {
1162      this.fs = fs;
1163    }
1164
1165    @Override
1166    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1167      if (!StoreFileInfo.isReference(p)) {
1168        return false;
1169      }
1170
1171      try {
1172        // only files can be references.
1173        return isFile(fs, isDir, p);
1174      } catch (IOException ioe) {
1175        // Maybe the file was moved or the fs was disconnected.
1176        LOG.warn("Skipping file " + p +" due to IOException", ioe);
1177        return false;
1178      }
1179    }
1180  }
1181
1182  /**
1183   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1184   * table StoreFile names to the full Path.
1185   * <br>
1186   * Example...<br>
1187   * Key = 3944417774205889744  <br>
1188   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1189   *
1190   * @param map map to add values.  If null, this method will create and populate one to return
1191   * @param fs  The file system to use.
1192   * @param hbaseRootDir  The root directory to scan.
1193   * @param tableName name of the table to scan.
1194   * @return Map keyed by StoreFile name with a value of the full Path.
1195   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException if the operation is interrupted
1197   */
1198  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1199  final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1200  throws IOException, InterruptedException {
1201    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, null);
1202  }
1203
1204  /**
1205   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1206   * table StoreFile names to the full Path.  Note that because this method can be called
1207   * on a 'live' HBase system that we will skip files that no longer exist by the time
1208   * we traverse them and similarly the user of the result needs to consider that some
1209   * entries in this map may not exist by the time this call completes.
1210   * <br>
1211   * Example...<br>
1212   * Key = 3944417774205889744  <br>
1213   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1214   *
1215   * @param resultMap map to add values.  If null, this method will create and populate one to return
1216   * @param fs  The file system to use.
1217   * @param hbaseRootDir  The root directory to scan.
1218   * @param tableName name of the table to scan.
1219   * @param sfFilter optional path filter to apply to store files
1220   * @param executor optional executor service to parallelize this operation
1221   * @param errors ErrorReporter instance or null
1222   * @return Map keyed by StoreFile name with a value of the full Path.
1223   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException if the operation is interrupted
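   * <p>Illustrative sketch; the table name and pool size are assumptions:
   * <pre>{@code
   * ExecutorService pool = Executors.newFixedThreadPool(4);
   * try {
   *   Map<String, Path> storeFiles = FSUtils.getTableStoreFilePathMap(
   *     null, fs, rootDir, TableName.valueOf("t1"), null, pool, null);
   * } finally {
   *   pool.shutdown();
   * }
   * }</pre>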
1225   */
1226  public static Map<String, Path> getTableStoreFilePathMap(
1227      Map<String, Path> resultMap,
1228      final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1229      ExecutorService executor, final ErrorReporter errors) throws IOException, InterruptedException {
1230
1231    final Map<String, Path> finalResultMap =
1232        resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;
1233
1234    // only include the directory paths to tables
1235    Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
1236    // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
1237    // should be regions.
1238    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
1239    final Vector<Exception> exceptions = new Vector<>();
1240
1241    try {
1242      List<FileStatus> regionDirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1243      if (regionDirs == null) {
1244        return finalResultMap;
1245      }
1246
1247      final List<Future<?>> futures = new ArrayList<>(regionDirs.size());
1248
1249      for (FileStatus regionDir : regionDirs) {
1250        if (null != errors) {
1251          errors.progress();
1252        }
1253        final Path dd = regionDir.getPath();
1254
1255        if (!exceptions.isEmpty()) {
1256          break;
1257        }
1258
1259        Runnable getRegionStoreFileMapCall = new Runnable() {
1260          @Override
1261          public void run() {
1262            try {
1263              HashMap<String,Path> regionStoreFileMap = new HashMap<>();
1264              List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
1265              if (familyDirs == null) {
1266                if (!fs.exists(dd)) {
1267                  LOG.warn("Skipping region because it no longer exists: " + dd);
1268                } else {
1269                  LOG.warn("Skipping region because it has no family dirs: " + dd);
1270                }
1271                return;
1272              }
1273              for (FileStatus familyDir : familyDirs) {
1274                if (null != errors) {
1275                  errors.progress();
1276                }
1277                Path family = familyDir.getPath();
1278                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1279                  continue;
1280                }
1281                // now in family, iterate over the StoreFiles and
1282                // put in map
1283                FileStatus[] familyStatus = fs.listStatus(family);
1284                for (FileStatus sfStatus : familyStatus) {
1285                  if (null != errors) {
1286                    errors.progress();
1287                  }
1288                  Path sf = sfStatus.getPath();
1289                  if (sfFilter == null || sfFilter.accept(sf)) {
                    regionStoreFileMap.put(sf.getName(), sf);
1291                  }
1292                }
1293              }
1294              finalResultMap.putAll(regionStoreFileMap);
1295            } catch (Exception e) {
1296              LOG.error("Could not get region store file map for region: " + dd, e);
1297              exceptions.add(e);
1298            }
1299          }
1300        };
1301
1302        // If executor is available, submit async tasks to exec concurrently, otherwise
1303        // just do serial sync execution
1304        if (executor != null) {
1305          Future<?> future = executor.submit(getRegionStoreFileMapCall);
1306          futures.add(future);
1307        } else {
1308          FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
1309          future.run();
1310          futures.add(future);
1311        }
1312      }
1313
1314      // Ensure all pending tasks are complete (or that we run into an exception)
1315      for (Future<?> f : futures) {
1316        if (!exceptions.isEmpty()) {
1317          break;
1318        }
1319        try {
1320          f.get();
1321        } catch (ExecutionException e) {
1322          LOG.error("Unexpected exec exception!  Should've been caught already.  (Bug?)", e);
1323          // Shouldn't happen, we already logged/caught any exceptions in the Runnable
1324        }
1325      }
1326    } catch (IOException e) {
1327      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
1328      exceptions.add(e);
1329    } finally {
1330      if (!exceptions.isEmpty()) {
1331        // Just throw the first exception as an indication something bad happened
1332        // Don't need to propagate all the exceptions, we already logged them all anyway
1333        Throwables.propagateIfInstanceOf(exceptions.firstElement(), IOException.class);
1334        throw Throwables.propagate(exceptions.firstElement());
1335      }
1336    }
1337
1338    return finalResultMap;
1339  }
1340
1341  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1342    int result = 0;
1343    try {
1344      for (Path familyDir:getFamilyDirs(fs, p)){
1345        result += getReferenceFilePaths(fs, familyDir).size();
1346      }
1347    } catch (IOException e) {
1348      LOG.warn("Error Counting reference files.", e);
1349    }
1350    return result;
1351  }
1352
1353  /**
1354   * Runs through the HBase rootdir and creates a reverse lookup map for
1355   * table StoreFile names to the full Path.
1356   * <br>
1357   * Example...<br>
1358   * Key = 3944417774205889744  <br>
1359   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1360   *
1361   * @param fs  The file system to use.
1362   * @param hbaseRootDir  The root directory to scan.
1363   * @return Map keyed by StoreFile name with a value of the full Path.
1364   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException if the operation is interrupted
1366   */
1367  public static Map<String, Path> getTableStoreFilePathMap(
1368    final FileSystem fs, final Path hbaseRootDir)
1369  throws IOException, InterruptedException {
1370    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, null);
1371  }
1372
1373  /**
1374   * Runs through the HBase rootdir and creates a reverse lookup map for
1375   * table StoreFile names to the full Path.
1376   * <br>
1377   * Example...<br>
1378   * Key = 3944417774205889744  <br>
1379   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1380   *
1381   * @param fs  The file system to use.
1382   * @param hbaseRootDir  The root directory to scan.
1383   * @param sfFilter optional path filter to apply to store files
1384   * @param executor optional executor service to parallelize this operation
1385   * @param errors ErrorReporter instance or null
1386   * @return Map keyed by StoreFile name with a value of the full Path.
1387   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException if the operation is interrupted
1389   */
1390  public static Map<String, Path> getTableStoreFilePathMap(
1391    final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter,
1392    ExecutorService executor, ErrorReporter errors)
1393  throws IOException, InterruptedException {
1394    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);
1395
1396    // if this method looks similar to 'getTableFragmentation' that is because
1397    // it was borrowed from it.
1398
1399    // only include the directory paths to tables
1400    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1401      getTableStoreFilePathMap(map, fs, hbaseRootDir,
1402          FSUtils.getTableName(tableDir), sfFilter, executor, errors);
1403    }
1404    return map;
1405  }
1406
1407  /**
1408   * Filters FileStatuses in an array and returns a list
1409   *
1410   * @param input   An array of FileStatuses
1411   * @param filter  A required filter to filter the array
1412   * @return        A list of FileStatuses
1413   */
  public static List<FileStatus> filterFileStatuses(FileStatus[] input,
      FileStatusFilter filter) {
    if (input == null) return null;
    return filterFileStatuses(Iterators.forArray(input), filter);
  }

  /**
   * Filters FileStatuses in an iterator and returns a list
   *
   * @param input   An iterator of FileStatuses
   * @param filter  A required filter to apply to each FileStatus
   * @return        A list of FileStatuses
   */
  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
      FileStatusFilter filter) {
    if (input == null) return null;
    ArrayList<FileStatus> results = new ArrayList<>();
    while (input.hasNext()) {
      FileStatus f = input.next();
      if (filter.accept(f)) {
        results.add(f);
      }
    }
    return results;
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
   * This accommodates differences between Hadoop versions: Hadoop 1 does not throw a
   * FileNotFoundException but returns an empty FileStatus[], while Hadoop 2 throws
   * FileNotFoundException.
   *
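   * <p>A short sketch of the null-return contract; callers must treat {@code null} as
   * "empty or missing directory" (a null filter is accepted and means "no filtering"):
   * <pre>{@code
   * List<FileStatus> files = FSUtils.listStatusWithStatusFilter(fs, dir, null);
   * if (files == null) {
   *   // directory is empty or does not exist
   * } else {
   *   for (FileStatus f : files) {
   *     // process f
   *   }
   * }
   * }</pre>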
   * @param fs file system
   * @param dir directory
   * @param filter file status filter
   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
   * @throws IOException if listing the directory fails for a reason other than it not existing
   */
  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs,
      final Path dir, final FileStatusFilter filter) throws IOException {
    FileStatus[] status = null;
    try {
      status = fs.listStatus(dir);
    } catch (FileNotFoundException fnfe) {
      // if directory doesn't exist, return null
      if (LOG.isTraceEnabled()) {
        LOG.trace(dir + " doesn't exist");
      }
    }

    if (status == null || status.length < 1) {
      return null;
    }

    if (filter == null) {
      return Arrays.asList(status);
    } else {
      List<FileStatus> status2 = filterFileStatuses(status, filter);
      if (status2 == null || status2.isEmpty()) {
        return null;
      } else {
        return status2;
      }
    }
  }

  /**
   * Throw an exception if an action is not permitted by a user on a file.
   *
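   * <p>A minimal caller-side sketch (the {@code fs} and {@code someFile} handles are assumed to
   * already exist):
   * <pre>{@code
   * FileStatus status = fs.getFileStatus(someFile);
   * FSUtils.checkAccess(UserGroupInformation.getCurrentUser(), status, FsAction.READ);
   * }</pre>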
   * @param ugi the user
   * @param file the file
   * @param action the action
   * @throws AccessDeniedException if the action is not permitted for the user on the file
   */
  public static void checkAccess(UserGroupInformation ugi, FileStatus file,
      FsAction action) throws AccessDeniedException {
    if (ugi.getShortUserName().equals(file.getOwner())) {
      if (file.getPermission().getUserAction().implies(action)) {
        return;
      }
    } else if (contains(ugi.getGroupNames(), file.getGroup())) {
      if (file.getPermission().getGroupAction().implies(action)) {
        return;
      }
    } else if (file.getPermission().getOtherAction().implies(action)) {
      return;
    }
    throw new AccessDeniedException("Permission denied:" + " action=" + action
        + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
  }

  private static boolean contains(String[] groups, String user) {
    for (String group : groups) {
      if (group.equals(user)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Scans the root path of the file system to determine the degree of locality of each region
   * on each of the servers holding at least one block of that region. This is used by the tool
   * {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}.
   *
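   * <p>A short sketch of consuming the result:
   * <pre>{@code
   * Map<String, Map<String, Float>> locality =
   *     FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
   * for (Map.Entry<String, Map<String, Float>> region : locality.entrySet()) {
   *   // region.getKey() is the region encoded name;
   *   // region.getValue() maps server name to locality fraction
   * }
   * }</pre>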
   * @param conf
   *          the configuration to use
   * @return the mapping from region encoded name to a map of server names to
   *           locality fraction
   * @throws IOException
   *           in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
      final Configuration conf) throws IOException {
    return getRegionDegreeLocalityMappingFromFS(
        conf, null,
        conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
  }

  /**
   * Scans the root path of the file system to determine the degree of locality of each region
   * on each of the servers holding at least one block of that region.
   *
   * @param conf
   *          the configuration to use
   * @param desiredTable
   *          the table you wish to scan locality for
   * @param threadPoolSize
   *          the thread pool size to use
   * @return the mapping from region encoded name to a map of server names to
   *           locality fraction
   * @throws IOException
   *           in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
      final Configuration conf, final String desiredTable, int threadPoolSize)
      throws IOException {
    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null,
        regionDegreeLocalityMapping);
    return regionDegreeLocalityMapping;
  }

  /**
   * Scans the root path of the file system to compute either the mapping from each region name
   * to its best-locality region server, or the degree of locality of each region on each of the
   * servers holding at least one block of that region. The output map parameters are both
   * optional.
   *
   * @param conf
   *          the configuration to use
   * @param desiredTable
   *          the table you wish to scan locality for
   * @param threadPoolSize
   *          the thread pool size to use
   * @param regionToBestLocalityRSMapping
   *          the map into which to put the best locality mapping or null
   * @param regionDegreeLocalityMapping
   *          the map into which to put the locality degree mapping or null,
   *          must be a thread-safe implementation
   * @throws IOException
   *           in case of file system errors or interrupts
   */
  private static void getRegionLocalityMappingFromFS(
      final Configuration conf, final String desiredTable,
      int threadPoolSize,
      Map<String, String> regionToBestLocalityRSMapping,
      Map<String, Map<String, Float>> regionDegreeLocalityMapping)
      throws IOException {
    FileSystem fs = FileSystem.get(conf);
    Path rootPath = FSUtils.getRootDir(conf);
    long startTime = EnvironmentEdgeManager.currentTime();
    Path queryPath;
    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
    if (null == desiredTable) {
      queryPath = new Path(
        new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
    } else {
      queryPath = new Path(
        FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
    }

    // reject all paths that are not appropriate
    PathFilter pathFilter = new PathFilter() {
      @Override
      public boolean accept(Path path) {
        // this is the region name; it may get some noise data
        if (null == path) {
          return false;
        }

        // no parent?
        Path parent = path.getParent();
        if (null == parent) {
          return false;
        }

        String regionName = path.getName();
        if (null == regionName) {
          return false;
        }

        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
          return false;
        }
        return true;
      }
    };

    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);

    if (null == statusList) {
      return;
    } else {
      LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
          statusList.length);
    }

    // lower the number of threads in case we have very few expected regions
    threadPoolSize = Math.min(threadPoolSize, statusList.length);

    // run in multiple threads
    ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
        threadPoolSize, 60, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(statusList.length));
    try {
      // ignore all file status items that are not of interest
      for (FileStatus regionStatus : statusList) {
        if (null == regionStatus) {
          continue;
        }

        if (!regionStatus.isDirectory()) {
          continue;
        }

        Path regionPath = regionStatus.getPath();
        if (null == regionPath) {
          continue;
        }

        tpe.execute(new FSRegionScanner(fs, regionPath,
            regionToBestLocalityRSMapping, regionDegreeLocalityMapping));
      }
    } finally {
      tpe.shutdown();
      int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
          60 * 1000);
      try {
        // here we wait until the TPE terminates, either naturally or due to
        // exceptions thrown by the worker threads
        while (!tpe.awaitTermination(threadWakeFrequency,
            TimeUnit.MILLISECONDS)) {
          // printing out rough estimate, so as to not introduce
          // AtomicInteger
          LOG.info("Locality checking is underway: { Scanned Regions : "
              + tpe.getCompletedTaskCount() + "/"
              + tpe.getTaskCount() + " }");
        }
      } catch (InterruptedException e) {
        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
      }
    }

    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
    String overheadMsg = "Scanning DFS for locality info took " + overhead + " ms";

    LOG.info(overheadMsg);
  }

  /**
   * Do our short circuit read setup.
   * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
   * @param conf the configuration to check and update
   */
  public static void setupShortCircuitRead(final Configuration conf) {
    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
    boolean shortCircuitSkipChecksum =
      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    if (shortCircuitSkipChecksum) {
      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
        "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
        "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
      assert !shortCircuitSkipChecksum; //this will fail if assertions are on
    }
    checkShortCircuitReadBufferSize(conf);
  }

  /**
   * Check if short circuit read buffer size is set and if not, set it to the hbase value.
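   * <p>A short sketch of the precedence this method implements (the 128k value is illustrative;
   * the hbase-prefixed key is only consulted when the dfs key itself is unset):
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * conf.setInt("hbase.dfs.client.read.shortcircuit.buffer.size", 131072);
   * FSUtils.checkShortCircuitReadBufferSize(conf);
   * // "dfs.client.read.shortcircuit.buffer.size" is now 131072
   * }</pre>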
   * @param conf the configuration to check and, if needed, update
   */
  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
    int size = conf.getInt(dfsKey, notSet);
    // If a size is set, return -- we will use it.
    if (size != notSet) return;
    // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }

  /**
   * Returns the DFSClient {@link DFSHedgedReadMetrics} instance shared by DFSClients of the
   * process, or null if it can't be found or the file system is not HDFS. The HDFS accessor is
   * package private, so it is looked up and invoked reflectively.
   *
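   * <p>A hedged usage sketch (assuming the standard {@code getHedgedReadOps()} accessor on the
   * returned metrics object):
   * <pre>{@code
   * DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
   * if (metrics != null) {
   *   LOG.info("Hedged read ops so far: " + metrics.getHedgedReadOps());
   * }
   * }</pre>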
   * @param c the configuration to use to obtain the file system
   * @return The DFSClient DFSHedgedReadMetrics instance or null if it can't be found or we are
   *         not on HDFS.
   * @throws IOException if the file system cannot be obtained
   */
  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
      throws IOException {
    if (!isHDFS(c)) return null;
    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
    final String name = "getHedgedReadMetrics";
    DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
    Method m;
    try {
      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException | SecurityException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
          e.getMessage());
      return null;
    }
    m.setAccessible(true);
    try {
      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
      LOG.warn("Failed to invoke method " + name + " on dfsclient; no hedged read metrics: " +
          e.getMessage());
      return null;
    }
  }

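  /**
   * Recursively copies {@code src} on {@code srcFS} to {@code dst} on {@code dstFS}: directories
   * are created on the calling thread, while regular files are copied on a fixed-size thread
   * pool. A usage sketch (variable names and the thread count of 4 are illustrative):
   * <pre>{@code
   * List<Path> copied = FSUtils.copyFilesParallel(srcFS, srcDir, dstFS, dstDir, conf, 4);
   * }</pre>
   * @param srcFS the source file system
   * @param src the source path
   * @param dstFS the destination file system
   * @param dst the destination path
   * @param conf configuration passed to the underlying file copy
   * @param threads number of threads used to copy files
   * @return the list of destination paths visited (directories and files)
   * @throws IOException if any copy task fails or the copy is interrupted
   */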
  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
      Configuration conf, int threads) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Future<Void>> futures = new ArrayList<>();
    List<Path> traversedPaths;
    try {
      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (ExecutionException | InterruptedException | IOException e) {
      throw new IOException("copy snapshot reference files failed", e);
    } finally {
      pool.shutdownNow();
    }
    return traversedPaths;
  }

  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
      Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
    List<Path> traversedPaths = new ArrayList<>();
    traversedPaths.add(dst);
    FileStatus currentFileStatus = srcFS.getFileStatus(src);
    if (currentFileStatus.isDirectory()) {
      if (!dstFS.mkdirs(dst)) {
        throw new IOException("create dir failed: " + dst);
      }
      FileStatus[] subPaths = srcFS.listStatus(src);
      for (FileStatus subPath : subPaths) {
        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
      }
    } else {
      Future<Void> future = pool.submit(() -> {
        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
        return null;
      });
      futures.add(future);
    }
    return traversedPaths;
  }
}