001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.util; 020 021import edu.umd.cs.findbugs.annotations.CheckForNull; 022import java.io.ByteArrayInputStream; 023import java.io.DataInputStream; 024import java.io.EOFException; 025import java.io.FileNotFoundException; 026import java.io.IOException; 027import java.io.InterruptedIOException; 028import java.lang.reflect.InvocationTargetException; 029import java.lang.reflect.Method; 030import java.net.InetSocketAddress; 031import java.net.URI; 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.Collection; 035import java.util.Collections; 036import java.util.HashMap; 037import java.util.HashSet; 038import java.util.Iterator; 039import java.util.List; 040import java.util.Locale; 041import java.util.Map; 042import java.util.Optional; 043import java.util.Set; 044import java.util.Vector; 045import java.util.concurrent.ConcurrentHashMap; 046import java.util.concurrent.ExecutionException; 047import java.util.concurrent.ExecutorService; 048import java.util.concurrent.Executors; 049import java.util.concurrent.Future; 050import java.util.concurrent.FutureTask; 051import java.util.concurrent.ThreadPoolExecutor; 052import java.util.concurrent.TimeUnit; 053import java.util.regex.Pattern; 054import org.apache.commons.lang3.ArrayUtils; 055import org.apache.hadoop.conf.Configuration; 056import org.apache.hadoop.fs.BlockLocation; 057import org.apache.hadoop.fs.FSDataInputStream; 058import org.apache.hadoop.fs.FSDataOutputStream; 059import org.apache.hadoop.fs.FileStatus; 060import org.apache.hadoop.fs.FileSystem; 061import org.apache.hadoop.fs.FileUtil; 062import org.apache.hadoop.fs.Path; 063import org.apache.hadoop.fs.PathFilter; 064import org.apache.hadoop.fs.StorageType; 065import org.apache.hadoop.fs.permission.FsPermission; 066import org.apache.hadoop.hbase.ClusterId; 067import org.apache.hadoop.hbase.HConstants; 068import org.apache.hadoop.hbase.HDFSBlocksDistribution; 069import org.apache.hadoop.hbase.TableName; 070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 071import org.apache.hadoop.hbase.client.RegionInfo; 072import org.apache.hadoop.hbase.client.RegionInfoBuilder; 073import org.apache.hadoop.hbase.exceptions.DeserializationException; 074import org.apache.hadoop.hbase.fs.HFileSystem; 075import org.apache.hadoop.hbase.io.HFileLink; 076import org.apache.hadoop.hbase.master.HMaster; 077import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 078import org.apache.hadoop.hdfs.DFSClient; 079import org.apache.hadoop.hdfs.DFSHedgedReadMetrics; 080import org.apache.hadoop.hdfs.DFSUtil; 081import org.apache.hadoop.hdfs.DistributedFileSystem; 082import 
org.apache.hadoop.hdfs.protocol.HdfsConstants;
083import org.apache.hadoop.io.IOUtils;
084import org.apache.hadoop.ipc.RemoteException;
085import org.apache.hadoop.util.Progressable;
086import org.apache.yetus.audience.InterfaceAudience;
087import org.slf4j.Logger;
088import org.slf4j.LoggerFactory;
089
090import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
091import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
092import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
093import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
094import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
095
096import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
098
099/**
100 * Utility methods for interacting with the underlying file system.
101 */
102@InterfaceAudience.Private
103public final class FSUtils {
104  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
105
106  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
107  private static final int DEFAULT_THREAD_POOLSIZE = 2;
108
109  /** Set to true on Windows platforms */
110  // currently only used in testing. TODO refactor into a test class
111  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
112
113  private FSUtils() {
114  }
115
116  /**
117   * @return True if <code>fs</code> is an instance of DistributedFileSystem
118   * @throws IOException
119   */
120  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
121    FileSystem fileSystem = fs;
122    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
123    // Check its backing fs for dfs-ness.
124    if (fs instanceof HFileSystem) {
125      fileSystem = ((HFileSystem)fs).getBackingFs();
126    }
127    return fileSystem instanceof DistributedFileSystem;
128  }
129
130  /**
131   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
132   * the '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
133   * consider the scheme; i.e. if the schemes differ but the path or subpath matches, the two will equate.
134   * @param pathToSearch Path we will be trying to match.
135   * @param pathTail the tail to match against the end of <code>pathToSearch</code>
136   * @return True if <code>pathTail</code> is a tail of the path of <code>pathToSearch</code>
137   */
138  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
139    Path tailPath = pathTail;
140    String tailName;
141    Path toSearch = pathToSearch;
142    String toSearchName;
143    boolean result = false;
144
145    if (pathToSearch.depth() != pathTail.depth()) {
146      return false;
147    }
148
149    do {
150      tailName = tailPath.getName();
151      if (tailName == null || tailName.isEmpty()) {
152        result = true;
153        break;
154      }
155      toSearchName = toSearch.getName();
156      if (toSearchName == null || toSearchName.isEmpty()) {
157        break;
158      }
159      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
160      tailPath = tailPath.getParent();
161      toSearch = toSearch.getParent();
162    } while (tailName.equals(toSearchName));
163    return result;
164  }
165
166  /**
167   * Delete the region directory if it exists.
168   * @return True if the region directory was deleted.
169 * @throws IOException 170 */ 171 public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri) 172 throws IOException { 173 Path rootDir = CommonFSUtils.getRootDir(conf); 174 FileSystem fs = rootDir.getFileSystem(conf); 175 return CommonFSUtils.deleteDirectory(fs, 176 new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName())); 177 } 178 179 /** 180 * Create the specified file on the filesystem. By default, this will: 181 * <ol> 182 * <li>overwrite the file if it exists</li> 183 * <li>apply the umask in the configuration (if it is enabled)</li> 184 * <li>use the fs configured buffer size (or 4096 if not set)</li> 185 * <li>use the configured column family replication or default replication if 186 * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li> 187 * <li>use the default block size</li> 188 * <li>not track progress</li> 189 * </ol> 190 * @param conf configurations 191 * @param fs {@link FileSystem} on which to write the file 192 * @param path {@link Path} to the file to write 193 * @param perm permissions 194 * @param favoredNodes favored data nodes 195 * @return output stream to the created file 196 * @throws IOException if the file cannot be created 197 */ 198 public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path, 199 FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException { 200 if (fs instanceof HFileSystem) { 201 FileSystem backingFs = ((HFileSystem) fs).getBackingFs(); 202 if (backingFs instanceof DistributedFileSystem) { 203 // Try to use the favoredNodes version via reflection to allow backwards- 204 // compatibility. 205 short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION, 206 String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION))); 207 try { 208 return (FSDataOutputStream) (DistributedFileSystem.class 209 .getDeclaredMethod("create", Path.class, FsPermission.class, boolean.class, int.class, 210 short.class, long.class, Progressable.class, InetSocketAddress[].class) 211 .invoke(backingFs, path, perm, true, CommonFSUtils.getDefaultBufferSize(backingFs), 212 replication > 0 ? replication : CommonFSUtils.getDefaultReplication(backingFs, path), 213 CommonFSUtils.getDefaultBlockSize(backingFs, path), null, favoredNodes)); 214 } catch (InvocationTargetException ite) { 215 // Function was properly called, but threw it's own exception. 216 throw new IOException(ite.getCause()); 217 } catch (NoSuchMethodException e) { 218 LOG.debug("DFS Client does not support most favored nodes create; using default create"); 219 LOG.trace("Ignoring; use default create", e); 220 } catch (IllegalArgumentException | SecurityException | IllegalAccessException e) { 221 LOG.debug("Ignoring (most likely Reflection related exception) " + e); 222 } 223 } 224 } 225 return CommonFSUtils.create(fs, path, perm, true); 226 } 227 228 /** 229 * Checks to see if the specified file system is available 230 * 231 * @param fs filesystem 232 * @throws IOException e 233 */ 234 public static void checkFileSystemAvailable(final FileSystem fs) 235 throws IOException { 236 if (!(fs instanceof DistributedFileSystem)) { 237 return; 238 } 239 IOException exception = null; 240 DistributedFileSystem dfs = (DistributedFileSystem) fs; 241 try { 242 if (dfs.exists(new Path("/"))) { 243 return; 244 } 245 } catch (IOException e) { 246 exception = e instanceof RemoteException ? 
247 ((RemoteException)e).unwrapRemoteException() : e; 248 } 249 try { 250 fs.close(); 251 } catch (Exception e) { 252 LOG.error("file system close failed: ", e); 253 } 254 throw new IOException("File system is not available", exception); 255 } 256 257 /** 258 * We use reflection because {@link DistributedFileSystem#setSafeMode( 259 * HdfsConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1 260 * 261 * @param dfs 262 * @return whether we're in safe mode 263 * @throws IOException 264 */ 265 private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException { 266 boolean inSafeMode = false; 267 try { 268 Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{ 269 org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class}); 270 inSafeMode = (Boolean) m.invoke(dfs, 271 org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true); 272 } catch (Exception e) { 273 if (e instanceof IOException) throw (IOException) e; 274 275 // Check whether dfs is on safemode. 276 inSafeMode = dfs.setSafeMode( 277 org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET); 278 } 279 return inSafeMode; 280 } 281 282 /** 283 * Check whether dfs is in safemode. 284 * @param conf 285 * @throws IOException 286 */ 287 public static void checkDfsSafeMode(final Configuration conf) 288 throws IOException { 289 boolean isInSafeMode = false; 290 FileSystem fs = FileSystem.get(conf); 291 if (fs instanceof DistributedFileSystem) { 292 DistributedFileSystem dfs = (DistributedFileSystem)fs; 293 isInSafeMode = isInSafeMode(dfs); 294 } 295 if (isInSafeMode) { 296 throw new IOException("File system is in safemode, it can't be written now"); 297 } 298 } 299 300 /** 301 * Verifies current version of file system 302 * 303 * @param fs filesystem object 304 * @param rootdir root hbase directory 305 * @return null if no version file exists, version string otherwise 306 * @throws IOException if the version file fails to open 307 * @throws DeserializationException if the version data cannot be translated into a version 308 */ 309 public static String getVersion(FileSystem fs, Path rootdir) 310 throws IOException, DeserializationException { 311 final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 312 FileStatus[] status = null; 313 try { 314 // hadoop 2.0 throws FNFE if directory does not exist. 315 // hadoop 1.0 returns null if directory does not exist. 316 status = fs.listStatus(versionFile); 317 } catch (FileNotFoundException fnfe) { 318 return null; 319 } 320 if (ArrayUtils.getLength(status) == 0) { 321 return null; 322 } 323 String version = null; 324 byte [] content = new byte [(int)status[0].getLen()]; 325 FSDataInputStream s = fs.open(versionFile); 326 try { 327 IOUtils.readFully(s, content, 0, content.length); 328 if (ProtobufUtil.isPBMagicPrefix(content)) { 329 version = parseVersionFrom(content); 330 } else { 331 // Presume it pre-pb format. 332 try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) { 333 version = dis.readUTF(); 334 } 335 } 336 } catch (EOFException eof) { 337 LOG.warn("Version file was empty, odd, will try to set it."); 338 } finally { 339 s.close(); 340 } 341 return version; 342 } 343 344 /** 345 * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file. 
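   * <p>A minimal round-trip sketch; the version string "8" below is only an illustrative value:
   * <pre>{@code
   * byte[] data = FSUtils.toVersionByteArray("8");    // pb magic prefix + serialized content
   * String version = FSUtils.parseVersionFrom(data);  // "8", or throws DeserializationException
   * }</pre>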
346 * @param bytes The byte content of the hbase.version file 347 * @return The version found in the file as a String 348 * @throws DeserializationException if the version data cannot be translated into a version 349 */ 350 static String parseVersionFrom(final byte [] bytes) 351 throws DeserializationException { 352 ProtobufUtil.expectPBMagicPrefix(bytes); 353 int pblen = ProtobufUtil.lengthOfPBMagic(); 354 FSProtos.HBaseVersionFileContent.Builder builder = 355 FSProtos.HBaseVersionFileContent.newBuilder(); 356 try { 357 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); 358 return builder.getVersion(); 359 } catch (IOException e) { 360 // Convert 361 throw new DeserializationException(e); 362 } 363 } 364 365 /** 366 * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file. 367 * @param version Version to persist 368 * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix. 369 */ 370 static byte [] toVersionByteArray(final String version) { 371 FSProtos.HBaseVersionFileContent.Builder builder = 372 FSProtos.HBaseVersionFileContent.newBuilder(); 373 return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray()); 374 } 375 376 /** 377 * Verifies current version of file system 378 * 379 * @param fs file system 380 * @param rootdir root directory of HBase installation 381 * @param message if true, issues a message on System.out 382 * @throws IOException if the version file cannot be opened 383 * @throws DeserializationException if the contents of the version file cannot be parsed 384 */ 385 public static void checkVersion(FileSystem fs, Path rootdir, boolean message) 386 throws IOException, DeserializationException { 387 checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 388 } 389 390 /** 391 * Verifies current version of file system 392 * 393 * @param fs file system 394 * @param rootdir root directory of HBase installation 395 * @param message if true, issues a message on System.out 396 * @param wait wait interval 397 * @param retries number of times to retry 398 * 399 * @throws IOException if the version file cannot be opened 400 * @throws DeserializationException if the contents of the version file cannot be parsed 401 */ 402 public static void checkVersion(FileSystem fs, Path rootdir, 403 boolean message, int wait, int retries) 404 throws IOException, DeserializationException { 405 String version = getVersion(fs, rootdir); 406 String msg; 407 if (version == null) { 408 if (!metaRegionExists(fs, rootdir)) { 409 // rootDir is empty (no version file and no root region) 410 // just create new version file (HBASE-1195) 411 setVersion(fs, rootdir, wait, retries); 412 return; 413 } else { 414 msg = "hbase.version file is missing. Is your hbase.rootdir valid? " + 415 "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " + 416 "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2"; 417 } 418 } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) { 419 return; 420 } else { 421 msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version + 422 " but software requires version " + HConstants.FILE_SYSTEM_VERSION + 423 ". Consult http://hbase.apache.org/book.html for further information about " + 424 "upgrading HBase."; 425 } 426 427 // version is deprecated require migration 428 // Output on stdout so user sees it in terminal. 429 if (message) { 430 System.out.println("WARNING! 
" + msg); 431 } 432 throw new FileSystemVersionException(msg); 433 } 434 435 /** 436 * Sets version of file system 437 * 438 * @param fs filesystem object 439 * @param rootdir hbase root 440 * @throws IOException e 441 */ 442 public static void setVersion(FileSystem fs, Path rootdir) 443 throws IOException { 444 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0, 445 HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 446 } 447 448 /** 449 * Sets version of file system 450 * 451 * @param fs filesystem object 452 * @param rootdir hbase root 453 * @param wait time to wait for retry 454 * @param retries number of times to retry before failing 455 * @throws IOException e 456 */ 457 public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries) 458 throws IOException { 459 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries); 460 } 461 462 463 /** 464 * Sets version of file system 465 * 466 * @param fs filesystem object 467 * @param rootdir hbase root directory 468 * @param version version to set 469 * @param wait time to wait for retry 470 * @param retries number of times to retry before throwing an IOException 471 * @throws IOException e 472 */ 473 public static void setVersion(FileSystem fs, Path rootdir, String version, 474 int wait, int retries) throws IOException { 475 Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 476 Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + 477 HConstants.VERSION_FILE_NAME); 478 while (true) { 479 try { 480 // Write the version to a temporary file 481 FSDataOutputStream s = fs.create(tempVersionFile); 482 try { 483 s.write(toVersionByteArray(version)); 484 s.close(); 485 s = null; 486 // Move the temp version file to its normal location. Returns false 487 // if the rename failed. Throw an IOE in that case. 488 if (!fs.rename(tempVersionFile, versionFile)) { 489 throw new IOException("Unable to move temp version file to " + versionFile); 490 } 491 } finally { 492 // Cleaning up the temporary if the rename failed would be trying 493 // too hard. We'll unconditionally create it again the next time 494 // through anyway, files are overwritten by default by create(). 495 496 // Attempt to close the stream on the way out if it is still open. 
497 try { 498 if (s != null) s.close(); 499 } catch (IOException ignore) { } 500 } 501 LOG.info("Created version file at " + rootdir.toString() + " with version=" + version); 502 return; 503 } catch (IOException e) { 504 if (retries > 0) { 505 LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e); 506 fs.delete(versionFile, false); 507 try { 508 if (wait > 0) { 509 Thread.sleep(wait); 510 } 511 } catch (InterruptedException ie) { 512 throw (InterruptedIOException)new InterruptedIOException().initCause(ie); 513 } 514 retries--; 515 } else { 516 throw e; 517 } 518 } 519 } 520 } 521 522 /** 523 * Checks that a cluster ID file exists in the HBase root directory 524 * @param fs the root directory FileSystem 525 * @param rootdir the HBase root directory in HDFS 526 * @param wait how long to wait between retries 527 * @return <code>true</code> if the file exists, otherwise <code>false</code> 528 * @throws IOException if checking the FileSystem fails 529 */ 530 public static boolean checkClusterIdExists(FileSystem fs, Path rootdir, 531 long wait) throws IOException { 532 while (true) { 533 try { 534 Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 535 return fs.exists(filePath); 536 } catch (IOException ioe) { 537 if (wait > 0L) { 538 LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe); 539 try { 540 Thread.sleep(wait); 541 } catch (InterruptedException e) { 542 Thread.currentThread().interrupt(); 543 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 544 } 545 } else { 546 throw ioe; 547 } 548 } 549 } 550 } 551 552 /** 553 * Returns the value of the unique cluster ID stored for this HBase instance. 554 * @param fs the root directory FileSystem 555 * @param rootdir the path to the HBase root directory 556 * @return the unique cluster identifier 557 * @throws IOException if reading the cluster ID file fails 558 */ 559 public static ClusterId getClusterId(FileSystem fs, Path rootdir) 560 throws IOException { 561 Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 562 ClusterId clusterId = null; 563 FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath): null; 564 if (status != null) { 565 int len = Ints.checkedCast(status.getLen()); 566 byte [] content = new byte[len]; 567 FSDataInputStream in = fs.open(idPath); 568 try { 569 in.readFully(content); 570 } catch (EOFException eof) { 571 LOG.warn("Cluster ID file {} is empty", idPath); 572 } finally{ 573 in.close(); 574 } 575 try { 576 clusterId = ClusterId.parseFrom(content); 577 } catch (DeserializationException e) { 578 throw new IOException("content=" + Bytes.toString(content), e); 579 } 580 // If not pb'd, make it so. 581 if (!ProtobufUtil.isPBMagicPrefix(content)) { 582 String cid = null; 583 in = fs.open(idPath); 584 try { 585 cid = in.readUTF(); 586 clusterId = new ClusterId(cid); 587 } catch (EOFException eof) { 588 LOG.warn("Cluster ID file {} is empty", idPath); 589 } finally { 590 in.close(); 591 } 592 rewriteAsPb(fs, rootdir, idPath, clusterId); 593 } 594 return clusterId; 595 } else { 596 LOG.warn("Cluster ID file does not exist at {}", idPath); 597 } 598 return clusterId; 599 } 600 601 /** 602 * @param cid 603 * @throws IOException 604 */ 605 private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p, 606 final ClusterId cid) 607 throws IOException { 608 // Rewrite the file as pb. Move aside the old one first, write new 609 // then delete the moved-aside file. 
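    // The timestamp suffix keeps the moved-aside name unique, so a crash between this rename
    // and the delete below leaves at most one stray copy behind instead of clobbering anything.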
610 Path movedAsideName = new Path(p + "." + System.currentTimeMillis()); 611 if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p); 612 setClusterId(fs, rootdir, cid, 100); 613 if (!fs.delete(movedAsideName, false)) { 614 throw new IOException("Failed delete of " + movedAsideName); 615 } 616 LOG.debug("Rewrote the hbase.id file as pb"); 617 } 618 619 /** 620 * Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root 621 * directory. If any operations on the ID file fails, and {@code wait} is a positive value, the 622 * method will retry to produce the ID file until the thread is forcibly interrupted. 623 * 624 * @param fs the root directory FileSystem 625 * @param rootdir the path to the HBase root directory 626 * @param clusterId the unique identifier to store 627 * @param wait how long (in milliseconds) to wait between retries 628 * @throws IOException if writing to the FileSystem fails and no wait value 629 */ 630 public static void setClusterId(final FileSystem fs, final Path rootdir, 631 final ClusterId clusterId, final long wait) throws IOException { 632 633 final Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 634 final Path tempDir = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY); 635 final Path tempIdFile = new Path(tempDir, HConstants.CLUSTER_ID_FILE_NAME); 636 637 LOG.debug("Create cluster ID file [{}] with ID: {}", idFile, clusterId); 638 639 while (true) { 640 Optional<IOException> failure = Optional.empty(); 641 642 LOG.debug("Write the cluster ID file to a temporary location: {}", tempIdFile); 643 try (FSDataOutputStream s = fs.create(tempIdFile)) { 644 s.write(clusterId.toByteArray()); 645 } catch (IOException ioe) { 646 failure = Optional.of(ioe); 647 } 648 649 if (!failure.isPresent()) { 650 try { 651 LOG.debug("Move the temporary cluster ID file to its target location [{}]:[{}]", 652 tempIdFile, idFile); 653 654 if (!fs.rename(tempIdFile, idFile)) { 655 failure = 656 Optional.of(new IOException("Unable to move temp cluster ID file to " + idFile)); 657 } 658 } catch (IOException ioe) { 659 failure = Optional.of(ioe); 660 } 661 } 662 663 if (failure.isPresent()) { 664 final IOException cause = failure.get(); 665 if (wait > 0L) { 666 LOG.warn("Unable to create cluster ID file in {}, retrying in {}ms", rootdir, wait, 667 cause); 668 try { 669 Thread.sleep(wait); 670 } catch (InterruptedException e) { 671 Thread.currentThread().interrupt(); 672 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 673 } 674 continue; 675 } else { 676 throw cause; 677 } 678 } else { 679 return; 680 } 681 } 682 } 683 684 /** 685 * If DFS, check safe mode and if so, wait until we clear it. 
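   * <p>Typical startup usage (a sketch; the 10 second retry interval is an assumed value, not a
   * constant defined here):
   * <pre>{@code
   * FSUtils.waitOnSafeMode(conf, 10 * 1000L); // blocks until HDFS leaves safe mode
   * }</pre>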
686 * @param conf configuration 687 * @param wait Sleep between retries 688 * @throws IOException e 689 */ 690 public static void waitOnSafeMode(final Configuration conf, 691 final long wait) 692 throws IOException { 693 FileSystem fs = FileSystem.get(conf); 694 if (!(fs instanceof DistributedFileSystem)) return; 695 DistributedFileSystem dfs = (DistributedFileSystem)fs; 696 // Make sure dfs is not in safe mode 697 while (isInSafeMode(dfs)) { 698 LOG.info("Waiting for dfs to exit safe mode..."); 699 try { 700 Thread.sleep(wait); 701 } catch (InterruptedException e) { 702 Thread.currentThread().interrupt(); 703 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 704 } 705 } 706 } 707 708 /** 709 * Checks if meta region exists 710 * @param fs file system 711 * @param rootDir root directory of HBase installation 712 * @return true if exists 713 */ 714 public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException { 715 Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO); 716 return fs.exists(metaRegionDir); 717 } 718 719 /** 720 * Compute HDFS blocks distribution of a given file, or a portion of the file 721 * @param fs file system 722 * @param status file status of the file 723 * @param start start position of the portion 724 * @param length length of the portion 725 * @return The HDFS blocks distribution 726 */ 727 static public HDFSBlocksDistribution computeHDFSBlocksDistribution( 728 final FileSystem fs, FileStatus status, long start, long length) 729 throws IOException { 730 HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution(); 731 BlockLocation [] blockLocations = 732 fs.getFileBlockLocations(status, start, length); 733 addToHDFSBlocksDistribution(blocksDistribution, blockLocations); 734 return blocksDistribution; 735 } 736 737 /** 738 * Update blocksDistribution with blockLocations 739 * @param blocksDistribution the hdfs blocks distribution 740 * @param blockLocations an array containing block location 741 */ 742 static public void addToHDFSBlocksDistribution( 743 HDFSBlocksDistribution blocksDistribution, BlockLocation[] blockLocations) 744 throws IOException { 745 for (BlockLocation bl : blockLocations) { 746 String[] hosts = bl.getHosts(); 747 long len = bl.getLength(); 748 StorageType[] storageTypes = bl.getStorageTypes(); 749 blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes); 750 } 751 } 752 753 // TODO move this method OUT of FSUtils. No dependencies to HMaster 754 /** 755 * Returns the total overall fragmentation percentage. Includes hbase:meta and 756 * -ROOT- as well. 757 * 758 * @param master The master defining the HBase root and file system 759 * @return A map for each table and its percentage (never null) 760 * @throws IOException When scanning the directory fails 761 */ 762 public static int getTotalTableFragmentation(final HMaster master) 763 throws IOException { 764 Map<String, Integer> map = getTableFragmentation(master); 765 return map.isEmpty() ? -1 : map.get("-TOTAL-"); 766 } 767 768 /** 769 * Runs through the HBase rootdir and checks how many stores for each table 770 * have more than one file in them. Checks -ROOT- and hbase:meta too. The total 771 * percentage across all tables is stored under the special key "-TOTAL-". 772 * 773 * @param master The master defining the HBase root and file system. 774 * @return A map for each table and its percentage (never null). 775 * 776 * @throws IOException When scanning the directory fails. 
777 */ 778 public static Map<String, Integer> getTableFragmentation(final HMaster master) 779 throws IOException { 780 Path path = CommonFSUtils.getRootDir(master.getConfiguration()); 781 // since HMaster.getFileSystem() is package private 782 FileSystem fs = path.getFileSystem(master.getConfiguration()); 783 return getTableFragmentation(fs, path); 784 } 785 786 /** 787 * Runs through the HBase rootdir and checks how many stores for each table 788 * have more than one file in them. Checks -ROOT- and hbase:meta too. The total 789 * percentage across all tables is stored under the special key "-TOTAL-". 790 * 791 * @param fs The file system to use 792 * @param hbaseRootDir The root directory to scan 793 * @return A map for each table and its percentage (never null) 794 * @throws IOException When scanning the directory fails 795 */ 796 public static Map<String, Integer> getTableFragmentation( 797 final FileSystem fs, final Path hbaseRootDir) 798 throws IOException { 799 Map<String, Integer> frags = new HashMap<>(); 800 int cfCountTotal = 0; 801 int cfFragTotal = 0; 802 PathFilter regionFilter = new RegionDirFilter(fs); 803 PathFilter familyFilter = new FamilyDirFilter(fs); 804 List<Path> tableDirs = getTableDirs(fs, hbaseRootDir); 805 for (Path d : tableDirs) { 806 int cfCount = 0; 807 int cfFrag = 0; 808 FileStatus[] regionDirs = fs.listStatus(d, regionFilter); 809 for (FileStatus regionDir : regionDirs) { 810 Path dd = regionDir.getPath(); 811 // else its a region name, now look in region for families 812 FileStatus[] familyDirs = fs.listStatus(dd, familyFilter); 813 for (FileStatus familyDir : familyDirs) { 814 cfCount++; 815 cfCountTotal++; 816 Path family = familyDir.getPath(); 817 // now in family make sure only one file 818 FileStatus[] familyStatus = fs.listStatus(family); 819 if (familyStatus.length > 1) { 820 cfFrag++; 821 cfFragTotal++; 822 } 823 } 824 } 825 // compute percentage per table and store in result list 826 frags.put(CommonFSUtils.getTableName(d).getNameAsString(), 827 cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100)); 828 } 829 // set overall percentage for all tables 830 frags.put("-TOTAL-", 831 cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100)); 832 return frags; 833 } 834 835 public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException { 836 if (fs.exists(dst) && !fs.delete(dst, false)) { 837 throw new IOException("Can not delete " + dst); 838 } 839 if (!fs.rename(src, dst)) { 840 throw new IOException("Can not rename from " + src + " to " + dst); 841 } 842 } 843 844 /** 845 * A {@link PathFilter} that returns only regular files. 
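   * <p>Usage sketch, assuming {@code fs} and {@code dir} are already in scope; the filter can be
   * handed to any API that accepts a {@link PathFilter}:
   * <pre>{@code
   * FileStatus[] plainFiles = fs.listStatus(dir, new FileFilter(fs));
   * }</pre>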
846 */ 847 static class FileFilter extends AbstractFileStatusFilter { 848 private final FileSystem fs; 849 850 public FileFilter(final FileSystem fs) { 851 this.fs = fs; 852 } 853 854 @Override 855 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 856 try { 857 return isFile(fs, isDir, p); 858 } catch (IOException e) { 859 LOG.warn("Unable to verify if path={} is a regular file", p, e); 860 return false; 861 } 862 } 863 } 864 865 /** 866 * Directory filter that doesn't include any of the directories in the specified blacklist 867 */ 868 public static class BlackListDirFilter extends AbstractFileStatusFilter { 869 private final FileSystem fs; 870 private List<String> blacklist; 871 872 /** 873 * Create a filter on the givem filesystem with the specified blacklist 874 * @param fs filesystem to filter 875 * @param directoryNameBlackList list of the names of the directories to filter. If 876 * <tt>null</tt>, all directories are returned 877 */ 878 @SuppressWarnings("unchecked") 879 public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) { 880 this.fs = fs; 881 blacklist = 882 (List<String>) (directoryNameBlackList == null ? Collections.emptyList() 883 : directoryNameBlackList); 884 } 885 886 @Override 887 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 888 if (!isValidName(p.getName())) { 889 return false; 890 } 891 892 try { 893 return isDirectory(fs, isDir, p); 894 } catch (IOException e) { 895 LOG.warn("An error occurred while verifying if [{}] is a valid directory." 896 + " Returning 'not valid' and continuing.", p, e); 897 return false; 898 } 899 } 900 901 protected boolean isValidName(final String name) { 902 return !blacklist.contains(name); 903 } 904 } 905 906 /** 907 * A {@link PathFilter} that only allows directories. 908 */ 909 public static class DirFilter extends BlackListDirFilter { 910 911 public DirFilter(FileSystem fs) { 912 super(fs, null); 913 } 914 } 915 916 /** 917 * A {@link PathFilter} that returns usertable directories. To get all directories use the 918 * {@link BlackListDirFilter} with a <tt>null</tt> blacklist 919 */ 920 public static class UserTableDirFilter extends BlackListDirFilter { 921 public UserTableDirFilter(FileSystem fs) { 922 super(fs, HConstants.HBASE_NON_TABLE_DIRS); 923 } 924 925 @Override 926 protected boolean isValidName(final String name) { 927 if (!super.isValidName(name)) 928 return false; 929 930 try { 931 TableName.isLegalTableQualifierName(Bytes.toBytes(name)); 932 } catch (IllegalArgumentException e) { 933 LOG.info("Invalid table name: {}", name); 934 return false; 935 } 936 return true; 937 } 938 } 939 940 public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir) 941 throws IOException { 942 List<Path> tableDirs = new ArrayList<>(); 943 944 for (FileStatus status : fs 945 .globStatus(new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) { 946 tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath())); 947 } 948 return tableDirs; 949 } 950 951 /** 952 * @param fs 953 * @param rootdir 954 * @return All the table directories under <code>rootdir</code>. Ignore non table hbase folders such as 955 * .logs, .oldlogs, .corrupt folders. 
956 * @throws IOException 957 */ 958 public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir) 959 throws IOException { 960 // presumes any directory under hbase.rootdir is a table 961 FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs)); 962 List<Path> tabledirs = new ArrayList<>(dirs.length); 963 for (FileStatus dir: dirs) { 964 tabledirs.add(dir.getPath()); 965 } 966 return tabledirs; 967 } 968 969 /** 970 * Filter for all dirs that don't start with '.' 971 */ 972 public static class RegionDirFilter extends AbstractFileStatusFilter { 973 // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names. 974 final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$"); 975 final FileSystem fs; 976 977 public RegionDirFilter(FileSystem fs) { 978 this.fs = fs; 979 } 980 981 @Override 982 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 983 if (!regionDirPattern.matcher(p.getName()).matches()) { 984 return false; 985 } 986 987 try { 988 return isDirectory(fs, isDir, p); 989 } catch (IOException ioe) { 990 // Maybe the file was moved or the fs was disconnected. 991 LOG.warn("Skipping file {} due to IOException", p, ioe); 992 return false; 993 } 994 } 995 } 996 997 /** 998 * Given a particular table dir, return all the regiondirs inside it, excluding files such as 999 * .tableinfo 1000 * @param fs A file system for the Path 1001 * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir> 1002 * @return List of paths to valid region directories in table dir. 1003 * @throws IOException 1004 */ 1005 public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException { 1006 // assumes we are in a table dir. 1007 List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 1008 if (rds == null) { 1009 return Collections.emptyList(); 1010 } 1011 List<Path> regionDirs = new ArrayList<>(rds.size()); 1012 for (FileStatus rdfs: rds) { 1013 Path rdPath = rdfs.getPath(); 1014 regionDirs.add(rdPath); 1015 } 1016 return regionDirs; 1017 } 1018 1019 public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) { 1020 return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region); 1021 } 1022 1023 public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) { 1024 return getRegionDirFromTableDir(tableDir, 1025 ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName()); 1026 } 1027 1028 public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) { 1029 return new Path(tableDir, encodedRegionName); 1030 } 1031 1032 /** 1033 * Filter for all dirs that are legal column family names. This is generally used for colfam 1034 * dirs <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>. 1035 */ 1036 public static class FamilyDirFilter extends AbstractFileStatusFilter { 1037 final FileSystem fs; 1038 1039 public FamilyDirFilter(FileSystem fs) { 1040 this.fs = fs; 1041 } 1042 1043 @Override 1044 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1045 try { 1046 // throws IAE if invalid 1047 ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(Bytes.toBytes(p.getName())); 1048 } catch (IllegalArgumentException iae) { 1049 // path name is an invalid family name and thus is excluded. 
1050 return false; 1051 } 1052 1053 try { 1054 return isDirectory(fs, isDir, p); 1055 } catch (IOException ioe) { 1056 // Maybe the file was moved or the fs was disconnected. 1057 LOG.warn("Skipping file {} due to IOException", p, ioe); 1058 return false; 1059 } 1060 } 1061 } 1062 1063 /** 1064 * Given a particular region dir, return all the familydirs inside it 1065 * 1066 * @param fs A file system for the Path 1067 * @param regionDir Path to a specific region directory 1068 * @return List of paths to valid family directories in region dir. 1069 * @throws IOException 1070 */ 1071 public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException { 1072 // assumes we are in a region dir. 1073 FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs)); 1074 List<Path> familyDirs = new ArrayList<>(fds.length); 1075 for (FileStatus fdfs: fds) { 1076 Path fdPath = fdfs.getPath(); 1077 familyDirs.add(fdPath); 1078 } 1079 return familyDirs; 1080 } 1081 1082 public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException { 1083 List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs)); 1084 if (fds == null) { 1085 return Collections.emptyList(); 1086 } 1087 List<Path> referenceFiles = new ArrayList<>(fds.size()); 1088 for (FileStatus fdfs: fds) { 1089 Path fdPath = fdfs.getPath(); 1090 referenceFiles.add(fdPath); 1091 } 1092 return referenceFiles; 1093 } 1094 1095 /** 1096 * Filter for HFiles that excludes reference files. 1097 */ 1098 public static class HFileFilter extends AbstractFileStatusFilter { 1099 final FileSystem fs; 1100 1101 public HFileFilter(FileSystem fs) { 1102 this.fs = fs; 1103 } 1104 1105 @Override 1106 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1107 if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) { 1108 return false; 1109 } 1110 1111 try { 1112 return isFile(fs, isDir, p); 1113 } catch (IOException ioe) { 1114 // Maybe the file was moved or the fs was disconnected. 1115 LOG.warn("Skipping file {} due to IOException", p, ioe); 1116 return false; 1117 } 1118 } 1119 } 1120 1121 /** 1122 * Filter for HFileLinks (StoreFiles and HFiles not included). 1123 * the filter itself does not consider if a link is file or not. 1124 */ 1125 public static class HFileLinkFilter implements PathFilter { 1126 1127 @Override 1128 public boolean accept(Path p) { 1129 return HFileLink.isHFileLink(p); 1130 } 1131 } 1132 1133 public static class ReferenceFileFilter extends AbstractFileStatusFilter { 1134 1135 private final FileSystem fs; 1136 1137 public ReferenceFileFilter(FileSystem fs) { 1138 this.fs = fs; 1139 } 1140 1141 @Override 1142 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1143 if (!StoreFileInfo.isReference(p)) { 1144 return false; 1145 } 1146 1147 try { 1148 // only files can be references. 1149 return isFile(fs, isDir, p); 1150 } catch (IOException ioe) { 1151 // Maybe the file was moved or the fs was disconnected. 1152 LOG.warn("Skipping file {} due to IOException", p, ioe); 1153 return false; 1154 } 1155 } 1156 } 1157 1158 /** 1159 * Called every so-often by storefile map builder getTableStoreFilePathMap to 1160 * report progress. 1161 */ 1162 interface ProgressReporter { 1163 /** 1164 * @param status File or directory we are about to process. 
1165 */ 1166 void progress(FileStatus status); 1167 } 1168 1169 /** 1170 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for 1171 * table StoreFile names to the full Path. 1172 * <br> 1173 * Example...<br> 1174 * Key = 3944417774205889744 <br> 1175 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1176 * 1177 * @param map map to add values. If null, this method will create and populate one to return 1178 * @param fs The file system to use. 1179 * @param hbaseRootDir The root directory to scan. 1180 * @param tableName name of the table to scan. 1181 * @return Map keyed by StoreFile name with a value of the full Path. 1182 * @throws IOException When scanning the directory fails. 1183 * @throws InterruptedException 1184 */ 1185 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map, 1186 final FileSystem fs, final Path hbaseRootDir, TableName tableName) 1187 throws IOException, InterruptedException { 1188 return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, 1189 (ProgressReporter)null); 1190 } 1191 1192 /** 1193 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for 1194 * table StoreFile names to the full Path. Note that because this method can be called 1195 * on a 'live' HBase system that we will skip files that no longer exist by the time 1196 * we traverse them and similarly the user of the result needs to consider that some 1197 * entries in this map may not exist by the time this call completes. 1198 * <br> 1199 * Example...<br> 1200 * Key = 3944417774205889744 <br> 1201 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1202 * 1203 * @param resultMap map to add values. If null, this method will create and populate one to return 1204 * @param fs The file system to use. 1205 * @param hbaseRootDir The root directory to scan. 1206 * @param tableName name of the table to scan. 1207 * @param sfFilter optional path filter to apply to store files 1208 * @param executor optional executor service to parallelize this operation 1209 * @param progressReporter Instance or null; gets called every time we move to new region of 1210 * family dir and for each store file. 1211 * @return Map keyed by StoreFile name with a value of the full Path. 1212 * @throws IOException When scanning the directory fails. 1213 * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead. 1214 */ 1215 @Deprecated 1216 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1217 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1218 ExecutorService executor, final HbckErrorReporter progressReporter) 1219 throws IOException, InterruptedException { 1220 return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor, 1221 new ProgressReporter() { 1222 @Override 1223 public void progress(FileStatus status) { 1224 // status is not used in this implementation. 1225 progressReporter.progress(); 1226 } 1227 }); 1228 } 1229 1230 /** 1231 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for 1232 * table StoreFile names to the full Path. 
Note that because this method can be called 1233 * on a 'live' HBase system that we will skip files that no longer exist by the time 1234 * we traverse them and similarly the user of the result needs to consider that some 1235 * entries in this map may not exist by the time this call completes. 1236 * <br> 1237 * Example...<br> 1238 * Key = 3944417774205889744 <br> 1239 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1240 * 1241 * @param resultMap map to add values. If null, this method will create and populate one 1242 * to return 1243 * @param fs The file system to use. 1244 * @param hbaseRootDir The root directory to scan. 1245 * @param tableName name of the table to scan. 1246 * @param sfFilter optional path filter to apply to store files 1247 * @param executor optional executor service to parallelize this operation 1248 * @param progressReporter Instance or null; gets called every time we move to new region of 1249 * family dir and for each store file. 1250 * @return Map keyed by StoreFile name with a value of the full Path. 1251 * @throws IOException When scanning the directory fails. 1252 * @throws InterruptedException the thread is interrupted, either before or during the activity. 1253 */ 1254 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1255 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1256 ExecutorService executor, final ProgressReporter progressReporter) 1257 throws IOException, InterruptedException { 1258 1259 final Map<String, Path> finalResultMap = 1260 resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap; 1261 1262 // only include the directory paths to tables 1263 Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 1264 // Inside a table, there are compaction.dir directories to skip. Otherwise, all else 1265 // should be regions. 
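    // The filters below mirror the on-disk layout
    // <rootdir>/data/<namespace>/<table>/<region>/<family>/<storefile>; the store files are the
    // leaves that end up as keys in the result map.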
1266 final FamilyDirFilter familyFilter = new FamilyDirFilter(fs); 1267 final Vector<Exception> exceptions = new Vector<>(); 1268 1269 try { 1270 List<FileStatus> regionDirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 1271 if (regionDirs == null) { 1272 return finalResultMap; 1273 } 1274 1275 final List<Future<?>> futures = new ArrayList<>(regionDirs.size()); 1276 1277 for (FileStatus regionDir : regionDirs) { 1278 if (null != progressReporter) { 1279 progressReporter.progress(regionDir); 1280 } 1281 final Path dd = regionDir.getPath(); 1282 1283 if (!exceptions.isEmpty()) { 1284 break; 1285 } 1286 1287 Runnable getRegionStoreFileMapCall = new Runnable() { 1288 @Override 1289 public void run() { 1290 try { 1291 HashMap<String,Path> regionStoreFileMap = new HashMap<>(); 1292 List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter); 1293 if (familyDirs == null) { 1294 if (!fs.exists(dd)) { 1295 LOG.warn("Skipping region because it no longer exists: " + dd); 1296 } else { 1297 LOG.warn("Skipping region because it has no family dirs: " + dd); 1298 } 1299 return; 1300 } 1301 for (FileStatus familyDir : familyDirs) { 1302 if (null != progressReporter) { 1303 progressReporter.progress(familyDir); 1304 } 1305 Path family = familyDir.getPath(); 1306 if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) { 1307 continue; 1308 } 1309 // now in family, iterate over the StoreFiles and 1310 // put in map 1311 FileStatus[] familyStatus = fs.listStatus(family); 1312 for (FileStatus sfStatus : familyStatus) { 1313 if (null != progressReporter) { 1314 progressReporter.progress(sfStatus); 1315 } 1316 Path sf = sfStatus.getPath(); 1317 if (sfFilter == null || sfFilter.accept(sf)) { 1318 regionStoreFileMap.put( sf.getName(), sf); 1319 } 1320 } 1321 } 1322 finalResultMap.putAll(regionStoreFileMap); 1323 } catch (Exception e) { 1324 LOG.error("Could not get region store file map for region: " + dd, e); 1325 exceptions.add(e); 1326 } 1327 } 1328 }; 1329 1330 // If executor is available, submit async tasks to exec concurrently, otherwise 1331 // just do serial sync execution 1332 if (executor != null) { 1333 Future<?> future = executor.submit(getRegionStoreFileMapCall); 1334 futures.add(future); 1335 } else { 1336 FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null); 1337 future.run(); 1338 futures.add(future); 1339 } 1340 } 1341 1342 // Ensure all pending tasks are complete (or that we run into an exception) 1343 for (Future<?> f : futures) { 1344 if (!exceptions.isEmpty()) { 1345 break; 1346 } 1347 try { 1348 f.get(); 1349 } catch (ExecutionException e) { 1350 LOG.error("Unexpected exec exception! Should've been caught already. 
(Bug?)", e); 1351 // Shouldn't happen, we already logged/caught any exceptions in the Runnable 1352 } 1353 } 1354 } catch (IOException e) { 1355 LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e); 1356 exceptions.add(e); 1357 } finally { 1358 if (!exceptions.isEmpty()) { 1359 // Just throw the first exception as an indication something bad happened 1360 // Don't need to propagate all the exceptions, we already logged them all anyway 1361 Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class); 1362 throw new IOException(exceptions.firstElement()); 1363 } 1364 } 1365 1366 return finalResultMap; 1367 } 1368 1369 public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) { 1370 int result = 0; 1371 try { 1372 for (Path familyDir:getFamilyDirs(fs, p)){ 1373 result += getReferenceFilePaths(fs, familyDir).size(); 1374 } 1375 } catch (IOException e) { 1376 LOG.warn("Error counting reference files", e); 1377 } 1378 return result; 1379 } 1380 1381 /** 1382 * Runs through the HBase rootdir and creates a reverse lookup map for 1383 * table StoreFile names to the full Path. 1384 * <br> 1385 * Example...<br> 1386 * Key = 3944417774205889744 <br> 1387 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1388 * 1389 * @param fs The file system to use. 1390 * @param hbaseRootDir The root directory to scan. 1391 * @return Map keyed by StoreFile name with a value of the full Path. 1392 * @throws IOException When scanning the directory fails. 1393 */ 1394 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1395 final Path hbaseRootDir) 1396 throws IOException, InterruptedException { 1397 return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter)null); 1398 } 1399 1400 /** 1401 * Runs through the HBase rootdir and creates a reverse lookup map for 1402 * table StoreFile names to the full Path. 1403 * <br> 1404 * Example...<br> 1405 * Key = 3944417774205889744 <br> 1406 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1407 * 1408 * @param fs The file system to use. 1409 * @param hbaseRootDir The root directory to scan. 1410 * @param sfFilter optional path filter to apply to store files 1411 * @param executor optional executor service to parallelize this operation 1412 * @param progressReporter Instance or null; gets called every time we move to new region of 1413 * family dir and for each store file. 1414 * @return Map keyed by StoreFile name with a value of the full Path. 1415 * @throws IOException When scanning the directory fails. 1416 * @deprecated Since 2.3.0. Will be removed in hbase4. Used {@link 1417 * #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)} 1418 */ 1419 @Deprecated 1420 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1421 final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor, 1422 HbckErrorReporter progressReporter) 1423 throws IOException, InterruptedException { 1424 return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor, 1425 new ProgressReporter() { 1426 @Override 1427 public void progress(FileStatus status) { 1428 // status is not used in this implementation. 1429 progressReporter.progress(); 1430 } 1431 }); 1432 } 1433 1434 /** 1435 * Runs through the HBase rootdir and creates a reverse lookup map for 1436 * table StoreFile names to the full Path. 
1437 * <br> 1438 * Example...<br> 1439 * Key = 3944417774205889744 <br> 1440 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1441 * 1442 * @param fs The file system to use. 1443 * @param hbaseRootDir The root directory to scan. 1444 * @param sfFilter optional path filter to apply to store files 1445 * @param executor optional executor service to parallelize this operation 1446 * @param progressReporter Instance or null; gets called every time we move to new region of 1447 * family dir and for each store file. 1448 * @return Map keyed by StoreFile name with a value of the full Path. 1449 * @throws IOException When scanning the directory fails. 1450 * @throws InterruptedException 1451 */ 1452 public static Map<String, Path> getTableStoreFilePathMap( 1453 final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter, 1454 ExecutorService executor, ProgressReporter progressReporter) 1455 throws IOException, InterruptedException { 1456 ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32); 1457 1458 // if this method looks similar to 'getTableFragmentation' that is because 1459 // it was borrowed from it. 1460 1461 // only include the directory paths to tables 1462 for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) { 1463 getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir), 1464 sfFilter, executor, progressReporter); 1465 } 1466 return map; 1467 } 1468 1469 /** 1470 * Filters FileStatuses in an array and returns a list 1471 * 1472 * @param input An array of FileStatuses 1473 * @param filter A required filter to filter the array 1474 * @return A list of FileStatuses 1475 */ 1476 public static List<FileStatus> filterFileStatuses(FileStatus[] input, 1477 FileStatusFilter filter) { 1478 if (input == null) return null; 1479 return filterFileStatuses(Iterators.forArray(input), filter); 1480 } 1481 1482 /** 1483 * Filters FileStatuses in an iterator and returns a list 1484 * 1485 * @param input An iterator of FileStatuses 1486 * @param filter A required filter to filter the array 1487 * @return A list of FileStatuses 1488 */ 1489 public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input, 1490 FileStatusFilter filter) { 1491 if (input == null) return null; 1492 ArrayList<FileStatus> results = new ArrayList<>(); 1493 while (input.hasNext()) { 1494 FileStatus f = input.next(); 1495 if (filter.accept(f)) { 1496 results.add(f); 1497 } 1498 } 1499 return results; 1500 } 1501 1502 /** 1503 * Calls fs.listStatus() and treats FileNotFoundException as non-fatal 1504 * This accommodates differences between hadoop versions, where hadoop 1 1505 * does not throw a FileNotFoundException, and return an empty FileStatus[] 1506 * while Hadoop 2 will throw FileNotFoundException. 
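   * <p>Caller sketch; {@code familyDir} is just an illustrative path, and note the {@code null}
   * return (rather than an empty list) when nothing matches:
   * <pre>{@code
   * List<FileStatus> refs = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs));
   * if (refs == null) {
   *   // directory missing, empty, or nothing passed the filter
   * }
   * }</pre>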
1507 * 1508 * @param fs file system 1509 * @param dir directory 1510 * @param filter file status filter 1511 * @return null if dir is empty or doesn't exist, otherwise FileStatus list 1512 */ 1513 public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs, 1514 final Path dir, final FileStatusFilter filter) throws IOException { 1515 FileStatus [] status = null; 1516 try { 1517 status = fs.listStatus(dir); 1518 } catch (FileNotFoundException fnfe) { 1519 LOG.trace("{} does not exist", dir); 1520 return null; 1521 } 1522 1523 if (ArrayUtils.getLength(status) == 0) { 1524 return null; 1525 } 1526 1527 if (filter == null) { 1528 return Arrays.asList(status); 1529 } else { 1530 List<FileStatus> status2 = filterFileStatuses(status, filter); 1531 if (status2 == null || status2.isEmpty()) { 1532 return null; 1533 } else { 1534 return status2; 1535 } 1536 } 1537 } 1538 1539 /** 1540 * This function is to scan the root path of the file system to get the 1541 * degree of locality for each region on each of the servers having at least 1542 * one block of that region. 1543 * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer} 1544 * 1545 * @param conf 1546 * the configuration to use 1547 * @return the mapping from region encoded name to a map of server names to 1548 * locality fraction 1549 * @throws IOException 1550 * in case of file system errors or interrupts 1551 */ 1552 public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS( 1553 final Configuration conf) throws IOException { 1554 return getRegionDegreeLocalityMappingFromFS( 1555 conf, null, 1556 conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE)); 1557 1558 } 1559 1560 /** 1561 * This function is to scan the root path of the file system to get the 1562 * degree of locality for each region on each of the servers having at least 1563 * one block of that region. 1564 * 1565 * @param conf 1566 * the configuration to use 1567 * @param desiredTable 1568 * the table you wish to scan locality for 1569 * @param threadPoolSize 1570 * the thread pool size to use 1571 * @return the mapping from region encoded name to a map of server names to 1572 * locality fraction 1573 * @throws IOException 1574 * in case of file system errors or interrupts 1575 */ 1576 public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS( 1577 final Configuration conf, final String desiredTable, int threadPoolSize) 1578 throws IOException { 1579 Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>(); 1580 getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping); 1581 return regionDegreeLocalityMapping; 1582 } 1583 1584 /** 1585 * This function is to scan the root path of the file system to get either the 1586 * mapping between the region name and its best locality region server or the 1587 * degree of locality of each region on each of the servers having at least 1588 * one block of that region. The output map parameters are both optional. 
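   * <p>Internally this globs the region directories under
   * ${hbase.rootdir}/data/&lt;namespace&gt;/&lt;table&gt; and hands each one to an
   * {@link FSRegionScanner} task running on a bounded thread pool.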
   *
   * @param conf the configuration to use
   * @param desiredTable the table you wish to scan locality for
   * @param threadPoolSize the thread pool size to use
   * @param regionDegreeLocalityMapping the map into which to put the locality degree mapping or
   *          null, must be a thread-safe implementation
   * @throws IOException in case of file system errors or interrupts
   */
  private static void getRegionLocalityMappingFromFS(final Configuration conf,
      final String desiredTable, int threadPoolSize,
      final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
    final FileSystem fs = FileSystem.get(conf);
    final Path rootPath = CommonFSUtils.getRootDir(conf);
    final long startTime = EnvironmentEdgeManager.currentTime();
    final Path queryPath;
    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
    if (null == desiredTable) {
      queryPath =
        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
    } else {
      queryPath = new Path(
        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
    }

    // reject all paths that are not appropriate
    PathFilter pathFilter = new PathFilter() {
      @Override
      public boolean accept(Path path) {
        // this is the region name; it may get some noise data
        if (null == path) {
          return false;
        }

        // no parent?
        Path parent = path.getParent();
        if (null == parent) {
          return false;
        }

        String regionName = path.getName();
        if (null == regionName) {
          return false;
        }

        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
          return false;
        }
        return true;
      }
    };

    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
    }

    if (null == statusList) {
      return;
    }

    // lower the number of threads in case we have very few expected regions
    threadPoolSize = Math.min(threadPoolSize, statusList.length);

    // run in multiple threads
    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
      new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    try {
      // ignore all file status items that are not of interest
      for (FileStatus regionStatus : statusList) {
        if (null == regionStatus || !regionStatus.isDirectory()) {
          continue;
        }

        final Path regionPath = regionStatus.getPath();
        if (null != regionPath) {
          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
        }
      }
    } finally {
      tpe.shutdown();
      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
      try {
        // here we wait until TPE terminates, which is either naturally or by
        // exceptions in the execution of the threads
        while (!tpe.awaitTermination(threadWakeFrequency, TimeUnit.MILLISECONDS)) {
          // printing out rough estimate, so as to not introduce
          // AtomicInteger
          LOG.info("Locality checking is underway: { Scanned Regions : "
            + ((ThreadPoolExecutor) tpe).getCompletedTaskCount()
+ "/" 1687 + ((ThreadPoolExecutor) tpe).getTaskCount() + " }"); 1688 } 1689 } catch (InterruptedException e) { 1690 Thread.currentThread().interrupt(); 1691 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 1692 } 1693 } 1694 1695 long overhead = EnvironmentEdgeManager.currentTime() - startTime; 1696 LOG.info("Scan DFS for locality info takes {}ms", overhead); 1697 } 1698 1699 /** 1700 * Do our short circuit read setup. 1701 * Checks buffer size to use and whether to do checksumming in hbase or hdfs. 1702 * @param conf 1703 */ 1704 public static void setupShortCircuitRead(final Configuration conf) { 1705 // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property. 1706 boolean shortCircuitSkipChecksum = 1707 conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false); 1708 boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true); 1709 if (shortCircuitSkipChecksum) { 1710 LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " + 1711 "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " + 1712 "it, see https://issues.apache.org/jira/browse/HBASE-6868." : "")); 1713 assert !shortCircuitSkipChecksum; //this will fail if assertions are on 1714 } 1715 checkShortCircuitReadBufferSize(conf); 1716 } 1717 1718 /** 1719 * Check if short circuit read buffer size is set and if not, set it to hbase value. 1720 * @param conf 1721 */ 1722 public static void checkShortCircuitReadBufferSize(final Configuration conf) { 1723 final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2; 1724 final int notSet = -1; 1725 // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2 1726 final String dfsKey = "dfs.client.read.shortcircuit.buffer.size"; 1727 int size = conf.getInt(dfsKey, notSet); 1728 // If a size is set, return -- we will use it. 1729 if (size != notSet) return; 1730 // But short circuit buffer size is normally not set. Put in place the hbase wanted size. 1731 int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize); 1732 conf.setIfUnset(dfsKey, Integer.toString(hbaseSize)); 1733 } 1734 1735 /** 1736 * @param c 1737 * @return The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs. 1738 * @throws IOException 1739 */ 1740 public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c) 1741 throws IOException { 1742 if (!CommonFSUtils.isHDFS(c)) { 1743 return null; 1744 } 1745 // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal 1746 // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it 1747 // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients. 
    final String name = "getHedgedReadMetrics";
    DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient();
    Method m;
    try {
      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
        e.getMessage());
      return null;
    } catch (SecurityException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
        e.getMessage());
      return null;
    }
    m.setAccessible(true);
    try {
      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
      LOG.warn("Failed to invoke method " + name + " on dfsclient; no hedged read metrics: " +
        e.getMessage());
      return null;
    }
  }

  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
      Configuration conf, int threads) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Future<Void>> futures = new ArrayList<>();
    List<Path> traversedPaths;
    try {
      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (ExecutionException | InterruptedException | IOException e) {
      throw new IOException("Copy snapshot reference files failed", e);
    } finally {
      pool.shutdownNow();
    }
    return traversedPaths;
  }

  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
      Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
    List<Path> traversedPaths = new ArrayList<>();
    traversedPaths.add(dst);
    FileStatus currentFileStatus = srcFS.getFileStatus(src);
    if (currentFileStatus.isDirectory()) {
      if (!dstFS.mkdirs(dst)) {
        throw new IOException("Create directory failed: " + dst);
      }
      FileStatus[] subPaths = srcFS.listStatus(src);
      for (FileStatus subPath : subPaths) {
        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
      }
    } else {
      Future<Void> future = pool.submit(() -> {
        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
        return null;
      });
      futures.add(future);
    }
    return traversedPaths;
  }

  /**
   * @param fs the DistributedFileSystem to inspect
   * @param conf the configuration to use
   * @return A set containing all namenode addresses of fs
   */
  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
      Configuration conf) {
    Set<InetSocketAddress> addresses = new HashSet<>();
    String serviceName = fs.getCanonicalServiceName();

    if (serviceName.startsWith("ha-hdfs")) {
      try {
        Map<String, Map<String, InetSocketAddress>> addressMap =
          DFSUtil.getNNServiceRpcAddressesForCluster(conf);
        String nameService = serviceName.substring(serviceName.indexOf(":") + 1);
        if (addressMap.containsKey(nameService)) {
          Map<String, InetSocketAddress> nnMap = addressMap.get(nameService);
          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
            InetSocketAddress addr = e2.getValue();
            addresses.add(addr);
          }
        }
      } catch (Exception e) {
        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
      }
    } else {
      URI uri = fs.getUri();
      int port = uri.getPort();
      if (port < 0) {
        int idx = serviceName.indexOf(':');
        port = Integer.parseInt(serviceName.substring(idx + 1));
      }
      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
      addresses.add(addr);
    }

    return addresses;
  }

  /**
   * @param conf the Configuration of HBase
   * @param srcFs the source FileSystem
   * @param desFs the destination FileSystem
   * @return Whether srcFs and desFs are on the same HDFS or not
   */
  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
    // getCanonicalServiceName gives both srcFs and desFs a unified format
    // which contains scheme, host and port.
    String srcServiceName = srcFs.getCanonicalServiceName();
    String desServiceName = desFs.getCanonicalServiceName();

    if (srcServiceName == null || desServiceName == null) {
      return false;
    }
    if (srcServiceName.equals(desServiceName)) {
      return true;
    }
    if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
      Collection<String> internalNameServices =
        conf.getTrimmedStringCollection("dfs.internal.nameservices");
      if (!internalNameServices.isEmpty()) {
        if (internalNameServices.contains(srcServiceName.split(":")[1])) {
          return true;
        } else {
          return false;
        }
      }
    }
    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
      // If one serviceName is an HA format while the other is a non-HA format,
      // maybe they refer to the same FileSystem.
      // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
      if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
        return true;
      }
    }

    return false;
  }
}