1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInputStream;
23  import java.io.EOFException;
24  import java.io.FileNotFoundException;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.lang.reflect.Method;
28  import java.net.URI;
29  import java.net.URISyntaxException;
30  import java.util.ArrayList;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.regex.Pattern;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.classification.InterfaceAudience;
40  import org.apache.hadoop.classification.InterfaceStability;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.fs.BlockLocation;
43  import org.apache.hadoop.fs.FSDataInputStream;
44  import org.apache.hadoop.fs.FSDataOutputStream;
45  import org.apache.hadoop.fs.FileStatus;
46  import org.apache.hadoop.fs.FileSystem;
47  import org.apache.hadoop.fs.Path;
48  import org.apache.hadoop.fs.PathFilter;
49  import org.apache.hadoop.fs.permission.FsAction;
50  import org.apache.hadoop.fs.permission.FsPermission;
51  import org.apache.hadoop.hbase.ClusterId;
52  import org.apache.hadoop.hbase.HColumnDescriptor;
53  import org.apache.hadoop.hbase.HConstants;
54  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
55  import org.apache.hadoop.hbase.HRegionInfo;
56  import org.apache.hadoop.hbase.RemoteExceptionHandler;
57  import org.apache.hadoop.hbase.exceptions.DeserializationException;
58  import org.apache.hadoop.hbase.exceptions.FileSystemVersionException;
59  import org.apache.hadoop.hbase.master.HMaster;
60  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
61  import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
62  import org.apache.hadoop.hbase.regionserver.HRegion;
63  import org.apache.hadoop.hdfs.DistributedFileSystem;
64  import org.apache.hadoop.hdfs.protocol.FSConstants;
65  import org.apache.hadoop.io.IOUtils;
66  import org.apache.hadoop.io.SequenceFile;
67  import org.apache.hadoop.security.AccessControlException;
68  import org.apache.hadoop.security.UserGroupInformation;
69  import org.apache.hadoop.util.ReflectionUtils;
70  import org.apache.hadoop.util.StringUtils;
71  
72  import com.google.common.primitives.Ints;
73  import com.google.protobuf.InvalidProtocolBufferException;
74  
75  /**
76   * Utility methods for interacting with the underlying file system.
77   */
78  @InterfaceAudience.Public
79  @InterfaceStability.Evolving
80  public abstract class FSUtils {
  /** Logger shared by all static helpers in this class. */
  private static final Log LOG = LogFactory.getLog(FSUtils.class);

  /** Full access permissions (starting point for a umask) */
  private static final String FULL_RWX_PERMISSIONS = "777";

  /**
   * Constructor for subclasses. The class is abstract, so it is never instantiated directly.
   */
  protected FSUtils() {
    super();
  }
89  
90    /**
91     * Compare of path component. Does not consider schema; i.e. if schemas different but <code>path
92     * <code> starts with <code>rootPath<code>, then the function returns true
93     * @param rootPath
94     * @param path 
95     * @return True if <code>path</code> starts with <code>rootPath</code>
96     */
97    public static boolean isStartingWithPath(final Path rootPath, final String path) {
98      String uriRootPath = rootPath.toUri().getPath();
99      String tailUriPath = (new Path(path)).toUri().getPath();
100     return tailUriPath.startsWith(uriRootPath);
101   }
102 
103   /**
104    * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
105    * '/a/b/c' part. Does not consider schema; i.e. if schemas different but path or subpath matches,
106    * the two will equate.
107    * @param pathToSearch Path we will be trying to match.
108    * @param pathTail
109    * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
110    */
111   public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
112     return isMatchingTail(pathToSearch, new Path(pathTail));
113   }
114 
115   /**
116    * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
117    * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true.  Does not consider
118    * schema; i.e. if schemas different but path or subpath matches, the two will equate.
119    * @param pathToSearch Path we will be trying to match.
120    * @param pathTail
121    * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
122    */
123   public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
124     if (pathToSearch.depth() != pathTail.depth()) return false;
125     Path tailPath = pathTail;
126     String tailName;
127     Path toSearch = pathToSearch;
128     String toSearchName;
129     boolean result = false;
130     do {
131       tailName = tailPath.getName();
132       if (tailName == null || tailName.length() <= 0) {
133         result = true;
134         break;
135       }
136       toSearchName = toSearch.getName();
137       if (toSearchName == null || toSearchName.length() <= 0) break;
138       // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
139       tailPath = tailPath.getParent();
140       toSearch = toSearch.getParent();
141     } while(tailName.equals(toSearchName));
142     return result;
143   }
144 
145   public static FSUtils getInstance(FileSystem fs, Configuration conf) {
146     String scheme = fs.getUri().getScheme();
147     if (scheme == null) {
148       LOG.warn("Could not find scheme for uri " +
149           fs.getUri() + ", default to hdfs");
150       scheme = "hdfs";
151     }
152     Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
153         scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
154     FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
155     return fsUtils;
156   }
157 
158   /**
159    * Delete if exists.
160    * @param fs filesystem object
161    * @param dir directory to delete
162    * @return True if deleted <code>dir</code>
163    * @throws IOException e
164    */
165   public static boolean deleteDirectory(final FileSystem fs, final Path dir)
166   throws IOException {
167     return fs.exists(dir) && fs.delete(dir, true);
168   }
169 
170   /**
171    * Return the number of bytes that large input files should be optimally
172    * be split into to minimize i/o time.
173    *
174    * use reflection to search for getDefaultBlockSize(Path f)
175    * if the method doesn't exist, fall back to using getDefaultBlockSize()
176    *
177    * @param fs filesystem object
178    * @return the default block size for the path's filesystem
179    * @throws IOException e
180    */
181   public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
182     Method m = null;
183     Class<? extends FileSystem> cls = fs.getClass();
184     try {
185       m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
186     } catch (NoSuchMethodException e) {
187       LOG.info("FileSystem doesn't support getDefaultBlockSize");
188     } catch (SecurityException e) {
189       LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
190       m = null; // could happen on setAccessible()
191     }
192     if (m == null) {
193       return fs.getDefaultBlockSize();
194     } else {
195       try {
196         Object ret = m.invoke(fs, path);
197         return ((Long)ret).longValue();
198       } catch (Exception e) {
199         throw new IOException(e);
200       }
201     }
202   }
203 
204   /*
205    * Get the default replication.
206    *
207    * use reflection to search for getDefaultReplication(Path f)
208    * if the method doesn't exist, fall back to using getDefaultReplication()
209    *
210    * @param fs filesystem object
211    * @param f path of file
212    * @return default replication for the path's filesystem
213    * @throws IOException e
214    */
215   public static short getDefaultReplication(final FileSystem fs, final Path path) throws IOException {
216     Method m = null;
217     Class<? extends FileSystem> cls = fs.getClass();
218     try {
219       m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
220     } catch (NoSuchMethodException e) {
221       LOG.info("FileSystem doesn't support getDefaultReplication");
222     } catch (SecurityException e) {
223       LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
224       m = null; // could happen on setAccessible()
225     }
226     if (m == null) {
227       return fs.getDefaultReplication();
228     } else {
229       try {
230         Object ret = m.invoke(fs, path);
231         return ((Number)ret).shortValue();
232       } catch (Exception e) {
233         throw new IOException(e);
234       }
235     }
236   }
237 
238   /**
239    * Returns the default buffer size to use during writes.
240    *
241    * The size of the buffer should probably be a multiple of hardware
242    * page size (4096 on Intel x86), and it determines how much data is
243    * buffered during read and write operations.
244    *
245    * @param fs filesystem object
246    * @return default buffer size to use during writes
247    */
248   public static int getDefaultBufferSize(final FileSystem fs) {
249     return fs.getConf().getInt("io.file.buffer.size", 4096);
250   }
251 
252   /**
253    * Create the specified file on the filesystem. By default, this will:
254    * <ol>
255    * <li>overwrite the file if it exists</li>
256    * <li>apply the umask in the configuration (if it is enabled)</li>
257    * <li>use the fs configured buffer size (or 4096 if not set)</li>
258    * <li>use the default replication</li>
259    * <li>use the default block size</li>
260    * <li>not track progress</li>
261    * </ol>
262    *
263    * @param fs {@link FileSystem} on which to write the file
264    * @param path {@link Path} to the file to write
265    * @return output stream to the created file
266    * @throws IOException if the file cannot be created
267    */
268   public static FSDataOutputStream create(FileSystem fs, Path path,
269       FsPermission perm) throws IOException {
270     return create(fs, path, perm, true);
271   }
272 
273   /**
274    * Create the specified file on the filesystem. By default, this will:
275    * <ol>
276    * <li>apply the umask in the configuration (if it is enabled)</li>
277    * <li>use the fs configured buffer size (or 4096 if not set)</li>
278    * <li>use the default replication</li>
279    * <li>use the default block size</li>
280    * <li>not track progress</li>
281    * </ol>
282    *
283    * @param fs {@link FileSystem} on which to write the file
284    * @param path {@link Path} to the file to write
285    * @param perm
286    * @param overwrite Whether or not the created file should be overwritten.
287    * @return output stream to the created file
288    * @throws IOException if the file cannot be created
289    */
290   @SuppressWarnings("deprecation")
291   public static FSDataOutputStream create(FileSystem fs, Path path,
292       FsPermission perm, boolean overwrite) throws IOException {
293     LOG.debug("Creating file=" + path + " with permission=" + perm);
294 
295     return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
296         getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
297   }
298 
299   /**
300    * Get the file permissions specified in the configuration, if they are
301    * enabled.
302    *
303    * @param fs filesystem that the file will be created on.
304    * @param conf configuration to read for determining if permissions are
305    *          enabled and which to use
306    * @param permssionConfKey property key in the configuration to use when
307    *          finding the permission
308    * @return the permission to use when creating a new file on the fs. If
309    *         special permissions are not specified in the configuration, then
310    *         the default permissions on the the fs will be returned.
311    */
312   public static FsPermission getFilePermissions(final FileSystem fs,
313       final Configuration conf, final String permssionConfKey) {
314     boolean enablePermissions = conf.getBoolean(
315         HConstants.ENABLE_DATA_FILE_UMASK, false);
316 
317     if (enablePermissions) {
318       try {
319         FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
320         // make sure that we have a mask, if not, go default.
321         String mask = conf.get(permssionConfKey);
322         if (mask == null)
323           return FsPermission.getDefault();
324         // appy the umask
325         FsPermission umask = new FsPermission(mask);
326         return perm.applyUMask(umask);
327       } catch (IllegalArgumentException e) {
328         LOG.warn(
329             "Incorrect umask attempted to be created: "
330                 + conf.get(permssionConfKey)
331                 + ", using default file permissions.", e);
332         return FsPermission.getDefault();
333       }
334     }
335     return FsPermission.getDefault();
336   }
337 
338   /**
339    * Checks to see if the specified file system is available
340    *
341    * @param fs filesystem
342    * @throws IOException e
343    */
344   public static void checkFileSystemAvailable(final FileSystem fs)
345   throws IOException {
346     if (!(fs instanceof DistributedFileSystem)) {
347       return;
348     }
349     IOException exception = null;
350     DistributedFileSystem dfs = (DistributedFileSystem) fs;
351     try {
352       if (dfs.exists(new Path("/"))) {
353         return;
354       }
355     } catch (IOException e) {
356       exception = RemoteExceptionHandler.checkIOException(e);
357     }
358     try {
359       fs.close();
360     } catch (Exception e) {
361       LOG.error("file system close failed: ", e);
362     }
363     IOException io = new IOException("File system is not available");
364     io.initCause(exception);
365     throw io;
366   }
367 
  /**
   * Asks the given filesystem whether it is in safe mode.
   * <p>
   * The two-argument <code>DistributedFileSystem.setSafeMode(SafeModeAction, boolean)</code> is
   * looked up and invoked via reflection because it is not in hadoop 1.1. If the reflective path
   * fails with anything other than an {@link IOException}, the single-argument
   * <code>setSafeMode(SafeModeAction)</code> is called directly instead.
   *
   * @param dfs filesystem to query
   * @return whether we're in safe mode
   * @throws IOException if the reflective attempt surfaces an IOException, or if the fallback
   *           call fails
   */
  private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
    boolean inSafeMode = false;
    try {
      // Query (SAFEMODE_GET) through the two-argument overload, passing true as its second
      // argument.
      Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
          org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.class, boolean.class});
      inSafeMode = (Boolean) m.invoke(dfs,
        org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_GET, true);
    } catch (Exception e) {
      // NOTE(review): Method.invoke reports exceptions thrown by the target wrapped in
      // InvocationTargetException, so an IOException raised by the reflected call would not
      // match this instanceof test and the fallback below would run instead -- confirm intended.
      if (e instanceof IOException) throw (IOException) e;
      
      // Check whether dfs is on safemode.
      inSafeMode = dfs.setSafeMode(
        org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_GET);      
    }
    return inSafeMode;    
  }
392   
393   /**
394    * Check whether dfs is in safemode.
395    * @param conf
396    * @throws IOException
397    */
398   public static void checkDfsSafeMode(final Configuration conf)
399   throws IOException {
400     boolean isInSafeMode = false;
401     FileSystem fs = FileSystem.get(conf);
402     if (fs instanceof DistributedFileSystem) {
403       DistributedFileSystem dfs = (DistributedFileSystem)fs;
404       isInSafeMode = isInSafeMode(dfs);
405     }
406     if (isInSafeMode) {
407       throw new IOException("File system is in safemode, it can't be written now");
408     }
409   }
410 
411   /**
412    * Verifies current version of file system
413    *
414    * @param fs filesystem object
415    * @param rootdir root hbase directory
416    * @return null if no version file exists, version string otherwise.
417    * @throws IOException e
418    * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
419    */
420   public static String getVersion(FileSystem fs, Path rootdir)
421   throws IOException, DeserializationException {
422     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
423     FileStatus[] status = null;
424     try {
425       // hadoop 2.0 throws FNFE if directory does not exist.  
426       // hadoop 1.0 returns null if directory does not exist.
427       status = fs.listStatus(versionFile);
428     } catch (FileNotFoundException fnfe) {
429       return null;
430     }
431     if (status == null || status.length == 0) return null;
432     String version = null;
433     byte [] content = new byte [(int)status[0].getLen()];
434     FSDataInputStream s = fs.open(versionFile);
435     try {
436       IOUtils.readFully(s, content, 0, content.length);
437       if (ProtobufUtil.isPBMagicPrefix(content)) {
438         version = parseVersionFrom(content);
439       } else {
440         // Presume it pre-pb format.
441         InputStream is = new ByteArrayInputStream(content);
442         DataInputStream dis = new DataInputStream(is);
443         try {
444           version = dis.readUTF();
445         } finally {
446           dis.close();
447         }
448         // Update the format
449         LOG.info("Updating the hbase.version file format with version=" + version);
450         setVersion(fs, rootdir, version, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
451       }
452     } catch (EOFException eof) {
453       LOG.warn("Version file was empty, odd, will try to set it.");
454     } finally {
455       s.close();
456     }
457     return version;
458   }
459 
460   /**
461    * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
462    * @param bytes The byte content of the hbase.version file.
463    * @return The version found in the file as a String.
464    * @throws DeserializationException
465    */
466   static String parseVersionFrom(final byte [] bytes)
467   throws DeserializationException {
468     ProtobufUtil.expectPBMagicPrefix(bytes);
469     int pblen = ProtobufUtil.lengthOfPBMagic();
470     FSProtos.HBaseVersionFileContent.Builder builder =
471       FSProtos.HBaseVersionFileContent.newBuilder();
472     FSProtos.HBaseVersionFileContent fileContent;
473     try {
474       fileContent = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
475       return fileContent.getVersion();
476     } catch (InvalidProtocolBufferException e) {
477       // Convert
478       throw new DeserializationException(e);
479     }
480   }
481 
482   /**
483    * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
484    * @param version Version to persist
485    * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
486    */
487   static byte [] toVersionByteArray(final String version) {
488     FSProtos.HBaseVersionFileContent.Builder builder =
489       FSProtos.HBaseVersionFileContent.newBuilder();
490     return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
491   }
492 
493   /**
494    * Verifies current version of file system
495    *
496    * @param fs file system
497    * @param rootdir root directory of HBase installation
498    * @param message if true, issues a message on System.out
499    *
500    * @throws IOException e
501    * @throws DeserializationException
502    */
503   public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
504   throws IOException, DeserializationException {
505     checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
506   }
507 
508   /**
509    * Verifies current version of file system
510    *
511    * @param fs file system
512    * @param rootdir root directory of HBase installation
513    * @param message if true, issues a message on System.out
514    * @param wait wait interval
515    * @param retries number of times to retry
516    *
517    * @throws IOException e
518    * @throws DeserializationException
519    */
520   public static void checkVersion(FileSystem fs, Path rootdir,
521       boolean message, int wait, int retries)
522   throws IOException, DeserializationException {
523     String version = getVersion(fs, rootdir);
524     if (version == null) {
525       if (!metaRegionExists(fs, rootdir)) {
526         // rootDir is empty (no version file and no root region)
527         // just create new version file (HBASE-1195)
528         setVersion(fs, rootdir, wait, retries);
529         return;
530       }
531     } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) return;
532 
533     // version is deprecated require migration
534     // Output on stdout so user sees it in terminal.
535     String msg = "HBase file layout needs to be upgraded."
536       + "  You have version " + version
537       + " and I want version " + HConstants.FILE_SYSTEM_VERSION
538       + ".  Is your hbase.rootdir valid?  If so, you may need to run "
539       + "'hbase hbck -fixVersionFile'.";
540     if (message) {
541       System.out.println("WARNING! " + msg);
542     }
543     throw new FileSystemVersionException(msg);
544   }
545 
546   /**
547    * Sets version of file system
548    *
549    * @param fs filesystem object
550    * @param rootdir hbase root
551    * @throws IOException e
552    */
553   public static void setVersion(FileSystem fs, Path rootdir)
554   throws IOException {
555     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
556       HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
557   }
558 
559   /**
560    * Sets version of file system
561    *
562    * @param fs filesystem object
563    * @param rootdir hbase root
564    * @param wait time to wait for retry
565    * @param retries number of times to retry before failing
566    * @throws IOException e
567    */
568   public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
569   throws IOException {
570     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
571   }
572 
573 
574   /**
575    * Sets version of file system
576    *
577    * @param fs filesystem object
578    * @param rootdir hbase root directory
579    * @param version version to set
580    * @param wait time to wait for retry
581    * @param retries number of times to retry before throwing an IOException
582    * @throws IOException e
583    */
584   public static void setVersion(FileSystem fs, Path rootdir, String version,
585       int wait, int retries) throws IOException {
586     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
587     while (true) {
588       try {
589         FSDataOutputStream s = fs.create(versionFile);
590         s.write(toVersionByteArray(version));
591         s.close();
592         LOG.debug("Created version file at " + rootdir.toString() + " with version=" + version);
593         return;
594       } catch (IOException e) {
595         if (retries > 0) {
596           LOG.warn("Unable to create version file at " + rootdir.toString() + ", retrying", e);
597           fs.delete(versionFile, false);
598           try {
599             if (wait > 0) {
600               Thread.sleep(wait);
601             }
602           } catch (InterruptedException ex) {
603             // ignore
604           }
605           retries--;
606         } else {
607           throw e;
608         }
609       }
610     }
611   }
612 
613   /**
614    * Checks that a cluster ID file exists in the HBase root directory
615    * @param fs the root directory FileSystem
616    * @param rootdir the HBase root directory in HDFS
617    * @param wait how long to wait between retries
618    * @return <code>true</code> if the file exists, otherwise <code>false</code>
619    * @throws IOException if checking the FileSystem fails
620    */
621   public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
622       int wait) throws IOException {
623     while (true) {
624       try {
625         Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
626         return fs.exists(filePath);
627       } catch (IOException ioe) {
628         if (wait > 0) {
629           LOG.warn("Unable to check cluster ID file in " + rootdir.toString() +
630               ", retrying in "+wait+"msec: "+StringUtils.stringifyException(ioe));
631           try {
632             Thread.sleep(wait);
633           } catch (InterruptedException ie) {
634             Thread.interrupted();
635             break;
636           }
637         } else {
638           throw ioe;
639         }
640       }
641     }
642     return false;
643   }
644 
645   /**
646    * Returns the value of the unique cluster ID stored for this HBase instance.
647    * @param fs the root directory FileSystem
648    * @param rootdir the path to the HBase root directory
649    * @return the unique cluster identifier
650    * @throws IOException if reading the cluster ID file fails
651    */
652   public static ClusterId getClusterId(FileSystem fs, Path rootdir)
653   throws IOException {
654     Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
655     ClusterId clusterId = null;
656     FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath):  null;
657     if (status != null) {
658       int len = Ints.checkedCast(status.getLen());
659       byte [] content = new byte[len];
660       FSDataInputStream in = fs.open(idPath);
661       try {
662         in.readFully(content);
663       } catch (EOFException eof) {
664         LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
665       } finally{
666         in.close();
667       }
668       try {
669         clusterId = ClusterId.parseFrom(content);
670       } catch (DeserializationException e) {
671         throw new IOException("content=" + Bytes.toString(content), e);
672       }
673       // If not pb'd, make it so.
674       if (!ProtobufUtil.isPBMagicPrefix(content)) rewriteAsPb(fs, rootdir, idPath, clusterId);
675       return clusterId;
676     } else {
677       LOG.warn("Cluster ID file does not exist at " + idPath.toString());
678     }
679     return clusterId;
680   }
681 
682   /**
683    * @param cid
684    * @throws IOException
685    */
686   private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
687       final ClusterId cid)
688   throws IOException {
689     // Rewrite the file as pb.  Move aside the old one first, write new
690     // then delete the moved-aside file.
691     Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
692     if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
693     setClusterId(fs, rootdir, cid, 100);
694     if (!fs.delete(movedAsideName, false)) {
695       throw new IOException("Failed delete of " + movedAsideName);
696     }
697     LOG.debug("Rewrote the hbase.id file as pb");
698   }
699 
700   /**
701    * Writes a new unique identifier for this cluster to the "hbase.id" file
702    * in the HBase root directory
703    * @param fs the root directory FileSystem
704    * @param rootdir the path to the HBase root directory
705    * @param clusterId the unique identifier to store
706    * @param wait how long (in milliseconds) to wait between retries
707    * @throws IOException if writing to the FileSystem fails and no wait value
708    */
709   public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
710       int wait) throws IOException {
711     while (true) {
712       try {
713         Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
714         FSDataOutputStream s = fs.create(filePath);
715         try {
716           s.write(clusterId.toByteArray());
717         } finally {
718           s.close();
719         }
720         if (LOG.isDebugEnabled()) {
721           LOG.debug("Created cluster ID file at " + filePath.toString() + " with ID: " + clusterId);
722         }
723         return;
724       } catch (IOException ioe) {
725         if (wait > 0) {
726           LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
727               ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
728           try {
729             Thread.sleep(wait);
730           } catch (InterruptedException ie) {
731             Thread.interrupted();
732             break;
733           }
734         } else {
735           throw ioe;
736         }
737       }
738     }
739   }
740 
741   /**
742    * Verifies root directory path is a valid URI with a scheme
743    *
744    * @param root root directory path
745    * @return Passed <code>root</code> argument.
746    * @throws IOException if not a valid URI with a scheme
747    */
748   public static Path validateRootPath(Path root) throws IOException {
749     try {
750       URI rootURI = new URI(root.toString());
751       String scheme = rootURI.getScheme();
752       if (scheme == null) {
753         throw new IOException("Root directory does not have a scheme");
754       }
755       return root;
756     } catch (URISyntaxException e) {
757       IOException io = new IOException("Root directory path is not a valid " +
758         "URI -- check your " + HConstants.HBASE_DIR + " configuration");
759       io.initCause(e);
760       throw io;
761     }
762   }
763 
764   /**
765    * Checks for the presence of the root path (using the provided conf object) in the given path. If
766    * it exists, this method removes it and returns the String representation of remaining relative path.
767    * @param path
768    * @param conf
769    * @return String representation of the remaining relative path
770    * @throws IOException
771    */
772   public static String removeRootPath(Path path, final Configuration conf) throws IOException {
773     Path root = FSUtils.getRootDir(conf);
774     String pathStr = path.toString();
775     // check that the path is absolute... it has the root path in it.
776     if (!pathStr.startsWith(root.toString())) return pathStr;
777     // if not, return as it is.
778     return pathStr.substring(root.toString().length() + 1);// remove the "/" too.
779   }
780 
781   /**
782    * If DFS, check safe mode and if so, wait until we clear it.
783    * @param conf configuration
784    * @param wait Sleep between retries
785    * @throws IOException e
786    */
787   public static void waitOnSafeMode(final Configuration conf,
788     final long wait)
789   throws IOException {
790     FileSystem fs = FileSystem.get(conf);
791     if (!(fs instanceof DistributedFileSystem)) return;
792     DistributedFileSystem dfs = (DistributedFileSystem)fs;
793     // Make sure dfs is not in safe mode
794     while (isInSafeMode(dfs)) {
795       LOG.info("Waiting for dfs to exit safe mode...");
796       try {
797         Thread.sleep(wait);
798       } catch (InterruptedException e) {
799         //continue
800       }
801     }
802   }
803 
804   /**
805    * Return the 'path' component of a Path.  In Hadoop, Path is an URI.  This
806    * method returns the 'path' component of a Path's URI: e.g. If a Path is
807    * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
808    * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
809    * This method is useful if you want to print out a Path without qualifying
810    * Filesystem instance.
811    * @param p Filesystem Path whose 'path' component we are to return.
812    * @return Path portion of the Filesystem
813    */
814   public static String getPath(Path p) {
815     return p.toUri().getPath();
816   }
817 
818   /**
819    * @param c configuration
820    * @return Path to hbase root directory: i.e. <code>hbase.rootdir</code> from
821    * configuration as a qualified Path.
822    * @throws IOException e
823    */
824   public static Path getRootDir(final Configuration c) throws IOException {
825     Path p = new Path(c.get(HConstants.HBASE_DIR));
826     FileSystem fs = p.getFileSystem(c);
827     return p.makeQualified(fs);
828   }
829 
830   public static void setRootDir(final Configuration c, final Path root) throws IOException {
831     c.set(HConstants.HBASE_DIR, root.toString());
832   }
833 
834   public static void setFsDefault(final Configuration c, final Path root) throws IOException {
835     c.set("fs.defaultFS", root.toString());    // for hadoop 0.21+
836     c.set("fs.default.name", root.toString()); // for hadoop 0.20
837   }
838 
839   /**
840    * Checks if root region exists
841    *
842    * @param fs file system
843    * @param rootdir root directory of HBase installation
844    * @return true if exists
845    * @throws IOException e
846    */
847   @SuppressWarnings("deprecation")
848   public static boolean metaRegionExists(FileSystem fs, Path rootdir)
849   throws IOException {
850     Path rootRegionDir =
851       HRegion.getRegionDir(rootdir, HRegionInfo.FIRST_META_REGIONINFO);
852     return fs.exists(rootRegionDir);
853   }
854 
855   /**
856    * Compute HDFS blocks distribution of a given file, or a portion of the file
857    * @param fs file system
858    * @param status file status of the file
859    * @param start start position of the portion
860    * @param length length of the portion
861    * @return The HDFS blocks distribution
862    */
863   static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
864     final FileSystem fs, FileStatus status, long start, long length)
865     throws IOException {
866     HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
867     BlockLocation [] blockLocations =
868       fs.getFileBlockLocations(status, start, length);
869     for(BlockLocation bl : blockLocations) {
870       String [] hosts = bl.getHosts();
871       long len = bl.getLength();
872       blocksDistribution.addHostsAndBlockWeight(hosts, len);
873     }
874 
875     return blocksDistribution;
876   }
877 
878 
879 
880   /**
881    * Runs through the hbase rootdir and checks all stores have only
882    * one file in them -- that is, they've been major compacted.  Looks
883    * at root and meta tables too.
884    * @param fs filesystem
885    * @param hbaseRootDir hbase root directory
886    * @return True if this hbase install is major compacted.
887    * @throws IOException e
888    */
889   public static boolean isMajorCompacted(final FileSystem fs,
890       final Path hbaseRootDir)
891   throws IOException {
892     // Presumes any directory under hbase.rootdir is a table.
893     FileStatus [] tableDirs = fs.listStatus(hbaseRootDir, new DirFilter(fs));
894     for (FileStatus tableDir : tableDirs) {
895       // Skip the .log directory.  All others should be tables.  Inside a table,
896       // there are compaction.dir directories to skip.  Otherwise, all else
897       // should be regions.  Then in each region, should only be family
898       // directories.  Under each of these, should be one file only.
899       Path d = tableDir.getPath();
900       if (d.getName().equals(HConstants.HREGION_LOGDIR_NAME)) {
901         continue;
902       }
903       FileStatus[] regionDirs = fs.listStatus(d, new DirFilter(fs));
904       for (FileStatus regionDir : regionDirs) {
905         Path dd = regionDir.getPath();
906         if (dd.getName().equals(HConstants.HREGION_COMPACTIONDIR_NAME)) {
907           continue;
908         }
909         // Else its a region name.  Now look in region for families.
910         FileStatus[] familyDirs = fs.listStatus(dd, new DirFilter(fs));
911         for (FileStatus familyDir : familyDirs) {
912           Path family = familyDir.getPath();
913           // Now in family make sure only one file.
914           FileStatus[] familyStatus = fs.listStatus(family);
915           if (familyStatus.length > 1) {
916             LOG.debug(family.toString() + " has " + familyStatus.length +
917                 " files.");
918             return false;
919           }
920         }
921       }
922     }
923     return true;
924   }
925 
926   // TODO move this method OUT of FSUtils. No dependencies to HMaster
927   /**
928    * Returns the total overall fragmentation percentage. Includes .META. and
929    * -ROOT- as well.
930    *
931    * @param master  The master defining the HBase root and file system.
932    * @return A map for each table and its percentage.
933    * @throws IOException When scanning the directory fails.
934    */
935   public static int getTotalTableFragmentation(final HMaster master)
936   throws IOException {
937     Map<String, Integer> map = getTableFragmentation(master);
938     return map != null && map.size() > 0 ? map.get("-TOTAL-") : -1;
939   }
940 
941   /**
942    * Runs through the HBase rootdir and checks how many stores for each table
943    * have more than one file in them. Checks -ROOT- and .META. too. The total
944    * percentage across all tables is stored under the special key "-TOTAL-".
945    *
946    * @param master  The master defining the HBase root and file system.
947    * @return A map for each table and its percentage.
948    *
949    * @throws IOException When scanning the directory fails.
950    */
951   public static Map<String, Integer> getTableFragmentation(
952     final HMaster master)
953   throws IOException {
954     Path path = getRootDir(master.getConfiguration());
955     // since HMaster.getFileSystem() is package private
956     FileSystem fs = path.getFileSystem(master.getConfiguration());
957     return getTableFragmentation(fs, path);
958   }
959 
960   /**
961    * Runs through the HBase rootdir and checks how many stores for each table
962    * have more than one file in them. Checks -ROOT- and .META. too. The total
963    * percentage across all tables is stored under the special key "-TOTAL-".
964    *
965    * @param fs  The file system to use.
966    * @param hbaseRootDir  The root directory to scan.
967    * @return A map for each table and its percentage.
968    * @throws IOException When scanning the directory fails.
969    */
970   public static Map<String, Integer> getTableFragmentation(
971     final FileSystem fs, final Path hbaseRootDir)
972   throws IOException {
973     Map<String, Integer> frags = new HashMap<String, Integer>();
974     int cfCountTotal = 0;
975     int cfFragTotal = 0;
976     DirFilter df = new DirFilter(fs);
977     // presumes any directory under hbase.rootdir is a table
978     FileStatus [] tableDirs = fs.listStatus(hbaseRootDir, df);
979     for (FileStatus tableDir : tableDirs) {
980       // Skip the .log directory.  All others should be tables.  Inside a table,
981       // there are compaction.dir directories to skip.  Otherwise, all else
982       // should be regions.  Then in each region, should only be family
983       // directories.  Under each of these, should be one file only.
984       Path d = tableDir.getPath();
985       if (d.getName().equals(HConstants.HREGION_LOGDIR_NAME)) {
986         continue;
987       }
988       int cfCount = 0;
989       int cfFrag = 0;
990       FileStatus[] regionDirs = fs.listStatus(d, df);
991       for (FileStatus regionDir : regionDirs) {
992         Path dd = regionDir.getPath();
993         if (dd.getName().equals(HConstants.HREGION_COMPACTIONDIR_NAME)) {
994           continue;
995         }
996         // else its a region name, now look in region for families
997         FileStatus[] familyDirs = fs.listStatus(dd, df);
998         for (FileStatus familyDir : familyDirs) {
999           cfCount++;
1000           cfCountTotal++;
1001           Path family = familyDir.getPath();
1002           // now in family make sure only one file
1003           FileStatus[] familyStatus = fs.listStatus(family);
1004           if (familyStatus.length > 1) {
1005             cfFrag++;
1006             cfFragTotal++;
1007           }
1008         }
1009       }
1010       // compute percentage per table and store in result list
1011       frags.put(d.getName(), Math.round((float) cfFrag / cfCount * 100));
1012     }
1013     // set overall percentage for all tables
1014     frags.put("-TOTAL-", Math.round((float) cfFragTotal / cfCountTotal * 100));
1015     return frags;
1016   }
1017 
1018   /**
1019    * Expects to find -ROOT- directory.
1020    * @param fs filesystem
1021    * @param hbaseRootDir hbase root directory
1022    * @return True if this a pre020 layout.
1023    * @throws IOException e
1024    */
1025   public static boolean isPre020FileLayout(final FileSystem fs,
1026     final Path hbaseRootDir)
1027   throws IOException {
1028     Path mapfiles = new Path(new Path(new Path(new Path(hbaseRootDir, "-ROOT-"),
1029       "70236052"), "info"), "mapfiles");
1030     return fs.exists(mapfiles);
1031   }
1032 
1033   /**
1034    * Runs through the hbase rootdir and checks all stores have only
1035    * one file in them -- that is, they've been major compacted.  Looks
1036    * at root and meta tables too.  This version differs from
1037    * {@link #isMajorCompacted(FileSystem, Path)} in that it expects a
1038    * pre-0.20.0 hbase layout on the filesystem.  Used migrating.
1039    * @param fs filesystem
1040    * @param hbaseRootDir hbase root directory
1041    * @return True if this hbase install is major compacted.
1042    * @throws IOException e
1043    */
1044   public static boolean isMajorCompactedPre020(final FileSystem fs,
1045       final Path hbaseRootDir)
1046   throws IOException {
1047     // Presumes any directory under hbase.rootdir is a table.
1048     FileStatus [] tableDirs = fs.listStatus(hbaseRootDir, new DirFilter(fs));
1049     for (FileStatus tableDir : tableDirs) {
1050       // Inside a table, there are compaction.dir directories to skip.
1051       // Otherwise, all else should be regions.  Then in each region, should
1052       // only be family directories.  Under each of these, should be a mapfile
1053       // and info directory and in these only one file.
1054       Path d = tableDir.getPath();
1055       if (d.getName().equals(HConstants.HREGION_LOGDIR_NAME)) {
1056         continue;
1057       }
1058       FileStatus[] regionDirs = fs.listStatus(d, new DirFilter(fs));
1059       for (FileStatus regionDir : regionDirs) {
1060         Path dd = regionDir.getPath();
1061         if (dd.getName().equals(HConstants.HREGION_COMPACTIONDIR_NAME)) {
1062           continue;
1063         }
1064         // Else its a region name.  Now look in region for families.
1065         FileStatus[] familyDirs = fs.listStatus(dd, new DirFilter(fs));
1066         for (FileStatus familyDir : familyDirs) {
1067           Path family = familyDir.getPath();
1068           FileStatus[] infoAndMapfile = fs.listStatus(family);
1069           // Assert that only info and mapfile in family dir.
1070           if (infoAndMapfile.length != 0 && infoAndMapfile.length != 2) {
1071             LOG.debug(family.toString() +
1072                 " has more than just info and mapfile: " + infoAndMapfile.length);
1073             return false;
1074           }
1075           // Make sure directory named info or mapfile.
1076           for (int ll = 0; ll < 2; ll++) {
1077             if (infoAndMapfile[ll].getPath().getName().equals("info") ||
1078                 infoAndMapfile[ll].getPath().getName().equals("mapfiles"))
1079               continue;
1080             LOG.debug("Unexpected directory name: " +
1081                 infoAndMapfile[ll].getPath());
1082             return false;
1083           }
1084           // Now in family, there are 'mapfile' and 'info' subdirs.  Just
1085           // look in the 'mapfile' subdir.
1086           FileStatus[] familyStatus =
1087               fs.listStatus(new Path(family, "mapfiles"));
1088           if (familyStatus.length > 1) {
1089             LOG.debug(family.toString() + " has " + familyStatus.length +
1090                 " files.");
1091             return false;
1092           }
1093         }
1094       }
1095     }
1096     return true;
1097   }
1098 
1099   /**
1100    * A {@link PathFilter} that returns only regular files.
1101    */
1102   static class FileFilter implements PathFilter {
1103     private final FileSystem fs;
1104 
1105     public FileFilter(final FileSystem fs) {
1106       this.fs = fs;
1107     }
1108 
1109     @Override
1110     public boolean accept(Path p) {
1111       try {
1112         return fs.isFile(p);
1113       } catch (IOException e) {
1114         LOG.debug("unable to verify if path=" + p + " is a regular file", e);
1115         return false;
1116       }
1117     }
1118   }
1119 
1120   /**
1121    * Directory filter that doesn't include any of the directories in the specified blacklist
1122    */
1123   public static class BlackListDirFilter implements PathFilter {
1124     private final FileSystem fs;
1125     private List<String> blacklist;
1126 
1127     /**
1128      * Create a filter on the give filesystem with the specified blacklist
1129      * @param fs filesystem to filter
1130      * @param directoryNameBlackList list of the names of the directories to filter. If
1131      *          <tt>null</tt>, all directories are returned
1132      */
1133     @SuppressWarnings("unchecked")
1134     public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
1135       this.fs = fs;
1136       blacklist =
1137         (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
1138           : directoryNameBlackList);
1139     }
1140 
1141     @Override
1142     public boolean accept(Path p) {
1143       boolean isValid = false;
1144       try {
1145         if (blacklist.contains(p.getName().toString())) {
1146           isValid = false;
1147         } else {
1148           isValid = fs.getFileStatus(p).isDir();
1149         }
1150       } catch (IOException e) {
1151         LOG.warn("An error occurred while verifying if [" + p.toString()
1152             + "] is a valid directory. Returning 'not valid' and continuing.", e);
1153       }
1154       return isValid;
1155     }
1156   }
1157 
1158   /**
1159    * A {@link PathFilter} that only allows directories.
1160    */
1161   public static class DirFilter extends BlackListDirFilter {
1162 
1163     public DirFilter(FileSystem fs) {
1164       super(fs, null);
1165     }
1166   }
1167 
1168   /**
1169    * A {@link PathFilter} that returns usertable directories. To get all directories use the
1170    * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
1171    */
1172   public static class UserTableDirFilter extends BlackListDirFilter {
1173 
1174     public UserTableDirFilter(FileSystem fs) {
1175       super(fs, HConstants.HBASE_NON_USER_TABLE_DIRS);
1176     }
1177   }
1178 
1179   /**
1180    * Heuristic to determine whether is safe or not to open a file for append
1181    * Looks both for dfs.support.append and use reflection to search
1182    * for SequenceFile.Writer.syncFs() or FSDataOutputStream.hflush()
1183    * @param conf
1184    * @return True if append support
1185    */
1186   public static boolean isAppendSupported(final Configuration conf) {
1187     boolean append = conf.getBoolean("dfs.support.append", false);
1188     if (append) {
1189       try {
1190         // TODO: The implementation that comes back when we do a createWriter
1191         // may not be using SequenceFile so the below is not a definitive test.
1192         // Will do for now (hdfs-200).
1193         SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
1194         append = true;
1195       } catch (SecurityException e) {
1196       } catch (NoSuchMethodException e) {
1197         append = false;
1198       }
1199     }
1200     if (!append) {
1201       // Look for the 0.21, 0.22, new-style append evidence.
1202       try {
1203         FSDataOutputStream.class.getMethod("hflush", new Class<?> []{});
1204         append = true;
1205       } catch (NoSuchMethodException e) {
1206         append = false;
1207       }
1208     }
1209     return append;
1210   }
1211 
1212   /**
1213    * @param conf
1214    * @return True if this filesystem whose scheme is 'hdfs'.
1215    * @throws IOException
1216    */
1217   public static boolean isHDFS(final Configuration conf) throws IOException {
1218     FileSystem fs = FileSystem.get(conf);
1219     String scheme = fs.getUri().getScheme();
1220     return scheme.equalsIgnoreCase("hdfs");
1221   }
1222 
1223   /**
1224    * Recover file lease. Used when a file might be suspect
1225    * to be had been left open by another process.
1226    * @param fs FileSystem handle
1227    * @param p Path of file to recover lease
1228    * @param conf Configuration handle
1229    * @throws IOException
1230    */
1231   public abstract void recoverFileLease(final FileSystem fs, final Path p,
1232       Configuration conf, CancelableProgressable reporter) throws IOException;
1233 
1234   /**
1235    * @param fs
1236    * @param rootdir
1237    * @return All the table directories under <code>rootdir</code>. Ignore non table hbase folders such as
1238    * .logs, .oldlogs, .corrupt, .META., and -ROOT- folders.
1239    * @throws IOException
1240    */
1241   public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
1242   throws IOException {
1243     // presumes any directory under hbase.rootdir is a table
1244     FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
1245     List<Path> tabledirs = new ArrayList<Path>(dirs.length);
1246     for (FileStatus dir: dirs) {
1247       tabledirs.add(dir.getPath());
1248     }
1249     return tabledirs;
1250   }
1251 
1252   public static Path getTablePath(Path rootdir, byte [] tableName) {
1253     return getTablePath(rootdir, Bytes.toString(tableName));
1254   }
1255 
1256   public static Path getTablePath(Path rootdir, final String tableName) {
1257     return new Path(rootdir, tableName);
1258   }
1259 
1260   /**
1261    * Filter for all dirs that don't start with '.'
1262    */
1263   public static class RegionDirFilter implements PathFilter {
1264     // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
1265     final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
1266     final FileSystem fs;
1267 
1268     public RegionDirFilter(FileSystem fs) {
1269       this.fs = fs;
1270     }
1271 
1272     @Override
1273     public boolean accept(Path rd) {
1274       if (!regionDirPattern.matcher(rd.getName()).matches()) {
1275         return false;
1276       }
1277 
1278       try {
1279         return fs.getFileStatus(rd).isDir();
1280       } catch (IOException ioe) {
1281         // Maybe the file was moved or the fs was disconnected.
1282         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1283         return false;
1284       }
1285     }
1286   }
1287 
1288   /**
1289    * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1290    * .tableinfo
1291    * @param fs A file system for the Path
1292    * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir>
1293    * @return List of paths to valid region directories in table dir.
1294    * @throws IOException
1295    */
1296   public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1297     // assumes we are in a table dir.
1298     FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs));
1299     List<Path> regionDirs = new ArrayList<Path>(rds.length);
1300     for (FileStatus rdfs: rds) {
1301       Path rdPath = rdfs.getPath();
1302       regionDirs.add(rdPath);
1303     }
1304     return regionDirs;
1305   }
1306 
1307   /**
1308    * Filter for all dirs that are legal column family names.  This is generally used for colfam
1309    * dirs <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>.
1310    */
1311   public static class FamilyDirFilter implements PathFilter {
1312     final FileSystem fs;
1313 
1314     public FamilyDirFilter(FileSystem fs) {
1315       this.fs = fs;
1316     }
1317 
1318     @Override
1319     public boolean accept(Path rd) {
1320       try {
1321         // throws IAE if invalid
1322         HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(rd.getName()));
1323       } catch (IllegalArgumentException iae) {
1324         // path name is an invalid family name and thus is excluded.
1325         return false;
1326       }
1327 
1328       try {
1329         return fs.getFileStatus(rd).isDir();
1330       } catch (IOException ioe) {
1331         // Maybe the file was moved or the fs was disconnected.
1332         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1333         return false;
1334       }
1335     }
1336   }
1337 
1338   /**
1339    * Given a particular region dir, return all the familydirs inside it
1340    *
1341    * @param fs A file system for the Path
1342    * @param regionDir Path to a specific region directory
1343    * @return List of paths to valid family directories in region dir.
1344    * @throws IOException
1345    */
1346   public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1347     // assumes we are in a region dir.
1348     FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1349     List<Path> familyDirs = new ArrayList<Path>(fds.length);
1350     for (FileStatus fdfs: fds) {
1351       Path fdPath = fdfs.getPath();
1352       familyDirs.add(fdPath);
1353     }
1354     return familyDirs;
1355   }
1356 
1357   /**
1358    * Filter for HFiles that excludes reference files.
1359    */
1360   public static class HFileFilter implements PathFilter {
1361     // This pattern will accept 0.90+ style hex hfies files but reject reference files
1362     final public static Pattern hfilePattern = Pattern.compile("^([0-9a-f]+)$");
1363 
1364     final FileSystem fs;
1365 
1366     public HFileFilter(FileSystem fs) {
1367       this.fs = fs;
1368     }
1369 
1370     @Override
1371     public boolean accept(Path rd) {
1372       if (!hfilePattern.matcher(rd.getName()).matches()) {
1373         return false;
1374       }
1375 
1376       try {
1377         // only files
1378         return !fs.getFileStatus(rd).isDir();
1379       } catch (IOException ioe) {
1380         // Maybe the file was moved or the fs was disconnected.
1381         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1382         return false;
1383       }
1384     }
1385   }
1386 
1387   /**
1388    * @param conf
1389    * @return Returns the filesystem of the hbase rootdir.
1390    * @throws IOException
1391    */
1392   public static FileSystem getCurrentFileSystem(Configuration conf)
1393   throws IOException {
1394     return getRootDir(conf).getFileSystem(conf);
1395   }
1396 
1397 
1398   /**
1399    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1400    * table StoreFile names to the full Path.
1401    * <br>
1402    * Example...<br>
1403    * Key = 3944417774205889744  <br>
1404    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1405    *
1406    * @param map map to add values.  If null, this method will create and populate one to return
1407    * @param fs  The file system to use.
1408    * @param hbaseRootDir  The root directory to scan.
1409    * @param tablename name of the table to scan.
1410    * @return Map keyed by StoreFile name with a value of the full Path.
1411    * @throws IOException When scanning the directory fails.
1412    */
1413   public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map, 
1414     final FileSystem fs, final Path hbaseRootDir, byte[] tablename)
1415   throws IOException {
1416     if (map == null) {
1417       map = new HashMap<String, Path>();
1418     }
1419 
1420     // only include the directory paths to tables
1421     Path tableDir = new Path(hbaseRootDir, Bytes.toString(tablename));
1422     // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
1423     // should be regions. 
1424     PathFilter df = new BlackListDirFilter(fs, HConstants.HBASE_NON_TABLE_DIRS);
1425     FileStatus[] regionDirs = fs.listStatus(tableDir);
1426     for (FileStatus regionDir : regionDirs) {
1427       Path dd = regionDir.getPath();
1428       if (dd.getName().equals(HConstants.HREGION_COMPACTIONDIR_NAME)) {
1429         continue;
1430       }
1431       // else its a region name, now look in region for families
1432       FileStatus[] familyDirs = fs.listStatus(dd, df);
1433       for (FileStatus familyDir : familyDirs) {
1434         Path family = familyDir.getPath();
1435         // now in family, iterate over the StoreFiles and
1436         // put in map
1437         FileStatus[] familyStatus = fs.listStatus(family);
1438         for (FileStatus sfStatus : familyStatus) {
1439           Path sf = sfStatus.getPath();
1440           map.put( sf.getName(), sf);
1441         }
1442       }
1443     }
1444     return map;
1445   }
1446 
1447   
1448   /**
1449    * Runs through the HBase rootdir and creates a reverse lookup map for
1450    * table StoreFile names to the full Path.
1451    * <br>
1452    * Example...<br>
1453    * Key = 3944417774205889744  <br>
1454    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1455    *
1456    * @param fs  The file system to use.
1457    * @param hbaseRootDir  The root directory to scan.
1458    * @return Map keyed by StoreFile name with a value of the full Path.
1459    * @throws IOException When scanning the directory fails.
1460    */
1461   public static Map<String, Path> getTableStoreFilePathMap(
1462     final FileSystem fs, final Path hbaseRootDir)
1463   throws IOException {
1464     Map<String, Path> map = new HashMap<String, Path>();
1465 
1466     // if this method looks similar to 'getTableFragmentation' that is because
1467     // it was borrowed from it.
1468     
1469     // only include the directory paths to tables
1470     PathFilter df = new BlackListDirFilter(fs, HConstants.HBASE_NON_TABLE_DIRS);
1471     FileStatus [] tableDirs = fs.listStatus(hbaseRootDir, df);
1472     for (FileStatus tableDir : tableDirs) {
1473       byte[] tablename = Bytes.toBytes(tableDir.getPath().getName());
1474       getTableStoreFilePathMap(map, fs, hbaseRootDir, tablename);
1475     }
1476       return map;
1477   }
1478 
1479   /**
1480    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal
1481    * This accommodates differences between hadoop versions, where hadoop 1
1482    * does not throw a FileNotFoundException, and return an empty FileStatus[]
1483    * while Hadoop 2 will throw FileNotFoundException.
1484    *
1485    * @param fs file system
1486    * @param dir directory
1487    * @param filter path filter
1488    * @return null if tabledir doesn't exist, otherwise FileStatus array
1489    */
1490   public static FileStatus [] listStatus(final FileSystem fs,
1491       final Path dir, final PathFilter filter) throws IOException {
1492     FileStatus [] status = null;
1493     try {
1494       status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
1495     } catch (FileNotFoundException fnfe) {
1496       // if directory doesn't exist, return null
1497       LOG.debug(dir + " doesn't exist");
1498     }
1499     if (status == null || status.length < 1) return null;
1500     return status;
1501   }
1502 
1503   /**
1504    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal
1505    * This would accommodates differences between hadoop versions
1506    *
1507    * @param fs file system
1508    * @param dir directory
1509    * @return null if tabledir doesn't exist, otherwise FileStatus array
1510    */
1511   public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
1512     return listStatus(fs, dir, null);
1513   }
1514 
1515   /**
1516    * Calls fs.delete() and returns the value returned by the fs.delete()
1517    *
1518    * @param fs
1519    * @param path
1520    * @param recursive
1521    * @return the value returned by the fs.delete()
1522    * @throws IOException
1523    */
1524   public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
1525       throws IOException {
1526     return fs.delete(path, recursive);
1527   }
1528 
1529   /**
1530    * Calls fs.exists(). Checks if the specified path exists
1531    *
1532    * @param fs
1533    * @param path
1534    * @return the value returned by fs.exists()
1535    * @throws IOException
1536    */
1537   public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
1538     return fs.exists(path);
1539   }
1540 
1541   /**
1542    * Throw an exception if an action is not permitted by a user on a file.
1543    *
1544    * @param ugi
1545    *          the user
1546    * @param file
1547    *          the file
1548    * @param action
1549    *          the action
1550    */
1551   public static void checkAccess(UserGroupInformation ugi, FileStatus file,
1552       FsAction action) throws AccessControlException {
1553     if (ugi.getShortUserName().equals(file.getOwner())) {
1554       if (file.getPermission().getUserAction().implies(action)) {
1555         return;
1556       }
1557     } else if (contains(ugi.getGroupNames(), file.getGroup())) {
1558       if (file.getPermission().getGroupAction().implies(action)) {
1559         return;
1560       }
1561     } else if (file.getPermission().getOtherAction().implies(action)) {
1562       return;
1563     }
1564     throw new AccessControlException("Permission denied:" + " action=" + action
1565         + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
1566   }
1567 
1568   private static boolean contains(String[] groups, String user) {
1569     for (String group : groups) {
1570       if (group.equals(user)) {
1571         return true;
1572       }
1573     }
1574     return false;
1575   }
1576 
1577   /**
1578    * Log the current state of the filesystem from a certain root directory
1579    * @param fs filesystem to investigate
1580    * @param root root file/directory to start logging from
1581    * @param LOG log to output information
1582    * @throws IOException if an unexpected exception occurs
1583    */
1584   public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
1585       throws IOException {
1586     LOG.debug("Current file system:");
1587     logFSTree(LOG, fs, root, "|-");
1588   }
1589 
1590   /**
1591    * Recursive helper to log the state of the FS
1592    *
1593    * @see #logFileSystemState(FileSystem, Path, Log)
1594    */
1595   private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
1596       throws IOException {
1597     FileStatus[] files = FSUtils.listStatus(fs, root, null);
1598     if (files == null) return;
1599 
1600     for (FileStatus file : files) {
1601       if (file.isDir()) {
1602         LOG.debug(prefix + file.getPath().getName() + "/");
1603         logFSTree(LOG, fs, file.getPath(), prefix + "---");
1604       } else {
1605         LOG.debug(prefix + file.getPath().getName());
1606       }
1607     }
1608   }
1609 }