1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInputStream;
23  import java.io.EOFException;
24  import java.io.FileNotFoundException;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.InterruptedIOException;
28  import java.lang.reflect.InvocationTargetException;
29  import java.lang.reflect.Method;
30  import java.net.InetSocketAddress;
31  import java.net.URI;
32  import java.net.URISyntaxException;
33  import java.util.ArrayList;
34  import java.util.Collections;
35  import java.util.HashMap;
36  import java.util.LinkedList;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.concurrent.ArrayBlockingQueue;
40  import java.util.concurrent.ConcurrentHashMap;
41  import java.util.concurrent.ThreadPoolExecutor;
42  import java.util.concurrent.TimeUnit;
43  import java.util.regex.Pattern;
44  
45  import org.apache.commons.logging.Log;
46  import org.apache.commons.logging.LogFactory;
47  import org.apache.hadoop.hbase.classification.InterfaceAudience;
48  import org.apache.hadoop.HadoopIllegalArgumentException;
49  import org.apache.hadoop.conf.Configuration;
50  import org.apache.hadoop.fs.BlockLocation;
51  import org.apache.hadoop.fs.FSDataInputStream;
52  import org.apache.hadoop.fs.FSDataOutputStream;
53  import org.apache.hadoop.fs.FileStatus;
54  import org.apache.hadoop.fs.FileSystem;
55  import org.apache.hadoop.fs.Path;
56  import org.apache.hadoop.fs.PathFilter;
57  import org.apache.hadoop.fs.permission.FsAction;
58  import org.apache.hadoop.fs.permission.FsPermission;
59  import org.apache.hadoop.hbase.ClusterId;
60  import org.apache.hadoop.hbase.HColumnDescriptor;
61  import org.apache.hadoop.hbase.HConstants;
62  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
63  import org.apache.hadoop.hbase.HRegionInfo;
64  import org.apache.hadoop.hbase.TableName;
65  import org.apache.hadoop.hbase.exceptions.DeserializationException;
66  import org.apache.hadoop.hbase.fs.HFileSystem;
67  import org.apache.hadoop.hbase.master.HMaster;
68  import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
69  import org.apache.hadoop.hbase.security.AccessDeniedException;
70  import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
71  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
72  import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
73  import org.apache.hadoop.hbase.regionserver.HRegion;
74  import org.apache.hadoop.hdfs.DFSClient;
75  import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
76  import org.apache.hadoop.hdfs.DistributedFileSystem;
77  import org.apache.hadoop.io.IOUtils;
78  import org.apache.hadoop.io.SequenceFile;
79  import org.apache.hadoop.ipc.RemoteException;
80  import org.apache.hadoop.security.UserGroupInformation;
81  import org.apache.hadoop.util.Progressable;
82  import org.apache.hadoop.util.ReflectionUtils;
83  import org.apache.hadoop.util.StringUtils;
84  
85  import com.google.common.primitives.Ints;
86  
87  /**
88   * Utility methods for interacting with the underlying file system.
89   */
90  @InterfaceAudience.Private
91  public abstract class FSUtils {
92    private static final Log LOG = LogFactory.getLog(FSUtils.class);
93  
94    /** Full access permissions (starting point for a umask) */
95    public static final String FULL_RWX_PERMISSIONS = "777";
96    private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
97    private static final int DEFAULT_THREAD_POOLSIZE = 2;
98  
99    /** Set to true on Windows platforms */
100   public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
101 
102   protected FSUtils() {
103     super();
104   }
105 
106   /**
107    * Sets storage policy for given path according to config setting.
108    * If the passed path is a directory, we'll set the storage policy for all files
109    * created in the future in said directory. Note that this change in storage
110    * policy takes place at the HDFS level; it will persist beyond this RS's lifecycle.
111    * If we're running on a version of HDFS that doesn't support the given storage policy
112    * (or storage policies at all), then we'll issue a log message and continue.
113    *
114    * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
115    *
116    * @param fs the FileSystem to act on; this method is a no-op unless it is a DistributedFileSystem
117    * @param conf used to look up storage policy with given key; not modified.
118    * @param path the Path whose storage policy is to be set
119    * @param policyKey e.g. HConstants.WAL_STORAGE_POLICY
120    * @param defaultPolicy usually should be the policy NONE to delegate to HDFS
121    */
122   public static void setStoragePolicy(final FileSystem fs, final Configuration conf,
123       final Path path, final String policyKey, final String defaultPolicy) {
124     String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase();
125     if (storagePolicy.equals(defaultPolicy)) {
126       if (LOG.isTraceEnabled()) {
127         LOG.trace("default policy of " + defaultPolicy + " requested, exiting early.");
128       }
129       return;
130     }
131     if (fs instanceof DistributedFileSystem) {
132       DistributedFileSystem dfs = (DistributedFileSystem)fs;
133       // Once our minimum supported Hadoop version is 2.6.0 we can remove reflection.
134       Class<? extends DistributedFileSystem> dfsClass = dfs.getClass();
135       Method m = null;
136       try {
137         m = dfsClass.getDeclaredMethod("setStoragePolicy",
138             new Class<?>[] { Path.class, String.class });
139         m.setAccessible(true);
140       } catch (NoSuchMethodException e) {
141         LOG.info("FileSystem doesn't support"
142             + " setStoragePolicy; --HDFS-6584 not available");
143       } catch (SecurityException e) {
144         LOG.info("Doesn't have access to setStoragePolicy on "
145             + "FileSystems --HDFS-6584 not available", e);
146         m = null; // could happen on setAccessible()
147       }
148       if (m != null) {
149         try {
150           m.invoke(dfs, path, storagePolicy);
151           LOG.info("set " + storagePolicy + " for " + path);
152         } catch (Exception e) {
153           // check for lack of HDFS-7228
154           boolean probablyBadPolicy = false;
155           if (e instanceof InvocationTargetException) {
156             final Throwable exception = e.getCause();
157             if (exception instanceof RemoteException &&
158                 HadoopIllegalArgumentException.class.getName().equals(
159                     ((RemoteException)exception).getClassName())) {
160               LOG.warn("Given storage policy, '" + storagePolicy + "', was rejected and probably " +
161                   "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
162                   "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
163                   "more information see the 'ArchivalStorage' docs for your Hadoop release.");
164               LOG.debug("More information about the invalid storage policy.", exception);
165               probablyBadPolicy = true;
166             }
167           }
168           if (!probablyBadPolicy) {
169             // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
170             // misuse than a runtime problem with HDFS.
171             LOG.warn("Unable to set " + storagePolicy + " for " + path, e);
172           }
173         }
174       }
175     } else {
176       LOG.info("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't " +
177           "support setStoragePolicy.");
178     }
179   }
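
  // Usage sketch (illustrative only, not part of the original source). A WAL writer, for
  // example, could route its directory to a different HDFS storage tier; `conf` and `walDir`
  // are assumed to exist in the caller:
  //
  //   FileSystem walFs = walDir.getFileSystem(conf);
  //   // "NONE" as the default delegates the decision back to HDFS when no policy is configured.
  //   FSUtils.setStoragePolicy(walFs, conf, walDir, HConstants.WAL_STORAGE_POLICY, "NONE");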
180 
181   /**
182    * Compares path components only. Does not consider the scheme; i.e. even if the schemes
183    * differ, if <code>path</code> starts with <code>rootPath</code>,
184    * then the function returns true.
185    * @param rootPath root path to compare against
186    * @param path path to test
187    * @return True if <code>path</code> starts with <code>rootPath</code>
188    */
189   public static boolean isStartingWithPath(final Path rootPath, final String path) {
190     String uriRootPath = rootPath.toUri().getPath();
191     String tailUriPath = (new Path(path)).toUri().getPath();
192     return tailUriPath.startsWith(uriRootPath);
193   }
194 
195   /**
196    * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
197    * '/a/b/c' part. Does not consider the scheme; i.e. even if the schemes differ, a matching path or
198    * subpath makes the two equate.
199    * @param pathToSearch Path we will be trying to match.
200    * @param pathTail tail path to look for
201    * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
202    */
203   public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
204     return isMatchingTail(pathToSearch, new Path(pathTail));
205   }
206 
207   /**
208    * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
209    * '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true.  Does not consider
210    * the scheme; i.e. even if the schemes differ, a matching path or subpath makes the two equate.
211    * @param pathToSearch Path we will be trying to match.
212    * @param pathTail tail path to look for
213    * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
214    */
215   public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
216     if (pathToSearch.depth() != pathTail.depth()) return false;
217     Path tailPath = pathTail;
218     String tailName;
219     Path toSearch = pathToSearch;
220     String toSearchName;
221     boolean result = false;
222     do {
223       tailName = tailPath.getName();
224       if (tailName == null || tailName.length() <= 0) {
225         result = true;
226         break;
227       }
228       toSearchName = toSearch.getName();
229       if (toSearchName == null || toSearchName.length() <= 0) break;
230       // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
231       tailPath = tailPath.getParent();
232       toSearch = toSearch.getParent();
233     } while(tailName.equals(toSearchName));
234     return result;
235   }
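
  // Illustrative examples (not part of the original source). Note the depth check above:
  // both paths must have the same number of components before the tail comparison runs.
  //
  //   FSUtils.isMatchingTail(new Path("hdfs://nn:8020/a/b/c"), new Path("/a/b/c")); // true, scheme ignored
  //   FSUtils.isMatchingTail(new Path("hdfs://nn:8020/a/b/c"), new Path("/a/b/d")); // false, names differ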
236 
237   public static FSUtils getInstance(FileSystem fs, Configuration conf) {
238     String scheme = fs.getUri().getScheme();
239     if (scheme == null) {
240       LOG.warn("Could not find scheme for uri " +
241           fs.getUri() + ", default to hdfs");
242       scheme = "hdfs";
243     }
244     Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
245         scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
246     FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
247     return fsUtils;
248   }
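
  // Illustrative lookup (not part of the original source): the implementation class is resolved
  // from "hbase.fsutil.<scheme>.impl", falling back to the HDFS flavour, so for an hdfs:// root:
  //
  //   FSUtils fsUtils = FSUtils.getInstance(FileSystem.get(conf), conf);
  //   // equivalent to conf.getClass("hbase.fsutil.hdfs.impl", FSHDFSUtils.class) + newInstance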
249 
250   /**
251    * Delete if exists.
252    * @param fs filesystem object
253    * @param dir directory to delete
254    * @return True if deleted <code>dir</code>
255    * @throws IOException e
256    */
257   public static boolean deleteDirectory(final FileSystem fs, final Path dir)
258   throws IOException {
259     return fs.exists(dir) && fs.delete(dir, true);
260   }
261 
262   /**
263    * Delete the region directory if exists.
264    * @param conf
265    * @param hri
266    * @return True if deleted the region directory.
267    * @throws IOException
268    */
269   public static boolean deleteRegionDir(final Configuration conf, final HRegionInfo hri)
270   throws IOException {
271     Path rootDir = getRootDir(conf);
272     FileSystem fs = rootDir.getFileSystem(conf);
273     return deleteDirectory(fs,
274       new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
275   }
276 
277   /**
278    * Return the number of bytes that large input files should optimally
279    * be split into to minimize i/o time.
280    *
281    * Uses reflection to search for getDefaultBlockSize(Path f);
282    * if the method doesn't exist, falls back to using getDefaultBlockSize().
283    *
284    * @param fs filesystem object
285    * @return the default block size for the path's filesystem
286    * @throws IOException e
287    */
288   public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
289     Method m = null;
290     Class<? extends FileSystem> cls = fs.getClass();
291     try {
292       m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
293     } catch (NoSuchMethodException e) {
294       LOG.info("FileSystem doesn't support getDefaultBlockSize");
295     } catch (SecurityException e) {
296       LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
297       m = null; // could happen on setAccessible()
298     }
299     if (m == null) {
300       return fs.getDefaultBlockSize(path);
301     } else {
302       try {
303         Object ret = m.invoke(fs, path);
304         return ((Long)ret).longValue();
305       } catch (Exception e) {
306         throw new IOException(e);
307       }
308     }
309   }
310 
311   /**
312    * Get the default replication.
313    *
314    * Uses reflection to search for getDefaultReplication(Path f);
315    * if the method doesn't exist, falls back to using getDefaultReplication().
316    *
317    * @param fs filesystem object
318    * @param path path of file
319    * @return default replication for the path's filesystem
320    * @throws IOException e
321    */
322   public static short getDefaultReplication(final FileSystem fs, final Path path) throws IOException {
323     Method m = null;
324     Class<? extends FileSystem> cls = fs.getClass();
325     try {
326       m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
327     } catch (NoSuchMethodException e) {
328       LOG.info("FileSystem doesn't support getDefaultReplication");
329     } catch (SecurityException e) {
330       LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
331       m = null; // could happen on setAccessible()
332     }
333     if (m == null) {
334       return fs.getDefaultReplication(path);
335     } else {
336       try {
337         Object ret = m.invoke(fs, path);
338         return ((Number)ret).shortValue();
339       } catch (Exception e) {
340         throw new IOException(e);
341       }
342     }
343   }
344 
345   /**
346    * Returns the default buffer size to use during writes.
347    *
348    * The size of the buffer should probably be a multiple of hardware
349    * page size (4096 on Intel x86), and it determines how much data is
350    * buffered during read and write operations.
351    *
352    * @param fs filesystem object
353    * @return default buffer size to use during writes
354    */
355   public static int getDefaultBufferSize(final FileSystem fs) {
356     return fs.getConf().getInt("io.file.buffer.size", 4096);
357   }
358 
359   /**
360    * Create the specified file on the filesystem. By default, this will:
361    * <ol>
362    * <li>overwrite the file if it exists</li>
363    * <li>apply the umask in the configuration (if it is enabled)</li>
364    * <li>use the fs configured buffer size (or 4096 if not set)</li>
365    * <li>use the configured column family replication or default replication if
366    * {@link HColumnDescriptor#DEFAULT_DFS_REPLICATION}</li>
367    * <li>use the default block size</li>
368    * <li>not track progress</li>
369    * </ol>
370    * @param conf configurations
371    * @param fs {@link FileSystem} on which to write the file
372    * @param path {@link Path} to the file to write
373    * @param perm permissions
374    * @param favoredNodes preferred DataNodes for block placement of the new file
375    * @return output stream to the created file
376    * @throws IOException if the file cannot be created
377    */
378   public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
379       FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
380     if (fs instanceof HFileSystem) {
381       FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
382       if (backingFs instanceof DistributedFileSystem) {
383         // Try to use the favoredNodes version via reflection to allow backwards-
384         // compatibility.
385         short replication = Short.parseShort(conf.get(HColumnDescriptor.DFS_REPLICATION,
386           String.valueOf(HColumnDescriptor.DEFAULT_DFS_REPLICATION)));
387         try {
388           return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create",
389             Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class,
390             Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true,
391             getDefaultBufferSize(backingFs),
392             replication > 0 ? replication : getDefaultReplication(backingFs, path),
393             getDefaultBlockSize(backingFs, path), null, favoredNodes));
394         } catch (InvocationTargetException ite) {
395           // Function was properly called, but threw its own exception.
396           throw new IOException(ite.getCause());
397         } catch (NoSuchMethodException e) {
398           LOG.debug("DFS Client does not support most favored nodes create; using default create");
399           if (LOG.isTraceEnabled()) LOG.trace("Ignoring; use default create", e);
400         } catch (IllegalArgumentException e) {
401           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
402         } catch (SecurityException e) {
403           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
404         } catch (IllegalAccessException e) {
405           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
406         }
407       }
408     }
409     return create(fs, path, perm, true);
410   }
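
  // Usage sketch (illustrative only; the path, umask key and DataNode address are hypothetical):
  //
  //   InetSocketAddress[] favoredNodes =
  //       new InetSocketAddress[] { new InetSocketAddress("datanode-1.example.com", 50010) };
  //   FsPermission perm = FSUtils.getFilePermissions(fs, conf, "hbase.data.umask");
  //   FSDataOutputStream out = FSUtils.create(conf, fs, hfilePath, perm, favoredNodes);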
411 
412   /**
413    * Create the specified file on the filesystem. By default, this will:
414    * <ol>
415    * <li>apply the umask in the configuration (if it is enabled)</li>
416    * <li>use the fs configured buffer size (or 4096 if not set)</li>
417    * <li>use the default replication</li>
418    * <li>use the default block size</li>
419    * <li>not track progress</li>
420    * </ol>
421    *
422    * @param fs {@link FileSystem} on which to write the file
423    * @param path {@link Path} to the file to write
424    * @param perm permissions to apply to the new file
425    * @param overwrite Whether or not the created file should be overwritten.
426    * @return output stream to the created file
427    * @throws IOException if the file cannot be created
428    */
429   public static FSDataOutputStream create(FileSystem fs, Path path,
430       FsPermission perm, boolean overwrite) throws IOException {
431     if (LOG.isTraceEnabled()) {
432       LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
433     }
434     return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
435         getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
436   }
437 
438   /**
439    * Get the file permissions specified in the configuration, if they are
440    * enabled.
441    *
442    * @param fs filesystem that the file will be created on.
443    * @param conf configuration to read for determining if permissions are
444    *          enabled and which to use
445    * @param permssionConfKey property key in the configuration to use when
446    *          finding the permission
447    * @return the permission to use when creating a new file on the fs. If
448    *         special permissions are not specified in the configuration, then
449    *         the default permissions on the fs will be returned.
450    */
451   public static FsPermission getFilePermissions(final FileSystem fs,
452       final Configuration conf, final String permssionConfKey) {
453     boolean enablePermissions = conf.getBoolean(
454         HConstants.ENABLE_DATA_FILE_UMASK, false);
455 
456     if (enablePermissions) {
457       try {
458         FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
459         // make sure that we have a mask; if not, go with the default.
460         String mask = conf.get(permssionConfKey);
461         if (mask == null)
462           return FsPermission.getFileDefault();
463         // apply the umask
464         FsPermission umask = new FsPermission(mask);
465         return perm.applyUMask(umask);
466       } catch (IllegalArgumentException e) {
467         LOG.warn(
468             "Incorrect umask attempted to be created: "
469                 + conf.get(permssionConfKey)
470                 + ", using default file permissions.", e);
471         return FsPermission.getFileDefault();
472       }
473     }
474     return FsPermission.getFileDefault();
475   }
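
  // Illustrative configuration sketch (not part of the original source; the umask key name is
  // whatever the caller passes as permssionConfKey, "hbase.data.umask" here is just an example):
  //
  //   conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
  //   conf.set("hbase.data.umask", "007");
  //   FsPermission perm = FSUtils.getFilePermissions(fs, conf, "hbase.data.umask");
  //   // "777" with a "007" umask yields rwxrwx---; with the feature off, the fs default is used.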
476 
477   /**
478    * Checks to see if the specified file system is available
479    *
480    * @param fs filesystem
481    * @throws IOException e
482    */
483   public static void checkFileSystemAvailable(final FileSystem fs)
484   throws IOException {
485     if (!(fs instanceof DistributedFileSystem)) {
486       return;
487     }
488     IOException exception = null;
489     DistributedFileSystem dfs = (DistributedFileSystem) fs;
490     try {
491       if (dfs.exists(new Path("/"))) {
492         return;
493       }
494     } catch (IOException e) {
495       exception = e instanceof RemoteException ?
496               ((RemoteException)e).unwrapRemoteException() : e;
497     }
498     try {
499       fs.close();
500     } catch (Exception e) {
501       LOG.error("file system close failed: ", e);
502     }
503     IOException io = new IOException("File system is not available");
504     io.initCause(exception);
505     throw io;
506   }
507 
508   /**
509    * We use reflection because {@link DistributedFileSystem#setSafeMode(
510    * HdfsConstants.SafeModeAction action, boolean isChecked)} is not in Hadoop 1.1.
511    *
512    * @param dfs the DistributedFileSystem to check
513    * @return whether we're in safe mode
514    * @throws IOException
515    */
516   private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
517     boolean inSafeMode = false;
518     try {
519       Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
520           org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class});
521       inSafeMode = (Boolean) m.invoke(dfs,
522         org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true);
523     } catch (Exception e) {
524       if (e instanceof IOException) throw (IOException) e;
525 
526       // Check whether dfs is on safemode.
527       inSafeMode = dfs.setSafeMode(
528         org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET);
529     }
530     return inSafeMode;
531   }
532 
533   /**
534    * Check whether dfs is in safemode.
535    * @param conf
536    * @throws IOException
537    */
538   public static void checkDfsSafeMode(final Configuration conf)
539   throws IOException {
540     boolean isInSafeMode = false;
541     FileSystem fs = FileSystem.get(conf);
542     if (fs instanceof DistributedFileSystem) {
543       DistributedFileSystem dfs = (DistributedFileSystem)fs;
544       isInSafeMode = isInSafeMode(dfs);
545     }
546     if (isInSafeMode) {
547       throw new IOException("File system is in safemode, it can't be written now");
548     }
549   }
550 
551   /**
552    * Reads the current version of the file system from the version file.
553    *
554    * @param fs filesystem object
555    * @param rootdir root hbase directory
556    * @return null if no version file exists, version string otherwise.
557    * @throws IOException e
558    * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
559    */
560   public static String getVersion(FileSystem fs, Path rootdir)
561   throws IOException, DeserializationException {
562     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
563     FileStatus[] status = null;
564     try {
565       // hadoop 2.0 throws FNFE if directory does not exist.
566       // hadoop 1.0 returns null if directory does not exist.
567       status = fs.listStatus(versionFile);
568     } catch (FileNotFoundException fnfe) {
569       return null;
570     }
571     if (status == null || status.length == 0) return null;
572     String version = null;
573     byte [] content = new byte [(int)status[0].getLen()];
574     FSDataInputStream s = fs.open(versionFile);
575     try {
576       IOUtils.readFully(s, content, 0, content.length);
577       if (ProtobufUtil.isPBMagicPrefix(content)) {
578         version = parseVersionFrom(content);
579       } else {
580         // Presume it is in pre-pb format.
581         InputStream is = new ByteArrayInputStream(content);
582         DataInputStream dis = new DataInputStream(is);
583         try {
584           version = dis.readUTF();
585         } finally {
586           dis.close();
587         }
588       }
589     } catch (EOFException eof) {
590       LOG.warn("Version file was empty, odd, will try to set it.");
591     } finally {
592       s.close();
593     }
594     return version;
595   }
596 
597   /**
598    * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
599    * @param bytes The byte content of the hbase.version file.
600    * @return The version found in the file as a String.
601    * @throws DeserializationException
602    */
603   static String parseVersionFrom(final byte [] bytes)
604   throws DeserializationException {
605     ProtobufUtil.expectPBMagicPrefix(bytes);
606     int pblen = ProtobufUtil.lengthOfPBMagic();
607     FSProtos.HBaseVersionFileContent.Builder builder =
608       FSProtos.HBaseVersionFileContent.newBuilder();
609     try {
610       ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
611       return builder.getVersion();
612     } catch (IOException e) {
613       // Convert
614       throw new DeserializationException(e);
615     }
616   }
617 
618   /**
619    * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
620    * @param version Version to persist
621    * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
622    */
623   static byte [] toVersionByteArray(final String version) {
624     FSProtos.HBaseVersionFileContent.Builder builder =
625       FSProtos.HBaseVersionFileContent.newBuilder();
626     return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
627   }
628 
629   /**
630    * Verifies current version of file system
631    *
632    * @param fs file system
633    * @param rootdir root directory of HBase installation
634    * @param message if true, issues a message on System.out
635    *
636    * @throws IOException e
637    * @throws DeserializationException
638    */
639   public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
640   throws IOException, DeserializationException {
641     checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
642   }
643 
644   /**
645    * Verifies current version of file system
646    *
647    * @param fs file system
648    * @param rootdir root directory of HBase installation
649    * @param message if true, issues a message on System.out
650    * @param wait wait interval
651    * @param retries number of times to retry
652    *
653    * @throws IOException e
654    * @throws DeserializationException
655    */
656   public static void checkVersion(FileSystem fs, Path rootdir,
657       boolean message, int wait, int retries)
658   throws IOException, DeserializationException {
659     String version = getVersion(fs, rootdir);
660     if (version == null) {
661       if (!metaRegionExists(fs, rootdir)) {
662         // rootDir is empty (no version file and no root region)
663         // just create new version file (HBASE-1195)
664         setVersion(fs, rootdir, wait, retries);
665         return;
666       }
667     } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) return;
668 
669     // version is deprecated; require migration
670     // Output on stdout so user sees it in terminal.
671     String msg = "HBase file layout needs to be upgraded."
672       + " You have version " + version
673       + " and I want version " + HConstants.FILE_SYSTEM_VERSION
674       + ". Consult http://hbase.apache.org/book.html for further information about upgrading HBase."
675       + " Is your hbase.rootdir valid? If so, you may need to run "
676       + "'hbase hbck -fixVersionFile'.";
677     if (message) {
678       System.out.println("WARNING! " + msg);
679     }
680     throw new FileSystemVersionException(msg);
681   }
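
  // Usage sketch (illustrative only), as typically driven from cluster start-up; the wait and
  // retry values below are hypothetical:
  //
  //   FileSystem fs = rootdir.getFileSystem(conf);
  //   // Creates hbase.version on a fresh rootdir, or throws FileSystemVersionException when an
  //   // old layout version is found.
  //   FSUtils.checkVersion(fs, rootdir, true, 10 * 1000, 3);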
682 
683   /**
684    * Sets version of file system
685    *
686    * @param fs filesystem object
687    * @param rootdir hbase root
688    * @throws IOException e
689    */
690   public static void setVersion(FileSystem fs, Path rootdir)
691   throws IOException {
692     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
693       HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
694   }
695 
696   /**
697    * Sets version of file system
698    *
699    * @param fs filesystem object
700    * @param rootdir hbase root
701    * @param wait time to wait for retry
702    * @param retries number of times to retry before failing
703    * @throws IOException e
704    */
705   public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
706   throws IOException {
707     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
708   }
709 
710 
711   /**
712    * Sets version of file system
713    *
714    * @param fs filesystem object
715    * @param rootdir hbase root directory
716    * @param version version to set
717    * @param wait time to wait for retry
718    * @param retries number of times to retry before throwing an IOException
719    * @throws IOException e
720    */
721   public static void setVersion(FileSystem fs, Path rootdir, String version,
722       int wait, int retries) throws IOException {
723     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
724     Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
725       HConstants.VERSION_FILE_NAME);
726     while (true) {
727       try {
728         // Write the version to a temporary file
729         FSDataOutputStream s = fs.create(tempVersionFile);
730         try {
731           s.write(toVersionByteArray(version));
732           s.close();
733           s = null;
734           // Move the temp version file to its normal location. Returns false
735           // if the rename failed. Throw an IOE in that case.
736           if (!fs.rename(tempVersionFile, versionFile)) {
737             throw new IOException("Unable to move temp version file to " + versionFile);
738           }
739         } finally {
740           // Cleaning up the temporary file if the rename failed would be trying
741           // too hard. We'll unconditionally create it again the next time
742           // through anyway; files are overwritten by default by create().
743 
744           // Attempt to close the stream on the way out if it is still open.
745           try {
746             if (s != null) s.close();
747           } catch (IOException ignore) { }
748         }
749         LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
750         return;
751       } catch (IOException e) {
752         if (retries > 0) {
753           LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
754           fs.delete(versionFile, false);
755           try {
756             if (wait > 0) {
757               Thread.sleep(wait);
758             }
759           } catch (InterruptedException ie) {
760             throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
761           }
762           retries--;
763         } else {
764           throw e;
765         }
766       }
767     }
768   }
769 
770   /**
771    * Checks that a cluster ID file exists in the HBase root directory
772    * @param fs the root directory FileSystem
773    * @param rootdir the HBase root directory in HDFS
774    * @param wait how long to wait between retries
775    * @return <code>true</code> if the file exists, otherwise <code>false</code>
776    * @throws IOException if checking the FileSystem fails
777    */
778   public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
779       int wait) throws IOException {
780     while (true) {
781       try {
782         Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
783         return fs.exists(filePath);
784       } catch (IOException ioe) {
785         if (wait > 0) {
786           LOG.warn("Unable to check cluster ID file in " + rootdir.toString() +
787               ", retrying in "+wait+"msec: "+StringUtils.stringifyException(ioe));
788           try {
789             Thread.sleep(wait);
790           } catch (InterruptedException e) {
791             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
792           }
793         } else {
794           throw ioe;
795         }
796       }
797     }
798   }
799 
800   /**
801    * Returns the value of the unique cluster ID stored for this HBase instance.
802    * @param fs the root directory FileSystem
803    * @param rootdir the path to the HBase root directory
804    * @return the unique cluster identifier
805    * @throws IOException if reading the cluster ID file fails
806    */
807   public static ClusterId getClusterId(FileSystem fs, Path rootdir)
808   throws IOException {
809     Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
810     ClusterId clusterId = null;
811     FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath):  null;
812     if (status != null) {
813       int len = Ints.checkedCast(status.getLen());
814       byte [] content = new byte[len];
815       FSDataInputStream in = fs.open(idPath);
816       try {
817         in.readFully(content);
818       } catch (EOFException eof) {
819         LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
820       } finally{
821         in.close();
822       }
823       try {
824         clusterId = ClusterId.parseFrom(content);
825       } catch (DeserializationException e) {
826         throw new IOException("content=" + Bytes.toString(content), e);
827       }
828       // If not pb'd, make it so.
829       if (!ProtobufUtil.isPBMagicPrefix(content)) {
830         String cid = null;
831         in = fs.open(idPath);
832         try {
833           cid = in.readUTF();
834           clusterId = new ClusterId(cid);
835         } catch (EOFException eof) {
836           LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
837         } finally {
838           in.close();
839         }
840         rewriteAsPb(fs, rootdir, idPath, clusterId);
841       }
842       return clusterId;
843     } else {
844       LOG.warn("Cluster ID file does not exist at " + idPath.toString());
845     }
846     return clusterId;
847   }
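
  // Illustrative read-or-initialise sequence (not part of the original source; generating a
  // fresh ClusterId this way is an assumption about its constructor):
  //
  //   ClusterId id = FSUtils.getClusterId(fs, rootdir);
  //   if (id == null) {
  //     id = new ClusterId();                        // assumed no-arg constructor -> new UUID
  //     FSUtils.setClusterId(fs, rootdir, id, 100);  // retry every 100 ms on failure
  //   }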
848 
849   /**
850    * @param cid the cluster ID to rewrite in protobuf format
851    * @throws IOException
852    */
853   private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
854       final ClusterId cid)
855   throws IOException {
856     // Rewrite the file as pb.  Move aside the old one first, write new
857     // then delete the moved-aside file.
858     Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
859     if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
860     setClusterId(fs, rootdir, cid, 100);
861     if (!fs.delete(movedAsideName, false)) {
862       throw new IOException("Failed delete of " + movedAsideName);
863     }
864     LOG.debug("Rewrote the hbase.id file as pb");
865   }
866 
867   /**
868    * Writes a new unique identifier for this cluster to the "hbase.id" file
869    * in the HBase root directory
870    * @param fs the root directory FileSystem
871    * @param rootdir the path to the HBase root directory
872    * @param clusterId the unique identifier to store
873    * @param wait how long (in milliseconds) to wait between retries
874    * @throws IOException if writing to the FileSystem fails and the wait value is not positive
875    */
876   public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
877       int wait) throws IOException {
878     while (true) {
879       try {
880         Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
881         Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
882           Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
883         // Write the id file to a temporary location
884         FSDataOutputStream s = fs.create(tempIdFile);
885         try {
886           s.write(clusterId.toByteArray());
887           s.close();
888           s = null;
889           // Move the temporary file to its normal location. Throw an IOE if
890           // the rename failed
891           if (!fs.rename(tempIdFile, idFile)) {
892             throw new IOException("Unable to move temp cluster ID file to " + idFile);
893           }
894         } finally {
895           // Attempt to close the stream if still open on the way out
896           try {
897             if (s != null) s.close();
898           } catch (IOException ignore) { }
899         }
900         if (LOG.isDebugEnabled()) {
901           LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
902         }
903         return;
904       } catch (IOException ioe) {
905         if (wait > 0) {
906           LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
907               ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
908           try {
909             Thread.sleep(wait);
910           } catch (InterruptedException e) {
911             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
912           }
913         } else {
914           throw ioe;
915         }
916       }
917     }
918   }
919 
920   /**
921    * Verifies root directory path is a valid URI with a scheme
922    *
923    * @param root root directory path
924    * @return Passed <code>root</code> argument.
925    * @throws IOException if not a valid URI with a scheme
926    */
927   public static Path validateRootPath(Path root) throws IOException {
928     try {
929       URI rootURI = new URI(root.toString());
930       String scheme = rootURI.getScheme();
931       if (scheme == null) {
932         throw new IOException("Root directory does not have a scheme");
933       }
934       return root;
935     } catch (URISyntaxException e) {
936       IOException io = new IOException("Root directory path is not a valid " +
937         "URI -- check your " + HConstants.HBASE_DIR + " configuration");
938       io.initCause(e);
939       throw io;
940     }
941   }
942 
943   /**
944    * Checks for the presence of the root path (using the provided conf object) in the given path. If
945    * it exists, this method removes it and returns the String representation of the remaining relative path.
946    * @param path
947    * @param conf
948    * @return String representation of the remaining relative path
949    * @throws IOException
950    */
951   public static String removeRootPath(Path path, final Configuration conf) throws IOException {
952     Path root = FSUtils.getRootDir(conf);
953     String pathStr = path.toString();
954     // if the path does not start with the root path, return it as it is.
955     if (!pathStr.startsWith(root.toString())) return pathStr;
956     // otherwise strip the root path prefix and the "/" that follows it.
957     return pathStr.substring(root.toString().length() + 1);
958   }
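
  // Worked example (illustrative only), with hbase.rootdir = hdfs://nn:8020/hbase (hypothetical):
  //
  //   removeRootPath(new Path("hdfs://nn:8020/hbase/data/default/t1"), conf)
  //       -> "data/default/t1"    (root prefix and the "/" after it removed)
  //   removeRootPath(new Path("/tmp/other"), conf)
  //       -> "/tmp/other"         (not under the root, returned unchanged)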
959 
960   /**
961    * If on DFS, check whether it is in safe mode and, if so, wait until it clears.
962    * @param conf configuration
963    * @param wait Sleep between retries
964    * @throws IOException e
965    */
966   public static void waitOnSafeMode(final Configuration conf,
967     final long wait)
968   throws IOException {
969     FileSystem fs = FileSystem.get(conf);
970     if (!(fs instanceof DistributedFileSystem)) return;
971     DistributedFileSystem dfs = (DistributedFileSystem)fs;
972     // Make sure dfs is not in safe mode
973     while (isInSafeMode(dfs)) {
974       LOG.info("Waiting for dfs to exit safe mode...");
975       try {
976         Thread.sleep(wait);
977       } catch (InterruptedException e) {
978         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
979       }
980     }
981   }
982 
983   /**
984    * Return the 'path' component of a Path.  In Hadoop, Path is a URI.  This
985    * method returns the 'path' component of a Path's URI: e.g. if a Path is
986    * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
987    * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
988    * This method is useful if you want to print out a Path without the qualifying
989    * FileSystem instance.
990    * @param p Filesystem Path whose 'path' component we are to return.
991    * @return the path portion of the Path's URI
992    */
993   public static String getPath(Path p) {
994     return p.toUri().getPath();
995   }
996 
997   /**
998    * @param c configuration
999    * @return Path to hbase root directory: i.e. <code>hbase.rootdir</code> from
1000    * configuration as a qualified Path.
1001    * @throws IOException e
1002    */
1003   public static Path getRootDir(final Configuration c) throws IOException {
1004     Path p = new Path(c.get(HConstants.HBASE_DIR));
1005     FileSystem fs = p.getFileSystem(c);
1006     return p.makeQualified(fs);
1007   }
1008 
1009   public static void setRootDir(final Configuration c, final Path root) throws IOException {
1010     c.set(HConstants.HBASE_DIR, root.toString());
1011   }
1012 
1013   public static void setFsDefault(final Configuration c, final Path root) throws IOException {
1014     c.set("fs.defaultFS", root.toString());    // for hadoop 0.21+
1015   }
1016 
1017   /**
1018    * Checks if meta region exists
1019    *
1020    * @param fs file system
1021    * @param rootdir root directory of HBase installation
1022    * @return true if exists
1023    * @throws IOException e
1024    */
1025   @SuppressWarnings("deprecation")
1026   public static boolean metaRegionExists(FileSystem fs, Path rootdir)
1027   throws IOException {
1028     Path metaRegionDir =
1029       HRegion.getRegionDir(rootdir, HRegionInfo.FIRST_META_REGIONINFO);
1030     return fs.exists(metaRegionDir);
1031   }
1032 
1033   /**
1034    * Compute HDFS blocks distribution of a given file, or a portion of the file
1035    * @param fs file system
1036    * @param status file status of the file
1037    * @param start start position of the portion
1038    * @param length length of the portion
1039    * @return The HDFS blocks distribution
1040    */
1041   static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
1042     final FileSystem fs, FileStatus status, long start, long length)
1043     throws IOException {
1044     HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
1045     BlockLocation [] blockLocations =
1046       fs.getFileBlockLocations(status, start, length);
1047     for(BlockLocation bl : blockLocations) {
1048       String [] hosts = bl.getHosts();
1049       long len = bl.getLength();
1050       blocksDistribution.addHostsAndBlockWeight(hosts, len);
1051     }
1052 
1053     return blocksDistribution;
1054   }
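
  // Usage sketch (illustrative only); typically used to measure how local a store file's blocks
  // are to a given host. storeFilePath and the hostname are hypothetical:
  //
  //   FileStatus status = fs.getFileStatus(storeFilePath);
  //   HDFSBlocksDistribution dist =
  //       FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
  //   float locality = dist.getBlockLocalityIndex("regionserver-1.example.com"); // assumed accessor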
1055 
1056   // TODO move this method OUT of FSUtils. No dependencies to HMaster
1057   /**
1058    * Returns the total overall fragmentation percentage. Includes hbase:meta and
1059    * -ROOT- as well.
1060    *
1061    * @param master  The master defining the HBase root and file system.
1062    * @return A map for each table and its percentage.
1063    * @throws IOException When scanning the directory fails.
1064    */
1065   public static int getTotalTableFragmentation(final HMaster master)
1066   throws IOException {
1067     Map<String, Integer> map = getTableFragmentation(master);
1068     return map != null && map.size() > 0 ? map.get("-TOTAL-") : -1;
1069   }
1070 
1071   /**
1072    * Runs through the HBase rootdir and checks how many stores for each table
1073    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1074    * percentage across all tables is stored under the special key "-TOTAL-".
1075    *
1076    * @param master  The master defining the HBase root and file system.
1077    * @return A map for each table and its percentage.
1078    *
1079    * @throws IOException When scanning the directory fails.
1080    */
1081   public static Map<String, Integer> getTableFragmentation(
1082     final HMaster master)
1083   throws IOException {
1084     Path path = getRootDir(master.getConfiguration());
1085     // since HMaster.getFileSystem() is package private
1086     FileSystem fs = path.getFileSystem(master.getConfiguration());
1087     return getTableFragmentation(fs, path);
1088   }
1089 
1090   /**
1091    * Runs through the HBase rootdir and checks how many stores for each table
1092    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1093    * percentage across all tables is stored under the special key "-TOTAL-".
1094    *
1095    * @param fs  The file system to use.
1096    * @param hbaseRootDir  The root directory to scan.
1097    * @return A map for each table and its percentage.
1098    * @throws IOException When scanning the directory fails.
1099    */
1100   public static Map<String, Integer> getTableFragmentation(
1101     final FileSystem fs, final Path hbaseRootDir)
1102   throws IOException {
1103     Map<String, Integer> frags = new HashMap<String, Integer>();
1104     int cfCountTotal = 0;
1105     int cfFragTotal = 0;
1106     PathFilter regionFilter = new RegionDirFilter(fs);
1107     PathFilter familyFilter = new FamilyDirFilter(fs);
1108     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1109     for (Path d : tableDirs) {
1110       int cfCount = 0;
1111       int cfFrag = 0;
1112       FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
1113       for (FileStatus regionDir : regionDirs) {
1114         Path dd = regionDir.getPath();
1115        // it is a region dir; now look inside it for families
1116         FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1117         for (FileStatus familyDir : familyDirs) {
1118           cfCount++;
1119           cfCountTotal++;
1120           Path family = familyDir.getPath();
1121           // now in family make sure only one file
1122           FileStatus[] familyStatus = fs.listStatus(family);
1123           if (familyStatus.length > 1) {
1124             cfFrag++;
1125             cfFragTotal++;
1126           }
1127         }
1128       }
1129       // compute percentage per table and store in result list
1130       frags.put(FSUtils.getTableName(d).getNameAsString(),
1131         cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
1132     }
1133     // set overall percentage for all tables
1134     frags.put("-TOTAL-",
1135       cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
1136     return frags;
1137   }
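
  // Reading the result (illustrative only): each value is the percentage of column families of
  // that table with more than one store file, plus the "-TOTAL-" aggregate across all tables:
  //
  //   Map<String, Integer> frags = FSUtils.getTableFragmentation(fs, rootdir);
  //   int overall = frags.get("-TOTAL-");   // e.g. 37 means 37% of all families have >1 file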
1138 
1139   /**
1140    * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
1141    * path rootdir
1142    *
1143    * @param rootdir qualified path of HBase root directory
1144    * @param tableName name of table
1145    * @return {@link org.apache.hadoop.fs.Path} for table
1146    */
1147   public static Path getTableDir(Path rootdir, final TableName tableName) {
1148     return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
1149         tableName.getQualifierAsString());
1150   }
1151 
1152   /**
1153    * Returns the {@link org.apache.hadoop.hbase.TableName} object corresponding
1154    * to the given table directory
1155    * path.
1156    *
1157    * @param tablePath path of the table directory
1158    * @return {@link org.apache.hadoop.hbase.TableName} for the table
1159    */
1160   public static TableName getTableName(Path tablePath) {
1161     return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
1162   }
1163 
1164   /**
1165    * Returns the {@link org.apache.hadoop.fs.Path} object representing
1166    * the namespace directory under path rootdir
1167    *
1168    * @param rootdir qualified path of HBase root directory
1169    * @param namespace namespace name
1170    * @return {@link org.apache.hadoop.fs.Path} for the namespace directory
1171    */
1172   public static Path getNamespaceDir(Path rootdir, final String namespace) {
1173     return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
1174         new Path(namespace)));
1175   }
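
  // Layout sketch implied by the helpers above (illustrative; "ns" and "t1" are hypothetical,
  // and HConstants.BASE_NAMESPACE_DIR is assumed to be "data"):
  //
  //   Path rootdir = FSUtils.getRootDir(conf);                        // <hbase.rootdir>
  //   Path nsDir   = FSUtils.getNamespaceDir(rootdir, "ns");          // <hbase.rootdir>/data/ns
  //   Path tblDir  = FSUtils.getTableDir(rootdir, TableName.valueOf("ns", "t1"));
  //                                                                   // <hbase.rootdir>/data/ns/t1
  //   TableName tn = FSUtils.getTableName(tblDir);                    // ns:t1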
1176 
1177   /**
1178    * A {@link PathFilter} that returns only regular files.
1179    */
1180   static class FileFilter implements PathFilter {
1181     private final FileSystem fs;
1182 
1183     public FileFilter(final FileSystem fs) {
1184       this.fs = fs;
1185     }
1186 
1187     @Override
1188     public boolean accept(Path p) {
1189       try {
1190         return fs.isFile(p);
1191       } catch (IOException e) {
1192         LOG.debug("unable to verify if path=" + p + " is a regular file", e);
1193         return false;
1194       }
1195     }
1196   }
1197 
1198   /**
1199    * Directory filter that doesn't include any of the directories in the specified blacklist
1200    */
1201   public static class BlackListDirFilter implements PathFilter {
1202     private final FileSystem fs;
1203     private List<String> blacklist;
1204 
1205     /**
1206      * Create a filter on the given filesystem with the specified blacklist
1207      * @param fs filesystem to filter
1208      * @param directoryNameBlackList list of the names of the directories to filter. If
1209      *          <tt>null</tt>, all directories are returned
1210      */
1211     @SuppressWarnings("unchecked")
1212     public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
1213       this.fs = fs;
1214       blacklist =
1215         (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
1216           : directoryNameBlackList);
1217     }
1218 
1219     @Override
1220     public boolean accept(Path p) {
1221       boolean isValid = false;
1222       try {
1223         if (isValidName(p.getName())) {
1224           isValid = fs.getFileStatus(p).isDirectory();
1225         } else {
1226           isValid = false;
1227         }
1228       } catch (IOException e) {
1229         LOG.warn("An error occurred while verifying if [" + p.toString()
1230             + "] is a valid directory. Returning 'not valid' and continuing.", e);
1231       }
1232       return isValid;
1233     }
1234 
1235     protected boolean isValidName(final String name) {
1236       return !blacklist.contains(name);
1237     }
1238   }
1239 
1240   /**
1241    * A {@link PathFilter} that only allows directories.
1242    */
1243   public static class DirFilter extends BlackListDirFilter {
1244 
1245     public DirFilter(FileSystem fs) {
1246       super(fs, null);
1247     }
1248   }
1249 
1250   /**
1251    * A {@link PathFilter} that returns usertable directories. To get all directories use the
1252    * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
1253    */
1254   public static class UserTableDirFilter extends BlackListDirFilter {
1255     public UserTableDirFilter(FileSystem fs) {
1256       super(fs, HConstants.HBASE_NON_TABLE_DIRS);
1257     }
1258 
1259     protected boolean isValidName(final String name) {
1260       if (!super.isValidName(name))
1261         return false;
1262 
1263       try {
1264         TableName.isLegalTableQualifierName(Bytes.toBytes(name));
1265       } catch (IllegalArgumentException e) {
1266         LOG.info("INVALID NAME " + name);
1267         return false;
1268       }
1269       return true;
1270     }
1271   }
1272 
1273   /**
1274    * Heuristic to determine whether it is safe or not to open a file for append.
1275    * Looks both for dfs.support.append and uses reflection to search
1276    * for SequenceFile.Writer.syncFs() or FSDataOutputStream.hflush().
1277    * @param conf
1278    * @return True if append support
1279    */
1280   public static boolean isAppendSupported(final Configuration conf) {
1281     boolean append = conf.getBoolean("dfs.support.append", false);
1282     if (append) {
1283       try {
1284         // TODO: The implementation that comes back when we do a createWriter
1285         // may not be using SequenceFile so the below is not a definitive test.
1286         // Will do for now (hdfs-200).
1287         SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
1288         append = true;
1289       } catch (SecurityException e) {
1290       } catch (NoSuchMethodException e) {
1291         append = false;
1292       }
1293     }
1294     if (!append) {
1295       // Look for the 0.21, 0.22, new-style append evidence.
1296       try {
1297         FSDataOutputStream.class.getMethod("hflush", new Class<?> []{});
1298         append = true;
1299       } catch (NoSuchMethodException e) {
1300         append = false;
1301       }
1302     }
1303     return append;
1304   }
1305 
1306   /**
1307    * @param conf
1308    * @return True if this filesystem's scheme is 'hdfs'.
1309    * @throws IOException
1310    */
1311   public static boolean isHDFS(final Configuration conf) throws IOException {
1312     FileSystem fs = FileSystem.get(conf);
1313     String scheme = fs.getUri().getScheme();
1314     return scheme.equalsIgnoreCase("hdfs");
1315   }
1316 
1317   /**
1318    * Recover file lease. Used when a file is suspected
1319    * to have been left open by another process.
1320    * @param fs FileSystem handle
1321    * @param p Path of file to recover lease
1322    * @param conf Configuration handle
1323    * @throws IOException
1324    */
1325   public abstract void recoverFileLease(final FileSystem fs, final Path p,
1326       Configuration conf, CancelableProgressable reporter) throws IOException;
1327 
1328   public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
1329       throws IOException {
1330     List<Path> tableDirs = new LinkedList<Path>();
1331 
1332     for(FileStatus status :
1333         fs.globStatus(new Path(rootdir,
1334             new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
1335       tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
1336     }
1337     return tableDirs;
1338   }
1339 
1340   /**
1341    * @param fs
1342    * @param rootdir
1343    * @return All the table directories under <code>rootdir</code>. Ignores non-table HBase folders such as
1344    * the .logs, .oldlogs and .corrupt folders.
1345    * @throws IOException
1346    */
1347   public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
1348       throws IOException {
1349     // presumes any directory under hbase.rootdir is a table
1350     FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
1351     List<Path> tabledirs = new ArrayList<Path>(dirs.length);
1352     for (FileStatus dir: dirs) {
1353       tabledirs.add(dir.getPath());
1354     }
1355     return tabledirs;
1356   }
1357 
1358   /**
1359    * Checks if the given path contains a 'recovered.edits' directory component.
1360    * @param path path to check
1361    * @return True if the path refers to recovered edits
1362    */
1363   public static boolean isRecoveredEdits(Path path) {
1364     return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
1365   }
1366 
1367   /**
1368    * Filter that accepts only region directories (hex-encoded, or legacy numeric, region names).
1369    */
1370   public static class RegionDirFilter implements PathFilter {
1371     // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
1372     final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
1373     final FileSystem fs;
1374 
1375     public RegionDirFilter(FileSystem fs) {
1376       this.fs = fs;
1377     }
1378 
1379     @Override
1380     public boolean accept(Path rd) {
1381       if (!regionDirPattern.matcher(rd.getName()).matches()) {
1382         return false;
1383       }
1384 
1385       try {
1386         return fs.getFileStatus(rd).isDirectory();
1387       } catch (IOException ioe) {
1388         // Maybe the file was moved or the fs was disconnected.
1389         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1390         return false;
1391       }
1392     }
1393   }
1394 
1395   /**
1396    * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1397    * .tableinfo
1398    * @param fs A file system for the Path
1399    * @param tableDir Path to a specific table directory &lt;hbase.rootdir&gt;/&lt;tabledir&gt;
1400    * @return List of paths to valid region directories in table dir.
1401    * @throws IOException
1402    */
1403   public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1404     // assumes we are in a table dir.
1405     FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs));
1406     List<Path> regionDirs = new ArrayList<Path>(rds.length);
1407     for (FileStatus rdfs: rds) {
1408       Path rdPath = rdfs.getPath();
1409       regionDirs.add(rdPath);
1410     }
1411     return regionDirs;
1412   }
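       /*
        * A minimal sketch of walking the region directories of one table with
        * getRegionDirs() above; the table name "t1" is illustrative.
        *
        *   Path rootDir = FSUtils.getRootDir(conf);
        *   FileSystem fs = rootDir.getFileSystem(conf);
        *   Path tableDir = FSUtils.getTableDir(rootDir, TableName.valueOf("t1"));
        *   for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
        *     System.out.println("region dir: " + regionDir);
        *   }
        */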
1413 
1414   /**
1415    * Filter for all dirs that are legal column family names.  This is generally used for colfam
1416    * dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
1417    */
1418   public static class FamilyDirFilter implements PathFilter {
1419     final FileSystem fs;
1420 
1421     public FamilyDirFilter(FileSystem fs) {
1422       this.fs = fs;
1423     }
1424 
1425     @Override
1426     public boolean accept(Path rd) {
1427       try {
1428         // throws IAE if invalid
1429         HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(rd.getName()));
1430       } catch (IllegalArgumentException iae) {
1431         // path name is an invalid family name and thus is excluded.
1432         return false;
1433       }
1434 
1435       try {
1436         return fs.getFileStatus(rd).isDirectory();
1437       } catch (IOException ioe) {
1438         // Maybe the file was moved or the fs was disconnected.
1439         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1440         return false;
1441       }
1442     }
1443   }
1444 
1445   /**
1446    * Given a particular region dir, return all the familydirs inside it
1447    *
1448    * @param fs A file system for the Path
1449    * @param regionDir Path to a specific region directory
1450    * @return List of paths to valid family directories in region dir.
1451    * @throws IOException
1452    */
1453   public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1454     // assumes we are in a region dir.
1455     FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1456     List<Path> familyDirs = new ArrayList<Path>(fds.length);
1457     for (FileStatus fdfs: fds) {
1458       Path fdPath = fdfs.getPath();
1459       familyDirs.add(fdPath);
1460     }
1461     return familyDirs;
1462   }
1463 
1464   public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
1465     FileStatus[] fds = fs.listStatus(familyDir, new ReferenceFileFilter(fs));
1466     List<Path> referenceFiles = new ArrayList<Path>(fds.length);
1467     for (FileStatus fdfs: fds) {
1468       Path fdPath = fdfs.getPath();
1469       referenceFiles.add(fdPath);
1470     }
1471     return referenceFiles;
1472   }
1473 
1474   /**
1475    * Filter for HFiles that excludes reference files.
1476    */
1477   public static class HFileFilter implements PathFilter {
1478     final FileSystem fs;
1479 
1480     public HFileFilter(FileSystem fs) {
1481       this.fs = fs;
1482     }
1483 
1484     @Override
1485     public boolean accept(Path rd) {
1486       try {
1487         // only files
1488         return !fs.getFileStatus(rd).isDirectory() && StoreFileInfo.isHFile(rd);
1489       } catch (IOException ioe) {
1490         // Maybe the file was moved or the fs was disconnected.
1491         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1492         return false;
1493       }
1494     }
1495   }
1496 
1497   public static class ReferenceFileFilter implements PathFilter {
1498 
1499     private final FileSystem fs;
1500 
1501     public ReferenceFileFilter(FileSystem fs) {
1502       this.fs = fs;
1503     }
1504 
1505     @Override
1506     public boolean accept(Path rd) {
1507       try {
1508         // only files can be references.
1509         return !fs.getFileStatus(rd).isDirectory() && StoreFileInfo.isReference(rd);
1510       } catch (IOException ioe) {
1511         // Maybe the file was moved or the fs was disconnected.
1512         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1513         return false;
1514       }
1515     }
1516   }
1517 
1518 
1519   /**
1520    * @param conf
1521    * @return The filesystem of the hbase rootdir.
1522    * @throws IOException
1523    */
1524   public static FileSystem getCurrentFileSystem(Configuration conf)
1525   throws IOException {
1526     return getRootDir(conf).getFileSystem(conf);
1527   }
1528 
1529 
1530   /**
1531    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1532    * table StoreFile names to the full Path.
1533    * <br>
1534    * Example...<br>
1535    * Key = 3944417774205889744  <br>
1536    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1537    *
1538    * @param map map to add values.  If null, this method will create and populate one to return
1539    * @param fs  The file system to use.
1540    * @param hbaseRootDir  The root directory to scan.
1541    * @param tableName name of the table to scan.
1542    * @return Map keyed by StoreFile name with a value of the full Path.
1543    * @throws IOException When scanning the directory fails.
1544    */
1545   public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1546   final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1547   throws IOException {
1548     return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null);
1549   }
1550 
1551   /**
1552    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1553    * table StoreFile names to the full Path.
1554    * <br>
1555    * Example...<br>
1556    * Key = 3944417774205889744  <br>
1557    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1558    *
1559    * @param map map to add values.  If null, this method will create and populate one to return
1560    * @param fs  The file system to use.
1561    * @param hbaseRootDir  The root directory to scan.
1562    * @param tableName name of the table to scan.
1563    * @param errors ErrorReporter instance or null
1564    * @return Map keyed by StoreFile name with a value of the full Path.
1565    * @throws IOException When scanning the directory fails.
1566    */
1567   public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1568   final FileSystem fs, final Path hbaseRootDir, TableName tableName, ErrorReporter errors)
1569   throws IOException {
1570     if (map == null) {
1571       map = new HashMap<String, Path>();
1572     }
1573 
1574     // only include the directory paths to tables
1575     Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
1576     // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
1577     // should be regions.
1578     PathFilter familyFilter = new FamilyDirFilter(fs);
1579     FileStatus[] regionDirs = fs.listStatus(tableDir, new RegionDirFilter(fs));
1580     for (FileStatus regionDir : regionDirs) {
1581       if (null != errors) {
1582         errors.progress();
1583       }
1584       Path dd = regionDir.getPath();
1585       // this is a region directory; now look inside it for families
1586       FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1587       for (FileStatus familyDir : familyDirs) {
1588         if (null != errors) {
1589           errors.progress();
1590         }
1591         Path family = familyDir.getPath();
1592         if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1593           continue;
1594         }
1595         // now in family, iterate over the StoreFiles and
1596         // put in map
1597         FileStatus[] familyStatus = fs.listStatus(family);
1598         for (FileStatus sfStatus : familyStatus) {
1599           if (null != errors) {
1600             errors.progress();
1601           }
1602           Path sf = sfStatus.getPath();
1603           map.put(sf.getName(), sf);
1604         }
1605       }
1606     }
1607     return map;
1608   }
1609 
1610   public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1611     int result = 0;
1612     try {
1613       for (Path familyDir:getFamilyDirs(fs, p)){
1614         result += getReferenceFilePaths(fs, familyDir).size();
1615       }
1616     } catch (IOException e) {
1617       LOG.warn("Error counting reference files.", e);
1618     }
1619     return result;
1620   }
1621 
1622   /**
1623    * Runs through the HBase rootdir and creates a reverse lookup map for
1624    * table StoreFile names to the full Path.
1625    * <br>
1626    * Example...<br>
1627    * Key = 3944417774205889744  <br>
1628    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1629    *
1630    * @param fs  The file system to use.
1631    * @param hbaseRootDir  The root directory to scan.
1632    * @return Map keyed by StoreFile name with a value of the full Path.
1633    * @throws IOException When scanning the directory fails.
1634    */
1635   public static Map<String, Path> getTableStoreFilePathMap(
1636     final FileSystem fs, final Path hbaseRootDir)
1637   throws IOException {
1638     return getTableStoreFilePathMap(fs, hbaseRootDir, null);
1639   }
1640 
1641   /**
1642    * Runs through the HBase rootdir and creates a reverse lookup map for
1643    * table StoreFile names to the full Path.
1644    * <br>
1645    * Example...<br>
1646    * Key = 3944417774205889744  <br>
1647    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1648    *
1649    * @param fs  The file system to use.
1650    * @param hbaseRootDir  The root directory to scan.
1651    * @param errors ErrorReporter instance or null
1652    * @return Map keyed by StoreFile name with a value of the full Path.
1653    * @throws IOException When scanning the directory fails.
1654    */
1655   public static Map<String, Path> getTableStoreFilePathMap(
1656     final FileSystem fs, final Path hbaseRootDir, ErrorReporter errors)
1657   throws IOException {
1658     Map<String, Path> map = new HashMap<String, Path>();
1659 
1660     // if this method looks similar to 'getTableFragmentation' that is because
1661     // it was borrowed from it.
1662 
1663     // only include the directory paths to tables
1664     for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1665       getTableStoreFilePathMap(map, fs, hbaseRootDir,
1666           FSUtils.getTableName(tableDir), errors);
1667     }
1668     return map;
1669   }
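       /*
        * A minimal sketch of building the store-file reverse-lookup map for the whole
        * root directory with getTableStoreFilePathMap() above (no ErrorReporter); the
        * store file name used for the lookup is illustrative.
        *
        *   Path rootDir = FSUtils.getRootDir(conf);
        *   FileSystem fs = rootDir.getFileSystem(conf);
        *   Map<String, Path> storeFiles = FSUtils.getTableStoreFilePathMap(fs, rootDir);
        *   Path full = storeFiles.get("3944417774205889744");  // full Path, or null if unknown
        */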
1670 
1671   /**
1672    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1673    * This accommodates differences between Hadoop versions: Hadoop 1 does not
1674    * throw a FileNotFoundException but returns an empty FileStatus[], while
1675    * Hadoop 2 throws a FileNotFoundException.
1676    *
1677    * @param fs file system
1678    * @param dir directory
1679    * @param filter path filter
1680    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1681    */
1682   public static FileStatus [] listStatus(final FileSystem fs,
1683       final Path dir, final PathFilter filter) throws IOException {
1684     FileStatus [] status = null;
1685     try {
1686       status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
1687     } catch (FileNotFoundException fnfe) {
1688       // if directory doesn't exist, return null
1689       if (LOG.isTraceEnabled()) {
1690         LOG.trace(dir + " doesn't exist");
1691       }
1692     }
1693     if (status == null || status.length < 1) return null;
1694     return status;
1695   }
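       /*
        * A minimal sketch of the null contract of listStatus() above: given a
        * FileSystem fs and Path dir, callers test for null instead of catching
        * FileNotFoundException.
        *
        *   FileStatus[] children = FSUtils.listStatus(fs, dir, new RegionDirFilter(fs));
        *   if (children == null) {
        *     // dir is empty or does not exist; nothing to do
        *   } else {
        *     for (FileStatus child : children) {
        *       // process child.getPath()
        *     }
        *   }
        */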
1696 
1697   /**
1698    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1699    * This accommodates differences between Hadoop versions.
1700    *
1701    * @param fs file system
1702    * @param dir directory
1703    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1704    */
1705   public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
1706     return listStatus(fs, dir, null);
1707   }
1708 
1709   /**
1710    * Calls fs.delete() and returns the value returned by fs.delete().
1711    *
1712    * @param fs
1713    * @param path
1714    * @param recursive
1715    * @return the value returned by fs.delete()
1716    * @throws IOException
1717    */
1718   public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
1719       throws IOException {
1720     return fs.delete(path, recursive);
1721   }
1722 
1723   /**
1724    * Calls fs.exists(). Checks if the specified path exists
1725    *
1726    * @param fs
1727    * @param path
1728    * @return the value returned by fs.exists()
1729    * @throws IOException
1730    */
1731   public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
1732     return fs.exists(path);
1733   }
1734 
1735   /**
1736    * Throw an exception if an action is not permitted by a user on a file.
1737    *
1738    * @param ugi
1739    *          the user
1740    * @param file
1741    *          the file
1742    * @param action
1743    *          the action
1744    */
1745   public static void checkAccess(UserGroupInformation ugi, FileStatus file,
1746       FsAction action) throws AccessDeniedException {
1747     if (ugi.getShortUserName().equals(file.getOwner())) {
1748       if (file.getPermission().getUserAction().implies(action)) {
1749         return;
1750       }
1751     } else if (contains(ugi.getGroupNames(), file.getGroup())) {
1752       if (file.getPermission().getGroupAction().implies(action)) {
1753         return;
1754       }
1755     } else if (file.getPermission().getOtherAction().implies(action)) {
1756       return;
1757     }
1758     throw new AccessDeniedException("Permission denied:" + " action=" + action
1759         + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
1760   }
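       /*
        * A minimal sketch of checkAccess() above, verifying that the current user may
        * read a file; the path is illustrative.
        *
        *   UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        *   FileStatus status = fs.getFileStatus(new Path("/hbase/hbase.id"));
        *   FSUtils.checkAccess(ugi, status, FsAction.READ);  // throws AccessDeniedException if denied
        */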
1761 
1762   private static boolean contains(String[] groups, String user) {
1763     for (String group : groups) {
1764       if (group.equals(user)) {
1765         return true;
1766       }
1767     }
1768     return false;
1769   }
1770 
1771   /**
1772    * Log the current state of the filesystem from a certain root directory
1773    * @param fs filesystem to investigate
1774    * @param root root file/directory to start logging from
1775    * @param LOG log to output information
1776    * @throws IOException if an unexpected exception occurs
1777    */
1778   public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
1779       throws IOException {
1780     LOG.debug("Current file system:");
1781     logFSTree(LOG, fs, root, "|-");
1782   }
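       /*
        * A minimal sketch of dumping the layout under the HBase root directory with
        * logFileSystemState() above, given a commons-logging Log instance LOG:
        *
        *   Path rootDir = FSUtils.getRootDir(conf);
        *   FSUtils.logFileSystemState(rootDir.getFileSystem(conf), rootDir, LOG);
        */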
1783 
1784   /**
1785    * Recursive helper to log the state of the FS
1786    *
1787    * @see #logFileSystemState(FileSystem, Path, Log)
1788    */
1789   private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
1790       throws IOException {
1791     FileStatus[] files = FSUtils.listStatus(fs, root, null);
1792     if (files == null) return;
1793 
1794     for (FileStatus file : files) {
1795       if (file.isDirectory()) {
1796         LOG.debug(prefix + file.getPath().getName() + "/");
1797         logFSTree(LOG, fs, file.getPath(), prefix + "---");
1798       } else {
1799         LOG.debug(prefix + file.getPath().getName());
1800       }
1801     }
1802   }
1803 
1804   public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
1805       throws IOException {
1806     // set the modify time for TimeToLive Cleaner
1807     fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
1808     return fs.rename(src, dest);
1809   }
1810 
1811   /**
1812    * Scans the root path of the file system to compute the degree of locality of
1813    * each region on each of the servers holding at least one block of that
1814    * region.
1815    * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}
1816    *
1817    * @param conf
1818    *          the configuration to use
1819    * @return the mapping from region encoded name to a map of server names to
1820    *           locality fraction
1821    * @throws IOException
1822    *           in case of file system errors or interrupts
1823    */
1824   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1825       final Configuration conf) throws IOException {
1826     return getRegionDegreeLocalityMappingFromFS(
1827         conf, null,
1828         conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1829 
1830   }
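       /*
        * A minimal sketch of consuming the locality mapping produced by
        * getRegionDegreeLocalityMappingFromFS() above:
        *
        *   Map<String, Map<String, Float>> locality =
        *       FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
        *   for (Map.Entry<String, Map<String, Float>> region : locality.entrySet()) {
        *     // region.getKey() is the encoded region name; region.getValue() maps a
        *     // hostname to the fraction of the region's blocks stored on that host
        *   }
        */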
1831 
1832   /**
1833    * Scans the root path of the file system to compute the degree of locality of
1834    * each region on each of the servers holding at least one block of that
1835    * region.
1836    *
1837    * @param conf
1838    *          the configuration to use
1839    * @param desiredTable
1840    *          the table you wish to scan locality for
1841    * @param threadPoolSize
1842    *          the thread pool size to use
1843    * @return the mapping from region encoded name to a map of server names to
1844    *           locality fraction
1845    * @throws IOException
1846    *           in case of file system errors or interrupts
1847    */
1848   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1849       final Configuration conf, final String desiredTable, int threadPoolSize)
1850       throws IOException {
1851     Map<String, Map<String, Float>> regionDegreeLocalityMapping =
1852         new ConcurrentHashMap<String, Map<String, Float>>();
1853     getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null,
1854         regionDegreeLocalityMapping);
1855     return regionDegreeLocalityMapping;
1856   }
1857 
1858   /**
1859    * Scans the root path of the file system to compute either the mapping from
1860    * each region name to its best-locality region server, or the degree of
1861    * locality of each region on each of the servers holding at least one block
1862    * of that region. The output map parameters are both optional.
1863    *
1864    * @param conf
1865    *          the configuration to use
1866    * @param desiredTable
1867    *          the table you wish to scan locality for
1868    * @param threadPoolSize
1869    *          the thread pool size to use
1870    * @param regionToBestLocalityRSMapping
1871    *          the map into which to put the best locality mapping or null
1872    * @param regionDegreeLocalityMapping
1873    *          the map into which to put the locality degree mapping or null,
1874    *          must be a thread-safe implementation
1875    * @throws IOException
1876    *           in case of file system errors or interrupts
1877    */
1878   private static void getRegionLocalityMappingFromFS(
1879       final Configuration conf, final String desiredTable,
1880       int threadPoolSize,
1881       Map<String, String> regionToBestLocalityRSMapping,
1882       Map<String, Map<String, Float>> regionDegreeLocalityMapping)
1883       throws IOException {
1884     FileSystem fs =  FileSystem.get(conf);
1885     Path rootPath = FSUtils.getRootDir(conf);
1886     long startTime = EnvironmentEdgeManager.currentTime();
1887     Path queryPath;
1888     // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1889     if (null == desiredTable) {
1890       queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1891     } else {
1892       queryPath = new Path(FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1893     }
1894 
1895     // reject all paths that are not appropriate
1896     PathFilter pathFilter = new PathFilter() {
1897       @Override
1898       public boolean accept(Path path) {
1899         // path should name a region directory, but the glob may also match noise entries
1900         if (null == path) {
1901           return false;
1902         }
1903 
1904         // no parent?
1905         Path parent = path.getParent();
1906         if (null == parent) {
1907           return false;
1908         }
1909 
1910         String regionName = path.getName();
1911         if (null == regionName) {
1912           return false;
1913         }
1914 
1915         if (!regionName.toLowerCase().matches("[0-9a-f]+")) {
1916           return false;
1917         }
1918         return true;
1919       }
1920     };
1921 
1922     FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1923 
1924     if (null == statusList) {
1925       return;
1926     } else {
1927       LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
1928           statusList.length);
1929     }
1930 
1931     // lower the number of threads in case we have very few expected regions
1932     threadPoolSize = Math.min(threadPoolSize, statusList.length);
1933 
1934     // run in multiple threads
1935     ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
1936         threadPoolSize, 60, TimeUnit.SECONDS,
1937         new ArrayBlockingQueue<Runnable>(statusList.length));
1938     try {
1939       // ignore all file status items that are not of interest
1940       for (FileStatus regionStatus : statusList) {
1941         if (null == regionStatus) {
1942           continue;
1943         }
1944 
1945         if (!regionStatus.isDirectory()) {
1946           continue;
1947         }
1948 
1949         Path regionPath = regionStatus.getPath();
1950         if (null == regionPath) {
1951           continue;
1952         }
1953 
1954         tpe.execute(new FSRegionScanner(fs, regionPath,
1955             regionToBestLocalityRSMapping, regionDegreeLocalityMapping));
1956       }
1957     } finally {
1958       tpe.shutdown();
1959       int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1960           60 * 1000);
1961       try {
1962         // here we wait until the TPE terminates, either naturally or because of
1963         // exceptions raised during the execution of the threads
1964         while (!tpe.awaitTermination(threadWakeFrequency,
1965             TimeUnit.MILLISECONDS)) {
1966           // printing out rough estimate, so as to not introduce
1967           // AtomicInteger
1968           LOG.info("Locality checking is underway: { Scanned Regions : "
1969               + tpe.getCompletedTaskCount() + "/"
1970               + tpe.getTaskCount() + " }");
1971         }
1972       } catch (InterruptedException e) {
1973         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1974       }
1975     }
1976 
1977     long overhead = EnvironmentEdgeManager.currentTime() - startTime;
1978     String overheadMsg = "Scanning DFS for locality info took " + overhead + " ms";
1979 
1980     LOG.info(overheadMsg);
1981   }
1982 
1983   /**
1984    * Do our short circuit read setup.
1985    * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
1986    * @param conf
1987    */
1988   public static void setupShortCircuitRead(final Configuration conf) {
1989     // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1990     boolean shortCircuitSkipChecksum =
1991       conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1992     boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1993     if (shortCircuitSkipChecksum) {
1994       LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
1995         "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
1996         "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
1997       assert !shortCircuitSkipChecksum; //this will fail if assertions are on
1998     }
1999     checkShortCircuitReadBufferSize(conf);
2000   }
2001 
2002   /**
2003    * Check if short circuit read buffer size is set and if not, set it to hbase value.
2004    * @param conf
2005    */
2006   public static void checkShortCircuitReadBufferSize(final Configuration conf) {
2007     final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
2008     final int notSet = -1;
2009     // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
2010     final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
2011     int size = conf.getInt(dfsKey, notSet);
2012     // If a size is set, return -- we will use it.
2013     if (size != notSet) return;
2014     // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
2015     int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
2016     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
2017   }
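       /*
        * A minimal sketch of the configuration handled by setupShortCircuitRead() and
        * checkShortCircuitReadBufferSize() above; the enabling key is standard HDFS
        * configuration and the values shown are illustrative.
        *
        *   Configuration conf = HBaseConfiguration.create();
        *   conf.setBoolean("dfs.client.read.shortcircuit", true);
        *   // leave "dfs.client.read.shortcircuit.skip.checksum" at its default of false
        *   FSUtils.setupShortCircuitRead(conf);
        *   // if it was unset, "dfs.client.read.shortcircuit.buffer.size" is now
        *   // 2 * HConstants.DEFAULT_BLOCKSIZE (or the "hbase."-prefixed override)
        */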
2018 
2019   /**
2020    * @param c
2021    * @return The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs.
2022    * @throws IOException
2023    */
2024   public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
2025       throws IOException {
2026     if (!isHDFS(c)) return null;
2027     // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
2028     // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
2029     // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
2030     final String name = "getHedgedReadMetrics";
2031     DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
2032     Method m;
2033     try {
2034       m = dfsclient.getClass().getDeclaredMethod(name);
2035     } catch (NoSuchMethodException e) {
2036       LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
2037           e.getMessage());
2038       return null;
2039     } catch (SecurityException e) {
2040       LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
2041           e.getMessage());
2042       return null;
2043     }
2044     m.setAccessible(true);
2045     try {
2046       return (DFSHedgedReadMetrics)m.invoke(dfsclient);
2047     } catch (IllegalAccessException e) {
2048       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
2049           e.getMessage());
2050       return null;
2051     } catch (IllegalArgumentException e) {
2052       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
2053           e.getMessage());
2054       return null;
2055     } catch (InvocationTargetException e) {
2056       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
2057           e.getMessage());
2058       return null;
2059     }
2060   }
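       /*
        * A minimal sketch of consuming getDFSHedgedReadMetrics() above; the result may
        * be null when not running on HDFS or when reflection fails. The getter names
        * assume Hadoop 2's DFSHedgedReadMetrics.
        *
        *   DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
        *   if (metrics != null) {
        *     LOG.info("hedged read ops=" + metrics.getHedgedReadOps()
        *         + ", wins=" + metrics.getHedgedReadWins());
        *   }
        */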
2061 }