/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Utility methods for interacting with the underlying file system.
 * <p/>
 * Note that {@link #setStoragePolicy(FileSystem, Path, String)} is tested in TestFSUtils and
 * pre-commit will run the hbase-server tests if there's code change in this class. See
 * <a href="https://issues.apache.org/jira/browse/HBASE-20838">HBASE-20838</a> for more details.
 */
@InterfaceAudience.Private
public final class CommonFSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(CommonFSUtils.class);

  /** Parameter name for HBase WAL directory */
  public static final String HBASE_WAL_DIR = "hbase.wal.dir";

  /** Parameter to disable stream capability enforcement checks */
  public static final String UNSAFE_STREAM_CAPABILITY_ENFORCE =
    "hbase.unsafe.stream.capability.enforce";

  /** Full access permissions (starting point for a umask) */
  public static final String FULL_RWX_PERMISSIONS = "777";

  private CommonFSUtils() {
  }
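
  // Illustrative sketch (not part of the original class): how client or test code might set the
  // configuration keys declared above. The filesystem URI and values shown here are hypothetical.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.set(CommonFSUtils.HBASE_WAL_DIR, "hdfs://namenode:8020/hbasewal");
  //   // Only disable capability enforcement on filesystems known to lack hflush/hsync.
  //   conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);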

  /**
   * Compares path components. Does not consider the URI scheme; i.e. if the schemes differ but
   * <code>path</code> starts with <code>rootPath</code>, this returns true.
   * @param rootPath value to check for
   * @param path subject to check
   * @return True if <code>path</code> starts with <code>rootPath</code>
   */
  public static boolean isStartingWithPath(final Path rootPath, final String path) {
    String uriRootPath = rootPath.toUri().getPath();
    String tailUriPath = (new Path(path)).toUri().getPath();
    return tailUriPath.startsWith(uriRootPath);
  }

  /**
   * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
   * '/a/b/c' part. Does not consider the URI scheme; i.e. if the schemes differ but the path or a
   * subpath matches, the two will equate.
   * @param pathToSearch Path we will be trying to match against.
   * @param pathTail what to match
   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
    return isMatchingTail(pathToSearch, new Path(pathTail));
  }

  /**
   * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
   * '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
   * consider the URI scheme; i.e. if the schemes differ but the path or a subpath matches, the
   * two will equate.
   * @param pathToSearch Path we will be trying to match against
   * @param pathTail what to match
   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
    if (pathToSearch.depth() != pathTail.depth()) {
      return false;
    }
    Path tailPath = pathTail;
    String tailName;
    Path toSearch = pathToSearch;
    String toSearchName;
    boolean result = false;
    do {
      tailName = tailPath.getName();
      if (tailName == null || tailName.length() <= 0) {
        result = true;
        break;
      }
      toSearchName = toSearch.getName();
      if (toSearchName == null || toSearchName.length() <= 0) {
        break;
      }
      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
      tailPath = tailPath.getParent();
      toSearch = toSearch.getParent();
    } while (tailName.equals(toSearchName));
    return result;
  }

  /**
   * Delete the directory if it exists.
   * @param fs filesystem object
   * @param dir directory to delete
   * @return True if deleted <code>dir</code>
   * @throws IOException e
   */
  public static boolean deleteDirectory(final FileSystem fs, final Path dir) throws IOException {
    return fs.exists(dir) && fs.delete(dir, true);
  }

  /**
   * Return the number of bytes that large input files should optimally be split into to minimize
   * I/O time.
   *
   * @param fs filesystem object
   * @param path path of file
   * @return the default block size for the path's filesystem
   */
  public static long getDefaultBlockSize(final FileSystem fs, final Path path) {
    return fs.getDefaultBlockSize(path);
  }

  /**
   * Get the default replication.
   *
   * @param fs filesystem object
   * @param path path of file
   * @return default replication for the path's filesystem
   */
  public static short getDefaultReplication(final FileSystem fs, final Path path) {
    return fs.getDefaultReplication(path);
  }
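
  // Illustrative sketch (not part of the original class): expected behaviour of the path
  // comparison helpers above; the concrete paths are hypothetical.
  //
  //   CommonFSUtils.isStartingWithPath(new Path("hdfs://nn:8020/hbase"), "/hbase/data");  // true
  //   CommonFSUtils.isMatchingTail(new Path("hdfs://nn:8020/hbase/data"), "/hbase/data"); // true
  //   CommonFSUtils.isMatchingTail(new Path("hdfs://nn:8020/hbase/data"), "/data");       // false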

  /**
   * Returns the default buffer size to use during writes.
   *
   * The size of the buffer should probably be a multiple of hardware page size (4096 on Intel
   * x86), and it determines how much data is buffered during read and write operations.
   *
   * @param fs filesystem object
   * @return default buffer size to use during writes
   */
  public static int getDefaultBufferSize(final FileSystem fs) {
    return fs.getConf().getInt("io.file.buffer.size", 4096);
  }

  /**
   * Create the specified file on the filesystem. By default, this will:
   * <ol>
   * <li>apply the umask in the configuration (if it is enabled)</li>
   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the default replication</li>
   * <li>use the default block size</li>
   * <li>not track progress</li>
   * </ol>
   *
   * @param fs {@link FileSystem} on which to write the file
   * @param path {@link Path} to the file to write
   * @param perm initial permissions
   * @param overwrite Whether or not the created file should be overwritten.
   * @return output stream to the created file
   * @throws IOException if the file cannot be created
   */
  public static FSDataOutputStream create(FileSystem fs, Path path,
      FsPermission perm, boolean overwrite) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Creating file={} with permission={}, overwrite={}", path, perm, overwrite);
    }
    return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
      getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
  }

  /**
   * Get the file permissions specified in the configuration, if they are enabled.
   *
   * @param fs filesystem that the file will be created on.
   * @param conf configuration to read for determining if permissions are enabled and which to use
   * @param permssionConfKey property key in the configuration to use when finding the permission
   * @return the permission to use when creating a new file on the fs. If special permissions are
   *         not specified in the configuration, then the default permissions on the fs will be
   *         returned.
   */
  public static FsPermission getFilePermissions(final FileSystem fs,
      final Configuration conf, final String permssionConfKey) {
    boolean enablePermissions = conf.getBoolean(
      HConstants.ENABLE_DATA_FILE_UMASK, false);

    if (enablePermissions) {
      try {
        FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
        // make sure that we have a mask, if not, go default.
        String mask = conf.get(permssionConfKey);
        if (mask == null) {
          return FsPermission.getFileDefault();
        }
        // apply the umask
        FsPermission umask = new FsPermission(mask);
        return perm.applyUMask(umask);
      } catch (IllegalArgumentException e) {
        LOG.warn(
          "Incorrect umask attempted to be created: "
            + conf.get(permssionConfKey)
            + ", using default file permissions.", e);
        return FsPermission.getFileDefault();
      }
    }
    return FsPermission.getFileDefault();
  }
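
  // Illustrative sketch (not part of the original class): how the umask handling above behaves.
  // The "hbase.data.umask" key and its value are hypothetical examples; any key can be passed as
  // permssionConfKey.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
  //   conf.set("hbase.data.umask", "007");
  //   // 777 with a 007 umask yields 770 for newly created files.
  //   FsPermission perm = CommonFSUtils.getFilePermissions(fs, conf, "hbase.data.umask");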

  /**
   * Verifies that the root directory path is a valid URI with a scheme.
   *
   * @param root root directory path
   * @return Passed <code>root</code> argument.
   * @throws IOException if not a valid URI with a scheme
   */
  public static Path validateRootPath(Path root) throws IOException {
    try {
      URI rootURI = new URI(root.toString());
      String scheme = rootURI.getScheme();
      if (scheme == null) {
        throw new IOException("Root directory does not have a scheme");
      }
      return root;
    } catch (URISyntaxException e) {
      throw new IOException("Root directory path is not a valid " +
        "URI -- check your " + HConstants.HBASE_DIR + " configuration", e);
    }
  }

  /**
   * Checks for the presence of the WAL root path (using the provided conf object) in the given
   * path. If it exists, this method removes it and returns the String representation of the
   * remaining relative path.
   * @param path must not be null
   * @param conf must not be null
   * @return String representation of the remaining relative path
   * @throws IOException from underlying filesystem
   */
  public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
    Path root = getWALRootDir(conf);
    String pathStr = path.toString();
    // check that the path is absolute... i.e. it has the root path in it.
    if (!pathStr.startsWith(root.toString())) {
      // if not, return it as it is.
      return pathStr;
    }
    // otherwise strip the root path, removing the separator "/" after it too.
    return pathStr.substring(root.toString().length() + 1);
  }

  /**
   * Return the 'path' component of a Path. In Hadoop, Path is a URI. This method returns the
   * 'path' component of a Path's URI: e.g. If a Path is
   * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>, this method returns
   * <code>/hbase_trunk/TestTable/compaction.dir</code>. This method is useful if you want to print
   * out a Path without qualifying the Filesystem instance.
   * @param p Filesystem Path whose 'path' component we are to return.
   * @return Path portion of the Filesystem
   */
  public static String getPath(Path p) {
    return p.toUri().getPath();
  }

  /**
   * @param c configuration
   * @return {@link Path} to hbase root directory from configuration as a qualified Path.
   * @throws IOException e
   */
  public static Path getRootDir(final Configuration c) throws IOException {
    Path p = new Path(c.get(HConstants.HBASE_DIR));
    FileSystem fs = p.getFileSystem(c);
    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  public static void setRootDir(final Configuration c, final Path root) {
    c.set(HConstants.HBASE_DIR, root.toString());
  }

  public static void setFsDefault(final Configuration c, final Path root) {
    c.set("fs.defaultFS", root.toString()); // for hadoop 0.21+
  }

  public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
    Path p = getRootDir(c);
    return p.getFileSystem(c);
  }
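
  // Illustrative sketch (not part of the original class): removeWALRootPath strips the configured
  // WAL root (which defaults to hbase.rootdir) and leaves a relative path. The paths and the
  // hbase.wal.dir value shown are hypothetical.
  //
  //   // with hbase.wal.dir = hdfs://nn:8020/hbasewal
  //   CommonFSUtils.removeWALRootPath(
  //       new Path("hdfs://nn:8020/hbasewal/WALs/server1"), conf);  // "WALs/server1"
  //   CommonFSUtils.removeWALRootPath(
  //       new Path("hdfs://other:8020/elsewhere/file"), conf);      // returned unchanged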

  /**
   * @param c configuration
   * @return {@link Path} to hbase log root directory: e.g. {@value HBASE_WAL_DIR} from
   *         configuration as a qualified Path. Defaults to HBase root dir.
   * @throws IOException e
   */
  public static Path getWALRootDir(final Configuration c) throws IOException {
    Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR)));
    if (!isValidWALRootDir(p, c)) {
      return getRootDir(c);
    }
    FileSystem fs = p.getFileSystem(c);
    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  @VisibleForTesting
  public static void setWALRootDir(final Configuration c, final Path root) {
    c.set(HBASE_WAL_DIR, root.toString());
  }

  public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
    Path p = getWALRootDir(c);
    FileSystem fs = p.getFileSystem(c);
    // hadoop-core does fs caching, so need to propagate this if set
    String enforceStreamCapability = c.get(UNSAFE_STREAM_CAPABILITY_ENFORCE);
    if (enforceStreamCapability != null) {
      fs.getConf().set(UNSAFE_STREAM_CAPABILITY_ENFORCE, enforceStreamCapability);
    }
    return fs;
  }

  private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
    Path rootDir = getRootDir(c);
    FileSystem fs = walDir.getFileSystem(c);
    Path qualifiedWalDir = walDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    if (!qualifiedWalDir.equals(rootDir)) {
      if (qualifiedWalDir.toString().startsWith(rootDir.toString() + "/")) {
        throw new IllegalStateException("Illegal WAL directory specified. " +
          "WAL directories are not permitted to be under the root directory if set.");
      }
    }
    return true;
  }

  /**
   * Returns the WAL region directory based on the given table name and region name
   * @param conf configuration to determine WALRootDir
   * @param tableName Table that the region is under
   * @param encodedRegionName Region name used for creating the final region directory
   * @return the region directory used to store WALs under the WALRootDir
   * @throws IOException if there is an exception determining the WALRootDir
   */
  public static Path getWALRegionDir(final Configuration conf, final TableName tableName,
      final String encodedRegionName) throws IOException {
    return new Path(getWALTableDir(conf, tableName), encodedRegionName);
  }

  /**
   * Returns the Table directory under the WALRootDir for the specified table name
   * @param conf configuration used to get the WALRootDir
   * @param tableName Table to get the directory for
   * @return a path to the WAL table directory for the specified table
   * @throws IOException if there is an exception determining the WALRootDir
   */
  public static Path getWALTableDir(final Configuration conf, final TableName tableName)
      throws IOException {
    Path baseDir = new Path(getWALRootDir(conf), HConstants.BASE_NAMESPACE_DIR);
    return new Path(new Path(baseDir, tableName.getNamespaceAsString()),
      tableName.getQualifierAsString());
  }
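
  // Illustrative sketch (not part of the original class): the WAL layout produced by the helpers
  // above, assuming a hypothetical hbase.wal.dir of hdfs://nn:8020/hbasewal.
  //
  //   Path regionWalDir = CommonFSUtils.getWALRegionDir(conf,
  //       TableName.valueOf("ns", "tbl"), "0123456789abcdef0123456789abcdef");
  //   // -> hdfs://nn:8020/hbasewal/data/ns/tbl/0123456789abcdef0123456789abcdef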

  /**
   * For backward compatibility with HBASE-20734, where we store recovered edits in a wrong
   * directory without BASE_NAMESPACE_DIR. See HBASE-22617 for more details.
   * @deprecated For compatibility, will be removed in 4.0.0.
   */
  @Deprecated
  public static Path getWrongWALRegionDir(final Configuration conf, final TableName tableName,
      final String encodedRegionName) throws IOException {
    Path wrongTableDir = new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()),
      tableName.getQualifierAsString());
    return new Path(wrongTableDir, encodedRegionName);
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
   * path rootdir
   *
   * @param rootdir qualified path of HBase root directory
   * @param tableName name of table
   * @return {@link org.apache.hadoop.fs.Path} for table
   */
  public static Path getTableDir(Path rootdir, final TableName tableName) {
    return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
      tableName.getQualifierAsString());
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing the region directory under
   * path rootdir
   *
   * @param rootdir qualified path of HBase root directory
   * @param tableName name of table
   * @param regionName The encoded region name
   * @return {@link org.apache.hadoop.fs.Path} for region
   */
  public static Path getRegionDir(Path rootdir, TableName tableName, String regionName) {
    return new Path(getTableDir(rootdir, tableName), regionName);
  }

  /**
   * Returns the {@link org.apache.hadoop.hbase.TableName} object for the table that the given
   * table directory represents.
   *
   * @param tablePath path of the table directory
   * @return {@link org.apache.hadoop.hbase.TableName} for the table
   */
  public static TableName getTableName(Path tablePath) {
    return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing the namespace directory
   * under path rootdir
   *
   * @param rootdir qualified path of HBase root directory
   * @param namespace namespace name
   * @return {@link org.apache.hadoop.fs.Path} for the namespace
   */
  public static Path getNamespaceDir(Path rootdir, final String namespace) {
    return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
      new Path(namespace)));
  }

  // this mapping means that under a federated FileSystem implementation, we'll
  // only log the first failure from any of the underlying FileSystems at WARN and all others
  // will be at DEBUG.
  private static final Map<FileSystem, Boolean> warningMap = new ConcurrentHashMap<>();
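
  // Illustrative sketch (not part of the original class): the data layout produced by the helpers
  // above, assuming a hypothetical hbase.rootdir of hdfs://nn:8020/hbase.
  //
  //   Path rootDir = CommonFSUtils.getRootDir(conf);
  //   Path tableDir = CommonFSUtils.getTableDir(rootDir, TableName.valueOf("ns", "tbl"));
  //   // -> hdfs://nn:8020/hbase/data/ns/tbl
  //   TableName tn = CommonFSUtils.getTableName(tableDir);  // back to ns:tbl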

  /**
   * Sets storage policy for given path.
   * If the passed path is a directory, we'll set the storage policy for all files
   * created in the future in said directory. Note that this change in storage
   * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle.
   * If we're running on a version of FileSystem that doesn't support the given storage policy
   * (or storage policies at all), then we'll issue a log message and continue.
   *
   * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
   *
   * @param fs We only do anything if it implements a setStoragePolicy method
   * @param path the Path whose storage policy is to be set
   * @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+
   *          org.apache.hadoop.hdfs.protocol.HdfsConstants for the possible list, e.g.
   *          'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
   */
  public static void setStoragePolicy(final FileSystem fs, final Path path,
      final String storagePolicy) {
    try {
      setStoragePolicy(fs, path, storagePolicy, false);
    } catch (IOException e) {
      // should never arrive here since we asked not to throw
      LOG.warn("Chose not to throw an exception, but one was unexpectedly thrown", e);
    }
  }

  static void setStoragePolicy(final FileSystem fs, final Path path, final String storagePolicy,
      boolean throwException) throws IOException {
    if (storagePolicy == null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("We were passed a null storagePolicy, exiting early.");
      }
      return;
    }
    String trimmedStoragePolicy = storagePolicy.trim();
    if (trimmedStoragePolicy.isEmpty()) {
      LOG.trace("We were passed an empty storagePolicy, exiting early.");
      return;
    } else {
      trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT);
    }
    if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) {
      LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.",
        trimmedStoragePolicy);
      return;
    }
    try {
      invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
    } catch (IOException e) {
      LOG.trace("Failed to invoke set storage policy API on FS", e);
      if (throwException) {
        throw e;
      }
    }
  }

  /*
   * All args have been checked and are good. Run the setStoragePolicy invocation.
   */
  private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
      final String storagePolicy) throws IOException {
    Exception toThrow = null;

    try {
      fs.setStoragePolicy(path, storagePolicy);
      LOG.debug("Set storagePolicy={} for path={}", storagePolicy, path);
    } catch (Exception e) {
      toThrow = e;
      // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
      // misuse than a runtime problem with HDFS.
      if (!warningMap.containsKey(fs)) {
        warningMap.put(fs, true);
        LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path + ". " +
          "DEBUG log level might have more details.", e);
      } else if (LOG.isDebugEnabled()) {
        LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
      }

      // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation
      // that throws UnsupportedOperationException
      if (e instanceof UnsupportedOperationException) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("The underlying FileSystem implementation doesn't support " +
            "setStoragePolicy. This is probably intentional on their part, since HDFS-9345 " +
            "appears to be present in your version of Hadoop. For more information check " +
            "the Hadoop documentation on 'ArchivalStorage', the Hadoop FileSystem " +
            "specification docs from HADOOP-11981, and/or related documentation from the " +
            "provider of the underlying FileSystem (its name should appear in the " +
            "stacktrace that accompanies this message). Note in particular that Hadoop's " +
            "local filesystem implementation doesn't support storage policies.", e);
        }
      }
    }

    if (toThrow != null) {
      throw new IOException(toThrow);
    }
  }
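
  // Illustrative sketch (not part of the original class): a typical call site for the policy
  // helpers above; the WAL directory and the ONE_SSD policy are hypothetical choices.
  //
  //   FileSystem walFs = CommonFSUtils.getWALFileSystem(conf);
  //   Path walDir = CommonFSUtils.getWALRootDir(conf);
  //   // Logs and continues if the filesystem does not support storage policies.
  //   CommonFSUtils.setStoragePolicy(walFs, walDir, "ONE_SSD");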

  /**
   * @param conf must not be null
   * @return True if the filesystem for the configuration has a scheme of 'hdfs'.
   * @throws IOException from underlying FileSystem
   */
  public static boolean isHDFS(final Configuration conf) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    String scheme = fs.getUri().getScheme();
    return scheme.equalsIgnoreCase("hdfs");
  }

  /**
   * Checks if the given path is the one with the 'recovered.edits' dir.
   * @param path must not be null
   * @return True if the path contains the 'recovered.edits' dir
   */
  public static boolean isRecoveredEdits(Path path) {
    return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
  }

  /**
   * @param conf must not be null
   * @return Returns the filesystem of the hbase rootdir.
   * @throws IOException from underlying FileSystem
   */
  public static FileSystem getCurrentFileSystem(Configuration conf) throws IOException {
    return getRootDir(conf).getFileSystem(conf);
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
   * differences between Hadoop versions, where Hadoop 1 does not throw a FileNotFoundException
   * and instead returns an empty FileStatus[], while Hadoop 2 will throw FileNotFoundException.
   *
   * Where possible, prefer FSUtils#listStatusWithStatusFilter(FileSystem, Path, FileStatusFilter)
   * instead.
   *
   * @param fs file system
   * @param dir directory
   * @param filter path filter
   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
   */
  public static FileStatus[] listStatus(final FileSystem fs,
      final Path dir, final PathFilter filter) throws IOException {
    FileStatus[] status = null;
    try {
      status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
    } catch (FileNotFoundException fnfe) {
      // if directory doesn't exist, return null
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} doesn't exist", dir);
      }
    }
    if (status == null || status.length < 1) {
      return null;
    }
    return status;
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
   * differences between Hadoop versions.
   *
   * @param fs file system
   * @param dir directory
   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
   */
  public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
    return listStatus(fs, dir, null);
  }

  /**
   * Calls fs.listFiles() to get FileStatus and BlockLocations together, reducing the number of
   * RPC calls.
   *
   * @param fs file system
   * @param dir directory
   * @return LocatedFileStatus list
   */
  public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs,
      final Path dir) throws IOException {
    List<LocatedFileStatus> status = null;
    try {
      RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs
          .listFiles(dir, false);
      while (locatedFileStatusRemoteIterator.hasNext()) {
        if (status == null) {
          status = Lists.newArrayList();
        }
        status.add(locatedFileStatusRemoteIterator.next());
      }
    } catch (FileNotFoundException fnfe) {
      // if directory doesn't exist, return null
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} doesn't exist", dir);
      }
    }
    return status;
  }
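
  // Illustrative sketch (not part of the original class): callers must handle the null that the
  // listing helpers above return for missing or empty directories. The directory is hypothetical.
  //
  //   FileStatus[] children = CommonFSUtils.listStatus(fs, new Path("/hbase/archive"));
  //   if (children == null) {
  //     // directory was empty or absent; nothing to do
  //   }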

  /**
   * Calls fs.delete() and returns the value returned by the fs.delete()
   *
   * @param fs must not be null
   * @param path must not be null
   * @param recursive delete tree rooted at path
   * @return the value returned by the fs.delete()
   * @throws IOException from underlying FileSystem
   */
  public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
      throws IOException {
    return fs.delete(path, recursive);
  }

  /**
   * Calls fs.exists(). Checks if the specified path exists
   *
   * @param fs must not be null
   * @param path must not be null
   * @return the value returned by fs.exists()
   * @throws IOException from underlying FileSystem
   */
  public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
    return fs.exists(path);
  }

  /**
   * Log the current state of the filesystem from a certain root directory
   * @param fs filesystem to investigate
   * @param root root file/directory to start logging from
   * @param log log to output information
   * @throws IOException if an unexpected exception occurs
   */
  public static void logFileSystemState(final FileSystem fs, final Path root, Logger log)
      throws IOException {
    log.debug("File system contents for path {}", root);
    logFSTree(log, fs, root, "|-");
  }

  /**
   * Recursive helper to log the state of the FS
   *
   * @see #logFileSystemState(FileSystem, Path, Logger)
   */
  private static void logFSTree(Logger log, final FileSystem fs, final Path root, String prefix)
      throws IOException {
    FileStatus[] files = listStatus(fs, root, null);
    if (files == null) {
      return;
    }

    for (FileStatus file : files) {
      if (file.isDirectory()) {
        log.debug(prefix + file.getPath().getName() + "/");
        logFSTree(log, fs, file.getPath(), prefix + "---");
      } else {
        log.debug(prefix + file.getPath().getName());
      }
    }
  }

  public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src,
      final Path dest) throws IOException {
    // set the modify time for TimeToLive Cleaner
    fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
    return fs.rename(src, dest);
  }

  /**
   * Check if short circuit read buffer size is set and if not, set it to hbase value.
   * @param conf must not be null
   */
  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
    int size = conf.getInt(dfsKey, notSet);
    // If a size is set, return -- we will use it.
    if (size != notSet) {
      return;
    }
    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }
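
  // Illustrative sketch (not part of the original class): effect of the short-circuit buffer
  // helper above. The resulting value of 131072 assumes HConstants.DEFAULT_BLOCKSIZE is 64KB.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   CommonFSUtils.checkShortCircuitReadBufferSize(conf);
  //   conf.getInt("dfs.client.read.shortcircuit.buffer.size", -1);  // 131072 unless already set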

  private static final class DfsBuilderUtility {
    private static final Class<?> BUILDER;
    private static final Method REPLICATE;

    static {
      String builderName =
        "org.apache.hadoop.hdfs.DistributedFileSystem$HdfsDataOutputStreamBuilder";
      Class<?> builderClass = null;
      try {
        builderClass = Class.forName(builderName);
      } catch (ClassNotFoundException e) {
        LOG.debug("{} not available, will not set replicate when creating output stream",
          builderName);
      }
      Method replicateMethod = null;
      if (builderClass != null) {
        try {
          replicateMethod = builderClass.getMethod("replicate");
          LOG.debug("Using builder API via reflection for DFS file creation.");
        } catch (NoSuchMethodException e) {
          LOG.debug("Could not find replicate method on builder; will not set replicate when" +
            " creating output stream", e);
        }
      }
      BUILDER = builderClass;
      REPLICATE = replicateMethod;
    }

    /**
     * Attempt to use builder API via reflection to call the replicate method on the given builder.
     */
    static void replicate(FSDataOutputStreamBuilder<?, ?> builder) {
      if (BUILDER != null && REPLICATE != null && BUILDER.isAssignableFrom(builder.getClass())) {
        try {
          REPLICATE.invoke(builder);
        } catch (IllegalAccessException | InvocationTargetException e) {
          // Should have caught this failure during initialization, so log full trace here
          LOG.warn("Couldn't use reflection with builder API", e);
        }
      }
    }
  }

  /**
   * Attempt to use builder API via reflection to create a file with the given parameters and
   * replication enabled.
   * <p/>
   * Will not attempt to enable replication when passed an HFileSystem.
   */
  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite)
      throws IOException {
    FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite);
    DfsBuilderUtility.replicate(builder);
    return builder.build();
  }

  /**
   * Attempt to use builder API via reflection to create a file with the given parameters and
   * replication enabled.
   * <p/>
   * Will not attempt to enable replication when passed an HFileSystem.
   */
  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite,
      int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
    FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite)
        .bufferSize(bufferSize).replication(replication).blockSize(blockSize);
    if (isRecursive) {
      builder.recursive();
    }
    DfsBuilderUtility.replicate(builder);
    return builder.build();
  }

  /**
   * Helper exception for those cases where the place where we need to check a stream capability
   * is not where we have the needed context to explain the impact and mitigation for a lack of
   * that capability.
   */
  public static class StreamLacksCapabilityException extends Exception {
    public StreamLacksCapabilityException(String message, Throwable cause) {
      super(message, cause);
    }

    public StreamLacksCapabilityException(String message) {
      super(message);
    }
  }
}