/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Utility methods for interacting with the underlying file system.
 * <p/>
 * Note that {@link #setStoragePolicy(FileSystem, Path, String)} is tested in TestFSUtils and
 * pre-commit will run the hbase-server tests if there's code change in this class. See
 * <a href="https://issues.apache.org/jira/browse/HBASE-20838">HBASE-20838</a> for more details.
 */
@InterfaceAudience.Private
public final class CommonFSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(CommonFSUtils.class);

  /** Parameter name for HBase WAL directory */
  public static final String HBASE_WAL_DIR = "hbase.wal.dir";

  /** Parameter to disable stream capability enforcement checks */
  public static final String UNSAFE_STREAM_CAPABILITY_ENFORCE =
    "hbase.unsafe.stream.capability.enforce";

  /** Full access permissions (starting point for a umask) */
  public static final String FULL_RWX_PERMISSIONS = "777";

  private CommonFSUtils() {
  }
  /**
   * Compare path components. Does not consider the scheme; i.e. if the schemes are
   * different but <code>path</code> starts with <code>rootPath</code>,
   * then the function returns true.
   * @param rootPath value to check for
   * @param path subject to check
   * @return True if <code>path</code> starts with <code>rootPath</code>
   */
  public static boolean isStartingWithPath(final Path rootPath, final String path) {
    String uriRootPath = rootPath.toUri().getPath();
    String tailUriPath = (new Path(path)).toUri().getPath();
    return tailUriPath.startsWith(uriRootPath);
  }

  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. Does not consider the scheme; i.e. if the schemes are different but the
   * path or subpath matches, the two will equate.
   * @param pathToSearch Path we will be trying to match against.
   * @param pathTail what to match
   * @return True if <code>pathTail</code> is a tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
    return isMatchingTail(pathToSearch, new Path(pathTail));
  }

  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
   * consider the scheme; i.e. if the schemes are different but the path or subpath matches, the
   * two will equate.
   * @param pathToSearch Path we will be trying to match against
   * @param pathTail what to match
   * @return True if <code>pathTail</code> is a tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
    if (pathToSearch.depth() != pathTail.depth()) {
      return false;
    }
    Path tailPath = pathTail;
    String tailName;
    Path toSearch = pathToSearch;
    String toSearchName;
    boolean result = false;
    do {
      tailName = tailPath.getName();
      if (tailName == null || tailName.length() <= 0) {
        result = true;
        break;
      }
      toSearchName = toSearch.getName();
      if (toSearchName == null || toSearchName.length() <= 0) {
        break;
      }
      // Move up a parent on each path for the next go around. Path doesn't let us go off the end.
      tailPath = tailPath.getParent();
      toSearch = toSearch.getParent();
    } while (tailName.equals(toSearchName));
    return result;
  }
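  /*
   * A minimal usage sketch for the tail-matching helper above (the paths are illustrative):
   *
   *   Path search = new Path("hdfs://namenode:8020/hbase/data/default/t1");
   *   // true: scheme and authority are ignored and the path components all match
   *   CommonFSUtils.isMatchingTail(search, new Path("/hbase/data/default/t1"));
   *   // false: the depth check rejects a shorter tail before any names are compared
   *   CommonFSUtils.isMatchingTail(search, new Path("default/t1"));
   */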
  /**
   * Delete the directory if it exists.
   * @param fs filesystem object
   * @param dir directory to delete
   * @return True if <code>dir</code> was deleted
   * @throws IOException from underlying FileSystem
   */
  public static boolean deleteDirectory(final FileSystem fs, final Path dir) throws IOException {
    return fs.exists(dir) && fs.delete(dir, true);
  }

  /**
   * Return the number of bytes that large input files should optimally
   * be split into to minimize i/o time.
   *
   * @param fs filesystem object
   * @param path path of file
   * @return the default block size for the path's filesystem
   */
  public static long getDefaultBlockSize(final FileSystem fs, final Path path) {
    return fs.getDefaultBlockSize(path);
  }

  /**
   * Get the default replication.
   *
   * @param fs filesystem object
   * @param path path of file
   * @return default replication for the path's filesystem
   */
  public static short getDefaultReplication(final FileSystem fs, final Path path) {
    return fs.getDefaultReplication(path);
  }

  /**
   * Returns the default buffer size to use during writes.
   *
   * The size of the buffer should probably be a multiple of hardware
   * page size (4096 on Intel x86), and it determines how much data is
   * buffered during read and write operations.
   *
   * @param fs filesystem object
   * @return default buffer size to use during writes
   */
  public static int getDefaultBufferSize(final FileSystem fs) {
    return fs.getConf().getInt("io.file.buffer.size", 4096);
  }

  /**
   * Create the specified file on the filesystem. By default, this will:
   * <ol>
   * <li>apply the umask in the configuration (if it is enabled)</li>
   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the default replication</li>
   * <li>use the default block size</li>
   * <li>not track progress</li>
   * </ol>
   *
   * @param fs {@link FileSystem} on which to write the file
   * @param path {@link Path} to the file to write
   * @param perm initial permissions
   * @param overwrite Whether or not the created file should be overwritten.
   * @return output stream to the created file
   * @throws IOException if the file cannot be created
   */
  public static FSDataOutputStream create(FileSystem fs, Path path,
      FsPermission perm, boolean overwrite) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Creating file={} with permission={}, overwrite={}", path, perm, overwrite);
    }
    return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
      getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
  }

  /**
   * Get the file permissions specified in the configuration, if they are
   * enabled.
   *
   * @param fs filesystem that the file will be created on.
   * @param conf configuration to read for determining if permissions are
   *          enabled and which to use
   * @param permssionConfKey property key in the configuration to use when
   *          finding the permission
   * @return the permission to use when creating a new file on the fs. If
   *         special permissions are not specified in the configuration, then
   *         the default permissions on the fs will be returned.
   */
  public static FsPermission getFilePermissions(final FileSystem fs,
      final Configuration conf, final String permssionConfKey) {
    boolean enablePermissions = conf.getBoolean(
      HConstants.ENABLE_DATA_FILE_UMASK, false);

    if (enablePermissions) {
      try {
        FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
        // make sure that we have a mask, if not, go default.
        String mask = conf.get(permssionConfKey);
        if (mask == null) {
          return FsPermission.getFileDefault();
        }
        // apply the umask
        FsPermission umask = new FsPermission(mask);
        return perm.applyUMask(umask);
      } catch (IllegalArgumentException e) {
        LOG.warn(
          "Incorrect umask attempted to be created: "
            + conf.get(permssionConfKey)
            + ", using default file permissions.", e);
        return FsPermission.getFileDefault();
      }
    }
    return FsPermission.getFileDefault();
  }
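  /*
   * A usage sketch for the two helpers above. The umask key name here is illustrative;
   * any key readable from the Configuration works:
   *
   *   conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
   *   conf.set("hbase.data.umask", "022");
   *   FsPermission perm = CommonFSUtils.getFilePermissions(fs, conf, "hbase.data.umask");
   *   // "777" with umask "022" applied yields "755"
   *   FSDataOutputStream out = CommonFSUtils.create(fs, new Path("/tmp/example"), perm, true);
   */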
  /**
   * Verifies the root directory path is a valid URI with a scheme.
   *
   * @param root root directory path
   * @return Passed <code>root</code> argument.
   * @throws IOException if not a valid URI with a scheme
   */
  public static Path validateRootPath(Path root) throws IOException {
    try {
      URI rootURI = new URI(root.toString());
      String scheme = rootURI.getScheme();
      if (scheme == null) {
        throw new IOException("Root directory does not have a scheme");
      }
      return root;
    } catch (URISyntaxException e) {
      throw new IOException("Root directory path is not a valid " +
        "URI -- check your " + HConstants.HBASE_DIR + " configuration", e);
    }
  }

  /**
   * Checks for the presence of the WAL log root path (using the provided conf object) in the given
   * path. If it exists, this method removes it and returns the String representation of the
   * remaining relative path.
   * @param path must not be null
   * @param conf must not be null
   * @return String representation of the remaining relative path
   * @throws IOException from underlying filesystem
   */
  public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
    Path root = getWALRootDir(conf);
    String pathStr = path.toString();
    // If the path does not start with the WAL root path, return it unchanged.
    if (!pathStr.startsWith(root.toString())) {
      return pathStr;
    }
    // Otherwise strip the root path prefix; the + 1 removes the separator "/" too.
    return pathStr.substring(root.toString().length() + 1);
  }

  /**
   * Return the 'path' component of a Path. In Hadoop, Path is a URI. This
   * method returns the 'path' component of a Path's URI: e.g. If a Path is
   * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
   * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
   * This method is useful if you want to print out a Path without the qualifying
   * FileSystem instance.
   * @param p FileSystem Path whose 'path' component we are to return.
   * @return Path portion of the FileSystem
   */
  public static String getPath(Path p) {
    return p.toUri().getPath();
  }

  /**
   * @param c configuration
   * @return {@link Path} to hbase root directory from
   *         configuration as a qualified Path.
   * @throws IOException e
   */
  public static Path getRootDir(final Configuration c) throws IOException {
    Path p = new Path(c.get(HConstants.HBASE_DIR));
    FileSystem fs = p.getFileSystem(c);
    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  public static void setRootDir(final Configuration c, final Path root) {
    c.set(HConstants.HBASE_DIR, root.toString());
  }

  public static void setFsDefault(final Configuration c, final Path root) {
    c.set("fs.defaultFS", root.toString()); // for hadoop 0.21+
  }

  public static void setFsDefault(final Configuration c, final String uri) {
    c.set("fs.defaultFS", uri); // for hadoop 0.21+
  }

  public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
    Path p = getRootDir(c);
    return p.getFileSystem(c);
  }

  /**
   * @param c configuration
   * @return {@link Path} to hbase log root directory, e.g. {@value HBASE_WAL_DIR}, from
   *         configuration as a qualified Path. Defaults to the HBase root dir.
   * @throws IOException e
   */
  public static Path getWALRootDir(final Configuration c) throws IOException {
    Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR)));
    if (!isValidWALRootDir(p, c)) {
      return getRootDir(c);
    }
    FileSystem fs = p.getFileSystem(c);
    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  /**
   * Returns the URI in string format if the path has a scheme.
   * @param c configuration
   * @param p path
   * @return the URI in string format, or null if <code>p</code> has no scheme
   * @throws IOException from underlying FileSystem
   */
  public static String getDirUri(final Configuration c, Path p) throws IOException {
    if (p.toUri().getScheme() != null) {
      return p.toUri().toString();
    }
    return null;
  }

  public static void setWALRootDir(final Configuration c, final Path root) {
    c.set(HBASE_WAL_DIR, root.toString());
  }

  public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
    Path p = getWALRootDir(c);
    FileSystem fs = p.getFileSystem(c);
    // hadoop-core does fs caching, so need to propagate this if set
    String enforceStreamCapability = c.get(UNSAFE_STREAM_CAPABILITY_ENFORCE);
    if (enforceStreamCapability != null) {
      fs.getConf().set(UNSAFE_STREAM_CAPABILITY_ENFORCE, enforceStreamCapability);
    }
    return fs;
  }

  private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
    Path rootDir = getRootDir(c);
    FileSystem fs = walDir.getFileSystem(c);
    Path qualifiedWalDir = walDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    if (!qualifiedWalDir.equals(rootDir)) {
      if (qualifiedWalDir.toString().startsWith(rootDir.toString() + "/")) {
        throw new IllegalStateException("Illegal WAL directory specified. " +
          "WAL directories are not permitted to be under the root directory: rootDir=" +
          rootDir.toString() + ", qualifiedWALDir=" + qualifiedWalDir);
      }
    }
    return true;
  }

  /**
   * Returns the WAL region directory based on the given table name and region name
   * @param conf configuration to determine WALRootDir
   * @param tableName Table that the region is under
   * @param encodedRegionName Region name used for creating the final region directory
   * @return the region directory used to store WALs under the WALRootDir
   * @throws IOException if there is an exception determining the WALRootDir
   */
  public static Path getWALRegionDir(final Configuration conf, final TableName tableName,
      final String encodedRegionName) throws IOException {
    return new Path(getWALTableDir(conf, tableName), encodedRegionName);
  }

  /**
   * Returns the Table directory under the WALRootDir for the specified table name
   * @param conf configuration used to get the WALRootDir
   * @param tableName Table to get the directory for
   * @return a path to the WAL table directory for the specified table
   * @throws IOException if there is an exception determining the WALRootDir
   */
  public static Path getWALTableDir(final Configuration conf, final TableName tableName)
      throws IOException {
    Path baseDir = new Path(getWALRootDir(conf), HConstants.BASE_NAMESPACE_DIR);
    return new Path(new Path(baseDir, tableName.getNamespaceAsString()),
      tableName.getQualifierAsString());
  }
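  /*
   * A sketch of the WAL layout the two helpers above produce, assuming
   * HConstants.BASE_NAMESPACE_DIR is the usual "data" (names are illustrative):
   *
   *   conf.set(CommonFSUtils.HBASE_WAL_DIR, "hdfs://namenode:8020/hbasewal");
   *   CommonFSUtils.getWALTableDir(conf, TableName.valueOf("ns", "t1"));
   *   //   => hdfs://namenode:8020/hbasewal/data/ns/t1
   *   CommonFSUtils.getWALRegionDir(conf, TableName.valueOf("ns", "t1"), "<encoded-region>");
   *   //   => hdfs://namenode:8020/hbasewal/data/ns/t1/<encoded-region>
   */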
  /**
   * For backward compatibility with HBASE-20734, where we store recovered edits in a wrong
   * directory without BASE_NAMESPACE_DIR. See HBASE-22617 for more details.
   * @deprecated For compatibility, will be removed in 4.0.0.
   */
  @Deprecated
  public static Path getWrongWALRegionDir(final Configuration conf, final TableName tableName,
      final String encodedRegionName) throws IOException {
    Path wrongTableDir = new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()),
      tableName.getQualifierAsString());
    return new Path(wrongTableDir, encodedRegionName);
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
   * path rootdir
   *
   * @param rootdir qualified path of HBase root directory
   * @param tableName name of table
   * @return {@link org.apache.hadoop.fs.Path} for table
   */
  public static Path getTableDir(Path rootdir, final TableName tableName) {
    return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
      tableName.getQualifierAsString());
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing the region directory under
   * path rootdir
   *
   * @param rootdir qualified path of HBase root directory
   * @param tableName name of table
   * @param regionName The encoded region name
   * @return {@link org.apache.hadoop.fs.Path} for region
   */
  public static Path getRegionDir(Path rootdir, TableName tableName, String regionName) {
    return new Path(getTableDir(rootdir, tableName), regionName);
  }

  /**
   * Returns the {@link org.apache.hadoop.hbase.TableName} object for the table that the given
   * table directory path points at.
   *
   * @param tablePath path of table
   * @return {@link org.apache.hadoop.hbase.TableName} for the table
   */
  public static TableName getTableName(Path tablePath) {
    return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing
   * the namespace directory under path rootdir
   *
   * @param rootdir qualified path of HBase root directory
   * @param namespace namespace name
   * @return {@link org.apache.hadoop.fs.Path} for the namespace
   */
  public static Path getNamespaceDir(Path rootdir, final String namespace) {
    return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
      new Path(namespace)));
  }
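  /*
   * A sketch of how the rootdir helpers above compose (paths are illustrative):
   *
   *   Path rootDir = new Path("hdfs://namenode:8020/hbase");
   *   Path tableDir = CommonFSUtils.getTableDir(rootDir, TableName.valueOf("ns", "t1"));
   *   //   => hdfs://namenode:8020/hbase/data/ns/t1
   *   TableName tn = CommonFSUtils.getTableName(tableDir); // round-trips back to "ns:t1"
   */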
  // This mapping means that under a federated FileSystem implementation, we'll
  // only log the first failure from any of the underlying FileSystems at WARN and all others
  // will be at DEBUG.
  private static final Map<FileSystem, Boolean> warningMap = new ConcurrentHashMap<>();

  /**
   * Sets the storage policy for the given path.
   * If the passed path is a directory, we'll set the storage policy for all files
   * created in the future in said directory. Note that this change in storage
   * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle.
   * If we're running on a version of FileSystem that doesn't support the given storage policy
   * (or storage policies at all), then we'll issue a log message and continue.
   *
   * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
   *
   * @param fs We only do anything if it implements a setStoragePolicy method
   * @param path the Path whose storage policy is to be set
   * @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+
   *          org.apache.hadoop.hdfs.protocol.HdfsConstants for the possible list, e.g.
   *          'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
   */
  public static void setStoragePolicy(final FileSystem fs, final Path path,
      final String storagePolicy) {
    try {
      setStoragePolicy(fs, path, storagePolicy, false);
    } catch (IOException e) {
      // Should never arrive here; the throwException=false call swallows IOExceptions.
      LOG.warn("Unexpected exception from setStoragePolicy; this call should not throw", e);
    }
  }

  static void setStoragePolicy(final FileSystem fs, final Path path, final String storagePolicy,
      boolean throwException) throws IOException {
    if (storagePolicy == null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("We were passed a null storagePolicy, exiting early.");
      }
      return;
    }
    String trimmedStoragePolicy = storagePolicy.trim();
    if (trimmedStoragePolicy.isEmpty()) {
      LOG.trace("We were passed an empty storagePolicy, exiting early.");
      return;
    } else {
      trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT);
    }
    if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) {
      LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.",
        trimmedStoragePolicy);
      return;
    }
    try {
      invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
    } catch (IOException e) {
      LOG.trace("Failed to invoke set storage policy API on FS", e);
      if (throwException) {
        throw e;
      }
    }
  }
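  /*
   * A usage sketch (policy names come from HDFS; the path is illustrative):
   *
   *   // Best effort: logs and continues if the FileSystem rejects the policy.
   *   CommonFSUtils.setStoragePolicy(fs, new Path("/hbase/data/ns/t1/cf"), "ONE_SSD");
   *   // Passing HConstants.DEFER_TO_HDFS_STORAGE_POLICY exits early by design.
   */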
  /*
   * All args have been checked and are good. Run the setStoragePolicy invocation.
   */
  private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
      final String storagePolicy) throws IOException {
    Exception toThrow = null;

    try {
      fs.setStoragePolicy(path, storagePolicy);
      LOG.debug("Set storagePolicy={} for path={}", storagePolicy, path);
    } catch (Exception e) {
      toThrow = e;
      // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
      // misuse than a runtime problem with HDFS.
      if (!warningMap.containsKey(fs)) {
        warningMap.put(fs, true);
        LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path + ". " +
          "DEBUG log level might have more details.", e);
      } else if (LOG.isDebugEnabled()) {
        LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
      }

      // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation
      // that throws UnsupportedOperationException
      if (e instanceof UnsupportedOperationException) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("The underlying FileSystem implementation doesn't support " +
            "setStoragePolicy. This is probably intentional on their part, since HDFS-9345 " +
            "appears to be present in your version of Hadoop. For more information check " +
            "the Hadoop documentation on 'ArchivalStorage', the Hadoop FileSystem " +
            "specification docs from HADOOP-11981, and/or related documentation from the " +
            "provider of the underlying FileSystem (its name should appear in the " +
            "stacktrace that accompanies this message). Note in particular that Hadoop's " +
            "local filesystem implementation doesn't support storage policies.", e);
        }
      }
    }

    if (toThrow != null) {
      throw new IOException(toThrow);
    }
  }

  /**
   * @param conf must not be null
   * @return True if the filesystem's scheme is 'hdfs'.
   * @throws IOException from underlying FileSystem
   */
  public static boolean isHDFS(final Configuration conf) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    String scheme = fs.getUri().getScheme();
    return scheme.equalsIgnoreCase("hdfs");
  }

  /**
   * Checks if the given path contains a 'recovered.edits' directory component.
   * @param path must not be null
   * @return True if <code>path</code> is under a recovered edits directory
   */
  public static boolean isRecoveredEdits(Path path) {
    return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
  }

  /**
   * @param conf must not be null
   * @return the filesystem of the hbase rootdir.
   * @throws IOException from underlying FileSystem
   */
  public static FileSystem getCurrentFileSystem(Configuration conf) throws IOException {
    return getRootDir(conf).getFileSystem(conf);
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
   * This accommodates differences between hadoop versions: hadoop 1 does not throw a
   * FileNotFoundException and instead returns an empty FileStatus[], while Hadoop 2 will
   * throw FileNotFoundException.
   *
   * Where possible, prefer FSUtils#listStatusWithStatusFilter(FileSystem,
   * Path, FileStatusFilter) instead.
   *
   * @param fs file system
   * @param dir directory
   * @param filter path filter
   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
   */
  public static FileStatus[] listStatus(final FileSystem fs,
      final Path dir, final PathFilter filter) throws IOException {
    FileStatus[] status = null;
    try {
      status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
    } catch (FileNotFoundException fnfe) {
      // if directory doesn't exist, return null
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} doesn't exist", dir);
      }
    }
    if (status == null || status.length < 1) {
      return null;
    }
    return status;
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
   * This accommodates differences between hadoop versions.
   *
   * @param fs file system
   * @param dir directory
   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
   */
  public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
    return listStatus(fs, dir, null);
  }

  /**
   * Calls fs.listFiles() to get FileStatus and BlockLocations together, reducing RPC calls.
   *
   * @param fs file system
   * @param dir directory
   * @return LocatedFileStatus list, or null if dir doesn't exist or is empty
   */
  public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs,
      final Path dir) throws IOException {
    List<LocatedFileStatus> status = null;
    try {
      RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs
        .listFiles(dir, false);
      while (locatedFileStatusRemoteIterator.hasNext()) {
        if (status == null) {
          status = Lists.newArrayList();
        }
        status.add(locatedFileStatusRemoteIterator.next());
      }
    } catch (FileNotFoundException fnfe) {
      // if directory doesn't exist, return null
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} doesn't exist", dir);
      }
    }
    return status;
  }

  /**
   * Calls fs.delete() and returns the value returned by fs.delete().
   *
   * @param fs must not be null
   * @param path must not be null
   * @param recursive delete the tree rooted at path
   * @return the value returned by fs.delete()
   * @throws IOException from underlying FileSystem
   */
  public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
      throws IOException {
    return fs.delete(path, recursive);
  }
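  /*
   * A usage sketch for the listing helpers above (the directory is illustrative). Note the
   * null return in place of an exception or an empty array:
   *
   *   FileStatus[] children = CommonFSUtils.listStatus(fs, new Path("/hbase/archive"));
   *   if (children == null) {
   *     // directory is empty or does not exist
   *   }
   */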
  /**
   * Calls fs.exists(). Checks if the specified path exists.
   *
   * @param fs must not be null
   * @param path must not be null
   * @return the value returned by fs.exists()
   * @throws IOException from underlying FileSystem
   */
  public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
    return fs.exists(path);
  }

  /**
   * Log the current state of the filesystem from a certain root directory
   * @param fs filesystem to investigate
   * @param root root file/directory to start logging from
   * @param log log to output information
   * @throws IOException if an unexpected exception occurs
   */
  public static void logFileSystemState(final FileSystem fs, final Path root, Logger log)
      throws IOException {
    log.debug("File system contents for path {}", root);
    logFSTree(log, fs, root, "|-");
  }

  /**
   * Recursive helper to log the state of the FS
   *
   * @see #logFileSystemState(FileSystem, Path, Logger)
   */
  private static void logFSTree(Logger log, final FileSystem fs, final Path root, String prefix)
      throws IOException {
    FileStatus[] files = listStatus(fs, root, null);
    if (files == null) {
      return;
    }

    for (FileStatus file : files) {
      if (file.isDirectory()) {
        log.debug(prefix + file.getPath().getName() + "/");
        logFSTree(log, fs, file.getPath(), prefix + "---");
      } else {
        log.debug(prefix + file.getPath().getName());
      }
    }
  }

  public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src,
      final Path dest) throws IOException {
    // set the modify time for TimeToLive Cleaner
    fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
    return fs.rename(src, dest);
  }

  /**
   * Check if the short circuit read buffer size is set and if not, set it to the hbase value.
   * @param conf must not be null
   */
  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in Hadoop 2
    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
    int size = conf.getInt(dfsKey, notSet);
    // If a size is set, return -- we will use it.
    if (size != notSet) {
      return;
    }
    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }
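  /*
   * A sketch of the resulting precedence in checkShortCircuitReadBufferSize, assuming
   * HConstants.DEFAULT_BLOCKSIZE is the usual 64 KB:
   *
   *   Configuration conf = new Configuration();
   *   CommonFSUtils.checkShortCircuitReadBufferSize(conf);
   *   // With nothing set, "dfs.client.read.shortcircuit.buffer.size" becomes
   *   // 2 * 65536 = 131072. An existing dfs value wins outright, and an
   *   // "hbase.dfs.client.read.shortcircuit.buffer.size" override beats the default.
   */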
  private static final class DfsBuilderUtility {
    private static final Class<?> BUILDER;
    private static final Method REPLICATE;

    static {
      String builderName =
        "org.apache.hadoop.hdfs.DistributedFileSystem$HdfsDataOutputStreamBuilder";
      Class<?> builderClass = null;
      try {
        builderClass = Class.forName(builderName);
      } catch (ClassNotFoundException e) {
        LOG.debug("{} not available, will not set replicate when creating output stream",
          builderName);
      }
      Method replicateMethod = null;
      if (builderClass != null) {
        try {
          replicateMethod = builderClass.getMethod("replicate");
          LOG.debug("Using builder API via reflection for DFS file creation.");
        } catch (NoSuchMethodException e) {
          LOG.debug("Could not find replicate method on builder; will not set replicate when" +
            " creating output stream", e);
        }
      }
      BUILDER = builderClass;
      REPLICATE = replicateMethod;
    }

    /**
     * Attempt to use builder API via reflection to call the replicate method on the given builder.
     */
    static void replicate(FSDataOutputStreamBuilder<?, ?> builder) {
      if (BUILDER != null && REPLICATE != null && BUILDER.isAssignableFrom(builder.getClass())) {
        try {
          REPLICATE.invoke(builder);
        } catch (IllegalAccessException | InvocationTargetException e) {
          // Should have caught this failure during initialization, so log full trace here
          LOG.warn("Couldn't use reflection with builder API", e);
        }
      }
    }
  }

  /**
   * Attempt to use builder API via reflection to create a file with the given parameters and
   * replication enabled.
   * <p/>
   * Will not attempt to enable replication when passed an HFileSystem.
   */
  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite)
      throws IOException {
    FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite);
    DfsBuilderUtility.replicate(builder);
    return builder.build();
  }

  /**
   * Attempt to use builder API via reflection to create a file with the given parameters and
   * replication enabled.
   * <p/>
   * Will not attempt to enable replication when passed an HFileSystem.
   */
  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite,
      int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
    FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite)
      .bufferSize(bufferSize).replication(replication).blockSize(blockSize);
    if (isRecursive) {
      builder.recursive();
    }
    DfsBuilderUtility.replicate(builder);
    return builder.build();
  }

  /**
   * Helper exception for those cases where the place where we need to check a stream capability
   * is not where we have the needed context to explain the impact and mitigation for a lack
   * thereof.
   */
  public static class StreamLacksCapabilityException extends Exception {
    public StreamLacksCapabilityException(String message, Throwable cause) {
      super(message, cause);
    }

    public StreamLacksCapabilityException(String message) {
      super(message);
    }
  }
}