/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Utility methods for interacting with the underlying file system.
 */
@InterfaceAudience.Private
public abstract class CommonFSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(CommonFSUtils.class);

  /** Parameter name for HBase WAL directory */
  public static final String HBASE_WAL_DIR = "hbase.wal.dir";

  /** Parameter to disable stream capability enforcement checks */
  public static final String UNSAFE_STREAM_CAPABILITY_ENFORCE =
      "hbase.unsafe.stream.capability.enforce";

  /** Full access permissions (starting point for a umask) */
  public static final String FULL_RWX_PERMISSIONS = "777";

  protected CommonFSUtils() {
    super();
  }

  /**
   * Compares path components. Does not consider the scheme; i.e. if the schemes differ but
   * <code>path</code> starts with <code>rootPath</code>, then the function returns true.
   * @param rootPath value to check for
   * @param path subject to check
   * @return True if <code>path</code> starts with <code>rootPath</code>
   */
  public static boolean isStartingWithPath(final Path rootPath, final String path) {
    String uriRootPath = rootPath.toUri().getPath();
    String tailUriPath = (new Path(path)).toUri().getPath();
    return tailUriPath.startsWith(uriRootPath);
  }
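  // Usage sketch (the paths below are illustrative assumptions, not values used by HBase): the
  // scheme and authority are ignored, so a fully qualified path still matches a plain root path.
  //
  //   Path root = new Path("/hbase");
  //   CommonFSUtils.isStartingWithPath(root, "hdfs://nn:8020/hbase/data/default/t1"); // true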
  /**
   * Compare the path component of the Path URI; e.g. given hdfs://a/b/c and /a/b/c, it will
   * compare the '/a/b/c' part. Does not consider the scheme; i.e. if the schemes differ but the
   * path or subpath matches, the two will equate.
   * @param pathToSearch Path we will be trying to match against.
   * @param pathTail what to match
   * @return True if <code>pathTail</code> is a tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
    return isMatchingTail(pathToSearch, new Path(pathTail));
  }

  /**
   * Compare the path component of the Path URI; e.g. given hdfs://a/b/c and /a/b/c, it will
   * compare the '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true.
   * Does not consider the scheme; i.e. if the schemes differ but the path or subpath matches, the
   * two will equate.
   * @param pathToSearch Path we will be trying to match against
   * @param pathTail what to match
   * @return True if <code>pathTail</code> is a tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
    if (pathToSearch.depth() != pathTail.depth()) {
      return false;
    }
    Path tailPath = pathTail;
    String tailName;
    Path toSearch = pathToSearch;
    String toSearchName;
    boolean result = false;
    do {
      tailName = tailPath.getName();
      if (tailName == null || tailName.length() <= 0) {
        result = true;
        break;
      }
      toSearchName = toSearch.getName();
      if (toSearchName == null || toSearchName.length() <= 0) {
        break;
      }
      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
      tailPath = tailPath.getParent();
      toSearch = toSearch.getParent();
    } while (tailName.equals(toSearchName));
    return result;
  }

  /**
   * Delete the directory if it exists.
   * @param fs filesystem object
   * @param dir directory to delete
   * @return True if deleted <code>dir</code>
   * @throws IOException e
   */
  public static boolean deleteDirectory(final FileSystem fs, final Path dir)
      throws IOException {
    return fs.exists(dir) && fs.delete(dir, true);
  }

  /**
   * Return the number of bytes that large input files should optimally be split into to minimize
   * i/o time.
   *
   * Uses reflection to search for getDefaultBlockSize(Path f); if the method doesn't exist, falls
   * back to using getDefaultBlockSize().
   *
   * @param fs filesystem object
   * @return the default block size for the path's filesystem
   * @throws IOException e
   */
  public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
    Method m = null;
    Class<? extends FileSystem> cls = fs.getClass();
    try {
      m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
    } catch (NoSuchMethodException e) {
      LOG.info("FileSystem doesn't support getDefaultBlockSize");
    } catch (SecurityException e) {
      LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
      m = null; // could happen on setAccessible()
    }
    if (m == null) {
      return fs.getDefaultBlockSize(path);
    } else {
      try {
        Object ret = m.invoke(fs, path);
        return ((Long) ret).longValue();
      } catch (Exception e) {
        throw new IOException(e);
      }
    }
  }
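  // Usage sketch: callers pass the target path so per-path defaults (e.g. on viewfs or federated
  // mounts) are respected where the FileSystem supports the Path-aware accessors.
  //
  //   long blockSize = CommonFSUtils.getDefaultBlockSize(fs, path);
  //   short replication = CommonFSUtils.getDefaultReplication(fs, path);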
  /**
   * Get the default replication.
   *
   * Uses reflection to search for getDefaultReplication(Path f); if the method doesn't exist,
   * falls back to using getDefaultReplication().
   *
   * @param fs filesystem object
   * @param path path of file
   * @return default replication for the path's filesystem
   * @throws IOException e
   */
  public static short getDefaultReplication(final FileSystem fs, final Path path)
      throws IOException {
    Method m = null;
    Class<? extends FileSystem> cls = fs.getClass();
    try {
      m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
    } catch (NoSuchMethodException e) {
      LOG.info("FileSystem doesn't support getDefaultReplication");
    } catch (SecurityException e) {
      LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
      m = null; // could happen on setAccessible()
    }
    if (m == null) {
      return fs.getDefaultReplication(path);
    } else {
      try {
        Object ret = m.invoke(fs, path);
        return ((Number) ret).shortValue();
      } catch (Exception e) {
        throw new IOException(e);
      }
    }
  }

  /**
   * Returns the default buffer size to use during writes.
   *
   * The size of the buffer should probably be a multiple of hardware
   * page size (4096 on Intel x86), and it determines how much data is
   * buffered during read and write operations.
   *
   * @param fs filesystem object
   * @return default buffer size to use during writes
   */
  public static int getDefaultBufferSize(final FileSystem fs) {
    return fs.getConf().getInt("io.file.buffer.size", 4096);
  }

  /**
   * Create the specified file on the filesystem. By default, this will:
   * <ol>
   * <li>apply the umask in the configuration (if it is enabled)</li>
   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the default replication</li>
   * <li>use the default block size</li>
   * <li>not track progress</li>
   * </ol>
   *
   * @param fs {@link FileSystem} on which to write the file
   * @param path {@link Path} to the file to write
   * @param perm initial permissions
   * @param overwrite Whether or not the created file should be overwritten.
   * @return output stream to the created file
   * @throws IOException if the file cannot be created
   */
  public static FSDataOutputStream create(FileSystem fs, Path path,
      FsPermission perm, boolean overwrite) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
    }
    return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
        getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
  }
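  // Usage sketch (the path and permission literal are assumptions): create a file with explicit
  // permissions while picking up the filesystem's default buffer size, replication and block size.
  //
  //   FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf);
  //   try (FSDataOutputStream out = CommonFSUtils.create(fs,
  //       new Path("/hbase/.tmp/example"), new FsPermission("700"), true)) {
  //     out.writeBytes("example");
  //   }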
  /**
   * Get the file permissions specified in the configuration, if they are
   * enabled.
   *
   * @param fs filesystem that the file will be created on.
   * @param conf configuration to read for determining if permissions are
   *          enabled and which to use
   * @param permssionConfKey property key in the configuration to use when
   *          finding the permission
   * @return the permission to use when creating a new file on the fs. If
   *         special permissions are not specified in the configuration, then
   *         the default permissions on the fs will be returned.
   */
  public static FsPermission getFilePermissions(final FileSystem fs,
      final Configuration conf, final String permssionConfKey) {
    boolean enablePermissions = conf.getBoolean(
        HConstants.ENABLE_DATA_FILE_UMASK, false);

    if (enablePermissions) {
      try {
        FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
        // make sure that we have a mask, if not, go default.
        String mask = conf.get(permssionConfKey);
        if (mask == null) {
          return FsPermission.getFileDefault();
        }
        // apply the umask
        FsPermission umask = new FsPermission(mask);
        return perm.applyUMask(umask);
      } catch (IllegalArgumentException e) {
        LOG.warn(
            "Incorrect umask attempted to be created: "
                + conf.get(permssionConfKey)
                + ", using default file permissions.", e);
        return FsPermission.getFileDefault();
      }
    }
    return FsPermission.getFileDefault();
  }

  /**
   * Verifies that the root directory path is a valid URI with a scheme.
   *
   * @param root root directory path
   * @return Passed <code>root</code> argument.
   * @throws IOException if not a valid URI with a scheme
   */
  public static Path validateRootPath(Path root) throws IOException {
    try {
      URI rootURI = new URI(root.toString());
      String scheme = rootURI.getScheme();
      if (scheme == null) {
        throw new IOException("Root directory does not have a scheme");
      }
      return root;
    } catch (URISyntaxException e) {
      IOException io = new IOException("Root directory path is not a valid " +
          "URI -- check your " + HConstants.HBASE_DIR + " configuration");
      io.initCause(e);
      throw io;
    }
  }

  /**
   * Checks for the presence of the WAL log root path (using the provided conf object) in the given
   * path. If it exists, this method removes it and returns the String representation of the
   * remaining relative path.
   * @param path must not be null
   * @param conf must not be null
   * @return String representation of the remaining relative path
   * @throws IOException from underlying filesystem
   */
  public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
    Path root = getWALRootDir(conf);
    String pathStr = path.toString();
    // If the path is not under the WAL root, it is already relative; return it unchanged.
    if (!pathStr.startsWith(root.toString())) {
      return pathStr;
    }
    // Otherwise strip the WAL root prefix, plus the trailing "/".
    return pathStr.substring(root.toString().length() + 1);
  }

  /**
   * Return the 'path' component of a Path. In Hadoop, Path is a URI. This
   * method returns the 'path' component of a Path's URI: e.g. If a Path is
   * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
   * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
   * This method is useful if you want to print out a Path without the qualifying
   * Filesystem instance.
   * @param p Filesystem Path whose 'path' component we are to return.
   * @return Path portion of the Filesystem
   */
  public static String getPath(Path p) {
    return p.toUri().getPath();
  }
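  // Usage sketch (the WAL path below is an assumption): removeWALRootPath turns an absolute WAL
  // path back into a WAL-root-relative form.
  //
  //   Path walRoot = CommonFSUtils.getWALRootDir(conf);     // e.g. hdfs://nn:8020/hbasewal
  //   String rel = CommonFSUtils.removeWALRootPath(
  //       new Path(walRoot, "oldWALs/example.wal"), conf);  // -> "oldWALs/example.wal"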
  /**
   * @param c configuration
   * @return {@link Path} to hbase root directory from
   *     configuration as a qualified Path.
   * @throws IOException e
   */
  public static Path getRootDir(final Configuration c) throws IOException {
    Path p = new Path(c.get(HConstants.HBASE_DIR));
    FileSystem fs = p.getFileSystem(c);
    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  public static void setRootDir(final Configuration c, final Path root) throws IOException {
    c.set(HConstants.HBASE_DIR, root.toString());
  }

  public static void setFsDefault(final Configuration c, final Path root) throws IOException {
    c.set("fs.defaultFS", root.toString()); // for hadoop 0.21+
  }

  public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
    Path p = getRootDir(c);
    return p.getFileSystem(c);
  }

  /**
   * @param c configuration
   * @return {@link Path} to hbase log root directory: e.g. {@value HBASE_WAL_DIR} from
   *     configuration as a qualified Path. Defaults to the HBase root dir.
   * @throws IOException e
   */
  public static Path getWALRootDir(final Configuration c) throws IOException {
    Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR)));
    if (!isValidWALRootDir(p, c)) {
      return getRootDir(c);
    }
    FileSystem fs = p.getFileSystem(c);
    return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  @VisibleForTesting
  public static void setWALRootDir(final Configuration c, final Path root) throws IOException {
    c.set(HBASE_WAL_DIR, root.toString());
  }

  public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
    Path p = getWALRootDir(c);
    FileSystem fs = p.getFileSystem(c);
    // hadoop-core does fs caching, so we need to propagate this setting if it was set
    String enforceStreamCapability = c.get(UNSAFE_STREAM_CAPABILITY_ENFORCE);
    if (enforceStreamCapability != null) {
      fs.getConf().set(UNSAFE_STREAM_CAPABILITY_ENFORCE, enforceStreamCapability);
    }
    return fs;
  }

  private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
    Path rootDir = getRootDir(c);
    FileSystem fs = walDir.getFileSystem(c);
    Path qualifiedWalDir = walDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    if (!qualifiedWalDir.equals(rootDir)) {
      if (qualifiedWalDir.toString().startsWith(rootDir.toString() + "/")) {
        throw new IllegalStateException("Illegal WAL directory specified. " +
            "WAL directories are not permitted to be under the root directory if set.");
      }
    }
    return true;
  }

  /**
   * Returns the WAL region directory based on the given table name and region name
   * @param conf configuration to determine WALRootDir
   * @param tableName Table that the region is under
   * @param encodedRegionName Region name used for creating the final region directory
   * @return the region directory used to store WALs under the WALRootDir
   * @throws IOException if there is an exception determining the WALRootDir
   */
  public static Path getWALRegionDir(final Configuration conf, final TableName tableName,
      final String encodedRegionName) throws IOException {
    return new Path(getWALTableDir(conf, tableName), encodedRegionName);
  }

  /**
   * Returns the Table directory under the WALRootDir for the specified table name
   * @param conf configuration used to get the WALRootDir
   * @param tableName Table to get the directory for
   * @return a path to the WAL table directory for the specified table
   * @throws IOException if there is an exception determining the WALRootDir
   */
  public static Path getWALTableDir(final Configuration conf, final TableName tableName)
      throws IOException {
    Path baseDir = new Path(getWALRootDir(conf), HConstants.BASE_NAMESPACE_DIR);
    return new Path(new Path(baseDir, tableName.getNamespaceAsString()),
        tableName.getQualifierAsString());
  }

  /**
   * For backward compatibility with HBASE-20734, where we stored recovered edits in a wrong
   * directory without BASE_NAMESPACE_DIR. See HBASE-22617 for more details.
   * @deprecated For compatibility, will be removed in 4.0.0.
   */
  @Deprecated
  public static Path getWrongWALRegionDir(final Configuration conf, final TableName tableName,
      final String encodedRegionName) throws IOException {
    Path wrongTableDir = new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()),
        tableName.getQualifierAsString());
    return new Path(wrongTableDir, encodedRegionName);
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
   * path rootdir
   *
   * @param rootdir qualified path of HBase root directory
   * @param tableName name of table
   * @return {@link org.apache.hadoop.fs.Path} for table
   */
  public static Path getTableDir(Path rootdir, final TableName tableName) {
    return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
        tableName.getQualifierAsString());
  }

  /**
   * Returns the {@link org.apache.hadoop.hbase.TableName} object for the table that the given
   * table directory path points to.
   *
   * @param tablePath path of table
   * @return {@link org.apache.hadoop.hbase.TableName} for the table
   */
  public static TableName getTableName(Path tablePath) {
    return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
  }

  /**
   * Returns the {@link org.apache.hadoop.fs.Path} object representing
   * the namespace directory under path rootdir
   *
   * @param rootdir qualified path of HBase root directory
   * @param namespace namespace name
   * @return {@link org.apache.hadoop.fs.Path} for the namespace directory
   */
  public static Path getNamespaceDir(Path rootdir, final String namespace) {
    return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
        new Path(namespace)));
  }
  // This mapping means that under a federated FileSystem implementation, we'll
  // only log the first failure from any of the underlying FileSystems at WARN and all others
  // will be at DEBUG.
  private static final Map<FileSystem, Boolean> warningMap =
      new ConcurrentHashMap<FileSystem, Boolean>();

  /**
   * Sets storage policy for given path.
   * If the passed path is a directory, we'll set the storage policy for all files
   * created in the future in said directory. Note that this change in storage
   * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle.
   * If we're running on a version of FileSystem that doesn't support the given storage policy
   * (or storage policies at all), then we'll issue a log message and continue.
   *
   * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
   *
   * @param fs We only do anything if it implements a setStoragePolicy method
   * @param path the Path whose storage policy is to be set
   * @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+
   *     org.apache.hadoop.hdfs.protocol.HdfsConstants for the possible list, e.g.
   *     'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
   */
  public static void setStoragePolicy(final FileSystem fs, final Path path,
      final String storagePolicy) {
    try {
      setStoragePolicy(fs, path, storagePolicy, false);
    } catch (IOException e) {
      // should never arrive here
      LOG.warn("We have chosen not to throw exceptions here, but one was unexpectedly thrown", e);
    }
  }

  static void setStoragePolicy(final FileSystem fs, final Path path, final String storagePolicy,
      boolean throwException) throws IOException {
    if (storagePolicy == null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("We were passed a null storagePolicy, exiting early.");
      }
      return;
    }
    String trimmedStoragePolicy = storagePolicy.trim();
    if (trimmedStoragePolicy.isEmpty()) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("We were passed an empty storagePolicy, exiting early.");
      }
      return;
    } else {
      trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT);
    }
    if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.",
            trimmedStoragePolicy);
      }
      return;
    }
    try {
      invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
    } catch (IOException e) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Failed to invoke set storage policy API on FS", e);
      }
      if (throwException) {
        throw e;
      }
    }
  }
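  // Usage sketch (the path and policy below are assumptions): request SSD-backed storage for a
  // column family directory; on filesystems without storage policy support this only logs.
  //
  //   CommonFSUtils.setStoragePolicy(fs, new Path("/hbase/data/default/t1/cf"), "ONE_SSD");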
  /*
   * All args have been checked and are good. Run the setStoragePolicy invocation.
   */
  private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
      final String storagePolicy) throws IOException {
    Method m = null;
    Exception toThrow = null;
    try {
      m = fs.getClass().getDeclaredMethod("setStoragePolicy",
          new Class<?>[] { Path.class, String.class });
      m.setAccessible(true);
    } catch (NoSuchMethodException e) {
      toThrow = e;
      final String msg = "FileSystem doesn't support setStoragePolicy; HDFS-6584, HDFS-9345 " +
          "not available. This is normal and expected on earlier Hadoop versions.";
      if (!warningMap.containsKey(fs)) {
        warningMap.put(fs, true);
        LOG.warn(msg, e);
      } else if (LOG.isDebugEnabled()) {
        LOG.debug(msg, e);
      }
      m = null;
    } catch (SecurityException e) {
      toThrow = e;
      final String msg = "No access to setStoragePolicy on FileSystem from the SecurityManager; " +
          "HDFS-6584, HDFS-9345 not available. This is unusual and probably warrants an email " +
          "to the user@hbase mailing list. Please be sure to include a link to your configs, and " +
          "logs that include this message and period of time before it. Logs around service " +
          "start up will probably be useful as well.";
      if (!warningMap.containsKey(fs)) {
        warningMap.put(fs, true);
        LOG.warn(msg, e);
      } else if (LOG.isDebugEnabled()) {
        LOG.debug(msg, e);
      }
      m = null; // could happen on setAccessible() or getDeclaredMethod()
    }
    if (m != null) {
      try {
        m.invoke(fs, path, storagePolicy);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Set storagePolicy=" + storagePolicy + " for path=" + path);
        }
      } catch (Exception e) {
        toThrow = e;
        // This swallows FNFE, should we be throwing it? Seems more likely to indicate dev
        // misuse than a runtime problem with HDFS.
        if (!warningMap.containsKey(fs)) {
          warningMap.put(fs, true);
          LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path + ". " +
              "DEBUG log level might have more details.", e);
        } else if (LOG.isDebugEnabled()) {
          LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
        }
        // check for lack of HDFS-7228
        if (e instanceof InvocationTargetException) {
          final Throwable exception = e.getCause();
          if (exception instanceof RemoteException &&
              HadoopIllegalArgumentException.class.getName().equals(
                  ((RemoteException) exception).getClassName())) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Given storage policy, '" + storagePolicy + "', was rejected and " +
                  "probably isn't a valid policy for the version of Hadoop you're running. I.e. " +
                  "if you're trying to use SSD related policies then you're likely missing " +
                  "HDFS-7228. For more information see the 'ArchivalStorage' docs for your " +
                  "Hadoop release.");
            }
            // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation
            // that throws UnsupportedOperationException
          } else if (exception instanceof UnsupportedOperationException) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("The underlying FileSystem implementation doesn't support " +
                  "setStoragePolicy. This is probably intentional on their part, since HDFS-9345 " +
                  "appears to be present in your version of Hadoop. For more information check " +
                  "the Hadoop documentation on 'ArchivalStorage', the Hadoop FileSystem " +
                  "specification docs from HADOOP-11981, and/or related documentation from the " +
                  "provider of the underlying FileSystem (its name should appear in the " +
                  "stacktrace that accompanies this message). Note in particular that Hadoop's " +
                  "local filesystem implementation doesn't support storage policies.", exception);
            }
          }
        }
      }
    }
    if (toThrow != null) {
      throw new IOException(toThrow);
    }
  }
  /**
   * @param conf must not be null
   * @return True if the filesystem's scheme is 'hdfs'.
   * @throws IOException from underlying FileSystem
   */
  public static boolean isHDFS(final Configuration conf) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    String scheme = fs.getUri().getScheme();
    return scheme.equalsIgnoreCase("hdfs");
  }

  /**
   * Checks if the given path is the one with the 'recovered.edits' dir.
   * @param path must not be null
   * @return True if the path contains a 'recovered.edits' directory
   */
  public static boolean isRecoveredEdits(Path path) {
    return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
  }

  /**
   * @param conf must not be null
   * @return Returns the filesystem of the hbase rootdir.
   * @throws IOException from underlying FileSystem
   */
  public static FileSystem getCurrentFileSystem(Configuration conf)
      throws IOException {
    return getRootDir(conf).getFileSystem(conf);
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
   * This accommodates differences between hadoop versions, where hadoop 1
   * does not throw a FileNotFoundException and returns an empty FileStatus[],
   * while Hadoop 2 will throw FileNotFoundException.
   *
   * Where possible, prefer FSUtils#listStatusWithStatusFilter(FileSystem,
   * Path, FileStatusFilter) instead.
   *
   * @param fs file system
   * @param dir directory
   * @param filter path filter
   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
   */
  public static FileStatus[] listStatus(final FileSystem fs,
      final Path dir, final PathFilter filter) throws IOException {
    FileStatus[] status = null;
    try {
      status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
    } catch (FileNotFoundException fnfe) {
      // if directory doesn't exist, return null
      if (LOG.isTraceEnabled()) {
        LOG.trace(dir + " doesn't exist");
      }
    }
    if (status == null || status.length < 1) {
      return null;
    }
    return status;
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
   * This accommodates differences between hadoop versions.
   *
   * @param fs file system
   * @param dir directory
   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
   */
  public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
    return listStatus(fs, dir, null);
  }

  /**
   * Calls fs.listFiles() to get FileStatus and BlockLocations together, to reduce RPC calls.
   *
   * @param fs file system
   * @param dir directory
   * @return LocatedFileStatus list
   */
  public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs,
      final Path dir) throws IOException {
    List<LocatedFileStatus> status = null;
    try {
      RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs
          .listFiles(dir, false);
      while (locatedFileStatusRemoteIterator.hasNext()) {
        if (status == null) {
          status = Lists.newArrayList();
        }
        status.add(locatedFileStatusRemoteIterator.next());
      }
    } catch (FileNotFoundException fnfe) {
      // if directory doesn't exist, return null
      if (LOG.isTraceEnabled()) {
        LOG.trace(dir + " doesn't exist");
      }
    }
    return status;
  }
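  // Usage sketch: unlike FileSystem#listStatus, these helpers return null for a missing or empty
  // directory instead of throwing or returning an empty array, so callers should null-check.
  //
  //   FileStatus[] children = CommonFSUtils.listStatus(fs, dir);
  //   if (children != null) {
  //     for (FileStatus child : children) {
  //       LOG.debug("found {}", child.getPath());
  //     }
  //   }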
  /**
   * Calls fs.delete() and returns the value returned by fs.delete().
   *
   * @param fs must not be null
   * @param path must not be null
   * @param recursive delete tree rooted at path
   * @return the value returned by fs.delete()
   * @throws IOException from underlying FileSystem
   */
  public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
      throws IOException {
    return fs.delete(path, recursive);
  }

  /**
   * Calls fs.exists(). Checks if the specified path exists.
   *
   * @param fs must not be null
   * @param path must not be null
   * @return the value returned by fs.exists()
   * @throws IOException from underlying FileSystem
   */
  public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
    return fs.exists(path);
  }

  /**
   * Log the current state of the filesystem from a certain root directory
   * @param fs filesystem to investigate
   * @param root root file/directory to start logging from
   * @param LOG log to output information
   * @throws IOException if an unexpected exception occurs
   */
  public static void logFileSystemState(final FileSystem fs, final Path root, Logger LOG)
      throws IOException {
    LOG.debug("File system contents for path " + root);
    logFSTree(LOG, fs, root, "|-");
  }

  /**
   * Recursive helper to log the state of the FS
   *
   * @see #logFileSystemState(FileSystem, Path, Logger)
   */
  private static void logFSTree(Logger LOG, final FileSystem fs, final Path root, String prefix)
      throws IOException {
    FileStatus[] files = listStatus(fs, root, null);
    if (files == null) {
      return;
    }

    for (FileStatus file : files) {
      if (file.isDirectory()) {
        LOG.debug(prefix + file.getPath().getName() + "/");
        logFSTree(LOG, fs, file.getPath(), prefix + "---");
      } else {
        LOG.debug(prefix + file.getPath().getName());
      }
    }
  }

  public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src,
      final Path dest) throws IOException {
    // set the modify time for TimeToLive Cleaner
    fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
    return fs.rename(src, dest);
  }

  /**
   * Do our short circuit read setup.
   * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
   * @param conf must not be null
   */
  public static void setupShortCircuitRead(final Configuration conf) {
    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
    boolean shortCircuitSkipChecksum =
        conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    if (shortCircuitSkipChecksum) {
      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
          "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
          "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
      assert !shortCircuitSkipChecksum; // this will fail if assertions are on
    }
    checkShortCircuitReadBufferSize(conf);
  }
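  // Configuration sketch (the values are assumptions): a typical short-circuit read setup that
  // passes the checks above; checksum skipping stays off because HBase does its own checksumming.
  //
  //   <property><name>dfs.client.read.shortcircuit</name><value>true</value></property>
  //   <property><name>dfs.client.read.shortcircuit.skip.checksum</name><value>false</value></property>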
  /**
   * Check if the short circuit read buffer size is set and if not, set it to the hbase value.
   * @param conf must not be null
   */
  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in Hadoop 2
    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
    int size = conf.getInt(dfsKey, notSet);
    // If a size is set, return -- we will use it.
    if (size != notSet) {
      return;
    }
    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }

  private static class DfsBuilderUtility {
    static Class<?> dfsClass = null;
    static Method createMethod;
    static Method overwriteMethod;
    static Method bufferSizeMethod;
    static Method blockSizeMethod;
    static Method recursiveMethod;
    static Method replicateMethod;
    static Method replicationMethod;
    static Method buildMethod;
    static boolean allMethodsPresent = false;

    static {
      String dfsName = "org.apache.hadoop.hdfs.DistributedFileSystem";
      String builderName = dfsName + "$HdfsDataOutputStreamBuilder";
      Class<?> builderClass = null;

      try {
        dfsClass = Class.forName(dfsName);
      } catch (ClassNotFoundException e) {
        LOG.debug("{} not available, will not use builder API for file creation.", dfsName);
      }
      try {
        builderClass = Class.forName(builderName);
      } catch (ClassNotFoundException e) {
        LOG.debug("{} not available, will not use builder API for file creation.", builderName);
      }

      if (dfsClass != null && builderClass != null) {
        try {
          createMethod = dfsClass.getMethod("createFile", Path.class);
          overwriteMethod = builderClass.getMethod("overwrite", boolean.class);
          bufferSizeMethod = builderClass.getMethod("bufferSize", int.class);
          blockSizeMethod = builderClass.getMethod("blockSize", long.class);
          recursiveMethod = builderClass.getMethod("recursive");
          replicateMethod = builderClass.getMethod("replicate");
          replicationMethod = builderClass.getMethod("replication", short.class);
          buildMethod = builderClass.getMethod("build");

          allMethodsPresent = true;
          LOG.debug("Using builder API via reflection for DFS file creation.");
        } catch (NoSuchMethodException e) {
          LOG.debug("Could not find method on builder; will use old DFS API for file creation {}",
              e.getMessage());
        }
      }
    }
    /**
     * Attempt to use builder API via reflection to create a file with the given parameters and
     * replication enabled.
     */
    static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable,
        int bufferSize, short replication, long blockSize, boolean isRecursive)
        throws IOException {
      if (allMethodsPresent && dfsClass.isInstance(fs)) {
        try {
          Object builder;

          builder = createMethod.invoke(fs, path);
          builder = overwriteMethod.invoke(builder, overwritable);
          builder = bufferSizeMethod.invoke(builder, bufferSize);
          builder = blockSizeMethod.invoke(builder, blockSize);
          if (isRecursive) {
            builder = recursiveMethod.invoke(builder);
          }
          builder = replicateMethod.invoke(builder);
          builder = replicationMethod.invoke(builder, replication);
          return (FSDataOutputStream) buildMethod.invoke(builder);
        } catch (IllegalAccessException | InvocationTargetException e) {
          // Should have caught this failure during initialization, so log full trace here
          LOG.warn("Couldn't use reflection with builder API", e);
        }
      }

      if (isRecursive) {
        return fs.create(path, overwritable, bufferSize, replication, blockSize, null);
      }
      return fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null);
    }

    /**
     * Attempt to use builder API via reflection to create a file with the given parameters and
     * replication enabled.
     */
    static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable)
        throws IOException {
      if (allMethodsPresent && dfsClass.isInstance(fs)) {
        try {
          Object builder;

          builder = createMethod.invoke(fs, path);
          builder = overwriteMethod.invoke(builder, overwritable);
          builder = replicateMethod.invoke(builder);
          return (FSDataOutputStream) buildMethod.invoke(builder);
        } catch (IllegalAccessException | InvocationTargetException e) {
          // Should have caught this failure during initialization, so log full trace here
          LOG.warn("Couldn't use reflection with builder API", e);
        }
      }

      return fs.create(path, overwritable);
    }
  }

  /**
   * Attempt to use builder API via reflection to create a file with the given parameters and
   * replication enabled.
   * <p>
   * Will not attempt to enable replication when passed an HFileSystem.
   */
  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable)
      throws IOException {
    return DfsBuilderUtility.createHelper(fs, path, overwritable);
  }

  /**
   * Attempt to use builder API via reflection to create a file with the given parameters and
   * replication enabled.
   * <p>
   * Will not attempt to enable replication when passed an HFileSystem.
   */
  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable,
      int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
    return DfsBuilderUtility.createHelper(fs, path, overwritable, bufferSize, replication,
        blockSize, isRecursive);
  }
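  // Usage sketch (the WAL path variable is an assumption): create a WAL output stream; on a
  // DistributedFileSystem with the builder API, replicate() is requested so the file is written
  // with replication enabled.
  //
  //   FileSystem walFs = CommonFSUtils.getWALFileSystem(conf);
  //   FSDataOutputStream walOut = CommonFSUtils.createForWal(walFs, walPath, false);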
  // Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and
  // not until we attempt to reference it.
  private static class StreamCapabilities {
    public static final boolean PRESENT;
    public static final Class<?> CLASS;
    public static final Method METHOD;
    static {
      boolean tmp = false;
      Class<?> clazz = null;
      Method method = null;
      try {
        clazz = Class.forName("org.apache.hadoop.fs.StreamCapabilities");
        method = clazz.getMethod("hasCapability", String.class);
        tmp = true;
      } catch (ClassNotFoundException | NoSuchMethodException | SecurityException exception) {
        LOG.warn("Your Hadoop installation does not include the StreamCapabilities class from " +
            "HDFS-11644, so we will skip checking if any FSDataOutputStreams actually " +
            "support hflush/hsync. If you are running on top of HDFS this probably just " +
            "means you have an older version and this can be ignored. If you are running on " +
            "top of an alternate FileSystem implementation you should manually verify that " +
            "hflush and hsync are implemented; otherwise you risk data loss and hard to " +
            "diagnose errors when our assumptions are violated.");
        LOG.debug("The first request to check for StreamCapabilities came from this stacktrace.",
            exception);
      } finally {
        PRESENT = tmp;
        CLASS = clazz;
        METHOD = method;
      }
    }
  }

  /**
   * If our FileSystem version includes the StreamCapabilities class, check if
   * the given stream has a particular capability.
   * @param stream capabilities are per-stream instance, so check this one specifically. Must not
   *     be null.
   * @param capability what to look for, per Hadoop Common's FileSystem docs
   * @return true if there are no StreamCapabilities. false if there are, but this stream doesn't
   *     implement it. return result of asking the stream otherwise.
   */
  public static boolean hasCapability(FSDataOutputStream stream, String capability) {
    // be consistent whether or not StreamCapabilities is present
    if (stream == null) {
      throw new NullPointerException("stream parameter must not be null.");
    }
    // If o.a.h.fs.StreamCapabilities doesn't exist, assume everyone does everything
    // otherwise old versions of Hadoop will break.
    boolean result = true;
    if (StreamCapabilities.PRESENT) {
      // if StreamCapabilities is present, but the stream doesn't implement it
      // or we run into a problem invoking the method,
      // we treat that as equivalent to not declaring anything
      result = false;
      if (StreamCapabilities.CLASS.isAssignableFrom(stream.getClass())) {
        try {
          result = ((Boolean) StreamCapabilities.METHOD.invoke(stream, capability)).booleanValue();
        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
            exception) {
          LOG.warn("Your Hadoop installation's StreamCapabilities implementation doesn't match " +
              "our understanding of how it's supposed to work. Please file a JIRA and include " +
              "the following stack trace. In the mean time we're interpreting this behavior " +
              "difference as a lack of capability support, which will probably cause a failure.",
              exception);
        }
      }
    }
    return result;
  }
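  // Usage sketch: check a freshly created WAL stream for hsync support before relying on its
  // durability; on Hadoop versions without StreamCapabilities this returns true by design.
  //
  //   if (!CommonFSUtils.hasCapability(out, "hsync")) {
  //     throw new StreamLacksCapabilityException("hsync");
  //   }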
  /**
   * Helper exception for those cases where the place where we need to check a stream capability
   * is not where we have the needed context to explain the impact and mitigation for a lack.
   */
  public static class StreamLacksCapabilityException extends Exception {
    public StreamLacksCapabilityException(String message, Throwable cause) {
      super(message, cause);
    }

    public StreamLacksCapabilityException(String message) {
      super(message);
    }
  }
}