
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInputStream;
23  import java.io.EOFException;
24  import java.io.FileNotFoundException;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.InterruptedIOException;
28  import java.lang.reflect.InvocationTargetException;
29  import java.lang.reflect.Method;
30  import java.net.InetSocketAddress;
31  import java.net.URI;
32  import java.net.URISyntaxException;
33  import java.util.ArrayList;
34  import java.util.Collections;
35  import java.util.HashMap;
36  import java.util.LinkedList;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.concurrent.ArrayBlockingQueue;
40  import java.util.concurrent.ConcurrentHashMap;
41  import java.util.concurrent.ThreadPoolExecutor;
42  import java.util.concurrent.TimeUnit;
43  import java.util.regex.Pattern;
44  
45  import org.apache.commons.logging.Log;
46  import org.apache.commons.logging.LogFactory;
47  import org.apache.hadoop.classification.InterfaceAudience;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.fs.BlockLocation;
50  import org.apache.hadoop.fs.FSDataInputStream;
51  import org.apache.hadoop.fs.FSDataOutputStream;
52  import org.apache.hadoop.fs.FileStatus;
53  import org.apache.hadoop.fs.FileSystem;
54  import org.apache.hadoop.fs.Path;
55  import org.apache.hadoop.fs.PathFilter;
56  import org.apache.hadoop.fs.permission.FsAction;
57  import org.apache.hadoop.fs.permission.FsPermission;
58  import org.apache.hadoop.hbase.ClusterId;
59  import org.apache.hadoop.hbase.HColumnDescriptor;
60  import org.apache.hadoop.hbase.HConstants;
61  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
62  import org.apache.hadoop.hbase.HRegionInfo;
63  import org.apache.hadoop.hbase.RemoteExceptionHandler;
64  import org.apache.hadoop.hbase.TableName;
65  import org.apache.hadoop.hbase.exceptions.DeserializationException;
66  import org.apache.hadoop.hbase.fs.HFileSystem;
67  import org.apache.hadoop.hbase.master.HMaster;
68  import org.apache.hadoop.hbase.master.RegionPlacementMaintainer;
69  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
70  import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
71  import org.apache.hadoop.hbase.regionserver.HRegion;
72  import org.apache.hadoop.hdfs.DistributedFileSystem;
73  import org.apache.hadoop.hdfs.protocol.FSConstants;
74  import org.apache.hadoop.io.IOUtils;
75  import org.apache.hadoop.io.SequenceFile;
76  import org.apache.hadoop.security.AccessControlException;
77  import org.apache.hadoop.security.UserGroupInformation;
78  import org.apache.hadoop.util.Progressable;
79  import org.apache.hadoop.util.ReflectionUtils;
80  import org.apache.hadoop.util.StringUtils;
81  
82  import com.google.common.primitives.Ints;
83  import com.google.protobuf.InvalidProtocolBufferException;
84  
85  /**
86   * Utility methods for interacting with the underlying file system.
87   */
88  @InterfaceAudience.Private
89  public abstract class FSUtils {
90    private static final Log LOG = LogFactory.getLog(FSUtils.class);
91  
92    /** Full access permissions (starting point for a umask) */
93    private static final String FULL_RWX_PERMISSIONS = "777";
94    private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
95    private static final int DEFAULT_THREAD_POOLSIZE = 2;
96  
97    /** Set to true on Windows platforms */
98    public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
99  
100   protected FSUtils() {
101     super();
102   }
103 
104   /**
105    * Compares path components only. Does not consider the scheme; i.e. if schemes differ but <code>path</code>
106    * starts with <code>rootPath</code>, then the function returns true.
107    * @param rootPath the root path the other path is checked against
108    * @param path the path whose start is compared with <code>rootPath</code>
109    * @return True if <code>path</code> starts with <code>rootPath</code>
110    */
111   public static boolean isStartingWithPath(final Path rootPath, final String path) {
112     String uriRootPath = rootPath.toUri().getPath();
113     String tailUriPath = (new Path(path)).toUri().getPath();
114     return tailUriPath.startsWith(uriRootPath);
115   }
116 
117   /**
118    * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
119    * '/a/b/c' part. Does not consider the scheme; i.e. if schemes differ but the path or subpath matches,
120    * the two will equate.
121    * @param pathToSearch Path we will be trying to match.
122    * @param pathTail the tail we expect at the end of <code>pathToSearch</code>
123    * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
124    */
125   public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
126     return isMatchingTail(pathToSearch, new Path(pathTail));
127   }
128 
129   /**
130    * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
131    * '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true.  Does not consider
132    * the scheme; i.e. if schemes differ but the path or subpath matches, the two will equate.
133    * @param pathToSearch Path we will be trying to match.
134    * @param pathTail the tail we expect at the end of <code>pathToSearch</code>
135    * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
136    */
137   public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
138     if (pathToSearch.depth() != pathTail.depth()) return false;
139     Path tailPath = pathTail;
140     String tailName;
141     Path toSearch = pathToSearch;
142     String toSearchName;
143     boolean result = false;
144     do {
145       tailName = tailPath.getName();
146       if (tailName == null || tailName.length() <= 0) {
147         result = true;
148         break;
149       }
150       toSearchName = toSearch.getName();
151       if (toSearchName == null || toSearchName.length() <= 0) break;
152       // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
153       tailPath = tailPath.getParent();
154       toSearch = toSearch.getParent();
155     } while(tailName.equals(toSearchName));
156     return result;
157   }
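  // Illustrative usage (not in the original source): a minimal sketch of the tail matching
  // above. Note the depth check up front -- both paths must have the same number of
  // components. The sample paths below are assumptions for illustration only.
  //
  //   Path toSearch = new Path("hdfs://namenode:8020/hbase/data");
  //   boolean same = FSUtils.isMatchingTail(toSearch, "/hbase/data");    // true: components match
  //   boolean diff = FSUtils.isMatchingTail(toSearch, "/hbase/archive"); // false: last component differs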
158 
159   public static FSUtils getInstance(FileSystem fs, Configuration conf) {
160     String scheme = fs.getUri().getScheme();
161     if (scheme == null) {
162       LOG.warn("Could not find scheme for uri " +
163           fs.getUri() + ", default to hdfs");
164       scheme = "hdfs";
165     }
166     Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
167         scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
168     FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
169     return fsUtils;
170   }
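  // Illustrative only: getInstance picks an FSUtils implementation keyed off the filesystem
  // scheme via "hbase.fsutil.<scheme>.impl", defaulting to FSHDFSUtils. A sketch of plugging
  // in a custom implementation (MyFSUtils is a hypothetical class, not part of HBase):
  //
  //   conf.set("hbase.fsutil.file.impl", MyFSUtils.class.getName());
  //   FSUtils utils = FSUtils.getInstance(FileSystem.get(conf), conf);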
171 
172   /**
173    * Delete if exists.
174    * @param fs filesystem object
175    * @param dir directory to delete
176    * @return True if deleted <code>dir</code>
177    * @throws IOException e
178    */
179   public static boolean deleteDirectory(final FileSystem fs, final Path dir)
180   throws IOException {
181     return fs.exists(dir) && fs.delete(dir, true);
182   }
183 
184   /**
185    * Return the number of bytes that large input files should optimally
186    * be split into to minimize I/O time.
187    *
188    * use reflection to search for getDefaultBlockSize(Path f)
189    * if the method doesn't exist, fall back to using getDefaultBlockSize()
190    *
191    * @param fs filesystem object
192    * @return the default block size for the path's filesystem
193    * @throws IOException e
194    */
195   public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
196     Method m = null;
197     Class<? extends FileSystem> cls = fs.getClass();
198     try {
199       m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
200     } catch (NoSuchMethodException e) {
201       LOG.info("FileSystem doesn't support getDefaultBlockSize");
202     } catch (SecurityException e) {
203       LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
204       m = null; // could happen on setAccessible()
205     }
206     if (m == null) {
207       return fs.getDefaultBlockSize();
208     } else {
209       try {
210         Object ret = m.invoke(fs, path);
211         return ((Long)ret).longValue();
212       } catch (Exception e) {
213         throw new IOException(e);
214       }
215     }
216   }
217 
218   /**
219    * Get the default replication.
220    *
221    * use reflection to search for getDefaultReplication(Path f)
222    * if the method doesn't exist, fall back to using getDefaultReplication()
223    *
224    * @param fs filesystem object
225    * @param f path of file
226    * @return default replication for the path's filesystem
227    * @throws IOException e
228    */
229   public static short getDefaultReplication(final FileSystem fs, final Path path) throws IOException {
230     Method m = null;
231     Class<? extends FileSystem> cls = fs.getClass();
232     try {
233       m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
234     } catch (NoSuchMethodException e) {
235       LOG.info("FileSystem doesn't support getDefaultReplication");
236     } catch (SecurityException e) {
237       LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
238       m = null; // could happen on setAccessible()
239     }
240     if (m == null) {
241       return fs.getDefaultReplication();
242     } else {
243       try {
244         Object ret = m.invoke(fs, path);
245         return ((Number)ret).shortValue();
246       } catch (Exception e) {
247         throw new IOException(e);
248       }
249     }
250   }
251 
252   /**
253    * Returns the default buffer size to use during writes.
254    *
255    * The size of the buffer should probably be a multiple of hardware
256    * page size (4096 on Intel x86), and it determines how much data is
257    * buffered during read and write operations.
258    *
259    * @param fs filesystem object
260    * @return default buffer size to use during writes
261    */
262   public static int getDefaultBufferSize(final FileSystem fs) {
263     return fs.getConf().getInt("io.file.buffer.size", 4096);
264   }
265 
266   /**
267    * Create the specified file on the filesystem. By default, this will:
268    * <ol>
269    * <li>overwrite the file if it exists</li>
270    * <li>apply the umask in the configuration (if it is enabled)</li>
271    * <li>use the fs configured buffer size (or 4096 if not set)</li>
272    * <li>use the default replication</li>
273    * <li>use the default block size</li>
274    * <li>not track progress</li>
275    * </ol>
276    *
277    * @param fs {@link FileSystem} on which to write the file
278    * @param path {@link Path} to the file to write
279    * @param perm permissions
280    * @param favoredNodes preferred datanodes for block placement; may be null
281    * @return output stream to the created file
282    * @throws IOException if the file cannot be created
283    */
284   public static FSDataOutputStream create(FileSystem fs, Path path,
285       FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
286     if (fs instanceof HFileSystem) {
287       FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
288       if (backingFs instanceof DistributedFileSystem) {
289         // Try to use the favoredNodes version via reflection to allow backwards-
290         // compatibility.
291         try {
292           return (FSDataOutputStream) (DistributedFileSystem.class
293               .getDeclaredMethod("create", Path.class, FsPermission.class,
294                   boolean.class, int.class, short.class, long.class,
295                   Progressable.class, InetSocketAddress[].class)
296                   .invoke(backingFs, path, FsPermission.getDefault(), true,
297                       getDefaultBufferSize(backingFs),
298                       getDefaultReplication(backingFs, path),
299                       getDefaultBlockSize(backingFs, path),
300                       null, favoredNodes));
301         } catch (InvocationTargetException ite) {
302           // Function was properly called, but threw its own exception.
303           throw new IOException(ite.getCause());
304         } catch (NoSuchMethodException e) {
305           LOG.debug("DFS Client does not support most favored nodes create; using default create");
306           if (LOG.isTraceEnabled()) LOG.trace("Ignoring; use default create", e);
307         } catch (IllegalArgumentException e) {
308           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
309         } catch (SecurityException e) {
310           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
311         } catch (IllegalAccessException e) {
312           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
313         }
314       }
315     }
316     return create(fs, path, perm, true);
317   }
318 
319   /**
320    * Create the specified file on the filesystem. By default, this will:
321    * <ol>
322    * <li>apply the umask in the configuration (if it is enabled)</li>
323    * <li>use the fs configured buffer size (or 4096 if not set)</li>
324    * <li>use the default replication</li>
325    * <li>use the default block size</li>
326    * <li>not track progress</li>
327    * </ol>
328    *
329    * @param fs {@link FileSystem} on which to write the file
330    * @param path {@link Path} to the file to write
331    * @param perm permissions to apply to the created file
332    * @param overwrite Whether or not the created file should be overwritten.
333    * @return output stream to the created file
334    * @throws IOException if the file cannot be created
335    */
336   public static FSDataOutputStream create(FileSystem fs, Path path,
337       FsPermission perm, boolean overwrite) throws IOException {
338     if (LOG.isTraceEnabled()) {
339       LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
340     }
341     return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
342         getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
343   }
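  // Illustrative only: a minimal sketch of this create variant (the path is an assumption).
  // It uses the filesystem's configured buffer size and the default replication and block
  // size, overwriting any existing file when 'overwrite' is true.
  //
  //   FSDataOutputStream out = FSUtils.create(fs, new Path("/hbase/.tmp/example"),
  //       FsPermission.getDefault(), true);
  //   try { out.writeUTF("hello"); } finally { out.close(); }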
344 
345   /**
346    * Get the file permissions specified in the configuration, if they are
347    * enabled.
348    *
349    * @param fs filesystem that the file will be created on.
350    * @param conf configuration to read for determining if permissions are
351    *          enabled and which to use
352    * @param permssionConfKey property key in the configuration to use when
353    *          finding the permission
354    * @return the permission to use when creating a new file on the fs. If
355    *         special permissions are not specified in the configuration, then
356    *         the default permissions on the fs will be returned.
357    */
358   public static FsPermission getFilePermissions(final FileSystem fs,
359       final Configuration conf, final String permssionConfKey) {
360     boolean enablePermissions = conf.getBoolean(
361         HConstants.ENABLE_DATA_FILE_UMASK, false);
362 
363     if (enablePermissions) {
364       try {
365         FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
366         // make sure that we have a mask, if not, go default.
367         String mask = conf.get(permssionConfKey);
368         if (mask == null)
369           return FsPermission.getDefault();
370        // apply the umask
371         FsPermission umask = new FsPermission(mask);
372         return perm.applyUMask(umask);
373       } catch (IllegalArgumentException e) {
374         LOG.warn(
375             "Incorrect umask attempted to be created: "
376                 + conf.get(permssionConfKey)
377                 + ", using default file permissions.", e);
378         return FsPermission.getDefault();
379       }
380     }
381     return FsPermission.getDefault();
382   }
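  // Illustrative only: a sketch of how the umask is applied, assuming the caller passes
  // "hbase.data.umask" as permssionConfKey (the key name is the caller's choice).
  //
  //   conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
  //   conf.set("hbase.data.umask", "077");
  //   FsPermission perm = FSUtils.getFilePermissions(fs, conf, "hbase.data.umask");
  //   // "777" with a "077" umask applied yields "700" (owner-only access).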
383 
384   /**
385    * Checks to see if the specified file system is available
386    *
387    * @param fs filesystem
388    * @throws IOException e
389    */
390   public static void checkFileSystemAvailable(final FileSystem fs)
391   throws IOException {
392     if (!(fs instanceof DistributedFileSystem)) {
393       return;
394     }
395     IOException exception = null;
396     DistributedFileSystem dfs = (DistributedFileSystem) fs;
397     try {
398       if (dfs.exists(new Path("/"))) {
399         return;
400       }
401     } catch (IOException e) {
402       exception = RemoteExceptionHandler.checkIOException(e);
403     }
404     try {
405       fs.close();
406     } catch (Exception e) {
407       LOG.error("file system close failed: ", e);
408     }
409     IOException io = new IOException("File system is not available");
410     io.initCause(exception);
411     throw io;
412   }
413 
414   /**
415    * We use reflection because {@link DistributedFileSystem#setSafeMode(
416    * FSConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1
417    *
418    * @param dfs
419    * @return whether we're in safe mode
420    * @throws IOException
421    */
422   private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
423     boolean inSafeMode = false;
424     try {
425       Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
426           org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.class, boolean.class});
427       inSafeMode = (Boolean) m.invoke(dfs,
428         org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_GET, true);
429     } catch (Exception e) {
430       if (e instanceof IOException) throw (IOException) e;
431 
432       // Check whether dfs is on safemode.
433       inSafeMode = dfs.setSafeMode(
434         org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_GET);
435     }
436     return inSafeMode;
437   }
438 
439   /**
440    * Check whether dfs is in safemode.
441    * @param conf
442    * @throws IOException
443    */
444   public static void checkDfsSafeMode(final Configuration conf)
445   throws IOException {
446     boolean isInSafeMode = false;
447     FileSystem fs = FileSystem.get(conf);
448     if (fs instanceof DistributedFileSystem) {
449       DistributedFileSystem dfs = (DistributedFileSystem)fs;
450       isInSafeMode = isInSafeMode(dfs);
451     }
452     if (isInSafeMode) {
453       throw new IOException("File system is in safemode, it can't be written now");
454     }
455   }
456 
457   /**
458    * Returns the current version of the file system, as recorded in the hbase.version file
459    *
460    * @param fs filesystem object
461    * @param rootdir root hbase directory
462    * @return null if no version file exists, version string otherwise.
463    * @throws IOException e
464    * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
465    */
466   public static String getVersion(FileSystem fs, Path rootdir)
467   throws IOException, DeserializationException {
468     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
469     FileStatus[] status = null;
470     try {
471       // hadoop 2.0 throws FNFE if directory does not exist.
472       // hadoop 1.0 returns null if directory does not exist.
473       status = fs.listStatus(versionFile);
474     } catch (FileNotFoundException fnfe) {
475       return null;
476     }
477     if (status == null || status.length == 0) return null;
478     String version = null;
479     byte [] content = new byte [(int)status[0].getLen()];
480     FSDataInputStream s = fs.open(versionFile);
481     try {
482       IOUtils.readFully(s, content, 0, content.length);
483       if (ProtobufUtil.isPBMagicPrefix(content)) {
484         version = parseVersionFrom(content);
485       } else {
486         // Presume it is pre-pb format.
487         InputStream is = new ByteArrayInputStream(content);
488         DataInputStream dis = new DataInputStream(is);
489         try {
490           version = dis.readUTF();
491         } finally {
492           dis.close();
493         }
494         // Update the format
495         LOG.info("Updating the hbase.version file format with version=" + version);
496         setVersion(fs, rootdir, version, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
497       }
498     } catch (EOFException eof) {
499       LOG.warn("Version file was empty, odd, will try to set it.");
500     } finally {
501       s.close();
502     }
503     return version;
504   }
505 
506   /**
507    * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
508    * @param bytes The byte content of the hbase.version file.
509    * @return The version found in the file as a String.
510    * @throws DeserializationException
511    */
512   static String parseVersionFrom(final byte [] bytes)
513   throws DeserializationException {
514     ProtobufUtil.expectPBMagicPrefix(bytes);
515     int pblen = ProtobufUtil.lengthOfPBMagic();
516     FSProtos.HBaseVersionFileContent.Builder builder =
517       FSProtos.HBaseVersionFileContent.newBuilder();
518     FSProtos.HBaseVersionFileContent fileContent;
519     try {
520       fileContent = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
521       return fileContent.getVersion();
522     } catch (InvalidProtocolBufferException e) {
523       // Convert
524       throw new DeserializationException(e);
525     }
526   }
527 
528   /**
529    * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
530    * @param version Version to persist
531    * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
532    */
533   static byte [] toVersionByteArray(final String version) {
534     FSProtos.HBaseVersionFileContent.Builder builder =
535       FSProtos.HBaseVersionFileContent.newBuilder();
536     return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
537   }
538 
539   /**
540    * Verifies current version of file system
541    *
542    * @param fs file system
543    * @param rootdir root directory of HBase installation
544    * @param message if true, issues a message on System.out
545    *
546    * @throws IOException e
547    * @throws DeserializationException
548    */
549   public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
550   throws IOException, DeserializationException {
551     checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
552   }
553 
554   /**
555    * Verifies current version of file system
556    *
557    * @param fs file system
558    * @param rootdir root directory of HBase installation
559    * @param message if true, issues a message on System.out
560    * @param wait wait interval
561    * @param retries number of times to retry
562    *
563    * @throws IOException e
564    * @throws DeserializationException
565    */
566   public static void checkVersion(FileSystem fs, Path rootdir,
567       boolean message, int wait, int retries)
568   throws IOException, DeserializationException {
569     String version = getVersion(fs, rootdir);
570     if (version == null) {
571       if (!metaRegionExists(fs, rootdir)) {
572         // rootDir is empty (no version file and no root region)
573         // just create new version file (HBASE-1195)
574         setVersion(fs, rootdir, wait, retries);
575         return;
576       }
577     } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) return;
578 
579     // version is deprecated; require migration
580     // Output on stdout so user sees it in terminal.
581     String msg = "HBase file layout needs to be upgraded."
582       + "  You have version " + version
583       + " and I want version " + HConstants.FILE_SYSTEM_VERSION
584       + ".  Is your hbase.rootdir valid?  If so, you may need to run "
585       + "'hbase hbck -fixVersionFile'.";
586     if (message) {
587       System.out.println("WARNING! " + msg);
588     }
589     throw new FileSystemVersionException(msg);
590   }
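  // Illustrative only: a typical bootstrap sequence (a sketch; the wait and retry values are
  // assumptions). A fresh root dir gets a version file written; a mismatched version raises
  // FileSystemVersionException.
  //
  //   Path rootdir = FSUtils.getRootDir(conf);
  //   FileSystem fs = rootdir.getFileSystem(conf);
  //   FSUtils.checkVersion(fs, rootdir, true, 1000, 3);  // message=true, wait=1s, 3 retries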
591 
592   /**
593    * Sets version of file system
594    *
595    * @param fs filesystem object
596    * @param rootdir hbase root
597    * @throws IOException e
598    */
599   public static void setVersion(FileSystem fs, Path rootdir)
600   throws IOException {
601     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
602       HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
603   }
604 
605   /**
606    * Sets version of file system
607    *
608    * @param fs filesystem object
609    * @param rootdir hbase root
610    * @param wait time to wait for retry
611    * @param retries number of times to retry before failing
612    * @throws IOException e
613    */
614   public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
615   throws IOException {
616     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
617   }
618 
619 
620   /**
621    * Sets version of file system
622    *
623    * @param fs filesystem object
624    * @param rootdir hbase root directory
625    * @param version version to set
626    * @param wait time to wait for retry
627    * @param retries number of times to retry before throwing an IOException
628    * @throws IOException e
629    */
630   public static void setVersion(FileSystem fs, Path rootdir, String version,
631       int wait, int retries) throws IOException {
632     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
633     while (true) {
634       try {
635         FSDataOutputStream s = fs.create(versionFile);
636         s.write(toVersionByteArray(version));
637         s.close();
638         LOG.debug("Created version file at " + rootdir.toString() + " with version=" + version);
639         return;
640       } catch (IOException e) {
641         if (retries > 0) {
642           LOG.warn("Unable to create version file at " + rootdir.toString() + ", retrying", e);
643           fs.delete(versionFile, false);
644           try {
645             if (wait > 0) {
646               Thread.sleep(wait);
647             }
648           } catch (InterruptedException ie) {
649             throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
650           }
651           retries--;
652         } else {
653           throw e;
654         }
655       }
656     }
657   }
658 
659   /**
660    * Checks that a cluster ID file exists in the HBase root directory
661    * @param fs the root directory FileSystem
662    * @param rootdir the HBase root directory in HDFS
663    * @param wait how long to wait between retries
664    * @return <code>true</code> if the file exists, otherwise <code>false</code>
665    * @throws IOException if checking the FileSystem fails
666    */
667   public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
668       int wait) throws IOException {
669     while (true) {
670       try {
671         Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
672         return fs.exists(filePath);
673       } catch (IOException ioe) {
674         if (wait > 0) {
675           LOG.warn("Unable to check cluster ID file in " + rootdir.toString() +
676               ", retrying in "+wait+"msec: "+StringUtils.stringifyException(ioe));
677           try {
678             Thread.sleep(wait);
679           } catch (InterruptedException e) {
680             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
681           }
682         } else {
683           throw ioe;
684         }
685       }
686     }
687   }
688 
689   /**
690    * Returns the value of the unique cluster ID stored for this HBase instance.
691    * @param fs the root directory FileSystem
692    * @param rootdir the path to the HBase root directory
693    * @return the unique cluster identifier
694    * @throws IOException if reading the cluster ID file fails
695    */
696   public static ClusterId getClusterId(FileSystem fs, Path rootdir)
697   throws IOException {
698     Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
699     ClusterId clusterId = null;
700     FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath):  null;
701     if (status != null) {
702       int len = Ints.checkedCast(status.getLen());
703       byte [] content = new byte[len];
704       FSDataInputStream in = fs.open(idPath);
705       try {
706         in.readFully(content);
707       } catch (EOFException eof) {
708         LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
709       } finally{
710         in.close();
711       }
712       try {
713         clusterId = ClusterId.parseFrom(content);
714       } catch (DeserializationException e) {
715         throw new IOException("content=" + Bytes.toString(content), e);
716       }
717       // If not pb'd, make it so.
718       if (!ProtobufUtil.isPBMagicPrefix(content)) {
719         String cid = null;
720         in = fs.open(idPath);
721         try {
722           cid = in.readUTF();
723           clusterId = new ClusterId(cid);
724         } catch (EOFException eof) {
725           LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
726         } finally {
727           in.close();
728         }
729         rewriteAsPb(fs, rootdir, idPath, clusterId);
730       }
731       return clusterId;
732     } else {
733       LOG.warn("Cluster ID file does not exist at " + idPath.toString());
734     }
735     return clusterId;
736   }
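  // Illustrative only (a sketch; assumes ClusterId's no-arg constructor generates a fresh id):
  // reading the cluster ID back after it has been written with setClusterId. getClusterId also
  // transparently rewrites a pre-protobuf hbase.id file in the protobuf format.
  //
  //   FSUtils.setClusterId(fs, rootdir, new ClusterId(), 1000);
  //   ClusterId id = FSUtils.getClusterId(fs, rootdir);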
737 
738   /**
739    * @param cid
740    * @throws IOException
741    */
742   private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
743       final ClusterId cid)
744   throws IOException {
745     // Rewrite the file as pb.  Move aside the old one first, write new
746     // then delete the moved-aside file.
747     Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
748     if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
749     setClusterId(fs, rootdir, cid, 100);
750     if (!fs.delete(movedAsideName, false)) {
751       throw new IOException("Failed delete of " + movedAsideName);
752     }
753     LOG.debug("Rewrote the hbase.id file as pb");
754   }
755 
756   /**
757    * Writes a new unique identifier for this cluster to the "hbase.id" file
758    * in the HBase root directory
759    * @param fs the root directory FileSystem
760    * @param rootdir the path to the HBase root directory
761    * @param clusterId the unique identifier to store
762    * @param wait how long (in milliseconds) to wait between retries
763    * @throws IOException if writing to the FileSystem fails and there is no wait value for retrying
764    */
765   public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
766       int wait) throws IOException {
767     while (true) {
768       try {
769         Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
770         FSDataOutputStream s = fs.create(filePath);
771         try {
772           s.write(clusterId.toByteArray());
773         } finally {
774           s.close();
775         }
776         if (LOG.isDebugEnabled()) {
777           LOG.debug("Created cluster ID file at " + filePath.toString() + " with ID: " + clusterId);
778         }
779         return;
780       } catch (IOException ioe) {
781         if (wait > 0) {
782           LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
783               ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
784           try {
785             Thread.sleep(wait);
786           } catch (InterruptedException e) {
787             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
788           }
789         } else {
790           throw ioe;
791         }
792       }
793     }
794   }
795 
796   /**
797    * Verifies root directory path is a valid URI with a scheme
798    *
799    * @param root root directory path
800    * @return Passed <code>root</code> argument.
801    * @throws IOException if not a valid URI with a scheme
802    */
803   public static Path validateRootPath(Path root) throws IOException {
804     try {
805       URI rootURI = new URI(root.toString());
806       String scheme = rootURI.getScheme();
807       if (scheme == null) {
808         throw new IOException("Root directory does not have a scheme");
809       }
810       return root;
811     } catch (URISyntaxException e) {
812       IOException io = new IOException("Root directory path is not a valid " +
813         "URI -- check your " + HConstants.HBASE_DIR + " configuration");
814       io.initCause(e);
815       throw io;
816     }
817   }
818 
819   /**
820    * Checks for the presence of the root path (using the provided conf object) in the given path. If
821    * it exists, this method removes it and returns the String representation of the remaining relative path.
822    * @param path path to strip the root path from
823    * @param conf configuration used to look up the HBase root directory
824    * @return String representation of the remaining relative path
825    * @throws IOException
826    */
827   public static String removeRootPath(Path path, final Configuration conf) throws IOException {
828     Path root = FSUtils.getRootDir(conf);
829     String pathStr = path.toString();
830     // If the path does not contain the root path, return it as is.
831     if (!pathStr.startsWith(root.toString())) return pathStr;
832     // Otherwise strip the root path.
833     return pathStr.substring(root.toString().length() + 1);// remove the "/" too.
834   }
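  // Illustrative only: a sketch assuming a root of hdfs://namenode:8020/hbase.
  //
  //   // path = hdfs://namenode:8020/hbase/data/default/t1
  //   String rel = FSUtils.removeRootPath(path, conf);   // "data/default/t1"
  //   // a path outside the root is returned unchanged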
835 
836   /**
837    * If DFS, check safe mode and if so, wait until we clear it.
838    * @param conf configuration
839    * @param wait Sleep between retries
840    * @throws IOException e
841    */
842   public static void waitOnSafeMode(final Configuration conf,
843     final long wait)
844   throws IOException {
845     FileSystem fs = FileSystem.get(conf);
846     if (!(fs instanceof DistributedFileSystem)) return;
847     DistributedFileSystem dfs = (DistributedFileSystem)fs;
848     // Make sure dfs is not in safe mode
849     while (isInSafeMode(dfs)) {
850       LOG.info("Waiting for dfs to exit safe mode...");
851       try {
852         Thread.sleep(wait);
853       } catch (InterruptedException e) {
854         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
855       }
856     }
857   }
858 
859   /**
860    * Return the 'path' component of a Path.  In Hadoop, a Path is a URI.  This
861    * method returns the 'path' component of a Path's URI: e.g. If a Path is
862    * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
863    * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
864    * This method is useful if you want to print out a Path without the qualifying
865    * Filesystem instance.
866    * @param p Filesystem Path whose 'path' component we are to return.
867    * @return the 'path' component of <code>p</code>'s URI
868    */
869   public static String getPath(Path p) {
870     return p.toUri().getPath();
871   }
872 
873   /**
874    * @param c configuration
875    * @return Path to hbase root directory: i.e. <code>hbase.rootdir</code> from
876    * configuration as a qualified Path.
877    * @throws IOException e
878    */
879   public static Path getRootDir(final Configuration c) throws IOException {
880     Path p = new Path(c.get(HConstants.HBASE_DIR));
881     FileSystem fs = p.getFileSystem(c);
882     return p.makeQualified(fs);
883   }
884 
885   public static void setRootDir(final Configuration c, final Path root) throws IOException {
886     c.set(HConstants.HBASE_DIR, root.toString());
887   }
888 
889   public static void setFsDefault(final Configuration c, final Path root) throws IOException {
890     c.set("fs.defaultFS", root.toString());    // for hadoop 0.21+
891     c.set("fs.default.name", root.toString()); // for hadoop 0.20
892   }
893 
894   /**
895    * Checks if meta region exists
896    *
897    * @param fs file system
898    * @param rootdir root directory of HBase installation
899    * @return true if exists
900    * @throws IOException e
901    */
902   @SuppressWarnings("deprecation")
903   public static boolean metaRegionExists(FileSystem fs, Path rootdir)
904   throws IOException {
905     Path metaRegionDir =
906       HRegion.getRegionDir(rootdir, HRegionInfo.FIRST_META_REGIONINFO);
907     return fs.exists(metaRegionDir);
908   }
909 
910   /**
911    * Compute HDFS blocks distribution of a given file, or a portion of the file
912    * @param fs file system
913    * @param status file status of the file
914    * @param start start position of the portion
915    * @param length length of the portion
916    * @return The HDFS blocks distribution
917    */
918   static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
919     final FileSystem fs, FileStatus status, long start, long length)
920     throws IOException {
921     HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
922     BlockLocation [] blockLocations =
923       fs.getFileBlockLocations(status, start, length);
924     for(BlockLocation bl : blockLocations) {
925       String [] hosts = bl.getHosts();
926       long len = bl.getLength();
927       blocksDistribution.addHostsAndBlockWeight(hosts, len);
928     }
929 
930     return blocksDistribution;
931   }
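  // Illustrative only: computing the block distribution for a whole file (the path and host
  // below are assumptions) and reading back the per-host weight to reason about locality.
  //
  //   FileStatus st = fs.getFileStatus(new Path("/hbase/data/default/t1/r1/f1/hfile"));
  //   HDFSBlocksDistribution dist =
  //       FSUtils.computeHDFSBlocksDistribution(fs, st, 0, st.getLen());
  //   long onHost = dist.getWeight("datanode-1.example.com");   // bytes local to that host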
932 
933 
934 
935   /**
936    * Runs through the hbase rootdir and checks all stores have only
937    * one file in them -- that is, they've been major compacted.  Looks
938    * at root and meta tables too.
939    * @param fs filesystem
940    * @param hbaseRootDir hbase root directory
941    * @return True if this hbase install is major compacted.
942    * @throws IOException e
943    */
944   public static boolean isMajorCompacted(final FileSystem fs,
945       final Path hbaseRootDir)
946   throws IOException {
947     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
948     for (Path d : tableDirs) {
949       FileStatus[] regionDirs = fs.listStatus(d, new DirFilter(fs));
950       for (FileStatus regionDir : regionDirs) {
951         Path dd = regionDir.getPath();
952         if (dd.getName().equals(HConstants.HREGION_COMPACTIONDIR_NAME)) {
953           continue;
954         }
955         // Else its a region name.  Now look in region for families.
956         FileStatus[] familyDirs = fs.listStatus(dd, new DirFilter(fs));
957         for (FileStatus familyDir : familyDirs) {
958           Path family = familyDir.getPath();
959           // Now in family make sure only one file.
960           FileStatus[] familyStatus = fs.listStatus(family);
961           if (familyStatus.length > 1) {
962             LOG.debug(family.toString() + " has " + familyStatus.length +
963                 " files.");
964             return false;
965           }
966         }
967       }
968     }
969     return true;
970   }
971 
972   // TODO move this method OUT of FSUtils. No dependencies to HMaster
973   /**
974    * Returns the total overall fragmentation percentage. Includes hbase:meta and
975    * -ROOT- as well.
976    *
977    * @param master  The master defining the HBase root and file system.
978    * @return The total fragmentation percentage across all tables, or -1 if it cannot be determined.
979    * @throws IOException When scanning the directory fails.
980    */
981   public static int getTotalTableFragmentation(final HMaster master)
982   throws IOException {
983     Map<String, Integer> map = getTableFragmentation(master);
984     return map != null && map.size() > 0 ? map.get("-TOTAL-") : -1;
985   }
986 
987   /**
988    * Runs through the HBase rootdir and checks how many stores for each table
989    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
990    * percentage across all tables is stored under the special key "-TOTAL-".
991    *
992    * @param master  The master defining the HBase root and file system.
993    * @return A map for each table and its percentage.
994    *
995    * @throws IOException When scanning the directory fails.
996    */
997   public static Map<String, Integer> getTableFragmentation(
998     final HMaster master)
999   throws IOException {
1000     Path path = getRootDir(master.getConfiguration());
1001     // since HMaster.getFileSystem() is package private
1002     FileSystem fs = path.getFileSystem(master.getConfiguration());
1003     return getTableFragmentation(fs, path);
1004   }
1005 
1006   /**
1007    * Runs through the HBase rootdir and checks how many stores for each table
1008    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1009    * percentage across all tables is stored under the special key "-TOTAL-".
1010    *
1011    * @param fs  The file system to use.
1012    * @param hbaseRootDir  The root directory to scan.
1013    * @return A map for each table and its percentage.
1014    * @throws IOException When scanning the directory fails.
1015    */
1016   public static Map<String, Integer> getTableFragmentation(
1017     final FileSystem fs, final Path hbaseRootDir)
1018   throws IOException {
1019     Map<String, Integer> frags = new HashMap<String, Integer>();
1020     int cfCountTotal = 0;
1021     int cfFragTotal = 0;
1022     DirFilter df = new DirFilter(fs);
1023     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1024     for (Path d : tableDirs) {
1025       int cfCount = 0;
1026       int cfFrag = 0;
1027       FileStatus[] regionDirs = fs.listStatus(d, df);
1028       for (FileStatus regionDir : regionDirs) {
1029         Path dd = regionDir.getPath();
1030         if (dd.getName().equals(HConstants.HREGION_COMPACTIONDIR_NAME)) {
1031           continue;
1032         }
1033         // else its a region name, now look in region for families
1034         FileStatus[] familyDirs = fs.listStatus(dd, df);
1035         for (FileStatus familyDir : familyDirs) {
1036           cfCount++;
1037           cfCountTotal++;
1038           Path family = familyDir.getPath();
1039           // now in family make sure only one file
1040           FileStatus[] familyStatus = fs.listStatus(family);
1041           if (familyStatus.length > 1) {
1042             cfFrag++;
1043             cfFragTotal++;
1044           }
1045         }
1046       }
1047       // compute percentage per table and store in result list
1048       frags.put(FSUtils.getTableName(d).getNameAsString(),
1049           Math.round((float) cfFrag / cfCount * 100));
1050     }
1051     // set overall percentage for all tables
1052     frags.put("-TOTAL-", Math.round((float) cfFragTotal / cfCountTotal * 100));
1053     return frags;
1054   }
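  // Illustrative only: reading the per-table percentages plus the special "-TOTAL-" entry
  // produced above (a sketch; the table name is an assumption).
  //
  //   Map<String, Integer> frag = FSUtils.getTableFragmentation(fs, rootdir);
  //   int overall = frag.get("-TOTAL-");   // percentage across all tables
  //   Integer t1 = frag.get("ns1:t1");     // per-table percentage, keyed by table name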
1055 
1056   /**
1057    * Expects to find -ROOT- directory.
1058    * @param fs filesystem
1059    * @param hbaseRootDir hbase root directory
1060    * @return True if this is a pre-0.20 layout.
1061    * @throws IOException e
1062    */
1063   public static boolean isPre020FileLayout(final FileSystem fs,
1064     final Path hbaseRootDir)
1065   throws IOException {
1066     Path mapfiles = new Path(new Path(new Path(new Path(hbaseRootDir, "-ROOT-"),
1067       "70236052"), "info"), "mapfiles");
1068     return fs.exists(mapfiles);
1069   }
1070 
1071   /**
1072    * Runs through the hbase rootdir and checks all stores have only
1073    * one file in them -- that is, they've been major compacted.  Looks
1074    * at root and meta tables too.  This version differs from
1075    * {@link #isMajorCompacted(FileSystem, Path)} in that it expects a
1076    * pre-0.20.0 hbase layout on the filesystem.  Used when migrating.
1077    * @param fs filesystem
1078    * @param hbaseRootDir hbase root directory
1079    * @return True if this hbase install is major compacted.
1080    * @throws IOException e
1081    */
1082   public static boolean isMajorCompactedPre020(final FileSystem fs,
1083       final Path hbaseRootDir)
1084   throws IOException {
1085     // Presumes any directory under hbase.rootdir is a table.
1086     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1087     for (Path d: tableDirs) {
1088       // Inside a table, there are compaction.dir directories to skip.
1089       // Otherwise, all else should be regions.  Then in each region, should
1090       // only be family directories.  Under each of these, should be a mapfile
1091       // and info directory and in these only one file.
1092       if (d.getName().equals(HConstants.HREGION_LOGDIR_NAME)) {
1093         continue;
1094       }
1095       FileStatus[] regionDirs = fs.listStatus(d, new DirFilter(fs));
1096       for (FileStatus regionDir : regionDirs) {
1097         Path dd = regionDir.getPath();
1098         if (dd.getName().equals(HConstants.HREGION_COMPACTIONDIR_NAME)) {
1099           continue;
1100         }
1101         // Else its a region name.  Now look in region for families.
1102         FileStatus[] familyDirs = fs.listStatus(dd, new DirFilter(fs));
1103         for (FileStatus familyDir : familyDirs) {
1104           Path family = familyDir.getPath();
1105           FileStatus[] infoAndMapfile = fs.listStatus(family);
1106           // Assert that only info and mapfile in family dir.
1107           if (infoAndMapfile.length != 0 && infoAndMapfile.length != 2) {
1108             LOG.debug(family.toString() +
1109                 " has more than just info and mapfile: " + infoAndMapfile.length);
1110             return false;
1111           }
1112           // Make sure directory named info or mapfile.
1113           for (int ll = 0; ll < 2; ll++) {
1114             if (infoAndMapfile[ll].getPath().getName().equals("info") ||
1115                 infoAndMapfile[ll].getPath().getName().equals("mapfiles"))
1116               continue;
1117             LOG.debug("Unexpected directory name: " +
1118                 infoAndMapfile[ll].getPath());
1119             return false;
1120           }
1121           // Now in family, there are 'mapfile' and 'info' subdirs.  Just
1122           // look in the 'mapfile' subdir.
1123           FileStatus[] familyStatus =
1124               fs.listStatus(new Path(family, "mapfiles"));
1125           if (familyStatus.length > 1) {
1126             LOG.debug(family.toString() + " has " + familyStatus.length +
1127                 " files.");
1128             return false;
1129           }
1130         }
1131       }
1132     }
1133     return true;
1134   }
1135 
1136   /**
1137    * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
1138    * path rootdir
1139    *
1140    * @param rootdir qualified path of HBase root directory
1141    * @param tableName name of table
1142    * @return {@link org.apache.hadoop.fs.Path} for table
1143    */
1144   public static Path getTableDir(Path rootdir, final TableName tableName) {
1145     return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
1146         tableName.getQualifierAsString());
1147   }
1148 
1149   /**
1150    * Returns the {@link org.apache.hadoop.hbase.TableName} object for the table
1151    * corresponding to the given table directory (namespace parent directory plus
1152    * table qualifier directory)
1153    *
1154    * @param tablePath path of table
1155    * @return {@link org.apache.hadoop.hbase.TableName} for the table
1156    */
1157   public static TableName getTableName(Path tablePath) {
1158     return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
1159   }
1160 
1161   /**
1162    * Returns the {@link org.apache.hadoop.fs.Path} object representing
1163    * the namespace directory under path rootdir
1164    *
1165    * @param rootdir qualified path of HBase root directory
1166    * @param namespace namespace name
1167    * @return {@link org.apache.hadoop.fs.Path} for the namespace directory
1168    */
1169   public static Path getNamespaceDir(Path rootdir, final String namespace) {
1170     return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
1171         new Path(namespace)));
1172   }
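  // Illustrative only: how the rootdir / namespace / table layout composes, assuming
  // HConstants.BASE_NAMESPACE_DIR resolves to "data" and the rootdir below is made up.
  //
  //   Path root = new Path("hdfs://namenode:8020/hbase");
  //   Path nsDir = FSUtils.getNamespaceDir(root, "ns1");
  //       // -> hdfs://namenode:8020/hbase/data/ns1
  //   Path tableDir = FSUtils.getTableDir(root, TableName.valueOf("ns1", "t1"));
  //       // -> hdfs://namenode:8020/hbase/data/ns1/t1
  //   TableName tn = FSUtils.getTableName(tableDir);   // ns1:t1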
1173 
1174   /**
1175    * A {@link PathFilter} that returns only regular files.
1176    */
1177   static class FileFilter implements PathFilter {
1178     private final FileSystem fs;
1179 
1180     public FileFilter(final FileSystem fs) {
1181       this.fs = fs;
1182     }
1183 
1184     @Override
1185     public boolean accept(Path p) {
1186       try {
1187         return fs.isFile(p);
1188       } catch (IOException e) {
1189         LOG.debug("unable to verify if path=" + p + " is a regular file", e);
1190         return false;
1191       }
1192     }
1193   }
1194 
1195   /**
1196    * Directory filter that doesn't include any of the directories in the specified blacklist
1197    */
1198   public static class BlackListDirFilter implements PathFilter {
1199     private final FileSystem fs;
1200     private List<String> blacklist;
1201 
1202     /**
1203      * Create a filter on the given filesystem with the specified blacklist
1204      * @param fs filesystem to filter
1205      * @param directoryNameBlackList list of the names of the directories to filter. If
1206      *          <tt>null</tt>, all directories are returned
1207      */
1208     @SuppressWarnings("unchecked")
1209     public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
1210       this.fs = fs;
1211       blacklist =
1212         (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
1213           : directoryNameBlackList);
1214     }
1215 
1216     @Override
1217     public boolean accept(Path p) {
1218       boolean isValid = false;
1219       try {
1220         if (blacklist.contains(p.getName().toString())) {
1221           isValid = false;
1222         } else {
1223           isValid = fs.getFileStatus(p).isDir();
1224         }
1225       } catch (IOException e) {
1226         LOG.warn("An error occurred while verifying if [" + p.toString()
1227             + "] is a valid directory. Returning 'not valid' and continuing.", e);
1228       }
1229       return isValid;
1230     }
1231   }
1232 
1233   /**
1234    * A {@link PathFilter} that only allows directories.
1235    */
1236   public static class DirFilter extends BlackListDirFilter {
1237 
1238     public DirFilter(FileSystem fs) {
1239       super(fs, null);
1240     }
1241   }
1242 
1243   /**
1244    * A {@link PathFilter} that returns usertable directories. To get all directories use the
1245    * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
1246    */
1247   public static class UserTableDirFilter extends BlackListDirFilter {
1248 
1249     public UserTableDirFilter(FileSystem fs) {
1250       super(fs, HConstants.HBASE_NON_TABLE_DIRS);
1251     }
1252   }
1253 
1254   /**
1255    * Heuristic to determine whether it is safe or not to open a file for append.
1256    * Looks both for dfs.support.append and uses reflection to search
1257    * for SequenceFile.Writer.syncFs() or FSDataOutputStream.hflush()
1258    * @param conf
1259    * @return True if append support
1260    */
1261   public static boolean isAppendSupported(final Configuration conf) {
1262     boolean append = conf.getBoolean("dfs.support.append", false);
1263     if (append) {
1264       try {
1265         // TODO: The implementation that comes back when we do a createWriter
1266         // may not be using SequenceFile so the below is not a definitive test.
1267         // Will do for now (hdfs-200).
1268         SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
1269         append = true;
1270       } catch (SecurityException e) {
1271       } catch (NoSuchMethodException e) {
1272         append = false;
1273       }
1274     }
1275     if (!append) {
1276       // Look for the 0.21, 0.22, new-style append evidence.
1277       try {
1278         FSDataOutputStream.class.getMethod("hflush", new Class<?> []{});
1279         append = true;
1280       } catch (NoSuchMethodException e) {
1281         append = false;
1282       }
1283     }
1284     return append;
1285   }
1286 
1287   /**
1288    * @param conf
1289    * @return True if this is a filesystem whose scheme is 'hdfs'.
1290    * @throws IOException
1291    */
1292   public static boolean isHDFS(final Configuration conf) throws IOException {
1293     FileSystem fs = FileSystem.get(conf);
1294     String scheme = fs.getUri().getScheme();
1295     return scheme.equalsIgnoreCase("hdfs");
1296   }
1297 
1298   /**
1299    * Recover file lease. Used when a file is suspected
1300    * to have been left open by another process.
1301    * @param fs FileSystem handle
1302    * @param p Path of file to recover lease
1303    * @param conf Configuration handle
1304    * @throws IOException
1305    */
1306   public abstract void recoverFileLease(final FileSystem fs, final Path p,
1307       Configuration conf, CancelableProgressable reporter) throws IOException;
1308 
1309   public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
1310       throws IOException {
1311     List<Path> tableDirs = new LinkedList<Path>();
1312 
1313     for(FileStatus status :
1314         fs.globStatus(new Path(rootdir,
1315             new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
1316       tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
1317     }
1318     return tableDirs;
1319   }
1320 
1321   /**
1322    * @param fs
1323    * @param rootdir
1324    * @return All the table directories under <code>rootdir</code>. Ignores non-table hbase folders such as
1325    * .logs, .oldlogs, and .corrupt.
1326    * @throws IOException
1327    */
1328   public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
1329       throws IOException {
1330     // presumes any directory under hbase.rootdir is a table
1331     FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
1332     List<Path> tabledirs = new ArrayList<Path>(dirs.length);
1333     for (FileStatus dir: dirs) {
1334       tabledirs.add(dir.getPath());
1335     }
1336     return tabledirs;
1337   }
1338 
1339   /**
1340    * Checks if the given path is the one with 'recovered.edits' dir.
1341    * @param path
1342    * @return True if <code>path</code> contains a 'recovered.edits' directory component
1343    */
1344   public static boolean isRecoveredEdits(Path path) {
1345     return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
1346   }
1347 
1348   /**
1349    * Filter that accepts only region directories (hex-encoded names, including older purely numeric names)
1350    */
1351   public static class RegionDirFilter implements PathFilter {
1352     // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
1353     final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
1354     final FileSystem fs;
1355 
1356     public RegionDirFilter(FileSystem fs) {
1357       this.fs = fs;
1358     }
1359 
1360     @Override
1361     public boolean accept(Path rd) {
1362       if (!regionDirPattern.matcher(rd.getName()).matches()) {
1363         return false;
1364       }
1365 
1366       try {
1367         return fs.getFileStatus(rd).isDir();
1368       } catch (IOException ioe) {
1369         // Maybe the file was moved or the fs was disconnected.
1370         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1371         return false;
1372       }
1373     }
1374   }
1375 
1376   /**
1377    * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1378    * .tableinfo
1379    * @param fs A file system for the Path
1380    * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir>
1381    * @return List of paths to valid region directories in table dir.
1382    * @throws IOException
1383    */
1384   public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1385     // assumes we are in a table dir.
1386     FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs));
1387     List<Path> regionDirs = new ArrayList<Path>(rds.length);
1388     for (FileStatus rdfs: rds) {
1389       Path rdPath = rdfs.getPath();
1390       regionDirs.add(rdPath);
1391     }
1392     return regionDirs;
1393   }
1394 
1395   /**
1396    * Filter for all dirs that are legal column family names.  This is generally used for colfam
1397    * dirs <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>.
1398    */
1399   public static class FamilyDirFilter implements PathFilter {
1400     final FileSystem fs;
1401 
1402     public FamilyDirFilter(FileSystem fs) {
1403       this.fs = fs;
1404     }
1405 
1406     @Override
1407     public boolean accept(Path rd) {
1408       try {
1409         // throws IAE if invalid
1410         HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(rd.getName()));
1411       } catch (IllegalArgumentException iae) {
1412         // path name is an invalid family name and thus is excluded.
1413         return false;
1414       }
1415 
1416       try {
1417         return fs.getFileStatus(rd).isDir();
1418       } catch (IOException ioe) {
1419         // Maybe the file was moved or the fs was disconnected.
1420         LOG.warn("Skipping file " + rd + " due to IOException", ioe);
1421         return false;
1422       }
1423     }
1424   }
1425 
1426   /**
1427    * Given a particular region dir, return all the familydirs inside it
1428    *
1429    * @param fs A file system for the Path
1430    * @param regionDir Path to a specific region directory
1431    * @return List of paths to valid family directories in region dir.
1432    * @throws IOException
1433    */
1434   public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1435     // assumes we are in a region dir.
1436     FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1437     List<Path> familyDirs = new ArrayList<Path>(fds.length);
1438     for (FileStatus fdfs: fds) {
1439       Path fdPath = fdfs.getPath();
1440       familyDirs.add(fdPath);
1441     }
1442     return familyDirs;
1443   }
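
  // Illustrative sketch (editorial addition): walking the on-disk layout
  // <hbase.rootdir>/data/<namespace>/<table>/<region>/<family> with the helpers above.
  // The method name is hypothetical; it only uses getRegionDirs() and getFamilyDirs().
  private static void printRegionAndFamilyDirs(final FileSystem fs, final Path tableDir)
      throws IOException {
    for (Path regionDir : getRegionDirs(fs, tableDir)) {
      LOG.debug("Region dir: " + regionDir);
      for (Path familyDir : getFamilyDirs(fs, regionDir)) {
        LOG.debug("  Family dir: " + familyDir);
      }
    }
  }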
1444 
1445   /**
1446    * Filter for HFiles that excludes reference files.
1447    */
1448   public static class HFileFilter implements PathFilter {
1449     // This pattern will accept 0.90+ style hex hfiles but reject reference files
1450     final public static Pattern hfilePattern = Pattern.compile("^([0-9a-f]+)$");
1451 
1452     final FileSystem fs;
1453 
1454     public HFileFilter(FileSystem fs) {
1455       this.fs = fs;
1456     }
1457 
1458     @Override
1459     public boolean accept(Path rd) {
1460       if (!hfilePattern.matcher(rd.getName()).matches()) {
1461         return false;
1462       }
1463 
1464       try {
1465         // only files
1466         return !fs.getFileStatus(rd).isDir();
1467       } catch (IOException ioe) {
1468         // Maybe the file was moved or the fs was disconnected.
1469         LOG.warn("Skipping file " + rd + " due to IOException", ioe);
1470         return false;
1471       }
1472     }
1473   }
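
  // Illustrative sketch (editorial addition): using HFileFilter together with the null-tolerant
  // listStatus() helper defined below to enumerate store files in a single column family
  // directory. The method name is hypothetical.
  private static void printHFiles(final FileSystem fs, final Path familyDir) throws IOException {
    FileStatus[] hfiles = listStatus(fs, familyDir, new HFileFilter(fs));
    if (hfiles == null) {
      return; // directory is empty or missing
    }
    for (FileStatus hfile : hfiles) {
      LOG.debug("HFile: " + hfile.getPath());
    }
  }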
1474 
1475   /**
1476    * @param conf the configuration to use
1477    * @return The filesystem of the hbase rootdir.
1478    * @throws IOException
1479    */
1480   public static FileSystem getCurrentFileSystem(Configuration conf)
1481   throws IOException {
1482     return getRootDir(conf).getFileSystem(conf);
1483   }
1484 
1485 
1486   /**
1487    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1488    * table StoreFile names to the full Path.
1489    * <br>
1490    * Example...<br>
1491    * Key = 3944417774205889744  <br>
1492    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1493    *
1494    * @param map map to add values.  If null, this method will create and populate one to return
1495    * @param fs  The file system to use.
1496    * @param hbaseRootDir  The root directory to scan.
1497    * @param tableName name of the table to scan.
1498    * @return Map keyed by StoreFile name with a value of the full Path.
1499    * @throws IOException When scanning the directory fails.
1500    */
1501   public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1502   final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1503   throws IOException {
1504     if (map == null) {
1505       map = new HashMap<String, Path>();
1506     }
1507 
1508     // only include the directory paths to tables
1509     Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
1510     // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
1511     // should be regions.
1512     PathFilter df = new BlackListDirFilter(fs, HConstants.HBASE_NON_TABLE_DIRS);
1513     FileStatus[] regionDirs = fs.listStatus(tableDir);
1514     for (FileStatus regionDir : regionDirs) {
1515       Path dd = regionDir.getPath();
1516       if (dd.getName().equals(HConstants.HREGION_COMPACTIONDIR_NAME)) {
1517         continue;
1518       }
1519       // else its a region name, now look in region for families
1520       FileStatus[] familyDirs = fs.listStatus(dd, df);
1521       for (FileStatus familyDir : familyDirs) {
1522         Path family = familyDir.getPath();
1523         // now in family, iterate over the StoreFiles and
1524         // put in map
1525         FileStatus[] familyStatus = fs.listStatus(family);
1526         for (FileStatus sfStatus : familyStatus) {
1527           Path sf = sfStatus.getPath();
1528           map.put(sf.getName(), sf);
1529         }
1530       }
1531     }
1532     return map;
1533   }
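
  // Illustrative sketch (editorial addition): building the store-file-name -> Path lookup for a
  // single table. The table name here is a placeholder; the call itself matches the method
  // above, and passing null lets it allocate and return a fresh map.
  private static Map<String, Path> exampleStoreFileMapForTable(final Configuration conf)
      throws IOException {
    FileSystem fs = FileSystem.get(conf);
    Path rootDir = getRootDir(conf);
    return getTableStoreFilePathMap(null, fs, rootDir, TableName.valueOf("exampleTable"));
  }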
1534 
1535 
1536   /**
1537    * Runs through the HBase rootdir and creates a reverse lookup map for
1538    * table StoreFile names to the full Path.
1539    * <br>
1540    * Example...<br>
1541    * Key = 3944417774205889744  <br>
1542    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1543    *
1544    * @param fs  The file system to use.
1545    * @param hbaseRootDir  The root directory to scan.
1546    * @return Map keyed by StoreFile name with a value of the full Path.
1547    * @throws IOException When scanning the directory fails.
1548    */
1549   public static Map<String, Path> getTableStoreFilePathMap(
1550     final FileSystem fs, final Path hbaseRootDir)
1551   throws IOException {
1552     Map<String, Path> map = new HashMap<String, Path>();
1553 
1554     // if this method looks similar to 'getTableFragmentation' that is because
1555     // it was borrowed from it.
1556 
1557     // only include the directory paths to tables
1558     for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1559       getTableStoreFilePathMap(map, fs, hbaseRootDir,
1560           FSUtils.getTableName(tableDir));
1561     }
1562     return map;
1563   }
1564 
1565   /**
1566    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1567    * This accommodates differences between hadoop versions, where hadoop 1
1568    * does not throw a FileNotFoundException but returns an empty FileStatus[],
1569    * while hadoop 2 throws FileNotFoundException.
1570    *
1571    * @param fs file system
1572    * @param dir directory
1573    * @param filter path filter
1574    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1575    */
1576   public static FileStatus [] listStatus(final FileSystem fs,
1577       final Path dir, final PathFilter filter) throws IOException {
1578     FileStatus [] status = null;
1579     try {
1580       status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
1581     } catch (FileNotFoundException fnfe) {
1582       // if directory doesn't exist, return null
1583       if (LOG.isTraceEnabled()) {
1584         LOG.trace(dir + " doesn't exist");
1585       }
1586     }
1587     if (status == null || status.length < 1) return null;
1588     return status;
1589   }
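
  // Illustrative sketch (editorial addition): the intended calling pattern for the helper above.
  // Because a missing or empty directory is reported as null rather than via an exception,
  // callers should always null-check the result. The method name is hypothetical.
  private static int countChildren(final FileSystem fs, final Path dir) throws IOException {
    FileStatus[] children = listStatus(fs, dir, null);
    return children == null ? 0 : children.length;
  }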
1590 
1591   /**
1592    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1593    * This accommodates differences between hadoop versions.
1594    *
1595    * @param fs file system
1596    * @param dir directory
1597    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1598    */
1599   public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
1600     return listStatus(fs, dir, null);
1601   }
1602 
1603   /**
1604    * Calls fs.delete() and returns its result.
1605    *
1606    * @param fs the filesystem to use
1607    * @param path the path to delete
1608    * @param recursive whether to delete the path recursively
1609    * @return the value returned by fs.delete()
1610    * @throws IOException
1611    */
1612   public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
1613       throws IOException {
1614     return fs.delete(path, recursive);
1615   }
1616 
1617   /**
1618    * Calls fs.exists(). Checks whether the specified path exists.
1619    *
1620    * @param fs the filesystem to use
1621    * @param path the path to check
1622    * @return the value returned by fs.exists()
1623    * @throws IOException
1624    */
1625   public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
1626     return fs.exists(path);
1627   }
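
  // Illustrative sketch (editorial addition): the two thin wrappers above used together to
  // remove a path only if it is present. The method name is hypothetical.
  private static boolean deleteIfExists(final FileSystem fs, final Path path) throws IOException {
    if (!isExists(fs, path)) {
      return false;
    }
    return delete(fs, path, true);
  }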
1628 
1629   /**
1630    * Throw an exception if an action is not permitted by a user on a file.
1631    *
1632    * @param ugi
1633    *          the user
1634    * @param file
1635    *          the file
1636    * @param action
1637    *          the action
1638    */
1639   public static void checkAccess(UserGroupInformation ugi, FileStatus file,
1640       FsAction action) throws AccessControlException {
1641     if (ugi.getShortUserName().equals(file.getOwner())) {
1642       if (file.getPermission().getUserAction().implies(action)) {
1643         return;
1644       }
1645     } else if (contains(ugi.getGroupNames(), file.getGroup())) {
1646       if (file.getPermission().getGroupAction().implies(action)) {
1647         return;
1648       }
1649     } else if (file.getPermission().getOtherAction().implies(action)) {
1650       return;
1651     }
1652     throw new AccessControlException("Permission denied:" + " action=" + action
1653         + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
1654   }
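
  // Illustrative sketch (editorial addition): verifying that the current user may read a file
  // before opening it. UserGroupInformation.getCurrentUser() and FsAction.READ are standard
  // Hadoop APIs; the method name is hypothetical.
  private static boolean canCurrentUserRead(final FileSystem fs, final Path path)
      throws IOException {
    FileStatus status = fs.getFileStatus(path);
    try {
      checkAccess(UserGroupInformation.getCurrentUser(), status, FsAction.READ);
      return true;
    } catch (AccessControlException ace) {
      return false;
    }
  }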
1655 
1656   private static boolean contains(String[] groups, String user) {
1657     for (String group : groups) {
1658       if (group.equals(user)) {
1659         return true;
1660       }
1661     }
1662     return false;
1663   }
1664 
1665   /**
1666    * Log the current state of the filesystem from a certain root directory
1667    * @param fs filesystem to investigate
1668    * @param root root file/directory to start logging from
1669    * @param LOG log to output information
1670    * @throws IOException if an unexpected exception occurs
1671    */
1672   public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
1673       throws IOException {
1674     LOG.debug("Current file system:");
1675     logFSTree(LOG, fs, root, "|-");
1676   }
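
  // Illustrative sketch (editorial addition): dumping the directory tree under the HBase root
  // at DEBUG level, e.g. while diagnosing a test failure. The method name is hypothetical.
  private static void dumpRootDir(final Configuration conf) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    logFileSystemState(fs, getRootDir(conf), LOG);
  }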
1677 
1678   /**
1679    * Recursive helper to log the state of the FS
1680    *
1681    * @see #logFileSystemState(FileSystem, Path, Log)
1682    */
1683   private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
1684       throws IOException {
1685     FileStatus[] files = FSUtils.listStatus(fs, root, null);
1686     if (files == null) return;
1687 
1688     for (FileStatus file : files) {
1689       if (file.isDir()) {
1690         LOG.debug(prefix + file.getPath().getName() + "/");
1691         logFSTree(LOG, fs, file.getPath(), prefix + "---");
1692       } else {
1693         LOG.debug(prefix + file.getPath().getName());
1694       }
1695     }
1696   }
1697 
1698   public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
1699       throws IOException {
1700     // set the modify time for TimeToLive Cleaner
1701     fs.setTimes(src, EnvironmentEdgeManager.currentTimeMillis(), -1);
1702     return fs.rename(src, dest);
1703   }
1704 
1705   /**
1706    * Scans the root path of the file system to compute the degree of locality
1707    * for each region on each of the servers having at least one block of that
1708    * region.
1709    * This is used by the tool {@link RegionPlacementMaintainer}.
1710    *
1711    * @param conf
1712    *          the configuration to use
1713    * @return the mapping from region encoded name to a map of server names to
1714    *           locality fraction
1715    * @throws IOException
1716    *           in case of file system errors or interrupts
1717    */
1718   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1719       final Configuration conf) throws IOException {
1720     return getRegionDegreeLocalityMappingFromFS(
1721         conf, null,
1722         conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1723 
1724   }
1725 
1726   /**
1727    * Scans the root path of the file system to compute the degree of locality
1728    * for each region on each of the servers having at least one block of that
1729    * region.
1730    *
1731    * @param conf
1732    *          the configuration to use
1733    * @param desiredTable
1734    *          the table you wish to scan locality for
1735    * @param threadPoolSize
1736    *          the thread pool size to use
1737    * @return the mapping from region encoded name to a map of server names to
1738    *           locality fraction
1739    * @throws IOException
1740    *           in case of file system errors or interrupts
1741    */
1742   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1743       final Configuration conf, final String desiredTable, int threadPoolSize)
1744       throws IOException {
1745     Map<String, Map<String, Float>> regionDegreeLocalityMapping =
1746         new ConcurrentHashMap<String, Map<String, Float>>();
1747     getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null,
1748         regionDegreeLocalityMapping);
1749     return regionDegreeLocalityMapping;
1750   }
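
  // Illustrative sketch (editorial addition): computing and logging per-region locality for a
  // single table with a small thread pool. The table name and pool size are placeholders;
  // the method name is hypothetical.
  private static void logLocalityForTable(final Configuration conf) throws IOException {
    Map<String, Map<String, Float>> locality =
        getRegionDegreeLocalityMappingFromFS(conf, "exampleTable", 4);
    for (Map.Entry<String, Map<String, Float>> entry : locality.entrySet()) {
      LOG.info("Region " + entry.getKey() + " locality by server: " + entry.getValue());
    }
  }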
1751 
1752   /**
1753    * Scans the root path of the file system to get either the mapping between
1754    * the region name and its best-locality region server, or the degree of
1755    * locality of each region on each of the servers having at least one block
1756    * of that region. The output map parameters are both optional.
1757    *
1758    * @param conf
1759    *          the configuration to use
1760    * @param desiredTable
1761    *          the table you wish to scan locality for
1762    * @param threadPoolSize
1763    *          the thread pool size to use
1764    * @param regionToBestLocalityRSMapping
1765    *          the map into which to put the best locality mapping or null
1766    * @param regionDegreeLocalityMapping
1767    *          the map into which to put the locality degree mapping or null,
1768    *          must be a thread-safe implementation
1769    * @throws IOException
1770    *           in case of file system errors or interrupts
1771    */
1772   private static void getRegionLocalityMappingFromFS(
1773       final Configuration conf, final String desiredTable,
1774       int threadPoolSize,
1775       Map<String, String> regionToBestLocalityRSMapping,
1776       Map<String, Map<String, Float>> regionDegreeLocalityMapping)
1777       throws IOException {
1778     FileSystem fs =  FileSystem.get(conf);
1779     Path rootPath = FSUtils.getRootDir(conf);
1780     long startTime = EnvironmentEdgeManager.currentTimeMillis();
1781     Path queryPath;
1782     // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1783     if (null == desiredTable) {
1784       queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1785     } else {
1786       queryPath = new Path(FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1787     }
1788 
1789     // reject all paths that are not appropriate
1790     PathFilter pathFilter = new PathFilter() {
1791       @Override
1792       public boolean accept(Path path) {
1793         // this is the region name; it may get some noise data
1794         if (null == path) {
1795           return false;
1796         }
1797 
1798         // no parent?
1799         Path parent = path.getParent();
1800         if (null == parent) {
1801           return false;
1802         }
1803 
1804         String regionName = path.getName();
1805         if (null == regionName) {
1806           return false;
1807         }
1808 
1809         if (!regionName.toLowerCase().matches("[0-9a-f]+")) {
1810           return false;
1811         }
1812         return true;
1813       }
1814     };
1815 
1816     FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1817 
1818     if (null == statusList) {
1819       return;
1820     } else {
1821       LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
1822           statusList.length);
1823     }
1824 
1825     // lower the number of threads in case we have very few expected regions
1826     threadPoolSize = Math.min(threadPoolSize, statusList.length);
1827 
1828     // run in multiple threads
1829     ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
1830         threadPoolSize, 60, TimeUnit.SECONDS,
1831         new ArrayBlockingQueue<Runnable>(statusList.length));
1832     try {
1833       // ignore all file status items that are not of interest
1834       for (FileStatus regionStatus : statusList) {
1835         if (null == regionStatus) {
1836           continue;
1837         }
1838 
1839         if (!regionStatus.isDir()) {
1840           continue;
1841         }
1842 
1843         Path regionPath = regionStatus.getPath();
1844         if (null == regionPath) {
1845           continue;
1846         }
1847 
1848         tpe.execute(new FSRegionScanner(fs, regionPath,
1849             regionToBestLocalityRSMapping, regionDegreeLocalityMapping));
1850       }
1851     } finally {
1852       tpe.shutdown();
1853       int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1854           60 * 1000);
1855       try {
1856         // here we wait until TPE terminates, which is either naturally or by
1857         // exceptions in the execution of the threads
1858         while (!tpe.awaitTermination(threadWakeFrequency,
1859             TimeUnit.MILLISECONDS)) {
1860           // printing out rough estimate, so as to not introduce
1861           // AtomicInteger
1862           LOG.info("Locality checking is underway: { Scanned Regions : "
1863               + tpe.getCompletedTaskCount() + "/"
1864               + tpe.getTaskCount() + " }");
1865         }
1866       } catch (InterruptedException e) {
1867         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1868       }
1869     }
1870 
1871     long overhead = EnvironmentEdgeManager.currentTimeMillis() - startTime;
1872     String overheadMsg = "Scanning DFS for locality info took " + overhead + " ms";
1873 
1874     LOG.info(overheadMsg);
1875   }
1876 
1877   /**
1878    * Do our short circuit read setup.
1879    * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
1880    * @param conf
1881    */
1882   public static void setupShortCircuitRead(final Configuration conf) {
1883     // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1884     boolean shortCircuitSkipChecksum =
1885       conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1886     boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1887     if (shortCircuitSkipChecksum) {
1888       LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
1889         "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
1890         "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
1891       assert !shortCircuitSkipChecksum; //this will fail if assertions are on
1892     }
1893     checkShortCircuitReadBufferSize(conf);
1894   }
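
  // Illustrative sketch (editorial addition): how a startup path might apply the short-circuit
  // read settings above. Only configuration keys already referenced in this file are used;
  // the method name is hypothetical.
  private static void exampleShortCircuitSetup(final Configuration conf) {
    // Warns if checksum skipping is enabled, then ensures a buffer size is configured.
    setupShortCircuitRead(conf);
    int bufferSize = conf.getInt("dfs.client.read.shortcircuit.buffer.size", -1);
    LOG.debug("Effective short-circuit read buffer size: " + bufferSize);
  }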
1895 
1896   /**
1897    * Check if short circuit read buffer size is set and if not, set it to hbase value.
1898    * @param conf
1899    */
1900   public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1901     final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1902     final int notSet = -1;
1903     // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
1904     final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1905     int size = conf.getInt(dfsKey, notSet);
1906     // If a size is set, return -- we will use it.
1907     if (size != notSet) return;
1908     // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
1909     int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1910     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
1911   }
1912 }