001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.util; 020 021import edu.umd.cs.findbugs.annotations.CheckForNull; 022import java.io.ByteArrayInputStream; 023import java.io.DataInputStream; 024import java.io.EOFException; 025import java.io.FileNotFoundException; 026import java.io.IOException; 027import java.io.InterruptedIOException; 028import java.lang.reflect.InvocationTargetException; 029import java.lang.reflect.Method; 030import java.net.InetSocketAddress; 031import java.net.URI; 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.Collection; 035import java.util.Collections; 036import java.util.HashMap; 037import java.util.HashSet; 038import java.util.Iterator; 039import java.util.List; 040import java.util.Locale; 041import java.util.Map; 042import java.util.Set; 043import java.util.Vector; 044import java.util.concurrent.ConcurrentHashMap; 045import java.util.concurrent.ExecutionException; 046import java.util.concurrent.ExecutorService; 047import java.util.concurrent.Executors; 048import java.util.concurrent.Future; 049import java.util.concurrent.FutureTask; 050import java.util.concurrent.ThreadPoolExecutor; 051import java.util.concurrent.TimeUnit; 052import java.util.regex.Pattern; 053import org.apache.commons.lang3.ArrayUtils; 054import org.apache.hadoop.conf.Configuration; 055import org.apache.hadoop.fs.BlockLocation; 056import org.apache.hadoop.fs.FSDataInputStream; 057import org.apache.hadoop.fs.FSDataOutputStream; 058import org.apache.hadoop.fs.FileStatus; 059import org.apache.hadoop.fs.FileSystem; 060import org.apache.hadoop.fs.FileUtil; 061import org.apache.hadoop.fs.Path; 062import org.apache.hadoop.fs.PathFilter; 063import org.apache.hadoop.fs.StorageType; 064import org.apache.hadoop.fs.permission.FsPermission; 065import org.apache.hadoop.hbase.ClusterId; 066import org.apache.hadoop.hbase.HColumnDescriptor; 067import org.apache.hadoop.hbase.HConstants; 068import org.apache.hadoop.hbase.HDFSBlocksDistribution; 069import org.apache.hadoop.hbase.TableName; 070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 071import org.apache.hadoop.hbase.client.RegionInfo; 072import org.apache.hadoop.hbase.client.RegionInfoBuilder; 073import org.apache.hadoop.hbase.exceptions.DeserializationException; 074import org.apache.hadoop.hbase.fs.HFileSystem; 075import org.apache.hadoop.hbase.io.HFileLink; 076import org.apache.hadoop.hbase.master.HMaster; 077import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 078import org.apache.hadoop.hdfs.DFSClient; 079import org.apache.hadoop.hdfs.DFSHedgedReadMetrics; 080import org.apache.hadoop.hdfs.DFSUtil; 081import org.apache.hadoop.hdfs.DistributedFileSystem; 082import org.apache.hadoop.hdfs.protocol.HdfsConstants; 083import org.apache.hadoop.io.IOUtils; 084import org.apache.hadoop.ipc.RemoteException; 085import org.apache.hadoop.util.Progressable; 086import org.apache.hadoop.util.StringUtils; 087import org.apache.yetus.audience.InterfaceAudience; 088import org.slf4j.Logger; 089import org.slf4j.LoggerFactory; 090 091import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 092import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 093import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; 094import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 095import org.apache.hbase.thirdparty.com.google.common.primitives.Ints; 096 097import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 098import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos; 099 100/** 101 * Utility methods for interacting with the underlying file system. 102 */ 103@InterfaceAudience.Private 104public final class FSUtils { 105 private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class); 106 107 private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize"; 108 private static final int DEFAULT_THREAD_POOLSIZE = 2; 109 110 /** Set to true on Windows platforms */ 111 // currently only used in testing. TODO refactor into a test class 112 public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows"); 113 114 private FSUtils() { 115 } 116 117 /** 118 * @return True is <code>fs</code> is instance of DistributedFileSystem 119 * @throws IOException 120 */ 121 public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException { 122 FileSystem fileSystem = fs; 123 // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem. 124 // Check its backing fs for dfs-ness. 125 if (fs instanceof HFileSystem) { 126 fileSystem = ((HFileSystem)fs).getBackingFs(); 127 } 128 return fileSystem instanceof DistributedFileSystem; 129 } 130 131 /** 132 * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the 133 * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true. Does not consider 134 * schema; i.e. if schemas different but path or subpath matches, the two will equate. 135 * @param pathToSearch Path we will be trying to match. 136 * @param pathTail 137 * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code> 138 */ 139 public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) { 140 Path tailPath = pathTail; 141 String tailName; 142 Path toSearch = pathToSearch; 143 String toSearchName; 144 boolean result = false; 145 146 if (pathToSearch.depth() != pathTail.depth()) { 147 return false; 148 } 149 150 do { 151 tailName = tailPath.getName(); 152 if (tailName == null || tailName.isEmpty()) { 153 result = true; 154 break; 155 } 156 toSearchName = toSearch.getName(); 157 if (toSearchName == null || toSearchName.isEmpty()) { 158 break; 159 } 160 // Move up a parent on each path for next go around. Path doesn't let us go off the end. 161 tailPath = tailPath.getParent(); 162 toSearch = toSearch.getParent(); 163 } while(tailName.equals(toSearchName)); 164 return result; 165 } 166 167 /** 168 * Delete the region directory if exists. 169 * @return True if deleted the region directory. 170 * @throws IOException 171 */ 172 public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri) 173 throws IOException { 174 Path rootDir = CommonFSUtils.getRootDir(conf); 175 FileSystem fs = rootDir.getFileSystem(conf); 176 return CommonFSUtils.deleteDirectory(fs, 177 new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName())); 178 } 179 180 /** 181 * Create the specified file on the filesystem. By default, this will: 182 * <ol> 183 * <li>overwrite the file if it exists</li> 184 * <li>apply the umask in the configuration (if it is enabled)</li> 185 * <li>use the fs configured buffer size (or 4096 if not set)</li> 186 * <li>use the configured column family replication or default replication if 187 * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li> 188 * <li>use the default block size</li> 189 * <li>not track progress</li> 190 * </ol> 191 * @param conf configurations 192 * @param fs {@link FileSystem} on which to write the file 193 * @param path {@link Path} to the file to write 194 * @param perm permissions 195 * @param favoredNodes favored data nodes 196 * @return output stream to the created file 197 * @throws IOException if the file cannot be created 198 */ 199 public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path, 200 FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException { 201 if (fs instanceof HFileSystem) { 202 FileSystem backingFs = ((HFileSystem) fs).getBackingFs(); 203 if (backingFs instanceof DistributedFileSystem) { 204 // Try to use the favoredNodes version via reflection to allow backwards- 205 // compatibility. 206 short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION, 207 String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION))); 208 try { 209 return (FSDataOutputStream) (DistributedFileSystem.class 210 .getDeclaredMethod("create", Path.class, FsPermission.class, boolean.class, int.class, 211 short.class, long.class, Progressable.class, InetSocketAddress[].class) 212 .invoke(backingFs, path, perm, true, CommonFSUtils.getDefaultBufferSize(backingFs), 213 replication > 0 ? replication : CommonFSUtils.getDefaultReplication(backingFs, path), 214 CommonFSUtils.getDefaultBlockSize(backingFs, path), null, favoredNodes)); 215 } catch (InvocationTargetException ite) { 216 // Function was properly called, but threw it's own exception. 217 throw new IOException(ite.getCause()); 218 } catch (NoSuchMethodException e) { 219 LOG.debug("DFS Client does not support most favored nodes create; using default create"); 220 LOG.trace("Ignoring; use default create", e); 221 } catch (IllegalArgumentException | SecurityException | IllegalAccessException e) { 222 LOG.debug("Ignoring (most likely Reflection related exception) " + e); 223 } 224 } 225 } 226 return CommonFSUtils.create(fs, path, perm, true); 227 } 228 229 /** 230 * Checks to see if the specified file system is available 231 * 232 * @param fs filesystem 233 * @throws IOException e 234 */ 235 public static void checkFileSystemAvailable(final FileSystem fs) 236 throws IOException { 237 if (!(fs instanceof DistributedFileSystem)) { 238 return; 239 } 240 IOException exception = null; 241 DistributedFileSystem dfs = (DistributedFileSystem) fs; 242 try { 243 if (dfs.exists(new Path("/"))) { 244 return; 245 } 246 } catch (IOException e) { 247 exception = e instanceof RemoteException ? 248 ((RemoteException)e).unwrapRemoteException() : e; 249 } 250 try { 251 fs.close(); 252 } catch (Exception e) { 253 LOG.error("file system close failed: ", e); 254 } 255 throw new IOException("File system is not available", exception); 256 } 257 258 /** 259 * We use reflection because {@link DistributedFileSystem#setSafeMode( 260 * HdfsConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1 261 * 262 * @param dfs 263 * @return whether we're in safe mode 264 * @throws IOException 265 */ 266 private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException { 267 boolean inSafeMode = false; 268 try { 269 Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{ 270 org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class}); 271 inSafeMode = (Boolean) m.invoke(dfs, 272 org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true); 273 } catch (Exception e) { 274 if (e instanceof IOException) throw (IOException) e; 275 276 // Check whether dfs is on safemode. 277 inSafeMode = dfs.setSafeMode( 278 org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET); 279 } 280 return inSafeMode; 281 } 282 283 /** 284 * Check whether dfs is in safemode. 285 * @param conf 286 * @throws IOException 287 */ 288 public static void checkDfsSafeMode(final Configuration conf) 289 throws IOException { 290 boolean isInSafeMode = false; 291 FileSystem fs = FileSystem.get(conf); 292 if (fs instanceof DistributedFileSystem) { 293 DistributedFileSystem dfs = (DistributedFileSystem)fs; 294 isInSafeMode = isInSafeMode(dfs); 295 } 296 if (isInSafeMode) { 297 throw new IOException("File system is in safemode, it can't be written now"); 298 } 299 } 300 301 /** 302 * Verifies current version of file system 303 * 304 * @param fs filesystem object 305 * @param rootdir root hbase directory 306 * @return null if no version file exists, version string otherwise 307 * @throws IOException if the version file fails to open 308 * @throws DeserializationException if the version data cannot be translated into a version 309 */ 310 public static String getVersion(FileSystem fs, Path rootdir) 311 throws IOException, DeserializationException { 312 final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 313 FileStatus[] status = null; 314 try { 315 // hadoop 2.0 throws FNFE if directory does not exist. 316 // hadoop 1.0 returns null if directory does not exist. 317 status = fs.listStatus(versionFile); 318 } catch (FileNotFoundException fnfe) { 319 return null; 320 } 321 if (ArrayUtils.getLength(status) == 0) { 322 return null; 323 } 324 String version = null; 325 byte [] content = new byte [(int)status[0].getLen()]; 326 FSDataInputStream s = fs.open(versionFile); 327 try { 328 IOUtils.readFully(s, content, 0, content.length); 329 if (ProtobufUtil.isPBMagicPrefix(content)) { 330 version = parseVersionFrom(content); 331 } else { 332 // Presume it pre-pb format. 333 try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) { 334 version = dis.readUTF(); 335 } 336 } 337 } catch (EOFException eof) { 338 LOG.warn("Version file was empty, odd, will try to set it."); 339 } finally { 340 s.close(); 341 } 342 return version; 343 } 344 345 /** 346 * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file. 347 * @param bytes The byte content of the hbase.version file 348 * @return The version found in the file as a String 349 * @throws DeserializationException if the version data cannot be translated into a version 350 */ 351 static String parseVersionFrom(final byte [] bytes) 352 throws DeserializationException { 353 ProtobufUtil.expectPBMagicPrefix(bytes); 354 int pblen = ProtobufUtil.lengthOfPBMagic(); 355 FSProtos.HBaseVersionFileContent.Builder builder = 356 FSProtos.HBaseVersionFileContent.newBuilder(); 357 try { 358 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); 359 return builder.getVersion(); 360 } catch (IOException e) { 361 // Convert 362 throw new DeserializationException(e); 363 } 364 } 365 366 /** 367 * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file. 368 * @param version Version to persist 369 * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix. 370 */ 371 static byte [] toVersionByteArray(final String version) { 372 FSProtos.HBaseVersionFileContent.Builder builder = 373 FSProtos.HBaseVersionFileContent.newBuilder(); 374 return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray()); 375 } 376 377 /** 378 * Verifies current version of file system 379 * 380 * @param fs file system 381 * @param rootdir root directory of HBase installation 382 * @param message if true, issues a message on System.out 383 * @throws IOException if the version file cannot be opened 384 * @throws DeserializationException if the contents of the version file cannot be parsed 385 */ 386 public static void checkVersion(FileSystem fs, Path rootdir, boolean message) 387 throws IOException, DeserializationException { 388 checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 389 } 390 391 /** 392 * Verifies current version of file system 393 * 394 * @param fs file system 395 * @param rootdir root directory of HBase installation 396 * @param message if true, issues a message on System.out 397 * @param wait wait interval 398 * @param retries number of times to retry 399 * 400 * @throws IOException if the version file cannot be opened 401 * @throws DeserializationException if the contents of the version file cannot be parsed 402 */ 403 public static void checkVersion(FileSystem fs, Path rootdir, 404 boolean message, int wait, int retries) 405 throws IOException, DeserializationException { 406 String version = getVersion(fs, rootdir); 407 String msg; 408 if (version == null) { 409 if (!metaRegionExists(fs, rootdir)) { 410 // rootDir is empty (no version file and no root region) 411 // just create new version file (HBASE-1195) 412 setVersion(fs, rootdir, wait, retries); 413 return; 414 } else { 415 msg = "hbase.version file is missing. Is your hbase.rootdir valid? " + 416 "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " + 417 "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2"; 418 } 419 } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) { 420 return; 421 } else { 422 msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version + 423 " but software requires version " + HConstants.FILE_SYSTEM_VERSION + 424 ". Consult http://hbase.apache.org/book.html for further information about " + 425 "upgrading HBase."; 426 } 427 428 // version is deprecated require migration 429 // Output on stdout so user sees it in terminal. 430 if (message) { 431 System.out.println("WARNING! " + msg); 432 } 433 throw new FileSystemVersionException(msg); 434 } 435 436 /** 437 * Sets version of file system 438 * 439 * @param fs filesystem object 440 * @param rootdir hbase root 441 * @throws IOException e 442 */ 443 public static void setVersion(FileSystem fs, Path rootdir) 444 throws IOException { 445 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0, 446 HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 447 } 448 449 /** 450 * Sets version of file system 451 * 452 * @param fs filesystem object 453 * @param rootdir hbase root 454 * @param wait time to wait for retry 455 * @param retries number of times to retry before failing 456 * @throws IOException e 457 */ 458 public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries) 459 throws IOException { 460 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries); 461 } 462 463 464 /** 465 * Sets version of file system 466 * 467 * @param fs filesystem object 468 * @param rootdir hbase root directory 469 * @param version version to set 470 * @param wait time to wait for retry 471 * @param retries number of times to retry before throwing an IOException 472 * @throws IOException e 473 */ 474 public static void setVersion(FileSystem fs, Path rootdir, String version, 475 int wait, int retries) throws IOException { 476 Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 477 Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + 478 HConstants.VERSION_FILE_NAME); 479 while (true) { 480 try { 481 // Write the version to a temporary file 482 FSDataOutputStream s = fs.create(tempVersionFile); 483 try { 484 s.write(toVersionByteArray(version)); 485 s.close(); 486 s = null; 487 // Move the temp version file to its normal location. Returns false 488 // if the rename failed. Throw an IOE in that case. 489 if (!fs.rename(tempVersionFile, versionFile)) { 490 throw new IOException("Unable to move temp version file to " + versionFile); 491 } 492 } finally { 493 // Cleaning up the temporary if the rename failed would be trying 494 // too hard. We'll unconditionally create it again the next time 495 // through anyway, files are overwritten by default by create(). 496 497 // Attempt to close the stream on the way out if it is still open. 498 try { 499 if (s != null) s.close(); 500 } catch (IOException ignore) { } 501 } 502 LOG.info("Created version file at " + rootdir.toString() + " with version=" + version); 503 return; 504 } catch (IOException e) { 505 if (retries > 0) { 506 LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e); 507 fs.delete(versionFile, false); 508 try { 509 if (wait > 0) { 510 Thread.sleep(wait); 511 } 512 } catch (InterruptedException ie) { 513 throw (InterruptedIOException)new InterruptedIOException().initCause(ie); 514 } 515 retries--; 516 } else { 517 throw e; 518 } 519 } 520 } 521 } 522 523 /** 524 * Checks that a cluster ID file exists in the HBase root directory 525 * @param fs the root directory FileSystem 526 * @param rootdir the HBase root directory in HDFS 527 * @param wait how long to wait between retries 528 * @return <code>true</code> if the file exists, otherwise <code>false</code> 529 * @throws IOException if checking the FileSystem fails 530 */ 531 public static boolean checkClusterIdExists(FileSystem fs, Path rootdir, 532 long wait) throws IOException { 533 while (true) { 534 try { 535 Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 536 return fs.exists(filePath); 537 } catch (IOException ioe) { 538 if (wait > 0L) { 539 LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe); 540 try { 541 Thread.sleep(wait); 542 } catch (InterruptedException e) { 543 Thread.currentThread().interrupt(); 544 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 545 } 546 } else { 547 throw ioe; 548 } 549 } 550 } 551 } 552 553 /** 554 * Returns the value of the unique cluster ID stored for this HBase instance. 555 * @param fs the root directory FileSystem 556 * @param rootdir the path to the HBase root directory 557 * @return the unique cluster identifier 558 * @throws IOException if reading the cluster ID file fails 559 */ 560 public static ClusterId getClusterId(FileSystem fs, Path rootdir) 561 throws IOException { 562 Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 563 ClusterId clusterId = null; 564 FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath): null; 565 if (status != null) { 566 int len = Ints.checkedCast(status.getLen()); 567 byte [] content = new byte[len]; 568 FSDataInputStream in = fs.open(idPath); 569 try { 570 in.readFully(content); 571 } catch (EOFException eof) { 572 LOG.warn("Cluster ID file {} is empty", idPath); 573 } finally{ 574 in.close(); 575 } 576 try { 577 clusterId = ClusterId.parseFrom(content); 578 } catch (DeserializationException e) { 579 throw new IOException("content=" + Bytes.toString(content), e); 580 } 581 // If not pb'd, make it so. 582 if (!ProtobufUtil.isPBMagicPrefix(content)) { 583 String cid = null; 584 in = fs.open(idPath); 585 try { 586 cid = in.readUTF(); 587 clusterId = new ClusterId(cid); 588 } catch (EOFException eof) { 589 LOG.warn("Cluster ID file {} is empty", idPath); 590 } finally { 591 in.close(); 592 } 593 rewriteAsPb(fs, rootdir, idPath, clusterId); 594 } 595 return clusterId; 596 } else { 597 LOG.warn("Cluster ID file does not exist at {}", idPath); 598 } 599 return clusterId; 600 } 601 602 /** 603 * @param cid 604 * @throws IOException 605 */ 606 private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p, 607 final ClusterId cid) 608 throws IOException { 609 // Rewrite the file as pb. Move aside the old one first, write new 610 // then delete the moved-aside file. 611 Path movedAsideName = new Path(p + "." + System.currentTimeMillis()); 612 if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p); 613 setClusterId(fs, rootdir, cid, 100); 614 if (!fs.delete(movedAsideName, false)) { 615 throw new IOException("Failed delete of " + movedAsideName); 616 } 617 LOG.debug("Rewrote the hbase.id file as pb"); 618 } 619 620 /** 621 * Writes a new unique identifier for this cluster to the "hbase.id" file 622 * in the HBase root directory 623 * @param fs the root directory FileSystem 624 * @param rootdir the path to the HBase root directory 625 * @param clusterId the unique identifier to store 626 * @param wait how long (in milliseconds) to wait between retries 627 * @throws IOException if writing to the FileSystem fails and no wait value 628 */ 629 public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId, 630 int wait) throws IOException { 631 while (true) { 632 try { 633 Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 634 Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + 635 Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME); 636 // Write the id file to a temporary location 637 FSDataOutputStream s = fs.create(tempIdFile); 638 try { 639 s.write(clusterId.toByteArray()); 640 s.close(); 641 s = null; 642 // Move the temporary file to its normal location. Throw an IOE if 643 // the rename failed 644 if (!fs.rename(tempIdFile, idFile)) { 645 throw new IOException("Unable to move temp version file to " + idFile); 646 } 647 } finally { 648 // Attempt to close the stream if still open on the way out 649 try { 650 if (s != null) s.close(); 651 } catch (IOException ignore) { } 652 } 653 if (LOG.isDebugEnabled()) { 654 LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId); 655 } 656 return; 657 } catch (IOException ioe) { 658 if (wait > 0) { 659 LOG.warn("Unable to create cluster ID file in " + rootdir.toString() + 660 ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe)); 661 try { 662 Thread.sleep(wait); 663 } catch (InterruptedException e) { 664 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 665 } 666 } else { 667 throw ioe; 668 } 669 } 670 } 671 } 672 673 /** 674 * If DFS, check safe mode and if so, wait until we clear it. 675 * @param conf configuration 676 * @param wait Sleep between retries 677 * @throws IOException e 678 */ 679 public static void waitOnSafeMode(final Configuration conf, 680 final long wait) 681 throws IOException { 682 FileSystem fs = FileSystem.get(conf); 683 if (!(fs instanceof DistributedFileSystem)) return; 684 DistributedFileSystem dfs = (DistributedFileSystem)fs; 685 // Make sure dfs is not in safe mode 686 while (isInSafeMode(dfs)) { 687 LOG.info("Waiting for dfs to exit safe mode..."); 688 try { 689 Thread.sleep(wait); 690 } catch (InterruptedException e) { 691 Thread.currentThread().interrupt(); 692 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 693 } 694 } 695 } 696 697 /** 698 * Checks if meta region exists 699 * @param fs file system 700 * @param rootDir root directory of HBase installation 701 * @return true if exists 702 */ 703 public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException { 704 Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO); 705 return fs.exists(metaRegionDir); 706 } 707 708 /** 709 * Compute HDFS blocks distribution of a given file, or a portion of the file 710 * @param fs file system 711 * @param status file status of the file 712 * @param start start position of the portion 713 * @param length length of the portion 714 * @return The HDFS blocks distribution 715 */ 716 static public HDFSBlocksDistribution computeHDFSBlocksDistribution( 717 final FileSystem fs, FileStatus status, long start, long length) 718 throws IOException { 719 HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution(); 720 BlockLocation [] blockLocations = 721 fs.getFileBlockLocations(status, start, length); 722 addToHDFSBlocksDistribution(blocksDistribution, blockLocations); 723 return blocksDistribution; 724 } 725 726 /** 727 * Update blocksDistribution with blockLocations 728 * @param blocksDistribution the hdfs blocks distribution 729 * @param blockLocations an array containing block location 730 */ 731 static public void addToHDFSBlocksDistribution( 732 HDFSBlocksDistribution blocksDistribution, BlockLocation[] blockLocations) 733 throws IOException { 734 for (BlockLocation bl : blockLocations) { 735 String[] hosts = bl.getHosts(); 736 long len = bl.getLength(); 737 StorageType[] storageTypes = bl.getStorageTypes(); 738 blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes); 739 } 740 } 741 742 // TODO move this method OUT of FSUtils. No dependencies to HMaster 743 /** 744 * Returns the total overall fragmentation percentage. Includes hbase:meta and 745 * -ROOT- as well. 746 * 747 * @param master The master defining the HBase root and file system 748 * @return A map for each table and its percentage (never null) 749 * @throws IOException When scanning the directory fails 750 */ 751 public static int getTotalTableFragmentation(final HMaster master) 752 throws IOException { 753 Map<String, Integer> map = getTableFragmentation(master); 754 return map.isEmpty() ? -1 : map.get("-TOTAL-"); 755 } 756 757 /** 758 * Runs through the HBase rootdir and checks how many stores for each table 759 * have more than one file in them. Checks -ROOT- and hbase:meta too. The total 760 * percentage across all tables is stored under the special key "-TOTAL-". 761 * 762 * @param master The master defining the HBase root and file system. 763 * @return A map for each table and its percentage (never null). 764 * 765 * @throws IOException When scanning the directory fails. 766 */ 767 public static Map<String, Integer> getTableFragmentation(final HMaster master) 768 throws IOException { 769 Path path = CommonFSUtils.getRootDir(master.getConfiguration()); 770 // since HMaster.getFileSystem() is package private 771 FileSystem fs = path.getFileSystem(master.getConfiguration()); 772 return getTableFragmentation(fs, path); 773 } 774 775 /** 776 * Runs through the HBase rootdir and checks how many stores for each table 777 * have more than one file in them. Checks -ROOT- and hbase:meta too. The total 778 * percentage across all tables is stored under the special key "-TOTAL-". 779 * 780 * @param fs The file system to use 781 * @param hbaseRootDir The root directory to scan 782 * @return A map for each table and its percentage (never null) 783 * @throws IOException When scanning the directory fails 784 */ 785 public static Map<String, Integer> getTableFragmentation( 786 final FileSystem fs, final Path hbaseRootDir) 787 throws IOException { 788 Map<String, Integer> frags = new HashMap<>(); 789 int cfCountTotal = 0; 790 int cfFragTotal = 0; 791 PathFilter regionFilter = new RegionDirFilter(fs); 792 PathFilter familyFilter = new FamilyDirFilter(fs); 793 List<Path> tableDirs = getTableDirs(fs, hbaseRootDir); 794 for (Path d : tableDirs) { 795 int cfCount = 0; 796 int cfFrag = 0; 797 FileStatus[] regionDirs = fs.listStatus(d, regionFilter); 798 for (FileStatus regionDir : regionDirs) { 799 Path dd = regionDir.getPath(); 800 // else its a region name, now look in region for families 801 FileStatus[] familyDirs = fs.listStatus(dd, familyFilter); 802 for (FileStatus familyDir : familyDirs) { 803 cfCount++; 804 cfCountTotal++; 805 Path family = familyDir.getPath(); 806 // now in family make sure only one file 807 FileStatus[] familyStatus = fs.listStatus(family); 808 if (familyStatus.length > 1) { 809 cfFrag++; 810 cfFragTotal++; 811 } 812 } 813 } 814 // compute percentage per table and store in result list 815 frags.put(CommonFSUtils.getTableName(d).getNameAsString(), 816 cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100)); 817 } 818 // set overall percentage for all tables 819 frags.put("-TOTAL-", 820 cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100)); 821 return frags; 822 } 823 824 /** 825 * A {@link PathFilter} that returns only regular files. 826 */ 827 static class FileFilter extends AbstractFileStatusFilter { 828 private final FileSystem fs; 829 830 public FileFilter(final FileSystem fs) { 831 this.fs = fs; 832 } 833 834 @Override 835 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 836 try { 837 return isFile(fs, isDir, p); 838 } catch (IOException e) { 839 LOG.warn("Unable to verify if path={} is a regular file", p, e); 840 return false; 841 } 842 } 843 } 844 845 /** 846 * Directory filter that doesn't include any of the directories in the specified blacklist 847 */ 848 public static class BlackListDirFilter extends AbstractFileStatusFilter { 849 private final FileSystem fs; 850 private List<String> blacklist; 851 852 /** 853 * Create a filter on the givem filesystem with the specified blacklist 854 * @param fs filesystem to filter 855 * @param directoryNameBlackList list of the names of the directories to filter. If 856 * <tt>null</tt>, all directories are returned 857 */ 858 @SuppressWarnings("unchecked") 859 public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) { 860 this.fs = fs; 861 blacklist = 862 (List<String>) (directoryNameBlackList == null ? Collections.emptyList() 863 : directoryNameBlackList); 864 } 865 866 @Override 867 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 868 if (!isValidName(p.getName())) { 869 return false; 870 } 871 872 try { 873 return isDirectory(fs, isDir, p); 874 } catch (IOException e) { 875 LOG.warn("An error occurred while verifying if [{}] is a valid directory." 876 + " Returning 'not valid' and continuing.", p, e); 877 return false; 878 } 879 } 880 881 protected boolean isValidName(final String name) { 882 return !blacklist.contains(name); 883 } 884 } 885 886 /** 887 * A {@link PathFilter} that only allows directories. 888 */ 889 public static class DirFilter extends BlackListDirFilter { 890 891 public DirFilter(FileSystem fs) { 892 super(fs, null); 893 } 894 } 895 896 /** 897 * A {@link PathFilter} that returns usertable directories. To get all directories use the 898 * {@link BlackListDirFilter} with a <tt>null</tt> blacklist 899 */ 900 public static class UserTableDirFilter extends BlackListDirFilter { 901 public UserTableDirFilter(FileSystem fs) { 902 super(fs, HConstants.HBASE_NON_TABLE_DIRS); 903 } 904 905 @Override 906 protected boolean isValidName(final String name) { 907 if (!super.isValidName(name)) 908 return false; 909 910 try { 911 TableName.isLegalTableQualifierName(Bytes.toBytes(name)); 912 } catch (IllegalArgumentException e) { 913 LOG.info("Invalid table name: {}", name); 914 return false; 915 } 916 return true; 917 } 918 } 919 920 public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir) 921 throws IOException { 922 List<Path> tableDirs = new ArrayList<>(); 923 924 for (FileStatus status : fs 925 .globStatus(new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) { 926 tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath())); 927 } 928 return tableDirs; 929 } 930 931 /** 932 * @param fs 933 * @param rootdir 934 * @return All the table directories under <code>rootdir</code>. Ignore non table hbase folders such as 935 * .logs, .oldlogs, .corrupt folders. 936 * @throws IOException 937 */ 938 public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir) 939 throws IOException { 940 // presumes any directory under hbase.rootdir is a table 941 FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs)); 942 List<Path> tabledirs = new ArrayList<>(dirs.length); 943 for (FileStatus dir: dirs) { 944 tabledirs.add(dir.getPath()); 945 } 946 return tabledirs; 947 } 948 949 /** 950 * Filter for all dirs that don't start with '.' 951 */ 952 public static class RegionDirFilter extends AbstractFileStatusFilter { 953 // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names. 954 final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$"); 955 final FileSystem fs; 956 957 public RegionDirFilter(FileSystem fs) { 958 this.fs = fs; 959 } 960 961 @Override 962 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 963 if (!regionDirPattern.matcher(p.getName()).matches()) { 964 return false; 965 } 966 967 try { 968 return isDirectory(fs, isDir, p); 969 } catch (IOException ioe) { 970 // Maybe the file was moved or the fs was disconnected. 971 LOG.warn("Skipping file {} due to IOException", p, ioe); 972 return false; 973 } 974 } 975 } 976 977 /** 978 * Given a particular table dir, return all the regiondirs inside it, excluding files such as 979 * .tableinfo 980 * @param fs A file system for the Path 981 * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir> 982 * @return List of paths to valid region directories in table dir. 983 * @throws IOException 984 */ 985 public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException { 986 // assumes we are in a table dir. 987 List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 988 if (rds == null) { 989 return Collections.emptyList(); 990 } 991 List<Path> regionDirs = new ArrayList<>(rds.size()); 992 for (FileStatus rdfs: rds) { 993 Path rdPath = rdfs.getPath(); 994 regionDirs.add(rdPath); 995 } 996 return regionDirs; 997 } 998 999 public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) { 1000 return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region); 1001 } 1002 1003 public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) { 1004 return getRegionDirFromTableDir(tableDir, 1005 ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName()); 1006 } 1007 1008 public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) { 1009 return new Path(tableDir, encodedRegionName); 1010 } 1011 1012 /** 1013 * Filter for all dirs that are legal column family names. This is generally used for colfam 1014 * dirs <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>. 1015 */ 1016 public static class FamilyDirFilter extends AbstractFileStatusFilter { 1017 final FileSystem fs; 1018 1019 public FamilyDirFilter(FileSystem fs) { 1020 this.fs = fs; 1021 } 1022 1023 @Override 1024 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1025 try { 1026 // throws IAE if invalid 1027 HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName())); 1028 } catch (IllegalArgumentException iae) { 1029 // path name is an invalid family name and thus is excluded. 1030 return false; 1031 } 1032 1033 try { 1034 return isDirectory(fs, isDir, p); 1035 } catch (IOException ioe) { 1036 // Maybe the file was moved or the fs was disconnected. 1037 LOG.warn("Skipping file {} due to IOException", p, ioe); 1038 return false; 1039 } 1040 } 1041 } 1042 1043 /** 1044 * Given a particular region dir, return all the familydirs inside it 1045 * 1046 * @param fs A file system for the Path 1047 * @param regionDir Path to a specific region directory 1048 * @return List of paths to valid family directories in region dir. 1049 * @throws IOException 1050 */ 1051 public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException { 1052 // assumes we are in a region dir. 1053 FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs)); 1054 List<Path> familyDirs = new ArrayList<>(fds.length); 1055 for (FileStatus fdfs: fds) { 1056 Path fdPath = fdfs.getPath(); 1057 familyDirs.add(fdPath); 1058 } 1059 return familyDirs; 1060 } 1061 1062 public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException { 1063 List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs)); 1064 if (fds == null) { 1065 return Collections.emptyList(); 1066 } 1067 List<Path> referenceFiles = new ArrayList<>(fds.size()); 1068 for (FileStatus fdfs: fds) { 1069 Path fdPath = fdfs.getPath(); 1070 referenceFiles.add(fdPath); 1071 } 1072 return referenceFiles; 1073 } 1074 1075 /** 1076 * Filter for HFiles that excludes reference files. 1077 */ 1078 public static class HFileFilter extends AbstractFileStatusFilter { 1079 final FileSystem fs; 1080 1081 public HFileFilter(FileSystem fs) { 1082 this.fs = fs; 1083 } 1084 1085 @Override 1086 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1087 if (!StoreFileInfo.isHFile(p)) { 1088 return false; 1089 } 1090 1091 try { 1092 return isFile(fs, isDir, p); 1093 } catch (IOException ioe) { 1094 // Maybe the file was moved or the fs was disconnected. 1095 LOG.warn("Skipping file {} due to IOException", p, ioe); 1096 return false; 1097 } 1098 } 1099 } 1100 1101 /** 1102 * Filter for HFileLinks (StoreFiles and HFiles not included). 1103 * the filter itself does not consider if a link is file or not. 1104 */ 1105 public static class HFileLinkFilter implements PathFilter { 1106 1107 @Override 1108 public boolean accept(Path p) { 1109 return HFileLink.isHFileLink(p); 1110 } 1111 } 1112 1113 public static class ReferenceFileFilter extends AbstractFileStatusFilter { 1114 1115 private final FileSystem fs; 1116 1117 public ReferenceFileFilter(FileSystem fs) { 1118 this.fs = fs; 1119 } 1120 1121 @Override 1122 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1123 if (!StoreFileInfo.isReference(p)) { 1124 return false; 1125 } 1126 1127 try { 1128 // only files can be references. 1129 return isFile(fs, isDir, p); 1130 } catch (IOException ioe) { 1131 // Maybe the file was moved or the fs was disconnected. 1132 LOG.warn("Skipping file {} due to IOException", p, ioe); 1133 return false; 1134 } 1135 } 1136 } 1137 1138 /** 1139 * Called every so-often by storefile map builder getTableStoreFilePathMap to 1140 * report progress. 1141 */ 1142 interface ProgressReporter { 1143 /** 1144 * @param status File or directory we are about to process. 1145 */ 1146 void progress(FileStatus status); 1147 } 1148 1149 /** 1150 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for 1151 * table StoreFile names to the full Path. 1152 * <br> 1153 * Example...<br> 1154 * Key = 3944417774205889744 <br> 1155 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1156 * 1157 * @param map map to add values. If null, this method will create and populate one to return 1158 * @param fs The file system to use. 1159 * @param hbaseRootDir The root directory to scan. 1160 * @param tableName name of the table to scan. 1161 * @return Map keyed by StoreFile name with a value of the full Path. 1162 * @throws IOException When scanning the directory fails. 1163 * @throws InterruptedException 1164 */ 1165 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map, 1166 final FileSystem fs, final Path hbaseRootDir, TableName tableName) 1167 throws IOException, InterruptedException { 1168 return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, 1169 (ProgressReporter)null); 1170 } 1171 1172 /** 1173 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for 1174 * table StoreFile names to the full Path. Note that because this method can be called 1175 * on a 'live' HBase system that we will skip files that no longer exist by the time 1176 * we traverse them and similarly the user of the result needs to consider that some 1177 * entries in this map may not exist by the time this call completes. 1178 * <br> 1179 * Example...<br> 1180 * Key = 3944417774205889744 <br> 1181 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1182 * 1183 * @param resultMap map to add values. If null, this method will create and populate one to return 1184 * @param fs The file system to use. 1185 * @param hbaseRootDir The root directory to scan. 1186 * @param tableName name of the table to scan. 1187 * @param sfFilter optional path filter to apply to store files 1188 * @param executor optional executor service to parallelize this operation 1189 * @param progressReporter Instance or null; gets called every time we move to new region of 1190 * family dir and for each store file. 1191 * @return Map keyed by StoreFile name with a value of the full Path. 1192 * @throws IOException When scanning the directory fails. 1193 * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead. 1194 */ 1195 @Deprecated 1196 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1197 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1198 ExecutorService executor, final HbckErrorReporter progressReporter) 1199 throws IOException, InterruptedException { 1200 return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor, 1201 new ProgressReporter() { 1202 @Override 1203 public void progress(FileStatus status) { 1204 // status is not used in this implementation. 1205 progressReporter.progress(); 1206 } 1207 }); 1208 } 1209 1210 /** 1211 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for 1212 * table StoreFile names to the full Path. Note that because this method can be called 1213 * on a 'live' HBase system that we will skip files that no longer exist by the time 1214 * we traverse them and similarly the user of the result needs to consider that some 1215 * entries in this map may not exist by the time this call completes. 1216 * <br> 1217 * Example...<br> 1218 * Key = 3944417774205889744 <br> 1219 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1220 * 1221 * @param resultMap map to add values. If null, this method will create and populate one 1222 * to return 1223 * @param fs The file system to use. 1224 * @param hbaseRootDir The root directory to scan. 1225 * @param tableName name of the table to scan. 1226 * @param sfFilter optional path filter to apply to store files 1227 * @param executor optional executor service to parallelize this operation 1228 * @param progressReporter Instance or null; gets called every time we move to new region of 1229 * family dir and for each store file. 1230 * @return Map keyed by StoreFile name with a value of the full Path. 1231 * @throws IOException When scanning the directory fails. 1232 * @throws InterruptedException the thread is interrupted, either before or during the activity. 1233 */ 1234 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1235 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1236 ExecutorService executor, final ProgressReporter progressReporter) 1237 throws IOException, InterruptedException { 1238 1239 final Map<String, Path> finalResultMap = 1240 resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap; 1241 1242 // only include the directory paths to tables 1243 Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 1244 // Inside a table, there are compaction.dir directories to skip. Otherwise, all else 1245 // should be regions. 1246 final FamilyDirFilter familyFilter = new FamilyDirFilter(fs); 1247 final Vector<Exception> exceptions = new Vector<>(); 1248 1249 try { 1250 List<FileStatus> regionDirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 1251 if (regionDirs == null) { 1252 return finalResultMap; 1253 } 1254 1255 final List<Future<?>> futures = new ArrayList<>(regionDirs.size()); 1256 1257 for (FileStatus regionDir : regionDirs) { 1258 if (null != progressReporter) { 1259 progressReporter.progress(regionDir); 1260 } 1261 final Path dd = regionDir.getPath(); 1262 1263 if (!exceptions.isEmpty()) { 1264 break; 1265 } 1266 1267 Runnable getRegionStoreFileMapCall = new Runnable() { 1268 @Override 1269 public void run() { 1270 try { 1271 HashMap<String,Path> regionStoreFileMap = new HashMap<>(); 1272 List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter); 1273 if (familyDirs == null) { 1274 if (!fs.exists(dd)) { 1275 LOG.warn("Skipping region because it no longer exists: " + dd); 1276 } else { 1277 LOG.warn("Skipping region because it has no family dirs: " + dd); 1278 } 1279 return; 1280 } 1281 for (FileStatus familyDir : familyDirs) { 1282 if (null != progressReporter) { 1283 progressReporter.progress(familyDir); 1284 } 1285 Path family = familyDir.getPath(); 1286 if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) { 1287 continue; 1288 } 1289 // now in family, iterate over the StoreFiles and 1290 // put in map 1291 FileStatus[] familyStatus = fs.listStatus(family); 1292 for (FileStatus sfStatus : familyStatus) { 1293 if (null != progressReporter) { 1294 progressReporter.progress(sfStatus); 1295 } 1296 Path sf = sfStatus.getPath(); 1297 if (sfFilter == null || sfFilter.accept(sf)) { 1298 regionStoreFileMap.put( sf.getName(), sf); 1299 } 1300 } 1301 } 1302 finalResultMap.putAll(regionStoreFileMap); 1303 } catch (Exception e) { 1304 LOG.error("Could not get region store file map for region: " + dd, e); 1305 exceptions.add(e); 1306 } 1307 } 1308 }; 1309 1310 // If executor is available, submit async tasks to exec concurrently, otherwise 1311 // just do serial sync execution 1312 if (executor != null) { 1313 Future<?> future = executor.submit(getRegionStoreFileMapCall); 1314 futures.add(future); 1315 } else { 1316 FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null); 1317 future.run(); 1318 futures.add(future); 1319 } 1320 } 1321 1322 // Ensure all pending tasks are complete (or that we run into an exception) 1323 for (Future<?> f : futures) { 1324 if (!exceptions.isEmpty()) { 1325 break; 1326 } 1327 try { 1328 f.get(); 1329 } catch (ExecutionException e) { 1330 LOG.error("Unexpected exec exception! Should've been caught already. (Bug?)", e); 1331 // Shouldn't happen, we already logged/caught any exceptions in the Runnable 1332 } 1333 } 1334 } catch (IOException e) { 1335 LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e); 1336 exceptions.add(e); 1337 } finally { 1338 if (!exceptions.isEmpty()) { 1339 // Just throw the first exception as an indication something bad happened 1340 // Don't need to propagate all the exceptions, we already logged them all anyway 1341 Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class); 1342 throw new IOException(exceptions.firstElement()); 1343 } 1344 } 1345 1346 return finalResultMap; 1347 } 1348 1349 public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) { 1350 int result = 0; 1351 try { 1352 for (Path familyDir:getFamilyDirs(fs, p)){ 1353 result += getReferenceFilePaths(fs, familyDir).size(); 1354 } 1355 } catch (IOException e) { 1356 LOG.warn("Error counting reference files", e); 1357 } 1358 return result; 1359 } 1360 1361 /** 1362 * Runs through the HBase rootdir and creates a reverse lookup map for 1363 * table StoreFile names to the full Path. 1364 * <br> 1365 * Example...<br> 1366 * Key = 3944417774205889744 <br> 1367 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1368 * 1369 * @param fs The file system to use. 1370 * @param hbaseRootDir The root directory to scan. 1371 * @return Map keyed by StoreFile name with a value of the full Path. 1372 * @throws IOException When scanning the directory fails. 1373 */ 1374 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1375 final Path hbaseRootDir) 1376 throws IOException, InterruptedException { 1377 return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter)null); 1378 } 1379 1380 /** 1381 * Runs through the HBase rootdir and creates a reverse lookup map for 1382 * table StoreFile names to the full Path. 1383 * <br> 1384 * Example...<br> 1385 * Key = 3944417774205889744 <br> 1386 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1387 * 1388 * @param fs The file system to use. 1389 * @param hbaseRootDir The root directory to scan. 1390 * @param sfFilter optional path filter to apply to store files 1391 * @param executor optional executor service to parallelize this operation 1392 * @param progressReporter Instance or null; gets called every time we move to new region of 1393 * family dir and for each store file. 1394 * @return Map keyed by StoreFile name with a value of the full Path. 1395 * @throws IOException When scanning the directory fails. 1396 * @deprecated Since 2.3.0. Will be removed in hbase4. Used {@link 1397 * #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)} 1398 */ 1399 @Deprecated 1400 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1401 final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor, 1402 HbckErrorReporter progressReporter) 1403 throws IOException, InterruptedException { 1404 return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor, 1405 new ProgressReporter() { 1406 @Override 1407 public void progress(FileStatus status) { 1408 // status is not used in this implementation. 1409 progressReporter.progress(); 1410 } 1411 }); 1412 } 1413 1414 /** 1415 * Runs through the HBase rootdir and creates a reverse lookup map for 1416 * table StoreFile names to the full Path. 1417 * <br> 1418 * Example...<br> 1419 * Key = 3944417774205889744 <br> 1420 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1421 * 1422 * @param fs The file system to use. 1423 * @param hbaseRootDir The root directory to scan. 1424 * @param sfFilter optional path filter to apply to store files 1425 * @param executor optional executor service to parallelize this operation 1426 * @param progressReporter Instance or null; gets called every time we move to new region of 1427 * family dir and for each store file. 1428 * @return Map keyed by StoreFile name with a value of the full Path. 1429 * @throws IOException When scanning the directory fails. 1430 * @throws InterruptedException 1431 */ 1432 public static Map<String, Path> getTableStoreFilePathMap( 1433 final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter, 1434 ExecutorService executor, ProgressReporter progressReporter) 1435 throws IOException, InterruptedException { 1436 ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32); 1437 1438 // if this method looks similar to 'getTableFragmentation' that is because 1439 // it was borrowed from it. 1440 1441 // only include the directory paths to tables 1442 for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) { 1443 getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir), 1444 sfFilter, executor, progressReporter); 1445 } 1446 return map; 1447 } 1448 1449 /** 1450 * Filters FileStatuses in an array and returns a list 1451 * 1452 * @param input An array of FileStatuses 1453 * @param filter A required filter to filter the array 1454 * @return A list of FileStatuses 1455 */ 1456 public static List<FileStatus> filterFileStatuses(FileStatus[] input, 1457 FileStatusFilter filter) { 1458 if (input == null) return null; 1459 return filterFileStatuses(Iterators.forArray(input), filter); 1460 } 1461 1462 /** 1463 * Filters FileStatuses in an iterator and returns a list 1464 * 1465 * @param input An iterator of FileStatuses 1466 * @param filter A required filter to filter the array 1467 * @return A list of FileStatuses 1468 */ 1469 public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input, 1470 FileStatusFilter filter) { 1471 if (input == null) return null; 1472 ArrayList<FileStatus> results = new ArrayList<>(); 1473 while (input.hasNext()) { 1474 FileStatus f = input.next(); 1475 if (filter.accept(f)) { 1476 results.add(f); 1477 } 1478 } 1479 return results; 1480 } 1481 1482 /** 1483 * Calls fs.listStatus() and treats FileNotFoundException as non-fatal 1484 * This accommodates differences between hadoop versions, where hadoop 1 1485 * does not throw a FileNotFoundException, and return an empty FileStatus[] 1486 * while Hadoop 2 will throw FileNotFoundException. 1487 * 1488 * @param fs file system 1489 * @param dir directory 1490 * @param filter file status filter 1491 * @return null if dir is empty or doesn't exist, otherwise FileStatus list 1492 */ 1493 public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs, 1494 final Path dir, final FileStatusFilter filter) throws IOException { 1495 FileStatus [] status = null; 1496 try { 1497 status = fs.listStatus(dir); 1498 } catch (FileNotFoundException fnfe) { 1499 LOG.trace("{} does not exist", dir); 1500 return null; 1501 } 1502 1503 if (ArrayUtils.getLength(status) == 0) { 1504 return null; 1505 } 1506 1507 if (filter == null) { 1508 return Arrays.asList(status); 1509 } else { 1510 List<FileStatus> status2 = filterFileStatuses(status, filter); 1511 if (status2 == null || status2.isEmpty()) { 1512 return null; 1513 } else { 1514 return status2; 1515 } 1516 } 1517 } 1518 1519 /** 1520 * This function is to scan the root path of the file system to get the 1521 * degree of locality for each region on each of the servers having at least 1522 * one block of that region. 1523 * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer} 1524 * 1525 * @param conf 1526 * the configuration to use 1527 * @return the mapping from region encoded name to a map of server names to 1528 * locality fraction 1529 * @throws IOException 1530 * in case of file system errors or interrupts 1531 */ 1532 public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS( 1533 final Configuration conf) throws IOException { 1534 return getRegionDegreeLocalityMappingFromFS( 1535 conf, null, 1536 conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE)); 1537 1538 } 1539 1540 /** 1541 * This function is to scan the root path of the file system to get the 1542 * degree of locality for each region on each of the servers having at least 1543 * one block of that region. 1544 * 1545 * @param conf 1546 * the configuration to use 1547 * @param desiredTable 1548 * the table you wish to scan locality for 1549 * @param threadPoolSize 1550 * the thread pool size to use 1551 * @return the mapping from region encoded name to a map of server names to 1552 * locality fraction 1553 * @throws IOException 1554 * in case of file system errors or interrupts 1555 */ 1556 public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS( 1557 final Configuration conf, final String desiredTable, int threadPoolSize) 1558 throws IOException { 1559 Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>(); 1560 getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping); 1561 return regionDegreeLocalityMapping; 1562 } 1563 1564 /** 1565 * This function is to scan the root path of the file system to get either the 1566 * mapping between the region name and its best locality region server or the 1567 * degree of locality of each region on each of the servers having at least 1568 * one block of that region. The output map parameters are both optional. 1569 * 1570 * @param conf 1571 * the configuration to use 1572 * @param desiredTable 1573 * the table you wish to scan locality for 1574 * @param threadPoolSize 1575 * the thread pool size to use 1576 * @param regionDegreeLocalityMapping 1577 * the map into which to put the locality degree mapping or null, 1578 * must be a thread-safe implementation 1579 * @throws IOException 1580 * in case of file system errors or interrupts 1581 */ 1582 private static void getRegionLocalityMappingFromFS(final Configuration conf, 1583 final String desiredTable, int threadPoolSize, 1584 final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException { 1585 final FileSystem fs = FileSystem.get(conf); 1586 final Path rootPath = CommonFSUtils.getRootDir(conf); 1587 final long startTime = EnvironmentEdgeManager.currentTime(); 1588 final Path queryPath; 1589 // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/* 1590 if (null == desiredTable) { 1591 queryPath = 1592 new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/"); 1593 } else { 1594 queryPath = new Path( 1595 CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/"); 1596 } 1597 1598 // reject all paths that are not appropriate 1599 PathFilter pathFilter = new PathFilter() { 1600 @Override 1601 public boolean accept(Path path) { 1602 // this is the region name; it may get some noise data 1603 if (null == path) { 1604 return false; 1605 } 1606 1607 // no parent? 1608 Path parent = path.getParent(); 1609 if (null == parent) { 1610 return false; 1611 } 1612 1613 String regionName = path.getName(); 1614 if (null == regionName) { 1615 return false; 1616 } 1617 1618 if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) { 1619 return false; 1620 } 1621 return true; 1622 } 1623 }; 1624 1625 FileStatus[] statusList = fs.globStatus(queryPath, pathFilter); 1626 1627 if (LOG.isDebugEnabled()) { 1628 LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList)); 1629 } 1630 1631 if (null == statusList) { 1632 return; 1633 } 1634 1635 // lower the number of threads in case we have very few expected regions 1636 threadPoolSize = Math.min(threadPoolSize, statusList.length); 1637 1638 // run in multiple threads 1639 final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize, 1640 new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").setDaemon(true) 1641 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 1642 try { 1643 // ignore all file status items that are not of interest 1644 for (FileStatus regionStatus : statusList) { 1645 if (null == regionStatus || !regionStatus.isDirectory()) { 1646 continue; 1647 } 1648 1649 final Path regionPath = regionStatus.getPath(); 1650 if (null != regionPath) { 1651 tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping)); 1652 } 1653 } 1654 } finally { 1655 tpe.shutdown(); 1656 final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 1657 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); 1658 try { 1659 // here we wait until TPE terminates, which is either naturally or by 1660 // exceptions in the execution of the threads 1661 while (!tpe.awaitTermination(threadWakeFrequency, 1662 TimeUnit.MILLISECONDS)) { 1663 // printing out rough estimate, so as to not introduce 1664 // AtomicInteger 1665 LOG.info("Locality checking is underway: { Scanned Regions : " 1666 + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/" 1667 + ((ThreadPoolExecutor) tpe).getTaskCount() + " }"); 1668 } 1669 } catch (InterruptedException e) { 1670 Thread.currentThread().interrupt(); 1671 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 1672 } 1673 } 1674 1675 long overhead = EnvironmentEdgeManager.currentTime() - startTime; 1676 LOG.info("Scan DFS for locality info takes {}ms", overhead); 1677 } 1678 1679 /** 1680 * Do our short circuit read setup. 1681 * Checks buffer size to use and whether to do checksumming in hbase or hdfs. 1682 * @param conf 1683 */ 1684 public static void setupShortCircuitRead(final Configuration conf) { 1685 // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property. 1686 boolean shortCircuitSkipChecksum = 1687 conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false); 1688 boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true); 1689 if (shortCircuitSkipChecksum) { 1690 LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " + 1691 "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " + 1692 "it, see https://issues.apache.org/jira/browse/HBASE-6868." : "")); 1693 assert !shortCircuitSkipChecksum; //this will fail if assertions are on 1694 } 1695 checkShortCircuitReadBufferSize(conf); 1696 } 1697 1698 /** 1699 * Check if short circuit read buffer size is set and if not, set it to hbase value. 1700 * @param conf 1701 */ 1702 public static void checkShortCircuitReadBufferSize(final Configuration conf) { 1703 final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2; 1704 final int notSet = -1; 1705 // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2 1706 final String dfsKey = "dfs.client.read.shortcircuit.buffer.size"; 1707 int size = conf.getInt(dfsKey, notSet); 1708 // If a size is set, return -- we will use it. 1709 if (size != notSet) return; 1710 // But short circuit buffer size is normally not set. Put in place the hbase wanted size. 1711 int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize); 1712 conf.setIfUnset(dfsKey, Integer.toString(hbaseSize)); 1713 } 1714 1715 /** 1716 * @param c 1717 * @return The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs. 1718 * @throws IOException 1719 */ 1720 public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c) 1721 throws IOException { 1722 if (!CommonFSUtils.isHDFS(c)) { 1723 return null; 1724 } 1725 // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal 1726 // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it 1727 // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients. 1728 final String name = "getHedgedReadMetrics"; 1729 DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient(); 1730 Method m; 1731 try { 1732 m = dfsclient.getClass().getDeclaredMethod(name); 1733 } catch (NoSuchMethodException e) { 1734 LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " + 1735 e.getMessage()); 1736 return null; 1737 } catch (SecurityException e) { 1738 LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " + 1739 e.getMessage()); 1740 return null; 1741 } 1742 m.setAccessible(true); 1743 try { 1744 return (DFSHedgedReadMetrics)m.invoke(dfsclient); 1745 } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { 1746 LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " + 1747 e.getMessage()); 1748 return null; 1749 } 1750 } 1751 1752 public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst, 1753 Configuration conf, int threads) throws IOException { 1754 ExecutorService pool = Executors.newFixedThreadPool(threads); 1755 List<Future<Void>> futures = new ArrayList<>(); 1756 List<Path> traversedPaths; 1757 try { 1758 traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures); 1759 for (Future<Void> future : futures) { 1760 future.get(); 1761 } 1762 } catch (ExecutionException | InterruptedException | IOException e) { 1763 throw new IOException("Copy snapshot reference files failed", e); 1764 } finally { 1765 pool.shutdownNow(); 1766 } 1767 return traversedPaths; 1768 } 1769 1770 private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst, 1771 Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException { 1772 List<Path> traversedPaths = new ArrayList<>(); 1773 traversedPaths.add(dst); 1774 FileStatus currentFileStatus = srcFS.getFileStatus(src); 1775 if (currentFileStatus.isDirectory()) { 1776 if (!dstFS.mkdirs(dst)) { 1777 throw new IOException("Create directory failed: " + dst); 1778 } 1779 FileStatus[] subPaths = srcFS.listStatus(src); 1780 for (FileStatus subPath : subPaths) { 1781 traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS, 1782 new Path(dst, subPath.getPath().getName()), conf, pool, futures)); 1783 } 1784 } else { 1785 Future<Void> future = pool.submit(() -> { 1786 FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf); 1787 return null; 1788 }); 1789 futures.add(future); 1790 } 1791 return traversedPaths; 1792 } 1793 1794 /** 1795 * @return A set containing all namenode addresses of fs 1796 */ 1797 private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs, 1798 Configuration conf) { 1799 Set<InetSocketAddress> addresses = new HashSet<>(); 1800 String serviceName = fs.getCanonicalServiceName(); 1801 1802 if (serviceName.startsWith("ha-hdfs")) { 1803 try { 1804 Map<String, Map<String, InetSocketAddress>> addressMap = 1805 DFSUtil.getNNServiceRpcAddressesForCluster(conf); 1806 String nameService = serviceName.substring(serviceName.indexOf(":") + 1); 1807 if (addressMap.containsKey(nameService)) { 1808 Map<String, InetSocketAddress> nnMap = addressMap.get(nameService); 1809 for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) { 1810 InetSocketAddress addr = e2.getValue(); 1811 addresses.add(addr); 1812 } 1813 } 1814 } catch (Exception e) { 1815 LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e); 1816 } 1817 } else { 1818 URI uri = fs.getUri(); 1819 int port = uri.getPort(); 1820 if (port < 0) { 1821 int idx = serviceName.indexOf(':'); 1822 port = Integer.parseInt(serviceName.substring(idx + 1)); 1823 } 1824 InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port); 1825 addresses.add(addr); 1826 } 1827 1828 return addresses; 1829 } 1830 1831 /** 1832 * @param conf the Configuration of HBase 1833 * @return Whether srcFs and desFs are on same hdfs or not 1834 */ 1835 public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) { 1836 // By getCanonicalServiceName, we could make sure both srcFs and desFs 1837 // show a unified format which contains scheme, host and port. 1838 String srcServiceName = srcFs.getCanonicalServiceName(); 1839 String desServiceName = desFs.getCanonicalServiceName(); 1840 1841 if (srcServiceName == null || desServiceName == null) { 1842 return false; 1843 } 1844 if (srcServiceName.equals(desServiceName)) { 1845 return true; 1846 } 1847 if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) { 1848 Collection<String> internalNameServices = 1849 conf.getTrimmedStringCollection("dfs.internal.nameservices"); 1850 if (!internalNameServices.isEmpty()) { 1851 if (internalNameServices.contains(srcServiceName.split(":")[1])) { 1852 return true; 1853 } else { 1854 return false; 1855 } 1856 } 1857 } 1858 if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) { 1859 // If one serviceName is an HA format while the other is a non-HA format, 1860 // maybe they refer to the same FileSystem. 1861 // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port" 1862 Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf); 1863 Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf); 1864 if (Sets.intersection(srcAddrs, desAddrs).size() > 0) { 1865 return true; 1866 } 1867 } 1868 1869 return false; 1870 } 1871}