
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInputStream;
23  import java.io.EOFException;
24  import java.io.FileNotFoundException;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.InterruptedIOException;
28  import java.lang.reflect.InvocationTargetException;
29  import java.lang.reflect.Method;
30  import java.net.InetSocketAddress;
31  import java.net.URI;
32  import java.net.URISyntaxException;
33  import java.util.ArrayList;
34  import java.util.Arrays;
35  import java.util.Collections;
36  import java.util.HashMap;
37  import java.util.Iterator;
38  import java.util.LinkedList;
39  import java.util.List;
40  import java.util.Locale;
41  import java.util.Map;
42  import java.util.Vector;
43  import java.util.concurrent.ArrayBlockingQueue;
44  import java.util.concurrent.ConcurrentHashMap;
45  import java.util.concurrent.ExecutionException;
46  import java.util.concurrent.ExecutorService;
47  import java.util.concurrent.Future;
48  import java.util.concurrent.FutureTask;
49  import java.util.concurrent.ThreadPoolExecutor;
50  import java.util.concurrent.TimeUnit;
51  import java.util.regex.Pattern;
52
53  import org.apache.commons.logging.Log;
54  import org.apache.commons.logging.LogFactory;
55  import org.apache.hadoop.hbase.classification.InterfaceAudience;
56  import org.apache.hadoop.HadoopIllegalArgumentException;
57  import org.apache.hadoop.conf.Configuration;
58  import org.apache.hadoop.fs.BlockLocation;
59  import org.apache.hadoop.fs.FSDataInputStream;
60  import org.apache.hadoop.fs.FSDataOutputStream;
61  import org.apache.hadoop.fs.FileStatus;
62  import org.apache.hadoop.fs.FileSystem;
63  import org.apache.hadoop.fs.Path;
64  import org.apache.hadoop.fs.PathFilter;
65  import org.apache.hadoop.fs.permission.FsAction;
66  import org.apache.hadoop.fs.permission.FsPermission;
67  import org.apache.hadoop.hbase.ClusterId;
68  import org.apache.hadoop.hbase.HColumnDescriptor;
69  import org.apache.hadoop.hbase.HConstants;
70  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
71  import org.apache.hadoop.hbase.HRegionInfo;
72  import org.apache.hadoop.hbase.TableName;
73  import org.apache.hadoop.hbase.exceptions.DeserializationException;
74  import org.apache.hadoop.hbase.fs.HFileSystem;
75  import org.apache.hadoop.hbase.master.HMaster;
76  import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
77  import org.apache.hadoop.hbase.security.AccessDeniedException;
78  import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
79  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
80  import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
81  import org.apache.hadoop.hbase.regionserver.HRegion;
82  import org.apache.hadoop.hdfs.DFSClient;
83  import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
84  import org.apache.hadoop.hdfs.DistributedFileSystem;
85  import org.apache.hadoop.io.IOUtils;
86  import org.apache.hadoop.ipc.RemoteException;
87  import org.apache.hadoop.security.UserGroupInformation;
88  import org.apache.hadoop.util.Progressable;
89  import org.apache.hadoop.util.ReflectionUtils;
90  import org.apache.hadoop.util.StringUtils;
91
92  import com.google.common.base.Throwables;
93  import com.google.common.collect.Iterators;
94  import com.google.common.primitives.Ints;
95
96  import edu.umd.cs.findbugs.annotations.CheckForNull;
97
98  /**
99   * Utility methods for interacting with the underlying file system.
100  */
101 @InterfaceAudience.Private
102 public abstract class FSUtils {
103   private static final Log LOG = LogFactory.getLog(FSUtils.class);
104
105   /** Full access permissions (starting point for a umask) */
106   public static final String FULL_RWX_PERMISSIONS = "777";
107   private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
108   private static final int DEFAULT_THREAD_POOLSIZE = 2;
109
110   /** Set to true on Windows platforms */
111   public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
112
113   protected FSUtils() {
114     super();
115   }
116
117   /**
118    * Sets storage policy for given path according to config setting.
119    * If the passed path is a directory, we'll set the storage policy for all files
120    * created in the future in said directory. Note that this change in storage
121    * policy takes place at the HDFS level; it will persist beyond this RS's lifecycle.
122    * If we're running on a version of HDFS that doesn't support the given storage policy
123    * (or storage policies at all), then we'll issue a log message and continue.
124    *
125    * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
126    *
127    * @param fs We only do anything if an instance of DistributedFileSystem
128    * @param conf used to look up storage policy with given key; not modified.
129    * @param path the Path whose storage policy is to be set
130    * @param policyKey e.g. HConstants.WAL_STORAGE_POLICY
131    * @param defaultPolicy usually should be the policy NONE to delegate to HDFS
132    */
133   public static void setStoragePolicy(final FileSystem fs, final Configuration conf,
134       final Path path, final String policyKey, final String defaultPolicy) {
135     String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase(Locale.ROOT);
136     if (storagePolicy.equals(defaultPolicy)) {
137       if (LOG.isTraceEnabled()) {
138         LOG.trace("default policy of " + defaultPolicy + " requested, exiting early.");
139       }
140       return;
141     }
142     if (fs instanceof DistributedFileSystem) {
143       DistributedFileSystem dfs = (DistributedFileSystem)fs;
144       // Once our minimum supported Hadoop version is 2.6.0 we can remove reflection.
145       Class<? extends DistributedFileSystem> dfsClass = dfs.getClass();
146       Method m = null;
147       try {
148         m = dfsClass.getDeclaredMethod("setStoragePolicy",
149             new Class<?>[] { Path.class, String.class });
150         m.setAccessible(true);
151       } catch (NoSuchMethodException e) {
152         LOG.info("FileSystem doesn't support"
153             + " setStoragePolicy; --HDFS-6584 not available");
154       } catch (SecurityException e) {
155         LOG.info("Doesn't have access to setStoragePolicy on "
156             + "FileSystems --HDFS-6584 not available", e);
157         m = null; // could happen on setAccessible()
158       }
159       if (m != null) {
160         try {
161           m.invoke(dfs, path, storagePolicy);
162           LOG.info("set " + storagePolicy + " for " + path);
163         } catch (Exception e) {
164           // check for lack of HDFS-7228
165           boolean probablyBadPolicy = false;
166           if (e instanceof InvocationTargetException) {
167             final Throwable exception = e.getCause();
168             if (exception instanceof RemoteException &&
169                 HadoopIllegalArgumentException.class.getName().equals(
170                     ((RemoteException)exception).getClassName())) {
171               LOG.warn("Given storage policy, '" + storagePolicy + "', was rejected and probably " +
172                   "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
173                   "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
174                   "more information see the 'ArchivalStorage' docs for your Hadoop release.");
175               LOG.debug("More information about the invalid storage policy.", exception);
176               probablyBadPolicy = true;
177             }
178           }
179           if (!probablyBadPolicy) {
180             // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
181             // misuse than a runtime problem with HDFS.
182             LOG.warn("Unable to set " + storagePolicy + " for " + path, e);
183           }
184         }
185       }
186     } else {
187       LOG.info("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't " +
188           "support setStoragePolicy.");
189     }
190   }
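  // Illustrative usage sketch (not part of the original source; variable names are examples):
  // route newly created WAL files according to the policy configured under
  // HConstants.WAL_STORAGE_POLICY, with "NONE" delegating the placement decision back to HDFS.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   FileSystem walFs = walDir.getFileSystem(conf);
  //   FSUtils.setStoragePolicy(walFs, conf, walDir, HConstants.WAL_STORAGE_POLICY, "NONE");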
191
192   /**
193    * Compares path components. Does not consider the scheme; i.e. if the schemes
194    * differ but <code>path</code> starts with <code>rootPath</code>,
195    * then the function returns true.
196    * @param rootPath root path to compare against
197    * @param path path to check
198    * @return True if <code>path</code> starts with <code>rootPath</code>
199    */
200   public static boolean isStartingWithPath(final Path rootPath, final String path) {
201     String uriRootPath = rootPath.toUri().getPath();
202     String tailUriPath = (new Path(path)).toUri().getPath();
203     return tailUriPath.startsWith(uriRootPath);
204   }
205
206   /**
207    * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
208    * '/a/b/c' part. Does not consider the scheme; i.e. if the schemes differ but the path or a subpath
209    * matches, the two will equate.
210    * @param pathToSearch Path we will be trying to match.
211    * @param pathTail tail path to look for
212    * @return True if <code>pathTail</code> is a tail on the path of <code>pathToSearch</code>
213    */
214   public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
215     return isMatchingTail(pathToSearch, new Path(pathTail));
216   }
217
218   /**
219    * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
220    * '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true.  Does not consider
221    * the scheme; i.e. if the schemes differ but the path or a subpath matches, the two will equate.
222    * @param pathToSearch Path we will be trying to match.
223    * @param pathTail tail path to look for
224    * @return True if <code>pathTail</code> is a tail on the path of <code>pathToSearch</code>
225    */
226   public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
227     if (pathToSearch.depth() != pathTail.depth()) return false;
228     Path tailPath = pathTail;
229     String tailName;
230     Path toSearch = pathToSearch;
231     String toSearchName;
232     boolean result = false;
233     do {
234       tailName = tailPath.getName();
235       if (tailName == null || tailName.length() <= 0) {
236         result = true;
237         break;
238       }
239       toSearchName = toSearch.getName();
240       if (toSearchName == null || toSearchName.length() <= 0) break;
241       // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
242       tailPath = tailPath.getParent();
243       toSearch = toSearch.getParent();
244     } while(tailName.equals(toSearchName));
245     return result;
246   }
247
248   public static FSUtils getInstance(FileSystem fs, Configuration conf) {
249     String scheme = fs.getUri().getScheme();
250     if (scheme == null) {
251       LOG.warn("Could not find scheme for uri " +
252           fs.getUri() + ", default to hdfs");
253       scheme = "hdfs";
254     }
255     Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
256         scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
257     FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
258     return fsUtils;
259   }
260
261   /**
262    * Delete if exists.
263    * @param fs filesystem object
264    * @param dir directory to delete
265    * @return True if deleted <code>dir</code>
266    * @throws IOException e
267    */
268   public static boolean deleteDirectory(final FileSystem fs, final Path dir)
269   throws IOException {
270     return fs.exists(dir) && fs.delete(dir, true);
271   }
272
273   /**
274    * Delete the region directory if exists.
275    * @param conf
276    * @param hri
277    * @return True if deleted the region directory.
278    * @throws IOException
279    */
280   public static boolean deleteRegionDir(final Configuration conf, final HRegionInfo hri)
281   throws IOException {
282     Path rootDir = getRootDir(conf);
283     FileSystem fs = rootDir.getFileSystem(conf);
284     return deleteDirectory(fs,
285       new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
286   }
287
288   /**
289    * Return the number of bytes that large input files should optimally
290    * be split into to minimize i/o time.
291    *
292    * Uses reflection to search for getDefaultBlockSize(Path f);
293    * if the method doesn't exist, falls back to using getDefaultBlockSize().
294    *
295    * @param fs filesystem object
296    * @return the default block size for the path's filesystem
297    * @throws IOException e
298    */
299   public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
300     Method m = null;
301     Class<? extends FileSystem> cls = fs.getClass();
302     try {
303       m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
304     } catch (NoSuchMethodException e) {
305       LOG.info("FileSystem doesn't support getDefaultBlockSize");
306     } catch (SecurityException e) {
307       LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
308       m = null; // could happen on setAccessible()
309     }
310     if (m == null) {
311       return fs.getDefaultBlockSize(path);
312     } else {
313       try {
314         Object ret = m.invoke(fs, path);
315         return ((Long)ret).longValue();
316       } catch (Exception e) {
317         throw new IOException(e);
318       }
319     }
320   }
321
322   /*
323    * Get the default replication.
324    *
325    * Uses reflection to search for getDefaultReplication(Path f);
326    * if the method doesn't exist, falls back to using getDefaultReplication().
327    *
328    * @param fs filesystem object
329    * @param f path of file
330    * @return default replication for the path's filesystem
331    * @throws IOException e
332    */
333   public static short getDefaultReplication(final FileSystem fs, final Path path) throws IOException {
334     Method m = null;
335     Class<? extends FileSystem> cls = fs.getClass();
336     try {
337       m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
338     } catch (NoSuchMethodException e) {
339       LOG.info("FileSystem doesn't support getDefaultReplication");
340     } catch (SecurityException e) {
341       LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
342       m = null; // could happen on setAccessible()
343     }
344     if (m == null) {
345       return fs.getDefaultReplication(path);
346     } else {
347       try {
348         Object ret = m.invoke(fs, path);
349         return ((Number)ret).shortValue();
350       } catch (Exception e) {
351         throw new IOException(e);
352       }
353     }
354   }
355
356   /**
357    * Returns the default buffer size to use during writes.
358    *
359    * The size of the buffer should probably be a multiple of hardware
360    * page size (4096 on Intel x86), and it determines how much data is
361    * buffered during read and write operations.
362    *
363    * @param fs filesystem object
364    * @return default buffer size to use during writes
365    */
366   public static int getDefaultBufferSize(final FileSystem fs) {
367     return fs.getConf().getInt("io.file.buffer.size", 4096);
368   }
369
370   /**
371    * Create the specified file on the filesystem. By default, this will:
372    * <ol>
373    * <li>overwrite the file if it exists</li>
374    * <li>apply the umask in the configuration (if it is enabled)</li>
375    * <li>use the fs configured buffer size (or 4096 if not set)</li>
376    * <li>use the configured column family replication or default replication if
377    * {@link HColumnDescriptor#DEFAULT_DFS_REPLICATION}</li>
378    * <li>use the default block size</li>
379    * <li>not track progress</li>
380    * </ol>
381    * @param conf configurations
382    * @param fs {@link FileSystem} on which to write the file
383    * @param path {@link Path} to the file to write
384    * @param perm permissions
385    * @param favoredNodes
386    * @return output stream to the created file
387    * @throws IOException if the file cannot be created
388    */
389   public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
390       FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
391     if (fs instanceof HFileSystem) {
392       FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
393       if (backingFs instanceof DistributedFileSystem) {
394         // Try to use the favoredNodes version via reflection to allow backwards-
395         // compatibility.
396         short replication = Short.parseShort(conf.get(HColumnDescriptor.DFS_REPLICATION,
397           String.valueOf(HColumnDescriptor.DEFAULT_DFS_REPLICATION)));
398         try {
399           return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create",
400             Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class,
401             Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true,
402             getDefaultBufferSize(backingFs),
403             replication > 0 ? replication : getDefaultReplication(backingFs, path),
404             getDefaultBlockSize(backingFs, path), null, favoredNodes));
405         } catch (InvocationTargetException ite) {
406          // Function was properly called, but threw its own exception.
407           throw new IOException(ite.getCause());
408         } catch (NoSuchMethodException e) {
409           LOG.debug("DFS Client does not support most favored nodes create; using default create");
410           if (LOG.isTraceEnabled()) LOG.trace("Ignoring; use default create", e);
411         } catch (IllegalArgumentException e) {
412           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
413         } catch (SecurityException e) {
414           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
415         } catch (IllegalAccessException e) {
416           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
417         }
418       }
419     }
420     return create(fs, path, perm, true);
421   }
422
423   /**
424    * Create the specified file on the filesystem. By default, this will:
425    * <ol>
426    * <li>apply the umask in the configuration (if it is enabled)</li>
427    * <li>use the fs configured buffer size (or 4096 if not set)</li>
428    * <li>use the default replication</li>
429    * <li>use the default block size</li>
430    * <li>not track progress</li>
431    * </ol>
432    *
433    * @param fs {@link FileSystem} on which to write the file
434    * @param path {@link Path} to the file to write
435    * @param perm
436    * @param overwrite Whether or not the created file should be overwritten.
437    * @return output stream to the created file
438    * @throws IOException if the file cannot be created
439    */
440   public static FSDataOutputStream create(FileSystem fs, Path path,
441       FsPermission perm, boolean overwrite) throws IOException {
442     if (LOG.isTraceEnabled()) {
443       LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
444     }
445     return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
446         getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
447   }
448
449   /**
450    * Get the file permissions specified in the configuration, if they are
451    * enabled.
452    *
453    * @param fs filesystem that the file will be created on.
454    * @param conf configuration to read for determining if permissions are
455    *          enabled and which to use
456    * @param permssionConfKey property key in the configuration to use when
457    *          finding the permission
458    * @return the permission to use when creating a new file on the fs. If
459    *         special permissions are not specified in the configuration, then
460    *         the default permissions on the fs will be returned.
461    */
462   public static FsPermission getFilePermissions(final FileSystem fs,
463       final Configuration conf, final String permssionConfKey) {
464     boolean enablePermissions = conf.getBoolean(
465         HConstants.ENABLE_DATA_FILE_UMASK, false);
466
467     if (enablePermissions) {
468       try {
469         FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
470         // make sure that we have a mask, if not, go default.
471         String mask = conf.get(permssionConfKey);
472         if (mask == null)
473           return FsPermission.getFileDefault();
474         // apply the umask
475         FsPermission umask = new FsPermission(mask);
476         return perm.applyUMask(umask);
477       } catch (IllegalArgumentException e) {
478         LOG.warn(
479             "Incorrect umask attempted to be created: "
480                 + conf.get(permssionConfKey)
481                 + ", using default file permissions.", e);
482         return FsPermission.getFileDefault();
483       }
484     }
485     return FsPermission.getFileDefault();
486   }
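  // Illustrative configuration sketch (not part of the original source; the umask value and the
  // "hbase.data.umask" key are examples/assumptions, passed in as permssionConfKey): with the
  // data-file umask enabled, a mask of "022" applied to the full "777" permissions yields "755".
  //
  //   conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
  //   conf.set("hbase.data.umask", "022");
  //   FsPermission perm = FSUtils.getFilePermissions(fs, conf, "hbase.data.umask");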
487
488   /**
489    * Checks to see if the specified file system is available
490    *
491    * @param fs filesystem
492    * @throws IOException e
493    */
494   public static void checkFileSystemAvailable(final FileSystem fs)
495   throws IOException {
496     if (!(fs instanceof DistributedFileSystem)) {
497       return;
498     }
499     IOException exception = null;
500     DistributedFileSystem dfs = (DistributedFileSystem) fs;
501     try {
502       if (dfs.exists(new Path("/"))) {
503         return;
504       }
505     } catch (IOException e) {
506       exception = e instanceof RemoteException ?
507               ((RemoteException)e).unwrapRemoteException() : e;
508     }
509     try {
510       fs.close();
511     } catch (Exception e) {
512       LOG.error("file system close failed: ", e);
513     }
514     IOException io = new IOException("File system is not available");
515     io.initCause(exception);
516     throw io;
517   }
518
519   /**
520    * We use reflection because {@link DistributedFileSystem#setSafeMode(
521    * HdfsConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1
522    *
523    * @param dfs
524    * @return whether we're in safe mode
525    * @throws IOException
526    */
527   private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
528     boolean inSafeMode = false;
529     try {
530       Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
531           org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class});
532       inSafeMode = (Boolean) m.invoke(dfs,
533         org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true);
534     } catch (Exception e) {
535       if (e instanceof IOException) throw (IOException) e;
536
537       // Check whether dfs is on safemode.
538       inSafeMode = dfs.setSafeMode(
539         org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET);
540     }
541     return inSafeMode;
542   }
543
544   /**
545    * Check whether dfs is in safemode.
546    * @param conf
547    * @throws IOException
548    */
549   public static void checkDfsSafeMode(final Configuration conf)
550   throws IOException {
551     boolean isInSafeMode = false;
552     FileSystem fs = FileSystem.get(conf);
553     if (fs instanceof DistributedFileSystem) {
554       DistributedFileSystem dfs = (DistributedFileSystem)fs;
555       isInSafeMode = isInSafeMode(dfs);
556     }
557     if (isInSafeMode) {
558       throw new IOException("File system is in safemode, it can't be written now");
559     }
560   }
561
562   /**
563    * Returns the current version of the file system, as recorded in the version file.
564    *
565    * @param fs filesystem object
566    * @param rootdir root hbase directory
567    * @return null if no version file exists, version string otherwise.
568    * @throws IOException e
569    * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
570    */
571   public static String getVersion(FileSystem fs, Path rootdir)
572   throws IOException, DeserializationException {
573     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
574     FileStatus[] status = null;
575     try {
576       // hadoop 2.0 throws FNFE if directory does not exist.
577       // hadoop 1.0 returns null if directory does not exist.
578       status = fs.listStatus(versionFile);
579     } catch (FileNotFoundException fnfe) {
580       return null;
581     }
582     if (status == null || status.length == 0) return null;
583     String version = null;
584     byte [] content = new byte [(int)status[0].getLen()];
585     FSDataInputStream s = fs.open(versionFile);
586     try {
587       IOUtils.readFully(s, content, 0, content.length);
588       if (ProtobufUtil.isPBMagicPrefix(content)) {
589         version = parseVersionFrom(content);
590       } else {
591         // Presume it pre-pb format.
592         InputStream is = new ByteArrayInputStream(content);
593         DataInputStream dis = new DataInputStream(is);
594         try {
595           version = dis.readUTF();
596         } finally {
597           dis.close();
598         }
599       }
600     } catch (EOFException eof) {
601       LOG.warn("Version file was empty, odd, will try to set it.");
602     } finally {
603       s.close();
604     }
605     return version;
606   }
607
608   /**
609    * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
610    * @param bytes The byte content of the hbase.version file.
611    * @return The version found in the file as a String.
612    * @throws DeserializationException
613    */
614   static String parseVersionFrom(final byte [] bytes)
615   throws DeserializationException {
616     ProtobufUtil.expectPBMagicPrefix(bytes);
617     int pblen = ProtobufUtil.lengthOfPBMagic();
618     FSProtos.HBaseVersionFileContent.Builder builder =
619       FSProtos.HBaseVersionFileContent.newBuilder();
620     try {
621       ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
622       return builder.getVersion();
623     } catch (IOException e) {
624       // Convert
625       throw new DeserializationException(e);
626     }
627   }
628
629   /**
630    * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
631    * @param version Version to persist
632    * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
633    */
634   static byte [] toVersionByteArray(final String version) {
635     FSProtos.HBaseVersionFileContent.Builder builder =
636       FSProtos.HBaseVersionFileContent.newBuilder();
637     return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
638   }
639
640   /**
641    * Verifies current version of file system
642    *
643    * @param fs file system
644    * @param rootdir root directory of HBase installation
645    * @param message if true, issues a message on System.out
646    *
647    * @throws IOException e
648    * @throws DeserializationException
649    */
650   public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
651   throws IOException, DeserializationException {
652     checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
653   }
654
655   /**
656    * Verifies current version of file system
657    *
658    * @param fs file system
659    * @param rootdir root directory of HBase installation
660    * @param message if true, issues a message on System.out
661    * @param wait wait interval
662    * @param retries number of times to retry
663    *
664    * @throws IOException e
665    * @throws DeserializationException
666    */
667   public static void checkVersion(FileSystem fs, Path rootdir,
668       boolean message, int wait, int retries)
669   throws IOException, DeserializationException {
670     String version = getVersion(fs, rootdir);
671     if (version == null) {
672       if (!metaRegionExists(fs, rootdir)) {
673         // rootDir is empty (no version file and no root region)
674         // just create new version file (HBASE-1195)
675         setVersion(fs, rootdir, wait, retries);
676         return;
677       }
678     } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) return;
679
680      // version is deprecated; require migration
681     // Output on stdout so user sees it in terminal.
682     String msg = "HBase file layout needs to be upgraded."
683       + " You have version " + version
684       + " and I want version " + HConstants.FILE_SYSTEM_VERSION
685       + ". Consult http://hbase.apache.org/book.html for further information about upgrading HBase."
686       + " Is your hbase.rootdir valid? If so, you may need to run "
687       + "'hbase hbck -fixVersionFile'.";
688     if (message) {
689       System.out.println("WARNING! " + msg);
690     }
691     throw new FileSystemVersionException(msg);
692   }
693
694   /**
695    * Sets version of file system
696    *
697    * @param fs filesystem object
698    * @param rootdir hbase root
699    * @throws IOException e
700    */
701   public static void setVersion(FileSystem fs, Path rootdir)
702   throws IOException {
703     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
704       HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
705   }
706
707   /**
708    * Sets version of file system
709    *
710    * @param fs filesystem object
711    * @param rootdir hbase root
712    * @param wait time to wait for retry
713    * @param retries number of times to retry before failing
714    * @throws IOException e
715    */
716   public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
717   throws IOException {
718     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
719   }
720
721
722   /**
723    * Sets version of file system
724    *
725    * @param fs filesystem object
726    * @param rootdir hbase root directory
727    * @param version version to set
728    * @param wait time to wait for retry
729    * @param retries number of times to retry before throwing an IOException
730    * @throws IOException e
731    */
732   public static void setVersion(FileSystem fs, Path rootdir, String version,
733       int wait, int retries) throws IOException {
734     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
735     Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
736       HConstants.VERSION_FILE_NAME);
737     while (true) {
738       try {
739         // Write the version to a temporary file
740         FSDataOutputStream s = fs.create(tempVersionFile);
741         try {
742           s.write(toVersionByteArray(version));
743           s.close();
744           s = null;
745           // Move the temp version file to its normal location. Returns false
746           // if the rename failed. Throw an IOE in that case.
747           if (!fs.rename(tempVersionFile, versionFile)) {
748             throw new IOException("Unable to move temp version file to " + versionFile);
749           }
750         } finally {
751           // Cleaning up the temporary if the rename failed would be trying
752           // too hard. We'll unconditionally create it again the next time
753           // through anyway, files are overwritten by default by create().
754
755           // Attempt to close the stream on the way out if it is still open.
756           try {
757             if (s != null) s.close();
758           } catch (IOException ignore) { }
759         }
760         LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
761         return;
762       } catch (IOException e) {
763         if (retries > 0) {
764           LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
765           fs.delete(versionFile, false);
766           try {
767             if (wait > 0) {
768               Thread.sleep(wait);
769             }
770           } catch (InterruptedException ie) {
771             throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
772           }
773           retries--;
774         } else {
775           throw e;
776         }
777       }
778     }
779   }
780
781   /**
782    * Checks that a cluster ID file exists in the HBase root directory
783    * @param fs the root directory FileSystem
784    * @param rootdir the HBase root directory in HDFS
785    * @param wait how long to wait between retries
786    * @return <code>true</code> if the file exists, otherwise <code>false</code>
787    * @throws IOException if checking the FileSystem fails
788    */
789   public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
790       int wait) throws IOException {
791     while (true) {
792       try {
793         Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
794         return fs.exists(filePath);
795       } catch (IOException ioe) {
796         if (wait > 0) {
797           LOG.warn("Unable to check cluster ID file in " + rootdir.toString() +
798               ", retrying in "+wait+"msec: "+StringUtils.stringifyException(ioe));
799           try {
800             Thread.sleep(wait);
801           } catch (InterruptedException e) {
802             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
803           }
804         } else {
805           throw ioe;
806         }
807       }
808     }
809   }
810
811   /**
812    * Returns the value of the unique cluster ID stored for this HBase instance.
813    * @param fs the root directory FileSystem
814    * @param rootdir the path to the HBase root directory
815    * @return the unique cluster identifier
816    * @throws IOException if reading the cluster ID file fails
817    */
818   public static ClusterId getClusterId(FileSystem fs, Path rootdir)
819   throws IOException {
820     Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
821     ClusterId clusterId = null;
822     FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath):  null;
823     if (status != null) {
824       int len = Ints.checkedCast(status.getLen());
825       byte [] content = new byte[len];
826       FSDataInputStream in = fs.open(idPath);
827       try {
828         in.readFully(content);
829       } catch (EOFException eof) {
830         LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
831       } finally {
832         in.close();
833       }
834       try {
835         clusterId = ClusterId.parseFrom(content);
836       } catch (DeserializationException e) {
837         throw new IOException("content=" + Bytes.toString(content), e);
838       }
839       // If not pb'd, make it so.
840       if (!ProtobufUtil.isPBMagicPrefix(content)) {
841         String cid = null;
842         in = fs.open(idPath);
843         try {
844           cid = in.readUTF();
845           clusterId = new ClusterId(cid);
846         } catch (EOFException eof) {
847           LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
848         } finally {
849           in.close();
850         }
851         rewriteAsPb(fs, rootdir, idPath, clusterId);
852       }
853       return clusterId;
854     } else {
855       LOG.warn("Cluster ID file does not exist at " + idPath.toString());
856     }
857     return clusterId;
858   }
859
860   /**
861    * @param cid
862    * @throws IOException
863    */
864   private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
865       final ClusterId cid)
866   throws IOException {
867     // Rewrite the file as pb.  Move aside the old one first, write new
868     // then delete the moved-aside file.
869     Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
870     if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
871     setClusterId(fs, rootdir, cid, 100);
872     if (!fs.delete(movedAsideName, false)) {
873       throw new IOException("Failed delete of " + movedAsideName);
874     }
875     LOG.debug("Rewrote the hbase.id file as pb");
876   }
877
878   /**
879    * Writes a new unique identifier for this cluster to the "hbase.id" file
880    * in the HBase root directory
881    * @param fs the root directory FileSystem
882    * @param rootdir the path to the HBase root directory
883    * @param clusterId the unique identifier to store
884    * @param wait how long (in milliseconds) to wait between retries
885    * @throws IOException if writing to the FileSystem fails and no wait value was given
886    */
887   public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
888       int wait) throws IOException {
889     while (true) {
890       try {
891         Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
892         Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
893           Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
894         // Write the id file to a temporary location
895         FSDataOutputStream s = fs.create(tempIdFile);
896         try {
897           s.write(clusterId.toByteArray());
898           s.close();
899           s = null;
900           // Move the temporary file to its normal location. Throw an IOE if
901           // the rename failed
902           if (!fs.rename(tempIdFile, idFile)) {
903             throw new IOException("Unable to move temp version file to " + idFile);
904           }
905         } finally {
906           // Attempt to close the stream if still open on the way out
907           try {
908             if (s != null) s.close();
909           } catch (IOException ignore) { }
910         }
911         if (LOG.isDebugEnabled()) {
912           LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
913         }
914         return;
915       } catch (IOException ioe) {
916         if (wait > 0) {
917           LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
918               ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
919           try {
920             Thread.sleep(wait);
921           } catch (InterruptedException e) {
922             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
923           }
924         } else {
925           throw ioe;
926         }
927       }
928     }
929   }
930
931   /**
932    * Verifies root directory path is a valid URI with a scheme
933    *
934    * @param root root directory path
935    * @return Passed <code>root</code> argument.
936    * @throws IOException if not a valid URI with a scheme
937    */
938   public static Path validateRootPath(Path root) throws IOException {
939     try {
940       URI rootURI = new URI(root.toString());
941       String scheme = rootURI.getScheme();
942       if (scheme == null) {
943         throw new IOException("Root directory does not have a scheme");
944       }
945       return root;
946     } catch (URISyntaxException e) {
947       IOException io = new IOException("Root directory path is not a valid " +
948         "URI -- check your " + HConstants.HBASE_DIR + " configuration");
949       io.initCause(e);
950       throw io;
951     }
952   }
953
954   /**
955    * Checks for the presence of the root path (using the provided conf object) in the given path. If
956    * it exists, this method removes it and returns the String representation of the remaining relative path.
957    * @param path
958    * @param conf
959    * @return String representation of the remaining relative path
960    * @throws IOException
961    */
962   public static String removeRootPath(Path path, final Configuration conf) throws IOException {
963     Path root = FSUtils.getRootDir(conf);
964     String pathStr = path.toString();
965      // check that the path is absolute... it has the root path in it; if not, return it as is.
966      if (!pathStr.startsWith(root.toString())) return pathStr;
967      // otherwise, strip the root path and the trailing "/" separator.
968      return pathStr.substring(root.toString().length() + 1);
969   }
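  // Illustrative example (not part of the original source; host and paths are examples): with
  // hbase.rootdir set to hdfs://nn:8020/hbase, removeRootPath(new Path("hdfs://nn:8020/hbase/foo/bar"), conf)
  // returns "foo/bar", while a path outside the root is returned unchanged.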
970
971   /**
972    * If DFS, check safe mode and if so, wait until we clear it.
973    * @param conf configuration
974    * @param wait Sleep between retries
975    * @throws IOException e
976    */
977   public static void waitOnSafeMode(final Configuration conf,
978     final long wait)
979   throws IOException {
980     FileSystem fs = FileSystem.get(conf);
981     if (!(fs instanceof DistributedFileSystem)) return;
982     DistributedFileSystem dfs = (DistributedFileSystem)fs;
983     // Make sure dfs is not in safe mode
984     while (isInSafeMode(dfs)) {
985       LOG.info("Waiting for dfs to exit safe mode...");
986       try {
987         Thread.sleep(wait);
988       } catch (InterruptedException e) {
989         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
990       }
991     }
992   }
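  // Illustrative usage sketch (not part of the original source): block startup until HDFS leaves
  // safe mode, re-checking every 10 seconds.
  //
  //   FSUtils.waitOnSafeMode(conf, 10 * 1000L);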
993
994   /**
995    * Return the 'path' component of a Path.  In Hadoop, Path is a URI.  This
996    * method returns the 'path' component of a Path's URI: e.g. If a Path is
997    * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
998    * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
999    * This method is useful if you want to print out a Path without qualifying
1000    * Filesystem instance.
1001    * @param p Filesystem Path whose 'path' component we are to return.
1002    * @return Path portion of the Path's URI
1003    */
1004   public static String getPath(Path p) {
1005     return p.toUri().getPath();
1006   }
1007
1008   /**
1009    * @param c configuration
1010    * @return Path to hbase root directory: i.e. <code>hbase.rootdir</code> from
1011    * configuration as a qualified Path.
1012    * @throws IOException e
1013    */
1014   public static Path getRootDir(final Configuration c) throws IOException {
1015     Path p = new Path(c.get(HConstants.HBASE_DIR));
1016     FileSystem fs = p.getFileSystem(c);
1017     return p.makeQualified(fs);
1018   }
1019
1020   public static void setRootDir(final Configuration c, final Path root) throws IOException {
1021     c.set(HConstants.HBASE_DIR, root.toString());
1022   }
1023
1024   public static void setFsDefault(final Configuration c, final Path root) throws IOException {
1025     c.set("fs.defaultFS", root.toString());    // for hadoop 0.21+
1026   }
1027
1028   /**
1029    * Checks if meta region exists
1030    *
1031    * @param fs file system
1032    * @param rootdir root directory of HBase installation
1033    * @return true if exists
1034    * @throws IOException e
1035    */
1036   @SuppressWarnings("deprecation")
1037   public static boolean metaRegionExists(FileSystem fs, Path rootdir)
1038   throws IOException {
1039     Path metaRegionDir =
1040       HRegion.getRegionDir(rootdir, HRegionInfo.FIRST_META_REGIONINFO);
1041     return fs.exists(metaRegionDir);
1042   }
1043
1044   /**
1045    * Compute HDFS blocks distribution of a given file, or a portion of the file
1046    * @param fs file system
1047    * @param status file status of the file
1048    * @param start start position of the portion
1049    * @param length length of the portion
1050    * @return The HDFS blocks distribution
1051    */
1052   static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
1053     final FileSystem fs, FileStatus status, long start, long length)
1054     throws IOException {
1055     HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
1056     BlockLocation [] blockLocations =
1057       fs.getFileBlockLocations(status, start, length);
1058     for(BlockLocation bl : blockLocations) {
1059       String [] hosts = bl.getHosts();
1060       long len = bl.getLength();
1061       blocksDistribution.addHostsAndBlockWeight(hosts, len);
1062     }
1063
1064     return blocksDistribution;
1065   }
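  // Illustrative usage sketch (not part of the original source): compute the distribution for an
  // entire file by passing its full length.
  //
  //   FileStatus status = fs.getFileStatus(hfilePath);
  //   HDFSBlocksDistribution dist =
  //     FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());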
1066
1067   // TODO move this method OUT of FSUtils. No dependencies on HMaster
1068   /**
1069    * Returns the total overall fragmentation percentage. Includes hbase:meta and
1070    * -ROOT- as well.
1071    *
1072    * @param master  The master defining the HBase root and file system.
1073    * @return A map for each table and its percentage.
1074    * @throws IOException When scanning the directory fails.
1075    */
1076   public static int getTotalTableFragmentation(final HMaster master)
1077   throws IOException {
1078     Map<String, Integer> map = getTableFragmentation(master);
1079     return map != null && map.size() > 0 ? map.get("-TOTAL-") : -1;
1080   }
1081
1082   /**
1083    * Runs through the HBase rootdir and checks how many stores for each table
1084    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1085    * percentage across all tables is stored under the special key "-TOTAL-".
1086    *
1087    * @param master  The master defining the HBase root and file system.
1088    * @return A map for each table and its percentage.
1089    *
1090    * @throws IOException When scanning the directory fails.
1091    */
1092   public static Map<String, Integer> getTableFragmentation(
1093     final HMaster master)
1094   throws IOException {
1095     Path path = getRootDir(master.getConfiguration());
1096     // since HMaster.getFileSystem() is package private
1097     FileSystem fs = path.getFileSystem(master.getConfiguration());
1098     return getTableFragmentation(fs, path);
1099   }
1100
1101   /**
1102    * Runs through the HBase rootdir and checks how many stores for each table
1103    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1104    * percentage across all tables is stored under the special key "-TOTAL-".
1105    *
1106    * @param fs  The file system to use.
1107    * @param hbaseRootDir  The root directory to scan.
1108    * @return A map for each table and its percentage.
1109    * @throws IOException When scanning the directory fails.
1110    */
1111   public static Map<String, Integer> getTableFragmentation(
1112     final FileSystem fs, final Path hbaseRootDir)
1113   throws IOException {
1114     Map<String, Integer> frags = new HashMap<String, Integer>();
1115     int cfCountTotal = 0;
1116     int cfFragTotal = 0;
1117     PathFilter regionFilter = new RegionDirFilter(fs);
1118     PathFilter familyFilter = new FamilyDirFilter(fs);
1119     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1120     for (Path d : tableDirs) {
1121       int cfCount = 0;
1122       int cfFrag = 0;
1123       FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
1124       for (FileStatus regionDir : regionDirs) {
1125         Path dd = regionDir.getPath();
1126          // else it's a region name, now look in the region for families
1127         FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1128         for (FileStatus familyDir : familyDirs) {
1129           cfCount++;
1130           cfCountTotal++;
1131           Path family = familyDir.getPath();
1132           // now in family make sure only one file
1133           FileStatus[] familyStatus = fs.listStatus(family);
1134           if (familyStatus.length > 1) {
1135             cfFrag++;
1136             cfFragTotal++;
1137           }
1138         }
1139       }
1140       // compute percentage per table and store in result list
1141       frags.put(FSUtils.getTableName(d).getNameAsString(),
1142         cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
1143     }
1144     // set overall percentage for all tables
1145     frags.put("-TOTAL-",
1146       cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
1147     return frags;
1148   }
1149
1150   /**
1151    * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
1152    * path rootdir
1153    *
1154    * @param rootdir qualified path of HBase root directory
1155    * @param tableName name of table
1156    * @return {@link org.apache.hadoop.fs.Path} for table
1157    */
1158   public static Path getTableDir(Path rootdir, final TableName tableName) {
1159     return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
1160         tableName.getQualifierAsString());
1161   }
1162
1163   /**
1164    * Returns the {@link org.apache.hadoop.hbase.TableName} object representing
1165    * the table corresponding to the given table directory
1166    * under path rootdir
1167    *
1168    * @param tablePath path of the table directory
1169    * @return {@link org.apache.hadoop.hbase.TableName} for the table
1170    */
1171   public static TableName getTableName(Path tablePath) {
1172     return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
1173   }
1174
1175   /**
1176    * Returns the {@link org.apache.hadoop.fs.Path} object representing
1177    * the namespace directory under path rootdir
1178    *
1179    * @param rootdir qualified path of HBase root directory
1180    * @param namespace namespace name
1181    * @return {@link org.apache.hadoop.fs.Path} for the namespace directory
1182    */
1183   public static Path getNamespaceDir(Path rootdir, final String namespace) {
1184     return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
1185         new Path(namespace)));
1186   }
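  // Illustrative layout (not part of the original source), assuming HConstants.BASE_NAMESPACE_DIR
  // is "data": getTableDir(root, TableName.valueOf("ns1", "t1")) resolves to <root>/data/ns1/t1,
  // and getTableName() on that same path yields the TableName "ns1:t1".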
1187
1188   /**
1189    * A {@link PathFilter} that returns only regular files.
1190    */
1191   static class FileFilter extends AbstractFileStatusFilter {
1192     private final FileSystem fs;
1193
1194     public FileFilter(final FileSystem fs) {
1195       this.fs = fs;
1196     }
1197
1198     @Override
1199     protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1200       try {
1201         return isFile(fs, isDir, p);
1202       } catch (IOException e) {
1203         LOG.warn("unable to verify if path=" + p + " is a regular file", e);
1204         return false;
1205       }
1206     }
1207   }
1208
1209   /**
1210    * Directory filter that doesn't include any of the directories in the specified blacklist
1211    */
1212   public static class BlackListDirFilter extends AbstractFileStatusFilter {
1213     private final FileSystem fs;
1214     private List<String> blacklist;
1215
1216     /**
1217      * Create a filter on the given filesystem with the specified blacklist
1218      * @param fs filesystem to filter
1219      * @param directoryNameBlackList list of the names of the directories to filter. If
1220      *          <tt>null</tt>, all directories are returned
1221      */
1222     @SuppressWarnings("unchecked")
1223     public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
1224       this.fs = fs;
1225       blacklist =
1226         (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
1227           : directoryNameBlackList);
1228     }
1229
1230     @Override
1231     protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1232       if (!isValidName(p.getName())) {
1233         return false;
1234       }
1235
1236       try {
1237         return isDirectory(fs, isDir, p);
1238       } catch (IOException e) {
1239         LOG.warn("An error occurred while verifying if [" + p.toString()
1240             + "] is a valid directory. Returning 'not valid' and continuing.", e);
1241         return false;
1242       }
1243     }
1244
1245     protected boolean isValidName(final String name) {
1246       return !blacklist.contains(name);
1247     }
1248   }
1249
1250   /**
1251    * A {@link PathFilter} that only allows directories.
1252    */
1253   public static class DirFilter extends BlackListDirFilter {
1254
1255     public DirFilter(FileSystem fs) {
1256       super(fs, null);
1257     }
1258   }
1259
1260   /**
1261    * A {@link PathFilter} that returns usertable directories. To get all directories use the
1262    * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
1263    */
1264   public static class UserTableDirFilter extends BlackListDirFilter {
1265     public UserTableDirFilter(FileSystem fs) {
1266       super(fs, HConstants.HBASE_NON_TABLE_DIRS);
1267     }
1268
1269     @Override
1270     protected boolean isValidName(final String name) {
1271       if (!super.isValidName(name))
1272         return false;
1273
1274       try {
1275         TableName.isLegalTableQualifierName(Bytes.toBytes(name));
1276       } catch (IllegalArgumentException e) {
1277         LOG.info("INVALID NAME " + name);
1278         return false;
1279       }
1280       return true;
1281     }
1282   }
1283
1284   /**
1285    * @param conf
1286    * @return True if this filesystem's scheme is 'hdfs'.
1287    * @throws IOException
1288    */
1289   public static boolean isHDFS(final Configuration conf) throws IOException {
1290     FileSystem fs = FileSystem.get(conf);
1291     String scheme = fs.getUri().getScheme();
1292     return scheme.equalsIgnoreCase("hdfs");
1293   }
1294
1295   /**
1296    * Recover file lease. Used when a file is suspected to
1297    * have been left open by another process.
1298    * @param fs FileSystem handle
1299    * @param p Path of file to recover lease
1300    * @param conf Configuration handle
1301    * @throws IOException
1302    */
1303   public abstract void recoverFileLease(final FileSystem fs, final Path p,
1304       Configuration conf, CancelableProgressable reporter) throws IOException;
1305
1306   public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
1307       throws IOException {
1308     List<Path> tableDirs = new LinkedList<Path>();
1309
1310     for(FileStatus status :
1311         fs.globStatus(new Path(rootdir,
1312             new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
1313       tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
1314     }
1315     return tableDirs;
1316   }
1317
1318   /**
1319    * @param fs
1320    * @param rootdir
1321    * @return All the table directories under <code>rootdir</code>. Ignores non-table hbase folders such as
1322    * the .logs, .oldlogs, and .corrupt folders.
1323    * @throws IOException
1324    */
1325   public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
1326       throws IOException {
1327     // presumes any directory under hbase.rootdir is a table
1328     FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
1329     List<Path> tabledirs = new ArrayList<Path>(dirs.length);
1330     for (FileStatus dir: dirs) {
1331       tabledirs.add(dir.getPath());
1332     }
1333     return tabledirs;
1334   }
1335
1336   /**
1337    * Checks if the given path is the one with 'recovered.edits' dir.
1338    * @param path
1339    * @return True if the path contains a 'recovered.edits' directory
1340    */
1341   public static boolean isRecoveredEdits(Path path) {
1342     return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
1343   }
1344
1345   /**
1346    * Filter for all dirs that look like region directories (hex-encoded region names).
1347    */
1348   public static class RegionDirFilter extends AbstractFileStatusFilter {
1349     // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
1350     final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
1351     final FileSystem fs;
1352
1353     public RegionDirFilter(FileSystem fs) {
1354       this.fs = fs;
1355     }
1356
1357     @Override
1358     protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1359       if (!regionDirPattern.matcher(p.getName()).matches()) {
1360         return false;
1361       }
1362
1363       try {
1364         return isDirectory(fs, isDir, p);
1365       } catch (IOException ioe) {
1366         // Maybe the file was moved or the fs was disconnected.
1367         LOG.warn("Skipping file " + p + " due to IOException", ioe);
1368         return false;
1369       }
1370     }
1371   }
1372
1373   /**
1374    * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1375    * .tableinfo
1376    * @param fs A file system for the Path
1377    * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
1378    * @return List of paths to valid region directories in table dir.
1379    * @throws IOException
1380    */
1381   public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1382     // assumes we are in a table dir.
1383     List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1384     if (rds == null) {
1385       return new ArrayList<Path>();
1386     }
1387     List<Path> regionDirs = new ArrayList<Path>(rds.size());
1388     for (FileStatus rdfs: rds) {
1389       Path rdPath = rdfs.getPath();
1390       regionDirs.add(rdPath);
1391     }
1392     return regionDirs;
1393   }
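
       // Hypothetical usage of getRegionDirs(), shown only to clarify the layout
       // <hbase.rootdir>/data/<namespace>/<table>/<region>; the table name is made up:
       //
       //   Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf("mytable"));
       //   for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
       //     LOG.debug("Region dir: " + regionDir.getName());
       //   }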
1394
1395   /**
1396    * Filter for all dirs that are legal column family names.  This is generally used for colfam
1397    * dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1398    */
1399   public static class FamilyDirFilter extends AbstractFileStatusFilter {
1400     final FileSystem fs;
1401
1402     public FamilyDirFilter(FileSystem fs) {
1403       this.fs = fs;
1404     }
1405
1406     @Override
1407     protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1408       try {
1409         // throws IAE if invalid
1410         HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName()));
1411       } catch (IllegalArgumentException iae) {
1412         // path name is an invalid family name and thus is excluded.
1413         return false;
1414       }
1415
1416       try {
1417         return isDirectory(fs, isDir, p);
1418       } catch (IOException ioe) {
1419         // Maybe the file was moved or the fs was disconnected.
1420         LOG.warn("Skipping file " + p + " due to IOException", ioe);
1421         return false;
1422       }
1423     }
1424   }
1425
1426   /**
1427    * Given a particular region dir, return all the familydirs inside it
1428    *
1429    * @param fs A file system for the Path
1430    * @param regionDir Path to a specific region directory
1431    * @return List of paths to valid family directories in region dir.
1432    * @throws IOException
1433    */
1434   public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1435     // assumes we are in a region dir.
1436     FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1437     List<Path> familyDirs = new ArrayList<Path>(fds.length);
1438     for (FileStatus fdfs: fds) {
1439       Path fdPath = fdfs.getPath();
1440       familyDirs.add(fdPath);
1441     }
1442     return familyDirs;
1443   }
1444
1445   public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
1446     List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs));
1447     if (fds == null) {
1448       return new ArrayList<Path>();
1449     }
1450     List<Path> referenceFiles = new ArrayList<Path>(fds.size());
1451     for (FileStatus fdfs: fds) {
1452       Path fdPath = fdfs.getPath();
1453       referenceFiles.add(fdPath);
1454     }
1455     return referenceFiles;
1456   }
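
       // A sketch (not from the original source) of how the family and reference-file
       // helpers compose; getRegionReferenceFileCount() further below does the same:
       //
       //   int refs = 0;
       //   for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
       //     refs += FSUtils.getReferenceFilePaths(fs, familyDir).size();
       //   }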
1457
1458   /**
1459    * Filter for HFiles that excludes reference files.
1460    */
1461   public static class HFileFilter extends AbstractFileStatusFilter {
1462     final FileSystem fs;
1463
1464     public HFileFilter(FileSystem fs) {
1465       this.fs = fs;
1466     }
1467
1468     @Override
1469     protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1470       if (!StoreFileInfo.isHFile(p)) {
1471         return false;
1472       }
1473
1474       try {
1475         return isFile(fs, isDir, p);
1476       } catch (IOException ioe) {
1477         // Maybe the file was moved or the fs was disconnected.
1478         LOG.warn("Skipping file " + p + " due to IOException", ioe);
1479         return false;
1480       }
1481     }
1482   }
1483
1484   public static class ReferenceFileFilter extends AbstractFileStatusFilter {
1485
1486     private final FileSystem fs;
1487
1488     public ReferenceFileFilter(FileSystem fs) {
1489       this.fs = fs;
1490     }
1491
1492     @Override
1493     protected boolean accept(Path p, @CheckForNull Boolean isDir) {
1494       if (!StoreFileInfo.isReference(p)) {
1495         return false;
1496       }
1497
1498       try {
1499         // only files can be references.
1500         return isFile(fs, isDir, p);
1501       } catch (IOException ioe) {
1502         // Maybe the file was moved or the fs was disconnected.
1503         LOG.warn("Skipping file " + p + " due to IOException", ioe);
1504         return false;
1505       }
1506     }
1507   }
1508
1509
1510   /**
1511    * @param conf
1512    * @return The filesystem of the hbase rootdir.
1513    * @throws IOException
1514    */
1515   public static FileSystem getCurrentFileSystem(Configuration conf)
1516   throws IOException {
1517     return getRootDir(conf).getFileSystem(conf);
1518   }
1519
1520
1521   /**
1522    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1523    * table StoreFile names to the full Path.
1524    * <br>
1525    * Example...<br>
1526    * Key = 3944417774205889744  <br>
1527    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1528    *
1529    * @param map map to add values.  If null, this method will create and populate one to return
1530    * @param fs  The file system to use.
1531    * @param hbaseRootDir  The root directory to scan.
1532    * @param tableName name of the table to scan.
1533    * @return Map keyed by StoreFile name with a value of the full Path.
1534    * @throws IOException When scanning the directory fails.
1535    * @throws InterruptedException
1536    */
1537   public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1538   final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1539   throws IOException, InterruptedException {
1540     return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, null);
1541   }
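
       // Illustrative call only (the table name is hypothetical). Passing null for the map
       // argument makes the method allocate and return a new map:
       //
       //   Map<String, Path> storeFiles = FSUtils.getTableStoreFilePathMap(
       //       null, fs, FSUtils.getRootDir(conf), TableName.valueOf("mytable"));
       //   Path fullPath = storeFiles.get("3944417774205889744");   // key format as in the javadoc example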
1542
1543   /**
1544    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1545    * table StoreFile names to the full Path.  Note that because this method can be called
1546    * on a 'live' HBase system, it skips files that no longer exist by the time
1547    * they are traversed; similarly, users of the result should expect that some
1548    * entries in the map may no longer exist by the time this call completes.
1549    * <br>
1550    * Example...<br>
1551    * Key = 3944417774205889744  <br>
1552    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1553    *
1554    * @param resultMap map to add values.  If null, this method will create and populate one to return
1555    * @param fs  The file system to use.
1556    * @param hbaseRootDir  The root directory to scan.
1557    * @param tableName name of the table to scan.
1558    * @param sfFilter optional path filter to apply to store files
1559    * @param executor optional executor service to parallelize this operation
1560    * @param errors ErrorReporter instance or null
1561    * @return Map keyed by StoreFile name with a value of the full Path.
1562    * @throws IOException When scanning the directory fails.
1563    * @throws InterruptedException
1564    */
1565   public static Map<String, Path> getTableStoreFilePathMap(
1566       Map<String, Path> resultMap,
1567       final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
1568       ExecutorService executor, final ErrorReporter errors) throws IOException, InterruptedException {
1569
1570     final Map<String, Path> finalResultMap =
1571         resultMap == null ? new ConcurrentHashMap<String, Path>(128, 0.75f, 32) : resultMap;
1572
1573     // only include the directory paths to tables
1574     Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
1575     // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
1576     // should be regions.
1577     final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
1578     final Vector<Exception> exceptions = new Vector<Exception>();
1579
1580     try {
1581       List<FileStatus> regionDirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
1582       if (regionDirs == null) {
1583         return finalResultMap;
1584       }
1585
1586       final List<Future<?>> futures = new ArrayList<Future<?>>(regionDirs.size());
1587
1588       for (FileStatus regionDir : regionDirs) {
1589         if (null != errors) {
1590           errors.progress();
1591         }
1592         final Path dd = regionDir.getPath();
1593
1594         if (!exceptions.isEmpty()) {
1595           break;
1596         }
1597
1598         Runnable getRegionStoreFileMapCall = new Runnable() {
1599           @Override
1600           public void run() {
1601             try {
1602               HashMap<String,Path> regionStoreFileMap = new HashMap<String, Path>();
1603               List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
1604               if (familyDirs == null) {
1605                 if (!fs.exists(dd)) {
1606                   LOG.warn("Skipping region because it no longer exists: " + dd);
1607                 } else {
1608                   LOG.warn("Skipping region because it has no family dirs: " + dd);
1609                 }
1610                 return;
1611               }
1612               for (FileStatus familyDir : familyDirs) {
1613                 if (null != errors) {
1614                   errors.progress();
1615                 }
1616                 Path family = familyDir.getPath();
1617                 if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1618                   continue;
1619                 }
1620                 // now in family, iterate over the StoreFiles and
1621                 // put in map
1622                 FileStatus[] familyStatus = fs.listStatus(family);
1623                 for (FileStatus sfStatus : familyStatus) {
1624                   if (null != errors) {
1625                     errors.progress();
1626                   }
1627                   Path sf = sfStatus.getPath();
1628                   if (sfFilter == null || sfFilter.accept(sf)) {
1629                     regionStoreFileMap.put(sf.getName(), sf);
1630                   }
1631                 }
1632               }
1633               finalResultMap.putAll(regionStoreFileMap);
1634             } catch (Exception e) {
1635               LOG.error("Could not get region store file map for region: " + dd, e);
1636               exceptions.add(e);
1637             }
1638           }
1639         };
1640
1641         // If executor is available, submit async tasks to exec concurrently, otherwise
1642         // just do serial sync execution
1643         if (executor != null) {
1644           Future<?> future = executor.submit(getRegionStoreFileMapCall);
1645           futures.add(future);
1646         } else {
1647           FutureTask<?> future = new FutureTask<Object>(getRegionStoreFileMapCall, null);
1648           future.run();
1649           futures.add(future);
1650         }
1651       }
1652
1653       // Ensure all pending tasks are complete (or that we run into an exception)
1654       for (Future<?> f : futures) {
1655         if (!exceptions.isEmpty()) {
1656           break;
1657         }
1658         try {
1659           f.get();
1660         } catch (ExecutionException e) {
1661           LOG.error("Unexpected exec exception!  Should've been caught already.  (Bug?)", e);
1662           // Shouldn't happen, we already logged/caught any exceptions in the Runnable
1663         }
1664       }
1665     } catch (IOException e) {
1666       LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
1667       exceptions.add(e);
1668     } finally {
1669       if (!exceptions.isEmpty()) {
1670         // Just throw the first exception as an indication something bad happened
1671         // Don't need to propagate all the exceptions, we already logged them all anyway
1672         Throwables.propagateIfInstanceOf(exceptions.firstElement(), IOException.class);
1673         throw Throwables.propagate(exceptions.firstElement());
1674       }
1675     }
1676
1677     return finalResultMap;
1678   }
1679
1680   public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1681     int result = 0;
1682     try {
1683       for (Path familyDir:getFamilyDirs(fs, p)){
1684         result += getReferenceFilePaths(fs, familyDir).size();
1685       }
1686     } catch (IOException e) {
1687       LOG.warn("Error Counting reference files.", e);
1688     }
1689     return result;
1690   }
1691
1692   /**
1693    * Runs through the HBase rootdir and creates a reverse lookup map for
1694    * table StoreFile names to the full Path.
1695    * <br>
1696    * Example...<br>
1697    * Key = 3944417774205889744  <br>
1698    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1699    *
1700    * @param fs  The file system to use.
1701    * @param hbaseRootDir  The root directory to scan.
1702    * @return Map keyed by StoreFile name with a value of the full Path.
1703    * @throws IOException When scanning the directory fails.
1704    * @throws InterruptedException
1705    */
1706   public static Map<String, Path> getTableStoreFilePathMap(
1707     final FileSystem fs, final Path hbaseRootDir)
1708   throws IOException, InterruptedException {
1709     return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, null);
1710   }
1711
1712   /**
1713    * Runs through the HBase rootdir and creates a reverse lookup map for
1714    * table StoreFile names to the full Path.
1715    * <br>
1716    * Example...<br>
1717    * Key = 3944417774205889744  <br>
1718    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1719    *
1720    * @param fs  The file system to use.
1721    * @param hbaseRootDir  The root directory to scan.
1722    * @param sfFilter optional path filter to apply to store files
1723    * @param executor optional executor service to parallelize this operation
1724    * @param errors ErrorReporter instance or null
1725    * @return Map keyed by StoreFile name with a value of the full Path.
1726    * @throws IOException When scanning the directory fails.
1727    * @throws InterruptedException
1728    */
1729   public static Map<String, Path> getTableStoreFilePathMap(
1730     final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter,
1731     ExecutorService executor, ErrorReporter errors)
1732   throws IOException, InterruptedException {
1733     ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<String, Path>(1024, 0.75f, 32);
1734
1735     // if this method looks similar to 'getTableFragmentation' that is because
1736     // it was borrowed from it.
1737
1738     // only include the directory paths to tables
1739     for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1740       getTableStoreFilePathMap(map, fs, hbaseRootDir,
1741           FSUtils.getTableName(tableDir), sfFilter, executor, errors);
1742     }
1743     return map;
1744   }
1745
1746   /**
1747    * Filters FileStatuses in an array and returns a list
1748    *
1749    * @param input   An array of FileStatuses
1750    * @param filter  A required filter to filter the array
1751    * @return        A list of FileStatuses
1752    */
1753   public static List<FileStatus> filterFileStatuses(FileStatus[] input,
1754       FileStatusFilter filter) {
1755     if (input == null) return null;
1756     return filterFileStatuses(Iterators.forArray(input), filter);
1757   }
1758
1759   /**
1760    * Filters FileStatuses in an iterator and returns a list
1761    *
1762    * @param input   An iterator of FileStatuses
1763    * @param filter  A required filter to filter the iterator
1764    * @return        A list of FileStatuses
1765    */
1766   public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
1767       FileStatusFilter filter) {
1768     if (input == null) return null;
1769     ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1770     while (input.hasNext()) {
1771       FileStatus f = input.next();
1772       if (filter.accept(f)) {
1773         results.add(f);
1774       }
1775     }
1776     return results;
1777   }
1778
1779   /**
1780    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1781    * This accommodates differences between Hadoop versions: Hadoop 1 does not
1782    * throw a FileNotFoundException but returns an empty FileStatus[], while
1783    * Hadoop 2 throws FileNotFoundException.
1784    *
1785    * @param fs file system
1786    * @param dir directory
1787    * @param filter file status filter
1788    * @return null if dir is empty or doesn't exist, otherwise FileStatus list
1789    */
1790   public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs,
1791       final Path dir, final FileStatusFilter filter) throws IOException {
1792     FileStatus [] status = null;
1793     try {
1794       status = fs.listStatus(dir);
1795     } catch (FileNotFoundException fnfe) {
1796       // if directory doesn't exist, return null
1797       if (LOG.isTraceEnabled()) {
1798         LOG.trace(dir + " doesn't exist");
1799       }
1800     }
1801
1802     if (status == null || status.length < 1)  {
1803       return null;
1804     }
1805
1806     if (filter == null) {
1807       return Arrays.asList(status);
1808     } else {
1809       List<FileStatus> status2 = filterFileStatuses(status, filter);
1810       if (status2 == null || status2.isEmpty()) {
1811         return null;
1812       } else {
1813         return status2;
1814       }
1815     }
1816   }
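
       // Hedged example (not part of the original source) of the null contract above:
       // a null return means the directory is empty or missing, so callers guard for it.
       //
       //   List<FileStatus> regions =
       //       FSUtils.listStatusWithStatusFilter(fs, tableDir, new FSUtils.RegionDirFilter(fs));
       //   if (regions == null) {
       //     return;   // table dir is empty or already gone
       //   }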
1817
1818   /**
1819    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1820    * This accommodates differences between Hadoop versions: Hadoop 1 does not
1821    * throw a FileNotFoundException but returns an empty FileStatus[], while
1822    * Hadoop 2 throws FileNotFoundException.
1823    *
1824    * Where possible, prefer {@link #listStatusWithStatusFilter(FileSystem,
1825    * Path, FileStatusFilter)} instead.
1826    *
1827    * @param fs file system
1828    * @param dir directory
1829    * @param filter path filter
1830    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1831    */
1832   public static FileStatus [] listStatus(final FileSystem fs,
1833       final Path dir, final PathFilter filter) throws IOException {
1834     FileStatus [] status = null;
1835     try {
1836       status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
1837     } catch (FileNotFoundException fnfe) {
1838       // if directory doesn't exist, return null
1839       if (LOG.isTraceEnabled()) {
1840         LOG.trace(dir + " doesn't exist");
1841       }
1842     }
1843     if (status == null || status.length < 1) return null;
1844     return status;
1845   }
1846
1847   /**
1848    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1849    * This accommodates differences between Hadoop versions.
1850    *
1851    * @param fs file system
1852    * @param dir directory
1853    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1854    */
1855   public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
1856     return listStatus(fs, dir, null);
1857   }
1858
1859   /**
1860    * Calls fs.delete() and returns the value returned by fs.delete()
1861    *
1862    * @param fs
1863    * @param path
1864    * @param recursive
1865    * @return the value returned by fs.delete()
1866    * @throws IOException
1867    */
1868   public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
1869       throws IOException {
1870     return fs.delete(path, recursive);
1871   }
1872
1873   /**
1874    * Calls fs.exists(). Checks if the specified path exists
1875    *
1876    * @param fs
1877    * @param path
1878    * @return the value returned by fs.exists()
1879    * @throws IOException
1880    */
1881   public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
1882     return fs.exists(path);
1883   }
1884
1885   /**
1886    * Throw an exception if an action is not permitted by a user on a file.
1887    *
1888    * @param ugi
1889    *          the user
1890    * @param file
1891    *          the file
1892    * @param action
1893    *          the action
1894    */
1895   public static void checkAccess(UserGroupInformation ugi, FileStatus file,
1896       FsAction action) throws AccessDeniedException {
1897     if (ugi.getShortUserName().equals(file.getOwner())) {
1898       if (file.getPermission().getUserAction().implies(action)) {
1899         return;
1900       }
1901     } else if (contains(ugi.getGroupNames(), file.getGroup())) {
1902       if (file.getPermission().getGroupAction().implies(action)) {
1903         return;
1904       }
1905     } else if (file.getPermission().getOtherAction().implies(action)) {
1906       return;
1907     }
1908     throw new AccessDeniedException("Permission denied:" + " action=" + action
1909         + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
1910   }
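
       // Hypothetical usage sketch; the path below is illustrative only.
       //
       //   UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
       //   FileStatus status = fs.getFileStatus(new Path("/hbase/data/default/mytable"));
       //   FSUtils.checkAccess(ugi, status, FsAction.READ);   // throws AccessDeniedException if denied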
1911
1912   private static boolean contains(String[] groups, String user) {
1913     for (String group : groups) {
1914       if (group.equals(user)) {
1915         return true;
1916       }
1917     }
1918     return false;
1919   }
1920
1921   /**
1922    * Log the current state of the filesystem from a certain root directory
1923    * @param fs filesystem to investigate
1924    * @param root root file/directory to start logging from
1925    * @param LOG log to output information
1926    * @throws IOException if an unexpected exception occurs
1927    */
1928   public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
1929       throws IOException {
1930     LOG.debug("Current file system:");
1931     logFSTree(LOG, fs, root, "|-");
1932   }
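
       // Illustrative one-liner (hypothetical); handy for inspecting the on-disk layout
       // while debugging tests:
       //
       //   FSUtils.logFileSystemState(fs, FSUtils.getRootDir(conf), LOG);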
1933
1934   /**
1935    * Recursive helper to log the state of the FS
1936    *
1937    * @see #logFileSystemState(FileSystem, Path, Log)
1938    */
1939   private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
1940       throws IOException {
1941     FileStatus[] files = FSUtils.listStatus(fs, root, null);
1942     if (files == null) return;
1943
1944     for (FileStatus file : files) {
1945       if (file.isDirectory()) {
1946         LOG.debug(prefix + file.getPath().getName() + "/");
1947         logFSTree(LOG, fs, file.getPath(), prefix + "---");
1948       } else {
1949         LOG.debug(prefix + file.getPath().getName());
1950       }
1951     }
1952   }
1953
1954   public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
1955       throws IOException {
1956     // set the modify time for TimeToLive Cleaner
1957     fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
1958     return fs.rename(src, dest);
1959   }
1960
1961   /**
1962    * Scans the root path of the file system to get the degree of locality for
1963    * each region on each of the servers having at least one block of that
1964    * region.
1965    * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}
1966    *
1967    * @param conf
1968    *          the configuration to use
1969    * @return the mapping from region encoded name to a map of server names to
1970    *           locality fraction
1971    * @throws IOException
1972    *           in case of file system errors or interrupts
1973    */
1974   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1975       final Configuration conf) throws IOException {
1976     return getRegionDegreeLocalityMappingFromFS(
1977         conf, null,
1978         conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1979
1980   }
1981
1982   /**
1983    * Scans the root path of the file system to get the degree of locality for
1984    * each region on each of the servers having at least one block of that
1985    * region.
1986    *
1987    * @param conf
1988    *          the configuration to use
1989    * @param desiredTable
1990    *          the table you wish to scan locality for
1991    * @param threadPoolSize
1992    *          the thread pool size to use
1993    * @return the mapping from region encoded name to a map of server names to
1994    *           locality fraction
1995    * @throws IOException
1996    *           in case of file system errors or interrupts
1997    */
1998   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1999       final Configuration conf, final String desiredTable, int threadPoolSize)
2000       throws IOException {
2001     Map<String, Map<String, Float>> regionDegreeLocalityMapping =
2002         new ConcurrentHashMap<String, Map<String, Float>>();
2003     getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null,
2004         regionDegreeLocalityMapping);
2005     return regionDegreeLocalityMapping;
2006   }
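
       // Sketch of consuming the locality mapping; the host name and the 0.5 threshold
       // are arbitrary illustrations, not part of the original source:
       //
       //   Map<String, Map<String, Float>> locality =
       //       FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
       //   for (Map.Entry<String, Map<String, Float>> region : locality.entrySet()) {
       //     Float fraction = region.getValue().get("host1.example.com");
       //     if (fraction != null && fraction < 0.5f) {
       //       LOG.info("Region " + region.getKey() + " has poor locality on host1");
       //     }
       //   }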
2007
2008   /**
2009    * Scans the root path of the file system to get either the mapping between
2010    * the region name and its best locality region server, or the degree of
2011    * locality of each region on each of the servers having at least one block
2012    * of that region. The output map parameters are both optional.
2013    *
2014    * @param conf
2015    *          the configuration to use
2016    * @param desiredTable
2017    *          the table you wish to scan locality for
2018    * @param threadPoolSize
2019    *          the thread pool size to use
2020    * @param regionToBestLocalityRSMapping
2021    *          the map into which to put the best locality mapping or null
2022    * @param regionDegreeLocalityMapping
2023    *          the map into which to put the locality degree mapping or null,
2024    *          must be a thread-safe implementation
2025    * @throws IOException
2026    *           in case of file system errors or interrupts
2027    */
2028   private static void getRegionLocalityMappingFromFS(
2029       final Configuration conf, final String desiredTable,
2030       int threadPoolSize,
2031       Map<String, String> regionToBestLocalityRSMapping,
2032       Map<String, Map<String, Float>> regionDegreeLocalityMapping)
2033       throws IOException {
2034     FileSystem fs =  FileSystem.get(conf);
2035     Path rootPath = FSUtils.getRootDir(conf);
2036     long startTime = EnvironmentEdgeManager.currentTime();
2037     Path queryPath;
2038     // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
2039     if (null == desiredTable) {
2040       queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
2041     } else {
2042       queryPath = new Path(FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
2043     }
2044
2045     // reject all paths that are not appropriate
2046     PathFilter pathFilter = new PathFilter() {
2047       @Override
2048       public boolean accept(Path path) {
2049         // the last path component should be a region name, though the glob may match noise
2050         if (null == path) {
2051           return false;
2052         }
2053
2054         // no parent?
2055         Path parent = path.getParent();
2056         if (null == parent) {
2057           return false;
2058         }
2059
2060         String regionName = path.getName();
2061         if (null == regionName) {
2062           return false;
2063         }
2064
2065         if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
2066           return false;
2067         }
2068         return true;
2069       }
2070     };
2071
2072     FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
2073
2074     if (null == statusList) {
2075       return;
2076     } else {
2077       LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
2078           statusList.length);
2079     }
2080
2081     // lower the number of threads in case we have very few expected regions
2082     threadPoolSize = Math.min(threadPoolSize, statusList.length);
2083
2084     // run in multiple threads
2085     ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
2086         threadPoolSize, 60, TimeUnit.SECONDS,
2087         new ArrayBlockingQueue<Runnable>(statusList.length));
2088     try {
2089       // ignore all file status items that are not of interest
2090       for (FileStatus regionStatus : statusList) {
2091         if (null == regionStatus) {
2092           continue;
2093         }
2094
2095         if (!regionStatus.isDirectory()) {
2096           continue;
2097         }
2098
2099         Path regionPath = regionStatus.getPath();
2100         if (null == regionPath) {
2101           continue;
2102         }
2103
2104         tpe.execute(new FSRegionScanner(fs, regionPath,
2105             regionToBestLocalityRSMapping, regionDegreeLocalityMapping));
2106       }
2107     } finally {
2108       tpe.shutdown();
2109       int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
2110           60 * 1000);
2111       try {
2112         // here we wait until the TPE terminates, either naturally or because of
2113         // exceptions in the execution of the threads
2114         while (!tpe.awaitTermination(threadWakeFrequency,
2115             TimeUnit.MILLISECONDS)) {
2116           // printing out rough estimate, so as to not introduce
2117           // AtomicInteger
2118           LOG.info("Locality checking is underway: { Scanned Regions : "
2119               + tpe.getCompletedTaskCount() + "/"
2120               + tpe.getTaskCount() + " }");
2121         }
2122       } catch (InterruptedException e) {
2123         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
2124       }
2125     }
2126
2127     long overhead = EnvironmentEdgeManager.currentTime() - startTime;
2128     String overheadMsg = "Scanning DFS for locality info took " + overhead + " ms";
2129
2130     LOG.info(overheadMsg);
2131   }
2132
2133   /**
2134    * Do our short circuit read setup.
2135    * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
2136    * @param conf
2137    */
2138   public static void setupShortCircuitRead(final Configuration conf) {
2139     // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
2140     boolean shortCircuitSkipChecksum =
2141       conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
2142     boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
2143     if (shortCircuitSkipChecksum) {
2144       LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
2145         "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
2146         "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
2147       assert !shortCircuitSkipChecksum; //this will fail if assertions are on
2148     }
2149     checkShortCircuitReadBufferSize(conf);
2150   }
2151
2152   /**
2153    * Check if short circuit read buffer size is set and if not, set it to hbase value.
2154    * @param conf
2155    */
2156   public static void checkShortCircuitReadBufferSize(final Configuration conf) {
2157     final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
2158     final int notSet = -1;
2159     // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
2160     final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
2161     int size = conf.getInt(dfsKey, notSet);
2162     // If a size is set, return -- we will use it.
2163     if (size != notSet) return;
2164     // But the short circuit buffer size is normally not set.  Put in place the size HBase wants.
2165     int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
2166     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
2167   }
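
       // Hedged illustration of the override order above (values are arbitrary).
       // An operator can pin the buffer size via the hbase-prefixed key; the plain
       // dfs key, if already set, always wins:
       //
       //   Configuration conf = HBaseConfiguration.create();
       //   conf.setInt("hbase.dfs.client.read.shortcircuit.buffer.size", 2 * 64 * 1024);
       //   FSUtils.checkShortCircuitReadBufferSize(conf);
       //   // dfs.client.read.shortcircuit.buffer.size is now set unless it was configured already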
2168
2169   /**
2170    * @param c
2171    * @return The DFSClient DFSHedgedReadMetrics instance, or null if it can't be found or the filesystem is not HDFS.
2172    * @throws IOException
2173    */
2174   public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
2175       throws IOException {
2176     if (!isHDFS(c)) return null;
2177     // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
2178     // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
2179     // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
2180     final String name = "getHedgedReadMetrics";
2181     DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
2182     Method m;
2183     try {
2184       m = dfsclient.getClass().getDeclaredMethod(name);
2185     } catch (NoSuchMethodException e) {
2186       LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
2187           e.getMessage());
2188       return null;
2189     } catch (SecurityException e) {
2190       LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
2191           e.getMessage());
2192       return null;
2193     }
2194     m.setAccessible(true);
2195     try {
2196       return (DFSHedgedReadMetrics)m.invoke(dfsclient);
2197     } catch (IllegalAccessException e) {
2198       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
2199           e.getMessage());
2200       return null;
2201     } catch (IllegalArgumentException e) {
2202       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
2203           e.getMessage());
2204       return null;
2205     } catch (InvocationTargetException e) {
2206       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
2207           e.getMessage());
2208       return null;
2209     }
2210   }
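
       // A hedged usage sketch (not part of the original class). The returned metrics
       // instance is shared by DFSClients; callers typically just read its counters,
       // e.g. via getHedgedReadOps():
       //
       //   DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
       //   if (metrics != null) {
       //     LOG.info("Hedged read ops so far: " + metrics.getHedgedReadOps());
       //   }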
2211 }