
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInputStream;
23  import java.io.EOFException;
24  import java.io.FileNotFoundException;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.InterruptedIOException;
28  import java.lang.reflect.InvocationTargetException;
29  import java.lang.reflect.Method;
30  import java.net.InetSocketAddress;
31  import java.net.URI;
32  import java.net.URISyntaxException;
33  import java.util.ArrayList;
34  import java.util.Arrays;
35  import java.util.Collections;
36  import java.util.HashMap;
37  import java.util.Iterator;
38  import java.util.LinkedList;
39  import java.util.List;
40  import java.util.Locale;
41  import java.util.Map;
42  import java.util.Vector;
43  import java.util.concurrent.ArrayBlockingQueue;
44  import java.util.concurrent.ConcurrentHashMap;
45  import java.util.concurrent.ExecutionException;
46  import java.util.concurrent.ExecutorService;
47  import java.util.concurrent.Future;
48  import java.util.concurrent.FutureTask;
49  import java.util.concurrent.ThreadPoolExecutor;
50  import java.util.concurrent.TimeUnit;
51  import java.util.regex.Pattern;
52
53  import org.apache.commons.logging.Log;
54  import org.apache.commons.logging.LogFactory;
55  import org.apache.hadoop.hbase.classification.InterfaceAudience;
56  import org.apache.hadoop.HadoopIllegalArgumentException;
57  import org.apache.hadoop.conf.Configuration;
58  import org.apache.hadoop.fs.BlockLocation;
59  import org.apache.hadoop.fs.FSDataInputStream;
60  import org.apache.hadoop.fs.FSDataOutputStream;
61  import org.apache.hadoop.fs.FileStatus;
62  import org.apache.hadoop.fs.FileSystem;
63  import org.apache.hadoop.fs.Path;
64  import org.apache.hadoop.fs.PathFilter;
65  import org.apache.hadoop.fs.permission.FsAction;
66  import org.apache.hadoop.fs.permission.FsPermission;
67  import org.apache.hadoop.hbase.ClusterId;
68  import org.apache.hadoop.hbase.HColumnDescriptor;
69  import org.apache.hadoop.hbase.HConstants;
70  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
71  import org.apache.hadoop.hbase.HRegionInfo;
72  import org.apache.hadoop.hbase.TableName;
73  import org.apache.hadoop.hbase.exceptions.DeserializationException;
74  import org.apache.hadoop.hbase.fs.HFileSystem;
75  import org.apache.hadoop.hbase.master.HMaster;
76  import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
77  import org.apache.hadoop.hbase.security.AccessDeniedException;
78  import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
79  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
80  import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
81  import org.apache.hadoop.hbase.regionserver.HRegion;
82  import org.apache.hadoop.hdfs.DFSClient;
83  import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
84  import org.apache.hadoop.hdfs.DistributedFileSystem;
85  import org.apache.hadoop.io.IOUtils;
86  import org.apache.hadoop.io.SequenceFile;
87  import org.apache.hadoop.ipc.RemoteException;
88  import org.apache.hadoop.security.UserGroupInformation;
89  import org.apache.hadoop.util.Progressable;
90  import org.apache.hadoop.util.ReflectionUtils;
91  import org.apache.hadoop.util.StringUtils;
92
93  import com.google.common.base.Throwables;
94  import com.google.common.collect.Iterators;
95  import com.google.common.primitives.Ints;
96
97  import edu.umd.cs.findbugs.annotations.CheckForNull;
98
99  /**
100  * Utility methods for interacting with the underlying file system.
101  */
102 @InterfaceAudience.Private
103 public abstract class FSUtils {
104   private static final Log LOG = LogFactory.getLog(FSUtils.class);
105
106   /** Full access permissions (starting point for a umask) */
107   public static final String FULL_RWX_PERMISSIONS = "777";
108   private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
109   private static final int DEFAULT_THREAD_POOLSIZE = 2;
110
111   /** Set to true on Windows platforms */
112   public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
113
114   protected FSUtils() {
115     super();
116   }
117
118   /**
119    * Sets storage policy for given path according to config setting.
120    * If the passed path is a directory, we'll set the storage policy for all files
121    * created in the future in said directory. Note that this change in storage
122    * policy takes place at the HDFS level; it will persist beyond this RS's lifecycle.
123    * If we're running on a version of HDFS that doesn't support the given storage policy
124    * (or storage policies at all), then we'll issue a log message and continue.
125    *
126    * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
127    *
128   * @param fs We only do anything if <code>fs</code> is an instance of DistributedFileSystem
129    * @param conf used to look up storage policy with given key; not modified.
130    * @param path the Path whose storage policy is to be set
131    * @param policyKey e.g. HConstants.WAL_STORAGE_POLICY
132    * @param defaultPolicy usually should be the policy NONE to delegate to HDFS
133    */
134   public static void setStoragePolicy(final FileSystem fs, final Configuration conf,
135       final Path path, final String policyKey, final String defaultPolicy) {
136     String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase(Locale.ROOT);
137     if (storagePolicy.equals(defaultPolicy)) {
138       if (LOG.isTraceEnabled()) {
139         LOG.trace("default policy of " + defaultPolicy + " requested, exiting early.");
140       }
141       return;
142     }
143     if (fs instanceof DistributedFileSystem) {
144       DistributedFileSystem dfs = (DistributedFileSystem)fs;
145       // Once our minimum supported Hadoop version is 2.6.0 we can remove reflection.
146       Class<? extends DistributedFileSystem> dfsClass = dfs.getClass();
147       Method m = null;
148       try {
149         m = dfsClass.getDeclaredMethod("setStoragePolicy",
150             new Class<?>[] { Path.class, String.class });
151         m.setAccessible(true);
152       } catch (NoSuchMethodException e) {
153         LOG.info("FileSystem doesn't support"
154             + " setStoragePolicy; --HDFS-6584 not available");
155       } catch (SecurityException e) {
156         LOG.info("Doesn't have access to setStoragePolicy on "
157             + "FileSystems --HDFS-6584 not available", e);
158         m = null; // could happen on setAccessible()
159       }
160       if (m != null) {
161         try {
162           m.invoke(dfs, path, storagePolicy);
163           LOG.info("set " + storagePolicy + " for " + path);
164         } catch (Exception e) {
165           // check for lack of HDFS-7228
166           boolean probablyBadPolicy = false;
167           if (e instanceof InvocationTargetException) {
168             final Throwable exception = e.getCause();
169             if (exception instanceof RemoteException &&
170                 HadoopIllegalArgumentException.class.getName().equals(
171                     ((RemoteException)exception).getClassName())) {
172               LOG.warn("Given storage policy, '" + storagePolicy + "', was rejected and probably " +
173                   "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
174                   "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
175                   "more information see the 'ArchivalStorage' docs for your Hadoop release.");
176               LOG.debug("More information about the invalid storage policy.", exception);
177               probablyBadPolicy = true;
178             }
179           }
180           if (!probablyBadPolicy) {
181             // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
182             // misuse than a runtime problem with HDFS.
183             LOG.warn("Unable to set " + storagePolicy + " for " + path, e);
184           }
185         }
186       }
187     } else {
188       LOG.info("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't " +
189           "support setStoragePolicy.");
190     }
191   }
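  // Illustrative usage sketch (not part of the original class): a caller setting the WAL storage
  // policy on the WAL directory. Assumes the WAL directory lives under the root dir as
  // HConstants.HREGION_LOGDIR_NAME and that "NONE" is an acceptable default to delegate to HDFS.
  private static void exampleSetWalStoragePolicy(final Configuration conf) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    Path walDir = new Path(getRootDir(conf), HConstants.HREGION_LOGDIR_NAME);
    // No-op unless fs is a DistributedFileSystem and the configured policy differs from "NONE".
    setStoragePolicy(fs, conf, walDir, HConstants.WAL_STORAGE_POLICY, "NONE");
  }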
192
193   /**
194    * Compares path components. Does not consider the scheme; i.e. if the schemes
195    * differ but <code>path</code> starts with <code>rootPath</code>,
196    * then the function returns true.
197    * @param rootPath
198    * @param path
199    * @return True if <code>path</code> starts with <code>rootPath</code>
200    */
201   public static boolean isStartingWithPath(final Path rootPath, final String path) {
202     String uriRootPath = rootPath.toUri().getPath();
203     String tailUriPath = (new Path(path)).toUri().getPath();
204     return tailUriPath.startsWith(uriRootPath);
205   }
206
207   /**
208    * Compares the path component of the Path URI; e.g. given hdfs://a/b/c and /a/b/c, it compares the
209    * '/a/b/c' part. Does not consider the scheme; i.e. if the schemes differ but the path or subpath matches,
210    * the two will equate.
211    * @param pathToSearch Path we will be trying to match.
212    * @param pathTail
213    * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
214    */
215   public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
216     return isMatchingTail(pathToSearch, new Path(pathTail));
217   }
218
219   /**
220    * Compares the path component of the Path URI; e.g. given hdfs://a/b/c and /a/b/c, it compares the
221    * '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true.  Does not consider
222    * the scheme; i.e. if the schemes differ but the path or subpath matches, the two will equate.
223    * @param pathToSearch Path we will be trying to match.
224    * @param pathTail
225    * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
226    */
227   public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
228     if (pathToSearch.depth() != pathTail.depth()) return false;
229     Path tailPath = pathTail;
230     String tailName;
231     Path toSearch = pathToSearch;
232     String toSearchName;
233     boolean result = false;
234     do {
235       tailName = tailPath.getName();
236       if (tailName == null || tailName.length() <= 0) {
237         result = true;
238         break;
239       }
240       toSearchName = toSearch.getName();
241       if (toSearchName == null || toSearchName.length() <= 0) break;
242       // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
243       tailPath = tailPath.getParent();
244       toSearch = toSearch.getParent();
245     } while(tailName.equals(toSearchName));
246     return result;
247   }
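  // Illustrative usage sketch (not part of the original class): isMatchingTail() ignores the
  // scheme and authority and only compares trailing path components of equal depth.
  private static void exampleIsMatchingTail() {
    Path stored = new Path("hdfs://namenode:8020/hbase/data/default/t1/region/cf/file");
    // true: the tail components match even though one side carries a scheme and authority
    boolean sameTail = isMatchingTail(stored, new Path("/hbase/data/default/t1/region/cf/file"));
    // false: the depths differ, so the comparison fails fast
    boolean shorter = isMatchingTail(stored, new Path("cf/file"));
  }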
248
249   public static FSUtils getInstance(FileSystem fs, Configuration conf) {
250     String scheme = fs.getUri().getScheme();
251     if (scheme == null) {
252       LOG.warn("Could not find scheme for uri " +
253           fs.getUri() + ", default to hdfs");
254       scheme = "hdfs";
255     }
256     Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
257         scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
258     FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
259     return fsUtils;
260   }
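  // Illustrative usage sketch (not part of the original class): looking up the scheme-specific
  // FSUtils implementation and using it to recover a file lease. A real caller would pass a
  // CancelableProgressable reporter; null is used here purely for brevity.
  private static void exampleGetInstance(final Configuration conf, final Path walFile)
      throws IOException {
    FileSystem fs = walFile.getFileSystem(conf);
    FSUtils fsUtils = getInstance(fs, conf); // defaults to FSHDFSUtils for hdfs:// URIs
    fsUtils.recoverFileLease(fs, walFile, conf, null);
  }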
261
262   /**
263    * Delete if exists.
264    * @param fs filesystem object
265    * @param dir directory to delete
266    * @return True if deleted <code>dir</code>
267    * @throws IOException e
268    */
269   public static boolean deleteDirectory(final FileSystem fs, final Path dir)
270   throws IOException {
271     return fs.exists(dir) && fs.delete(dir, true);
272   }
273
274   /**
275    * Delete the region directory if exists.
276    * @param conf
277    * @param hri
278    * @return True if deleted the region directory.
279    * @throws IOException
280    */
281   public static boolean deleteRegionDir(final Configuration conf, final HRegionInfo hri)
282   throws IOException {
283     Path rootDir = getRootDir(conf);
284     FileSystem fs = rootDir.getFileSystem(conf);
285     return deleteDirectory(fs,
286       new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
287   }
288
289   /**
290    * Return the number of bytes that large input files should optimally
291    * be split into to minimize i/o time.
292    *
293    * Uses reflection to search for getDefaultBlockSize(Path f);
294    * if the method doesn't exist, falls back to getDefaultBlockSize().
295    *
296    * @param fs filesystem object
297    * @return the default block size for the path's filesystem
298    * @throws IOException e
299    */
300   public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
301     Method m = null;
302     Class<? extends FileSystem> cls = fs.getClass();
303     try {
304       m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
305     } catch (NoSuchMethodException e) {
306       LOG.info("FileSystem doesn't support getDefaultBlockSize");
307     } catch (SecurityException e) {
308       LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
309       m = null; // could happen on setAccessible()
310     }
311     if (m == null) {
312       return fs.getDefaultBlockSize(path);
313     } else {
314       try {
315         Object ret = m.invoke(fs, path);
316         return ((Long)ret).longValue();
317       } catch (Exception e) {
318         throw new IOException(e);
319       }
320     }
321   }
322
323   /**
324    * Get the default replication.
325    *
326    * Uses reflection to search for getDefaultReplication(Path f);
327    * if the method doesn't exist, falls back to getDefaultReplication().
328    *
329    * @param fs filesystem object
330    * @param f path of file
331    * @return default replication for the path's filesystem
332    * @throws IOException e
333    */
334   public static short getDefaultReplication(final FileSystem fs, final Path path) throws IOException {
335     Method m = null;
336     Class<? extends FileSystem> cls = fs.getClass();
337     try {
338       m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
339     } catch (NoSuchMethodException e) {
340       LOG.info("FileSystem doesn't support getDefaultReplication");
341     } catch (SecurityException e) {
342       LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
343       m = null; // could happen on setAccessible()
344     }
345     if (m == null) {
346       return fs.getDefaultReplication(path);
347     } else {
348       try {
349         Object ret = m.invoke(fs, path);
350         return ((Number)ret).shortValue();
351       } catch (Exception e) {
352         throw new IOException(e);
353       }
354     }
355   }
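  // Illustrative usage sketch (not part of the original class): the reflective lookups above let
  // the same code run against Hadoop versions with and without the Path-aware overloads.
  private static void exampleFilesystemDefaults(final FileSystem fs, final Path file)
      throws IOException {
    long blockSize = getDefaultBlockSize(fs, file);
    short replication = getDefaultReplication(fs, file);
    int bufferSize = getDefaultBufferSize(fs); // io.file.buffer.size, or 4096 if unset
    LOG.debug("blockSize=" + blockSize + ", replication=" + replication
        + ", bufferSize=" + bufferSize);
  }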
356
357   /**
358    * Returns the default buffer size to use during writes.
359    *
360    * The size of the buffer should probably be a multiple of hardware
361    * page size (4096 on Intel x86), and it determines how much data is
362    * buffered during read and write operations.
363    *
364    * @param fs filesystem object
365    * @return default buffer size to use during writes
366    */
367   public static int getDefaultBufferSize(final FileSystem fs) {
368     return fs.getConf().getInt("io.file.buffer.size", 4096);
369   }
370
371   /**
372    * Create the specified file on the filesystem. By default, this will:
373    * <ol>
374    * <li>overwrite the file if it exists</li>
375    * <li>apply the umask in the configuration (if it is enabled)</li>
376    * <li>use the fs configured buffer size (or 4096 if not set)</li>
377    * <li>use the configured column family replication or default replication if
378    * {@link HColumnDescriptor#DEFAULT_DFS_REPLICATION}</li>
379    * <li>use the default block size</li>
380    * <li>not track progress</li>
381    * </ol>
382    * @param conf configurations
383    * @param fs {@link FileSystem} on which to write the file
384    * @param path {@link Path} to the file to write
385    * @param perm permissions
386    * @param favoredNodes
387    * @return output stream to the created file
388    * @throws IOException if the file cannot be created
389    */
390   public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
391       FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
392     if (fs instanceof HFileSystem) {
393       FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
394       if (backingFs instanceof DistributedFileSystem) {
395         // Try to use the favoredNodes version via reflection to allow backwards-
396         // compatibility.
397         short replication = Short.parseShort(conf.get(HColumnDescriptor.DFS_REPLICATION,
398           String.valueOf(HColumnDescriptor.DEFAULT_DFS_REPLICATION)));
399         try {
400           return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create",
401             Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class,
402             Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true,
403             getDefaultBufferSize(backingFs),
404             replication > 0 ? replication : getDefaultReplication(backingFs, path),
405             getDefaultBlockSize(backingFs, path), null, favoredNodes));
406         } catch (InvocationTargetException ite) {
407           // Function was properly called, but threw its own exception.
408           throw new IOException(ite.getCause());
409         } catch (NoSuchMethodException e) {
410           LOG.debug("DFS Client does not support most favored nodes create; using default create");
411           if (LOG.isTraceEnabled()) LOG.trace("Ignoring; use default create", e);
412         } catch (IllegalArgumentException e) {
413           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
414         } catch (SecurityException e) {
415           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
416         } catch (IllegalAccessException e) {
417           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
418         }
419       }
420     }
421     return create(fs, path, perm, true);
422   }
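  // Illustrative usage sketch (not part of the original class): creating a file with favored
  // nodes so HDFS keeps replicas close to the region's hosts. The hostnames and port below are
  // made up for illustration only.
  private static FSDataOutputStream exampleCreateWithFavoredNodes(final Configuration conf,
      final FileSystem fs, final Path file, final FsPermission perm) throws IOException {
    InetSocketAddress[] favoredNodes = new InetSocketAddress[] {
      new InetSocketAddress("datanode1.example.com", 50010),
      new InetSocketAddress("datanode2.example.com", 50010)
    };
    // Falls back to the plain create() path unless fs wraps a DistributedFileSystem.
    return create(conf, fs, file, perm, favoredNodes);
  }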
423
424   /**
425    * Create the specified file on the filesystem. By default, this will:
426    * <ol>
427    * <li>apply the umask in the configuration (if it is enabled)</li>
428    * <li>use the fs configured buffer size (or 4096 if not set)</li>
429    * <li>use the default replication</li>
430    * <li>use the default block size</li>
431    * <li>not track progress</li>
432    * </ol>
433    *
434    * @param fs {@link FileSystem} on which to write the file
435    * @param path {@link Path} to the file to write
436    * @param perm
437    * @param overwrite Whether or not an existing file at the path should be overwritten.
438    * @return output stream to the created file
439    * @throws IOException if the file cannot be created
440    */
441   public static FSDataOutputStream create(FileSystem fs, Path path,
442       FsPermission perm, boolean overwrite) throws IOException {
443     if (LOG.isTraceEnabled()) {
444       LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
445     }
446     return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
447         getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
448   }
449
450   /**
451    * Get the file permissions specified in the configuration, if they are
452    * enabled.
453    *
454    * @param fs filesystem that the file will be created on.
455    * @param conf configuration to read for determining if permissions are
456    *          enabled and which to use
457    * @param permssionConfKey property key in the configuration to use when
458    *          finding the permission
459    * @return the permission to use when creating a new file on the fs. If
460    *         special permissions are not specified in the configuration, then
461    *         the default permissions on the fs will be returned.
462    */
463   public static FsPermission getFilePermissions(final FileSystem fs,
464       final Configuration conf, final String permssionConfKey) {
465     boolean enablePermissions = conf.getBoolean(
466         HConstants.ENABLE_DATA_FILE_UMASK, false);
467
468     if (enablePermissions) {
469       try {
470         FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
471         // make sure that we have a mask, if not, go default.
472         String mask = conf.get(permssionConfKey);
473         if (mask == null)
474           return FsPermission.getFileDefault();
475         // apply the umask
476         FsPermission umask = new FsPermission(mask);
477         return perm.applyUMask(umask);
478       } catch (IllegalArgumentException e) {
479         LOG.warn(
480             "Incorrect umask attempted to be created: "
481                 + conf.get(permssionConfKey)
482                 + ", using default file permissions.", e);
483         return FsPermission.getFileDefault();
484       }
485     }
486     return FsPermission.getFileDefault();
487   }
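  // Illustrative usage sketch (not part of the original class): resolving the configured umask
  // before creating a data file. HConstants.DATA_FILE_UMASK_KEY is the key commonly paired with
  // HConstants.ENABLE_DATA_FILE_UMASK, but any configured key can be supplied.
  private static FSDataOutputStream exampleCreateWithUmask(final FileSystem fs,
      final Configuration conf, final Path file) throws IOException {
    FsPermission perm = getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    return create(fs, file, perm, true);
  }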
488
489   /**
490    * Checks to see if the specified file system is available
491    *
492    * @param fs filesystem
493    * @throws IOException e
494    */
495   public static void checkFileSystemAvailable(final FileSystem fs)
496   throws IOException {
497     if (!(fs instanceof DistributedFileSystem)) {
498       return;
499     }
500     IOException exception = null;
501     DistributedFileSystem dfs = (DistributedFileSystem) fs;
502     try {
503       if (dfs.exists(new Path("/"))) {
504         return;
505       }
506     } catch (IOException e) {
507       exception = e instanceof RemoteException ?
508               ((RemoteException)e).unwrapRemoteException() : e;
509     }
510     try {
511       fs.close();
512     } catch (Exception e) {
513       LOG.error("file system close failed: ", e);
514     }
515     IOException io = new IOException("File system is not available");
516     io.initCause(exception);
517     throw io;
518   }
519
520   /**
521    * We use reflection because {@link DistributedFileSystem#setSafeMode(
522    * HdfsConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1
523    *
524    * @param dfs
525    * @return whether we're in safe mode
526    * @throws IOException
527    */
528   private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
529     boolean inSafeMode = false;
530     try {
531       Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
532           org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class});
533       inSafeMode = (Boolean) m.invoke(dfs,
534         org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true);
535     } catch (Exception e) {
536       if (e instanceof IOException) throw (IOException) e;
537
538       // Check whether dfs is on safemode.
539       inSafeMode = dfs.setSafeMode(
540         org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET);
541     }
542     return inSafeMode;
543   }
544
545   /**
546    * Check whether dfs is in safemode.
547    * @param conf
548    * @throws IOException
549    */
550   public static void checkDfsSafeMode(final Configuration conf)
551   throws IOException {
552     boolean isInSafeMode = false;
553     FileSystem fs = FileSystem.get(conf);
554     if (fs instanceof DistributedFileSystem) {
555       DistributedFileSystem dfs = (DistributedFileSystem)fs;
556       isInSafeMode = isInSafeMode(dfs);
557     }
558     if (isInSafeMode) {
559       throw new IOException("File system is in safemode, it can't be written now");
560     }
561   }
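  // Illustrative usage sketch (not part of the original class): a startup-style guard that either
  // fails fast on safe mode or blocks until HDFS leaves it, polling every 10 seconds.
  private static void exampleSafeModeHandling(final Configuration conf) throws IOException {
    checkDfsSafeMode(conf);          // throws IOException if the NameNode is in safe mode
    waitOnSafeMode(conf, 10 * 1000); // alternatively, sleep-and-retry until safe mode clears
  }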
562
563   /**
564    * Verifies current version of file system
565    *
566    * @param fs filesystem object
567    * @param rootdir root hbase directory
568    * @return null if no version file exists, version string otherwise.
569    * @throws IOException e
570    * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
571    */
572   public static String getVersion(FileSystem fs, Path rootdir)
573   throws IOException, DeserializationException {
574     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
575     FileStatus[] status = null;
576     try {
577       // hadoop 2.0 throws FNFE if directory does not exist.
578       // hadoop 1.0 returns null if directory does not exist.
579       status = fs.listStatus(versionFile);
580     } catch (FileNotFoundException fnfe) {
581       return null;
582     }
583     if (status == null || status.length == 0) return null;
584     String version = null;
585     byte [] content = new byte [(int)status[0].getLen()];
586     FSDataInputStream s = fs.open(versionFile);
587     try {
588       IOUtils.readFully(s, content, 0, content.length);
589       if (ProtobufUtil.isPBMagicPrefix(content)) {
590         version = parseVersionFrom(content);
591       } else {
592         // Presume it is pre-pb format.
593         InputStream is = new ByteArrayInputStream(content);
594         DataInputStream dis = new DataInputStream(is);
595         try {
596           version = dis.readUTF();
597         } finally {
598           dis.close();
599         }
600       }
601     } catch (EOFException eof) {
602       LOG.warn("Version file was empty, odd, will try to set it.");
603     } finally {
604       s.close();
605     }
606     return version;
607   }
608
609   /**
610    * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
611    * @param bytes The byte content of the hbase.version file.
612    * @return The version found in the file as a String.
613    * @throws DeserializationException
614    */
615   static String parseVersionFrom(final byte [] bytes)
616   throws DeserializationException {
617     ProtobufUtil.expectPBMagicPrefix(bytes);
618     int pblen = ProtobufUtil.lengthOfPBMagic();
619     FSProtos.HBaseVersionFileContent.Builder builder =
620       FSProtos.HBaseVersionFileContent.newBuilder();
621     try {
622       ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
623       return builder.getVersion();
624     } catch (IOException e) {
625       // Convert
626       throw new DeserializationException(e);
627     }
628   }
629
630   /**
631    * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
632    * @param version Version to persist
633    * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
634    */
635   static byte [] toVersionByteArray(final String version) {
636     FSProtos.HBaseVersionFileContent.Builder builder =
637       FSProtos.HBaseVersionFileContent.newBuilder();
638     return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
639   }
640
641   /**
642    * Verifies current version of file system
643    *
644    * @param fs file system
645    * @param rootdir root directory of HBase installation
646    * @param message if true, issues a message on System.out
647    *
648    * @throws IOException e
649    * @throws DeserializationException
650    */
651   public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
652   throws IOException, DeserializationException {
653     checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
654   }
655
656   /**
657    * Verifies current version of file system
658    *
659    * @param fs file system
660    * @param rootdir root directory of HBase installation
661    * @param message if true, issues a message on System.out
662    * @param wait wait interval
663    * @param retries number of times to retry
664    *
665    * @throws IOException e
666    * @throws DeserializationException
667    */
668   public static void checkVersion(FileSystem fs, Path rootdir,
669       boolean message, int wait, int retries)
670   throws IOException, DeserializationException {
671     String version = getVersion(fs, rootdir);
672     if (version == null) {
673       if (!metaRegionExists(fs, rootdir)) {
674         // rootDir is empty (no version file and no root region)
675         // just create new version file (HBASE-1195)
676         setVersion(fs, rootdir, wait, retries);
677         return;
678       }
679     } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) return;
680
681     // version is deprecated; require migration
682     // Output on stdout so user sees it in terminal.
683     String msg = "HBase file layout needs to be upgraded."
684       + " You have version " + version
685       + " and I want version " + HConstants.FILE_SYSTEM_VERSION
686       + ". Consult http://hbase.apache.org/book.html for further information about upgrading HBase."
687       + " Is your hbase.rootdir valid? If so, you may need to run "
688       + "'hbase hbck -fixVersionFile'.";
689     if (message) {
690       System.out.println("WARNING! " + msg);
691     }
692     throw new FileSystemVersionException(msg);
693   }
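  // Illustrative usage sketch (not part of the original class): verifying the hbase.version file
  // at startup, creating it when the root dir is still empty, with a few retries on write failure.
  private static void exampleCheckVersion(final Configuration conf)
      throws IOException, DeserializationException {
    Path rootDir = getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);
    // Prints a warning on System.out and throws FileSystemVersionException on a layout mismatch.
    checkVersion(fs, rootDir, true, 10 * 1000, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
  }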
694
695   /**
696    * Sets version of file system
697    *
698    * @param fs filesystem object
699    * @param rootdir hbase root
700    * @throws IOException e
701    */
702   public static void setVersion(FileSystem fs, Path rootdir)
703   throws IOException {
704     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
705       HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
706   }
707
708   /**
709    * Sets version of file system
710    *
711    * @param fs filesystem object
712    * @param rootdir hbase root
713    * @param wait time to wait for retry
714    * @param retries number of times to retry before failing
715    * @throws IOException e
716    */
717   public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
718   throws IOException {
719     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
720   }
721
722
723   /**
724    * Sets version of file system
725    *
726    * @param fs filesystem object
727    * @param rootdir hbase root directory
728    * @param version version to set
729    * @param wait time to wait for retry
730    * @param retries number of times to retry before throwing an IOException
731    * @throws IOException e
732    */
733   public static void setVersion(FileSystem fs, Path rootdir, String version,
734       int wait, int retries) throws IOException {
735     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
736     Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
737       HConstants.VERSION_FILE_NAME);
738     while (true) {
739       try {
740         // Write the version to a temporary file
741         FSDataOutputStream s = fs.create(tempVersionFile);
742         try {
743           s.write(toVersionByteArray(version));
744           s.close();
745           s = null;
746           // Move the temp version file to its normal location. Returns false
747           // if the rename failed. Throw an IOE in that case.
748           if (!fs.rename(tempVersionFile, versionFile)) {
749             throw new IOException("Unable to move temp version file to " + versionFile);
750           }
751         } finally {
752         // Cleaning up the temporary file if the rename failed would be trying
753         // too hard. We'll unconditionally create it again the next time
754         // through anyway; files are overwritten by default by create().
755
756           // Attempt to close the stream on the way out if it is still open.
757           try {
758             if (s != null) s.close();
759           } catch (IOException ignore) { }
760         }
761         LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
762         return;
763       } catch (IOException e) {
764         if (retries > 0) {
765           LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
766           fs.delete(versionFile, false);
767           try {
768             if (wait > 0) {
769               Thread.sleep(wait);
770             }
771           } catch (InterruptedException ie) {
772             throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
773           }
774           retries--;
775         } else {
776           throw e;
777         }
778       }
779     }
780   }
781
782   /**
783    * Checks that a cluster ID file exists in the HBase root directory
784    * @param fs the root directory FileSystem
785    * @param rootdir the HBase root directory in HDFS
786    * @param wait how long to wait between retries
787    * @return <code>true</code> if the file exists, otherwise <code>false</code>
788    * @throws IOException if checking the FileSystem fails
789    */
790   public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
791       int wait) throws IOException {
792     while (true) {
793       try {
794         Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
795         return fs.exists(filePath);
796       } catch (IOException ioe) {
797         if (wait > 0) {
798           LOG.warn("Unable to check cluster ID file in " + rootdir.toString() +
799               ", retrying in "+wait+"msec: "+StringUtils.stringifyException(ioe));
800           try {
801             Thread.sleep(wait);
802           } catch (InterruptedException e) {
803             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
804           }
805         } else {
806           throw ioe;
807         }
808       }
809     }
810   }
811
812   /**
813    * Returns the value of the unique cluster ID stored for this HBase instance.
814    * @param fs the root directory FileSystem
815    * @param rootdir the path to the HBase root directory
816    * @return the unique cluster identifier
817    * @throws IOException if reading the cluster ID file fails
818    */
819   public static ClusterId getClusterId(FileSystem fs, Path rootdir)
820   throws IOException {
821     Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
822     ClusterId clusterId = null;
823     FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath):  null;
824     if (status != null) {
825       int len = Ints.checkedCast(status.getLen());
826       byte [] content = new byte[len];
827       FSDataInputStream in = fs.open(idPath);
828       try {
829         in.readFully(content);
830       } catch (EOFException eof) {
831         LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
832       } finally{
833         in.close();
834       }
835       try {
836         clusterId = ClusterId.parseFrom(content);
837       } catch (DeserializationException e) {
838         throw new IOException("content=" + Bytes.toString(content), e);
839       }
840       // If not pb'd, make it so.
841       if (!ProtobufUtil.isPBMagicPrefix(content)) {
842         String cid = null;
843         in = fs.open(idPath);
844         try {
845           cid = in.readUTF();
846           clusterId = new ClusterId(cid);
847         } catch (EOFException eof) {
848           LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
849         } finally {
850           in.close();
851         }
852         rewriteAsPb(fs, rootdir, idPath, clusterId);
853       }
854       return clusterId;
855     } else {
856       LOG.warn("Cluster ID file does not exist at " + idPath.toString());
857     }
858     return clusterId;
859   }
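  // Illustrative usage sketch (not part of the original class): reading the cluster ID, seeding a
  // new one if the hbase.id file has not been written yet.
  private static ClusterId exampleGetOrCreateClusterId(final Configuration conf)
      throws IOException {
    Path rootDir = getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);
    ClusterId clusterId = getClusterId(fs, rootDir);
    if (clusterId == null) {
      clusterId = new ClusterId();                    // a freshly generated identifier
      setClusterId(fs, rootDir, clusterId, 10 * 1000);
    }
    return clusterId;
  }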
860
861   /**
862    * @param cid
863    * @throws IOException
864    */
865   private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
866       final ClusterId cid)
867   throws IOException {
868     // Rewrite the file as pb.  Move aside the old one first, write new
869     // then delete the moved-aside file.
870     Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
871     if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
872     setClusterId(fs, rootdir, cid, 100);
873     if (!fs.delete(movedAsideName, false)) {
874       throw new IOException("Failed delete of " + movedAsideName);
875     }
876     LOG.debug("Rewrote the hbase.id file as pb");
877   }
878
879   /**
880    * Writes a new unique identifier for this cluster to the "hbase.id" file
881    * in the HBase root directory
882    * @param fs the root directory FileSystem
883    * @param rootdir the path to the HBase root directory
884    * @param clusterId the unique identifier to store
885    * @param wait how long (in milliseconds) to wait between retries
886    * @throws IOException if writing to the FileSystem fails and there is no wait value for retrying
887    */
888   public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
889       int wait) throws IOException {
890     while (true) {
891       try {
892         Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
893         Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
894           Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
895         // Write the id file to a temporary location
896         FSDataOutputStream s = fs.create(tempIdFile);
897         try {
898           s.write(clusterId.toByteArray());
899           s.close();
900           s = null;
901           // Move the temporary file to its normal location. Throw an IOE if
902           // the rename failed
903           if (!fs.rename(tempIdFile, idFile)) {
904             throw new IOException("Unable to move temp version file to " + idFile);
905           }
906         } finally {
907           // Attempt to close the stream if still open on the way out
908           try {
909             if (s != null) s.close();
910           } catch (IOException ignore) { }
911         }
912         if (LOG.isDebugEnabled()) {
913           LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
914         }
915         return;
916       } catch (IOException ioe) {
917         if (wait > 0) {
918           LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
919               ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
920           try {
921             Thread.sleep(wait);
922           } catch (InterruptedException e) {
923             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
924           }
925         } else {
926           throw ioe;
927         }
928       }
929     }
930   }
931
932   /**
933    * Verifies root directory path is a valid URI with a scheme
934    *
935    * @param root root directory path
936    * @return Passed <code>root</code> argument.
937    * @throws IOException if not a valid URI with a scheme
938    */
939   public static Path validateRootPath(Path root) throws IOException {
940     try {
941       URI rootURI = new URI(root.toString());
942       String scheme = rootURI.getScheme();
943       if (scheme == null) {
944         throw new IOException("Root directory does not have a scheme");
945       }
946       return root;
947     } catch (URISyntaxException e) {
948       IOException io = new IOException("Root directory path is not a valid " +
949         "URI -- check your " + HConstants.HBASE_DIR + " configuration");
950       io.initCause(e);
951       throw io;
952     }
953   }
954
955   /**
956    * Checks for the presence of the root path (using the provided conf object) in the given path. If
957    * it exists, this method removes it and returns the String representation of the remaining relative path.
958    * @param path
959    * @param conf
960    * @return String representation of the remaining relative path
961    * @throws IOException
962    */
963   public static String removeRootPath(Path path, final Configuration conf) throws IOException {
964     Path root = FSUtils.getRootDir(conf);
965     String pathStr = path.toString();
966     // Check that the path is absolute, i.e. it has the root path in it;
967     // if not, return it as-is.
968     if (!pathStr.startsWith(root.toString())) return pathStr;
969     return pathStr.substring(root.toString().length() + 1);// remove the "/" too.
970   }
971
972   /**
973    * If DFS, check safe mode and if so, wait until we clear it.
974    * @param conf configuration
975    * @param wait Sleep between retries
976    * @throws IOException e
977    */
978   public static void waitOnSafeMode(final Configuration conf,
979     final long wait)
980   throws IOException {
981     FileSystem fs = FileSystem.get(conf);
982     if (!(fs instanceof DistributedFileSystem)) return;
983     DistributedFileSystem dfs = (DistributedFileSystem)fs;
984     // Make sure dfs is not in safe mode
985     while (isInSafeMode(dfs)) {
986       LOG.info("Waiting for dfs to exit safe mode...");
987       try {
988         Thread.sleep(wait);
989       } catch (InterruptedException e) {
990         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
991       }
992     }
993   }
994
995   /**
996    * Return the 'path' component of a Path.  In Hadoop, Path is a URI.  This
997    * method returns the 'path' component of a Path's URI: e.g. If a Path is
998    * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
999    * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
1000    * This method is useful if you want to print out a Path without the qualifying
1001    * FileSystem instance.
1002    * @param p Filesystem Path whose 'path' component we are to return.
1003    * @return The 'path' component of <code>p</code>'s URI
1004    */
1005   public static String getPath(Path p) {
1006     return p.toUri().getPath();
1007   }
1008
1009   /**
1010    * @param c configuration
1011    * @return Path to hbase root directory: i.e. <code>hbase.rootdir</code> from
1012    * configuration as a qualified Path.
1013    * @throws IOException e
1014    */
1015   public static Path getRootDir(final Configuration c) throws IOException {
1016     Path p = new Path(c.get(HConstants.HBASE_DIR));
1017     FileSystem fs = p.getFileSystem(c);
1018     return p.makeQualified(fs);
1019   }
1020
1021   public static void setRootDir(final Configuration c, final Path root) throws IOException {
1022     c.set(HConstants.HBASE_DIR, root.toString());
1023   }
1024
1025   public static void setFsDefault(final Configuration c, final Path root) throws IOException {
1026     c.set("fs.defaultFS", root.toString());    // for hadoop 0.21+
1027   }
1028
1029   /**
1030    * Checks if meta region exists
1031    *
1032    * @param fs file system
1033    * @param rootdir root directory of HBase installation
1034    * @return true if exists
1035    * @throws IOException e
1036    */
1037   @SuppressWarnings("deprecation")
1038   public static boolean metaRegionExists(FileSystem fs, Path rootdir)
1039   throws IOException {
1040     Path metaRegionDir =
1041       HRegion.getRegionDir(rootdir, HRegionInfo.FIRST_META_REGIONINFO);
1042     return fs.exists(metaRegionDir);
1043   }
1044
1045   /**
1046    * Compute HDFS blocks distribution of a given file, or a portion of the file
1047    * @param fs file system
1048    * @param status file status of the file
1049    * @param start start position of the portion
1050    * @param length length of the portion
1051    * @return The HDFS blocks distribution
1052    */
1053   static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
1054     final FileSystem fs, FileStatus status, long start, long length)
1055     throws IOException {
1056     HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
1057     BlockLocation [] blockLocations =
1058       fs.getFileBlockLocations(status, start, length);
1059     for(BlockLocation bl : blockLocations) {
1060       String [] hosts = bl.getHosts();
1061       long len = bl.getLength();
1062       blocksDistribution.addHostsAndBlockWeight(hosts, len);
1063     }
1064
1065     return blocksDistribution;
1066   }
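  // Illustrative usage sketch (not part of the original class): computing block locality for one
  // store file and asking what fraction of its bytes are local to a given host.
  private static float exampleBlockLocality(final FileSystem fs, final Path storeFile,
      final String host) throws IOException {
    FileStatus status = fs.getFileStatus(storeFile);
    HDFSBlocksDistribution distribution =
      computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
    return distribution.getBlockLocalityIndex(host);
  }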
1067
1068   // TODO move this method OUT of FSUtils. No dependencies to HMaster
1069   /**
1070    * Returns the total overall fragmentation percentage. Includes hbase:meta and
1071    * -ROOT- as well.
1072    *
1073    * @param master  The master defining the HBase root and file system.
1074    * @return The overall fragmentation percentage across all tables, or -1 if it could not be computed.
1075    * @throws IOException When scanning the directory fails.
1076    */
1077   public static int getTotalTableFragmentation(final HMaster master)
1078   throws IOException {
1079     Map<String, Integer> map = getTableFragmentation(master);
1080     return map != null && map.size() > 0 ? map.get("-TOTAL-") : -1;
1081   }
1082
1083   /**
1084    * Runs through the HBase rootdir and checks how many stores for each table
1085    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1086    * percentage across all tables is stored under the special key "-TOTAL-".
1087    *
1088    * @param master  The master defining the HBase root and file system.
1089    * @return A map for each table and its percentage.
1090    *
1091    * @throws IOException When scanning the directory fails.
1092    */
1093   public static Map<String, Integer> getTableFragmentation(
1094     final HMaster master)
1095   throws IOException {
1096     Path path = getRootDir(master.getConfiguration());
1097     // since HMaster.getFileSystem() is package private
1098     FileSystem fs = path.getFileSystem(master.getConfiguration());
1099     return getTableFragmentation(fs, path);
1100   }
1101
1102   /**
1103    * Runs through the HBase rootdir and checks how many stores for each table
1104    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1105    * percentage across all tables is stored under the special key "-TOTAL-".
1106    *
1107    * @param fs  The file system to use.
1108    * @param hbaseRootDir  The root directory to scan.
1109    * @return A map for each table and its percentage.
1110    * @throws IOException When scanning the directory fails.
1111    */
1112   public static Map<String, Integer> getTableFragmentation(
1113     final FileSystem fs, final Path hbaseRootDir)
1114   throws IOException {
1115     Map<String, Integer> frags = new HashMap<String, Integer>();
1116     int cfCountTotal = 0;
1117     int cfFragTotal = 0;
1118     PathFilter regionFilter = new RegionDirFilter(fs);
1119     PathFilter familyFilter = new FamilyDirFilter(fs);
1120     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1121     for (Path d : tableDirs) {
1122       int cfCount = 0;
1123       int cfFrag = 0;
1124       FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
1125       for (FileStatus regionDir : regionDirs) {
1126         Path dd = regionDir.getPath();
1127         // else it's a region name; now look in the region for families
1128         FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1129         for (FileStatus familyDir : familyDirs) {
1130           cfCount++;
1131           cfCountTotal++;
1132           Path family = familyDir.getPath();
1133           // now in family make sure only one file
1134           FileStatus[] familyStatus = fs.listStatus(family);
1135           if (familyStatus.length > 1) {
1136             cfFrag++;
1137             cfFragTotal++;
1138           }
1139         }
1140       }
1141       // compute percentage per table and store in result list
1142       frags.put(FSUtils.getTableName(d).getNameAsString(),
1143         cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
1144     }
1145     // set overall percentage for all tables
1146     frags.put("-TOTAL-",
1147       cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
1148     return frags;
1149   }
1150
1151   /**
1152    * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
1153    * path rootdir
1154    *
1155    * @param rootdir qualified path of HBase root directory
1156    * @param tableName name of table
1157    * @return {@link org.apache.hadoop.fs.Path} for table
1158    */
1159   public static Path getTableDir(Path rootdir, final TableName tableName) {
1160     return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
1161         tableName.getQualifierAsString());
1162   }
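  // Illustrative usage sketch (not part of the original class): resolving a table's directory
  // under the root dir and mapping it back to a TableName. The table name used here is made up.
  private static void exampleTableDirRoundTrip(final Configuration conf) throws IOException {
    Path rootDir = getRootDir(conf);
    Path tableDir = getTableDir(rootDir, TableName.valueOf("default", "usertable"));
    TableName resolved = getTableName(tableDir); // back to 'default:usertable'
    LOG.debug("tableDir=" + tableDir + ", table=" + resolved);
  }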
1163
1164   /**
1165    * Returns the {@link org.apache.hadoop.hbase.TableName} object representing
1166    * the table directory under
1167    * path rootdir
1168    *
1169    * @param tablePath path of table
1170    * @return {@link org.apache.hadoop.fs.Path} for table
1171    */
1172   public static TableName getTableName(Path tablePath) {
1173     return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
1174   }
1175
1176   /**
1177    * Returns the {@link org.apache.hadoop.fs.Path} object representing
1178    * the namespace directory under path rootdir
1179    *
1180    * @param rootdir qualified path of HBase root directory
1181    * @param namespace namespace name
1182    * @return {@link org.apache.hadoop.fs.Path} for table
1183    */
1184   public static Path getNamespaceDir(Path rootdir, final String namespace) {
1185     return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
1186         new Path(namespace)));
1187   }
1188
1189   /**
1190    * A {@link PathFilter} that returns only regular files.
1191    */
1192   static class FileFilter extends AbstractFileStatusFilter {
1193     private final FileSystem fs;
1194
1195     public FileFilter(final FileSystem fs) {
1196       this.fs = fs;
1197     }
1198
1199     @Override
1200     protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1201       try {
1202         return isFile(fs, isDir, p);
1203       } catch (IOException e) {
1204         LOG.warn("unable to verify if path=" + p + " is a regular file", e);
1205         return false;
1206       }
1207     }
1208   }
1209
1210   /**
1211    * Directory filter that doesn't include any of the directories in the specified blacklist
1212    */
1213   public static class BlackListDirFilter extends AbstractFileStatusFilter {
1214     private final FileSystem fs;
1215     private List<String> blacklist;
1216
1217     /**
1218      * Create a filter on the given filesystem with the specified blacklist
1219      * @param fs filesystem to filter
1220      * @param directoryNameBlackList list of the names of the directories to filter. If
1221      *          <tt>null</tt>, all directories are returned
1222      */
1223     @SuppressWarnings("unchecked")
1224     public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
1225       this.fs = fs;
1226       blacklist =
1227         (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
1228           : directoryNameBlackList);
1229     }
1230
1231     @Override
1232     protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1233       if (!isValidName(p.getName())) {
1234         return false;
1235       }
1236
1237       try {
1238         return isDirectory(fs, isDir, p);
1239       } catch (IOException e) {
1240         LOG.warn("An error occurred while verifying if [" + p.toString()
1241             + "] is a valid directory. Returning 'not valid' and continuing.", e);
1242         return false;
1243       }
1244     }
1245
1246     protected boolean isValidName(final String name) {
1247       return !blacklist.contains(name);
1248     }
1249   }
1250
1251   /**
1252    * A {@link PathFilter} that only allows directories.
1253    */
1254   public static class DirFilter extends BlackListDirFilter {
1255
1256     public DirFilter(FileSystem fs) {
1257       super(fs, null);
1258     }
1259   }
1260
1261   /**
1262    * A {@link PathFilter} that returns usertable directories. To get all directories use the
1263    * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
1264    */
1265   public static class UserTableDirFilter extends BlackListDirFilter {
1266     public UserTableDirFilter(FileSystem fs) {
1267       super(fs, HConstants.HBASE_NON_TABLE_DIRS);
1268     }
1269
1270     protected boolean isValidName(final String name) {
1271       if (!super.isValidName(name))
1272         return false;
1273
1274       try {
1275         TableName.isLegalTableQualifierName(Bytes.toBytes(name));
1276       } catch (IllegalArgumentException e) {
1277         LOG.info("INVALID NAME " + name);
1278         return false;
1279       }
1280       return true;
1281     }
1282   }
1283
1284   /**
1285    * Heuristic to determine whether it is safe or not to open a file for append.
1286    * Looks both for dfs.support.append and uses reflection to search
1287    * for SequenceFile.Writer.syncFs() or FSDataOutputStream.hflush()
1288    * @param conf
1289    * @return True if append is supported
1290    */
1291   public static boolean isAppendSupported(final Configuration conf) {
1292     boolean append = conf.getBoolean("dfs.support.append", false);
1293     if (append) {
1294       try {
1295         // TODO: The implementation that comes back when we do a createWriter
1296         // may not be using SequenceFile so the below is not a definitive test.
1297         // Will do for now (hdfs-200).
1298         SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
1299         append = true;
1300       } catch (SecurityException e) {
1301       } catch (NoSuchMethodException e) {
1302         append = false;
1303       }
1304     }
1305     if (!append) {
1306       // Look for the 0.21, 0.22, new-style append evidence.
1307       try {
1308         FSDataOutputStream.class.getMethod("hflush", new Class<?> []{});
1309         append = true;
1310       } catch (NoSuchMethodException e) {
1311         append = false;
1312       }
1313     }
1314     return append;
1315   }
1316
1317   /**
1318    * @param conf
1319    * @return True if this filesystem's scheme is 'hdfs'.
1320    * @throws IOException
1321    */
1322   public static boolean isHDFS(final Configuration conf) throws IOException {
1323     FileSystem fs = FileSystem.get(conf);
1324     String scheme = fs.getUri().getScheme();
1325     return scheme.equalsIgnoreCase("hdfs");
1326   }
1327
1328   /**
1329    * Recover file lease. Used when a file is suspected
1330    * to have been left open by another process.
1331    * @param fs FileSystem handle
1332    * @param p Path of file to recover lease
1333    * @param conf Configuration handle
1334    * @throws IOException
1335    */
1336   public abstract void recoverFileLease(final FileSystem fs, final Path p,
1337       Configuration conf, CancelableProgressable reporter) throws IOException;
1338
1339   public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
1340       throws IOException {
1341     List<Path> tableDirs = new LinkedList<Path>();
1342
1343     for(FileStatus status :
1344         fs.globStatus(new Path(rootdir,
1345             new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
1346       tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
1347     }
1348     return tableDirs;
1349   }
1350
1351   /**
1352    * @param fs
1353    * @param rootdir
1354    * @return All the table directories under <code>rootdir</code>. Ignores non-table hbase folders such as
1355    * .logs, .oldlogs, .corrupt folders.
1356    * @throws IOException
1357    */
1358   public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
1359       throws IOException {
1360     // presumes any directory under hbase.rootdir is a table
1361     FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
1362     List<Path> tabledirs = new ArrayList<Path>(dirs.length);
1363     for (FileStatus dir: dirs) {
1364       tabledirs.add(dir.getPath());
1365     }
1366     return tabledirs;
1367   }
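
  // Illustrative usage sketch (hypothetical helper, not part of the original class): lists every
  // table directory under the HBase root directory using getTableDirs() above and logs the
  // table name derived from each path.
  private static void exampleLogTableDirs(final Configuration conf) throws IOException {
    FileSystem fs = getCurrentFileSystem(conf);
    Path rootDir = getRootDir(conf);
    for (Path tableDir : getTableDirs(fs, rootDir)) {
      // Each path has the shape <rootdir>/data/<namespace>/<table>.
      LOG.info("Found table dir " + tableDir + " for table " + getTableName(tableDir));
    }
  }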
1368
1369   /**
1370    * Checks whether the given path contains the 'recovered.edits' directory.
1371    * @param path
1372    * @return True if the path points at recovered edits
1373    */
1374   public static boolean isRecoveredEdits(Path path) {
1375     return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
1376   }
1377
1378   /**
1379    * Filter for all dirs whose names are legal region names (hex-encoded, or older numeric names).
1380    */
1381   public static class RegionDirFilter extends AbstractFileStatusFilter {
1382     // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
1383     public static final Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
1384     final FileSystem fs;
1385
1386     public RegionDirFilter(FileSystem fs) {
1387       this.fs = fs;
1388     }
1389
1390     @Override
1391     protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1392       if (!regionDirPattern.matcher(p.getName()).matches()) {
1393         return false;
1394       }
1395
1396       try {
1397         return isDirectory(fs, isDir, p);
1398       } catch (IOException ioe) {
1399         // Maybe the file was moved or the fs was disconnected.
1400         LOG.warn("Skipping file " + p + " due to IOException", ioe);
1401         return false;
1402       }
1403     }
1404   }
1405
1406   /**
1407    * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1408    * .tableinfo
1409    * @param fs A file system for the Path
1410    * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
1411    * @return List of paths to valid region directories in table dir.
1412    * @throws IOException
1413    */
1414   public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1415     // assumes we are in a table dir.
1416     List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1417     if (rds == null) {
1418       return new ArrayList<Path>();
1419     }
1420     List<Path> regionDirs = new ArrayList<Path>(rds.size());
1421     for (FileStatus rdfs: rds) {
1422       Path rdPath = rdfs.getPath();
1423       regionDirs.add(rdPath);
1424     }
1425     return regionDirs;
1426   }
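
  // Illustrative usage sketch (hypothetical helper, not part of the original class): counts the
  // region directories of one table by combining getRootDir()/getTableDir() with getRegionDirs()
  // above. The method name is purely an example.
  private static int exampleCountRegionDirs(final Configuration conf, final TableName tableName)
      throws IOException {
    FileSystem fs = getCurrentFileSystem(conf);
    Path tableDir = getTableDir(getRootDir(conf), tableName);
    List<Path> regionDirs = getRegionDirs(fs, tableDir);
    LOG.info("Table " + tableName + " has " + regionDirs.size() + " region dirs on disk");
    return regionDirs.size();
  }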
1427
1428   /**
1429    * Filter for all dirs that are legal column family names.  This is generally used for colfam
1430    * dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1431    */
1432   public static class FamilyDirFilter extends AbstractFileStatusFilter {
1433     final FileSystem fs;
1434
1435     public FamilyDirFilter(FileSystem fs) {
1436       this.fs = fs;
1437     }
1438
1439     @Override
1440     protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1441       try {
1442         // throws IAE if invalid
1443         HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName()));
1444       } catch (IllegalArgumentException iae) {
1445         // path name is an invalid family name and thus is excluded.
1446         return false;
1447       }
1448
1449       try {
1450         return isDirectory(fs, isDir, p);
1451       } catch (IOException ioe) {
1452         // Maybe the file was moved or the fs was disconnected.
1453         LOG.warn("Skipping file " + p + " due to IOException", ioe);
1454         return false;
1455       }
1456     }
1457   }
1458
1459   /**
1460    * Given a particular region dir, return all the familydirs inside it
1461    *
1462    * @param fs A file system for the Path
1463    * @param regionDir Path to a specific region directory
1464    * @return List of paths to valid family directories in region dir.
1465    * @throws IOException
1466    */
1467   public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1468     // assumes we are in a region dir.
1469     FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1470     List<Path> familyDirs = new ArrayList<Path>(fds.length);
1471     for (FileStatus fdfs: fds) {
1472       Path fdPath = fdfs.getPath();
1473       familyDirs.add(fdPath);
1474     }
1475     return familyDirs;
1476   }
1477
1478   public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
1479     List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs));
1480     if (fds == null) {
1481       return new ArrayList<Path>();
1482     }
1483     List<Path> referenceFiles = new ArrayList<Path>(fds.size());
1484     for (FileStatus fdfs: fds) {
1485       Path fdPath = fdfs.getPath();
1486       referenceFiles.add(fdPath);
1487     }
1488     return referenceFiles;
1489   }
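
  // Illustrative usage sketch (hypothetical helper, not part of the original class): walks the
  // family directories of a single region and collects all reference files, combining
  // getFamilyDirs() and getReferenceFilePaths() above.
  private static List<Path> exampleListRegionReferenceFiles(final FileSystem fs, final Path regionDir)
      throws IOException {
    List<Path> references = new ArrayList<Path>();
    for (Path familyDir : getFamilyDirs(fs, regionDir)) {
      references.addAll(getReferenceFilePaths(fs, familyDir));
    }
    return references;
  }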
1490
1491   /**
1492    * Filter for HFiles that excludes reference files.
1493    */
1494   public static class HFileFilter extends AbstractFileStatusFilter {
1495     final FileSystem fs;
1496
1497     public HFileFilter(FileSystem fs) {
1498       this.fs = fs;
1499     }
1500
1501     @Override
1502     protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1503       if (!StoreFileInfo.isHFile(p)) {
1504         return false;
1505       }
1506
1507       try {
1508         return isFile(fs, isDir, p);
1509       } catch (IOException ioe) {
1510         // Maybe the file was moved or the fs was disconnected.
1511         LOG.warn("Skipping file " + p + " due to IOException", ioe);
1512         return false;
1513       }
1514     }
1515   }
1516
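  /**
   * Filter for store files that are references to other store files.
   */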
1517   public static class ReferenceFileFilter extends AbstractFileStatusFilter {
1518 
1519     private final FileSystem fs;
1520
1521     public ReferenceFileFilter(FileSystem fs) {
1522       this.fs = fs;
1523     }
1524
1525     @Override
1526     protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1527       if (!StoreFileInfo.isReference(p)) {
1528         return false;
1529       }
1530
1531       try {
1532         // only files can be references.
1533         return isFile(fs, isDir, p);
1534       } catch (IOException ioe) {
1535         // Maybe the file was moved or the fs was disconnected.
1536         LOG.warn("Skipping file " + p + " due to IOException", ioe);
1537         return false;
1538       }
1539     }
1540   }
1541
1542
1543   /**
1544    * @param conf
1545    * @return The filesystem of the hbase rootdir.
1546    * @throws IOException
1547    */
1548   public static FileSystem getCurrentFileSystem(Configuration conf)
1549   throws IOException {
1550     return getRootDir(conf).getFileSystem(conf);
1551   }
1552
1553
1554   /**
1555    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1556    * table StoreFile names to the full Path.
1557    * <br>
1558    * Example...<br>
1559    * Key = 3944417774205889744  <br>
1560    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1561    *
1562    * @param map map to add values.  If null, this method will create and populate one to return
1563    * @param fs  The file system to use.
1564    * @param hbaseRootDir  The root directory to scan.
1565    * @param tableName name of the table to scan.
1566    * @return Map keyed by StoreFile name with a value of the full Path.
1567    * @throws IOException When scanning the directory fails.
1568    * @throws InterruptedException
1569    */
1570   public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1571   final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1572   throws IOException, InterruptedException {
1573     return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, null);
1574   }
1575
1576   /**
1577    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1578    * table StoreFile names to the full Path.  Note that because this method can be called
1579    * on a 'live' HBase system, it will skip files that no longer exist by the time
1580    * they are traversed; similarly, the caller should expect that some entries in the
1581    * returned map may no longer exist by the time this call completes.
1582    * <br>
1583    * Example...<br>
1584    * Key = 3944417774205889744  <br>
1585    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1586    *
1587    * @param resultMap map to add values.  If null, this method will create and populate one to return
1588    * @param fs  The file system to use.
1589    * @param hbaseRootDir  The root directory to scan.
1590    * @param tableName name of the table to scan.
1591    * @param sfFilter optional path filter to apply to store files
1592    * @param executor optional executor service to parallelize this operation
1593    * @param errors ErrorReporter instance or null
1594    * @return Map keyed by StoreFile name with a value of the full Path.
1595    * @throws IOException When scanning the directory fails.
1596    * @throws InterruptedException
1597    */
1598   public static Map<String, Path> getTableStoreFilePathMap(
1599       Map<String, Path> resultMap,
1600       final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1601       ExecutorService executor, final ErrorReporter errors) throws IOException, InterruptedException {
1602
1603     final Map<String, Path> finalResultMap =
1604         resultMap == null ? new ConcurrentHashMap<String, Path>(128, 0.75f, 32) : resultMap;
1605
1606     // only include the directory paths to tables
1607     Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
1608     // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
1609     // should be regions.
1610     final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
1611     final Vector<Exception> exceptions = new Vector<Exception>();
1612
1613     try {
1614       List<FileStatus> regionDirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1615       if (regionDirs == null) {
1616         return finalResultMap;
1617       }
1618
1619       final List<Future<?>> futures = new ArrayList<Future<?>>(regionDirs.size());
1620
1621       for (FileStatus regionDir : regionDirs) {
1622         if (null != errors) {
1623           errors.progress();
1624         }
1625         final Path dd = regionDir.getPath();
1626
1627         if (!exceptions.isEmpty()) {
1628           break;
1629         }
1630
1631         Runnable getRegionStoreFileMapCall = new Runnable() {
1632           @Override
1633           public void run() {
1634             try {
1635               HashMap<String,Path> regionStoreFileMap = new HashMap<String, Path>();
1636               List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
1637               if (familyDirs == null) {
1638                 if (!fs.exists(dd)) {
1639                   LOG.warn("Skipping region because it no longer exists: " + dd);
1640                 } else {
1641                   LOG.warn("Skipping region because it has no family dirs: " + dd);
1642                 }
1643                 return;
1644               }
1645               for (FileStatus familyDir : familyDirs) {
1646                 if (null != errors) {
1647                   errors.progress();
1648                 }
1649                 Path family = familyDir.getPath();
1650                 if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1651                   continue;
1652                 }
1653                 // now in family, iterate over the StoreFiles and
1654                 // put in map
1655                 FileStatus[] familyStatus = fs.listStatus(family);
1656                 for (FileStatus sfStatus : familyStatus) {
1657                   if (null != errors) {
1658                     errors.progress();
1659                   }
1660                   Path sf = sfStatus.getPath();
1661                   if (sfFilter == null || sfFilter.accept(sf)) {
1662                     regionStoreFileMap.put(sf.getName(), sf);
1663                   }
1664                 }
1665               }
1666               finalResultMap.putAll(regionStoreFileMap);
1667             } catch (Exception e) {
1668               LOG.error("Could not get region store file map for region: " + dd, e);
1669               exceptions.add(e);
1670             }
1671           }
1672         };
1673
1674         // If executor is available, submit async tasks to exec concurrently, otherwise
1675         // just do serial sync execution
1676         if (executor != null) {
1677           Future<?> future = executor.submit(getRegionStoreFileMapCall);
1678           futures.add(future);
1679         } else {
1680           FutureTask<?> future = new FutureTask<Object>(getRegionStoreFileMapCall, null);
1681           future.run();
1682           futures.add(future);
1683         }
1684       }
1685
1686       // Ensure all pending tasks are complete (or that we run into an exception)
1687       for (Future<?> f : futures) {
1688         if (!exceptions.isEmpty()) {
1689           break;
1690         }
1691         try {
1692           f.get();
1693         } catch (ExecutionException e) {
1694           LOG.error("Unexpected exec exception!  Should've been caught already.  (Bug?)", e);
1695           // Shouldn't happen, we already logged/caught any exceptions in the Runnable
1696         }
1697       }
1698     } catch (IOException e) {
1699       LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
1700       exceptions.add(e);
1701     } finally {
1702       if (!exceptions.isEmpty()) {
1703         // Just throw the first exception as an indication something bad happened
1704         // Don't need to propagate all the exceptions, we already logged them all anyway
1705         Throwables.propagateIfInstanceOf(exceptions.firstElement(), IOException.class);
1706         throw Throwables.propagate(exceptions.firstElement());
1707       }
1708     }
1709
1710     return finalResultMap;
1711   }
1712
1713   public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1714     int result = 0;
1715     try {
1716       for (Path familyDir:getFamilyDirs(fs, p)){
1717         result += getReferenceFilePaths(fs, familyDir).size();
1718       }
1719     } catch (IOException e) {
1720       LOG.warn("Error counting reference files.", e);
1721     }
1722     return result;
1723   }
1724
1725   /**
1726    * Runs through the HBase rootdir and creates a reverse lookup map for
1727    * table StoreFile names to the full Path.
1728    * <br>
1729    * Example...<br>
1730    * Key = 3944417774205889744  <br>
1731    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1732    *
1733    * @param fs  The file system to use.
1734    * @param hbaseRootDir  The root directory to scan.
1735    * @return Map keyed by StoreFile name with a value of the full Path.
1736    * @throws IOException When scanning the directory fails.
1737    * @throws InterruptedException
1738    */
1739   public static Map<String, Path> getTableStoreFilePathMap(
1740     final FileSystem fs, final Path hbaseRootDir)
1741   throws IOException, InterruptedException {
1742     return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, null);
1743   }
1744
1745   /**
1746    * Runs through the HBase rootdir and creates a reverse lookup map for
1747    * table StoreFile names to the full Path.
1748    * <br>
1749    * Example...<br>
1750    * Key = 3944417774205889744  <br>
1751    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1752    *
1753    * @param fs  The file system to use.
1754    * @param hbaseRootDir  The root directory to scan.
1755    * @param sfFilter optional path filter to apply to store files
1756    * @param executor optional executor service to parallelize this operation
1757    * @param errors ErrorReporter instance or null
1758    * @return Map keyed by StoreFile name with a value of the full Path.
1759    * @throws IOException When scanning the directory fails.
1760    * @throws InterruptedException
1761    */
1762   public static Map<String, Path> getTableStoreFilePathMap(
1763     final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter,
1764     ExecutorService executor, ErrorReporter errors)
1765   throws IOException, InterruptedException {
1766     ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<String, Path>(1024, 0.75f, 32);
1767
1768     // if this method looks similar to 'getTableFragmentation' that is because
1769     // it was borrowed from it.
1770
1771     // only include the directory paths to tables
1772     for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1773       getTableStoreFilePathMap(map, fs, hbaseRootDir,
1774           FSUtils.getTableName(tableDir), sfFilter, executor, errors);
1775     }
1776     return map;
1777   }
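
  // Illustrative usage sketch (hypothetical helper, not part of the original class): builds the
  // store file lookup map for the whole root directory using a small bounded thread pool. The
  // pool size of 4 is an arbitrary example value; an unbounded queue is used so that submitted
  // region tasks are never rejected.
  private static Map<String, Path> exampleBuildStoreFileMap(final Configuration conf)
      throws IOException, InterruptedException {
    FileSystem fs = getCurrentFileSystem(conf);
    Path rootDir = getRootDir(conf);
    ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 60, TimeUnit.SECONDS,
        new java.util.concurrent.LinkedBlockingQueue<Runnable>());
    try {
      // Null store-file filter keeps every store file; null ErrorReporter disables progress calls.
      return getTableStoreFilePathMap(fs, rootDir, null, executor, null);
    } finally {
      executor.shutdown();
    }
  }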
1778
1779   /**
1780    * Filters FileStatuses in an array and returns a list
1781    *
1782    * @param input   An array of FileStatuses
1783    * @param filter  A required filter to filter the array
1784    * @return        A list of FileStatuses
1785    */
1786   public static List<FileStatus> filterFileStatuses(FileStatus[] input,
1787       FileStatusFilter filter) {
1788     if (input == null) return null;
1789     return filterFileStatuses(Iterators.forArray(input), filter);
1790   }
1791
1792   /**
1793    * Filters FileStatuses in an iterator and returns a list
1794    *
1795    * @param input   An iterator of FileStatuses
1796    * @param filter  A required filter to apply to each FileStatus
1797    * @return        A list of FileStatuses
1798    */
1799   public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1800       FileStatusFilter filter) {
1801     if (input == null) return null;
1802     ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1803     while (input.hasNext()) {
1804       FileStatus f = input.next();
1805       if (filter.accept(f)) {
1806         results.add(f);
1807       }
1808     }
1809     return results;
1810   }
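
  // Illustrative usage sketch (hypothetical helper, not part of the original class): applies
  // filterFileStatuses() above with an inline filter that keeps only directories. Assumes
  // FileStatusFilter exposes a single accept(FileStatus) method, as its use above implies.
  private static List<FileStatus> exampleKeepDirectories(final FileStatus[] input) {
    return filterFileStatuses(input, new FileStatusFilter() {
      @Override
      public boolean accept(FileStatus f) {
        return f.isDirectory();
      }
    });
  }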
1811
1812   /**
1813    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1814    * This accommodates differences between Hadoop versions: Hadoop 1 does not
1815    * throw a FileNotFoundException but returns an empty FileStatus[], while
1816    * Hadoop 2 throws FileNotFoundException.
1817    *
1818    * @param fs file system
1819    * @param dir directory
1820    * @param filter file status filter
1821    * @return null if dir is empty or doesn't exist, otherwise FileStatus list
1822    */
1823   public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs,
1824       final Path dir, final FileStatusFilter filter) throws IOException {
1825     FileStatus [] status = null;
1826     try {
1827       status = fs.listStatus(dir);
1828     } catch (FileNotFoundException fnfe) {
1829       // if directory doesn't exist, return null
1830       if (LOG.isTraceEnabled()) {
1831         LOG.trace(dir + " doesn't exist");
1832       }
1833     }
1834
1835     if (status == null || status.length < 1)  {
1836       return null;
1837     }
1838
1839     if (filter == null) {
1840       return Arrays.asList(status);
1841     } else {
1842       List<FileStatus> status2 = filterFileStatuses(status, filter);
1843       if (status2 == null || status2.isEmpty()) {
1844         return null;
1845       } else {
1846         return status2;
1847       }
1848     }
1849   }
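
  // Illustrative usage sketch (hypothetical helper, not part of the original class): shows the
  // null-return contract of listStatusWithStatusFilter() above -- null means "missing or empty
  // directory", so callers such as getRegionDirs() substitute an empty list.
  private static List<FileStatus> exampleListOrEmpty(final FileSystem fs, final Path dir,
      final FileStatusFilter filter) throws IOException {
    List<FileStatus> statuses = listStatusWithStatusFilter(fs, dir, filter);
    return statuses == null ? new ArrayList<FileStatus>() : statuses;
  }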
1850
1851   /**
1852    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1853    * This accommodates differences between Hadoop versions: Hadoop 1 does not
1854    * throw a FileNotFoundException but returns an empty FileStatus[], while
1855    * Hadoop 2 throws FileNotFoundException.
1856    *
1857    * Where possible, prefer {@link #listStatusWithStatusFilter(FileSystem,
1858    * Path, FileStatusFilter)} instead.
1859    *
1860    * @param fs file system
1861    * @param dir directory
1862    * @param filter path filter
1863    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1864    */
1865   public static FileStatus [] listStatus(final FileSystem fs,
1866       final Path dir, final PathFilter filter) throws IOException {
1867     FileStatus [] status = null;
1868     try {
1869       status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
1870     } catch (FileNotFoundException fnfe) {
1871       // if directory doesn't exist, return null
1872       if (LOG.isTraceEnabled()) {
1873         LOG.trace(dir + " doesn't exist");
1874       }
1875     }
1876     if (status == null || status.length < 1) return null;
1877     return status;
1878   }
1879
1880   /**
1881    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1882    * This accommodates differences between Hadoop versions.
1883    *
1884    * @param fs file system
1885    * @param dir directory
1886    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1887    */
1888   public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
1889     return listStatus(fs, dir, null);
1890   }
1891
1892   /**
1893    * Calls fs.delete() and returns its result.
1894    *
1895    * @param fs
1896    * @param path
1897    * @param recursive
1898    * @return the value returned by the fs.delete()
1899    * @throws IOException
1900    */
1901   public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
1902       throws IOException {
1903     return fs.delete(path, recursive);
1904   }
1905
1906   /**
1907    * Calls fs.exists(). Checks if the specified path exists
1908    *
1909    * @param fs
1910    * @param path
1911    * @return the value returned by fs.exists()
1912    * @throws IOException
1913    */
1914   public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
1915     return fs.exists(path);
1916   }
1917
1918   /**
1919    * Throw an exception if an action is not permitted by a user on a file.
1920    *
1921    * @param ugi
1922    *          the user
1923    * @param file
1924    *          the file
1925    * @param action
1926    *          the action
1927    */
1928   public static void checkAccess(UserGroupInformation ugi, FileStatus file,
1929       FsAction action) throws AccessDeniedException {
1930     if (ugi.getShortUserName().equals(file.getOwner())) {
1931       if (file.getPermission().getUserAction().implies(action)) {
1932         return;
1933       }
1934     } else if (contains(ugi.getGroupNames(), file.getGroup())) {
1935       if (file.getPermission().getGroupAction().implies(action)) {
1936         return;
1937       }
1938     } else if (file.getPermission().getOtherAction().implies(action)) {
1939       return;
1940     }
1941     throw new AccessDeniedException("Permission denied:" + " action=" + action
1942         + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
1943   }
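
  // Illustrative usage sketch (hypothetical helper, not part of the original class): verifies
  // that the current user may read a path before opening it, using checkAccess() above.
  private static void exampleEnsureReadable(final FileSystem fs, final Path path)
      throws IOException {
    FileStatus status = fs.getFileStatus(path);
    // Throws AccessDeniedException when neither owner, group nor 'other' permissions imply READ.
    checkAccess(UserGroupInformation.getCurrentUser(), status, FsAction.READ);
  }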
1944
1945   private static boolean contains(String[] groups, String user) {
1946     for (String group : groups) {
1947       if (group.equals(user)) {
1948         return true;
1949       }
1950     }
1951     return false;
1952   }
1953
1954   /**
1955    * Log the current state of the filesystem from a certain root directory
1956    * @param fs filesystem to investigate
1957    * @param root root file/directory to start logging from
1958    * @param LOG log to output information
1959    * @throws IOException if an unexpected exception occurs
1960    */
1961   public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
1962       throws IOException {
1963     LOG.debug("Current file system:");
1964     logFSTree(LOG, fs, root, "|-");
1965   }
1966
1967   /**
1968    * Recursive helper to log the state of the FS
1969    *
1970    * @see #logFileSystemState(FileSystem, Path, Log)
1971    */
1972   private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
1973       throws IOException {
1974     FileStatus[] files = FSUtils.listStatus(fs, root, null);
1975     if (files == null) return;
1976
1977     for (FileStatus file : files) {
1978       if (file.isDirectory()) {
1979         LOG.debug(prefix + file.getPath().getName() + "/");
1980         logFSTree(LOG, fs, file.getPath(), prefix + "---");
1981       } else {
1982         LOG.debug(prefix + file.getPath().getName());
1983       }
1984     }
1985   }
1986
1987   public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
1988       throws IOException {
1989     // set the modify time for TimeToLive Cleaner
1990     fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
1991     return fs.rename(src, dest);
1992   }
1993
1994   /**
1995    * This function is to scan the root path of the file system to get the
1996    * degree of locality for each region on each of the servers having at least
1997    * one block of that region.
1998    * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}
1999    *
2000    * @param conf
2001    *          the configuration to use
2002    * @return the mapping from region encoded name to a map of server names to
2003    *           locality fraction
2004    * @throws IOException
2005    *           in case of file system errors or interrupts
2006    */
2007   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
2008       final Configuration conf) throws IOException {
2009     return getRegionDegreeLocalityMappingFromFS(
2010         conf, null,
2011         conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
2012
2013   }
2014
2015   /**
2016    * This function is to scan the root path of the file system to get the
2017    * degree of locality for each region on each of the servers having at least
2018    * one block of that region.
2019    *
2020    * @param conf
2021    *          the configuration to use
2022    * @param desiredTable
2023    *          the table you wish to scan locality for
2024    * @param threadPoolSize
2025    *          the thread pool size to use
2026    * @return the mapping from region encoded name to a map of server names to
2027    *           locality fraction
2028    * @throws IOException
2029    *           in case of file system errors or interrupts
2030    */
2031   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
2032       final Configuration conf, final String desiredTable, int threadPoolSize)
2033       throws IOException {
2034     Map<String, Map<String, Float>> regionDegreeLocalityMapping =
2035         new ConcurrentHashMap<String, Map<String, Float>>();
2036     getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null,
2037         regionDegreeLocalityMapping);
2038     return regionDegreeLocalityMapping;
2039   }
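
  // Illustrative usage sketch (hypothetical helper, not part of the original class): logs, for
  // each region, the server holding the highest fraction of that region's blocks, based on the
  // mapping returned by getRegionDegreeLocalityMappingFromFS() above.
  private static void exampleLogBestLocality(final Configuration conf) throws IOException {
    Map<String, Map<String, Float>> localityByRegion = getRegionDegreeLocalityMappingFromFS(conf);
    for (Map.Entry<String, Map<String, Float>> region : localityByRegion.entrySet()) {
      String bestServer = null;
      float bestFraction = -1f;
      for (Map.Entry<String, Float> server : region.getValue().entrySet()) {
        if (server.getValue() > bestFraction) {
          bestFraction = server.getValue();
          bestServer = server.getKey();
        }
      }
      LOG.info("Region " + region.getKey() + " has best locality " + bestFraction
          + " on " + bestServer);
    }
  }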
2040
2041   /**
2042    * This function is to scan the root path of the file system to get either the
2043    * mapping between the region name and its best locality region server or the
2044    * degree of locality of each region on each of the servers having at least
2045    * one block of that region. The output map parameters are both optional.
2046    *
2047    * @param conf
2048    *          the configuration to use
2049    * @param desiredTable
2050    *          the table you wish to scan locality for
2051    * @param threadPoolSize
2052    *          the thread pool size to use
2053    * @param regionToBestLocalityRSMapping
2054    *          the map into which to put the best locality mapping or null
2055    * @param regionDegreeLocalityMapping
2056    *          the map into which to put the locality degree mapping or null,
2057    *          must be a thread-safe implementation
2058    * @throws IOException
2059    *           in case of file system errors or interrupts
2060    */
2061   private static void getRegionLocalityMappingFromFS(
2062       final Configuration conf, final String desiredTable,
2063       int threadPoolSize,
2064       Map<String, String> regionToBestLocalityRSMapping,
2065       Map<String, Map<String, Float>> regionDegreeLocalityMapping)
2066       throws IOException {
2067     FileSystem fs =  FileSystem.get(conf);
2068     Path rootPath = FSUtils.getRootDir(conf);
2069     long startTime = EnvironmentEdgeManager.currentTime();
2070     Path queryPath;
2071     // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
2072     if (null == desiredTable) {
2073       queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
2074     } else {
2075       queryPath = new Path(FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
2076     }
2077
2078     // reject all paths that are not appropriate
2079     PathFilter pathFilter = new PathFilter() {
2080       @Override
2081       public boolean accept(Path path) {
2082         // the last path component should be a region name; the glob may also match noise entries
2083         if (null == path) {
2084           return false;
2085         }
2086
2087         // no parent?
2088         Path parent = path.getParent();
2089         if (null == parent) {
2090           return false;
2091         }
2092
2093         String regionName = path.getName();
2094         if (null == regionName) {
2095           return false;
2096         }
2097
2098         if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
2099           return false;
2100         }
2101         return true;
2102       }
2103     };
2104
2105     FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
2106
2107     if (null == statusList) {
2108       return;
2109     } else {
2110       LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
2111           statusList.length);
2112     }
2113
2114     // lower the number of threads in case we have very few expected regions
2115     threadPoolSize = Math.min(threadPoolSize, statusList.length);
2116
2117     // run in multiple threads
2118     ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
2119         threadPoolSize, 60, TimeUnit.SECONDS,
2120         new ArrayBlockingQueue<Runnable>(statusList.length));
2121     try {
2122       // ignore all file status items that are not of interest
2123       for (FileStatus regionStatus : statusList) {
2124         if (null == regionStatus) {
2125           continue;
2126         }
2127
2128         if (!regionStatus.isDirectory()) {
2129           continue;
2130         }
2131
2132         Path regionPath = regionStatus.getPath();
2133         if (null == regionPath) {
2134           continue;
2135         }
2136
2137         tpe.execute(new FSRegionScanner(fs, regionPath,
2138             regionToBestLocalityRSMapping, regionDegreeLocalityMapping));
2139       }
2140     } finally {
2141       tpe.shutdown();
2142       int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
2143           60 * 1000);
2144       try {
2145         // here we wait until TPE terminates, which is either naturally or by
2146         // here we wait until the TPE terminates, either naturally or because of
2147         // exceptions thrown during execution of the tasks
2148             TimeUnit.MILLISECONDS)) {
2149           // printing out rough estimate, so as to not introduce
2150           // AtomicInteger
2151           LOG.info("Locality checking is underway: { Scanned Regions : "
2152               + tpe.getCompletedTaskCount() + "/"
2153               + tpe.getTaskCount() + " }");
2154         }
2155       } catch (InterruptedException e) {
2156         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
2157       }
2158     }
2159
2160     long overhead = EnvironmentEdgeManager.currentTime() - startTime;
2161     String overheadMsg = "Scanning DFS for locality info took " + overhead + " ms";
2162
2163     LOG.info(overheadMsg);
2164   }
2165
2166   /**
2167    * Do our short circuit read setup.
2168    * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
2169    * @param conf
2170    */
2171   public static void setupShortCircuitRead(final Configuration conf) {
2172     // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
2173     boolean shortCircuitSkipChecksum =
2174       conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
2175     boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
2176     if (shortCircuitSkipChecksum) {
2177       LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
2178         "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
2179         "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
2180       assert !shortCircuitSkipChecksum; //this will fail if assertions are on
2181     }
2182     checkShortCircuitReadBufferSize(conf);
2183   }
2184
2185   /**
2186    * Check if short circuit read buffer size is set and if not, set it to hbase value.
2187    * @param conf
2188    */
2189   public static void checkShortCircuitReadBufferSize(final Configuration conf) {
2190     final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
2191     final int notSet = -1;
2192     // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
2193     final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
2194     int size = conf.getInt(dfsKey, notSet);
2195     // If a size is set, return -- we will use it.
2196     if (size != notSet) return;
2197     // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
2198     int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
2199     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
2200   }
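
  // Illustrative usage sketch (hypothetical helper, not part of the original class): shows the
  // effect of checkShortCircuitReadBufferSize() above. When neither the DFS key nor its
  // "hbase."-prefixed override is set, the buffer size ends up at twice HBase's block size.
  private static int exampleEffectiveShortCircuitBufferSize(final Configuration conf) {
    checkShortCircuitReadBufferSize(conf);
    return conf.getInt("dfs.client.read.shortcircuit.buffer.size",
        HConstants.DEFAULT_BLOCKSIZE * 2);
  }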
2201
2202   /**
2203    * @param c
2204    * @return The DFSClient's DFSHedgedReadMetrics instance, or null if it can't be found or the filesystem is not HDFS.
2205    * @throws IOException
2206    */
2207   public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
2208       throws IOException {
2209     if (!isHDFS(c)) return null;
2210     // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
2211     // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
2212     // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
2213     final String name = "getHedgedReadMetrics";
2214     DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
2215     Method m;
2216     try {
2217       m = dfsclient.getClass().getDeclaredMethod(name);
2218     } catch (NoSuchMethodException e) {
2219       LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
2220           e.getMessage());
2221       return null;
2222     } catch (SecurityException e) {
2223       LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
2224           e.getMessage());
2225       return null;
2226     }
2227     m.setAccessible(true);
2228     try {
2229       return (DFSHedgedReadMetrics)m.invoke(dfsclient);
2230     } catch (IllegalAccessException e) {
2231       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
2232           e.getMessage());
2233       return null;
2234     } catch (IllegalArgumentException e) {
2235       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
2236           e.getMessage());
2237       return null;
2238     } catch (InvocationTargetException e) {
2239       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
2240           e.getMessage());
2241       return null;
2242     }
2243   }
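
  // Illustrative usage sketch (hypothetical helper, not part of the original class): samples the
  // hedged read counters exposed by getDFSHedgedReadMetrics() above. The accessors
  // getHedgedReadOps() and getHedgedReadWins() are assumed from HDFS's DFSHedgedReadMetrics
  // and may differ between Hadoop versions.
  private static void exampleLogHedgedReadMetrics(final Configuration conf) throws IOException {
    DFSHedgedReadMetrics metrics = getDFSHedgedReadMetrics(conf);
    if (metrics == null) {
      LOG.info("Hedged read metrics unavailable (not HDFS, or accessor not found)");
      return;
    }
    LOG.info("Hedged reads issued=" + metrics.getHedgedReadOps()
        + ", hedged reads that won=" + metrics.getHedgedReadWins());
  }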
2244 }