001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
021import edu.umd.cs.findbugs.annotations.CheckForNull;
022import java.io.ByteArrayInputStream;
023import java.io.DataInputStream;
024import java.io.EOFException;
025import java.io.FileNotFoundException;
026import java.io.IOException;
027import java.io.InputStream;
028import java.io.InterruptedIOException;
029import java.lang.reflect.InvocationTargetException;
030import java.lang.reflect.Method;
031import java.net.InetSocketAddress;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.Collections;
035import java.util.HashMap;
036import java.util.Iterator;
037import java.util.LinkedList;
038import java.util.List;
039import java.util.Locale;
040import java.util.Map;
041import java.util.Vector;
042import java.util.concurrent.ArrayBlockingQueue;
043import java.util.concurrent.ConcurrentHashMap;
044import java.util.concurrent.ExecutionException;
045import java.util.concurrent.ExecutorService;
046import java.util.concurrent.Future;
047import java.util.concurrent.FutureTask;
048import java.util.concurrent.ThreadPoolExecutor;
049import java.util.concurrent.TimeUnit;
050import java.util.regex.Pattern;
051
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.fs.BlockLocation;
054import org.apache.hadoop.fs.FSDataInputStream;
055import org.apache.hadoop.fs.FSDataOutputStream;
056import org.apache.hadoop.fs.FileStatus;
057import org.apache.hadoop.fs.FileSystem;
058import org.apache.hadoop.fs.Path;
059import org.apache.hadoop.fs.PathFilter;
060import org.apache.hadoop.fs.permission.FsAction;
061import org.apache.hadoop.fs.permission.FsPermission;
062import org.apache.hadoop.hbase.ClusterId;
063import org.apache.hadoop.hbase.HColumnDescriptor;
064import org.apache.hadoop.hbase.HConstants;
065import org.apache.hadoop.hbase.HDFSBlocksDistribution;
066import org.apache.hadoop.hbase.HRegionInfo;
067import org.apache.hadoop.hbase.TableName;
068import org.apache.hadoop.hbase.client.RegionInfo;
069import org.apache.hadoop.hbase.client.RegionInfoBuilder;
070import org.apache.hadoop.hbase.exceptions.DeserializationException;
071import org.apache.hadoop.hbase.fs.HFileSystem;
072import org.apache.hadoop.hbase.io.HFileLink;
073import org.apache.hadoop.hbase.master.HMaster;
074import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
075import org.apache.hadoop.hbase.security.AccessDeniedException;
076import org.apache.hadoop.hdfs.DFSClient;
077import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
078import org.apache.hadoop.hdfs.DistributedFileSystem;
079import org.apache.hadoop.hdfs.protocol.HdfsConstants;
080import org.apache.hadoop.io.IOUtils;
081import org.apache.hadoop.ipc.RemoteException;
082import org.apache.hadoop.security.UserGroupInformation;
083import org.apache.hadoop.util.Progressable;
084import org.apache.hadoop.util.ReflectionUtils;
085import org.apache.hadoop.util.StringUtils;
086import org.apache.yetus.audience.InterfaceAudience;
087import org.slf4j.Logger;
088import org.slf4j.LoggerFactory;
089
090import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
091import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
092import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
093import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
094
095import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
097
098/**
099 * Utility methods for interacting with the underlying file system.
100 */
101@InterfaceAudience.Private
102public abstract class FSUtils extends CommonFSUtils {
103  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
104
105  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
106  private static final int DEFAULT_THREAD_POOLSIZE = 2;
107
108  /** Set to true on Windows platforms */
109  @VisibleForTesting // currently only used in testing. TODO refactor into a test class
110  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
111
112  protected FSUtils() {
113    super();
114  }
115
116  /**
   * @return True if <code>fs</code> is an instance of DistributedFileSystem
118   * @throws IOException
119   */
120  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
121    FileSystem fileSystem = fs;
122    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
123    // Check its backing fs for dfs-ness.
124    if (fs instanceof HFileSystem) {
125      fileSystem = ((HFileSystem)fs).getBackingFs();
126    }
127    return fileSystem instanceof DistributedFileSystem;
128  }
129
130  /**
   * Compare the path component of the Path URI; e.g. given hdfs://a/b/c and /a/b/c, it compares
   * the '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
   * consider the scheme; i.e. if the schemes differ but the path or a subpath matches, the two
   * are considered equal.
   * @param pathToSearch Path we will be trying to match.
   * @param pathTail the path tail to look for
   * @return True if <code>pathTail</code> is a tail on the path of <code>pathToSearch</code>
137   */
138  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
139    if (pathToSearch.depth() != pathTail.depth()) return false;
140    Path tailPath = pathTail;
141    String tailName;
142    Path toSearch = pathToSearch;
143    String toSearchName;
144    boolean result = false;
145    do {
146      tailName = tailPath.getName();
147      if (tailName == null || tailName.length() <= 0) {
148        result = true;
149        break;
150      }
151      toSearchName = toSearch.getName();
152      if (toSearchName == null || toSearchName.length() <= 0) break;
153      // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
154      tailPath = tailPath.getParent();
155      toSearch = toSearch.getParent();
156    } while(tailName.equals(toSearchName));
157    return result;
158  }
159
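  /**
   * Returns the {@link FSUtils} implementation to use for the scheme of the given FileSystem.
   * The implementation class can be overridden per scheme via the
   * <code>hbase.fsutil.&lt;scheme&gt;.impl</code> configuration key; it defaults to the HDFS
   * implementation. A minimal, illustrative override (the custom class below is hypothetical):
   * <pre>{@code
   * // conf.set("hbase.fsutil.file.impl", "org.example.MyLocalFSUtils");
   * FSUtils fsUtils = FSUtils.getInstance(fs, conf);
   * }</pre>
   */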
160  public static FSUtils getInstance(FileSystem fs, Configuration conf) {
161    String scheme = fs.getUri().getScheme();
162    if (scheme == null) {
163      LOG.warn("Could not find scheme for uri " +
164          fs.getUri() + ", default to hdfs");
165      scheme = "hdfs";
166    }
167    Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
168        scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
169    FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
170    return fsUtils;
171  }
172
173  /**
   * Delete the region directory if it exists.
   * @param conf the configuration to use
   * @param hri the region whose directory will be deleted
   * @return True if the region directory was deleted.
   * @throws IOException if deleting the directory fails
179   */
180  public static boolean deleteRegionDir(final Configuration conf, final HRegionInfo hri)
181  throws IOException {
182    Path rootDir = getRootDir(conf);
183    FileSystem fs = rootDir.getFileSystem(conf);
184    return deleteDirectory(fs,
185      new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
186  }
187
  /**
   * Create the specified file on the filesystem. By default, this will:
   * <ol>
   * <li>overwrite the file if it exists</li>
   * <li>apply the umask in the configuration (if it is enabled)</li>
   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the column family's configured replication, or the filesystem default replication
   * when the configured value is {@link HColumnDescriptor#DEFAULT_DFS_REPLICATION}</li>
196   * <li>use the default block size</li>
197   * <li>not track progress</li>
198   * </ol>
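   * <p>An illustrative call (the path and favored node below are hypothetical):
   * <pre>{@code
   * InetSocketAddress[] favoredNodes = { new InetSocketAddress("dn1.example.com", 50010) };
   * FSDataOutputStream out = FSUtils.create(conf, fs, new Path("/hbase/.tmp/example"),
   *     FsPermission.getFileDefault(), favoredNodes);
   * }</pre>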
199   * @param conf configurations
200   * @param fs {@link FileSystem} on which to write the file
201   * @param path {@link Path} to the file to write
202   * @param perm permissions
   * @param favoredNodes favored DataNodes for block placement, or null if there are none
204   * @return output stream to the created file
205   * @throws IOException if the file cannot be created
206   */
207  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
208      FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
209    if (fs instanceof HFileSystem) {
210      FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
211      if (backingFs instanceof DistributedFileSystem) {
212        // Try to use the favoredNodes version via reflection to allow backwards-
213        // compatibility.
214        short replication = Short.parseShort(conf.get(HColumnDescriptor.DFS_REPLICATION,
215          String.valueOf(HColumnDescriptor.DEFAULT_DFS_REPLICATION)));
216        try {
217          return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create",
218            Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class,
219            Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true,
220            getDefaultBufferSize(backingFs),
221            replication > 0 ? replication : getDefaultReplication(backingFs, path),
222            getDefaultBlockSize(backingFs, path), null, favoredNodes));
223        } catch (InvocationTargetException ite) {
          // Function was properly called, but threw its own exception.
225          throw new IOException(ite.getCause());
226        } catch (NoSuchMethodException e) {
227          LOG.debug("DFS Client does not support most favored nodes create; using default create");
228          if (LOG.isTraceEnabled()) LOG.trace("Ignoring; use default create", e);
229        } catch (IllegalArgumentException e) {
230          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
231        } catch (SecurityException e) {
232          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
233        } catch (IllegalAccessException e) {
234          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
235        }
236      }
237    }
238    return create(fs, path, perm, true);
239  }
240
241  /**
242   * Checks to see if the specified file system is available
243   *
244   * @param fs filesystem
245   * @throws IOException e
246   */
247  public static void checkFileSystemAvailable(final FileSystem fs)
248  throws IOException {
249    if (!(fs instanceof DistributedFileSystem)) {
250      return;
251    }
252    IOException exception = null;
253    DistributedFileSystem dfs = (DistributedFileSystem) fs;
254    try {
255      if (dfs.exists(new Path("/"))) {
256        return;
257      }
258    } catch (IOException e) {
259      exception = e instanceof RemoteException ?
260              ((RemoteException)e).unwrapRemoteException() : e;
261    }
262    try {
263      fs.close();
264    } catch (Exception e) {
265      LOG.error("file system close failed: ", e);
266    }
267    IOException io = new IOException("File system is not available");
268    io.initCause(exception);
269    throw io;
270  }
271
272  /**
273   * We use reflection because {@link DistributedFileSystem#setSafeMode(
274   * HdfsConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1
275   *
   * @param dfs the DistributedFileSystem to check
   * @return whether we're in safe mode
   * @throws IOException if querying safe mode status fails
279   */
280  private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
281    boolean inSafeMode = false;
282    try {
283      Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
284          org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class});
285      inSafeMode = (Boolean) m.invoke(dfs,
286        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true);
287    } catch (Exception e) {
288      if (e instanceof IOException) throw (IOException) e;
289
290      // Check whether dfs is on safemode.
291      inSafeMode = dfs.setSafeMode(
292        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET);
293    }
294    return inSafeMode;
295  }
296
297  /**
298   * Check whether dfs is in safemode.
   * @param conf the configuration to use
   * @throws IOException if dfs is in safemode, or checking its status fails
301   */
302  public static void checkDfsSafeMode(final Configuration conf)
303  throws IOException {
304    boolean isInSafeMode = false;
305    FileSystem fs = FileSystem.get(conf);
306    if (fs instanceof DistributedFileSystem) {
307      DistributedFileSystem dfs = (DistributedFileSystem)fs;
308      isInSafeMode = isInSafeMode(dfs);
309    }
310    if (isInSafeMode) {
311      throw new IOException("File system is in safemode, it can't be written now");
312    }
313  }
314
315  /**
   * Returns the current version of the file system as recorded in the version file
317   *
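   * <p>Illustrative usage:
   * <pre>{@code
   * String version = FSUtils.getVersion(fs, FSUtils.getRootDir(conf));
   * }</pre>
   *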
318   * @param fs filesystem object
319   * @param rootdir root hbase directory
320   * @return null if no version file exists, version string otherwise.
321   * @throws IOException e
322   * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
323   */
324  public static String getVersion(FileSystem fs, Path rootdir)
325  throws IOException, DeserializationException {
326    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
327    FileStatus[] status = null;
328    try {
329      // hadoop 2.0 throws FNFE if directory does not exist.
330      // hadoop 1.0 returns null if directory does not exist.
331      status = fs.listStatus(versionFile);
332    } catch (FileNotFoundException fnfe) {
333      return null;
334    }
335    if (status == null || status.length == 0) return null;
336    String version = null;
337    byte [] content = new byte [(int)status[0].getLen()];
338    FSDataInputStream s = fs.open(versionFile);
339    try {
340      IOUtils.readFully(s, content, 0, content.length);
341      if (ProtobufUtil.isPBMagicPrefix(content)) {
342        version = parseVersionFrom(content);
343      } else {
        // Presume it is in pre-pb format.
345        InputStream is = new ByteArrayInputStream(content);
346        DataInputStream dis = new DataInputStream(is);
347        try {
348          version = dis.readUTF();
349        } finally {
350          dis.close();
351        }
352      }
353    } catch (EOFException eof) {
354      LOG.warn("Version file was empty, odd, will try to set it.");
355    } finally {
356      s.close();
357    }
358    return version;
359  }
360
361  /**
362   * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
363   * @param bytes The byte content of the hbase.version file.
364   * @return The version found in the file as a String.
365   * @throws DeserializationException
366   */
367  static String parseVersionFrom(final byte [] bytes)
368  throws DeserializationException {
369    ProtobufUtil.expectPBMagicPrefix(bytes);
370    int pblen = ProtobufUtil.lengthOfPBMagic();
371    FSProtos.HBaseVersionFileContent.Builder builder =
372      FSProtos.HBaseVersionFileContent.newBuilder();
373    try {
374      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
375      return builder.getVersion();
376    } catch (IOException e) {
377      // Convert
378      throw new DeserializationException(e);
379    }
380  }
381
382  /**
383   * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
384   * @param version Version to persist
385   * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
386   */
387  static byte [] toVersionByteArray(final String version) {
388    FSProtos.HBaseVersionFileContent.Builder builder =
389      FSProtos.HBaseVersionFileContent.newBuilder();
390    return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
391  }
392
393  /**
394   * Verifies current version of file system
395   *
396   * @param fs file system
397   * @param rootdir root directory of HBase installation
398   * @param message if true, issues a message on System.out
399   *
400   * @throws IOException e
401   * @throws DeserializationException
402   */
403  public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
404  throws IOException, DeserializationException {
405    checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
406  }
407
408  /**
409   * Verifies current version of file system
410   *
411   * @param fs file system
412   * @param rootdir root directory of HBase installation
413   * @param message if true, issues a message on System.out
414   * @param wait wait interval
415   * @param retries number of times to retry
416   *
417   * @throws IOException e
418   * @throws DeserializationException
419   */
420  public static void checkVersion(FileSystem fs, Path rootdir,
421      boolean message, int wait, int retries)
422  throws IOException, DeserializationException {
423    String version = getVersion(fs, rootdir);
424    String msg;
425    if (version == null) {
426      if (!metaRegionExists(fs, rootdir)) {
427        // rootDir is empty (no version file and no root region)
428        // just create new version file (HBASE-1195)
429        setVersion(fs, rootdir, wait, retries);
430        return;
431      } else {
432        msg = "hbase.version file is missing. Is your hbase.rootdir valid? " +
433            "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " +
434            "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2";
435      }
436    } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) {
437      return;
438    } else {
439      msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version +
440          " but software requires version " + HConstants.FILE_SYSTEM_VERSION +
441          ". Consult http://hbase.apache.org/book.html for further information about " +
442          "upgrading HBase.";
443    }
444
    // Version is deprecated; require migration.
446    // Output on stdout so user sees it in terminal.
447    if (message) {
448      System.out.println("WARNING! " + msg);
449    }
450    throw new FileSystemVersionException(msg);
451  }
452
453  /**
454   * Sets version of file system
455   *
456   * @param fs filesystem object
457   * @param rootdir hbase root
458   * @throws IOException e
459   */
460  public static void setVersion(FileSystem fs, Path rootdir)
461  throws IOException {
462    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
463      HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
464  }
465
466  /**
467   * Sets version of file system
468   *
469   * @param fs filesystem object
470   * @param rootdir hbase root
471   * @param wait time to wait for retry
472   * @param retries number of times to retry before failing
473   * @throws IOException e
474   */
475  public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
476  throws IOException {
477    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
478  }
479
480
481  /**
482   * Sets version of file system
483   *
484   * @param fs filesystem object
485   * @param rootdir hbase root directory
486   * @param version version to set
487   * @param wait time to wait for retry
488   * @param retries number of times to retry before throwing an IOException
489   * @throws IOException e
490   */
491  public static void setVersion(FileSystem fs, Path rootdir, String version,
492      int wait, int retries) throws IOException {
493    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
494    Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
495      HConstants.VERSION_FILE_NAME);
496    while (true) {
497      try {
498        // Write the version to a temporary file
499        FSDataOutputStream s = fs.create(tempVersionFile);
500        try {
501          s.write(toVersionByteArray(version));
502          s.close();
503          s = null;
504          // Move the temp version file to its normal location. Returns false
505          // if the rename failed. Throw an IOE in that case.
506          if (!fs.rename(tempVersionFile, versionFile)) {
507            throw new IOException("Unable to move temp version file to " + versionFile);
508          }
509        } finally {
          // Cleaning up the temporary file if the rename failed would be trying
          // too hard. We'll unconditionally create it again the next time
          // through anyway; files are overwritten by default by create().
513
514          // Attempt to close the stream on the way out if it is still open.
515          try {
516            if (s != null) s.close();
517          } catch (IOException ignore) { }
518        }
519        LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
520        return;
521      } catch (IOException e) {
522        if (retries > 0) {
523          LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
524          fs.delete(versionFile, false);
525          try {
526            if (wait > 0) {
527              Thread.sleep(wait);
528            }
529          } catch (InterruptedException ie) {
530            throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
531          }
532          retries--;
533        } else {
534          throw e;
535        }
536      }
537    }
538  }
539
540  /**
   * Checks whether a cluster ID file exists in the HBase root directory
542   * @param fs the root directory FileSystem
543   * @param rootdir the HBase root directory in HDFS
544   * @param wait how long to wait between retries
545   * @return <code>true</code> if the file exists, otherwise <code>false</code>
546   * @throws IOException if checking the FileSystem fails
547   */
548  public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
549      int wait) throws IOException {
550    while (true) {
551      try {
552        Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
553        return fs.exists(filePath);
554      } catch (IOException ioe) {
555        if (wait > 0) {
556          LOG.warn("Unable to check cluster ID file in " + rootdir.toString() +
557              ", retrying in "+wait+"msec: "+StringUtils.stringifyException(ioe));
558          try {
559            Thread.sleep(wait);
560          } catch (InterruptedException e) {
561            throw (InterruptedIOException)new InterruptedIOException().initCause(e);
562          }
563        } else {
564          throw ioe;
565        }
566      }
567    }
568  }
569
570  /**
571   * Returns the value of the unique cluster ID stored for this HBase instance.
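   * <p>Illustrative usage:
   * <pre>{@code
   * ClusterId clusterId = FSUtils.getClusterId(fs, FSUtils.getRootDir(conf));
   * }</pre>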
572   * @param fs the root directory FileSystem
573   * @param rootdir the path to the HBase root directory
574   * @return the unique cluster identifier
575   * @throws IOException if reading the cluster ID file fails
576   */
577  public static ClusterId getClusterId(FileSystem fs, Path rootdir)
578  throws IOException {
579    Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
580    ClusterId clusterId = null;
581    FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath):  null;
582    if (status != null) {
583      int len = Ints.checkedCast(status.getLen());
584      byte [] content = new byte[len];
585      FSDataInputStream in = fs.open(idPath);
586      try {
587        in.readFully(content);
588      } catch (EOFException eof) {
589        LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
590      } finally{
591        in.close();
592      }
593      try {
594        clusterId = ClusterId.parseFrom(content);
595      } catch (DeserializationException e) {
596        throw new IOException("content=" + Bytes.toString(content), e);
597      }
598      // If not pb'd, make it so.
599      if (!ProtobufUtil.isPBMagicPrefix(content)) {
600        String cid = null;
601        in = fs.open(idPath);
602        try {
603          cid = in.readUTF();
604          clusterId = new ClusterId(cid);
605        } catch (EOFException eof) {
606          LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
607        } finally {
608          in.close();
609        }
610        rewriteAsPb(fs, rootdir, idPath, clusterId);
611      }
612      return clusterId;
613    } else {
614      LOG.warn("Cluster ID file does not exist at " + idPath.toString());
615    }
616    return clusterId;
617  }
618
619  /**
   * Rewrite the cluster ID file as protobuf: move the old file aside, write the new one,
   * then delete the moved-aside copy.
   * @param cid the cluster ID to write
   * @throws IOException if the rename, write, or delete fails
622   */
623  private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
624      final ClusterId cid)
625  throws IOException {
626    // Rewrite the file as pb.  Move aside the old one first, write new
627    // then delete the moved-aside file.
628    Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
629    if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
630    setClusterId(fs, rootdir, cid, 100);
631    if (!fs.delete(movedAsideName, false)) {
632      throw new IOException("Failed delete of " + movedAsideName);
633    }
634    LOG.debug("Rewrote the hbase.id file as pb");
635  }
636
637  /**
638   * Writes a new unique identifier for this cluster to the "hbase.id" file
639   * in the HBase root directory
640   * @param fs the root directory FileSystem
641   * @param rootdir the path to the HBase root directory
642   * @param clusterId the unique identifier to store
643   * @param wait how long (in milliseconds) to wait between retries
   * @throws IOException if writing to the FileSystem fails and <code>wait</code> is not
   *           positive, so no retry is attempted
645   */
646  public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
647      int wait) throws IOException {
648    while (true) {
649      try {
650        Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
651        Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
652          Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
653        // Write the id file to a temporary location
654        FSDataOutputStream s = fs.create(tempIdFile);
655        try {
656          s.write(clusterId.toByteArray());
657          s.close();
658          s = null;
659          // Move the temporary file to its normal location. Throw an IOE if
660          // the rename failed
661          if (!fs.rename(tempIdFile, idFile)) {
            throw new IOException("Unable to move temp cluster ID file to " + idFile);
663          }
664        } finally {
665          // Attempt to close the stream if still open on the way out
666          try {
667            if (s != null) s.close();
668          } catch (IOException ignore) { }
669        }
670        if (LOG.isDebugEnabled()) {
671          LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
672        }
673        return;
674      } catch (IOException ioe) {
675        if (wait > 0) {
676          LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
677              ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
678          try {
679            Thread.sleep(wait);
680          } catch (InterruptedException e) {
681            throw (InterruptedIOException)new InterruptedIOException().initCause(e);
682          }
683        } else {
684          throw ioe;
685        }
686      }
687    }
688  }
689
690  /**
691   * If DFS, check safe mode and if so, wait until we clear it.
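   * <p>Illustrative usage (ten-second retry interval):
   * <pre>{@code
   * FSUtils.waitOnSafeMode(conf, 10 * 1000);
   * }</pre>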
692   * @param conf configuration
693   * @param wait Sleep between retries
694   * @throws IOException e
695   */
696  public static void waitOnSafeMode(final Configuration conf,
697    final long wait)
698  throws IOException {
699    FileSystem fs = FileSystem.get(conf);
700    if (!(fs instanceof DistributedFileSystem)) return;
701    DistributedFileSystem dfs = (DistributedFileSystem)fs;
702    // Make sure dfs is not in safe mode
703    while (isInSafeMode(dfs)) {
704      LOG.info("Waiting for dfs to exit safe mode...");
705      try {
706        Thread.sleep(wait);
707      } catch (InterruptedException e) {
708        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
709      }
710    }
711  }
712
713  /**
714   * Checks if meta region exists
715   * @param fs file system
716   * @param rootDir root directory of HBase installation
717   * @return true if exists
718   */
719  public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
720    Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
721    return fs.exists(metaRegionDir);
722  }
723
724  /**
725   * Compute HDFS blocks distribution of a given file, or a portion of the file
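   * <p>An illustrative call over a whole file (the FileStatus is assumed to be at hand):
   * <pre>{@code
   * HDFSBlocksDistribution dist =
   *     FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
   * }</pre>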
726   * @param fs file system
727   * @param status file status of the file
728   * @param start start position of the portion
729   * @param length length of the portion
730   * @return The HDFS blocks distribution
731   */
732  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
733    final FileSystem fs, FileStatus status, long start, long length)
734    throws IOException {
735    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
736    BlockLocation [] blockLocations =
737      fs.getFileBlockLocations(status, start, length);
738    for(BlockLocation bl : blockLocations) {
739      String [] hosts = bl.getHosts();
740      long len = bl.getLength();
741      blocksDistribution.addHostsAndBlockWeight(hosts, len);
742    }
743
744    return blocksDistribution;
745  }
746
747  /**
748   * Update blocksDistribution with blockLocations
749   * @param blocksDistribution the hdfs blocks distribution
750   * @param blockLocations an array containing block location
751   */
752  static public void addToHDFSBlocksDistribution(
753      HDFSBlocksDistribution blocksDistribution, BlockLocation[] blockLocations)
754      throws IOException {
755    for (BlockLocation bl : blockLocations) {
756      String[] hosts = bl.getHosts();
757      long len = bl.getLength();
758      blocksDistribution.addHostsAndBlockWeight(hosts, len);
759    }
760  }
761
  // TODO: move this method OUT of FSUtils. It should not depend on HMaster.
763  /**
764   * Returns the total overall fragmentation percentage. Includes hbase:meta and
765   * -ROOT- as well.
766   *
767   * @param master  The master defining the HBase root and file system.
   * @return The overall fragmentation percentage across all tables, or -1 if it cannot be
   *   determined.
769   * @throws IOException When scanning the directory fails.
770   */
771  public static int getTotalTableFragmentation(final HMaster master)
772  throws IOException {
773    Map<String, Integer> map = getTableFragmentation(master);
774    return map != null && map.size() > 0 ? map.get("-TOTAL-") : -1;
775  }
776
777  /**
778   * Runs through the HBase rootdir and checks how many stores for each table
779   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
780   * percentage across all tables is stored under the special key "-TOTAL-".
781   *
782   * @param master  The master defining the HBase root and file system.
783   * @return A map for each table and its percentage.
784   *
785   * @throws IOException When scanning the directory fails.
786   */
787  public static Map<String, Integer> getTableFragmentation(
788    final HMaster master)
789  throws IOException {
790    Path path = getRootDir(master.getConfiguration());
791    // since HMaster.getFileSystem() is package private
792    FileSystem fs = path.getFileSystem(master.getConfiguration());
793    return getTableFragmentation(fs, path);
794  }
795
796  /**
797   * Runs through the HBase rootdir and checks how many stores for each table
798   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
799   * percentage across all tables is stored under the special key "-TOTAL-".
800   *
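   * <p>Illustrative usage:
   * <pre>{@code
   * Map<String, Integer> frags = FSUtils.getTableFragmentation(fs, FSUtils.getRootDir(conf));
   * int overallPercentage = frags.get("-TOTAL-");
   * }</pre>
   *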
801   * @param fs  The file system to use.
802   * @param hbaseRootDir  The root directory to scan.
803   * @return A map for each table and its percentage.
804   * @throws IOException When scanning the directory fails.
805   */
806  public static Map<String, Integer> getTableFragmentation(
807    final FileSystem fs, final Path hbaseRootDir)
808  throws IOException {
809    Map<String, Integer> frags = new HashMap<>();
810    int cfCountTotal = 0;
811    int cfFragTotal = 0;
812    PathFilter regionFilter = new RegionDirFilter(fs);
813    PathFilter familyFilter = new FamilyDirFilter(fs);
814    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
815    for (Path d : tableDirs) {
816      int cfCount = 0;
817      int cfFrag = 0;
818      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
819      for (FileStatus regionDir : regionDirs) {
820        Path dd = regionDir.getPath();
        // else it's a region name; now look in the region for families
822        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
823        for (FileStatus familyDir : familyDirs) {
824          cfCount++;
825          cfCountTotal++;
826          Path family = familyDir.getPath();
827          // now in family make sure only one file
828          FileStatus[] familyStatus = fs.listStatus(family);
829          if (familyStatus.length > 1) {
830            cfFrag++;
831            cfFragTotal++;
832          }
833        }
834      }
      // compute percentage per table and store in the result map
836      frags.put(FSUtils.getTableName(d).getNameAsString(),
837        cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
838    }
839    // set overall percentage for all tables
840    frags.put("-TOTAL-",
841      cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
842    return frags;
843  }
844
845  /**
846   * A {@link PathFilter} that returns only regular files.
847   */
848  static class FileFilter extends AbstractFileStatusFilter {
849    private final FileSystem fs;
850
851    public FileFilter(final FileSystem fs) {
852      this.fs = fs;
853    }
854
855    @Override
856    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
857      try {
858        return isFile(fs, isDir, p);
859      } catch (IOException e) {
860        LOG.warn("unable to verify if path=" + p + " is a regular file", e);
861        return false;
862      }
863    }
864  }
865
866  /**
867   * Directory filter that doesn't include any of the directories in the specified blacklist
868   */
869  public static class BlackListDirFilter extends AbstractFileStatusFilter {
870    private final FileSystem fs;
871    private List<String> blacklist;
872
873    /**
     * Create a filter on the given filesystem with the specified blacklist
875     * @param fs filesystem to filter
876     * @param directoryNameBlackList list of the names of the directories to filter. If
877     *          <tt>null</tt>, all directories are returned
878     */
879    @SuppressWarnings("unchecked")
880    public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
881      this.fs = fs;
882      blacklist =
883        (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
884          : directoryNameBlackList);
885    }
886
887    @Override
888    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
889      if (!isValidName(p.getName())) {
890        return false;
891      }
892
893      try {
894        return isDirectory(fs, isDir, p);
895      } catch (IOException e) {
896        LOG.warn("An error occurred while verifying if [" + p.toString()
897            + "] is a valid directory. Returning 'not valid' and continuing.", e);
898        return false;
899      }
900    }
901
902    protected boolean isValidName(final String name) {
903      return !blacklist.contains(name);
904    }
905  }
906
907  /**
908   * A {@link PathFilter} that only allows directories.
909   */
910  public static class DirFilter extends BlackListDirFilter {
911
912    public DirFilter(FileSystem fs) {
913      super(fs, null);
914    }
915  }
916
917  /**
918   * A {@link PathFilter} that returns usertable directories. To get all directories use the
919   * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
920   */
921  public static class UserTableDirFilter extends BlackListDirFilter {
922    public UserTableDirFilter(FileSystem fs) {
923      super(fs, HConstants.HBASE_NON_TABLE_DIRS);
924    }
925
926    @Override
927    protected boolean isValidName(final String name) {
      if (!super.isValidName(name)) {
        return false;
      }
930
931      try {
932        TableName.isLegalTableQualifierName(Bytes.toBytes(name));
933      } catch (IllegalArgumentException e) {
934        LOG.info("INVALID NAME " + name);
935        return false;
936      }
937      return true;
938    }
939  }
940
941  /**
   * Recover the lease on a file. Used when a file might have been left open
   * by another process.
   * @param fs FileSystem handle
   * @param p Path of the file whose lease should be recovered
   * @param conf Configuration handle
   * @param reporter used to report progress of the lease recovery
   * @throws IOException if lease recovery fails
948   */
949  public abstract void recoverFileLease(final FileSystem fs, final Path p,
950      Configuration conf, CancelableProgressable reporter) throws IOException;
951
952  public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
953      throws IOException {
954    List<Path> tableDirs = new LinkedList<>();
955
956    for(FileStatus status :
957        fs.globStatus(new Path(rootdir,
958            new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
959      tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
960    }
961    return tableDirs;
962  }
963
964  /**
   * @param fs the filesystem to use
   * @param rootdir the directory to scan
   * @return All the table directories under <code>rootdir</code>. Ignores non-table hbase
   *   folders such as .logs, .oldlogs, and .corrupt.
   * @throws IOException if listing the directory fails
970   */
971  public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
972      throws IOException {
973    // presumes any directory under hbase.rootdir is a table
974    FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
975    List<Path> tabledirs = new ArrayList<>(dirs.length);
976    for (FileStatus dir: dirs) {
977      tabledirs.add(dir.getPath());
978    }
979    return tabledirs;
980  }
981
982  /**
   * Filter that accepts only region directories (hex-encoded region names).
984   */
985  public static class RegionDirFilter extends AbstractFileStatusFilter {
986    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
987    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
988    final FileSystem fs;
989
990    public RegionDirFilter(FileSystem fs) {
991      this.fs = fs;
992    }
993
994    @Override
995    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
996      if (!regionDirPattern.matcher(p.getName()).matches()) {
997        return false;
998      }
999
1000      try {
1001        return isDirectory(fs, isDir, p);
1002      } catch (IOException ioe) {
1003        // Maybe the file was moved or the fs was disconnected.
1004        LOG.warn("Skipping file " + p +" due to IOException", ioe);
1005        return false;
1006      }
1007    }
1008  }
1009
1010  /**
1011   * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1012   * .tableinfo
1013   * @param fs A file system for the Path
1014   * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
1015   * @return List of paths to valid region directories in table dir.
1016   * @throws IOException
1017   */
1018  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1019    // assumes we are in a table dir.
1020    List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1021    if (rds == null) {
1022      return new ArrayList<>();
1023    }
1024    List<Path> regionDirs = new ArrayList<>(rds.size());
1025    for (FileStatus rdfs: rds) {
1026      Path rdPath = rdfs.getPath();
1027      regionDirs.add(rdPath);
1028    }
1029    return regionDirs;
1030  }
1031
1032  public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) {
1033    return getRegionDirFromTableDir(getTableDir(rootDir, region.getTable()), region);
1034  }
1035
1036  public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
1037    return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
1038  }
1039
1040  /**
1041   * Filter for all dirs that are legal column family names.  This is generally used for colfam
1042   * dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1043   */
1044  public static class FamilyDirFilter extends AbstractFileStatusFilter {
1045    final FileSystem fs;
1046
1047    public FamilyDirFilter(FileSystem fs) {
1048      this.fs = fs;
1049    }
1050
1051    @Override
1052    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1053      try {
1054        // throws IAE if invalid
1055        HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName()));
1056      } catch (IllegalArgumentException iae) {
1057        // path name is an invalid family name and thus is excluded.
1058        return false;
1059      }
1060
1061      try {
1062        return isDirectory(fs, isDir, p);
1063      } catch (IOException ioe) {
1064        // Maybe the file was moved or the fs was disconnected.
1065        LOG.warn("Skipping file " + p +" due to IOException", ioe);
1066        return false;
1067      }
1068    }
1069  }
1070
1071  /**
1072   * Given a particular region dir, return all the familydirs inside it
1073   *
1074   * @param fs A file system for the Path
1075   * @param regionDir Path to a specific region directory
1076   * @return List of paths to valid family directories in region dir.
1077   * @throws IOException
1078   */
1079  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1080    // assumes we are in a region dir.
1081    FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1082    List<Path> familyDirs = new ArrayList<>(fds.length);
1083    for (FileStatus fdfs: fds) {
1084      Path fdPath = fdfs.getPath();
1085      familyDirs.add(fdPath);
1086    }
1087    return familyDirs;
1088  }
1089
1090  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
1091    List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs));
1092    if (fds == null) {
1093      return new ArrayList<>();
1094    }
1095    List<Path> referenceFiles = new ArrayList<>(fds.size());
1096    for (FileStatus fdfs: fds) {
1097      Path fdPath = fdfs.getPath();
1098      referenceFiles.add(fdPath);
1099    }
1100    return referenceFiles;
1101  }
1102
1103  /**
1104   * Filter for HFiles that excludes reference files.
1105   */
1106  public static class HFileFilter extends AbstractFileStatusFilter {
1107    final FileSystem fs;
1108
1109    public HFileFilter(FileSystem fs) {
1110      this.fs = fs;
1111    }
1112
1113    @Override
1114    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1115      if (!StoreFileInfo.isHFile(p)) {
1116        return false;
1117      }
1118
1119      try {
1120        return isFile(fs, isDir, p);
1121      } catch (IOException ioe) {
1122        // Maybe the file was moved or the fs was disconnected.
1123        LOG.warn("Skipping file " + p +" due to IOException", ioe);
1124        return false;
1125      }
1126    }
1127  }
1128
1129  /**
1130   * Filter for HFileLinks (StoreFiles and HFiles not included).
   * The filter itself does not consider whether the link is a file or not.
1132   */
1133  public static class HFileLinkFilter implements PathFilter {
1134
1135    @Override
1136    public boolean accept(Path p) {
1137      return HFileLink.isHFileLink(p);
1138    }
1139  }
1140
1141  public static class ReferenceFileFilter extends AbstractFileStatusFilter {
1142
1143    private final FileSystem fs;
1144
1145    public ReferenceFileFilter(FileSystem fs) {
1146      this.fs = fs;
1147    }
1148
1149    @Override
1150    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1151      if (!StoreFileInfo.isReference(p)) {
1152        return false;
1153      }
1154
1155      try {
1156        // only files can be references.
1157        return isFile(fs, isDir, p);
1158      } catch (IOException ioe) {
1159        // Maybe the file was moved or the fs was disconnected.
1160        LOG.warn("Skipping file " + p +" due to IOException", ioe);
1161        return false;
1162      }
1163    }
1164  }
1165
1166  /**
1167   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1168   * table StoreFile names to the full Path.
1169   * <br>
1170   * Example...<br>
1171   * Key = 3944417774205889744  <br>
1172   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1173   *
1174   * @param map map to add values.  If null, this method will create and populate one to return
1175   * @param fs  The file system to use.
1176   * @param hbaseRootDir  The root directory to scan.
1177   * @param tableName name of the table to scan.
1178   * @return Map keyed by StoreFile name with a value of the full Path.
1179   * @throws IOException When scanning the directory fails.
1180   * @throws InterruptedException
1181   */
1182  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1183  final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1184  throws IOException, InterruptedException {
1185    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, null);
1186  }
1187
1188  /**
1189   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
   * table StoreFile names to the full Path. Note that because this method can be called
   * on a 'live' HBase system, it will skip files that no longer exist by the time
   * we traverse them, and similarly the user of the result needs to consider that some
   * entries in this map may not exist by the time this call completes.
1194   * <br>
1195   * Example...<br>
1196   * Key = 3944417774205889744  <br>
1197   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1198   *
1199   * @param resultMap map to add values.  If null, this method will create and populate one to return
1200   * @param fs  The file system to use.
1201   * @param hbaseRootDir  The root directory to scan.
1202   * @param tableName name of the table to scan.
1203   * @param sfFilter optional path filter to apply to store files
1204   * @param executor optional executor service to parallelize this operation
1205   * @param errors ErrorReporter instance or null
1206   * @return Map keyed by StoreFile name with a value of the full Path.
1207   * @throws IOException When scanning the directory fails.
1208   * @throws InterruptedException
1209   */
1210  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
1211      final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1212      ExecutorService executor, final HbckErrorReporter errors)
1213      throws IOException, InterruptedException {
1214
1215    final Map<String, Path> finalResultMap =
1216        resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;
1217
1218    // only include the directory paths to tables
1219    Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
1220    // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
1221    // should be regions.
1222    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
1223    final Vector<Exception> exceptions = new Vector<>();
1224
1225    try {
1226      List<FileStatus> regionDirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1227      if (regionDirs == null) {
1228        return finalResultMap;
1229      }
1230
1231      final List<Future<?>> futures = new ArrayList<>(regionDirs.size());
1232
1233      for (FileStatus regionDir : regionDirs) {
1234        if (null != errors) {
1235          errors.progress();
1236        }
1237        final Path dd = regionDir.getPath();
1238
1239        if (!exceptions.isEmpty()) {
1240          break;
1241        }
1242
1243        Runnable getRegionStoreFileMapCall = new Runnable() {
1244          @Override
1245          public void run() {
1246            try {
1247              HashMap<String,Path> regionStoreFileMap = new HashMap<>();
1248              List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
1249              if (familyDirs == null) {
1250                if (!fs.exists(dd)) {
1251                  LOG.warn("Skipping region because it no longer exists: " + dd);
1252                } else {
1253                  LOG.warn("Skipping region because it has no family dirs: " + dd);
1254                }
1255                return;
1256              }
1257              for (FileStatus familyDir : familyDirs) {
1258                if (null != errors) {
1259                  errors.progress();
1260                }
1261                Path family = familyDir.getPath();
1262                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1263                  continue;
1264                }
1265                // now in family, iterate over the StoreFiles and
1266                // put in map
1267                FileStatus[] familyStatus = fs.listStatus(family);
1268                for (FileStatus sfStatus : familyStatus) {
1269                  if (null != errors) {
1270                    errors.progress();
1271                  }
1272                  Path sf = sfStatus.getPath();
1273                  if (sfFilter == null || sfFilter.accept(sf)) {
                    regionStoreFileMap.put(sf.getName(), sf);
1275                  }
1276                }
1277              }
1278              finalResultMap.putAll(regionStoreFileMap);
1279            } catch (Exception e) {
1280              LOG.error("Could not get region store file map for region: " + dd, e);
1281              exceptions.add(e);
1282            }
1283          }
1284        };
1285
1286        // If executor is available, submit async tasks to exec concurrently, otherwise
1287        // just do serial sync execution
1288        if (executor != null) {
1289          Future<?> future = executor.submit(getRegionStoreFileMapCall);
1290          futures.add(future);
1291        } else {
1292          FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
1293          future.run();
1294          futures.add(future);
1295        }
1296      }
1297
1298      // Ensure all pending tasks are complete (or that we run into an exception)
1299      for (Future<?> f : futures) {
1300        if (!exceptions.isEmpty()) {
1301          break;
1302        }
1303        try {
1304          f.get();
1305        } catch (ExecutionException e) {
1306          LOG.error("Unexpected exec exception!  Should've been caught already.  (Bug?)", e);
1307          // Shouldn't happen, we already logged/caught any exceptions in the Runnable
1308        }
1309      }
1310    } catch (IOException e) {
1311      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
1312      exceptions.add(e);
1313    } finally {
1314      if (!exceptions.isEmpty()) {
1315        // Just throw the first exception as an indication something bad happened
1316        // Don't need to propagate all the exceptions, we already logged them all anyway
1317        Throwables.propagateIfInstanceOf(exceptions.firstElement(), IOException.class);
1318        throw Throwables.propagate(exceptions.firstElement());
1319      }
1320    }
1321
1322    return finalResultMap;
1323  }
1324
1325  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1326    int result = 0;
1327    try {
1328      for (Path familyDir:getFamilyDirs(fs, p)){
1329        result += getReferenceFilePaths(fs, familyDir).size();
1330      }
1331    } catch (IOException e) {
      LOG.warn("Error counting reference files.", e);
1333    }
1334    return result;
1335  }
1336
1337  /**
1338   * Runs through the HBase rootdir and creates a reverse lookup map for
1339   * table StoreFile names to the full Path.
1340   * <br>
1341   * Example...<br>
1342   * Key = 3944417774205889744  <br>
1343   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1344   *
1345   * @param fs  The file system to use.
1346   * @param hbaseRootDir  The root directory to scan.
1347   * @return Map keyed by StoreFile name with a value of the full Path.
1348   * @throws IOException When scanning the directory fails.
1349   * @throws InterruptedException
1350   */
1351  public static Map<String, Path> getTableStoreFilePathMap(
1352    final FileSystem fs, final Path hbaseRootDir)
1353  throws IOException, InterruptedException {
1354    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, null);
1355  }
1356
1357  /**
1358   * Runs through the HBase rootdir and creates a reverse lookup map for
1359   * table StoreFile names to the full Path.
1360   * <br>
1361   * Example...<br>
1362   * Key = 3944417774205889744  <br>
1363   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1364   *
1365   * @param fs  The file system to use.
1366   * @param hbaseRootDir  The root directory to scan.
1367   * @param sfFilter optional path filter to apply to store files
1368   * @param executor optional executor service to parallelize this operation
1369   * @param errors ErrorReporter instance or null
1370   * @return Map keyed by StoreFile name with a value of the full Path.
1371   * @throws IOException When scanning the directory fails.
1372   * @throws InterruptedException
1373   */
1374  public static Map<String, Path> getTableStoreFilePathMap(
1375    final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter,
1376    ExecutorService executor, HbckErrorReporter errors)
1377  throws IOException, InterruptedException {
1378    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);
1379
1380    // if this method looks similar to 'getTableFragmentation' that is because
1381    // it was borrowed from it.
1382
1383    // only include the directory paths to tables
1384    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1385      getTableStoreFilePathMap(map, fs, hbaseRootDir,
1386          FSUtils.getTableName(tableDir), sfFilter, executor, errors);
1387    }
1388    return map;
1389  }
1390
1391  /**
1392   * Filters FileStatuses in an array and returns a list
1393   *
1394   * @param input   An array of FileStatuses
1395   * @param filter  A required filter to filter the array
1396   * @return        A list of FileStatuses
1397   */
1398  public static List<FileStatus> filterFileStatuses(FileStatus[] input,
1399      FileStatusFilter filter) {
1400    if (input == null) return null;
1401    return filterFileStatuses(Iterators.forArray(input), filter);
1402  }
1403
1404  /**
1405   * Filters FileStatuses in an iterator and returns a list
1406   *
1407   * @param input   An iterator of FileStatuses
1408   * @param filter  A required filter to filter the array
1409   * @return        A list of FileStatuses
1410   */
1411  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1412      FileStatusFilter filter) {
1413    if (input == null) return null;
1414    ArrayList<FileStatus> results = new ArrayList<>();
1415    while (input.hasNext()) {
1416      FileStatus f = input.next();
1417      if (filter.accept(f)) {
1418        results.add(f);
1419      }
1420    }
1421    return results;
1422  }
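  // A small hedged example of the two filter helpers above: keep only directories from a raw
  // listing. This assumes FileStatusFilter is a single-method interface (as its use above
  // suggests), so a method reference can serve as the filter; "fs" and "dir" are hypothetical.
  //
  //   FileStatus[] listing = fs.listStatus(dir);
  //   List<FileStatus> dirsOnly = FSUtils.filterFileStatuses(listing, FileStatus::isDirectory);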
1423
1424  /**
1425   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1426   * This accommodates differences between Hadoop versions: Hadoop 1 does not
1427   * throw a FileNotFoundException and instead returns an empty FileStatus[],
1428   * while Hadoop 2 will throw FileNotFoundException.
1429   *
1430   * @param fs file system
1431   * @param dir directory
1432   * @param filter file status filter
1433   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
1434   */
1435  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs,
1436      final Path dir, final FileStatusFilter filter) throws IOException {
1437    FileStatus [] status = null;
1438    try {
1439      status = fs.listStatus(dir);
1440    } catch (FileNotFoundException fnfe) {
1441      // if directory doesn't exist, return null
1442      if (LOG.isTraceEnabled()) {
1443        LOG.trace(dir + " doesn't exist");
1444      }
1445    }
1446
1447    if (status == null || status.length < 1)  {
1448      return null;
1449    }
1450
1451    if (filter == null) {
1452      return Arrays.asList(status);
1453    } else {
1454      List<FileStatus> status2 = filterFileStatuses(status, filter);
1455      if (status2 == null || status2.isEmpty()) {
1456        return null;
1457      } else {
1458        return status2;
1459      }
1460    }
1461  }
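  // Hedged usage note for listStatusWithStatusFilter: null stands in for both "directory missing"
  // and "nothing matched", so callers should test for it before iterating. "fs" and "regionDir"
  // are hypothetical names here.
  //
  //   List<FileStatus> families =
  //       FSUtils.listStatusWithStatusFilter(fs, regionDir, FileStatus::isDirectory);
  //   if (families == null) {
  //     return; // directory absent or empty -- nothing to do
  //   }
  //   for (FileStatus family : families) {
  //     // process each column family directory here
  //   }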
1462
1463  /**
1464   * Throws an exception if the given action is not permitted for the user on the file.
1465   *
1466   * @param ugi the user
1467   * @param file the file
1468   * @param action the action being checked
1469   * @throws AccessDeniedException if the applicable permission class for the
1470   *           user (owner, group, or other) does not allow the requested
1471   *           action
1472   */
1473  public static void checkAccess(UserGroupInformation ugi, FileStatus file,
1474      FsAction action) throws AccessDeniedException {
1475    if (ugi.getShortUserName().equals(file.getOwner())) {
1476      if (file.getPermission().getUserAction().implies(action)) {
1477        return;
1478      }
1479    } else if (contains(ugi.getGroupNames(), file.getGroup())) {
1480      if (file.getPermission().getGroupAction().implies(action)) {
1481        return;
1482      }
1483    } else if (file.getPermission().getOtherAction().implies(action)) {
1484      return;
1485    }
1486    throw new AccessDeniedException("Permission denied:" + " action=" + action
1487        + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
1488  }
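  // Hedged sketch of checkAccess: verify that the current user may read a file before opening
  // it. The "somePath" variable and the choice of FsAction.READ are illustrative assumptions.
  //
  //   UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
  //   FileStatus status = fs.getFileStatus(somePath);
  //   FSUtils.checkAccess(ugi, status, FsAction.READ); // throws AccessDeniedException if denied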
1489
1490  private static boolean contains(String[] groups, String user) {
1491    for (String group : groups) {
1492      if (group.equals(user)) {
1493        return true;
1494      }
1495    }
1496    return false;
1497  }
1498
1499  /**
1500   * Scans the root path of the file system to compute the degree of locality
1501   * for each region on each of the servers having at least one block of that
1502   * region.
1503   * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}.
1504   *
1505   * @param conf
1506   *          the configuration to use
1507   * @return the mapping from region encoded name to a map of server names to
1508   *           locality fraction
1509   * @throws IOException
1510   *           in case of file system errors or interrupts
1511   */
1512  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1513      final Configuration conf) throws IOException {
1514    return getRegionDegreeLocalityMappingFromFS(
1515        conf, null,
1516        conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1517
1518  }
1519
1520  /**
1521   * Scans the root path of the file system to compute the degree of locality
1522   * for each region on each of the servers having at least one block of that
1523   * region.
1524   *
1525   * @param conf
1526   *          the configuration to use
1527   * @param desiredTable
1528   *          the table you wish to scan locality for
1529   * @param threadPoolSize
1530   *          the thread pool size to use
1531   * @return the mapping from region encoded name to a map of server names to
1532   *           locality fraction
1533   * @throws IOException
1534   *           in case of file system errors or interrupts
1535   */
1536  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1537      final Configuration conf, final String desiredTable, int threadPoolSize)
1538      throws IOException {
1539    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
1540    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null,
1541        regionDegreeLocalityMapping);
1542    return regionDegreeLocalityMapping;
1543  }
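  // Hedged sketch of consuming the locality mapping above: for every region of a table, log the
  // server with the highest locality fraction. The table name and pool size are illustrative.
  //
  //   Map<String, Map<String, Float>> locality =
  //       FSUtils.getRegionDegreeLocalityMappingFromFS(conf, "my_table", 8);
  //   locality.forEach((regionEncodedName, perServer) ->
  //       perServer.entrySet().stream()
  //           .max(Map.Entry.comparingByValue())
  //           .ifPresent(best -> LOG.info(regionEncodedName + " is most local on "
  //               + best.getKey() + " (" + best.getValue() + ")")));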
1544
1545  /**
1546   * Scans the root path of the file system to compute either the mapping from
1547   * region name to its best-locality region server, or the degree of locality of
1548   * each region on each of the servers having at least one block of that region.
1549   * The output map parameters are both optional.
1550   *
1551   * @param conf
1552   *          the configuration to use
1553   * @param desiredTable
1554   *          the table you wish to scan locality for
1555   * @param threadPoolSize
1556   *          the thread pool size to use
1557   * @param regionToBestLocalityRSMapping
1558   *          the map into which to put the best locality mapping or null
1559   * @param regionDegreeLocalityMapping
1560   *          the map into which to put the locality degree mapping or null,
1561   *          must be a thread-safe implementation
1562   * @throws IOException
1563   *           in case of file system errors or interrupts
1564   */
1565  private static void getRegionLocalityMappingFromFS(
1566      final Configuration conf, final String desiredTable,
1567      int threadPoolSize,
1568      Map<String, String> regionToBestLocalityRSMapping,
1569      Map<String, Map<String, Float>> regionDegreeLocalityMapping)
1570      throws IOException {
1571    FileSystem fs =  FileSystem.get(conf);
1572    Path rootPath = FSUtils.getRootDir(conf);
1573    long startTime = EnvironmentEdgeManager.currentTime();
1574    Path queryPath;
1575    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1576    if (null == desiredTable) {
1577      queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1578    } else {
1579      queryPath = new Path(FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1580    }
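    // The glob therefore matches region directories: three levels under "data"
    // (<namespace>/<table>/<region>) in the all-tables case, or just <tabledir>/<region>
    // when a single table is requested.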
1581
1582    // reject all paths that are not appropriate
1583    PathFilter pathFilter = new PathFilter() {
1584      @Override
1585      public boolean accept(Path path) {
1586        // the last path component should be a region name; the glob may also pick up noise entries
1587        if (null == path) {
1588          return false;
1589        }
1590
1591        // no parent?
1592        Path parent = path.getParent();
1593        if (null == parent) {
1594          return false;
1595        }
1596
1597        String regionName = path.getName();
1598        if (null == regionName) {
1599          return false;
1600        }
1601
1602        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
1603          return false;
1604        }
1605        return true;
1606      }
1607    };
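    // Under this filter, hex-only region directory names such as "70236052" or a 32-character
    // encoded name are accepted, while entries like ".tmp" or ".tabledesc" are rejected.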
1608
1609    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1610
1611    if (null == statusList) {
1612      return;
1613    } else {
1614      LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
1615          statusList.length);
1616    }
1617
1618    // lower the number of threads in case we have very few expected regions
1619    threadPoolSize = Math.min(threadPoolSize, statusList.length);
1620
1621    // run in multiple threads
1622    ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
1623        threadPoolSize, 60, TimeUnit.SECONDS,
1624        new ArrayBlockingQueue<>(statusList.length));
1625    try {
1626      // ignore all file status items that are not of interest
1627      for (FileStatus regionStatus : statusList) {
1628        if (null == regionStatus) {
1629          continue;
1630        }
1631
1632        if (!regionStatus.isDirectory()) {
1633          continue;
1634        }
1635
1636        Path regionPath = regionStatus.getPath();
1637        if (null == regionPath) {
1638          continue;
1639        }
1640
1641        tpe.execute(new FSRegionScanner(fs, regionPath,
1642            regionToBestLocalityRSMapping, regionDegreeLocalityMapping));
1643      }
1644    } finally {
1645      tpe.shutdown();
1646      int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1647          60 * 1000);
1648      try {
1649        // here we wait until the TPE terminates, either naturally or because of
1650        // exceptions thrown in the executing threads
1651        while (!tpe.awaitTermination(threadWakeFrequency,
1652            TimeUnit.MILLISECONDS)) {
1653          // printing out rough estimate, so as to not introduce
1654          // AtomicInteger
1655          LOG.info("Locality checking is underway: { Scanned Regions : "
1656              + tpe.getCompletedTaskCount() + "/"
1657              + tpe.getTaskCount() + " }");
1658        }
1659      } catch (InterruptedException e) {
1660        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1661      }
1662    }
1663
1664    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
1665    String overheadMsg = "Scanning DFS for locality info took " + overhead + " ms";
1666
1667    LOG.info(overheadMsg);
1668  }
1669
1670  /**
1671   * Do our short circuit read setup.
1672   * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
1673   * @param conf the Configuration to check and update
1674   */
1675  public static void setupShortCircuitRead(final Configuration conf) {
1676    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1677    boolean shortCircuitSkipChecksum =
1678      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1679    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1680    if (shortCircuitSkipChecksum) {
1681      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
1682        "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
1683        "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
1684      assert !shortCircuitSkipChecksum; //this will fail if assertions are on
1685    }
1686    checkShortCircuitReadBufferSize(conf);
1687  }
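  // Hedged configuration sketch for the check above: with HBase checksums left on (the default),
  // HDFS-level checksum skipping for short-circuit reads should stay at false. Only the
  // skip.checksum key is inspected by this method; the other settings are illustrative context.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean("dfs.client.read.shortcircuit", true);
  //   // leave "dfs.client.read.shortcircuit.skip.checksum" at its default (false)
  //   FSUtils.setupShortCircuitRead(conf); // warns (and asserts) if skip.checksum were true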
1688
1689  /**
1690   * Check if the short-circuit read buffer size is set and, if not, set it to the HBase value.
1691   * @param conf the Configuration to check and update
1692   */
1693  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1694    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1695    final int notSet = -1;
1696    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
1697    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1698    int size = conf.getInt(dfsKey, notSet);
1699    // If a size is set, return -- we will use it.
1700    if (size != notSet) return;
1701    // But the short circuit buffer size is normally not set. Put in place the size HBase wants.
1702    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1703    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
1704  }
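  // Hedged sketch of the precedence implemented above: an explicit
  // "dfs.client.read.shortcircuit.buffer.size" wins; otherwise the
  // "hbase.dfs.client.read.shortcircuit.buffer.size" override applies; otherwise the fallback is
  // 2 * HConstants.DEFAULT_BLOCKSIZE.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   FSUtils.checkShortCircuitReadBufferSize(conf);
  //   int effective = conf.getInt("dfs.client.read.shortcircuit.buffer.size", -1);
  //   // with nothing configured, effective == 2 * HConstants.DEFAULT_BLOCKSIZE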
1705
1706  /**
1707   * @param c the Configuration to use when obtaining the FileSystem
1708   * @return the DFSClient's DFSHedgedReadMetrics instance, or null if not found or not on HDFS
1709   * @throws IOException if the FileSystem cannot be obtained
1710   */
1711  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
1712      throws IOException {
1713    if (!isHDFS(c)) return null;
1714    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
1715    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
1716    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
1717    final String name = "getHedgedReadMetrics";
1718    DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
1719    Method m;
1720    try {
1721      m = dfsclient.getClass().getDeclaredMethod(name);
1722    } catch (NoSuchMethodException | SecurityException e) {
1723      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
1724          e.getMessage());
1725      return null;
1730    }
1731    m.setAccessible(true);
1732    try {
1733      return (DFSHedgedReadMetrics)m.invoke(dfsclient);
1734    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
1735      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
1736          e.getMessage());
1737      return null;
1746    }
1747  }
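  // Hedged usage sketch for the reflection helper above: surface hedged-read counters when they
  // are available. The getter names on DFSHedgedReadMetrics (getHedgedReadOps, getHedgedReadWins)
  // are assumptions here, not guaranteed by this file.
  //
  //   DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
  //   if (metrics != null) {
  //     LOG.info("hedgedReadOps=" + metrics.getHedgedReadOps()
  //         + ", hedgedReadWins=" + metrics.getHedgedReadWins());
  //   }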
1748
1749}