1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInputStream;
23  import java.io.EOFException;
24  import java.io.FileNotFoundException;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.InterruptedIOException;
28  import java.lang.reflect.InvocationTargetException;
29  import java.lang.reflect.Method;
30  import java.net.InetSocketAddress;
31  import java.net.URI;
32  import java.net.URISyntaxException;
33  import java.util.ArrayList;
34  import java.util.Collections;
35  import java.util.HashMap;
36  import java.util.LinkedList;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.concurrent.ArrayBlockingQueue;
40  import java.util.concurrent.ConcurrentHashMap;
41  import java.util.concurrent.ThreadPoolExecutor;
42  import java.util.concurrent.TimeUnit;
43  import java.util.regex.Pattern;
44  
45  import org.apache.commons.logging.Log;
46  import org.apache.commons.logging.LogFactory;
47  import org.apache.hadoop.hbase.classification.InterfaceAudience;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.fs.BlockLocation;
50  import org.apache.hadoop.fs.FSDataInputStream;
51  import org.apache.hadoop.fs.FSDataOutputStream;
52  import org.apache.hadoop.fs.FileStatus;
53  import org.apache.hadoop.fs.FileSystem;
54  import org.apache.hadoop.fs.Path;
55  import org.apache.hadoop.fs.PathFilter;
56  import org.apache.hadoop.fs.permission.FsAction;
57  import org.apache.hadoop.fs.permission.FsPermission;
58  import org.apache.hadoop.hbase.ClusterId;
59  import org.apache.hadoop.hbase.HColumnDescriptor;
60  import org.apache.hadoop.hbase.HConstants;
61  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
62  import org.apache.hadoop.hbase.HRegionInfo;
63  import org.apache.hadoop.hbase.TableName;
64  import org.apache.hadoop.hbase.exceptions.DeserializationException;
65  import org.apache.hadoop.hbase.fs.HFileSystem;
66  import org.apache.hadoop.hbase.master.HMaster;
67  import org.apache.hadoop.hbase.master.RegionPlacementMaintainer;
68  import org.apache.hadoop.hbase.security.AccessDeniedException;
69  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
70  import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
71  import org.apache.hadoop.hbase.regionserver.HRegion;
72  import org.apache.hadoop.hdfs.DFSClient;
73  import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
74  import org.apache.hadoop.hdfs.DistributedFileSystem;
75  import org.apache.hadoop.hdfs.protocol.FSConstants;
76  import org.apache.hadoop.io.IOUtils;
77  import org.apache.hadoop.io.SequenceFile;
78  import org.apache.hadoop.ipc.RemoteException;
79  import org.apache.hadoop.security.UserGroupInformation;
80  import org.apache.hadoop.util.Progressable;
81  import org.apache.hadoop.util.ReflectionUtils;
82  import org.apache.hadoop.util.StringUtils;
83  
84  import com.google.common.primitives.Ints;
85  import com.google.protobuf.InvalidProtocolBufferException;
86  
87  /**
88   * Utility methods for interacting with the underlying file system.
89   */
90  @InterfaceAudience.Private
91  public abstract class FSUtils {
92    private static final Log LOG = LogFactory.getLog(FSUtils.class);
93  
94    /** Full access permissions (starting point for a umask) */
95    public static final String FULL_RWX_PERMISSIONS = "777";
96    private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
97    private static final int DEFAULT_THREAD_POOLSIZE = 2;
98  
99    /** Set to true on Windows platforms */
100   public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
101 
102   protected FSUtils() {
103     super();
104   }
105 
106   /**
107    * Compares path components only. Does not consider the scheme; i.e. even if the schemes
108    * differ, if <code>path</code> starts with <code>rootPath</code>, the function returns true.
109    * @param rootPath root path to compare against
110    * @param path path to check
111    * @return True if <code>path</code> starts with <code>rootPath</code>
112    */
113   public static boolean isStartingWithPath(final Path rootPath, final String path) {
114     String uriRootPath = rootPath.toUri().getPath();
115     String tailUriPath = (new Path(path)).toUri().getPath();
116     return tailUriPath.startsWith(uriRootPath);
117   }
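
      /*
       * Illustrative usage (a sketch; the paths below are hypothetical):
       *
       *   Path root = new Path("hdfs://namenode:8020/hbase");
       *   // Only the path components are compared, so a scheme-less path still matches.
       *   boolean under   = FSUtils.isStartingWithPath(root, "/hbase/data/default/t1"); // true
       *   boolean outside = FSUtils.isStartingWithPath(root, "/tmp/somewhere/else");    // false
       */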
118 
119   /**
120    * Compares the path component of the Path URI; e.g. for hdfs://a/b/c and /a/b/c, it compares the
121    * '/a/b/c' part. Does not consider the scheme; i.e. even if the schemes differ, a matching path or
122    * subpath makes the two equal.
123    * @param pathToSearch Path we will be trying to match.
124    * @param pathTail tail of a path to look for
125    * @return True if <code>pathTail</code> is the tail of the path of <code>pathToSearch</code>
126    */
127   public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
128     return isMatchingTail(pathToSearch, new Path(pathTail));
129   }
130 
131   /**
132    * Compares the path component of the Path URI; e.g. for hdfs://a/b/c and /a/b/c, it compares the
133    * '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true.  Does not consider
134    * the scheme; i.e. even if the schemes differ, a matching path or subpath makes the two equal.
135    * @param pathToSearch Path we will be trying to match.
136    * @param pathTail tail of a path to look for
137    * @return True if <code>pathTail</code> is the tail of the path of <code>pathToSearch</code>
138    */
139   public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
140     if (pathToSearch.depth() != pathTail.depth()) return false;
141     Path tailPath = pathTail;
142     String tailName;
143     Path toSearch = pathToSearch;
144     String toSearchName;
145     boolean result = false;
146     do {
147       tailName = tailPath.getName();
148       if (tailName == null || tailName.length() <= 0) {
149         result = true;
150         break;
151       }
152       toSearchName = toSearch.getName();
153       if (toSearchName == null || toSearchName.length() <= 0) break;
154       // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
155       tailPath = tailPath.getParent();
156       toSearch = toSearch.getParent();
157     } while(tailName.equals(toSearchName));
158     return result;
159   }
160 
161   public static FSUtils getInstance(FileSystem fs, Configuration conf) {
162     String scheme = fs.getUri().getScheme();
163     if (scheme == null) {
164       LOG.warn("Could not find scheme for uri " +
165           fs.getUri() + ", default to hdfs");
166       scheme = "hdfs";
167     }
168     Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
169         scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
170     FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
171     return fsUtils;
172   }
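
      /*
       * Illustrative usage (a sketch): the concrete FSUtils implementation is chosen per filesystem
       * scheme via the "hbase.fsutil.<scheme>.impl" property, defaulting to FSHDFSUtils.
       *
       *   Configuration conf = HBaseConfiguration.create();
       *   FileSystem fs = FileSystem.get(conf);
       *   FSUtils fsUtils = FSUtils.getInstance(fs, conf); // e.g. an FSHDFSUtils for an hdfs:// root
       */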
173 
174   /**
175    * Delete if exists.
176    * @param fs filesystem object
177    * @param dir directory to delete
178    * @return True if deleted <code>dir</code>
179    * @throws IOException e
180    */
181   public static boolean deleteDirectory(final FileSystem fs, final Path dir)
182   throws IOException {
183     return fs.exists(dir) && fs.delete(dir, true);
184   }
185 
186   /**
187    * Return the number of bytes that large input files should optimally
188    * be split into to minimize i/o time.
189    *
190    * Uses reflection to search for getDefaultBlockSize(Path f);
191    * if the method doesn't exist, falls back to getDefaultBlockSize().
192    *
193    * @param fs filesystem object
194    * @return the default block size for the path's filesystem
195    * @throws IOException e
196    */
197   public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
198     Method m = null;
199     Class<? extends FileSystem> cls = fs.getClass();
200     try {
201       m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
202     } catch (NoSuchMethodException e) {
203       LOG.info("FileSystem doesn't support getDefaultBlockSize");
204     } catch (SecurityException e) {
205       LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
206       m = null; // could happen on setAccessible()
207     }
208     if (m == null) {
209       return fs.getDefaultBlockSize(path);
210     } else {
211       try {
212         Object ret = m.invoke(fs, path);
213         return ((Long)ret).longValue();
214       } catch (Exception e) {
215         throw new IOException(e);
216       }
217     }
218   }
219 
220   /**
221    * Get the default replication.
222    *
223    * Uses reflection to search for getDefaultReplication(Path f);
224    * if the method doesn't exist, falls back to getDefaultReplication().
225    *
226    * @param fs filesystem object
227    * @param path path of file
228    * @return default replication for the path's filesystem
229    * @throws IOException e
230    */
231   public static short getDefaultReplication(final FileSystem fs, final Path path) throws IOException {
232     Method m = null;
233     Class<? extends FileSystem> cls = fs.getClass();
234     try {
235       m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
236     } catch (NoSuchMethodException e) {
237       LOG.info("FileSystem doesn't support getDefaultReplication");
238     } catch (SecurityException e) {
239       LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
240       m = null; // could happen on setAccessible()
241     }
242     if (m == null) {
243       return fs.getDefaultReplication(path);
244     } else {
245       try {
246         Object ret = m.invoke(fs, path);
247         return ((Number)ret).shortValue();
248       } catch (Exception e) {
249         throw new IOException(e);
250       }
251     }
252   }
253 
254   /**
255    * Returns the default buffer size to use during writes.
256    *
257    * The size of the buffer should probably be a multiple of hardware
258    * page size (4096 on Intel x86), and it determines how much data is
259    * buffered during read and write operations.
260    *
261    * @param fs filesystem object
262    * @return default buffer size to use during writes
263    */
264   public static int getDefaultBufferSize(final FileSystem fs) {
265     return fs.getConf().getInt("io.file.buffer.size", 4096);
266   }
267 
268   /**
269    * Create the specified file on the filesystem. By default, this will:
270    * <ol>
271    * <li>overwrite the file if it exists</li>
272    * <li>apply the umask in the configuration (if it is enabled)</li>
273    * <li>use the fs configured buffer size (or 4096 if not set)</li>
274    * <li>use the default replication</li>
275    * <li>use the default block size</li>
276    * <li>not track progress</li>
277    * </ol>
278    *
279    * @param fs {@link FileSystem} on which to write the file
280    * @param path {@link Path} to the file to write
281    * @param perm permissions
282    * @param favoredNodes preferred DataNodes for the blocks of the new file, when supported
283    * @return output stream to the created file
284    * @throws IOException if the file cannot be created
285    */
286   public static FSDataOutputStream create(FileSystem fs, Path path,
287       FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
288     if (fs instanceof HFileSystem) {
289       FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
290       if (backingFs instanceof DistributedFileSystem) {
291         // Try to use the favoredNodes version via reflection to allow backwards-
292         // compatibility.
293         try {
294           return (FSDataOutputStream) (DistributedFileSystem.class
295               .getDeclaredMethod("create", Path.class, FsPermission.class,
296                   boolean.class, int.class, short.class, long.class,
297                   Progressable.class, InetSocketAddress[].class)
298                   .invoke(backingFs, path, perm, true,
299                       getDefaultBufferSize(backingFs),
300                       getDefaultReplication(backingFs, path),
301                       getDefaultBlockSize(backingFs, path),
302                       null, favoredNodes));
303         } catch (InvocationTargetException ite) {
304           // Function was properly called, but threw its own exception.
305           throw new IOException(ite.getCause());
306         } catch (NoSuchMethodException e) {
307           LOG.debug("DFS Client does not support most favored nodes create; using default create");
308           if (LOG.isTraceEnabled()) LOG.trace("Ignoring; use default create", e);
309         } catch (IllegalArgumentException e) {
310           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
311         } catch (SecurityException e) {
312           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
313         } catch (IllegalAccessException e) {
314           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
315         }
316       }
317     }
318     return create(fs, path, perm, true);
319   }
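
      /*
       * Illustrative usage (a sketch; host names and the file path are hypothetical):
       *
       *   InetSocketAddress[] favoredNodes = new InetSocketAddress[] {
       *     new InetSocketAddress("datanode1.example.com", 50010),
       *     new InetSocketAddress("datanode2.example.com", 50010)
       *   };
       *   FSDataOutputStream out = FSUtils.create(fs, new Path("/hbase/tmp/examplefile"),
       *     FsPermission.getFileDefault(), favoredNodes);
       *   // If the underlying client does not support favored nodes, this quietly falls back to
       *   // the plain create(fs, path, perm, true) overload.
       */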
320 
321   /**
322    * Create the specified file on the filesystem. By default, this will:
323    * <ol>
324    * <li>apply the umask in the configuration (if it is enabled)</li>
325    * <li>use the fs configured buffer size (or 4096 if not set)</li>
326    * <li>use the default replication</li>
327    * <li>use the default block size</li>
328    * <li>not track progress</li>
329    * </ol>
330    *
331    * @param fs {@link FileSystem} on which to write the file
332    * @param path {@link Path} to the file to write
333    * @param perm permissions to apply to the new file
334    * @param overwrite Whether or not the created file should be overwritten.
335    * @return output stream to the created file
336    * @throws IOException if the file cannot be created
337    */
338   public static FSDataOutputStream create(FileSystem fs, Path path,
339       FsPermission perm, boolean overwrite) throws IOException {
340     if (LOG.isTraceEnabled()) {
341       LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
342     }
343     return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
344         getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
345   }
346 
347   /**
348    * Get the file permissions specified in the configuration, if they are
349    * enabled.
350    *
351    * @param fs filesystem that the file will be created on.
352    * @param conf configuration to read for determining if permissions are
353    *          enabled and which to use
354    * @param permssionConfKey property key in the configuration to use when
355    *          finding the permission
356    * @return the permission to use when creating a new file on the fs. If
357    *         special permissions are not specified in the configuration, then
358    *         the default permissions on the fs will be returned.
359    */
360   public static FsPermission getFilePermissions(final FileSystem fs,
361       final Configuration conf, final String permssionConfKey) {
362     boolean enablePermissions = conf.getBoolean(
363         HConstants.ENABLE_DATA_FILE_UMASK, false);
364 
365     if (enablePermissions) {
366       try {
367         FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
368         // make sure that we have a mask, if not, go default.
369         String mask = conf.get(permssionConfKey);
370         if (mask == null)
371           return FsPermission.getFileDefault();
372         // apply the umask
373         FsPermission umask = new FsPermission(mask);
374         return perm.applyUMask(umask);
375       } catch (IllegalArgumentException e) {
376         LOG.warn(
377             "Incorrect umask attempted to be created: "
378                 + conf.get(permssionConfKey)
379                 + ", using default file permissions.", e);
380         return FsPermission.getFileDefault();
381       }
382     }
383     return FsPermission.getFileDefault();
384   }
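
      /*
       * Illustrative usage (a sketch; assumes HConstants.DATA_FILE_UMASK_KEY is the property that
       * carries the umask, and that the umask feature has been switched on):
       *
       *   conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
       *   conf.set(HConstants.DATA_FILE_UMASK_KEY, "000");
       *   FsPermission perm = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
       *   // perm is "777" with the configured umask applied; with the feature disabled (the
       *   // default) the filesystem's default file permission is returned instead.
       */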
385 
386   /**
387    * Checks to see if the specified file system is available
388    *
389    * @param fs filesystem
390    * @throws IOException e
391    */
392   public static void checkFileSystemAvailable(final FileSystem fs)
393   throws IOException {
394     if (!(fs instanceof DistributedFileSystem)) {
395       return;
396     }
397     IOException exception = null;
398     DistributedFileSystem dfs = (DistributedFileSystem) fs;
399     try {
400       if (dfs.exists(new Path("/"))) {
401         return;
402       }
403     } catch (IOException e) {
404       exception = e instanceof RemoteException ?
405               ((RemoteException)e).unwrapRemoteException() : e;
406     }
407     try {
408       fs.close();
409     } catch (Exception e) {
410       LOG.error("file system close failed: ", e);
411     }
412     IOException io = new IOException("File system is not available");
413     io.initCause(exception);
414     throw io;
415   }
416 
417   /**
418    * We use reflection because {@link DistributedFileSystem#setSafeMode(
419    * FSConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1
420    *
421    * @param dfs
422    * @return whether we're in safe mode
423    * @throws IOException
424    */
425   private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
426     boolean inSafeMode = false;
427     try {
428       Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
429           org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.class, boolean.class});
430       inSafeMode = (Boolean) m.invoke(dfs,
431         org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_GET, true);
432     } catch (Exception e) {
433       if (e instanceof IOException) throw (IOException) e;
434 
435       // Check whether dfs is on safemode.
436       inSafeMode = dfs.setSafeMode(
437         org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_GET);
438     }
439     return inSafeMode;
440   }
441 
442   /**
443    * Check whether dfs is in safemode.
444    * @param conf
445    * @throws IOException
446    */
447   public static void checkDfsSafeMode(final Configuration conf)
448   throws IOException {
449     boolean isInSafeMode = false;
450     FileSystem fs = FileSystem.get(conf);
451     if (fs instanceof DistributedFileSystem) {
452       DistributedFileSystem dfs = (DistributedFileSystem)fs;
453       isInSafeMode = isInSafeMode(dfs);
454     }
455     if (isInSafeMode) {
456       throw new IOException("File system is in safemode, it can't be written now");
457     }
458   }
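
      /*
       * Illustrative usage (a sketch): fail fast if HDFS is still in safe mode before attempting
       * writes, or use waitOnSafeMode (further down in this class) to block until it clears.
       *
       *   FSUtils.checkDfsSafeMode(conf);     // throws IOException while the NameNode is in safe mode
       *   FSUtils.waitOnSafeMode(conf, 1000); // alternatively, poll every second until safe mode clears
       */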
459 
460   /**
461    * Returns the current version of the file system
462    *
463    * @param fs filesystem object
464    * @param rootdir root hbase directory
465    * @return null if no version file exists, version string otherwise.
466    * @throws IOException e
467    * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
468    */
469   public static String getVersion(FileSystem fs, Path rootdir)
470   throws IOException, DeserializationException {
471     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
472     FileStatus[] status = null;
473     try {
474       // hadoop 2.0 throws FNFE if directory does not exist.
475       // hadoop 1.0 returns null if directory does not exist.
476       status = fs.listStatus(versionFile);
477     } catch (FileNotFoundException fnfe) {
478       return null;
479     }
480     if (status == null || status.length == 0) return null;
481     String version = null;
482     byte [] content = new byte [(int)status[0].getLen()];
483     FSDataInputStream s = fs.open(versionFile);
484     try {
485       IOUtils.readFully(s, content, 0, content.length);
486       if (ProtobufUtil.isPBMagicPrefix(content)) {
487         version = parseVersionFrom(content);
488       } else {
489         // Presume it is in pre-pb format.
490         InputStream is = new ByteArrayInputStream(content);
491         DataInputStream dis = new DataInputStream(is);
492         try {
493           version = dis.readUTF();
494         } finally {
495           dis.close();
496         }
497       }
498     } catch (EOFException eof) {
499       LOG.warn("Version file was empty, odd, will try to set it.");
500     } finally {
501       s.close();
502     }
503     return version;
504   }
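
      /*
       * Illustrative usage (a sketch): read the layout version recorded under hbase.rootdir and
       * compare it to the version this code base expects.
       *
       *   Path rootdir = FSUtils.getRootDir(conf);
       *   String version = FSUtils.getVersion(rootdir.getFileSystem(conf), rootdir);
       *   // version is null when no hbase.version file exists yet; a healthy install reports
       *   // HConstants.FILE_SYSTEM_VERSION.
       */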
505 
506   /**
507    * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
508    * @param bytes The byte content of the hbase.version file.
509    * @return The version found in the file as a String.
510    * @throws DeserializationException
511    */
512   static String parseVersionFrom(final byte [] bytes)
513   throws DeserializationException {
514     ProtobufUtil.expectPBMagicPrefix(bytes);
515     int pblen = ProtobufUtil.lengthOfPBMagic();
516     FSProtos.HBaseVersionFileContent.Builder builder =
517       FSProtos.HBaseVersionFileContent.newBuilder();
518     FSProtos.HBaseVersionFileContent fileContent;
519     try {
520       fileContent = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
521       return fileContent.getVersion();
522     } catch (InvalidProtocolBufferException e) {
523       // Convert
524       throw new DeserializationException(e);
525     }
526   }
527 
528   /**
529    * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
530    * @param version Version to persist
531    * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
532    */
533   static byte [] toVersionByteArray(final String version) {
534     FSProtos.HBaseVersionFileContent.Builder builder =
535       FSProtos.HBaseVersionFileContent.newBuilder();
536     return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
537   }
538 
539   /**
540    * Verifies current version of file system
541    *
542    * @param fs file system
543    * @param rootdir root directory of HBase installation
544    * @param message if true, issues a message on System.out
545    *
546    * @throws IOException e
547    * @throws DeserializationException
548    */
549   public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
550   throws IOException, DeserializationException {
551     checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
552   }
553 
554   /**
555    * Verifies current version of file system
556    *
557    * @param fs file system
558    * @param rootdir root directory of HBase installation
559    * @param message if true, issues a message on System.out
560    * @param wait wait interval
561    * @param retries number of times to retry
562    *
563    * @throws IOException e
564    * @throws DeserializationException
565    */
566   public static void checkVersion(FileSystem fs, Path rootdir,
567       boolean message, int wait, int retries)
568   throws IOException, DeserializationException {
569     String version = getVersion(fs, rootdir);
570     if (version == null) {
571       if (!metaRegionExists(fs, rootdir)) {
572         // rootDir is empty (no version file and no root region)
573         // just create new version file (HBASE-1195)
574         setVersion(fs, rootdir, wait, retries);
575         return;
576       }
577     } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) return;
578 
579     // Version is deprecated; require migration
580     // Output on stdout so user sees it in terminal.
581     String msg = "HBase file layout needs to be upgraded."
582       + " You have version " + version
583       + " and I want version " + HConstants.FILE_SYSTEM_VERSION
584       + ". Consult http://hbase.apache.org/book.html for further information about upgrading HBase."
585       + " Is your hbase.rootdir valid? If so, you may need to run "
586       + "'hbase hbck -fixVersionFile'.";
587     if (message) {
588       System.out.println("WARNING! " + msg);
589     }
590     throw new FileSystemVersionException(msg);
591   }
592 
593   /**
594    * Sets version of file system
595    *
596    * @param fs filesystem object
597    * @param rootdir hbase root
598    * @throws IOException e
599    */
600   public static void setVersion(FileSystem fs, Path rootdir)
601   throws IOException {
602     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
603       HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
604   }
605 
606   /**
607    * Sets version of file system
608    *
609    * @param fs filesystem object
610    * @param rootdir hbase root
611    * @param wait time to wait for retry
612    * @param retries number of times to retry before failing
613    * @throws IOException e
614    */
615   public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
616   throws IOException {
617     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
618   }
619 
620 
621   /**
622    * Sets version of file system
623    *
624    * @param fs filesystem object
625    * @param rootdir hbase root directory
626    * @param version version to set
627    * @param wait time to wait for retry
628    * @param retries number of times to retry before throwing an IOException
629    * @throws IOException e
630    */
631   public static void setVersion(FileSystem fs, Path rootdir, String version,
632       int wait, int retries) throws IOException {
633     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
634     Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
635       HConstants.VERSION_FILE_NAME);
636     while (true) {
637       try {
638         // Write the version to a temporary file
639         FSDataOutputStream s = fs.create(tempVersionFile);
640         try {
641           s.write(toVersionByteArray(version));
642           s.close();
643           s = null;
644           // Move the temp version file to its normal location. Returns false
645           // if the rename failed. Throw an IOE in that case.
646           if (!fs.rename(tempVersionFile, versionFile)) {
647             throw new IOException("Unable to move temp version file to " + versionFile);
648           }
649         } finally {
650           // Cleaning up the temporary file if the rename failed would be trying
651           // too hard. We'll unconditionally create it again the next time
652           // through anyway, files are overwritten by default by create().
653 
654           // Attempt to close the stream on the way out if it is still open.
655           try {
656             if (s != null) s.close();
657           } catch (IOException ignore) { }
658         }
659         LOG.debug("Created version file at " + rootdir.toString() + " with version=" + version);
660         return;
661       } catch (IOException e) {
662         if (retries > 0) {
663           LOG.warn("Unable to create version file at " + rootdir.toString() + ", retrying", e);
664           fs.delete(versionFile, false);
665           try {
666             if (wait > 0) {
667               Thread.sleep(wait);
668             }
669           } catch (InterruptedException ie) {
670             throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
671           }
672           retries--;
673         } else {
674           throw e;
675         }
676       }
677     }
678   }
679 
680   /**
681    * Checks that a cluster ID file exists in the HBase root directory
682    * @param fs the root directory FileSystem
683    * @param rootdir the HBase root directory in HDFS
684    * @param wait how long to wait between retries
685    * @return <code>true</code> if the file exists, otherwise <code>false</code>
686    * @throws IOException if checking the FileSystem fails
687    */
688   public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
689       int wait) throws IOException {
690     while (true) {
691       try {
692         Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
693         return fs.exists(filePath);
694       } catch (IOException ioe) {
695         if (wait > 0) {
696           LOG.warn("Unable to check cluster ID file in " + rootdir.toString() +
697               ", retrying in "+wait+"msec: "+StringUtils.stringifyException(ioe));
698           try {
699             Thread.sleep(wait);
700           } catch (InterruptedException e) {
701             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
702           }
703         } else {
704           throw ioe;
705         }
706       }
707     }
708   }
709 
710   /**
711    * Returns the value of the unique cluster ID stored for this HBase instance.
712    * @param fs the root directory FileSystem
713    * @param rootdir the path to the HBase root directory
714    * @return the unique cluster identifier
715    * @throws IOException if reading the cluster ID file fails
716    */
717   public static ClusterId getClusterId(FileSystem fs, Path rootdir)
718   throws IOException {
719     Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
720     ClusterId clusterId = null;
721     FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath):  null;
722     if (status != null) {
723       int len = Ints.checkedCast(status.getLen());
724       byte [] content = new byte[len];
725       FSDataInputStream in = fs.open(idPath);
726       try {
727         in.readFully(content);
728       } catch (EOFException eof) {
729         LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
730       } finally{
731         in.close();
732       }
733       try {
734         clusterId = ClusterId.parseFrom(content);
735       } catch (DeserializationException e) {
736         throw new IOException("content=" + Bytes.toString(content), e);
737       }
738       // If not pb'd, make it so.
739       if (!ProtobufUtil.isPBMagicPrefix(content)) {
740         String cid = null;
741         in = fs.open(idPath);
742         try {
743           cid = in.readUTF();
744           clusterId = new ClusterId(cid);
745         } catch (EOFException eof) {
746           LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
747         } finally {
748           in.close();
749         }
750         rewriteAsPb(fs, rootdir, idPath, clusterId);
751       }
752       return clusterId;
753     } else {
754       LOG.warn("Cluster ID file does not exist at " + idPath.toString());
755     }
756     return clusterId;
757   }
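
      /*
       * Illustrative usage (a sketch): read the cluster ID persisted in the hbase.id file.
       *
       *   Path rootdir = FSUtils.getRootDir(conf);
       *   ClusterId clusterId = FSUtils.getClusterId(rootdir.getFileSystem(conf), rootdir);
       *   // Returns null (after logging a warning) if the hbase.id file does not exist;
       *   // a pre-protobuf file is transparently rewritten in protobuf form.
       */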
758 
759   /**
760    * @param cid cluster ID to write back out in protobuf format
761    * @throws IOException
762    */
763   private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
764       final ClusterId cid)
765   throws IOException {
766     // Rewrite the file as pb.  Move aside the old one first, write new
767     // then delete the moved-aside file.
768     Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
769     if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
770     setClusterId(fs, rootdir, cid, 100);
771     if (!fs.delete(movedAsideName, false)) {
772       throw new IOException("Failed delete of " + movedAsideName);
773     }
774     LOG.debug("Rewrote the hbase.id file as pb");
775   }
776 
777   /**
778    * Writes a new unique identifier for this cluster to the "hbase.id" file
779    * in the HBase root directory
780    * @param fs the root directory FileSystem
781    * @param rootdir the path to the HBase root directory
782    * @param clusterId the unique identifier to store
783    * @param wait how long (in milliseconds) to wait between retries
784    * @throws IOException if writing to the FileSystem fails and no wait value was given
785    */
786   public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
787       int wait) throws IOException {
788     while (true) {
789       try {
790         Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
791         Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
792           Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
793         // Write the id file to a temporary location
794         FSDataOutputStream s = fs.create(tempIdFile);
795         try {
796           s.write(clusterId.toByteArray());
797           s.close();
798           s = null;
799           // Move the temporary file to its normal location. Throw an IOE if
800           // the rename failed
801           if (!fs.rename(tempIdFile, idFile)) {
802             throw new IOException("Unable to move temp version file to " + idFile);
803           }
804         } finally {
805           // Attempt to close the stream if still open on the way out
806           try {
807             if (s != null) s.close();
808           } catch (IOException ignore) { }
809         }
810         if (LOG.isDebugEnabled()) {
811           LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
812         }
813         return;
814       } catch (IOException ioe) {
815         if (wait > 0) {
816           LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
817               ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
818           try {
819             Thread.sleep(wait);
820           } catch (InterruptedException e) {
821             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
822           }
823         } else {
824           throw ioe;
825         }
826       }
827     }
828   }
829 
830   /**
831    * Verifies root directory path is a valid URI with a scheme
832    *
833    * @param root root directory path
834    * @return Passed <code>root</code> argument.
835    * @throws IOException if not a valid URI with a scheme
836    */
837   public static Path validateRootPath(Path root) throws IOException {
838     try {
839       URI rootURI = new URI(root.toString());
840       String scheme = rootURI.getScheme();
841       if (scheme == null) {
842         throw new IOException("Root directory does not have a scheme");
843       }
844       return root;
845     } catch (URISyntaxException e) {
846       IOException io = new IOException("Root directory path is not a valid " +
847         "URI -- check your " + HConstants.HBASE_DIR + " configuration");
848       io.initCause(e);
849       throw io;
850     }
851   }
852 
853   /**
854    * Checks for the presence of the root path (taken from the provided conf object) in the given path. If
855    * it is present, this method removes it and returns the String representation of the remaining relative path.
856    * @param path
857    * @param conf
858    * @return String representation of the remaining relative path
859    * @throws IOException
860    */
861   public static String removeRootPath(Path path, final Configuration conf) throws IOException {
862     Path root = FSUtils.getRootDir(conf);
863     String pathStr = path.toString();
864     // If the path does not start with the root path, return it as it is.
865     if (!pathStr.startsWith(root.toString())) return pathStr;
866     // Otherwise, strip the root path prefix.
867     return pathStr.substring(root.toString().length() + 1);// remove the "/" too.
868   }
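
      /*
       * Illustrative usage (a sketch; assumes hbase.rootdir is hdfs://namenode:8020/hbase):
       *
       *   Path storeFile = new Path("hdfs://namenode:8020/hbase/data/default/t1/r1/f/hfile1");
       *   String relative = FSUtils.removeRootPath(storeFile, conf);
       *   // relative == "data/default/t1/r1/f/hfile1"; a path that does not start with the
       *   // root directory is returned unchanged.
       */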
869 
870   /**
871    * If DFS, check safe mode and if so, wait until we clear it.
872    * @param conf configuration
873    * @param wait Sleep between retries
874    * @throws IOException e
875    */
876   public static void waitOnSafeMode(final Configuration conf,
877     final long wait)
878   throws IOException {
879     FileSystem fs = FileSystem.get(conf);
880     if (!(fs instanceof DistributedFileSystem)) return;
881     DistributedFileSystem dfs = (DistributedFileSystem)fs;
882     // Make sure dfs is not in safe mode
883     while (isInSafeMode(dfs)) {
884       LOG.info("Waiting for dfs to exit safe mode...");
885       try {
886         Thread.sleep(wait);
887       } catch (InterruptedException e) {
888         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
889       }
890     }
891   }
892 
893   /**
894    * Return the 'path' component of a Path.  In Hadoop, Path is a URI.  This
895    * method returns the 'path' component of a Path's URI: e.g. if a Path is
896    * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
897    * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
898    * This method is useful if you want to print out a Path without qualifying
899    * it with its Filesystem instance.
900    * @param p Filesystem Path whose 'path' component we are to return.
901    * @return Path portion of the passed Path's URI
902    */
903   public static String getPath(Path p) {
904     return p.toUri().getPath();
905   }
906 
907   /**
908    * @param c configuration
909    * @return Path to hbase root directory: i.e. <code>hbase.rootdir</code> from
910    * configuration as a qualified Path.
911    * @throws IOException e
912    */
913   public static Path getRootDir(final Configuration c) throws IOException {
914     Path p = new Path(c.get(HConstants.HBASE_DIR));
915     FileSystem fs = p.getFileSystem(c);
916     return p.makeQualified(fs);
917   }
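
      /*
       * Illustrative usage (a sketch):
       *
       *   Path rootdir = FSUtils.getRootDir(conf);      // e.g. hdfs://namenode:8020/hbase
       *   FileSystem fs = rootdir.getFileSystem(conf);  // filesystem that owns the root directory
       */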
918 
919   public static void setRootDir(final Configuration c, final Path root) throws IOException {
920     c.set(HConstants.HBASE_DIR, root.toString());
921   }
922 
923   public static void setFsDefault(final Configuration c, final Path root) throws IOException {
924     c.set("fs.defaultFS", root.toString());    // for hadoop 0.21+
925   }
926 
927   /**
928    * Checks if meta region exists
929    *
930    * @param fs file system
931    * @param rootdir root directory of HBase installation
932    * @return true if exists
933    * @throws IOException e
934    */
935   @SuppressWarnings("deprecation")
936   public static boolean metaRegionExists(FileSystem fs, Path rootdir)
937   throws IOException {
938     Path metaRegionDir =
939       HRegion.getRegionDir(rootdir, HRegionInfo.FIRST_META_REGIONINFO);
940     return fs.exists(metaRegionDir);
941   }
942 
943   /**
944    * Compute HDFS blocks distribution of a given file, or a portion of the file
945    * @param fs file system
946    * @param status file status of the file
947    * @param start start position of the portion
948    * @param length length of the portion
949    * @return The HDFS blocks distribution
950    */
951   static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
952     final FileSystem fs, FileStatus status, long start, long length)
953     throws IOException {
954     HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
955     BlockLocation [] blockLocations =
956       fs.getFileBlockLocations(status, start, length);
957     for(BlockLocation bl : blockLocations) {
958       String [] hosts = bl.getHosts();
959       long len = bl.getLength();
960       blocksDistribution.addHostsAndBlockWeight(hosts, len);
961     }
962 
963     return blocksDistribution;
964   }
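
      /*
       * Illustrative usage (a sketch; the store file path and host name are hypothetical):
       *
       *   FileStatus status = fs.getFileStatus(storeFilePath);
       *   HDFSBlocksDistribution dist =
       *     FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
       *   float locality = dist.getBlockLocalityIndex("regionserver1.example.com");
       */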
965 
966 
967 
968   /**
969    * Runs through the hbase rootdir and checks that all stores have only
970    * one file in them -- that is, they've been major compacted.  Looks
971    * at root and meta tables too.
972    * @param fs filesystem
973    * @param hbaseRootDir hbase root directory
974    * @return True if this hbase install is major compacted.
975    * @throws IOException e
976    */
977   public static boolean isMajorCompacted(final FileSystem fs,
978       final Path hbaseRootDir)
979   throws IOException {
980     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
981     PathFilter regionFilter = new RegionDirFilter(fs);
982     PathFilter familyFilter = new FamilyDirFilter(fs);
983     for (Path d : tableDirs) {
984       FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
985       for (FileStatus regionDir : regionDirs) {
986         Path dd = regionDir.getPath();
987         // This is a region directory.  Now look in the region for families.
988         FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
989         for (FileStatus familyDir : familyDirs) {
990           Path family = familyDir.getPath();
991           // Now in family make sure only one file.
992           FileStatus[] familyStatus = fs.listStatus(family);
993           if (familyStatus.length > 1) {
994             LOG.debug(family.toString() + " has " + familyStatus.length +
995                 " files.");
996             return false;
997           }
998         }
999       }
1000     }
1001     return true;
1002   }
1003 
1004   // TODO move this method OUT of FSUtils. No dependencies to HMaster
1005   /**
1006    * Returns the total overall fragmentation percentage. Includes hbase:meta and
1007    * -ROOT- as well.
1008    *
1009    * @param master  The master defining the HBase root and file system.
1010    * @return The overall fragmentation percentage, or -1 if it could not be computed.
1011    * @throws IOException When scanning the directory fails.
1012    */
1013   public static int getTotalTableFragmentation(final HMaster master)
1014   throws IOException {
1015     Map<String, Integer> map = getTableFragmentation(master);
1016     return map != null && map.size() > 0 ? map.get("-TOTAL-") : -1;
1017   }
1018 
1019   /**
1020    * Runs through the HBase rootdir and checks how many stores for each table
1021    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1022    * percentage across all tables is stored under the special key "-TOTAL-".
1023    *
1024    * @param master  The master defining the HBase root and file system.
1025    * @return A map for each table and its percentage.
1026    *
1027    * @throws IOException When scanning the directory fails.
1028    */
1029   public static Map<String, Integer> getTableFragmentation(
1030     final HMaster master)
1031   throws IOException {
1032     Path path = getRootDir(master.getConfiguration());
1033     // since HMaster.getFileSystem() is package private
1034     FileSystem fs = path.getFileSystem(master.getConfiguration());
1035     return getTableFragmentation(fs, path);
1036   }
1037 
1038   /**
1039    * Runs through the HBase rootdir and checks how many stores for each table
1040    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1041    * percentage across all tables is stored under the special key "-TOTAL-".
1042    *
1043    * @param fs  The file system to use.
1044    * @param hbaseRootDir  The root directory to scan.
1045    * @return A map for each table and its percentage.
1046    * @throws IOException When scanning the directory fails.
1047    */
1048   public static Map<String, Integer> getTableFragmentation(
1049     final FileSystem fs, final Path hbaseRootDir)
1050   throws IOException {
1051     Map<String, Integer> frags = new HashMap<String, Integer>();
1052     int cfCountTotal = 0;
1053     int cfFragTotal = 0;
1054     PathFilter regionFilter = new RegionDirFilter(fs);
1055     PathFilter familyFilter = new FamilyDirFilter(fs);
1056     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1057     for (Path d : tableDirs) {
1058       int cfCount = 0;
1059       int cfFrag = 0;
1060       FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
1061       for (FileStatus regionDir : regionDirs) {
1062         Path dd = regionDir.getPath();
1063         // This is a region directory; now look in the region for families
1064         FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1065         for (FileStatus familyDir : familyDirs) {
1066           cfCount++;
1067           cfCountTotal++;
1068           Path family = familyDir.getPath();
1069           // now in family make sure only one file
1070           FileStatus[] familyStatus = fs.listStatus(family);
1071           if (familyStatus.length > 1) {
1072             cfFrag++;
1073             cfFragTotal++;
1074           }
1075         }
1076       }
1077       // compute percentage per table and store in result list
1078       frags.put(FSUtils.getTableName(d).getNameAsString(),
1079         cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
1080     }
1081     // set overall percentage for all tables
1082     frags.put("-TOTAL-",
1083       cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
1084     return frags;
1085   }
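
       /*
        * Illustrative usage (a sketch): per-table fragmentation percentages, with the overall
        * figure stored under the special "-TOTAL-" key.
        *
        *   Map<String, Integer> frags = FSUtils.getTableFragmentation(fs, FSUtils.getRootDir(conf));
        *   int overall = frags.get("-TOTAL-");  // e.g. 12, meaning 12% of stores have more than one file
        *   Integer forT1 = frags.get("t1");     // null if table t1 was not found under the root dir
        */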
1086 
1087   /**
1088    * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
1089    * path rootdir
1090    *
1091    * @param rootdir qualified path of HBase root directory
1092    * @param tableName name of table
1093    * @return {@link org.apache.hadoop.fs.Path} for table
1094    */
1095   public static Path getTableDir(Path rootdir, final TableName tableName) {
1096     return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
1097         tableName.getQualifierAsString());
1098   }
1099 
1100   /**
1101    * Returns the {@link org.apache.hadoop.hbase.TableName} object for
1102    * the table corresponding to the given table directory
1103    * under path rootdir
1104    *
1105    * @param tablePath path of table
1106    * @return {@link org.apache.hadoop.hbase.TableName} for the table
1107    */
1108   public static TableName getTableName(Path tablePath) {
1109     return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
1110   }
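
       /*
        * Illustrative round trip (a sketch; assumes the standard <rootdir>/data/<namespace>/<table>
        * layout used by this code base):
        *
        *   Path tableDir = FSUtils.getTableDir(rootdir, TableName.valueOf("ns1", "t1"));
        *   // tableDir is <rootdir>/data/ns1/t1
        *   TableName tn = FSUtils.getTableName(tableDir);  // back to ns1:t1
        */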
1111 
1112   /**
1113    * Returns the {@link org.apache.hadoop.fs.Path} object representing
1114    * the namespace directory under path rootdir
1115    *
1116    * @param rootdir qualified path of HBase root directory
1117    * @param namespace namespace name
1118    * @return {@link org.apache.hadoop.fs.Path} for the namespace directory
1119    */
1120   public static Path getNamespaceDir(Path rootdir, final String namespace) {
1121     return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
1122         new Path(namespace)));
1123   }
1124 
1125   /**
1126    * A {@link PathFilter} that returns only regular files.
1127    */
1128   static class FileFilter implements PathFilter {
1129     private final FileSystem fs;
1130 
1131     public FileFilter(final FileSystem fs) {
1132       this.fs = fs;
1133     }
1134 
1135     @Override
1136     public boolean accept(Path p) {
1137       try {
1138         return fs.isFile(p);
1139       } catch (IOException e) {
1140         LOG.debug("unable to verify if path=" + p + " is a regular file", e);
1141         return false;
1142       }
1143     }
1144   }
1145 
1146   /**
1147    * Directory filter that doesn't include any of the directories in the specified blacklist
1148    */
1149   public static class BlackListDirFilter implements PathFilter {
1150     private final FileSystem fs;
1151     private List<String> blacklist;
1152 
1153     /**
1154      * Create a filter on the given filesystem with the specified blacklist
1155      * @param fs filesystem to filter
1156      * @param directoryNameBlackList list of the names of the directories to filter. If
1157      *          <tt>null</tt>, all directories are returned
1158      */
1159     @SuppressWarnings("unchecked")
1160     public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
1161       this.fs = fs;
1162       blacklist =
1163         (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
1164           : directoryNameBlackList);
1165     }
1166 
1167     @Override
1168     public boolean accept(Path p) {
1169       boolean isValid = false;
1170       try {
1171         if (isValidName(p.getName())) {
1172           isValid = fs.getFileStatus(p).isDirectory();
1173         } else {
1174           isValid = false;
1175         }
1176       } catch (IOException e) {
1177         LOG.warn("An error occurred while verifying if [" + p.toString()
1178             + "] is a valid directory. Returning 'not valid' and continuing.", e);
1179       }
1180       return isValid;
1181     }
1182 
1183     protected boolean isValidName(final String name) {
1184       return !blacklist.contains(name);
1185     }
1186   }
1187 
1188   /**
1189    * A {@link PathFilter} that only allows directories.
1190    */
1191   public static class DirFilter extends BlackListDirFilter {
1192 
1193     public DirFilter(FileSystem fs) {
1194       super(fs, null);
1195     }
1196   }
1197 
1198   /**
1199    * A {@link PathFilter} that returns usertable directories. To get all directories use the
1200    * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
1201    */
1202   public static class UserTableDirFilter extends BlackListDirFilter {
1203     public UserTableDirFilter(FileSystem fs) {
1204       super(fs, HConstants.HBASE_NON_TABLE_DIRS);
1205     }
1206 
1207     protected boolean isValidName(final String name) {
1208       if (!super.isValidName(name))
1209         return false;
1210 
1211       try {
1212         TableName.isLegalTableQualifierName(Bytes.toBytes(name));
1213       } catch (IllegalArgumentException e) {
1214         LOG.info("INVALID NAME " + name);
1215         return false;
1216       }
1217       return true;
1218     }
1219   }
1220 
1221   /**
1222    * Heuristic to determine whether it is safe to open a file for append.
1223    * Looks both for dfs.support.append and uses reflection to search
1224    * for SequenceFile.Writer.syncFs() or FSDataOutputStream.hflush()
1225    * @param conf
1226    * @return True if append is supported
1227    */
1228   public static boolean isAppendSupported(final Configuration conf) {
1229     boolean append = conf.getBoolean("dfs.support.append", false);
1230     if (append) {
1231       try {
1232         // TODO: The implementation that comes back when we do a createWriter
1233         // may not be using SequenceFile so the below is not a definitive test.
1234         // Will do for now (hdfs-200).
1235         SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
1236         append = true;
1237       } catch (SecurityException e) {
1238       } catch (NoSuchMethodException e) {
1239         append = false;
1240       }
1241     }
1242     if (!append) {
1243       // Look for the 0.21, 0.22, new-style append evidence.
1244       try {
1245         FSDataOutputStream.class.getMethod("hflush", new Class<?> []{});
1246         append = true;
1247       } catch (NoSuchMethodException e) {
1248         append = false;
1249       }
1250     }
1251     return append;
1252   }
1253 
1254   /**
1255    * @param conf
1256    * @return True if the filesystem used by this configuration has the 'hdfs' scheme.
1257    * @throws IOException
1258    */
1259   public static boolean isHDFS(final Configuration conf) throws IOException {
1260     FileSystem fs = FileSystem.get(conf);
1261     String scheme = fs.getUri().getScheme();
1262     return scheme.equalsIgnoreCase("hdfs");
1263   }
1264 
1265   /**
1266    * Recover file lease. Used when a file is suspected
1267    * of having been left open by another process.
1268    * @param fs FileSystem handle
1269    * @param p Path of file to recover lease
1270    * @param conf Configuration handle
1271    * @throws IOException
1272    */
1273   public abstract void recoverFileLease(final FileSystem fs, final Path p,
1274       Configuration conf, CancelableProgressable reporter) throws IOException;
1275 
1276   public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
1277       throws IOException {
1278     List<Path> tableDirs = new LinkedList<Path>();
1279 
1280     for(FileStatus status :
1281         fs.globStatus(new Path(rootdir,
1282             new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
1283       tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
1284     }
1285     return tableDirs;
1286   }
1287 
1288   /**
1289    * @param fs
1290    * @param rootdir
1291    * @return All the table directories under <code>rootdir</code>. Ignores non-table hbase directories such as
1292    * .logs, .oldlogs, and .corrupt.
1293    * @throws IOException
1294    */
1295   public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
1296       throws IOException {
1297     // presumes any directory under hbase.rootdir is a table
1298     FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
1299     List<Path> tabledirs = new ArrayList<Path>(dirs.length);
1300     for (FileStatus dir: dirs) {
1301       tabledirs.add(dir.getPath());
1302     }
1303     return tabledirs;
1304   }
1305 
1306   /**
1307    * Checks if the given path contains the 'recovered.edits' dir.
1308    * @param path path to check
1309    * @return True if the path contains the recovered.edits directory
1310    */
1311   public static boolean isRecoveredEdits(Path path) {
1312     return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
1313   }
1314 
1315   /**
1316    * Filter for region directories: hex-named region dirs (and older numeric names).
1317    */
1318   public static class RegionDirFilter implements PathFilter {
1319     // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
1320     final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
1321     final FileSystem fs;
1322 
1323     public RegionDirFilter(FileSystem fs) {
1324       this.fs = fs;
1325     }
1326 
1327     @Override
1328     public boolean accept(Path rd) {
1329       if (!regionDirPattern.matcher(rd.getName()).matches()) {
1330         return false;
1331       }
1332 
1333       try {
1334         return fs.getFileStatus(rd).isDirectory();
1335       } catch (IOException ioe) {
1336         // Maybe the file was moved or the fs was disconnected.
1337         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1338         return false;
1339       }
1340     }
1341   }
1342 
1343   /**
1344    * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1345    * .tableinfo
1346    * @param fs A file system for the Path
1347    * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir>
1348    * @return List of paths to valid region directories in table dir.
1349    * @throws IOException
1350    */
1351   public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1352     // assumes we are in a table dir.
1353     FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs));
1354     List<Path> regionDirs = new ArrayList<Path>(rds.length);
1355     for (FileStatus rdfs: rds) {
1356       Path rdPath = rdfs.getPath();
1357       regionDirs.add(rdPath);
1358     }
1359     return regionDirs;
1360   }
1361 
1362   /**
1363    * Filter for all dirs that are legal column family names.  This is generally used for colfam
1364    * dirs <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>.
1365    */
1366   public static class FamilyDirFilter implements PathFilter {
1367     final FileSystem fs;
1368 
1369     public FamilyDirFilter(FileSystem fs) {
1370       this.fs = fs;
1371     }
1372 
1373     @Override
1374     public boolean accept(Path rd) {
1375       try {
1376         // throws IAE if invalid
1377         HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(rd.getName()));
1378       } catch (IllegalArgumentException iae) {
1379         // path name is an invalid family name and thus is excluded.
1380         return false;
1381       }
1382 
1383       try {
1384         return fs.getFileStatus(rd).isDirectory();
1385       } catch (IOException ioe) {
1386         // Maybe the file was moved or the fs was disconnected.
1387         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1388         return false;
1389       }
1390     }
1391   }
1392 
1393   /**
1394    * Given a particular region dir, return all the familydirs inside it
1395    *
1396    * @param fs A file system for the Path
1397    * @param regionDir Path to a specific region directory
1398    * @return List of paths to valid family directories in region dir.
1399    * @throws IOException
1400    */
1401   public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1402     // assumes we are in a region dir.
1403     FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1404     List<Path> familyDirs = new ArrayList<Path>(fds.length);
1405     for (FileStatus fdfs: fds) {
1406       Path fdPath = fdfs.getPath();
1407       familyDirs.add(fdPath);
1408     }
1409     return familyDirs;
1410   }
1411 
1412   /**
1413    * Filter for HFiles that excludes reference files.
1414    */
1415   public static class HFileFilter implements PathFilter {
1416     // This pattern will accept 0.90+ style hex hfile names but reject reference files
1417     final public static Pattern hfilePattern = Pattern.compile("^([0-9a-f]+)$");
1418 
1419     final FileSystem fs;
1420 
1421     public HFileFilter(FileSystem fs) {
1422       this.fs = fs;
1423     }
1424 
1425     @Override
1426     public boolean accept(Path rd) {
1427       if (!hfilePattern.matcher(rd.getName()).matches()) {
1428         return false;
1429       }
1430 
1431       try {
1432         // only files
1433         return !fs.getFileStatus(rd).isDirectory();
1434       } catch (IOException ioe) {
1435         // Maybe the file was moved or the fs was disconnected.
1436         LOG.warn("Skipping file " + rd + " due to IOException", ioe);
1437         return false;
1438       }
1439     }
1440   }
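
  /*
   * Illustrative sketch (not in the original source): walking from a table dir down to its
   * store files using the filters and helpers in this class. The "conf" and "tableName"
   * values below are assumed inputs.
   *
   *   FileSystem fs = FileSystem.get(conf);
   *   Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf(tableName));
   *   for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
   *     for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
   *       FileStatus[] hfiles = fs.listStatus(familyDir, new HFileFilter(fs));
   *       // hfiles now holds only plain hfiles; reference files are filtered out
   *     }
   *   }
   */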
1441 
1442   /**
1443    * @param conf the configuration to use
1444    * @return the filesystem of the hbase rootdir
1445    * @throws IOException
1446    */
1447   public static FileSystem getCurrentFileSystem(Configuration conf)
1448   throws IOException {
1449     return getRootDir(conf).getFileSystem(conf);
1450   }
1451 
1452 
1453   /**
1454    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1455    * table StoreFile names to the full Path.
1456    * <br>
1457    * Example...<br>
1458    * Key = 3944417774205889744  <br>
1459    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1460    *
1461    * @param map map to add values.  If null, this method will create and populate one to return
1462    * @param fs  The file system to use.
1463    * @param hbaseRootDir  The root directory to scan.
1464    * @param tableName name of the table to scan.
1465    * @return Map keyed by StoreFile name with a value of the full Path.
1466    * @throws IOException When scanning the directory fails.
1467    */
1468   public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1469   final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1470   throws IOException {
1471     if (map == null) {
1472       map = new HashMap<String, Path>();
1473     }
1474 
1475     // only include the directory paths to tables
1476     Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
1477     // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
1478     // should be regions.
1479     PathFilter familyFilter = new FamilyDirFilter(fs);
1480     FileStatus[] regionDirs = fs.listStatus(tableDir, new RegionDirFilter(fs));
1481     for (FileStatus regionDir : regionDirs) {
1482       Path dd = regionDir.getPath();
1483       // this is a region dir; now look inside it for family dirs
1484       FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1485       for (FileStatus familyDir : familyDirs) {
1486         Path family = familyDir.getPath();
1487         // now in family, iterate over the StoreFiles and
1488         // put in map
1489         FileStatus[] familyStatus = fs.listStatus(family);
1490         for (FileStatus sfStatus : familyStatus) {
1491           Path sf = sfStatus.getPath();
1492           map.put(sf.getName(), sf);
1493         }
1494       }
1495     }
1496     return map;
1497   }
1498 
1499 
1500   /**
1501    * Runs through the HBase rootdir and creates a reverse lookup map for
1502    * table StoreFile names to the full Path.
1503    * <br>
1504    * Example...<br>
1505    * Key = 3944417774205889744  <br>
1506    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1507    *
1508    * @param fs  The file system to use.
1509    * @param hbaseRootDir  The root directory to scan.
1510    * @return Map keyed by StoreFile name with a value of the full Path.
1511    * @throws IOException When scanning the directory fails.
1512    */
1513   public static Map<String, Path> getTableStoreFilePathMap(
1514     final FileSystem fs, final Path hbaseRootDir)
1515   throws IOException {
1516     Map<String, Path> map = new HashMap<String, Path>();
1517 
1518     // if this method looks similar to 'getTableFragmentation' that is because
1519     // it was borrowed from it.
1520 
1521     // only include the directory paths to tables
1522     for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1523       getTableStoreFilePathMap(map, fs, hbaseRootDir,
1524           FSUtils.getTableName(tableDir));
1525     }
1526     return map;
1527   }
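
  /*
   * Usage sketch (not in the original source): resolving a store file's full path from its
   * name via the reverse lookup map built above. The store file name below is illustrative.
   *
   *   FileSystem fs = FileSystem.get(conf);
   *   Map<String, Path> storeFiles = FSUtils.getTableStoreFilePathMap(fs, FSUtils.getRootDir(conf));
   *   Path full = storeFiles.get("3944417774205889744");  // null if no such store file exists
   */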
1528 
1529   /**
1530    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1531    * This accommodates differences between Hadoop versions: Hadoop 1 does not
1532    * throw a FileNotFoundException but returns an empty FileStatus[], while
1533    * Hadoop 2 throws FileNotFoundException.
1534    *
1535    * @param fs file system
1536    * @param dir directory
1537    * @param filter path filter
1538    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1539    */
1540   public static FileStatus [] listStatus(final FileSystem fs,
1541       final Path dir, final PathFilter filter) throws IOException {
1542     FileStatus [] status = null;
1543     try {
1544       status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
1545     } catch (FileNotFoundException fnfe) {
1546       // if directory doesn't exist, return null
1547       if (LOG.isTraceEnabled()) {
1548         LOG.trace(dir + " doesn't exist");
1549       }
1550     }
1551     if (status == null || status.length < 1) return null;
1552     return status;
1553   }
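
  /*
   * Usage sketch (not in the original source): unlike FileSystem#listStatus, this helper
   * returns null for a missing or empty directory, so callers null-check rather than catch
   * FileNotFoundException. "someDir" is an assumed path.
   *
   *   FileStatus[] children = FSUtils.listStatus(fs, someDir, null);
   *   if (children != null) {
   *     for (FileStatus child : children) {
   *       // process child.getPath()
   *     }
   *   }
   */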
1554 
1555   /**
1556    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1557    * This accommodates differences between Hadoop versions.
1558    *
1559    * @param fs file system
1560    * @param dir directory
1561    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1562    */
1563   public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
1564     return listStatus(fs, dir, null);
1565   }
1566 
1567   /**
1568    * Calls fs.delete() and returns the value returned by fs.delete().
1569    *
1570    * @param fs
1571    * @param path
1572    * @param recursive
1573    * @return the value returned by the fs.delete()
1574    * @throws IOException
1575    */
1576   public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
1577       throws IOException {
1578     return fs.delete(path, recursive);
1579   }
1580 
1581   /**
1582    * Calls fs.exists(). Checks if the specified path exists
1583    *
1584    * @param fs
1585    * @param path
1586    * @return the value returned by fs.exists()
1587    * @throws IOException
1588    */
1589   public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
1590     return fs.exists(path);
1591   }
1592 
1593   /**
1594    * Throw an exception if an action is not permitted by a user on a file.
1595    *
1596    * @param ugi
1597    *          the user
1598    * @param file
1599    *          the file
1600    * @param action
1601    *          the action
1602    */
1603   public static void checkAccess(UserGroupInformation ugi, FileStatus file,
1604       FsAction action) throws AccessDeniedException {
1605     if (ugi.getShortUserName().equals(file.getOwner())) {
1606       if (file.getPermission().getUserAction().implies(action)) {
1607         return;
1608       }
1609     } else if (contains(ugi.getGroupNames(), file.getGroup())) {
1610       if (file.getPermission().getGroupAction().implies(action)) {
1611         return;
1612       }
1613     } else if (file.getPermission().getOtherAction().implies(action)) {
1614       return;
1615     }
1616     throw new AccessDeniedException("Permission denied:" + " action=" + action
1617         + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
1618   }
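
  /*
   * Usage sketch (not in the original source): verifying that the current user may read a
   * file before operating on it. The "somePath" value below is an assumed input.
   *
   *   UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
   *   FileStatus status = fs.getFileStatus(somePath);
   *   FSUtils.checkAccess(ugi, status, FsAction.READ);  // throws AccessDeniedException if not permitted
   */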
1619 
1620   private static boolean contains(String[] groups, String user) {
1621     for (String group : groups) {
1622       if (group.equals(user)) {
1623         return true;
1624       }
1625     }
1626     return false;
1627   }
1628 
1629   /**
1630    * Log the current state of the filesystem from a certain root directory
1631    * @param fs filesystem to investigate
1632    * @param root root file/directory to start logging from
1633    * @param LOG log to output information
1634    * @throws IOException if an unexpected exception occurs
1635    */
1636   public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
1637       throws IOException {
1638     LOG.debug("Current file system:");
1639     logFSTree(LOG, fs, root, "|-");
1640   }
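
  /*
   * Usage sketch (not in the original source): dumping the layout under the hbase rootdir to
   * the debug log, e.g. while diagnosing a test failure.
   *
   *   FileSystem fs = FSUtils.getCurrentFileSystem(conf);
   *   FSUtils.logFileSystemState(fs, FSUtils.getRootDir(conf), LOG);
   */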
1641 
1642   /**
1643    * Recursive helper to log the state of the FS
1644    *
1645    * @see #logFileSystemState(FileSystem, Path, Log)
1646    */
1647   private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
1648       throws IOException {
1649     FileStatus[] files = FSUtils.listStatus(fs, root, null);
1650     if (files == null) return;
1651 
1652     for (FileStatus file : files) {
1653       if (file.isDirectory()) {
1654         LOG.debug(prefix + file.getPath().getName() + "/");
1655         logFSTree(LOG, fs, file.getPath(), prefix + "---");
1656       } else {
1657         LOG.debug(prefix + file.getPath().getName());
1658       }
1659     }
1660   }
1661 
1662   public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
1663       throws IOException {
1664     // set the modify time for TimeToLive Cleaner
1665     fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
1666     return fs.rename(src, dest);
1667   }
1668 
1669   /**
1670    * Scans the root path of the file system to get the degree of locality
1671    * for each region on each of the servers having at least one block of
1672    * that region.
1673    * This is used by the tool {@link RegionPlacementMaintainer}
1674    *
1675    * @param conf
1676    *          the configuration to use
1677    * @return the mapping from region encoded name to a map of server names to
1678    *           locality fraction
1679    * @throws IOException
1680    *           in case of file system errors or interrupts
1681    */
1682   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1683       final Configuration conf) throws IOException {
1684     return getRegionDegreeLocalityMappingFromFS(
1685         conf, null,
1686         conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1687 
1688   }
1689 
1690   /**
1691    * Scans the root path of the file system to get the degree of locality
1692    * for each region on each of the servers having at least one block of
1693    * that region.
1694    *
1695    * @param conf
1696    *          the configuration to use
1697    * @param desiredTable
1698    *          the table you wish to scan locality for
1699    * @param threadPoolSize
1700    *          the thread pool size to use
1701    * @return the mapping from region encoded name to a map of server names to
1702    *           locality fraction
1703    * @throws IOException
1704    *           in case of file system errors or interrupts
1705    */
1706   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1707       final Configuration conf, final String desiredTable, int threadPoolSize)
1708       throws IOException {
1709     Map<String, Map<String, Float>> regionDegreeLocalityMapping =
1710         new ConcurrentHashMap<String, Map<String, Float>>();
1711     getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null,
1712         regionDegreeLocalityMapping);
1713     return regionDegreeLocalityMapping;
1714   }
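
  /*
   * Usage sketch (not in the original source): computing per-region locality for a single
   * table with a small thread pool and reading out the per-server fractions. The table name,
   * pool size, and encoded region name below are assumed inputs.
   *
   *   Map<String, Map<String, Float>> locality =
   *       FSUtils.getRegionDegreeLocalityMappingFromFS(conf, "usertable", 4);
   *   Map<String, Float> byServer = locality.get(someRegionEncodedName);
   *   // byServer maps server names to the fraction of the region's blocks held locally
   */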
1715 
1716   /**
1717    * Scans the root path of the file system to get either the mapping between
1718    * the region name and its best locality region server, or the degree of
1719    * locality of each region on each of the servers having at least one block
1720    * of that region. The output map parameters are both optional.
1721    *
1722    * @param conf
1723    *          the configuration to use
1724    * @param desiredTable
1725    *          the table you wish to scan locality for
1726    * @param threadPoolSize
1727    *          the thread pool size to use
1728    * @param regionToBestLocalityRSMapping
1729    *          the map into which to put the best locality mapping or null
1730    * @param regionDegreeLocalityMapping
1731    *          the map into which to put the locality degree mapping or null,
1732    *          must be a thread-safe implementation
1733    * @throws IOException
1734    *           in case of file system errors or interrupts
1735    */
1736   private static void getRegionLocalityMappingFromFS(
1737       final Configuration conf, final String desiredTable,
1738       int threadPoolSize,
1739       Map<String, String> regionToBestLocalityRSMapping,
1740       Map<String, Map<String, Float>> regionDegreeLocalityMapping)
1741       throws IOException {
1742     FileSystem fs =  FileSystem.get(conf);
1743     Path rootPath = FSUtils.getRootDir(conf);
1744     long startTime = EnvironmentEdgeManager.currentTime();
1745     Path queryPath;
1746     // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1747     if (null == desiredTable) {
1748       queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1749     } else {
1750       queryPath = new Path(FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1751     }
1752 
1753     // reject all paths that are not appropriate
1754     PathFilter pathFilter = new PathFilter() {
1755       @Override
1756       public boolean accept(Path path) {
1757         // the path name should be a region name, but the glob may also match noise
1758         if (null == path) {
1759           return false;
1760         }
1761 
1762         // no parent?
1763         Path parent = path.getParent();
1764         if (null == parent) {
1765           return false;
1766         }
1767 
1768         String regionName = path.getName();
1769         if (null == regionName) {
1770           return false;
1771         }
1772 
1773         if (!regionName.toLowerCase().matches("[0-9a-f]+")) {
1774           return false;
1775         }
1776         return true;
1777       }
1778     };
1779 
1780     FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1781 
1782     if (null == statusList) {
1783       return;
1784     } else {
1785       LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
1786           statusList.length);
1787     }
1788 
1789     // lower the number of threads in case we have very few expected regions
1790     threadPoolSize = Math.min(threadPoolSize, statusList.length);
1791 
1792     // run in multiple threads
1793     ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
1794         threadPoolSize, 60, TimeUnit.SECONDS,
1795         new ArrayBlockingQueue<Runnable>(statusList.length));
1796     try {
1797       // ignore all file status items that are not of interest
1798       for (FileStatus regionStatus : statusList) {
1799         if (null == regionStatus) {
1800           continue;
1801         }
1802 
1803         if (!regionStatus.isDirectory()) {
1804           continue;
1805         }
1806 
1807         Path regionPath = regionStatus.getPath();
1808         if (null == regionPath) {
1809           continue;
1810         }
1811 
1812         tpe.execute(new FSRegionScanner(fs, regionPath,
1813             regionToBestLocalityRSMapping, regionDegreeLocalityMapping));
1814       }
1815     } finally {
1816       tpe.shutdown();
1817       int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1818           60 * 1000);
1819       try {
1820         // here we wait until TPE terminates, which is either naturally or by
1821         // exceptions in the execution of the threads
1822         while (!tpe.awaitTermination(threadWakeFrequency,
1823             TimeUnit.MILLISECONDS)) {
1824           // printing out rough estimate, so as to not introduce
1825           // AtomicInteger
1826           LOG.info("Locality checking is underway: { Scanned Regions : "
1827               + tpe.getCompletedTaskCount() + "/"
1828               + tpe.getTaskCount() + " }");
1829         }
1830       } catch (InterruptedException e) {
1831         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1832       }
1833     }
1834 
1835     long overhead = EnvironmentEdgeManager.currentTime() - startTime;
1836     String overheadMsg = "Scan DFS for locality info took " + overhead + " ms";
1837 
1838     LOG.info(overheadMsg);
1839   }
1840 
1841   /**
1842    * Do our short circuit read setup.
1843    * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
1844    * @param conf
1845    */
1846   public static void setupShortCircuitRead(final Configuration conf) {
1847     // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1848     boolean shortCircuitSkipChecksum =
1849       conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1850     boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1851     if (shortCircuitSkipChecksum) {
1852       LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
1853         "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
1854         "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
1855       assert !shortCircuitSkipChecksum; //this will fail if assertions are on
1856     }
1857     checkShortCircuitReadBufferSize(conf);
1858   }
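
  /*
   * Configuration sketch (not in the original source): enabling HDFS short-circuit reads and
   * HBase-level checksums, then applying the buffer-size default handled below. Property
   * values are illustrative assumptions, not recommendations.
   *
   *   conf.setBoolean("dfs.client.read.shortcircuit", true);
   *   conf.setBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
   *   FSUtils.setupShortCircuitRead(conf);  // warns if skip-checksum is set, fixes buffer size
   */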
1859 
1860   /**
1861    * Check if short circuit read buffer size is set and if not, set it to hbase value.
1862    * @param conf
1863    */
1864   public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1865     final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1866     final int notSet = -1;
1867     // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
1868     final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1869     int size = conf.getInt(dfsKey, notSet);
1870     // If a size is set, return -- we will use it.
1871     if (size != notSet) return;
1872     // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
1873     int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1874     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
1875   }
1876 
1877   /**
1878    * @param c
1879    * @return The DFSClient DFSHedgedReadMetrics instance, or null if it can't be found or we are not on hdfs.
1880    * @throws IOException 
1881    */
1882   public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
1883       throws IOException {
1884     if (!isHDFS(c)) return null;
1885     // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
1886     // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
1887     // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
1888     final String name = "getHedgedReadMetrics";
1889     DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
1890     Method m;
1891     try {
1892       m = dfsclient.getClass().getDeclaredMethod(name);
1893     } catch (NoSuchMethodException e) {
1894       LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
1895           e.getMessage());
1896       return null;
1897     } catch (SecurityException e) {
1898       LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
1899           e.getMessage());
1900       return null;
1901     }
1902     m.setAccessible(true);
1903     try {
1904       return (DFSHedgedReadMetrics)m.invoke(dfsclient);
1905     } catch (IllegalAccessException e) {
1906       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
1907           e.getMessage());
1908       return null;
1909     } catch (IllegalArgumentException e) {
1910       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
1911           e.getMessage());
1912       return null;
1913     } catch (InvocationTargetException e) {
1914       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
1915           e.getMessage());
1916       return null;
1917     }
1918   }
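
  /*
   * Usage sketch (not in the original source): the metrics object is only available when the
   * root filesystem is HDFS, so callers should null-check before reading counters from it.
   *
   *   DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
   *   if (metrics != null) {
   *     // inspect hedged read counters, e.g. for operational logging
   *   }
   */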
1919 }