1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInputStream;
23  import java.io.EOFException;
24  import java.io.FileNotFoundException;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.InterruptedIOException;
28  import java.lang.reflect.InvocationTargetException;
29  import java.lang.reflect.Method;
30  import java.net.InetSocketAddress;
31  import java.net.URI;
32  import java.net.URISyntaxException;
33  import java.util.ArrayList;
34  import java.util.Collections;
35  import java.util.HashMap;
36  import java.util.LinkedList;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.concurrent.ArrayBlockingQueue;
40  import java.util.concurrent.ConcurrentHashMap;
41  import java.util.concurrent.ThreadPoolExecutor;
42  import java.util.concurrent.TimeUnit;
43  import java.util.regex.Pattern;
44  
45  import org.apache.commons.logging.Log;
46  import org.apache.commons.logging.LogFactory;
47  import org.apache.hadoop.hbase.classification.InterfaceAudience;
48  import org.apache.hadoop.HadoopIllegalArgumentException;
49  import org.apache.hadoop.conf.Configuration;
50  import org.apache.hadoop.fs.BlockLocation;
51  import org.apache.hadoop.fs.FSDataInputStream;
52  import org.apache.hadoop.fs.FSDataOutputStream;
53  import org.apache.hadoop.fs.FileStatus;
54  import org.apache.hadoop.fs.FileSystem;
55  import org.apache.hadoop.fs.Path;
56  import org.apache.hadoop.fs.PathFilter;
57  import org.apache.hadoop.fs.permission.FsAction;
58  import org.apache.hadoop.fs.permission.FsPermission;
59  import org.apache.hadoop.hbase.ClusterId;
60  import org.apache.hadoop.hbase.HColumnDescriptor;
61  import org.apache.hadoop.hbase.HConstants;
62  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
63  import org.apache.hadoop.hbase.HRegionInfo;
64  import org.apache.hadoop.hbase.RemoteExceptionHandler;
65  import org.apache.hadoop.hbase.TableName;
66  import org.apache.hadoop.hbase.exceptions.DeserializationException;
67  import org.apache.hadoop.hbase.fs.HFileSystem;
68  import org.apache.hadoop.hbase.master.HMaster;
69  import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
70  import org.apache.hadoop.hbase.security.AccessDeniedException;
71  import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
72  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
73  import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
74  import org.apache.hadoop.hbase.regionserver.HRegion;
75  import org.apache.hadoop.hdfs.DistributedFileSystem;
76  import org.apache.hadoop.io.IOUtils;
77  import org.apache.hadoop.io.SequenceFile;
78  import org.apache.hadoop.ipc.RemoteException;
79  import org.apache.hadoop.security.UserGroupInformation;
80  import org.apache.hadoop.util.Progressable;
81  import org.apache.hadoop.util.ReflectionUtils;
82  import org.apache.hadoop.util.StringUtils;
83  
84  import com.google.common.primitives.Ints;
85  
86  /**
87   * Utility methods for interacting with the underlying file system.
88   */
89  @InterfaceAudience.Private
90  public abstract class FSUtils {
91    private static final Log LOG = LogFactory.getLog(FSUtils.class);
92  
93    /** Full access permissions (starting point for a umask) */
94    public static final String FULL_RWX_PERMISSIONS = "777";
95    private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
96    private static final int DEFAULT_THREAD_POOLSIZE = 2;
97  
98    /** Set to true on Windows platforms */
99    public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
100 
101   protected FSUtils() {
102     super();
103   }
104 
105   /**
106    * Sets storage policy for given path according to config setting.
107    * If the passed path is a directory, we'll set the storage policy for all files
108    * created in the future in said directory. Note that this change in storage
109    * policy takes place at the HDFS level; it will persist beyond this RS's lifecycle.
110    * If we're running on a version of HDFS that doesn't support the given storage policy
111    * (or storage policies at all), then we'll issue a log message and continue.
112    *
113    * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
114    *
115    * @param fs We only do anything if an instance of DistributedFileSystem
116    * @param conf used to look up storage policy with given key; not modified.
117    * @param path the Path whose storage policy is to be set
118    * @param policyKey e.g. HConstants.WAL_STORAGE_POLICY
119    * @param defaultPolicy usually should be the policy NONE to delegate to HDFS
120    */
121   public static void setStoragePolicy(final FileSystem fs, final Configuration conf,
122       final Path path, final String policyKey, final String defaultPolicy) {
123     String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase();
124     if (storagePolicy.equals(defaultPolicy)) {
125       if (LOG.isTraceEnabled()) {
126         LOG.trace("default policy of " + defaultPolicy + " requested, exiting early.");
127       }
128       return;
129     }
130     if (fs instanceof DistributedFileSystem) {
131       DistributedFileSystem dfs = (DistributedFileSystem)fs;
132       // Once our minimum supported Hadoop version is 2.6.0 we can remove reflection.
133       Class<? extends DistributedFileSystem> dfsClass = dfs.getClass();
134       Method m = null;
135       try {
136         m = dfsClass.getDeclaredMethod("setStoragePolicy",
137             new Class<?>[] { Path.class, String.class });
138         m.setAccessible(true);
139       } catch (NoSuchMethodException e) {
140         LOG.info("FileSystem doesn't support"
141             + " setStoragePolicy; --HDFS-6584 not available");
142       } catch (SecurityException e) {
143         LOG.info("Doesn't have access to setStoragePolicy on "
144             + "FileSystems --HDFS-6584 not available", e);
145         m = null; // could happen on setAccessible()
146       }
147       if (m != null) {
148         try {
149           m.invoke(dfs, path, storagePolicy);
150           LOG.info("set " + storagePolicy + " for " + path);
151         } catch (Exception e) {
152           // check for lack of HDFS-7228
153           boolean probablyBadPolicy = false;
154           if (e instanceof InvocationTargetException) {
155             final Throwable exception = e.getCause();
156             if (exception instanceof RemoteException &&
157                 HadoopIllegalArgumentException.class.getName().equals(
158                     ((RemoteException)exception).getClassName())) {
159               LOG.warn("Given storage policy, '" + storagePolicy + "', was rejected and probably " +
160                   "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
161                   "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
162                   "more information see the 'ArchivalStorage' docs for your Hadoop release.");
163               LOG.debug("More information about the invalid storage policy.", exception);
164               probablyBadPolicy = true;
165             }
166           }
167           if (!probablyBadPolicy) {
168             // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
169             // misuse than a runtime problem with HDFS.
170             LOG.warn("Unable to set " + storagePolicy + " for " + path, e);
171           }
172         }
173       }
174     } else {
175       LOG.info("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't " +
176           "support setStoragePolicy.");
177     }
178   }
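
// A minimal usage sketch of setStoragePolicy() above (not part of the numbered
// listing). HConstants.WAL_STORAGE_POLICY is the key named in the javadoc; the
// "WALs" directory name and the "NONE" default policy are assumptions made only
// for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.FSUtils;

public class StoragePolicyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Path walDir = new Path(FSUtils.getRootDir(conf), "WALs");
    FileSystem fs = walDir.getFileSystem(conf);
    // No-op unless fs is a DistributedFileSystem and the configured policy
    // differs from the default passed here.
    FSUtils.setStoragePolicy(fs, conf, walDir, HConstants.WAL_STORAGE_POLICY, "NONE");
  }
}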
179 
180   /**
181    * Compares path components. Does not consider the scheme; i.e. even if the
182    * schemes differ, as long as <code>path</code> starts with <code>rootPath</code>,
183    * the function returns true.
184    * @param rootPath
185    * @param path
186    * @return True if <code>path</code> starts with <code>rootPath</code>
187    */
188   public static boolean isStartingWithPath(final Path rootPath, final String path) {
189     String uriRootPath = rootPath.toUri().getPath();
190     String tailUriPath = (new Path(path)).toUri().getPath();
191     return tailUriPath.startsWith(uriRootPath);
192   }
193 
194   /**
195    * Compares the path component of the Path URI; e.g. given hdfs://a/b/c and /a/b/c, it compares the
196    * '/a/b/c' part. Does not consider the scheme; i.e. even if the schemes differ, if the path or a
197    * subpath matches, the two equate.
198    * @param pathToSearch Path we will be trying to match.
199    * @param pathTail
200    * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
201    */
202   public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
203     return isMatchingTail(pathToSearch, new Path(pathTail));
204   }
205 
206   /**
207    * Compares the path component of the Path URI; e.g. given hdfs://a/b/c and /a/b/c, it compares the
208    * '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true.  Does not consider
209    * the scheme; i.e. even if the schemes differ, if the path or a subpath matches, the two equate.
210    * @param pathToSearch Path we will be trying to match.
211    * @param pathTail
212    * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
213    */
214   public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
215     if (pathToSearch.depth() != pathTail.depth()) return false;
216     Path tailPath = pathTail;
217     String tailName;
218     Path toSearch = pathToSearch;
219     String toSearchName;
220     boolean result = false;
221     do {
222       tailName = tailPath.getName();
223       if (tailName == null || tailName.length() <= 0) {
224         result = true;
225         break;
226       }
227       toSearchName = toSearch.getName();
228       if (toSearchName == null || toSearchName.length() <= 0) break;
229       // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
230       tailPath = tailPath.getParent();
231       toSearch = toSearch.getParent();
232     } while(tailName.equals(toSearchName));
233     return result;
234   }
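
// A small illustration of isMatchingTail() above (not part of the numbered
// listing). Because of the depth check, both paths must have the same number of
// components; the namenode host and table path below are made up.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.FSUtils;

public class MatchingTailExample {
  public static void main(String[] args) {
    Path qualified = new Path("hdfs://namenode:8020/hbase/data/default/t1");
    // Same components once the scheme and authority are ignored: expected true.
    System.out.println(FSUtils.isMatchingTail(qualified, new Path("/hbase/data/default/t1")));
    // Fewer components than the qualified path (depths differ): expected false.
    System.out.println(FSUtils.isMatchingTail(qualified, new Path("default/t1")));
  }
}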
235 
236   public static FSUtils getInstance(FileSystem fs, Configuration conf) {
237     String scheme = fs.getUri().getScheme();
238     if (scheme == null) {
239       LOG.warn("Could not find scheme for uri " +
240           fs.getUri() + ", default to hdfs");
241       scheme = "hdfs";
242     }
243     Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
244         scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
245     FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
246     return fsUtils;
247   }
248 
249   /**
250    * Delete if exists.
251    * @param fs filesystem object
252    * @param dir directory to delete
253    * @return True if deleted <code>dir</code>
254    * @throws IOException e
255    */
256   public static boolean deleteDirectory(final FileSystem fs, final Path dir)
257   throws IOException {
258     return fs.exists(dir) && fs.delete(dir, true);
259   }
260 
261   /**
262    * Delete the region directory if exists.
263    * @param conf
264    * @param hri
265    * @return True if deleted the region directory.
266    * @throws IOException
267    */
268   public static boolean deleteRegionDir(final Configuration conf, final HRegionInfo hri)
269   throws IOException {
270     Path rootDir = getRootDir(conf);
271     FileSystem fs = rootDir.getFileSystem(conf);
272     return deleteDirectory(fs,
273       new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
274   }
275 
276   /**
277    * Return the number of bytes that large input files should optimally
278    * be split into to minimize i/o time.
279    *
280    * Uses reflection to search for getDefaultBlockSize(Path f);
281    * if the method doesn't exist, falls back to getDefaultBlockSize().
282    *
283    * @param fs filesystem object
284    * @return the default block size for the path's filesystem
285    * @throws IOException e
286    */
287   public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
288     Method m = null;
289     Class<? extends FileSystem> cls = fs.getClass();
290     try {
291       m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
292     } catch (NoSuchMethodException e) {
293       LOG.info("FileSystem doesn't support getDefaultBlockSize");
294     } catch (SecurityException e) {
295       LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
296       m = null; // could happen on setAccessible()
297     }
298     if (m == null) {
299       return fs.getDefaultBlockSize(path);
300     } else {
301       try {
302         Object ret = m.invoke(fs, path);
303         return ((Long)ret).longValue();
304       } catch (Exception e) {
305         throw new IOException(e);
306       }
307     }
308   }
309 
310   /**
311    * Get the default replication.
312    *
313    * Uses reflection to search for getDefaultReplication(Path f);
314    * if the method doesn't exist, falls back to getDefaultReplication().
315    *
316    * @param fs filesystem object
317    * @param f path of file
318    * @return default replication for the path's filesystem
319    * @throws IOException e
320    */
321   public static short getDefaultReplication(final FileSystem fs, final Path path) throws IOException {
322     Method m = null;
323     Class<? extends FileSystem> cls = fs.getClass();
324     try {
325       m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
326     } catch (NoSuchMethodException e) {
327       LOG.info("FileSystem doesn't support getDefaultReplication");
328     } catch (SecurityException e) {
329       LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
330       m = null; // could happen on setAccessible()
331     }
332     if (m == null) {
333       return fs.getDefaultReplication(path);
334     } else {
335       try {
336         Object ret = m.invoke(fs, path);
337         return ((Number)ret).shortValue();
338       } catch (Exception e) {
339         throw new IOException(e);
340       }
341     }
342   }
343 
344   /**
345    * Returns the default buffer size to use during writes.
346    *
347    * The size of the buffer should probably be a multiple of hardware
348    * page size (4096 on Intel x86), and it determines how much data is
349    * buffered during read and write operations.
350    *
351    * @param fs filesystem object
352    * @return default buffer size to use during writes
353    */
354   public static int getDefaultBufferSize(final FileSystem fs) {
355     return fs.getConf().getInt("io.file.buffer.size", 4096);
356   }
357 
358   /**
359    * Create the specified file on the filesystem. By default, this will:
360    * <ol>
361    * <li>overwrite the file if it exists</li>
362    * <li>apply the umask in the configuration (if it is enabled)</li>
363    * <li>use the fs configured buffer size (or 4096 if not set)</li>
364    * <li>use the configured column family replication or default replication if
365    * {@link HColumnDescriptor#DEFAULT_DFS_REPLICATION}</li>
366    * <li>use the default block size</li>
367    * <li>not track progress</li>
368    * </ol>
369    * @param conf configurations
370    * @param fs {@link FileSystem} on which to write the file
371    * @param path {@link Path} to the file to write
372    * @param perm permissions
373    * @param favoredNodes
374    * @return output stream to the created file
375    * @throws IOException if the file cannot be created
376    */
377   public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
378       FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
379     if (fs instanceof HFileSystem) {
380       FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
381       if (backingFs instanceof DistributedFileSystem) {
382         // Try to use the favoredNodes version via reflection to allow backwards-
383         // compatibility.
384         short replication = Short.parseShort(conf.get(HColumnDescriptor.DFS_REPLICATION,
385           String.valueOf(HColumnDescriptor.DEFAULT_DFS_REPLICATION)));
386         try {
387           return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create",
388             Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class,
389             Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true,
390             getDefaultBufferSize(backingFs),
391             replication > 0 ? replication : getDefaultReplication(backingFs, path),
392             getDefaultBlockSize(backingFs, path), null, favoredNodes));
393         } catch (InvocationTargetException ite) {
394           // Function was properly called, but threw its own exception.
395           throw new IOException(ite.getCause());
396         } catch (NoSuchMethodException e) {
397           LOG.debug("DFS Client does not support most favored nodes create; using default create");
398           if (LOG.isTraceEnabled()) LOG.trace("Ignoring; use default create", e);
399         } catch (IllegalArgumentException e) {
400           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
401         } catch (SecurityException e) {
402           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
403         } catch (IllegalAccessException e) {
404           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
405         }
406       }
407     }
408     return create(fs, path, perm, true);
409   }
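
// A hedged sketch of the favored-nodes create() above (not part of the numbered
// listing). The datanode hostname and port are placeholders; on a filesystem that
// is not an HFileSystem backed by HDFS this simply falls back to the plain create.

import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.util.FSUtils;

public class FavoredNodesCreateExample {
  public static FSDataOutputStream open(Configuration conf, FileSystem fs, Path path)
      throws IOException {
    InetSocketAddress[] favoredNodes = new InetSocketAddress[] {
        new InetSocketAddress("datanode-1.example.com", 50010)   // placeholder host/port
    };
    return FSUtils.create(conf, fs, path, FsPermission.getFileDefault(), favoredNodes);
  }
}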
410 
411   /**
412    * Create the specified file on the filesystem. By default, this will:
413    * <ol>
414    * <li>apply the umask in the configuration (if it is enabled)</li>
415    * <li>use the fs configured buffer size (or 4096 if not set)</li>
416    * <li>use the default replication</li>
417    * <li>use the default block size</li>
418    * <li>not track progress</li>
419    * </ol>
420    *
421    * @param fs {@link FileSystem} on which to write the file
422    * @param path {@link Path} to the file to write
423    * @param perm
424    * @param overwrite Whether or not the created file should be overwritten.
425    * @return output stream to the created file
426    * @throws IOException if the file cannot be created
427    */
428   public static FSDataOutputStream create(FileSystem fs, Path path,
429       FsPermission perm, boolean overwrite) throws IOException {
430     if (LOG.isTraceEnabled()) {
431       LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
432     }
433     return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
434         getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
435   }
436 
437   /**
438    * Get the file permissions specified in the configuration, if they are
439    * enabled.
440    *
441    * @param fs filesystem that the file will be created on.
442    * @param conf configuration to read for determining if permissions are
443    *          enabled and which to use
444    * @param permssionConfKey property key in the configuration to use when
445    *          finding the permission
446    * @return the permission to use when creating a new file on the fs. If
447    *         special permissions are not specified in the configuration, then
448    *         the default permissions on the fs will be returned.
449    */
450   public static FsPermission getFilePermissions(final FileSystem fs,
451       final Configuration conf, final String permssionConfKey) {
452     boolean enablePermissions = conf.getBoolean(
453         HConstants.ENABLE_DATA_FILE_UMASK, false);
454 
455     if (enablePermissions) {
456       try {
457         FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
458         // make sure that we have a mask, if not, go default.
459         String mask = conf.get(permssionConfKey);
460         if (mask == null)
461           return FsPermission.getFileDefault();
462         // apply the umask
463         FsPermission umask = new FsPermission(mask);
464         return perm.applyUMask(umask);
465       } catch (IllegalArgumentException e) {
466         LOG.warn(
467             "Incorrect umask attempted to be created: "
468                 + conf.get(permssionConfKey)
469                 + ", using default file permissions.", e);
470         return FsPermission.getFileDefault();
471       }
472     }
473     return FsPermission.getFileDefault();
474   }
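
// A minimal sketch of the umask behaviour handled by getFilePermissions() above
// (not part of the numbered listing). HConstants.ENABLE_DATA_FILE_UMASK is used by
// the method itself; the "hbase.data.umask" key string is an assumption made only
// for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.FSUtils;

public class FilePermissionsExample {
  public static FsPermission resolve(FileSystem fs) {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
    conf.set("hbase.data.umask", "077");   // mask out group and other bits
    // 777 with a 077 umask should yield 700 for newly created files.
    return FSUtils.getFilePermissions(fs, conf, "hbase.data.umask");
  }
}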
475 
476   /**
477    * Checks to see if the specified file system is available
478    *
479    * @param fs filesystem
480    * @throws IOException e
481    */
482   public static void checkFileSystemAvailable(final FileSystem fs)
483   throws IOException {
484     if (!(fs instanceof DistributedFileSystem)) {
485       return;
486     }
487     IOException exception = null;
488     DistributedFileSystem dfs = (DistributedFileSystem) fs;
489     try {
490       if (dfs.exists(new Path("/"))) {
491         return;
492       }
493     } catch (IOException e) {
494       exception = RemoteExceptionHandler.checkIOException(e);
495     }
496     try {
497       fs.close();
498     } catch (Exception e) {
499       LOG.error("file system close failed: ", e);
500     }
501     IOException io = new IOException("File system is not available");
502     io.initCause(exception);
503     throw io;
504   }
505 
506   /**
507    * We use reflection because {@link DistributedFileSystem#setSafeMode(
508    * HdfsConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1
509    *
510    * @param dfs
511    * @return whether we're in safe mode
512    * @throws IOException
513    */
514   private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
515     boolean inSafeMode = false;
516     try {
517       Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
518           org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class});
519       inSafeMode = (Boolean) m.invoke(dfs,
520         org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true);
521     } catch (Exception e) {
522       if (e instanceof IOException) throw (IOException) e;
523 
524       // Check whether dfs is on safemode.
525       inSafeMode = dfs.setSafeMode(
526         org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET);
527     }
528     return inSafeMode;
529   }
530 
531   /**
532    * Check whether dfs is in safemode.
533    * @param conf
534    * @throws IOException
535    */
536   public static void checkDfsSafeMode(final Configuration conf)
537   throws IOException {
538     boolean isInSafeMode = false;
539     FileSystem fs = FileSystem.get(conf);
540     if (fs instanceof DistributedFileSystem) {
541       DistributedFileSystem dfs = (DistributedFileSystem)fs;
542       isInSafeMode = isInSafeMode(dfs);
543     }
544     if (isInSafeMode) {
545       throw new IOException("File system is in safemode, it can't be written now");
546     }
547   }
548 
549   /**
550    * Reads the current version of the file system from the version file
551    *
552    * @param fs filesystem object
553    * @param rootdir root hbase directory
554    * @return null if no version file exists, version string otherwise.
555    * @throws IOException e
556    * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
557    */
558   public static String getVersion(FileSystem fs, Path rootdir)
559   throws IOException, DeserializationException {
560     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
561     FileStatus[] status = null;
562     try {
563       // hadoop 2.0 throws FNFE if directory does not exist.
564       // hadoop 1.0 returns null if directory does not exist.
565       status = fs.listStatus(versionFile);
566     } catch (FileNotFoundException fnfe) {
567       return null;
568     }
569     if (status == null || status.length == 0) return null;
570     String version = null;
571     byte [] content = new byte [(int)status[0].getLen()];
572     FSDataInputStream s = fs.open(versionFile);
573     try {
574       IOUtils.readFully(s, content, 0, content.length);
575       if (ProtobufUtil.isPBMagicPrefix(content)) {
576         version = parseVersionFrom(content);
577       } else {
578         // Presume it is in pre-pb format.
579         InputStream is = new ByteArrayInputStream(content);
580         DataInputStream dis = new DataInputStream(is);
581         try {
582           version = dis.readUTF();
583         } finally {
584           dis.close();
585         }
586       }
587     } catch (EOFException eof) {
588       LOG.warn("Version file was empty, odd, will try to set it.");
589     } finally {
590       s.close();
591     }
592     return version;
593   }
594 
595   /**
596    * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
597    * @param bytes The byte content of the hbase.version file.
598    * @return The version found in the file as a String.
599    * @throws DeserializationException
600    */
601   static String parseVersionFrom(final byte [] bytes)
602   throws DeserializationException {
603     ProtobufUtil.expectPBMagicPrefix(bytes);
604     int pblen = ProtobufUtil.lengthOfPBMagic();
605     FSProtos.HBaseVersionFileContent.Builder builder =
606       FSProtos.HBaseVersionFileContent.newBuilder();
607     try {
608       ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
609       return builder.getVersion();
610     } catch (IOException e) {
611       // Convert
612       throw new DeserializationException(e);
613     }
614   }
615 
616   /**
617    * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
618    * @param version Version to persist
619    * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
620    */
621   static byte [] toVersionByteArray(final String version) {
622     FSProtos.HBaseVersionFileContent.Builder builder =
623       FSProtos.HBaseVersionFileContent.newBuilder();
624     return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
625   }
626 
627   /**
628    * Verifies current version of file system
629    *
630    * @param fs file system
631    * @param rootdir root directory of HBase installation
632    * @param message if true, issues a message on System.out
633    *
634    * @throws IOException e
635    * @throws DeserializationException
636    */
637   public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
638   throws IOException, DeserializationException {
639     checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
640   }
641 
642   /**
643    * Verifies current version of file system
644    *
645    * @param fs file system
646    * @param rootdir root directory of HBase installation
647    * @param message if true, issues a message on System.out
648    * @param wait wait interval
649    * @param retries number of times to retry
650    *
651    * @throws IOException e
652    * @throws DeserializationException
653    */
654   public static void checkVersion(FileSystem fs, Path rootdir,
655       boolean message, int wait, int retries)
656   throws IOException, DeserializationException {
657     String version = getVersion(fs, rootdir);
658     if (version == null) {
659       if (!metaRegionExists(fs, rootdir)) {
660         // rootDir is empty (no version file and no root region)
661         // just create new version file (HBASE-1195)
662         setVersion(fs, rootdir, wait, retries);
663         return;
664       }
665     } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) return;
666 
667     // version is deprecated; require migration
668     // Output on stdout so user sees it in terminal.
669     String msg = "HBase file layout needs to be upgraded."
670       + " You have version " + version
671       + " and I want version " + HConstants.FILE_SYSTEM_VERSION
672       + ". Consult http://hbase.apache.org/book.html for further information about upgrading HBase."
673       + " Is your hbase.rootdir valid? If so, you may need to run "
674       + "'hbase hbck -fixVersionFile'.";
675     if (message) {
676       System.out.println("WARNING! " + msg);
677     }
678     throw new FileSystemVersionException(msg);
679   }
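
// A short sketch of how the version-file helpers above fit together at startup
// (not part of the numbered listing); the behaviour described in the comments is
// taken from the javadoc of checkVersion() and setVersion().

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.FSUtils;

public class VersionCheckExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Path rootDir = FSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);
    // Creates hbase.version on a fresh root dir, or throws
    // FileSystemVersionException if an incompatible layout version is found.
    FSUtils.checkVersion(fs, rootDir, true);
  }
}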
680 
681   /**
682    * Sets version of file system
683    *
684    * @param fs filesystem object
685    * @param rootdir hbase root
686    * @throws IOException e
687    */
688   public static void setVersion(FileSystem fs, Path rootdir)
689   throws IOException {
690     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
691       HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
692   }
693 
694   /**
695    * Sets version of file system
696    *
697    * @param fs filesystem object
698    * @param rootdir hbase root
699    * @param wait time to wait for retry
700    * @param retries number of times to retry before failing
701    * @throws IOException e
702    */
703   public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
704   throws IOException {
705     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
706   }
707 
708 
709   /**
710    * Sets version of file system
711    *
712    * @param fs filesystem object
713    * @param rootdir hbase root directory
714    * @param version version to set
715    * @param wait time to wait for retry
716    * @param retries number of times to retry before throwing an IOException
717    * @throws IOException e
718    */
719   public static void setVersion(FileSystem fs, Path rootdir, String version,
720       int wait, int retries) throws IOException {
721     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
722     Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
723       HConstants.VERSION_FILE_NAME);
724     while (true) {
725       try {
726         // Write the version to a temporary file
727         FSDataOutputStream s = fs.create(tempVersionFile);
728         try {
729           s.write(toVersionByteArray(version));
730           s.close();
731           s = null;
732           // Move the temp version file to its normal location. Returns false
733           // if the rename failed. Throw an IOE in that case.
734           if (!fs.rename(tempVersionFile, versionFile)) {
735             throw new IOException("Unable to move temp version file to " + versionFile);
736           }
737         } finally {
738           // Cleaning up the temporary file if the rename failed would be trying
739           // too hard. We'll unconditionally create it again the next time
740           // through anyway; files are overwritten by default by create().
741 
742           // Attempt to close the stream on the way out if it is still open.
743           try {
744             if (s != null) s.close();
745           } catch (IOException ignore) { }
746         }
747         LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
748         return;
749       } catch (IOException e) {
750         if (retries > 0) {
751           LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
752           fs.delete(versionFile, false);
753           try {
754             if (wait > 0) {
755               Thread.sleep(wait);
756             }
757           } catch (InterruptedException ie) {
758             throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
759           }
760           retries--;
761         } else {
762           throw e;
763         }
764       }
765     }
766   }
767 
768   /**
769    * Checks that a cluster ID file exists in the HBase root directory
770    * @param fs the root directory FileSystem
771    * @param rootdir the HBase root directory in HDFS
772    * @param wait how long to wait between retries
773    * @return <code>true</code> if the file exists, otherwise <code>false</code>
774    * @throws IOException if checking the FileSystem fails
775    */
776   public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
777       int wait) throws IOException {
778     while (true) {
779       try {
780         Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
781         return fs.exists(filePath);
782       } catch (IOException ioe) {
783         if (wait > 0) {
784           LOG.warn("Unable to check cluster ID file in " + rootdir.toString() +
785               ", retrying in "+wait+"msec: "+StringUtils.stringifyException(ioe));
786           try {
787             Thread.sleep(wait);
788           } catch (InterruptedException e) {
789             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
790           }
791         } else {
792           throw ioe;
793         }
794       }
795     }
796   }
797 
798   /**
799    * Returns the value of the unique cluster ID stored for this HBase instance.
800    * @param fs the root directory FileSystem
801    * @param rootdir the path to the HBase root directory
802    * @return the unique cluster identifier
803    * @throws IOException if reading the cluster ID file fails
804    */
805   public static ClusterId getClusterId(FileSystem fs, Path rootdir)
806   throws IOException {
807     Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
808     ClusterId clusterId = null;
809     FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath):  null;
810     if (status != null) {
811       int len = Ints.checkedCast(status.getLen());
812       byte [] content = new byte[len];
813       FSDataInputStream in = fs.open(idPath);
814       try {
815         in.readFully(content);
816       } catch (EOFException eof) {
817         LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
818       } finally{
819         in.close();
820       }
821       try {
822         clusterId = ClusterId.parseFrom(content);
823       } catch (DeserializationException e) {
824         throw new IOException("content=" + Bytes.toString(content), e);
825       }
826       // If not pb'd, make it so.
827       if (!ProtobufUtil.isPBMagicPrefix(content)) {
828         String cid = null;
829         in = fs.open(idPath);
830         try {
831           cid = in.readUTF();
832           clusterId = new ClusterId(cid);
833         } catch (EOFException eof) {
834           LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
835         } finally {
836           in.close();
837         }
838         rewriteAsPb(fs, rootdir, idPath, clusterId);
839       }
840       return clusterId;
841     } else {
842       LOG.warn("Cluster ID file does not exist at " + idPath.toString());
843     }
844     return clusterId;
845   }
846 
847   /**
848    * @param cid
849    * @throws IOException
850    */
851   private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
852       final ClusterId cid)
853   throws IOException {
854     // Rewrite the file as pb.  Move aside the old one first, write the new one,
855     // then delete the moved-aside file.
856     Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
857     if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
858     setClusterId(fs, rootdir, cid, 100);
859     if (!fs.delete(movedAsideName, false)) {
860       throw new IOException("Failed delete of " + movedAsideName);
861     }
862     LOG.debug("Rewrote the hbase.id file as pb");
863   }
864 
865   /**
866    * Writes a new unique identifier for this cluster to the "hbase.id" file
867    * in the HBase root directory
868    * @param fs the root directory FileSystem
869    * @param rootdir the path to the HBase root directory
870    * @param clusterId the unique identifier to store
871    * @param wait how long (in milliseconds) to wait between retries
872    * @throws IOException if writing to the FileSystem fails and no retry wait was specified
873    */
874   public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
875       int wait) throws IOException {
876     while (true) {
877       try {
878         Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
879         Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
880           Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
881         // Write the id file to a temporary location
882         FSDataOutputStream s = fs.create(tempIdFile);
883         try {
884           s.write(clusterId.toByteArray());
885           s.close();
886           s = null;
887           // Move the temporary file to its normal location. Throw an IOE if
888           // the rename failed
889           if (!fs.rename(tempIdFile, idFile)) {
890             throw new IOException("Unable to move temp cluster ID file to " + idFile);
891           }
892         } finally {
893           // Attempt to close the stream if still open on the way out
894           try {
895             if (s != null) s.close();
896           } catch (IOException ignore) { }
897         }
898         if (LOG.isDebugEnabled()) {
899           LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
900         }
901         return;
902       } catch (IOException ioe) {
903         if (wait > 0) {
904           LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
905               ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
906           try {
907             Thread.sleep(wait);
908           } catch (InterruptedException e) {
909             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
910           }
911         } else {
912           throw ioe;
913         }
914       }
915     }
916   }
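
// A sketch of the cluster-ID bootstrap sequence using the helpers above (not part
// of the numbered listing). The no-arg ClusterId() constructor and the 100 ms retry
// wait are assumptions made only for illustration.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.FSUtils;

public class ClusterIdExample {
  public static ClusterId ensureClusterId() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Path rootDir = FSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);
    if (!FSUtils.checkClusterIdExists(fs, rootDir, 100)) {
      FSUtils.setClusterId(fs, rootDir, new ClusterId(), 100);
    }
    return FSUtils.getClusterId(fs, rootDir);
  }
}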
917 
918   /**
919    * Verifies root directory path is a valid URI with a scheme
920    *
921    * @param root root directory path
922    * @return Passed <code>root</code> argument.
923    * @throws IOException if not a valid URI with a scheme
924    */
925   public static Path validateRootPath(Path root) throws IOException {
926     try {
927       URI rootURI = new URI(root.toString());
928       String scheme = rootURI.getScheme();
929       if (scheme == null) {
930         throw new IOException("Root directory does not have a scheme");
931       }
932       return root;
933     } catch (URISyntaxException e) {
934       IOException io = new IOException("Root directory path is not a valid " +
935         "URI -- check your " + HConstants.HBASE_DIR + " configuration");
936       io.initCause(e);
937       throw io;
938     }
939   }
940 
941   /**
942    * Checks for the presence of the root path (using the provided conf object) in the given path. If
943    * it exists, this method removes it and returns the String representation of the remaining relative path.
944    * @param path
945    * @param conf
946    * @return String representation of the remaining relative path
947    * @throws IOException
948    */
949   public static String removeRootPath(Path path, final Configuration conf) throws IOException {
950     Path root = FSUtils.getRootDir(conf);
951     String pathStr = path.toString();
952     // check that the path is absolute... it has the root path in it.
953     if (!pathStr.startsWith(root.toString())) return pathStr;
954     // if not, return as it is.
955     return pathStr.substring(root.toString().length() + 1);// remove the "/" too.
956   }
957 
958   /**
959    * If DFS, check safe mode and if so, wait until we clear it.
960    * @param conf configuration
961    * @param wait Sleep between retries
962    * @throws IOException e
963    */
964   public static void waitOnSafeMode(final Configuration conf,
965     final long wait)
966   throws IOException {
967     FileSystem fs = FileSystem.get(conf);
968     if (!(fs instanceof DistributedFileSystem)) return;
969     DistributedFileSystem dfs = (DistributedFileSystem)fs;
970     // Make sure dfs is not in safe mode
971     while (isInSafeMode(dfs)) {
972       LOG.info("Waiting for dfs to exit safe mode...");
973       try {
974         Thread.sleep(wait);
975       } catch (InterruptedException e) {
976         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
977       }
978     }
979   }
980 
981   /**
982    * Return the 'path' component of a Path.  In Hadoop, Path is a URI.  This
983    * method returns the 'path' component of a Path's URI: e.g. If a Path is
984    * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
985    * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
986    * This method is useful if you want to print out a Path without the qualifying
987    * Filesystem instance.
988    * @param p Filesystem Path whose 'path' component we are to return.
989    * @return the path portion of the Path's URI
990    */
991   public static String getPath(Path p) {
992     return p.toUri().getPath();
993   }
994 
995   /**
996    * @param c configuration
997    * @return Path to hbase root directory: i.e. <code>hbase.rootdir</code> from
998    * configuration as a qualified Path.
999    * @throws IOException e
1000    */
1001   public static Path getRootDir(final Configuration c) throws IOException {
1002     Path p = new Path(c.get(HConstants.HBASE_DIR));
1003     FileSystem fs = p.getFileSystem(c);
1004     return p.makeQualified(fs);
1005   }
1006 
1007   public static void setRootDir(final Configuration c, final Path root) throws IOException {
1008     c.set(HConstants.HBASE_DIR, root.toString());
1009   }
1010 
1011   public static void setFsDefault(final Configuration c, final Path root) throws IOException {
1012     c.set("fs.defaultFS", root.toString());    // for hadoop 0.21+
1013   }
1014 
1015   /**
1016    * Checks if meta region exists
1017    *
1018    * @param fs file system
1019    * @param rootdir root directory of HBase installation
1020    * @return true if exists
1021    * @throws IOException e
1022    */
1023   @SuppressWarnings("deprecation")
1024   public static boolean metaRegionExists(FileSystem fs, Path rootdir)
1025   throws IOException {
1026     Path metaRegionDir =
1027       HRegion.getRegionDir(rootdir, HRegionInfo.FIRST_META_REGIONINFO);
1028     return fs.exists(metaRegionDir);
1029   }
1030 
1031   /**
1032    * Compute HDFS blocks distribution of a given file, or a portion of the file
1033    * @param fs file system
1034    * @param status file status of the file
1035    * @param start start position of the portion
1036    * @param length length of the portion
1037    * @return The HDFS blocks distribution
1038    */
1039   static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
1040     final FileSystem fs, FileStatus status, long start, long length)
1041     throws IOException {
1042     HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
1043     BlockLocation [] blockLocations =
1044       fs.getFileBlockLocations(status, start, length);
1045     for(BlockLocation bl : blockLocations) {
1046       String [] hosts = bl.getHosts();
1047       long len = bl.getLength();
1048       blocksDistribution.addHostsAndBlockWeight(hosts, len);
1049     }
1050 
1051     return blocksDistribution;
1052   }
1053 
1054 
1055 
1056   /**
1057    * Runs through the hbase rootdir and checks all stores have only
1058    * one file in them -- that is, they've been major compacted.  Looks
1059    * at root and meta tables too.
1060    * @param fs filesystem
1061    * @param hbaseRootDir hbase root directory
1062    * @return True if this hbase install is major compacted.
1063    * @throws IOException e
1064    */
1065   public static boolean isMajorCompacted(final FileSystem fs,
1066       final Path hbaseRootDir)
1067   throws IOException {
1068     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1069     PathFilter regionFilter = new RegionDirFilter(fs);
1070     PathFilter familyFilter = new FamilyDirFilter(fs);
1071     for (Path d : tableDirs) {
1072       FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
1073       for (FileStatus regionDir : regionDirs) {
1074         Path dd = regionDir.getPath();
1075         // Else it's a region name.  Now look in the region for families.
1076         FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1077         for (FileStatus familyDir : familyDirs) {
1078           Path family = familyDir.getPath();
1079           // Now in family make sure only one file.
1080           FileStatus[] familyStatus = fs.listStatus(family);
1081           if (familyStatus.length > 1) {
1082             LOG.debug(family.toString() + " has " + familyStatus.length +
1083                 " files.");
1084             return false;
1085           }
1086         }
1087       }
1088     }
1089     return true;
1090   }
1091 
1092   // TODO move this method OUT of FSUtils. No dependencies on HMaster
1093   /**
1094    * Returns the total overall fragmentation percentage. Includes hbase:meta and
1095    * -ROOT- as well.
1096    *
1097    * @param master  The master defining the HBase root and file system.
1098    * @return The overall fragmentation percentage across all tables, or -1 if it cannot be determined.
1099    * @throws IOException When scanning the directory fails.
1100    */
1101   public static int getTotalTableFragmentation(final HMaster master)
1102   throws IOException {
1103     Map<String, Integer> map = getTableFragmentation(master);
1104     return map != null && map.size() > 0 ? map.get("-TOTAL-") : -1;
1105   }
1106 
1107   /**
1108    * Runs through the HBase rootdir and checks how many stores for each table
1109    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1110    * percentage across all tables is stored under the special key "-TOTAL-".
1111    *
1112    * @param master  The master defining the HBase root and file system.
1113    * @return A map for each table and its percentage.
1114    *
1115    * @throws IOException When scanning the directory fails.
1116    */
1117   public static Map<String, Integer> getTableFragmentation(
1118     final HMaster master)
1119   throws IOException {
1120     Path path = getRootDir(master.getConfiguration());
1121     // since HMaster.getFileSystem() is package private
1122     FileSystem fs = path.getFileSystem(master.getConfiguration());
1123     return getTableFragmentation(fs, path);
1124   }
1125 
1126   /**
1127    * Runs through the HBase rootdir and checks how many stores for each table
1128    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1129    * percentage across all tables is stored under the special key "-TOTAL-".
1130    *
1131    * @param fs  The file system to use.
1132    * @param hbaseRootDir  The root directory to scan.
1133    * @return A map for each table and its percentage.
1134    * @throws IOException When scanning the directory fails.
1135    */
1136   public static Map<String, Integer> getTableFragmentation(
1137     final FileSystem fs, final Path hbaseRootDir)
1138   throws IOException {
1139     Map<String, Integer> frags = new HashMap<String, Integer>();
1140     int cfCountTotal = 0;
1141     int cfFragTotal = 0;
1142     PathFilter regionFilter = new RegionDirFilter(fs);
1143     PathFilter familyFilter = new FamilyDirFilter(fs);
1144     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1145     for (Path d : tableDirs) {
1146       int cfCount = 0;
1147       int cfFrag = 0;
1148       FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
1149       for (FileStatus regionDir : regionDirs) {
1150         Path dd = regionDir.getPath();
1151         // else it's a region name, now look in the region for families
1152         FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1153         for (FileStatus familyDir : familyDirs) {
1154           cfCount++;
1155           cfCountTotal++;
1156           Path family = familyDir.getPath();
1157           // now in family make sure only one file
1158           FileStatus[] familyStatus = fs.listStatus(family);
1159           if (familyStatus.length > 1) {
1160             cfFrag++;
1161             cfFragTotal++;
1162           }
1163         }
1164       }
1165       // compute percentage per table and store in result list
1166       frags.put(FSUtils.getTableName(d).getNameAsString(),
1167         cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
1168     }
1169     // set overall percentage for all tables
1170     frags.put("-TOTAL-",
1171       cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
1172     return frags;
1173   }
1174 
1175   /**
1176    * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
1177    * path rootdir
1178    *
1179    * @param rootdir qualified path of HBase root directory
1180    * @param tableName name of table
1181    * @return {@link org.apache.hadoop.fs.Path} for table
1182    */
1183   public static Path getTableDir(Path rootdir, final TableName tableName) {
1184     return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
1185         tableName.getQualifierAsString());
1186   }
1187 
1188   /**
1189    * Returns the {@link org.apache.hadoop.hbase.TableName} object represented by
1190    * the given table directory
1191    * path.
1192    *
1193    * @param tablePath path of table
1194    * @return {@link org.apache.hadoop.hbase.TableName} for the table
1195    */
1196   public static TableName getTableName(Path tablePath) {
1197     return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
1198   }
1199 
1200   /**
1201    * Returns the {@link org.apache.hadoop.fs.Path} object representing
1202    * the namespace directory under path rootdir
1203    *
1204    * @param rootdir qualified path of HBase root directory
1205    * @param namespace namespace name
1206    * @return {@link org.apache.hadoop.fs.Path} for the namespace directory
1207    */
1208   public static Path getNamespaceDir(Path rootdir, final String namespace) {
1209     return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
1210         new Path(namespace)));
1211   }
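
// A sketch of the directory layout encoded by getTableDir(), getTableName() and
// getNamespaceDir() above (not part of the numbered listing). The root path,
// namespace and table names are made up.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.FSUtils;

public class TableDirExample {
  public static void main(String[] args) {
    Path rootDir = new Path("hdfs://nn:8020/hbase");                  // placeholder root
    Path tableDir = FSUtils.getTableDir(rootDir, TableName.valueOf("ns1", "t1"));
    System.out.println(tableDir);                                     // .../hbase/data/ns1/t1
    System.out.println(FSUtils.getTableName(tableDir));               // ns1:t1
  }
}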
1212 
1213   /**
1214    * A {@link PathFilter} that returns only regular files.
1215    */
1216   static class FileFilter implements PathFilter {
1217     private final FileSystem fs;
1218 
1219     public FileFilter(final FileSystem fs) {
1220       this.fs = fs;
1221     }
1222 
1223     @Override
1224     public boolean accept(Path p) {
1225       try {
1226         return fs.isFile(p);
1227       } catch (IOException e) {
1228         LOG.debug("unable to verify if path=" + p + " is a regular file", e);
1229         return false;
1230       }
1231     }
1232   }
1233 
1234   /**
1235    * Directory filter that doesn't include any of the directories in the specified blacklist
1236    */
1237   public static class BlackListDirFilter implements PathFilter {
1238     private final FileSystem fs;
1239     private List<String> blacklist;
1240 
1241     /**
1242      * Create a filter on the given filesystem with the specified blacklist
1243      * @param fs filesystem to filter
1244      * @param directoryNameBlackList list of the names of the directories to filter. If
1245      *          <tt>null</tt>, all directories are returned
1246      */
1247     @SuppressWarnings("unchecked")
1248     public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
1249       this.fs = fs;
1250       blacklist =
1251         (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
1252           : directoryNameBlackList);
1253     }
1254 
1255     @Override
1256     public boolean accept(Path p) {
1257       boolean isValid = false;
1258       try {
1259         if (isValidName(p.getName())) {
1260           isValid = fs.getFileStatus(p).isDirectory();
1261         } else {
1262           isValid = false;
1263         }
1264       } catch (IOException e) {
1265         LOG.warn("An error occurred while verifying if [" + p.toString()
1266             + "] is a valid directory. Returning 'not valid' and continuing.", e);
1267       }
1268       return isValid;
1269     }
1270 
1271     protected boolean isValidName(final String name) {
1272       return !blacklist.contains(name);
1273     }
1274   }
1275 
1276   /**
1277    * A {@link PathFilter} that only allows directories.
1278    */
1279   public static class DirFilter extends BlackListDirFilter {
1280 
1281     public DirFilter(FileSystem fs) {
1282       super(fs, null);
1283     }
1284   }
1285 
1286   /**
1287    * A {@link PathFilter} that returns usertable directories. To get all directories use the
1288    * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
1289    */
1290   public static class UserTableDirFilter extends BlackListDirFilter {
1291     public UserTableDirFilter(FileSystem fs) {
1292       super(fs, HConstants.HBASE_NON_TABLE_DIRS);
1293     }
1294 
1295     protected boolean isValidName(final String name) {
1296       if (!super.isValidName(name))
1297         return false;
1298 
1299       try {
1300         TableName.isLegalTableQualifierName(Bytes.toBytes(name));
1301       } catch (IllegalArgumentException e) {
1302         LOG.info("INVALID NAME " + name);
1303         return false;
1304       }
1305       return true;
1306     }
1307   }
1308 
1309   /**
1310    * Heuristic to determine whether it is safe or not to open a file for append.
1311    * Looks for dfs.support.append and uses reflection to search
1312    * for SequenceFile.Writer.syncFs() or FSDataOutputStream.hflush().
1313    * @param conf
1314    * @return True if append support
1315    */
1316   public static boolean isAppendSupported(final Configuration conf) {
1317     boolean append = conf.getBoolean("dfs.support.append", false);
1318     if (append) {
1319       try {
1320         // TODO: The implementation that comes back when we do a createWriter
1321         // may not be using SequenceFile so the below is not a definitive test.
1322         // Will do for now (hdfs-200).
1323         SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
1324         append = true;
1325       } catch (SecurityException e) {
1326       } catch (NoSuchMethodException e) {
1327         append = false;
1328       }
1329     }
1330     if (!append) {
1331       // Look for the 0.21, 0.22, new-style append evidence.
1332       try {
1333         FSDataOutputStream.class.getMethod("hflush", new Class<?> []{});
1334         append = true;
1335       } catch (NoSuchMethodException e) {
1336         append = false;
1337       }
1338     }
1339     return append;
1340   }
1341 
1342   /**
1343    * @param conf
1344    * @return True if the filesystem's scheme is 'hdfs'.
1345    * @throws IOException
1346    */
1347   public static boolean isHDFS(final Configuration conf) throws IOException {
1348     FileSystem fs = FileSystem.get(conf);
1349     String scheme = fs.getUri().getScheme();
1350     return scheme.equalsIgnoreCase("hdfs");
1351   }
1352 
1353   /**
1354    * Recover file lease. Used when a file is suspected
1355    * to have been left open by another process.
1356    * @param fs FileSystem handle
1357    * @param p Path of file to recover lease
1358    * @param conf Configuration handle
1359    * @throws IOException
1360    */
1361   public abstract void recoverFileLease(final FileSystem fs, final Path p,
1362       Configuration conf, CancelableProgressable reporter) throws IOException;
1363 
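       /**
        * @param fs filesystem to scan
        * @param rootdir hbase root directory
        * @return All table directories found under each namespace directory beneath
        * <code>rootdir</code>.
        * @throws IOException if listing the namespace directories fails
        */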
1364   public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
1365       throws IOException {
1366     List<Path> tableDirs = new LinkedList<Path>();
1367 
1368     for(FileStatus status :
1369         fs.globStatus(new Path(rootdir,
1370             new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
1371       tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
1372     }
1373     return tableDirs;
1374   }
1375 
1376   /**
1377    * @param fs filesystem to scan
1378    * @param rootdir directory to scan
1379    * @return All the table directories under <code>rootdir</code>. Ignores non-table hbase folders such as
1380    * .logs, .oldlogs and .corrupt.
1381    * @throws IOException if listing the directory fails
1382    */
1383   public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
1384       throws IOException {
1385     // presumes any directory under hbase.rootdir is a table
1386     FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
1387     List<Path> tabledirs = new ArrayList<Path>(dirs.length);
1388     for (FileStatus dir: dirs) {
1389       tabledirs.add(dir.getPath());
1390     }
1391     return tabledirs;
1392   }
1393 
1394   /**
1395    * Checks if the given path contains the 'recovered.edits' directory.
1396    * @param path path to check
1397    * @return True if the path is under a recovered.edits directory
1398    */
1399   public static boolean isRecoveredEdits(Path path) {
1400     return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
1401   }
1402 
1403   /**
1404    * Filter for all dirs whose names look like region directory names (hex-encoded).
1405    */
1406   public static class RegionDirFilter implements PathFilter {
1407     // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
1408     final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
1409     final FileSystem fs;
1410 
1411     public RegionDirFilter(FileSystem fs) {
1412       this.fs = fs;
1413     }
1414 
1415     @Override
1416     public boolean accept(Path rd) {
1417       if (!regionDirPattern.matcher(rd.getName()).matches()) {
1418         return false;
1419       }
1420 
1421       try {
1422         return fs.getFileStatus(rd).isDirectory();
1423       } catch (IOException ioe) {
1424         // Maybe the file was moved or the fs was disconnected.
1425         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1426         return false;
1427       }
1428     }
1429   }
1430 
1431   /**
1432    * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1433    * .tableinfo
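        * <p>
        * A minimal usage sketch (variable names and the table name are illustrative):
        * <pre>
        *   Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf("t1"));
        *   for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
        *     LOG.debug("Region dir: " + regionDir);
        *   }
        * </pre>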
1434    * @param fs A file system for the Path
1435    * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
1436    * @return List of paths to valid region directories in table dir.
1437    * @throws IOException
1438    */
1439   public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1440     // assumes we are in a table dir.
1441     FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs));
1442     List<Path> regionDirs = new ArrayList<Path>(rds.length);
1443     for (FileStatus rdfs: rds) {
1444       Path rdPath = rdfs.getPath();
1445       regionDirs.add(rdPath);
1446     }
1447     return regionDirs;
1448   }
1449 
1450   /**
1451    * Filter for all dirs that are legal column family names.  This is generally used for colfam
1452    * dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1453    */
1454   public static class FamilyDirFilter implements PathFilter {
1455     final FileSystem fs;
1456 
1457     public FamilyDirFilter(FileSystem fs) {
1458       this.fs = fs;
1459     }
1460 
1461     @Override
1462     public boolean accept(Path rd) {
1463       try {
1464         // throws IAE if invalid
1465         HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(rd.getName()));
1466       } catch (IllegalArgumentException iae) {
1467         // path name is an invalid family name and thus is excluded.
1468         return false;
1469       }
1470 
1471       try {
1472         return fs.getFileStatus(rd).isDirectory();
1473       } catch (IOException ioe) {
1474         // Maybe the file was moved or the fs was disconnected.
1475         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1476         return false;
1477       }
1478     }
1479   }
1480 
1481   /**
1482    * Given a particular region dir, return all the familydirs inside it
1483    *
1484    * @param fs A file system for the Path
1485    * @param regionDir Path to a specific region directory
1486    * @return List of paths to valid family directories in region dir.
1487    * @throws IOException
1488    */
1489   public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1490     // assumes we are in a region dir.
1491     FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1492     List<Path> familyDirs = new ArrayList<Path>(fds.length);
1493     for (FileStatus fdfs: fds) {
1494       Path fdPath = fdfs.getPath();
1495       familyDirs.add(fdPath);
1496     }
1497     return familyDirs;
1498   }
1499 
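       /**
        * Returns the paths of all reference files directly under the given column family directory.
        *
        * @param fs A file system for the Path
        * @param familyDir Path to a specific column family directory
        * @return List of paths to reference files in the family dir.
        * @throws IOException if listing the directory fails
        */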
1500   public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
1501     FileStatus[] fds = fs.listStatus(familyDir, new ReferenceFileFilter(fs));
1502     List<Path> referenceFiles = new ArrayList<Path>(fds.length);
1503     for (FileStatus fdfs: fds) {
1504       Path fdPath = fdfs.getPath();
1505       referenceFiles.add(fdPath);
1506     }
1507     return referenceFiles;
1508   }
1509 
1510   /**
1511    * Filter for HFiles that excludes reference files.
1512    */
1513   public static class HFileFilter implements PathFilter {
1514     final FileSystem fs;
1515 
1516     public HFileFilter(FileSystem fs) {
1517       this.fs = fs;
1518     }
1519 
1520     @Override
1521     public boolean accept(Path rd) {
1522       try {
1523         // only files
1524         return !fs.getFileStatus(rd).isDirectory() && StoreFileInfo.isHFile(rd);
1525       } catch (IOException ioe) {
1526         // Maybe the file was moved or the fs was disconnected.
1527         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1528         return false;
1529       }
1530     }
1531   }
1532 
1533   public static class ReferenceFileFilter implements PathFilter {
1534 
1535     private final FileSystem fs;
1536 
1537     public ReferenceFileFilter(FileSystem fs) {
1538       this.fs = fs;
1539     }
1540 
1541     @Override
1542     public boolean accept(Path rd) {
1543       try {
1544         // only files can be references.
1545         return !fs.getFileStatus(rd).isDirectory() && StoreFileInfo.isReference(rd);
1546       } catch (IOException ioe) {
1547         // Maybe the file was moved or the fs was disconnected.
1548         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1549         return false;
1550       }
1551     }
1552   }
1553 
1554 
1555   /**
1556    * @param conf configuration from which to get the hbase rootdir
1557    * @return the filesystem of the hbase rootdir.
1558    * @throws IOException
1559    */
1560   public static FileSystem getCurrentFileSystem(Configuration conf)
1561   throws IOException {
1562     return getRootDir(conf).getFileSystem(conf);
1563   }
1564 
1565 
1566   /**
1567    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1568    * table StoreFile names to the full Path.
1569    * <br>
1570    * Example...<br>
1571    * Key = 3944417774205889744  <br>
1572    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1573    *
1574    * @param map map to add values.  If null, this method will create and populate one to return
1575    * @param fs  The file system to use.
1576    * @param hbaseRootDir  The root directory to scan.
1577    * @param tableName name of the table to scan.
1578    * @return Map keyed by StoreFile name with a value of the full Path.
1579    * @throws IOException When scanning the directory fails.
1580    */
1581   public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1582   final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1583   throws IOException {
1584     return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null);
1585   }
1586 
1587   /**
1588    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1589    * table StoreFile names to the full Path.
1590    * <br>
1591    * Example...<br>
1592    * Key = 3944417774205889744  <br>
1593    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1594    *
1595    * @param map map to add values.  If null, this method will create and populate one to return
1596    * @param fs  The file system to use.
1597    * @param hbaseRootDir  The root directory to scan.
1598    * @param tableName name of the table to scan.
1599    * @param errors ErrorReporter instance or null
1600    * @return Map keyed by StoreFile name with a value of the full Path.
1601    * @throws IOException When scanning the directory fails.
1602    */
1603   public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1604   final FileSystem fs, final Path hbaseRootDir, TableName tableName, ErrorReporter errors)
1605   throws IOException {
1606     if (map == null) {
1607       map = new HashMap<String, Path>();
1608     }
1609 
1610     // only include the directory paths to tables
1611     Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
1612     // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
1613     // should be regions.
1614     PathFilter familyFilter = new FamilyDirFilter(fs);
1615     FileStatus[] regionDirs = fs.listStatus(tableDir, new RegionDirFilter(fs));
1616     for (FileStatus regionDir : regionDirs) {
1617       if (null != errors) {
1618         errors.progress();
1619       }
1620       Path dd = regionDir.getPath();
1621       // else it's a region name; now look in the region for families
1622       FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1623       for (FileStatus familyDir : familyDirs) {
1624         if (null != errors) {
1625           errors.progress();
1626         }
1627         Path family = familyDir.getPath();
1628         if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1629           continue;
1630         }
1631         // now in family, iterate over the StoreFiles and
1632         // put in map
1633         FileStatus[] familyStatus = fs.listStatus(family);
1634         for (FileStatus sfStatus : familyStatus) {
1635           if (null != errors) {
1636             errors.progress();
1637           }
1638           Path sf = sfStatus.getPath();
1639           map.put(sf.getName(), sf);
1640         }
1641       }
1642     }
1643     return map;
1644   }
1645 
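       /**
        * Counts the reference files under all family directories of the given region directory.
        * If an IOException occurs, a warning is logged and the count accumulated so far is returned.
        * @param fs A file system for the Path
        * @param p Path to a specific region directory
        * @return number of reference files found
        */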
1646   public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1647     int result = 0;
1648     try {
1649       for (Path familyDir:getFamilyDirs(fs, p)){
1650         result += getReferenceFilePaths(fs, familyDir).size();
1651       }
1652     } catch (IOException e) {
1653       LOG.warn("Error counting reference files.", e);
1654     }
1655     return result;
1656   }
1657 
1658   /**
1659    * Runs through the HBase rootdir and creates a reverse lookup map for
1660    * table StoreFile names to the full Path.
1661    * <br>
1662    * Example...<br>
1663    * Key = 3944417774205889744  <br>
1664    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1665    *
1666    * @param fs  The file system to use.
1667    * @param hbaseRootDir  The root directory to scan.
1668    * @return Map keyed by StoreFile name with a value of the full Path.
1669    * @throws IOException When scanning the directory fails.
1670    */
1671   public static Map<String, Path> getTableStoreFilePathMap(
1672     final FileSystem fs, final Path hbaseRootDir)
1673   throws IOException {
1674     return getTableStoreFilePathMap(fs, hbaseRootDir, null);
1675   }
1676 
1677   /**
1678    * Runs through the HBase rootdir and creates a reverse lookup map for
1679    * table StoreFile names to the full Path.
1680    * <br>
1681    * Example...<br>
1682    * Key = 3944417774205889744  <br>
1683    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1684    *
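        * A minimal usage sketch (variable names are illustrative):
        * <pre>
        *   FileSystem fs = FileSystem.get(conf);
        *   Path rootDir = FSUtils.getRootDir(conf);
        *   Map&lt;String, Path&gt; storeFiles = FSUtils.getTableStoreFilePathMap(fs, rootDir);
        *   Path storeFilePath = storeFiles.get(storeFileName);
        * </pre>
        *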
1685    * @param fs  The file system to use.
1686    * @param hbaseRootDir  The root directory to scan.
1687    * @param errors ErrorReporter instance or null
1688    * @return Map keyed by StoreFile name with a value of the full Path.
1689    * @throws IOException When scanning the directory fails.
1690    */
1691   public static Map<String, Path> getTableStoreFilePathMap(
1692     final FileSystem fs, final Path hbaseRootDir, ErrorReporter errors)
1693   throws IOException {
1694     Map<String, Path> map = new HashMap<String, Path>();
1695 
1696     // if this method looks similar to 'getTableFragmentation' that is because
1697     // it was borrowed from it.
1698 
1699     // only include the directory paths to tables
1700     for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1701       getTableStoreFilePathMap(map, fs, hbaseRootDir,
1702           FSUtils.getTableName(tableDir), errors);
1703     }
1704     return map;
1705   }
1706 
1707   /**
1708    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1709    * This accommodates differences between hadoop versions, where hadoop 1
1710    * does not throw a FileNotFoundException and returns an empty FileStatus[],
1711    * while Hadoop 2 will throw a FileNotFoundException.
1712    *
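        * A minimal usage sketch (variable names are illustrative); note the result may be null:
        * <pre>
        *   FileStatus[] children = FSUtils.listStatus(fs, dir, null);
        *   if (children != null) {
        *     for (FileStatus child : children) {
        *       LOG.debug(child.getPath().toString());
        *     }
        *   }
        * </pre>
        *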
1713    * @param fs file system
1714    * @param dir directory
1715    * @param filter path filter
1716    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1717    */
1718   public static FileStatus [] listStatus(final FileSystem fs,
1719       final Path dir, final PathFilter filter) throws IOException {
1720     FileStatus [] status = null;
1721     try {
1722       status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
1723     } catch (FileNotFoundException fnfe) {
1724       // if directory doesn't exist, return null
1725       if (LOG.isTraceEnabled()) {
1726         LOG.trace(dir + " doesn't exist");
1727       }
1728     }
1729     if (status == null || status.length < 1) return null;
1730     return status;
1731   }
1732 
1733   /**
1734    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1735    * This accommodates differences between hadoop versions.
1736    *
1737    * @param fs file system
1738    * @param dir directory
1739    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1740    */
1741   public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
1742     return listStatus(fs, dir, null);
1743   }
1744 
1745   /**
1746    * Calls fs.delete() and returns the value returned by fs.delete()
1747    *
1748    * @param fs filesystem to use
1749    * @param path path to delete
1750    * @param recursive whether to delete the path recursively
1751    * @return the value returned by fs.delete()
1752    * @throws IOException
1753    */
1754   public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
1755       throws IOException {
1756     return fs.delete(path, recursive);
1757   }
1758 
1759   /**
1760    * Calls fs.exists(). Checks if the specified path exists.
1761    *
1762    * @param fs filesystem to use
1763    * @param path path to check
1764    * @return the value returned by fs.exists()
1765    * @throws IOException
1766    */
1767   public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
1768     return fs.exists(path);
1769   }
1770 
1771   /**
1772    * Throw an exception if an action is not permitted by a user on a file.
1773    *
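        * A minimal usage sketch (variable names are illustrative):
        * <pre>
        *   UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        *   FileStatus status = fs.getFileStatus(somePath);
        *   FSUtils.checkAccess(ugi, status, FsAction.READ);
        * </pre>
        *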
1774    * @param ugi
1775    *          the user
1776    * @param file
1777    *          the file
1778    * @param action
1779    *          the action
1780    */
1781   public static void checkAccess(UserGroupInformation ugi, FileStatus file,
1782       FsAction action) throws AccessDeniedException {
1783     if (ugi.getShortUserName().equals(file.getOwner())) {
1784       if (file.getPermission().getUserAction().implies(action)) {
1785         return;
1786       }
1787     } else if (contains(ugi.getGroupNames(), file.getGroup())) {
1788       if (file.getPermission().getGroupAction().implies(action)) {
1789         return;
1790       }
1791     } else if (file.getPermission().getOtherAction().implies(action)) {
1792       return;
1793     }
1794     throw new AccessDeniedException("Permission denied:" + " action=" + action
1795         + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
1796   }
1797 
1798   private static boolean contains(String[] groups, String user) {
1799     for (String group : groups) {
1800       if (group.equals(user)) {
1801         return true;
1802       }
1803     }
1804     return false;
1805   }
1806 
1807   /**
1808    * Log the current state of the filesystem from a certain root directory
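        * <p>
        * A minimal usage sketch (variable names are illustrative):
        * <pre>
        *   FileSystem fs = FileSystem.get(conf);
        *   FSUtils.logFileSystemState(fs, FSUtils.getRootDir(conf), LOG);
        * </pre>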
1809    * @param fs filesystem to investigate
1810    * @param root root file/directory to start logging from
1811    * @param LOG log to output information
1812    * @throws IOException if an unexpected exception occurs
1813    */
1814   public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
1815       throws IOException {
1816     LOG.debug("Current file system:");
1817     logFSTree(LOG, fs, root, "|-");
1818   }
1819 
1820   /**
1821    * Recursive helper to log the state of the FS
1822    *
1823    * @see #logFileSystemState(FileSystem, Path, Log)
1824    */
1825   private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
1826       throws IOException {
1827     FileStatus[] files = FSUtils.listStatus(fs, root, null);
1828     if (files == null) return;
1829 
1830     for (FileStatus file : files) {
1831       if (file.isDirectory()) {
1832         LOG.debug(prefix + file.getPath().getName() + "/");
1833         logFSTree(LOG, fs, file.getPath(), prefix + "---");
1834       } else {
1835         LOG.debug(prefix + file.getPath().getName());
1836       }
1837     }
1838   }
1839 
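       /**
        * Sets the modification time of <code>src</code> to the current time (so the TimeToLive
        * cleaner sees a fresh file) and then renames it to <code>dest</code>.
        * @param fs filesystem to use
        * @param src source path
        * @param dest destination path
        * @return the value returned by fs.rename()
        * @throws IOException if setting the times or renaming fails
        */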
1840   public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
1841       throws IOException {
1842     // set the modify time for TimeToLive Cleaner
1843     fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
1844     return fs.rename(src, dest);
1845   }
1846 
1847   /**
1848    * Scans the root path of the file system to get the
1849    * degree of locality for each region on each of the servers having at least
1850    * one block of that region.
1851    * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}
1852    *
1853    * @param conf
1854    *          the configuration to use
1855    * @return the mapping from region encoded name to a map of server names to
1856    *           locality fraction
1857    * @throws IOException
1858    *           in case of file system errors or interrupts
1859    */
1860   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1861       final Configuration conf) throws IOException {
1862     return getRegionDegreeLocalityMappingFromFS(
1863         conf, null,
1864         conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1865 
1866   }
1867 
1868   /**
1869    * Scans the root path of the file system to get the
1870    * degree of locality for each region on each of the servers having at least
1871    * one block of that region.
1872    *
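        * A minimal usage sketch (names are illustrative):
        * <pre>
        *   Map&lt;String, Map&lt;String, Float&gt;&gt; locality =
        *       FSUtils.getRegionDegreeLocalityMappingFromFS(conf, "myTable", 4);
        *   Map&lt;String, Float&gt; serverToLocality = locality.get(regionEncodedName);
        * </pre>
        *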
1873    * @param conf
1874    *          the configuration to use
1875    * @param desiredTable
1876    *          the table you wish to scan locality for
1877    * @param threadPoolSize
1878    *          the thread pool size to use
1879    * @return the mapping from region encoded name to a map of server names to
1880    *           locality fraction
1881    * @throws IOException
1882    *           in case of file system errors or interrupts
1883    */
1884   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1885       final Configuration conf, final String desiredTable, int threadPoolSize)
1886       throws IOException {
1887     Map<String, Map<String, Float>> regionDegreeLocalityMapping =
1888         new ConcurrentHashMap<String, Map<String, Float>>();
1889     getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null,
1890         regionDegreeLocalityMapping);
1891     return regionDegreeLocalityMapping;
1892   }
1893 
1894   /**
1895    * Scans the root path of the file system to get either the
1896    * mapping between the region name and its best-locality region server or the
1897    * degree of locality of each region on each of the servers having at least
1898    * one block of that region. The output map parameters are both optional.
1899    *
1900    * @param conf
1901    *          the configuration to use
1902    * @param desiredTable
1903    *          the table you wish to scan locality for
1904    * @param threadPoolSize
1905    *          the thread pool size to use
1906    * @param regionToBestLocalityRSMapping
1907    *          the map into which to put the best locality mapping or null
1908    * @param regionDegreeLocalityMapping
1909    *          the map into which to put the locality degree mapping or null,
1910    *          must be a thread-safe implementation
1911    * @throws IOException
1912    *           in case of file system errors or interrupts
1913    */
1914   private static void getRegionLocalityMappingFromFS(
1915       final Configuration conf, final String desiredTable,
1916       int threadPoolSize,
1917       Map<String, String> regionToBestLocalityRSMapping,
1918       Map<String, Map<String, Float>> regionDegreeLocalityMapping)
1919       throws IOException {
1920     FileSystem fs =  FileSystem.get(conf);
1921     Path rootPath = FSUtils.getRootDir(conf);
1922     long startTime = EnvironmentEdgeManager.currentTime();
1923     Path queryPath;
1924     // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1925     if (null == desiredTable) {
1926       queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1927     } else {
1928       queryPath = new Path(FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1929     }
1930 
1931     // reject all paths that are not appropriate
1932     PathFilter pathFilter = new PathFilter() {
1933       @Override
1934       public boolean accept(Path path) {
1935         // this is the region name; it may get some noise data
1936         if (null == path) {
1937           return false;
1938         }
1939 
1940         // no parent?
1941         Path parent = path.getParent();
1942         if (null == parent) {
1943           return false;
1944         }
1945 
1946         String regionName = path.getName();
1947         if (null == regionName) {
1948           return false;
1949         }
1950 
1951         if (!regionName.toLowerCase().matches("[0-9a-f]+")) {
1952           return false;
1953         }
1954         return true;
1955       }
1956     };
1957 
1958     FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1959 
1960     if (null == statusList) {
1961       return;
1962     } else {
1963       LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
1964           statusList.length);
1965     }
1966 
1967     // lower the number of threads in case we have very few expected regions
1968     threadPoolSize = Math.min(threadPoolSize, statusList.length);
1969 
1970     // run in multiple threads
1971     ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
1972         threadPoolSize, 60, TimeUnit.SECONDS,
1973         new ArrayBlockingQueue<Runnable>(statusList.length));
1974     try {
1975       // ignore all file status items that are not of interest
1976       for (FileStatus regionStatus : statusList) {
1977         if (null == regionStatus) {
1978           continue;
1979         }
1980 
1981         if (!regionStatus.isDirectory()) {
1982           continue;
1983         }
1984 
1985         Path regionPath = regionStatus.getPath();
1986         if (null == regionPath) {
1987           continue;
1988         }
1989 
1990         tpe.execute(new FSRegionScanner(fs, regionPath,
1991             regionToBestLocalityRSMapping, regionDegreeLocalityMapping));
1992       }
1993     } finally {
1994       tpe.shutdown();
1995       int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1996           60 * 1000);
1997       try {
1998         // here we wait until TPE terminates, which is either naturally or by
1999         // exceptions in the execution of the threads
2000         while (!tpe.awaitTermination(threadWakeFrequency,
2001             TimeUnit.MILLISECONDS)) {
2002           // printing out rough estimate, so as to not introduce
2003           // AtomicInteger
2004           LOG.info("Locality checking is underway: { Scanned Regions : "
2005               + tpe.getCompletedTaskCount() + "/"
2006               + tpe.getTaskCount() + " }");
2007         }
2008       } catch (InterruptedException e) {
2009         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
2010       }
2011     }
2012 
2013     long overhead = EnvironmentEdgeManager.currentTime() - startTime;
2014     String overheadMsg = "Scan DFS for locality info took " + overhead + " ms";
2015 
2016     LOG.info(overheadMsg);
2017   }
2018 
2019   /**
2020    * Do our short circuit read setup.
2021    * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
2022    * @param conf configuration to check and update
2023    */
2024   public static void setupShortCircuitRead(final Configuration conf) {
2025     // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
2026     boolean shortCircuitSkipChecksum =
2027       conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
2028     boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
2029     if (shortCircuitSkipChecksum) {
2030       LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
2031         "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
2032         "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
2033       assert !shortCircuitSkipChecksum; //this will fail if assertions are on
2034     }
2035     checkShortCircuitReadBufferSize(conf);
2036   }
2037 
2038   /**
2039    * Check if short circuit read buffer size is set and if not, set it to hbase value.
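        * <p>
        * A minimal configuration sketch (the value shown is illustrative):
        * <pre>
        *   // Only takes effect when dfs.client.read.shortcircuit.buffer.size is not already set.
        *   conf.setInt("hbase.dfs.client.read.shortcircuit.buffer.size", 131072);
        * </pre>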
2040    * @param conf configuration to check and update
2041    */
2042   public static void checkShortCircuitReadBufferSize(final Configuration conf) {
2043     final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
2044     final int notSet = -1;
2045     // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
2046     final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
2047     int size = conf.getInt(dfsKey, notSet);
2048     // If a size is set, return -- we will use it.
2049     if (size != notSet) return;
2050     // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
2051     int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
2052     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
2053   }
2054 }