001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET; 021 022import edu.umd.cs.findbugs.annotations.CheckForNull; 023import java.io.ByteArrayInputStream; 024import java.io.DataInputStream; 025import java.io.EOFException; 026import java.io.FileNotFoundException; 027import java.io.IOException; 028import java.io.InterruptedIOException; 029import java.lang.reflect.InvocationTargetException; 030import java.lang.reflect.Method; 031import java.net.InetSocketAddress; 032import java.net.URI; 033import java.util.ArrayList; 034import java.util.Arrays; 035import java.util.Collection; 036import java.util.Collections; 037import java.util.HashMap; 038import java.util.HashSet; 039import java.util.Iterator; 040import java.util.List; 041import java.util.Locale; 042import java.util.Map; 043import java.util.Optional; 044import java.util.Set; 045import java.util.Vector; 046import java.util.concurrent.ConcurrentHashMap; 047import java.util.concurrent.ExecutionException; 048import java.util.concurrent.ExecutorService; 049import java.util.concurrent.Executors; 050import java.util.concurrent.Future; 051import java.util.concurrent.FutureTask; 052import java.util.concurrent.ThreadPoolExecutor; 053import java.util.concurrent.TimeUnit; 054import java.util.regex.Pattern; 055import org.apache.commons.lang3.ArrayUtils; 056import org.apache.hadoop.conf.Configuration; 057import org.apache.hadoop.fs.BlockLocation; 058import org.apache.hadoop.fs.FSDataInputStream; 059import org.apache.hadoop.fs.FSDataOutputStream; 060import org.apache.hadoop.fs.FileStatus; 061import org.apache.hadoop.fs.FileSystem; 062import org.apache.hadoop.fs.FileUtil; 063import org.apache.hadoop.fs.Path; 064import org.apache.hadoop.fs.PathFilter; 065import org.apache.hadoop.fs.StorageType; 066import org.apache.hadoop.fs.permission.FsPermission; 067import org.apache.hadoop.hbase.ClusterId; 068import org.apache.hadoop.hbase.HConstants; 069import org.apache.hadoop.hbase.HDFSBlocksDistribution; 070import org.apache.hadoop.hbase.TableName; 071import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 072import org.apache.hadoop.hbase.client.RegionInfo; 073import org.apache.hadoop.hbase.client.RegionInfoBuilder; 074import org.apache.hadoop.hbase.exceptions.DeserializationException; 075import org.apache.hadoop.hbase.fs.HFileSystem; 076import org.apache.hadoop.hbase.io.HFileLink; 077import org.apache.hadoop.hbase.master.HMaster; 078import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 079import org.apache.hadoop.hdfs.DFSClient; 080import org.apache.hadoop.hdfs.DFSHedgedReadMetrics; 081import org.apache.hadoop.hdfs.DFSUtil; 082import 
org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;

/**
 * Utility methods for interacting with the underlying file system.
 */
@InterfaceAudience.Private
public final class FSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);

  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
  private static final int DEFAULT_THREAD_POOLSIZE = 2;

  /** Set to true on Windows platforms */
  // currently only used in testing. TODO refactor into a test class
  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");

  private FSUtils() {
  }

  /** Returns true if <code>fs</code> is an instance of DistributedFileSystem. */
  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
    FileSystem fileSystem = fs;
    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
    // Check its backing fs for dfs-ness.
    if (fs instanceof HFileSystem) {
      fileSystem = ((HFileSystem) fs).getBackingFs();
    }
    return fileSystem instanceof DistributedFileSystem;
  }

  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c are passed, it
   * compares the '/a/b/c' part. Does not consider the scheme; i.e. if the schemes differ but the
   * path or subpath matches, the two will equate. Paths of different depth never match.
   * @param pathToSearch Path we will be trying to match.
   * @param pathTail     Path tail to match against the end of <code>pathToSearch</code>.
   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
    Path tailPath = pathTail;
    String tailName;
    Path toSearch = pathToSearch;
    String toSearchName;
    boolean result = false;

    if (pathToSearch.depth() != pathTail.depth()) {
      return false;
    }

    do {
      tailName = tailPath.getName();
      if (tailName == null || tailName.isEmpty()) {
        result = true;
        break;
      }
      toSearchName = toSearch.getName();
      if (toSearchName == null || toSearchName.isEmpty()) {
        break;
      }
      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
      tailPath = tailPath.getParent();
      toSearch = toSearch.getParent();
    } while (tailName.equals(toSearchName));
    return result;
  }
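
  // Usage sketch for isMatchingTail (illustrative only; the paths below are hypothetical):
  //
  //   Path toSearch = new Path("hdfs://namenode:8020/hbase/data/default/t1");
  //   Path tail = new Path("/hbase/data/default/t1");
  //   boolean matches = FSUtils.isMatchingTail(toSearch, tail); // true: same depth, same components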

  /**
   * Delete the region directory if it exists.
   * @return True if the region directory was deleted.
   */
  public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri)
    throws IOException {
    Path rootDir = CommonFSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);
    return CommonFSUtils.deleteDirectory(fs,
      new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
  }

  /**
   * Create the specified file on the filesystem. By default, this will:
   * <ol>
   * <li>overwrite the file if it exists</li>
   * <li>apply the umask in the configuration (if it is enabled)</li>
   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the configured column family replication or default replication if
   * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li>
   * <li>use the default block size</li>
   * <li>not track progress</li>
   * </ol>
   * @param conf         configurations
   * @param fs           {@link FileSystem} on which to write the file
   * @param path         {@link Path} to the file to write
   * @param perm         permissions
   * @param favoredNodes favored data nodes
   * @return output stream to the created file
   * @throws IOException if the file cannot be created
   */
  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
    FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
    if (fs instanceof HFileSystem) {
      FileSystem backingFs = ((HFileSystem) fs).getBackingFs();
      if (backingFs instanceof DistributedFileSystem) {
        short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION,
          String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION)));
        DistributedFileSystem.HdfsDataOutputStreamBuilder builder =
          ((DistributedFileSystem) backingFs).createFile(path).recursive().permission(perm)
            .create();
        if (favoredNodes != null) {
          builder.favoredNodes(favoredNodes);
        }
        if (replication > 0) {
          builder.replication(replication);
        }
        return builder.build();
      }
    }
    return CommonFSUtils.create(fs, path, perm, true);
  }

  /**
   * Checks to see if the specified file system is available.
   * @param fs filesystem
   * @throws IOException e
   */
  public static void checkFileSystemAvailable(final FileSystem fs) throws IOException {
    if (!(fs instanceof DistributedFileSystem)) {
      return;
    }
    IOException exception = null;
    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    try {
      if (dfs.exists(new Path("/"))) {
        return;
      }
    } catch (IOException e) {
      exception = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
    }
    try {
      fs.close();
    } catch (Exception e) {
      LOG.error("file system close failed: ", e);
    }
    throw new IOException("File system is not available", exception);
  }

  /**
   * Inquire the Active NameNode's safe mode status.
   * @param dfs A DistributedFileSystem object representing the underlying HDFS.
   * @return whether we're in safe mode
   */
  private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
    return dfs.setSafeMode(SAFEMODE_GET, true);
  }

  /**
   * Check whether dfs is in safemode.
255 */ 256 public static void checkDfsSafeMode(final Configuration conf) throws IOException { 257 boolean isInSafeMode = false; 258 FileSystem fs = FileSystem.get(conf); 259 if (fs instanceof DistributedFileSystem) { 260 DistributedFileSystem dfs = (DistributedFileSystem) fs; 261 isInSafeMode = isInSafeMode(dfs); 262 } 263 if (isInSafeMode) { 264 throw new IOException("File system is in safemode, it can't be written now"); 265 } 266 } 267 268 /** 269 * Verifies current version of file system 270 * @param fs filesystem object 271 * @param rootdir root hbase directory 272 * @return null if no version file exists, version string otherwise 273 * @throws IOException if the version file fails to open 274 * @throws DeserializationException if the version data cannot be translated into a version 275 */ 276 public static String getVersion(FileSystem fs, Path rootdir) 277 throws IOException, DeserializationException { 278 final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 279 FileStatus[] status = null; 280 try { 281 // hadoop 2.0 throws FNFE if directory does not exist. 282 // hadoop 1.0 returns null if directory does not exist. 283 status = fs.listStatus(versionFile); 284 } catch (FileNotFoundException fnfe) { 285 return null; 286 } 287 if (ArrayUtils.getLength(status) == 0) { 288 return null; 289 } 290 String version = null; 291 byte[] content = new byte[(int) status[0].getLen()]; 292 FSDataInputStream s = fs.open(versionFile); 293 try { 294 IOUtils.readFully(s, content, 0, content.length); 295 if (ProtobufUtil.isPBMagicPrefix(content)) { 296 version = parseVersionFrom(content); 297 } else { 298 // Presume it pre-pb format. 299 try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) { 300 version = dis.readUTF(); 301 } 302 } 303 } catch (EOFException eof) { 304 LOG.warn("Version file was empty, odd, will try to set it."); 305 } finally { 306 s.close(); 307 } 308 return version; 309 } 310 311 /** 312 * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file. 313 * @param bytes The byte content of the hbase.version file 314 * @return The version found in the file as a String 315 * @throws DeserializationException if the version data cannot be translated into a version 316 */ 317 static String parseVersionFrom(final byte[] bytes) throws DeserializationException { 318 ProtobufUtil.expectPBMagicPrefix(bytes); 319 int pblen = ProtobufUtil.lengthOfPBMagic(); 320 FSProtos.HBaseVersionFileContent.Builder builder = 321 FSProtos.HBaseVersionFileContent.newBuilder(); 322 try { 323 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); 324 return builder.getVersion(); 325 } catch (IOException e) { 326 // Convert 327 throw new DeserializationException(e); 328 } 329 } 330 331 /** 332 * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file. 333 * @param version Version to persist 334 * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a 335 * prefix. 
336 */ 337 static byte[] toVersionByteArray(final String version) { 338 FSProtos.HBaseVersionFileContent.Builder builder = 339 FSProtos.HBaseVersionFileContent.newBuilder(); 340 return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray()); 341 } 342 343 /** 344 * Verifies current version of file system 345 * @param fs file system 346 * @param rootdir root directory of HBase installation 347 * @param message if true, issues a message on System.out 348 * @throws IOException if the version file cannot be opened 349 * @throws DeserializationException if the contents of the version file cannot be parsed 350 */ 351 public static void checkVersion(FileSystem fs, Path rootdir, boolean message) 352 throws IOException, DeserializationException { 353 checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 354 } 355 356 /** 357 * Verifies current version of file system 358 * @param fs file system 359 * @param rootdir root directory of HBase installation 360 * @param message if true, issues a message on System.out 361 * @param wait wait interval 362 * @param retries number of times to retry 363 * @throws IOException if the version file cannot be opened 364 * @throws DeserializationException if the contents of the version file cannot be parsed 365 */ 366 public static void checkVersion(FileSystem fs, Path rootdir, boolean message, int wait, 367 int retries) throws IOException, DeserializationException { 368 String version = getVersion(fs, rootdir); 369 String msg; 370 if (version == null) { 371 if (!metaRegionExists(fs, rootdir)) { 372 // rootDir is empty (no version file and no root region) 373 // just create new version file (HBASE-1195) 374 setVersion(fs, rootdir, wait, retries); 375 return; 376 } else { 377 msg = "hbase.version file is missing. Is your hbase.rootdir valid? " 378 + "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " 379 + "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2"; 380 } 381 } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) { 382 return; 383 } else { 384 msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version 385 + " but software requires version " + HConstants.FILE_SYSTEM_VERSION 386 + ". Consult http://hbase.apache.org/book.html for further information about " 387 + "upgrading HBase."; 388 } 389 390 // version is deprecated require migration 391 // Output on stdout so user sees it in terminal. 392 if (message) { 393 System.out.println("WARNING! 
" + msg); 394 } 395 throw new FileSystemVersionException(msg); 396 } 397 398 /** 399 * Sets version of file system 400 * @param fs filesystem object 401 * @param rootdir hbase root 402 * @throws IOException e 403 */ 404 public static void setVersion(FileSystem fs, Path rootdir) throws IOException { 405 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0, 406 HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 407 } 408 409 /** 410 * Sets version of file system 411 * @param fs filesystem object 412 * @param rootdir hbase root 413 * @param wait time to wait for retry 414 * @param retries number of times to retry before failing 415 * @throws IOException e 416 */ 417 public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries) 418 throws IOException { 419 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries); 420 } 421 422 /** 423 * Sets version of file system 424 * @param fs filesystem object 425 * @param rootdir hbase root directory 426 * @param version version to set 427 * @param wait time to wait for retry 428 * @param retries number of times to retry before throwing an IOException 429 * @throws IOException e 430 */ 431 public static void setVersion(FileSystem fs, Path rootdir, String version, int wait, int retries) 432 throws IOException { 433 Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 434 Path tempVersionFile = new Path(rootdir, 435 HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + HConstants.VERSION_FILE_NAME); 436 while (true) { 437 try { 438 // Write the version to a temporary file 439 FSDataOutputStream s = fs.create(tempVersionFile); 440 try { 441 s.write(toVersionByteArray(version)); 442 s.close(); 443 s = null; 444 // Move the temp version file to its normal location. Returns false 445 // if the rename failed. Throw an IOE in that case. 446 if (!fs.rename(tempVersionFile, versionFile)) { 447 throw new IOException("Unable to move temp version file to " + versionFile); 448 } 449 } finally { 450 // Cleaning up the temporary if the rename failed would be trying 451 // too hard. We'll unconditionally create it again the next time 452 // through anyway, files are overwritten by default by create(). 453 454 // Attempt to close the stream on the way out if it is still open. 
455 try { 456 if (s != null) s.close(); 457 } catch (IOException ignore) { 458 } 459 } 460 LOG.info("Created version file at " + rootdir.toString() + " with version=" + version); 461 return; 462 } catch (IOException e) { 463 if (retries > 0) { 464 LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e); 465 fs.delete(versionFile, false); 466 try { 467 if (wait > 0) { 468 Thread.sleep(wait); 469 } 470 } catch (InterruptedException ie) { 471 throw (InterruptedIOException) new InterruptedIOException().initCause(ie); 472 } 473 retries--; 474 } else { 475 throw e; 476 } 477 } 478 } 479 } 480 481 /** 482 * Checks that a cluster ID file exists in the HBase root directory 483 * @param fs the root directory FileSystem 484 * @param rootdir the HBase root directory in HDFS 485 * @param wait how long to wait between retries 486 * @return <code>true</code> if the file exists, otherwise <code>false</code> 487 * @throws IOException if checking the FileSystem fails 488 */ 489 public static boolean checkClusterIdExists(FileSystem fs, Path rootdir, long wait) 490 throws IOException { 491 while (true) { 492 try { 493 Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 494 return fs.exists(filePath); 495 } catch (IOException ioe) { 496 if (wait > 0L) { 497 LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe); 498 try { 499 Thread.sleep(wait); 500 } catch (InterruptedException e) { 501 Thread.currentThread().interrupt(); 502 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 503 } 504 } else { 505 throw ioe; 506 } 507 } 508 } 509 } 510 511 /** 512 * Returns the value of the unique cluster ID stored for this HBase instance. 513 * @param fs the root directory FileSystem 514 * @param rootdir the path to the HBase root directory 515 * @return the unique cluster identifier 516 * @throws IOException if reading the cluster ID file fails 517 */ 518 public static ClusterId getClusterId(FileSystem fs, Path rootdir) throws IOException { 519 Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 520 ClusterId clusterId = null; 521 FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null; 522 if (status != null) { 523 int len = Ints.checkedCast(status.getLen()); 524 byte[] content = new byte[len]; 525 FSDataInputStream in = fs.open(idPath); 526 try { 527 in.readFully(content); 528 } catch (EOFException eof) { 529 LOG.warn("Cluster ID file {} is empty", idPath); 530 } finally { 531 in.close(); 532 } 533 try { 534 clusterId = ClusterId.parseFrom(content); 535 } catch (DeserializationException e) { 536 throw new IOException("content=" + Bytes.toString(content), e); 537 } 538 // If not pb'd, make it so. 539 if (!ProtobufUtil.isPBMagicPrefix(content)) { 540 String cid = null; 541 in = fs.open(idPath); 542 try { 543 cid = in.readUTF(); 544 clusterId = new ClusterId(cid); 545 } catch (EOFException eof) { 546 LOG.warn("Cluster ID file {} is empty", idPath); 547 } finally { 548 in.close(); 549 } 550 rewriteAsPb(fs, rootdir, idPath, clusterId); 551 } 552 return clusterId; 553 } else { 554 LOG.warn("Cluster ID file does not exist at {}", idPath); 555 } 556 return clusterId; 557 } 558 559 /** 560 * */ 561 private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p, 562 final ClusterId cid) throws IOException { 563 // Rewrite the file as pb. Move aside the old one first, write new 564 // then delete the moved-aside file. 565 Path movedAsideName = new Path(p + "." 
+ EnvironmentEdgeManager.currentTime()); 566 if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p); 567 setClusterId(fs, rootdir, cid, 100); 568 if (!fs.delete(movedAsideName, false)) { 569 throw new IOException("Failed delete of " + movedAsideName); 570 } 571 LOG.debug("Rewrote the hbase.id file as pb"); 572 } 573 574 /** 575 * Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root 576 * directory. If any operations on the ID file fails, and {@code wait} is a positive value, the 577 * method will retry to produce the ID file until the thread is forcibly interrupted. 578 * @param fs the root directory FileSystem 579 * @param rootdir the path to the HBase root directory 580 * @param clusterId the unique identifier to store 581 * @param wait how long (in milliseconds) to wait between retries 582 * @throws IOException if writing to the FileSystem fails and no wait value 583 */ 584 public static void setClusterId(final FileSystem fs, final Path rootdir, 585 final ClusterId clusterId, final long wait) throws IOException { 586 587 final Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 588 final Path tempDir = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY); 589 final Path tempIdFile = new Path(tempDir, HConstants.CLUSTER_ID_FILE_NAME); 590 591 LOG.debug("Create cluster ID file [{}] with ID: {}", idFile, clusterId); 592 593 while (true) { 594 Optional<IOException> failure = Optional.empty(); 595 596 LOG.debug("Write the cluster ID file to a temporary location: {}", tempIdFile); 597 try (FSDataOutputStream s = fs.create(tempIdFile)) { 598 s.write(clusterId.toByteArray()); 599 } catch (IOException ioe) { 600 failure = Optional.of(ioe); 601 } 602 603 if (!failure.isPresent()) { 604 try { 605 LOG.debug("Move the temporary cluster ID file to its target location [{}]:[{}]", 606 tempIdFile, idFile); 607 608 if (!fs.rename(tempIdFile, idFile)) { 609 failure = 610 Optional.of(new IOException("Unable to move temp cluster ID file to " + idFile)); 611 } 612 } catch (IOException ioe) { 613 failure = Optional.of(ioe); 614 } 615 } 616 617 if (failure.isPresent()) { 618 final IOException cause = failure.get(); 619 if (wait > 0L) { 620 LOG.warn("Unable to create cluster ID file in {}, retrying in {}ms", rootdir, wait, 621 cause); 622 try { 623 Thread.sleep(wait); 624 } catch (InterruptedException e) { 625 Thread.currentThread().interrupt(); 626 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 627 } 628 continue; 629 } else { 630 throw cause; 631 } 632 } else { 633 return; 634 } 635 } 636 } 637 638 /** 639 * If DFS, check safe mode and if so, wait until we clear it. 
   * @param conf configuration
   * @param wait Sleep between retries
   * @throws IOException e
   */
  public static void waitOnSafeMode(final Configuration conf, final long wait) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    if (!(fs instanceof DistributedFileSystem)) return;
    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    // Make sure dfs is not in safe mode
    while (isInSafeMode(dfs)) {
      LOG.info("Waiting for dfs to exit safe mode...");
      try {
        Thread.sleep(wait);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
    }
  }

  /**
   * Checks if the meta region exists.
   * @param fs      file system
   * @param rootDir root directory of HBase installation
   * @return true if it exists
   */
  public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
    Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
    return fs.exists(metaRegionDir);
  }

  /**
   * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams are
   * backed by a series of LocatedBlocks, which are fetched periodically from the namenode. This
   * method retrieves those blocks from the input stream and uses them to calculate
   * HDFSBlocksDistribution. The underlying method in DFSInputStream does attempt to use locally
   * cached blocks, but may hit the namenode if the cache is determined to be incomplete. The
   * method also makes copies of all LocatedBlocks rather than returning the underlying blocks
   * themselves.
   */
  static public HDFSBlocksDistribution
    computeHDFSBlocksDistribution(HdfsDataInputStream inputStream) throws IOException {
    List<LocatedBlock> blocks = inputStream.getAllBlocks();
    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
    for (LocatedBlock block : blocks) {
      String[] hosts = getHostsForLocations(block);
      long len = block.getBlockSize();
      StorageType[] storageTypes = block.getStorageTypes();
      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
    }
    return blocksDistribution;
  }

  private static String[] getHostsForLocations(LocatedBlock block) {
    DatanodeInfo[] locations = block.getLocations();
    String[] hosts = new String[locations.length];
    for (int i = 0; i < hosts.length; i++) {
      hosts[i] = locations[i].getHostName();
    }
    return hosts;
  }

  /**
   * Compute HDFS blocks distribution of a given file, or a portion of the file.
   * @param fs     file system
   * @param status file status of the file
   * @param start  start position of the portion
   * @param length length of the portion
   * @return The HDFS blocks distribution
   */
  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs,
    FileStatus status, long start, long length) throws IOException {
    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
    BlockLocation[] blockLocations = fs.getFileBlockLocations(status, start, length);
    addToHDFSBlocksDistribution(blocksDistribution, blockLocations);
    return blocksDistribution;
  }
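
  // Usage sketch for computeHDFSBlocksDistribution (illustrative only; the file path shown is
  // hypothetical):
  //
  //   FileStatus status = fs.getFileStatus(new Path("/hbase/data/default/t1/region/cf/hfile"));
  //   HDFSBlocksDistribution dist =
  //     FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
  //   // dist now maps each datanode host to the weight of this file's blocks stored there.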

  /**
   * Update blocksDistribution with blockLocations.
   * @param blocksDistribution the hdfs blocks distribution
   * @param blockLocations     an array containing block locations
   */
  static public void addToHDFSBlocksDistribution(HDFSBlocksDistribution blocksDistribution,
    BlockLocation[] blockLocations) throws IOException {
    for (BlockLocation bl : blockLocations) {
      String[] hosts = bl.getHosts();
      long len = bl.getLength();
      StorageType[] storageTypes = bl.getStorageTypes();
      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
    }
  }

  // TODO move this method OUT of FSUtils. No dependencies to HMaster
  /**
   * Returns the total overall fragmentation percentage. Includes hbase:meta and -ROOT- as well.
   * @param master The master defining the HBase root and file system
   * @return The overall fragmentation percentage, or -1 if no tables were found
   * @throws IOException When scanning the directory fails
   */
  public static int getTotalTableFragmentation(final HMaster master) throws IOException {
    Map<String, Integer> map = getTableFragmentation(master);
    return map.isEmpty() ? -1 : map.get("-TOTAL-");
  }

  /**
   * Runs through the HBase rootdir and checks how many stores for each table have more than one
   * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is
   * stored under the special key "-TOTAL-".
   * @param master The master defining the HBase root and file system.
   * @return A map for each table and its percentage (never null).
   * @throws IOException When scanning the directory fails.
   */
  public static Map<String, Integer> getTableFragmentation(final HMaster master)
    throws IOException {
    Path path = CommonFSUtils.getRootDir(master.getConfiguration());
    // since HMaster.getFileSystem() is package private
    FileSystem fs = path.getFileSystem(master.getConfiguration());
    return getTableFragmentation(fs, path);
  }
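
  // Usage sketch for the fragmentation helpers (illustrative only; the configuration wiring shown
  // here is assumed, not taken from this file):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   Path rootDir = CommonFSUtils.getRootDir(conf);
  //   FileSystem fs = rootDir.getFileSystem(conf);
  //   Map<String, Integer> frags = FSUtils.getTableFragmentation(fs, rootDir);
  //   Integer overall = frags.get("-TOTAL-"); // percentage across all tables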

  /**
   * Runs through the HBase rootdir and checks how many stores for each table have more than one
   * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is
   * stored under the special key "-TOTAL-".
   * @param fs           The file system to use
   * @param hbaseRootDir The root directory to scan
   * @return A map for each table and its percentage (never null)
   * @throws IOException When scanning the directory fails
   */
  public static Map<String, Integer> getTableFragmentation(final FileSystem fs,
    final Path hbaseRootDir) throws IOException {
    Map<String, Integer> frags = new HashMap<>();
    int cfCountTotal = 0;
    int cfFragTotal = 0;
    PathFilter regionFilter = new RegionDirFilter(fs);
    PathFilter familyFilter = new FamilyDirFilter(fs);
    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
    for (Path d : tableDirs) {
      int cfCount = 0;
      int cfFrag = 0;
      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
      for (FileStatus regionDir : regionDirs) {
        Path dd = regionDir.getPath();
        // else its a region name, now look in region for families
        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
        for (FileStatus familyDir : familyDirs) {
          cfCount++;
          cfCountTotal++;
          Path family = familyDir.getPath();
          // now in family make sure only one file
          FileStatus[] familyStatus = fs.listStatus(family);
          if (familyStatus.length > 1) {
            cfFrag++;
            cfFragTotal++;
          }
        }
      }
      // compute percentage per table and store in result list
      frags.put(CommonFSUtils.getTableName(d).getNameAsString(),
        cfCount == 0 ? 0 : Math.round((float) cfFrag / cfCount * 100));
    }
    // set overall percentage for all tables
    frags.put("-TOTAL-",
      cfCountTotal == 0 ? 0 : Math.round((float) cfFragTotal / cfCountTotal * 100));
    return frags;
  }

  public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
    if (fs.exists(dst) && !fs.delete(dst, false)) {
      throw new IOException("Can not delete " + dst);
    }
    if (!fs.rename(src, dst)) {
      throw new IOException("Can not rename from " + src + " to " + dst);
    }
  }

  /**
   * A {@link PathFilter} that returns only regular files.
   */
  static class FileFilter extends AbstractFileStatusFilter {
    private final FileSystem fs;

    public FileFilter(final FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      try {
        return isFile(fs, isDir, p);
      } catch (IOException e) {
        LOG.warn("Unable to verify if path={} is a regular file", p, e);
        return false;
      }
    }
  }

  /**
   * Directory filter that doesn't include any of the directories in the specified blacklist.
   */
  public static class BlackListDirFilter extends AbstractFileStatusFilter {
    private final FileSystem fs;
    private List<String> blacklist;

    /**
     * Create a filter on the given filesystem with the specified blacklist.
     * @param fs                     filesystem to filter
     * @param directoryNameBlackList list of the names of the directories to filter. If
     *                               <tt>null</tt>, all directories are returned
     */
    @SuppressWarnings("unchecked")
    public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
      this.fs = fs;
      blacklist = (List<String>) (directoryNameBlackList == null
        ? Collections.emptyList()
        : directoryNameBlackList);
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      if (!isValidName(p.getName())) {
        return false;
      }

      try {
        return isDirectory(fs, isDir, p);
      } catch (IOException e) {
        LOG.warn("An error occurred while verifying if [{}] is a valid directory."
          + " Returning 'not valid' and continuing.", p, e);
        return false;
      }
    }

    protected boolean isValidName(final String name) {
      return !blacklist.contains(name);
    }
  }

  /**
   * A {@link PathFilter} that only allows directories.
   */
  public static class DirFilter extends BlackListDirFilter {

    public DirFilter(FileSystem fs) {
      super(fs, null);
    }
  }

  /**
   * A {@link PathFilter} that returns usertable directories. To get all directories use the
   * {@link BlackListDirFilter} with a <tt>null</tt> blacklist.
   */
  public static class UserTableDirFilter extends BlackListDirFilter {
    public UserTableDirFilter(FileSystem fs) {
      super(fs, HConstants.HBASE_NON_TABLE_DIRS);
    }

    @Override
    protected boolean isValidName(final String name) {
      if (!super.isValidName(name)) return false;

      try {
        TableName.isLegalTableQualifierName(Bytes.toBytes(name));
      } catch (IllegalArgumentException e) {
        LOG.info("Invalid table name: {}", name);
        return false;
      }
      return true;
    }
  }

  public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
    throws IOException {
    List<Path> tableDirs = new ArrayList<>();
    Path baseNamespaceDir = new Path(rootdir, HConstants.BASE_NAMESPACE_DIR);
    if (fs.exists(baseNamespaceDir)) {
      for (FileStatus status : fs.globStatus(new Path(baseNamespaceDir, "*"))) {
        tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
      }
    }
    return tableDirs;
  }

  /**
   * @return All the table directories under <code>rootdir</code>. Ignores non-table hbase folders
   *         such as .logs, .oldlogs, .corrupt folders.
   */
  public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
    throws IOException {
    // presumes any directory under hbase.rootdir is a table
    FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
    List<Path> tabledirs = new ArrayList<>(dirs.length);
    for (FileStatus dir : dirs) {
      tabledirs.add(dir.getPath());
    }
    return tabledirs;
  }

  /**
   * Filter for region directories, i.e. directories whose names are valid encoded region names.
   */
  public static class RegionDirFilter extends AbstractFileStatusFilter {
    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
    final FileSystem fs;

    public RegionDirFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      if (!regionDirPattern.matcher(p.getName()).matches()) {
        return false;
      }

      try {
        return isDirectory(fs, isDir, p);
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file {} due to IOException", p, ioe);
        return false;
      }
    }
  }

  /**
   * Given a particular table dir, return all the regiondirs inside it, excluding files such as
   * .tableinfo.
   * @param fs       A file system for the Path
   * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir>
   * @return List of paths to valid region directories in table dir.
   */
  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir)
    throws IOException {
    // assumes we are in a table dir.
977 List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 978 if (rds == null) { 979 return Collections.emptyList(); 980 } 981 List<Path> regionDirs = new ArrayList<>(rds.size()); 982 for (FileStatus rdfs : rds) { 983 Path rdPath = rdfs.getPath(); 984 regionDirs.add(rdPath); 985 } 986 return regionDirs; 987 } 988 989 public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) { 990 return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region); 991 } 992 993 public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) { 994 return getRegionDirFromTableDir(tableDir, 995 ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName()); 996 } 997 998 public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) { 999 return new Path(tableDir, encodedRegionName); 1000 } 1001 1002 /** 1003 * Filter for all dirs that are legal column family names. This is generally used for colfam dirs 1004 * <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>. 1005 */ 1006 public static class FamilyDirFilter extends AbstractFileStatusFilter { 1007 final FileSystem fs; 1008 1009 public FamilyDirFilter(FileSystem fs) { 1010 this.fs = fs; 1011 } 1012 1013 @Override 1014 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1015 try { 1016 // throws IAE if invalid 1017 ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(Bytes.toBytes(p.getName())); 1018 } catch (IllegalArgumentException iae) { 1019 // path name is an invalid family name and thus is excluded. 1020 return false; 1021 } 1022 1023 try { 1024 return isDirectory(fs, isDir, p); 1025 } catch (IOException ioe) { 1026 // Maybe the file was moved or the fs was disconnected. 1027 LOG.warn("Skipping file {} due to IOException", p, ioe); 1028 return false; 1029 } 1030 } 1031 } 1032 1033 /** 1034 * Given a particular region dir, return all the familydirs inside it 1035 * @param fs A file system for the Path 1036 * @param regionDir Path to a specific region directory 1037 * @return List of paths to valid family directories in region dir. 1038 */ 1039 public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) 1040 throws IOException { 1041 // assumes we are in a region dir. 
1042 return getFilePaths(fs, regionDir, new FamilyDirFilter(fs)); 1043 } 1044 1045 public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) 1046 throws IOException { 1047 return getFilePaths(fs, familyDir, new ReferenceFileFilter(fs)); 1048 } 1049 1050 public static List<Path> getReferenceAndLinkFilePaths(final FileSystem fs, final Path familyDir) 1051 throws IOException { 1052 return getFilePaths(fs, familyDir, new ReferenceAndLinkFileFilter(fs)); 1053 } 1054 1055 private static List<Path> getFilePaths(final FileSystem fs, final Path dir, 1056 final PathFilter pathFilter) throws IOException { 1057 FileStatus[] fds = fs.listStatus(dir, pathFilter); 1058 List<Path> files = new ArrayList<>(fds.length); 1059 for (FileStatus fdfs : fds) { 1060 Path fdPath = fdfs.getPath(); 1061 files.add(fdPath); 1062 } 1063 return files; 1064 } 1065 1066 public static int getRegionReferenceAndLinkFileCount(final FileSystem fs, final Path p) { 1067 int result = 0; 1068 try { 1069 for (Path familyDir : getFamilyDirs(fs, p)) { 1070 result += getReferenceAndLinkFilePaths(fs, familyDir).size(); 1071 } 1072 } catch (IOException e) { 1073 LOG.warn("Error Counting reference files.", e); 1074 } 1075 return result; 1076 } 1077 1078 public static class ReferenceAndLinkFileFilter implements PathFilter { 1079 1080 private final FileSystem fs; 1081 1082 public ReferenceAndLinkFileFilter(FileSystem fs) { 1083 this.fs = fs; 1084 } 1085 1086 @Override 1087 public boolean accept(Path rd) { 1088 try { 1089 // only files can be references. 1090 return !fs.getFileStatus(rd).isDirectory() 1091 && (StoreFileInfo.isReference(rd) || HFileLink.isHFileLink(rd)); 1092 } catch (IOException ioe) { 1093 // Maybe the file was moved or the fs was disconnected. 1094 LOG.warn("Skipping file " + rd + " due to IOException", ioe); 1095 return false; 1096 } 1097 } 1098 } 1099 1100 /** 1101 * Filter for HFiles that excludes reference files. 1102 */ 1103 public static class HFileFilter extends AbstractFileStatusFilter { 1104 final FileSystem fs; 1105 1106 public HFileFilter(FileSystem fs) { 1107 this.fs = fs; 1108 } 1109 1110 @Override 1111 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1112 if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) { 1113 return false; 1114 } 1115 1116 try { 1117 return isFile(fs, isDir, p); 1118 } catch (IOException ioe) { 1119 // Maybe the file was moved or the fs was disconnected. 1120 LOG.warn("Skipping file {} due to IOException", p, ioe); 1121 return false; 1122 } 1123 } 1124 } 1125 1126 /** 1127 * Filter for HFileLinks (StoreFiles and HFiles not included). the filter itself does not consider 1128 * if a link is file or not. 1129 */ 1130 public static class HFileLinkFilter implements PathFilter { 1131 1132 @Override 1133 public boolean accept(Path p) { 1134 return HFileLink.isHFileLink(p); 1135 } 1136 } 1137 1138 public static class ReferenceFileFilter extends AbstractFileStatusFilter { 1139 1140 private final FileSystem fs; 1141 1142 public ReferenceFileFilter(FileSystem fs) { 1143 this.fs = fs; 1144 } 1145 1146 @Override 1147 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1148 if (!StoreFileInfo.isReference(p)) { 1149 return false; 1150 } 1151 1152 try { 1153 // only files can be references. 1154 return isFile(fs, isDir, p); 1155 } catch (IOException ioe) { 1156 // Maybe the file was moved or the fs was disconnected. 
1157 LOG.warn("Skipping file {} due to IOException", p, ioe); 1158 return false; 1159 } 1160 } 1161 } 1162 1163 /** 1164 * Called every so-often by storefile map builder getTableStoreFilePathMap to report progress. 1165 */ 1166 interface ProgressReporter { 1167 /** 1168 * @param status File or directory we are about to process. 1169 */ 1170 void progress(FileStatus status); 1171 } 1172 1173 /** 1174 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1175 * names to the full Path. <br> 1176 * Example...<br> 1177 * Key = 3944417774205889744 <br> 1178 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1179 * @param map map to add values. If null, this method will create and populate one to 1180 * return 1181 * @param fs The file system to use. 1182 * @param hbaseRootDir The root directory to scan. 1183 * @param tableName name of the table to scan. 1184 * @return Map keyed by StoreFile name with a value of the full Path. 1185 * @throws IOException When scanning the directory fails. 1186 */ 1187 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map, 1188 final FileSystem fs, final Path hbaseRootDir, TableName tableName) 1189 throws IOException, InterruptedException { 1190 return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, 1191 (ProgressReporter) null); 1192 } 1193 1194 /** 1195 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1196 * names to the full Path. Note that because this method can be called on a 'live' HBase system 1197 * that we will skip files that no longer exist by the time we traverse them and similarly the 1198 * user of the result needs to consider that some entries in this map may not exist by the time 1199 * this call completes. <br> 1200 * Example...<br> 1201 * Key = 3944417774205889744 <br> 1202 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1203 * @param resultMap map to add values. If null, this method will create and populate one to 1204 * return 1205 * @param fs The file system to use. 1206 * @param hbaseRootDir The root directory to scan. 1207 * @param tableName name of the table to scan. 1208 * @param sfFilter optional path filter to apply to store files 1209 * @param executor optional executor service to parallelize this operation 1210 * @param progressReporter Instance or null; gets called every time we move to new region of 1211 * family dir and for each store file. 1212 * @return Map keyed by StoreFile name with a value of the full Path. 1213 * @throws IOException When scanning the directory fails. 1214 * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead. 1215 */ 1216 @Deprecated 1217 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1218 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1219 ExecutorService executor, final HbckErrorReporter progressReporter) 1220 throws IOException, InterruptedException { 1221 return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor, 1222 new ProgressReporter() { 1223 @Override 1224 public void progress(FileStatus status) { 1225 // status is not used in this implementation. 
1226 progressReporter.progress(); 1227 } 1228 }); 1229 } 1230 1231 /** 1232 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1233 * names to the full Path. Note that because this method can be called on a 'live' HBase system 1234 * that we will skip files that no longer exist by the time we traverse them and similarly the 1235 * user of the result needs to consider that some entries in this map may not exist by the time 1236 * this call completes. <br> 1237 * Example...<br> 1238 * Key = 3944417774205889744 <br> 1239 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1240 * @param resultMap map to add values. If null, this method will create and populate one to 1241 * return 1242 * @param fs The file system to use. 1243 * @param hbaseRootDir The root directory to scan. 1244 * @param tableName name of the table to scan. 1245 * @param sfFilter optional path filter to apply to store files 1246 * @param executor optional executor service to parallelize this operation 1247 * @param progressReporter Instance or null; gets called every time we move to new region of 1248 * family dir and for each store file. 1249 * @return Map keyed by StoreFile name with a value of the full Path. 1250 * @throws IOException When scanning the directory fails. 1251 * @throws InterruptedException the thread is interrupted, either before or during the activity. 1252 */ 1253 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1254 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1255 ExecutorService executor, final ProgressReporter progressReporter) 1256 throws IOException, InterruptedException { 1257 1258 final Map<String, Path> finalResultMap = 1259 resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap; 1260 1261 // only include the directory paths to tables 1262 Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 1263 // Inside a table, there are compaction.dir directories to skip. Otherwise, all else 1264 // should be regions. 
1265 final FamilyDirFilter familyFilter = new FamilyDirFilter(fs); 1266 final Vector<Exception> exceptions = new Vector<>(); 1267 1268 try { 1269 List<FileStatus> regionDirs = 1270 FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 1271 if (regionDirs == null) { 1272 return finalResultMap; 1273 } 1274 1275 final List<Future<?>> futures = new ArrayList<>(regionDirs.size()); 1276 1277 for (FileStatus regionDir : regionDirs) { 1278 if (null != progressReporter) { 1279 progressReporter.progress(regionDir); 1280 } 1281 final Path dd = regionDir.getPath(); 1282 1283 if (!exceptions.isEmpty()) { 1284 break; 1285 } 1286 1287 Runnable getRegionStoreFileMapCall = new Runnable() { 1288 @Override 1289 public void run() { 1290 try { 1291 HashMap<String, Path> regionStoreFileMap = new HashMap<>(); 1292 List<FileStatus> familyDirs = 1293 FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter); 1294 if (familyDirs == null) { 1295 if (!fs.exists(dd)) { 1296 LOG.warn("Skipping region because it no longer exists: " + dd); 1297 } else { 1298 LOG.warn("Skipping region because it has no family dirs: " + dd); 1299 } 1300 return; 1301 } 1302 for (FileStatus familyDir : familyDirs) { 1303 if (null != progressReporter) { 1304 progressReporter.progress(familyDir); 1305 } 1306 Path family = familyDir.getPath(); 1307 if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) { 1308 continue; 1309 } 1310 // now in family, iterate over the StoreFiles and 1311 // put in map 1312 FileStatus[] familyStatus = fs.listStatus(family); 1313 for (FileStatus sfStatus : familyStatus) { 1314 if (null != progressReporter) { 1315 progressReporter.progress(sfStatus); 1316 } 1317 Path sf = sfStatus.getPath(); 1318 if (sfFilter == null || sfFilter.accept(sf)) { 1319 regionStoreFileMap.put(sf.getName(), sf); 1320 } 1321 } 1322 } 1323 finalResultMap.putAll(regionStoreFileMap); 1324 } catch (Exception e) { 1325 LOG.error("Could not get region store file map for region: " + dd, e); 1326 exceptions.add(e); 1327 } 1328 } 1329 }; 1330 1331 // If executor is available, submit async tasks to exec concurrently, otherwise 1332 // just do serial sync execution 1333 if (executor != null) { 1334 Future<?> future = executor.submit(getRegionStoreFileMapCall); 1335 futures.add(future); 1336 } else { 1337 FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null); 1338 future.run(); 1339 futures.add(future); 1340 } 1341 } 1342 1343 // Ensure all pending tasks are complete (or that we run into an exception) 1344 for (Future<?> f : futures) { 1345 if (!exceptions.isEmpty()) { 1346 break; 1347 } 1348 try { 1349 f.get(); 1350 } catch (ExecutionException e) { 1351 LOG.error("Unexpected exec exception! Should've been caught already. 
(Bug?)", e); 1352 // Shouldn't happen, we already logged/caught any exceptions in the Runnable 1353 } 1354 } 1355 } catch (IOException e) { 1356 LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e); 1357 exceptions.add(e); 1358 } finally { 1359 if (!exceptions.isEmpty()) { 1360 // Just throw the first exception as an indication something bad happened 1361 // Don't need to propagate all the exceptions, we already logged them all anyway 1362 Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class); 1363 throw new IOException(exceptions.firstElement()); 1364 } 1365 } 1366 1367 return finalResultMap; 1368 } 1369 1370 public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) { 1371 int result = 0; 1372 try { 1373 for (Path familyDir : getFamilyDirs(fs, p)) { 1374 result += getReferenceFilePaths(fs, familyDir).size(); 1375 } 1376 } catch (IOException e) { 1377 LOG.warn("Error counting reference files", e); 1378 } 1379 return result; 1380 } 1381 1382 /** 1383 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1384 * the full Path. <br> 1385 * Example...<br> 1386 * Key = 3944417774205889744 <br> 1387 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1388 * @param fs The file system to use. 1389 * @param hbaseRootDir The root directory to scan. 1390 * @return Map keyed by StoreFile name with a value of the full Path. 1391 * @throws IOException When scanning the directory fails. 1392 */ 1393 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1394 final Path hbaseRootDir) throws IOException, InterruptedException { 1395 return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter) null); 1396 } 1397 1398 /** 1399 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1400 * the full Path. <br> 1401 * Example...<br> 1402 * Key = 3944417774205889744 <br> 1403 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1404 * @param fs The file system to use. 1405 * @param hbaseRootDir The root directory to scan. 1406 * @param sfFilter optional path filter to apply to store files 1407 * @param executor optional executor service to parallelize this operation 1408 * @param progressReporter Instance or null; gets called every time we move to new region of 1409 * family dir and for each store file. 1410 * @return Map keyed by StoreFile name with a value of the full Path. 1411 * @throws IOException When scanning the directory fails. 1412 * @deprecated Since 2.3.0. Will be removed in hbase4. Used 1413 * {@link #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)} 1414 */ 1415 @Deprecated 1416 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1417 final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor, 1418 HbckErrorReporter progressReporter) throws IOException, InterruptedException { 1419 return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor, new ProgressReporter() { 1420 @Override 1421 public void progress(FileStatus status) { 1422 // status is not used in this implementation. 1423 progressReporter.progress(); 1424 } 1425 }); 1426 } 1427 1428 /** 1429 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1430 * the full Path. 
<br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   * @param fs               The file system to use.
   * @param hbaseRootDir     The root directory to scan.
   * @param sfFilter         optional path filter to apply to store files
   * @param executor         optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to new region of
   *                         family dir and for each store file.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   */
  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
    final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
    ProgressReporter progressReporter) throws IOException, InterruptedException {
    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);

    // if this method looks similar to 'getTableFragmentation' that is because
    // it was borrowed from it.

    // only include the directory paths to tables
    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
      getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir),
        sfFilter, executor, progressReporter);
    }
    return map;
  }

  /**
   * Filters FileStatuses in an array and returns a list.
   * @param input  An array of FileStatuses
   * @param filter A required filter to filter the array
   * @return A list of FileStatuses
   */
  public static List<FileStatus> filterFileStatuses(FileStatus[] input, FileStatusFilter filter) {
    if (input == null) return null;
    return filterFileStatuses(Iterators.forArray(input), filter);
  }

  /**
   * Filters FileStatuses in an iterator and returns a list.
   * @param input  An iterator of FileStatuses
   * @param filter A required filter to filter the iterator
   * @return A list of FileStatuses
   */
  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
    FileStatusFilter filter) {
    if (input == null) return null;
    ArrayList<FileStatus> results = new ArrayList<>();
    while (input.hasNext()) {
      FileStatus f = input.next();
      if (filter.accept(f)) {
        results.add(f);
      }
    }
    return results;
  }
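
  // Usage sketch for filterFileStatuses (illustrative only; tableDir is hypothetical):
  //
  //   FileStatus[] raw = fs.listStatus(tableDir);
  //   List<FileStatus> regionDirs = FSUtils.filterFileStatuses(raw, new RegionDirFilter(fs));
  //   // null is returned only when the input itself is null; otherwise only accepted entries remain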
  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
   * differences between Hadoop versions, where Hadoop 1 does not throw a FileNotFoundException
   * and instead returns an empty FileStatus[], while Hadoop 2 will throw a FileNotFoundException.
   * @param fs file system
   * @param dir directory
   * @param filter file status filter
   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
   */
  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs, final Path dir,
    final FileStatusFilter filter) throws IOException {
    FileStatus[] status = null;
    try {
      status = fs.listStatus(dir);
    } catch (FileNotFoundException fnfe) {
      LOG.trace("{} does not exist", dir);
      return null;
    }

    if (ArrayUtils.getLength(status) == 0) {
      return null;
    }

    if (filter == null) {
      return Arrays.asList(status);
    } else {
      List<FileStatus> status2 = filterFileStatuses(status, filter);
      if (status2 == null || status2.isEmpty()) {
        return null;
      } else {
        return status2;
      }
    }
  }

  /**
   * Scans the root path of the file system to get the degree of locality for each region on each
   * of the servers having at least one block of that region. This is used by the tool
   * {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}.
   * @param conf the configuration to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>>
    getRegionDegreeLocalityMappingFromFS(final Configuration conf) throws IOException {
    return getRegionDegreeLocalityMappingFromFS(conf, null,
      conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
  }

  /**
   * Scans the root path of the file system to get the degree of locality for each region on each
   * of the servers having at least one block of that region.
   * @param conf the configuration to use
   * @param desiredTable the table you wish to scan locality for
   * @param threadPoolSize the thread pool size to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
    final Configuration conf, final String desiredTable, int threadPoolSize) throws IOException {
    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
    return regionDegreeLocalityMapping;
  }
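  // Illustrative sketch (assumes a Configuration "conf" and a table name string "tableName"):
  // the result maps region encoded name -> (server name -> locality fraction), so the
  // best-located server per region can be picked out like this.
  //
  //   Map<String, Map<String, Float>> locality =
  //     FSUtils.getRegionDegreeLocalityMappingFromFS(conf, tableName, 8);
  //   locality.forEach((region, perHost) -> perHost.entrySet().stream()
  //     .max(Map.Entry.comparingByValue())
  //     .ifPresent(best -> LOG.info("Region {} is most local on {} ({})", region, best.getKey(),
  //       best.getValue())));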
  /**
   * Scans the root path of the file system to get either the mapping between the region name and
   * its best locality region server, or the degree of locality of each region on each of the
   * servers having at least one block of that region. The output map parameters are both optional.
   * @param conf the configuration to use
   * @param desiredTable the table you wish to scan locality for
   * @param threadPoolSize the thread pool size to use
   * @param regionDegreeLocalityMapping the map into which to put the locality degree mapping, or
   *   null; must be a thread-safe implementation
   * @throws IOException in case of file system errors or interrupts
   */
  private static void getRegionLocalityMappingFromFS(final Configuration conf,
    final String desiredTable, int threadPoolSize,
    final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
    final FileSystem fs = FileSystem.get(conf);
    final Path rootPath = CommonFSUtils.getRootDir(conf);
    final long startTime = EnvironmentEdgeManager.currentTime();
    final Path queryPath;
    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
    if (null == desiredTable) {
      queryPath =
        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
    } else {
      queryPath = new Path(
        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
    }

    // reject all paths that are not appropriate
    PathFilter pathFilter = new PathFilter() {
      @Override
      public boolean accept(Path path) {
        // this is the region name; it may get some noise data
        if (null == path) {
          return false;
        }

        // no parent?
        Path parent = path.getParent();
        if (null == parent) {
          return false;
        }

        String regionName = path.getName();
        if (null == regionName) {
          return false;
        }

        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
          return false;
        }
        return true;
      }
    };

    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
    }

    if (null == statusList) {
      return;
    }

    // lower the number of threads in case we have very few expected regions
    threadPoolSize = Math.min(threadPoolSize, statusList.length);

    // run in multiple threads
    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
      new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    try {
      // ignore all file status items that are not of interest
      for (FileStatus regionStatus : statusList) {
        if (null == regionStatus || !regionStatus.isDirectory()) {
          continue;
        }

        final Path regionPath = regionStatus.getPath();
        if (null != regionPath) {
          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
        }
      }
    } finally {
      tpe.shutdown();
      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
      try {
        // wait until the TPE terminates, either naturally or because of exceptions thrown in the
        // executing threads
        while (!tpe.awaitTermination(threadWakeFrequency, TimeUnit.MILLISECONDS)) {
          // printing out a rough estimate, so as to not introduce an AtomicInteger
          LOG.info("Locality checking is underway: { Scanned Regions : "
            + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
            + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
    }

    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
    LOG.info("Scan DFS for locality info takes {}ms", overhead);
  }
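  // For reference, the glob above matches region directories of the form
  //   ${hbase.rootdir}/data/<namespace>/<table>/<encoded-region-name>
  // and each matching directory is handed to an FSRegionScanner task, which fills in
  // regionDegreeLocalityMapping as it finishes.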
  /**
   * Do our short circuit read setup. Checks buffer size to use and whether to do checksumming in
   * hbase or hdfs.
   */
  public static void setupShortCircuitRead(final Configuration conf) {
    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
    boolean shortCircuitSkipChecksum =
      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    if (shortCircuitSkipChecksum) {
      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not "
        + "be set to true."
        + (useHBaseChecksum
          ? " HBase checksum doesn't require "
            + "it, see https://issues.apache.org/jira/browse/HBASE-6868."
          : ""));
      assert !shortCircuitSkipChecksum; // this will fail if assertions are on
    }
    checkShortCircuitReadBufferSize(conf);
  }

  /**
   * Check if short circuit read buffer size is set and if not, set it to hbase value.
   */
  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
    int size = conf.getInt(dfsKey, notSet);
    // If a size is set, return -- we will use it.
    if (size != notSet) return;
    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }
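  // Illustrative sketch: a deployment that wants a specific short circuit read buffer size would
  // normally set the HBase-prefixed key and let checkShortCircuitReadBufferSize copy it into the
  // DFS key if that one is unset (the 131072 value below is just an example):
  //
  //   conf.setInt("hbase.dfs.client.read.shortcircuit.buffer.size", 131072);
  //   FSUtils.setupShortCircuitRead(conf);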
  /**
   * Returns the DFSClient DFSHedgedReadMetrics instance, or null if it can't be found or we are
   * not on hdfs.
   */
  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
    throws IOException {
    if (!CommonFSUtils.isHDFS(c)) {
      return null;
    }
    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
    final String name = "getHedgedReadMetrics";
    DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient();
    Method m;
    try {
      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: "
        + e.getMessage());
      return null;
    } catch (SecurityException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: "
        + e.getMessage());
      return null;
    }
    m.setAccessible(true);
    try {
      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: "
        + e.getMessage());
      return null;
    }
  }

  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
    Configuration conf, int threads) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Future<Void>> futures = new ArrayList<>();
    List<Path> traversedPaths;
    try {
      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (ExecutionException | InterruptedException | IOException e) {
      throw new IOException("Copy snapshot reference files failed", e);
    } finally {
      pool.shutdownNow();
    }
    return traversedPaths;
  }

  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
    Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
    List<Path> traversedPaths = new ArrayList<>();
    traversedPaths.add(dst);
    FileStatus currentFileStatus = srcFS.getFileStatus(src);
    if (currentFileStatus.isDirectory()) {
      if (!dstFS.mkdirs(dst)) {
        throw new IOException("Create directory failed: " + dst);
      }
      FileStatus[] subPaths = srcFS.listStatus(src);
      for (FileStatus subPath : subPaths) {
        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
      }
    } else {
      Future<Void> future = pool.submit(() -> {
        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
        return null;
      });
      futures.add(future);
    }
    return traversedPaths;
  }

  /** Returns A set containing all namenode addresses of fs */
  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
    Configuration conf) {
    Set<InetSocketAddress> addresses = new HashSet<>();
    String serviceName = fs.getCanonicalServiceName();

    if (serviceName.startsWith("ha-hdfs")) {
      try {
        Map<String, Map<String, InetSocketAddress>> addressMap =
          DFSUtil.getNNServiceRpcAddressesForCluster(conf);
        String nameService = serviceName.substring(serviceName.indexOf(":") + 1);
        if (addressMap.containsKey(nameService)) {
          Map<String, InetSocketAddress> nnMap = addressMap.get(nameService);
          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
            InetSocketAddress addr = e2.getValue();
            addresses.add(addr);
          }
        }
      } catch (Exception e) {
        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
      }
    } else {
      URI uri = fs.getUri();
      int port = uri.getPort();
      if (port < 0) {
        int idx = serviceName.indexOf(':');
        port = Integer.parseInt(serviceName.substring(idx + 1));
      }
      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
      addresses.add(addr);
    }

    return addresses;
  }
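  // Illustrative sketch ("srcFs", "srcDir", "dstFs" and "dstDir" are hypothetical):
  // copyFilesParallel above copies a whole directory tree, submitting one copy task per file.
  //
  //   List<Path> copied = FSUtils.copyFilesParallel(srcFs, srcDir, dstFs, dstDir, conf, 4);
  //   LOG.info("Copied {} paths", copied.size());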
serviceName=" + serviceName, e); 1791 } 1792 } else { 1793 URI uri = fs.getUri(); 1794 int port = uri.getPort(); 1795 if (port < 0) { 1796 int idx = serviceName.indexOf(':'); 1797 port = Integer.parseInt(serviceName.substring(idx + 1)); 1798 } 1799 InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port); 1800 addresses.add(addr); 1801 } 1802 1803 return addresses; 1804 } 1805 1806 /** 1807 * @param conf the Configuration of HBase 1808 * @return Whether srcFs and desFs are on same hdfs or not 1809 */ 1810 public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) { 1811 // By getCanonicalServiceName, we could make sure both srcFs and desFs 1812 // show a unified format which contains scheme, host and port. 1813 String srcServiceName = srcFs.getCanonicalServiceName(); 1814 String desServiceName = desFs.getCanonicalServiceName(); 1815 1816 if (srcServiceName == null || desServiceName == null) { 1817 return false; 1818 } 1819 if (srcServiceName.equals(desServiceName)) { 1820 return true; 1821 } 1822 if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) { 1823 Collection<String> internalNameServices = 1824 conf.getTrimmedStringCollection("dfs.internal.nameservices"); 1825 if (!internalNameServices.isEmpty()) { 1826 if (internalNameServices.contains(srcServiceName.split(":")[1])) { 1827 return true; 1828 } else { 1829 return false; 1830 } 1831 } 1832 } 1833 if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) { 1834 // If one serviceName is an HA format while the other is a non-HA format, 1835 // maybe they refer to the same FileSystem. 1836 // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port" 1837 Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf); 1838 Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf); 1839 if (Sets.intersection(srcAddrs, desAddrs).size() > 0) { 1840 return true; 1841 } 1842 } 1843 1844 return false; 1845 } 1846}