1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInputStream;
23  import java.io.EOFException;
24  import java.io.FileNotFoundException;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.InterruptedIOException;
28  import java.lang.reflect.InvocationTargetException;
29  import java.lang.reflect.Method;
30  import java.net.InetSocketAddress;
31  import java.net.URI;
32  import java.net.URISyntaxException;
33  import java.util.ArrayList;
34  import java.util.Collections;
35  import java.util.HashMap;
36  import java.util.LinkedList;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.concurrent.ArrayBlockingQueue;
40  import java.util.concurrent.ConcurrentHashMap;
41  import java.util.concurrent.ThreadPoolExecutor;
42  import java.util.concurrent.TimeUnit;
43  import java.util.regex.Pattern;
44  
45  import org.apache.commons.logging.Log;
46  import org.apache.commons.logging.LogFactory;
47  import org.apache.hadoop.hbase.classification.InterfaceAudience;
48  import org.apache.hadoop.HadoopIllegalArgumentException;
49  import org.apache.hadoop.conf.Configuration;
50  import org.apache.hadoop.fs.BlockLocation;
51  import org.apache.hadoop.fs.FSDataInputStream;
52  import org.apache.hadoop.fs.FSDataOutputStream;
53  import org.apache.hadoop.fs.FileStatus;
54  import org.apache.hadoop.fs.FileSystem;
55  import org.apache.hadoop.fs.Path;
56  import org.apache.hadoop.fs.PathFilter;
57  import org.apache.hadoop.fs.permission.FsAction;
58  import org.apache.hadoop.fs.permission.FsPermission;
59  import org.apache.hadoop.hbase.ClusterId;
60  import org.apache.hadoop.hbase.HColumnDescriptor;
61  import org.apache.hadoop.hbase.HConstants;
62  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
63  import org.apache.hadoop.hbase.HRegionInfo;
64  import org.apache.hadoop.hbase.TableName;
65  import org.apache.hadoop.hbase.exceptions.DeserializationException;
66  import org.apache.hadoop.hbase.fs.HFileSystem;
67  import org.apache.hadoop.hbase.master.HMaster;
68  import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
69  import org.apache.hadoop.hbase.security.AccessDeniedException;
70  import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
71  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
72  import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
73  import org.apache.hadoop.hbase.regionserver.HRegion;
74  import org.apache.hadoop.hdfs.DFSClient;
75  import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
76  import org.apache.hadoop.hdfs.DistributedFileSystem;
77  import org.apache.hadoop.io.IOUtils;
78  import org.apache.hadoop.io.SequenceFile;
79  import org.apache.hadoop.ipc.RemoteException;
80  import org.apache.hadoop.security.UserGroupInformation;
81  import org.apache.hadoop.util.Progressable;
82  import org.apache.hadoop.util.ReflectionUtils;
83  import org.apache.hadoop.util.StringUtils;
84  
85  import com.google.common.primitives.Ints;
86  import com.google.protobuf.InvalidProtocolBufferException;
87  
88  /**
89   * Utility methods for interacting with the underlying file system.
90   */
91  @InterfaceAudience.Private
92  public abstract class FSUtils {
93    private static final Log LOG = LogFactory.getLog(FSUtils.class);
94  
95    /** Full access permissions (starting point for a umask) */
96    public static final String FULL_RWX_PERMISSIONS = "777";
97    private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
98    private static final int DEFAULT_THREAD_POOLSIZE = 2;
99  
100   /** Set to true on Windows platforms */
101   public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
102 
103   protected FSUtils() {
104     super();
105   }
106 
107   /**
108    * Sets storage policy for given path according to config setting.
109    * If the passed path is a directory, we'll set the storage policy for all files
110    * created in the future in said directory. Note that this change in storage
111    * policy takes place at the HDFS level; it will persist beyond this RS's lifecycle.
112    * If we're running on a version of HDFS that doesn't support the given storage policy
113    * (or storage policies at all), then we'll issue a log message and continue.
114    *
115    * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
116    *
117    * @param fs the FileSystem; nothing is done unless it is an instance of DistributedFileSystem
118    * @param conf used to look up storage policy with given key; not modified.
119    * @param path the Path whose storage policy is to be set
120    * @param policyKey e.g. HConstants.WAL_STORAGE_POLICY
121    * @param defaultPolicy usually should be the policy NONE to delegate to HDFS
122    */
123   public static void setStoragePolicy(final FileSystem fs, final Configuration conf,
124       final Path path, final String policyKey, final String defaultPolicy) {
125     String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase();
126     if (storagePolicy.equals(defaultPolicy)) {
127       if (LOG.isTraceEnabled()) {
128         LOG.trace("default policy of " + defaultPolicy + " requested, exiting early.");
129       }
130       return;
131     }
132     if (fs instanceof DistributedFileSystem) {
133       DistributedFileSystem dfs = (DistributedFileSystem)fs;
134       // Once our minimum supported Hadoop version is 2.6.0 we can remove reflection.
135       Class<? extends DistributedFileSystem> dfsClass = dfs.getClass();
136       Method m = null;
137       try {
138         m = dfsClass.getDeclaredMethod("setStoragePolicy",
139             new Class<?>[] { Path.class, String.class });
140         m.setAccessible(true);
141       } catch (NoSuchMethodException e) {
142         LOG.info("FileSystem doesn't support"
143             + " setStoragePolicy; --HDFS-6584 not available");
144       } catch (SecurityException e) {
145         LOG.info("Doesn't have access to setStoragePolicy on "
146             + "FileSystems --HDFS-6584 not available", e);
147         m = null; // could happen on setAccessible()
148       }
149       if (m != null) {
150         try {
151           m.invoke(dfs, path, storagePolicy);
152           LOG.info("set " + storagePolicy + " for " + path);
153         } catch (Exception e) {
154           // check for lack of HDFS-7228
155           boolean probablyBadPolicy = false;
156           if (e instanceof InvocationTargetException) {
157             final Throwable exception = e.getCause();
158             if (exception instanceof RemoteException &&
159                 HadoopIllegalArgumentException.class.getName().equals(
160                     ((RemoteException)exception).getClassName())) {
161               LOG.warn("Given storage policy, '" + storagePolicy + "', was rejected and probably " +
162                   "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
163                   "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
164                   "more information see the 'ArchivalStorage' docs for your Hadoop release.");
165               LOG.debug("More information about the invalid storage policy.", exception);
166               probablyBadPolicy = true;
167             }
168           }
169           if (!probablyBadPolicy) {
170             // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
171             // misuse than a runtime problem with HDFS.
172             LOG.warn("Unable to set " + storagePolicy + " for " + path, e);
173           }
174         }
175       }
176     } else {
177       LOG.info("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't " +
178           "support setStoragePolicy.");
179     }
180   }
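
  /*
   * Editor's sketch, not part of the original source: a minimal illustration of how a caller
   * might apply a configured storage policy to a directory. The path is hypothetical; "NONE"
   * is used as the delegate-to-HDFS default per the javadoc above.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   FileSystem fs = FileSystem.get(conf);
   *   Path walDir = new Path("/hbase/WALs/example-regionserver");
   *   FSUtils.setStoragePolicy(fs, conf, walDir, HConstants.WAL_STORAGE_POLICY, "NONE");
   */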
181 
182   /**
183    * Compares path components only; the URI scheme is not considered. I.e. even if the
184    * schemes differ, if <code>path</code> starts with <code>rootPath</code>,
185    * then the function returns true.
186    * @param rootPath
187    * @param path
188    * @return True if <code>path</code> starts with <code>rootPath</code>
189    */
190   public static boolean isStartingWithPath(final Path rootPath, final String path) {
191     String uriRootPath = rootPath.toUri().getPath();
192     String tailUriPath = (new Path(path)).toUri().getPath();
193     return tailUriPath.startsWith(uriRootPath);
194   }
195 
196   /**
197    * Compares the path component of the Path URI; e.g. for hdfs://a/b/c and /a/b/c, it compares the
198    * '/a/b/c' part. Does not consider the scheme; i.e. even if the schemes differ, the two equate
199    * when the path or subpath matches.
200    * @param pathToSearch Path we will be trying to match.
201    * @param pathTail
202    * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
203    */
204   public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
205     return isMatchingTail(pathToSearch, new Path(pathTail));
206   }
207 
208   /**
209    * Compares the path component of the Path URI; e.g. for hdfs://a/b/c and /a/b/c, it compares the
210    * '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true.  Does not consider
211    * the scheme; i.e. even if the schemes differ, the two equate when the path or subpath matches.
212    * @param pathToSearch Path we will be trying to match.
213    * @param pathTail
214    * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
215    */
216   public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
217     if (pathToSearch.depth() != pathTail.depth()) return false;
218     Path tailPath = pathTail;
219     String tailName;
220     Path toSearch = pathToSearch;
221     String toSearchName;
222     boolean result = false;
223     do {
224       tailName = tailPath.getName();
225       if (tailName == null || tailName.length() <= 0) {
226         result = true;
227         break;
228       }
229       toSearchName = toSearch.getName();
230       if (toSearchName == null || toSearchName.length() <= 0) break;
231       // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
232       tailPath = tailPath.getParent();
233       toSearch = toSearch.getParent();
234     } while(tailName.equals(toSearchName));
235     return result;
236   }
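
  /*
   * Editor's sketch, not part of the original source: the scheme and authority are ignored, so a
   * fully-qualified path and a plain absolute path with the same trailing components match.
   *
   *   Path qualified = new Path("hdfs://namenode:8020/hbase/data");
   *   FSUtils.isMatchingTail(qualified, new Path("/hbase/data"));    // true
   *   FSUtils.isMatchingTail(qualified, new Path("/hbase/archive")); // false
   */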
237 
238   public static FSUtils getInstance(FileSystem fs, Configuration conf) {
239     String scheme = fs.getUri().getScheme();
240     if (scheme == null) {
241       LOG.warn("Could not find scheme for uri " +
242           fs.getUri() + ", default to hdfs");
243       scheme = "hdfs";
244     }
245     Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
246         scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
247     FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
248     return fsUtils;
249   }
250 
251   /**
252    * Delete if exists.
253    * @param fs filesystem object
254    * @param dir directory to delete
255    * @return True if deleted <code>dir</code>
256    * @throws IOException e
257    */
258   public static boolean deleteDirectory(final FileSystem fs, final Path dir)
259   throws IOException {
260     return fs.exists(dir) && fs.delete(dir, true);
261   }
262 
263   /**
264    * Delete the region directory if exists.
265    * @param conf
266    * @param hri
267    * @return True if deleted the region directory.
268    * @throws IOException
269    */
270   public static boolean deleteRegionDir(final Configuration conf, final HRegionInfo hri)
271   throws IOException {
272     Path rootDir = getRootDir(conf);
273     FileSystem fs = rootDir.getFileSystem(conf);
274     return deleteDirectory(fs,
275       new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
276   }
277 
278   /**
279    * Return the number of bytes that large input files should optimally
280    * be split into to minimize i/o time.
281    *
282    * Uses reflection to search for getDefaultBlockSize(Path f);
283    * if the method doesn't exist, falls back to getDefaultBlockSize().
284    *
285    * @param fs filesystem object
286    * @return the default block size for the path's filesystem
287    * @throws IOException e
288    */
289   public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
290     Method m = null;
291     Class<? extends FileSystem> cls = fs.getClass();
292     try {
293       m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
294     } catch (NoSuchMethodException e) {
295       LOG.info("FileSystem doesn't support getDefaultBlockSize");
296     } catch (SecurityException e) {
297       LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
298       m = null; // could happen on setAccessible()
299     }
300     if (m == null) {
301       return fs.getDefaultBlockSize(path);
302     } else {
303       try {
304         Object ret = m.invoke(fs, path);
305         return ((Long)ret).longValue();
306       } catch (Exception e) {
307         throw new IOException(e);
308       }
309     }
310   }
311 
312   /**
313    * Get the default replication.
314    *
315    * Uses reflection to search for getDefaultReplication(Path f);
316    * if the method doesn't exist, falls back to getDefaultReplication().
317    *
318    * @param fs filesystem object
319    * @param f path of file
320    * @return default replication for the path's filesystem
321    * @throws IOException e
322    */
323   public static short getDefaultReplication(final FileSystem fs, final Path path) throws IOException {
324     Method m = null;
325     Class<? extends FileSystem> cls = fs.getClass();
326     try {
327       m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
328     } catch (NoSuchMethodException e) {
329       LOG.info("FileSystem doesn't support getDefaultReplication");
330     } catch (SecurityException e) {
331       LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
332       m = null; // could happen on setAccessible()
333     }
334     if (m == null) {
335       return fs.getDefaultReplication(path);
336     } else {
337       try {
338         Object ret = m.invoke(fs, path);
339         return ((Number)ret).shortValue();
340       } catch (Exception e) {
341         throw new IOException(e);
342       }
343     }
344   }
345 
346   /**
347    * Returns the default buffer size to use during writes.
348    *
349    * The size of the buffer should probably be a multiple of hardware
350    * page size (4096 on Intel x86), and it determines how much data is
351    * buffered during read and write operations.
352    *
353    * @param fs filesystem object
354    * @return default buffer size to use during writes
355    */
356   public static int getDefaultBufferSize(final FileSystem fs) {
357     return fs.getConf().getInt("io.file.buffer.size", 4096);
358   }
359 
360   /**
361    * Create the specified file on the filesystem. By default, this will:
362    * <ol>
363    * <li>overwrite the file if it exists</li>
364    * <li>apply the umask in the configuration (if it is enabled)</li>
365    * <li>use the fs configured buffer size (or 4096 if not set)</li>
366    * <li>use the default replication</li>
367    * <li>use the default block size</li>
368    * <li>not track progress</li>
369    * </ol>
370    *
371    * @param fs {@link FileSystem} on which to write the file
372    * @param path {@link Path} to the file to write
373    * @param perm permissions
374    * @param favoredNodes
375    * @return output stream to the created file
376    * @throws IOException if the file cannot be created
377    */
378   public static FSDataOutputStream create(FileSystem fs, Path path,
379       FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
380     if (fs instanceof HFileSystem) {
381       FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
382       if (backingFs instanceof DistributedFileSystem) {
383         // Try to use the favoredNodes version via reflection to allow backwards-
384         // compatibility.
385         try {
386           return (FSDataOutputStream) (DistributedFileSystem.class
387               .getDeclaredMethod("create", Path.class, FsPermission.class,
388                   boolean.class, int.class, short.class, long.class,
389                   Progressable.class, InetSocketAddress[].class)
390                   .invoke(backingFs, path, perm, true,
391                       getDefaultBufferSize(backingFs),
392                       getDefaultReplication(backingFs, path),
393                       getDefaultBlockSize(backingFs, path),
394                       null, favoredNodes));
395         } catch (InvocationTargetException ite) {
396           // Function was properly called, but threw its own exception.
397           throw new IOException(ite.getCause());
398         } catch (NoSuchMethodException e) {
399           LOG.debug("DFS Client does not support most favored nodes create; using default create");
400           if (LOG.isTraceEnabled()) LOG.trace("Ignoring; use default create", e);
401         } catch (IllegalArgumentException e) {
402           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
403         } catch (SecurityException e) {
404           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
405         } catch (IllegalAccessException e) {
406           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
407         }
408       }
409     }
410     return create(fs, path, perm, true);
411   }
412 
413   /**
414    * Create the specified file on the filesystem. By default, this will:
415    * <ol>
416    * <li>apply the umask in the configuration (if it is enabled)</li>
417    * <li>use the fs configured buffer size (or 4096 if not set)</li>
418    * <li>use the default replication</li>
419    * <li>use the default block size</li>
420    * <li>not track progress</li>
421    * </ol>
422    *
423    * @param fs {@link FileSystem} on which to write the file
424    * @param path {@link Path} to the file to write
425    * @param perm
426    * @param overwrite Whether or not the created file should be overwritten.
427    * @return output stream to the created file
428    * @throws IOException if the file cannot be created
429    */
430   public static FSDataOutputStream create(FileSystem fs, Path path,
431       FsPermission perm, boolean overwrite) throws IOException {
432     if (LOG.isTraceEnabled()) {
433       LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
434     }
435     return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
436         getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
437   }
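
  /*
   * Editor's sketch, not part of the original source: writing a small file with the default
   * buffer size, replication and block size. The path and permission string are hypothetical;
   * fs is assumed to be in scope.
   *
   *   FsPermission perm = new FsPermission("644");
   *   FSDataOutputStream out = FSUtils.create(fs, new Path("/hbase/.tmp/example"), perm, true);
   *   try {
   *     out.write(Bytes.toBytes("example payload"));
   *   } finally {
   *     out.close();
   *   }
   */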
438 
439   /**
440    * Get the file permissions specified in the configuration, if they are
441    * enabled.
442    *
443    * @param fs filesystem that the file will be created on.
444    * @param conf configuration to read for determining if permissions are
445    *          enabled and which to use
446    * @param permssionConfKey property key in the configuration to use when
447    *          finding the permission
448    * @return the permission to use when creating a new file on the fs. If
449    *         special permissions are not specified in the configuration, then
450    *         the default permissions on the fs will be returned.
451    */
452   public static FsPermission getFilePermissions(final FileSystem fs,
453       final Configuration conf, final String permssionConfKey) {
454     boolean enablePermissions = conf.getBoolean(
455         HConstants.ENABLE_DATA_FILE_UMASK, false);
456 
457     if (enablePermissions) {
458       try {
459         FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
460         // make sure that we have a mask, if not, go default.
461         String mask = conf.get(permssionConfKey);
462         if (mask == null)
463           return FsPermission.getFileDefault();
464         // apply the umask
465         FsPermission umask = new FsPermission(mask);
466         return perm.applyUMask(umask);
467       } catch (IllegalArgumentException e) {
468         LOG.warn(
469             "Incorrect umask attempted to be created: "
470                 + conf.get(permssionConfKey)
471                 + ", using default file permissions.", e);
472         return FsPermission.getFileDefault();
473       }
474     }
475     return FsPermission.getFileDefault();
476   }
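
  /*
   * Editor's sketch, not part of the original source: the umask is only applied when
   * HConstants.ENABLE_DATA_FILE_UMASK is set to true; the config key name below is illustrative,
   * and conf/fs are assumed to be in scope.
   *
   *   conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
   *   conf.set("example.umask.key", "077");
   *   FsPermission perm = FSUtils.getFilePermissions(fs, conf, "example.umask.key");
   *   // perm is "777" with the "077" umask applied, i.e. 700.
   */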
477 
478   /**
479    * Checks to see if the specified file system is available
480    *
481    * @param fs filesystem
482    * @throws IOException e
483    */
484   public static void checkFileSystemAvailable(final FileSystem fs)
485   throws IOException {
486     if (!(fs instanceof DistributedFileSystem)) {
487       return;
488     }
489     IOException exception = null;
490     DistributedFileSystem dfs = (DistributedFileSystem) fs;
491     try {
492       if (dfs.exists(new Path("/"))) {
493         return;
494       }
495     } catch (IOException e) {
496       exception = e instanceof RemoteException ?
497               ((RemoteException)e).unwrapRemoteException() : e;
498     }
499     try {
500       fs.close();
501     } catch (Exception e) {
502       LOG.error("file system close failed: ", e);
503     }
504     IOException io = new IOException("File system is not available");
505     io.initCause(exception);
506     throw io;
507   }
508 
509   /**
510    * We use reflection because {@link DistributedFileSystem#setSafeMode(
511    * HdfsConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1
512    *
513    * @param dfs
514    * @return whether we're in safe mode
515    * @throws IOException
516    */
517   private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
518     boolean inSafeMode = false;
519     try {
520       Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
521           org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class});
522       inSafeMode = (Boolean) m.invoke(dfs,
523         org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true);
524     } catch (Exception e) {
525       if (e instanceof IOException) throw (IOException) e;
526 
527       // Check whether dfs is on safemode.
528       inSafeMode = dfs.setSafeMode(
529         org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET);
530     }
531     return inSafeMode;
532   }
533 
534   /**
535    * Check whether dfs is in safemode.
536    * @param conf
537    * @throws IOException
538    */
539   public static void checkDfsSafeMode(final Configuration conf)
540   throws IOException {
541     boolean isInSafeMode = false;
542     FileSystem fs = FileSystem.get(conf);
543     if (fs instanceof DistributedFileSystem) {
544       DistributedFileSystem dfs = (DistributedFileSystem)fs;
545       isInSafeMode = isInSafeMode(dfs);
546     }
547     if (isInSafeMode) {
548       throw new IOException("File system is in safemode, it can't be written now");
549     }
550   }
551 
552   /**
553    * Verifies current version of file system
554    *
555    * @param fs filesystem object
556    * @param rootdir root hbase directory
557    * @return null if no version file exists, version string otherwise.
558    * @throws IOException e
559    * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
560    */
561   public static String getVersion(FileSystem fs, Path rootdir)
562   throws IOException, DeserializationException {
563     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
564     FileStatus[] status = null;
565     try {
566       // hadoop 2.0 throws FNFE if directory does not exist.
567       // hadoop 1.0 returns null if directory does not exist.
568       status = fs.listStatus(versionFile);
569     } catch (FileNotFoundException fnfe) {
570       return null;
571     }
572     if (status == null || status.length == 0) return null;
573     String version = null;
574     byte [] content = new byte [(int)status[0].getLen()];
575     FSDataInputStream s = fs.open(versionFile);
576     try {
577       IOUtils.readFully(s, content, 0, content.length);
578       if (ProtobufUtil.isPBMagicPrefix(content)) {
579         version = parseVersionFrom(content);
580       } else {
581         // Presume it is in pre-pb format.
582         InputStream is = new ByteArrayInputStream(content);
583         DataInputStream dis = new DataInputStream(is);
584         try {
585           version = dis.readUTF();
586         } finally {
587           dis.close();
588         }
589       }
590     } catch (EOFException eof) {
591       LOG.warn("Version file was empty, odd, will try to set it.");
592     } finally {
593       s.close();
594     }
595     return version;
596   }
597 
598   /**
599    * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
600    * @param bytes The byte content of the hbase.version file.
601    * @return The version found in the file as a String.
602    * @throws DeserializationException
603    */
604   static String parseVersionFrom(final byte [] bytes)
605   throws DeserializationException {
606     ProtobufUtil.expectPBMagicPrefix(bytes);
607     int pblen = ProtobufUtil.lengthOfPBMagic();
608     FSProtos.HBaseVersionFileContent.Builder builder =
609       FSProtos.HBaseVersionFileContent.newBuilder();
610     FSProtos.HBaseVersionFileContent fileContent;
611     try {
612       fileContent = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
613       return fileContent.getVersion();
614     } catch (InvalidProtocolBufferException e) {
615       // Convert
616       throw new DeserializationException(e);
617     }
618   }
619 
620   /**
621    * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
622    * @param version Version to persist
623    * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
624    */
625   static byte [] toVersionByteArray(final String version) {
626     FSProtos.HBaseVersionFileContent.Builder builder =
627       FSProtos.HBaseVersionFileContent.newBuilder();
628     return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
629   }
630 
631   /**
632    * Verifies current version of file system
633    *
634    * @param fs file system
635    * @param rootdir root directory of HBase installation
636    * @param message if true, issues a message on System.out
637    *
638    * @throws IOException e
639    * @throws DeserializationException
640    */
641   public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
642   throws IOException, DeserializationException {
643     checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
644   }
645 
646   /**
647    * Verifies current version of file system
648    *
649    * @param fs file system
650    * @param rootdir root directory of HBase installation
651    * @param message if true, issues a message on System.out
652    * @param wait wait interval
653    * @param retries number of times to retry
654    *
655    * @throws IOException e
656    * @throws DeserializationException
657    */
658   public static void checkVersion(FileSystem fs, Path rootdir,
659       boolean message, int wait, int retries)
660   throws IOException, DeserializationException {
661     String version = getVersion(fs, rootdir);
662     if (version == null) {
663       if (!metaRegionExists(fs, rootdir)) {
664         // rootDir is empty (no version file and no root region)
665         // just create new version file (HBASE-1195)
666         setVersion(fs, rootdir, wait, retries);
667         return;
668       }
669     } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) return;
670 
671     // Version is deprecated; require migration.
672     // Output on stdout so user sees it in terminal.
673     String msg = "HBase file layout needs to be upgraded."
674       + " You have version " + version
675       + " and I want version " + HConstants.FILE_SYSTEM_VERSION
676       + ". Consult http://hbase.apache.org/book.html for further information about upgrading HBase."
677       + " Is your hbase.rootdir valid? If so, you may need to run "
678       + "'hbase hbck -fixVersionFile'.";
679     if (message) {
680       System.out.println("WARNING! " + msg);
681     }
682     throw new FileSystemVersionException(msg);
683   }
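
  /*
   * Editor's sketch, not part of the original source: typical bootstrap sequence. On a brand new
   * root directory this writes hbase.version; on a mismatched layout it throws
   * FileSystemVersionException.
   *
   *   Path rootDir = FSUtils.getRootDir(conf);
   *   FileSystem fs = rootDir.getFileSystem(conf);
   *   FSUtils.checkVersion(fs, rootDir, true);
   */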
684 
685   /**
686    * Sets version of file system
687    *
688    * @param fs filesystem object
689    * @param rootdir hbase root
690    * @throws IOException e
691    */
692   public static void setVersion(FileSystem fs, Path rootdir)
693   throws IOException {
694     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
695       HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
696   }
697 
698   /**
699    * Sets version of file system
700    *
701    * @param fs filesystem object
702    * @param rootdir hbase root
703    * @param wait time to wait for retry
704    * @param retries number of times to retry before failing
705    * @throws IOException e
706    */
707   public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
708   throws IOException {
709     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
710   }
711 
712 
713   /**
714    * Sets version of file system
715    *
716    * @param fs filesystem object
717    * @param rootdir hbase root directory
718    * @param version version to set
719    * @param wait time to wait for retry
720    * @param retries number of times to retry before throwing an IOException
721    * @throws IOException e
722    */
723   public static void setVersion(FileSystem fs, Path rootdir, String version,
724       int wait, int retries) throws IOException {
725     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
726     Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
727       HConstants.VERSION_FILE_NAME);
728     while (true) {
729       try {
730         // Write the version to a temporary file
731         FSDataOutputStream s = fs.create(tempVersionFile);
732         try {
733           s.write(toVersionByteArray(version));
734           s.close();
735           s = null;
736           // Move the temp version file to its normal location. Returns false
737           // if the rename failed. Throw an IOE in that case.
738           if (!fs.rename(tempVersionFile, versionFile)) {
739             throw new IOException("Unable to move temp version file to " + versionFile);
740           }
741         } finally {
742           // Cleaning up the temporary file if the rename failed would be trying
743           // too hard. We'll unconditionally create it again the next time
744           // through anyway; files are overwritten by default by create().
745 
746           // Attempt to close the stream on the way out if it is still open.
747           try {
748             if (s != null) s.close();
749           } catch (IOException ignore) { }
750         }
751         LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
752         return;
753       } catch (IOException e) {
754         if (retries > 0) {
755           LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
756           fs.delete(versionFile, false);
757           try {
758             if (wait > 0) {
759               Thread.sleep(wait);
760             }
761           } catch (InterruptedException ie) {
762             throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
763           }
764           retries--;
765         } else {
766           throw e;
767         }
768       }
769     }
770   }
771 
772   /**
773    * Checks that a cluster ID file exists in the HBase root directory
774    * @param fs the root directory FileSystem
775    * @param rootdir the HBase root directory in HDFS
776    * @param wait how long to wait between retries
777    * @return <code>true</code> if the file exists, otherwise <code>false</code>
778    * @throws IOException if checking the FileSystem fails
779    */
780   public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
781       int wait) throws IOException {
782     while (true) {
783       try {
784         Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
785         return fs.exists(filePath);
786       } catch (IOException ioe) {
787         if (wait > 0) {
788           LOG.warn("Unable to check cluster ID file in " + rootdir.toString() +
789               ", retrying in "+wait+"msec: "+StringUtils.stringifyException(ioe));
790           try {
791             Thread.sleep(wait);
792           } catch (InterruptedException e) {
793             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
794           }
795         } else {
796           throw ioe;
797         }
798       }
799     }
800   }
801 
802   /**
803    * Returns the value of the unique cluster ID stored for this HBase instance.
804    * @param fs the root directory FileSystem
805    * @param rootdir the path to the HBase root directory
806    * @return the unique cluster identifier
807    * @throws IOException if reading the cluster ID file fails
808    */
809   public static ClusterId getClusterId(FileSystem fs, Path rootdir)
810   throws IOException {
811     Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
812     ClusterId clusterId = null;
813     FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath):  null;
814     if (status != null) {
815       int len = Ints.checkedCast(status.getLen());
816       byte [] content = new byte[len];
817       FSDataInputStream in = fs.open(idPath);
818       try {
819         in.readFully(content);
820       } catch (EOFException eof) {
821         LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
822       } finally{
823         in.close();
824       }
825       try {
826         clusterId = ClusterId.parseFrom(content);
827       } catch (DeserializationException e) {
828         throw new IOException("content=" + Bytes.toString(content), e);
829       }
830       // If not pb'd, make it so.
831       if (!ProtobufUtil.isPBMagicPrefix(content)) {
832         String cid = null;
833         in = fs.open(idPath);
834         try {
835           cid = in.readUTF();
836           clusterId = new ClusterId(cid);
837         } catch (EOFException eof) {
838           LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
839         } finally {
840           in.close();
841         }
842         rewriteAsPb(fs, rootdir, idPath, clusterId);
843       }
844       return clusterId;
845     } else {
846       LOG.warn("Cluster ID file does not exist at " + idPath.toString());
847     }
848     return clusterId;
849   }
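
  /*
   * Editor's sketch, not part of the original source: reading the cluster ID at startup and
   * creating one if the hbase.id file is missing. The 1000 ms retry interval is arbitrary, and
   * the no-arg ClusterId constructor generating a fresh id is an assumption.
   *
   *   ClusterId clusterId = FSUtils.getClusterId(fs, rootDir);
   *   if (clusterId == null) {
   *     clusterId = new ClusterId();
   *     FSUtils.setClusterId(fs, rootDir, clusterId, 1000);
   *   }
   */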
850 
851   /**
852    * @param cid
853    * @throws IOException
854    */
855   private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
856       final ClusterId cid)
857   throws IOException {
858     // Rewrite the file as pb.  Move aside the old one first, write new
859     // then delete the moved-aside file.
860     Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
861     if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
862     setClusterId(fs, rootdir, cid, 100);
863     if (!fs.delete(movedAsideName, false)) {
864       throw new IOException("Failed delete of " + movedAsideName);
865     }
866     LOG.debug("Rewrote the hbase.id file as pb");
867   }
868 
869   /**
870    * Writes a new unique identifier for this cluster to the "hbase.id" file
871    * in the HBase root directory
872    * @param fs the root directory FileSystem
873    * @param rootdir the path to the HBase root directory
874    * @param clusterId the unique identifier to store
875    * @param wait how long (in milliseconds) to wait between retries
876    * @throws IOException if writing to the FileSystem fails and no positive wait value was given for retrying
877    */
878   public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
879       int wait) throws IOException {
880     while (true) {
881       try {
882         Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
883         Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
884           Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
885         // Write the id file to a temporary location
886         FSDataOutputStream s = fs.create(tempIdFile);
887         try {
888           s.write(clusterId.toByteArray());
889           s.close();
890           s = null;
891           // Move the temporary file to its normal location. Throw an IOE if
892           // the rename failed
893           if (!fs.rename(tempIdFile, idFile)) {
894             throw new IOException("Unable to move temp version file to " + idFile);
895           }
896         } finally {
897           // Attempt to close the stream if still open on the way out
898           try {
899             if (s != null) s.close();
900           } catch (IOException ignore) { }
901         }
902         if (LOG.isDebugEnabled()) {
903           LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
904         }
905         return;
906       } catch (IOException ioe) {
907         if (wait > 0) {
908           LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
909               ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
910           try {
911             Thread.sleep(wait);
912           } catch (InterruptedException e) {
913             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
914           }
915         } else {
916           throw ioe;
917         }
918       }
919     }
920   }
921 
922   /**
923    * Verifies root directory path is a valid URI with a scheme
924    *
925    * @param root root directory path
926    * @return Passed <code>root</code> argument.
927    * @throws IOException if not a valid URI with a scheme
928    */
929   public static Path validateRootPath(Path root) throws IOException {
930     try {
931       URI rootURI = new URI(root.toString());
932       String scheme = rootURI.getScheme();
933       if (scheme == null) {
934         throw new IOException("Root directory does not have a scheme");
935       }
936       return root;
937     } catch (URISyntaxException e) {
938       IOException io = new IOException("Root directory path is not a valid " +
939         "URI -- check your " + HConstants.HBASE_DIR + " configuration");
940       io.initCause(e);
941       throw io;
942     }
943   }
944 
945   /**
946    * Checks for the presence of the root path (using the provided conf object) in the given path. If
947    * it exists, this method removes it and returns the String representation of the remaining relative path.
948    * @param path
949    * @param conf
950    * @return String representation of the remaining relative path
951    * @throws IOException
952    */
953   public static String removeRootPath(Path path, final Configuration conf) throws IOException {
954     Path root = FSUtils.getRootDir(conf);
955     String pathStr = path.toString();
956     // If the path does not contain the root path, it is already relative; return it as-is.
957     if (!pathStr.startsWith(root.toString())) return pathStr;
958     // Otherwise strip the root path plus the separating "/".
959     return pathStr.substring(root.toString().length() + 1);
960   }
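
  /*
   * Editor's sketch, not part of the original source: with a hypothetical root of
   * hdfs://namenode:8020/hbase, a store file path such as
   * hdfs://namenode:8020/hbase/data/default/t1/1588230740/f/abc comes back as the relative
   * string "data/default/t1/1588230740/f/abc"; a path outside the root is returned unchanged.
   * storeFilePath and conf are assumed to be in scope.
   *
   *   String relative = FSUtils.removeRootPath(storeFilePath, conf);
   */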
961 
962   /**
963    * If DFS, check safe mode and if so, wait until we clear it.
964    * @param conf configuration
965    * @param wait Sleep between retries
966    * @throws IOException e
967    */
968   public static void waitOnSafeMode(final Configuration conf,
969     final long wait)
970   throws IOException {
971     FileSystem fs = FileSystem.get(conf);
972     if (!(fs instanceof DistributedFileSystem)) return;
973     DistributedFileSystem dfs = (DistributedFileSystem)fs;
974     // Make sure dfs is not in safe mode
975     while (isInSafeMode(dfs)) {
976       LOG.info("Waiting for dfs to exit safe mode...");
977       try {
978         Thread.sleep(wait);
979       } catch (InterruptedException e) {
980         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
981       }
982     }
983   }
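
  /*
   * Editor's sketch, not part of the original source: blocking startup until HDFS leaves safe
   * mode, polling every 10 seconds (the interval is arbitrary).
   *
   *   FSUtils.waitOnSafeMode(conf, 10 * 1000L);
   */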
984 
985   /**
986    * Return the 'path' component of a Path.  In Hadoop, Path is a URI.  This
987    * method returns the 'path' component of a Path's URI: e.g. If a Path is
988    * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
989    * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
990    * This method is useful if you want to print out a Path without the qualifying
991    * Filesystem instance.
992    * @param p Filesystem Path whose 'path' component we are to return.
993    * @return Path portion of the Path's URI
994    */
995   public static String getPath(Path p) {
996     return p.toUri().getPath();
997   }
998 
999   /**
1000    * @param c configuration
1001    * @return Path to hbase root directory: i.e. <code>hbase.rootdir</code> from
1002    * configuration as a qualified Path.
1003    * @throws IOException e
1004    */
1005   public static Path getRootDir(final Configuration c) throws IOException {
1006     Path p = new Path(c.get(HConstants.HBASE_DIR));
1007     FileSystem fs = p.getFileSystem(c);
1008     return p.makeQualified(fs);
1009   }
1010 
1011   public static void setRootDir(final Configuration c, final Path root) throws IOException {
1012     c.set(HConstants.HBASE_DIR, root.toString());
1013   }
1014 
1015   public static void setFsDefault(final Configuration c, final Path root) throws IOException {
1016     c.set("fs.defaultFS", root.toString());    // for hadoop 0.21+
1017   }
1018 
1019   /**
1020    * Checks if meta region exists
1021    *
1022    * @param fs file system
1023    * @param rootdir root directory of HBase installation
1024    * @return true if exists
1025    * @throws IOException e
1026    */
1027   @SuppressWarnings("deprecation")
1028   public static boolean metaRegionExists(FileSystem fs, Path rootdir)
1029   throws IOException {
1030     Path metaRegionDir =
1031       HRegion.getRegionDir(rootdir, HRegionInfo.FIRST_META_REGIONINFO);
1032     return fs.exists(metaRegionDir);
1033   }
1034 
1035   /**
1036    * Compute HDFS blocks distribution of a given file, or a portion of the file
1037    * @param fs file system
1038    * @param status file status of the file
1039    * @param start start position of the portion
1040    * @param length length of the portion
1041    * @return The HDFS blocks distribution
1042    */
1043   static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
1044     final FileSystem fs, FileStatus status, long start, long length)
1045     throws IOException {
1046     HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
1047     BlockLocation [] blockLocations =
1048       fs.getFileBlockLocations(status, start, length);
1049     for(BlockLocation bl : blockLocations) {
1050       String [] hosts = bl.getHosts();
1051       long len = bl.getLength();
1052       blocksDistribution.addHostsAndBlockWeight(hosts, len);
1053     }
1054 
1055     return blocksDistribution;
1056   }
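
  /*
   * Editor's sketch, not part of the original source: computing how much of a (hypothetical)
   * store file is local to this host. fs, storeFilePath and localHostName are assumed to be in
   * scope, and getBlockLocalityIndex(...) on HDFSBlocksDistribution is an assumption.
   *
   *   FileStatus status = fs.getFileStatus(storeFilePath);
   *   HDFSBlocksDistribution dist =
   *       FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
   *   float locality = dist.getBlockLocalityIndex(localHostName);
   */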
1057 
1058 
1059 
1060   /**
1061    * Runs through the hbase rootdir and checks all stores have only
1062    * one file in them -- that is, they've been major compacted.  Looks
1063    * at root and meta tables too.
1064    * @param fs filesystem
1065    * @param hbaseRootDir hbase root directory
1066    * @return True if this hbase install is major compacted.
1067    * @throws IOException e
1068    */
1069   public static boolean isMajorCompacted(final FileSystem fs,
1070       final Path hbaseRootDir)
1071   throws IOException {
1072     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1073     PathFilter regionFilter = new RegionDirFilter(fs);
1074     PathFilter familyFilter = new FamilyDirFilter(fs);
1075     for (Path d : tableDirs) {
1076       FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
1077       for (FileStatus regionDir : regionDirs) {
1078         Path dd = regionDir.getPath();
1079         // Else its a region name.  Now look in region for families.
1080         FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1081         for (FileStatus familyDir : familyDirs) {
1082           Path family = familyDir.getPath();
1083           // Now in family make sure only one file.
1084           FileStatus[] familyStatus = fs.listStatus(family);
1085           if (familyStatus.length > 1) {
1086             LOG.debug(family.toString() + " has " + familyStatus.length +
1087                 " files.");
1088             return false;
1089           }
1090         }
1091       }
1092     }
1093     return true;
1094   }
1095 
1096   // TODO move this method OUT of FSUtils. No dependencies to HMaster
1097   /**
1098    * Returns the total overall fragmentation percentage. Includes hbase:meta and
1099    * -ROOT- as well.
1100    *
1101    * @param master  The master defining the HBase root and file system.
1102    * @return A map for each table and its percentage.
1103    * @throws IOException When scanning the directory fails.
1104    */
1105   public static int getTotalTableFragmentation(final HMaster master)
1106   throws IOException {
1107     Map<String, Integer> map = getTableFragmentation(master);
1108     return map != null && map.size() > 0 ? map.get("-TOTAL-") : -1;
1109   }
1110 
1111   /**
1112    * Runs through the HBase rootdir and checks how many stores for each table
1113    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1114    * percentage across all tables is stored under the special key "-TOTAL-".
1115    *
1116    * @param master  The master defining the HBase root and file system.
1117    * @return A map for each table and its percentage.
1118    *
1119    * @throws IOException When scanning the directory fails.
1120    */
1121   public static Map<String, Integer> getTableFragmentation(
1122     final HMaster master)
1123   throws IOException {
1124     Path path = getRootDir(master.getConfiguration());
1125     // since HMaster.getFileSystem() is package private
1126     FileSystem fs = path.getFileSystem(master.getConfiguration());
1127     return getTableFragmentation(fs, path);
1128   }
1129 
1130   /**
1131    * Runs through the HBase rootdir and checks how many stores for each table
1132    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1133    * percentage across all tables is stored under the special key "-TOTAL-".
1134    *
1135    * @param fs  The file system to use.
1136    * @param hbaseRootDir  The root directory to scan.
1137    * @return A map for each table and its percentage.
1138    * @throws IOException When scanning the directory fails.
1139    */
1140   public static Map<String, Integer> getTableFragmentation(
1141     final FileSystem fs, final Path hbaseRootDir)
1142   throws IOException {
1143     Map<String, Integer> frags = new HashMap<String, Integer>();
1144     int cfCountTotal = 0;
1145     int cfFragTotal = 0;
1146     PathFilter regionFilter = new RegionDirFilter(fs);
1147     PathFilter familyFilter = new FamilyDirFilter(fs);
1148     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1149     for (Path d : tableDirs) {
1150       int cfCount = 0;
1151       int cfFrag = 0;
1152       FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
1153       for (FileStatus regionDir : regionDirs) {
1154         Path dd = regionDir.getPath();
1155         // else its a region name, now look in region for families
1156         FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1157         for (FileStatus familyDir : familyDirs) {
1158           cfCount++;
1159           cfCountTotal++;
1160           Path family = familyDir.getPath();
1161           // now in family make sure only one file
1162           FileStatus[] familyStatus = fs.listStatus(family);
1163           if (familyStatus.length > 1) {
1164             cfFrag++;
1165             cfFragTotal++;
1166           }
1167         }
1168       }
1169       // compute percentage per table and store in result list
1170       frags.put(FSUtils.getTableName(d).getNameAsString(),
1171         cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
1172     }
1173     // set overall percentage for all tables
1174     frags.put("-TOTAL-",
1175       cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
1176     return frags;
1177   }
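
  /*
   * Editor's sketch, not part of the original source: per-table fragmentation as the percentage
   * of stores with more than one file; the "-TOTAL-" key carries the overall number.
   *
   *   Map<String, Integer> frags = FSUtils.getTableFragmentation(fs, FSUtils.getRootDir(conf));
   *   int overall = frags.get("-TOTAL-");
   */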
1178 
1179   /**
1180    * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
1181    * path rootdir
1182    *
1183    * @param rootdir qualified path of HBase root directory
1184    * @param tableName name of table
1185    * @return {@link org.apache.hadoop.fs.Path} for table
1186    */
1187   public static Path getTableDir(Path rootdir, final TableName tableName) {
1188     return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
1189         tableName.getQualifierAsString());
1190   }
1191 
1192   /**
1193    * Returns the {@link org.apache.hadoop.hbase.TableName} object corresponding
1194    * to the given table directory path
1195    * (as produced by {@link #getTableDir(Path, TableName)}).
1196    *
1197    * @param tablePath path of the table directory
1198    * @return {@link org.apache.hadoop.hbase.TableName} for the table
1199    */
1200   public static TableName getTableName(Path tablePath) {
1201     return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
1202   }
1203 
1204   /**
1205    * Returns the {@link org.apache.hadoop.fs.Path} object representing
1206    * the namespace directory under path rootdir
1207    *
1208    * @param rootdir qualified path of HBase root directory
1209    * @param namespace namespace name
1210    * @return {@link org.apache.hadoop.fs.Path} for the namespace directory
1211    */
1212   public static Path getNamespaceDir(Path rootdir, final String namespace) {
1213     return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
1214         new Path(namespace)));
1215   }
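
  /*
   * Editor's sketch, not part of the original source: with a hypothetical root of
   * hdfs://namenode:8020/hbase, the directory for table "ns1:t1" resolves under the namespace
   * directory (HConstants.BASE_NAMESPACE_DIR), and getTableName round-trips it back.
   * rootDir is assumed to be the value returned by getRootDir(conf).
   *
   *   Path tableDir = FSUtils.getTableDir(rootDir, TableName.valueOf("ns1", "t1"));
   *   // e.g. hdfs://namenode:8020/hbase/data/ns1/t1
   *   TableName tn = FSUtils.getTableName(tableDir);   // ns1:t1
   */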
1216 
1217   /**
1218    * A {@link PathFilter} that returns only regular files.
1219    */
1220   static class FileFilter implements PathFilter {
1221     private final FileSystem fs;
1222 
1223     public FileFilter(final FileSystem fs) {
1224       this.fs = fs;
1225     }
1226 
1227     @Override
1228     public boolean accept(Path p) {
1229       try {
1230         return fs.isFile(p);
1231       } catch (IOException e) {
1232         LOG.debug("unable to verify if path=" + p + " is a regular file", e);
1233         return false;
1234       }
1235     }
1236   }
1237 
1238   /**
1239    * Directory filter that doesn't include any of the directories in the specified blacklist
1240    */
1241   public static class BlackListDirFilter implements PathFilter {
1242     private final FileSystem fs;
1243     private List<String> blacklist;
1244 
1245     /**
1246      * Create a filter on the given filesystem with the specified blacklist
1247      * @param fs filesystem to filter
1248      * @param directoryNameBlackList list of the names of the directories to filter. If
1249      *          <tt>null</tt>, all directories are returned
1250      */
1251     @SuppressWarnings("unchecked")
1252     public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
1253       this.fs = fs;
1254       blacklist =
1255         (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
1256           : directoryNameBlackList);
1257     }
1258 
1259     @Override
1260     public boolean accept(Path p) {
1261       boolean isValid = false;
1262       try {
1263         if (isValidName(p.getName())) {
1264           isValid = fs.getFileStatus(p).isDirectory();
1265         } else {
1266           isValid = false;
1267         }
1268       } catch (IOException e) {
1269         LOG.warn("An error occurred while verifying if [" + p.toString()
1270             + "] is a valid directory. Returning 'not valid' and continuing.", e);
1271       }
1272       return isValid;
1273     }
1274 
1275     protected boolean isValidName(final String name) {
1276       return !blacklist.contains(name);
1277     }
1278   }
1279 
1280   /**
1281    * A {@link PathFilter} that only allows directories.
1282    */
1283   public static class DirFilter extends BlackListDirFilter {
1284 
1285     public DirFilter(FileSystem fs) {
1286       super(fs, null);
1287     }
1288   }
1289 
1290   /**
1291    * A {@link PathFilter} that returns usertable directories. To get all directories use the
1292    * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
1293    */
1294   public static class UserTableDirFilter extends BlackListDirFilter {
1295     public UserTableDirFilter(FileSystem fs) {
1296       super(fs, HConstants.HBASE_NON_TABLE_DIRS);
1297     }
1298 
1299     protected boolean isValidName(final String name) {
1300       if (!super.isValidName(name))
1301         return false;
1302 
1303       try {
1304         TableName.isLegalTableQualifierName(Bytes.toBytes(name));
1305       } catch (IllegalArgumentException e) {
1306         LOG.info("INVALID NAME " + name);
1307         return false;
1308       }
1309       return true;
1310     }
1311   }
1312 
1313   /**
1314    * Heuristic to determine whether or not it is safe to open a file for append.
1315    * Looks for dfs.support.append and uses reflection to search
1316    * for SequenceFile.Writer.syncFs() or FSDataOutputStream.hflush().
1317    * @param conf
1318    * @return True if append support
1319    */
1320   public static boolean isAppendSupported(final Configuration conf) {
1321     boolean append = conf.getBoolean("dfs.support.append", false);
1322     if (append) {
1323       try {
1324         // TODO: The implementation that comes back when we do a createWriter
1325         // may not be using SequenceFile so the below is not a definitive test.
1326         // Will do for now (hdfs-200).
1327         SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
1328         append = true;
1329       } catch (SecurityException e) {
1330       } catch (NoSuchMethodException e) {
1331         append = false;
1332       }
1333     }
1334     if (!append) {
1335       // Look for the 0.21, 0.22, new-style append evidence.
1336       try {
1337         FSDataOutputStream.class.getMethod("hflush", new Class<?> []{});
1338         append = true;
1339       } catch (NoSuchMethodException e) {
1340         append = false;
1341       }
1342     }
1343     return append;
1344   }
1345 
1346   /**
1347    * @param conf
1348    * @return True if the filesystem's scheme is 'hdfs'.
1349    * @throws IOException
1350    */
1351   public static boolean isHDFS(final Configuration conf) throws IOException {
1352     FileSystem fs = FileSystem.get(conf);
1353     String scheme = fs.getUri().getScheme();
1354     return scheme.equalsIgnoreCase("hdfs");
1355   }
1356 
1357   /**
1358    * Recover file lease. Used when a file is suspected
1359    * to have been left open by another process.
1360    * @param fs FileSystem handle
1361    * @param p Path of file to recover lease
1362    * @param conf Configuration handle
1363    * @throws IOException
1364    */
1365   public abstract void recoverFileLease(final FileSystem fs, final Path p,
1366       Configuration conf, CancelableProgressable reporter) throws IOException;
1367 
1368   public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
1369       throws IOException {
1370     List<Path> tableDirs = new LinkedList<Path>();
1371 
1372     for(FileStatus status :
1373         fs.globStatus(new Path(rootdir,
1374             new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
1375       tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
1376     }
1377     return tableDirs;
1378   }
1379 
1380   /**
1381    * @param fs
1382    * @param rootdir
1383    * @return All the table directories under <code>rootdir</code>. Ignores non-table hbase folders such as
1384    * .logs, .oldlogs and .corrupt.
1385    * @throws IOException
1386    */
1387   public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
1388       throws IOException {
1389     // presumes any directory under hbase.rootdir is a table
1390     FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
1391     List<Path> tabledirs = new ArrayList<Path>(dirs.length);
1392     for (FileStatus dir: dirs) {
1393       tabledirs.add(dir.getPath());
1394     }
1395     return tabledirs;
1396   }
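
       // Illustrative sketch (not part of FSUtils): enumerating all table directories
       // under the hbase root dir. Assumes 'conf' points at a running cluster.
       //
       //   FileSystem fs = FSUtils.getCurrentFileSystem(conf);
       //   Path rootDir = FSUtils.getRootDir(conf);
       //   for (Path tableDir : FSUtils.getTableDirs(fs, rootDir)) {
       //     LOG.debug("Found table dir: " + tableDir);
       //   }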
1397 
1398   /**
1399    * Checks if the given path is under a 'recovered.edits' directory.
1400    * @param path
1401    * @return True if the path contains the recovered.edits directory
1402    */
1403   public static boolean isRecoveredEdits(Path path) {
1404     return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
1405   }
1406 
1407   /**
1408    * A {@link PathFilter} that only allows region directories (hex-named dirs under a table dir).
1409    */
1410   public static class RegionDirFilter implements PathFilter {
1411     // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
1412     public static final Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
1413     final FileSystem fs;
1414 
1415     public RegionDirFilter(FileSystem fs) {
1416       this.fs = fs;
1417     }
1418 
1419     @Override
1420     public boolean accept(Path rd) {
1421       if (!regionDirPattern.matcher(rd.getName()).matches()) {
1422         return false;
1423       }
1424 
1425       try {
1426         return fs.getFileStatus(rd).isDirectory();
1427       } catch (IOException ioe) {
1428         // Maybe the file was moved or the fs was disconnected.
1429         LOG.warn("Skipping file " + rd + " due to IOException", ioe);
1430         return false;
1431       }
1432     }
1433   }
1434 
1435   /**
1436    * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1437    * .tableinfo
1438    * @param fs A file system for the Path
1439    * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
1440    * @return List of paths to valid region directories in table dir.
1441    * @throws IOException
1442    */
1443   public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1444     // assumes we are in a table dir.
1445     FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs));
1446     List<Path> regionDirs = new ArrayList<Path>(rds.length);
1447     for (FileStatus rdfs: rds) {
1448       Path rdPath = rdfs.getPath();
1449       regionDirs.add(rdPath);
1450     }
1451     return regionDirs;
1452   }
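
       // Illustrative sketch (not part of FSUtils): walking the region directories of a
       // single table. Assumes 'fs', 'rootDir' and 'tableName' are already resolved.
       //
       //   Path tableDir = FSUtils.getTableDir(rootDir, tableName);
       //   for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
       //     LOG.debug("Region dir: " + regionDir.getName());
       //   }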
1453 
1454   /**
1455    * Filter for all dirs that are legal column family names.  This is generally used for colfam
1456    * dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1457    */
1458   public static class FamilyDirFilter implements PathFilter {
1459     final FileSystem fs;
1460 
1461     public FamilyDirFilter(FileSystem fs) {
1462       this.fs = fs;
1463     }
1464 
1465     @Override
1466     public boolean accept(Path rd) {
1467       try {
1468         // throws IAE if invalid
1469         HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(rd.getName()));
1470       } catch (IllegalArgumentException iae) {
1471         // path name is an invalid family name and thus is excluded.
1472         return false;
1473       }
1474 
1475       try {
1476         return fs.getFileStatus(rd).isDirectory();
1477       } catch (IOException ioe) {
1478         // Maybe the file was moved or the fs was disconnected.
1479         LOG.warn("Skipping file " + rd + " due to IOException", ioe);
1480         return false;
1481       }
1482     }
1483   }
1484 
1485   /**
1486    * Given a particular region dir, return all the familydirs inside it
1487    *
1488    * @param fs A file system for the Path
1489    * @param regionDir Path to a specific region directory
1490    * @return List of paths to valid family directories in region dir.
1491    * @throws IOException
1492    */
1493   public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1494     // assumes we are in a region dir.
1495     FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1496     List<Path> familyDirs = new ArrayList<Path>(fds.length);
1497     for (FileStatus fdfs: fds) {
1498       Path fdPath = fdfs.getPath();
1499       familyDirs.add(fdPath);
1500     }
1501     return familyDirs;
1502   }
1503 
1504   public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
1505     FileStatus[] fds = fs.listStatus(familyDir, new ReferenceFileFilter(fs));
1506     List<Path> referenceFiles = new ArrayList<Path>(fds.length);
1507     for (FileStatus fdfs: fds) {
1508       Path fdPath = fdfs.getPath();
1509       referenceFiles.add(fdPath);
1510     }
1511     return referenceFiles;
1512   }
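
       // Illustrative sketch (not part of FSUtils): drilling from a region directory down
       // to its reference files, e.g. to decide whether a split parent can be cleaned up.
       // Assumes 'fs' and 'regionDir' are valid.
       //
       //   int references = 0;
       //   for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
       //     references += FSUtils.getReferenceFilePaths(fs, familyDir).size();
       //   }
       //   // getRegionReferenceFileCount(fs, regionDir) below wraps this same loop.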
1513 
1514   /**
1515    * Filter for HFiles that excludes reference files.
1516    */
1517   public static class HFileFilter implements PathFilter {
1518     final FileSystem fs;
1519 
1520     public HFileFilter(FileSystem fs) {
1521       this.fs = fs;
1522     }
1523 
1524     @Override
1525     public boolean accept(Path rd) {
1526       try {
1527         // only files
1528         return !fs.getFileStatus(rd).isDirectory() && StoreFileInfo.isHFile(rd);
1529       } catch (IOException ioe) {
1530         // Maybe the file was moved or the fs was disconnected.
1531         LOG.warn("Skipping file " + rd + " due to IOException", ioe);
1532         return false;
1533       }
1534     }
1535   }
1536 
1537   public static class ReferenceFileFilter implements PathFilter {
1538 
1539     private final FileSystem fs;
1540 
1541     public ReferenceFileFilter(FileSystem fs) {
1542       this.fs = fs;
1543     }
1544 
1545     @Override
1546     public boolean accept(Path rd) {
1547       try {
1548         // only files can be references.
1549         return !fs.getFileStatus(rd).isDirectory() && StoreFileInfo.isReference(rd);
1550       } catch (IOException ioe) {
1551         // Maybe the file was moved or the fs was disconnected.
1552         LOG.warn("Skipping file " + rd + " due to IOException", ioe);
1553         return false;
1554       }
1555     }
1556   }
1557 
1558 
1559   /**
1560    * @param conf
1561    * @return Returns the filesystem of the hbase rootdir.
1562    * @throws IOException
1563    */
1564   public static FileSystem getCurrentFileSystem(Configuration conf)
1565   throws IOException {
1566     return getRootDir(conf).getFileSystem(conf);
1567   }
1568 
1569 
1570   /**
1571    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1572    * table StoreFile names to the full Path.
1573    * <br>
1574    * Example...<br>
1575    * Key = 3944417774205889744  <br>
1576    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1577    *
1578    * @param map map to add values.  If null, this method will create and populate one to return
1579    * @param fs  The file system to use.
1580    * @param hbaseRootDir  The root directory to scan.
1581    * @param tableName name of the table to scan.
1582    * @return Map keyed by StoreFile name with a value of the full Path.
1583    * @throws IOException When scanning the directory fails.
1584    */
1585   public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1586   final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1587   throws IOException {
1588     return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null);
1589   }
1590 
1591   /**
1592    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1593    * table StoreFile names to the full Path.
1594    * <br>
1595    * Example...<br>
1596    * Key = 3944417774205889744  <br>
1597    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1598    *
1599    * @param map map to add values.  If null, this method will create and populate one to return
1600    * @param fs  The file system to use.
1601    * @param hbaseRootDir  The root directory to scan.
1602    * @param tableName name of the table to scan.
1603    * @param errors ErrorReporter instance or null
1604    * @return Map keyed by StoreFile name with a value of the full Path.
1605    * @throws IOException When scanning the directory fails.
1606    */
1607   public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1608   final FileSystem fs, final Path hbaseRootDir, TableName tableName, ErrorReporter errors)
1609   throws IOException {
1610     if (map == null) {
1611       map = new HashMap<String, Path>();
1612     }
1613 
1614     // only include the directory paths to tables
1615     Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
1616     // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
1617     // should be regions.
1618     PathFilter familyFilter = new FamilyDirFilter(fs);
1619     FileStatus[] regionDirs = fs.listStatus(tableDir, new RegionDirFilter(fs));
1620     for (FileStatus regionDir : regionDirs) {
1621       if (null != errors) {
1622         errors.progress();
1623       }
1624       Path dd = regionDir.getPath();
1625       // else its a region name, now look in region for families
1626       FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1627       for (FileStatus familyDir : familyDirs) {
1628         if (null != errors) {
1629           errors.progress();
1630         }
1631         Path family = familyDir.getPath();
1632         if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1633           continue;
1634         }
1635         // now in family, iterate over the StoreFiles and
1636         // put in map
1637         FileStatus[] familyStatus = fs.listStatus(family);
1638         for (FileStatus sfStatus : familyStatus) {
1639           if (null != errors) {
1640             errors.progress();
1641           }
1642           Path sf = sfStatus.getPath();
1643           map.put(sf.getName(), sf);
1644         }
1645       }
1646     }
1647     return map;
1648   }
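
       // Illustrative sketch (not part of FSUtils): building the store file reverse lookup
       // for one table, as hbck does. Assumes 'fs', 'rootDir' and 'tableName' are resolved;
       // passing a null map lets the method allocate one.
       //
       //   Map<String, Path> storeFiles =
       //       FSUtils.getTableStoreFilePathMap(null, fs, rootDir, tableName);
       //   Path fullPath = storeFiles.get("3944417774205889744");  // name from the javadoc example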
1649 
1650   public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1651     int result = 0;
1652     try {
1653       for (Path familyDir:getFamilyDirs(fs, p)){
1654         result += getReferenceFilePaths(fs, familyDir).size();
1655       }
1656     } catch (IOException e) {
1657       LOG.warn("Error counting reference files.", e);
1658     }
1659     return result;
1660   }
1661 
1662   /**
1663    * Runs through the HBase rootdir and creates a reverse lookup map for
1664    * table StoreFile names to the full Path.
1665    * <br>
1666    * Example...<br>
1667    * Key = 3944417774205889744  <br>
1668    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1669    *
1670    * @param fs  The file system to use.
1671    * @param hbaseRootDir  The root directory to scan.
1672    * @return Map keyed by StoreFile name with a value of the full Path.
1673    * @throws IOException When scanning the directory fails.
1674    */
1675   public static Map<String, Path> getTableStoreFilePathMap(
1676     final FileSystem fs, final Path hbaseRootDir)
1677   throws IOException {
1678     return getTableStoreFilePathMap(fs, hbaseRootDir, null);
1679   }
1680 
1681   /**
1682    * Runs through the HBase rootdir and creates a reverse lookup map for
1683    * table StoreFile names to the full Path.
1684    * <br>
1685    * Example...<br>
1686    * Key = 3944417774205889744  <br>
1687    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1688    *
1689    * @param fs  The file system to use.
1690    * @param hbaseRootDir  The root directory to scan.
1691    * @param errors ErrorReporter instance or null
1692    * @return Map keyed by StoreFile name with a value of the full Path.
1693    * @throws IOException When scanning the directory fails.
1694    */
1695   public static Map<String, Path> getTableStoreFilePathMap(
1696     final FileSystem fs, final Path hbaseRootDir, ErrorReporter errors)
1697   throws IOException {
1698     Map<String, Path> map = new HashMap<String, Path>();
1699 
1700     // if this method looks similar to 'getTableFragmentation' that is because
1701     // it was borrowed from it.
1702 
1703     // only include the directory paths to tables
1704     for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1705       getTableStoreFilePathMap(map, fs, hbaseRootDir,
1706           FSUtils.getTableName(tableDir), errors);
1707     }
1708     return map;
1709   }
1710 
1711   /**
1712    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1713    * This accommodates differences between hadoop versions, where hadoop 1
1714    * does not throw a FileNotFoundException but returns an empty FileStatus[],
1715    * while Hadoop 2 will throw a FileNotFoundException.
1716    *
1717    * @param fs file system
1718    * @param dir directory
1719    * @param filter path filter
1720    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1721    */
1722   public static FileStatus [] listStatus(final FileSystem fs,
1723       final Path dir, final PathFilter filter) throws IOException {
1724     FileStatus [] status = null;
1725     try {
1726       status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
1727     } catch (FileNotFoundException fnfe) {
1728       // if directory doesn't exist, return null
1729       if (LOG.isTraceEnabled()) {
1730         LOG.trace(dir + " doesn't exist");
1731       }
1732     }
1733     if (status == null || status.length < 1) return null;
1734     return status;
1735   }
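
       // Illustrative sketch (not part of FSUtils): the null return (rather than an empty
       // array or an exception) is the point of this wrapper, so callers must check it.
       //
       //   FileStatus[] children = FSUtils.listStatus(fs, dir, new DirFilter(fs));
       //   if (children == null) {
       //     return;  // dir is missing or empty; nothing to do
       //   }
       //   for (FileStatus child : children) { ... }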
1736 
1737   /**
1738    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1739    * This accommodates differences between hadoop versions.
1740    *
1741    * @param fs file system
1742    * @param dir directory
1743    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1744    */
1745   public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
1746     return listStatus(fs, dir, null);
1747   }
1748 
1749   /**
1750    * Calls fs.delete() and returns the result.
1751    *
1752    * @param fs
1753    * @param path
1754    * @param recursive
1755    * @return the value returned by fs.delete()
1756    * @throws IOException
1757    */
1758   public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
1759       throws IOException {
1760     return fs.delete(path, recursive);
1761   }
1762 
1763   /**
1764    * Calls fs.exists() to check if the specified path exists.
1765    *
1766    * @param fs
1767    * @param path
1768    * @return the value returned by fs.exists()
1769    * @throws IOException
1770    */
1771   public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
1772     return fs.exists(path);
1773   }
1774 
1775   /**
1776    * Throws an exception if an action is not permitted for a user on a file.
1777    *
1778    * @param ugi
1779    *          the user
1780    * @param file
1781    *          the file
1782    * @param action
1783    *          the action
1784    */
1785   public static void checkAccess(UserGroupInformation ugi, FileStatus file,
1786       FsAction action) throws AccessDeniedException {
1787     if (ugi.getShortUserName().equals(file.getOwner())) {
1788       if (file.getPermission().getUserAction().implies(action)) {
1789         return;
1790       }
1791     } else if (contains(ugi.getGroupNames(), file.getGroup())) {
1792       if (file.getPermission().getGroupAction().implies(action)) {
1793         return;
1794       }
1795     } else if (file.getPermission().getOtherAction().implies(action)) {
1796       return;
1797     }
1798     throw new AccessDeniedException("Permission denied:" + " action=" + action
1799         + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
1800   }
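
       // Illustrative sketch (not part of FSUtils): verifying the current user may read a
       // file before handing it to a scanner. Assumes 'fs' and 'path' are valid.
       //
       //   UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
       //   FileStatus status = fs.getFileStatus(path);
       //   FSUtils.checkAccess(ugi, status, FsAction.READ);  // throws AccessDeniedException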
1801 
1802   private static boolean contains(String[] groups, String user) {
1803     for (String group : groups) {
1804       if (group.equals(user)) {
1805         return true;
1806       }
1807     }
1808     return false;
1809   }
1810 
1811   /**
1812    * Log the current state of the filesystem from a certain root directory
1813    * @param fs filesystem to investigate
1814    * @param root root file/directory to start logging from
1815    * @param LOG log to output information
1816    * @throws IOException if an unexpected exception occurs
1817    */
1818   public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
1819       throws IOException {
1820     LOG.debug("Current file system:");
1821     logFSTree(LOG, fs, root, "|-");
1822   }
1823 
1824   /**
1825    * Recursive helper to log the state of the FS
1826    *
1827    * @see #logFileSystemState(FileSystem, Path, Log)
1828    */
1829   private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
1830       throws IOException {
1831     FileStatus[] files = FSUtils.listStatus(fs, root, null);
1832     if (files == null) return;
1833 
1834     for (FileStatus file : files) {
1835       if (file.isDirectory()) {
1836         LOG.debug(prefix + file.getPath().getName() + "/");
1837         logFSTree(LOG, fs, file.getPath(), prefix + "---");
1838       } else {
1839         LOG.debug(prefix + file.getPath().getName());
1840       }
1841     }
1842   }
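
       // Illustrative sketch (not part of FSUtils): typically used from tests to dump the
       // layout under the hbase root dir when debugging. Assumes 'conf' is available.
       //
       //   FileSystem fs = FSUtils.getCurrentFileSystem(conf);
       //   FSUtils.logFileSystemState(fs, FSUtils.getRootDir(conf), LOG);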
1843 
1844   public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
1845       throws IOException {
1846     // set the modify time for TimeToLive Cleaner
1847     fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
1848     return fs.rename(src, dest);
1849   }
1850 
1851   /**
1852    * Scans the root path of the file system to get the
1853    * degree of locality for each region on each of the servers having at least
1854    * one block of that region.
1855    * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}
1856    *
1857    * @param conf
1858    *          the configuration to use
1859    * @return the mapping from region encoded name to a map of server names to
1860    *           locality fraction
1861    * @throws IOException
1862    *           in case of file system errors or interrupts
1863    */
1864   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1865       final Configuration conf) throws IOException {
1866     return getRegionDegreeLocalityMappingFromFS(
1867         conf, null,
1868         conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1869 
1870   }
1871 
1872   /**
1873    * Scans the root path of the file system to get the
1874    * degree of locality for each region on each of the servers having at least
1875    * one block of that region.
1876    *
1877    * @param conf
1878    *          the configuration to use
1879    * @param desiredTable
1880    *          the table you wish to scan locality for
1881    * @param threadPoolSize
1882    *          the thread pool size to use
1883    * @return the mapping from region encoded name to a map of server names to
1884    *           locality fraction
1885    * @throws IOException
1886    *           in case of file system errors or interrupts
1887    */
1888   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1889       final Configuration conf, final String desiredTable, int threadPoolSize)
1890       throws IOException {
1891     Map<String, Map<String, Float>> regionDegreeLocalityMapping =
1892         new ConcurrentHashMap<String, Map<String, Float>>();
1893     getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null,
1894         regionDegreeLocalityMapping);
1895     return regionDegreeLocalityMapping;
1896   }
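
       // Illustrative sketch (not part of FSUtils): fetching locality for one table and
       // reading the fraction of a region hosted on a given server. The table name,
       // 'regionEncodedName' and 'serverName' are hypothetical placeholders.
       //
       //   Map<String, Map<String, Float>> locality =
       //       FSUtils.getRegionDegreeLocalityMappingFromFS(conf, "myTable", 8);
       //   Float fraction = locality.get(regionEncodedName).get(serverName);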
1897 
1898   /**
1899    * Scans the root path of the file system to get either the
1900    * mapping between the region name and its best locality region server or the
1901    * degree of locality of each region on each of the servers having at least
1902    * one block of that region. The output map parameters are both optional.
1903    *
1904    * @param conf
1905    *          the configuration to use
1906    * @param desiredTable
1907    *          the table you wish to scan locality for
1908    * @param threadPoolSize
1909    *          the thread pool size to use
1910    * @param regionToBestLocalityRSMapping
1911    *          the map into which to put the best locality mapping or null
1912    * @param regionDegreeLocalityMapping
1913    *          the map into which to put the locality degree mapping or null,
1914    *          must be a thread-safe implementation
1915    * @throws IOException
1916    *           in case of file system errors or interrupts
1917    */
1918   private static void getRegionLocalityMappingFromFS(
1919       final Configuration conf, final String desiredTable,
1920       int threadPoolSize,
1921       Map<String, String> regionToBestLocalityRSMapping,
1922       Map<String, Map<String, Float>> regionDegreeLocalityMapping)
1923       throws IOException {
1924     FileSystem fs =  FileSystem.get(conf);
1925     Path rootPath = FSUtils.getRootDir(conf);
1926     long startTime = EnvironmentEdgeManager.currentTime();
1927     Path queryPath;
1928     // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1929     if (null == desiredTable) {
1930       queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1931     } else {
1932       queryPath = new Path(FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1933     }
1934 
1935     // reject all paths that are not appropriate
1936     PathFilter pathFilter = new PathFilter() {
1937       @Override
1938       public boolean accept(Path path) {
1939         // the path name should be a region name; the glob may pick up some noise
1940         if (null == path) {
1941           return false;
1942         }
1943 
1944         // no parent?
1945         Path parent = path.getParent();
1946         if (null == parent) {
1947           return false;
1948         }
1949 
1950         String regionName = path.getName();
1951         if (null == regionName) {
1952           return false;
1953         }
1954 
1955         if (!regionName.toLowerCase().matches("[0-9a-f]+")) {
1956           return false;
1957         }
1958         return true;
1959       }
1960     };
1961 
1962     FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1963 
1964     if (null == statusList) {
1965       return;
1966     } else {
1967       LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
1968           statusList.length);
1969     }
1970 
1971     // lower the number of threads in case we have very few expected regions
1972     threadPoolSize = Math.min(threadPoolSize, statusList.length);
1973 
1974     // run in multiple threads
1975     ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
1976         threadPoolSize, 60, TimeUnit.SECONDS,
1977         new ArrayBlockingQueue<Runnable>(statusList.length));
1978     try {
1979       // ignore all file status items that are not of interest
1980       for (FileStatus regionStatus : statusList) {
1981         if (null == regionStatus) {
1982           continue;
1983         }
1984 
1985         if (!regionStatus.isDirectory()) {
1986           continue;
1987         }
1988 
1989         Path regionPath = regionStatus.getPath();
1990         if (null == regionPath) {
1991           continue;
1992         }
1993 
1994         tpe.execute(new FSRegionScanner(fs, regionPath,
1995             regionToBestLocalityRSMapping, regionDegreeLocalityMapping));
1996       }
1997     } finally {
1998       tpe.shutdown();
1999       int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
2000           60 * 1000);
2001       try {
2002         // here we wait until TPE terminates, which is either naturally or by
2003         // exceptions in the execution of the threads
2004         while (!tpe.awaitTermination(threadWakeFrequency,
2005             TimeUnit.MILLISECONDS)) {
2006           // printing out rough estimate, so as to not introduce
2007           // AtomicInteger
2008           LOG.info("Locality checking is underway: { Scanned Regions : "
2009               + tpe.getCompletedTaskCount() + "/"
2010               + tpe.getTaskCount() + " }");
2011         }
2012       } catch (InterruptedException e) {
2013         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
2014       }
2015     }
2016 
2017     long overhead = EnvironmentEdgeManager.currentTime() - startTime;
2018     String overheadMsg = "Scanning DFS for locality info took " + overhead + " ms";
2019 
2020     LOG.info(overheadMsg);
2021   }
2022 
2023   /**
2024    * Do our short circuit read setup.
2025    * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
2026    * @param conf
2027    */
2028   public static void setupShortCircuitRead(final Configuration conf) {
2029     // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
2030     boolean shortCircuitSkipChecksum =
2031       conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
2032     boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
2033     if (shortCircuitSkipChecksum) {
2034       LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
2035         "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
2036         "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
2037       assert !shortCircuitSkipChecksum; //this will fail if assertions are on
2038     }
2039     checkShortCircuitReadBufferSize(conf);
2040   }
2041 
2042   /**
2043    * Check if short circuit read buffer size is set and if not, set it to hbase value.
2044    * @param conf
2045    */
2046   public static void checkShortCircuitReadBufferSize(final Configuration conf) {
2047     final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
2048     final int notSet = -1;
2049     // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in hadoop 2
2050     final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
2051     int size = conf.getInt(dfsKey, notSet);
2052     // If a size is set, return -- we will use it.
2053     if (size != notSet) return;
2054     // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
2055     int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
2056     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
2057   }
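
       // Illustrative sketch (not part of FSUtils): the hbase-prefixed key wins only when
       // the plain dfs key is unset. Both keys are the ones referenced above; the 64 KB
       // value is an arbitrary example.
       //
       //   Configuration conf = ...;
       //   conf.setInt("hbase.dfs.client.read.shortcircuit.buffer.size", 64 * 1024);
       //   FSUtils.checkShortCircuitReadBufferSize(conf);
       //   // dfs.client.read.shortcircuit.buffer.size is now 65536 unless it was already set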
2058 
2059   /**
2060    * @param c
2061    * @return The DFSClient DFSHedgedReadMetrics instance, or null if it can't be found or we are not on hdfs.
2062    * @throws IOException
2063    */
2064   public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
2065       throws IOException {
2066     if (!isHDFS(c)) return null;
2067     // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
2068     // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
2069     // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
2070     final String name = "getHedgedReadMetrics";
2071     DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
2072     Method m;
2073     try {
2074       m = dfsclient.getClass().getDeclaredMethod(name);
2075     } catch (NoSuchMethodException e) {
2076       LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
2077           e.getMessage());
2078       return null;
2079     } catch (SecurityException e) {
2080       LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
2081           e.getMessage());
2082       return null;
2083     }
2084     m.setAccessible(true);
2085     try {
2086       return (DFSHedgedReadMetrics)m.invoke(dfsclient);
2087     } catch (IllegalAccessException e) {
2088       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
2089           e.getMessage());
2090       return null;
2091     } catch (IllegalArgumentException e) {
2092       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
2093           e.getMessage());
2094       return null;
2095     } catch (InvocationTargetException e) {
2096       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
2097           e.getMessage());
2098       return null;
2099     }
2100   }
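
       // Illustrative sketch (not part of FSUtils): sampling hedged read counters, e.g. for
       // region server metrics. The accessor names below are assumed from the hadoop 2
       // DFSHedgedReadMetrics class; a null return means we are not on hdfs.
       //
       //   DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
       //   if (metrics != null) {
       //     LOG.info("hedgedReadOps=" + metrics.getHedgedReadOps()
       //         + ", hedgedReadWins=" + metrics.getHedgedReadWins());
       //   }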
2101 }