1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInputStream;
23  import java.io.EOFException;
24  import java.io.FileNotFoundException;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.InterruptedIOException;
28  import java.lang.reflect.InvocationTargetException;
29  import java.lang.reflect.Method;
30  import java.net.InetSocketAddress;
31  import java.net.URI;
32  import java.net.URISyntaxException;
33  import java.util.ArrayList;
34  import java.util.Collections;
35  import java.util.HashMap;
36  import java.util.LinkedList;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.concurrent.ArrayBlockingQueue;
40  import java.util.concurrent.ConcurrentHashMap;
41  import java.util.concurrent.ThreadPoolExecutor;
42  import java.util.concurrent.TimeUnit;
43  import java.util.regex.Pattern;
44  
45  import org.apache.commons.logging.Log;
46  import org.apache.commons.logging.LogFactory;
47  import org.apache.hadoop.hbase.classification.InterfaceAudience;
48  import org.apache.hadoop.HadoopIllegalArgumentException;
49  import org.apache.hadoop.conf.Configuration;
50  import org.apache.hadoop.fs.BlockLocation;
51  import org.apache.hadoop.fs.FSDataInputStream;
52  import org.apache.hadoop.fs.FSDataOutputStream;
53  import org.apache.hadoop.fs.FileStatus;
54  import org.apache.hadoop.fs.FileSystem;
55  import org.apache.hadoop.fs.Path;
56  import org.apache.hadoop.fs.PathFilter;
57  import org.apache.hadoop.fs.permission.FsAction;
58  import org.apache.hadoop.fs.permission.FsPermission;
59  import org.apache.hadoop.hbase.ClusterId;
60  import org.apache.hadoop.hbase.HColumnDescriptor;
61  import org.apache.hadoop.hbase.HConstants;
62  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
63  import org.apache.hadoop.hbase.HRegionInfo;
64  import org.apache.hadoop.hbase.TableName;
65  import org.apache.hadoop.hbase.exceptions.DeserializationException;
66  import org.apache.hadoop.hbase.fs.HFileSystem;
67  import org.apache.hadoop.hbase.master.HMaster;
68  import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
69  import org.apache.hadoop.hbase.security.AccessDeniedException;
70  import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
71  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
72  import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
73  import org.apache.hadoop.hbase.regionserver.HRegion;
74  import org.apache.hadoop.hdfs.DFSClient;
75  import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
76  import org.apache.hadoop.hdfs.DistributedFileSystem;
77  import org.apache.hadoop.io.IOUtils;
78  import org.apache.hadoop.io.SequenceFile;
79  import org.apache.hadoop.ipc.RemoteException;
80  import org.apache.hadoop.security.UserGroupInformation;
81  import org.apache.hadoop.util.Progressable;
82  import org.apache.hadoop.util.ReflectionUtils;
83  import org.apache.hadoop.util.StringUtils;
84  
85  import com.google.common.primitives.Ints;
86  
87  /**
88   * Utility methods for interacting with the underlying file system.
89   */
90  @InterfaceAudience.Private
91  public abstract class FSUtils {
92    private static final Log LOG = LogFactory.getLog(FSUtils.class);
93  
94    /** Full access permissions (starting point for a umask) */
95    public static final String FULL_RWX_PERMISSIONS = "777";
96    private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
97    private static final int DEFAULT_THREAD_POOLSIZE = 2;
98  
99    /** Set to true on Windows platforms */
100   public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
101 
102   protected FSUtils() {
103     super();
104   }
105 
106   /**
107    * Sets storage policy for given path according to config setting.
108    * If the passed path is a directory, we'll set the storage policy for all files
109    * created in the future in said directory. Note that this change in storage
110    * policy takes place at the HDFS level; it will persist beyond this RS's lifecycle.
111    * If we're running on a version of HDFS that doesn't support the given storage policy
112    * (or storage policies at all), then we'll issue a log message and continue.
113    *
114    * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
115    *
116    * @param fs the filesystem to act on; this method is a no-op unless it is an instance of DistributedFileSystem
117    * @param conf used to look up storage policy with given key; not modified.
118    * @param path the Path whose storage policy is to be set
119    * @param policyKey e.g. HConstants.WAL_STORAGE_POLICY
120    * @param defaultPolicy usually should be the policy NONE to delegate to HDFS
121    */
122   public static void setStoragePolicy(final FileSystem fs, final Configuration conf,
123       final Path path, final String policyKey, final String defaultPolicy) {
124     String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase();
125     if (storagePolicy.equals(defaultPolicy)) {
126       if (LOG.isTraceEnabled()) {
127         LOG.trace("default policy of " + defaultPolicy + " requested, exiting early.");
128       }
129       return;
130     }
131     if (fs instanceof DistributedFileSystem) {
132       DistributedFileSystem dfs = (DistributedFileSystem)fs;
133       // Once our minimum supported Hadoop version is 2.6.0 we can remove reflection.
134       Class<? extends DistributedFileSystem> dfsClass = dfs.getClass();
135       Method m = null;
136       try {
137         m = dfsClass.getDeclaredMethod("setStoragePolicy",
138             new Class<?>[] { Path.class, String.class });
139         m.setAccessible(true);
140       } catch (NoSuchMethodException e) {
141         LOG.info("FileSystem doesn't support"
142             + " setStoragePolicy; --HDFS-6584 not available");
143       } catch (SecurityException e) {
144         LOG.info("Doesn't have access to setStoragePolicy on "
145             + "FileSystems --HDFS-6584 not available", e);
146         m = null; // could happen on setAccessible()
147       }
148       if (m != null) {
149         try {
150           m.invoke(dfs, path, storagePolicy);
151           LOG.info("set " + storagePolicy + " for " + path);
152         } catch (Exception e) {
153           // check for lack of HDFS-7228
154           boolean probablyBadPolicy = false;
155           if (e instanceof InvocationTargetException) {
156             final Throwable exception = e.getCause();
157             if (exception instanceof RemoteException &&
158                 HadoopIllegalArgumentException.class.getName().equals(
159                     ((RemoteException)exception).getClassName())) {
160               LOG.warn("Given storage policy, '" + storagePolicy + "', was rejected and probably " +
161                   "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
162                   "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
163                   "more information see the 'ArchivalStorage' docs for your Hadoop release.");
164               LOG.debug("More information about the invalid storage policy.", exception);
165               probablyBadPolicy = true;
166             }
167           }
168           if (!probablyBadPolicy) {
169             // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
170             // misuse than a runtime problem with HDFS.
171             LOG.warn("Unable to set " + storagePolicy + " for " + path, e);
172           }
173         }
174       }
175     } else {
176       LOG.info("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't " +
177           "support setStoragePolicy.");
178     }
179   }
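      // Illustrative sketch of a typical caller of setStoragePolicy() above; the WAL directory
      // argument and the "NONE" default are assumptions for the example, while
      // HConstants.WAL_STORAGE_POLICY is the policy key named in the javadoc.
      private static void exampleApplyWalStoragePolicy(final FileSystem fs, final Configuration conf,
          final Path walDir) {
        // No-op unless fs is a DistributedFileSystem and the configured policy differs from "NONE".
        setStoragePolicy(fs, conf, walDir, HConstants.WAL_STORAGE_POLICY, "NONE");
      }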
180 
181   /**
182    * Compares the path component only. Does not consider the scheme; i.e. if the
183    * schemes differ but <code>path</code> starts with <code>rootPath</code>,
184    * then the function returns true.
185    * @param rootPath
186    * @param path
187    * @return True if <code>path</code> starts with <code>rootPath</code>
188    */
189   public static boolean isStartingWithPath(final Path rootPath, final String path) {
190     String uriRootPath = rootPath.toUri().getPath();
191     String tailUriPath = (new Path(path)).toUri().getPath();
192     return tailUriPath.startsWith(uriRootPath);
193   }
194 
195   /**
196    * Compares the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
197    * '/a/b/c' part. Does not consider the scheme; i.e. if the schemes differ but the path or subpath matches,
198    * the two will equate.
199    * @param pathToSearch Path we will be trying to match.
200    * @param pathTail
201    * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
202    */
203   public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
204     return isMatchingTail(pathToSearch, new Path(pathTail));
205   }
206 
207   /**
208    * Compares the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
209    * '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true.  Does not consider
210    * the scheme; i.e. if the schemes differ but the path or subpath matches, the two will equate.
211    * @param pathToSearch Path we will be trying to match.
212    * @param pathTail
213    * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
214    */
215   public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
216     if (pathToSearch.depth() != pathTail.depth()) return false;
217     Path tailPath = pathTail;
218     String tailName;
219     Path toSearch = pathToSearch;
220     String toSearchName;
221     boolean result = false;
222     do {
223       tailName = tailPath.getName();
224       if (tailName == null || tailName.length() <= 0) {
225         result = true;
226         break;
227       }
228       toSearchName = toSearch.getName();
229       if (toSearchName == null || toSearchName.length() <= 0) break;
230       // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
231       tailPath = tailPath.getParent();
232       toSearch = toSearch.getParent();
233     } while(tailName.equals(toSearchName));
234     return result;
235   }
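      // Worked examples for isMatchingTail() above (hypothetical helper, for illustration only):
      // the scheme and authority are ignored and path components are compared from the tail upwards.
      private static void exampleIsMatchingTail() {
        assert isMatchingTail(new Path("hdfs://a/b/c"), new Path("/a/b/c"));  // same components
        assert !isMatchingTail(new Path("hdfs://a/b/c"), new Path("/a/b/d")); // last component differs
      }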
236 
237   public static FSUtils getInstance(FileSystem fs, Configuration conf) {
238     String scheme = fs.getUri().getScheme();
239     if (scheme == null) {
240       LOG.warn("Could not find scheme for uri " +
241           fs.getUri() + ", default to hdfs");
242       scheme = "hdfs";
243     }
244     Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
245         scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
246     FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
247     return fsUtils;
248   }
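      // Sketch of overriding the lookup above for another scheme; mapping the "file" scheme to
      // FSHDFSUtils here is just an assumption for the example (the default key is
      // "hbase.fsutil.hdfs.impl").
      private static FSUtils exampleGetInstanceForLocalFs(final Configuration conf) throws IOException {
        conf.setClass("hbase.fsutil.file.impl", FSHDFSUtils.class, FSUtils.class);
        return getInstance(FileSystem.getLocal(conf), conf);
      }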
249 
250   /**
251    * Delete if exists.
252    * @param fs filesystem object
253    * @param dir directory to delete
254    * @return True if deleted <code>dir</code>
255    * @throws IOException e
256    */
257   public static boolean deleteDirectory(final FileSystem fs, final Path dir)
258   throws IOException {
259     return fs.exists(dir) && fs.delete(dir, true);
260   }
261 
262   /**
263    * Delete the region directory if exists.
264    * @param conf
265    * @param hri
266    * @return True if deleted the region directory.
267    * @throws IOException
268    */
269   public static boolean deleteRegionDir(final Configuration conf, final HRegionInfo hri)
270   throws IOException {
271     Path rootDir = getRootDir(conf);
272     FileSystem fs = rootDir.getFileSystem(conf);
273     return deleteDirectory(fs,
274       new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
275   }
276 
277   /**
278    * Return the number of bytes that large input files should optimally
279    * be split into to minimize I/O time.
280    *
281    * Uses reflection to search for getDefaultBlockSize(Path f);
282    * if the method doesn't exist, falls back to using getDefaultBlockSize().
283    *
284    * @param fs filesystem object
285    * @return the default block size for the path's filesystem
286    * @throws IOException e
287    */
288   public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
289     Method m = null;
290     Class<? extends FileSystem> cls = fs.getClass();
291     try {
292       m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
293     } catch (NoSuchMethodException e) {
294       LOG.info("FileSystem doesn't support getDefaultBlockSize");
295     } catch (SecurityException e) {
296       LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
297       m = null; // could happen on setAccessible()
298     }
299     if (m == null) {
300       return fs.getDefaultBlockSize(path);
301     } else {
302       try {
303         Object ret = m.invoke(fs, path);
304         return ((Long)ret).longValue();
305       } catch (Exception e) {
306         throw new IOException(e);
307       }
308     }
309   }
310 
311   /**
312    * Get the default replication.
313    *
314    * Uses reflection to search for getDefaultReplication(Path f);
315    * if the method doesn't exist, falls back to using getDefaultReplication().
316    *
317    * @param fs filesystem object
318    * @param f path of file
319    * @return default replication for the path's filesystem
320    * @throws IOException e
321    */
322   public static short getDefaultReplication(final FileSystem fs, final Path path) throws IOException {
323     Method m = null;
324     Class<? extends FileSystem> cls = fs.getClass();
325     try {
326       m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
327     } catch (NoSuchMethodException e) {
328       LOG.info("FileSystem doesn't support getDefaultReplication");
329     } catch (SecurityException e) {
330       LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
331       m = null; // could happen on setAccessible()
332     }
333     if (m == null) {
334       return fs.getDefaultReplication(path);
335     } else {
336       try {
337         Object ret = m.invoke(fs, path);
338         return ((Number)ret).shortValue();
339       } catch (Exception e) {
340         throw new IOException(e);
341       }
342     }
343   }
344 
345   /**
346    * Returns the default buffer size to use during writes.
347    *
348    * The size of the buffer should probably be a multiple of hardware
349    * page size (4096 on Intel x86), and it determines how much data is
350    * buffered during read and write operations.
351    *
352    * @param fs filesystem object
353    * @return default buffer size to use during writes
354    */
355   public static int getDefaultBufferSize(final FileSystem fs) {
356     return fs.getConf().getInt("io.file.buffer.size", 4096);
357   }
358 
359   /**
360    * Create the specified file on the filesystem. By default, this will:
361    * <ol>
362    * <li>overwrite the file if it exists</li>
363    * <li>apply the umask in the configuration (if it is enabled)</li>
364    * <li>use the fs configured buffer size (or 4096 if not set)</li>
365    * <li>use the configured column family replication, or the default replication if
366    * it is {@link HColumnDescriptor#DEFAULT_DFS_REPLICATION}</li>
367    * <li>use the default block size</li>
368    * <li>not track progress</li>
369    * </ol>
370    * @param conf configurations
371    * @param fs {@link FileSystem} on which to write the file
372    * @param path {@link Path} to the file to write
373    * @param perm permissions
374    * @param favoredNodes
375    * @return output stream to the created file
376    * @throws IOException if the file cannot be created
377    */
378   public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
379       FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
380     if (fs instanceof HFileSystem) {
381       FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
382       if (backingFs instanceof DistributedFileSystem) {
383         // Try to use the favoredNodes version via reflection to allow backwards-
384         // compatibility.
385         short replication = Short.parseShort(conf.get(HColumnDescriptor.DFS_REPLICATION,
386           String.valueOf(HColumnDescriptor.DEFAULT_DFS_REPLICATION)));
387         try {
388           return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create",
389             Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class,
390             Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true,
391             getDefaultBufferSize(backingFs),
392             replication > 0 ? replication : getDefaultReplication(backingFs, path),
393             getDefaultBlockSize(backingFs, path), null, favoredNodes));
394         } catch (InvocationTargetException ite) {
395           // Function was properly called, but threw its own exception.
396           throw new IOException(ite.getCause());
397         } catch (NoSuchMethodException e) {
398           LOG.debug("DFS Client does not support most favored nodes create; using default create");
399           if (LOG.isTraceEnabled()) LOG.trace("Ignoring; use default create", e);
400         } catch (IllegalArgumentException e) {
401           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
402         } catch (SecurityException e) {
403           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
404         } catch (IllegalAccessException e) {
405           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
406         }
407       }
408     }
409     return create(fs, path, perm, true);
410   }
411 
412   /**
413    * Create the specified file on the filesystem. By default, this will:
414    * <ol>
415    * <li>apply the umask in the configuration (if it is enabled)</li>
416    * <li>use the fs configured buffer size (or 4096 if not set)</li>
417    * <li>use the default replication</li>
418    * <li>use the default block size</li>
419    * <li>not track progress</li>
420    * </ol>
421    *
422    * @param fs {@link FileSystem} on which to write the file
423    * @param path {@link Path} to the file to write
424    * @param perm
425    * @param overwrite Whether or not the created file should be overwritten.
426    * @return output stream to the created file
427    * @throws IOException if the file cannot be created
428    */
429   public static FSDataOutputStream create(FileSystem fs, Path path,
430       FsPermission perm, boolean overwrite) throws IOException {
431     if (LOG.isTraceEnabled()) {
432       LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
433     }
434     return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
435         getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
436   }
437 
438   /**
439    * Get the file permissions specified in the configuration, if they are
440    * enabled.
441    *
442    * @param fs filesystem that the file will be created on.
443    * @param conf configuration to read for determining if permissions are
444    *          enabled and which to use
445    * @param permissionConfKey property key in the configuration to use when
446    *          finding the permission
447    * @return the permission to use when creating a new file on the fs. If
448    *         special permissions are not specified in the configuration, then
449    *         the default permissions on the fs will be returned.
450    */
451   public static FsPermission getFilePermissions(final FileSystem fs,
452       final Configuration conf, final String permissionConfKey) {
453     boolean enablePermissions = conf.getBoolean(
454         HConstants.ENABLE_DATA_FILE_UMASK, false);
455 
456     if (enablePermissions) {
457       try {
458         FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
459         // make sure that we have a mask, if not, go default.
460         String mask = conf.get(permissionConfKey);
461         if (mask == null)
462           return FsPermission.getFileDefault();
463         // apply the umask
464         FsPermission umask = new FsPermission(mask);
465         return perm.applyUMask(umask);
466       } catch (IllegalArgumentException e) {
467         LOG.warn(
468             "Incorrect umask attempted to be created: "
469                 + conf.get(permissionConfKey)
470                 + ", using default file permissions.", e);
471         return FsPermission.getFileDefault();
472       }
473     }
474     return FsPermission.getFileDefault();
475   }
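      // Illustrative sketch for getFilePermissions() above; the "hbase.data.umask" key name is an
      // assumption for the example, pass whichever key your caller stores its umask under.
      private static FsPermission examplePermissionsWithUmask(final FileSystem fs,
          final Configuration conf) {
        conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
        conf.set("hbase.data.umask", "077"); // strip group and other bits
        return getFilePermissions(fs, conf, "hbase.data.umask"); // 777 base with umask 077 -> 700
      }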
476 
477   /**
478    * Checks to see if the specified file system is available
479    *
480    * @param fs filesystem
481    * @throws IOException e
482    */
483   public static void checkFileSystemAvailable(final FileSystem fs)
484   throws IOException {
485     if (!(fs instanceof DistributedFileSystem)) {
486       return;
487     }
488     IOException exception = null;
489     DistributedFileSystem dfs = (DistributedFileSystem) fs;
490     try {
491       if (dfs.exists(new Path("/"))) {
492         return;
493       }
494     } catch (IOException e) {
495       exception = e instanceof RemoteException ?
496               ((RemoteException)e).unwrapRemoteException() : e;
497     }
498     try {
499       fs.close();
500     } catch (Exception e) {
501       LOG.error("file system close failed: ", e);
502     }
503     IOException io = new IOException("File system is not available");
504     io.initCause(exception);
505     throw io;
506   }
507 
508   /**
509    * We use reflection because {@link DistributedFileSystem#setSafeMode(
510    * HdfsConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1
511    *
512    * @param dfs
513    * @return whether we're in safe mode
514    * @throws IOException
515    */
516   private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
517     boolean inSafeMode = false;
518     try {
519       Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
520           org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class});
521       inSafeMode = (Boolean) m.invoke(dfs,
522         org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true);
523     } catch (Exception e) {
524       if (e instanceof IOException) throw (IOException) e;
525 
526       // Check whether dfs is on safemode.
527       inSafeMode = dfs.setSafeMode(
528         org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET);
529     }
530     return inSafeMode;
531   }
532 
533   /**
534    * Check whether dfs is in safemode.
535    * @param conf
536    * @throws IOException
537    */
538   public static void checkDfsSafeMode(final Configuration conf)
539   throws IOException {
540     boolean isInSafeMode = false;
541     FileSystem fs = FileSystem.get(conf);
542     if (fs instanceof DistributedFileSystem) {
543       DistributedFileSystem dfs = (DistributedFileSystem)fs;
544       isInSafeMode = isInSafeMode(dfs);
545     }
546     if (isInSafeMode) {
547       throw new IOException("File system is in safemode, it can't be written now");
548     }
549   }
550 
551   /**
552    * Verifies current version of file system
553    *
554    * @param fs filesystem object
555    * @param rootdir root hbase directory
556    * @return null if no version file exists, version string otherwise.
557    * @throws IOException e
558    * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
559    */
560   public static String getVersion(FileSystem fs, Path rootdir)
561   throws IOException, DeserializationException {
562     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
563     FileStatus[] status = null;
564     try {
565       // hadoop 2.0 throws FNFE if directory does not exist.
566       // hadoop 1.0 returns null if directory does not exist.
567       status = fs.listStatus(versionFile);
568     } catch (FileNotFoundException fnfe) {
569       return null;
570     }
571     if (status == null || status.length == 0) return null;
572     String version = null;
573     byte [] content = new byte [(int)status[0].getLen()];
574     FSDataInputStream s = fs.open(versionFile);
575     try {
576       IOUtils.readFully(s, content, 0, content.length);
577       if (ProtobufUtil.isPBMagicPrefix(content)) {
578         version = parseVersionFrom(content);
579       } else {
580         // Presume it is in pre-pb format.
581         InputStream is = new ByteArrayInputStream(content);
582         DataInputStream dis = new DataInputStream(is);
583         try {
584           version = dis.readUTF();
585         } finally {
586           dis.close();
587         }
588       }
589     } catch (EOFException eof) {
590       LOG.warn("Version file was empty, odd, will try to set it.");
591     } finally {
592       s.close();
593     }
594     return version;
595   }
596 
597   /**
598    * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
599    * @param bytes The byte content of the hbase.version file.
600    * @return The version found in the file as a String.
601    * @throws DeserializationException
602    */
603   static String parseVersionFrom(final byte [] bytes)
604   throws DeserializationException {
605     ProtobufUtil.expectPBMagicPrefix(bytes);
606     int pblen = ProtobufUtil.lengthOfPBMagic();
607     FSProtos.HBaseVersionFileContent.Builder builder =
608       FSProtos.HBaseVersionFileContent.newBuilder();
609     try {
610       ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
611       return builder.getVersion();
612     } catch (IOException e) {
613       // Convert
614       throw new DeserializationException(e);
615     }
616   }
617 
618   /**
619    * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
620    * @param version Version to persist
621    * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
622    */
623   static byte [] toVersionByteArray(final String version) {
624     FSProtos.HBaseVersionFileContent.Builder builder =
625       FSProtos.HBaseVersionFileContent.newBuilder();
626     return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
627   }
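      // Round-trip sketch for the two helpers above: serialize the current version with its pb
      // magic prefix, then parse it back out.
      private static String exampleVersionRoundTrip() throws DeserializationException {
        byte [] pb = toVersionByteArray(HConstants.FILE_SYSTEM_VERSION);
        return parseVersionFrom(pb); // equals HConstants.FILE_SYSTEM_VERSION
      }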
628 
629   /**
630    * Verifies current version of file system
631    *
632    * @param fs file system
633    * @param rootdir root directory of HBase installation
634    * @param message if true, issues a message on System.out
635    *
636    * @throws IOException e
637    * @throws DeserializationException
638    */
639   public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
640   throws IOException, DeserializationException {
641     checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
642   }
643 
644   /**
645    * Verifies current version of file system
646    *
647    * @param fs file system
648    * @param rootdir root directory of HBase installation
649    * @param message if true, issues a message on System.out
650    * @param wait wait interval
651    * @param retries number of times to retry
652    *
653    * @throws IOException e
654    * @throws DeserializationException
655    */
656   public static void checkVersion(FileSystem fs, Path rootdir,
657       boolean message, int wait, int retries)
658   throws IOException, DeserializationException {
659     String version = getVersion(fs, rootdir);
660     if (version == null) {
661       if (!metaRegionExists(fs, rootdir)) {
662         // rootDir is empty (no version file and no root region)
663         // just create new version file (HBASE-1195)
664         setVersion(fs, rootdir, wait, retries);
665         return;
666       }
667     } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) return;
668 
669     // Version is deprecated; require migration.
670     // Output on stdout so user sees it in terminal.
671     String msg = "HBase file layout needs to be upgraded."
672       + " You have version " + version
673       + " and I want version " + HConstants.FILE_SYSTEM_VERSION
674       + ". Consult http://hbase.apache.org/book.html for further information about upgrading HBase."
675       + " Is your hbase.rootdir valid? If so, you may need to run "
676       + "'hbase hbck -fixVersionFile'.";
677     if (message) {
678       System.out.println("WARNING! " + msg);
679     }
680     throw new FileSystemVersionException(msg);
681   }
682 
683   /**
684    * Sets version of file system
685    *
686    * @param fs filesystem object
687    * @param rootdir hbase root
688    * @throws IOException e
689    */
690   public static void setVersion(FileSystem fs, Path rootdir)
691   throws IOException {
692     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
693       HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
694   }
695 
696   /**
697    * Sets version of file system
698    *
699    * @param fs filesystem object
700    * @param rootdir hbase root
701    * @param wait time to wait for retry
702    * @param retries number of times to retry before failing
703    * @throws IOException e
704    */
705   public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
706   throws IOException {
707     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
708   }
709 
710 
711   /**
712    * Sets version of file system
713    *
714    * @param fs filesystem object
715    * @param rootdir hbase root directory
716    * @param version version to set
717    * @param wait time to wait for retry
718    * @param retries number of times to retry before throwing an IOException
719    * @throws IOException e
720    */
721   public static void setVersion(FileSystem fs, Path rootdir, String version,
722       int wait, int retries) throws IOException {
723     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
724     Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
725       HConstants.VERSION_FILE_NAME);
726     while (true) {
727       try {
728         // Write the version to a temporary file
729         FSDataOutputStream s = fs.create(tempVersionFile);
730         try {
731           s.write(toVersionByteArray(version));
732           s.close();
733           s = null;
734           // Move the temp version file to its normal location. Returns false
735           // if the rename failed. Throw an IOE in that case.
736           if (!fs.rename(tempVersionFile, versionFile)) {
737             throw new IOException("Unable to move temp version file to " + versionFile);
738           }
739         } finally {
740           // Cleaning up the temporary file if the rename failed would be trying
741           // too hard. We'll unconditionally create it again the next time
742           // through anyway; files are overwritten by default by create().
743 
744           // Attempt to close the stream on the way out if it is still open.
745           try {
746             if (s != null) s.close();
747           } catch (IOException ignore) { }
748         }
749         LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
750         return;
751       } catch (IOException e) {
752         if (retries > 0) {
753           LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
754           fs.delete(versionFile, false);
755           try {
756             if (wait > 0) {
757               Thread.sleep(wait);
758             }
759           } catch (InterruptedException ie) {
760             throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
761           }
762           retries--;
763         } else {
764           throw e;
765         }
766       }
767     }
768   }
769 
770   /**
771    * Checks that a cluster ID file exists in the HBase root directory
772    * @param fs the root directory FileSystem
773    * @param rootdir the HBase root directory in HDFS
774    * @param wait how long to wait between retries
775    * @return <code>true</code> if the file exists, otherwise <code>false</code>
776    * @throws IOException if checking the FileSystem fails
777    */
778   public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
779       int wait) throws IOException {
780     while (true) {
781       try {
782         Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
783         return fs.exists(filePath);
784       } catch (IOException ioe) {
785         if (wait > 0) {
786           LOG.warn("Unable to check cluster ID file in " + rootdir.toString() +
787               ", retrying in "+wait+"msec: "+StringUtils.stringifyException(ioe));
788           try {
789             Thread.sleep(wait);
790           } catch (InterruptedException e) {
791             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
792           }
793         } else {
794           throw ioe;
795         }
796       }
797     }
798   }
799 
800   /**
801    * Returns the value of the unique cluster ID stored for this HBase instance.
802    * @param fs the root directory FileSystem
803    * @param rootdir the path to the HBase root directory
804    * @return the unique cluster identifier
805    * @throws IOException if reading the cluster ID file fails
806    */
807   public static ClusterId getClusterId(FileSystem fs, Path rootdir)
808   throws IOException {
809     Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
810     ClusterId clusterId = null;
811     FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath):  null;
812     if (status != null) {
813       int len = Ints.checkedCast(status.getLen());
814       byte [] content = new byte[len];
815       FSDataInputStream in = fs.open(idPath);
816       try {
817         in.readFully(content);
818       } catch (EOFException eof) {
819         LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
820       } finally{
821         in.close();
822       }
823       try {
824         clusterId = ClusterId.parseFrom(content);
825       } catch (DeserializationException e) {
826         throw new IOException("content=" + Bytes.toString(content), e);
827       }
828       // If not pb'd, make it so.
829       if (!ProtobufUtil.isPBMagicPrefix(content)) {
830         String cid = null;
831         in = fs.open(idPath);
832         try {
833           cid = in.readUTF();
834           clusterId = new ClusterId(cid);
835         } catch (EOFException eof) {
836           LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
837         } finally {
838           in.close();
839         }
840         rewriteAsPb(fs, rootdir, idPath, clusterId);
841       }
842       return clusterId;
843     } else {
844       LOG.warn("Cluster ID file does not exist at " + idPath.toString());
845     }
846     return clusterId;
847   }
848 
849   /**
850    * @param cid
851    * @throws IOException
852    */
853   private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
854       final ClusterId cid)
855   throws IOException {
856     // Rewrite the file as pb.  Move aside the old one first, write new
857     // then delete the moved-aside file.
858     Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
859     if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
860     setClusterId(fs, rootdir, cid, 100);
861     if (!fs.delete(movedAsideName, false)) {
862       throw new IOException("Failed delete of " + movedAsideName);
863     }
864     LOG.debug("Rewrote the hbase.id file as pb");
865   }
866 
867   /**
868    * Writes a new unique identifier for this cluster to the "hbase.id" file
869    * in the HBase root directory
870    * @param fs the root directory FileSystem
871    * @param rootdir the path to the HBase root directory
872    * @param clusterId the unique identifier to store
873    * @param wait how long (in milliseconds) to wait between retries
874    * @throws IOException if writing to the FileSystem fails and no wait value was specified
875    */
876   public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
877       int wait) throws IOException {
878     while (true) {
879       try {
880         Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
881         Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
882           Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
883         // Write the id file to a temporary location
884         FSDataOutputStream s = fs.create(tempIdFile);
885         try {
886           s.write(clusterId.toByteArray());
887           s.close();
888           s = null;
889           // Move the temporary file to its normal location. Throw an IOE if
890           // the rename failed
891           if (!fs.rename(tempIdFile, idFile)) {
892             throw new IOException("Unable to move temp cluster ID file to " + idFile);
893           }
894         } finally {
895           // Attempt to close the stream if still open on the way out
896           try {
897             if (s != null) s.close();
898           } catch (IOException ignore) { }
899         }
900         if (LOG.isDebugEnabled()) {
901           LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
902         }
903         return;
904       } catch (IOException ioe) {
905         if (wait > 0) {
906           LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
907               ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
908           try {
909             Thread.sleep(wait);
910           } catch (InterruptedException e) {
911             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
912           }
913         } else {
914           throw ioe;
915         }
916       }
917     }
918   }
919 
920   /**
921    * Verifies root directory path is a valid URI with a scheme
922    *
923    * @param root root directory path
924    * @return Passed <code>root</code> argument.
925    * @throws IOException if not a valid URI with a scheme
926    */
927   public static Path validateRootPath(Path root) throws IOException {
928     try {
929       URI rootURI = new URI(root.toString());
930       String scheme = rootURI.getScheme();
931       if (scheme == null) {
932         throw new IOException("Root directory does not have a scheme");
933       }
934       return root;
935     } catch (URISyntaxException e) {
936       IOException io = new IOException("Root directory path is not a valid " +
937         "URI -- check your " + HConstants.HBASE_DIR + " configuration");
938       io.initCause(e);
939       throw io;
940     }
941   }
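      // Examples for validateRootPath() above (the values are hypothetical): a fully qualified
      // root such as "hdfs://namenode:8020/hbase" passes, a scheme-less "/hbase" does not.
      private static void exampleValidateRootPath() throws IOException {
        validateRootPath(new Path("hdfs://namenode:8020/hbase")); // ok, has a scheme
        validateRootPath(new Path("/hbase"));                     // throws IOException: no scheme
      }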
942 
943   /**
944    * Checks for the presence of the root path (using the provided conf object) in the given path. If
945    * it exists, this method removes it and returns the String representation of the remaining relative path.
946    * @param path
947    * @param conf
948    * @return String representation of the remaining relative path
949    * @throws IOException
950    */
951   public static String removeRootPath(Path path, final Configuration conf) throws IOException {
952     Path root = FSUtils.getRootDir(conf);
953     String pathStr = path.toString();
954     // If the path does not contain the root path, return it as it is.
955     if (!pathStr.startsWith(root.toString())) return pathStr;
956     // Otherwise strip the root path and the separator that follows it.
957     return pathStr.substring(root.toString().length() + 1); // remove the "/" too.
958   }
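      // Sketch for removeRootPath() above, assuming hbase.rootdir is "hdfs://ns/hbase" and the
      // usual "data/<namespace>/<table>" layout: the call below returns "data/default/t1", while
      // a path outside the root would be returned unchanged.
      private static String exampleRemoveRootPath(final Configuration conf) throws IOException {
        return removeRootPath(new Path("hdfs://ns/hbase/data/default/t1"), conf);
      }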
959 
960   /**
961    * If DFS, check safe mode and if so, wait until we clear it.
962    * @param conf configuration
963    * @param wait Sleep between retries
964    * @throws IOException e
965    */
966   public static void waitOnSafeMode(final Configuration conf,
967     final long wait)
968   throws IOException {
969     FileSystem fs = FileSystem.get(conf);
970     if (!(fs instanceof DistributedFileSystem)) return;
971     DistributedFileSystem dfs = (DistributedFileSystem)fs;
972     // Make sure dfs is not in safe mode
973     while (isInSafeMode(dfs)) {
974       LOG.info("Waiting for dfs to exit safe mode...");
975       try {
976         Thread.sleep(wait);
977       } catch (InterruptedException e) {
978         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
979       }
980     }
981   }
982 
983   /**
984    * Return the 'path' component of a Path.  In Hadoop, Path is a URI.  This
985    * method returns the 'path' component of a Path's URI: e.g. If a Path is
986    * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
987    * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
988    * This method is useful if you want to print out a Path without the qualifying
989    * Filesystem instance.
990    * @param p Filesystem Path whose 'path' component we are to return.
991    * @return the 'path' portion of the Path's URI
992    */
993   public static String getPath(Path p) {
994     return p.toUri().getPath();
995   }
996 
997   /**
998    * @param c configuration
999    * @return Path to hbase root directory: i.e. <code>hbase.rootdir</code> from
1000    * configuration as a qualified Path.
1001    * @throws IOException e
1002    */
1003   public static Path getRootDir(final Configuration c) throws IOException {
1004     Path p = new Path(c.get(HConstants.HBASE_DIR));
1005     FileSystem fs = p.getFileSystem(c);
1006     return p.makeQualified(fs);
1007   }
1008 
1009   public static void setRootDir(final Configuration c, final Path root) throws IOException {
1010     c.set(HConstants.HBASE_DIR, root.toString());
1011   }
1012 
1013   public static void setFsDefault(final Configuration c, final Path root) throws IOException {
1014     c.set("fs.defaultFS", root.toString());    // for hadoop 0.21+
1015   }
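      // Typical bootstrap sketch for the three helpers above (the rootdir value is hypothetical):
      // point hbase.rootdir and fs.defaultFS at the same location, then resolve the qualified root.
      private static Path exampleConfigureRootDir(final Configuration conf) throws IOException {
        Path root = new Path("hdfs://namenode:8020/hbase");
        setRootDir(conf, root);
        setFsDefault(conf, root);
        return getRootDir(conf); // qualified against the filesystem of hbase.rootdir
      }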
1016 
1017   /**
1018    * Checks if meta region exists
1019    *
1020    * @param fs file system
1021    * @param rootdir root directory of HBase installation
1022    * @return true if exists
1023    * @throws IOException e
1024    */
1025   @SuppressWarnings("deprecation")
1026   public static boolean metaRegionExists(FileSystem fs, Path rootdir)
1027   throws IOException {
1028     Path metaRegionDir =
1029       HRegion.getRegionDir(rootdir, HRegionInfo.FIRST_META_REGIONINFO);
1030     return fs.exists(metaRegionDir);
1031   }
1032 
1033   /**
1034    * Compute HDFS blocks distribution of a given file, or a portion of the file
1035    * @param fs file system
1036    * @param status file status of the file
1037    * @param start start position of the portion
1038    * @param length length of the portion
1039    * @return The HDFS blocks distribution
1040    */
1041   static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
1042     final FileSystem fs, FileStatus status, long start, long length)
1043     throws IOException {
1044     HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
1045     BlockLocation [] blockLocations =
1046       fs.getFileBlockLocations(status, start, length);
1047     for(BlockLocation bl : blockLocations) {
1048       String [] hosts = bl.getHosts();
1049       long len = bl.getLength();
1050       blocksDistribution.addHostsAndBlockWeight(hosts, len);
1051     }
1052 
1053     return blocksDistribution;
1054   }
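      // Usage sketch for computeHDFSBlocksDistribution() above: cover a whole file by passing
      // offset 0 and the file's length.
      private static HDFSBlocksDistribution exampleBlockDistributionForFile(final FileSystem fs,
          final Path file) throws IOException {
        FileStatus status = fs.getFileStatus(file);
        return computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
      }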
1055 
1056 
1057 
1058   /**
1059    * Runs through the hbase rootdir and checks all stores have only
1060    * one file in them -- that is, they've been major compacted.  Looks
1061    * at root and meta tables too.
1062    * @param fs filesystem
1063    * @param hbaseRootDir hbase root directory
1064    * @return True if this hbase install is major compacted.
1065    * @throws IOException e
1066    */
1067   public static boolean isMajorCompacted(final FileSystem fs,
1068       final Path hbaseRootDir)
1069   throws IOException {
1070     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1071     PathFilter regionFilter = new RegionDirFilter(fs);
1072     PathFilter familyFilter = new FamilyDirFilter(fs);
1073     for (Path d : tableDirs) {
1074       FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
1075       for (FileStatus regionDir : regionDirs) {
1076         Path dd = regionDir.getPath();
1077         // Else it's a region name.  Now look in region for families.
1078         FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1079         for (FileStatus familyDir : familyDirs) {
1080           Path family = familyDir.getPath();
1081           // Now in family make sure only one file.
1082           FileStatus[] familyStatus = fs.listStatus(family);
1083           if (familyStatus.length > 1) {
1084             LOG.debug(family.toString() + " has " + familyStatus.length +
1085                 " files.");
1086             return false;
1087           }
1088         }
1089       }
1090     }
1091     return true;
1092   }
1093 
1094   // TODO move this method OUT of FSUtils. No dependencies to HMaster
1095   /**
1096    * Returns the total overall fragmentation percentage. Includes hbase:meta and
1097    * -ROOT- as well.
1098    *
1099    * @param master  The master defining the HBase root and file system.
1100    * @return The total fragmentation percentage across all tables, or -1 if it cannot be computed.
1101    * @throws IOException When scanning the directory fails.
1102    */
1103   public static int getTotalTableFragmentation(final HMaster master)
1104   throws IOException {
1105     Map<String, Integer> map = getTableFragmentation(master);
1106     return map != null && map.size() > 0 ? map.get("-TOTAL-") : -1;
1107   }
1108 
1109   /**
1110    * Runs through the HBase rootdir and checks how many stores for each table
1111    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1112    * percentage across all tables is stored under the special key "-TOTAL-".
1113    *
1114    * @param master  The master defining the HBase root and file system.
1115    * @return A map for each table and its percentage.
1116    *
1117    * @throws IOException When scanning the directory fails.
1118    */
1119   public static Map<String, Integer> getTableFragmentation(
1120     final HMaster master)
1121   throws IOException {
1122     Path path = getRootDir(master.getConfiguration());
1123     // since HMaster.getFileSystem() is package private
1124     FileSystem fs = path.getFileSystem(master.getConfiguration());
1125     return getTableFragmentation(fs, path);
1126   }
1127 
1128   /**
1129    * Runs through the HBase rootdir and checks how many stores for each table
1130    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1131    * percentage across all tables is stored under the special key "-TOTAL-".
1132    *
1133    * @param fs  The file system to use.
1134    * @param hbaseRootDir  The root directory to scan.
1135    * @return A map for each table and its percentage.
1136    * @throws IOException When scanning the directory fails.
1137    */
1138   public static Map<String, Integer> getTableFragmentation(
1139     final FileSystem fs, final Path hbaseRootDir)
1140   throws IOException {
1141     Map<String, Integer> frags = new HashMap<String, Integer>();
1142     int cfCountTotal = 0;
1143     int cfFragTotal = 0;
1144     PathFilter regionFilter = new RegionDirFilter(fs);
1145     PathFilter familyFilter = new FamilyDirFilter(fs);
1146     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1147     for (Path d : tableDirs) {
1148       int cfCount = 0;
1149       int cfFrag = 0;
1150       FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
1151       for (FileStatus regionDir : regionDirs) {
1152         Path dd = regionDir.getPath();
1153         // else it's a region name, now look in region for families
1154         FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1155         for (FileStatus familyDir : familyDirs) {
1156           cfCount++;
1157           cfCountTotal++;
1158           Path family = familyDir.getPath();
1159           // now in family make sure only one file
1160           FileStatus[] familyStatus = fs.listStatus(family);
1161           if (familyStatus.length > 1) {
1162             cfFrag++;
1163             cfFragTotal++;
1164           }
1165         }
1166       }
1167       // compute percentage per table and store in result list
1168       frags.put(FSUtils.getTableName(d).getNameAsString(),
1169         cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
1170     }
1171     // set overall percentage for all tables
1172     frags.put("-TOTAL-",
1173       cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
1174     return frags;
1175   }
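      // Reading the result of getTableFragmentation() above: per-table percentages are keyed by
      // table name and the overall percentage sits under the special "-TOTAL-" key.
      private static int exampleOverallFragmentation(final FileSystem fs, final Path hbaseRootDir)
          throws IOException {
        Map<String, Integer> frags = getTableFragmentation(fs, hbaseRootDir);
        Integer total = frags.get("-TOTAL-");
        return total == null ? -1 : total.intValue();
      }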
1176 
1177   /**
1178    * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
1179    * path rootdir
1180    *
1181    * @param rootdir qualified path of HBase root directory
1182    * @param tableName name of table
1183    * @return {@link org.apache.hadoop.fs.Path} for table
1184    */
1185   public static Path getTableDir(Path rootdir, final TableName tableName) {
1186     return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
1187         tableName.getQualifierAsString());
1188   }
1189 
1190   /**
1191    * Returns the {@link org.apache.hadoop.hbase.TableName} object representing
1192    * the table corresponding to the
1193    * given table directory path.
1194    *
1195    * @param tablePath path of the table directory
1196    * @return {@link org.apache.hadoop.hbase.TableName} for the table
1197    */
1198   public static TableName getTableName(Path tablePath) {
1199     return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
1200   }
1201 
1202   /**
1203    * Returns the {@link org.apache.hadoop.fs.Path} object representing
1204    * the namespace directory under path rootdir
1205    *
1206    * @param rootdir qualified path of HBase root directory
1207    * @param namespace namespace name
1208    * @return {@link org.apache.hadoop.fs.Path} for the namespace directory
1209    */
1210   public static Path getNamespaceDir(Path rootdir, final String namespace) {
1211     return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
1212         new Path(namespace)));
1213   }
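      // Layout sketch for the two directory helpers above, assuming HConstants.BASE_NAMESPACE_DIR
      // is "data": a table "ns1:t1" under root "/hbase" resolves to "/hbase/data/ns1/t1".
      private static Path exampleTableDir(final Path rootdir) {
        return getTableDir(rootdir, TableName.valueOf("ns1", "t1"));
      }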
1214 
1215   /**
1216    * A {@link PathFilter} that returns only regular files.
1217    */
1218   static class FileFilter implements PathFilter {
1219     private final FileSystem fs;
1220 
1221     public FileFilter(final FileSystem fs) {
1222       this.fs = fs;
1223     }
1224 
1225     @Override
1226     public boolean accept(Path p) {
1227       try {
1228         return fs.isFile(p);
1229       } catch (IOException e) {
1230         LOG.debug("unable to verify if path=" + p + " is a regular file", e);
1231         return false;
1232       }
1233     }
1234   }
1235 
1236   /**
1237    * Directory filter that doesn't include any of the directories in the specified blacklist
1238    */
1239   public static class BlackListDirFilter implements PathFilter {
1240     private final FileSystem fs;
1241     private List<String> blacklist;
1242 
1243     /**
1244      * Create a filter on the given filesystem with the specified blacklist
1245      * @param fs filesystem to filter
1246      * @param directoryNameBlackList list of the names of the directories to filter. If
1247      *          <tt>null</tt>, all directories are returned
1248      */
1249     @SuppressWarnings("unchecked")
1250     public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
1251       this.fs = fs;
1252       blacklist =
1253         (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
1254           : directoryNameBlackList);
1255     }
1256 
1257     @Override
1258     public boolean accept(Path p) {
1259       boolean isValid = false;
1260       try {
1261         if (isValidName(p.getName())) {
1262           isValid = fs.getFileStatus(p).isDirectory();
1263         } else {
1264           isValid = false;
1265         }
1266       } catch (IOException e) {
1267         LOG.warn("An error occurred while verifying if [" + p.toString()
1268             + "] is a valid directory. Returning 'not valid' and continuing.", e);
1269       }
1270       return isValid;
1271     }
1272 
1273     protected boolean isValidName(final String name) {
1274       return !blacklist.contains(name);
1275     }
1276   }
1277 
1278   /**
1279    * A {@link PathFilter} that only allows directories.
1280    */
1281   public static class DirFilter extends BlackListDirFilter {
1282 
1283     public DirFilter(FileSystem fs) {
1284       super(fs, null);
1285     }
1286   }
1287 
1288   /**
1289    * A {@link PathFilter} that returns usertable directories. To get all directories use the
1290    * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
1291    */
1292   public static class UserTableDirFilter extends BlackListDirFilter {
1293     public UserTableDirFilter(FileSystem fs) {
1294       super(fs, HConstants.HBASE_NON_TABLE_DIRS);
1295     }
1296 
1297     protected boolean isValidName(final String name) {
1298       if (!super.isValidName(name))
1299         return false;
1300 
1301       try {
1302         TableName.isLegalTableQualifierName(Bytes.toBytes(name));
1303       } catch (IllegalArgumentException e) {
1304         LOG.info("INVALID NAME " + name);
1305         return false;
1306       }
1307       return true;
1308     }
1309   }
1310 
1311   /**
1312    * Heuristic to determine whether it is safe or not to open a file for append.
1313    * Looks both for dfs.support.append and uses reflection to search
1314    * for SequenceFile.Writer.syncFs() or FSDataOutputStream.hflush().
1315    * @param conf
1316    * @return True if append is supported
1317    */
1318   public static boolean isAppendSupported(final Configuration conf) {
1319     boolean append = conf.getBoolean("dfs.support.append", false);
1320     if (append) {
1321       try {
1322         // TODO: The implementation that comes back when we do a createWriter
1323         // may not be using SequenceFile so the below is not a definitive test.
1324         // Will do for now (hdfs-200).
1325         SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
1326         append = true;
1327       } catch (SecurityException e) {
1328       } catch (NoSuchMethodException e) {
1329         append = false;
1330       }
1331     }
1332     if (!append) {
1333       // Look for the 0.21, 0.22, new-style append evidence.
1334       try {
1335         FSDataOutputStream.class.getMethod("hflush", new Class<?> []{});
1336         append = true;
1337       } catch (NoSuchMethodException e) {
1338         append = false;
1339       }
1340     }
1341     return append;
1342   }
1343 
1344   /**
1345    * @param conf
1346    * @return True if this filesystem's scheme is 'hdfs'.
1347    * @throws IOException
1348    */
1349   public static boolean isHDFS(final Configuration conf) throws IOException {
1350     FileSystem fs = FileSystem.get(conf);
1351     String scheme = fs.getUri().getScheme();
1352     return scheme.equalsIgnoreCase("hdfs");
1353   }
1354 
1355   /**
1356    * Recover file lease. Used when a file might be suspected
1357    * to have been left open by another process.
1358    * @param fs FileSystem handle
1359    * @param p Path of the file whose lease is to be recovered
1360    * @param conf Configuration handle
        * @param reporter progress reporter, may be null
1361    * @throws IOException on filesystem errors during lease recovery
1362    */
1363   public abstract void recoverFileLease(final FileSystem fs, final Path p,
1364       Configuration conf, CancelableProgressable reporter) throws IOException;
1365 
       /**
        * @param fs filesystem to use
        * @param rootdir the HBase root directory
        * @return All user-table directories found under the namespace directories of <code>rootdir</code>.
        * @throws IOException on filesystem access errors
        */
1366   public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
1367       throws IOException {
1368     List<Path> tableDirs = new LinkedList<Path>();
1369 
1370     for(FileStatus status :
1371         fs.globStatus(new Path(rootdir,
1372             new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
1373       tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
1374     }
1375     return tableDirs;
1376   }
1377 
1378   /**
1379    * @param fs filesystem to use
1380    * @param rootdir the HBase root directory
1381    * @return All the table directories under <code>rootdir</code>, ignoring non-table
1382    * HBase directories such as .logs, .oldlogs, and .corrupt.
1383    * @throws IOException
1384    */
1385   public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
1386       throws IOException {
1387     // presumes any directory under hbase.rootdir is a table
1388     FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
1389     List<Path> tabledirs = new ArrayList<Path>(dirs.length);
1390     for (FileStatus dir: dirs) {
1391       tabledirs.add(dir.getPath());
1392     }
1393     return tabledirs;
1394   }
1395 
1396   /**
1397    * Checks if the given path contains the 'recovered.edits' directory.
1398    * @param path path to check
1399    * @return True if the path is under a 'recovered.edits' directory
1400    */
1401   public static boolean isRecoveredEdits(Path path) {
1402     return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
1403   }
1404 
1405   /**
1406    * Filter for region directories, i.e. directories whose names match the hex region name pattern.
1407    */
1408   public static class RegionDirFilter implements PathFilter {
1409     // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
1410     final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
1411     final FileSystem fs;
1412 
1413     public RegionDirFilter(FileSystem fs) {
1414       this.fs = fs;
1415     }
1416 
1417     @Override
1418     public boolean accept(Path rd) {
1419       if (!regionDirPattern.matcher(rd.getName()).matches()) {
1420         return false;
1421       }
1422 
1423       try {
1424         return fs.getFileStatus(rd).isDirectory();
1425       } catch (IOException ioe) {
1426         // Maybe the file was moved or the fs was disconnected.
1427         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1428         return false;
1429       }
1430     }
1431   }
1432 
1433   /**
1434    * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1435    * .tableinfo
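        * <p>Illustrative usage, a minimal sketch; {@code fs} and {@code conf} are assumed and the
        * table name "t1" is only an example:
        * <pre>{@code
        * Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf("t1"));
        * for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
        *   for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
        *     // inspect store files under familyDir
        *   }
        * }
        * }</pre>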
1436    * @param fs A file system for the Path
1437    * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
1438    * @return List of paths to valid region directories in table dir.
1439    * @throws IOException
1440    */
1441   public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1442     // assumes we are in a table dir.
1443     FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs));
1444     List<Path> regionDirs = new ArrayList<Path>(rds.length);
1445     for (FileStatus rdfs: rds) {
1446       Path rdPath = rdfs.getPath();
1447       regionDirs.add(rdPath);
1448     }
1449     return regionDirs;
1450   }
1451 
1452   /**
1453    * Filter for all dirs that are legal column family names.  This is generally used for colfam
1454    * dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1455    */
1456   public static class FamilyDirFilter implements PathFilter {
1457     final FileSystem fs;
1458 
1459     public FamilyDirFilter(FileSystem fs) {
1460       this.fs = fs;
1461     }
1462 
1463     @Override
1464     public boolean accept(Path rd) {
1465       try {
1466         // throws IAE if invalid
1467         HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(rd.getName()));
1468       } catch (IllegalArgumentException iae) {
1469         // path name is an invalid family name and thus is excluded.
1470         return false;
1471       }
1472 
1473       try {
1474         return fs.getFileStatus(rd).isDirectory();
1475       } catch (IOException ioe) {
1476         // Maybe the file was moved or the fs was disconnected.
1477         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1478         return false;
1479       }
1480     }
1481   }
1482 
1483   /**
1484    * Given a particular region dir, return all the familydirs inside it
1485    *
1486    * @param fs A file system for the Path
1487    * @param regionDir Path to a specific region directory
1488    * @return List of paths to valid family directories in region dir.
1489    * @throws IOException
1490    */
1491   public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1492     // assumes we are in a region dir.
1493     FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1494     List<Path> familyDirs = new ArrayList<Path>(fds.length);
1495     for (FileStatus fdfs: fds) {
1496       Path fdPath = fdfs.getPath();
1497       familyDirs.add(fdPath);
1498     }
1499     return familyDirs;
1500   }
1501 
1502   public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
1503     FileStatus[] fds = fs.listStatus(familyDir, new ReferenceFileFilter(fs));
1504     List<Path> referenceFiles = new ArrayList<Path>(fds.length);
1505     for (FileStatus fdfs: fds) {
1506       Path fdPath = fdfs.getPath();
1507       referenceFiles.add(fdPath);
1508     }
1509     return referenceFiles;
1510   }
1511 
1512   /**
1513    * Filter for HFiles that excludes reference files.
1514    */
1515   public static class HFileFilter implements PathFilter {
1516     final FileSystem fs;
1517 
1518     public HFileFilter(FileSystem fs) {
1519       this.fs = fs;
1520     }
1521 
1522     @Override
1523     public boolean accept(Path rd) {
1524       try {
1525         // only files
1526         return !fs.getFileStatus(rd).isDirectory() && StoreFileInfo.isHFile(rd);
1527       } catch (IOException ioe) {
1528         // Maybe the file was moved or the fs was disconnected.
1529         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1530         return false;
1531       }
1532     }
1533   }
1534 
       /**
        * A {@link PathFilter} that accepts only reference files (excludes directories and
        * non-reference store files).
        */
1535   public static class ReferenceFileFilter implements PathFilter {
1536 
1537     private final FileSystem fs;
1538 
1539     public ReferenceFileFilter(FileSystem fs) {
1540       this.fs = fs;
1541     }
1542 
1543     @Override
1544     public boolean accept(Path rd) {
1545       try {
1546         // only files can be references.
1547         return !fs.getFileStatus(rd).isDirectory() && StoreFileInfo.isReference(rd);
1548       } catch (IOException ioe) {
1549         // Maybe the file was moved or the fs was disconnected.
1550         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1551         return false;
1552       }
1553     }
1554   }
1555 
1556 
1557   /**
1558    * @param conf configuration to use
1559    * @return The filesystem of the hbase rootdir.
1560    * @throws IOException
1561    */
1562   public static FileSystem getCurrentFileSystem(Configuration conf)
1563   throws IOException {
1564     return getRootDir(conf).getFileSystem(conf);
1565   }
1566 
1567 
1568   /**
1569    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1570    * table StoreFile names to the full Path.
1571    * <br>
1572    * Example...<br>
1573    * Key = 3944417774205889744  <br>
1574    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
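        * <p>Illustrative usage, a minimal sketch; {@code fs} and {@code conf} are assumed and the
        * table name "t1" and store file name below are only examples:
        * <pre>{@code
        * Map<String, Path> storeFiles = FSUtils.getTableStoreFilePathMap(
        *     null, fs, FSUtils.getRootDir(conf), TableName.valueOf("t1"));
        * Path fullPath = storeFiles.get("3944417774205889744");
        * }</pre>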
1575    *
1576    * @param map map to add values.  If null, this method will create and populate one to return
1577    * @param fs  The file system to use.
1578    * @param hbaseRootDir  The root directory to scan.
1579    * @param tableName name of the table to scan.
1580    * @return Map keyed by StoreFile name with a value of the full Path.
1581    * @throws IOException When scanning the directory fails.
1582    */
1583   public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1584   final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1585   throws IOException {
1586     return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null);
1587   }
1588 
1589   /**
1590    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1591    * table StoreFile names to the full Path.
1592    * <br>
1593    * Example...<br>
1594    * Key = 3944417774205889744  <br>
1595    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1596    *
1597    * @param map map to add values.  If null, this method will create and populate one to return
1598    * @param fs  The file system to use.
1599    * @param hbaseRootDir  The root directory to scan.
1600    * @param tableName name of the table to scan.
1601    * @param errors ErrorReporter instance or null
1602    * @return Map keyed by StoreFile name with a value of the full Path.
1603    * @throws IOException When scanning the directory fails.
1604    */
1605   public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1606   final FileSystem fs, final Path hbaseRootDir, TableName tableName, ErrorReporter errors)
1607   throws IOException {
1608     if (map == null) {
1609       map = new HashMap<String, Path>();
1610     }
1611 
1612     // only include the directory paths to tables
1613     Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
1614     // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
1615     // should be regions.
1616     PathFilter familyFilter = new FamilyDirFilter(fs);
1617     FileStatus[] regionDirs = fs.listStatus(tableDir, new RegionDirFilter(fs));
1618     for (FileStatus regionDir : regionDirs) {
1619       if (null != errors) {
1620         errors.progress();
1621       }
1622       Path dd = regionDir.getPath();
1623       // else it's a region name; now look in the region for families
1624       FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1625       for (FileStatus familyDir : familyDirs) {
1626         if (null != errors) {
1627           errors.progress();
1628         }
1629         Path family = familyDir.getPath();
1630         if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1631           continue;
1632         }
1633         // now in family, iterate over the StoreFiles and
1634         // put in map
1635         FileStatus[] familyStatus = fs.listStatus(family);
1636         for (FileStatus sfStatus : familyStatus) {
1637           if (null != errors) {
1638             errors.progress();
1639           }
1640           Path sf = sfStatus.getPath();
1641           map.put(sf.getName(), sf);
1642         }
1643       }
1644     }
1645     return map;
1646   }
1647 
1648   public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1649     int result = 0;
1650     try {
1651       for (Path familyDir:getFamilyDirs(fs, p)){
1652         result += getReferenceFilePaths(fs, familyDir).size();
1653       }
1654     } catch (IOException e) {
1655       LOG.warn("Error counting reference files.", e);
1656     }
1657     return result;
1658   }
1659 
1660   /**
1661    * Runs through the HBase rootdir and creates a reverse lookup map for
1662    * table StoreFile names to the full Path.
1663    * <br>
1664    * Example...<br>
1665    * Key = 3944417774205889744  <br>
1666    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1667    *
1668    * @param fs  The file system to use.
1669    * @param hbaseRootDir  The root directory to scan.
1670    * @return Map keyed by StoreFile name with a value of the full Path.
1671    * @throws IOException When scanning the directory fails.
1672    */
1673   public static Map<String, Path> getTableStoreFilePathMap(
1674     final FileSystem fs, final Path hbaseRootDir)
1675   throws IOException {
1676     return getTableStoreFilePathMap(fs, hbaseRootDir, null);
1677   }
1678 
1679   /**
1680    * Runs through the HBase rootdir and creates a reverse lookup map for
1681    * table StoreFile names to the full Path.
1682    * <br>
1683    * Example...<br>
1684    * Key = 3944417774205889744  <br>
1685    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1686    *
1687    * @param fs  The file system to use.
1688    * @param hbaseRootDir  The root directory to scan.
1689    * @param errors ErrorReporter instance or null
1690    * @return Map keyed by StoreFile name with a value of the full Path.
1691    * @throws IOException When scanning the directory fails.
1692    */
1693   public static Map<String, Path> getTableStoreFilePathMap(
1694     final FileSystem fs, final Path hbaseRootDir, ErrorReporter errors)
1695   throws IOException {
1696     Map<String, Path> map = new HashMap<String, Path>();
1697 
1698     // if this method looks similar to 'getTableFragmentation' that is because
1699     // it was borrowed from it.
1700 
1701     // only include the directory paths to tables
1702     for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1703       getTableStoreFilePathMap(map, fs, hbaseRootDir,
1704           FSUtils.getTableName(tableDir), errors);
1705     }
1706     return map;
1707   }
1708 
1709   /**
1710    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1711    * This accommodates differences between Hadoop versions: Hadoop 1 does not
1712    * throw a FileNotFoundException but returns an empty FileStatus[], while
1713    * Hadoop 2 throws a FileNotFoundException.
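        * <p>Illustrative usage, a minimal sketch assuming an existing {@code fs} and {@code dir}:
        * <pre>{@code
        * FileStatus[] children = FSUtils.listStatus(fs, dir, new FSUtils.DirFilter(fs));
        * if (children != null) {          // null means the directory is empty or missing
        *   for (FileStatus child : children) {
        *     // process child
        *   }
        * }
        * }</pre>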
1714    *
1715    * @param fs file system
1716    * @param dir directory
1717    * @param filter path filter
1718    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1719    */
1720   public static FileStatus [] listStatus(final FileSystem fs,
1721       final Path dir, final PathFilter filter) throws IOException {
1722     FileStatus [] status = null;
1723     try {
1724       status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
1725     } catch (FileNotFoundException fnfe) {
1726       // if directory doesn't exist, return null
1727       if (LOG.isTraceEnabled()) {
1728         LOG.trace(dir + " doesn't exist");
1729       }
1730     }
1731     if (status == null || status.length < 1) return null;
1732     return status;
1733   }
1734 
1735   /**
1736    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1737    * This accommodates differences between Hadoop versions.
1738    *
1739    * @param fs file system
1740    * @param dir directory
1741    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1742    */
1743   public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
1744     return listStatus(fs, dir, null);
1745   }
1746 
1747   /**
1748    * Calls fs.delete() and returns its result.
1749    *
1750    * @param fs filesystem to use
1751    * @param path path to delete
1752    * @param recursive true to delete recursively
1753    * @return the value returned by fs.delete()
1754    * @throws IOException
1755    */
1756   public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
1757       throws IOException {
1758     return fs.delete(path, recursive);
1759   }
1760 
1761   /**
1762    * Calls fs.exists() to check whether the specified path exists.
1763    *
1764    * @param fs filesystem to use
1765    * @param path path to check
1766    * @return the value returned by fs.exists()
1767    * @throws IOException
1768    */
1769   public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
1770     return fs.exists(path);
1771   }
1772 
1773   /**
1774    * Throw an exception if an action is not permitted by a user on a file.
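   * <p>Illustrative usage, a minimal sketch assuming an existing {@code fs} and {@code path}:
   * <pre>{@code
   * FileStatus status = fs.getFileStatus(path);
   * FSUtils.checkAccess(UserGroupInformation.getCurrentUser(), status, FsAction.WRITE);
   * }</pre>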
1775    *
1776    * @param ugi
1777    *          the user
1778    * @param file
1779    *          the file
1780    * @param action
1781    *          the action
1782    */
1783   public static void checkAccess(UserGroupInformation ugi, FileStatus file,
1784       FsAction action) throws AccessDeniedException {
1785     if (ugi.getShortUserName().equals(file.getOwner())) {
1786       if (file.getPermission().getUserAction().implies(action)) {
1787         return;
1788       }
1789     } else if (contains(ugi.getGroupNames(), file.getGroup())) {
1790       if (file.getPermission().getGroupAction().implies(action)) {
1791         return;
1792       }
1793     } else if (file.getPermission().getOtherAction().implies(action)) {
1794       return;
1795     }
1796     throw new AccessDeniedException("Permission denied:" + " action=" + action
1797         + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
1798   }
1799 
1800   private static boolean contains(String[] groups, String user) {
1801     for (String group : groups) {
1802       if (group.equals(user)) {
1803         return true;
1804       }
1805     }
1806     return false;
1807   }
1808 
1809   /**
1810    * Log the current state of the filesystem from a certain root directory
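        * <p>Illustrative usage, a minimal sketch assuming an existing {@code fs} and {@code conf}
        * (handy in tests):
        * <pre>{@code
        * FSUtils.logFileSystemState(fs, FSUtils.getRootDir(conf), LOG);
        * }</pre>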
1811    * @param fs filesystem to investigate
1812    * @param root root file/directory to start logging from
1813    * @param LOG log to output information
1814    * @throws IOException if an unexpected exception occurs
1815    */
1816   public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
1817       throws IOException {
1818     LOG.debug("Current file system:");
1819     logFSTree(LOG, fs, root, "|-");
1820   }
1821 
1822   /**
1823    * Recursive helper to log the state of the FS
1824    *
1825    * @see #logFileSystemState(FileSystem, Path, Log)
1826    */
1827   private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
1828       throws IOException {
1829     FileStatus[] files = FSUtils.listStatus(fs, root, null);
1830     if (files == null) return;
1831 
1832     for (FileStatus file : files) {
1833       if (file.isDirectory()) {
1834         LOG.debug(prefix + file.getPath().getName() + "/");
1835         logFSTree(LOG, fs, file.getPath(), prefix + "---");
1836       } else {
1837         LOG.debug(prefix + file.getPath().getName());
1838       }
1839     }
1840   }
1841 
1842   public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
1843       throws IOException {
1844     // set the modify time for TimeToLive Cleaner
1845     fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
1846     return fs.rename(src, dest);
1847   }
1848 
1849   /**
1850    * Scans the filesystem root path to compute the degree of locality for each
1851    * region on each of the servers that hold at least one block of that
1852    * region.
1853    * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}
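        * <p>Illustrative usage, a minimal sketch; {@code conf}, {@code encodedRegionName} and
        * {@code host} are assumed to exist:
        * <pre>{@code
        * Map<String, Map<String, Float>> locality =
        *     FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
        * Float fraction = locality.get(encodedRegionName).get(host);
        * }</pre>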
1854    *
1855    * @param conf
1856    *          the configuration to use
1857    * @return the mapping from region encoded name to a map of server names to
1858    *           locality fraction
1859    * @throws IOException
1860    *           in case of file system errors or interrupts
1861    */
1862   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1863       final Configuration conf) throws IOException {
1864     return getRegionDegreeLocalityMappingFromFS(
1865         conf, null,
1866         conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1867 
1868   }
1869 
1870   /**
1871    * Scans the filesystem root path to compute the degree of locality for each
1872    * region on each of the servers that hold at least one block of that
1873    * region.
1874    *
1875    * @param conf
1876    *          the configuration to use
1877    * @param desiredTable
1878    *          the table you wish to scan locality for
1879    * @param threadPoolSize
1880    *          the thread pool size to use
1881    * @return the mapping from region encoded name to a map of server names to
1882    *           locality fraction
1883    * @throws IOException
1884    *           in case of file system errors or interrupts
1885    */
1886   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1887       final Configuration conf, final String desiredTable, int threadPoolSize)
1888       throws IOException {
1889     Map<String, Map<String, Float>> regionDegreeLocalityMapping =
1890         new ConcurrentHashMap<String, Map<String, Float>>();
1891     getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null,
1892         regionDegreeLocalityMapping);
1893     return regionDegreeLocalityMapping;
1894   }
1895 
1896   /**
1897    * Scans the filesystem root path to compute either the mapping from each
1898    * region name to its best-locality region server, or the degree of locality
1899    * of each region on each of the servers holding at least one block of that
1900    * region. The output map parameters are both optional.
1901    *
1902    * @param conf
1903    *          the configuration to use
1904    * @param desiredTable
1905    *          the table you wish to scan locality for
1906    * @param threadPoolSize
1907    *          the thread pool size to use
1908    * @param regionToBestLocalityRSMapping
1909    *          the map into which to put the best locality mapping or null
1910    * @param regionDegreeLocalityMapping
1911    *          the map into which to put the locality degree mapping or null,
1912    *          must be a thread-safe implementation
1913    * @throws IOException
1914    *           in case of file system errors or interrupts
1915    */
1916   private static void getRegionLocalityMappingFromFS(
1917       final Configuration conf, final String desiredTable,
1918       int threadPoolSize,
1919       Map<String, String> regionToBestLocalityRSMapping,
1920       Map<String, Map<String, Float>> regionDegreeLocalityMapping)
1921       throws IOException {
1922     FileSystem fs =  FileSystem.get(conf);
1923     Path rootPath = FSUtils.getRootDir(conf);
1924     long startTime = EnvironmentEdgeManager.currentTime();
1925     Path queryPath;
1926     // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1927     if (null == desiredTable) {
1928       queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1929     } else {
1930       queryPath = new Path(FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1931     }
1932 
1933     // reject all paths that are not appropriate
1934     PathFilter pathFilter = new PathFilter() {
1935       @Override
1936       public boolean accept(Path path) {
1937         // this is the region name; it may get some noise data
1938         if (null == path) {
1939           return false;
1940         }
1941 
1942         // no parent?
1943         Path parent = path.getParent();
1944         if (null == parent) {
1945           return false;
1946         }
1947 
1948         String regionName = path.getName();
1949         if (null == regionName) {
1950           return false;
1951         }
1952 
1953         if (!regionName.toLowerCase().matches("[0-9a-f]+")) {
1954           return false;
1955         }
1956         return true;
1957       }
1958     };
1959 
1960     FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1961 
1962     if (null == statusList) {
1963       return;
1964     } else {
1965       LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
1966           statusList.length);
1967     }
1968 
1969     // lower the number of threads in case we have very few expected regions
1970     threadPoolSize = Math.min(threadPoolSize, statusList.length);
1971 
1972     // run in multiple threads
1973     ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
1974         threadPoolSize, 60, TimeUnit.SECONDS,
1975         new ArrayBlockingQueue<Runnable>(statusList.length));
1976     try {
1977       // ignore all file status items that are not of interest
1978       for (FileStatus regionStatus : statusList) {
1979         if (null == regionStatus) {
1980           continue;
1981         }
1982 
1983         if (!regionStatus.isDirectory()) {
1984           continue;
1985         }
1986 
1987         Path regionPath = regionStatus.getPath();
1988         if (null == regionPath) {
1989           continue;
1990         }
1991 
1992         tpe.execute(new FSRegionScanner(fs, regionPath,
1993             regionToBestLocalityRSMapping, regionDegreeLocalityMapping));
1994       }
1995     } finally {
1996       tpe.shutdown();
1997       int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1998           60 * 1000);
1999       try {
2000         // here we wait until the TPE terminates, either naturally or because of
2001         // exceptions thrown by the executing tasks
2002         while (!tpe.awaitTermination(threadWakeFrequency,
2003             TimeUnit.MILLISECONDS)) {
2004           // printing out rough estimate, so as to not introduce
2005           // AtomicInteger
2006           LOG.info("Locality checking is underway: { Scanned Regions : "
2007               + tpe.getCompletedTaskCount() + "/"
2008               + tpe.getTaskCount() + " }");
2009         }
2010       } catch (InterruptedException e) {
2011         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
2012       }
2013     }
2014 
2015     long overhead = EnvironmentEdgeManager.currentTime() - startTime;
2016     String overheadMsg = "Scanning DFS for locality info took " + overhead + " ms";
2017 
2018     LOG.info(overheadMsg);
2019   }
2020 
2021   /**
2022    * Do our short-circuit read setup.
2023    * Checks the buffer size to use and whether checksumming is done in HBase or HDFS.
2024    * @param conf configuration to update
2025    */
2026   public static void setupShortCircuitRead(final Configuration conf) {
2027     // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
2028     boolean shortCircuitSkipChecksum =
2029       conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
2030     boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
2031     if (shortCircuitSkipChecksum) {
2032       LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
2033         "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
2034         "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
2035       assert !shortCircuitSkipChecksum; //this will fail if assertions are on
2036     }
2037     checkShortCircuitReadBufferSize(conf);
2038   }
2039 
2040   /**
2041    * Check if the short-circuit read buffer size is set and, if not, set it to the HBase-preferred value.
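        * <p>Illustrative configuration, a minimal sketch; the 131072 value is only an example:
        * <pre>{@code
        * conf.setInt("hbase.dfs.client.read.shortcircuit.buffer.size", 131072);
        * FSUtils.checkShortCircuitReadBufferSize(conf);
        * // "dfs.client.read.shortcircuit.buffer.size" is now 131072 if it was previously unset
        * }</pre>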
2042    * @param conf configuration to update
2043    */
2044   public static void checkShortCircuitReadBufferSize(final Configuration conf) {
2045     final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
2046     final int notSet = -1;
2047     // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
2048     final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
2049     int size = conf.getInt(dfsKey, notSet);
2050     // If a size is set, return -- we will use it.
2051     if (size != notSet) return;
2052     // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
2053     int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
2054     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
2055   }
2056 
2057   /**
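        * Fetches the DFSClient's hedged read metrics via reflection.
        * <p>Illustrative usage, a minimal sketch assuming an existing {@code conf} and that the
        * metrics object exposes the usual {@code getHedgedReadOps()} accessor:
        * <pre>{@code
        * DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
        * if (metrics != null) {
        *   LOG.info("Hedged reads so far: " + metrics.getHedgedReadOps());
        * }
        * }</pre>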
2058    * @param c configuration to use
2059    * @return The DFSClient's DFSHedgedReadMetrics instance, or null if it cannot be found or we are not on HDFS.
2060    * @throws IOException on filesystem access errors
2061    */
2062   public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
2063       throws IOException {
2064     if (!isHDFS(c)) return null;
2065     // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
2066     // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
2067     // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
2068     final String name = "getHedgedReadMetrics";
2069     DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
2070     Method m;
2071     try {
2072       m = dfsclient.getClass().getDeclaredMethod(name);
2073     } catch (NoSuchMethodException e) {
2074       LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
2075           e.getMessage());
2076       return null;
2077     } catch (SecurityException e) {
2078       LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
2079           e.getMessage());
2080       return null;
2081     }
2082     m.setAccessible(true);
2083     try {
2084       return (DFSHedgedReadMetrics)m.invoke(dfsclient);
2085     } catch (IllegalAccessException e) {
2086       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
2087           e.getMessage());
2088       return null;
2089     } catch (IllegalArgumentException e) {
2090       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
2091           e.getMessage());
2092       return null;
2093     } catch (InvocationTargetException e) {
2094       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
2095           e.getMessage());
2096       return null;
2097     }
2098   }
2099 }