/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Utility methods for interacting with the underlying file system.
 * <p/>
 * Note that {@link #setStoragePolicy(FileSystem, Path, String)} is tested in TestFSUtils and
 * pre-commit will run the hbase-server tests if there's a code change in this class. See
 * <a href="https://issues.apache.org/jira/browse/HBASE-20838">HBASE-20838</a> for more details.
 */
@InterfaceAudience.Private
public abstract class CommonFSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(CommonFSUtils.class);

  /** Parameter name for HBase WAL directory */
  public static final String HBASE_WAL_DIR = "hbase.wal.dir";

  /** Parameter to disable stream capability enforcement checks */
  public static final String UNSAFE_STREAM_CAPABILITY_ENFORCE =
      "hbase.unsafe.stream.capability.enforce";

  /** Full access permissions (starting point for a umask) */
  public static final String FULL_RWX_PERMISSIONS = "777";

  protected CommonFSUtils() {
    super();
  }

  /**
   * Compares path components. Does not consider the scheme; i.e. if the schemes differ but
   * <code>path</code> starts with <code>rootPath</code>, then the function returns true.
   * @param rootPath value to check for
   * @param path subject to check
   * @return True if <code>path</code> starts with <code>rootPath</code>
   */
  public static boolean isStartingWithPath(final Path rootPath, final String path) {
    String uriRootPath = rootPath.toUri().getPath();
    String tailUriPath = (new Path(path)).toUri().getPath();
    return tailUriPath.startsWith(uriRootPath);
  }

  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. Does not consider the scheme; i.e. if the schemes differ but the path or
   * subpath matches, the two will equate.
   * @param pathToSearch Path we will be trying to match against.
   * @param pathTail what to match
   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
    return isMatchingTail(pathToSearch, new Path(pathTail));
  }

  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
   * consider the scheme; i.e. if the schemes differ but the path or subpath matches, the two will
   * equate.
   * @param pathToSearch Path we will be trying to match against
   * @param pathTail what to match
   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
    if (pathToSearch.depth() != pathTail.depth()) {
      return false;
    }
    Path tailPath = pathTail;
    String tailName;
    Path toSearch = pathToSearch;
    String toSearchName;
    boolean result = false;
    do {
      tailName = tailPath.getName();
      if (tailName == null || tailName.length() <= 0) {
        result = true;
        break;
      }
      toSearchName = toSearch.getName();
      if (toSearchName == null || toSearchName.length() <= 0) {
        break;
      }
      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
      tailPath = tailPath.getParent();
      toSearch = toSearch.getParent();
    } while (tailName.equals(toSearchName));
    return result;
  }
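
  // A minimal usage sketch for the path-comparison helpers above (the paths are hypothetical
  // examples). Both helpers ignore the URI scheme and authority and compare only the path
  // component; isMatchingTail() additionally requires the two paths to have the same depth:
  //
  //   Path root = new Path("hdfs://namenode:8020/hbase");
  //   CommonFSUtils.isStartingWithPath(root, "/hbase/data/default/t1");               // true
  //   CommonFSUtils.isStartingWithPath(root, "file:/hbase/data/default/t1");          // true
  //   CommonFSUtils.isMatchingTail(new Path("hdfs://namenode:8020/a/b/c"), "/a/b/c"); // true
  //   CommonFSUtils.isMatchingTail(new Path("hdfs://namenode:8020/a/b/c"), "b/c");    // false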

  /**
   * Delete the directory if it exists.
   * @param fs filesystem object
   * @param dir directory to delete
   * @return True if deleted <code>dir</code>
   * @throws IOException e
   */
  public static boolean deleteDirectory(final FileSystem fs, final Path dir) throws IOException {
    return fs.exists(dir) && fs.delete(dir, true);
  }

  /**
   * Return the number of bytes that large input files should optimally be split into to minimize
   * i/o time.
   * @param fs filesystem object
   * @param path path of file
   * @return the default block size for the path's filesystem
   */
  public static long getDefaultBlockSize(final FileSystem fs, final Path path) {
    return fs.getDefaultBlockSize(path);
  }

  /**
   * Get the default replication.
   * @param fs filesystem object
   * @param path path of file
   * @return default replication for the path's filesystem
   */
  public static short getDefaultReplication(final FileSystem fs, final Path path) {
    return fs.getDefaultReplication(path);
  }

  /**
   * Returns the default buffer size to use during writes.
   *
   * The size of the buffer should probably be a multiple of hardware page size (4096 on Intel
   * x86), and it determines how much data is buffered during read and write operations.
   *
   * @param fs filesystem object
   * @return default buffer size to use during writes
   */
  public static int getDefaultBufferSize(final FileSystem fs) {
    return fs.getConf().getInt("io.file.buffer.size", 4096);
  }

  /**
   * Create the specified file on the filesystem. By default, this will:
   * <ol>
   * <li>apply the umask in the configuration (if it is enabled)</li>
   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the default replication</li>
   * <li>use the default block size</li>
   * <li>not track progress</li>
   * </ol>
   *
   * @param fs {@link FileSystem} on which to write the file
   * @param path {@link Path} to the file to write
   * @param perm initial permissions
   * @param overwrite Whether or not the created file should be overwritten.
   * @return output stream to the created file
   * @throws IOException if the file cannot be created
   */
  public static FSDataOutputStream create(FileSystem fs, Path path,
      FsPermission perm, boolean overwrite) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Creating file={} with permission={}, overwrite={}", path, perm, overwrite);
    }
    return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
        getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
  }

  /**
   * Get the file permissions specified in the configuration, if they are enabled.
   *
   * @param fs filesystem that the file will be created on.
   * @param conf configuration to read for determining if permissions are enabled and which to use
   * @param permssionConfKey property key in the configuration to use when finding the permission
   * @return the permission to use when creating a new file on the fs. If special permissions are
   *         not specified in the configuration, then the default permissions on the fs will be
   *         returned.
   */
  public static FsPermission getFilePermissions(final FileSystem fs,
      final Configuration conf, final String permssionConfKey) {
    boolean enablePermissions = conf.getBoolean(
        HConstants.ENABLE_DATA_FILE_UMASK, false);

    if (enablePermissions) {
      try {
        FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
        // make sure that we have a mask, if not, go default.
        String mask = conf.get(permssionConfKey);
        if (mask == null) {
          return FsPermission.getFileDefault();
        }
        // apply the umask
        FsPermission umask = new FsPermission(mask);
        return perm.applyUMask(umask);
      } catch (IllegalArgumentException e) {
        LOG.warn(
            "Incorrect umask attempted to be created: "
                + conf.get(permssionConfKey)
                + ", using default file permissions.", e);
        return FsPermission.getFileDefault();
      }
    }
    return FsPermission.getFileDefault();
  }
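
  // A minimal sketch of how the permission helper above is typically combined with create()
  // (the umask key name "hbase.data.umask" and the path are illustrative assumptions, not taken
  // from this class). The configured umask is applied against FULL_RWX_PERMISSIONS:
  //
  //   Configuration conf = fs.getConf();
  //   conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
  //   conf.set("hbase.data.umask", "077");
  //   FsPermission perm = CommonFSUtils.getFilePermissions(fs, conf, "hbase.data.umask");
  //   FSDataOutputStream out = CommonFSUtils.create(fs, new Path("/hbase/example"), perm, true);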

  /**
   * Verifies root directory path is a valid URI with a scheme
   *
   * @param root root directory path
   * @return Passed <code>root</code> argument.
   * @throws IOException if not a valid URI with a scheme
   */
  public static Path validateRootPath(Path root) throws IOException {
    try {
      URI rootURI = new URI(root.toString());
      String scheme = rootURI.getScheme();
      if (scheme == null) {
        throw new IOException("Root directory does not have a scheme");
      }
      return root;
    } catch (URISyntaxException e) {
      IOException io = new IOException("Root directory path is not a valid " +
          "URI -- check your " + HConstants.HBASE_DIR + " configuration");
      io.initCause(e);
      throw io;
    }
  }

  /**
   * Checks for the presence of the WAL log root path (using the provided conf object) in the given
   * path. If it exists, this method removes it and returns the String representation of the
   * remaining relative path.
   * @param path must not be null
   * @param conf must not be null
   * @return String representation of the remaining relative path
   * @throws IOException from underlying filesystem
   */
  public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
    Path root = getWALRootDir(conf);
    String pathStr = path.toString();
    // check that the path is absolute... it has the root path in it.
    if (!pathStr.startsWith(root.toString())) {
      // if not, return it as it is.
      return pathStr;
    }
    return pathStr.substring(root.toString().length() + 1); // remove the "/" too.
  }

  /**
   * Return the 'path' component of a Path. In Hadoop, Path is a URI. This method returns the
   * 'path' component of a Path's URI: e.g. If a Path is
   * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>, this method returns
   * <code>/hbase_trunk/TestTable/compaction.dir</code>. This method is useful if you want to print
   * out a Path without qualifying the Filesystem instance.
   * @param p Filesystem Path whose 'path' component we are to return.
   * @return Path portion of the Filesystem
   */
  public static String getPath(Path p) {
    return p.toUri().getPath();
  }

  /**
   * @param c configuration
   * @return {@link Path} to hbase root directory from configuration as a qualified Path.
   * @throws IOException e
   */
  public static Path getRootDir(final Configuration c) throws IOException {
    Path p = new Path(c.get(HConstants.HBASE_DIR));
    FileSystem fs = p.getFileSystem(c);
    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  public static void setRootDir(final Configuration c, final Path root) {
    c.set(HConstants.HBASE_DIR, root.toString());
  }

  public static void setFsDefault(final Configuration c, final Path root) {
    c.set("fs.defaultFS", root.toString()); // for hadoop 0.21+
  }

  public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
    Path p = getRootDir(c);
    return p.getFileSystem(c);
  }
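
  // A minimal sketch of root-directory resolution using the helpers above (the URI is a
  // hypothetical example): hbase.rootdir must carry a scheme, which validateRootPath() enforces,
  // and getRootDir() qualifies the configured value against that filesystem.
  //
  //   Configuration conf = new Configuration();
  //   conf.set(HConstants.HBASE_DIR, "hdfs://namenode:8020/hbase");
  //   CommonFSUtils.validateRootPath(new Path(conf.get(HConstants.HBASE_DIR)));
  //   Path rootDir = CommonFSUtils.getRootDir(conf);          // hdfs://namenode:8020/hbase
  //   FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf);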

  /**
   * @param c configuration
   * @return {@link Path} to hbase log root directory, i.e. the value of {@value HBASE_WAL_DIR}
   *         from the configuration, as a qualified Path. Defaults to the HBase root dir.
   * @throws IOException e
   */
  public static Path getWALRootDir(final Configuration c) throws IOException {
    Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR)));
    if (!isValidWALRootDir(p, c)) {
      return getRootDir(c);
    }
    FileSystem fs = p.getFileSystem(c);
    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  @VisibleForTesting
  public static void setWALRootDir(final Configuration c, final Path root) {
    c.set(HBASE_WAL_DIR, root.toString());
  }

  public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
    Path p = getWALRootDir(c);
    FileSystem fs = p.getFileSystem(c);
    // hadoop-core does fs caching, so need to propagate this if set
    String enforceStreamCapability = c.get(UNSAFE_STREAM_CAPABILITY_ENFORCE);
    if (enforceStreamCapability != null) {
      fs.getConf().set(UNSAFE_STREAM_CAPABILITY_ENFORCE, enforceStreamCapability);
    }
    return fs;
  }

  private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
    Path rootDir = getRootDir(c);
    FileSystem fs = walDir.getFileSystem(c);
    Path qualifiedWalDir = walDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    if (!qualifiedWalDir.equals(rootDir)) {
      if (qualifiedWalDir.toString().startsWith(rootDir.toString() + "/")) {
        throw new IllegalStateException("Illegal WAL directory specified. " +
            "WAL directories are not permitted to be under the root directory if set.");
      }
    }
    return true;
  }

  /**
   * Returns the WAL region directory based on the given table name and region name
   * @param conf configuration to determine WALRootDir
   * @param tableName Table that the region is under
   * @param encodedRegionName Region name used for creating the final region directory
   * @return the region directory used to store WALs under the WALRootDir
   * @throws IOException if there is an exception determining the WALRootDir
   */
  public static Path getWALRegionDir(final Configuration conf, final TableName tableName,
      final String encodedRegionName) throws IOException {
    return new Path(getWALTableDir(conf, tableName), encodedRegionName);
  }

  /**
   * Returns the Table directory under the WALRootDir for the specified table name
   * @param conf configuration used to get the WALRootDir
   * @param tableName Table to get the directory for
   * @return a path to the WAL table directory for the specified table
   * @throws IOException if there is an exception determining the WALRootDir
   */
  public static Path getWALTableDir(final Configuration conf, final TableName tableName)
      throws IOException {
    Path baseDir = new Path(getWALRootDir(conf), HConstants.BASE_NAMESPACE_DIR);
    return new Path(new Path(baseDir, tableName.getNamespaceAsString()),
        tableName.getQualifierAsString());
  }
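
  // A minimal sketch of the WAL directory layout produced by the helpers above (host, table and
  // region names are hypothetical). When hbase.wal.dir is unset, the WAL root falls back to
  // hbase.rootdir; either way, region WAL dirs live under <walRoot>/data/<namespace>/<table>/:
  //
  //   Configuration conf = new Configuration();
  //   conf.set(HConstants.HBASE_DIR, "hdfs://namenode:8020/hbase");
  //   conf.set(CommonFSUtils.HBASE_WAL_DIR, "hdfs://namenode:8020/hbase-wal");
  //   Path regionWalDir = CommonFSUtils.getWALRegionDir(conf,
  //       TableName.valueOf("ns1", "t1"), "0123456789abcdef0123456789abcdef");
  //   // -> hdfs://namenode:8020/hbase-wal/data/ns1/t1/0123456789abcdef0123456789abcdef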

  /**
   * For backward compatibility with HBASE-20734, where we stored recovered edits in a wrong
   * directory without BASE_NAMESPACE_DIR. See HBASE-22617 for more details.
   * @deprecated For compatibility, will be removed in 4.0.0.
   */
  @Deprecated
  public static Path getWrongWALRegionDir(final Configuration conf, final TableName tableName,
      final String encodedRegionName) throws IOException {
    Path wrongTableDir = new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()),
        tableName.getQualifierAsString());
    return new Path(wrongTableDir, encodedRegionName);
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
   * path rootdir
   *
   * @param rootdir qualified path of HBase root directory
   * @param tableName name of table
   * @return {@link org.apache.hadoop.fs.Path} for table
   */
  public static Path getTableDir(Path rootdir, final TableName tableName) {
    return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
        tableName.getQualifierAsString());
  }

  /**
   * Returns the {@link org.apache.hadoop.hbase.TableName} object representing the table that the
   * given table directory belongs to.
   *
   * @param tablePath path of table
   * @return {@link org.apache.hadoop.hbase.TableName} for the table
   */
  public static TableName getTableName(Path tablePath) {
    return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing the namespace directory
   * under path rootdir
   *
   * @param rootdir qualified path of HBase root directory
   * @param namespace namespace name
   * @return {@link org.apache.hadoop.fs.Path} for the namespace
   */
  public static Path getNamespaceDir(Path rootdir, final String namespace) {
    return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
        new Path(namespace)));
  }
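
  // A minimal sketch of the rootdir layout handled by getTableDir/getNamespaceDir/getTableName
  // (rootdir and names are hypothetical). Table data lives under <rootdir>/data/<namespace>/<table>,
  // and getTableName() simply reverses that mapping from a table path:
  //
  //   Path rootdir = new Path("hdfs://namenode:8020/hbase");
  //   Path tableDir = CommonFSUtils.getTableDir(rootdir, TableName.valueOf("ns1", "t1"));
  //   // tableDir -> hdfs://namenode:8020/hbase/data/ns1/t1
  //   TableName tn = CommonFSUtils.getTableName(tableDir);    // ns1:t1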

  // this mapping means that under a federated FileSystem implementation, we'll
  // only log the first failure from any of the underlying FileSystems at WARN and all others
  // will be at DEBUG.
  private static final Map<FileSystem, Boolean> warningMap = new ConcurrentHashMap<>();

  /**
   * Sets storage policy for given path.
   * If the passed path is a directory, we'll set the storage policy for all files
   * created in the future in said directory. Note that this change in storage
   * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle.
   * If we're running on a version of FileSystem that doesn't support the given storage policy
   * (or storage policies at all), then we'll issue a log message and continue.
   *
   * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
   *
   * @param fs We only do anything if it implements a setStoragePolicy method
   * @param path the Path whose storage policy is to be set
   * @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+
   *          org.apache.hadoop.hdfs.protocol.HdfsConstants for the possible list, e.g.
   *          'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
   */
  public static void setStoragePolicy(final FileSystem fs, final Path path,
      final String storagePolicy) {
    try {
      setStoragePolicy(fs, path, storagePolicy, false);
    } catch (IOException e) {
      // should never arrive here
      LOG.warn("We have chosen not to throw but setting the storage policy unexpectedly threw " +
          "an exception", e);
    }
  }

  static void setStoragePolicy(final FileSystem fs, final Path path, final String storagePolicy,
      boolean throwException) throws IOException {
    if (storagePolicy == null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("We were passed a null storagePolicy, exiting early.");
      }
      return;
    }
    String trimmedStoragePolicy = storagePolicy.trim();
    if (trimmedStoragePolicy.isEmpty()) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("We were passed an empty storagePolicy, exiting early.");
      }
      return;
    } else {
      trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT);
    }
    if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.",
            trimmedStoragePolicy);
      }
      return;
    }
    try {
      invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
    } catch (IOException e) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Failed to invoke set storage policy API on FS", e);
      }
      if (throwException) {
        throw e;
      }
    }
  }
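
  // A minimal sketch of how callers typically use setStoragePolicy() (the policy name and path
  // are illustrative; the available policies depend on the Hadoop release and cluster config).
  // The public overload logs and continues if the FileSystem lacks support; the package-private
  // overload can be asked to rethrow via throwException=true:
  //
  //   CommonFSUtils.setStoragePolicy(fs, new Path("/hbase/oldwals"), "ONE_SSD");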
" + 570 "DEBUG log level might have more details.", e); 571 } else if (LOG.isDebugEnabled()) { 572 LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e); 573 } 574 // check for lack of HDFS-7228 575 if (e instanceof InvocationTargetException) { 576 final Throwable exception = e.getCause(); 577 if (exception instanceof RemoteException && 578 HadoopIllegalArgumentException.class.getName().equals( 579 ((RemoteException)exception).getClassName())) { 580 if (LOG.isDebugEnabled()) { 581 LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " + 582 "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " + 583 "trying to use SSD related policies then you're likely missing HDFS-7228. For " + 584 "more information see the 'ArchivalStorage' docs for your Hadoop release."); 585 } 586 // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation 587 // that throws UnsupportedOperationException 588 } else if (exception instanceof UnsupportedOperationException) { 589 if (LOG.isDebugEnabled()) { 590 LOG.debug("The underlying FileSystem implementation doesn't support " + 591 "setStoragePolicy. This is probably intentional on their part, since HDFS-9345 " + 592 "appears to be present in your version of Hadoop. For more information check " + 593 "the Hadoop documentation on 'ArchivalStorage', the Hadoop FileSystem " + 594 "specification docs from HADOOP-11981, and/or related documentation from the " + 595 "provider of the underlying FileSystem (its name should appear in the " + 596 "stacktrace that accompanies this message). Note in particular that Hadoop's " + 597 "local filesystem implementation doesn't support storage policies.", exception); 598 } 599 } 600 } 601 } 602 } 603 if (toThrow != null) { 604 throw new IOException(toThrow); 605 } 606 } 607 608 /** 609 * @param conf must not be null 610 * @return True if this filesystem whose scheme is 'hdfs'. 611 * @throws IOException from underlying FileSystem 612 */ 613 public static boolean isHDFS(final Configuration conf) throws IOException { 614 FileSystem fs = FileSystem.get(conf); 615 String scheme = fs.getUri().getScheme(); 616 return scheme.equalsIgnoreCase("hdfs"); 617 } 618 619 /** 620 * Checks if the given path is the one with 'recovered.edits' dir. 621 * @param path must not be null 622 * @return True if we recovered edits 623 */ 624 public static boolean isRecoveredEdits(Path path) { 625 return path.toString().contains(HConstants.RECOVERED_EDITS_DIR); 626 } 627 628 /** 629 * @param conf must not be null 630 * @return Returns the filesystem of the hbase rootdir. 631 * @throws IOException from underlying FileSystem 632 */ 633 public static FileSystem getCurrentFileSystem(Configuration conf) throws IOException { 634 return getRootDir(conf).getFileSystem(conf); 635 } 636 637 /** 638 * Calls fs.listStatus() and treats FileNotFoundException as non-fatal 639 * This accommodates differences between hadoop versions, where hadoop 1 640 * does not throw a FileNotFoundException, and return an empty FileStatus[] 641 * while Hadoop 2 will throw FileNotFoundException. 642 * 643 * Where possible, prefer FSUtils#listStatusWithStatusFilter(FileSystem, 644 * Path, FileStatusFilter) instead. 

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
   * This accommodates differences between hadoop versions, where hadoop 1
   * does not throw a FileNotFoundException but returns an empty FileStatus[],
   * while Hadoop 2 will throw FileNotFoundException.
   *
   * Where possible, prefer FSUtils#listStatusWithStatusFilter(FileSystem,
   * Path, FileStatusFilter) instead.
   *
   * @param fs file system
   * @param dir directory
   * @param filter path filter
   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
   */
  public static FileStatus[] listStatus(final FileSystem fs,
      final Path dir, final PathFilter filter) throws IOException {
    FileStatus[] status = null;
    try {
      status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
    } catch (FileNotFoundException fnfe) {
      // if directory doesn't exist, return null
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} doesn't exist", dir);
      }
    }
    if (status == null || status.length < 1) {
      return null;
    }
    return status;
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
   * This accommodates differences between hadoop versions.
   *
   * @param fs file system
   * @param dir directory
   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
   */
  public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
    return listStatus(fs, dir, null);
  }

  /**
   * Calls fs.listFiles() to get FileStatus and BlockLocations together, to reduce RPC calls.
   *
   * @param fs file system
   * @param dir directory
   * @return LocatedFileStatus list
   */
  public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs,
      final Path dir) throws IOException {
    List<LocatedFileStatus> status = null;
    try {
      RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs
          .listFiles(dir, false);
      while (locatedFileStatusRemoteIterator.hasNext()) {
        if (status == null) {
          status = Lists.newArrayList();
        }
        status.add(locatedFileStatusRemoteIterator.next());
      }
    } catch (FileNotFoundException fnfe) {
      // if directory doesn't exist, return null
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} doesn't exist", dir);
      }
    }
    return status;
  }

  /**
   * Calls fs.delete() and returns the value returned by fs.delete().
   *
   * @param fs must not be null
   * @param path must not be null
   * @param recursive delete tree rooted at path
   * @return the value returned by fs.delete()
   * @throws IOException from underlying FileSystem
   */
  public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
      throws IOException {
    return fs.delete(path, recursive);
  }
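
  // A minimal sketch of the listStatus() contract above (the directory path is hypothetical):
  // unlike FileSystem#listStatus, a missing or empty directory yields null rather than an
  // exception or an empty array, so callers need only a single null check:
  //
  //   FileStatus[] children = CommonFSUtils.listStatus(fs, new Path("/hbase/archive"));
  //   if (children == null) {
  //     // directory absent or empty; nothing to do
  //   }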

  /**
   * Calls fs.exists(). Checks if the specified path exists.
   *
   * @param fs must not be null
   * @param path must not be null
   * @return the value returned by fs.exists()
   * @throws IOException from underlying FileSystem
   */
  public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
    return fs.exists(path);
  }

  /**
   * Log the current state of the filesystem from a certain root directory
   * @param fs filesystem to investigate
   * @param root root file/directory to start logging from
   * @param log log to output information
   * @throws IOException if an unexpected exception occurs
   */
  public static void logFileSystemState(final FileSystem fs, final Path root, Logger log)
      throws IOException {
    log.debug("File system contents for path {}", root);
    logFSTree(log, fs, root, "|-");
  }

  /**
   * Recursive helper to log the state of the FS
   *
   * @see #logFileSystemState(FileSystem, Path, Logger)
   */
  private static void logFSTree(Logger log, final FileSystem fs, final Path root, String prefix)
      throws IOException {
    FileStatus[] files = listStatus(fs, root, null);
    if (files == null) {
      return;
    }

    for (FileStatus file : files) {
      if (file.isDirectory()) {
        log.debug(prefix + file.getPath().getName() + "/");
        logFSTree(log, fs, file.getPath(), prefix + "---");
      } else {
        log.debug(prefix + file.getPath().getName());
      }
    }
  }

  public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src,
      final Path dest) throws IOException {
    // set the modify time for TimeToLive Cleaner
    fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
    return fs.rename(src, dest);
  }
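
  // A minimal sketch of renameAndSetModifyTime() (paths are hypothetical): bumping the source's
  // modification time just before the rename means TimeToLive-based cleaners see the file as
  // fresh at its destination, e.g. when moving a WAL into an archive directory:
  //
  //   CommonFSUtils.renameAndSetModifyTime(fs, new Path("/hbase/WALs/old.wal"),
  //       new Path("/hbase/oldWALs/old.wal"));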

  /**
   * Check if short circuit read buffer size is set and if not, set it to hbase value.
   * @param conf must not be null
   */
  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
    int size = conf.getInt(dfsKey, notSet);
    // If a size is set, return -- we will use it.
    if (size != notSet) {
      return;
    }
    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }

  private static class DfsBuilderUtility {
    static Class<?> dfsClass = null;
    static Method createMethod;
    static Method overwriteMethod;
    static Method bufferSizeMethod;
    static Method blockSizeMethod;
    static Method recursiveMethod;
    static Method replicateMethod;
    static Method replicationMethod;
    static Method buildMethod;
    static boolean allMethodsPresent = false;

    static {
      String dfsName = "org.apache.hadoop.hdfs.DistributedFileSystem";
      String builderName = dfsName + "$HdfsDataOutputStreamBuilder";
      Class<?> builderClass = null;

      try {
        dfsClass = Class.forName(dfsName);
      } catch (ClassNotFoundException e) {
        LOG.debug("{} not available, will not use builder API for file creation.", dfsName);
      }
      try {
        builderClass = Class.forName(builderName);
      } catch (ClassNotFoundException e) {
        LOG.debug("{} not available, will not use builder API for file creation.", builderName);
      }

      if (dfsClass != null && builderClass != null) {
        try {
          createMethod = dfsClass.getMethod("createFile", Path.class);
          overwriteMethod = builderClass.getMethod("overwrite", boolean.class);
          bufferSizeMethod = builderClass.getMethod("bufferSize", int.class);
          blockSizeMethod = builderClass.getMethod("blockSize", long.class);
          recursiveMethod = builderClass.getMethod("recursive");
          replicateMethod = builderClass.getMethod("replicate");
          replicationMethod = builderClass.getMethod("replication", short.class);
          buildMethod = builderClass.getMethod("build");

          allMethodsPresent = true;
          LOG.debug("Using builder API via reflection for DFS file creation.");
        } catch (NoSuchMethodException e) {
          LOG.debug("Could not find method on builder; will use old DFS API for file creation {}",
              e.getMessage());
        }
      }
    }

    /**
     * Attempt to use builder API via reflection to create a file with the given parameters and
     * replication enabled.
     */
    static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable,
        int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
      if (allMethodsPresent && dfsClass.isInstance(fs)) {
        try {
          Object builder;

          builder = createMethod.invoke(fs, path);
          builder = overwriteMethod.invoke(builder, overwritable);
          builder = bufferSizeMethod.invoke(builder, bufferSize);
          builder = blockSizeMethod.invoke(builder, blockSize);
          if (isRecursive) {
            builder = recursiveMethod.invoke(builder);
          }
          builder = replicateMethod.invoke(builder);
          builder = replicationMethod.invoke(builder, replication);
          return (FSDataOutputStream) buildMethod.invoke(builder);
        } catch (IllegalAccessException | InvocationTargetException e) {
          // Should have caught this failure during initialization, so log full trace here
          LOG.warn("Couldn't use reflection with builder API", e);
        }
      }

      if (isRecursive) {
        return fs.create(path, overwritable, bufferSize, replication, blockSize, null);
      }
      return fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null);
    }

    /**
     * Attempt to use builder API via reflection to create a file with the given parameters and
     * replication enabled.
     */
    static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable)
        throws IOException {
      if (allMethodsPresent && dfsClass.isInstance(fs)) {
        try {
          Object builder;

          builder = createMethod.invoke(fs, path);
          builder = overwriteMethod.invoke(builder, overwritable);
          builder = replicateMethod.invoke(builder);
          return (FSDataOutputStream) buildMethod.invoke(builder);
        } catch (IllegalAccessException | InvocationTargetException e) {
          // Should have caught this failure during initialization, so log full trace here
          LOG.warn("Couldn't use reflection with builder API", e);
        }
      }

      return fs.create(path, overwritable);
    }
  }

  /**
   * Attempt to use builder API via reflection to create a file with the given parameters and
   * replication enabled.
   * <p>
   * Will not attempt to enable replication when passed an HFileSystem.
   */
  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable)
      throws IOException {
    return DfsBuilderUtility.createHelper(fs, path, overwritable);
  }

  /**
   * Attempt to use builder API via reflection to create a file with the given parameters and
   * replication enabled.
   * <p>
   * Will not attempt to enable replication when passed an HFileSystem.
   */
  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable,
      int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
    return DfsBuilderUtility.createHelper(fs, path, overwritable, bufferSize, replication,
        blockSize, isRecursive);
  }

  // Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and
  // not until we attempt to reference it.
  private static class StreamCapabilities {
    public static final boolean PRESENT;
    public static final Class<?> CLASS;
    public static final Method METHOD;
    static {
      boolean tmp = false;
      Class<?> clazz = null;
      Method method = null;
      try {
        clazz = Class.forName("org.apache.hadoop.fs.StreamCapabilities");
        method = clazz.getMethod("hasCapability", String.class);
        tmp = true;
      } catch (ClassNotFoundException | NoSuchMethodException | SecurityException exception) {
        LOG.warn("Your Hadoop installation does not include the StreamCapabilities class from " +
            "HDFS-11644, so we will skip checking if any FSDataOutputStreams actually " +
            "support hflush/hsync. If you are running on top of HDFS this probably just " +
            "means you have an older version and this can be ignored. If you are running on " +
            "top of an alternate FileSystem implementation you should manually verify that " +
            "hflush and hsync are implemented; otherwise you risk data loss and hard to " +
            "diagnose errors when our assumptions are violated.");
        LOG.debug("The first request to check for StreamCapabilities came from this stacktrace.",
            exception);
      } finally {
        PRESENT = tmp;
        CLASS = clazz;
        METHOD = method;
      }
    }
  }
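
  // A minimal sketch combining createForWal() with the capability check below (the path and
  // capability string are illustrative; "hsync" follows the Hadoop StreamCapabilities naming).
  // WAL writers need durable syncs, so callers verify the returned stream before trusting it:
  //
  //   FSDataOutputStream out = CommonFSUtils.createForWal(fs, new Path("/hbase/WALs/wal.tmp"), false);
  //   if (!CommonFSUtils.hasCapability(out, "hsync")) {
  //     throw new CommonFSUtils.StreamLacksCapabilityException("hsync");
  //   }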

  /**
   * If our FileSystem version includes the StreamCapabilities class, check if the given stream has
   * a particular capability.
   * @param stream capabilities are per-stream instance, so check this one specifically. must not
   *          be null
   * @param capability what to look for, per Hadoop Common's FileSystem docs
   * @return true if the StreamCapabilities class is not present; false if it is present but this
   *         stream doesn't implement it; otherwise the result of asking the stream.
   */
  public static boolean hasCapability(FSDataOutputStream stream, String capability) {
    // be consistent whether or not StreamCapabilities is present
    if (stream == null) {
      throw new NullPointerException("stream parameter must not be null.");
    }
    // If o.a.h.fs.StreamCapabilities doesn't exist, assume everyone does everything
    // otherwise old versions of Hadoop will break.
    boolean result = true;
    if (StreamCapabilities.PRESENT) {
      // if StreamCapabilities is present, but the stream doesn't implement it
      // or we run into a problem invoking the method,
      // we treat that as equivalent to not declaring anything
      result = false;
      if (StreamCapabilities.CLASS.isAssignableFrom(stream.getClass())) {
        try {
          result = ((Boolean) StreamCapabilities.METHOD.invoke(stream, capability)).booleanValue();
        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
            exception) {
          LOG.warn("Your Hadoop installation's StreamCapabilities implementation doesn't match " +
              "our understanding of how it's supposed to work. Please file a JIRA and include " +
              "the following stack trace. In the mean time we're interpreting this behavior " +
              "difference as a lack of capability support, which will probably cause a failure.",
              exception);
        }
      }
    }
    return result;
  }

  /**
   * Helper exception for those cases where the place where we need to check a stream capability
   * is not where we have the needed context to explain the impact and mitigation for a lack.
   */
  public static class StreamLacksCapabilityException extends Exception {
    public StreamLacksCapabilityException(String message, Throwable cause) {
      super(message, cause);
    }

    public StreamLacksCapabilityException(String message) {
      super(message);
    }
  }
}