
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInputStream;
23  import java.io.EOFException;
24  import java.io.FileNotFoundException;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.InterruptedIOException;
28  import java.lang.reflect.InvocationTargetException;
29  import java.lang.reflect.Method;
30  import java.net.InetSocketAddress;
31  import java.net.URI;
32  import java.net.URISyntaxException;
33  import java.util.ArrayList;
34  import java.util.Collections;
35  import java.util.HashMap;
36  import java.util.LinkedList;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.concurrent.ArrayBlockingQueue;
40  import java.util.concurrent.ConcurrentHashMap;
41  import java.util.concurrent.ThreadPoolExecutor;
42  import java.util.concurrent.TimeUnit;
43  import java.util.regex.Pattern;
44  
45  import org.apache.commons.logging.Log;
46  import org.apache.commons.logging.LogFactory;
47  import org.apache.hadoop.hbase.classification.InterfaceAudience;
48  import org.apache.hadoop.HadoopIllegalArgumentException;
49  import org.apache.hadoop.conf.Configuration;
50  import org.apache.hadoop.fs.BlockLocation;
51  import org.apache.hadoop.fs.FSDataInputStream;
52  import org.apache.hadoop.fs.FSDataOutputStream;
53  import org.apache.hadoop.fs.FileStatus;
54  import org.apache.hadoop.fs.FileSystem;
55  import org.apache.hadoop.fs.Path;
56  import org.apache.hadoop.fs.PathFilter;
57  import org.apache.hadoop.fs.permission.FsAction;
58  import org.apache.hadoop.fs.permission.FsPermission;
59  import org.apache.hadoop.hbase.ClusterId;
60  import org.apache.hadoop.hbase.HColumnDescriptor;
61  import org.apache.hadoop.hbase.HConstants;
62  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
63  import org.apache.hadoop.hbase.HRegionInfo;
64  import org.apache.hadoop.hbase.TableName;
65  import org.apache.hadoop.hbase.exceptions.DeserializationException;
66  import org.apache.hadoop.hbase.fs.HFileSystem;
67  import org.apache.hadoop.hbase.master.HMaster;
68  import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
69  import org.apache.hadoop.hbase.security.AccessDeniedException;
70  import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
71  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
72  import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
73  import org.apache.hadoop.hbase.regionserver.HRegion;
74  import org.apache.hadoop.hdfs.DFSClient;
75  import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
76  import org.apache.hadoop.hdfs.DistributedFileSystem;
77  import org.apache.hadoop.io.IOUtils;
78  import org.apache.hadoop.io.SequenceFile;
79  import org.apache.hadoop.ipc.RemoteException;
80  import org.apache.hadoop.security.UserGroupInformation;
81  import org.apache.hadoop.util.Progressable;
82  import org.apache.hadoop.util.ReflectionUtils;
83  import org.apache.hadoop.util.StringUtils;
84  
85  import com.google.common.primitives.Ints;
86  import com.google.protobuf.InvalidProtocolBufferException;
87  
88  /**
89   * Utility methods for interacting with the underlying file system.
90   */
91  @InterfaceAudience.Private
92  public abstract class FSUtils {
93    private static final Log LOG = LogFactory.getLog(FSUtils.class);
94  
95    /** Full access permissions (starting point for a umask) */
96    public static final String FULL_RWX_PERMISSIONS = "777";
97    private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
98    private static final int DEFAULT_THREAD_POOLSIZE = 2;
99  
100   /** Set to true on Windows platforms */
101   public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
102 
103   protected FSUtils() {
104     super();
105   }
106 
107   /**
108    * Sets storage policy for given path according to config setting.
109    * If the passed path is a directory, we'll set the storage policy for all files
110    * created in the future in said directory. Note that this change in storage
111    * policy takes place at the HDFS level; it will persist beyond this RS's lifecycle.
112    * If we're running on a version of HDFS that doesn't support the given storage policy
113    * (or storage policies at all), then we'll issue a log message and continue.
114    *
115    * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
116    *
117    * @param fs the FileSystem to act on; we only do anything if it is an instance of DistributedFileSystem
118    * @param conf used to look up storage policy with given key; not modified.
119    * @param path the Path whose storage policy is to be set
120    * @param policyKey e.g. HConstants.WAL_STORAGE_POLICY
121    * @param defaultPolicy usually should be the policy NONE to delegate to HDFS
122    */
123   public static void setStoragePolicy(final FileSystem fs, final Configuration conf,
124       final Path path, final String policyKey, final String defaultPolicy) {
125     String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase();
126     if (storagePolicy.equals(defaultPolicy)) {
127       if (LOG.isTraceEnabled()) {
128         LOG.trace("default policy of " + defaultPolicy + " requested, exiting early.");
129       }
130       return;
131     }
132     if (fs instanceof DistributedFileSystem) {
133       DistributedFileSystem dfs = (DistributedFileSystem)fs;
134       // Once our minimum supported Hadoop version is 2.6.0 we can remove reflection.
135       Class<? extends DistributedFileSystem> dfsClass = dfs.getClass();
136       Method m = null;
137       try {
138         m = dfsClass.getDeclaredMethod("setStoragePolicy",
139             new Class<?>[] { Path.class, String.class });
140         m.setAccessible(true);
141       } catch (NoSuchMethodException e) {
142         LOG.info("FileSystem doesn't support"
143             + " setStoragePolicy; --HDFS-6584 not available");
144       } catch (SecurityException e) {
145         LOG.info("Doesn't have access to setStoragePolicy on "
146             + "FileSystems --HDFS-6584 not available", e);
147         m = null; // could happen on setAccessible()
148       }
149       if (m != null) {
150         try {
151           m.invoke(dfs, path, storagePolicy);
152           LOG.info("set " + storagePolicy + " for " + path);
153         } catch (Exception e) {
154           // check for lack of HDFS-7228
155           boolean probablyBadPolicy = false;
156           if (e instanceof InvocationTargetException) {
157             final Throwable exception = e.getCause();
158             if (exception instanceof RemoteException &&
159                 HadoopIllegalArgumentException.class.getName().equals(
160                     ((RemoteException)exception).getClassName())) {
161               LOG.warn("Given storage policy, '" + storagePolicy + "', was rejected and probably " +
162                   "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
163                   "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
164                   "more information see the 'ArchivalStorage' docs for your Hadoop release.");
165               LOG.debug("More information about the invalid storage policy.", exception);
166               probablyBadPolicy = true;
167             }
168           }
169           if (!probablyBadPolicy) {
170             // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
171             // misuse than a runtime problem with HDFS.
172             LOG.warn("Unable to set " + storagePolicy + " for " + path, e);
173           }
174         }
175       }
176     } else {
177       LOG.info("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't " +
178           "support setStoragePolicy.");
179     }
180   }
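
  // Usage sketch (illustrative, grounded in the javadoc above): apply the configured WAL
  // storage policy to a directory; "walDir" is a hypothetical Path and "NONE" delegates the
  // decision to HDFS.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   FileSystem fs = FileSystem.get(conf);
  //   FSUtils.setStoragePolicy(fs, conf, walDir, HConstants.WAL_STORAGE_POLICY, "NONE");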
181 
182   /**
183    * Compares the path component only. Does not consider the scheme; i.e. even if the schemes
184    * differ, if <code>path</code> starts with <code>rootPath</code>, the function returns true.
185    * @param rootPath the root path to test against
186    * @param path the path whose prefix is tested
187    * @return True if <code>path</code> starts with <code>rootPath</code>
188    */
189   public static boolean isStartingWithPath(final Path rootPath, final String path) {
190     String uriRootPath = rootPath.toUri().getPath();
191     String tailUriPath = (new Path(path)).toUri().getPath();
192     return tailUriPath.startsWith(uriRootPath);
193   }
194 
195   /**
196    * Compares the path component of the Path URI; e.g. for hdfs://a/b/c and /a/b/c, it compares the
197    * '/a/b/c' part. Does not consider the scheme; i.e. even if the schemes differ, the two equate
198    * when the path or a subpath matches.
199    * @param pathToSearch Path we will be trying to match.
200    * @param pathTail tail path to look for at the end of <code>pathToSearch</code>
201    * @return True if <code>pathTail</code> is the tail of the path of <code>pathToSearch</code>
202    */
203   public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
204     return isMatchingTail(pathToSearch, new Path(pathTail));
205   }
206 
207   /**
208    * Compares the path component of the Path URI; e.g. for hdfs://a/b/c and /a/b/c, it compares the
209    * '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true.  Does not consider
210    * the scheme; i.e. even if the schemes differ, the two equate when the path or a subpath matches.
211    * @param pathToSearch Path we will be trying to match.
212    * @param pathTail tail path to look for at the end of <code>pathToSearch</code>
213    * @return True if <code>pathTail</code> is the tail of the path of <code>pathToSearch</code>
214    */
215   public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
216     if (pathToSearch.depth() != pathTail.depth()) return false;
217     Path tailPath = pathTail;
218     String tailName;
219     Path toSearch = pathToSearch;
220     String toSearchName;
221     boolean result = false;
222     do {
223       tailName = tailPath.getName();
224       if (tailName == null || tailName.length() <= 0) {
225         result = true;
226         break;
227       }
228       toSearchName = toSearch.getName();
229       if (toSearchName == null || toSearchName.length() <= 0) break;
230       // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
231       tailPath = tailPath.getParent();
232       toSearch = toSearch.getParent();
233     } while(tailName.equals(toSearchName));
234     return result;
235   }
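
  // Usage sketch (illustrative): both helpers compare only the path component and ignore the
  // scheme, so, per the javadoc above, these hypothetical calls return true.
  //
  //   FSUtils.isMatchingTail(new Path("hdfs://a/b/c"), new Path("b/c"));           // true
  //   FSUtils.isStartingWithPath(new Path("hdfs://ns/hbase"), "/hbase/data/t1");   // true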
236 
237   public static FSUtils getInstance(FileSystem fs, Configuration conf) {
238     String scheme = fs.getUri().getScheme();
239     if (scheme == null) {
240       LOG.warn("Could not find scheme for uri " +
241           fs.getUri() + ", default to hdfs");
242       scheme = "hdfs";
243     }
244     Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
245         scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
246     FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
247     return fsUtils;
248   }
249 
250   /**
251    * Delete if exists.
252    * @param fs filesystem object
253    * @param dir directory to delete
254    * @return True if deleted <code>dir</code>
255    * @throws IOException e
256    */
257   public static boolean deleteDirectory(final FileSystem fs, final Path dir)
258   throws IOException {
259     return fs.exists(dir) && fs.delete(dir, true);
260   }
261 
262   /**
263    * Delete the region directory if it exists.
264    * @param conf configuration used to resolve the HBase root directory and filesystem
265    * @param hri region whose directory should be deleted
266    * @return True if the region directory was deleted.
267    * @throws IOException
268    */
269   public static boolean deleteRegionDir(final Configuration conf, final HRegionInfo hri)
270   throws IOException {
271     Path rootDir = getRootDir(conf);
272     FileSystem fs = rootDir.getFileSystem(conf);
273     return deleteDirectory(fs,
274       new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
275   }
276 
277   /**
278    * Return the number of bytes that large input files should optimally
279    * be split into to minimize i/o time.
280    *
281    * Uses reflection to search for getDefaultBlockSize(Path f);
282    * if the method doesn't exist, falls back to using getDefaultBlockSize().
283    *
284    * @param fs filesystem object
285    * @return the default block size for the path's filesystem
286    * @throws IOException e
287    */
288   public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
289     Method m = null;
290     Class<? extends FileSystem> cls = fs.getClass();
291     try {
292       m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
293     } catch (NoSuchMethodException e) {
294       LOG.info("FileSystem doesn't support getDefaultBlockSize");
295     } catch (SecurityException e) {
296       LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
297       m = null; // could happen on setAccessible()
298     }
299     if (m == null) {
300       return fs.getDefaultBlockSize(path);
301     } else {
302       try {
303         Object ret = m.invoke(fs, path);
304         return ((Long)ret).longValue();
305       } catch (Exception e) {
306         throw new IOException(e);
307       }
308     }
309   }
310 
311   /*
312    * Get the default replication.
313    *
314    * Uses reflection to search for getDefaultReplication(Path f);
315    * if the method doesn't exist, falls back to using getDefaultReplication().
316    *
317    * @param fs filesystem object
318    * @param path path of file
319    * @return default replication for the path's filesystem
320    * @throws IOException e
321    */
322   public static short getDefaultReplication(final FileSystem fs, final Path path) throws IOException {
323     Method m = null;
324     Class<? extends FileSystem> cls = fs.getClass();
325     try {
326       m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
327     } catch (NoSuchMethodException e) {
328       LOG.info("FileSystem doesn't support getDefaultReplication");
329     } catch (SecurityException e) {
330       LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
331       m = null; // could happen on setAccessible()
332     }
333     if (m == null) {
334       return fs.getDefaultReplication(path);
335     } else {
336       try {
337         Object ret = m.invoke(fs, path);
338         return ((Number)ret).shortValue();
339       } catch (Exception e) {
340         throw new IOException(e);
341       }
342     }
343   }
344 
345   /**
346    * Returns the default buffer size to use during writes.
347    *
348    * The size of the buffer should probably be a multiple of hardware
349    * page size (4096 on Intel x86), and it determines how much data is
350    * buffered during read and write operations.
351    *
352    * @param fs filesystem object
353    * @return default buffer size to use during writes
354    */
355   public static int getDefaultBufferSize(final FileSystem fs) {
356     return fs.getConf().getInt("io.file.buffer.size", 4096);
357   }
358 
359   /**
360    * Create the specified file on the filesystem. By default, this will:
361    * <ol>
362    * <li>overwrite the file if it exists</li>
363    * <li>apply the umask in the configuration (if it is enabled)</li>
364    * <li>use the fs configured buffer size (or 4096 if not set)</li>
365    * <li>use the default replication</li>
366    * <li>use the default block size</li>
367    * <li>not track progress</li>
368    * </ol>
369    *
370    * @param fs {@link FileSystem} on which to write the file
371    * @param path {@link Path} to the file to write
372    * @param perm permissions
373    * @param favoredNodes preferred datanodes for block placement, if the filesystem supports it
374    * @return output stream to the created file
375    * @throws IOException if the file cannot be created
376    */
377   public static FSDataOutputStream create(FileSystem fs, Path path,
378       FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
379     if (fs instanceof HFileSystem) {
380       FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
381       if (backingFs instanceof DistributedFileSystem) {
382         // Try to use the favoredNodes version via reflection to allow backwards-
383         // compatibility.
384         try {
385           return (FSDataOutputStream) (DistributedFileSystem.class
386               .getDeclaredMethod("create", Path.class, FsPermission.class,
387                   boolean.class, int.class, short.class, long.class,
388                   Progressable.class, InetSocketAddress[].class)
389                   .invoke(backingFs, path, perm, true,
390                       getDefaultBufferSize(backingFs),
391                       getDefaultReplication(backingFs, path),
392                       getDefaultBlockSize(backingFs, path),
393                       null, favoredNodes));
394         } catch (InvocationTargetException ite) {
395           // Function was properly called, but threw its own exception.
396           throw new IOException(ite.getCause());
397         } catch (NoSuchMethodException e) {
398           LOG.debug("DFS Client does not support most favored nodes create; using default create");
399           if (LOG.isTraceEnabled()) LOG.trace("Ignoring; use default create", e);
400         } catch (IllegalArgumentException e) {
401           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
402         } catch (SecurityException e) {
403           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
404         } catch (IllegalAccessException e) {
405           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
406         }
407       }
408     }
409     return create(fs, path, perm, true);
410   }
411 
412   /**
413    * Create the specified file on the filesystem. By default, this will:
414    * <ol>
415    * <li>apply the umask in the configuration (if it is enabled)</li>
416    * <li>use the fs configured buffer size (or 4096 if not set)</li>
417    * <li>use the default replication</li>
418    * <li>use the default block size</li>
419    * <li>not track progress</li>
420    * </ol>
421    *
422    * @param fs {@link FileSystem} on which to write the file
423    * @param path {@link Path} to the file to write
424    * @param perm permissions to set on the created file
425    * @param overwrite Whether or not the created file should be overwritten.
426    * @return output stream to the created file
427    * @throws IOException if the file cannot be created
428    */
429   public static FSDataOutputStream create(FileSystem fs, Path path,
430       FsPermission perm, boolean overwrite) throws IOException {
431     if (LOG.isTraceEnabled()) {
432       LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
433     }
434     return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
435         getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
436   }
437 
438   /**
439    * Get the file permissions specified in the configuration, if they are
440    * enabled.
441    *
442    * @param fs filesystem that the file will be created on.
443    * @param conf configuration to read for determining if permissions are
444    *          enabled and which to use
445    * @param permssionConfKey property key in the configuration to use when
446    *          finding the permission
447    * @return the permission to use when creating a new file on the fs. If
448    *         special permissions are not specified in the configuration, then
449    *         the default permissions on the fs will be returned.
450    */
451   public static FsPermission getFilePermissions(final FileSystem fs,
452       final Configuration conf, final String permssionConfKey) {
453     boolean enablePermissions = conf.getBoolean(
454         HConstants.ENABLE_DATA_FILE_UMASK, false);
455 
456     if (enablePermissions) {
457       try {
458         FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
459         // make sure that we have a mask, if not, go default.
460         String mask = conf.get(permssionConfKey);
461         if (mask == null)
462           return FsPermission.getFileDefault();
463         // apply the umask
464         FsPermission umask = new FsPermission(mask);
465         return perm.applyUMask(umask);
466       } catch (IllegalArgumentException e) {
467         LOG.warn(
468             "Incorrect umask attempted to be created: "
469                 + conf.get(permssionConfKey)
470                 + ", using default file permissions.", e);
471         return FsPermission.getFileDefault();
472       }
473     }
474     return FsPermission.getFileDefault();
475   }
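
  // Usage sketch (illustrative): resolve umask-aware permissions and create a file with them.
  // The config key, path and "payload" byte array below are hypothetical.
  //
  //   FsPermission perm = FSUtils.getFilePermissions(fs, conf, "hbase.data.umask");
  //   FSDataOutputStream out = FSUtils.create(fs, new Path("/hbase/.tmp/example"), perm, true);
  //   try { out.write(payload); } finally { out.close(); }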
476 
477   /**
478    * Checks to see if the specified file system is available
479    *
480    * @param fs filesystem
481    * @throws IOException e
482    */
483   public static void checkFileSystemAvailable(final FileSystem fs)
484   throws IOException {
485     if (!(fs instanceof DistributedFileSystem)) {
486       return;
487     }
488     IOException exception = null;
489     DistributedFileSystem dfs = (DistributedFileSystem) fs;
490     try {
491       if (dfs.exists(new Path("/"))) {
492         return;
493       }
494     } catch (IOException e) {
495       exception = e instanceof RemoteException ?
496               ((RemoteException)e).unwrapRemoteException() : e;
497     }
498     try {
499       fs.close();
500     } catch (Exception e) {
501       LOG.error("file system close failed: ", e);
502     }
503     IOException io = new IOException("File system is not available");
504     io.initCause(exception);
505     throw io;
506   }
507 
508   /**
509    * We use reflection because {@link DistributedFileSystem#setSafeMode(
510    * FSConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1
511    *
512    * @param dfs
513    * @return whether we're in safe mode
514    * @throws IOException
515    */
516   private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
517     boolean inSafeMode = false;
518     try {
519       Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
520           org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.class, boolean.class});
521       inSafeMode = (Boolean) m.invoke(dfs,
522         org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_GET, true);
523     } catch (Exception e) {
524       if (e instanceof IOException) throw (IOException) e;
525 
526       // Check whether dfs is on safemode.
527       inSafeMode = dfs.setSafeMode(
528         org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_GET);
529     }
530     return inSafeMode;
531   }
532 
533   /**
534    * Check whether dfs is in safemode.
535    * @param conf
536    * @throws IOException
537    */
538   public static void checkDfsSafeMode(final Configuration conf)
539   throws IOException {
540     boolean isInSafeMode = false;
541     FileSystem fs = FileSystem.get(conf);
542     if (fs instanceof DistributedFileSystem) {
543       DistributedFileSystem dfs = (DistributedFileSystem)fs;
544       isInSafeMode = isInSafeMode(dfs);
545     }
546     if (isInSafeMode) {
547       throw new IOException("File system is in safemode, it can't be written now");
548     }
549   }
550 
551   /**
552    * Reads the current version of the file system from the version file
553    *
554    * @param fs filesystem object
555    * @param rootdir root hbase directory
556    * @return null if no version file exists, version string otherwise.
557    * @throws IOException e
558    * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
559    */
560   public static String getVersion(FileSystem fs, Path rootdir)
561   throws IOException, DeserializationException {
562     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
563     FileStatus[] status = null;
564     try {
565       // hadoop 2.0 throws FNFE if directory does not exist.
566       // hadoop 1.0 returns null if directory does not exist.
567       status = fs.listStatus(versionFile);
568     } catch (FileNotFoundException fnfe) {
569       return null;
570     }
571     if (status == null || status.length == 0) return null;
572     String version = null;
573     byte [] content = new byte [(int)status[0].getLen()];
574     FSDataInputStream s = fs.open(versionFile);
575     try {
576       IOUtils.readFully(s, content, 0, content.length);
577       if (ProtobufUtil.isPBMagicPrefix(content)) {
578         version = parseVersionFrom(content);
579       } else {
580         // Presume it is in pre-pb format.
581         InputStream is = new ByteArrayInputStream(content);
582         DataInputStream dis = new DataInputStream(is);
583         try {
584           version = dis.readUTF();
585         } finally {
586           dis.close();
587         }
588       }
589     } catch (EOFException eof) {
590       LOG.warn("Version file was empty, odd, will try to set it.");
591     } finally {
592       s.close();
593     }
594     return version;
595   }
596 
597   /**
598    * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
599    * @param bytes The byte content of the hbase.version file.
600    * @return The version found in the file as a String.
601    * @throws DeserializationException
602    */
603   static String parseVersionFrom(final byte [] bytes)
604   throws DeserializationException {
605     ProtobufUtil.expectPBMagicPrefix(bytes);
606     int pblen = ProtobufUtil.lengthOfPBMagic();
607     FSProtos.HBaseVersionFileContent.Builder builder =
608       FSProtos.HBaseVersionFileContent.newBuilder();
609     FSProtos.HBaseVersionFileContent fileContent;
610     try {
611       fileContent = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
612       return fileContent.getVersion();
613     } catch (InvalidProtocolBufferException e) {
614       // Convert
615       throw new DeserializationException(e);
616     }
617   }
618 
619   /**
620    * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
621    * @param version Version to persist
622    * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
623    */
624   static byte [] toVersionByteArray(final String version) {
625     FSProtos.HBaseVersionFileContent.Builder builder =
626       FSProtos.HBaseVersionFileContent.newBuilder();
627     return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
628   }
629 
630   /**
631    * Verifies current version of file system
632    *
633    * @param fs file system
634    * @param rootdir root directory of HBase installation
635    * @param message if true, issues a message on System.out
636    *
637    * @throws IOException e
638    * @throws DeserializationException
639    */
640   public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
641   throws IOException, DeserializationException {
642     checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
643   }
644 
645   /**
646    * Verifies current version of file system
647    *
648    * @param fs file system
649    * @param rootdir root directory of HBase installation
650    * @param message if true, issues a message on System.out
651    * @param wait wait interval
652    * @param retries number of times to retry
653    *
654    * @throws IOException e
655    * @throws DeserializationException
656    */
657   public static void checkVersion(FileSystem fs, Path rootdir,
658       boolean message, int wait, int retries)
659   throws IOException, DeserializationException {
660     String version = getVersion(fs, rootdir);
661     if (version == null) {
662       if (!metaRegionExists(fs, rootdir)) {
663         // rootDir is empty (no version file and no root region)
664         // just create new version file (HBASE-1195)
665         setVersion(fs, rootdir, wait, retries);
666         return;
667       }
668     } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) return;
669 
670     // version is deprecated; require migration
671     // Output on stdout so user sees it in terminal.
672     String msg = "HBase file layout needs to be upgraded."
673       + " You have version " + version
674       + " and I want version " + HConstants.FILE_SYSTEM_VERSION
675       + ". Consult http://hbase.apache.org/book.html for further information about upgrading HBase."
676       + " Is your hbase.rootdir valid? If so, you may need to run "
677       + "'hbase hbck -fixVersionFile'.";
678     if (message) {
679       System.out.println("WARNING! " + msg);
680     }
681     throw new FileSystemVersionException(msg);
682   }
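
  // Usage sketch (illustrative): verify the hbase.version file at startup, creating it when the
  // root directory is still empty; the 10 second retry wait is an arbitrary example value.
  //
  //   Path rootDir = FSUtils.getRootDir(conf);
  //   FileSystem fs = rootDir.getFileSystem(conf);
  //   FSUtils.checkVersion(fs, rootDir, true, 10 * 1000,
  //       HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);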
683 
684   /**
685    * Sets version of file system
686    *
687    * @param fs filesystem object
688    * @param rootdir hbase root
689    * @throws IOException e
690    */
691   public static void setVersion(FileSystem fs, Path rootdir)
692   throws IOException {
693     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
694       HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
695   }
696 
697   /**
698    * Sets version of file system
699    *
700    * @param fs filesystem object
701    * @param rootdir hbase root
702    * @param wait time to wait for retry
703    * @param retries number of times to retry before failing
704    * @throws IOException e
705    */
706   public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
707   throws IOException {
708     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
709   }
710 
711 
712   /**
713    * Sets version of file system
714    *
715    * @param fs filesystem object
716    * @param rootdir hbase root directory
717    * @param version version to set
718    * @param wait time to wait for retry
719    * @param retries number of times to retry before throwing an IOException
720    * @throws IOException e
721    */
722   public static void setVersion(FileSystem fs, Path rootdir, String version,
723       int wait, int retries) throws IOException {
724     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
725     Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
726       HConstants.VERSION_FILE_NAME);
727     while (true) {
728       try {
729         // Write the version to a temporary file
730         FSDataOutputStream s = fs.create(tempVersionFile);
731         try {
732           s.write(toVersionByteArray(version));
733           s.close();
734           s = null;
735           // Move the temp version file to its normal location. Returns false
736           // if the rename failed. Throw an IOE in that case.
737           if (!fs.rename(tempVersionFile, versionFile)) {
738             throw new IOException("Unable to move temp version file to " + versionFile);
739           }
740         } finally {
741           // Cleaning up the temporary if the rename failed would be trying
742           // too hard. We'll unconditionally create it again the next time
743           // through anyway, files are overwritten by default by create().
744 
745           // Attempt to close the stream on the way out if it is still open.
746           try {
747             if (s != null) s.close();
748           } catch (IOException ignore) { }
749         }
750         LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
751         return;
752       } catch (IOException e) {
753         if (retries > 0) {
754           LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
755           fs.delete(versionFile, false);
756           try {
757             if (wait > 0) {
758               Thread.sleep(wait);
759             }
760           } catch (InterruptedException ie) {
761             throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
762           }
763           retries--;
764         } else {
765           throw e;
766         }
767       }
768     }
769   }
770 
771   /**
772    * Checks that a cluster ID file exists in the HBase root directory
773    * @param fs the root directory FileSystem
774    * @param rootdir the HBase root directory in HDFS
775    * @param wait how long to wait between retries
776    * @return <code>true</code> if the file exists, otherwise <code>false</code>
777    * @throws IOException if checking the FileSystem fails
778    */
779   public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
780       int wait) throws IOException {
781     while (true) {
782       try {
783         Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
784         return fs.exists(filePath);
785       } catch (IOException ioe) {
786         if (wait > 0) {
787           LOG.warn("Unable to check cluster ID file in " + rootdir.toString() +
788               ", retrying in "+wait+"msec: "+StringUtils.stringifyException(ioe));
789           try {
790             Thread.sleep(wait);
791           } catch (InterruptedException e) {
792             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
793           }
794         } else {
795           throw ioe;
796         }
797       }
798     }
799   }
800 
801   /**
802    * Returns the value of the unique cluster ID stored for this HBase instance.
803    * @param fs the root directory FileSystem
804    * @param rootdir the path to the HBase root directory
805    * @return the unique cluster identifier
806    * @throws IOException if reading the cluster ID file fails
807    */
808   public static ClusterId getClusterId(FileSystem fs, Path rootdir)
809   throws IOException {
810     Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
811     ClusterId clusterId = null;
812     FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath):  null;
813     if (status != null) {
814       int len = Ints.checkedCast(status.getLen());
815       byte [] content = new byte[len];
816       FSDataInputStream in = fs.open(idPath);
817       try {
818         in.readFully(content);
819       } catch (EOFException eof) {
820         LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
821       } finally{
822         in.close();
823       }
824       try {
825         clusterId = ClusterId.parseFrom(content);
826       } catch (DeserializationException e) {
827         throw new IOException("content=" + Bytes.toString(content), e);
828       }
829       // If not pb'd, make it so.
830       if (!ProtobufUtil.isPBMagicPrefix(content)) {
831         String cid = null;
832         in = fs.open(idPath);
833         try {
834           cid = in.readUTF();
835           clusterId = new ClusterId(cid);
836         } catch (EOFException eof) {
837           LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
838         } finally {
839           in.close();
840         }
841         rewriteAsPb(fs, rootdir, idPath, clusterId);
842       }
843       return clusterId;
844     } else {
845       LOG.warn("Cluster ID file does not exist at " + idPath.toString());
846     }
847     return clusterId;
848   }
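
  // Usage sketch (illustrative): read the cluster ID stored in the hbase.id file under
  // hbase.rootdir; null is returned (after a warning) when the file does not exist.
  //
  //   ClusterId clusterId = FSUtils.getClusterId(fs, FSUtils.getRootDir(conf));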
849 
850   /**
851    * @param cid
852    * @throws IOException
853    */
854   private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
855       final ClusterId cid)
856   throws IOException {
857     // Rewrite the file as pb.  Move aside the old one first, write new
858     // then delete the moved-aside file.
859     Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
860     if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
861     setClusterId(fs, rootdir, cid, 100);
862     if (!fs.delete(movedAsideName, false)) {
863       throw new IOException("Failed delete of " + movedAsideName);
864     }
865     LOG.debug("Rewrote the hbase.id file as pb");
866   }
867 
868   /**
869    * Writes a new unique identifier for this cluster to the "hbase.id" file
870    * in the HBase root directory
871    * @param fs the root directory FileSystem
872    * @param rootdir the path to the HBase root directory
873    * @param clusterId the unique identifier to store
874    * @param wait how long (in milliseconds) to wait between retries
875    * @throws IOException if writing to the FileSystem fails and no wait value was provided
876    */
877   public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
878       int wait) throws IOException {
879     while (true) {
880       try {
881         Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
882         Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
883           Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
884         // Write the id file to a temporary location
885         FSDataOutputStream s = fs.create(tempIdFile);
886         try {
887           s.write(clusterId.toByteArray());
888           s.close();
889           s = null;
890           // Move the temporary file to its normal location. Throw an IOE if
891           // the rename failed
892           if (!fs.rename(tempIdFile, idFile)) {
893             throw new IOException("Unable to move temp cluster ID file to " + idFile);
894           }
895         } finally {
896           // Attempt to close the stream if still open on the way out
897           try {
898             if (s != null) s.close();
899           } catch (IOException ignore) { }
900         }
901         if (LOG.isDebugEnabled()) {
902           LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
903         }
904         return;
905       } catch (IOException ioe) {
906         if (wait > 0) {
907           LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
908               ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
909           try {
910             Thread.sleep(wait);
911           } catch (InterruptedException e) {
912             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
913           }
914         } else {
915           throw ioe;
916         }
917       }
918     }
919   }
920 
921   /**
922    * Verifies root directory path is a valid URI with a scheme
923    *
924    * @param root root directory path
925    * @return Passed <code>root</code> argument.
926    * @throws IOException if not a valid URI with a scheme
927    */
928   public static Path validateRootPath(Path root) throws IOException {
929     try {
930       URI rootURI = new URI(root.toString());
931       String scheme = rootURI.getScheme();
932       if (scheme == null) {
933         throw new IOException("Root directory does not have a scheme");
934       }
935       return root;
936     } catch (URISyntaxException e) {
937       IOException io = new IOException("Root directory path is not a valid " +
938         "URI -- check your " + HConstants.HBASE_DIR + " configuration");
939       io.initCause(e);
940       throw io;
941     }
942   }
943 
944   /**
945    * Checks for the presence of the root path (using the provided conf object) in the given path. If
946    * it exists, this method removes it and returns the String representation of the remaining relative path.
947    * @param path
948    * @param conf
949    * @return String representation of the remaining relative path
950    * @throws IOException
951    */
952   public static String removeRootPath(Path path, final Configuration conf) throws IOException {
953     Path root = FSUtils.getRootDir(conf);
954     String pathStr = path.toString();
955     // If the path does not contain the root path, return it as it is.
956     if (!pathStr.startsWith(root.toString())) return pathStr;
957     // Otherwise strip the root path and the trailing "/" and return the remainder.
958     return pathStr.substring(root.toString().length() + 1);
959   }
960 
961   /**
962    * If DFS, check safe mode and if so, wait until we clear it.
963    * @param conf configuration
964    * @param wait Sleep between retries
965    * @throws IOException e
966    */
967   public static void waitOnSafeMode(final Configuration conf,
968     final long wait)
969   throws IOException {
970     FileSystem fs = FileSystem.get(conf);
971     if (!(fs instanceof DistributedFileSystem)) return;
972     DistributedFileSystem dfs = (DistributedFileSystem)fs;
973     // Make sure dfs is not in safe mode
974     while (isInSafeMode(dfs)) {
975       LOG.info("Waiting for dfs to exit safe mode...");
976       try {
977         Thread.sleep(wait);
978       } catch (InterruptedException e) {
979         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
980       }
981     }
982   }
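
  // Usage sketch (illustrative): block until HDFS leaves safe mode before writing to the root
  // directory; the 10 second poll interval is an arbitrary example value.
  //
  //   FSUtils.waitOnSafeMode(conf, 10 * 1000L);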
983 
984   /**
985    * Return the 'path' component of a Path.  In Hadoop, Path is a URI.  This
986    * method returns the 'path' component of a Path's URI: e.g. If a Path is
987    * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
988    * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
989    * This method is useful if you want to print out a Path without the qualifying
990    * Filesystem instance.
991    * @param p Filesystem Path whose 'path' component we are to return.
992    * @return the 'path' portion of the Path's URI
993    */
994   public static String getPath(Path p) {
995     return p.toUri().getPath();
996   }
997 
998   /**
999    * @param c configuration
1000    * @return Path to hbase root directory: i.e. <code>hbase.rootdir</code> from
1001    * configuration as a qualified Path.
1002    * @throws IOException e
1003    */
1004   public static Path getRootDir(final Configuration c) throws IOException {
1005     Path p = new Path(c.get(HConstants.HBASE_DIR));
1006     FileSystem fs = p.getFileSystem(c);
1007     return p.makeQualified(fs);
1008   }
1009 
1010   public static void setRootDir(final Configuration c, final Path root) throws IOException {
1011     c.set(HConstants.HBASE_DIR, root.toString());
1012   }
1013 
1014   public static void setFsDefault(final Configuration c, final Path root) throws IOException {
1015     c.set("fs.defaultFS", root.toString());    // for hadoop 0.21+
1016   }
1017 
1018   /**
1019    * Checks if meta region exists
1020    *
1021    * @param fs file system
1022    * @param rootdir root directory of HBase installation
1023    * @return true if exists
1024    * @throws IOException e
1025    */
1026   @SuppressWarnings("deprecation")
1027   public static boolean metaRegionExists(FileSystem fs, Path rootdir)
1028   throws IOException {
1029     Path metaRegionDir =
1030       HRegion.getRegionDir(rootdir, HRegionInfo.FIRST_META_REGIONINFO);
1031     return fs.exists(metaRegionDir);
1032   }
1033 
1034   /**
1035    * Compute HDFS blocks distribution of a given file, or a portion of the file
1036    * @param fs file system
1037    * @param status file status of the file
1038    * @param start start position of the portion
1039    * @param length length of the portion
1040    * @return The HDFS blocks distribution
1041    */
1042   static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
1043     final FileSystem fs, FileStatus status, long start, long length)
1044     throws IOException {
1045     HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
1046     BlockLocation [] blockLocations =
1047       fs.getFileBlockLocations(status, start, length);
1048     for(BlockLocation bl : blockLocations) {
1049       String [] hosts = bl.getHosts();
1050       long len = bl.getLength();
1051       blocksDistribution.addHostsAndBlockWeight(hosts, len);
1052     }
1053 
1054     return blocksDistribution;
1055   }
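
  // Usage sketch (illustrative): compute the block distribution of a whole file to estimate its
  // locality. "storeFilePath" and "localHostName" are hypothetical, and getBlockLocalityIndex is
  // assumed to be available on HDFSBlocksDistribution.
  //
  //   FileStatus status = fs.getFileStatus(storeFilePath);
  //   HDFSBlocksDistribution dist =
  //       FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
  //   float locality = dist.getBlockLocalityIndex(localHostName);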
1056 
1057 
1058 
1059   /**
1060    * Runs through the hbase rootdir and checks all stores have only
1061    * one file in them -- that is, they've been major compacted.  Looks
1062    * at root and meta tables too.
1063    * @param fs filesystem
1064    * @param hbaseRootDir hbase root directory
1065    * @return True if this hbase install is major compacted.
1066    * @throws IOException e
1067    */
1068   public static boolean isMajorCompacted(final FileSystem fs,
1069       final Path hbaseRootDir)
1070   throws IOException {
1071     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1072     PathFilter regionFilter = new RegionDirFilter(fs);
1073     PathFilter familyFilter = new FamilyDirFilter(fs);
1074     for (Path d : tableDirs) {
1075       FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
1076       for (FileStatus regionDir : regionDirs) {
1077         Path dd = regionDir.getPath();
1078         // Else it's a region name.  Now look in the region for families.
1079         FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1080         for (FileStatus familyDir : familyDirs) {
1081           Path family = familyDir.getPath();
1082           // Now in family make sure only one file.
1083           FileStatus[] familyStatus = fs.listStatus(family);
1084           if (familyStatus.length > 1) {
1085             LOG.debug(family.toString() + " has " + familyStatus.length +
1086                 " files.");
1087             return false;
1088           }
1089         }
1090       }
1091     }
1092     return true;
1093   }
1094 
1095   // TODO move this method OUT of FSUtils. No dependencies to HMaster
1096   /**
1097    * Returns the total overall fragmentation percentage. Includes hbase:meta and
1098    * -ROOT- as well.
1099    *
1100    * @param master  The master defining the HBase root and file system.
1101    * @return The overall fragmentation percentage, or -1 if it could not be determined.
1102    * @throws IOException When scanning the directory fails.
1103    */
1104   public static int getTotalTableFragmentation(final HMaster master)
1105   throws IOException {
1106     Map<String, Integer> map = getTableFragmentation(master);
1107     return map != null && map.size() > 0 ? map.get("-TOTAL-") : -1;
1108   }
1109 
1110   /**
1111    * Runs through the HBase rootdir and checks how many stores for each table
1112    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1113    * percentage across all tables is stored under the special key "-TOTAL-".
1114    *
1115    * @param master  The master defining the HBase root and file system.
1116    * @return A map for each table and its percentage.
1117    *
1118    * @throws IOException When scanning the directory fails.
1119    */
1120   public static Map<String, Integer> getTableFragmentation(
1121     final HMaster master)
1122   throws IOException {
1123     Path path = getRootDir(master.getConfiguration());
1124     // since HMaster.getFileSystem() is package private
1125     FileSystem fs = path.getFileSystem(master.getConfiguration());
1126     return getTableFragmentation(fs, path);
1127   }
1128 
1129   /**
1130    * Runs through the HBase rootdir and checks how many stores for each table
1131    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1132    * percentage across all tables is stored under the special key "-TOTAL-".
1133    *
1134    * @param fs  The file system to use.
1135    * @param hbaseRootDir  The root directory to scan.
1136    * @return A map for each table and its percentage.
1137    * @throws IOException When scanning the directory fails.
1138    */
1139   public static Map<String, Integer> getTableFragmentation(
1140     final FileSystem fs, final Path hbaseRootDir)
1141   throws IOException {
1142     Map<String, Integer> frags = new HashMap<String, Integer>();
1143     int cfCountTotal = 0;
1144     int cfFragTotal = 0;
1145     PathFilter regionFilter = new RegionDirFilter(fs);
1146     PathFilter familyFilter = new FamilyDirFilter(fs);
1147     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1148     for (Path d : tableDirs) {
1149       int cfCount = 0;
1150       int cfFrag = 0;
1151       FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
1152       for (FileStatus regionDir : regionDirs) {
1153         Path dd = regionDir.getPath();
1154         // else it's a region name, now look in the region for families
1155         FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1156         for (FileStatus familyDir : familyDirs) {
1157           cfCount++;
1158           cfCountTotal++;
1159           Path family = familyDir.getPath();
1160           // now in family make sure only one file
1161           FileStatus[] familyStatus = fs.listStatus(family);
1162           if (familyStatus.length > 1) {
1163             cfFrag++;
1164             cfFragTotal++;
1165           }
1166         }
1167       }
1168       // compute percentage per table and store in result list
1169       frags.put(FSUtils.getTableName(d).getNameAsString(),
1170         cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
1171     }
1172     // set overall percentage for all tables
1173     frags.put("-TOTAL-",
1174       cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
1175     return frags;
1176   }
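
  // Usage sketch (illustrative): per-table fragmentation percentages, with the overall value
  // stored under the special "-TOTAL-" key as described in the javadoc above.
  //
  //   Map<String, Integer> frags = FSUtils.getTableFragmentation(fs, rootDir);
  //   int overallPercent = frags.get("-TOTAL-");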
1177 
1178   /**
1179    * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
1180    * path rootdir
1181    *
1182    * @param rootdir qualified path of HBase root directory
1183    * @param tableName name of table
1184    * @return {@link org.apache.hadoop.fs.Path} for table
1185    */
1186   public static Path getTableDir(Path rootdir, final TableName tableName) {
1187     return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
1188         tableName.getQualifierAsString());
1189   }
1190 
1191   /**
1192    * Returns the {@link org.apache.hadoop.hbase.TableName} object for the table
1193    * that the given table directory
1194    * belongs to.
1195    *
1196    * @param tablePath path of the table directory
1197    * @return {@link org.apache.hadoop.hbase.TableName} for the table
1198    */
1199   public static TableName getTableName(Path tablePath) {
1200     return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
1201   }
1202 
1203   /**
1204    * Returns the {@link org.apache.hadoop.fs.Path} object representing
1205    * the namespace directory under path rootdir
1206    *
1207    * @param rootdir qualified path of HBase root directory
1208    * @param namespace namespace name
1209    * @return {@link org.apache.hadoop.fs.Path} for the namespace directory
1210    */
1211   public static Path getNamespaceDir(Path rootdir, final String namespace) {
1212     return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
1213         new Path(namespace)));
1214   }
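
  // Usage sketch (illustrative): a table "t1" in the default namespace resolves to the directory
  // <rootdir>/<BASE_NAMESPACE_DIR>/default/t1, and getTableName() round-trips it back.
  //
  //   Path tableDir = FSUtils.getTableDir(rootDir, TableName.valueOf("t1"));
  //   TableName tn = FSUtils.getTableName(tableDir);   // "default:t1"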
1215 
1216   /**
1217    * A {@link PathFilter} that returns only regular files.
1218    */
1219   static class FileFilter implements PathFilter {
1220     private final FileSystem fs;
1221 
1222     public FileFilter(final FileSystem fs) {
1223       this.fs = fs;
1224     }
1225 
1226     @Override
1227     public boolean accept(Path p) {
1228       try {
1229         return fs.isFile(p);
1230       } catch (IOException e) {
1231         LOG.debug("unable to verify if path=" + p + " is a regular file", e);
1232         return false;
1233       }
1234     }
1235   }
1236 
1237   /**
1238    * Directory filter that doesn't include any of the directories in the specified blacklist
1239    */
1240   public static class BlackListDirFilter implements PathFilter {
1241     private final FileSystem fs;
1242     private List<String> blacklist;
1243 
1244     /**
1245      * Create a filter on the given filesystem with the specified blacklist
1246      * @param fs filesystem to filter
1247      * @param directoryNameBlackList list of the names of the directories to filter. If
1248      *          <tt>null</tt>, all directories are returned
1249      */
1250     @SuppressWarnings("unchecked")
1251     public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
1252       this.fs = fs;
1253       blacklist =
1254         (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
1255           : directoryNameBlackList);
1256     }
1257 
1258     @Override
1259     public boolean accept(Path p) {
1260       boolean isValid = false;
1261       try {
1262         if (isValidName(p.getName())) {
1263           isValid = fs.getFileStatus(p).isDirectory();
1264         } else {
1265           isValid = false;
1266         }
1267       } catch (IOException e) {
1268         LOG.warn("An error occurred while verifying if [" + p.toString()
1269             + "] is a valid directory. Returning 'not valid' and continuing.", e);
1270       }
1271       return isValid;
1272     }
1273 
1274     protected boolean isValidName(final String name) {
1275       return !blacklist.contains(name);
1276     }
1277   }
1278 
1279   /**
1280    * A {@link PathFilter} that only allows directories.
1281    */
1282   public static class DirFilter extends BlackListDirFilter {
1283 
1284     public DirFilter(FileSystem fs) {
1285       super(fs, null);
1286     }
1287   }
1288 
1289   /**
1290    * A {@link PathFilter} that returns usertable directories. To get all directories use the
1291    * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
1292    */
1293   public static class UserTableDirFilter extends BlackListDirFilter {
1294     public UserTableDirFilter(FileSystem fs) {
1295       super(fs, HConstants.HBASE_NON_TABLE_DIRS);
1296     }
1297 
1298     protected boolean isValidName(final String name) {
1299       if (!super.isValidName(name))
1300         return false;
1301 
1302       try {
1303         TableName.isLegalTableQualifierName(Bytes.toBytes(name));
1304       } catch (IllegalArgumentException e) {
1305         LOG.info("INVALID NAME " + name);
1306         return false;
1307       }
1308       return true;
1309     }
1310   }
1311 
1312   /**
1313    * Heuristic to determine whether it is safe to open a file for append.
1314    * Looks both at dfs.support.append and uses reflection to search
1315    * for SequenceFile.Writer.syncFs() or FSDataOutputStream.hflush().
1316    * @param conf configuration to inspect
1317    * @return True if append is supported
1318    */
1319   public static boolean isAppendSupported(final Configuration conf) {
1320     boolean append = conf.getBoolean("dfs.support.append", false);
1321     if (append) {
1322       try {
1323         // TODO: The implementation that comes back when we do a createWriter
1324         // may not be using SequenceFile so the below is not a definitive test.
1325         // Will do for now (hdfs-200).
1326         SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
1327         append = true;
1328       } catch (SecurityException e) {
1329       } catch (NoSuchMethodException e) {
1330         append = false;
1331       }
1332     }
1333     if (!append) {
1334       // Look for the 0.21, 0.22, new-style append evidence.
1335       try {
1336         FSDataOutputStream.class.getMethod("hflush", new Class<?> []{});
1337         append = true;
1338       } catch (NoSuchMethodException e) {
1339         append = false;
1340       }
1341     }
1342     return append;
1343   }
1344 
1345   /**
1346    * @param conf configuration used to resolve the filesystem
1347    * @return True if the configured filesystem's scheme is 'hdfs'.
1348    * @throws IOException
1349    */
1350   public static boolean isHDFS(final Configuration conf) throws IOException {
1351     FileSystem fs = FileSystem.get(conf);
1352     String scheme = fs.getUri().getScheme();
1353     return scheme.equalsIgnoreCase("hdfs");
1354   }
1355 
1356   /**
1357    * Recover file lease. Used when a file is suspected
1358    * to have been left open by another process.
1359    * @param fs FileSystem handle
1360    * @param p Path of file to recover lease
1361    * @param conf Configuration handle
1362    * @throws IOException
1363    */
1364   public abstract void recoverFileLease(final FileSystem fs, final Path p,
1365       Configuration conf, CancelableProgressable reporter) throws IOException;
1366 
1367   public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
1368       throws IOException {
1369     List<Path> tableDirs = new LinkedList<Path>();
1370 
1371     for(FileStatus status :
1372         fs.globStatus(new Path(rootdir,
1373             new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
1374       tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
1375     }
1376     return tableDirs;
1377   }
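
       // Illustrative sketch: enumerating every table directory under hbase.rootdir and
       // resolving each back to a TableName. Assumes the Configuration "conf" is already set up.
       //
       //   FileSystem fs = FSUtils.getCurrentFileSystem(conf);
       //   for (Path tableDir : FSUtils.getTableDirs(fs, FSUtils.getRootDir(conf))) {
       //     LOG.info("Found table " + FSUtils.getTableName(tableDir));
       //   }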
1378 
1379   /**
1380    * @param fs
1381    * @param rootdir
1382    * @return All the table directories under <code>rootdir</code>. Ignores non-table HBase
1383    * folders such as .logs, .oldlogs and .corrupt.
1384    * @throws IOException
1385    */
1386   public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
1387       throws IOException {
1388     // presumes any directory under hbase.rootdir is a table
1389     FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
1390     List<Path> tabledirs = new ArrayList<Path>(dirs.length);
1391     for (FileStatus dir: dirs) {
1392       tabledirs.add(dir.getPath());
1393     }
1394     return tabledirs;
1395   }
1396 
1397   /**
1398    * Checks if the given path contains the 'recovered.edits' directory.
1399    * @param path path to check
1400    * @return True if the path contains the recovered.edits directory
1401    */
1402   public static boolean isRecoveredEdits(Path path) {
1403     return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
1404   }
1405 
1406   /**
1407    * Filter for region directories; only names matching the hex region dir pattern are accepted.
1408    */
1409   public static class RegionDirFilter implements PathFilter {
1410     // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
1411     final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
1412     final FileSystem fs;
1413 
1414     public RegionDirFilter(FileSystem fs) {
1415       this.fs = fs;
1416     }
1417 
1418     @Override
1419     public boolean accept(Path rd) {
1420       if (!regionDirPattern.matcher(rd.getName()).matches()) {
1421         return false;
1422       }
1423 
1424       try {
1425         return fs.getFileStatus(rd).isDirectory();
1426       } catch (IOException ioe) {
1427         // Maybe the file was moved or the fs was disconnected.
1428         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1429         return false;
1430       }
1431     }
1432   }
1433 
1434   /**
1435    * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1436    * .tableinfo
1437    * @param fs A file system for the Path
1438    * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir>
1439    * @return List of paths to valid region directories in table dir.
1440    * @throws IOException
1441    */
1442   public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1443     // assumes we are in a table dir.
1444     FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs));
1445     List<Path> regionDirs = new ArrayList<Path>(rds.length);
1446     for (FileStatus rdfs: rds) {
1447       Path rdPath = rdfs.getPath();
1448       regionDirs.add(rdPath);
1449     }
1450     return regionDirs;
1451   }
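
       // Illustrative sketch: walking from a table directory down to its region directories.
       // "tableDir" here is a hypothetical ${hbase.rootdir}/data/<namespace>/<table> path.
       //
       //   List<Path> regions = FSUtils.getRegionDirs(fs, tableDir);
       //   LOG.debug("Table at " + tableDir + " has " + regions.size() + " region dirs");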
1452 
1453   /**
1454    * Filter for all dirs that are legal column family names.  This is generally used for colfam
1455    * dirs <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>.
1456    */
1457   public static class FamilyDirFilter implements PathFilter {
1458     final FileSystem fs;
1459 
1460     public FamilyDirFilter(FileSystem fs) {
1461       this.fs = fs;
1462     }
1463 
1464     @Override
1465     public boolean accept(Path rd) {
1466       try {
1467         // throws IAE if invalid
1468         HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(rd.getName()));
1469       } catch (IllegalArgumentException iae) {
1470         // path name is an invalid family name and thus is excluded.
1471         return false;
1472       }
1473 
1474       try {
1475         return fs.getFileStatus(rd).isDirectory();
1476       } catch (IOException ioe) {
1477         // Maybe the file was moved or the fs was disconnected.
1478         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1479         return false;
1480       }
1481     }
1482   }
1483 
1484   /**
1485    * Given a particular region dir, return all the familydirs inside it
1486    *
1487    * @param fs A file system for the Path
1488    * @param regionDir Path to a specific region directory
1489    * @return List of paths to valid family directories in region dir.
1490    * @throws IOException
1491    */
1492   public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1493     // assumes we are in a region dir.
1494     FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1495     List<Path> familyDirs = new ArrayList<Path>(fds.length);
1496     for (FileStatus fdfs: fds) {
1497       Path fdPath = fdfs.getPath();
1498       familyDirs.add(fdPath);
1499     }
1500     return familyDirs;
1501   }
1502 
1503   public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
1504     FileStatus[] fds = fs.listStatus(familyDir, new ReferenceFileFilter(fs));
1505     List<Path> referenceFiles = new ArrayList<Path>(fds.length);
1506     for (FileStatus fdfs: fds) {
1507       Path fdPath = fdfs.getPath();
1508       referenceFiles.add(fdPath);
1509     }
1510     return referenceFiles;
1511   }
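
       // Illustrative sketch: counting reference files per column family of one region by
       // combining getFamilyDirs and getReferenceFilePaths. "regionDir" is a hypothetical
       // region directory path.
       //
       //   for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
       //     int refs = FSUtils.getReferenceFilePaths(fs, familyDir).size();
       //     LOG.debug(familyDir.getName() + " has " + refs + " reference file(s)");
       //   }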
1512 
1513   /**
1514    * Filter for HFiles that excludes reference files.
1515    */
1516   public static class HFileFilter implements PathFilter {
1517     final FileSystem fs;
1518 
1519     public HFileFilter(FileSystem fs) {
1520       this.fs = fs;
1521     }
1522 
1523     @Override
1524     public boolean accept(Path rd) {
1525       try {
1526         // only files
1527         return !fs.getFileStatus(rd).isDirectory() && StoreFileInfo.isHFile(rd);
1528       } catch (IOException ioe) {
1529         // Maybe the file was moved or the fs was disconnected.
1530         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1531         return false;
1532       }
1533     }
1534   }
1535 
1536   public static class ReferenceFileFilter implements PathFilter {
1537 
1538     private final FileSystem fs;
1539 
1540     public ReferenceFileFilter(FileSystem fs) {
1541       this.fs = fs;
1542     }
1543 
1544     @Override
1545     public boolean accept(Path rd) {
1546       try {
1547         // only files can be references.
1548         return !fs.getFileStatus(rd).isDirectory() && StoreFileInfo.isReference(rd);
1549       } catch (IOException ioe) {
1550         // Maybe the file was moved or the fs was disconnected.
1551         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1552         return false;
1553       }
1554     }
1555   }
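
       // Illustrative sketch: listing only real HFiles (reference files excluded) in a column
       // family directory by applying HFileFilter. "familyDir" is a hypothetical family dir path.
       //
       //   FileStatus[] hfiles = fs.listStatus(familyDir, new HFileFilter(fs));
       //   for (FileStatus hfile : hfiles) {
       //     LOG.debug("HFile: " + hfile.getPath());
       //   }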
1556 
1557 
1558   /**
1559    * @param conf
1560    * @return The filesystem of the HBase root dir.
1561    * @throws IOException
1562    */
1563   public static FileSystem getCurrentFileSystem(Configuration conf)
1564   throws IOException {
1565     return getRootDir(conf).getFileSystem(conf);
1566   }
1567 
1568 
1569   /**
1570    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1571    * table StoreFile names to the full Path.
1572    * <br>
1573    * Example...<br>
1574    * Key = 3944417774205889744  <br>
1575    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1576    *
1577    * @param map map to add values.  If null, this method will create and populate one to return
1578    * @param fs  The file system to use.
1579    * @param hbaseRootDir  The root directory to scan.
1580    * @param tableName name of the table to scan.
1581    * @return Map keyed by StoreFile name with a value of the full Path.
1582    * @throws IOException When scanning the directory fails.
1583    */
1584   public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1585   final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1586   throws IOException {
1587     return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null);
1588   }
1589 
1590   /**
1591    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1592    * table StoreFile names to the full Path.
1593    * <br>
1594    * Example...<br>
1595    * Key = 3944417774205889744  <br>
1596    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1597    *
1598    * @param map map to add values.  If null, this method will create and populate one to return
1599    * @param fs  The file system to use.
1600    * @param hbaseRootDir  The root directory to scan.
1601    * @param tableName name of the table to scan.
1602    * @param errors ErrorReporter instance or null
1603    * @return Map keyed by StoreFile name with a value of the full Path.
1604    * @throws IOException When scanning the directory fails.
1605    */
1606   public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1607   final FileSystem fs, final Path hbaseRootDir, TableName tableName, ErrorReporter errors)
1608   throws IOException {
1609     if (map == null) {
1610       map = new HashMap<String, Path>();
1611     }
1612 
1613     // only include the directory paths to tables
1614     Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
1615     // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
1616     // should be regions.
1617     PathFilter familyFilter = new FamilyDirFilter(fs);
1618     FileStatus[] regionDirs = fs.listStatus(tableDir, new RegionDirFilter(fs));
1619     for (FileStatus regionDir : regionDirs) {
1620       if (null != errors) {
1621         errors.progress();
1622       }
1623       Path dd = regionDir.getPath();
1624       // this is a region directory; now look inside it for families
1625       FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1626       for (FileStatus familyDir : familyDirs) {
1627         if (null != errors) {
1628           errors.progress();
1629         }
1630         Path family = familyDir.getPath();
1631         if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1632           continue;
1633         }
1634         // now in family, iterate over the StoreFiles and
1635         // put in map
1636         FileStatus[] familyStatus = fs.listStatus(family);
1637         for (FileStatus sfStatus : familyStatus) {
1638           if (null != errors) {
1639             errors.progress();
1640           }
1641           Path sf = sfStatus.getPath();
1642           map.put(sf.getName(), sf);
1643         }
1644       }
1645     }
1646     return map;
1647   }
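
       // Illustrative sketch: building the store file name -> Path lookup for a single table.
       // The table name "exampleTable" is hypothetical; the ErrorReporter may be passed as null.
       //
       //   Map<String, Path> storeFiles = FSUtils.getTableStoreFilePathMap(
       //       null, fs, FSUtils.getRootDir(conf), TableName.valueOf("exampleTable"));
       //   Path full = storeFiles.get("3944417774205889744"); // null if no such store file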
1648 
1649   public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1650     int result = 0;
1651     try {
1652       for (Path familyDir:getFamilyDirs(fs, p)){
1653         result += getReferenceFilePaths(fs, familyDir).size();
1654       }
1655     } catch (IOException e) {
1656       LOG.warn("Error counting reference files.", e);
1657     }
1658     return result;
1659   }
1660 
1661   /**
1662    * Runs through the HBase rootdir and creates a reverse lookup map for
1663    * table StoreFile names to the full Path.
1664    * <br>
1665    * Example...<br>
1666    * Key = 3944417774205889744  <br>
1667    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1668    *
1669    * @param fs  The file system to use.
1670    * @param hbaseRootDir  The root directory to scan.
1671    * @return Map keyed by StoreFile name with a value of the full Path.
1672    * @throws IOException When scanning the directory fails.
1673    */
1674   public static Map<String, Path> getTableStoreFilePathMap(
1675     final FileSystem fs, final Path hbaseRootDir)
1676   throws IOException {
1677     return getTableStoreFilePathMap(fs, hbaseRootDir, null);
1678   }
1679 
1680   /**
1681    * Runs through the HBase rootdir and creates a reverse lookup map for
1682    * table StoreFile names to the full Path.
1683    * <br>
1684    * Example...<br>
1685    * Key = 3944417774205889744  <br>
1686    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1687    *
1688    * @param fs  The file system to use.
1689    * @param hbaseRootDir  The root directory to scan.
1690    * @param errors ErrorReporter instance or null
1691    * @return Map keyed by StoreFile name with a value of the full Path.
1692    * @throws IOException When scanning the directory fails.
1693    */
1694   public static Map<String, Path> getTableStoreFilePathMap(
1695     final FileSystem fs, final Path hbaseRootDir, ErrorReporter errors)
1696   throws IOException {
1697     Map<String, Path> map = new HashMap<String, Path>();
1698 
1699     // if this method looks similar to 'getTableFragmentation' that is because
1700     // it was borrowed from it.
1701 
1702     // only include the directory paths to tables
1703     for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1704       getTableStoreFilePathMap(map, fs, hbaseRootDir,
1705           FSUtils.getTableName(tableDir), errors);
1706     }
1707     return map;
1708   }
1709 
1710   /**
1711    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1712    * This accommodates differences between Hadoop versions, where Hadoop 1
1713    * does not throw a FileNotFoundException but returns an empty FileStatus[],
1714    * while Hadoop 2 will throw FileNotFoundException.
1715    *
1716    * @param fs file system
1717    * @param dir directory
1718    * @param filter path filter
1719    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1720    */
1721   public static FileStatus [] listStatus(final FileSystem fs,
1722       final Path dir, final PathFilter filter) throws IOException {
1723     FileStatus [] status = null;
1724     try {
1725       status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
1726     } catch (FileNotFoundException fnfe) {
1727       // if directory doesn't exist, return null
1728       if (LOG.isTraceEnabled()) {
1729         LOG.trace(dir + " doesn't exist");
1730       }
1731     }
1732     if (status == null || status.length < 1) return null;
1733     return status;
1734   }
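
       // Illustrative sketch: callers must handle the null return, which stands for both a
       // missing directory and an empty one. "dir" is a hypothetical Path.
       //
       //   FileStatus[] children = FSUtils.listStatus(fs, dir, null);
       //   if (children == null) {
       //     LOG.debug(dir + " is missing or empty");
       //   }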
1735 
1736   /**
1737    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1738    * This accommodates differences between Hadoop versions.
1739    *
1740    * @param fs file system
1741    * @param dir directory
1742    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1743    */
1744   public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
1745     return listStatus(fs, dir, null);
1746   }
1747 
1748   /**
1749    * Calls fs.delete() and returns the value returned by the fs.delete()
1750    *
1751    * @param fs
1752    * @param path
1753    * @param recursive
1754    * @return the value returned by the fs.delete()
1755    * @throws IOException
1756    */
1757   public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
1758       throws IOException {
1759     return fs.delete(path, recursive);
1760   }
1761 
1762   /**
1763    * Calls fs.exists(). Checks if the specified path exists
1764    *
1765    * @param fs
1766    * @param path
1767    * @return the value returned by fs.exists()
1768    * @throws IOException
1769    */
1770   public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
1771     return fs.exists(path);
1772   }
1773 
1774   /**
1775    * Throw an exception if an action is not permitted by a user on a file.
1776    *
1777    * @param ugi
1778    *          the user
1779    * @param file
1780    *          the file
1781    * @param action
1782    *          the action
1783    */
1784   public static void checkAccess(UserGroupInformation ugi, FileStatus file,
1785       FsAction action) throws AccessDeniedException {
1786     if (ugi.getShortUserName().equals(file.getOwner())) {
1787       if (file.getPermission().getUserAction().implies(action)) {
1788         return;
1789       }
1790     } else if (contains(ugi.getGroupNames(), file.getGroup())) {
1791       if (file.getPermission().getGroupAction().implies(action)) {
1792         return;
1793       }
1794     } else if (file.getPermission().getOtherAction().implies(action)) {
1795       return;
1796     }
1797     throw new AccessDeniedException("Permission denied:" + " action=" + action
1798         + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
1799   }
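
       // Illustrative sketch: verifying that the current user may read a file before opening
       // it. UserGroupInformation.getCurrentUser() and the FileStatus come from standard Hadoop
       // APIs; "somePath" is a hypothetical Path.
       //
       //   FileStatus status = fs.getFileStatus(somePath);
       //   FSUtils.checkAccess(UserGroupInformation.getCurrentUser(), status, FsAction.READ);
       //   // throws AccessDeniedException if READ is not permitted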
1800 
1801   private static boolean contains(String[] groups, String user) {
1802     for (String group : groups) {
1803       if (group.equals(user)) {
1804         return true;
1805       }
1806     }
1807     return false;
1808   }
1809 
1810   /**
1811    * Log the current state of the filesystem from a certain root directory
1812    * @param fs filesystem to investigate
1813    * @param root root file/directory to start logging from
1814    * @param LOG log to output information
1815    * @throws IOException if an unexpected exception occurs
1816    */
1817   public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
1818       throws IOException {
1819     LOG.debug("Current file system:");
1820     logFSTree(LOG, fs, root, "|-");
1821   }
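
       // Illustrative sketch: dumping the layout under hbase.rootdir at DEBUG level, e.g. while
       // diagnosing a layout problem in a test.
       //
       //   FSUtils.logFileSystemState(fs, FSUtils.getRootDir(conf), LOG);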
1822 
1823   /**
1824    * Recursive helper to log the state of the FS
1825    *
1826    * @see #logFileSystemState(FileSystem, Path, Log)
1827    */
1828   private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
1829       throws IOException {
1830     FileStatus[] files = FSUtils.listStatus(fs, root, null);
1831     if (files == null) return;
1832 
1833     for (FileStatus file : files) {
1834       if (file.isDirectory()) {
1835         LOG.debug(prefix + file.getPath().getName() + "/");
1836         logFSTree(LOG, fs, file.getPath(), prefix + "---");
1837       } else {
1838         LOG.debug(prefix + file.getPath().getName());
1839       }
1840     }
1841   }
1842 
1843   public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
1844       throws IOException {
1845     // set the modify time for TimeToLive Cleaner
1846     fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
1847     return fs.rename(src, dest);
1848   }
1849 
1850   /**
1851    * Scans the root path of the file system to get the degree of locality
1852    * for each region on each of the servers having at least one block of
1853    * that region.
1854    * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}
1855    *
1856    * @param conf
1857    *          the configuration to use
1858    * @return the mapping from region encoded name to a map of server names to
1859    *           locality fraction
1860    * @throws IOException
1861    *           in case of file system errors or interrupts
1862    */
1863   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1864       final Configuration conf) throws IOException {
1865     return getRegionDegreeLocalityMappingFromFS(
1866         conf, null,
1867         conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1868 
1869   }
1870 
1871   /**
1872    * Scans the root path of the file system to get the degree of locality
1873    * for each region on each of the servers having at least one block of
1874    * that region.
1875    *
1876    * @param conf
1877    *          the configuration to use
1878    * @param desiredTable
1879    *          the table you wish to scan locality for
1880    * @param threadPoolSize
1881    *          the thread pool size to use
1882    * @return the mapping from region encoded name to a map of server names to
1883    *           locality fraction
1884    * @throws IOException
1885    *           in case of file system errors or interrupts
1886    */
1887   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1888       final Configuration conf, final String desiredTable, int threadPoolSize)
1889       throws IOException {
1890     Map<String, Map<String, Float>> regionDegreeLocalityMapping =
1891         new ConcurrentHashMap<String, Map<String, Float>>();
1892     getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null,
1893         regionDegreeLocalityMapping);
1894     return regionDegreeLocalityMapping;
1895   }
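
       // Illustrative sketch: reading the locality map produced above. The outer key is the
       // encoded region name; the inner map goes from server name to locality fraction.
       // "exampleTable" and the pool size of 8 are hypothetical.
       //
       //   Map<String, Map<String, Float>> locality =
       //       FSUtils.getRegionDegreeLocalityMappingFromFS(conf, "exampleTable", 8);
       //   for (Map.Entry<String, Map<String, Float>> e : locality.entrySet()) {
       //     LOG.info("Region " + e.getKey() + " locality by server: " + e.getValue());
       //   }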
1896 
1897   /**
1898    * Scans the root path of the file system to get either the mapping between
1899    * the region name and its best-locality region server, or the degree of
1900    * locality of each region on each of the servers having at least one block
1901    * of that region. The output map parameters are both optional.
1902    *
1903    * @param conf
1904    *          the configuration to use
1905    * @param desiredTable
1906    *          the table you wish to scan locality for
1907    * @param threadPoolSize
1908    *          the thread pool size to use
1909    * @param regionToBestLocalityRSMapping
1910    *          the map into which to put the best locality mapping or null
1911    * @param regionDegreeLocalityMapping
1912    *          the map into which to put the locality degree mapping or null,
1913    *          must be a thread-safe implementation
1914    * @throws IOException
1915    *           in case of file system errors or interrupts
1916    */
1917   private static void getRegionLocalityMappingFromFS(
1918       final Configuration conf, final String desiredTable,
1919       int threadPoolSize,
1920       Map<String, String> regionToBestLocalityRSMapping,
1921       Map<String, Map<String, Float>> regionDegreeLocalityMapping)
1922       throws IOException {
1923     FileSystem fs =  FileSystem.get(conf);
1924     Path rootPath = FSUtils.getRootDir(conf);
1925     long startTime = EnvironmentEdgeManager.currentTime();
1926     Path queryPath;
1927     // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1928     if (null == desiredTable) {
1929       queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1930     } else {
1931       queryPath = new Path(FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1932     }
1933 
1934     // reject all paths that are not appropriate
1935     PathFilter pathFilter = new PathFilter() {
1936       @Override
1937       public boolean accept(Path path) {
1938         // the last path component should be a region name; reject anything else as noise
1939         if (null == path) {
1940           return false;
1941         }
1942 
1943         // no parent?
1944         Path parent = path.getParent();
1945         if (null == parent) {
1946           return false;
1947         }
1948 
1949         String regionName = path.getName();
1950         if (null == regionName) {
1951           return false;
1952         }
1953 
1954         if (!regionName.toLowerCase().matches("[0-9a-f]+")) {
1955           return false;
1956         }
1957         return true;
1958       }
1959     };
1960 
1961     FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1962 
1963     if (null == statusList) {
1964       return;
1965     } else {
1966       LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
1967           statusList.length);
1968     }
1969 
1970     // lower the number of threads in case we have very few expected regions
1971     threadPoolSize = Math.min(threadPoolSize, statusList.length);
1972 
1973     // run in multiple threads
1974     ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
1975         threadPoolSize, 60, TimeUnit.SECONDS,
1976         new ArrayBlockingQueue<Runnable>(statusList.length));
1977     try {
1978       // ignore all file status items that are not of interest
1979       for (FileStatus regionStatus : statusList) {
1980         if (null == regionStatus) {
1981           continue;
1982         }
1983 
1984         if (!regionStatus.isDirectory()) {
1985           continue;
1986         }
1987 
1988         Path regionPath = regionStatus.getPath();
1989         if (null == regionPath) {
1990           continue;
1991         }
1992 
1993         tpe.execute(new FSRegionScanner(fs, regionPath,
1994             regionToBestLocalityRSMapping, regionDegreeLocalityMapping));
1995       }
1996     } finally {
1997       tpe.shutdown();
1998       int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1999           60 * 1000);
2000       try {
2001         // here we wait until TPE terminates, which is either naturally or by
2002         // exceptions in the execution of the threads
2003         while (!tpe.awaitTermination(threadWakeFrequency,
2004             TimeUnit.MILLISECONDS)) {
2005           // printing out rough estimate, so as to not introduce
2006           // AtomicInteger
2007           LOG.info("Locality checking is underway: { Scanned Regions : "
2008               + tpe.getCompletedTaskCount() + "/"
2009               + tpe.getTaskCount() + " }");
2010         }
2011       } catch (InterruptedException e) {
2012         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
2013       }
2014     }
2015 
2016     long overhead = EnvironmentEdgeManager.currentTime() - startTime;
2017     String overheadMsg = "Scanning DFS for locality info took " + overhead + " ms";
2018 
2019     LOG.info(overheadMsg);
2020   }
2021 
2022   /**
2023    * Do our short circuit read setup.
2024    * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
2025    * @param conf
2026    */
2027   public static void setupShortCircuitRead(final Configuration conf) {
2028     // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
2029     boolean shortCircuitSkipChecksum =
2030       conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
2031     boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
2032     if (shortCircuitSkipChecksum) {
2033       LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
2034         "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
2035         "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
2036       assert !shortCircuitSkipChecksum; //this will fail if assertions are on
2037     }
2038     checkShortCircuitReadBufferSize(conf);
2039   }
2040 
2041   /**
2042    * Check if short circuit read buffer size is set and if not, set it to hbase value.
2043    * @param conf
2044    */
2045   public static void checkShortCircuitReadBufferSize(final Configuration conf) {
2046     final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
2047     final int notSet = -1;
2048     // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
2049     final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
2050     int size = conf.getInt(dfsKey, notSet);
2051     // If a size is set, return -- we will use it.
2052     if (size != notSet) return;
2053     // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
2054     int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
2055     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
2056   }
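
       // Illustrative sketch: after setup, the DFS short circuit buffer size always has a value,
       // either the operator's setting or HBase's default of 2 * HConstants.DEFAULT_BLOCKSIZE.
       //
       //   FSUtils.setupShortCircuitRead(conf);
       //   int bufferSize = conf.getInt("dfs.client.read.shortcircuit.buffer.size", -1);
       //   LOG.debug("Short circuit read buffer size in use: " + bufferSize);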
2057 
2058   /**
2059    * @param c
2060    * @return The DFSClient DFSHedgedReadMetrics instance, or null if it can't be found or the filesystem is not HDFS.
2061    * @throws IOException 
2062    */
2063   public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
2064       throws IOException {
2065     if (!isHDFS(c)) return null;
2066     // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
2067     // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
2068     // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
2069     final String name = "getHedgedReadMetrics";
2070     DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
2071     Method m;
2072     try {
2073       m = dfsclient.getClass().getDeclaredMethod(name);
2074     } catch (NoSuchMethodException e) {
2075       LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
2076           e.getMessage());
2077       return null;
2078     } catch (SecurityException e) {
2079       LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
2080           e.getMessage());
2081       return null;
2082     }
2083     m.setAccessible(true);
2084     try {
2085       return (DFSHedgedReadMetrics)m.invoke(dfsclient);
2086     } catch (IllegalAccessException e) {
2087       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
2088           e.getMessage());
2089       return null;
2090     } catch (IllegalArgumentException e) {
2091       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
2092           e.getMessage());
2093       return null;
2094     } catch (InvocationTargetException e) {
2095       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
2096           e.getMessage());
2097       return null;
2098     }
2099   }
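
       // Illustrative sketch: the returned metrics object may be null (non-HDFS filesystem or
       // reflection failure), so callers should guard before reporting anything from it.
       //
       //   DFSHedgedReadMetrics hedged = FSUtils.getDFSHedgedReadMetrics(conf);
       //   if (hedged != null) {
       //     LOG.info("Hedged read metrics available from DFSClient: " + hedged);
       //   }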
2100 }