001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.util; 020 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.lang.reflect.InvocationTargetException; 024import java.lang.reflect.Method; 025import java.net.URI; 026import java.net.URISyntaxException; 027import java.util.List; 028import java.util.Locale; 029import java.util.Map; 030import java.util.concurrent.ConcurrentHashMap; 031import org.apache.hadoop.HadoopIllegalArgumentException; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FSDataOutputStream; 034import org.apache.hadoop.fs.FileStatus; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.LocatedFileStatus; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.fs.PathFilter; 039import org.apache.hadoop.fs.RemoteIterator; 040import org.apache.hadoop.fs.permission.FsPermission; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.ipc.RemoteException; 044import org.apache.yetus.audience.InterfaceAudience; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 049import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 050 051/** 052 * Utility methods for interacting with the underlying file system. 053 * <p/> 054 * Note that {@link #setStoragePolicy(FileSystem, Path, String)} is tested in TestFSUtils and 055 * pre-commit will run the hbase-server tests if there's code change in this class. See 056 * <a href="https://issues.apache.org/jira/browse/HBASE-20838">HBASE-20838</a> for more details. 057 */ 058@InterfaceAudience.Private 059public abstract class CommonFSUtils { 060 private static final Logger LOG = LoggerFactory.getLogger(CommonFSUtils.class); 061 062 /** Parameter name for HBase WAL directory */ 063 public static final String HBASE_WAL_DIR = "hbase.wal.dir"; 064 065 /** Parameter to disable stream capability enforcement checks */ 066 public static final String UNSAFE_STREAM_CAPABILITY_ENFORCE = 067 "hbase.unsafe.stream.capability.enforce"; 068 069 /** Full access permissions (starting point for a umask) */ 070 public static final String FULL_RWX_PERMISSIONS = "777"; 071 072 protected CommonFSUtils() { 073 super(); 074 } 075 076 /** 077 * Compare of path component. Does not consider schema; i.e. if schemas 078 * different but <code>path</code> starts with <code>rootPath</code>, 079 * then the function returns true 080 * @param rootPath value to check for 081 * @param path subject to check 082 * @return True if <code>path</code> starts with <code>rootPath</code> 083 */ 084 public static boolean isStartingWithPath(final Path rootPath, final String path) { 085 String uriRootPath = rootPath.toUri().getPath(); 086 String tailUriPath = (new Path(path)).toUri().getPath(); 087 return tailUriPath.startsWith(uriRootPath); 088 } 089 090 /** 091 * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the 092 * '/a/b/c' part. Does not consider schema; i.e. if schemas different but path or subpath matches, 093 * the two will equate. 094 * @param pathToSearch Path we will be trying to match against. 095 * @param pathTail what to match 096 * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code> 097 */ 098 public static boolean isMatchingTail(final Path pathToSearch, String pathTail) { 099 return isMatchingTail(pathToSearch, new Path(pathTail)); 100 } 101 102 /** 103 * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the 104 * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true. Does not consider 105 * schema; i.e. if schemas different but path or subpath matches, the two will equate. 106 * @param pathToSearch Path we will be trying to match agains against 107 * @param pathTail what to match 108 * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code> 109 */ 110 public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) { 111 if (pathToSearch.depth() != pathTail.depth()) { 112 return false; 113 } 114 Path tailPath = pathTail; 115 String tailName; 116 Path toSearch = pathToSearch; 117 String toSearchName; 118 boolean result = false; 119 do { 120 tailName = tailPath.getName(); 121 if (tailName == null || tailName.length() <= 0) { 122 result = true; 123 break; 124 } 125 toSearchName = toSearch.getName(); 126 if (toSearchName == null || toSearchName.length() <= 0) { 127 break; 128 } 129 // Move up a parent on each path for next go around. Path doesn't let us go off the end. 130 tailPath = tailPath.getParent(); 131 toSearch = toSearch.getParent(); 132 } while(tailName.equals(toSearchName)); 133 return result; 134 } 135 136 /** 137 * Delete if exists. 138 * @param fs filesystem object 139 * @param dir directory to delete 140 * @return True if deleted <code>dir</code> 141 * @throws IOException e 142 */ 143 public static boolean deleteDirectory(final FileSystem fs, final Path dir) throws IOException { 144 return fs.exists(dir) && fs.delete(dir, true); 145 } 146 147 /** 148 * Return the number of bytes that large input files should be optimally 149 * be split into to minimize i/o time. 150 * 151 * use reflection to search for getDefaultBlockSize(Path f) 152 * if the method doesn't exist, fall back to using getDefaultBlockSize() 153 * 154 * @param fs filesystem object 155 * @return the default block size for the path's filesystem 156 * @throws IOException e 157 */ 158 public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException { 159 Method m = null; 160 Class<? extends FileSystem> cls = fs.getClass(); 161 try { 162 m = cls.getMethod("getDefaultBlockSize", Path.class); 163 } catch (NoSuchMethodException e) { 164 LOG.info("FileSystem doesn't support getDefaultBlockSize"); 165 } catch (SecurityException e) { 166 LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e); 167 m = null; // could happen on setAccessible() 168 } 169 if (m == null) { 170 return fs.getDefaultBlockSize(path); 171 } else { 172 try { 173 Object ret = m.invoke(fs, path); 174 return ((Long)ret).longValue(); 175 } catch (Exception e) { 176 throw new IOException(e); 177 } 178 } 179 } 180 181 /* 182 * Get the default replication. 183 * 184 * use reflection to search for getDefaultReplication(Path f) 185 * if the method doesn't exist, fall back to using getDefaultReplication() 186 * 187 * @param fs filesystem object 188 * @param f path of file 189 * @return default replication for the path's filesystem 190 * @throws IOException e 191 */ 192 public static short getDefaultReplication(final FileSystem fs, final Path path) 193 throws IOException { 194 Method m = null; 195 Class<? extends FileSystem> cls = fs.getClass(); 196 try { 197 m = cls.getMethod("getDefaultReplication", Path.class); 198 } catch (NoSuchMethodException e) { 199 LOG.info("FileSystem doesn't support getDefaultReplication"); 200 } catch (SecurityException e) { 201 LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e); 202 m = null; // could happen on setAccessible() 203 } 204 if (m == null) { 205 return fs.getDefaultReplication(path); 206 } else { 207 try { 208 Object ret = m.invoke(fs, path); 209 return ((Number)ret).shortValue(); 210 } catch (Exception e) { 211 throw new IOException(e); 212 } 213 } 214 } 215 216 /** 217 * Returns the default buffer size to use during writes. 218 * 219 * The size of the buffer should probably be a multiple of hardware 220 * page size (4096 on Intel x86), and it determines how much data is 221 * buffered during read and write operations. 222 * 223 * @param fs filesystem object 224 * @return default buffer size to use during writes 225 */ 226 public static int getDefaultBufferSize(final FileSystem fs) { 227 return fs.getConf().getInt("io.file.buffer.size", 4096); 228 } 229 230 /** 231 * Create the specified file on the filesystem. By default, this will: 232 * <ol> 233 * <li>apply the umask in the configuration (if it is enabled)</li> 234 * <li>use the fs configured buffer size (or 4096 if not set)</li> 235 * <li>use the default replication</li> 236 * <li>use the default block size</li> 237 * <li>not track progress</li> 238 * </ol> 239 * 240 * @param fs {@link FileSystem} on which to write the file 241 * @param path {@link Path} to the file to write 242 * @param perm intial permissions 243 * @param overwrite Whether or not the created file should be overwritten. 244 * @return output stream to the created file 245 * @throws IOException if the file cannot be created 246 */ 247 public static FSDataOutputStream create(FileSystem fs, Path path, 248 FsPermission perm, boolean overwrite) throws IOException { 249 if (LOG.isTraceEnabled()) { 250 LOG.trace("Creating file={} with permission={}, overwrite={}", path, perm, overwrite); 251 } 252 return fs.create(path, perm, overwrite, getDefaultBufferSize(fs), 253 getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null); 254 } 255 256 /** 257 * Get the file permissions specified in the configuration, if they are 258 * enabled. 259 * 260 * @param fs filesystem that the file will be created on. 261 * @param conf configuration to read for determining if permissions are 262 * enabled and which to use 263 * @param permssionConfKey property key in the configuration to use when 264 * finding the permission 265 * @return the permission to use when creating a new file on the fs. If 266 * special permissions are not specified in the configuration, then 267 * the default permissions on the the fs will be returned. 268 */ 269 public static FsPermission getFilePermissions(final FileSystem fs, 270 final Configuration conf, final String permssionConfKey) { 271 boolean enablePermissions = conf.getBoolean( 272 HConstants.ENABLE_DATA_FILE_UMASK, false); 273 274 if (enablePermissions) { 275 try { 276 FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS); 277 // make sure that we have a mask, if not, go default. 278 String mask = conf.get(permssionConfKey); 279 if (mask == null) { 280 return FsPermission.getFileDefault(); 281 } 282 // appy the umask 283 FsPermission umask = new FsPermission(mask); 284 return perm.applyUMask(umask); 285 } catch (IllegalArgumentException e) { 286 LOG.warn( 287 "Incorrect umask attempted to be created: " 288 + conf.get(permssionConfKey) 289 + ", using default file permissions.", e); 290 return FsPermission.getFileDefault(); 291 } 292 } 293 return FsPermission.getFileDefault(); 294 } 295 296 /** 297 * Verifies root directory path is a valid URI with a scheme 298 * 299 * @param root root directory path 300 * @return Passed <code>root</code> argument. 301 * @throws IOException if not a valid URI with a scheme 302 */ 303 public static Path validateRootPath(Path root) throws IOException { 304 try { 305 URI rootURI = new URI(root.toString()); 306 String scheme = rootURI.getScheme(); 307 if (scheme == null) { 308 throw new IOException("Root directory does not have a scheme"); 309 } 310 return root; 311 } catch (URISyntaxException e) { 312 IOException io = new IOException("Root directory path is not a valid " + 313 "URI -- check your " + HConstants.HBASE_DIR + " configuration"); 314 io.initCause(e); 315 throw io; 316 } 317 } 318 319 /** 320 * Checks for the presence of the WAL log root path (using the provided conf object) in the given 321 * path. If it exists, this method removes it and returns the String representation of remaining 322 * relative path. 323 * @param path must not be null 324 * @param conf must not be null 325 * @return String representation of the remaining relative path 326 * @throws IOException from underlying filesystem 327 */ 328 public static String removeWALRootPath(Path path, final Configuration conf) throws IOException { 329 Path root = getWALRootDir(conf); 330 String pathStr = path.toString(); 331 // check that the path is absolute... it has the root path in it. 332 if (!pathStr.startsWith(root.toString())) { 333 return pathStr; 334 } 335 // if not, return as it is. 336 return pathStr.substring(root.toString().length() + 1);// remove the "/" too. 337 } 338 339 /** 340 * Return the 'path' component of a Path. In Hadoop, Path is a URI. This 341 * method returns the 'path' component of a Path's URI: e.g. If a Path is 342 * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>, 343 * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>. 344 * This method is useful if you want to print out a Path without qualifying 345 * Filesystem instance. 346 * @param p Filesystem Path whose 'path' component we are to return. 347 * @return Path portion of the Filesystem 348 */ 349 public static String getPath(Path p) { 350 return p.toUri().getPath(); 351 } 352 353 /** 354 * @param c configuration 355 * @return {@link Path} to hbase root directory from 356 * configuration as a qualified Path. 357 * @throws IOException e 358 */ 359 public static Path getRootDir(final Configuration c) throws IOException { 360 Path p = new Path(c.get(HConstants.HBASE_DIR)); 361 FileSystem fs = p.getFileSystem(c); 362 return p.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 363 } 364 365 public static void setRootDir(final Configuration c, final Path root) { 366 c.set(HConstants.HBASE_DIR, root.toString()); 367 } 368 369 public static void setFsDefault(final Configuration c, final Path root) { 370 c.set("fs.defaultFS", root.toString()); // for hadoop 0.21+ 371 } 372 373 public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException { 374 Path p = getRootDir(c); 375 return p.getFileSystem(c); 376 } 377 378 /** 379 * @param c configuration 380 * @return {@link Path} to hbase log root directory: e.g. {@value HBASE_WAL_DIR} from 381 * configuration as a qualified Path. Defaults to HBase root dir. 382 * @throws IOException e 383 */ 384 public static Path getWALRootDir(final Configuration c) throws IOException { 385 386 Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR))); 387 if (!isValidWALRootDir(p, c)) { 388 return getRootDir(c); 389 } 390 FileSystem fs = p.getFileSystem(c); 391 return p.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 392 } 393 394 @VisibleForTesting 395 public static void setWALRootDir(final Configuration c, final Path root) { 396 c.set(HBASE_WAL_DIR, root.toString()); 397 } 398 399 public static FileSystem getWALFileSystem(final Configuration c) throws IOException { 400 Path p = getWALRootDir(c); 401 FileSystem fs = p.getFileSystem(c); 402 // hadoop-core does fs caching, so need to propogate this if set 403 String enforceStreamCapability = c.get(UNSAFE_STREAM_CAPABILITY_ENFORCE); 404 if (enforceStreamCapability != null) { 405 fs.getConf().set(UNSAFE_STREAM_CAPABILITY_ENFORCE, enforceStreamCapability); 406 } 407 return fs; 408 } 409 410 private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException { 411 Path rootDir = getRootDir(c); 412 FileSystem fs = walDir.getFileSystem(c); 413 Path qualifiedWalDir = walDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 414 if (!qualifiedWalDir.equals(rootDir)) { 415 if (qualifiedWalDir.toString().startsWith(rootDir.toString() + "/")) { 416 throw new IllegalStateException("Illegal WAL directory specified. " + 417 "WAL directories are not permitted to be under the root directory if set."); 418 } 419 } 420 return true; 421 } 422 423 /** 424 * Returns the WAL region directory based on the given table name and region name 425 * @param conf configuration to determine WALRootDir 426 * @param tableName Table that the region is under 427 * @param encodedRegionName Region name used for creating the final region directory 428 * @return the region directory used to store WALs under the WALRootDir 429 * @throws IOException if there is an exception determining the WALRootDir 430 */ 431 public static Path getWALRegionDir(final Configuration conf, final TableName tableName, 432 final String encodedRegionName) throws IOException { 433 return new Path(getWALTableDir(conf, tableName), encodedRegionName); 434 } 435 436 /** 437 * Returns the Table directory under the WALRootDir for the specified table name 438 * @param conf configuration used to get the WALRootDir 439 * @param tableName Table to get the directory for 440 * @return a path to the WAL table directory for the specified table 441 * @throws IOException if there is an exception determining the WALRootDir 442 */ 443 public static Path getWALTableDir(final Configuration conf, final TableName tableName) 444 throws IOException { 445 Path baseDir = new Path(getWALRootDir(conf), HConstants.BASE_NAMESPACE_DIR); 446 return new Path(new Path(baseDir, tableName.getNamespaceAsString()), 447 tableName.getQualifierAsString()); 448 } 449 450 /** 451 * For backward compatibility with HBASE-20734, where we store recovered edits in a wrong 452 * directory without BASE_NAMESPACE_DIR. See HBASE-22617 for more details. 453 * @deprecated For compatibility, will be removed in 4.0.0. 454 */ 455 @Deprecated 456 public static Path getWrongWALRegionDir(final Configuration conf, final TableName tableName, 457 final String encodedRegionName) throws IOException { 458 Path wrongTableDir = new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()), 459 tableName.getQualifierAsString()); 460 return new Path(wrongTableDir, encodedRegionName); 461 } 462 463 /** 464 * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under 465 * path rootdir 466 * 467 * @param rootdir qualified path of HBase root directory 468 * @param tableName name of table 469 * @return {@link org.apache.hadoop.fs.Path} for table 470 */ 471 public static Path getTableDir(Path rootdir, final TableName tableName) { 472 return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()), 473 tableName.getQualifierAsString()); 474 } 475 476 /** 477 * Returns the {@link org.apache.hadoop.hbase.TableName} object representing 478 * the table directory under 479 * path rootdir 480 * 481 * @param tablePath path of table 482 * @return {@link org.apache.hadoop.fs.Path} for table 483 */ 484 public static TableName getTableName(Path tablePath) { 485 return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName()); 486 } 487 488 /** 489 * Returns the {@link org.apache.hadoop.fs.Path} object representing 490 * the namespace directory under path rootdir 491 * 492 * @param rootdir qualified path of HBase root directory 493 * @param namespace namespace name 494 * @return {@link org.apache.hadoop.fs.Path} for table 495 */ 496 public static Path getNamespaceDir(Path rootdir, final String namespace) { 497 return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, 498 new Path(namespace))); 499 } 500 501 // this mapping means that under a federated FileSystem implementation, we'll 502 // only log the first failure from any of the underlying FileSystems at WARN and all others 503 // will be at DEBUG. 504 private static final Map<FileSystem, Boolean> warningMap = new ConcurrentHashMap<>(); 505 506 /** 507 * Sets storage policy for given path. 508 * If the passed path is a directory, we'll set the storage policy for all files 509 * created in the future in said directory. Note that this change in storage 510 * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle. 511 * If we're running on a version of FileSystem that doesn't support the given storage policy 512 * (or storage policies at all), then we'll issue a log message and continue. 513 * 514 * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html 515 * 516 * @param fs We only do anything it implements a setStoragePolicy method 517 * @param path the Path whose storage policy is to be set 518 * @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+ 519 * org.apache.hadoop.hdfs.protocol.HdfsConstants for possible list e.g 520 * 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'. 521 */ 522 public static void setStoragePolicy(final FileSystem fs, final Path path, 523 final String storagePolicy) { 524 try { 525 setStoragePolicy(fs, path, storagePolicy, false); 526 } catch (IOException e) { 527 // should never arrive here 528 LOG.warn("We have chosen not to throw exception but some unexpectedly thrown out", e); 529 } 530 } 531 532 static void setStoragePolicy(final FileSystem fs, final Path path, final String storagePolicy, 533 boolean throwException) throws IOException { 534 if (storagePolicy == null) { 535 if (LOG.isTraceEnabled()) { 536 LOG.trace("We were passed a null storagePolicy, exiting early."); 537 } 538 return; 539 } 540 String trimmedStoragePolicy = storagePolicy.trim(); 541 if (trimmedStoragePolicy.isEmpty()) { 542 if (LOG.isTraceEnabled()) { 543 LOG.trace("We were passed an empty storagePolicy, exiting early."); 544 } 545 return; 546 } else { 547 trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT); 548 } 549 if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) { 550 if (LOG.isTraceEnabled()) { 551 LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.", 552 trimmedStoragePolicy); 553 } 554 return; 555 } 556 try { 557 invokeSetStoragePolicy(fs, path, trimmedStoragePolicy); 558 } catch (IOException e) { 559 if (LOG.isTraceEnabled()) { 560 LOG.trace("Failed to invoke set storage policy API on FS", e); 561 } 562 if (throwException) { 563 throw e; 564 } 565 } 566 } 567 568 /* 569 * All args have been checked and are good. Run the setStoragePolicy invocation. 570 */ 571 private static void invokeSetStoragePolicy(final FileSystem fs, final Path path, 572 final String storagePolicy) throws IOException { 573 Method m = null; 574 Exception toThrow = null; 575 try { 576 m = fs.getClass().getDeclaredMethod("setStoragePolicy", Path.class, String.class); 577 m.setAccessible(true); 578 } catch (NoSuchMethodException e) { 579 toThrow = e; 580 final String msg = "FileSystem doesn't support setStoragePolicy; HDFS-6584, HDFS-9345 " + 581 "not available. This is normal and expected on earlier Hadoop versions."; 582 if (!warningMap.containsKey(fs)) { 583 warningMap.put(fs, true); 584 LOG.warn(msg, e); 585 } else if (LOG.isDebugEnabled()) { 586 LOG.debug(msg, e); 587 } 588 m = null; 589 } catch (SecurityException e) { 590 toThrow = e; 591 final String msg = "No access to setStoragePolicy on FileSystem from the SecurityManager; " + 592 "HDFS-6584, HDFS-9345 not available. This is unusual and probably warrants an email " + 593 "to the user@hbase mailing list. Please be sure to include a link to your configs, and " + 594 "logs that include this message and period of time before it. Logs around service " + 595 "start up will probably be useful as well."; 596 if (!warningMap.containsKey(fs)) { 597 warningMap.put(fs, true); 598 LOG.warn(msg, e); 599 } else if (LOG.isDebugEnabled()) { 600 LOG.debug(msg, e); 601 } 602 m = null; // could happen on setAccessible() or getDeclaredMethod() 603 } 604 if (m != null) { 605 try { 606 m.invoke(fs, path, storagePolicy); 607 if (LOG.isDebugEnabled()) { 608 LOG.debug("Set storagePolicy={} for path={}", storagePolicy, path); 609 } 610 } catch (Exception e) { 611 toThrow = e; 612 // This swallows FNFE, should we be throwing it? seems more likely to indicate dev 613 // misuse than a runtime problem with HDFS. 614 if (!warningMap.containsKey(fs)) { 615 warningMap.put(fs, true); 616 LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path + ". " + 617 "DEBUG log level might have more details.", e); 618 } else if (LOG.isDebugEnabled()) { 619 LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e); 620 } 621 // check for lack of HDFS-7228 622 if (e instanceof InvocationTargetException) { 623 final Throwable exception = e.getCause(); 624 if (exception instanceof RemoteException && 625 HadoopIllegalArgumentException.class.getName().equals( 626 ((RemoteException)exception).getClassName())) { 627 if (LOG.isDebugEnabled()) { 628 LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " + 629 "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " + 630 "trying to use SSD related policies then you're likely missing HDFS-7228. For " + 631 "more information see the 'ArchivalStorage' docs for your Hadoop release."); 632 } 633 // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation 634 // that throws UnsupportedOperationException 635 } else if (exception instanceof UnsupportedOperationException) { 636 if (LOG.isDebugEnabled()) { 637 LOG.debug("The underlying FileSystem implementation doesn't support " + 638 "setStoragePolicy. This is probably intentional on their part, since HDFS-9345 " + 639 "appears to be present in your version of Hadoop. For more information check " + 640 "the Hadoop documentation on 'ArchivalStorage', the Hadoop FileSystem " + 641 "specification docs from HADOOP-11981, and/or related documentation from the " + 642 "provider of the underlying FileSystem (its name should appear in the " + 643 "stacktrace that accompanies this message). Note in particular that Hadoop's " + 644 "local filesystem implementation doesn't support storage policies.", exception); 645 } 646 } 647 } 648 } 649 } 650 if (toThrow != null) { 651 throw new IOException(toThrow); 652 } 653 } 654 655 /** 656 * @param conf must not be null 657 * @return True if this filesystem whose scheme is 'hdfs'. 658 * @throws IOException from underlying FileSystem 659 */ 660 public static boolean isHDFS(final Configuration conf) throws IOException { 661 FileSystem fs = FileSystem.get(conf); 662 String scheme = fs.getUri().getScheme(); 663 return scheme.equalsIgnoreCase("hdfs"); 664 } 665 666 /** 667 * Checks if the given path is the one with 'recovered.edits' dir. 668 * @param path must not be null 669 * @return True if we recovered edits 670 */ 671 public static boolean isRecoveredEdits(Path path) { 672 return path.toString().contains(HConstants.RECOVERED_EDITS_DIR); 673 } 674 675 /** 676 * @param conf must not be null 677 * @return Returns the filesystem of the hbase rootdir. 678 * @throws IOException from underlying FileSystem 679 */ 680 public static FileSystem getCurrentFileSystem(Configuration conf) throws IOException { 681 return getRootDir(conf).getFileSystem(conf); 682 } 683 684 /** 685 * Calls fs.listStatus() and treats FileNotFoundException as non-fatal 686 * This accommodates differences between hadoop versions, where hadoop 1 687 * does not throw a FileNotFoundException, and return an empty FileStatus[] 688 * while Hadoop 2 will throw FileNotFoundException. 689 * 690 * Where possible, prefer FSUtils#listStatusWithStatusFilter(FileSystem, 691 * Path, FileStatusFilter) instead. 692 * 693 * @param fs file system 694 * @param dir directory 695 * @param filter path filter 696 * @return null if dir is empty or doesn't exist, otherwise FileStatus array 697 */ 698 public static FileStatus[] listStatus(final FileSystem fs, 699 final Path dir, final PathFilter filter) throws IOException { 700 FileStatus [] status = null; 701 try { 702 status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter); 703 } catch (FileNotFoundException fnfe) { 704 // if directory doesn't exist, return null 705 if (LOG.isTraceEnabled()) { 706 LOG.trace("{} doesn't exist", dir); 707 } 708 } 709 if (status == null || status.length < 1) { 710 return null; 711 } 712 return status; 713 } 714 715 /** 716 * Calls fs.listStatus() and treats FileNotFoundException as non-fatal 717 * This would accommodates differences between hadoop versions 718 * 719 * @param fs file system 720 * @param dir directory 721 * @return null if dir is empty or doesn't exist, otherwise FileStatus array 722 */ 723 public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException { 724 return listStatus(fs, dir, null); 725 } 726 727 /** 728 * Calls fs.listFiles() to get FileStatus and BlockLocations together for reducing rpc call 729 * 730 * @param fs file system 731 * @param dir directory 732 * @return LocatedFileStatus list 733 */ 734 public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs, 735 final Path dir) throws IOException { 736 List<LocatedFileStatus> status = null; 737 try { 738 RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs 739 .listFiles(dir, false); 740 while (locatedFileStatusRemoteIterator.hasNext()) { 741 if (status == null) { 742 status = Lists.newArrayList(); 743 } 744 status.add(locatedFileStatusRemoteIterator.next()); 745 } 746 } catch (FileNotFoundException fnfe) { 747 // if directory doesn't exist, return null 748 if (LOG.isTraceEnabled()) { 749 LOG.trace("{} doesn't exist", dir); 750 } 751 } 752 return status; 753 } 754 755 /** 756 * Calls fs.delete() and returns the value returned by the fs.delete() 757 * 758 * @param fs must not be null 759 * @param path must not be null 760 * @param recursive delete tree rooted at path 761 * @return the value returned by the fs.delete() 762 * @throws IOException from underlying FileSystem 763 */ 764 public static boolean delete(final FileSystem fs, final Path path, final boolean recursive) 765 throws IOException { 766 return fs.delete(path, recursive); 767 } 768 769 /** 770 * Calls fs.exists(). Checks if the specified path exists 771 * 772 * @param fs must not be null 773 * @param path must not be null 774 * @return the value returned by fs.exists() 775 * @throws IOException from underlying FileSystem 776 */ 777 public static boolean isExists(final FileSystem fs, final Path path) throws IOException { 778 return fs.exists(path); 779 } 780 781 /** 782 * Log the current state of the filesystem from a certain root directory 783 * @param fs filesystem to investigate 784 * @param root root file/directory to start logging from 785 * @param log log to output information 786 * @throws IOException if an unexpected exception occurs 787 */ 788 public static void logFileSystemState(final FileSystem fs, final Path root, Logger log) 789 throws IOException { 790 log.debug("File system contents for path {}", root); 791 logFSTree(log, fs, root, "|-"); 792 } 793 794 /** 795 * Recursive helper to log the state of the FS 796 * 797 * @see #logFileSystemState(FileSystem, Path, Logger) 798 */ 799 private static void logFSTree(Logger log, final FileSystem fs, final Path root, String prefix) 800 throws IOException { 801 FileStatus[] files = listStatus(fs, root, null); 802 if (files == null) { 803 return; 804 } 805 806 for (FileStatus file : files) { 807 if (file.isDirectory()) { 808 log.debug(prefix + file.getPath().getName() + "/"); 809 logFSTree(log, fs, file.getPath(), prefix + "---"); 810 } else { 811 log.debug(prefix + file.getPath().getName()); 812 } 813 } 814 } 815 816 public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest) 817 throws IOException { 818 // set the modify time for TimeToLive Cleaner 819 fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1); 820 return fs.rename(src, dest); 821 } 822 823 /** 824 * Check if short circuit read buffer size is set and if not, set it to hbase value. 825 * @param conf must not be null 826 */ 827 public static void checkShortCircuitReadBufferSize(final Configuration conf) { 828 final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2; 829 final int notSet = -1; 830 // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2 831 final String dfsKey = "dfs.client.read.shortcircuit.buffer.size"; 832 int size = conf.getInt(dfsKey, notSet); 833 // If a size is set, return -- we will use it. 834 if (size != notSet) { 835 return; 836 } 837 // But short circuit buffer size is normally not set. Put in place the hbase wanted size. 838 int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize); 839 conf.setIfUnset(dfsKey, Integer.toString(hbaseSize)); 840 } 841 842 private static class DfsBuilderUtility { 843 static Class<?> dfsClass = null; 844 static Method createMethod; 845 static Method overwriteMethod; 846 static Method bufferSizeMethod; 847 static Method blockSizeMethod; 848 static Method recursiveMethod; 849 static Method replicateMethod; 850 static Method replicationMethod; 851 static Method buildMethod; 852 static boolean allMethodsPresent = false; 853 854 static { 855 String dfsName = "org.apache.hadoop.hdfs.DistributedFileSystem"; 856 String builderName = dfsName + "$HdfsDataOutputStreamBuilder"; 857 Class<?> builderClass = null; 858 859 try { 860 dfsClass = Class.forName(dfsName); 861 } catch (ClassNotFoundException e) { 862 LOG.debug("{} not available, will not use builder API for file creation.", dfsName); 863 } 864 try { 865 builderClass = Class.forName(builderName); 866 } catch (ClassNotFoundException e) { 867 LOG.debug("{} not available, will not use builder API for file creation.", builderName); 868 } 869 870 if (dfsClass != null && builderClass != null) { 871 try { 872 createMethod = dfsClass.getMethod("createFile", Path.class); 873 overwriteMethod = builderClass.getMethod("overwrite", boolean.class); 874 bufferSizeMethod = builderClass.getMethod("bufferSize", int.class); 875 blockSizeMethod = builderClass.getMethod("blockSize", long.class); 876 recursiveMethod = builderClass.getMethod("recursive"); 877 replicateMethod = builderClass.getMethod("replicate"); 878 replicationMethod = builderClass.getMethod("replication", short.class); 879 buildMethod = builderClass.getMethod("build"); 880 881 allMethodsPresent = true; 882 LOG.debug("Using builder API via reflection for DFS file creation."); 883 } catch (NoSuchMethodException e) { 884 LOG.debug("Could not find method on builder; will use old DFS API for file creation {}", 885 e.getMessage()); 886 } 887 } 888 } 889 890 /** 891 * Attempt to use builder API via reflection to create a file with the given parameters and 892 * replication enabled. 893 */ 894 static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable, 895 int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException { 896 if (allMethodsPresent && dfsClass.isInstance(fs)) { 897 try { 898 Object builder; 899 900 builder = createMethod.invoke(fs, path); 901 builder = overwriteMethod.invoke(builder, overwritable); 902 builder = bufferSizeMethod.invoke(builder, bufferSize); 903 builder = blockSizeMethod.invoke(builder, blockSize); 904 if (isRecursive) { 905 builder = recursiveMethod.invoke(builder); 906 } 907 builder = replicateMethod.invoke(builder); 908 builder = replicationMethod.invoke(builder, replication); 909 return (FSDataOutputStream) buildMethod.invoke(builder); 910 } catch (IllegalAccessException | InvocationTargetException e) { 911 // Should have caught this failure during initialization, so log full trace here 912 LOG.warn("Couldn't use reflection with builder API", e); 913 } 914 } 915 916 if (isRecursive) { 917 return fs.create(path, overwritable, bufferSize, replication, blockSize, null); 918 } 919 return fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null); 920 } 921 922 /** 923 * Attempt to use builder API via reflection to create a file with the given parameters and 924 * replication enabled. 925 */ 926 static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable) 927 throws IOException { 928 if (allMethodsPresent && dfsClass.isInstance(fs)) { 929 try { 930 Object builder; 931 932 builder = createMethod.invoke(fs, path); 933 builder = overwriteMethod.invoke(builder, overwritable); 934 builder = replicateMethod.invoke(builder); 935 return (FSDataOutputStream) buildMethod.invoke(builder); 936 } catch (IllegalAccessException | InvocationTargetException e) { 937 // Should have caught this failure during initialization, so log full trace here 938 LOG.warn("Couldn't use reflection with builder API", e); 939 } 940 } 941 942 return fs.create(path, overwritable); 943 } 944 } 945 946 /** 947 * Attempt to use builder API via reflection to create a file with the given parameters and 948 * replication enabled. 949 * <p> 950 * Will not attempt to enable replication when passed an HFileSystem. 951 */ 952 public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable) 953 throws IOException { 954 return DfsBuilderUtility.createHelper(fs, path, overwritable); 955 } 956 957 /** 958 * Attempt to use builder API via reflection to create a file with the given parameters and 959 * replication enabled. 960 * <p> 961 * Will not attempt to enable replication when passed an HFileSystem. 962 */ 963 public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable, 964 int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException { 965 return DfsBuilderUtility.createHelper(fs, path, overwritable, bufferSize, replication, 966 blockSize, isRecursive); 967 } 968 969 // Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and 970 // not until we attempt to reference it. 971 private static class StreamCapabilities { 972 public static final boolean PRESENT; 973 public static final Class<?> CLASS; 974 public static final Method METHOD; 975 static { 976 boolean tmp = false; 977 Class<?> clazz = null; 978 Method method = null; 979 try { 980 clazz = Class.forName("org.apache.hadoop.fs.StreamCapabilities"); 981 method = clazz.getMethod("hasCapability", String.class); 982 tmp = true; 983 } catch(ClassNotFoundException|NoSuchMethodException|SecurityException exception) { 984 LOG.warn("Your Hadoop installation does not include the StreamCapabilities class from " + 985 "HDFS-11644, so we will skip checking if any FSDataOutputStreams actually " + 986 "support hflush/hsync. If you are running on top of HDFS this probably just " + 987 "means you have an older version and this can be ignored. If you are running on " + 988 "top of an alternate FileSystem implementation you should manually verify that " + 989 "hflush and hsync are implemented; otherwise you risk data loss and hard to " + 990 "diagnose errors when our assumptions are violated."); 991 LOG.debug("The first request to check for StreamCapabilities came from this stacktrace.", 992 exception); 993 } finally { 994 PRESENT = tmp; 995 CLASS = clazz; 996 METHOD = method; 997 } 998 } 999 } 1000 1001 /** 1002 * If our FileSystem version includes the StreamCapabilities class, check if 1003 * the given stream has a particular capability. 1004 * @param stream capabilities are per-stream instance, so check this one specifically. must not be 1005 * null 1006 * @param capability what to look for, per Hadoop Common's FileSystem docs 1007 * @return true if there are no StreamCapabilities. false if there are, but this stream doesn't 1008 * implement it. return result of asking the stream otherwise. 1009 */ 1010 public static boolean hasCapability(FSDataOutputStream stream, String capability) { 1011 // be consistent whether or not StreamCapabilities is present 1012 if (stream == null) { 1013 throw new NullPointerException("stream parameter must not be null."); 1014 } 1015 // If o.a.h.fs.StreamCapabilities doesn't exist, assume everyone does everything 1016 // otherwise old versions of Hadoop will break. 1017 boolean result = true; 1018 if (StreamCapabilities.PRESENT) { 1019 // if StreamCapabilities is present, but the stream doesn't implement it 1020 // or we run into a problem invoking the method, 1021 // we treat that as equivalent to not declaring anything 1022 result = false; 1023 if (StreamCapabilities.CLASS.isAssignableFrom(stream.getClass())) { 1024 try { 1025 result = ((Boolean)StreamCapabilities.METHOD.invoke(stream, capability)).booleanValue(); 1026 } catch (IllegalAccessException|IllegalArgumentException|InvocationTargetException 1027 exception) { 1028 LOG.warn("Your Hadoop installation's StreamCapabilities implementation doesn't match " + 1029 "our understanding of how it's supposed to work. Please file a JIRA and include " + 1030 "the following stack trace. In the mean time we're interpreting this behavior " + 1031 "difference as a lack of capability support, which will probably cause a failure.", 1032 exception); 1033 } 1034 } 1035 } 1036 return result; 1037 } 1038 1039 /** 1040 * Helper exception for those cases where the place where we need to check a stream capability 1041 * is not where we have the needed context to explain the impact and mitigation for a lack. 1042 */ 1043 public static class StreamLacksCapabilityException extends Exception { 1044 public StreamLacksCapabilityException(String message, Throwable cause) { 1045 super(message, cause); 1046 } 1047 public StreamLacksCapabilityException(String message) { 1048 super(message); 1049 } 1050 } 1051}