001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations; 021import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET; 022 023import edu.umd.cs.findbugs.annotations.CheckForNull; 024import java.io.ByteArrayInputStream; 025import java.io.DataInputStream; 026import java.io.EOFException; 027import java.io.FileNotFoundException; 028import java.io.IOException; 029import java.io.InterruptedIOException; 030import java.lang.reflect.InvocationTargetException; 031import java.lang.reflect.Method; 032import java.net.InetSocketAddress; 033import java.net.URI; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.Collection; 037import java.util.Collections; 038import java.util.HashMap; 039import java.util.HashSet; 040import java.util.Iterator; 041import java.util.List; 042import java.util.Locale; 043import java.util.Map; 044import java.util.Optional; 045import java.util.Set; 046import java.util.Vector; 047import java.util.concurrent.ConcurrentHashMap; 048import java.util.concurrent.ExecutionException; 049import java.util.concurrent.ExecutorService; 050import java.util.concurrent.Executors; 051import java.util.concurrent.Future; 052import java.util.concurrent.FutureTask; 053import java.util.concurrent.ThreadPoolExecutor; 054import java.util.concurrent.TimeUnit; 055import java.util.regex.Pattern; 056import org.apache.commons.lang3.ArrayUtils; 057import org.apache.hadoop.conf.Configuration; 058import org.apache.hadoop.fs.BlockLocation; 059import org.apache.hadoop.fs.FSDataInputStream; 060import org.apache.hadoop.fs.FSDataOutputStream; 061import org.apache.hadoop.fs.FileStatus; 062import org.apache.hadoop.fs.FileSystem; 063import org.apache.hadoop.fs.FileUtil; 064import org.apache.hadoop.fs.Path; 065import org.apache.hadoop.fs.PathFilter; 066import org.apache.hadoop.fs.StorageType; 067import org.apache.hadoop.fs.permission.FsPermission; 068import org.apache.hadoop.hbase.ClusterId; 069import org.apache.hadoop.hbase.HConstants; 070import org.apache.hadoop.hbase.HDFSBlocksDistribution; 071import org.apache.hadoop.hbase.TableName; 072import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 073import org.apache.hadoop.hbase.client.RegionInfo; 074import org.apache.hadoop.hbase.client.RegionInfoBuilder; 075import org.apache.hadoop.hbase.exceptions.DeserializationException; 076import org.apache.hadoop.hbase.fs.HFileSystem; 077import org.apache.hadoop.hbase.io.HFileLink; 078import org.apache.hadoop.hbase.master.HMaster; 079import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 080import org.apache.hadoop.hdfs.DFSClient; 081import org.apache.hadoop.hdfs.DFSHedgedReadMetrics; 082import org.apache.hadoop.hdfs.DFSUtil; 083import org.apache.hadoop.hdfs.DistributedFileSystem; 084import org.apache.hadoop.hdfs.client.HdfsDataInputStream; 085import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 086import org.apache.hadoop.hdfs.protocol.LocatedBlock; 087import org.apache.hadoop.io.IOUtils; 088import org.apache.hadoop.ipc.RemoteException; 089import org.apache.yetus.audience.InterfaceAudience; 090import org.slf4j.Logger; 091import org.slf4j.LoggerFactory; 092 093import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 094import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; 095import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 096import org.apache.hbase.thirdparty.com.google.common.primitives.Ints; 097import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 098 099import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 100import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos; 101 102/** 103 * Utility methods for interacting with the underlying file system. 104 */ 105@InterfaceAudience.Private 106public final class FSUtils { 107 private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class); 108 109 private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize"; 110 private static final int DEFAULT_THREAD_POOLSIZE = 2; 111 112 /** Set to true on Windows platforms */ 113 // currently only used in testing. TODO refactor into a test class 114 public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows"); 115 116 private FSUtils() { 117 } 118 119 /** Returns True is <code>fs</code> is instance of DistributedFileSystem n */ 120 public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException { 121 FileSystem fileSystem = fs; 122 // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem. 123 // Check its backing fs for dfs-ness. 124 if (fs instanceof HFileSystem) { 125 fileSystem = ((HFileSystem) fs).getBackingFs(); 126 } 127 return fileSystem instanceof DistributedFileSystem; 128 } 129 130 /** 131 * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the 132 * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true. Does not consider 133 * schema; i.e. if schemas different but path or subpath matches, the two will equate. 134 * @param pathToSearch Path we will be trying to match. 135 * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code> 136 */ 137 public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) { 138 Path tailPath = pathTail; 139 String tailName; 140 Path toSearch = pathToSearch; 141 String toSearchName; 142 boolean result = false; 143 144 if (pathToSearch.depth() != pathTail.depth()) { 145 return false; 146 } 147 148 do { 149 tailName = tailPath.getName(); 150 if (tailName == null || tailName.isEmpty()) { 151 result = true; 152 break; 153 } 154 toSearchName = toSearch.getName(); 155 if (toSearchName == null || toSearchName.isEmpty()) { 156 break; 157 } 158 // Move up a parent on each path for next go around. Path doesn't let us go off the end. 159 tailPath = tailPath.getParent(); 160 toSearch = toSearch.getParent(); 161 } while (tailName.equals(toSearchName)); 162 return result; 163 } 164 165 /** 166 * Delete the region directory if exists. 167 * @return True if deleted the region directory. 168 */ 169 public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri) 170 throws IOException { 171 Path rootDir = CommonFSUtils.getRootDir(conf); 172 FileSystem fs = rootDir.getFileSystem(conf); 173 return CommonFSUtils.deleteDirectory(fs, 174 new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName())); 175 } 176 177 /** 178 * Create the specified file on the filesystem. By default, this will: 179 * <ol> 180 * <li>overwrite the file if it exists</li> 181 * <li>apply the umask in the configuration (if it is enabled)</li> 182 * <li>use the fs configured buffer size (or 4096 if not set)</li> 183 * <li>use the configured column family replication or default replication if 184 * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li> 185 * <li>use the default block size</li> 186 * <li>not track progress</li> 187 * </ol> 188 * @param conf configurations 189 * @param fs {@link FileSystem} on which to write the file 190 * @param path {@link Path} to the file to write 191 * @param perm permissions 192 * @param favoredNodes favored data nodes 193 * @return output stream to the created file 194 * @throws IOException if the file cannot be created 195 */ 196 public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path, 197 FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException { 198 if (fs instanceof HFileSystem) { 199 FileSystem backingFs = ((HFileSystem) fs).getBackingFs(); 200 if (backingFs instanceof DistributedFileSystem) { 201 short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION, 202 String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION))); 203 DistributedFileSystem.HdfsDataOutputStreamBuilder builder = 204 ((DistributedFileSystem) backingFs).createFile(path).recursive().permission(perm) 205 .create(); 206 if (favoredNodes != null) { 207 builder.favoredNodes(favoredNodes); 208 } 209 if (replication > 0) { 210 builder.replication(replication); 211 } 212 return builder.build(); 213 } 214 215 } 216 return CommonFSUtils.create(fs, path, perm, true); 217 } 218 219 /** 220 * Checks to see if the specified file system is available 221 * @param fs filesystem 222 * @throws IOException e 223 */ 224 public static void checkFileSystemAvailable(final FileSystem fs) throws IOException { 225 if (!(fs instanceof DistributedFileSystem)) { 226 return; 227 } 228 IOException exception = null; 229 DistributedFileSystem dfs = (DistributedFileSystem) fs; 230 try { 231 if (dfs.exists(new Path("/"))) { 232 return; 233 } 234 } catch (IOException e) { 235 exception = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 236 } 237 try { 238 fs.close(); 239 } catch (Exception e) { 240 LOG.error("file system close failed: ", e); 241 } 242 throw new IOException("File system is not available", exception); 243 } 244 245 /** 246 * Inquire the Active NameNode's safe mode status. 247 * @param dfs A DistributedFileSystem object representing the underlying HDFS. 248 * @return whether we're in safe mode 249 */ 250 private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException { 251 return dfs.setSafeMode(SAFEMODE_GET, true); 252 } 253 254 /** 255 * Check whether dfs is in safemode. 256 */ 257 public static void checkDfsSafeMode(final Configuration conf) throws IOException { 258 boolean isInSafeMode = false; 259 FileSystem fs = FileSystem.get(conf); 260 if (fs instanceof DistributedFileSystem) { 261 DistributedFileSystem dfs = (DistributedFileSystem) fs; 262 isInSafeMode = isInSafeMode(dfs); 263 } 264 if (isInSafeMode) { 265 throw new IOException("File system is in safemode, it can't be written now"); 266 } 267 } 268 269 /** 270 * Verifies current version of file system 271 * @param fs filesystem object 272 * @param rootdir root hbase directory 273 * @return null if no version file exists, version string otherwise 274 * @throws IOException if the version file fails to open 275 * @throws DeserializationException if the version data cannot be translated into a version 276 */ 277 public static String getVersion(FileSystem fs, Path rootdir) 278 throws IOException, DeserializationException { 279 final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 280 FileStatus[] status = null; 281 try { 282 // hadoop 2.0 throws FNFE if directory does not exist. 283 // hadoop 1.0 returns null if directory does not exist. 284 status = fs.listStatus(versionFile); 285 } catch (FileNotFoundException fnfe) { 286 return null; 287 } 288 if (ArrayUtils.getLength(status) == 0) { 289 return null; 290 } 291 String version = null; 292 byte[] content = new byte[(int) status[0].getLen()]; 293 FSDataInputStream s = fs.open(versionFile); 294 try { 295 IOUtils.readFully(s, content, 0, content.length); 296 if (ProtobufUtil.isPBMagicPrefix(content)) { 297 version = parseVersionFrom(content); 298 } else { 299 // Presume it pre-pb format. 300 try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) { 301 version = dis.readUTF(); 302 } 303 } 304 } catch (EOFException eof) { 305 LOG.warn("Version file was empty, odd, will try to set it."); 306 } finally { 307 s.close(); 308 } 309 return version; 310 } 311 312 /** 313 * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file. 314 * @param bytes The byte content of the hbase.version file 315 * @return The version found in the file as a String 316 * @throws DeserializationException if the version data cannot be translated into a version 317 */ 318 static String parseVersionFrom(final byte[] bytes) throws DeserializationException { 319 ProtobufUtil.expectPBMagicPrefix(bytes); 320 int pblen = ProtobufUtil.lengthOfPBMagic(); 321 FSProtos.HBaseVersionFileContent.Builder builder = 322 FSProtos.HBaseVersionFileContent.newBuilder(); 323 try { 324 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); 325 return builder.getVersion(); 326 } catch (IOException e) { 327 // Convert 328 throw new DeserializationException(e); 329 } 330 } 331 332 /** 333 * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file. 334 * @param version Version to persist 335 * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a 336 * prefix. 337 */ 338 static byte[] toVersionByteArray(final String version) { 339 FSProtos.HBaseVersionFileContent.Builder builder = 340 FSProtos.HBaseVersionFileContent.newBuilder(); 341 return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray()); 342 } 343 344 /** 345 * Verifies current version of file system 346 * @param fs file system 347 * @param rootdir root directory of HBase installation 348 * @param message if true, issues a message on System.out 349 * @throws IOException if the version file cannot be opened 350 * @throws DeserializationException if the contents of the version file cannot be parsed 351 */ 352 public static void checkVersion(FileSystem fs, Path rootdir, boolean message) 353 throws IOException, DeserializationException { 354 checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 355 } 356 357 /** 358 * Verifies current version of file system 359 * @param fs file system 360 * @param rootdir root directory of HBase installation 361 * @param message if true, issues a message on System.out 362 * @param wait wait interval 363 * @param retries number of times to retry 364 * @throws IOException if the version file cannot be opened 365 * @throws DeserializationException if the contents of the version file cannot be parsed 366 */ 367 public static void checkVersion(FileSystem fs, Path rootdir, boolean message, int wait, 368 int retries) throws IOException, DeserializationException { 369 String version = getVersion(fs, rootdir); 370 String msg; 371 if (version == null) { 372 if (!metaRegionExists(fs, rootdir)) { 373 // rootDir is empty (no version file and no root region) 374 // just create new version file (HBASE-1195) 375 setVersion(fs, rootdir, wait, retries); 376 return; 377 } else { 378 msg = "hbase.version file is missing. Is your hbase.rootdir valid? " 379 + "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " 380 + "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2"; 381 } 382 } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) { 383 return; 384 } else { 385 msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version 386 + " but software requires version " + HConstants.FILE_SYSTEM_VERSION 387 + ". Consult http://hbase.apache.org/book.html for further information about " 388 + "upgrading HBase."; 389 } 390 391 // version is deprecated require migration 392 // Output on stdout so user sees it in terminal. 393 if (message) { 394 System.out.println("WARNING! " + msg); 395 } 396 throw new FileSystemVersionException(msg); 397 } 398 399 /** 400 * Sets version of file system 401 * @param fs filesystem object 402 * @param rootdir hbase root 403 * @throws IOException e 404 */ 405 public static void setVersion(FileSystem fs, Path rootdir) throws IOException { 406 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0, 407 HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 408 } 409 410 /** 411 * Sets version of file system 412 * @param fs filesystem object 413 * @param rootdir hbase root 414 * @param wait time to wait for retry 415 * @param retries number of times to retry before failing 416 * @throws IOException e 417 */ 418 public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries) 419 throws IOException { 420 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries); 421 } 422 423 /** 424 * Sets version of file system 425 * @param fs filesystem object 426 * @param rootdir hbase root directory 427 * @param version version to set 428 * @param wait time to wait for retry 429 * @param retries number of times to retry before throwing an IOException 430 * @throws IOException e 431 */ 432 public static void setVersion(FileSystem fs, Path rootdir, String version, int wait, int retries) 433 throws IOException { 434 Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 435 Path tempVersionFile = new Path(rootdir, 436 HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + HConstants.VERSION_FILE_NAME); 437 while (true) { 438 try { 439 // Write the version to a temporary file 440 FSDataOutputStream s = fs.create(tempVersionFile); 441 try { 442 s.write(toVersionByteArray(version)); 443 s.close(); 444 s = null; 445 // Move the temp version file to its normal location. Returns false 446 // if the rename failed. Throw an IOE in that case. 447 if (!fs.rename(tempVersionFile, versionFile)) { 448 throw new IOException("Unable to move temp version file to " + versionFile); 449 } 450 } finally { 451 // Cleaning up the temporary if the rename failed would be trying 452 // too hard. We'll unconditionally create it again the next time 453 // through anyway, files are overwritten by default by create(). 454 455 // Attempt to close the stream on the way out if it is still open. 456 try { 457 if (s != null) s.close(); 458 } catch (IOException ignore) { 459 } 460 } 461 LOG.info("Created version file at " + rootdir.toString() + " with version=" + version); 462 return; 463 } catch (IOException e) { 464 if (retries > 0) { 465 LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e); 466 fs.delete(versionFile, false); 467 try { 468 if (wait > 0) { 469 Thread.sleep(wait); 470 } 471 } catch (InterruptedException ie) { 472 throw (InterruptedIOException) new InterruptedIOException().initCause(ie); 473 } 474 retries--; 475 } else { 476 throw e; 477 } 478 } 479 } 480 } 481 482 /** 483 * Checks that a cluster ID file exists in the HBase root directory 484 * @param fs the root directory FileSystem 485 * @param rootdir the HBase root directory in HDFS 486 * @param wait how long to wait between retries 487 * @return <code>true</code> if the file exists, otherwise <code>false</code> 488 * @throws IOException if checking the FileSystem fails 489 */ 490 public static boolean checkClusterIdExists(FileSystem fs, Path rootdir, long wait) 491 throws IOException { 492 while (true) { 493 try { 494 Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 495 return fs.exists(filePath); 496 } catch (IOException ioe) { 497 if (wait > 0L) { 498 LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe); 499 try { 500 Thread.sleep(wait); 501 } catch (InterruptedException e) { 502 Thread.currentThread().interrupt(); 503 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 504 } 505 } else { 506 throw ioe; 507 } 508 } 509 } 510 } 511 512 /** 513 * Returns the value of the unique cluster ID stored for this HBase instance. 514 * @param fs the root directory FileSystem 515 * @param rootdir the path to the HBase root directory 516 * @return the unique cluster identifier 517 * @throws IOException if reading the cluster ID file fails 518 */ 519 public static ClusterId getClusterId(FileSystem fs, Path rootdir) throws IOException { 520 Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 521 ClusterId clusterId = null; 522 FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null; 523 if (status != null) { 524 int len = Ints.checkedCast(status.getLen()); 525 byte[] content = new byte[len]; 526 FSDataInputStream in = fs.open(idPath); 527 try { 528 in.readFully(content); 529 } catch (EOFException eof) { 530 LOG.warn("Cluster ID file {} is empty", idPath); 531 } finally { 532 in.close(); 533 } 534 try { 535 clusterId = ClusterId.parseFrom(content); 536 } catch (DeserializationException e) { 537 throw new IOException("content=" + Bytes.toString(content), e); 538 } 539 // If not pb'd, make it so. 540 if (!ProtobufUtil.isPBMagicPrefix(content)) { 541 String cid = null; 542 in = fs.open(idPath); 543 try { 544 cid = in.readUTF(); 545 clusterId = new ClusterId(cid); 546 } catch (EOFException eof) { 547 LOG.warn("Cluster ID file {} is empty", idPath); 548 } finally { 549 in.close(); 550 } 551 rewriteAsPb(fs, rootdir, idPath, clusterId); 552 } 553 return clusterId; 554 } else { 555 LOG.warn("Cluster ID file does not exist at {}", idPath); 556 } 557 return clusterId; 558 } 559 560 /** 561 * */ 562 private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p, 563 final ClusterId cid) throws IOException { 564 // Rewrite the file as pb. Move aside the old one first, write new 565 // then delete the moved-aside file. 566 Path movedAsideName = new Path(p + "." + EnvironmentEdgeManager.currentTime()); 567 if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p); 568 setClusterId(fs, rootdir, cid, 100); 569 if (!fs.delete(movedAsideName, false)) { 570 throw new IOException("Failed delete of " + movedAsideName); 571 } 572 LOG.debug("Rewrote the hbase.id file as pb"); 573 } 574 575 /** 576 * Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root 577 * directory. If any operations on the ID file fails, and {@code wait} is a positive value, the 578 * method will retry to produce the ID file until the thread is forcibly interrupted. 579 * @param fs the root directory FileSystem 580 * @param rootdir the path to the HBase root directory 581 * @param clusterId the unique identifier to store 582 * @param wait how long (in milliseconds) to wait between retries 583 * @throws IOException if writing to the FileSystem fails and no wait value 584 */ 585 public static void setClusterId(final FileSystem fs, final Path rootdir, 586 final ClusterId clusterId, final long wait) throws IOException { 587 588 final Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 589 final Path tempDir = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY); 590 final Path tempIdFile = new Path(tempDir, HConstants.CLUSTER_ID_FILE_NAME); 591 592 LOG.debug("Create cluster ID file [{}] with ID: {}", idFile, clusterId); 593 594 while (true) { 595 Optional<IOException> failure = Optional.empty(); 596 597 LOG.debug("Write the cluster ID file to a temporary location: {}", tempIdFile); 598 try (FSDataOutputStream s = fs.create(tempIdFile)) { 599 s.write(clusterId.toByteArray()); 600 } catch (IOException ioe) { 601 failure = Optional.of(ioe); 602 } 603 604 if (!failure.isPresent()) { 605 try { 606 LOG.debug("Move the temporary cluster ID file to its target location [{}]:[{}]", 607 tempIdFile, idFile); 608 609 if (!fs.rename(tempIdFile, idFile)) { 610 failure = 611 Optional.of(new IOException("Unable to move temp cluster ID file to " + idFile)); 612 } 613 } catch (IOException ioe) { 614 failure = Optional.of(ioe); 615 } 616 } 617 618 if (failure.isPresent()) { 619 final IOException cause = failure.get(); 620 if (wait > 0L) { 621 LOG.warn("Unable to create cluster ID file in {}, retrying in {}ms", rootdir, wait, 622 cause); 623 try { 624 Thread.sleep(wait); 625 } catch (InterruptedException e) { 626 Thread.currentThread().interrupt(); 627 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 628 } 629 continue; 630 } else { 631 throw cause; 632 } 633 } else { 634 return; 635 } 636 } 637 } 638 639 /** 640 * If DFS, check safe mode and if so, wait until we clear it. 641 * @param conf configuration 642 * @param wait Sleep between retries 643 * @throws IOException e 644 */ 645 public static void waitOnSafeMode(final Configuration conf, final long wait) throws IOException { 646 FileSystem fs = FileSystem.get(conf); 647 if (!(fs instanceof DistributedFileSystem)) return; 648 DistributedFileSystem dfs = (DistributedFileSystem) fs; 649 // Make sure dfs is not in safe mode 650 while (isInSafeMode(dfs)) { 651 LOG.info("Waiting for dfs to exit safe mode..."); 652 try { 653 Thread.sleep(wait); 654 } catch (InterruptedException e) { 655 Thread.currentThread().interrupt(); 656 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 657 } 658 } 659 } 660 661 /** 662 * Checks if meta region exists 663 * @param fs file system 664 * @param rootDir root directory of HBase installation 665 * @return true if exists 666 */ 667 public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException { 668 Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO); 669 return fs.exists(metaRegionDir); 670 } 671 672 /** 673 * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams are 674 * backed by a series of LocatedBlocks, which are fetched periodically from the namenode. This 675 * method retrieves those blocks from the input stream and uses them to calculate 676 * HDFSBlockDistribution. The underlying method in DFSInputStream does attempt to use locally 677 * cached blocks, but may hit the namenode if the cache is determined to be incomplete. The method 678 * also involves making copies of all LocatedBlocks rather than return the underlying blocks 679 * themselves. 680 */ 681 static public HDFSBlocksDistribution 682 computeHDFSBlocksDistribution(HdfsDataInputStream inputStream) throws IOException { 683 List<LocatedBlock> blocks = inputStream.getAllBlocks(); 684 HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution(); 685 for (LocatedBlock block : blocks) { 686 String[] hosts = getHostsForLocations(block); 687 long len = block.getBlockSize(); 688 StorageType[] storageTypes = block.getStorageTypes(); 689 blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes); 690 } 691 return blocksDistribution; 692 } 693 694 private static String[] getHostsForLocations(LocatedBlock block) { 695 DatanodeInfo[] locations = getLocatedBlockLocations(block); 696 String[] hosts = new String[locations.length]; 697 for (int i = 0; i < hosts.length; i++) { 698 hosts[i] = locations[i].getHostName(); 699 } 700 return hosts; 701 } 702 703 /** 704 * Compute HDFS blocks distribution of a given file, or a portion of the file 705 * @param fs file system 706 * @param status file status of the file 707 * @param start start position of the portion 708 * @param length length of the portion 709 * @return The HDFS blocks distribution 710 */ 711 static public HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs, 712 FileStatus status, long start, long length) throws IOException { 713 HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution(); 714 BlockLocation[] blockLocations = fs.getFileBlockLocations(status, start, length); 715 addToHDFSBlocksDistribution(blocksDistribution, blockLocations); 716 return blocksDistribution; 717 } 718 719 /** 720 * Update blocksDistribution with blockLocations 721 * @param blocksDistribution the hdfs blocks distribution 722 * @param blockLocations an array containing block location 723 */ 724 static public void addToHDFSBlocksDistribution(HDFSBlocksDistribution blocksDistribution, 725 BlockLocation[] blockLocations) throws IOException { 726 for (BlockLocation bl : blockLocations) { 727 String[] hosts = bl.getHosts(); 728 long len = bl.getLength(); 729 StorageType[] storageTypes = bl.getStorageTypes(); 730 blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes); 731 } 732 } 733 734 // TODO move this method OUT of FSUtils. No dependencies to HMaster 735 /** 736 * Returns the total overall fragmentation percentage. Includes hbase:meta and -ROOT- as well. 737 * @param master The master defining the HBase root and file system 738 * @return A map for each table and its percentage (never null) 739 * @throws IOException When scanning the directory fails 740 */ 741 public static int getTotalTableFragmentation(final HMaster master) throws IOException { 742 Map<String, Integer> map = getTableFragmentation(master); 743 return map.isEmpty() ? -1 : map.get("-TOTAL-"); 744 } 745 746 /** 747 * Runs through the HBase rootdir and checks how many stores for each table have more than one 748 * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is 749 * stored under the special key "-TOTAL-". 750 * @param master The master defining the HBase root and file system. 751 * @return A map for each table and its percentage (never null). 752 * @throws IOException When scanning the directory fails. 753 */ 754 public static Map<String, Integer> getTableFragmentation(final HMaster master) 755 throws IOException { 756 Path path = CommonFSUtils.getRootDir(master.getConfiguration()); 757 // since HMaster.getFileSystem() is package private 758 FileSystem fs = path.getFileSystem(master.getConfiguration()); 759 return getTableFragmentation(fs, path); 760 } 761 762 /** 763 * Runs through the HBase rootdir and checks how many stores for each table have more than one 764 * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is 765 * stored under the special key "-TOTAL-". 766 * @param fs The file system to use 767 * @param hbaseRootDir The root directory to scan 768 * @return A map for each table and its percentage (never null) 769 * @throws IOException When scanning the directory fails 770 */ 771 public static Map<String, Integer> getTableFragmentation(final FileSystem fs, 772 final Path hbaseRootDir) throws IOException { 773 Map<String, Integer> frags = new HashMap<>(); 774 int cfCountTotal = 0; 775 int cfFragTotal = 0; 776 PathFilter regionFilter = new RegionDirFilter(fs); 777 PathFilter familyFilter = new FamilyDirFilter(fs); 778 List<Path> tableDirs = getTableDirs(fs, hbaseRootDir); 779 for (Path d : tableDirs) { 780 int cfCount = 0; 781 int cfFrag = 0; 782 FileStatus[] regionDirs = fs.listStatus(d, regionFilter); 783 for (FileStatus regionDir : regionDirs) { 784 Path dd = regionDir.getPath(); 785 // else its a region name, now look in region for families 786 FileStatus[] familyDirs = fs.listStatus(dd, familyFilter); 787 for (FileStatus familyDir : familyDirs) { 788 cfCount++; 789 cfCountTotal++; 790 Path family = familyDir.getPath(); 791 // now in family make sure only one file 792 FileStatus[] familyStatus = fs.listStatus(family); 793 if (familyStatus.length > 1) { 794 cfFrag++; 795 cfFragTotal++; 796 } 797 } 798 } 799 // compute percentage per table and store in result list 800 frags.put(CommonFSUtils.getTableName(d).getNameAsString(), 801 cfCount == 0 ? 0 : Math.round((float) cfFrag / cfCount * 100)); 802 } 803 // set overall percentage for all tables 804 frags.put("-TOTAL-", 805 cfCountTotal == 0 ? 0 : Math.round((float) cfFragTotal / cfCountTotal * 100)); 806 return frags; 807 } 808 809 public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException { 810 if (fs.exists(dst) && !fs.delete(dst, false)) { 811 throw new IOException("Can not delete " + dst); 812 } 813 if (!fs.rename(src, dst)) { 814 throw new IOException("Can not rename from " + src + " to " + dst); 815 } 816 } 817 818 /** 819 * A {@link PathFilter} that returns only regular files. 820 */ 821 static class FileFilter extends AbstractFileStatusFilter { 822 private final FileSystem fs; 823 824 public FileFilter(final FileSystem fs) { 825 this.fs = fs; 826 } 827 828 @Override 829 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 830 try { 831 return isFile(fs, isDir, p); 832 } catch (IOException e) { 833 LOG.warn("Unable to verify if path={} is a regular file", p, e); 834 return false; 835 } 836 } 837 } 838 839 /** 840 * Directory filter that doesn't include any of the directories in the specified blacklist 841 */ 842 public static class BlackListDirFilter extends AbstractFileStatusFilter { 843 private final FileSystem fs; 844 private List<String> blacklist; 845 846 /** 847 * Create a filter on the givem filesystem with the specified blacklist 848 * @param fs filesystem to filter 849 * @param directoryNameBlackList list of the names of the directories to filter. If 850 * <tt>null</tt>, all directories are returned 851 */ 852 @SuppressWarnings("unchecked") 853 public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) { 854 this.fs = fs; 855 blacklist = (List<String>) (directoryNameBlackList == null 856 ? Collections.emptyList() 857 : directoryNameBlackList); 858 } 859 860 @Override 861 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 862 if (!isValidName(p.getName())) { 863 return false; 864 } 865 866 try { 867 return isDirectory(fs, isDir, p); 868 } catch (IOException e) { 869 LOG.warn("An error occurred while verifying if [{}] is a valid directory." 870 + " Returning 'not valid' and continuing.", p, e); 871 return false; 872 } 873 } 874 875 protected boolean isValidName(final String name) { 876 return !blacklist.contains(name); 877 } 878 } 879 880 /** 881 * A {@link PathFilter} that only allows directories. 882 */ 883 public static class DirFilter extends BlackListDirFilter { 884 885 public DirFilter(FileSystem fs) { 886 super(fs, null); 887 } 888 } 889 890 /** 891 * A {@link PathFilter} that returns usertable directories. To get all directories use the 892 * {@link BlackListDirFilter} with a <tt>null</tt> blacklist 893 */ 894 public static class UserTableDirFilter extends BlackListDirFilter { 895 public UserTableDirFilter(FileSystem fs) { 896 super(fs, HConstants.HBASE_NON_TABLE_DIRS); 897 } 898 899 @Override 900 protected boolean isValidName(final String name) { 901 if (!super.isValidName(name)) return false; 902 903 try { 904 TableName.isLegalTableQualifierName(Bytes.toBytes(name)); 905 } catch (IllegalArgumentException e) { 906 LOG.info("Invalid table name: {}", name); 907 return false; 908 } 909 return true; 910 } 911 } 912 913 public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir) 914 throws IOException { 915 List<Path> tableDirs = new ArrayList<>(); 916 Path baseNamespaceDir = new Path(rootdir, HConstants.BASE_NAMESPACE_DIR); 917 if (fs.exists(baseNamespaceDir)) { 918 for (FileStatus status : fs.globStatus(new Path(baseNamespaceDir, "*"))) { 919 tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath())); 920 } 921 } 922 return tableDirs; 923 } 924 925 /** 926 * @return All the table directories under <code>rootdir</code>. Ignore non table hbase folders 927 * such as .logs, .oldlogs, .corrupt folders. 928 */ 929 public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir) 930 throws IOException { 931 // presumes any directory under hbase.rootdir is a table 932 FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs)); 933 List<Path> tabledirs = new ArrayList<>(dirs.length); 934 for (FileStatus dir : dirs) { 935 tabledirs.add(dir.getPath()); 936 } 937 return tabledirs; 938 } 939 940 /** 941 * Filter for all dirs that don't start with '.' 942 */ 943 public static class RegionDirFilter extends AbstractFileStatusFilter { 944 // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names. 945 final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$"); 946 final FileSystem fs; 947 948 public RegionDirFilter(FileSystem fs) { 949 this.fs = fs; 950 } 951 952 @Override 953 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 954 if (!regionDirPattern.matcher(p.getName()).matches()) { 955 return false; 956 } 957 958 try { 959 return isDirectory(fs, isDir, p); 960 } catch (IOException ioe) { 961 // Maybe the file was moved or the fs was disconnected. 962 LOG.warn("Skipping file {} due to IOException", p, ioe); 963 return false; 964 } 965 } 966 } 967 968 /** 969 * Given a particular table dir, return all the regiondirs inside it, excluding files such as 970 * .tableinfo 971 * @param fs A file system for the Path 972 * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir> 973 * @return List of paths to valid region directories in table dir. 974 */ 975 public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) 976 throws IOException { 977 // assumes we are in a table dir. 978 List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 979 if (rds == null) { 980 return Collections.emptyList(); 981 } 982 List<Path> regionDirs = new ArrayList<>(rds.size()); 983 for (FileStatus rdfs : rds) { 984 Path rdPath = rdfs.getPath(); 985 regionDirs.add(rdPath); 986 } 987 return regionDirs; 988 } 989 990 public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) { 991 return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region); 992 } 993 994 public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) { 995 return getRegionDirFromTableDir(tableDir, 996 ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName()); 997 } 998 999 public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) { 1000 return new Path(tableDir, encodedRegionName); 1001 } 1002 1003 /** 1004 * Filter for all dirs that are legal column family names. This is generally used for colfam dirs 1005 * <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>. 1006 */ 1007 public static class FamilyDirFilter extends AbstractFileStatusFilter { 1008 final FileSystem fs; 1009 1010 public FamilyDirFilter(FileSystem fs) { 1011 this.fs = fs; 1012 } 1013 1014 @Override 1015 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1016 try { 1017 // throws IAE if invalid 1018 ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(Bytes.toBytes(p.getName())); 1019 } catch (IllegalArgumentException iae) { 1020 // path name is an invalid family name and thus is excluded. 1021 return false; 1022 } 1023 1024 try { 1025 return isDirectory(fs, isDir, p); 1026 } catch (IOException ioe) { 1027 // Maybe the file was moved or the fs was disconnected. 1028 LOG.warn("Skipping file {} due to IOException", p, ioe); 1029 return false; 1030 } 1031 } 1032 } 1033 1034 /** 1035 * Given a particular region dir, return all the familydirs inside it 1036 * @param fs A file system for the Path 1037 * @param regionDir Path to a specific region directory 1038 * @return List of paths to valid family directories in region dir. 1039 */ 1040 public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) 1041 throws IOException { 1042 // assumes we are in a region dir. 1043 return getFilePaths(fs, regionDir, new FamilyDirFilter(fs)); 1044 } 1045 1046 public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) 1047 throws IOException { 1048 return getFilePaths(fs, familyDir, new ReferenceFileFilter(fs)); 1049 } 1050 1051 public static List<Path> getReferenceAndLinkFilePaths(final FileSystem fs, final Path familyDir) 1052 throws IOException { 1053 return getFilePaths(fs, familyDir, new ReferenceAndLinkFileFilter(fs)); 1054 } 1055 1056 private static List<Path> getFilePaths(final FileSystem fs, final Path dir, 1057 final PathFilter pathFilter) throws IOException { 1058 FileStatus[] fds = fs.listStatus(dir, pathFilter); 1059 List<Path> files = new ArrayList<>(fds.length); 1060 for (FileStatus fdfs : fds) { 1061 Path fdPath = fdfs.getPath(); 1062 files.add(fdPath); 1063 } 1064 return files; 1065 } 1066 1067 public static int getRegionReferenceAndLinkFileCount(final FileSystem fs, final Path p) { 1068 int result = 0; 1069 try { 1070 for (Path familyDir : getFamilyDirs(fs, p)) { 1071 result += getReferenceAndLinkFilePaths(fs, familyDir).size(); 1072 } 1073 } catch (IOException e) { 1074 LOG.warn("Error Counting reference files.", e); 1075 } 1076 return result; 1077 } 1078 1079 public static class ReferenceAndLinkFileFilter implements PathFilter { 1080 1081 private final FileSystem fs; 1082 1083 public ReferenceAndLinkFileFilter(FileSystem fs) { 1084 this.fs = fs; 1085 } 1086 1087 @Override 1088 public boolean accept(Path rd) { 1089 try { 1090 // only files can be references. 1091 return !fs.getFileStatus(rd).isDirectory() 1092 && (StoreFileInfo.isReference(rd) || HFileLink.isHFileLink(rd)); 1093 } catch (IOException ioe) { 1094 // Maybe the file was moved or the fs was disconnected. 1095 LOG.warn("Skipping file " + rd + " due to IOException", ioe); 1096 return false; 1097 } 1098 } 1099 } 1100 1101 /** 1102 * Filter for HFiles that excludes reference files. 1103 */ 1104 public static class HFileFilter extends AbstractFileStatusFilter { 1105 final FileSystem fs; 1106 1107 public HFileFilter(FileSystem fs) { 1108 this.fs = fs; 1109 } 1110 1111 @Override 1112 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1113 if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) { 1114 return false; 1115 } 1116 1117 try { 1118 return isFile(fs, isDir, p); 1119 } catch (IOException ioe) { 1120 // Maybe the file was moved or the fs was disconnected. 1121 LOG.warn("Skipping file {} due to IOException", p, ioe); 1122 return false; 1123 } 1124 } 1125 } 1126 1127 /** 1128 * Filter for HFileLinks (StoreFiles and HFiles not included). the filter itself does not consider 1129 * if a link is file or not. 1130 */ 1131 public static class HFileLinkFilter implements PathFilter { 1132 1133 @Override 1134 public boolean accept(Path p) { 1135 return HFileLink.isHFileLink(p); 1136 } 1137 } 1138 1139 public static class ReferenceFileFilter extends AbstractFileStatusFilter { 1140 1141 private final FileSystem fs; 1142 1143 public ReferenceFileFilter(FileSystem fs) { 1144 this.fs = fs; 1145 } 1146 1147 @Override 1148 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1149 if (!StoreFileInfo.isReference(p)) { 1150 return false; 1151 } 1152 1153 try { 1154 // only files can be references. 1155 return isFile(fs, isDir, p); 1156 } catch (IOException ioe) { 1157 // Maybe the file was moved or the fs was disconnected. 1158 LOG.warn("Skipping file {} due to IOException", p, ioe); 1159 return false; 1160 } 1161 } 1162 } 1163 1164 /** 1165 * Called every so-often by storefile map builder getTableStoreFilePathMap to report progress. 1166 */ 1167 interface ProgressReporter { 1168 /** 1169 * @param status File or directory we are about to process. 1170 */ 1171 void progress(FileStatus status); 1172 } 1173 1174 /** 1175 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1176 * names to the full Path. <br> 1177 * Example...<br> 1178 * Key = 3944417774205889744 <br> 1179 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1180 * @param map map to add values. If null, this method will create and populate one to 1181 * return 1182 * @param fs The file system to use. 1183 * @param hbaseRootDir The root directory to scan. 1184 * @param tableName name of the table to scan. 1185 * @return Map keyed by StoreFile name with a value of the full Path. 1186 * @throws IOException When scanning the directory fails. 1187 */ 1188 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map, 1189 final FileSystem fs, final Path hbaseRootDir, TableName tableName) 1190 throws IOException, InterruptedException { 1191 return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, 1192 (ProgressReporter) null); 1193 } 1194 1195 /** 1196 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1197 * names to the full Path. Note that because this method can be called on a 'live' HBase system 1198 * that we will skip files that no longer exist by the time we traverse them and similarly the 1199 * user of the result needs to consider that some entries in this map may not exist by the time 1200 * this call completes. <br> 1201 * Example...<br> 1202 * Key = 3944417774205889744 <br> 1203 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1204 * @param resultMap map to add values. If null, this method will create and populate one to 1205 * return 1206 * @param fs The file system to use. 1207 * @param hbaseRootDir The root directory to scan. 1208 * @param tableName name of the table to scan. 1209 * @param sfFilter optional path filter to apply to store files 1210 * @param executor optional executor service to parallelize this operation 1211 * @param progressReporter Instance or null; gets called every time we move to new region of 1212 * family dir and for each store file. 1213 * @return Map keyed by StoreFile name with a value of the full Path. 1214 * @throws IOException When scanning the directory fails. 1215 * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead. 1216 */ 1217 @Deprecated 1218 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1219 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1220 ExecutorService executor, final HbckErrorReporter progressReporter) 1221 throws IOException, InterruptedException { 1222 return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor, 1223 new ProgressReporter() { 1224 @Override 1225 public void progress(FileStatus status) { 1226 // status is not used in this implementation. 1227 progressReporter.progress(); 1228 } 1229 }); 1230 } 1231 1232 /** 1233 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1234 * names to the full Path. Note that because this method can be called on a 'live' HBase system 1235 * that we will skip files that no longer exist by the time we traverse them and similarly the 1236 * user of the result needs to consider that some entries in this map may not exist by the time 1237 * this call completes. <br> 1238 * Example...<br> 1239 * Key = 3944417774205889744 <br> 1240 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1241 * @param resultMap map to add values. If null, this method will create and populate one to 1242 * return 1243 * @param fs The file system to use. 1244 * @param hbaseRootDir The root directory to scan. 1245 * @param tableName name of the table to scan. 1246 * @param sfFilter optional path filter to apply to store files 1247 * @param executor optional executor service to parallelize this operation 1248 * @param progressReporter Instance or null; gets called every time we move to new region of 1249 * family dir and for each store file. 1250 * @return Map keyed by StoreFile name with a value of the full Path. 1251 * @throws IOException When scanning the directory fails. 1252 * @throws InterruptedException the thread is interrupted, either before or during the activity. 1253 */ 1254 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1255 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1256 ExecutorService executor, final ProgressReporter progressReporter) 1257 throws IOException, InterruptedException { 1258 1259 final Map<String, Path> finalResultMap = 1260 resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap; 1261 1262 // only include the directory paths to tables 1263 Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 1264 // Inside a table, there are compaction.dir directories to skip. Otherwise, all else 1265 // should be regions. 1266 final FamilyDirFilter familyFilter = new FamilyDirFilter(fs); 1267 final Vector<Exception> exceptions = new Vector<>(); 1268 1269 try { 1270 List<FileStatus> regionDirs = 1271 FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 1272 if (regionDirs == null) { 1273 return finalResultMap; 1274 } 1275 1276 final List<Future<?>> futures = new ArrayList<>(regionDirs.size()); 1277 1278 for (FileStatus regionDir : regionDirs) { 1279 if (null != progressReporter) { 1280 progressReporter.progress(regionDir); 1281 } 1282 final Path dd = regionDir.getPath(); 1283 1284 if (!exceptions.isEmpty()) { 1285 break; 1286 } 1287 1288 Runnable getRegionStoreFileMapCall = new Runnable() { 1289 @Override 1290 public void run() { 1291 try { 1292 HashMap<String, Path> regionStoreFileMap = new HashMap<>(); 1293 List<FileStatus> familyDirs = 1294 FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter); 1295 if (familyDirs == null) { 1296 if (!fs.exists(dd)) { 1297 LOG.warn("Skipping region because it no longer exists: " + dd); 1298 } else { 1299 LOG.warn("Skipping region because it has no family dirs: " + dd); 1300 } 1301 return; 1302 } 1303 for (FileStatus familyDir : familyDirs) { 1304 if (null != progressReporter) { 1305 progressReporter.progress(familyDir); 1306 } 1307 Path family = familyDir.getPath(); 1308 if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) { 1309 continue; 1310 } 1311 // now in family, iterate over the StoreFiles and 1312 // put in map 1313 FileStatus[] familyStatus = fs.listStatus(family); 1314 for (FileStatus sfStatus : familyStatus) { 1315 if (null != progressReporter) { 1316 progressReporter.progress(sfStatus); 1317 } 1318 Path sf = sfStatus.getPath(); 1319 if (sfFilter == null || sfFilter.accept(sf)) { 1320 regionStoreFileMap.put(sf.getName(), sf); 1321 } 1322 } 1323 } 1324 finalResultMap.putAll(regionStoreFileMap); 1325 } catch (Exception e) { 1326 LOG.error("Could not get region store file map for region: " + dd, e); 1327 exceptions.add(e); 1328 } 1329 } 1330 }; 1331 1332 // If executor is available, submit async tasks to exec concurrently, otherwise 1333 // just do serial sync execution 1334 if (executor != null) { 1335 Future<?> future = executor.submit(getRegionStoreFileMapCall); 1336 futures.add(future); 1337 } else { 1338 FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null); 1339 future.run(); 1340 futures.add(future); 1341 } 1342 } 1343 1344 // Ensure all pending tasks are complete (or that we run into an exception) 1345 for (Future<?> f : futures) { 1346 if (!exceptions.isEmpty()) { 1347 break; 1348 } 1349 try { 1350 f.get(); 1351 } catch (ExecutionException e) { 1352 LOG.error("Unexpected exec exception! Should've been caught already. (Bug?)", e); 1353 // Shouldn't happen, we already logged/caught any exceptions in the Runnable 1354 } 1355 } 1356 } catch (IOException e) { 1357 LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e); 1358 exceptions.add(e); 1359 } finally { 1360 if (!exceptions.isEmpty()) { 1361 // Just throw the first exception as an indication something bad happened 1362 // Don't need to propagate all the exceptions, we already logged them all anyway 1363 Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class); 1364 throw new IOException(exceptions.firstElement()); 1365 } 1366 } 1367 1368 return finalResultMap; 1369 } 1370 1371 public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) { 1372 int result = 0; 1373 try { 1374 for (Path familyDir : getFamilyDirs(fs, p)) { 1375 result += getReferenceFilePaths(fs, familyDir).size(); 1376 } 1377 } catch (IOException e) { 1378 LOG.warn("Error counting reference files", e); 1379 } 1380 return result; 1381 } 1382 1383 /** 1384 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1385 * the full Path. <br> 1386 * Example...<br> 1387 * Key = 3944417774205889744 <br> 1388 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1389 * @param fs The file system to use. 1390 * @param hbaseRootDir The root directory to scan. 1391 * @return Map keyed by StoreFile name with a value of the full Path. 1392 * @throws IOException When scanning the directory fails. 1393 */ 1394 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1395 final Path hbaseRootDir) throws IOException, InterruptedException { 1396 return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter) null); 1397 } 1398 1399 /** 1400 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1401 * the full Path. <br> 1402 * Example...<br> 1403 * Key = 3944417774205889744 <br> 1404 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1405 * @param fs The file system to use. 1406 * @param hbaseRootDir The root directory to scan. 1407 * @param sfFilter optional path filter to apply to store files 1408 * @param executor optional executor service to parallelize this operation 1409 * @param progressReporter Instance or null; gets called every time we move to new region of 1410 * family dir and for each store file. 1411 * @return Map keyed by StoreFile name with a value of the full Path. 1412 * @throws IOException When scanning the directory fails. 1413 * @deprecated Since 2.3.0. Will be removed in hbase4. Used 1414 * {@link #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)} 1415 */ 1416 @Deprecated 1417 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1418 final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor, 1419 HbckErrorReporter progressReporter) throws IOException, InterruptedException { 1420 return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor, new ProgressReporter() { 1421 @Override 1422 public void progress(FileStatus status) { 1423 // status is not used in this implementation. 1424 progressReporter.progress(); 1425 } 1426 }); 1427 } 1428 1429 /** 1430 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1431 * the full Path. <br> 1432 * Example...<br> 1433 * Key = 3944417774205889744 <br> 1434 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1435 * @param fs The file system to use. 1436 * @param hbaseRootDir The root directory to scan. 1437 * @param sfFilter optional path filter to apply to store files 1438 * @param executor optional executor service to parallelize this operation 1439 * @param progressReporter Instance or null; gets called every time we move to new region of 1440 * family dir and for each store file. 1441 * @return Map keyed by StoreFile name with a value of the full Path. 1442 * @throws IOException When scanning the directory fails. 1443 */ 1444 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1445 final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor, 1446 ProgressReporter progressReporter) throws IOException, InterruptedException { 1447 ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32); 1448 1449 // if this method looks similar to 'getTableFragmentation' that is because 1450 // it was borrowed from it. 1451 1452 // only include the directory paths to tables 1453 for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) { 1454 getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir), 1455 sfFilter, executor, progressReporter); 1456 } 1457 return map; 1458 } 1459 1460 /** 1461 * Filters FileStatuses in an array and returns a list 1462 * @param input An array of FileStatuses 1463 * @param filter A required filter to filter the array 1464 * @return A list of FileStatuses 1465 */ 1466 public static List<FileStatus> filterFileStatuses(FileStatus[] input, FileStatusFilter filter) { 1467 if (input == null) return null; 1468 return filterFileStatuses(Iterators.forArray(input), filter); 1469 } 1470 1471 /** 1472 * Filters FileStatuses in an iterator and returns a list 1473 * @param input An iterator of FileStatuses 1474 * @param filter A required filter to filter the array 1475 * @return A list of FileStatuses 1476 */ 1477 public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input, 1478 FileStatusFilter filter) { 1479 if (input == null) return null; 1480 ArrayList<FileStatus> results = new ArrayList<>(); 1481 while (input.hasNext()) { 1482 FileStatus f = input.next(); 1483 if (filter.accept(f)) { 1484 results.add(f); 1485 } 1486 } 1487 return results; 1488 } 1489 1490 /** 1491 * Calls fs.listStatus() and treats FileNotFoundException as non-fatal This accommodates 1492 * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException, and 1493 * return an empty FileStatus[] while Hadoop 2 will throw FileNotFoundException. 1494 * @param fs file system 1495 * @param dir directory 1496 * @param filter file status filter 1497 * @return null if dir is empty or doesn't exist, otherwise FileStatus list 1498 */ 1499 public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs, final Path dir, 1500 final FileStatusFilter filter) throws IOException { 1501 FileStatus[] status = null; 1502 try { 1503 status = fs.listStatus(dir); 1504 } catch (FileNotFoundException fnfe) { 1505 LOG.trace("{} does not exist", dir); 1506 return null; 1507 } 1508 1509 if (ArrayUtils.getLength(status) == 0) { 1510 return null; 1511 } 1512 1513 if (filter == null) { 1514 return Arrays.asList(status); 1515 } else { 1516 List<FileStatus> status2 = filterFileStatuses(status, filter); 1517 if (status2 == null || status2.isEmpty()) { 1518 return null; 1519 } else { 1520 return status2; 1521 } 1522 } 1523 } 1524 1525 /** 1526 * This function is to scan the root path of the file system to get the degree of locality for 1527 * each region on each of the servers having at least one block of that region. This is used by 1528 * the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer} the configuration to 1529 * use 1530 * @return the mapping from region encoded name to a map of server names to locality fraction in 1531 * case of file system errors or interrupts 1532 */ 1533 public static Map<String, Map<String, Float>> 1534 getRegionDegreeLocalityMappingFromFS(final Configuration conf) throws IOException { 1535 return getRegionDegreeLocalityMappingFromFS(conf, null, 1536 conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE)); 1537 1538 } 1539 1540 /** 1541 * This function is to scan the root path of the file system to get the degree of locality for 1542 * each region on each of the servers having at least one block of that region. the configuration 1543 * to use the table you wish to scan locality for the thread pool size to use 1544 * @return the mapping from region encoded name to a map of server names to locality fraction in 1545 * case of file system errors or interrupts 1546 */ 1547 public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS( 1548 final Configuration conf, final String desiredTable, int threadPoolSize) throws IOException { 1549 Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>(); 1550 getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping); 1551 return regionDegreeLocalityMapping; 1552 } 1553 1554 /** 1555 * This function is to scan the root path of the file system to get either the mapping between the 1556 * region name and its best locality region server or the degree of locality of each region on 1557 * each of the servers having at least one block of that region. The output map parameters are 1558 * both optional. the configuration to use the table you wish to scan locality for the thread pool 1559 * size to use the map into which to put the locality degree mapping or null, must be a 1560 * thread-safe implementation in case of file system errors or interrupts 1561 */ 1562 private static void getRegionLocalityMappingFromFS(final Configuration conf, 1563 final String desiredTable, int threadPoolSize, 1564 final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException { 1565 final FileSystem fs = FileSystem.get(conf); 1566 final Path rootPath = CommonFSUtils.getRootDir(conf); 1567 final long startTime = EnvironmentEdgeManager.currentTime(); 1568 final Path queryPath; 1569 // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/* 1570 if (null == desiredTable) { 1571 queryPath = 1572 new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/"); 1573 } else { 1574 queryPath = new Path( 1575 CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/"); 1576 } 1577 1578 // reject all paths that are not appropriate 1579 PathFilter pathFilter = new PathFilter() { 1580 @Override 1581 public boolean accept(Path path) { 1582 // this is the region name; it may get some noise data 1583 if (null == path) { 1584 return false; 1585 } 1586 1587 // no parent? 1588 Path parent = path.getParent(); 1589 if (null == parent) { 1590 return false; 1591 } 1592 1593 String regionName = path.getName(); 1594 if (null == regionName) { 1595 return false; 1596 } 1597 1598 if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) { 1599 return false; 1600 } 1601 return true; 1602 } 1603 }; 1604 1605 FileStatus[] statusList = fs.globStatus(queryPath, pathFilter); 1606 1607 if (LOG.isDebugEnabled()) { 1608 LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList)); 1609 } 1610 1611 if (null == statusList) { 1612 return; 1613 } 1614 1615 // lower the number of threads in case we have very few expected regions 1616 threadPoolSize = Math.min(threadPoolSize, statusList.length); 1617 1618 // run in multiple threads 1619 final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize, 1620 new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").setDaemon(true) 1621 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 1622 try { 1623 // ignore all file status items that are not of interest 1624 for (FileStatus regionStatus : statusList) { 1625 if (null == regionStatus || !regionStatus.isDirectory()) { 1626 continue; 1627 } 1628 1629 final Path regionPath = regionStatus.getPath(); 1630 if (null != regionPath) { 1631 tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping)); 1632 } 1633 } 1634 } finally { 1635 tpe.shutdown(); 1636 final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 1637 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); 1638 try { 1639 // here we wait until TPE terminates, which is either naturally or by 1640 // exceptions in the execution of the threads 1641 while (!tpe.awaitTermination(threadWakeFrequency, TimeUnit.MILLISECONDS)) { 1642 // printing out rough estimate, so as to not introduce 1643 // AtomicInteger 1644 LOG.info("Locality checking is underway: { Scanned Regions : " 1645 + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/" 1646 + ((ThreadPoolExecutor) tpe).getTaskCount() + " }"); 1647 } 1648 } catch (InterruptedException e) { 1649 Thread.currentThread().interrupt(); 1650 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 1651 } 1652 } 1653 1654 long overhead = EnvironmentEdgeManager.currentTime() - startTime; 1655 LOG.info("Scan DFS for locality info takes {}ms", overhead); 1656 } 1657 1658 /** 1659 * Do our short circuit read setup. Checks buffer size to use and whether to do checksumming in 1660 * hbase or hdfs. 1661 */ 1662 public static void setupShortCircuitRead(final Configuration conf) { 1663 // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property. 1664 boolean shortCircuitSkipChecksum = 1665 conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false); 1666 boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true); 1667 if (shortCircuitSkipChecksum) { 1668 LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " 1669 + "be set to true." 1670 + (useHBaseChecksum 1671 ? " HBase checksum doesn't require " 1672 + "it, see https://issues.apache.org/jira/browse/HBASE-6868." 1673 : "")); 1674 assert !shortCircuitSkipChecksum; // this will fail if assertions are on 1675 } 1676 checkShortCircuitReadBufferSize(conf); 1677 } 1678 1679 /** 1680 * Check if short circuit read buffer size is set and if not, set it to hbase value. 1681 */ 1682 public static void checkShortCircuitReadBufferSize(final Configuration conf) { 1683 final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2; 1684 final int notSet = -1; 1685 // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2 1686 final String dfsKey = "dfs.client.read.shortcircuit.buffer.size"; 1687 int size = conf.getInt(dfsKey, notSet); 1688 // If a size is set, return -- we will use it. 1689 if (size != notSet) return; 1690 // But short circuit buffer size is normally not set. Put in place the hbase wanted size. 1691 int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize); 1692 conf.setIfUnset(dfsKey, Integer.toString(hbaseSize)); 1693 } 1694 1695 /** 1696 * Returns The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs. 1697 */ 1698 public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c) 1699 throws IOException { 1700 if (!CommonFSUtils.isHDFS(c)) { 1701 return null; 1702 } 1703 // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal 1704 // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it 1705 // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients. 1706 final String name = "getHedgedReadMetrics"; 1707 DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient(); 1708 Method m; 1709 try { 1710 m = dfsclient.getClass().getDeclaredMethod(name); 1711 } catch (NoSuchMethodException e) { 1712 LOG.warn( 1713 "Failed find method " + name + " in dfsclient; no hedged read metrics: " + e.getMessage()); 1714 return null; 1715 } catch (SecurityException e) { 1716 LOG.warn( 1717 "Failed find method " + name + " in dfsclient; no hedged read metrics: " + e.getMessage()); 1718 return null; 1719 } 1720 m.setAccessible(true); 1721 try { 1722 return (DFSHedgedReadMetrics) m.invoke(dfsclient); 1723 } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { 1724 LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " 1725 + e.getMessage()); 1726 return null; 1727 } 1728 } 1729 1730 public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst, 1731 Configuration conf, int threads) throws IOException { 1732 ExecutorService pool = Executors.newFixedThreadPool(threads); 1733 List<Future<Void>> futures = new ArrayList<>(); 1734 List<Path> traversedPaths; 1735 try { 1736 traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures); 1737 for (Future<Void> future : futures) { 1738 future.get(); 1739 } 1740 } catch (ExecutionException | InterruptedException | IOException e) { 1741 throw new IOException("Copy snapshot reference files failed", e); 1742 } finally { 1743 pool.shutdownNow(); 1744 } 1745 return traversedPaths; 1746 } 1747 1748 private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst, 1749 Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException { 1750 List<Path> traversedPaths = new ArrayList<>(); 1751 traversedPaths.add(dst); 1752 FileStatus currentFileStatus = srcFS.getFileStatus(src); 1753 if (currentFileStatus.isDirectory()) { 1754 if (!dstFS.mkdirs(dst)) { 1755 throw new IOException("Create directory failed: " + dst); 1756 } 1757 FileStatus[] subPaths = srcFS.listStatus(src); 1758 for (FileStatus subPath : subPaths) { 1759 traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS, 1760 new Path(dst, subPath.getPath().getName()), conf, pool, futures)); 1761 } 1762 } else { 1763 Future<Void> future = pool.submit(() -> { 1764 FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf); 1765 return null; 1766 }); 1767 futures.add(future); 1768 } 1769 return traversedPaths; 1770 } 1771 1772 /** Returns A set containing all namenode addresses of fs */ 1773 private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs, 1774 Configuration conf) { 1775 Set<InetSocketAddress> addresses = new HashSet<>(); 1776 String serviceName = fs.getCanonicalServiceName(); 1777 1778 if (serviceName.startsWith("ha-hdfs")) { 1779 try { 1780 Map<String, Map<String, InetSocketAddress>> addressMap = 1781 DFSUtil.getNNServiceRpcAddressesForCluster(conf); 1782 String nameService = serviceName.substring(serviceName.indexOf(":") + 1); 1783 if (addressMap.containsKey(nameService)) { 1784 Map<String, InetSocketAddress> nnMap = addressMap.get(nameService); 1785 for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) { 1786 InetSocketAddress addr = e2.getValue(); 1787 addresses.add(addr); 1788 } 1789 } 1790 } catch (Exception e) { 1791 LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e); 1792 } 1793 } else { 1794 URI uri = fs.getUri(); 1795 int port = uri.getPort(); 1796 if (port < 0) { 1797 int idx = serviceName.indexOf(':'); 1798 port = Integer.parseInt(serviceName.substring(idx + 1)); 1799 } 1800 InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port); 1801 addresses.add(addr); 1802 } 1803 1804 return addresses; 1805 } 1806 1807 /** 1808 * @param conf the Configuration of HBase 1809 * @return Whether srcFs and desFs are on same hdfs or not 1810 */ 1811 public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) { 1812 // By getCanonicalServiceName, we could make sure both srcFs and desFs 1813 // show a unified format which contains scheme, host and port. 1814 String srcServiceName = srcFs.getCanonicalServiceName(); 1815 String desServiceName = desFs.getCanonicalServiceName(); 1816 1817 if (srcServiceName == null || desServiceName == null) { 1818 return false; 1819 } 1820 if (srcServiceName.equals(desServiceName)) { 1821 return true; 1822 } 1823 if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) { 1824 Collection<String> internalNameServices = 1825 conf.getTrimmedStringCollection("dfs.internal.nameservices"); 1826 if (!internalNameServices.isEmpty()) { 1827 if (internalNameServices.contains(srcServiceName.split(":")[1])) { 1828 return true; 1829 } else { 1830 return false; 1831 } 1832 } 1833 } 1834 if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) { 1835 // If one serviceName is an HA format while the other is a non-HA format, 1836 // maybe they refer to the same FileSystem. 1837 // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port" 1838 Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf); 1839 Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf); 1840 if (Sets.intersection(srcAddrs, desAddrs).size() > 0) { 1841 return true; 1842 } 1843 } 1844 1845 return false; 1846 } 1847}