001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations; 021import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET; 022 023import edu.umd.cs.findbugs.annotations.CheckForNull; 024import java.io.ByteArrayInputStream; 025import java.io.DataInputStream; 026import java.io.EOFException; 027import java.io.FileNotFoundException; 028import java.io.IOException; 029import java.io.InterruptedIOException; 030import java.lang.reflect.InvocationTargetException; 031import java.lang.reflect.Method; 032import java.net.InetSocketAddress; 033import java.net.URI; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.Collection; 037import java.util.Collections; 038import java.util.HashMap; 039import java.util.HashSet; 040import java.util.Iterator; 041import java.util.List; 042import java.util.Locale; 043import java.util.Map; 044import java.util.Optional; 045import java.util.Set; 046import java.util.Vector; 047import java.util.concurrent.ConcurrentHashMap; 048import java.util.concurrent.ExecutionException; 049import java.util.concurrent.ExecutorService; 050import java.util.concurrent.Executors; 051import java.util.concurrent.Future; 052import java.util.concurrent.FutureTask; 053import java.util.concurrent.ThreadPoolExecutor; 054import java.util.concurrent.TimeUnit; 055import java.util.regex.Pattern; 056import org.apache.commons.lang3.ArrayUtils; 057import org.apache.hadoop.conf.Configuration; 058import org.apache.hadoop.fs.BlockLocation; 059import org.apache.hadoop.fs.FSDataInputStream; 060import org.apache.hadoop.fs.FSDataOutputStream; 061import org.apache.hadoop.fs.FileStatus; 062import org.apache.hadoop.fs.FileSystem; 063import org.apache.hadoop.fs.FileUtil; 064import org.apache.hadoop.fs.Path; 065import org.apache.hadoop.fs.PathFilter; 066import org.apache.hadoop.fs.StorageType; 067import org.apache.hadoop.fs.permission.FsPermission; 068import org.apache.hadoop.hbase.ClusterId; 069import org.apache.hadoop.hbase.HConstants; 070import org.apache.hadoop.hbase.HDFSBlocksDistribution; 071import org.apache.hadoop.hbase.TableName; 072import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 073import org.apache.hadoop.hbase.client.RegionInfo; 074import org.apache.hadoop.hbase.client.RegionInfoBuilder; 075import org.apache.hadoop.hbase.exceptions.DeserializationException; 076import org.apache.hadoop.hbase.fs.HFileSystem; 077import org.apache.hadoop.hbase.io.HFileLink; 078import org.apache.hadoop.hbase.master.HMaster; 079import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 080import org.apache.hadoop.hdfs.DFSClient; 081import 
org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;

/**
 * Utility methods for interacting with the underlying file system.
 */
@InterfaceAudience.Private
public final class FSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);

  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
  private static final int DEFAULT_THREAD_POOLSIZE = 2;

  /** Set to true on Windows platforms */
  // currently only used in testing. TODO refactor into a test class
  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");

  private static Class<?> safeModeClazz = null;
  private static Class<?> safeModeActionClazz = null;
  private static Object safeModeGet = null;
  // Resolve the optional org.apache.hadoop.fs.SafeMode API (present from Hadoop 3.3.6 on)
  // reflectively once, at class load time; on older Hadoop these fields stay null.
  static {
    try {
      safeModeClazz = Class.forName("org.apache.hadoop.fs.SafeMode");
      safeModeActionClazz = Class.forName("org.apache.hadoop.fs.SafeModeAction");
      safeModeGet = safeModeClazz.getField("SAFEMODE_GET").get(null);
    } catch (ClassNotFoundException | NoSuchFieldException e) {
      LOG.debug("SafeMode interface not in the classpath, this means Hadoop 3.3.5 or below.");
    } catch (IllegalAccessException e) {
      LOG.error("SafeModeAction.SAFEMODE_GET is not accessible. "
        + "Unexpected Hadoop version or messy classpath?", e);
      throw new RuntimeException(e);
    }
  }

  private FSUtils() {
  }

  /** Returns true if <code>fs</code> is an instance of DistributedFileSystem. */
  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
    FileSystem fileSystem = fs;
    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
    // Check its backing fs for dfs-ness.
    if (fs instanceof HFileSystem) {
      fileSystem = ((HFileSystem) fs).getBackingFs();
    }
    return fileSystem instanceof DistributedFileSystem;
  }

  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
   * consider the scheme; i.e. if the schemes differ but the path or subpath matches, the two will
   * equate.
   * @param pathToSearch Path we will be trying to match.
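   * @param pathTail     Path we expect to find as the tail of <code>pathToSearch</code>.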
152 * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code> 153 */ 154 public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) { 155 Path tailPath = pathTail; 156 String tailName; 157 Path toSearch = pathToSearch; 158 String toSearchName; 159 boolean result = false; 160 161 if (pathToSearch.depth() != pathTail.depth()) { 162 return false; 163 } 164 165 do { 166 tailName = tailPath.getName(); 167 if (tailName == null || tailName.isEmpty()) { 168 result = true; 169 break; 170 } 171 toSearchName = toSearch.getName(); 172 if (toSearchName == null || toSearchName.isEmpty()) { 173 break; 174 } 175 // Move up a parent on each path for next go around. Path doesn't let us go off the end. 176 tailPath = tailPath.getParent(); 177 toSearch = toSearch.getParent(); 178 } while (tailName.equals(toSearchName)); 179 return result; 180 } 181 182 /** 183 * Delete the region directory if exists. 184 * @return True if deleted the region directory. 185 */ 186 public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri) 187 throws IOException { 188 Path rootDir = CommonFSUtils.getRootDir(conf); 189 FileSystem fs = rootDir.getFileSystem(conf); 190 return CommonFSUtils.deleteDirectory(fs, 191 new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName())); 192 } 193 194 /** 195 * Create the specified file on the filesystem. By default, this will: 196 * <ol> 197 * <li>overwrite the file if it exists</li> 198 * <li>apply the umask in the configuration (if it is enabled)</li> 199 * <li>use the fs configured buffer size (or 4096 if not set)</li> 200 * <li>use the configured column family replication or default replication if 201 * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li> 202 * <li>use the default block size</li> 203 * <li>not track progress</li> 204 * </ol> 205 * @param conf configurations 206 * @param fs {@link FileSystem} on which to write the file 207 * @param path {@link Path} to the file to write 208 * @param perm permissions 209 * @param favoredNodes favored data nodes 210 * @return output stream to the created file 211 * @throws IOException if the file cannot be created 212 */ 213 public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path, 214 FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException { 215 if (fs instanceof HFileSystem) { 216 FileSystem backingFs = ((HFileSystem) fs).getBackingFs(); 217 if (backingFs instanceof DistributedFileSystem) { 218 short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION, 219 String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION))); 220 DistributedFileSystem.HdfsDataOutputStreamBuilder builder = 221 ((DistributedFileSystem) backingFs).createFile(path).recursive().permission(perm) 222 .create(); 223 if (favoredNodes != null) { 224 builder.favoredNodes(favoredNodes); 225 } 226 if (replication > 0) { 227 builder.replication(replication); 228 } 229 return builder.build(); 230 } 231 232 } 233 return CommonFSUtils.create(fs, path, perm, true); 234 } 235 236 /** 237 * Checks to see if the specified file system is available 238 * @param fs filesystem 239 * @throws IOException e 240 */ 241 public static void checkFileSystemAvailable(final FileSystem fs) throws IOException { 242 if (!(fs instanceof DistributedFileSystem)) { 243 return; 244 } 245 IOException exception = null; 246 DistributedFileSystem dfs = (DistributedFileSystem) fs; 247 try { 248 if 
(dfs.exists(new Path("/"))) { 249 return; 250 } 251 } catch (IOException e) { 252 exception = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 253 } 254 try { 255 fs.close(); 256 } catch (Exception e) { 257 LOG.error("file system close failed: ", e); 258 } 259 throw new IOException("File system is not available", exception); 260 } 261 262 /** 263 * Inquire the Active NameNode's safe mode status. 264 * @param dfs A DistributedFileSystem object representing the underlying HDFS. 265 * @return whether we're in safe mode 266 */ 267 private static boolean isInSafeMode(FileSystem dfs) throws IOException { 268 if (isDistributedFileSystem(dfs)) { 269 return ((DistributedFileSystem) dfs).setSafeMode(SAFEMODE_GET, true); 270 } else { 271 try { 272 Object ret = dfs.getClass() 273 .getMethod("setSafeMode", new Class[] { safeModeActionClazz, Boolean.class }) 274 .invoke(dfs, safeModeGet, true); 275 return (Boolean) ret; 276 } catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) { 277 LOG.error("The file system does not support setSafeMode(). Abort.", e); 278 throw new RuntimeException(e); 279 } 280 } 281 } 282 283 /** 284 * Check whether dfs is in safemode. 285 */ 286 public static void checkDfsSafeMode(final Configuration conf) throws IOException { 287 boolean isInSafeMode = false; 288 FileSystem fs = FileSystem.get(conf); 289 if (supportSafeMode(fs)) { 290 isInSafeMode = isInSafeMode(fs); 291 } 292 if (isInSafeMode) { 293 throw new IOException("File system is in safemode, it can't be written now"); 294 } 295 } 296 297 /** 298 * Verifies current version of file system 299 * @param fs filesystem object 300 * @param rootdir root hbase directory 301 * @return null if no version file exists, version string otherwise 302 * @throws IOException if the version file fails to open 303 * @throws DeserializationException if the version data cannot be translated into a version 304 */ 305 public static String getVersion(FileSystem fs, Path rootdir) 306 throws IOException, DeserializationException { 307 final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 308 FileStatus[] status = null; 309 try { 310 // hadoop 2.0 throws FNFE if directory does not exist. 311 // hadoop 1.0 returns null if directory does not exist. 312 status = fs.listStatus(versionFile); 313 } catch (FileNotFoundException fnfe) { 314 return null; 315 } 316 if (ArrayUtils.getLength(status) == 0) { 317 return null; 318 } 319 String version = null; 320 byte[] content = new byte[(int) status[0].getLen()]; 321 FSDataInputStream s = fs.open(versionFile); 322 try { 323 IOUtils.readFully(s, content, 0, content.length); 324 if (ProtobufUtil.isPBMagicPrefix(content)) { 325 version = parseVersionFrom(content); 326 } else { 327 // Presume it pre-pb format. 328 try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) { 329 version = dis.readUTF(); 330 } 331 } 332 } catch (EOFException eof) { 333 LOG.warn("Version file was empty, odd, will try to set it."); 334 } finally { 335 s.close(); 336 } 337 return version; 338 } 339 340 /** 341 * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file. 
342 * @param bytes The byte content of the hbase.version file 343 * @return The version found in the file as a String 344 * @throws DeserializationException if the version data cannot be translated into a version 345 */ 346 static String parseVersionFrom(final byte[] bytes) throws DeserializationException { 347 ProtobufUtil.expectPBMagicPrefix(bytes); 348 int pblen = ProtobufUtil.lengthOfPBMagic(); 349 FSProtos.HBaseVersionFileContent.Builder builder = 350 FSProtos.HBaseVersionFileContent.newBuilder(); 351 try { 352 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); 353 return builder.getVersion(); 354 } catch (IOException e) { 355 // Convert 356 throw new DeserializationException(e); 357 } 358 } 359 360 /** 361 * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file. 362 * @param version Version to persist 363 * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a 364 * prefix. 365 */ 366 static byte[] toVersionByteArray(final String version) { 367 FSProtos.HBaseVersionFileContent.Builder builder = 368 FSProtos.HBaseVersionFileContent.newBuilder(); 369 return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray()); 370 } 371 372 /** 373 * Verifies current version of file system 374 * @param fs file system 375 * @param rootdir root directory of HBase installation 376 * @param message if true, issues a message on System.out 377 * @throws IOException if the version file cannot be opened 378 * @throws DeserializationException if the contents of the version file cannot be parsed 379 */ 380 public static void checkVersion(FileSystem fs, Path rootdir, boolean message) 381 throws IOException, DeserializationException { 382 checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 383 } 384 385 /** 386 * Verifies current version of file system 387 * @param fs file system 388 * @param rootdir root directory of HBase installation 389 * @param message if true, issues a message on System.out 390 * @param wait wait interval 391 * @param retries number of times to retry 392 * @throws IOException if the version file cannot be opened 393 * @throws DeserializationException if the contents of the version file cannot be parsed 394 */ 395 public static void checkVersion(FileSystem fs, Path rootdir, boolean message, int wait, 396 int retries) throws IOException, DeserializationException { 397 String version = getVersion(fs, rootdir); 398 String msg; 399 if (version == null) { 400 if (!metaRegionExists(fs, rootdir)) { 401 // rootDir is empty (no version file and no root region) 402 // just create new version file (HBASE-1195) 403 setVersion(fs, rootdir, wait, retries); 404 return; 405 } else { 406 msg = "hbase.version file is missing. Is your hbase.rootdir valid? " 407 + "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " 408 + "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2"; 409 } 410 } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) { 411 return; 412 } else { 413 msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version 414 + " but software requires version " + HConstants.FILE_SYSTEM_VERSION 415 + ". Consult http://hbase.apache.org/book.html for further information about " 416 + "upgrading HBase."; 417 } 418 419 // version is deprecated require migration 420 // Output on stdout so user sees it in terminal. 421 if (message) { 422 System.out.println("WARNING! 
" + msg); 423 } 424 throw new FileSystemVersionException(msg); 425 } 426 427 /** 428 * Sets version of file system 429 * @param fs filesystem object 430 * @param rootdir hbase root 431 * @throws IOException e 432 */ 433 public static void setVersion(FileSystem fs, Path rootdir) throws IOException { 434 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0, 435 HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 436 } 437 438 /** 439 * Sets version of file system 440 * @param fs filesystem object 441 * @param rootdir hbase root 442 * @param wait time to wait for retry 443 * @param retries number of times to retry before failing 444 * @throws IOException e 445 */ 446 public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries) 447 throws IOException { 448 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries); 449 } 450 451 /** 452 * Sets version of file system 453 * @param fs filesystem object 454 * @param rootdir hbase root directory 455 * @param version version to set 456 * @param wait time to wait for retry 457 * @param retries number of times to retry before throwing an IOException 458 * @throws IOException e 459 */ 460 public static void setVersion(FileSystem fs, Path rootdir, String version, int wait, int retries) 461 throws IOException { 462 Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 463 Path tempVersionFile = new Path(rootdir, 464 HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + HConstants.VERSION_FILE_NAME); 465 while (true) { 466 try { 467 // Write the version to a temporary file 468 FSDataOutputStream s = fs.create(tempVersionFile); 469 try { 470 s.write(toVersionByteArray(version)); 471 s.close(); 472 s = null; 473 // Move the temp version file to its normal location. Returns false 474 // if the rename failed. Throw an IOE in that case. 475 if (!fs.rename(tempVersionFile, versionFile)) { 476 throw new IOException("Unable to move temp version file to " + versionFile); 477 } 478 } finally { 479 // Cleaning up the temporary if the rename failed would be trying 480 // too hard. We'll unconditionally create it again the next time 481 // through anyway, files are overwritten by default by create(). 482 483 // Attempt to close the stream on the way out if it is still open. 
484 try { 485 if (s != null) s.close(); 486 } catch (IOException ignore) { 487 } 488 } 489 LOG.info("Created version file at " + rootdir.toString() + " with version=" + version); 490 return; 491 } catch (IOException e) { 492 if (retries > 0) { 493 LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e); 494 fs.delete(versionFile, false); 495 try { 496 if (wait > 0) { 497 Thread.sleep(wait); 498 } 499 } catch (InterruptedException ie) { 500 throw (InterruptedIOException) new InterruptedIOException().initCause(ie); 501 } 502 retries--; 503 } else { 504 throw e; 505 } 506 } 507 } 508 } 509 510 /** 511 * Checks that a cluster ID file exists in the HBase root directory 512 * @param fs the root directory FileSystem 513 * @param rootdir the HBase root directory in HDFS 514 * @param wait how long to wait between retries 515 * @return <code>true</code> if the file exists, otherwise <code>false</code> 516 * @throws IOException if checking the FileSystem fails 517 */ 518 public static boolean checkClusterIdExists(FileSystem fs, Path rootdir, long wait) 519 throws IOException { 520 while (true) { 521 try { 522 Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 523 return fs.exists(filePath); 524 } catch (IOException ioe) { 525 if (wait > 0L) { 526 LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe); 527 try { 528 Thread.sleep(wait); 529 } catch (InterruptedException e) { 530 Thread.currentThread().interrupt(); 531 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 532 } 533 } else { 534 throw ioe; 535 } 536 } 537 } 538 } 539 540 /** 541 * Returns the value of the unique cluster ID stored for this HBase instance. 542 * @param fs the root directory FileSystem 543 * @param rootdir the path to the HBase root directory 544 * @return the unique cluster identifier 545 * @throws IOException if reading the cluster ID file fails 546 */ 547 public static ClusterId getClusterId(FileSystem fs, Path rootdir) throws IOException { 548 Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 549 ClusterId clusterId = null; 550 FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null; 551 if (status != null) { 552 int len = Ints.checkedCast(status.getLen()); 553 byte[] content = new byte[len]; 554 FSDataInputStream in = fs.open(idPath); 555 try { 556 in.readFully(content); 557 } catch (EOFException eof) { 558 LOG.warn("Cluster ID file {} is empty", idPath); 559 } finally { 560 in.close(); 561 } 562 try { 563 clusterId = ClusterId.parseFrom(content); 564 } catch (DeserializationException e) { 565 throw new IOException("content=" + Bytes.toString(content), e); 566 } 567 // If not pb'd, make it so. 568 if (!ProtobufUtil.isPBMagicPrefix(content)) { 569 String cid = null; 570 in = fs.open(idPath); 571 try { 572 cid = in.readUTF(); 573 clusterId = new ClusterId(cid); 574 } catch (EOFException eof) { 575 LOG.warn("Cluster ID file {} is empty", idPath); 576 } finally { 577 in.close(); 578 } 579 rewriteAsPb(fs, rootdir, idPath, clusterId); 580 } 581 return clusterId; 582 } else { 583 LOG.warn("Cluster ID file does not exist at {}", idPath); 584 } 585 return clusterId; 586 } 587 588 /** 589 * */ 590 private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p, 591 final ClusterId cid) throws IOException { 592 // Rewrite the file as pb. Move aside the old one first, write new 593 // then delete the moved-aside file. 594 Path movedAsideName = new Path(p + "." 
+ EnvironmentEdgeManager.currentTime()); 595 if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p); 596 setClusterId(fs, rootdir, cid, 100); 597 if (!fs.delete(movedAsideName, false)) { 598 throw new IOException("Failed delete of " + movedAsideName); 599 } 600 LOG.debug("Rewrote the hbase.id file as pb"); 601 } 602 603 /** 604 * Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root 605 * directory. If any operations on the ID file fails, and {@code wait} is a positive value, the 606 * method will retry to produce the ID file until the thread is forcibly interrupted. 607 * @param fs the root directory FileSystem 608 * @param rootdir the path to the HBase root directory 609 * @param clusterId the unique identifier to store 610 * @param wait how long (in milliseconds) to wait between retries 611 * @throws IOException if writing to the FileSystem fails and no wait value 612 */ 613 public static void setClusterId(final FileSystem fs, final Path rootdir, 614 final ClusterId clusterId, final long wait) throws IOException { 615 616 final Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 617 final Path tempDir = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY); 618 final Path tempIdFile = new Path(tempDir, HConstants.CLUSTER_ID_FILE_NAME); 619 620 LOG.debug("Create cluster ID file [{}] with ID: {}", idFile, clusterId); 621 622 while (true) { 623 Optional<IOException> failure = Optional.empty(); 624 625 LOG.debug("Write the cluster ID file to a temporary location: {}", tempIdFile); 626 try (FSDataOutputStream s = fs.create(tempIdFile)) { 627 s.write(clusterId.toByteArray()); 628 } catch (IOException ioe) { 629 failure = Optional.of(ioe); 630 } 631 632 if (!failure.isPresent()) { 633 try { 634 LOG.debug("Move the temporary cluster ID file to its target location [{}]:[{}]", 635 tempIdFile, idFile); 636 637 if (!fs.rename(tempIdFile, idFile)) { 638 failure = 639 Optional.of(new IOException("Unable to move temp cluster ID file to " + idFile)); 640 } 641 } catch (IOException ioe) { 642 failure = Optional.of(ioe); 643 } 644 } 645 646 if (failure.isPresent()) { 647 final IOException cause = failure.get(); 648 if (wait > 0L) { 649 LOG.warn("Unable to create cluster ID file in {}, retrying in {}ms", rootdir, wait, 650 cause); 651 try { 652 Thread.sleep(wait); 653 } catch (InterruptedException e) { 654 Thread.currentThread().interrupt(); 655 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 656 } 657 continue; 658 } else { 659 throw cause; 660 } 661 } else { 662 return; 663 } 664 } 665 } 666 667 /** 668 * If DFS, check safe mode and if so, wait until we clear it. 669 * @param conf configuration 670 * @param wait Sleep between retries 671 * @throws IOException e 672 */ 673 public static void waitOnSafeMode(final Configuration conf, final long wait) throws IOException { 674 FileSystem fs = FileSystem.get(conf); 675 if (!supportSafeMode(fs)) { 676 return; 677 } 678 // Make sure dfs is not in safe mode 679 while (isInSafeMode(fs)) { 680 LOG.info("Waiting for dfs to exit safe mode..."); 681 try { 682 Thread.sleep(wait); 683 } catch (InterruptedException e) { 684 Thread.currentThread().interrupt(); 685 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 686 } 687 } 688 } 689 690 public static boolean supportSafeMode(FileSystem fs) { 691 // return true if HDFS. 
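    // DistributedFileSystem is always safe-mode aware. Any other FileSystem qualifies only if it
    // implements the org.apache.hadoop.fs.SafeMode interface resolved reflectively at class load
    // time; safeModeClazz stays null on Hadoop 3.3.5 and below, so this returns false there.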
692 if (fs instanceof DistributedFileSystem) { 693 return true; 694 } 695 // return true if the file system implements SafeMode interface. 696 if (safeModeClazz != null) { 697 return (safeModeClazz.isAssignableFrom(fs.getClass())); 698 } 699 // return false if the file system is not HDFS and does not implement SafeMode interface. 700 return false; 701 } 702 703 /** 704 * Checks if meta region exists 705 * @param fs file system 706 * @param rootDir root directory of HBase installation 707 * @return true if exists 708 */ 709 public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException { 710 Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO); 711 return fs.exists(metaRegionDir); 712 } 713 714 /** 715 * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams are 716 * backed by a series of LocatedBlocks, which are fetched periodically from the namenode. This 717 * method retrieves those blocks from the input stream and uses them to calculate 718 * HDFSBlockDistribution. The underlying method in DFSInputStream does attempt to use locally 719 * cached blocks, but may hit the namenode if the cache is determined to be incomplete. The method 720 * also involves making copies of all LocatedBlocks rather than return the underlying blocks 721 * themselves. 722 */ 723 static public HDFSBlocksDistribution 724 computeHDFSBlocksDistribution(HdfsDataInputStream inputStream) throws IOException { 725 List<LocatedBlock> blocks = inputStream.getAllBlocks(); 726 HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution(); 727 for (LocatedBlock block : blocks) { 728 String[] hosts = getHostsForLocations(block); 729 long len = block.getBlockSize(); 730 StorageType[] storageTypes = block.getStorageTypes(); 731 blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes); 732 } 733 return blocksDistribution; 734 } 735 736 private static String[] getHostsForLocations(LocatedBlock block) { 737 DatanodeInfo[] locations = getLocatedBlockLocations(block); 738 String[] hosts = new String[locations.length]; 739 for (int i = 0; i < hosts.length; i++) { 740 hosts[i] = locations[i].getHostName(); 741 } 742 return hosts; 743 } 744 745 /** 746 * Compute HDFS blocks distribution of a given file, or a portion of the file 747 * @param fs file system 748 * @param status file status of the file 749 * @param start start position of the portion 750 * @param length length of the portion 751 * @return The HDFS blocks distribution 752 */ 753 static public HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs, 754 FileStatus status, long start, long length) throws IOException { 755 HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution(); 756 BlockLocation[] blockLocations = fs.getFileBlockLocations(status, start, length); 757 addToHDFSBlocksDistribution(blocksDistribution, blockLocations); 758 return blocksDistribution; 759 } 760 761 /** 762 * Update blocksDistribution with blockLocations 763 * @param blocksDistribution the hdfs blocks distribution 764 * @param blockLocations an array containing block location 765 */ 766 static public void addToHDFSBlocksDistribution(HDFSBlocksDistribution blocksDistribution, 767 BlockLocation[] blockLocations) throws IOException { 768 for (BlockLocation bl : blockLocations) { 769 String[] hosts = bl.getHosts(); 770 long len = bl.getLength(); 771 StorageType[] storageTypes = bl.getStorageTypes(); 772 blocksDistribution.addHostsAndBlockWeight(hosts, 
len, storageTypes);
    }
  }

  // TODO move this method OUT of FSUtils. No dependencies to HMaster
  /**
   * Returns the total overall fragmentation percentage. Includes hbase:meta and -ROOT- as well.
   * @param master The master defining the HBase root and file system
   * @return The overall fragmentation percentage (the value stored under the special key
   *         "-TOTAL-"), or -1 if it could not be computed
   * @throws IOException When scanning the directory fails
   */
  public static int getTotalTableFragmentation(final HMaster master) throws IOException {
    Map<String, Integer> map = getTableFragmentation(master);
    return map.isEmpty() ? -1 : map.get("-TOTAL-");
  }

  /**
   * Runs through the HBase rootdir and checks how many stores for each table have more than one
   * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is
   * stored under the special key "-TOTAL-".
   * @param master The master defining the HBase root and file system.
   * @return A map for each table and its percentage (never null).
   * @throws IOException When scanning the directory fails.
   */
  public static Map<String, Integer> getTableFragmentation(final HMaster master)
    throws IOException {
    Path path = CommonFSUtils.getRootDir(master.getConfiguration());
    // since HMaster.getFileSystem() is package private
    FileSystem fs = path.getFileSystem(master.getConfiguration());
    return getTableFragmentation(fs, path);
  }

  /**
   * Runs through the HBase rootdir and checks how many stores for each table have more than one
   * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is
   * stored under the special key "-TOTAL-".
   * @param fs           The file system to use
   * @param hbaseRootDir The root directory to scan
   * @return A map for each table and its percentage (never null)
   * @throws IOException When scanning the directory fails
   */
  public static Map<String, Integer> getTableFragmentation(final FileSystem fs,
    final Path hbaseRootDir) throws IOException {
    Map<String, Integer> frags = new HashMap<>();
    int cfCountTotal = 0;
    int cfFragTotal = 0;
    PathFilter regionFilter = new RegionDirFilter(fs);
    PathFilter familyFilter = new FamilyDirFilter(fs);
    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
    for (Path d : tableDirs) {
      int cfCount = 0;
      int cfFrag = 0;
      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
      for (FileStatus regionDir : regionDirs) {
        Path dd = regionDir.getPath();
        // else it's a region name, now look in region for families
        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
        for (FileStatus familyDir : familyDirs) {
          cfCount++;
          cfCountTotal++;
          Path family = familyDir.getPath();
          // now in family make sure only one file
          FileStatus[] familyStatus = fs.listStatus(family);
          if (familyStatus.length > 1) {
            cfFrag++;
            cfFragTotal++;
          }
        }
      }
      // compute percentage per table and store in result list
      frags.put(CommonFSUtils.getTableName(d).getNameAsString(),
        cfCount == 0 ? 0 : Math.round((float) cfFrag / cfCount * 100));
    }
    // set overall percentage for all tables
    frags.put("-TOTAL-",
      cfCountTotal == 0 ?
0 : Math.round((float) cfFragTotal / cfCountTotal * 100)); 848 return frags; 849 } 850 851 public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException { 852 if (fs.exists(dst) && !fs.delete(dst, false)) { 853 throw new IOException("Can not delete " + dst); 854 } 855 if (!fs.rename(src, dst)) { 856 throw new IOException("Can not rename from " + src + " to " + dst); 857 } 858 } 859 860 /** 861 * A {@link PathFilter} that returns only regular files. 862 */ 863 static class FileFilter extends AbstractFileStatusFilter { 864 private final FileSystem fs; 865 866 public FileFilter(final FileSystem fs) { 867 this.fs = fs; 868 } 869 870 @Override 871 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 872 try { 873 return isFile(fs, isDir, p); 874 } catch (IOException e) { 875 LOG.warn("Unable to verify if path={} is a regular file", p, e); 876 return false; 877 } 878 } 879 } 880 881 /** 882 * Directory filter that doesn't include any of the directories in the specified blacklist 883 */ 884 public static class BlackListDirFilter extends AbstractFileStatusFilter { 885 private final FileSystem fs; 886 private List<String> blacklist; 887 888 /** 889 * Create a filter on the givem filesystem with the specified blacklist 890 * @param fs filesystem to filter 891 * @param directoryNameBlackList list of the names of the directories to filter. If 892 * <tt>null</tt>, all directories are returned 893 */ 894 @SuppressWarnings("unchecked") 895 public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) { 896 this.fs = fs; 897 blacklist = (List<String>) (directoryNameBlackList == null 898 ? Collections.emptyList() 899 : directoryNameBlackList); 900 } 901 902 @Override 903 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 904 if (!isValidName(p.getName())) { 905 return false; 906 } 907 908 try { 909 return isDirectory(fs, isDir, p); 910 } catch (IOException e) { 911 LOG.warn("An error occurred while verifying if [{}] is a valid directory." 912 + " Returning 'not valid' and continuing.", p, e); 913 return false; 914 } 915 } 916 917 protected boolean isValidName(final String name) { 918 return !blacklist.contains(name); 919 } 920 } 921 922 /** 923 * A {@link PathFilter} that only allows directories. 924 */ 925 public static class DirFilter extends BlackListDirFilter { 926 927 public DirFilter(FileSystem fs) { 928 super(fs, null); 929 } 930 } 931 932 /** 933 * A {@link PathFilter} that returns usertable directories. 
To get all directories use the 934 * {@link BlackListDirFilter} with a <tt>null</tt> blacklist 935 */ 936 public static class UserTableDirFilter extends BlackListDirFilter { 937 public UserTableDirFilter(FileSystem fs) { 938 super(fs, HConstants.HBASE_NON_TABLE_DIRS); 939 } 940 941 @Override 942 protected boolean isValidName(final String name) { 943 if (!super.isValidName(name)) return false; 944 945 try { 946 TableName.isLegalTableQualifierName(Bytes.toBytes(name)); 947 } catch (IllegalArgumentException e) { 948 LOG.info("Invalid table name: {}", name); 949 return false; 950 } 951 return true; 952 } 953 } 954 955 public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir) 956 throws IOException { 957 List<Path> tableDirs = new ArrayList<>(); 958 Path baseNamespaceDir = new Path(rootdir, HConstants.BASE_NAMESPACE_DIR); 959 if (fs.exists(baseNamespaceDir)) { 960 for (FileStatus status : fs.globStatus(new Path(baseNamespaceDir, "*"))) { 961 tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath())); 962 } 963 } 964 return tableDirs; 965 } 966 967 /** 968 * @return All the table directories under <code>rootdir</code>. Ignore non table hbase folders 969 * such as .logs, .oldlogs, .corrupt folders. 970 */ 971 public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir) 972 throws IOException { 973 // presumes any directory under hbase.rootdir is a table 974 FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs)); 975 List<Path> tabledirs = new ArrayList<>(dirs.length); 976 for (FileStatus dir : dirs) { 977 tabledirs.add(dir.getPath()); 978 } 979 return tabledirs; 980 } 981 982 /** 983 * Filter for all dirs that don't start with '.' 984 */ 985 public static class RegionDirFilter extends AbstractFileStatusFilter { 986 // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names. 987 final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$"); 988 final FileSystem fs; 989 990 public RegionDirFilter(FileSystem fs) { 991 this.fs = fs; 992 } 993 994 @Override 995 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 996 if (!regionDirPattern.matcher(p.getName()).matches()) { 997 return false; 998 } 999 1000 try { 1001 return isDirectory(fs, isDir, p); 1002 } catch (IOException ioe) { 1003 // Maybe the file was moved or the fs was disconnected. 1004 LOG.warn("Skipping file {} due to IOException", p, ioe); 1005 return false; 1006 } 1007 } 1008 } 1009 1010 /** 1011 * Given a particular table dir, return all the regiondirs inside it, excluding files such as 1012 * .tableinfo 1013 * @param fs A file system for the Path 1014 * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir> 1015 * @return List of paths to valid region directories in table dir. 1016 */ 1017 public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) 1018 throws IOException { 1019 // assumes we are in a table dir. 
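    // Only child directories whose names match RegionDirFilter's hex pattern are kept, so table
    // housekeeping entries such as .tabledesc and .tmp are not reported as regions. For an
    // illustrative table dir like <hbase.rootdir>/data/default/t1, the result is the list of its
    // encoded-region-name subdirectories.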
1020 List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 1021 if (rds == null) { 1022 return Collections.emptyList(); 1023 } 1024 List<Path> regionDirs = new ArrayList<>(rds.size()); 1025 for (FileStatus rdfs : rds) { 1026 Path rdPath = rdfs.getPath(); 1027 regionDirs.add(rdPath); 1028 } 1029 return regionDirs; 1030 } 1031 1032 public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) { 1033 return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region); 1034 } 1035 1036 public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) { 1037 return getRegionDirFromTableDir(tableDir, 1038 ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName()); 1039 } 1040 1041 public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) { 1042 return new Path(tableDir, encodedRegionName); 1043 } 1044 1045 /** 1046 * Filter for all dirs that are legal column family names. This is generally used for colfam dirs 1047 * <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>. 1048 */ 1049 public static class FamilyDirFilter extends AbstractFileStatusFilter { 1050 final FileSystem fs; 1051 1052 public FamilyDirFilter(FileSystem fs) { 1053 this.fs = fs; 1054 } 1055 1056 @Override 1057 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1058 try { 1059 // throws IAE if invalid 1060 ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(Bytes.toBytes(p.getName())); 1061 } catch (IllegalArgumentException iae) { 1062 // path name is an invalid family name and thus is excluded. 1063 return false; 1064 } 1065 1066 try { 1067 return isDirectory(fs, isDir, p); 1068 } catch (IOException ioe) { 1069 // Maybe the file was moved or the fs was disconnected. 1070 LOG.warn("Skipping file {} due to IOException", p, ioe); 1071 return false; 1072 } 1073 } 1074 } 1075 1076 /** 1077 * Given a particular region dir, return all the familydirs inside it 1078 * @param fs A file system for the Path 1079 * @param regionDir Path to a specific region directory 1080 * @return List of paths to valid family directories in region dir. 1081 */ 1082 public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) 1083 throws IOException { 1084 // assumes we are in a region dir. 
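    // FamilyDirFilter keeps child directories whose names are legal column family names. A name
    // such as recovered.edits also passes that check, which is why callers like
    // getTableStoreFilePathMap still skip HConstants.RECOVERED_EDITS_DIR explicitly.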
1085 return getFilePaths(fs, regionDir, new FamilyDirFilter(fs)); 1086 } 1087 1088 public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) 1089 throws IOException { 1090 return getFilePaths(fs, familyDir, new ReferenceFileFilter(fs)); 1091 } 1092 1093 public static List<Path> getReferenceAndLinkFilePaths(final FileSystem fs, final Path familyDir) 1094 throws IOException { 1095 return getFilePaths(fs, familyDir, new ReferenceAndLinkFileFilter(fs)); 1096 } 1097 1098 private static List<Path> getFilePaths(final FileSystem fs, final Path dir, 1099 final PathFilter pathFilter) throws IOException { 1100 FileStatus[] fds = fs.listStatus(dir, pathFilter); 1101 List<Path> files = new ArrayList<>(fds.length); 1102 for (FileStatus fdfs : fds) { 1103 Path fdPath = fdfs.getPath(); 1104 files.add(fdPath); 1105 } 1106 return files; 1107 } 1108 1109 public static int getRegionReferenceAndLinkFileCount(final FileSystem fs, final Path p) { 1110 int result = 0; 1111 try { 1112 for (Path familyDir : getFamilyDirs(fs, p)) { 1113 result += getReferenceAndLinkFilePaths(fs, familyDir).size(); 1114 } 1115 } catch (IOException e) { 1116 LOG.warn("Error Counting reference files.", e); 1117 } 1118 return result; 1119 } 1120 1121 public static class ReferenceAndLinkFileFilter implements PathFilter { 1122 1123 private final FileSystem fs; 1124 1125 public ReferenceAndLinkFileFilter(FileSystem fs) { 1126 this.fs = fs; 1127 } 1128 1129 @Override 1130 public boolean accept(Path rd) { 1131 try { 1132 // only files can be references. 1133 return !fs.getFileStatus(rd).isDirectory() 1134 && (StoreFileInfo.isReference(rd) || HFileLink.isHFileLink(rd)); 1135 } catch (IOException ioe) { 1136 // Maybe the file was moved or the fs was disconnected. 1137 LOG.warn("Skipping file " + rd + " due to IOException", ioe); 1138 return false; 1139 } 1140 } 1141 } 1142 1143 /** 1144 * Filter for HFiles that excludes reference files. 1145 */ 1146 public static class HFileFilter extends AbstractFileStatusFilter { 1147 final FileSystem fs; 1148 1149 public HFileFilter(FileSystem fs) { 1150 this.fs = fs; 1151 } 1152 1153 @Override 1154 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1155 if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) { 1156 return false; 1157 } 1158 1159 try { 1160 return isFile(fs, isDir, p); 1161 } catch (IOException ioe) { 1162 // Maybe the file was moved or the fs was disconnected. 1163 LOG.warn("Skipping file {} due to IOException", p, ioe); 1164 return false; 1165 } 1166 } 1167 } 1168 1169 /** 1170 * Filter for HFileLinks (StoreFiles and HFiles not included). the filter itself does not consider 1171 * if a link is file or not. 1172 */ 1173 public static class HFileLinkFilter implements PathFilter { 1174 1175 @Override 1176 public boolean accept(Path p) { 1177 return HFileLink.isHFileLink(p); 1178 } 1179 } 1180 1181 public static class ReferenceFileFilter extends AbstractFileStatusFilter { 1182 1183 private final FileSystem fs; 1184 1185 public ReferenceFileFilter(FileSystem fs) { 1186 this.fs = fs; 1187 } 1188 1189 @Override 1190 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1191 if (!StoreFileInfo.isReference(p)) { 1192 return false; 1193 } 1194 1195 try { 1196 // only files can be references. 1197 return isFile(fs, isDir, p); 1198 } catch (IOException ioe) { 1199 // Maybe the file was moved or the fs was disconnected. 
1200 LOG.warn("Skipping file {} due to IOException", p, ioe); 1201 return false; 1202 } 1203 } 1204 } 1205 1206 /** 1207 * Called every so-often by storefile map builder getTableStoreFilePathMap to report progress. 1208 */ 1209 interface ProgressReporter { 1210 /** 1211 * @param status File or directory we are about to process. 1212 */ 1213 void progress(FileStatus status); 1214 } 1215 1216 /** 1217 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1218 * names to the full Path. <br> 1219 * Example...<br> 1220 * Key = 3944417774205889744 <br> 1221 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1222 * @param map map to add values. If null, this method will create and populate one to 1223 * return 1224 * @param fs The file system to use. 1225 * @param hbaseRootDir The root directory to scan. 1226 * @param tableName name of the table to scan. 1227 * @return Map keyed by StoreFile name with a value of the full Path. 1228 * @throws IOException When scanning the directory fails. 1229 */ 1230 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map, 1231 final FileSystem fs, final Path hbaseRootDir, TableName tableName) 1232 throws IOException, InterruptedException { 1233 return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, 1234 (ProgressReporter) null); 1235 } 1236 1237 /** 1238 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1239 * names to the full Path. Note that because this method can be called on a 'live' HBase system 1240 * that we will skip files that no longer exist by the time we traverse them and similarly the 1241 * user of the result needs to consider that some entries in this map may not exist by the time 1242 * this call completes. <br> 1243 * Example...<br> 1244 * Key = 3944417774205889744 <br> 1245 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1246 * @param resultMap map to add values. If null, this method will create and populate one to 1247 * return 1248 * @param fs The file system to use. 1249 * @param hbaseRootDir The root directory to scan. 1250 * @param tableName name of the table to scan. 1251 * @param sfFilter optional path filter to apply to store files 1252 * @param executor optional executor service to parallelize this operation 1253 * @param progressReporter Instance or null; gets called every time we move to new region of 1254 * family dir and for each store file. 1255 * @return Map keyed by StoreFile name with a value of the full Path. 1256 * @throws IOException When scanning the directory fails. 1257 * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead. 1258 */ 1259 @Deprecated 1260 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1261 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1262 ExecutorService executor, final HbckErrorReporter progressReporter) 1263 throws IOException, InterruptedException { 1264 return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor, 1265 new ProgressReporter() { 1266 @Override 1267 public void progress(FileStatus status) { 1268 // status is not used in this implementation. 
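        // Adapter from the newer ProgressReporter callback to the deprecated HbckErrorReporter,
        // whose progress() takes no per-file argument, so the FileStatus is dropped.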
1269 progressReporter.progress(); 1270 } 1271 }); 1272 } 1273 1274 /** 1275 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1276 * names to the full Path. Note that because this method can be called on a 'live' HBase system 1277 * that we will skip files that no longer exist by the time we traverse them and similarly the 1278 * user of the result needs to consider that some entries in this map may not exist by the time 1279 * this call completes. <br> 1280 * Example...<br> 1281 * Key = 3944417774205889744 <br> 1282 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1283 * @param resultMap map to add values. If null, this method will create and populate one to 1284 * return 1285 * @param fs The file system to use. 1286 * @param hbaseRootDir The root directory to scan. 1287 * @param tableName name of the table to scan. 1288 * @param sfFilter optional path filter to apply to store files 1289 * @param executor optional executor service to parallelize this operation 1290 * @param progressReporter Instance or null; gets called every time we move to new region of 1291 * family dir and for each store file. 1292 * @return Map keyed by StoreFile name with a value of the full Path. 1293 * @throws IOException When scanning the directory fails. 1294 * @throws InterruptedException the thread is interrupted, either before or during the activity. 1295 */ 1296 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1297 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1298 ExecutorService executor, final ProgressReporter progressReporter) 1299 throws IOException, InterruptedException { 1300 1301 final Map<String, Path> finalResultMap = 1302 resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap; 1303 1304 // only include the directory paths to tables 1305 Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 1306 // Inside a table, there are compaction.dir directories to skip. Otherwise, all else 1307 // should be regions. 
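    // Each region directory is scanned by a Runnable, either inline or on the supplied executor;
    // per-region results are merged into finalResultMap and, once an exception has been recorded,
    // no further regions are submitted.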
1308 final FamilyDirFilter familyFilter = new FamilyDirFilter(fs); 1309 final Vector<Exception> exceptions = new Vector<>(); 1310 1311 try { 1312 List<FileStatus> regionDirs = 1313 FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 1314 if (regionDirs == null) { 1315 return finalResultMap; 1316 } 1317 1318 final List<Future<?>> futures = new ArrayList<>(regionDirs.size()); 1319 1320 for (FileStatus regionDir : regionDirs) { 1321 if (null != progressReporter) { 1322 progressReporter.progress(regionDir); 1323 } 1324 final Path dd = regionDir.getPath(); 1325 1326 if (!exceptions.isEmpty()) { 1327 break; 1328 } 1329 1330 Runnable getRegionStoreFileMapCall = new Runnable() { 1331 @Override 1332 public void run() { 1333 try { 1334 HashMap<String, Path> regionStoreFileMap = new HashMap<>(); 1335 List<FileStatus> familyDirs = 1336 FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter); 1337 if (familyDirs == null) { 1338 if (!fs.exists(dd)) { 1339 LOG.warn("Skipping region because it no longer exists: " + dd); 1340 } else { 1341 LOG.warn("Skipping region because it has no family dirs: " + dd); 1342 } 1343 return; 1344 } 1345 for (FileStatus familyDir : familyDirs) { 1346 if (null != progressReporter) { 1347 progressReporter.progress(familyDir); 1348 } 1349 Path family = familyDir.getPath(); 1350 if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) { 1351 continue; 1352 } 1353 // now in family, iterate over the StoreFiles and 1354 // put in map 1355 FileStatus[] familyStatus = fs.listStatus(family); 1356 for (FileStatus sfStatus : familyStatus) { 1357 if (null != progressReporter) { 1358 progressReporter.progress(sfStatus); 1359 } 1360 Path sf = sfStatus.getPath(); 1361 if (sfFilter == null || sfFilter.accept(sf)) { 1362 regionStoreFileMap.put(sf.getName(), sf); 1363 } 1364 } 1365 } 1366 finalResultMap.putAll(regionStoreFileMap); 1367 } catch (Exception e) { 1368 LOG.error("Could not get region store file map for region: " + dd, e); 1369 exceptions.add(e); 1370 } 1371 } 1372 }; 1373 1374 // If executor is available, submit async tasks to exec concurrently, otherwise 1375 // just do serial sync execution 1376 if (executor != null) { 1377 Future<?> future = executor.submit(getRegionStoreFileMapCall); 1378 futures.add(future); 1379 } else { 1380 FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null); 1381 future.run(); 1382 futures.add(future); 1383 } 1384 } 1385 1386 // Ensure all pending tasks are complete (or that we run into an exception) 1387 for (Future<?> f : futures) { 1388 if (!exceptions.isEmpty()) { 1389 break; 1390 } 1391 try { 1392 f.get(); 1393 } catch (ExecutionException e) { 1394 LOG.error("Unexpected exec exception! Should've been caught already. 
(Bug?)", e); 1395 // Shouldn't happen, we already logged/caught any exceptions in the Runnable 1396 } 1397 } 1398 } catch (IOException e) { 1399 LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e); 1400 exceptions.add(e); 1401 } finally { 1402 if (!exceptions.isEmpty()) { 1403 // Just throw the first exception as an indication something bad happened 1404 // Don't need to propagate all the exceptions, we already logged them all anyway 1405 Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class); 1406 throw new IOException(exceptions.firstElement()); 1407 } 1408 } 1409 1410 return finalResultMap; 1411 } 1412 1413 public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) { 1414 int result = 0; 1415 try { 1416 for (Path familyDir : getFamilyDirs(fs, p)) { 1417 result += getReferenceFilePaths(fs, familyDir).size(); 1418 } 1419 } catch (IOException e) { 1420 LOG.warn("Error counting reference files", e); 1421 } 1422 return result; 1423 } 1424 1425 /** 1426 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1427 * the full Path. <br> 1428 * Example...<br> 1429 * Key = 3944417774205889744 <br> 1430 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1431 * @param fs The file system to use. 1432 * @param hbaseRootDir The root directory to scan. 1433 * @return Map keyed by StoreFile name with a value of the full Path. 1434 * @throws IOException When scanning the directory fails. 1435 */ 1436 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1437 final Path hbaseRootDir) throws IOException, InterruptedException { 1438 return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter) null); 1439 } 1440 1441 /** 1442 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1443 * the full Path. <br> 1444 * Example...<br> 1445 * Key = 3944417774205889744 <br> 1446 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1447 * @param fs The file system to use. 1448 * @param hbaseRootDir The root directory to scan. 1449 * @param sfFilter optional path filter to apply to store files 1450 * @param executor optional executor service to parallelize this operation 1451 * @param progressReporter Instance or null; gets called every time we move to new region of 1452 * family dir and for each store file. 1453 * @return Map keyed by StoreFile name with a value of the full Path. 1454 * @throws IOException When scanning the directory fails. 1455 * @deprecated Since 2.3.0. Will be removed in hbase4. Used 1456 * {@link #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)} 1457 */ 1458 @Deprecated 1459 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1460 final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor, 1461 HbckErrorReporter progressReporter) throws IOException, InterruptedException { 1462 return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor, new ProgressReporter() { 1463 @Override 1464 public void progress(FileStatus status) { 1465 // status is not used in this implementation. 1466 progressReporter.progress(); 1467 } 1468 }); 1469 } 1470 1471 /** 1472 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1473 * the full Path. 
<br> 1474 * Example...<br> 1475 * Key = 3944417774205889744 <br> 1476 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1477 * @param fs The file system to use. 1478 * @param hbaseRootDir The root directory to scan. 1479 * @param sfFilter optional path filter to apply to store files 1480 * @param executor optional executor service to parallelize this operation 1481 * @param progressReporter Instance or null; gets called every time we move to new region of 1482 * family dir and for each store file. 1483 * @return Map keyed by StoreFile name with a value of the full Path. 1484 * @throws IOException When scanning the directory fails. 1485 */ 1486 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1487 final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor, 1488 ProgressReporter progressReporter) throws IOException, InterruptedException { 1489 ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32); 1490 1491 // if this method looks similar to 'getTableFragmentation' that is because 1492 // it was borrowed from it. 1493 1494 // only include the directory paths to tables 1495 for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) { 1496 getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir), 1497 sfFilter, executor, progressReporter); 1498 } 1499 return map; 1500 } 1501 1502 /** 1503 * Filters FileStatuses in an array and returns a list 1504 * @param input An array of FileStatuses 1505 * @param filter A required filter to filter the array 1506 * @return A list of FileStatuses 1507 */ 1508 public static List<FileStatus> filterFileStatuses(FileStatus[] input, FileStatusFilter filter) { 1509 if (input == null) return null; 1510 return filterFileStatuses(Iterators.forArray(input), filter); 1511 } 1512 1513 /** 1514 * Filters FileStatuses in an iterator and returns a list 1515 * @param input An iterator of FileStatuses 1516 * @param filter A required filter to filter the array 1517 * @return A list of FileStatuses 1518 */ 1519 public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input, 1520 FileStatusFilter filter) { 1521 if (input == null) return null; 1522 ArrayList<FileStatus> results = new ArrayList<>(); 1523 while (input.hasNext()) { 1524 FileStatus f = input.next(); 1525 if (filter.accept(f)) { 1526 results.add(f); 1527 } 1528 } 1529 return results; 1530 } 1531 1532 /** 1533 * Calls fs.listStatus() and treats FileNotFoundException as non-fatal This accommodates 1534 * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException, and 1535 * return an empty FileStatus[] while Hadoop 2 will throw FileNotFoundException. 
   * @param fs file system
   * @param dir directory
   * @param filter file status filter
   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
   */
  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs, final Path dir,
    final FileStatusFilter filter) throws IOException {
    FileStatus[] status = null;
    try {
      status = fs.listStatus(dir);
    } catch (FileNotFoundException fnfe) {
      LOG.trace("{} does not exist", dir);
      return null;
    }

    if (ArrayUtils.getLength(status) == 0) {
      return null;
    }

    if (filter == null) {
      return Arrays.asList(status);
    } else {
      List<FileStatus> status2 = filterFileStatuses(status, filter);
      if (status2 == null || status2.isEmpty()) {
        return null;
      } else {
        return status2;
      }
    }
  }

  /**
   * Scans the root path of the file system to get the degree of locality for each region on each
   * of the servers having at least one block of that region. This is used by the tool
   * {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}.
   * @param conf the configuration to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>>
    getRegionDegreeLocalityMappingFromFS(final Configuration conf) throws IOException {
    return getRegionDegreeLocalityMappingFromFS(conf, null,
      conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
  }

  /**
   * Scans the root path of the file system to get the degree of locality for each region on each
   * of the servers having at least one block of that region.
   * @param conf the configuration to use
   * @param desiredTable the table you wish to scan locality for
   * @param threadPoolSize the thread pool size to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
    final Configuration conf, final String desiredTable, int threadPoolSize) throws IOException {
    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
    return regionDegreeLocalityMapping;
  }

  /**
   * Scans the root path of the file system to get either the mapping between the region name and
   * its best locality region server or the degree of locality of each region on each of the
   * servers having at least one block of that region. The output map parameter is optional.
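   * <p>
   * For illustration, an entry of the filled-in locality map has roughly this shape (the region
   * encoded name and host names are made up):
   *
   * <pre>
   * "5f2a3b9c0d1e4f6a7b8c9d0e1f2a3b4c" -&gt; { "rs1.example.com" -&gt; 0.8f, "rs2.example.com" -&gt; 0.2f }
   * </pre>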
   * @param conf the configuration to use
   * @param desiredTable the table you wish to scan locality for
   * @param threadPoolSize the thread pool size to use
   * @param regionDegreeLocalityMapping the map into which to put the locality degree mapping, or
   *          null; must be a thread-safe implementation
   * @throws IOException in case of file system errors or interrupts
   */
  private static void getRegionLocalityMappingFromFS(final Configuration conf,
    final String desiredTable, int threadPoolSize,
    final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
    final FileSystem fs = FileSystem.get(conf);
    final Path rootPath = CommonFSUtils.getRootDir(conf);
    final long startTime = EnvironmentEdgeManager.currentTime();
    final Path queryPath;
    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
    if (null == desiredTable) {
      queryPath =
        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
    } else {
      queryPath = new Path(
        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
    }

    // reject all paths that are not appropriate
    PathFilter pathFilter = new PathFilter() {
      @Override
      public boolean accept(Path path) {
        // this is the region name; it may get some noise data
        if (null == path) {
          return false;
        }

        // no parent?
        Path parent = path.getParent();
        if (null == parent) {
          return false;
        }

        String regionName = path.getName();
        if (null == regionName) {
          return false;
        }

        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
          return false;
        }
        return true;
      }
    };

    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
    }

    if (null == statusList) {
      return;
    }

    // lower the number of threads in case we have very few expected regions
    threadPoolSize = Math.min(threadPoolSize, statusList.length);

    // run in multiple threads
    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
      new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    try {
      // ignore all file status items that are not of interest
      for (FileStatus regionStatus : statusList) {
        if (null == regionStatus || !regionStatus.isDirectory()) {
          continue;
        }

        final Path regionPath = regionStatus.getPath();
        if (null != regionPath) {
          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
        }
      }
    } finally {
      tpe.shutdown();
      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
      try {
        // here we wait until TPE terminates, which is either naturally or by
        // exceptions in the execution of the threads
        while (!tpe.awaitTermination(threadWakeFrequency, TimeUnit.MILLISECONDS)) {
          // printing out rough estimate, so as to not introduce
          // AtomicInteger
          LOG.info("Locality checking is underway: { Scanned Regions : "
            + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
            + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
    }
    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
    LOG.info("Scan DFS for locality info takes {}ms", overhead);
  }

  /**
   * Do our short circuit read setup. Checks buffer size to use and whether to do checksumming in
   * hbase or hdfs.
   */
  public static void setupShortCircuitRead(final Configuration conf) {
    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
    boolean shortCircuitSkipChecksum =
      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    if (shortCircuitSkipChecksum) {
      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not "
        + "be set to true."
        + (useHBaseChecksum
          ? " HBase checksum doesn't require "
            + "it, see https://issues.apache.org/jira/browse/HBASE-6868."
          : ""));
      assert !shortCircuitSkipChecksum; // this will fail if assertions are on
    }
    checkShortCircuitReadBufferSize(conf);
  }

  /**
   * Check if short circuit read buffer size is set and if not, set it to hbase value.
   */
  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
    int size = conf.getInt(dfsKey, notSet);
    // If a size is set, return -- we will use it.
    if (size != notSet) return;
    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }
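  /*
   * Illustrative configuration for the two short-circuit read helpers above. The values are
   * examples only, not shipped defaults, and the HDFS short-circuit settings must also be enabled
   * on the DataNode side:
   *
   *   conf.setBoolean("dfs.client.read.shortcircuit", true);
   *   conf.set("dfs.domain.socket.path", "/var/lib/hadoop-hdfs/dn_socket");
   *   // read by checkShortCircuitReadBufferSize() when the HDFS buffer-size key is unset;
   *   // otherwise the fallback is HConstants.DEFAULT_BLOCKSIZE * 2
   *   conf.setInt("hbase.dfs.client.read.shortcircuit.buffer.size", 131072);
   */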
  /**
   * Returns the DFSClient DFSHedgedReadMetrics instance, or null if it can't be found or the
   * filesystem is not HDFS.
   */
  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
    throws IOException {
    if (!CommonFSUtils.isHDFS(c)) {
      return null;
    }
    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
    final String name = "getHedgedReadMetrics";
    DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient();
    Method m;
    try {
      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException e) {
      LOG.warn(
        "Failed to find method " + name + " in dfsclient; no hedged read metrics: " + e.getMessage());
      return null;
    } catch (SecurityException e) {
      LOG.warn(
        "Failed to find method " + name + " in dfsclient; no hedged read metrics: " + e.getMessage());
      return null;
    }
    m.setAccessible(true);
    try {
      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: "
        + e.getMessage());
      return null;
    }
  }

  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
    Configuration conf, int threads) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Future<Void>> futures = new ArrayList<>();
    List<Path> traversedPaths;
    try {
      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (ExecutionException | InterruptedException | IOException e) {
      throw new IOException("Copy snapshot reference files failed", e);
    } finally {
      pool.shutdownNow();
    }
    return traversedPaths;
  }

  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
    Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
    List<Path> traversedPaths = new ArrayList<>();
    traversedPaths.add(dst);
    FileStatus currentFileStatus = srcFS.getFileStatus(src);
    if (currentFileStatus.isDirectory()) {
      if (!dstFS.mkdirs(dst)) {
        throw new IOException("Create directory failed: " + dst);
      }
      FileStatus[] subPaths = srcFS.listStatus(src);
      for (FileStatus subPath : subPaths) {
        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
      }
    } else {
      Future<Void> future = pool.submit(() -> {
        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
        return null;
      });
      futures.add(future);
    }
    return traversedPaths;
  }

  /** Returns a set containing all namenode addresses of fs */
  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
    Configuration conf) {
    Set<InetSocketAddress> addresses = new HashSet<>();
    String serviceName = fs.getCanonicalServiceName();

    if (serviceName.startsWith("ha-hdfs")) {
      try {
        Map<String, Map<String, InetSocketAddress>> addressMap =
          DFSUtil.getNNServiceRpcAddressesForCluster(conf);
        String nameService = serviceName.substring(serviceName.indexOf(":") + 1);
        if (addressMap.containsKey(nameService)) {
          Map<String, InetSocketAddress> nnMap = addressMap.get(nameService);
          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
            InetSocketAddress addr = e2.getValue();
            addresses.add(addr);
          }
        }
      } catch (Exception e) {
        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
      }
    } else {
      URI uri = fs.getUri();
      int port = uri.getPort();
      if (port < 0) {
        int idx = serviceName.indexOf(':');
        port = Integer.parseInt(serviceName.substring(idx + 1));
      }
      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
      addresses.add(addr);
    }

    return addresses;
  }

  /**
   * @param conf the Configuration of HBase
   * @param srcFs the source FileSystem
   * @param desFs the destination FileSystem
   * @return Whether srcFs and desFs are on the same HDFS or not
   */
  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
    // By getCanonicalServiceName, we could make sure both srcFs and desFs
    // show a unified format which contains scheme, host and port.
    String srcServiceName = srcFs.getCanonicalServiceName();
    String desServiceName = desFs.getCanonicalServiceName();

    if (srcServiceName == null || desServiceName == null) {
      return false;
    }
    if (srcServiceName.equals(desServiceName)) {
      return true;
    }
    if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
      Collection<String> internalNameServices =
        conf.getTrimmedStringCollection("dfs.internal.nameservices");
      if (!internalNameServices.isEmpty()) {
        if (internalNameServices.contains(srcServiceName.split(":")[1])) {
          return true;
        } else {
          return false;
        }
      }
    }
    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
      // If one serviceName is an HA format while the other is a non-HA format,
      // maybe they refer to the same FileSystem.
      // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
      if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
        return true;
      }
    }

    return false;
  }
}
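/*
 * Putting a few of the helpers above together: a hedged, illustrative sketch of copying a
 * directory tree to another cluster. The configuration, URI, paths and thread count below are
 * hypothetical placeholders, not values this class defines.
 *
 *   Configuration conf = HBaseConfiguration.create();
 *   FileSystem srcFs = FileSystem.get(conf);
 *   FileSystem dstFs = new Path("hdfs://backup-nn.example.com:8020/").getFileSystem(conf);
 *   if (!FSUtils.isSameHdfs(conf, srcFs, dstFs)) {
 *     FSUtils.copyFilesParallel(srcFs, new Path("/hbase/archive"), dstFs,
 *       new Path("/backup/hbase/archive"), conf, 8);
 *   }
 */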