1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInputStream;
23  import java.io.EOFException;
24  import java.io.FileNotFoundException;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.InterruptedIOException;
28  import java.lang.reflect.InvocationTargetException;
29  import java.lang.reflect.Method;
30  import java.net.InetSocketAddress;
31  import java.net.URI;
32  import java.net.URISyntaxException;
33  import java.util.ArrayList;
34  import java.util.Collections;
35  import java.util.HashMap;
36  import java.util.LinkedList;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.concurrent.ArrayBlockingQueue;
40  import java.util.concurrent.ConcurrentHashMap;
41  import java.util.concurrent.ThreadPoolExecutor;
42  import java.util.concurrent.TimeUnit;
43  import java.util.regex.Pattern;
44  
45  import org.apache.commons.logging.Log;
46  import org.apache.commons.logging.LogFactory;
47  import org.apache.hadoop.hbase.classification.InterfaceAudience;
48  import org.apache.hadoop.HadoopIllegalArgumentException;
49  import org.apache.hadoop.conf.Configuration;
50  import org.apache.hadoop.fs.BlockLocation;
51  import org.apache.hadoop.fs.FSDataInputStream;
52  import org.apache.hadoop.fs.FSDataOutputStream;
53  import org.apache.hadoop.fs.FileStatus;
54  import org.apache.hadoop.fs.FileSystem;
55  import org.apache.hadoop.fs.Path;
56  import org.apache.hadoop.fs.PathFilter;
57  import org.apache.hadoop.fs.permission.FsAction;
58  import org.apache.hadoop.fs.permission.FsPermission;
59  import org.apache.hadoop.hbase.ClusterId;
60  import org.apache.hadoop.hbase.HColumnDescriptor;
61  import org.apache.hadoop.hbase.HConstants;
62  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
63  import org.apache.hadoop.hbase.HRegionInfo;
64  import org.apache.hadoop.hbase.RemoteExceptionHandler;
65  import org.apache.hadoop.hbase.TableName;
66  import org.apache.hadoop.hbase.exceptions.DeserializationException;
67  import org.apache.hadoop.hbase.fs.HFileSystem;
68  import org.apache.hadoop.hbase.master.HMaster;
69  import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
70  import org.apache.hadoop.hbase.security.AccessDeniedException;
71  import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
72  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
73  import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
74  import org.apache.hadoop.hbase.regionserver.HRegion;
75  import org.apache.hadoop.hdfs.DistributedFileSystem;
76  import org.apache.hadoop.io.IOUtils;
77  import org.apache.hadoop.io.SequenceFile;
78  import org.apache.hadoop.ipc.RemoteException;
79  import org.apache.hadoop.security.UserGroupInformation;
80  import org.apache.hadoop.util.Progressable;
81  import org.apache.hadoop.util.ReflectionUtils;
82  import org.apache.hadoop.util.StringUtils;
83  
84  import com.google.common.primitives.Ints;
85  
86  /**
87   * Utility methods for interacting with the underlying file system.
88   */
89  @InterfaceAudience.Private
90  public abstract class FSUtils {
91    private static final Log LOG = LogFactory.getLog(FSUtils.class);
92  
93    /** Full access permissions (starting point for a umask) */
94    public static final String FULL_RWX_PERMISSIONS = "777";
95    private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
96    private static final int DEFAULT_THREAD_POOLSIZE = 2;
97  
98    /** Set to true on Windows platforms */
99    public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
100 
101   protected FSUtils() {
102     super();
103   }
104 
105   /**
106    * Sets storage policy for given path according to config setting.
107    * If the passed path is a directory, we'll set the storage policy for all files
108    * created in the future in said directory. Note that this change in storage
109    * policy takes place at the HDFS level; it will persist beyond this RS's lifecycle.
110    * If we're running on a version of HDFS that doesn't support the given storage policy
111    * (or storage policies at all), then we'll issue a log message and continue.
112    *
113    * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
114    *
115    * @param fs We only do anything if an instance of DistributedFileSystem
116    * @param conf used to look up storage policy with given key; not modified.
117    * @param path the Path whose storage policy is to be set
118    * @param policyKey e.g. HConstants.WAL_STORAGE_POLICY
119    * @param defaultPolicy usually should be the policy NONE to delegate to HDFS
120    */
121   public static void setStoragePolicy(final FileSystem fs, final Configuration conf,
122       final Path path, final String policyKey, final String defaultPolicy) {
123     String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase();
124     if (storagePolicy.equals(defaultPolicy)) {
125       if (LOG.isTraceEnabled()) {
126         LOG.trace("default policy of " + defaultPolicy + " requested, exiting early.");
127       }
128       return;
129     }
130     if (fs instanceof DistributedFileSystem) {
131       DistributedFileSystem dfs = (DistributedFileSystem)fs;
132       // Once our minimum supported Hadoop version is 2.6.0 we can remove reflection.
133       Class<? extends DistributedFileSystem> dfsClass = dfs.getClass();
134       Method m = null;
135       try {
136         m = dfsClass.getDeclaredMethod("setStoragePolicy",
137             new Class<?>[] { Path.class, String.class });
138         m.setAccessible(true);
139       } catch (NoSuchMethodException e) {
140         LOG.info("FileSystem doesn't support"
141             + " setStoragePolicy; --HDFS-6584 not available");
142       } catch (SecurityException e) {
143         LOG.info("Doesn't have access to setStoragePolicy on "
144             + "FileSystems --HDFS-6584 not available", e);
145         m = null; // could happen on setAccessible()
146       }
147       if (m != null) {
148         try {
149           m.invoke(dfs, path, storagePolicy);
150           LOG.info("set " + storagePolicy + " for " + path);
151         } catch (Exception e) {
152           // check for lack of HDFS-7228
153           boolean probablyBadPolicy = false;
154           if (e instanceof InvocationTargetException) {
155             final Throwable exception = e.getCause();
156             if (exception instanceof RemoteException &&
157                 HadoopIllegalArgumentException.class.getName().equals(
158                     ((RemoteException)exception).getClassName())) {
159               LOG.warn("Given storage policy, '" + storagePolicy + "', was rejected and probably " +
160                   "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
161                   "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
162                   "more information see the 'ArchivalStorage' docs for your Hadoop release.");
163               LOG.debug("More information about the invalid storage policy.", exception);
164               probablyBadPolicy = true;
165             }
166           }
167           if (!probablyBadPolicy) {
168             // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
169             // misuse than a runtime problem with HDFS.
170             LOG.warn("Unable to set " + storagePolicy + " for " + path, e);
171           }
172         }
173       }
174     } else {
175       LOG.info("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't " +
176           "support setStoragePolicy.");
177     }
178   }
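  // Usage sketch for setStoragePolicy above (illustrative; assumes an HDFS-backed FileSystem and an
  // existing Configuration; the WAL directory name is an example only):
  //
  //   FileSystem fs = FileSystem.get(conf);
  //   Path walDir = new Path(FSUtils.getRootDir(conf), "WALs");
  //   FSUtils.setStoragePolicy(fs, conf, walDir, HConstants.WAL_STORAGE_POLICY, "NONE");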
179 
180   /**
181   * Compares the path component only. Does not consider the scheme; i.e. if the schemes differ but
182   * <code>path</code> starts with <code>rootPath</code>, then the function returns true.
183    * @param rootPath
184    * @param path
185    * @return True if <code>path</code> starts with <code>rootPath</code>
186    */
187   public static boolean isStartingWithPath(final Path rootPath, final String path) {
188     String uriRootPath = rootPath.toUri().getPath();
189     String tailUriPath = (new Path(path)).toUri().getPath();
190     return tailUriPath.startsWith(uriRootPath);
191   }
192 
193   /**
194   * Compare the path component of the Path URI; e.g. given hdfs://a/b/c and /a/b/c, it will compare
195   * the '/a/b/c' part. Does not consider the scheme; i.e. if the schemes differ but the path or
196   * subpath matches, the two will equate.
197    * @param pathToSearch Path we will be trying to match.
198    * @param pathTail
199    * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
200    */
201   public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
202     return isMatchingTail(pathToSearch, new Path(pathTail));
203   }
204 
205   /**
206   * Compare the path component of the Path URI; e.g. given hdfs://a/b/c and /a/b/c, it will compare
207   * the '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true.  Does not
208   * consider the scheme; i.e. if the schemes differ but the path or subpath matches, the two will equate.
209    * @param pathToSearch Path we will be trying to match.
210    * @param pathTail
211    * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
212    */
213   public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
214     if (pathToSearch.depth() != pathTail.depth()) return false;
215     Path tailPath = pathTail;
216     String tailName;
217     Path toSearch = pathToSearch;
218     String toSearchName;
219     boolean result = false;
220     do {
221       tailName = tailPath.getName();
222       if (tailName == null || tailName.length() <= 0) {
223         result = true;
224         break;
225       }
226       toSearchName = toSearch.getName();
227       if (toSearchName == null || toSearchName.length() <= 0) break;
228       // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
229       tailPath = tailPath.getParent();
230       toSearch = toSearch.getParent();
231     } while(tailName.equals(toSearchName));
232     return result;
233   }
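  // Behaviour sketch for isMatchingTail (mirrors the javadoc example above):
  //
  //   FSUtils.isMatchingTail(new Path("hdfs://a/b/c"), "b/c"); // true - tail components match, scheme ignored
  //   FSUtils.isMatchingTail(new Path("hdfs://a/b/c"), "b/d"); // false - last components differ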
234 
235   public static FSUtils getInstance(FileSystem fs, Configuration conf) {
236     String scheme = fs.getUri().getScheme();
237     if (scheme == null) {
238       LOG.warn("Could not find scheme for uri " +
239           fs.getUri() + ", default to hdfs");
240       scheme = "hdfs";
241     }
242     Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
243         scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
244     FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
245     return fsUtils;
246   }
247 
248   /**
249    * Delete if exists.
250    * @param fs filesystem object
251    * @param dir directory to delete
252    * @return True if deleted <code>dir</code>
253    * @throws IOException e
254    */
255   public static boolean deleteDirectory(final FileSystem fs, final Path dir)
256   throws IOException {
257     return fs.exists(dir) && fs.delete(dir, true);
258   }
259 
260   /**
261    * Delete the region directory if exists.
262    * @param conf
263    * @param hri
264    * @return True if deleted the region directory.
265    * @throws IOException
266    */
267   public static boolean deleteRegionDir(final Configuration conf, final HRegionInfo hri)
268   throws IOException {
269     Path rootDir = getRootDir(conf);
270     FileSystem fs = rootDir.getFileSystem(conf);
271     return deleteDirectory(fs,
272       new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
273   }
274 
275   /**
276   * Return the number of bytes that large input files should optimally
277   * be split into to minimize I/O time.
278   *
279   * Uses reflection to search for getDefaultBlockSize(Path f);
280   * if the method doesn't exist, falls back to using getDefaultBlockSize().
281    *
282    * @param fs filesystem object
283    * @return the default block size for the path's filesystem
284    * @throws IOException e
285    */
286   public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
287     Method m = null;
288     Class<? extends FileSystem> cls = fs.getClass();
289     try {
290       m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
291     } catch (NoSuchMethodException e) {
292       LOG.info("FileSystem doesn't support getDefaultBlockSize");
293     } catch (SecurityException e) {
294       LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
295       m = null; // could happen on setAccessible()
296     }
297     if (m == null) {
298       return fs.getDefaultBlockSize(path);
299     } else {
300       try {
301         Object ret = m.invoke(fs, path);
302         return ((Long)ret).longValue();
303       } catch (Exception e) {
304         throw new IOException(e);
305       }
306     }
307   }
308 
309   /**
310    * Get the default replication.
311    *
312    * Uses reflection to search for getDefaultReplication(Path f);
313    * if the method doesn't exist, falls back to using getDefaultReplication().
314    *
315    * @param fs filesystem object
316    * @param f path of file
317    * @return default replication for the path's filesystem
318    * @throws IOException e
319    */
320   public static short getDefaultReplication(final FileSystem fs, final Path path) throws IOException {
321     Method m = null;
322     Class<? extends FileSystem> cls = fs.getClass();
323     try {
324       m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
325     } catch (NoSuchMethodException e) {
326       LOG.info("FileSystem doesn't support getDefaultReplication");
327     } catch (SecurityException e) {
328       LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
329       m = null; // could happen on setAccessible()
330     }
331     if (m == null) {
332       return fs.getDefaultReplication(path);
333     } else {
334       try {
335         Object ret = m.invoke(fs, path);
336         return ((Number)ret).shortValue();
337       } catch (Exception e) {
338         throw new IOException(e);
339       }
340     }
341   }
342 
343   /**
344    * Returns the default buffer size to use during writes.
345    *
346    * The size of the buffer should probably be a multiple of hardware
347    * page size (4096 on Intel x86), and it determines how much data is
348    * buffered during read and write operations.
349    *
350    * @param fs filesystem object
351    * @return default buffer size to use during writes
352    */
353   public static int getDefaultBufferSize(final FileSystem fs) {
354     return fs.getConf().getInt("io.file.buffer.size", 4096);
355   }
356 
357   /**
358    * Create the specified file on the filesystem. By default, this will:
359    * <ol>
360    * <li>overwrite the file if it exists</li>
361    * <li>apply the umask in the configuration (if it is enabled)</li>
362    * <li>use the fs configured buffer size (or 4096 if not set)</li>
363    * <li>use the default replication</li>
364    * <li>use the default block size</li>
365    * <li>not track progress</li>
366    * </ol>
367    *
368    * @param fs {@link FileSystem} on which to write the file
369    * @param path {@link Path} to the file to write
370    * @param perm permissions
371    * @param favoredNodes
372    * @return output stream to the created file
373    * @throws IOException if the file cannot be created
374    */
375   public static FSDataOutputStream create(FileSystem fs, Path path,
376       FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
377     if (fs instanceof HFileSystem) {
378       FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
379       if (backingFs instanceof DistributedFileSystem) {
380         // Try to use the favoredNodes version via reflection to allow backwards-
381         // compatibility.
382         try {
383           return (FSDataOutputStream) (DistributedFileSystem.class
384               .getDeclaredMethod("create", Path.class, FsPermission.class,
385                   boolean.class, int.class, short.class, long.class,
386                   Progressable.class, InetSocketAddress[].class)
387                   .invoke(backingFs, path, perm, true,
388                       getDefaultBufferSize(backingFs),
389                       getDefaultReplication(backingFs, path),
390                       getDefaultBlockSize(backingFs, path),
391                       null, favoredNodes));
392         } catch (InvocationTargetException ite) {
393           // Function was properly called, but threw its own exception.
394           throw new IOException(ite.getCause());
395         } catch (NoSuchMethodException e) {
396           LOG.debug("DFS Client does not support most favored nodes create; using default create");
397           if (LOG.isTraceEnabled()) LOG.trace("Ignoring; use default create", e);
398         } catch (IllegalArgumentException e) {
399           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
400         } catch (SecurityException e) {
401           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
402         } catch (IllegalAccessException e) {
403           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
404         }
405       }
406     }
407     return create(fs, path, perm, true);
408   }
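  // Usage sketch for create with favored nodes (illustrative; the path and datanode address are examples
  // only). When the favored-nodes create is not available, the call falls back to the default create:
  //
  //   InetSocketAddress[] favoredNodes =
  //       new InetSocketAddress[] { new InetSocketAddress("datanode-1.example.com", 50010) };
  //   FSDataOutputStream out = FSUtils.create(fs, hfilePath, FsPermission.getFileDefault(), favoredNodes);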
409 
410   /**
411    * Create the specified file on the filesystem. By default, this will:
412    * <ol>
413    * <li>apply the umask in the configuration (if it is enabled)</li>
414    * <li>use the fs configured buffer size (or 4096 if not set)</li>
415    * <li>use the default replication</li>
416    * <li>use the default block size</li>
417    * <li>not track progress</li>
418    * </ol>
419    *
420    * @param fs {@link FileSystem} on which to write the file
421    * @param path {@link Path} to the file to write
422    * @param perm
423    * @param overwrite Whether or not the created file should be overwritten.
424    * @return output stream to the created file
425    * @throws IOException if the file cannot be created
426    */
427   public static FSDataOutputStream create(FileSystem fs, Path path,
428       FsPermission perm, boolean overwrite) throws IOException {
429     if (LOG.isTraceEnabled()) {
430       LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
431     }
432     return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
433         getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
434   }
435 
436   /**
437    * Get the file permissions specified in the configuration, if they are
438    * enabled.
439    *
440    * @param fs filesystem that the file will be created on.
441    * @param conf configuration to read for determining if permissions are
442    *          enabled and which to use
443    * @param permssionConfKey property key in the configuration to use when
444    *          finding the permission
445    * @return the permission to use when creating a new file on the fs. If
446    *         special permissions are not specified in the configuration, then
447    *         the default permissions on the fs will be returned.
448    */
449   public static FsPermission getFilePermissions(final FileSystem fs,
450       final Configuration conf, final String permssionConfKey) {
451     boolean enablePermissions = conf.getBoolean(
452         HConstants.ENABLE_DATA_FILE_UMASK, false);
453 
454     if (enablePermissions) {
455       try {
456         FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
457         // make sure that we have a mask, if not, go default.
458         String mask = conf.get(permssionConfKey);
459         if (mask == null)
460           return FsPermission.getFileDefault();
461         // apply the umask
462         FsPermission umask = new FsPermission(mask);
463         return perm.applyUMask(umask);
464       } catch (IllegalArgumentException e) {
465         LOG.warn(
466             "Incorrect umask attempted to be created: "
467                 + conf.get(permssionConfKey)
468                 + ", using default file permissions.", e);
469         return FsPermission.getFileDefault();
470       }
471     }
472     return FsPermission.getFileDefault();
473   }
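  // Configuration sketch for getFilePermissions (illustrative; the umask key name below is an assumption,
  // pass whichever key the caller uses as permssionConfKey):
  //
  //   conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
  //   conf.set("hbase.data.umask", "000"); // a umask of 000 keeps the full 777 permissions
  //   FsPermission perm = FSUtils.getFilePermissions(fs, conf, "hbase.data.umask");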
474 
475   /**
476    * Checks to see if the specified file system is available
477    *
478    * @param fs filesystem
479    * @throws IOException e
480    */
481   public static void checkFileSystemAvailable(final FileSystem fs)
482   throws IOException {
483     if (!(fs instanceof DistributedFileSystem)) {
484       return;
485     }
486     IOException exception = null;
487     DistributedFileSystem dfs = (DistributedFileSystem) fs;
488     try {
489       if (dfs.exists(new Path("/"))) {
490         return;
491       }
492     } catch (IOException e) {
493       exception = RemoteExceptionHandler.checkIOException(e);
494     }
495     try {
496       fs.close();
497     } catch (Exception e) {
498       LOG.error("file system close failed: ", e);
499     }
500     IOException io = new IOException("File system is not available");
501     io.initCause(exception);
502     throw io;
503   }
504 
505   /**
506    * We use reflection because {@link DistributedFileSystem#setSafeMode(
507    * FSConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1
508    *
509    * @param dfs
510    * @return whether we're in safe mode
511    * @throws IOException
512    */
513   private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
514     boolean inSafeMode = false;
515     try {
516       Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
517           org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.class, boolean.class});
518       inSafeMode = (Boolean) m.invoke(dfs,
519         org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_GET, true);
520     } catch (Exception e) {
521       if (e instanceof IOException) throw (IOException) e;
522 
523       // Check whether dfs is on safemode.
524       inSafeMode = dfs.setSafeMode(
525         org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_GET);
526     }
527     return inSafeMode;
528   }
529 
530   /**
531    * Check whether dfs is in safemode.
532    * @param conf
533    * @throws IOException
534    */
535   public static void checkDfsSafeMode(final Configuration conf)
536   throws IOException {
537     boolean isInSafeMode = false;
538     FileSystem fs = FileSystem.get(conf);
539     if (fs instanceof DistributedFileSystem) {
540       DistributedFileSystem dfs = (DistributedFileSystem)fs;
541       isInSafeMode = isInSafeMode(dfs);
542     }
543     if (isInSafeMode) {
544       throw new IOException("File system is in safemode, it can't be written now");
545     }
546   }
547 
548   /**
549    * Verifies current version of file system
550    *
551    * @param fs filesystem object
552    * @param rootdir root hbase directory
553    * @return null if no version file exists, version string otherwise.
554    * @throws IOException e
555    * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
556    */
557   public static String getVersion(FileSystem fs, Path rootdir)
558   throws IOException, DeserializationException {
559     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
560     FileStatus[] status = null;
561     try {
562       // hadoop 2.0 throws FNFE if directory does not exist.
563       // hadoop 1.0 returns null if directory does not exist.
564       status = fs.listStatus(versionFile);
565     } catch (FileNotFoundException fnfe) {
566       return null;
567     }
568     if (status == null || status.length == 0) return null;
569     String version = null;
570     byte [] content = new byte [(int)status[0].getLen()];
571     FSDataInputStream s = fs.open(versionFile);
572     try {
573       IOUtils.readFully(s, content, 0, content.length);
574       if (ProtobufUtil.isPBMagicPrefix(content)) {
575         version = parseVersionFrom(content);
576       } else {
577         // Presume it's pre-pb format.
578         InputStream is = new ByteArrayInputStream(content);
579         DataInputStream dis = new DataInputStream(is);
580         try {
581           version = dis.readUTF();
582         } finally {
583           dis.close();
584         }
585       }
586     } catch (EOFException eof) {
587       LOG.warn("Version file was empty, odd, will try to set it.");
588     } finally {
589       s.close();
590     }
591     return version;
592   }
593 
594   /**
595    * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
596    * @param bytes The byte content of the hbase.version file.
597    * @return The version found in the file as a String.
598    * @throws DeserializationException
599    */
600   static String parseVersionFrom(final byte [] bytes)
601   throws DeserializationException {
602     ProtobufUtil.expectPBMagicPrefix(bytes);
603     int pblen = ProtobufUtil.lengthOfPBMagic();
604     FSProtos.HBaseVersionFileContent.Builder builder =
605       FSProtos.HBaseVersionFileContent.newBuilder();
606     try {
607       ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
608       return builder.getVersion();
609     } catch (IOException e) {
610       // Convert
611       throw new DeserializationException(e);
612     }
613   }
614 
615   /**
616    * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
617    * @param version Version to persist
618    * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
619    */
620   static byte [] toVersionByteArray(final String version) {
621     FSProtos.HBaseVersionFileContent.Builder builder =
622       FSProtos.HBaseVersionFileContent.newBuilder();
623     return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
624   }
625 
626   /**
627    * Verifies current version of file system
628    *
629    * @param fs file system
630    * @param rootdir root directory of HBase installation
631    * @param message if true, issues a message on System.out
632    *
633    * @throws IOException e
634    * @throws DeserializationException
635    */
636   public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
637   throws IOException, DeserializationException {
638     checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
639   }
640 
641   /**
642    * Verifies current version of file system
643    *
644    * @param fs file system
645    * @param rootdir root directory of HBase installation
646    * @param message if true, issues a message on System.out
647    * @param wait wait interval
648    * @param retries number of times to retry
649    *
650    * @throws IOException e
651    * @throws DeserializationException
652    */
653   public static void checkVersion(FileSystem fs, Path rootdir,
654       boolean message, int wait, int retries)
655   throws IOException, DeserializationException {
656     String version = getVersion(fs, rootdir);
657     if (version == null) {
658       if (!metaRegionExists(fs, rootdir)) {
659         // rootDir is empty (no version file and no root region)
660         // just create new version file (HBASE-1195)
661         setVersion(fs, rootdir, wait, retries);
662         return;
663       }
664     } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) return;
665 
666     // version is deprecated; require migration
667     // Output on stdout so user sees it in terminal.
668     String msg = "HBase file layout needs to be upgraded."
669       + " You have version " + version
670       + " and I want version " + HConstants.FILE_SYSTEM_VERSION
671       + ". Consult http://hbase.apache.org/book.html for further information about upgrading HBase."
672       + " Is your hbase.rootdir valid? If so, you may need to run "
673       + "'hbase hbck -fixVersionFile'.";
674     if (message) {
675       System.out.println("WARNING! " + msg);
676     }
677     throw new FileSystemVersionException(msg);
678   }
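  // Usage sketch for checkVersion (illustrative): verify the hbase.version file on startup, creating it
  // when the root directory is still empty.
  //
  //   Path rootDir = FSUtils.getRootDir(conf);
  //   FileSystem fs = rootDir.getFileSystem(conf);
  //   FSUtils.checkVersion(fs, rootDir, true); // warns on System.out and throws if a migration is needed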
679 
680   /**
681    * Sets version of file system
682    *
683    * @param fs filesystem object
684    * @param rootdir hbase root
685    * @throws IOException e
686    */
687   public static void setVersion(FileSystem fs, Path rootdir)
688   throws IOException {
689     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
690       HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
691   }
692 
693   /**
694    * Sets version of file system
695    *
696    * @param fs filesystem object
697    * @param rootdir hbase root
698    * @param wait time to wait for retry
699    * @param retries number of times to retry before failing
700    * @throws IOException e
701    */
702   public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
703   throws IOException {
704     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
705   }
706 
707 
708   /**
709    * Sets version of file system
710    *
711    * @param fs filesystem object
712    * @param rootdir hbase root directory
713    * @param version version to set
714    * @param wait time to wait for retry
715    * @param retries number of times to retry before throwing an IOException
716    * @throws IOException e
717    */
718   public static void setVersion(FileSystem fs, Path rootdir, String version,
719       int wait, int retries) throws IOException {
720     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
721     Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
722       HConstants.VERSION_FILE_NAME);
723     while (true) {
724       try {
725         // Write the version to a temporary file
726         FSDataOutputStream s = fs.create(tempVersionFile);
727         try {
728           s.write(toVersionByteArray(version));
729           s.close();
730           s = null;
731           // Move the temp version file to its normal location. Returns false
732           // if the rename failed. Throw an IOE in that case.
733           if (!fs.rename(tempVersionFile, versionFile)) {
734             throw new IOException("Unable to move temp version file to " + versionFile);
735           }
736         } finally {
737           // Cleaning up the temporary if the rename failed would be trying
738           // too hard. We'll unconditionally create it again the next time
739           // through anyway, files are overwritten by default by create().
740 
741           // Attempt to close the stream on the way out if it is still open.
742           try {
743             if (s != null) s.close();
744           } catch (IOException ignore) { }
745         }
746         LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
747         return;
748       } catch (IOException e) {
749         if (retries > 0) {
750           LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
751           fs.delete(versionFile, false);
752           try {
753             if (wait > 0) {
754               Thread.sleep(wait);
755             }
756           } catch (InterruptedException ie) {
757             throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
758           }
759           retries--;
760         } else {
761           throw e;
762         }
763       }
764     }
765   }
766 
767   /**
768    * Checks that a cluster ID file exists in the HBase root directory
769    * @param fs the root directory FileSystem
770    * @param rootdir the HBase root directory in HDFS
771    * @param wait how long to wait between retries
772    * @return <code>true</code> if the file exists, otherwise <code>false</code>
773    * @throws IOException if checking the FileSystem fails
774    */
775   public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
776       int wait) throws IOException {
777     while (true) {
778       try {
779         Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
780         return fs.exists(filePath);
781       } catch (IOException ioe) {
782         if (wait > 0) {
783           LOG.warn("Unable to check cluster ID file in " + rootdir.toString() +
784               ", retrying in "+wait+"msec: "+StringUtils.stringifyException(ioe));
785           try {
786             Thread.sleep(wait);
787           } catch (InterruptedException e) {
788             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
789           }
790         } else {
791           throw ioe;
792         }
793       }
794     }
795   }
796 
797   /**
798    * Returns the value of the unique cluster ID stored for this HBase instance.
799    * @param fs the root directory FileSystem
800    * @param rootdir the path to the HBase root directory
801    * @return the unique cluster identifier
802    * @throws IOException if reading the cluster ID file fails
803    */
804   public static ClusterId getClusterId(FileSystem fs, Path rootdir)
805   throws IOException {
806     Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
807     ClusterId clusterId = null;
808     FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath):  null;
809     if (status != null) {
810       int len = Ints.checkedCast(status.getLen());
811       byte [] content = new byte[len];
812       FSDataInputStream in = fs.open(idPath);
813       try {
814         in.readFully(content);
815       } catch (EOFException eof) {
816         LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
817       } finally{
818         in.close();
819       }
820       try {
821         clusterId = ClusterId.parseFrom(content);
822       } catch (DeserializationException e) {
823         throw new IOException("content=" + Bytes.toString(content), e);
824       }
825       // If not pb'd, make it so.
826       if (!ProtobufUtil.isPBMagicPrefix(content)) {
827         String cid = null;
828         in = fs.open(idPath);
829         try {
830           cid = in.readUTF();
831           clusterId = new ClusterId(cid);
832         } catch (EOFException eof) {
833           LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
834         } finally {
835           in.close();
836         }
837         rewriteAsPb(fs, rootdir, idPath, clusterId);
838       }
839       return clusterId;
840     } else {
841       LOG.warn("Cluster ID file does not exist at " + idPath.toString());
842     }
843     return clusterId;
844   }
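  // Usage sketch for getClusterId (illustrative): read the cluster ID stored under hbase.rootdir, if any.
  //
  //   Path rootDir = FSUtils.getRootDir(conf);
  //   FileSystem fs = rootDir.getFileSystem(conf);
  //   ClusterId clusterId = FSUtils.getClusterId(fs, rootDir); // null if the hbase.id file does not exist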
845 
846   /**
847    * @param cid
848    * @throws IOException
849    */
850   private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
851       final ClusterId cid)
852   throws IOException {
853     // Rewrite the file as pb.  Move aside the old one first, write new
854     // then delete the moved-aside file.
855     Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
856     if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
857     setClusterId(fs, rootdir, cid, 100);
858     if (!fs.delete(movedAsideName, false)) {
859       throw new IOException("Failed delete of " + movedAsideName);
860     }
861     LOG.debug("Rewrote the hbase.id file as pb");
862   }
863 
864   /**
865    * Writes a new unique identifier for this cluster to the "hbase.id" file
866    * in the HBase root directory
867    * @param fs the root directory FileSystem
868    * @param rootdir the path to the HBase root directory
869    * @param clusterId the unique identifier to store
870    * @param wait how long (in milliseconds) to wait between retries
871    * @throws IOException if writing to the FileSystem fails and no wait value
872    */
873   public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
874       int wait) throws IOException {
875     while (true) {
876       try {
877         Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
878         Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
879           Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
880         // Write the id file to a temporary location
881         FSDataOutputStream s = fs.create(tempIdFile);
882         try {
883           s.write(clusterId.toByteArray());
884           s.close();
885           s = null;
886           // Move the temporary file to its normal location. Throw an IOE if
887           // the rename failed
888           if (!fs.rename(tempIdFile, idFile)) {
889             throw new IOException("Unable to move temp version file to " + idFile);
890           }
891         } finally {
892           // Attempt to close the stream if still open on the way out
893           try {
894             if (s != null) s.close();
895           } catch (IOException ignore) { }
896         }
897         if (LOG.isDebugEnabled()) {
898           LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
899         }
900         return;
901       } catch (IOException ioe) {
902         if (wait > 0) {
903           LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
904               ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
905           try {
906             Thread.sleep(wait);
907           } catch (InterruptedException e) {
908             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
909           }
910         } else {
911           throw ioe;
912         }
913       }
914     }
915   }
916 
917   /**
918    * Verifies root directory path is a valid URI with a scheme
919    *
920    * @param root root directory path
921    * @return Passed <code>root</code> argument.
922    * @throws IOException if not a valid URI with a scheme
923    */
924   public static Path validateRootPath(Path root) throws IOException {
925     try {
926       URI rootURI = new URI(root.toString());
927       String scheme = rootURI.getScheme();
928       if (scheme == null) {
929         throw new IOException("Root directory does not have a scheme");
930       }
931       return root;
932     } catch (URISyntaxException e) {
933       IOException io = new IOException("Root directory path is not a valid " +
934         "URI -- check your " + HConstants.HBASE_DIR + " configuration");
935       io.initCause(e);
936       throw io;
937     }
938   }
939 
940   /**
941    * Checks for the presence of the root path (using the provided conf object) in the given path. If
942    * it exists, this method removes it and returns the String representation of the remaining relative path.
943    * @param path
944    * @param conf
945    * @return String representation of the remaining relative path
946    * @throws IOException
947    */
948   public static String removeRootPath(Path path, final Configuration conf) throws IOException {
949     Path root = FSUtils.getRootDir(conf);
950     String pathStr = path.toString();
951     // If the path does not contain the root path, return it as it is.
952     if (!pathStr.startsWith(root.toString())) return pathStr;
953     // Otherwise strip the root path plus the following "/" and return the remainder.
954     return pathStr.substring(root.toString().length() + 1);// remove the "/" too.
955   }
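  // Example for removeRootPath (illustrative; assumes hbase.rootdir is hdfs://nn:8020/hbase):
  //
  //   FSUtils.removeRootPath(new Path("hdfs://nn:8020/hbase/data/ns/t"), conf); // returns "data/ns/t"
  //   FSUtils.removeRootPath(new Path("/tmp/other"), conf);                     // returned unchanged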
956 
957   /**
958    * If DFS, check safe mode and if so, wait until we clear it.
959    * @param conf configuration
960    * @param wait Sleep between retries
961    * @throws IOException e
962    */
963   public static void waitOnSafeMode(final Configuration conf,
964     final long wait)
965   throws IOException {
966     FileSystem fs = FileSystem.get(conf);
967     if (!(fs instanceof DistributedFileSystem)) return;
968     DistributedFileSystem dfs = (DistributedFileSystem)fs;
969     // Make sure dfs is not in safe mode
970     while (isInSafeMode(dfs)) {
971       LOG.info("Waiting for dfs to exit safe mode...");
972       try {
973         Thread.sleep(wait);
974       } catch (InterruptedException e) {
975         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
976       }
977     }
978   }
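  // Usage sketch for waitOnSafeMode (illustrative): block until HDFS leaves safe mode, polling every 10s.
  //
  //   FSUtils.waitOnSafeMode(conf, 10 * 1000L);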
979 
980   /**
981    * Return the 'path' component of a Path.  In Hadoop, Path is an URI.  This
982    * method returns the 'path' component of a Path's URI: e.g. If a Path is
983    * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
984    * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
985    * This method is useful if you want to print out a Path without qualifying
986    * Filesystem instance.
987    * @param p Filesystem Path whose 'path' component we are to return.
988    * @return Path portion of the Filesystem
989    */
990   public static String getPath(Path p) {
991     return p.toUri().getPath();
992   }
993 
994   /**
995    * @param c configuration
996    * @return Path to hbase root directory: i.e. <code>hbase.rootdir</code> from
997    * configuration as a qualified Path.
998    * @throws IOException e
999    */
1000   public static Path getRootDir(final Configuration c) throws IOException {
1001     Path p = new Path(c.get(HConstants.HBASE_DIR));
1002     FileSystem fs = p.getFileSystem(c);
1003     return p.makeQualified(fs);
1004   }
1005 
1006   public static void setRootDir(final Configuration c, final Path root) throws IOException {
1007     c.set(HConstants.HBASE_DIR, root.toString());
1008   }
1009 
1010   public static void setFsDefault(final Configuration c, final Path root) throws IOException {
1011     c.set("fs.defaultFS", root.toString());    // for hadoop 0.21+
1012   }
1013 
1014   /**
1015    * Checks if meta region exists
1016    *
1017    * @param fs file system
1018    * @param rootdir root directory of HBase installation
1019    * @return true if exists
1020    * @throws IOException e
1021    */
1022   @SuppressWarnings("deprecation")
1023   public static boolean metaRegionExists(FileSystem fs, Path rootdir)
1024   throws IOException {
1025     Path metaRegionDir =
1026       HRegion.getRegionDir(rootdir, HRegionInfo.FIRST_META_REGIONINFO);
1027     return fs.exists(metaRegionDir);
1028   }
1029 
1030   /**
1031    * Compute HDFS blocks distribution of a given file, or a portion of the file
1032    * @param fs file system
1033    * @param status file status of the file
1034    * @param start start position of the portion
1035    * @param length length of the portion
1036    * @return The HDFS blocks distribution
1037    */
1038   static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
1039     final FileSystem fs, FileStatus status, long start, long length)
1040     throws IOException {
1041     HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
1042     BlockLocation [] blockLocations =
1043       fs.getFileBlockLocations(status, start, length);
1044     for(BlockLocation bl : blockLocations) {
1045       String [] hosts = bl.getHosts();
1046       long len = bl.getLength();
1047       blocksDistribution.addHostsAndBlockWeight(hosts, len);
1048     }
1049 
1050     return blocksDistribution;
1051   }
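  // Usage sketch for computeHDFSBlocksDistribution (illustrative): compute the block distribution of a
  // whole file.
  //
  //   FileStatus status = fs.getFileStatus(hfilePath);
  //   HDFSBlocksDistribution dist =
  //       FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());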
1052 
1053 
1054 
1055   /**
1056    * Runs through the hbase rootdir and checks all stores have only
1057    * one file in them -- that is, they've been major compacted.  Looks
1058    * at root and meta tables too.
1059    * @param fs filesystem
1060    * @param hbaseRootDir hbase root directory
1061    * @return True if this hbase install is major compacted.
1062    * @throws IOException e
1063    */
1064   public static boolean isMajorCompacted(final FileSystem fs,
1065       final Path hbaseRootDir)
1066   throws IOException {
1067     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1068     PathFilter regionFilter = new RegionDirFilter(fs);
1069     PathFilter familyFilter = new FamilyDirFilter(fs);
1070     for (Path d : tableDirs) {
1071       FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
1072       for (FileStatus regionDir : regionDirs) {
1073         Path dd = regionDir.getPath();
1074         // Else its a region name.  Now look in region for families.
1075         FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1076         for (FileStatus familyDir : familyDirs) {
1077           Path family = familyDir.getPath();
1078           // Now in family make sure only one file.
1079           FileStatus[] familyStatus = fs.listStatus(family);
1080           if (familyStatus.length > 1) {
1081             LOG.debug(family.toString() + " has " + familyStatus.length +
1082                 " files.");
1083             return false;
1084           }
1085         }
1086       }
1087     }
1088     return true;
1089   }
1090 
1091   // TODO move this method OUT of FSUtils. No dependencies to HMaster
1092   /**
1093    * Returns the total overall fragmentation percentage. Includes hbase:meta and
1094    * -ROOT- as well.
1095    *
1096    * @param master  The master defining the HBase root and file system.
1097    * @return A map for each table and its percentage.
1098    * @throws IOException When scanning the directory fails.
1099    */
1100   public static int getTotalTableFragmentation(final HMaster master)
1101   throws IOException {
1102     Map<String, Integer> map = getTableFragmentation(master);
1103     return map != null && map.size() > 0 ? map.get("-TOTAL-") : -1;
1104   }
1105 
1106   /**
1107    * Runs through the HBase rootdir and checks how many stores for each table
1108    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1109    * percentage across all tables is stored under the special key "-TOTAL-".
1110    *
1111    * @param master  The master defining the HBase root and file system.
1112    * @return A map for each table and its percentage.
1113    *
1114    * @throws IOException When scanning the directory fails.
1115    */
1116   public static Map<String, Integer> getTableFragmentation(
1117     final HMaster master)
1118   throws IOException {
1119     Path path = getRootDir(master.getConfiguration());
1120     // since HMaster.getFileSystem() is package private
1121     FileSystem fs = path.getFileSystem(master.getConfiguration());
1122     return getTableFragmentation(fs, path);
1123   }
1124 
1125   /**
1126    * Runs through the HBase rootdir and checks how many stores for each table
1127    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1128    * percentage across all tables is stored under the special key "-TOTAL-".
1129    *
1130    * @param fs  The file system to use.
1131    * @param hbaseRootDir  The root directory to scan.
1132    * @return A map for each table and its percentage.
1133    * @throws IOException When scanning the directory fails.
1134    */
1135   public static Map<String, Integer> getTableFragmentation(
1136     final FileSystem fs, final Path hbaseRootDir)
1137   throws IOException {
1138     Map<String, Integer> frags = new HashMap<String, Integer>();
1139     int cfCountTotal = 0;
1140     int cfFragTotal = 0;
1141     PathFilter regionFilter = new RegionDirFilter(fs);
1142     PathFilter familyFilter = new FamilyDirFilter(fs);
1143     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1144     for (Path d : tableDirs) {
1145       int cfCount = 0;
1146       int cfFrag = 0;
1147       FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
1148       for (FileStatus regionDir : regionDirs) {
1149         Path dd = regionDir.getPath();
1150         // else its a region name, now look in region for families
1151         FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1152         for (FileStatus familyDir : familyDirs) {
1153           cfCount++;
1154           cfCountTotal++;
1155           Path family = familyDir.getPath();
1156           // now in family make sure only one file
1157           FileStatus[] familyStatus = fs.listStatus(family);
1158           if (familyStatus.length > 1) {
1159             cfFrag++;
1160             cfFragTotal++;
1161           }
1162         }
1163       }
1164       // compute percentage per table and store in result list
1165       frags.put(FSUtils.getTableName(d).getNameAsString(),
1166         cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
1167     }
1168     // set overall percentage for all tables
1169     frags.put("-TOTAL-",
1170       cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
1171     return frags;
1172   }
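  // Usage sketch for getTableFragmentation (illustrative): per-table fragmentation percentages plus the
  // overall percentage stored under the "-TOTAL-" key.
  //
  //   Map<String, Integer> frags = FSUtils.getTableFragmentation(fs, FSUtils.getRootDir(conf));
  //   int overall = frags.get("-TOTAL-");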
1173 
1174   /**
1175    * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
1176    * path rootdir
1177    *
1178    * @param rootdir qualified path of HBase root directory
1179    * @param tableName name of table
1180    * @return {@link org.apache.hadoop.fs.Path} for table
1181    */
1182   public static Path getTableDir(Path rootdir, final TableName tableName) {
1183     return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
1184         tableName.getQualifierAsString());
1185   }
1186 
1187   /**
1188    * Returns the {@link org.apache.hadoop.hbase.TableName} object representing
1189    * the table directory under
1190    * path rootdir
1191    *
1192    * @param tablePath path of table
1193    * @return {@link org.apache.hadoop.fs.Path} for table
1194    */
1195   public static TableName getTableName(Path tablePath) {
1196     return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
1197   }
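  // Sketch of the table-directory helpers above (illustrative; the namespace and table names are examples
  // only, and the "data" directory assumes the default HConstants.BASE_NAMESPACE_DIR):
  //
  //   Path rootDir = FSUtils.getRootDir(conf);
  //   Path tableDir = FSUtils.getTableDir(rootDir, TableName.valueOf("ns", "mytable"));
  //   // tableDir is <rootDir>/data/ns/mytable; getTableName(tableDir) recovers the TableName "ns:mytable"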
1198 
1199   /**
1200    * Returns the {@link org.apache.hadoop.fs.Path} object representing
1201    * the namespace directory under path rootdir
1202    *
1203    * @param rootdir qualified path of HBase root directory
1204    * @param namespace namespace name
1205    * @return {@link org.apache.hadoop.fs.Path} for table
1206    */
1207   public static Path getNamespaceDir(Path rootdir, final String namespace) {
1208     return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
1209         new Path(namespace)));
1210   }
1211 
1212   /**
1213    * A {@link PathFilter} that returns only regular files.
1214    */
1215   static class FileFilter implements PathFilter {
1216     private final FileSystem fs;
1217 
1218     public FileFilter(final FileSystem fs) {
1219       this.fs = fs;
1220     }
1221 
1222     @Override
1223     public boolean accept(Path p) {
1224       try {
1225         return fs.isFile(p);
1226       } catch (IOException e) {
1227         LOG.debug("unable to verify if path=" + p + " is a regular file", e);
1228         return false;
1229       }
1230     }
1231   }
1232 
1233   /**
1234    * Directory filter that doesn't include any of the directories in the specified blacklist
1235    */
1236   public static class BlackListDirFilter implements PathFilter {
1237     private final FileSystem fs;
1238     private List<String> blacklist;
1239 
1240     /**
1241      * Create a filter on the given filesystem with the specified blacklist.
1242      * @param fs filesystem to filter
1243      * @param directoryNameBlackList list of the names of the directories to filter. If
1244      *          <tt>null</tt>, all directories are returned
1245      */
1246     @SuppressWarnings("unchecked")
1247     public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
1248       this.fs = fs;
1249       blacklist =
1250         (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
1251           : directoryNameBlackList);
1252     }
1253 
1254     @Override
1255     public boolean accept(Path p) {
1256       boolean isValid = false;
1257       try {
1258         if (isValidName(p.getName())) {
1259           isValid = fs.getFileStatus(p).isDirectory();
1260         } else {
1261           isValid = false;
1262         }
1263       } catch (IOException e) {
1264         LOG.warn("An error occurred while verifying if [" + p.toString()
1265             + "] is a valid directory. Returning 'not valid' and continuing.", e);
1266       }
1267       return isValid;
1268     }
1269 
1270     protected boolean isValidName(final String name) {
1271       return !blacklist.contains(name);
1272     }
1273   }
1274 
1275   /**
1276    * A {@link PathFilter} that only allows directories.
1277    */
1278   public static class DirFilter extends BlackListDirFilter {
1279 
1280     public DirFilter(FileSystem fs) {
1281       super(fs, null);
1282     }
1283   }
1284 
1285   /**
1286    * A {@link PathFilter} that returns usertable directories. To get all directories use the
1287    * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
1288    */
1289   public static class UserTableDirFilter extends BlackListDirFilter {
1290     public UserTableDirFilter(FileSystem fs) {
1291       super(fs, HConstants.HBASE_NON_TABLE_DIRS);
1292     }
1293 
1294     protected boolean isValidName(final String name) {
1295       if (!super.isValidName(name))
1296         return false;
1297 
1298       try {
1299         TableName.isLegalTableQualifierName(Bytes.toBytes(name));
1300       } catch (IllegalArgumentException e) {
1301         LOG.info("INVALID NAME " + name);
1302         return false;
1303       }
1304       return true;
1305     }
1306   }
1307 
1308   /**
1309    * Heuristic to determine whether is safe or not to open a file for append
1310    * Looks both for dfs.support.append and use reflection to search
1311    * for SequenceFile.Writer.syncFs() or FSDataOutputStream.hflush()
1312    * @param conf
1313    * @return True if append support
1314    */
1315   public static boolean isAppendSupported(final Configuration conf) {
1316     boolean append = conf.getBoolean("dfs.support.append", false);
1317     if (append) {
1318       try {
1319         // TODO: The implementation that comes back when we do a createWriter
1320         // may not be using SequenceFile so the below is not a definitive test.
1321         // Will do for now (hdfs-200).
1322         SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
1323         append = true;
1324       } catch (SecurityException e) {
1325       } catch (NoSuchMethodException e) {
1326         append = false;
1327       }
1328     }
1329     if (!append) {
1330       // Look for the 0.21, 0.22, new-style append evidence.
1331       try {
1332         FSDataOutputStream.class.getMethod("hflush", new Class<?> []{});
1333         append = true;
1334       } catch (NoSuchMethodException e) {
1335         append = false;
1336       }
1337     }
1338     return append;
1339   }
1340 
1341   /**
1342    * @param conf
1343    * @return True if this filesystem's scheme is 'hdfs'.
1344    * @throws IOException
1345    */
1346   public static boolean isHDFS(final Configuration conf) throws IOException {
1347     FileSystem fs = FileSystem.get(conf);
1348     String scheme = fs.getUri().getScheme();
1349     return scheme.equalsIgnoreCase("hdfs");
1350   }
1351 
1352   /**
1353    * Recover file lease. Used when a file is suspected
1354    * to have been left open by another process.
1355    * @param fs FileSystem handle
1356    * @param p Path of file to recover lease
1357    * @param conf Configuration handle
1358    * @throws IOException
1359    */
1360   public abstract void recoverFileLease(final FileSystem fs, final Path p,
1361       Configuration conf, CancelableProgressable reporter) throws IOException;
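  // Usage sketch for recoverFileLease (illustrative): obtain the scheme-specific FSUtils instance and
  // recover the lease on a WAL file before replaying it.
  //
  //   CancelableProgressable reporter = ...; // progress callback used while waiting on lease recovery
  //   FSUtils.getInstance(fs, conf).recoverFileLease(fs, walPath, conf, reporter);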
1362 
1363   public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
1364       throws IOException {
1365     List<Path> tableDirs = new LinkedList<Path>();
1366 
1367     for(FileStatus status :
1368         fs.globStatus(new Path(rootdir,
1369             new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
1370       tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
1371     }
1372     return tableDirs;
1373   }
1374 
1375   /**
1376    * @param fs filesystem to scan
1377    * @param rootdir root directory to scan under
1378    * @return All the table directories under <code>rootdir</code>. Ignores non-table hbase
1379    * directories such as .logs, .oldlogs and .corrupt.
1380    * @throws IOException
1381    */
1382   public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
1383       throws IOException {
1384     // presumes any directory under hbase.rootdir is a table
1385     FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
1386     List<Path> tabledirs = new ArrayList<Path>(dirs.length);
1387     for (FileStatus dir: dirs) {
1388       tabledirs.add(dir.getPath());
1389     }
1390     return tabledirs;
1391   }
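
       // Usage sketch: enumerating every table directory under the hbase root, assuming
       // a Configuration 'conf' with hbase.rootdir set (names here are illustrative).
       //
       //   FileSystem fs = FSUtils.getCurrentFileSystem(conf);
       //   Path rootDir = FSUtils.getRootDir(conf);
       //   for (Path tableDir : FSUtils.getTableDirs(fs, rootDir)) {
       //     LOG.debug("Found table dir: " + tableDir);
       //   }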
1392 
1393   /**
1394    * Checks if the given path contains the 'recovered.edits' directory.
1395    * @param path path to check
1396    * @return True if the path is under a recovered.edits directory
1397    */
1398   public static boolean isRecoveredEdits(Path path) {
1399     return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
1400   }
1401 
1402   /**
1403    * Filter that accepts only region directories (hex-encoded region names).
1404    */
1405   public static class RegionDirFilter implements PathFilter {
1406     // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
1407     public static final Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
1408     final FileSystem fs;
1409 
1410     public RegionDirFilter(FileSystem fs) {
1411       this.fs = fs;
1412     }
1413 
1414     @Override
1415     public boolean accept(Path rd) {
1416       if (!regionDirPattern.matcher(rd.getName()).matches()) {
1417         return false;
1418       }
1419 
1420       try {
1421         return fs.getFileStatus(rd).isDirectory();
1422       } catch (IOException ioe) {
1423         // Maybe the file was moved or the fs was disconnected.
1424         LOG.warn("Skipping file " + rd + " due to IOException", ioe);
1425         return false;
1426       }
1427     }
1428   }
1429 
1430   /**
1431    * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1432    * .tableinfo
1433    * @param fs A file system for the Path
1434    * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir>
1435    * @return List of paths to valid region directories in table dir.
1436    * @throws IOException
1437    */
1438   public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1439     // assumes we are in a table dir.
1440     FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs));
1441     List<Path> regionDirs = new ArrayList<Path>(rds.length);
1442     for (FileStatus rdfs: rds) {
1443       Path rdPath = rdfs.getPath();
1444       regionDirs.add(rdPath);
1445     }
1446     return regionDirs;
1447   }
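
       // Usage sketch: walking from a table directory down to its region directories,
       // assuming 'fs' and 'rootDir' were obtained as in the sketch above; the table
       // name is a hypothetical example.
       //
       //   Path tableDir = FSUtils.getTableDir(rootDir, TableName.valueOf("mytable"));
       //   for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
       //     LOG.debug("Region dir: " + regionDir.getName());
       //   }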
1448 
1449   /**
1450    * Filter for all dirs that are legal column family names.  This is generally used for colfam
1451    * dirs <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>.
1452    */
1453   public static class FamilyDirFilter implements PathFilter {
1454     final FileSystem fs;
1455 
1456     public FamilyDirFilter(FileSystem fs) {
1457       this.fs = fs;
1458     }
1459 
1460     @Override
1461     public boolean accept(Path rd) {
1462       try {
1463         // throws IAE if invalid
1464         HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(rd.getName()));
1465       } catch (IllegalArgumentException iae) {
1466         // path name is an invalid family name and thus is excluded.
1467         return false;
1468       }
1469 
1470       try {
1471         return fs.getFileStatus(rd).isDirectory();
1472       } catch (IOException ioe) {
1473         // Maybe the file was moved or the fs was disconnected.
1474         LOG.warn("Skipping file " + rd + " due to IOException", ioe);
1475         return false;
1476       }
1477     }
1478   }
1479 
1480   /**
1481    * Given a particular region dir, return all the familydirs inside it
1482    *
1483    * @param fs A file system for the Path
1484    * @param regionDir Path to a specific region directory
1485    * @return List of paths to valid family directories in region dir.
1486    * @throws IOException
1487    */
1488   public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1489     // assumes we are in a region dir.
1490     FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1491     List<Path> familyDirs = new ArrayList<Path>(fds.length);
1492     for (FileStatus fdfs: fds) {
1493       Path fdPath = fdfs.getPath();
1494       familyDirs.add(fdPath);
1495     }
1496     return familyDirs;
1497   }
1498 
1499   public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
1500     FileStatus[] fds = fs.listStatus(familyDir, new ReferenceFileFilter(fs));
1501     List<Path> referenceFiles = new ArrayList<Path>(fds.length);
1502     for (FileStatus fdfs: fds) {
1503       Path fdPath = fdfs.getPath();
1504       referenceFiles.add(fdPath);
1505     }
1506     return referenceFiles;
1507   }
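
       // Usage sketch: counting reference files per column family under one region,
       // assuming 'fs' and 'regionDir' come from the previous sketches.
       //
       //   for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
       //     int refs = FSUtils.getReferenceFilePaths(fs, familyDir).size();
       //     LOG.debug(familyDir.getName() + " has " + refs + " reference file(s)");
       //   }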
1508 
1509   /**
1510    * Filter for HFiles that excludes reference files.
1511    */
1512   public static class HFileFilter implements PathFilter {
1513     final FileSystem fs;
1514 
1515     public HFileFilter(FileSystem fs) {
1516       this.fs = fs;
1517     }
1518 
1519     @Override
1520     public boolean accept(Path rd) {
1521       try {
1522         // only files
1523         return !fs.getFileStatus(rd).isDirectory() && StoreFileInfo.isHFile(rd);
1524       } catch (IOException ioe) {
1525         // Maybe the file was moved or the fs was disconnected.
1526         LOG.warn("Skipping file " + rd + " due to IOException", ioe);
1527         return false;
1528       }
1529     }
1530   }
1531 
1532   public static class ReferenceFileFilter implements PathFilter {
1533 
1534     private final FileSystem fs;
1535 
1536     public ReferenceFileFilter(FileSystem fs) {
1537       this.fs = fs;
1538     }
1539 
1540     @Override
1541     public boolean accept(Path rd) {
1542       try {
1543         // only files can be references.
1544         return !fs.getFileStatus(rd).isDirectory() && StoreFileInfo.isReference(rd);
1545       } catch (IOException ioe) {
1546         // Maybe the file was moved or the fs was disconnected.
1547         LOG.warn("Skipping file " + rd + " due to IOException", ioe);
1548         return false;
1549       }
1550     }
1551   }
1552 
1553 
1554   /**
1555    * @param conf configuration from which to resolve the hbase rootdir
1556    * @return the filesystem of the hbase rootdir
1557    * @throws IOException
1558    */
1559   public static FileSystem getCurrentFileSystem(Configuration conf)
1560   throws IOException {
1561     return getRootDir(conf).getFileSystem(conf);
1562   }
1563 
1564 
1565   /**
1566    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1567    * table StoreFile names to the full Path.
1568    * <br>
1569    * Example...<br>
1570    * Key = 3944417774205889744  <br>
1571    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1572    *
1573    * @param map map to add values.  If null, this method will create and populate one to return
1574    * @param fs  The file system to use.
1575    * @param hbaseRootDir  The root directory to scan.
1576    * @param tableName name of the table to scan.
1577    * @return Map keyed by StoreFile name with a value of the full Path.
1578    * @throws IOException When scanning the directory fails.
1579    */
1580   public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1581   final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1582   throws IOException {
1583     return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null);
1584   }
1585 
1586   /**
1587    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1588    * table StoreFile names to the full Path.
1589    * <br>
1590    * Example...<br>
1591    * Key = 3944417774205889744  <br>
1592    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1593    *
1594    * @param map map to add values.  If null, this method will create and populate one to return
1595    * @param fs  The file system to use.
1596    * @param hbaseRootDir  The root directory to scan.
1597    * @param tableName name of the table to scan.
1598    * @param errors ErrorReporter instance or null
1599    * @return Map keyed by StoreFile name with a value of the full Path.
1600    * @throws IOException When scanning the directory fails.
1601    */
1602   public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1603   final FileSystem fs, final Path hbaseRootDir, TableName tableName, ErrorReporter errors)
1604   throws IOException {
1605     if (map == null) {
1606       map = new HashMap<String, Path>();
1607     }
1608 
1609     // only include the directory paths to tables
1610     Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
1611     // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
1612     // should be regions.
1613     PathFilter familyFilter = new FamilyDirFilter(fs);
1614     FileStatus[] regionDirs = fs.listStatus(tableDir, new RegionDirFilter(fs));
1615     for (FileStatus regionDir : regionDirs) {
1616       if (null != errors) {
1617         errors.progress();
1618       }
1619       Path dd = regionDir.getPath();
1620       // this is a region directory; now look inside it for family directories
1621       FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1622       for (FileStatus familyDir : familyDirs) {
1623         if (null != errors) {
1624           errors.progress();
1625         }
1626         Path family = familyDir.getPath();
1627         if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1628           continue;
1629         }
1630         // now in family, iterate over the StoreFiles and
1631         // put in map
1632         FileStatus[] familyStatus = fs.listStatus(family);
1633         for (FileStatus sfStatus : familyStatus) {
1634           if (null != errors) {
1635             errors.progress();
1636           }
1637           Path sf = sfStatus.getPath();
1638           map.put(sf.getName(), sf);
1639         }
1640       }
1641     }
1642     return map;
1643   }
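
       // Usage sketch: building the store-file-name -> Path lookup for a single table,
       // e.g. to resolve where a reference file's target lives. The table name and the
       // 'someStoreFileName' variable are assumptions, not part of this class.
       //
       //   Map<String, Path> storeFiles = FSUtils.getTableStoreFilePathMap(
       //       null, fs, FSUtils.getRootDir(conf), TableName.valueOf("mytable"));
       //   Path location = storeFiles.get(someStoreFileName); // null if unknown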
1644 
1645   public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1646     int result = 0;
1647     try {
1648       for (Path familyDir:getFamilyDirs(fs, p)){
1649         result += getReferenceFilePaths(fs, familyDir).size();
1650       }
1651     } catch (IOException e) {
1652       LOG.warn("Error counting reference files.", e);
1653     }
1654     return result;
1655   }
1656 
1657   /**
1658    * Runs through the HBase rootdir and creates a reverse lookup map for
1659    * table StoreFile names to the full Path.
1660    * <br>
1661    * Example...<br>
1662    * Key = 3944417774205889744  <br>
1663    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1664    *
1665    * @param fs  The file system to use.
1666    * @param hbaseRootDir  The root directory to scan.
1667    * @return Map keyed by StoreFile name with a value of the full Path.
1668    * @throws IOException When scanning the directory fails.
1669    */
1670   public static Map<String, Path> getTableStoreFilePathMap(
1671     final FileSystem fs, final Path hbaseRootDir)
1672   throws IOException {
1673     return getTableStoreFilePathMap(fs, hbaseRootDir, null);
1674   }
1675 
1676   /**
1677    * Runs through the HBase rootdir and creates a reverse lookup map for
1678    * table StoreFile names to the full Path.
1679    * <br>
1680    * Example...<br>
1681    * Key = 3944417774205889744  <br>
1682    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1683    *
1684    * @param fs  The file system to use.
1685    * @param hbaseRootDir  The root directory to scan.
1686    * @param errors ErrorReporter instance or null
1687    * @return Map keyed by StoreFile name with a value of the full Path.
1688    * @throws IOException When scanning the directory fails.
1689    */
1690   public static Map<String, Path> getTableStoreFilePathMap(
1691     final FileSystem fs, final Path hbaseRootDir, ErrorReporter errors)
1692   throws IOException {
1693     Map<String, Path> map = new HashMap<String, Path>();
1694 
1695     // if this method looks similar to 'getTableFragmentation' that is because
1696     // it was borrowed from it.
1697 
1698     // only include the directory paths to tables
1699     for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1700       getTableStoreFilePathMap(map, fs, hbaseRootDir,
1701           FSUtils.getTableName(tableDir), errors);
1702     }
1703     return map;
1704   }
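
       // Usage sketch: the cluster-wide variant scans every table under the root dir;
       // on large installs this can take a while, so prefer the per-table form when the
       // table is known.
       //
       //   Map<String, Path> allStoreFiles =
       //       FSUtils.getTableStoreFilePathMap(fs, FSUtils.getRootDir(conf));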
1705 
1706   /**
1707    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1708    * This accommodates differences between Hadoop versions: Hadoop 1 does
1709    * not throw a FileNotFoundException and returns an empty FileStatus[],
1710    * while Hadoop 2 throws FileNotFoundException.
1711    *
1712    * @param fs file system
1713    * @param dir directory
1714    * @param filter path filter
1715    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1716    */
1717   public static FileStatus [] listStatus(final FileSystem fs,
1718       final Path dir, final PathFilter filter) throws IOException {
1719     FileStatus [] status = null;
1720     try {
1721       status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
1722     } catch (FileNotFoundException fnfe) {
1723       // if directory doesn't exist, return null
1724       if (LOG.isTraceEnabled()) {
1725         LOG.trace(dir + " doesn't exist");
1726       }
1727     }
1728     if (status == null || status.length < 1) return null;
1729     return status;
1730   }
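
       // Usage sketch: unlike FileSystem.listStatus(), this helper returns null for a
       // missing or empty directory, so callers null-check rather than catching
       // FileNotFoundException. 'dir' is a hypothetical path.
       //
       //   FileStatus[] children = FSUtils.listStatus(fs, dir, new RegionDirFilter(fs));
       //   if (children == null) {
       //     return; // nothing to do; directory absent or empty
       //   }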
1731 
1732   /**
1733    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1734    * This accommodates differences between Hadoop versions.
1735    *
1736    * @param fs file system
1737    * @param dir directory
1738    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1739    */
1740   public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
1741     return listStatus(fs, dir, null);
1742   }
1743 
1744   /**
1745    * Calls fs.delete() and returns the value returned by fs.delete().
1746    *
1747    * @param fs filesystem to delete from
1748    * @param path path to delete
1749    * @param recursive whether to delete recursively
1750    * @return the value returned by fs.delete()
1751    * @throws IOException
1752    */
1753   public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
1754       throws IOException {
1755     return fs.delete(path, recursive);
1756   }
1757 
1758   /**
1759    * Calls fs.exists(). Checks if the specified path exists.
1760    *
1761    * @param fs filesystem to check
1762    * @param path path to check for existence
1763    * @return the value returned by fs.exists()
1764    * @throws IOException
1765    */
1766   public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
1767     return fs.exists(path);
1768   }
1769 
1770   /**
1771    * Throw an exception if an action is not permitted by a user on a file.
1772    *
1773    * @param ugi
1774    *          the user
1775    * @param file
1776    *          the file
1777    * @param action
1778    *          the action
1779    */
1780   public static void checkAccess(UserGroupInformation ugi, FileStatus file,
1781       FsAction action) throws AccessDeniedException {
1782     if (ugi.getShortUserName().equals(file.getOwner())) {
1783       if (file.getPermission().getUserAction().implies(action)) {
1784         return;
1785       }
1786     } else if (contains(ugi.getGroupNames(), file.getGroup())) {
1787       if (file.getPermission().getGroupAction().implies(action)) {
1788         return;
1789       }
1790     } else if (file.getPermission().getOtherAction().implies(action)) {
1791       return;
1792     }
1793     throw new AccessDeniedException("Permission denied:" + " action=" + action
1794         + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
1795   }
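
       // Usage sketch: verifying that the current user may read a file before opening it;
       // throws AccessDeniedException otherwise. 'somePath' is a hypothetical path.
       //
       //   UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
       //   FileStatus status = fs.getFileStatus(somePath);
       //   FSUtils.checkAccess(ugi, status, FsAction.READ);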
1796 
1797   private static boolean contains(String[] groups, String user) {
1798     for (String group : groups) {
1799       if (group.equals(user)) {
1800         return true;
1801       }
1802     }
1803     return false;
1804   }
1805 
1806   /**
1807    * Log the current state of the filesystem from a certain root directory
1808    * @param fs filesystem to investigate
1809    * @param root root file/directory to start logging from
1810    * @param LOG log to output information
1811    * @throws IOException if an unexpected exception occurs
1812    */
1813   public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
1814       throws IOException {
1815     LOG.debug("Current file system:");
1816     logFSTree(LOG, fs, root, "|-");
1817   }
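
       // Usage sketch: dumping the directory layout under the hbase root at DEBUG level,
       // handy when diagnosing snapshot or archive issues in tests.
       //
       //   FSUtils.logFileSystemState(fs, FSUtils.getRootDir(conf), LOG);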
1818 
1819   /**
1820    * Recursive helper to log the state of the FS
1821    *
1822    * @see #logFileSystemState(FileSystem, Path, Log)
1823    */
1824   private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
1825       throws IOException {
1826     FileStatus[] files = FSUtils.listStatus(fs, root, null);
1827     if (files == null) return;
1828 
1829     for (FileStatus file : files) {
1830       if (file.isDirectory()) {
1831         LOG.debug(prefix + file.getPath().getName() + "/");
1832         logFSTree(LOG, fs, file.getPath(), prefix + "---");
1833       } else {
1834         LOG.debug(prefix + file.getPath().getName());
1835       }
1836     }
1837   }
1838 
1839   public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
1840       throws IOException {
1841     // set the modify time for TimeToLive Cleaner
1842     fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
1843     return fs.rename(src, dest);
1844   }
1845 
1846   /**
1847    * Scans the root path of the file system to get the degree of locality
1848    * for each region on each of the servers having at least one block of
1849    * that region.
1850    * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}.
1851    *
1852    * @param conf
1853    *          the configuration to use
1854    * @return the mapping from region encoded name to a map of server names to
1855    *           locality fraction
1856    * @throws IOException
1857    *           in case of file system errors or interrupts
1858    */
1859   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1860       final Configuration conf) throws IOException {
1861     return getRegionDegreeLocalityMappingFromFS(
1862         conf, null,
1863         conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1864 
1865   }
1866 
1867   /**
1868    * Scans the root path of the file system to get the degree of locality
1869    * for each region on each of the servers having at least one block of
1870    * that region.
1871    *
1872    * @param conf
1873    *          the configuration to use
1874    * @param desiredTable
1875    *          the table you wish to scan locality for
1876    * @param threadPoolSize
1877    *          the thread pool size to use
1878    * @return the mapping from region encoded name to a map of server names to
1879    *           locality fraction
1880    * @throws IOException
1881    *           in case of file system errors or interrupts
1882    */
1883   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1884       final Configuration conf, final String desiredTable, int threadPoolSize)
1885       throws IOException {
1886     Map<String, Map<String, Float>> regionDegreeLocalityMapping =
1887         new ConcurrentHashMap<String, Map<String, Float>>();
1888     getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null,
1889         regionDegreeLocalityMapping);
1890     return regionDegreeLocalityMapping;
1891   }
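
       // Usage sketch: reading the block locality of each region of one table, keyed by
       // encoded region name and then by server name. The table name and pool size are
       // illustrative values.
       //
       //   Map<String, Map<String, Float>> locality =
       //       FSUtils.getRegionDegreeLocalityMappingFromFS(conf, "mytable", 8);
       //   for (Map.Entry<String, Map<String, Float>> e : locality.entrySet()) {
       //     LOG.debug("Region " + e.getKey() + " locality by host: " + e.getValue());
       //   }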
1892 
1893   /**
1894    * Scans the root path of the file system to get either the mapping between
1895    * the region name and its best-locality region server, or the degree of
1896    * locality of each region on each of the servers having at least one block
1897    * of that region. The output map parameters are both optional.
1898    *
1899    * @param conf
1900    *          the configuration to use
1901    * @param desiredTable
1902    *          the table you wish to scan locality for
1903    * @param threadPoolSize
1904    *          the thread pool size to use
1905    * @param regionToBestLocalityRSMapping
1906    *          the map into which to put the best locality mapping or null
1907    * @param regionDegreeLocalityMapping
1908    *          the map into which to put the locality degree mapping or null,
1909    *          must be a thread-safe implementation
1910    * @throws IOException
1911    *           in case of file system errors or interrupts
1912    */
1913   private static void getRegionLocalityMappingFromFS(
1914       final Configuration conf, final String desiredTable,
1915       int threadPoolSize,
1916       Map<String, String> regionToBestLocalityRSMapping,
1917       Map<String, Map<String, Float>> regionDegreeLocalityMapping)
1918       throws IOException {
1919     FileSystem fs =  FileSystem.get(conf);
1920     Path rootPath = FSUtils.getRootDir(conf);
1921     long startTime = EnvironmentEdgeManager.currentTime();
1922     Path queryPath;
1923     // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1924     if (null == desiredTable) {
1925       queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1926     } else {
1927       queryPath = new Path(FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1928     }
1929 
1930     // reject all paths that are not appropriate
1931     PathFilter pathFilter = new PathFilter() {
1932       @Override
1933       public boolean accept(Path path) {
1934         // the last path component should be a region encoded name; reject noise
1935         if (null == path) {
1936           return false;
1937         }
1938 
1939         // no parent?
1940         Path parent = path.getParent();
1941         if (null == parent) {
1942           return false;
1943         }
1944 
1945         String regionName = path.getName();
1946         if (null == regionName) {
1947           return false;
1948         }
1949 
1950         if (!regionName.toLowerCase().matches("[0-9a-f]+")) {
1951           return false;
1952         }
1953         return true;
1954       }
1955     };
1956 
1957     FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1958 
1959     if (null == statusList) {
1960       return;
1961     } else {
1962       LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
1963           statusList.length);
1964     }
1965 
1966     // lower the number of threads in case we have very few expected regions
1967     threadPoolSize = Math.min(threadPoolSize, statusList.length);
1968 
1969     // run in multiple threads
1970     ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
1971         threadPoolSize, 60, TimeUnit.SECONDS,
1972         new ArrayBlockingQueue<Runnable>(statusList.length));
1973     try {
1974       // ignore all file status items that are not of interest
1975       for (FileStatus regionStatus : statusList) {
1976         if (null == regionStatus) {
1977           continue;
1978         }
1979 
1980         if (!regionStatus.isDirectory()) {
1981           continue;
1982         }
1983 
1984         Path regionPath = regionStatus.getPath();
1985         if (null == regionPath) {
1986           continue;
1987         }
1988 
1989         tpe.execute(new FSRegionScanner(fs, regionPath,
1990             regionToBestLocalityRSMapping, regionDegreeLocalityMapping));
1991       }
1992     } finally {
1993       tpe.shutdown();
1994       int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1995           60 * 1000);
1996       try {
1997         // here we wait until TPE terminates, which is either naturally or by
1998         // exceptions in the execution of the threads
1999         while (!tpe.awaitTermination(threadWakeFrequency,
2000             TimeUnit.MILLISECONDS)) {
2001           // printing out rough estimate, so as to not introduce
2002           // AtomicInteger
2003           LOG.info("Locality checking is underway: { Scanned Regions : "
2004               + tpe.getCompletedTaskCount() + "/"
2005               + tpe.getTaskCount() + " }");
2006         }
2007       } catch (InterruptedException e) {
2008         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
2009       }
2010     }
2011 
2012     long overhead = EnvironmentEdgeManager.currentTime() - startTime;
2013     String overheadMsg = "Scanning DFS for locality info took " + overhead + " ms";
2014 
2015     LOG.info(overheadMsg);
2016   }
2017 
2018   /**
2019    * Do our short circuit read setup.
2020    * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
2021    * @param conf configuration to check and adjust
2022    */
2023   public static void setupShortCircuitRead(final Configuration conf) {
2024     // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
2025     boolean shortCircuitSkipChecksum =
2026       conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
2027     boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
2028     if (shortCircuitSkipChecksum) {
2029       LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
2030         "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
2031         "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
2032       assert !shortCircuitSkipChecksum; //this will fail if assertions are on
2033     }
2034     checkShortCircuitReadBufferSize(conf);
2035   }
2036 
2037   /**
2038    * Check if short circuit read buffer size is set and if not, set it to hbase value.
2039    * @param conf
2040    */
2041   public static void checkShortCircuitReadBufferSize(final Configuration conf) {
2042     final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
2043     final int notSet = -1;
2044     // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
2045     final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
2046     int size = conf.getInt(dfsKey, notSet);
2047     // If a size is set, return -- we will use it.
2048     if (size != notSet) return;
2049     // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
2050     int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
2051     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
2052   }
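
       // Usage sketch: a client-side configuration that enables HDFS short-circuit reads
       // and then lets HBase pick a sensible read buffer size. The socket path is a
       // hypothetical example; checksum skipping is deliberately left off (HBASE-6868).
       //
       //   conf.setBoolean("dfs.client.read.shortcircuit", true);
       //   conf.set("dfs.domain.socket.path", "/var/lib/hadoop-hdfs/dn_socket");
       //   FSUtils.setupShortCircuitRead(conf); // also applies the hbase buffer size default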
2053 }