/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Utility methods for interacting with the underlying file system.
 * <p>
 * Note that {@link #setStoragePolicy(FileSystem, Path, String)} is tested in TestFSUtils and
 * pre-commit will run the hbase-server tests if there's code change in this class. See
 * <a href="https://issues.apache.org/jira/browse/HBASE-20838">HBASE-20838</a> for more details.
 */
@InterfaceAudience.Private
public final class CommonFSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(CommonFSUtils.class);

  /** Parameter name for HBase WAL directory */
  public static final String HBASE_WAL_DIR = "hbase.wal.dir";

  /** Parameter to disable stream capability enforcement checks */
  public static final String UNSAFE_STREAM_CAPABILITY_ENFORCE =
    "hbase.unsafe.stream.capability.enforce";

  /** Full access permissions (starting point for a umask) */
  public static final String FULL_RWX_PERMISSIONS = "777";

  private CommonFSUtils() {
  }

  /**
   * Compare of path component. Does not consider schema; i.e. if schemas different but
   * <code>path</code> starts with <code>rootPath</code>, then the function returns true.
   * <p>
   * The comparison is a plain string prefix test on the URI path components, so a sibling whose
   * name merely begins with the same characters (e.g. <code>/a/bc</code> against
   * <code>/a/b</code>) also matches.
   * @param rootPath value to check for
   * @param path     subject to check
   * @return True if <code>path</code> starts with <code>rootPath</code>
   */
  public static boolean isStartingWithPath(final Path rootPath, final String path) {
    String uriRootPath = rootPath.toUri().getPath();
    String tailUriPath = new Path(path).toUri().getPath();
    return tailUriPath.startsWith(uriRootPath);
  }

  /**
   * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
   * '/a/b/c' part. Does not consider schema; i.e. if schemas different but path or subpath matches,
   * the two will equate.
   * @param pathToSearch Path we will be trying to match against.
   * @param pathTail     what to match
   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
    return isMatchingTail(pathToSearch, new Path(pathTail));
  }

  /**
   * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
   * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true. Does not consider
   * schema; i.e. if schemas different but path or subpath matches, the two will equate.
   * <p>
   * Paths whose {@link Path#depth()} differ never match.
   * @param pathToSearch Path we will be trying to match against
   * @param pathTail     what to match
   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
    if (pathToSearch.depth() != pathTail.depth()) {
      return false;
    }
    Path tailPath = pathTail;
    String tailName;
    Path toSearch = pathToSearch;
    String toSearchName;
    boolean result = false;
    // Walk both paths upwards one component at a time, comparing the component names.
    do {
      tailName = tailPath.getName();
      if (tailName == null || tailName.length() <= 0) {
        // Ran out of tail components without a mismatch: it is a match.
        result = true;
        break;
      }
      toSearchName = toSearch.getName();
      if (toSearchName == null || toSearchName.length() <= 0) {
        break;
      }
      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
      tailPath = tailPath.getParent();
      toSearch = toSearch.getParent();
    } while (tailName.equals(toSearchName));
    return result;
  }

  /**
   * Delete if exists.
   * @param fs  filesystem object
   * @param dir directory to delete
   * @return True if deleted <code>dir</code>
   * @throws IOException e
   */
  public static boolean deleteDirectory(final FileSystem fs, final Path dir) throws IOException {
    return fs.exists(dir) && fs.delete(dir, true);
  }

  /**
   * Return the number of bytes that large input files should be optimally be split into to minimize
   * i/o time.
   * @param fs   filesystem object
   * @param path path to ask the filesystem about
   * @return the default block size for the path's filesystem
   */
  public static long getDefaultBlockSize(final FileSystem fs, final Path path) {
    return fs.getDefaultBlockSize(path);
  }

  /**
   * Get the default replication.
   * @param fs   filesystem object
   * @param path path of file
   * @return default replication for the path's filesystem
   */
  public static short getDefaultReplication(final FileSystem fs, final Path path) {
    return fs.getDefaultReplication(path);
  }

  /**
   * Returns the default buffer size to use during writes. The size of the buffer should probably be
   * a multiple of hardware page size (4096 on Intel x86), and it determines how much data is
   * buffered during read and write operations.
   * @param fs filesystem object
   * @return value of <code>io.file.buffer.size</code> in the filesystem's configuration, or 4096
   *         if it is not set
   */
  public static int getDefaultBufferSize(final FileSystem fs) {
    return fs.getConf().getInt("io.file.buffer.size", 4096);
  }

  /**
   * Create the specified file on the filesystem. By default, this will:
   * <ol>
   * <li>apply the umask in the configuration (if it is enabled)</li>
   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the default replication</li>
   * <li>use the default block size</li>
   * <li>not track progress</li>
   * </ol>
   * @param fs        {@link FileSystem} on which to write the file
   * @param path      {@link Path} to the file to write
   * @param perm      initial permissions
   * @param overwrite Whether or not the created file should be overwritten.
   * @return output stream to the created file
   * @throws IOException if the file cannot be created
   */
  public static FSDataOutputStream create(FileSystem fs, Path path, FsPermission perm,
    boolean overwrite) throws IOException {
    LOG.trace("Creating file={} with permission={}, overwrite={}", path, perm, overwrite);
    return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
      getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
  }

  /**
   * Get the file permissions specified in the configuration, if they are enabled.
   * @param fs               filesystem that the file will be created on (currently not consulted).
   * @param conf             configuration to read for determining if permissions are enabled and
   *                         which to use
   * @param permssionConfKey property key in the configuration to use when finding the permission
   * @return the permission to use when creating a new file on the fs. If special permissions are
   *         not enabled, not specified in the configuration, or malformed, then
   *         {@link FsPermission#getFileDefault()} is returned.
   */
  public static FsPermission getFilePermissions(final FileSystem fs, final Configuration conf,
    final String permssionConfKey) {
    boolean enablePermissions = conf.getBoolean(HConstants.ENABLE_DATA_FILE_UMASK, false);

    if (enablePermissions) {
      try {
        FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
        // make sure that we have a mask, if not, go default.
        String mask = conf.get(permssionConfKey);
        if (mask == null) {
          return FsPermission.getFileDefault();
        }
        // apply the umask
        FsPermission umask = new FsPermission(mask);
        return perm.applyUMask(umask);
      } catch (IllegalArgumentException e) {
        LOG.warn("Incorrect umask attempted to be created: {}, using default file permissions.",
          conf.get(permssionConfKey), e);
        return FsPermission.getFileDefault();
      }
    }
    return FsPermission.getFileDefault();
  }

  /**
   * Verifies root directory path is a valid URI with a scheme
   * @param root root directory path
   * @return Passed <code>root</code> argument.
   * @throws IOException if not a valid URI with a scheme
   */
  public static Path validateRootPath(Path root) throws IOException {
    try {
      URI rootURI = new URI(root.toString());
      String scheme = rootURI.getScheme();
      if (scheme == null) {
        throw new IOException("Root directory does not have a scheme");
      }
      return root;
    } catch (URISyntaxException e) {
      throw new IOException("Root directory path is not a valid " + "URI -- check your "
        + HConstants.HBASE_DIR + " configuration", e);
    }
  }

  /**
   * Checks for the presence of the WAL log root path (using the provided conf object) in the given
   * path. If it exists, this method removes it and returns the String representation of remaining
   * relative path.
   * <p>
   * The root only counts as present when it is followed by a path separator (or is the whole
   * path), so a sibling directory that merely shares a string prefix with the root is returned
   * unchanged. If <code>path</code> is the WAL root itself, the empty string is returned.
   * @param path must not be null
   * @param conf must not be null
   * @return String representation of the remaining relative path
   * @throws IOException from underlying filesystem
   */
  public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
    String rootStr = getWALRootDir(conf).toString();
    String pathStr = path.toString();
    // The path does not have the root path in it: return as it is.
    if (!pathStr.startsWith(rootStr)) {
      return pathStr;
    }
    // The path is the root itself: nothing remains once the root is removed.
    if (pathStr.length() == rootStr.length()) {
      return "";
    }
    // A root that already ends with the separator (e.g. "hdfs://nn/") needs no extra skip.
    if (rootStr.endsWith("/")) {
      return pathStr.substring(rootStr.length());
    }
    // Shared string prefix only (e.g. root "/wal", path "/wal2/x"): not under the root.
    if (pathStr.charAt(rootStr.length()) != '/') {
      return pathStr;
    }
    return pathStr.substring(rootStr.length() + 1); // remove the "/" too.
  }

  /**
   * Return the 'path' component of a Path. In Hadoop, Path is a URI. This method returns the 'path'
   * component of a Path's URI: e.g. If a Path is
   * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>, this method returns
   * <code>/hbase_trunk/TestTable/compaction.dir</code>. This method is useful if you want to print
   * out a Path without qualifying Filesystem instance.
   * @param p Filesystem Path whose 'path' component we are to return.
   * @return Path portion of the Filesystem
   */
  public static String getPath(Path p) {
    return p.toUri().getPath();
  }

  /**
   * Get the path for the root data directory
   * @param c configuration
   * @return {@link Path} to hbase root directory from configuration as a qualified Path.
   * @throws IOException e
   */
  public static Path getRootDir(final Configuration c) throws IOException {
    Path p = new Path(c.get(HConstants.HBASE_DIR));
    FileSystem fs = p.getFileSystem(c);
    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  /**
   * Stores <code>root</code> as the HBase root directory ({@link HConstants#HBASE_DIR}) in the
   * configuration.
   * @param c    configuration to update
   * @param root root directory to record
   */
  public static void setRootDir(final Configuration c, final Path root) {
    c.set(HConstants.HBASE_DIR, root.toString());
  }

  /**
   * Sets <code>fs.defaultFS</code> in the configuration to the string form of <code>root</code>.
   * @param c    configuration to update
   * @param root path whose string form becomes the default filesystem
   */
  public static void setFsDefault(final Configuration c, final Path root) {
    c.set("fs.defaultFS", root.toString()); // for hadoop 0.21+
  }

  /**
   * Sets <code>fs.defaultFS</code> in the configuration to <code>uri</code>.
   * @param c   configuration to update
   * @param uri value for the default filesystem
   */
  public static void setFsDefault(final Configuration c, final String uri) {
    c.set("fs.defaultFS", uri); // for hadoop 0.21+
  }

  /**
   * Returns the filesystem that owns the HBase root directory.
   * @param c configuration
   * @return filesystem of the path returned by {@link #getRootDir(Configuration)}
   * @throws IOException from underlying FileSystem
   */
  public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
    Path p = getRootDir(c);
    return p.getFileSystem(c);
  }

  /**
   * Get the path for the root directory for WAL data
   * @param c configuration
   * @return {@link Path} to hbase log root directory: e.g. {@value #HBASE_WAL_DIR} from
   *         configuration as a qualified Path. Defaults to HBase root dir.
   * @throws IOException e
   */
  public static Path getWALRootDir(final Configuration c) throws IOException {
    Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR)));
    // isValidWALRootDir currently either returns true or throws, so this fallback is defensive.
    if (!isValidWALRootDir(p, c)) {
      return getRootDir(c);
    }
    FileSystem fs = p.getFileSystem(c);
    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  /**
   * Returns the URI in the string format
   * @param c configuration (currently not consulted)
   * @param p path
   * @return the string form of the path's URI, or <code>null</code> if the path has no scheme
   * @throws IOException declared for caller compatibility; not thrown by this implementation
   */
  public static String getDirUri(final Configuration c, Path p) throws IOException {
    if (p.toUri().getScheme() != null) {
      return p.toUri().toString();
    }
    return null;
  }

  /**
   * Stores <code>root</code> as the WAL root directory ({@value #HBASE_WAL_DIR}) in the
   * configuration.
   * @param c    configuration to update
   * @param root WAL root directory to record
   */
  public static void setWALRootDir(final Configuration c, final Path root) {
    c.set(HBASE_WAL_DIR, root.toString());
  }

  /**
   * Returns the filesystem that owns the WAL root directory. If
   * {@value #UNSAFE_STREAM_CAPABILITY_ENFORCE} is set in <code>c</code>, its value is copied into
   * the returned filesystem's own configuration.
   * @param c configuration
   * @return filesystem of the path returned by {@link #getWALRootDir(Configuration)}
   * @throws IOException from underlying FileSystem
   */
  public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
    Path p = getWALRootDir(c);
    FileSystem fs = p.getFileSystem(c);
    // hadoop-core does fs caching, so need to propagate this if set
    String enforceStreamCapability = c.get(UNSAFE_STREAM_CAPABILITY_ENFORCE);
    if (enforceStreamCapability != null) {
      fs.getConf().set(UNSAFE_STREAM_CAPABILITY_ENFORCE, enforceStreamCapability);
    }
    return fs;
  }

  /**
   * Checks that the WAL directory is either the root directory itself or located outside of it.
   * @param walDir candidate WAL directory
   * @param c      configuration
   * @return always <code>true</code> when the check passes
   * @throws IllegalStateException if the qualified WAL directory is strictly below the root dir
   * @throws IOException           from underlying FileSystem
   */
  private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
    Path rootDir = getRootDir(c);
    FileSystem fs = walDir.getFileSystem(c);
    Path qualifiedWalDir = walDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    if (!qualifiedWalDir.equals(rootDir)) {
      if (qualifiedWalDir.toString().startsWith(rootDir.toString() + "/")) {
        throw new IllegalStateException("Illegal WAL directory specified. "
          + "WAL directories are not permitted to be under root directory: rootDir="
          + rootDir.toString() + ", qualifiedWALDir=" + qualifiedWalDir);
      }
    }
    return true;
  }

  /**
   * Returns the WAL region directory based on the given table name and region name
   * @param conf              configuration to determine WALRootDir
   * @param tableName         Table that the region is under
   * @param encodedRegionName Region name used for creating the final region directory
   * @return the region directory used to store WALs under the WALRootDir
   * @throws IOException if there is an exception determining the WALRootDir
   */
  public static Path getWALRegionDir(final Configuration conf, final TableName tableName,
    final String encodedRegionName) throws IOException {
    return new Path(getWALTableDir(conf, tableName), encodedRegionName);
  }

  /**
   * Returns the Table directory under the WALRootDir for the specified table name
   * @param conf      configuration used to get the WALRootDir
   * @param tableName Table to get the directory for
   * @return a path to the WAL table directory for the specified table
   * @throws IOException if there is an exception determining the WALRootDir
   */
  public static Path getWALTableDir(final Configuration conf, final TableName tableName)
    throws IOException {
    Path baseDir = new Path(getWALRootDir(conf), HConstants.BASE_NAMESPACE_DIR);
    return new Path(new Path(baseDir, tableName.getNamespaceAsString()),
      tableName.getQualifierAsString());
  }

  /**
   * For backward compatibility with HBASE-20734, where we store recovered edits in a wrong
   * directory without BASE_NAMESPACE_DIR. See HBASE-22617 for more details.
   * @param conf              configuration used to get the WALRootDir
   * @param tableName         Table that the region is under
   * @param encodedRegionName Region name used for creating the final region directory
   * @return the region directory directly under WALRootDir/namespace/qualifier
   * @throws IOException if there is an exception determining the WALRootDir
   * @deprecated For compatibility, will be removed in 4.0.0.
   */
  @Deprecated
  public static Path getWrongWALRegionDir(final Configuration conf, final TableName tableName,
    final String encodedRegionName) throws IOException {
    Path wrongTableDir = new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()),
      tableName.getQualifierAsString());
    return new Path(wrongTableDir, encodedRegionName);
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
   * path rootdir
   * @param rootdir   qualified path of HBase root directory
   * @param tableName name of table
   * @return {@link org.apache.hadoop.fs.Path} for table
   */
  public static Path getTableDir(Path rootdir, final TableName tableName) {
    return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
      tableName.getQualifierAsString());
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing the region directory under
   * path rootdir
   * @param rootdir    qualified path of HBase root directory
   * @param tableName  name of table
   * @param regionName The encoded region name
   * @return {@link org.apache.hadoop.fs.Path} for region
   */
  public static Path getRegionDir(Path rootdir, TableName tableName, String regionName) {
    return new Path(getTableDir(rootdir, tableName), regionName);
  }

  /**
   * Returns the {@link org.apache.hadoop.hbase.TableName} object derived from a table directory:
   * the parent directory's name is used as the namespace and the directory's own name as the
   * qualifier.
   * @param tablePath path of table
   * @return {@link org.apache.hadoop.hbase.TableName} for the table directory
   */
  public static TableName getTableName(Path tablePath) {
    return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing the namespace directory under
   * path rootdir
   * @param rootdir   qualified path of HBase root directory
   * @param namespace namespace name
   * @return {@link org.apache.hadoop.fs.Path} for namespace
   */
  public static Path getNamespaceDir(Path rootdir, final String namespace) {
    return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, new Path(namespace)));
  }

  // this mapping means that under a federated FileSystem implementation, we'll
  // only log the first failure from any of the underlying FileSystems at WARN and all others
  // will be at DEBUG.
  private static final Map<FileSystem, Boolean> warningMap = new ConcurrentHashMap<>();

  /**
   * Sets storage policy for given path. If the passed path is a directory, we'll set the storage
   * policy for all files created in the future in said directory. Note that this change in storage
   * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle. If
   * we're running on a version of FileSystem that doesn't support the given storage policy (or
   * storage policies at all), then we'll issue a log message and continue. See
   * http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
   * @param fs            We only do anything it implements a setStoragePolicy method
   * @param path          the Path whose storage policy is to be set
   * @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+
   *                      org.apache.hadoop.hdfs.protocol.HdfsConstants for possible list e.g
   *                      'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
   */
  public static void setStoragePolicy(final FileSystem fs, final Path path,
    final String storagePolicy) {
    try {
      setStoragePolicy(fs, path, storagePolicy, false);
    } catch (IOException e) {
      // should never arrive here
      LOG.warn("We have chosen not to throw exception but some unexpectedly thrown out", e);
    }
  }

  /**
   * Normalizes the requested policy (trim, upper-case) and applies it, skipping the call when the
   * policy is null, empty, or the defer-to-hdfs marker.
   * @param fs             filesystem to apply the policy on
   * @param path           the Path whose storage policy is to be set
   * @param storagePolicy  policy name; may be null or blank
   * @param throwException whether a failure to set the policy is rethrown or only logged
   * @throws IOException if setting the policy fails and <code>throwException</code> is true
   */
  static void setStoragePolicy(final FileSystem fs, final Path path, final String storagePolicy,
    boolean throwException) throws IOException {
    if (storagePolicy == null) {
      LOG.trace("We were passed a null storagePolicy, exiting early.");
      return;
    }
    String trimmedStoragePolicy = storagePolicy.trim();
    if (trimmedStoragePolicy.isEmpty()) {
      LOG.trace("We were passed an empty storagePolicy, exiting early.");
      return;
    }
    trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT);
    if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) {
      LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.", trimmedStoragePolicy);
      return;
    }
    try {
      invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
    } catch (IOException e) {
      LOG.trace("Failed to invoke set storage policy API on FS", e);
      if (throwException) {
        throw e;
      }
    }
  }

  /*
   * All args have been checked and are good. Run the setStoragePolicy invocation. Any failure is
   * logged (WARN the first time per FileSystem instance, DEBUG afterwards) and then rethrown
   * wrapped in an IOException; the caller decides whether to swallow it.
   */
  private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
    final String storagePolicy) throws IOException {
    try {
      fs.setStoragePolicy(path, storagePolicy);
      LOG.debug("Set storagePolicy={} for path={}", storagePolicy, path);
    } catch (Exception e) {
      // putIfAbsent makes "first failure for this fs" an atomic decision, so concurrent callers
      // cannot both log at WARN.
      if (warningMap.putIfAbsent(fs, Boolean.TRUE) == null) {
        LOG.warn("Unable to set storagePolicy={} for path={}. "
          + "DEBUG log level might have more details.", storagePolicy, path, e);
      } else if (LOG.isDebugEnabled()) {
        LOG.debug("Unable to set storagePolicy={} for path={}", storagePolicy, path, e);
      }

      // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation
      // that throws UnsupportedOperationException
      if (e instanceof UnsupportedOperationException) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("The underlying FileSystem implementation doesn't support "
            + "setStoragePolicy. This is probably intentional on their part, since HDFS-9345 "
            + "appears to be present in your version of Hadoop. For more information check "
            + "the Hadoop documentation on 'ArchivalStorage', the Hadoop FileSystem "
            + "specification docs from HADOOP-11981, and/or related documentation from the "
            + "provider of the underlying FileSystem (its name should appear in the "
            + "stacktrace that accompanies this message). Note in particular that Hadoop's "
            + "local filesystem implementation doesn't support storage policies.", e);
        }
      }
      throw new IOException(e);
    }
  }

  /**
   * Return true if this is a filesystem whose scheme is 'hdfs'.
   * @param conf configuration used to obtain the default filesystem
   * @return true if the default filesystem's URI scheme is 'hdfs' (case-insensitive); false
   *         otherwise, including when the URI has no scheme
   * @throws IOException from underlying FileSystem
   */
  public static boolean isHDFS(final Configuration conf) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    String scheme = fs.getUri().getScheme();
    // Constant-first comparison so a scheme-less URI yields false rather than an NPE.
    return "hdfs".equalsIgnoreCase(scheme);
  }

  /**
   * Checks if the given path is the one with 'recovered.edits' dir.
   * @param path must not be null
   * @return True if we recovered edits
   */
  public static boolean isRecoveredEdits(Path path) {
    return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
  }

  /**
   * Returns the filesystem of the hbase rootdir.
   * @param conf configuration
   * @return filesystem of the path returned by {@link #getRootDir(Configuration)}
   * @throws IOException from underlying FileSystem
   */
  public static FileSystem getCurrentFileSystem(Configuration conf) throws IOException {
    return getRootDir(conf).getFileSystem(conf);
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal This accommodates
   * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException, and
   * return an empty FileStatus[] while Hadoop 2 will throw FileNotFoundException. Where possible,
   * prefer FSUtils#listStatusWithStatusFilter(FileSystem, Path, FileStatusFilter) instead.
   * @param fs     file system
   * @param dir    directory
   * @param filter path filter; when null, the unfiltered listing is used
   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
   * @throws IOException from underlying FileSystem, other than FileNotFoundException
   */
  public static FileStatus[] listStatus(final FileSystem fs, final Path dir,
    final PathFilter filter) throws IOException {
    FileStatus[] status = null;
    try {
      status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
    } catch (FileNotFoundException fnfe) {
      // if directory doesn't exist, return null
      LOG.trace("{} doesn't exist", dir);
    }
    if (status == null || status.length < 1) {
      return null;
    }
    return status;
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal This would accommodates
   * differences between hadoop versions
   * @param fs  file system
   * @param dir directory
   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
   * @throws IOException from underlying FileSystem, other than FileNotFoundException
   */
  public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
    return listStatus(fs, dir, null);
  }

  /**
   * Calls fs.listFiles() to get FileStatus and BlockLocations together for reducing rpc call
   * @param fs  file system
   * @param dir directory
   * @return LocatedFileStatus list, or null if the listing is empty or dir doesn't exist
   * @throws IOException from underlying FileSystem, other than FileNotFoundException
   */
  public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs, final Path dir)
    throws IOException {
    List<LocatedFileStatus> status = null;
    try {
      RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(dir, false);
      while (locatedFileStatusRemoteIterator.hasNext()) {
        // Allocate lazily so that an empty listing still yields null.
        if (status == null) {
          status = Lists.newArrayList();
        }
        status.add(locatedFileStatusRemoteIterator.next());
      }
    } catch (FileNotFoundException fnfe) {
      // if directory doesn't exist, return null
      LOG.trace("{} doesn't exist", dir);
    }
    return status;
  }

  /**
   * Calls fs.delete() and returns the value returned by the fs.delete()
   * @param fs        must not be null
   * @param path      must not be null
   * @param recursive delete tree rooted at path
   * @return the value returned by the fs.delete()
   * @throws IOException from underlying FileSystem
   */
  public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
    throws IOException {
    return fs.delete(path, recursive);
  }

  /**
   * Calls fs.exists(). Checks if the specified path exists
   * @param fs   must not be null
   * @param path must not be null
   * @return the value returned by fs.exists()
   * @throws IOException from underlying FileSystem
   */
  public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
    return fs.exists(path);
  }

  /**
   * Log the current state of the filesystem from a certain root directory
   * @param fs   filesystem to investigate
   * @param root root file/directory to start logging from
   * @param log  log to output information
   * @throws IOException if an unexpected exception occurs
   */
  public static void logFileSystemState(final FileSystem fs, final Path root, Logger log)
    throws IOException {
    log.debug("File system contents for path {}", root);
    logFSTree(log, fs, root, "|-");
  }

  /**
   * Recursive helper to log the state of the FS
   * @see #logFileSystemState(FileSystem, Path, Logger)
   */
  private static void logFSTree(Logger log, final FileSystem fs, final Path root, String prefix)
    throws IOException {
    FileStatus[] files = listStatus(fs, root, null);
    if (files == null) {
      return;
    }

    for (FileStatus file : files) {
      if (file.isDirectory()) {
        log.debug("{}{}/", prefix, file.getPath().getName());
        // Each nesting level lengthens the prefix so the output reads as a tree.
        logFSTree(log, fs, file.getPath(), prefix + "---");
      } else {
        log.debug("{}{}", prefix, file.getPath().getName());
      }
    }
  }

  /**
   * Sets the modification time of <code>src</code> to the current time reported by
   * {@link EnvironmentEdgeManager} and then renames it to <code>dest</code>.
   * @param fs   filesystem holding both paths
   * @param src  path to rename
   * @param dest new path
   * @return the value returned by fs.rename()
   * @throws IOException from underlying FileSystem
   */
  public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
    throws IOException {
    // set the modify time for TimeToLive Cleaner
    fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
    return fs.rename(src, dest);
  }

  /**
   * Check if short circuit read buffer size is set and if not, set it to hbase value.
   * @param conf must not be null
   */
  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
    int size = conf.getInt(dfsKey, notSet);
    // If a size is set, return -- we will use it.
    if (size != notSet) {
      return;
    }
    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }

  /**
   * Holder that looks up, once and via reflection, the <code>replicate</code> method of
   * <code>DistributedFileSystem$HdfsDataOutputStreamBuilder</code>. Both fields stay null when the
   * class or the method is not available.
   */
  private static final class DfsBuilderUtility {
    private static final Class<?> BUILDER;
    private static final Method REPLICATE;

    static {
      String builderName =
        "org.apache.hadoop.hdfs.DistributedFileSystem$HdfsDataOutputStreamBuilder";
      Class<?> builderClass = null;
      try {
        builderClass = Class.forName(builderName);
      } catch (ClassNotFoundException e) {
        LOG.debug("{} not available, will not set replicate when creating output stream",
          builderName);
      }
      Method replicateMethod = null;
      if (builderClass != null) {
        try {
          replicateMethod = builderClass.getMethod("replicate");
          LOG.debug("Using builder API via reflection for DFS file creation.");
        } catch (NoSuchMethodException e) {
          LOG.debug("Could not find replicate method on builder; will not set replicate when"
            + " creating output stream", e);
        }
      }
      BUILDER = builderClass;
      REPLICATE = replicateMethod;
    }

    /**
     * Attempt to use builder API via reflection to call the replicate method on the given builder.
     * Does nothing when the reflective lookup failed or the builder is of another type.
     */
    static void replicate(FSDataOutputStreamBuilder<?, ?> builder) {
      if (BUILDER != null && REPLICATE != null && BUILDER.isAssignableFrom(builder.getClass())) {
        try {
          REPLICATE.invoke(builder);
        } catch (IllegalAccessException | InvocationTargetException e) {
          // Should have caught this failure during initialization, so log full trace here
          LOG.warn("Couldn't use reflection with builder API", e);
        }
      }
    }
  }

  /**
   * Attempt to use builder API via reflection to create a file with the given parameters and
   * replication enabled.
   * <p>
   * Replication is only requested when the filesystem's builder is an HDFS output stream builder;
   * per the original note, it will not be attempted when passed an HFileSystem.
   * @param fs        filesystem on which to create the file
   * @param path      file to create
   * @param overwrite whether an existing file should be overwritten
   * @return output stream to the created file
   * @throws IOException if the file cannot be created
   */
  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite)
    throws IOException {
    FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite);
    DfsBuilderUtility.replicate(builder);
    return builder.build();
  }

  /**
   * Attempt to use builder API via reflection to create a file with the given parameters and
   * replication enabled.
   * <p>
   * Replication is only requested when the filesystem's builder is an HDFS output stream builder;
   * per the original note, it will not be attempted when passed an HFileSystem.
   * @param fs          filesystem on which to create the file
   * @param path        file to create
   * @param overwrite   whether an existing file should be overwritten
   * @param bufferSize  buffer size passed to the builder
   * @param replication replication factor passed to the builder
   * @param blockSize   block size passed to the builder
   * @param isRecursive whether the builder's recursive option is enabled
   * @return output stream to the created file
   * @throws IOException if the file cannot be created
   */
  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite,
    int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
    FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite)
      .bufferSize(bufferSize).replication(replication).blockSize(blockSize);
    if (isRecursive) {
      builder.recursive();
    }
    DfsBuilderUtility.replicate(builder);
    return builder.build();
  }

  /**
   * Helper exception for those cases where the place where we need to check a stream capability is
   * not where we have the needed context to explain the impact and mitigation for a lack.
   */
  public static class StreamLacksCapabilityException extends Exception {
    public StreamLacksCapabilityException(String message, Throwable cause) {
      super(message, cause);
    }

    public StreamLacksCapabilityException(String message) {
      super(message);
    }
  }
}