001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.util; 020 021import edu.umd.cs.findbugs.annotations.CheckForNull; 022import java.io.ByteArrayInputStream; 023import java.io.DataInputStream; 024import java.io.EOFException; 025import java.io.FileNotFoundException; 026import java.io.IOException; 027import java.io.InterruptedIOException; 028import java.lang.reflect.InvocationTargetException; 029import java.lang.reflect.Method; 030import java.net.InetSocketAddress; 031import java.net.URI; 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.Collection; 035import java.util.Collections; 036import java.util.HashMap; 037import java.util.HashSet; 038import java.util.Iterator; 039import java.util.List; 040import java.util.Locale; 041import java.util.Map; 042import java.util.Set; 043import java.util.Vector; 044import java.util.concurrent.ConcurrentHashMap; 045import java.util.concurrent.ExecutionException; 046import java.util.concurrent.ExecutorService; 047import java.util.concurrent.Executors; 048import java.util.concurrent.Future; 049import java.util.concurrent.FutureTask; 050import java.util.concurrent.ThreadPoolExecutor; 051import java.util.concurrent.TimeUnit; 052import java.util.regex.Pattern; 053import org.apache.commons.lang3.ArrayUtils; 054import org.apache.hadoop.conf.Configuration; 055import org.apache.hadoop.fs.BlockLocation; 056import org.apache.hadoop.fs.FSDataInputStream; 057import org.apache.hadoop.fs.FSDataOutputStream; 058import org.apache.hadoop.fs.FileStatus; 059import org.apache.hadoop.fs.FileSystem; 060import org.apache.hadoop.fs.FileUtil; 061import org.apache.hadoop.fs.Path; 062import org.apache.hadoop.fs.PathFilter; 063import org.apache.hadoop.fs.permission.FsPermission; 064import org.apache.hadoop.hbase.ClusterId; 065import org.apache.hadoop.hbase.HColumnDescriptor; 066import org.apache.hadoop.hbase.HConstants; 067import org.apache.hadoop.hbase.HDFSBlocksDistribution; 068import org.apache.hadoop.hbase.TableName; 069import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 070import org.apache.hadoop.hbase.client.RegionInfo; 071import org.apache.hadoop.hbase.client.RegionInfoBuilder; 072import org.apache.hadoop.hbase.exceptions.DeserializationException; 073import org.apache.hadoop.hbase.fs.HFileSystem; 074import org.apache.hadoop.hbase.io.HFileLink; 075import org.apache.hadoop.hbase.master.HMaster; 076import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 077import org.apache.hadoop.hdfs.DFSClient; 078import org.apache.hadoop.hdfs.DFSHedgedReadMetrics; 079import org.apache.hadoop.hdfs.DFSUtil; 080import org.apache.hadoop.hdfs.DistributedFileSystem; 081import org.apache.hadoop.hdfs.protocol.HdfsConstants; 
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;

/**
 * Utility methods for interacting with the underlying file system.
 */
@InterfaceAudience.Private
public final class FSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);

  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
  private static final int DEFAULT_THREAD_POOLSIZE = 2;

  /** Set to true on Windows platforms */
  @VisibleForTesting // currently only used in testing. TODO refactor into a test class
  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");

  private FSUtils() {
  }

  /**
   * @return True if <code>fs</code> is an instance of DistributedFileSystem
   * @throws IOException
   */
  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
    FileSystem fileSystem = fs;
    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
    // Check its backing fs for dfs-ness.
    if (fs instanceof HFileSystem) {
      fileSystem = ((HFileSystem)fs).getBackingFs();
    }
    return fileSystem instanceof DistributedFileSystem;
  }

  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
   * consider the scheme; i.e. if the schemes differ but the path or subpath matches, the two will
   * equate.
   * @param pathToSearch Path we will be trying to match.
   * @param pathTail the tail to match against the end of <code>pathToSearch</code>
   * @return True if <code>pathTail</code> is a tail of the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
    Path tailPath = pathTail;
    String tailName;
    Path toSearch = pathToSearch;
    String toSearchName;
    boolean result = false;

    if (pathToSearch.depth() != pathTail.depth()) {
      return false;
    }

    do {
      tailName = tailPath.getName();
      if (tailName == null || tailName.isEmpty()) {
        result = true;
        break;
      }
      toSearchName = toSearch.getName();
      if (toSearchName == null || toSearchName.isEmpty()) {
        break;
      }
      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
      tailPath = tailPath.getParent();
      toSearch = toSearch.getParent();
    } while (tailName.equals(toSearchName));
    return result;
  }

  /**
   * Delete the region directory if it exists.
   * @return True if the region directory was deleted.
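   * <p>
   * A minimal usage sketch (names are illustrative; the {@link RegionInfo} would normally be
   * read from the catalog or a .regioninfo file, and the region must already be offline):
   * <pre>
   * Configuration conf = HBaseConfiguration.create();
   * if (FSUtils.deleteRegionDir(conf, leftoverRegionInfo)) {
   *   LOG.info("Removed region dir for {}", leftoverRegionInfo.getEncodedName());
   * }
   * </pre>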
169 * @throws IOException 170 */ 171 public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri) 172 throws IOException { 173 Path rootDir = CommonFSUtils.getRootDir(conf); 174 FileSystem fs = rootDir.getFileSystem(conf); 175 return CommonFSUtils.deleteDirectory(fs, 176 new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName())); 177 } 178 179 /** 180 * Create the specified file on the filesystem. By default, this will: 181 * <ol> 182 * <li>overwrite the file if it exists</li> 183 * <li>apply the umask in the configuration (if it is enabled)</li> 184 * <li>use the fs configured buffer size (or 4096 if not set)</li> 185 * <li>use the configured column family replication or default replication if 186 * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li> 187 * <li>use the default block size</li> 188 * <li>not track progress</li> 189 * </ol> 190 * @param conf configurations 191 * @param fs {@link FileSystem} on which to write the file 192 * @param path {@link Path} to the file to write 193 * @param perm permissions 194 * @param favoredNodes favored data nodes 195 * @return output stream to the created file 196 * @throws IOException if the file cannot be created 197 */ 198 public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path, 199 FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException { 200 if (fs instanceof HFileSystem) { 201 FileSystem backingFs = ((HFileSystem) fs).getBackingFs(); 202 if (backingFs instanceof DistributedFileSystem) { 203 // Try to use the favoredNodes version via reflection to allow backwards- 204 // compatibility. 205 short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION, 206 String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION))); 207 try { 208 return (FSDataOutputStream) (DistributedFileSystem.class 209 .getDeclaredMethod("create", Path.class, FsPermission.class, boolean.class, int.class, 210 short.class, long.class, Progressable.class, InetSocketAddress[].class) 211 .invoke(backingFs, path, perm, true, CommonFSUtils.getDefaultBufferSize(backingFs), 212 replication > 0 ? replication : CommonFSUtils.getDefaultReplication(backingFs, path), 213 CommonFSUtils.getDefaultBlockSize(backingFs, path), null, favoredNodes)); 214 } catch (InvocationTargetException ite) { 215 // Function was properly called, but threw it's own exception. 216 throw new IOException(ite.getCause()); 217 } catch (NoSuchMethodException e) { 218 LOG.debug("DFS Client does not support most favored nodes create; using default create"); 219 LOG.trace("Ignoring; use default create", e); 220 } catch (IllegalArgumentException | SecurityException | IllegalAccessException e) { 221 LOG.debug("Ignoring (most likely Reflection related exception) " + e); 222 } 223 } 224 } 225 return CommonFSUtils.create(fs, path, perm, true); 226 } 227 228 /** 229 * Checks to see if the specified file system is available 230 * 231 * @param fs filesystem 232 * @throws IOException e 233 */ 234 public static void checkFileSystemAvailable(final FileSystem fs) 235 throws IOException { 236 if (!(fs instanceof DistributedFileSystem)) { 237 return; 238 } 239 IOException exception = null; 240 DistributedFileSystem dfs = (DistributedFileSystem) fs; 241 try { 242 if (dfs.exists(new Path("/"))) { 243 return; 244 } 245 } catch (IOException e) { 246 exception = e instanceof RemoteException ? 
247 ((RemoteException)e).unwrapRemoteException() : e; 248 } 249 try { 250 fs.close(); 251 } catch (Exception e) { 252 LOG.error("file system close failed: ", e); 253 } 254 throw new IOException("File system is not available", exception); 255 } 256 257 /** 258 * We use reflection because {@link DistributedFileSystem#setSafeMode( 259 * HdfsConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1 260 * 261 * @param dfs 262 * @return whether we're in safe mode 263 * @throws IOException 264 */ 265 private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException { 266 boolean inSafeMode = false; 267 try { 268 Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{ 269 org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class}); 270 inSafeMode = (Boolean) m.invoke(dfs, 271 org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true); 272 } catch (Exception e) { 273 if (e instanceof IOException) throw (IOException) e; 274 275 // Check whether dfs is on safemode. 276 inSafeMode = dfs.setSafeMode( 277 org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET); 278 } 279 return inSafeMode; 280 } 281 282 /** 283 * Check whether dfs is in safemode. 284 * @param conf 285 * @throws IOException 286 */ 287 public static void checkDfsSafeMode(final Configuration conf) 288 throws IOException { 289 boolean isInSafeMode = false; 290 FileSystem fs = FileSystem.get(conf); 291 if (fs instanceof DistributedFileSystem) { 292 DistributedFileSystem dfs = (DistributedFileSystem)fs; 293 isInSafeMode = isInSafeMode(dfs); 294 } 295 if (isInSafeMode) { 296 throw new IOException("File system is in safemode, it can't be written now"); 297 } 298 } 299 300 /** 301 * Verifies current version of file system 302 * 303 * @param fs filesystem object 304 * @param rootdir root hbase directory 305 * @return null if no version file exists, version string otherwise 306 * @throws IOException if the version file fails to open 307 * @throws DeserializationException if the version data cannot be translated into a version 308 */ 309 public static String getVersion(FileSystem fs, Path rootdir) 310 throws IOException, DeserializationException { 311 final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 312 FileStatus[] status = null; 313 try { 314 // hadoop 2.0 throws FNFE if directory does not exist. 315 // hadoop 1.0 returns null if directory does not exist. 316 status = fs.listStatus(versionFile); 317 } catch (FileNotFoundException fnfe) { 318 return null; 319 } 320 if (ArrayUtils.getLength(status) == 0) { 321 return null; 322 } 323 String version = null; 324 byte [] content = new byte [(int)status[0].getLen()]; 325 FSDataInputStream s = fs.open(versionFile); 326 try { 327 IOUtils.readFully(s, content, 0, content.length); 328 if (ProtobufUtil.isPBMagicPrefix(content)) { 329 version = parseVersionFrom(content); 330 } else { 331 // Presume it pre-pb format. 332 try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) { 333 version = dis.readUTF(); 334 } 335 } 336 } catch (EOFException eof) { 337 LOG.warn("Version file was empty, odd, will try to set it."); 338 } finally { 339 s.close(); 340 } 341 return version; 342 } 343 344 /** 345 * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file. 
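   * <p>
   * The bytes are expected to carry the pb magic prefix that {@link #toVersionByteArray(String)}
   * writes; a round trip looks roughly like (sketch only):
   * <pre>
   * byte[] raw = FSUtils.toVersionByteArray(HConstants.FILE_SYSTEM_VERSION);
   * String version = FSUtils.parseVersionFrom(raw); // same string that was passed in
   * </pre>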
346 * @param bytes The byte content of the hbase.version file 347 * @return The version found in the file as a String 348 * @throws DeserializationException if the version data cannot be translated into a version 349 */ 350 static String parseVersionFrom(final byte [] bytes) 351 throws DeserializationException { 352 ProtobufUtil.expectPBMagicPrefix(bytes); 353 int pblen = ProtobufUtil.lengthOfPBMagic(); 354 FSProtos.HBaseVersionFileContent.Builder builder = 355 FSProtos.HBaseVersionFileContent.newBuilder(); 356 try { 357 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); 358 return builder.getVersion(); 359 } catch (IOException e) { 360 // Convert 361 throw new DeserializationException(e); 362 } 363 } 364 365 /** 366 * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file. 367 * @param version Version to persist 368 * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix. 369 */ 370 static byte [] toVersionByteArray(final String version) { 371 FSProtos.HBaseVersionFileContent.Builder builder = 372 FSProtos.HBaseVersionFileContent.newBuilder(); 373 return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray()); 374 } 375 376 /** 377 * Verifies current version of file system 378 * 379 * @param fs file system 380 * @param rootdir root directory of HBase installation 381 * @param message if true, issues a message on System.out 382 * @throws IOException if the version file cannot be opened 383 * @throws DeserializationException if the contents of the version file cannot be parsed 384 */ 385 public static void checkVersion(FileSystem fs, Path rootdir, boolean message) 386 throws IOException, DeserializationException { 387 checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 388 } 389 390 /** 391 * Verifies current version of file system 392 * 393 * @param fs file system 394 * @param rootdir root directory of HBase installation 395 * @param message if true, issues a message on System.out 396 * @param wait wait interval 397 * @param retries number of times to retry 398 * 399 * @throws IOException if the version file cannot be opened 400 * @throws DeserializationException if the contents of the version file cannot be parsed 401 */ 402 public static void checkVersion(FileSystem fs, Path rootdir, 403 boolean message, int wait, int retries) 404 throws IOException, DeserializationException { 405 String version = getVersion(fs, rootdir); 406 String msg; 407 if (version == null) { 408 if (!metaRegionExists(fs, rootdir)) { 409 // rootDir is empty (no version file and no root region) 410 // just create new version file (HBASE-1195) 411 setVersion(fs, rootdir, wait, retries); 412 return; 413 } else { 414 msg = "hbase.version file is missing. Is your hbase.rootdir valid? " + 415 "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " + 416 "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2"; 417 } 418 } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) { 419 return; 420 } else { 421 msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version + 422 " but software requires version " + HConstants.FILE_SYSTEM_VERSION + 423 ". Consult http://hbase.apache.org/book.html for further information about " + 424 "upgrading HBase."; 425 } 426 427 // version is deprecated require migration 428 // Output on stdout so user sees it in terminal. 429 if (message) { 430 System.out.println("WARNING! 
" + msg); 431 } 432 throw new FileSystemVersionException(msg); 433 } 434 435 /** 436 * Sets version of file system 437 * 438 * @param fs filesystem object 439 * @param rootdir hbase root 440 * @throws IOException e 441 */ 442 public static void setVersion(FileSystem fs, Path rootdir) 443 throws IOException { 444 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0, 445 HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 446 } 447 448 /** 449 * Sets version of file system 450 * 451 * @param fs filesystem object 452 * @param rootdir hbase root 453 * @param wait time to wait for retry 454 * @param retries number of times to retry before failing 455 * @throws IOException e 456 */ 457 public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries) 458 throws IOException { 459 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries); 460 } 461 462 463 /** 464 * Sets version of file system 465 * 466 * @param fs filesystem object 467 * @param rootdir hbase root directory 468 * @param version version to set 469 * @param wait time to wait for retry 470 * @param retries number of times to retry before throwing an IOException 471 * @throws IOException e 472 */ 473 public static void setVersion(FileSystem fs, Path rootdir, String version, 474 int wait, int retries) throws IOException { 475 Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 476 Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + 477 HConstants.VERSION_FILE_NAME); 478 while (true) { 479 try { 480 // Write the version to a temporary file 481 FSDataOutputStream s = fs.create(tempVersionFile); 482 try { 483 s.write(toVersionByteArray(version)); 484 s.close(); 485 s = null; 486 // Move the temp version file to its normal location. Returns false 487 // if the rename failed. Throw an IOE in that case. 488 if (!fs.rename(tempVersionFile, versionFile)) { 489 throw new IOException("Unable to move temp version file to " + versionFile); 490 } 491 } finally { 492 // Cleaning up the temporary if the rename failed would be trying 493 // too hard. We'll unconditionally create it again the next time 494 // through anyway, files are overwritten by default by create(). 495 496 // Attempt to close the stream on the way out if it is still open. 
497 try { 498 if (s != null) s.close(); 499 } catch (IOException ignore) { } 500 } 501 LOG.info("Created version file at " + rootdir.toString() + " with version=" + version); 502 return; 503 } catch (IOException e) { 504 if (retries > 0) { 505 LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e); 506 fs.delete(versionFile, false); 507 try { 508 if (wait > 0) { 509 Thread.sleep(wait); 510 } 511 } catch (InterruptedException ie) { 512 throw (InterruptedIOException)new InterruptedIOException().initCause(ie); 513 } 514 retries--; 515 } else { 516 throw e; 517 } 518 } 519 } 520 } 521 522 /** 523 * Checks that a cluster ID file exists in the HBase root directory 524 * @param fs the root directory FileSystem 525 * @param rootdir the HBase root directory in HDFS 526 * @param wait how long to wait between retries 527 * @return <code>true</code> if the file exists, otherwise <code>false</code> 528 * @throws IOException if checking the FileSystem fails 529 */ 530 public static boolean checkClusterIdExists(FileSystem fs, Path rootdir, 531 long wait) throws IOException { 532 while (true) { 533 try { 534 Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 535 return fs.exists(filePath); 536 } catch (IOException ioe) { 537 if (wait > 0L) { 538 LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe); 539 try { 540 Thread.sleep(wait); 541 } catch (InterruptedException e) { 542 Thread.currentThread().interrupt(); 543 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 544 } 545 } else { 546 throw ioe; 547 } 548 } 549 } 550 } 551 552 /** 553 * Returns the value of the unique cluster ID stored for this HBase instance. 554 * @param fs the root directory FileSystem 555 * @param rootdir the path to the HBase root directory 556 * @return the unique cluster identifier 557 * @throws IOException if reading the cluster ID file fails 558 */ 559 public static ClusterId getClusterId(FileSystem fs, Path rootdir) 560 throws IOException { 561 Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 562 ClusterId clusterId = null; 563 FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath): null; 564 if (status != null) { 565 int len = Ints.checkedCast(status.getLen()); 566 byte [] content = new byte[len]; 567 FSDataInputStream in = fs.open(idPath); 568 try { 569 in.readFully(content); 570 } catch (EOFException eof) { 571 LOG.warn("Cluster ID file {} is empty", idPath); 572 } finally{ 573 in.close(); 574 } 575 try { 576 clusterId = ClusterId.parseFrom(content); 577 } catch (DeserializationException e) { 578 throw new IOException("content=" + Bytes.toString(content), e); 579 } 580 // If not pb'd, make it so. 581 if (!ProtobufUtil.isPBMagicPrefix(content)) { 582 String cid = null; 583 in = fs.open(idPath); 584 try { 585 cid = in.readUTF(); 586 clusterId = new ClusterId(cid); 587 } catch (EOFException eof) { 588 LOG.warn("Cluster ID file {} is empty", idPath); 589 } finally { 590 in.close(); 591 } 592 rewriteAsPb(fs, rootdir, idPath, clusterId); 593 } 594 return clusterId; 595 } else { 596 LOG.warn("Cluster ID file does not exist at {}", idPath); 597 } 598 return clusterId; 599 } 600 601 /** 602 * @param cid 603 * @throws IOException 604 */ 605 private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p, 606 final ClusterId cid) 607 throws IOException { 608 // Rewrite the file as pb. Move aside the old one first, write new 609 // then delete the moved-aside file. 
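    // Steps: (1) rename hbase.id to hbase.id.<current time millis>, (2) write a fresh
    // pb-serialized hbase.id via setClusterId, (3) delete the renamed copy once the
    // rewrite has succeeded.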
610 Path movedAsideName = new Path(p + "." + System.currentTimeMillis()); 611 if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p); 612 setClusterId(fs, rootdir, cid, 100); 613 if (!fs.delete(movedAsideName, false)) { 614 throw new IOException("Failed delete of " + movedAsideName); 615 } 616 LOG.debug("Rewrote the hbase.id file as pb"); 617 } 618 619 /** 620 * Writes a new unique identifier for this cluster to the "hbase.id" file 621 * in the HBase root directory 622 * @param fs the root directory FileSystem 623 * @param rootdir the path to the HBase root directory 624 * @param clusterId the unique identifier to store 625 * @param wait how long (in milliseconds) to wait between retries 626 * @throws IOException if writing to the FileSystem fails and no wait value 627 */ 628 public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId, 629 int wait) throws IOException { 630 while (true) { 631 try { 632 Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 633 Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + 634 Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME); 635 // Write the id file to a temporary location 636 FSDataOutputStream s = fs.create(tempIdFile); 637 try { 638 s.write(clusterId.toByteArray()); 639 s.close(); 640 s = null; 641 // Move the temporary file to its normal location. Throw an IOE if 642 // the rename failed 643 if (!fs.rename(tempIdFile, idFile)) { 644 throw new IOException("Unable to move temp version file to " + idFile); 645 } 646 } finally { 647 // Attempt to close the stream if still open on the way out 648 try { 649 if (s != null) s.close(); 650 } catch (IOException ignore) { } 651 } 652 if (LOG.isDebugEnabled()) { 653 LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId); 654 } 655 return; 656 } catch (IOException ioe) { 657 if (wait > 0) { 658 LOG.warn("Unable to create cluster ID file in " + rootdir.toString() + 659 ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe)); 660 try { 661 Thread.sleep(wait); 662 } catch (InterruptedException e) { 663 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 664 } 665 } else { 666 throw ioe; 667 } 668 } 669 } 670 } 671 672 /** 673 * If DFS, check safe mode and if so, wait until we clear it. 
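   * <p>
   * Typically invoked during startup before any filesystem writes are attempted; a sketch
   * (the 10 second poll interval is illustrative only):
   * <pre>
   * FSUtils.waitOnSafeMode(conf, 10 * 1000L);
   * </pre>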
   * @param conf configuration
   * @param wait Sleep between retries
   * @throws IOException e
   */
  public static void waitOnSafeMode(final Configuration conf, final long wait)
      throws IOException {
    FileSystem fs = FileSystem.get(conf);
    if (!(fs instanceof DistributedFileSystem)) return;
    DistributedFileSystem dfs = (DistributedFileSystem)fs;
    // Make sure dfs is not in safe mode
    while (isInSafeMode(dfs)) {
      LOG.info("Waiting for dfs to exit safe mode...");
      try {
        Thread.sleep(wait);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
    }
  }

  /**
   * Checks if the meta region exists.
   * @param fs file system
   * @param rootDir root directory of HBase installation
   * @return true if the meta region exists
   */
  public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
    Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
    return fs.exists(metaRegionDir);
  }

  /**
   * Compute the HDFS blocks distribution of a given file, or a portion of the file.
   * @param fs file system
   * @param status file status of the file
   * @param start start position of the portion
   * @param length length of the portion
   * @return The HDFS blocks distribution
   */
  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
      final FileSystem fs, FileStatus status, long start, long length) throws IOException {
    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
    BlockLocation[] blockLocations = fs.getFileBlockLocations(status, start, length);
    for (BlockLocation bl : blockLocations) {
      String[] hosts = bl.getHosts();
      long len = bl.getLength();
      blocksDistribution.addHostsAndBlockWeight(hosts, len);
    }

    return blocksDistribution;
  }

  /**
   * Update blocksDistribution with blockLocations.
   * @param blocksDistribution the hdfs blocks distribution
   * @param blockLocations an array of block locations
   */
  static public void addToHDFSBlocksDistribution(
      HDFSBlocksDistribution blocksDistribution, BlockLocation[] blockLocations)
      throws IOException {
    for (BlockLocation bl : blockLocations) {
      String[] hosts = bl.getHosts();
      long len = bl.getLength();
      blocksDistribution.addHostsAndBlockWeight(hosts, len);
    }
  }

  // TODO move this method OUT of FSUtils. No dependencies on HMaster should remain here.
  /**
   * Returns the overall fragmentation percentage. Includes hbase:meta and -ROOT- as well.
   *
   * @param master The master defining the HBase root and file system
   * @return The overall fragmentation percentage, or -1 if no tables were found
   * @throws IOException When scanning the directory fails
   */
  public static int getTotalTableFragmentation(final HMaster master) throws IOException {
    Map<String, Integer> map = getTableFragmentation(master);
    return map.isEmpty() ? -1 : map.get("-TOTAL-");
  }

  /**
   * Runs through the HBase rootdir and checks how many stores for each table
   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
   * percentage across all tables is stored under the special key "-TOTAL-".
   *
   * @param master The master defining the HBase root and file system.
   * @return A map for each table and its percentage (never null).
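   * <p>
   * Sketch of reading the result (assumes a live master reference):
   * <pre>
   * Map<String, Integer> frags = FSUtils.getTableFragmentation(master);
   * int overall = frags.get("-TOTAL-"); // percentage across all tables
   * </pre>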
767 * 768 * @throws IOException When scanning the directory fails. 769 */ 770 public static Map<String, Integer> getTableFragmentation(final HMaster master) 771 throws IOException { 772 Path path = CommonFSUtils.getRootDir(master.getConfiguration()); 773 // since HMaster.getFileSystem() is package private 774 FileSystem fs = path.getFileSystem(master.getConfiguration()); 775 return getTableFragmentation(fs, path); 776 } 777 778 /** 779 * Runs through the HBase rootdir and checks how many stores for each table 780 * have more than one file in them. Checks -ROOT- and hbase:meta too. The total 781 * percentage across all tables is stored under the special key "-TOTAL-". 782 * 783 * @param fs The file system to use 784 * @param hbaseRootDir The root directory to scan 785 * @return A map for each table and its percentage (never null) 786 * @throws IOException When scanning the directory fails 787 */ 788 public static Map<String, Integer> getTableFragmentation( 789 final FileSystem fs, final Path hbaseRootDir) 790 throws IOException { 791 Map<String, Integer> frags = new HashMap<>(); 792 int cfCountTotal = 0; 793 int cfFragTotal = 0; 794 PathFilter regionFilter = new RegionDirFilter(fs); 795 PathFilter familyFilter = new FamilyDirFilter(fs); 796 List<Path> tableDirs = getTableDirs(fs, hbaseRootDir); 797 for (Path d : tableDirs) { 798 int cfCount = 0; 799 int cfFrag = 0; 800 FileStatus[] regionDirs = fs.listStatus(d, regionFilter); 801 for (FileStatus regionDir : regionDirs) { 802 Path dd = regionDir.getPath(); 803 // else its a region name, now look in region for families 804 FileStatus[] familyDirs = fs.listStatus(dd, familyFilter); 805 for (FileStatus familyDir : familyDirs) { 806 cfCount++; 807 cfCountTotal++; 808 Path family = familyDir.getPath(); 809 // now in family make sure only one file 810 FileStatus[] familyStatus = fs.listStatus(family); 811 if (familyStatus.length > 1) { 812 cfFrag++; 813 cfFragTotal++; 814 } 815 } 816 } 817 // compute percentage per table and store in result list 818 frags.put(CommonFSUtils.getTableName(d).getNameAsString(), 819 cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100)); 820 } 821 // set overall percentage for all tables 822 frags.put("-TOTAL-", 823 cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100)); 824 return frags; 825 } 826 827 /** 828 * A {@link PathFilter} that returns only regular files. 829 */ 830 static class FileFilter extends AbstractFileStatusFilter { 831 private final FileSystem fs; 832 833 public FileFilter(final FileSystem fs) { 834 this.fs = fs; 835 } 836 837 @Override 838 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 839 try { 840 return isFile(fs, isDir, p); 841 } catch (IOException e) { 842 LOG.warn("Unable to verify if path={} is a regular file", p, e); 843 return false; 844 } 845 } 846 } 847 848 /** 849 * Directory filter that doesn't include any of the directories in the specified blacklist 850 */ 851 public static class BlackListDirFilter extends AbstractFileStatusFilter { 852 private final FileSystem fs; 853 private List<String> blacklist; 854 855 /** 856 * Create a filter on the givem filesystem with the specified blacklist 857 * @param fs filesystem to filter 858 * @param directoryNameBlackList list of the names of the directories to filter. 
If 859 * <tt>null</tt>, all directories are returned 860 */ 861 @SuppressWarnings("unchecked") 862 public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) { 863 this.fs = fs; 864 blacklist = 865 (List<String>) (directoryNameBlackList == null ? Collections.emptyList() 866 : directoryNameBlackList); 867 } 868 869 @Override 870 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 871 if (!isValidName(p.getName())) { 872 return false; 873 } 874 875 try { 876 return isDirectory(fs, isDir, p); 877 } catch (IOException e) { 878 LOG.warn("An error occurred while verifying if [{}] is a valid directory." 879 + " Returning 'not valid' and continuing.", p, e); 880 return false; 881 } 882 } 883 884 protected boolean isValidName(final String name) { 885 return !blacklist.contains(name); 886 } 887 } 888 889 /** 890 * A {@link PathFilter} that only allows directories. 891 */ 892 public static class DirFilter extends BlackListDirFilter { 893 894 public DirFilter(FileSystem fs) { 895 super(fs, null); 896 } 897 } 898 899 /** 900 * A {@link PathFilter} that returns usertable directories. To get all directories use the 901 * {@link BlackListDirFilter} with a <tt>null</tt> blacklist 902 */ 903 public static class UserTableDirFilter extends BlackListDirFilter { 904 public UserTableDirFilter(FileSystem fs) { 905 super(fs, HConstants.HBASE_NON_TABLE_DIRS); 906 } 907 908 @Override 909 protected boolean isValidName(final String name) { 910 if (!super.isValidName(name)) 911 return false; 912 913 try { 914 TableName.isLegalTableQualifierName(Bytes.toBytes(name)); 915 } catch (IllegalArgumentException e) { 916 LOG.info("Invalid table name: {}", name); 917 return false; 918 } 919 return true; 920 } 921 } 922 923 public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir) 924 throws IOException { 925 List<Path> tableDirs = new ArrayList<>(); 926 927 for (FileStatus status : fs 928 .globStatus(new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) { 929 tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath())); 930 } 931 return tableDirs; 932 } 933 934 /** 935 * @param fs 936 * @param rootdir 937 * @return All the table directories under <code>rootdir</code>. Ignore non table hbase folders such as 938 * .logs, .oldlogs, .corrupt folders. 939 * @throws IOException 940 */ 941 public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir) 942 throws IOException { 943 // presumes any directory under hbase.rootdir is a table 944 FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs)); 945 List<Path> tabledirs = new ArrayList<>(dirs.length); 946 for (FileStatus dir: dirs) { 947 tabledirs.add(dir.getPath()); 948 } 949 return tabledirs; 950 } 951 952 /** 953 * Filter for all dirs that don't start with '.' 954 */ 955 public static class RegionDirFilter extends AbstractFileStatusFilter { 956 // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names. 957 final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$"); 958 final FileSystem fs; 959 960 public RegionDirFilter(FileSystem fs) { 961 this.fs = fs; 962 } 963 964 @Override 965 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 966 if (!regionDirPattern.matcher(p.getName()).matches()) { 967 return false; 968 } 969 970 try { 971 return isDirectory(fs, isDir, p); 972 } catch (IOException ioe) { 973 // Maybe the file was moved or the fs was disconnected. 
974 LOG.warn("Skipping file {} due to IOException", p, ioe); 975 return false; 976 } 977 } 978 } 979 980 /** 981 * Given a particular table dir, return all the regiondirs inside it, excluding files such as 982 * .tableinfo 983 * @param fs A file system for the Path 984 * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir> 985 * @return List of paths to valid region directories in table dir. 986 * @throws IOException 987 */ 988 public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException { 989 // assumes we are in a table dir. 990 List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 991 if (rds == null) { 992 return Collections.emptyList(); 993 } 994 List<Path> regionDirs = new ArrayList<>(rds.size()); 995 for (FileStatus rdfs: rds) { 996 Path rdPath = rdfs.getPath(); 997 regionDirs.add(rdPath); 998 } 999 return regionDirs; 1000 } 1001 1002 public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) { 1003 return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region); 1004 } 1005 1006 public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) { 1007 return getRegionDirFromTableDir(tableDir, 1008 ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName()); 1009 } 1010 1011 public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) { 1012 return new Path(tableDir, encodedRegionName); 1013 } 1014 1015 /** 1016 * Filter for all dirs that are legal column family names. This is generally used for colfam 1017 * dirs <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>. 1018 */ 1019 public static class FamilyDirFilter extends AbstractFileStatusFilter { 1020 final FileSystem fs; 1021 1022 public FamilyDirFilter(FileSystem fs) { 1023 this.fs = fs; 1024 } 1025 1026 @Override 1027 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1028 try { 1029 // throws IAE if invalid 1030 HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName())); 1031 } catch (IllegalArgumentException iae) { 1032 // path name is an invalid family name and thus is excluded. 1033 return false; 1034 } 1035 1036 try { 1037 return isDirectory(fs, isDir, p); 1038 } catch (IOException ioe) { 1039 // Maybe the file was moved or the fs was disconnected. 1040 LOG.warn("Skipping file {} due to IOException", p, ioe); 1041 return false; 1042 } 1043 } 1044 } 1045 1046 /** 1047 * Given a particular region dir, return all the familydirs inside it 1048 * 1049 * @param fs A file system for the Path 1050 * @param regionDir Path to a specific region directory 1051 * @return List of paths to valid family directories in region dir. 1052 * @throws IOException 1053 */ 1054 public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException { 1055 // assumes we are in a region dir. 
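    // Note: unlike getRegionDirs above, this lists the directory directly, so a missing
    // region dir surfaces as a FileNotFoundException rather than an empty list.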
1056 FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs)); 1057 List<Path> familyDirs = new ArrayList<>(fds.length); 1058 for (FileStatus fdfs: fds) { 1059 Path fdPath = fdfs.getPath(); 1060 familyDirs.add(fdPath); 1061 } 1062 return familyDirs; 1063 } 1064 1065 public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException { 1066 List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs)); 1067 if (fds == null) { 1068 return Collections.emptyList(); 1069 } 1070 List<Path> referenceFiles = new ArrayList<>(fds.size()); 1071 for (FileStatus fdfs: fds) { 1072 Path fdPath = fdfs.getPath(); 1073 referenceFiles.add(fdPath); 1074 } 1075 return referenceFiles; 1076 } 1077 1078 /** 1079 * Filter for HFiles that excludes reference files. 1080 */ 1081 public static class HFileFilter extends AbstractFileStatusFilter { 1082 final FileSystem fs; 1083 1084 public HFileFilter(FileSystem fs) { 1085 this.fs = fs; 1086 } 1087 1088 @Override 1089 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1090 if (!StoreFileInfo.isHFile(p)) { 1091 return false; 1092 } 1093 1094 try { 1095 return isFile(fs, isDir, p); 1096 } catch (IOException ioe) { 1097 // Maybe the file was moved or the fs was disconnected. 1098 LOG.warn("Skipping file {} due to IOException", p, ioe); 1099 return false; 1100 } 1101 } 1102 } 1103 1104 /** 1105 * Filter for HFileLinks (StoreFiles and HFiles not included). 1106 * the filter itself does not consider if a link is file or not. 1107 */ 1108 public static class HFileLinkFilter implements PathFilter { 1109 1110 @Override 1111 public boolean accept(Path p) { 1112 return HFileLink.isHFileLink(p); 1113 } 1114 } 1115 1116 public static class ReferenceFileFilter extends AbstractFileStatusFilter { 1117 1118 private final FileSystem fs; 1119 1120 public ReferenceFileFilter(FileSystem fs) { 1121 this.fs = fs; 1122 } 1123 1124 @Override 1125 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1126 if (!StoreFileInfo.isReference(p)) { 1127 return false; 1128 } 1129 1130 try { 1131 // only files can be references. 1132 return isFile(fs, isDir, p); 1133 } catch (IOException ioe) { 1134 // Maybe the file was moved or the fs was disconnected. 1135 LOG.warn("Skipping file {} due to IOException", p, ioe); 1136 return false; 1137 } 1138 } 1139 } 1140 1141 /** 1142 * Called every so-often by storefile map builder getTableStoreFilePathMap to 1143 * report progress. 1144 */ 1145 interface ProgressReporter { 1146 /** 1147 * @param status File or directory we are about to process. 1148 */ 1149 void progress(FileStatus status); 1150 } 1151 1152 /** 1153 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for 1154 * table StoreFile names to the full Path. 1155 * <br> 1156 * Example...<br> 1157 * Key = 3944417774205889744 <br> 1158 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1159 * 1160 * @param map map to add values. If null, this method will create and populate one to return 1161 * @param fs The file system to use. 1162 * @param hbaseRootDir The root directory to scan. 1163 * @param tableName name of the table to scan. 1164 * @return Map keyed by StoreFile name with a value of the full Path. 1165 * @throws IOException When scanning the directory fails. 
1166 * @throws InterruptedException 1167 */ 1168 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map, 1169 final FileSystem fs, final Path hbaseRootDir, TableName tableName) 1170 throws IOException, InterruptedException { 1171 return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, 1172 (ProgressReporter)null); 1173 } 1174 1175 /** 1176 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for 1177 * table StoreFile names to the full Path. Note that because this method can be called 1178 * on a 'live' HBase system that we will skip files that no longer exist by the time 1179 * we traverse them and similarly the user of the result needs to consider that some 1180 * entries in this map may not exist by the time this call completes. 1181 * <br> 1182 * Example...<br> 1183 * Key = 3944417774205889744 <br> 1184 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1185 * 1186 * @param resultMap map to add values. If null, this method will create and populate one to return 1187 * @param fs The file system to use. 1188 * @param hbaseRootDir The root directory to scan. 1189 * @param tableName name of the table to scan. 1190 * @param sfFilter optional path filter to apply to store files 1191 * @param executor optional executor service to parallelize this operation 1192 * @param progressReporter Instance or null; gets called every time we move to new region of 1193 * family dir and for each store file. 1194 * @return Map keyed by StoreFile name with a value of the full Path. 1195 * @throws IOException When scanning the directory fails. 1196 * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead. 1197 */ 1198 @Deprecated 1199 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1200 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1201 ExecutorService executor, final HbckErrorReporter progressReporter) 1202 throws IOException, InterruptedException { 1203 return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor, 1204 new ProgressReporter() { 1205 @Override 1206 public void progress(FileStatus status) { 1207 // status is not used in this implementation. 1208 progressReporter.progress(); 1209 } 1210 }); 1211 } 1212 1213 /** 1214 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for 1215 * table StoreFile names to the full Path. Note that because this method can be called 1216 * on a 'live' HBase system that we will skip files that no longer exist by the time 1217 * we traverse them and similarly the user of the result needs to consider that some 1218 * entries in this map may not exist by the time this call completes. 1219 * <br> 1220 * Example...<br> 1221 * Key = 3944417774205889744 <br> 1222 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1223 * 1224 * @param resultMap map to add values. If null, this method will create and populate one 1225 * to return 1226 * @param fs The file system to use. 1227 * @param hbaseRootDir The root directory to scan. 1228 * @param tableName name of the table to scan. 1229 * @param sfFilter optional path filter to apply to store files 1230 * @param executor optional executor service to parallelize this operation 1231 * @param progressReporter Instance or null; gets called every time we move to new region of 1232 * family dir and for each store file. 
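   * <p>
   * Illustrative call (fs, rootDir and the table name are placeholders; the lambda is a
   * minimal reporter):
   * <pre>
   * Map<String, Path> storeFiles = FSUtils.getTableStoreFilePathMap(null, fs, rootDir,
   *     TableName.valueOf("t1"), null, null, status -> LOG.debug("visited {}", status.getPath()));
   * </pre>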
1233 * @return Map keyed by StoreFile name with a value of the full Path. 1234 * @throws IOException When scanning the directory fails. 1235 * @throws InterruptedException the thread is interrupted, either before or during the activity. 1236 */ 1237 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1238 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1239 ExecutorService executor, final ProgressReporter progressReporter) 1240 throws IOException, InterruptedException { 1241 1242 final Map<String, Path> finalResultMap = 1243 resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap; 1244 1245 // only include the directory paths to tables 1246 Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 1247 // Inside a table, there are compaction.dir directories to skip. Otherwise, all else 1248 // should be regions. 1249 final FamilyDirFilter familyFilter = new FamilyDirFilter(fs); 1250 final Vector<Exception> exceptions = new Vector<>(); 1251 1252 try { 1253 List<FileStatus> regionDirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 1254 if (regionDirs == null) { 1255 return finalResultMap; 1256 } 1257 1258 final List<Future<?>> futures = new ArrayList<>(regionDirs.size()); 1259 1260 for (FileStatus regionDir : regionDirs) { 1261 if (null != progressReporter) { 1262 progressReporter.progress(regionDir); 1263 } 1264 final Path dd = regionDir.getPath(); 1265 1266 if (!exceptions.isEmpty()) { 1267 break; 1268 } 1269 1270 Runnable getRegionStoreFileMapCall = new Runnable() { 1271 @Override 1272 public void run() { 1273 try { 1274 HashMap<String,Path> regionStoreFileMap = new HashMap<>(); 1275 List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter); 1276 if (familyDirs == null) { 1277 if (!fs.exists(dd)) { 1278 LOG.warn("Skipping region because it no longer exists: " + dd); 1279 } else { 1280 LOG.warn("Skipping region because it has no family dirs: " + dd); 1281 } 1282 return; 1283 } 1284 for (FileStatus familyDir : familyDirs) { 1285 if (null != progressReporter) { 1286 progressReporter.progress(familyDir); 1287 } 1288 Path family = familyDir.getPath(); 1289 if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) { 1290 continue; 1291 } 1292 // now in family, iterate over the StoreFiles and 1293 // put in map 1294 FileStatus[] familyStatus = fs.listStatus(family); 1295 for (FileStatus sfStatus : familyStatus) { 1296 if (null != progressReporter) { 1297 progressReporter.progress(sfStatus); 1298 } 1299 Path sf = sfStatus.getPath(); 1300 if (sfFilter == null || sfFilter.accept(sf)) { 1301 regionStoreFileMap.put( sf.getName(), sf); 1302 } 1303 } 1304 } 1305 finalResultMap.putAll(regionStoreFileMap); 1306 } catch (Exception e) { 1307 LOG.error("Could not get region store file map for region: " + dd, e); 1308 exceptions.add(e); 1309 } 1310 } 1311 }; 1312 1313 // If executor is available, submit async tasks to exec concurrently, otherwise 1314 // just do serial sync execution 1315 if (executor != null) { 1316 Future<?> future = executor.submit(getRegionStoreFileMapCall); 1317 futures.add(future); 1318 } else { 1319 FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null); 1320 future.run(); 1321 futures.add(future); 1322 } 1323 } 1324 1325 // Ensure all pending tasks are complete (or that we run into an exception) 1326 for (Future<?> f : futures) { 1327 if (!exceptions.isEmpty()) { 1328 break; 1329 } 1330 try { 1331 
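          // Block until this region's task finishes; the Runnable records its own failures
          // in 'exceptions', so an ExecutionException here is not expected.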
f.get(); 1332 } catch (ExecutionException e) { 1333 LOG.error("Unexpected exec exception! Should've been caught already. (Bug?)", e); 1334 // Shouldn't happen, we already logged/caught any exceptions in the Runnable 1335 } 1336 } 1337 } catch (IOException e) { 1338 LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e); 1339 exceptions.add(e); 1340 } finally { 1341 if (!exceptions.isEmpty()) { 1342 // Just throw the first exception as an indication something bad happened 1343 // Don't need to propagate all the exceptions, we already logged them all anyway 1344 Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class); 1345 throw new IOException(exceptions.firstElement()); 1346 } 1347 } 1348 1349 return finalResultMap; 1350 } 1351 1352 public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) { 1353 int result = 0; 1354 try { 1355 for (Path familyDir:getFamilyDirs(fs, p)){ 1356 result += getReferenceFilePaths(fs, familyDir).size(); 1357 } 1358 } catch (IOException e) { 1359 LOG.warn("Error counting reference files", e); 1360 } 1361 return result; 1362 } 1363 1364 /** 1365 * Runs through the HBase rootdir and creates a reverse lookup map for 1366 * table StoreFile names to the full Path. 1367 * <br> 1368 * Example...<br> 1369 * Key = 3944417774205889744 <br> 1370 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1371 * 1372 * @param fs The file system to use. 1373 * @param hbaseRootDir The root directory to scan. 1374 * @return Map keyed by StoreFile name with a value of the full Path. 1375 * @throws IOException When scanning the directory fails. 1376 */ 1377 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1378 final Path hbaseRootDir) 1379 throws IOException, InterruptedException { 1380 return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter)null); 1381 } 1382 1383 /** 1384 * Runs through the HBase rootdir and creates a reverse lookup map for 1385 * table StoreFile names to the full Path. 1386 * <br> 1387 * Example...<br> 1388 * Key = 3944417774205889744 <br> 1389 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1390 * 1391 * @param fs The file system to use. 1392 * @param hbaseRootDir The root directory to scan. 1393 * @param sfFilter optional path filter to apply to store files 1394 * @param executor optional executor service to parallelize this operation 1395 * @param progressReporter Instance or null; gets called every time we move to new region of 1396 * family dir and for each store file. 1397 * @return Map keyed by StoreFile name with a value of the full Path. 1398 * @throws IOException When scanning the directory fails. 1399 * @deprecated Since 2.3.0. Will be removed in hbase4. Used {@link 1400 * #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)} 1401 */ 1402 @Deprecated 1403 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1404 final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor, 1405 HbckErrorReporter progressReporter) 1406 throws IOException, InterruptedException { 1407 return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor, 1408 new ProgressReporter() { 1409 @Override 1410 public void progress(FileStatus status) { 1411 // status is not used in this implementation. 
1412 progressReporter.progress(); 1413 } 1414 }); 1415 } 1416 1417 /** 1418 * Runs through the HBase rootdir and creates a reverse lookup map for 1419 * table StoreFile names to the full Path. 1420 * <br> 1421 * Example...<br> 1422 * Key = 3944417774205889744 <br> 1423 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1424 * 1425 * @param fs The file system to use. 1426 * @param hbaseRootDir The root directory to scan. 1427 * @param sfFilter optional path filter to apply to store files 1428 * @param executor optional executor service to parallelize this operation 1429 * @param progressReporter Instance or null; gets called every time we move to new region of 1430 * family dir and for each store file. 1431 * @return Map keyed by StoreFile name with a value of the full Path. 1432 * @throws IOException When scanning the directory fails. 1433 * @throws InterruptedException 1434 */ 1435 public static Map<String, Path> getTableStoreFilePathMap( 1436 final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter, 1437 ExecutorService executor, ProgressReporter progressReporter) 1438 throws IOException, InterruptedException { 1439 ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32); 1440 1441 // if this method looks similar to 'getTableFragmentation' that is because 1442 // it was borrowed from it. 1443 1444 // only include the directory paths to tables 1445 for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) { 1446 getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir), 1447 sfFilter, executor, progressReporter); 1448 } 1449 return map; 1450 } 1451 1452 /** 1453 * Filters FileStatuses in an array and returns a list 1454 * 1455 * @param input An array of FileStatuses 1456 * @param filter A required filter to filter the array 1457 * @return A list of FileStatuses 1458 */ 1459 public static List<FileStatus> filterFileStatuses(FileStatus[] input, 1460 FileStatusFilter filter) { 1461 if (input == null) return null; 1462 return filterFileStatuses(Iterators.forArray(input), filter); 1463 } 1464 1465 /** 1466 * Filters FileStatuses in an iterator and returns a list 1467 * 1468 * @param input An iterator of FileStatuses 1469 * @param filter A required filter to filter the array 1470 * @return A list of FileStatuses 1471 */ 1472 public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input, 1473 FileStatusFilter filter) { 1474 if (input == null) return null; 1475 ArrayList<FileStatus> results = new ArrayList<>(); 1476 while (input.hasNext()) { 1477 FileStatus f = input.next(); 1478 if (filter.accept(f)) { 1479 results.add(f); 1480 } 1481 } 1482 return results; 1483 } 1484 1485 /** 1486 * Calls fs.listStatus() and treats FileNotFoundException as non-fatal 1487 * This accommodates differences between hadoop versions, where hadoop 1 1488 * does not throw a FileNotFoundException, and return an empty FileStatus[] 1489 * while Hadoop 2 will throw FileNotFoundException. 
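   * <p>
   * Typical use, mirroring the region-dir scans elsewhere in this class (sketch):
   * <pre>
   * List<FileStatus> regionDirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
   * if (regionDirs == null) {
   *   // directory is missing, empty, or nothing matched the filter
   * }
   * </pre>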
1490 * 1491 * @param fs file system 1492 * @param dir directory 1493 * @param filter file status filter 1494 * @return null if dir is empty or doesn't exist, otherwise FileStatus list 1495 */ 1496 public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs, 1497 final Path dir, final FileStatusFilter filter) throws IOException { 1498 FileStatus [] status = null; 1499 try { 1500 status = fs.listStatus(dir); 1501 } catch (FileNotFoundException fnfe) { 1502 LOG.trace("{} does not exist", dir); 1503 return null; 1504 } 1505 1506 if (ArrayUtils.getLength(status) == 0) { 1507 return null; 1508 } 1509 1510 if (filter == null) { 1511 return Arrays.asList(status); 1512 } else { 1513 List<FileStatus> status2 = filterFileStatuses(status, filter); 1514 if (status2 == null || status2.isEmpty()) { 1515 return null; 1516 } else { 1517 return status2; 1518 } 1519 } 1520 } 1521 1522 /** 1523 * This function is to scan the root path of the file system to get the 1524 * degree of locality for each region on each of the servers having at least 1525 * one block of that region. 1526 * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer} 1527 * 1528 * @param conf 1529 * the configuration to use 1530 * @return the mapping from region encoded name to a map of server names to 1531 * locality fraction 1532 * @throws IOException 1533 * in case of file system errors or interrupts 1534 */ 1535 public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS( 1536 final Configuration conf) throws IOException { 1537 return getRegionDegreeLocalityMappingFromFS( 1538 conf, null, 1539 conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE)); 1540 1541 } 1542 1543 /** 1544 * This function is to scan the root path of the file system to get the 1545 * degree of locality for each region on each of the servers having at least 1546 * one block of that region. 1547 * 1548 * @param conf 1549 * the configuration to use 1550 * @param desiredTable 1551 * the table you wish to scan locality for 1552 * @param threadPoolSize 1553 * the thread pool size to use 1554 * @return the mapping from region encoded name to a map of server names to 1555 * locality fraction 1556 * @throws IOException 1557 * in case of file system errors or interrupts 1558 */ 1559 public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS( 1560 final Configuration conf, final String desiredTable, int threadPoolSize) 1561 throws IOException { 1562 Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>(); 1563 getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping); 1564 return regionDegreeLocalityMapping; 1565 } 1566 1567 /** 1568 * This function is to scan the root path of the file system to get either the 1569 * mapping between the region name and its best locality region server or the 1570 * degree of locality of each region on each of the servers having at least 1571 * one block of that region. The output map parameters are both optional. 
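   * <p>
   * Callers normally go through the public wrappers above, e.g. (sketch):
   * <pre>
   * Map<String, Map<String, Float>> locality =
   *     FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
   * </pre>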
1572   *
1573   * @param conf
1574   *          the configuration to use
1575   * @param desiredTable
1576   *          the table you wish to scan locality for
1577   * @param threadPoolSize
1578   *          the thread pool size to use
1579   * @param regionDegreeLocalityMapping
1580   *          the map into which to put the locality degree mapping, or null;
1581   *          must be a thread-safe implementation
1582   * @throws IOException
1583   *           in case of file system errors or interrupts
1584   */
1585  private static void getRegionLocalityMappingFromFS(final Configuration conf,
1586      final String desiredTable, int threadPoolSize,
1587      final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
1588    final FileSystem fs = FileSystem.get(conf);
1589    final Path rootPath = CommonFSUtils.getRootDir(conf);
1590    final long startTime = EnvironmentEdgeManager.currentTime();
1591    final Path queryPath;
1592    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
1593    if (null == desiredTable) {
1594      queryPath =
1595        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
1596    } else {
1597      queryPath = new Path(
1598        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
1599    }
1600
1601    // reject all paths that are not appropriate
1602    PathFilter pathFilter = new PathFilter() {
1603      @Override
1604      public boolean accept(Path path) {
1605        // the path name should be a region encoded name; the glob may also pick up noise entries
1606        if (null == path) {
1607          return false;
1608        }
1609
1610        // no parent?
1611        Path parent = path.getParent();
1612        if (null == parent) {
1613          return false;
1614        }
1615
1616        String regionName = path.getName();
1617        if (null == regionName) {
1618          return false;
1619        }
1620
1621        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
1622          return false;
1623        }
1624        return true;
1625      }
1626    };
1627
1628    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
1629
1630    if (LOG.isDebugEnabled()) {
1631      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
1632    }
1633
1634    if (null == statusList) {
1635      return;
1636    }
1637
1638    // lower the number of threads in case we have very few expected regions
1639    threadPoolSize = Math.min(threadPoolSize, statusList.length);
1640
1641    // run in multiple threads
1642    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
1643      Threads.newDaemonThreadFactory("FSRegionQuery"));
1644    try {
1645      // ignore all file status items that are not of interest
1646      for (FileStatus regionStatus : statusList) {
1647        if (null == regionStatus || !regionStatus.isDirectory()) {
1648          continue;
1649        }
1650
1651        final Path regionPath = regionStatus.getPath();
1652        if (null != regionPath) {
1653          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
1654        }
1655      }
1656    } finally {
1657      tpe.shutdown();
1658      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
1659        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1660      try {
1661        // wait until the thread pool terminates, either naturally or because of
1662        // exceptions thrown by the executing threads
1663        while (!tpe.awaitTermination(threadWakeFrequency,
1664            TimeUnit.MILLISECONDS)) {
1665          // print a rough progress estimate, so as not to introduce
1666          // an AtomicInteger
1667          LOG.info("Locality checking is underway: { Scanned Regions : "
1668            + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
1669            + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
1670        }
1671      } catch (InterruptedException e) {
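        // Reassert the interrupt flag so callers can still observe it, then surface the
        // interruption to the caller as an IOException subtype.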
1672        Thread.currentThread().interrupt();
1673        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1674      }
1675    }
1676
1677    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
1678    LOG.info("Scanning DFS for locality info took {}ms", overhead);
1679  }
1680
1681  /**
1682   * Does the short-circuit read setup.
1683   * Checks the buffer size to use and whether checksumming is done in HBase or HDFS.
1684   * @param conf the Configuration to examine and, if needed, update
1685   */
1686  public static void setupShortCircuitRead(final Configuration conf) {
1687    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
1688    boolean shortCircuitSkipChecksum =
1689      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
1690    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
1691    if (shortCircuitSkipChecksum) {
1692      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
1693        "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
1694        "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
1695      assert !shortCircuitSkipChecksum; // this will fail if assertions are on
1696    }
1697    checkShortCircuitReadBufferSize(conf);
1698  }
1699
1700  /**
1701   * Checks if the short-circuit read buffer size is set and, if not, sets it to the value HBase wants.
1702   * @param conf the Configuration to examine and, if needed, update
1703   */
1704  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
1705    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
1706    final int notSet = -1;
1707    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in Hadoop 2
1708    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
1709    int size = conf.getInt(dfsKey, notSet);
1710    // If a size is set, return -- we will use it.
1711    if (size != notSet) return;
1712    // But the short-circuit buffer size is normally not set. Put in place the size HBase wants.
1713    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
1714    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
1715  }
1716
1717  /**
1718   * @param c the Configuration to use to locate the FileSystem
1719   * @return The DFSClient DFSHedgedReadMetrics instance or null if it can't be found or we are not on HDFS.
1720   * @throws IOException if the FileSystem cannot be obtained
1721   */
1722  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
1723      throws IOException {
1724    if (!CommonFSUtils.isHDFS(c)) {
1725      return null;
1726    }
1727    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
1728    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
1729    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
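    // Note: the method may be absent or inaccessible on some Hadoop versions; in that case we
    // log a warning and return null below instead of failing the caller.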
1730 final String name = "getHedgedReadMetrics"; 1731 DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient(); 1732 Method m; 1733 try { 1734 m = dfsclient.getClass().getDeclaredMethod(name); 1735 } catch (NoSuchMethodException e) { 1736 LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " + 1737 e.getMessage()); 1738 return null; 1739 } catch (SecurityException e) { 1740 LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " + 1741 e.getMessage()); 1742 return null; 1743 } 1744 m.setAccessible(true); 1745 try { 1746 return (DFSHedgedReadMetrics)m.invoke(dfsclient); 1747 } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { 1748 LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " + 1749 e.getMessage()); 1750 return null; 1751 } 1752 } 1753 1754 public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst, 1755 Configuration conf, int threads) throws IOException { 1756 ExecutorService pool = Executors.newFixedThreadPool(threads); 1757 List<Future<Void>> futures = new ArrayList<>(); 1758 List<Path> traversedPaths; 1759 try { 1760 traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures); 1761 for (Future<Void> future : futures) { 1762 future.get(); 1763 } 1764 } catch (ExecutionException | InterruptedException | IOException e) { 1765 throw new IOException("Copy snapshot reference files failed", e); 1766 } finally { 1767 pool.shutdownNow(); 1768 } 1769 return traversedPaths; 1770 } 1771 1772 private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst, 1773 Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException { 1774 List<Path> traversedPaths = new ArrayList<>(); 1775 traversedPaths.add(dst); 1776 FileStatus currentFileStatus = srcFS.getFileStatus(src); 1777 if (currentFileStatus.isDirectory()) { 1778 if (!dstFS.mkdirs(dst)) { 1779 throw new IOException("Create directory failed: " + dst); 1780 } 1781 FileStatus[] subPaths = srcFS.listStatus(src); 1782 for (FileStatus subPath : subPaths) { 1783 traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS, 1784 new Path(dst, subPath.getPath().getName()), conf, pool, futures)); 1785 } 1786 } else { 1787 Future<Void> future = pool.submit(() -> { 1788 FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf); 1789 return null; 1790 }); 1791 futures.add(future); 1792 } 1793 return traversedPaths; 1794 } 1795 1796 /** 1797 * @return A set containing all namenode addresses of fs 1798 */ 1799 private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs, 1800 Configuration conf) { 1801 Set<InetSocketAddress> addresses = new HashSet<>(); 1802 String serviceName = fs.getCanonicalServiceName(); 1803 1804 if (serviceName.startsWith("ha-hdfs")) { 1805 try { 1806 Map<String, Map<String, InetSocketAddress>> addressMap = 1807 DFSUtil.getNNServiceRpcAddressesForCluster(conf); 1808 String nameService = serviceName.substring(serviceName.indexOf(":") + 1); 1809 if (addressMap.containsKey(nameService)) { 1810 Map<String, InetSocketAddress> nnMap = addressMap.get(nameService); 1811 for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) { 1812 InetSocketAddress addr = e2.getValue(); 1813 addresses.add(addr); 1814 } 1815 } 1816 } catch (Exception e) { 1817 LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. 
serviceName=" + serviceName, e); 1818 } 1819 } else { 1820 URI uri = fs.getUri(); 1821 int port = uri.getPort(); 1822 if (port < 0) { 1823 int idx = serviceName.indexOf(':'); 1824 port = Integer.parseInt(serviceName.substring(idx + 1)); 1825 } 1826 InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port); 1827 addresses.add(addr); 1828 } 1829 1830 return addresses; 1831 } 1832 1833 /** 1834 * @param conf the Configuration of HBase 1835 * @return Whether srcFs and desFs are on same hdfs or not 1836 */ 1837 public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) { 1838 // By getCanonicalServiceName, we could make sure both srcFs and desFs 1839 // show a unified format which contains scheme, host and port. 1840 String srcServiceName = srcFs.getCanonicalServiceName(); 1841 String desServiceName = desFs.getCanonicalServiceName(); 1842 1843 if (srcServiceName == null || desServiceName == null) { 1844 return false; 1845 } 1846 if (srcServiceName.equals(desServiceName)) { 1847 return true; 1848 } 1849 if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) { 1850 Collection<String> internalNameServices = 1851 conf.getTrimmedStringCollection("dfs.internal.nameservices"); 1852 if (!internalNameServices.isEmpty()) { 1853 if (internalNameServices.contains(srcServiceName.split(":")[1])) { 1854 return true; 1855 } else { 1856 return false; 1857 } 1858 } 1859 } 1860 if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) { 1861 // If one serviceName is an HA format while the other is a non-HA format, 1862 // maybe they refer to the same FileSystem. 1863 // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port" 1864 Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf); 1865 Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf); 1866 if (Sets.intersection(srcAddrs, desAddrs).size() > 0) { 1867 return true; 1868 } 1869 } 1870 1871 return false; 1872 } 1873}