
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInputStream;
23  import java.io.EOFException;
24  import java.io.FileNotFoundException;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.InterruptedIOException;
28  import java.lang.reflect.InvocationTargetException;
29  import java.lang.reflect.Method;
30  import java.net.InetSocketAddress;
31  import java.net.URI;
32  import java.net.URISyntaxException;
33  import java.util.ArrayList;
34  import java.util.Collections;
35  import java.util.HashMap;
36  import java.util.LinkedList;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.concurrent.ArrayBlockingQueue;
40  import java.util.concurrent.ConcurrentHashMap;
41  import java.util.concurrent.ThreadPoolExecutor;
42  import java.util.concurrent.TimeUnit;
43  import java.util.regex.Pattern;
44  
45  import org.apache.commons.logging.Log;
46  import org.apache.commons.logging.LogFactory;
47  import org.apache.hadoop.hbase.classification.InterfaceAudience;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.fs.BlockLocation;
50  import org.apache.hadoop.fs.FSDataInputStream;
51  import org.apache.hadoop.fs.FSDataOutputStream;
52  import org.apache.hadoop.fs.FileStatus;
53  import org.apache.hadoop.fs.FileSystem;
54  import org.apache.hadoop.fs.Path;
55  import org.apache.hadoop.fs.PathFilter;
56  import org.apache.hadoop.fs.permission.FsAction;
57  import org.apache.hadoop.fs.permission.FsPermission;
58  import org.apache.hadoop.hbase.ClusterId;
59  import org.apache.hadoop.hbase.HColumnDescriptor;
60  import org.apache.hadoop.hbase.HConstants;
61  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
62  import org.apache.hadoop.hbase.HRegionInfo;
63  import org.apache.hadoop.hbase.TableName;
64  import org.apache.hadoop.hbase.exceptions.DeserializationException;
65  import org.apache.hadoop.hbase.fs.HFileSystem;
66  import org.apache.hadoop.hbase.master.HMaster;
67  import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
68  import org.apache.hadoop.hbase.security.AccessDeniedException;
69  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
70  import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
71  import org.apache.hadoop.hbase.regionserver.HRegion;
72  import org.apache.hadoop.hdfs.DFSClient;
73  import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
74  import org.apache.hadoop.hdfs.DistributedFileSystem;
75  import org.apache.hadoop.io.IOUtils;
76  import org.apache.hadoop.io.SequenceFile;
77  import org.apache.hadoop.ipc.RemoteException;
78  import org.apache.hadoop.security.UserGroupInformation;
79  import org.apache.hadoop.util.Progressable;
80  import org.apache.hadoop.util.ReflectionUtils;
81  import org.apache.hadoop.util.StringUtils;
82  
83  import com.google.common.primitives.Ints;
84  import com.google.protobuf.InvalidProtocolBufferException;
85  
86  /**
87   * Utility methods for interacting with the underlying file system.
88   */
89  @InterfaceAudience.Private
90  public abstract class FSUtils {
91    private static final Log LOG = LogFactory.getLog(FSUtils.class);
92  
93    /** Full access permissions (starting point for a umask) */
94    public static final String FULL_RWX_PERMISSIONS = "777";
95    private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
96    private static final int DEFAULT_THREAD_POOLSIZE = 2;
97  
98    /** Set to true on Windows platforms */
99    public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
100 
101   protected FSUtils() {
102     super();
103   }
104 
105   /*
106    * Sets the storage policy for the given path according to the config setting.
107    * @param fs the FileSystem on which to set the policy
108    * @param conf configuration to read the policy from
109    * @param path the Path whose storage policy is to be set
110    * @param policyKey config key naming the desired storage policy
111    * @param defaultPolicy policy to use when the key is unset
112    */
113   public static void setStoragePolicy(final FileSystem fs, final Configuration conf,
114       final Path path, final String policyKey, final String defaultPolicy) {
115     String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase();
116     if (!storagePolicy.equals(defaultPolicy) &&
117         fs instanceof DistributedFileSystem) {
118       DistributedFileSystem dfs = (DistributedFileSystem)fs;
119       Class<? extends DistributedFileSystem> dfsClass = dfs.getClass();
120       Method m = null;
121       try {
122         m = dfsClass.getDeclaredMethod("setStoragePolicy",
123             new Class<?>[] { Path.class, String.class });
124         m.setAccessible(true);
125       } catch (NoSuchMethodException e) {
126         LOG.info("FileSystem doesn't support"
127             + " setStoragePolicy; --HDFS-7228 not available");
128       } catch (SecurityException e) {
129         LOG.info("Doesn't have access to setStoragePolicy on "
130             + "FileSystems --HDFS-7228 not available", e);
131         m = null; // could happen on setAccessible()
132       }
133       if (m != null) {
134         try {
135           m.invoke(dfs, path, storagePolicy);
136           LOG.info("set " + storagePolicy + " for " + path);
137         } catch (Exception e) {
138           LOG.warn("Unable to set " + storagePolicy + " for " + path, e);
139         }
140       }
141     }
142   }
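  // Usage sketch (config key and policy names illustrative, not defined in this file): route a
  // directory to SSD-backed storage where the underlying DistributedFileSystem supports HDFS
  // storage policies (HDFS-6584/HDFS-7228); otherwise the call is a no-op.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.set("hbase.wal.storage.policy", "ONE_SSD");
  //   FileSystem fs = FileSystem.get(conf);
  //   FSUtils.setStoragePolicy(fs, conf, new Path("/hbase/WALs"), "hbase.wal.storage.policy", "NONE");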
143 
144   /**
145    * Compares path components. Does not consider the scheme; i.e. even if the schemes differ,
146    * if <code>path</code> starts with <code>rootPath</code>, the function returns true.
147    * @param rootPath root path to compare against
148    * @param path path to test
149    * @return True if <code>path</code> starts with <code>rootPath</code>
150    */
151   public static boolean isStartingWithPath(final Path rootPath, final String path) {
152     String uriRootPath = rootPath.toUri().getPath();
153     String tailUriPath = (new Path(path)).toUri().getPath();
154     return tailUriPath.startsWith(uriRootPath);
155   }
156 
157   /**
158    * Compares the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c are passed, it
159    * compares the '/a/b/c' part. Does not consider the scheme; i.e. even if the schemes differ,
160    * matching paths or subpaths equate.
161    * @param pathToSearch Path we will be trying to match.
162    * @param pathTail tail to look for at the end of <code>pathToSearch</code>
163    * @return True if <code>pathTail</code> is the tail of the path of <code>pathToSearch</code>
164    */
165   public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
166     return isMatchingTail(pathToSearch, new Path(pathTail));
167   }
168 
169   /**
170    * Compares the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c are passed, it
171    * compares the '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true.
172    * Does not consider the scheme; i.e. even if the schemes differ, matching paths or subpaths equate.
173    * @param pathToSearch Path we will be trying to match.
174    * @param pathTail tail to look for at the end of <code>pathToSearch</code>
175    * @return True if <code>pathTail</code> is the tail of the path of <code>pathToSearch</code>
176    */
177   public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
178     if (pathToSearch.depth() != pathTail.depth()) return false;
179     Path tailPath = pathTail;
180     String tailName;
181     Path toSearch = pathToSearch;
182     String toSearchName;
183     boolean result = false;
184     do {
185       tailName = tailPath.getName();
186       if (tailName == null || tailName.length() <= 0) {
187         result = true;
188         break;
189       }
190       toSearchName = toSearch.getName();
191       if (toSearchName == null || toSearchName.length() <= 0) break;
192       // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
193       tailPath = tailPath.getParent();
194       toSearch = toSearch.getParent();
195     } while(tailName.equals(toSearchName));
196     return result;
197   }
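  // Usage sketch (paths illustrative): the comparison requires equal depth and ignores scheme
  // and authority, matching component names from the tail upward.
  //
  //   Path toSearch = new Path("hdfs://namenode:8020/hbase/data/default/t1");
  //   FSUtils.isMatchingTail(toSearch, new Path("/hbase/data/default/t1"));  // true
  //   FSUtils.isMatchingTail(toSearch, new Path("/hbase/data/default/t2"));  // false, last component differs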
198 
199   public static FSUtils getInstance(FileSystem fs, Configuration conf) {
200     String scheme = fs.getUri().getScheme();
201     if (scheme == null) {
202       LOG.warn("Could not find scheme for uri " +
203           fs.getUri() + ", default to hdfs");
204       scheme = "hdfs";
205     }
206     Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." +
207         scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
208     FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf);
209     return fsUtils;
210   }
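  // Configuration sketch (implementation class name hypothetical): a scheme-specific FSUtils can
  // be plugged in via "hbase.fsutil.<scheme>.impl"; anything unconfigured defaults to FSHDFSUtils.
  //
  //   conf.setClass("hbase.fsutil.s3a.impl", MyS3AFSUtils.class, FSUtils.class);
  //   FSUtils fsUtils = FSUtils.getInstance(FileSystem.get(conf), conf);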
211 
212   /**
213    * Delete if exists.
214    * @param fs filesystem object
215    * @param dir directory to delete
216    * @return True if deleted <code>dir</code>
217    * @throws IOException e
218    */
219   public static boolean deleteDirectory(final FileSystem fs, final Path dir)
220   throws IOException {
221     return fs.exists(dir) && fs.delete(dir, true);
222   }
223 
224   /**
225    * Delete the region directory if it exists.
226    * @param conf
227    * @param hri
228    * @return True if deleted the region directory.
229    * @throws IOException
230    */
231   public static boolean deleteRegionDir(final Configuration conf, final HRegionInfo hri)
232   throws IOException {
233     Path rootDir = getRootDir(conf);
234     FileSystem fs = rootDir.getFileSystem(conf);
235     return deleteDirectory(fs,
236       new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
237   }
238 
239   /**
240    * Return the number of bytes that large input files should optimally
241    * be split into to minimize i/o time.
242    *
243    * Uses reflection to search for getDefaultBlockSize(Path f);
244    * if the method doesn't exist, falls back to using getDefaultBlockSize().
245    * @param fs filesystem object
246    * @param path path whose filesystem's block size is returned
247    * @return the default block size for the path's filesystem
248    * @throws IOException e
249    */
250   public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
251     Method m = null;
252     Class<? extends FileSystem> cls = fs.getClass();
253     try {
254       m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
255     } catch (NoSuchMethodException e) {
256       LOG.info("FileSystem doesn't support getDefaultBlockSize");
257     } catch (SecurityException e) {
258       LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
259       m = null; // could happen on setAccessible()
260     }
261     if (m == null) {
262       return fs.getDefaultBlockSize(path);
263     } else {
264       try {
265         Object ret = m.invoke(fs, path);
266         return ((Long)ret).longValue();
267       } catch (Exception e) {
268         throw new IOException(e);
269       }
270     }
271   }
272 
273   /*
274    * Get the default replication.
275    *
276    * Uses reflection to search for getDefaultReplication(Path f);
277    * if the method doesn't exist, falls back to using getDefaultReplication().
278    *
279    * @param fs filesystem object
280    * @param path path of file
281    * @return default replication for the path's filesystem
282    * @throws IOException e
283    */
284   public static short getDefaultReplication(final FileSystem fs, final Path path) throws IOException {
285     Method m = null;
286     Class<? extends FileSystem> cls = fs.getClass();
287     try {
288       m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
289     } catch (NoSuchMethodException e) {
290       LOG.info("FileSystem doesn't support getDefaultReplication");
291     } catch (SecurityException e) {
292       LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
293       m = null; // could happen on setAccessible()
294     }
295     if (m == null) {
296       return fs.getDefaultReplication(path);
297     } else {
298       try {
299         Object ret = m.invoke(fs, path);
300         return ((Number)ret).shortValue();
301       } catch (Exception e) {
302         throw new IOException(e);
303       }
304     }
305   }
306 
307   /**
308    * Returns the default buffer size to use during writes.
309    *
310    * The size of the buffer should probably be a multiple of hardware
311    * page size (4096 on Intel x86), and it determines how much data is
312    * buffered during read and write operations.
313    *
314    * @param fs filesystem object
315    * @return default buffer size to use during writes
316    */
317   public static int getDefaultBufferSize(final FileSystem fs) {
318     return fs.getConf().getInt("io.file.buffer.size", 4096);
319   }
320 
321   /**
322    * Create the specified file on the filesystem. By default, this will:
323    * <ol>
324    * <li>overwrite the file if it exists</li>
325    * <li>apply the umask in the configuration (if it is enabled)</li>
326    * <li>use the fs configured buffer size (or 4096 if not set)</li>
327    * <li>use the default replication</li>
328    * <li>use the default block size</li>
329    * <li>not track progress</li>
330    * </ol>
331    *
332    * @param fs {@link FileSystem} on which to write the file
333    * @param path {@link Path} to the file to write
334    * @param perm permissions
335    * @param favoredNodes datanodes on which to favor block placement, if supported
336    * @return output stream to the created file
337    * @throws IOException if the file cannot be created
338    */
339   public static FSDataOutputStream create(FileSystem fs, Path path,
340       FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
341     if (fs instanceof HFileSystem) {
342       FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
343       if (backingFs instanceof DistributedFileSystem) {
344         // Try to use the favoredNodes version via reflection to allow backwards-
345         // compatibility.
346         try {
347           return (FSDataOutputStream) (DistributedFileSystem.class
348               .getDeclaredMethod("create", Path.class, FsPermission.class,
349                   boolean.class, int.class, short.class, long.class,
350                   Progressable.class, InetSocketAddress[].class)
351                   .invoke(backingFs, path, perm, true,
352                       getDefaultBufferSize(backingFs),
353                       getDefaultReplication(backingFs, path),
354                       getDefaultBlockSize(backingFs, path),
355                       null, favoredNodes));
356         } catch (InvocationTargetException ite) {
357           // Function was properly called, but threw its own exception.
358           throw new IOException(ite.getCause());
359         } catch (NoSuchMethodException e) {
360           LOG.debug("DFS Client does not support most favored nodes create; using default create");
361           if (LOG.isTraceEnabled()) LOG.trace("Ignoring; use default create", e);
362         } catch (IllegalArgumentException e) {
363           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
364         } catch (SecurityException e) {
365           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
366         } catch (IllegalAccessException e) {
367           LOG.debug("Ignoring (most likely Reflection related exception) " + e);
368         }
369       }
370     }
371     return create(fs, path, perm, true);
372   }
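  // Usage sketch (path and datanode address illustrative): hint HDFS to place replicas of a new
  // store file on specific datanodes; falls back to a plain create() if the DFS client lacks the
  // favored-nodes API.
  //
  //   InetSocketAddress[] favored = { new InetSocketAddress("datanode1.example.com", 50010) };
  //   FSDataOutputStream out =
  //       FSUtils.create(fs, new Path("/hbase/data/default/t1/r1/cf/tmpfile"),
  //           FsPermission.getFileDefault(), favored);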
373 
374   /**
375    * Create the specified file on the filesystem. By default, this will:
376    * <ol>
377    * <li>apply the umask in the configuration (if it is enabled)</li>
378    * <li>use the fs configured buffer size (or 4096 if not set)</li>
379    * <li>use the default replication</li>
380    * <li>use the default block size</li>
381    * <li>not track progress</li>
382    * </ol>
383    *
384    * @param fs {@link FileSystem} on which to write the file
385    * @param path {@link Path} to the file to write
386    * @param perm permissions to apply to the created file
387    * @param overwrite Whether or not an existing file should be overwritten.
388    * @return output stream to the created file
389    * @throws IOException if the file cannot be created
390    */
391   public static FSDataOutputStream create(FileSystem fs, Path path,
392       FsPermission perm, boolean overwrite) throws IOException {
393     if (LOG.isTraceEnabled()) {
394       LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
395     }
396     return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
397         getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
398   }
399 
400   /**
401    * Get the file permissions specified in the configuration, if they are
402    * enabled.
403    *
404    * @param fs filesystem that the file will be created on.
405    * @param conf configuration to read for determining if permissions are
406    *          enabled and which to use
407    * @param permssionConfKey property key in the configuration to use when
408    *          finding the permission
409    * @return the permission to use when creating a new file on the fs. If
410    *         special permissions are not specified in the configuration, then
411    *         the default permissions on the fs will be returned.
412    */
413   public static FsPermission getFilePermissions(final FileSystem fs,
414       final Configuration conf, final String permssionConfKey) {
415     boolean enablePermissions = conf.getBoolean(
416         HConstants.ENABLE_DATA_FILE_UMASK, false);
417 
418     if (enablePermissions) {
419       try {
420         FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
421         // make sure that we have a mask, if not, go default.
422         String mask = conf.get(permssionConfKey);
423         if (mask == null)
424           return FsPermission.getFileDefault();
425         // apply the umask
426         FsPermission umask = new FsPermission(mask);
427         return perm.applyUMask(umask);
428       } catch (IllegalArgumentException e) {
429         LOG.warn(
430             "Incorrect umask attempted to be created: "
431                 + conf.get(permssionConfKey)
432                 + ", using default file permissions.", e);
433         return FsPermission.getFileDefault();
434       }
435     }
436     return FsPermission.getFileDefault();
437   }
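  // Usage sketch (umask value illustrative; key names are the HConstants data-file umask keys):
  // with the umask enabled, the returned permission is 777 with the configured umask applied.
  //
  //   conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
  //   conf.set(HConstants.DATA_FILE_UMASK_KEY, "077");
  //   FsPermission perm = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
  //   // perm is now rwx------ (700)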
438 
439   /**
440    * Checks to see if the specified file system is available
441    *
442    * @param fs filesystem
443    * @throws IOException e
444    */
445   public static void checkFileSystemAvailable(final FileSystem fs)
446   throws IOException {
447     if (!(fs instanceof DistributedFileSystem)) {
448       return;
449     }
450     IOException exception = null;
451     DistributedFileSystem dfs = (DistributedFileSystem) fs;
452     try {
453       if (dfs.exists(new Path("/"))) {
454         return;
455       }
456     } catch (IOException e) {
457       exception = e instanceof RemoteException ?
458               ((RemoteException)e).unwrapRemoteException() : e;
459     }
460     try {
461       fs.close();
462     } catch (Exception e) {
463       LOG.error("file system close failed: ", e);
464     }
465     IOException io = new IOException("File system is not available");
466     io.initCause(exception);
467     throw io;
468   }
469 
470   /**
471    * We use reflection because {@link DistributedFileSystem#setSafeMode(
472    * FSConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1
473    *
474    * @param dfs
475    * @return whether we're in safe mode
476    * @throws IOException
477    */
478   private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
479     boolean inSafeMode = false;
480     try {
481       Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{
482           org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.class, boolean.class});
483       inSafeMode = (Boolean) m.invoke(dfs,
484         org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_GET, true);
485     } catch (Exception e) {
486       if (e instanceof IOException) throw (IOException) e;
487 
488       // Check whether dfs is in safemode.
489       inSafeMode = dfs.setSafeMode(
490         org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_GET);
491     }
492     return inSafeMode;
493   }
494 
495   /**
496    * Check whether dfs is in safemode.
497    * @param conf
498    * @throws IOException
499    */
500   public static void checkDfsSafeMode(final Configuration conf)
501   throws IOException {
502     boolean isInSafeMode = false;
503     FileSystem fs = FileSystem.get(conf);
504     if (fs instanceof DistributedFileSystem) {
505       DistributedFileSystem dfs = (DistributedFileSystem)fs;
506       isInSafeMode = isInSafeMode(dfs);
507     }
508     if (isInSafeMode) {
509       throw new IOException("File system is in safemode, it can't be written to now");
510     }
511   }
512 
513   /**
514    * Verifies current version of file system
515    *
516    * @param fs filesystem object
517    * @param rootdir root hbase directory
518    * @return null if no version file exists, version string otherwise.
519    * @throws IOException e
520    * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
521    */
522   public static String getVersion(FileSystem fs, Path rootdir)
523   throws IOException, DeserializationException {
524     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
525     FileStatus[] status = null;
526     try {
527       // hadoop 2.0 throws FNFE if directory does not exist.
528       // hadoop 1.0 returns null if directory does not exist.
529       status = fs.listStatus(versionFile);
530     } catch (FileNotFoundException fnfe) {
531       return null;
532     }
533     if (status == null || status.length == 0) return null;
534     String version = null;
535     byte [] content = new byte [(int)status[0].getLen()];
536     FSDataInputStream s = fs.open(versionFile);
537     try {
538       IOUtils.readFully(s, content, 0, content.length);
539       if (ProtobufUtil.isPBMagicPrefix(content)) {
540         version = parseVersionFrom(content);
541       } else {
542         // Presume it's pre-pb format.
543         InputStream is = new ByteArrayInputStream(content);
544         DataInputStream dis = new DataInputStream(is);
545         try {
546           version = dis.readUTF();
547         } finally {
548           dis.close();
549         }
550       }
551     } catch (EOFException eof) {
552       LOG.warn("Version file was empty, odd, will try to set it.");
553     } finally {
554       s.close();
555     }
556     return version;
557   }
558 
559   /**
560    * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
561    * @param bytes The byte content of the hbase.version file.
562    * @return The version found in the file as a String.
563    * @throws DeserializationException
564    */
565   static String parseVersionFrom(final byte [] bytes)
566   throws DeserializationException {
567     ProtobufUtil.expectPBMagicPrefix(bytes);
568     int pblen = ProtobufUtil.lengthOfPBMagic();
569     FSProtos.HBaseVersionFileContent.Builder builder =
570       FSProtos.HBaseVersionFileContent.newBuilder();
571     FSProtos.HBaseVersionFileContent fileContent;
572     try {
573       fileContent = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
574       return fileContent.getVersion();
575     } catch (InvalidProtocolBufferException e) {
576       // Convert
577       throw new DeserializationException(e);
578     }
579   }
580 
581   /**
582    * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
583    * @param version Version to persist
584    * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix.
585    */
586   static byte [] toVersionByteArray(final String version) {
587     FSProtos.HBaseVersionFileContent.Builder builder =
588       FSProtos.HBaseVersionFileContent.newBuilder();
589     return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
590   }
591 
592   /**
593    * Verifies current version of file system
594    *
595    * @param fs file system
596    * @param rootdir root directory of HBase installation
597    * @param message if true, issues a message on System.out
598    *
599    * @throws IOException e
600    * @throws DeserializationException
601    */
602   public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
603   throws IOException, DeserializationException {
604     checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
605   }
606 
607   /**
608    * Verifies current version of file system
609    *
610    * @param fs file system
611    * @param rootdir root directory of HBase installation
612    * @param message if true, issues a message on System.out
613    * @param wait wait interval
614    * @param retries number of times to retry
615    *
616    * @throws IOException e
617    * @throws DeserializationException
618    */
619   public static void checkVersion(FileSystem fs, Path rootdir,
620       boolean message, int wait, int retries)
621   throws IOException, DeserializationException {
622     String version = getVersion(fs, rootdir);
623     if (version == null) {
624       if (!metaRegionExists(fs, rootdir)) {
625         // rootDir is empty (no version file and no root region)
626         // just create new version file (HBASE-1195)
627         setVersion(fs, rootdir, wait, retries);
628         return;
629       }
630     } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) return;
631 
632     // Version is deprecated; require migration.
633     // Output on stdout so user sees it in terminal.
634     String msg = "HBase file layout needs to be upgraded."
635       + " You have version " + version
636       + " and I want version " + HConstants.FILE_SYSTEM_VERSION
637       + ". Consult http://hbase.apache.org/book.html for further information about upgrading HBase."
638       + " Is your hbase.rootdir valid? If so, you may need to run "
639       + "'hbase hbck -fixVersionFile'.";
640     if (message) {
641       System.out.println("WARNING! " + msg);
642     }
643     throw new FileSystemVersionException(msg);
644   }
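  // Usage sketch: a master-style startup check. Creates hbase.version when the layout is empty
  // and throws FileSystemVersionException when the on-disk layout needs migration.
  //
  //   Path rootdir = FSUtils.getRootDir(conf);
  //   FileSystem fs = rootdir.getFileSystem(conf);
  //   FSUtils.checkVersion(fs, rootdir, true);  // 'true' also prints a warning to stdout on mismatch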
645 
646   /**
647    * Sets version of file system
648    *
649    * @param fs filesystem object
650    * @param rootdir hbase root
651    * @throws IOException e
652    */
653   public static void setVersion(FileSystem fs, Path rootdir)
654   throws IOException {
655     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
656       HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
657   }
658 
659   /**
660    * Sets version of file system
661    *
662    * @param fs filesystem object
663    * @param rootdir hbase root
664    * @param wait time to wait for retry
665    * @param retries number of times to retry before failing
666    * @throws IOException e
667    */
668   public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
669   throws IOException {
670     setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
671   }
672 
673 
674   /**
675    * Sets version of file system
676    *
677    * @param fs filesystem object
678    * @param rootdir hbase root directory
679    * @param version version to set
680    * @param wait time to wait for retry
681    * @param retries number of times to retry before throwing an IOException
682    * @throws IOException e
683    */
684   public static void setVersion(FileSystem fs, Path rootdir, String version,
685       int wait, int retries) throws IOException {
686     Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
687     Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
688       HConstants.VERSION_FILE_NAME);
689     while (true) {
690       try {
691         // Write the version to a temporary file
692         FSDataOutputStream s = fs.create(tempVersionFile);
693         try {
694           s.write(toVersionByteArray(version));
695           s.close();
696           s = null;
697           // Move the temp version file to its normal location. Returns false
698           // if the rename failed. Throw an IOE in that case.
699           if (!fs.rename(tempVersionFile, versionFile)) {
700             throw new IOException("Unable to move temp version file to " + versionFile);
701           }
702         } finally {
703           // Cleaning up the temporary file if the rename failed would be trying
704           // too hard. We'll unconditionally create it again the next time
705           // through anyway; files are overwritten by default by create().
706 
707           // Attempt to close the stream on the way out if it is still open.
708           try {
709             if (s != null) s.close();
710           } catch (IOException ignore) { }
711         }
712         LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
713         return;
714       } catch (IOException e) {
715         if (retries > 0) {
716           LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
717           fs.delete(versionFile, false);
718           try {
719             if (wait > 0) {
720               Thread.sleep(wait);
721             }
722           } catch (InterruptedException ie) {
723             throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
724           }
725           retries--;
726         } else {
727           throw e;
728         }
729       }
730     }
731   }
732 
733   /**
734    * Checks that a cluster ID file exists in the HBase root directory
735    * @param fs the root directory FileSystem
736    * @param rootdir the HBase root directory in HDFS
737    * @param wait how long to wait between retries
738    * @return <code>true</code> if the file exists, otherwise <code>false</code>
739    * @throws IOException if checking the FileSystem fails
740    */
741   public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
742       int wait) throws IOException {
743     while (true) {
744       try {
745         Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
746         return fs.exists(filePath);
747       } catch (IOException ioe) {
748         if (wait > 0) {
749           LOG.warn("Unable to check cluster ID file in " + rootdir.toString() +
750               ", retrying in "+wait+"msec: "+StringUtils.stringifyException(ioe));
751           try {
752             Thread.sleep(wait);
753           } catch (InterruptedException e) {
754             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
755           }
756         } else {
757           throw ioe;
758         }
759       }
760     }
761   }
762 
763   /**
764    * Returns the value of the unique cluster ID stored for this HBase instance.
765    * @param fs the root directory FileSystem
766    * @param rootdir the path to the HBase root directory
767    * @return the unique cluster identifier
768    * @throws IOException if reading the cluster ID file fails
769    */
770   public static ClusterId getClusterId(FileSystem fs, Path rootdir)
771   throws IOException {
772     Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
773     ClusterId clusterId = null;
774     FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath):  null;
775     if (status != null) {
776       int len = Ints.checkedCast(status.getLen());
777       byte [] content = new byte[len];
778       FSDataInputStream in = fs.open(idPath);
779       try {
780         in.readFully(content);
781       } catch (EOFException eof) {
782         LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
783       } finally{
784         in.close();
785       }
786       try {
787         clusterId = ClusterId.parseFrom(content);
788       } catch (DeserializationException e) {
789         throw new IOException("content=" + Bytes.toString(content), e);
790       }
791       // If not pb'd, make it so.
792       if (!ProtobufUtil.isPBMagicPrefix(content)) {
793         String cid = null;
794         in = fs.open(idPath);
795         try {
796           cid = in.readUTF();
797           clusterId = new ClusterId(cid);
798         } catch (EOFException eof) {
799           LOG.warn("Cluster ID file " + idPath.toString() + " was empty");
800         } finally {
801           in.close();
802         }
803         rewriteAsPb(fs, rootdir, idPath, clusterId);
804       }
805       return clusterId;
806     } else {
807       LOG.warn("Cluster ID file does not exist at " + idPath.toString());
808     }
809     return clusterId;
810   }
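  // Usage sketch: read the persisted cluster identity from ${hbase.rootdir}/hbase.id, e.g. when
  // wiring up replication; returns null (with a warning) if the file does not exist.
  //
  //   Path rootdir = FSUtils.getRootDir(conf);
  //   ClusterId clusterId = FSUtils.getClusterId(rootdir.getFileSystem(conf), rootdir);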
811 
812   /**
813    * @param cid
814    * @throws IOException
815    */
816   private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
817       final ClusterId cid)
818   throws IOException {
819     // Rewrite the file as pb.  Move aside the old one first, write new
820     // then delete the moved-aside file.
821     Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
822     if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
823     setClusterId(fs, rootdir, cid, 100);
824     if (!fs.delete(movedAsideName, false)) {
825       throw new IOException("Failed delete of " + movedAsideName);
826     }
827     LOG.debug("Rewrote the hbase.id file as pb");
828   }
829 
830   /**
831    * Writes a new unique identifier for this cluster to the "hbase.id" file
832    * in the HBase root directory
833    * @param fs the root directory FileSystem
834    * @param rootdir the path to the HBase root directory
835    * @param clusterId the unique identifier to store
836    * @param wait how long (in milliseconds) to wait between retries
837    * @throws IOException if writing to the FileSystem fails and no wait value
838    */
839   public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
840       int wait) throws IOException {
841     while (true) {
842       try {
843         Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
844         Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
845           Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
846         // Write the id file to a temporary location
847         FSDataOutputStream s = fs.create(tempIdFile);
848         try {
849           s.write(clusterId.toByteArray());
850           s.close();
851           s = null;
852           // Move the temporary file to its normal location. Throw an IOE if
853           // the rename failed
854           if (!fs.rename(tempIdFile, idFile)) {
855             throw new IOException("Unable to move temp cluster ID file to " + idFile);
856           }
857         } finally {
858           // Attempt to close the stream if still open on the way out
859           try {
860             if (s != null) s.close();
861           } catch (IOException ignore) { }
862         }
863         if (LOG.isDebugEnabled()) {
864           LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
865         }
866         return;
867       } catch (IOException ioe) {
868         if (wait > 0) {
869           LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
870               ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
871           try {
872             Thread.sleep(wait);
873           } catch (InterruptedException e) {
874             throw (InterruptedIOException)new InterruptedIOException().initCause(e);
875           }
876         } else {
877           throw ioe;
878         }
879       }
880     }
881   }
882 
883   /**
884    * Verifies root directory path is a valid URI with a scheme
885    *
886    * @param root root directory path
887    * @return Passed <code>root</code> argument.
888    * @throws IOException if not a valid URI with a scheme
889    */
890   public static Path validateRootPath(Path root) throws IOException {
891     try {
892       URI rootURI = new URI(root.toString());
893       String scheme = rootURI.getScheme();
894       if (scheme == null) {
895         throw new IOException("Root directory does not have a scheme");
896       }
897       return root;
898     } catch (URISyntaxException e) {
899       IOException io = new IOException("Root directory path is not a valid " +
900         "URI -- check your " + HConstants.HBASE_DIR + " configuration");
901       io.initCause(e);
902       throw io;
903     }
904   }
905 
906   /**
907    * Checks for the presence of the root path (using the provided conf object) in the given path. If
908    * it exists, this method removes it and returns the String representation of the remaining relative path.
909    * @param path
910    * @param conf
911    * @return String representation of the remaining relative path
912    * @throws IOException
913    */
914   public static String removeRootPath(Path path, final Configuration conf) throws IOException {
915     Path root = FSUtils.getRootDir(conf);
916     String pathStr = path.toString();
917     // if the path does not start with the root path, return it as-is.
918     if (!pathStr.startsWith(root.toString())) return pathStr;
919     // otherwise strip the root path prefix.
920     return pathStr.substring(root.toString().length() + 1);// remove the "/" too.
921   }
922 
923   /**
924    * If DFS, check safe mode and, if it is on, wait until it clears.
925    * @param conf configuration
926    * @param wait Sleep between retries
927    * @throws IOException e
928    */
929   public static void waitOnSafeMode(final Configuration conf,
930     final long wait)
931   throws IOException {
932     FileSystem fs = FileSystem.get(conf);
933     if (!(fs instanceof DistributedFileSystem)) return;
934     DistributedFileSystem dfs = (DistributedFileSystem)fs;
935     // Make sure dfs is not in safe mode
936     while (isInSafeMode(dfs)) {
937       LOG.info("Waiting for dfs to exit safe mode...");
938       try {
939         Thread.sleep(wait);
940       } catch (InterruptedException e) {
941         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
942       }
943     }
944   }
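  // Usage sketch (retry interval illustrative): block startup until HDFS leaves safe mode,
  // re-checking every 10 seconds.
  //
  //   FSUtils.waitOnSafeMode(conf, 10 * 1000);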
945 
946   /**
947    * Return the 'path' component of a Path.  In Hadoop, Path is a URI.  This
948    * method returns the 'path' component of a Path's URI: e.g. If a Path is
949    * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
950    * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
951    * This method is useful if you want to print out a Path without the
952    * qualifying Filesystem instance.
953    * @param p Filesystem Path whose 'path' component we are to return.
954    * @return Path portion of the Path's URI
955    */
956   public static String getPath(Path p) {
957     return p.toUri().getPath();
958   }
959 
960   /**
961    * @param c configuration
962    * @return Path to hbase root directory: i.e. <code>hbase.rootdir</code> from
963    * configuration as a qualified Path.
964    * @throws IOException e
965    */
966   public static Path getRootDir(final Configuration c) throws IOException {
967     Path p = new Path(c.get(HConstants.HBASE_DIR));
968     FileSystem fs = p.getFileSystem(c);
969     return p.makeQualified(fs);
970   }
971 
972   public static void setRootDir(final Configuration c, final Path root) throws IOException {
973     c.set(HConstants.HBASE_DIR, root.toString());
974   }
975 
976   public static void setFsDefault(final Configuration c, final Path root) throws IOException {
977     c.set("fs.defaultFS", root.toString());    // for hadoop 0.21+
978   }
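  // Usage sketch (URIs illustrative): point a configuration at an HBase root directory and read
  // it back as a qualified Path.
  //
  //   FSUtils.setFsDefault(conf, new Path("hdfs://namenode:8020"));
  //   FSUtils.setRootDir(conf, new Path("hdfs://namenode:8020/hbase"));
  //   Path root = FSUtils.getRootDir(conf);  // hdfs://namenode:8020/hbase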
979 
980   /**
981    * Checks if meta region exists
982    *
983    * @param fs file system
984    * @param rootdir root directory of HBase installation
985    * @return true if exists
986    * @throws IOException e
987    */
988   @SuppressWarnings("deprecation")
989   public static boolean metaRegionExists(FileSystem fs, Path rootdir)
990   throws IOException {
991     Path metaRegionDir =
992       HRegion.getRegionDir(rootdir, HRegionInfo.FIRST_META_REGIONINFO);
993     return fs.exists(metaRegionDir);
994   }
995 
996   /**
997    * Compute HDFS blocks distribution of a given file, or a portion of the file
998    * @param fs file system
999    * @param status file status of the file
1000    * @param start start position of the portion
1001    * @param length length of the portion
1002    * @return The HDFS blocks distribution
1003    */
1004   static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
1005     final FileSystem fs, FileStatus status, long start, long length)
1006     throws IOException {
1007     HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
1008     BlockLocation [] blockLocations =
1009       fs.getFileBlockLocations(status, start, length);
1010     for(BlockLocation bl : blockLocations) {
1011       String [] hosts = bl.getHosts();
1012       long len = bl.getLength();
1013       blocksDistribution.addHostsAndBlockWeight(hosts, len);
1014     }
1015 
1016     return blocksDistribution;
1017   }
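  // Usage sketch (path and hostname illustrative): measure how the blocks of one store file are
  // spread across datanodes, e.g. to compute a locality index for a region server host.
  //
  //   FileStatus status = fs.getFileStatus(new Path("/hbase/data/default/t1/r1/cf/storefile"));
  //   HDFSBlocksDistribution dist =
  //       FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
  //   float locality = dist.getBlockLocalityIndex("datanode1.example.com");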
1018 
1019 
1020 
1021   /**
1022    * Runs through the hbase rootdir and checks all stores have only
1023    * one file in them -- that is, they've been major compacted.  Looks
1024    * at root and meta tables too.
1025    * @param fs filesystem
1026    * @param hbaseRootDir hbase root directory
1027    * @return True if this hbase install is major compacted.
1028    * @throws IOException e
1029    */
1030   public static boolean isMajorCompacted(final FileSystem fs,
1031       final Path hbaseRootDir)
1032   throws IOException {
1033     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1034     PathFilter regionFilter = new RegionDirFilter(fs);
1035     PathFilter familyFilter = new FamilyDirFilter(fs);
1036     for (Path d : tableDirs) {
1037       FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
1038       for (FileStatus regionDir : regionDirs) {
1039         Path dd = regionDir.getPath();
1040         // Else it's a region name.  Now look in region for families.
1041         FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1042         for (FileStatus familyDir : familyDirs) {
1043           Path family = familyDir.getPath();
1044           // Now in family make sure only one file.
1045           FileStatus[] familyStatus = fs.listStatus(family);
1046           if (familyStatus.length > 1) {
1047             LOG.debug(family.toString() + " has " + familyStatus.length +
1048                 " files.");
1049             return false;
1050           }
1051         }
1052       }
1053     }
1054     return true;
1055   }
1056 
1057   // TODO move this method OUT of FSUtils. No dependencies on HMaster
1058   /**
1059    * Returns the total overall fragmentation percentage. Includes hbase:meta and
1060    * -ROOT- as well.
1061    *
1062    * @param master  The master defining the HBase root and file system.
1063    * @return The overall fragmentation percentage, or -1 if it cannot be determined.
1064    * @throws IOException When scanning the directory fails.
1065    */
1066   public static int getTotalTableFragmentation(final HMaster master)
1067   throws IOException {
1068     Map<String, Integer> map = getTableFragmentation(master);
1069     return map != null && map.size() > 0 ? map.get("-TOTAL-") : -1;
1070   }
1071 
1072   /**
1073    * Runs through the HBase rootdir and checks how many stores for each table
1074    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1075    * percentage across all tables is stored under the special key "-TOTAL-".
1076    *
1077    * @param master  The master defining the HBase root and file system.
1078    * @return A map for each table and its percentage.
1079    *
1080    * @throws IOException When scanning the directory fails.
1081    */
1082   public static Map<String, Integer> getTableFragmentation(
1083     final HMaster master)
1084   throws IOException {
1085     Path path = getRootDir(master.getConfiguration());
1086     // since HMaster.getFileSystem() is package private
1087     FileSystem fs = path.getFileSystem(master.getConfiguration());
1088     return getTableFragmentation(fs, path);
1089   }
1090 
1091   /**
1092    * Runs through the HBase rootdir and checks how many stores for each table
1093    * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
1094    * percentage across all tables is stored under the special key "-TOTAL-".
1095    *
1096    * @param fs  The file system to use.
1097    * @param hbaseRootDir  The root directory to scan.
1098    * @return A map for each table and its percentage.
1099    * @throws IOException When scanning the directory fails.
1100    */
1101   public static Map<String, Integer> getTableFragmentation(
1102     final FileSystem fs, final Path hbaseRootDir)
1103   throws IOException {
1104     Map<String, Integer> frags = new HashMap<String, Integer>();
1105     int cfCountTotal = 0;
1106     int cfFragTotal = 0;
1107     PathFilter regionFilter = new RegionDirFilter(fs);
1108     PathFilter familyFilter = new FamilyDirFilter(fs);
1109     List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
1110     for (Path d : tableDirs) {
1111       int cfCount = 0;
1112       int cfFrag = 0;
1113       FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
1114       for (FileStatus regionDir : regionDirs) {
1115         Path dd = regionDir.getPath();
1116         // else it's a region name, now look in region for families
1117         FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1118         for (FileStatus familyDir : familyDirs) {
1119           cfCount++;
1120           cfCountTotal++;
1121           Path family = familyDir.getPath();
1122           // now in family make sure only one file
1123           FileStatus[] familyStatus = fs.listStatus(family);
1124           if (familyStatus.length > 1) {
1125             cfFrag++;
1126             cfFragTotal++;
1127           }
1128         }
1129       }
1130       // compute percentage per table and store in result list
1131       frags.put(FSUtils.getTableName(d).getNameAsString(),
1132         cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
1133     }
1134     // set overall percentage for all tables
1135     frags.put("-TOTAL-",
1136       cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
1137     return frags;
1138   }
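  // Usage sketch: per-table fragmentation percentages plus the "-TOTAL-" rollup, e.g. for
  // logging or exporting as metrics.
  //
  //   Map<String, Integer> frags = FSUtils.getTableFragmentation(fs, FSUtils.getRootDir(conf));
  //   int overall = frags.get("-TOTAL-");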
1139 
1140   /**
1141    * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
1142    * path rootdir
1143    *
1144    * @param rootdir qualified path of HBase root directory
1145    * @param tableName name of table
1146    * @return {@link org.apache.hadoop.fs.Path} for table
1147    */
1148   public static Path getTableDir(Path rootdir, final TableName tableName) {
1149     return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
1150         tableName.getQualifierAsString());
1151   }
1152 
1153   /**
1154    * Returns the {@link org.apache.hadoop.hbase.TableName} object representing
1155    * the table corresponding to the given
1156    * table directory path
1157    *
1158    * @param tablePath path of table
1159    * @return {@link org.apache.hadoop.hbase.TableName} for the table
1160    */
1161   public static TableName getTableName(Path tablePath) {
1162     return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
1163   }
1164 
1165   /**
1166    * Returns the {@link org.apache.hadoop.fs.Path} object representing
1167    * the namespace directory under path rootdir
1168    *
1169    * @param rootdir qualified path of HBase root directory
1170    * @param namespace namespace name
1171    * @return {@link org.apache.hadoop.fs.Path} for the namespace
1172    */
1173   public static Path getNamespaceDir(Path rootdir, final String namespace) {
1174     return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
1175         new Path(namespace)));
1176   }
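  // Layout sketch (root URI illustrative): for rootdir hdfs://nn/hbase and table "ns:t1" the
  // helpers above resolve as follows.
  //
  //   getNamespaceDir(root, "ns")                          -> hdfs://nn/hbase/data/ns
  //   getTableDir(root, TableName.valueOf("ns", "t1"))     -> hdfs://nn/hbase/data/ns/t1
  //   getTableName(new Path("hdfs://nn/hbase/data/ns/t1")) -> "ns:t1"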
1177 
1178   /**
1179    * A {@link PathFilter} that returns only regular files.
1180    */
1181   static class FileFilter implements PathFilter {
1182     private final FileSystem fs;
1183 
1184     public FileFilter(final FileSystem fs) {
1185       this.fs = fs;
1186     }
1187 
1188     @Override
1189     public boolean accept(Path p) {
1190       try {
1191         return fs.isFile(p);
1192       } catch (IOException e) {
1193         LOG.debug("unable to verify if path=" + p + " is a regular file", e);
1194         return false;
1195       }
1196     }
1197   }
1198 
1199   /**
1200    * Directory filter that doesn't include any of the directories in the specified blacklist
1201    */
1202   public static class BlackListDirFilter implements PathFilter {
1203     private final FileSystem fs;
1204     private List<String> blacklist;
1205 
1206     /**
1207      * Create a filter on the given filesystem with the specified blacklist
1208      * @param fs filesystem to filter
1209      * @param directoryNameBlackList list of the names of the directories to filter. If
1210      *          <tt>null</tt>, all directories are returned
1211      */
1212     @SuppressWarnings("unchecked")
1213     public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
1214       this.fs = fs;
1215       blacklist =
1216         (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
1217           : directoryNameBlackList);
1218     }
1219 
1220     @Override
1221     public boolean accept(Path p) {
1222       boolean isValid = false;
1223       try {
1224         if (isValidName(p.getName())) {
1225           isValid = fs.getFileStatus(p).isDirectory();
1226         } else {
1227           isValid = false;
1228         }
1229       } catch (IOException e) {
1230         LOG.warn("An error occurred while verifying if [" + p.toString()
1231             + "] is a valid directory. Returning 'not valid' and continuing.", e);
1232       }
1233       return isValid;
1234     }
1235 
1236     protected boolean isValidName(final String name) {
1237       return !blacklist.contains(name);
1238     }
1239   }
1240 
1241   /**
1242    * A {@link PathFilter} that only allows directories.
1243    */
1244   public static class DirFilter extends BlackListDirFilter {
1245 
1246     public DirFilter(FileSystem fs) {
1247       super(fs, null);
1248     }
1249   }
1250 
1251   /**
1252    * A {@link PathFilter} that returns usertable directories. To get all directories use the
1253    * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
1254    */
1255   public static class UserTableDirFilter extends BlackListDirFilter {
1256     public UserTableDirFilter(FileSystem fs) {
1257       super(fs, HConstants.HBASE_NON_TABLE_DIRS);
1258     }
1259 
1260     protected boolean isValidName(final String name) {
1261       if (!super.isValidName(name))
1262         return false;
1263 
1264       try {
1265         TableName.isLegalTableQualifierName(Bytes.toBytes(name));
1266       } catch (IllegalArgumentException e) {
1267         LOG.info("INVALID NAME " + name);
1268         return false;
1269       }
1270       return true;
1271     }
1272   }
1273 
1274   /**
1275    * Heuristic to determine whether it is safe or not to open a file for append.
1276    * Looks both for dfs.support.append and uses reflection to search
1277    * for SequenceFile.Writer.syncFs() or FSDataOutputStream.hflush().
1278    * @param conf configuration to check
1279    * @return True if append is supported
1280    */
1281   public static boolean isAppendSupported(final Configuration conf) {
1282     boolean append = conf.getBoolean("dfs.support.append", false);
1283     if (append) {
1284       try {
1285         // TODO: The implementation that comes back when we do a createWriter
1286         // may not be using SequenceFile so the below is not a definitive test.
1287         // Will do for now (hdfs-200).
1288         SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
1289         append = true;
1290       } catch (SecurityException e) {
1291       } catch (NoSuchMethodException e) {
1292         append = false;
1293       }
1294     }
1295     if (!append) {
1296       // Look for the 0.21, 0.22, new-style append evidence.
1297       try {
1298         FSDataOutputStream.class.getMethod("hflush", new Class<?> []{});
1299         append = true;
1300       } catch (NoSuchMethodException e) {
1301         append = false;
1302       }
1303     }
1304     return append;
1305   }
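  // Usage sketch: guard WAL-style code paths on append/hflush support.
  //
  //   if (!FSUtils.isAppendSupported(conf)) {
  //     LOG.warn("FileSystem does not support append/hflush; log durability cannot be guaranteed");
  //   }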
1306 
1307   /**
1308    * @param conf
1309    * @return True if the configured filesystem's scheme is 'hdfs'.
1310    * @throws IOException
1311    */
1312   public static boolean isHDFS(final Configuration conf) throws IOException {
1313     FileSystem fs = FileSystem.get(conf);
1314     String scheme = fs.getUri().getScheme();
1315     return scheme.equalsIgnoreCase("hdfs");
1316   }
1317 
1318   /**
1319    * Recover file lease. Used when a file might be suspected
1320    * to have been left open by another process.
1321    * @param fs FileSystem handle
1322    * @param p Path of file to recover lease
1323    * @param conf Configuration handle
1324    * @throws IOException
1325    */
1326   public abstract void recoverFileLease(final FileSystem fs, final Path p,
1327       Configuration conf, CancelableProgressable reporter) throws IOException;
1328 
1329   public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
1330       throws IOException {
1331     List<Path> tableDirs = new LinkedList<Path>();
1332 
1333     for(FileStatus status :
1334         fs.globStatus(new Path(rootdir,
1335             new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
1336       tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
1337     }
1338     return tableDirs;
1339   }
1340 
1341   /**
1342    * @param fs
1343    * @param rootdir
1344    * @return All the table directories under <code>rootdir</code>. Ignores non-table hbase
1345    * folders such as .logs, .oldlogs, and .corrupt.
1346    * @throws IOException
1347    */
1348   public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
1349       throws IOException {
1350     // presumes any directory under hbase.rootdir is a table
1351     FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
1352     List<Path> tabledirs = new ArrayList<Path>(dirs.length);
1353     for (FileStatus dir: dirs) {
1354       tabledirs.add(dir.getPath());
1355     }
1356     return tabledirs;
1357   }
1358 
1359   /**
1360    * Checks if the given path contains the 'recovered.edits' dir.
1361    * @param path path to check
1362    * @return True if the path contains a recovered.edits directory
1363    */
1364   public static boolean isRecoveredEdits(Path path) {
1365     return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
1366   }
1367 
1368   /**
1369    * Filter for all dirs that don't start with '.'
1370    */
1371   public static class RegionDirFilter implements PathFilter {
1372     // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
1373     final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
1374     final FileSystem fs;
1375 
1376     public RegionDirFilter(FileSystem fs) {
1377       this.fs = fs;
1378     }
1379 
1380     @Override
1381     public boolean accept(Path rd) {
1382       if (!regionDirPattern.matcher(rd.getName()).matches()) {
1383         return false;
1384       }
1385 
1386       try {
1387         return fs.getFileStatus(rd).isDirectory();
1388       } catch (IOException ioe) {
1389         // Maybe the file was moved or the fs was disconnected.
1390         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1391         return false;
1392       }
1393     }
1394   }
1395 
1396   /**
1397    * Given a particular table dir, return all the regiondirs inside it, excluding files such as
1398    * .tableinfo
1399    * @param fs A file system for the Path
1400    * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir>
1401    * @return List of paths to valid region directories in table dir.
1402    * @throws IOException
1403    */
1404   public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
1405     // assumes we are in a table dir.
1406     FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs));
1407     List<Path> regionDirs = new ArrayList<Path>(rds.length);
1408     for (FileStatus rdfs: rds) {
1409       Path rdPath = rdfs.getPath();
1410       regionDirs.add(rdPath);
1411     }
1412     return regionDirs;
1413   }
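  /*
   * Illustrative sketch (not part of the original source): walking from a table directory down
   * to its region directories using the filters above; 'fs' and 'tableDir' are assumed to point
   * at an existing <hbase.rootdir>/data/<namespace>/<table> path.
   *
   *   for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
   *     System.out.println("region dir: " + regionDir.getName());
   *   }
   */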
1414 
1415   /**
1416    * Filter for all dirs that are legal column family names.  This is generally used for colfam
1417    * dirs <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>.
1418    */
1419   public static class FamilyDirFilter implements PathFilter {
1420     final FileSystem fs;
1421 
1422     public FamilyDirFilter(FileSystem fs) {
1423       this.fs = fs;
1424     }
1425 
1426     @Override
1427     public boolean accept(Path rd) {
1428       try {
1429         // throws IAE if invalid
1430         HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(rd.getName()));
1431       } catch (IllegalArgumentException iae) {
1432         // path name is an invalid family name and thus is excluded.
1433         return false;
1434       }
1435 
1436       try {
1437         return fs.getFileStatus(rd).isDirectory();
1438       } catch (IOException ioe) {
1439         // Maybe the file was moved or the fs was disconnected.
1440         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1441         return false;
1442       }
1443     }
1444   }
1445 
1446   /**
1447    * Given a particular region dir, return all the familydirs inside it
1448    *
1449    * @param fs A file system for the Path
1450    * @param regionDir Path to a specific region directory
1451    * @return List of paths to valid family directories in region dir.
1452    * @throws IOException
1453    */
1454   public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
1455     // assumes we are in a region dir.
1456     FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
1457     List<Path> familyDirs = new ArrayList<Path>(fds.length);
1458     for (FileStatus fdfs: fds) {
1459       Path fdPath = fdfs.getPath();
1460       familyDirs.add(fdPath);
1461     }
1462     return familyDirs;
1463   }
1464 
1465   public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
1466     FileStatus[] fds = fs.listStatus(familyDir, new ReferenceFileFilter(fs));
1467     List<Path> referenceFiles = new ArrayList<Path>(fds.length);
1468     for (FileStatus fdfs: fds) {
1469       Path fdPath = fdfs.getPath();
1470       referenceFiles.add(fdPath);
1471     }
1472     return referenceFiles;
1473   }
1474 
1475   /**
1476    * Filter for HFiles that excludes reference files.
1477    */
1478   public static class HFileFilter implements PathFilter {
1479     final FileSystem fs;
1480 
1481     public HFileFilter(FileSystem fs) {
1482       this.fs = fs;
1483     }
1484 
1485     @Override
1486     public boolean accept(Path rd) {
1487       try {
1488         // only files
1489         return !fs.getFileStatus(rd).isDirectory() && StoreFileInfo.isHFile(rd);
1490       } catch (IOException ioe) {
1491         // Maybe the file was moved or the fs was disconnected.
1492         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1493         return false;
1494       }
1495     }
1496   }
1497 
1498   public static class ReferenceFileFilter implements PathFilter {
1499 
1500     private final FileSystem fs;
1501 
1502     public ReferenceFileFilter(FileSystem fs) {
1503       this.fs = fs;
1504     }
1505 
1506     @Override
1507     public boolean accept(Path rd) {
1508       try {
1509         // only files can be references.
1510         return !fs.getFileStatus(rd).isDirectory() && StoreFileInfo.isReference(rd);
1511       } catch (IOException ioe) {
1512         // Maybe the file was moved or the fs was disconnected.
1513         LOG.warn("Skipping file " + rd +" due to IOException", ioe);
1514         return false;
1515       }
1516     }
1517   }
1518 
1519 
1520   /**
1521    * @param conf
1522    * @return The filesystem of the hbase rootdir.
1523    * @throws IOException
1524    */
1525   public static FileSystem getCurrentFileSystem(Configuration conf)
1526   throws IOException {
1527     return getRootDir(conf).getFileSystem(conf);
1528   }
1529 
1530 
1531   /**
1532    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
1533    * table StoreFile names to the full Path.
1534    * <br>
1535    * Example...<br>
1536    * Key = 3944417774205889744  <br>
1537    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1538    *
1539    * @param map map to add values.  If null, this method will create and populate one to return
1540    * @param fs  The file system to use.
1541    * @param hbaseRootDir  The root directory to scan.
1542    * @param tableName name of the table to scan.
1543    * @return Map keyed by StoreFile name with a value of the full Path.
1544    * @throws IOException When scanning the directory fails.
1545    */
1546   public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
1547   final FileSystem fs, final Path hbaseRootDir, TableName tableName)
1548   throws IOException {
1549     if (map == null) {
1550       map = new HashMap<String, Path>();
1551     }
1552 
1553     // only include the directory paths to tables
1554     Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
1555     // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
1556     // should be regions.
1557     PathFilter familyFilter = new FamilyDirFilter(fs);
1558     FileStatus[] regionDirs = fs.listStatus(tableDir, new RegionDirFilter(fs));
1559     for (FileStatus regionDir : regionDirs) {
1560       Path dd = regionDir.getPath();
1561       // else it's a region name; now look inside the region for families
1562       FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
1563       for (FileStatus familyDir : familyDirs) {
1564         Path family = familyDir.getPath();
1565         if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
1566           continue;
1567         }
1568         // now in family, iterate over the StoreFiles and
1569         // put in map
1570         FileStatus[] familyStatus = fs.listStatus(family);
1571         for (FileStatus sfStatus : familyStatus) {
1572           Path sf = sfStatus.getPath();
1573           map.put(sf.getName(), sf);
1574         }
1575       }
1576     }
1577     return map;
1578   }
1579 
1580   public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
1581     int result = 0;
1582     try {
1583       for (Path familyDir:getFamilyDirs(fs, p)){
1584         result += getReferenceFilePaths(fs, familyDir).size();
1585       }
1586     } catch (IOException e) {
1587       LOG.warn("Error Counting reference files.", e);
1588     }
1589     return result;
1590   }
1591 
1592 
1593   /**
1594    * Runs through the HBase rootdir and creates a reverse lookup map for
1595    * table StoreFile names to the full Path.
1596    * <br>
1597    * Example...<br>
1598    * Key = 3944417774205889744  <br>
1599    * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
1600    *
1601    * @param fs  The file system to use.
1602    * @param hbaseRootDir  The root directory to scan.
1603    * @return Map keyed by StoreFile name with a value of the full Path.
1604    * @throws IOException When scanning the directory fails.
1605    */
1606   public static Map<String, Path> getTableStoreFilePathMap(
1607     final FileSystem fs, final Path hbaseRootDir)
1608   throws IOException {
1609     Map<String, Path> map = new HashMap<String, Path>();
1610 
1611     // if this method looks similar to 'getTableFragmentation' that is because
1612     // it was borrowed from it.
1613 
1614     // only include the directory paths to tables
1615     for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
1616       getTableStoreFilePathMap(map, fs, hbaseRootDir,
1617           FSUtils.getTableName(tableDir));
1618     }
1619     return map;
1620   }
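  /*
   * Illustrative usage sketch (not part of the original source): building the reverse lookup
   * from store file name to full path for a whole root directory, then resolving one name.
   * The key used below is the example from the javadoc above, not a real file.
   *
   *   Map<String, Path> storeFiles = FSUtils.getTableStoreFilePathMap(fs, rootDir);
   *   Path full = storeFiles.get("3944417774205889744");
   *   if (full != null) {
   *     System.out.println("store file lives at " + full);
   *   }
   */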
1621 
1622   /**
1623    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1624    * This accommodates differences between hadoop versions, where hadoop 1
1625    * does not throw a FileNotFoundException and instead returns an empty FileStatus[],
1626    * while Hadoop 2 will throw FileNotFoundException.
1627    *
1628    * @param fs file system
1629    * @param dir directory
1630    * @param filter path filter
1631    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1632    */
1633   public static FileStatus [] listStatus(final FileSystem fs,
1634       final Path dir, final PathFilter filter) throws IOException {
1635     FileStatus [] status = null;
1636     try {
1637       status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
1638     } catch (FileNotFoundException fnfe) {
1639       // if directory doesn't exist, return null
1640       if (LOG.isTraceEnabled()) {
1641         LOG.trace(dir + " doesn't exist");
1642       }
1643     }
1644     if (status == null || status.length < 1) return null;
1645     return status;
1646   }
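  /*
   * Illustrative sketch (not part of the original source): unlike fs.listStatus(), this helper
   * returns null for a missing or empty directory, so callers null-check rather than catch
   * FileNotFoundException. 'someDir' is a hypothetical path.
   *
   *   FileStatus[] children = FSUtils.listStatus(fs, someDir, null);
   *   if (children == null) {
   *     // directory is absent or empty; nothing to do
   *   } else {
   *     for (FileStatus child : children) {
   *       System.out.println(child.getPath());
   *     }
   *   }
   */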
1647 
1648   /**
1649    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
1650    * This accommodates differences between hadoop versions.
1651    *
1652    * @param fs file system
1653    * @param dir directory
1654    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
1655    */
1656   public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
1657     return listStatus(fs, dir, null);
1658   }
1659 
1660   /**
1661    * Calls fs.delete() and returns the value returned by fs.delete().
1662    *
1663    * @param fs
1664    * @param path
1665    * @param recursive
1666    * @return the value returned by fs.delete()
1667    * @throws IOException
1668    */
1669   public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
1670       throws IOException {
1671     return fs.delete(path, recursive);
1672   }
1673 
1674   /**
1675    * Calls fs.exists(). Checks if the specified path exists.
1676    *
1677    * @param fs
1678    * @param path
1679    * @return the value returned by fs.exists()
1680    * @throws IOException
1681    */
1682   public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
1683     return fs.exists(path);
1684   }
1685 
1686   /**
1687    * Throw an exception if an action is not permitted by a user on a file.
1688    *
1689    * @param ugi
1690    *          the user
1691    * @param file
1692    *          the file
1693    * @param action
1694    *          the action
1695    */
1696   public static void checkAccess(UserGroupInformation ugi, FileStatus file,
1697       FsAction action) throws AccessDeniedException {
1698     if (ugi.getShortUserName().equals(file.getOwner())) {
1699       if (file.getPermission().getUserAction().implies(action)) {
1700         return;
1701       }
1702     } else if (contains(ugi.getGroupNames(), file.getGroup())) {
1703       if (file.getPermission().getGroupAction().implies(action)) {
1704         return;
1705       }
1706     } else if (file.getPermission().getOtherAction().implies(action)) {
1707       return;
1708     }
1709     throw new AccessDeniedException("Permission denied:" + " action=" + action
1710         + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
1711   }
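  /*
   * Illustrative usage sketch (not part of the original source): verifying that the current user
   * may read a file before opening it. The path below is hypothetical.
   *
   *   UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
   *   FileStatus status = fs.getFileStatus(new Path("/hbase/data/default/t1/region/cf/file"));
   *   FSUtils.checkAccess(ugi, status, FsAction.READ);  // throws AccessDeniedException if not allowed
   */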
1712 
1713   private static boolean contains(String[] groups, String user) {
1714     for (String group : groups) {
1715       if (group.equals(user)) {
1716         return true;
1717       }
1718     }
1719     return false;
1720   }
1721 
1722   /**
1723    * Log the current state of the filesystem from a certain root directory
1724    * @param fs filesystem to investigate
1725    * @param root root file/directory to start logging from
1726    * @param LOG log to output information
1727    * @throws IOException if an unexpected exception occurs
1728    */
1729   public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
1730       throws IOException {
1731     LOG.debug("Current file system:");
1732     logFSTree(LOG, fs, root, "|-");
1733   }
1734 
1735   /**
1736    * Recursive helper to log the state of the FS
1737    *
1738    * @see #logFileSystemState(FileSystem, Path, Log)
1739    */
1740   private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
1741       throws IOException {
1742     FileStatus[] files = FSUtils.listStatus(fs, root, null);
1743     if (files == null) return;
1744 
1745     for (FileStatus file : files) {
1746       if (file.isDirectory()) {
1747         LOG.debug(prefix + file.getPath().getName() + "/");
1748         logFSTree(LOG, fs, file.getPath(), prefix + "---");
1749       } else {
1750         LOG.debug(prefix + file.getPath().getName());
1751       }
1752     }
1753   }
1754 
1755   public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
1756       throws IOException {
1757     // set the modify time for TimeToLive Cleaner
1758     fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
1759     return fs.rename(src, dest);
1760   }
1761 
1762   /**
1763    * This function is to scan the root path of the file system to get the
1764    * degree of locality for each region on each of the servers having at least
1765    * one block of that region.
1766    * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}
1767    *
1768    * @param conf
1769    *          the configuration to use
1770    * @return the mapping from region encoded name to a map of server names to
1771    *           locality fraction
1772    * @throws IOException
1773    *           in case of file system errors or interrupts
1774    */
1775   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1776       final Configuration conf) throws IOException {
1777     return getRegionDegreeLocalityMappingFromFS(
1778         conf, null,
1779         conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
1780 
1781   }
1782 
1783   /**
1784    * This function is to scan the root path of the file system to get the
1785    * degree of locality for each region on each of the servers having at least
1786    * one block of that region.
1787    *
1788    * @param conf
1789    *          the configuration to use
1790    * @param desiredTable
1791    *          the table you wish to scan locality for
1792    * @param threadPoolSize
1793    *          the thread pool size to use
1794    * @return the mapping from region encoded name to a map of server names to
1795    *           locality fraction
1796    * @throws IOException
1797    *           in case of file system errors or interrupts
1798    */
1799   public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
1800       final Configuration conf, final String desiredTable, int threadPoolSize)
1801       throws IOException {
1802     Map<String, Map<String, Float>> regionDegreeLocalityMapping =
1803         new ConcurrentHashMap<String, Map<String, Float>>();
1804     getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null,
1805         regionDegreeLocalityMapping);
1806     return regionDegreeLocalityMapping;
1807   }
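  /*
   * Illustrative sketch (not part of the original source): reading the locality map produced
   * above. Keys are encoded region names; values map server name to the fraction of the region's
   * blocks held locally. The table name "t1" and the pool size 4 are example values.
   *
   *   Map<String, Map<String, Float>> locality =
   *       FSUtils.getRegionDegreeLocalityMappingFromFS(conf, "t1", 4);
   *   for (Map.Entry<String, Map<String, Float>> region : locality.entrySet()) {
   *     System.out.println(region.getKey() + " -> " + region.getValue());
   *   }
   */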
1808 
1809   /**
1810    * This function is to scan the root path of the file system to get either the
1811    * mapping between the region name and its best locality region server or the
1812    * degree of locality of each region on each of the servers having at least
1813    * one block of that region. The output map parameters are both optional.
1814    *
1815    * @param conf
1816    *          the configuration to use
1817    * @param desiredTable
1818    *          the table you wish to scan locality for
1819    * @param threadPoolSize
1820    *          the thread pool size to use
1821    * @param regionToBestLocalityRSMapping
1822    *          the map into which to put the best locality mapping or null
1823    * @param regionDegreeLocalityMapping
1824    *          the map into which to put the locality degree mapping or null,
1825    *          must be a thread-safe implementation
1826    * @throws IOException
1827    *           in case of file system errors or interrupts
1828    */
1829   private static void getRegionLocalityMappingFromFS(
1830       final Configuration conf, final String desiredTable,
1831       int threadPoolSize,
1832       Map<String, String> regionToBestLocalityRSMapping,
1833       Map<String, Map<String, Float>> regionDegreeLocalityMapping)
1834       throws IOException {
1835     FileSystem fs =  FileSystem.get(conf);
1836     Path rootPath = FSUtils.getRootDir(conf);
1837     long startTime = EnvironmentEdgeManager.currentTime();
1838     Path queryPath;
1839     // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1840     if (null == desiredTable) {
1841       queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1842     } else {
1843       queryPath = new Path(FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1844     }
1845 
1846     // reject all paths that are not appropriate
1847     PathFilter pathFilter = new PathFilter() {
1848       @Override
1849       public boolean accept(Path path) {
1850         // the path name should be a region name, though it may contain some noise
1851         if (null == path) {
1852           return false;
1853         }
1854 
1855         // no parent?
1856         Path parent = path.getParent();
1857         if (null == parent) {
1858           return false;
1859         }
1860 
1861         String regionName = path.getName();
1862         if (null == regionName) {
1863           return false;
1864         }
1865 
1866         if (!regionName.toLowerCase().matches("[0-9a-f]+")) {
1867           return false;
1868         }
1869         return true;
1870       }
1871     };
1872 
1873     FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1874 
1875     if (null == statusList) {
1876       return;
1877     } else {
1878       LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
1879           statusList.length);
1880     }
1881 
1882     // lower the number of threads in case we have very few expected regions
1883     threadPoolSize = Math.min(threadPoolSize, statusList.length);
1884 
1885     // run in multiple threads
1886     ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
1887         threadPoolSize, 60, TimeUnit.SECONDS,
1888         new ArrayBlockingQueue<Runnable>(statusList.length));
1889     try {
1890       // ignore all file status items that are not of interest
1891       for (FileStatus regionStatus : statusList) {
1892         if (null == regionStatus) {
1893           continue;
1894         }
1895 
1896         if (!regionStatus.isDirectory()) {
1897           continue;
1898         }
1899 
1900         Path regionPath = regionStatus.getPath();
1901         if (null == regionPath) {
1902           continue;
1903         }
1904 
1905         tpe.execute(new FSRegionScanner(fs, regionPath,
1906             regionToBestLocalityRSMapping, regionDegreeLocalityMapping));
1907       }
1908     } finally {
1909       tpe.shutdown();
1910       int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1911           60 * 1000);
1912       try {
1913         // here we wait until TPE terminates, which is either naturally or by
1914         // exceptions in the execution of the threads
1915         while (!tpe.awaitTermination(threadWakeFrequency,
1916             TimeUnit.MILLISECONDS)) {
1917           // printing out rough estimate, so as to not introduce
1918           // AtomicInteger
1919           LOG.info("Locality checking is underway: { Scanned Regions : "
1920               + tpe.getCompletedTaskCount() + "/"
1921               + tpe.getTaskCount() + " }");
1922         }
1923       } catch (InterruptedException e) {
1924         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1925       }
1926     }
1927 
1928     long overhead = EnvironmentEdgeManager.currentTime() - startTime;
1929     String overheadMsg = "Scan DFS for locality info took " + overhead + " ms";
1930 
1931     LOG.info(overheadMsg);
1932   }
1933 
1934   /**
1935    * Do our short circuit read setup.
1936    * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
1937    * @param conf
1938    */
1939   public static void setupShortCircuitRead(final Configuration conf) {
1940     // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1941     boolean shortCircuitSkipChecksum =
1942       conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1943     boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1944     if (shortCircuitSkipChecksum) {
1945       LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
1946         "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
1947         "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
1948       assert !shortCircuitSkipChecksum; //this will fail if assertions are on
1949     }
1950     checkShortCircuitReadBufferSize(conf);
1951   }
1952 
1953   /**
1954    * Check if short circuit read buffer size is set and if not, set it to hbase value.
1955    * @param conf
1956    */
1957   public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1958     final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1959     final int notSet = -1;
1960     // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
1961     final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1962     int size = conf.getInt(dfsKey, notSet);
1963     // If a size is set, return -- we will use it.
1964     if (size != notSet) return;
1965     // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
1966     int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1967     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
1968   }
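  /*
   * Illustrative sketch (not part of the original source): how the buffer-size defaulting above
   * behaves. If "dfs.client.read.shortcircuit.buffer.size" is already set it is left alone;
   * otherwise the "hbase."-prefixed value (or 2 * DEFAULT_BLOCKSIZE) is applied.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   FSUtils.checkShortCircuitReadBufferSize(conf);
   *   int effective = conf.getInt("dfs.client.read.shortcircuit.buffer.size", -1);
   *   // effective is now 2 * HConstants.DEFAULT_BLOCKSIZE unless it was configured explicitly
   */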
1969 
1970   /**
1971    * @param c
1972    * @return The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs.
1973    * @throws IOException 
1974    */
1975   public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
1976       throws IOException {
1977     if (!isHDFS(c)) return null;
1978     // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
1979     // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
1980     // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
1981     final String name = "getHedgedReadMetrics";
1982     DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
1983     Method m;
1984     try {
1985       m = dfsclient.getClass().getDeclaredMethod(name);
1986     } catch (NoSuchMethodException e) {
1987       LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
1988           e.getMessage());
1989       return null;
1990     } catch (SecurityException e) {
1991       LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
1992           e.getMessage());
1993       return null;
1994     }
1995     m.setAccessible(true);
1996     try {
1997       return (DFSHedgedReadMetrics)m.invoke(dfsclient);
1998     } catch (IllegalAccessException e) {
1999       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
2000           e.getMessage());
2001       return null;
2002     } catch (IllegalArgumentException e) {
2003       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
2004           e.getMessage());
2005       return null;
2006     } catch (InvocationTargetException e) {
2007       LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
2008           e.getMessage());
2009       return null;
2010     }
2011   }
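  /*
   * Illustrative usage sketch (not part of the original source): sampling hedged read counters
   * when running on HDFS. The method returns null on non-HDFS filesystems or when the metrics
   * cannot be reached via reflection, so callers must null-check.
   *
   *   DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
   *   if (metrics != null) {
   *     LOG.info("hedged reads=" + metrics.getHedgedReadOps() +
   *         ", wins=" + metrics.getHedgedReadWins());
   *   }
   */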
2012 }