001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET; 021 022import edu.umd.cs.findbugs.annotations.CheckForNull; 023import java.io.ByteArrayInputStream; 024import java.io.DataInputStream; 025import java.io.EOFException; 026import java.io.FileNotFoundException; 027import java.io.IOException; 028import java.io.InterruptedIOException; 029import java.lang.reflect.InvocationTargetException; 030import java.lang.reflect.Method; 031import java.net.InetSocketAddress; 032import java.net.URI; 033import java.util.ArrayList; 034import java.util.Arrays; 035import java.util.Collection; 036import java.util.Collections; 037import java.util.HashMap; 038import java.util.HashSet; 039import java.util.Iterator; 040import java.util.List; 041import java.util.Locale; 042import java.util.Map; 043import java.util.Optional; 044import java.util.Set; 045import java.util.Vector; 046import java.util.concurrent.ConcurrentHashMap; 047import java.util.concurrent.ExecutionException; 048import java.util.concurrent.ExecutorService; 049import java.util.concurrent.Executors; 050import java.util.concurrent.Future; 051import java.util.concurrent.FutureTask; 052import java.util.concurrent.ThreadPoolExecutor; 053import java.util.concurrent.TimeUnit; 054import java.util.regex.Pattern; 055import org.apache.commons.lang3.ArrayUtils; 056import org.apache.hadoop.conf.Configuration; 057import org.apache.hadoop.fs.BlockLocation; 058import org.apache.hadoop.fs.FSDataInputStream; 059import org.apache.hadoop.fs.FSDataOutputStream; 060import org.apache.hadoop.fs.FileStatus; 061import org.apache.hadoop.fs.FileSystem; 062import org.apache.hadoop.fs.FileUtil; 063import org.apache.hadoop.fs.Path; 064import org.apache.hadoop.fs.PathFilter; 065import org.apache.hadoop.fs.StorageType; 066import org.apache.hadoop.fs.permission.FsPermission; 067import org.apache.hadoop.hbase.ClusterId; 068import org.apache.hadoop.hbase.HConstants; 069import org.apache.hadoop.hbase.HDFSBlocksDistribution; 070import org.apache.hadoop.hbase.TableName; 071import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 072import org.apache.hadoop.hbase.client.RegionInfo; 073import org.apache.hadoop.hbase.client.RegionInfoBuilder; 074import org.apache.hadoop.hbase.exceptions.DeserializationException; 075import org.apache.hadoop.hbase.fs.HFileSystem; 076import org.apache.hadoop.hbase.io.HFileLink; 077import org.apache.hadoop.hbase.master.HMaster; 078import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 079import org.apache.hadoop.hdfs.DFSClient; 080import org.apache.hadoop.hdfs.DFSHedgedReadMetrics; 081import org.apache.hadoop.hdfs.DFSUtil; 082import 
org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Progressable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;

/**
 * Utility methods for interacting with the underlying file system.
 */
@InterfaceAudience.Private
public final class FSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);

  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
  private static final int DEFAULT_THREAD_POOLSIZE = 2;

  /** Set to true on Windows platforms */
  // currently only used in testing. TODO refactor into a test class
  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");

  private FSUtils() {
  }

  /**
   * @return True if <code>fs</code> is an instance of DistributedFileSystem
   */
  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
    FileSystem fileSystem = fs;
    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
    // Check its backing fs for dfs-ness.
    if (fs instanceof HFileSystem) {
      fileSystem = ((HFileSystem) fs).getBackingFs();
    }
    return fileSystem instanceof DistributedFileSystem;
  }

  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
   * consider the scheme; i.e. if the schemes differ but the path or subpath matches, the two are
   * considered equal.
   * @param pathToSearch Path we will be trying to match.
   * @param pathTail     Path tail we expect to find at the end of <code>pathToSearch</code>.
   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
    Path tailPath = pathTail;
    String tailName;
    Path toSearch = pathToSearch;
    String toSearchName;
    boolean result = false;

    if (pathToSearch.depth() != pathTail.depth()) {
      return false;
    }

    do {
      tailName = tailPath.getName();
      if (tailName == null || tailName.isEmpty()) {
        result = true;
        break;
      }
      toSearchName = toSearch.getName();
      if (toSearchName == null || toSearchName.isEmpty()) {
        break;
      }
      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
      tailPath = tailPath.getParent();
      toSearch = toSearch.getParent();
    } while (tailName.equals(toSearchName));
    return result;
  }

  /**
   * Delete the region directory if it exists.
   * @return True if the region directory was deleted.
n 170 */ 171 public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri) 172 throws IOException { 173 Path rootDir = CommonFSUtils.getRootDir(conf); 174 FileSystem fs = rootDir.getFileSystem(conf); 175 return CommonFSUtils.deleteDirectory(fs, 176 new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName())); 177 } 178 179 /** 180 * Create the specified file on the filesystem. By default, this will: 181 * <ol> 182 * <li>overwrite the file if it exists</li> 183 * <li>apply the umask in the configuration (if it is enabled)</li> 184 * <li>use the fs configured buffer size (or 4096 if not set)</li> 185 * <li>use the configured column family replication or default replication if 186 * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li> 187 * <li>use the default block size</li> 188 * <li>not track progress</li> 189 * </ol> 190 * @param conf configurations 191 * @param fs {@link FileSystem} on which to write the file 192 * @param path {@link Path} to the file to write 193 * @param perm permissions 194 * @param favoredNodes favored data nodes 195 * @return output stream to the created file 196 * @throws IOException if the file cannot be created 197 */ 198 public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path, 199 FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException { 200 if (fs instanceof HFileSystem) { 201 FileSystem backingFs = ((HFileSystem) fs).getBackingFs(); 202 if (backingFs instanceof DistributedFileSystem) { 203 // Try to use the favoredNodes version via reflection to allow backwards- 204 // compatibility. 205 short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION, 206 String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION))); 207 try { 208 return (FSDataOutputStream) (DistributedFileSystem.class 209 .getDeclaredMethod("create", Path.class, FsPermission.class, boolean.class, int.class, 210 short.class, long.class, Progressable.class, InetSocketAddress[].class) 211 .invoke(backingFs, path, perm, true, CommonFSUtils.getDefaultBufferSize(backingFs), 212 replication > 0 ? replication : CommonFSUtils.getDefaultReplication(backingFs, path), 213 CommonFSUtils.getDefaultBlockSize(backingFs, path), null, favoredNodes)); 214 } catch (InvocationTargetException ite) { 215 // Function was properly called, but threw it's own exception. 216 throw new IOException(ite.getCause()); 217 } catch (NoSuchMethodException e) { 218 LOG.debug("DFS Client does not support most favored nodes create; using default create"); 219 LOG.trace("Ignoring; use default create", e); 220 } catch (IllegalArgumentException | SecurityException | IllegalAccessException e) { 221 LOG.debug("Ignoring (most likely Reflection related exception) " + e); 222 } 223 } 224 } 225 return CommonFSUtils.create(fs, path, perm, true); 226 } 227 228 /** 229 * Checks to see if the specified file system is available 230 * @param fs filesystem 231 * @throws IOException e 232 */ 233 public static void checkFileSystemAvailable(final FileSystem fs) throws IOException { 234 if (!(fs instanceof DistributedFileSystem)) { 235 return; 236 } 237 IOException exception = null; 238 DistributedFileSystem dfs = (DistributedFileSystem) fs; 239 try { 240 if (dfs.exists(new Path("/"))) { 241 return; 242 } 243 } catch (IOException e) { 244 exception = e instanceof RemoteException ? 
((RemoteException) e).unwrapRemoteException() : e; 245 } 246 try { 247 fs.close(); 248 } catch (Exception e) { 249 LOG.error("file system close failed: ", e); 250 } 251 throw new IOException("File system is not available", exception); 252 } 253 254 /** 255 * Inquire the Active NameNode's safe mode status. 256 * @param dfs A DistributedFileSystem object representing the underlying HDFS. 257 * @return whether we're in safe mode n 258 */ 259 private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException { 260 return dfs.setSafeMode(SAFEMODE_GET, true); 261 } 262 263 /** 264 * Check whether dfs is in safemode. nn 265 */ 266 public static void checkDfsSafeMode(final Configuration conf) throws IOException { 267 boolean isInSafeMode = false; 268 FileSystem fs = FileSystem.get(conf); 269 if (fs instanceof DistributedFileSystem) { 270 DistributedFileSystem dfs = (DistributedFileSystem) fs; 271 isInSafeMode = isInSafeMode(dfs); 272 } 273 if (isInSafeMode) { 274 throw new IOException("File system is in safemode, it can't be written now"); 275 } 276 } 277 278 /** 279 * Verifies current version of file system 280 * @param fs filesystem object 281 * @param rootdir root hbase directory 282 * @return null if no version file exists, version string otherwise 283 * @throws IOException if the version file fails to open 284 * @throws DeserializationException if the version data cannot be translated into a version 285 */ 286 public static String getVersion(FileSystem fs, Path rootdir) 287 throws IOException, DeserializationException { 288 final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 289 FileStatus[] status = null; 290 try { 291 // hadoop 2.0 throws FNFE if directory does not exist. 292 // hadoop 1.0 returns null if directory does not exist. 293 status = fs.listStatus(versionFile); 294 } catch (FileNotFoundException fnfe) { 295 return null; 296 } 297 if (ArrayUtils.getLength(status) == 0) { 298 return null; 299 } 300 String version = null; 301 byte[] content = new byte[(int) status[0].getLen()]; 302 FSDataInputStream s = fs.open(versionFile); 303 try { 304 IOUtils.readFully(s, content, 0, content.length); 305 if (ProtobufUtil.isPBMagicPrefix(content)) { 306 version = parseVersionFrom(content); 307 } else { 308 // Presume it pre-pb format. 309 try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) { 310 version = dis.readUTF(); 311 } 312 } 313 } catch (EOFException eof) { 314 LOG.warn("Version file was empty, odd, will try to set it."); 315 } finally { 316 s.close(); 317 } 318 return version; 319 } 320 321 /** 322 * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file. 323 * @param bytes The byte content of the hbase.version file 324 * @return The version found in the file as a String 325 * @throws DeserializationException if the version data cannot be translated into a version 326 */ 327 static String parseVersionFrom(final byte[] bytes) throws DeserializationException { 328 ProtobufUtil.expectPBMagicPrefix(bytes); 329 int pblen = ProtobufUtil.lengthOfPBMagic(); 330 FSProtos.HBaseVersionFileContent.Builder builder = 331 FSProtos.HBaseVersionFileContent.newBuilder(); 332 try { 333 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); 334 return builder.getVersion(); 335 } catch (IOException e) { 336 // Convert 337 throw new DeserializationException(e); 338 } 339 } 340 341 /** 342 * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file. 
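   * <p>
   * A minimal round-trip sketch: the returned bytes carry the pb magic prefix expected by
   * {@link #parseVersionFrom(byte[])}, so the two helpers invert each other.
   * </p>
   *
   * <pre>{@code
   * byte[] bytes = FSUtils.toVersionByteArray(HConstants.FILE_SYSTEM_VERSION);
   * // parseVersionFrom strips the magic prefix and yields the same version string
   * String parsed = FSUtils.parseVersionFrom(bytes);
   * }</pre>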
343 * @param version Version to persist 344 * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a 345 * prefix. 346 */ 347 static byte[] toVersionByteArray(final String version) { 348 FSProtos.HBaseVersionFileContent.Builder builder = 349 FSProtos.HBaseVersionFileContent.newBuilder(); 350 return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray()); 351 } 352 353 /** 354 * Verifies current version of file system 355 * @param fs file system 356 * @param rootdir root directory of HBase installation 357 * @param message if true, issues a message on System.out 358 * @throws IOException if the version file cannot be opened 359 * @throws DeserializationException if the contents of the version file cannot be parsed 360 */ 361 public static void checkVersion(FileSystem fs, Path rootdir, boolean message) 362 throws IOException, DeserializationException { 363 checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 364 } 365 366 /** 367 * Verifies current version of file system 368 * @param fs file system 369 * @param rootdir root directory of HBase installation 370 * @param message if true, issues a message on System.out 371 * @param wait wait interval 372 * @param retries number of times to retry 373 * @throws IOException if the version file cannot be opened 374 * @throws DeserializationException if the contents of the version file cannot be parsed 375 */ 376 public static void checkVersion(FileSystem fs, Path rootdir, boolean message, int wait, 377 int retries) throws IOException, DeserializationException { 378 String version = getVersion(fs, rootdir); 379 String msg; 380 if (version == null) { 381 if (!metaRegionExists(fs, rootdir)) { 382 // rootDir is empty (no version file and no root region) 383 // just create new version file (HBASE-1195) 384 setVersion(fs, rootdir, wait, retries); 385 return; 386 } else { 387 msg = "hbase.version file is missing. Is your hbase.rootdir valid? " 388 + "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " 389 + "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2"; 390 } 391 } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) { 392 return; 393 } else { 394 msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version 395 + " but software requires version " + HConstants.FILE_SYSTEM_VERSION 396 + ". Consult http://hbase.apache.org/book.html for further information about " 397 + "upgrading HBase."; 398 } 399 400 // version is deprecated require migration 401 // Output on stdout so user sees it in terminal. 402 if (message) { 403 System.out.println("WARNING! 
" + msg); 404 } 405 throw new FileSystemVersionException(msg); 406 } 407 408 /** 409 * Sets version of file system 410 * @param fs filesystem object 411 * @param rootdir hbase root 412 * @throws IOException e 413 */ 414 public static void setVersion(FileSystem fs, Path rootdir) throws IOException { 415 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0, 416 HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 417 } 418 419 /** 420 * Sets version of file system 421 * @param fs filesystem object 422 * @param rootdir hbase root 423 * @param wait time to wait for retry 424 * @param retries number of times to retry before failing 425 * @throws IOException e 426 */ 427 public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries) 428 throws IOException { 429 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries); 430 } 431 432 /** 433 * Sets version of file system 434 * @param fs filesystem object 435 * @param rootdir hbase root directory 436 * @param version version to set 437 * @param wait time to wait for retry 438 * @param retries number of times to retry before throwing an IOException 439 * @throws IOException e 440 */ 441 public static void setVersion(FileSystem fs, Path rootdir, String version, int wait, int retries) 442 throws IOException { 443 Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 444 Path tempVersionFile = new Path(rootdir, 445 HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + HConstants.VERSION_FILE_NAME); 446 while (true) { 447 try { 448 // Write the version to a temporary file 449 FSDataOutputStream s = fs.create(tempVersionFile); 450 try { 451 s.write(toVersionByteArray(version)); 452 s.close(); 453 s = null; 454 // Move the temp version file to its normal location. Returns false 455 // if the rename failed. Throw an IOE in that case. 456 if (!fs.rename(tempVersionFile, versionFile)) { 457 throw new IOException("Unable to move temp version file to " + versionFile); 458 } 459 } finally { 460 // Cleaning up the temporary if the rename failed would be trying 461 // too hard. We'll unconditionally create it again the next time 462 // through anyway, files are overwritten by default by create(). 463 464 // Attempt to close the stream on the way out if it is still open. 
465 try { 466 if (s != null) s.close(); 467 } catch (IOException ignore) { 468 } 469 } 470 LOG.info("Created version file at " + rootdir.toString() + " with version=" + version); 471 return; 472 } catch (IOException e) { 473 if (retries > 0) { 474 LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e); 475 fs.delete(versionFile, false); 476 try { 477 if (wait > 0) { 478 Thread.sleep(wait); 479 } 480 } catch (InterruptedException ie) { 481 throw (InterruptedIOException) new InterruptedIOException().initCause(ie); 482 } 483 retries--; 484 } else { 485 throw e; 486 } 487 } 488 } 489 } 490 491 /** 492 * Checks that a cluster ID file exists in the HBase root directory 493 * @param fs the root directory FileSystem 494 * @param rootdir the HBase root directory in HDFS 495 * @param wait how long to wait between retries 496 * @return <code>true</code> if the file exists, otherwise <code>false</code> 497 * @throws IOException if checking the FileSystem fails 498 */ 499 public static boolean checkClusterIdExists(FileSystem fs, Path rootdir, long wait) 500 throws IOException { 501 while (true) { 502 try { 503 Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 504 return fs.exists(filePath); 505 } catch (IOException ioe) { 506 if (wait > 0L) { 507 LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe); 508 try { 509 Thread.sleep(wait); 510 } catch (InterruptedException e) { 511 Thread.currentThread().interrupt(); 512 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 513 } 514 } else { 515 throw ioe; 516 } 517 } 518 } 519 } 520 521 /** 522 * Returns the value of the unique cluster ID stored for this HBase instance. 523 * @param fs the root directory FileSystem 524 * @param rootdir the path to the HBase root directory 525 * @return the unique cluster identifier 526 * @throws IOException if reading the cluster ID file fails 527 */ 528 public static ClusterId getClusterId(FileSystem fs, Path rootdir) throws IOException { 529 Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 530 ClusterId clusterId = null; 531 FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null; 532 if (status != null) { 533 int len = Ints.checkedCast(status.getLen()); 534 byte[] content = new byte[len]; 535 FSDataInputStream in = fs.open(idPath); 536 try { 537 in.readFully(content); 538 } catch (EOFException eof) { 539 LOG.warn("Cluster ID file {} is empty", idPath); 540 } finally { 541 in.close(); 542 } 543 try { 544 clusterId = ClusterId.parseFrom(content); 545 } catch (DeserializationException e) { 546 throw new IOException("content=" + Bytes.toString(content), e); 547 } 548 // If not pb'd, make it so. 549 if (!ProtobufUtil.isPBMagicPrefix(content)) { 550 String cid = null; 551 in = fs.open(idPath); 552 try { 553 cid = in.readUTF(); 554 clusterId = new ClusterId(cid); 555 } catch (EOFException eof) { 556 LOG.warn("Cluster ID file {} is empty", idPath); 557 } finally { 558 in.close(); 559 } 560 rewriteAsPb(fs, rootdir, idPath, clusterId); 561 } 562 return clusterId; 563 } else { 564 LOG.warn("Cluster ID file does not exist at {}", idPath); 565 } 566 return clusterId; 567 } 568 569 /** 570 * nn 571 */ 572 private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p, 573 final ClusterId cid) throws IOException { 574 // Rewrite the file as pb. Move aside the old one first, write new 575 // then delete the moved-aside file. 576 Path movedAsideName = new Path(p + "." 
+ EnvironmentEdgeManager.currentTime()); 577 if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p); 578 setClusterId(fs, rootdir, cid, 100); 579 if (!fs.delete(movedAsideName, false)) { 580 throw new IOException("Failed delete of " + movedAsideName); 581 } 582 LOG.debug("Rewrote the hbase.id file as pb"); 583 } 584 585 /** 586 * Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root 587 * directory. If any operations on the ID file fails, and {@code wait} is a positive value, the 588 * method will retry to produce the ID file until the thread is forcibly interrupted. 589 * @param fs the root directory FileSystem 590 * @param rootdir the path to the HBase root directory 591 * @param clusterId the unique identifier to store 592 * @param wait how long (in milliseconds) to wait between retries 593 * @throws IOException if writing to the FileSystem fails and no wait value 594 */ 595 public static void setClusterId(final FileSystem fs, final Path rootdir, 596 final ClusterId clusterId, final long wait) throws IOException { 597 598 final Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 599 final Path tempDir = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY); 600 final Path tempIdFile = new Path(tempDir, HConstants.CLUSTER_ID_FILE_NAME); 601 602 LOG.debug("Create cluster ID file [{}] with ID: {}", idFile, clusterId); 603 604 while (true) { 605 Optional<IOException> failure = Optional.empty(); 606 607 LOG.debug("Write the cluster ID file to a temporary location: {}", tempIdFile); 608 try (FSDataOutputStream s = fs.create(tempIdFile)) { 609 s.write(clusterId.toByteArray()); 610 } catch (IOException ioe) { 611 failure = Optional.of(ioe); 612 } 613 614 if (!failure.isPresent()) { 615 try { 616 LOG.debug("Move the temporary cluster ID file to its target location [{}]:[{}]", 617 tempIdFile, idFile); 618 619 if (!fs.rename(tempIdFile, idFile)) { 620 failure = 621 Optional.of(new IOException("Unable to move temp cluster ID file to " + idFile)); 622 } 623 } catch (IOException ioe) { 624 failure = Optional.of(ioe); 625 } 626 } 627 628 if (failure.isPresent()) { 629 final IOException cause = failure.get(); 630 if (wait > 0L) { 631 LOG.warn("Unable to create cluster ID file in {}, retrying in {}ms", rootdir, wait, 632 cause); 633 try { 634 Thread.sleep(wait); 635 } catch (InterruptedException e) { 636 Thread.currentThread().interrupt(); 637 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 638 } 639 continue; 640 } else { 641 throw cause; 642 } 643 } else { 644 return; 645 } 646 } 647 } 648 649 /** 650 * If DFS, check safe mode and if so, wait until we clear it. 
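   * <p>
   * A minimal usage sketch, assuming the caller already holds an HBase {@link Configuration}; the
   * 30 second retry interval below is illustrative only.
   * </p>
   *
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * // Blocks, re-checking every 30 seconds, until HDFS reports it has left safe mode.
   * FSUtils.waitOnSafeMode(conf, 30000L);
   * }</pre>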
651 * @param conf configuration 652 * @param wait Sleep between retries 653 * @throws IOException e 654 */ 655 public static void waitOnSafeMode(final Configuration conf, final long wait) throws IOException { 656 FileSystem fs = FileSystem.get(conf); 657 if (!(fs instanceof DistributedFileSystem)) return; 658 DistributedFileSystem dfs = (DistributedFileSystem) fs; 659 // Make sure dfs is not in safe mode 660 while (isInSafeMode(dfs)) { 661 LOG.info("Waiting for dfs to exit safe mode..."); 662 try { 663 Thread.sleep(wait); 664 } catch (InterruptedException e) { 665 Thread.currentThread().interrupt(); 666 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 667 } 668 } 669 } 670 671 /** 672 * Checks if meta region exists 673 * @param fs file system 674 * @param rootDir root directory of HBase installation 675 * @return true if exists 676 */ 677 public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException { 678 Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO); 679 return fs.exists(metaRegionDir); 680 } 681 682 /** 683 * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams are 684 * backed by a series of LocatedBlocks, which are fetched periodically from the namenode. This 685 * method retrieves those blocks from the input stream and uses them to calculate 686 * HDFSBlockDistribution. The underlying method in DFSInputStream does attempt to use locally 687 * cached blocks, but may hit the namenode if the cache is determined to be incomplete. The method 688 * also involves making copies of all LocatedBlocks rather than return the underlying blocks 689 * themselves. 690 */ 691 static public HDFSBlocksDistribution 692 computeHDFSBlocksDistribution(HdfsDataInputStream inputStream) throws IOException { 693 List<LocatedBlock> blocks = inputStream.getAllBlocks(); 694 HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution(); 695 for (LocatedBlock block : blocks) { 696 String[] hosts = getHostsForLocations(block); 697 long len = block.getBlockSize(); 698 StorageType[] storageTypes = block.getStorageTypes(); 699 blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes); 700 } 701 return blocksDistribution; 702 } 703 704 private static String[] getHostsForLocations(LocatedBlock block) { 705 DatanodeInfo[] locations = block.getLocations(); 706 String[] hosts = new String[locations.length]; 707 for (int i = 0; i < hosts.length; i++) { 708 hosts[i] = locations[i].getHostName(); 709 } 710 return hosts; 711 } 712 713 /** 714 * Compute HDFS blocks distribution of a given file, or a portion of the file 715 * @param fs file system 716 * @param status file status of the file 717 * @param start start position of the portion 718 * @param length length of the portion 719 * @return The HDFS blocks distribution 720 */ 721 static public HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs, 722 FileStatus status, long start, long length) throws IOException { 723 HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution(); 724 BlockLocation[] blockLocations = fs.getFileBlockLocations(status, start, length); 725 addToHDFSBlocksDistribution(blocksDistribution, blockLocations); 726 return blocksDistribution; 727 } 728 729 /** 730 * Update blocksDistribution with blockLocations 731 * @param blocksDistribution the hdfs blocks distribution 732 * @param blockLocations an array containing block location 733 */ 734 static public void 
addToHDFSBlocksDistribution(HDFSBlocksDistribution blocksDistribution, 735 BlockLocation[] blockLocations) throws IOException { 736 for (BlockLocation bl : blockLocations) { 737 String[] hosts = bl.getHosts(); 738 long len = bl.getLength(); 739 StorageType[] storageTypes = bl.getStorageTypes(); 740 blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes); 741 } 742 } 743 744 // TODO move this method OUT of FSUtils. No dependencies to HMaster 745 /** 746 * Returns the total overall fragmentation percentage. Includes hbase:meta and -ROOT- as well. 747 * @param master The master defining the HBase root and file system 748 * @return A map for each table and its percentage (never null) 749 * @throws IOException When scanning the directory fails 750 */ 751 public static int getTotalTableFragmentation(final HMaster master) throws IOException { 752 Map<String, Integer> map = getTableFragmentation(master); 753 return map.isEmpty() ? -1 : map.get("-TOTAL-"); 754 } 755 756 /** 757 * Runs through the HBase rootdir and checks how many stores for each table have more than one 758 * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is 759 * stored under the special key "-TOTAL-". 760 * @param master The master defining the HBase root and file system. 761 * @return A map for each table and its percentage (never null). 762 * @throws IOException When scanning the directory fails. 763 */ 764 public static Map<String, Integer> getTableFragmentation(final HMaster master) 765 throws IOException { 766 Path path = CommonFSUtils.getRootDir(master.getConfiguration()); 767 // since HMaster.getFileSystem() is package private 768 FileSystem fs = path.getFileSystem(master.getConfiguration()); 769 return getTableFragmentation(fs, path); 770 } 771 772 /** 773 * Runs through the HBase rootdir and checks how many stores for each table have more than one 774 * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is 775 * stored under the special key "-TOTAL-". 776 * @param fs The file system to use 777 * @param hbaseRootDir The root directory to scan 778 * @return A map for each table and its percentage (never null) 779 * @throws IOException When scanning the directory fails 780 */ 781 public static Map<String, Integer> getTableFragmentation(final FileSystem fs, 782 final Path hbaseRootDir) throws IOException { 783 Map<String, Integer> frags = new HashMap<>(); 784 int cfCountTotal = 0; 785 int cfFragTotal = 0; 786 PathFilter regionFilter = new RegionDirFilter(fs); 787 PathFilter familyFilter = new FamilyDirFilter(fs); 788 List<Path> tableDirs = getTableDirs(fs, hbaseRootDir); 789 for (Path d : tableDirs) { 790 int cfCount = 0; 791 int cfFrag = 0; 792 FileStatus[] regionDirs = fs.listStatus(d, regionFilter); 793 for (FileStatus regionDir : regionDirs) { 794 Path dd = regionDir.getPath(); 795 // else its a region name, now look in region for families 796 FileStatus[] familyDirs = fs.listStatus(dd, familyFilter); 797 for (FileStatus familyDir : familyDirs) { 798 cfCount++; 799 cfCountTotal++; 800 Path family = familyDir.getPath(); 801 // now in family make sure only one file 802 FileStatus[] familyStatus = fs.listStatus(family); 803 if (familyStatus.length > 1) { 804 cfFrag++; 805 cfFragTotal++; 806 } 807 } 808 } 809 // compute percentage per table and store in result list 810 frags.put(CommonFSUtils.getTableName(d).getNameAsString(), 811 cfCount == 0 ? 
0 : Math.round((float) cfFrag / cfCount * 100)); 812 } 813 // set overall percentage for all tables 814 frags.put("-TOTAL-", 815 cfCountTotal == 0 ? 0 : Math.round((float) cfFragTotal / cfCountTotal * 100)); 816 return frags; 817 } 818 819 public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException { 820 if (fs.exists(dst) && !fs.delete(dst, false)) { 821 throw new IOException("Can not delete " + dst); 822 } 823 if (!fs.rename(src, dst)) { 824 throw new IOException("Can not rename from " + src + " to " + dst); 825 } 826 } 827 828 /** 829 * A {@link PathFilter} that returns only regular files. 830 */ 831 static class FileFilter extends AbstractFileStatusFilter { 832 private final FileSystem fs; 833 834 public FileFilter(final FileSystem fs) { 835 this.fs = fs; 836 } 837 838 @Override 839 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 840 try { 841 return isFile(fs, isDir, p); 842 } catch (IOException e) { 843 LOG.warn("Unable to verify if path={} is a regular file", p, e); 844 return false; 845 } 846 } 847 } 848 849 /** 850 * Directory filter that doesn't include any of the directories in the specified blacklist 851 */ 852 public static class BlackListDirFilter extends AbstractFileStatusFilter { 853 private final FileSystem fs; 854 private List<String> blacklist; 855 856 /** 857 * Create a filter on the givem filesystem with the specified blacklist 858 * @param fs filesystem to filter 859 * @param directoryNameBlackList list of the names of the directories to filter. If 860 * <tt>null</tt>, all directories are returned 861 */ 862 @SuppressWarnings("unchecked") 863 public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) { 864 this.fs = fs; 865 blacklist = (List<String>) (directoryNameBlackList == null 866 ? Collections.emptyList() 867 : directoryNameBlackList); 868 } 869 870 @Override 871 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 872 if (!isValidName(p.getName())) { 873 return false; 874 } 875 876 try { 877 return isDirectory(fs, isDir, p); 878 } catch (IOException e) { 879 LOG.warn("An error occurred while verifying if [{}] is a valid directory." 880 + " Returning 'not valid' and continuing.", p, e); 881 return false; 882 } 883 } 884 885 protected boolean isValidName(final String name) { 886 return !blacklist.contains(name); 887 } 888 } 889 890 /** 891 * A {@link PathFilter} that only allows directories. 892 */ 893 public static class DirFilter extends BlackListDirFilter { 894 895 public DirFilter(FileSystem fs) { 896 super(fs, null); 897 } 898 } 899 900 /** 901 * A {@link PathFilter} that returns usertable directories. 
To get all directories use the 902 * {@link BlackListDirFilter} with a <tt>null</tt> blacklist 903 */ 904 public static class UserTableDirFilter extends BlackListDirFilter { 905 public UserTableDirFilter(FileSystem fs) { 906 super(fs, HConstants.HBASE_NON_TABLE_DIRS); 907 } 908 909 @Override 910 protected boolean isValidName(final String name) { 911 if (!super.isValidName(name)) return false; 912 913 try { 914 TableName.isLegalTableQualifierName(Bytes.toBytes(name)); 915 } catch (IllegalArgumentException e) { 916 LOG.info("Invalid table name: {}", name); 917 return false; 918 } 919 return true; 920 } 921 } 922 923 public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir) 924 throws IOException { 925 List<Path> tableDirs = new ArrayList<>(); 926 Path baseNamespaceDir = new Path(rootdir, HConstants.BASE_NAMESPACE_DIR); 927 if (fs.exists(baseNamespaceDir)) { 928 for (FileStatus status : fs.globStatus(new Path(baseNamespaceDir, "*"))) { 929 tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath())); 930 } 931 } 932 return tableDirs; 933 } 934 935 /** 936 * nn * @return All the table directories under <code>rootdir</code>. Ignore non table hbase 937 * folders such as .logs, .oldlogs, .corrupt folders. n 938 */ 939 public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir) 940 throws IOException { 941 // presumes any directory under hbase.rootdir is a table 942 FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs)); 943 List<Path> tabledirs = new ArrayList<>(dirs.length); 944 for (FileStatus dir : dirs) { 945 tabledirs.add(dir.getPath()); 946 } 947 return tabledirs; 948 } 949 950 /** 951 * Filter for all dirs that don't start with '.' 952 */ 953 public static class RegionDirFilter extends AbstractFileStatusFilter { 954 // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names. 955 final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$"); 956 final FileSystem fs; 957 958 public RegionDirFilter(FileSystem fs) { 959 this.fs = fs; 960 } 961 962 @Override 963 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 964 if (!regionDirPattern.matcher(p.getName()).matches()) { 965 return false; 966 } 967 968 try { 969 return isDirectory(fs, isDir, p); 970 } catch (IOException ioe) { 971 // Maybe the file was moved or the fs was disconnected. 972 LOG.warn("Skipping file {} due to IOException", p, ioe); 973 return false; 974 } 975 } 976 } 977 978 /** 979 * Given a particular table dir, return all the regiondirs inside it, excluding files such as 980 * .tableinfo 981 * @param fs A file system for the Path 982 * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir> 983 * @return List of paths to valid region directories in table dir. n 984 */ 985 public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) 986 throws IOException { 987 // assumes we are in a table dir. 
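    // i.e. list the children of <hbase.rootdir>/data/<namespace>/<table> and keep only entries
    // whose names look like encoded region names (hex), as required by RegionDirFilter above.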
988 List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 989 if (rds == null) { 990 return Collections.emptyList(); 991 } 992 List<Path> regionDirs = new ArrayList<>(rds.size()); 993 for (FileStatus rdfs : rds) { 994 Path rdPath = rdfs.getPath(); 995 regionDirs.add(rdPath); 996 } 997 return regionDirs; 998 } 999 1000 public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) { 1001 return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region); 1002 } 1003 1004 public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) { 1005 return getRegionDirFromTableDir(tableDir, 1006 ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName()); 1007 } 1008 1009 public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) { 1010 return new Path(tableDir, encodedRegionName); 1011 } 1012 1013 /** 1014 * Filter for all dirs that are legal column family names. This is generally used for colfam dirs 1015 * <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>. 1016 */ 1017 public static class FamilyDirFilter extends AbstractFileStatusFilter { 1018 final FileSystem fs; 1019 1020 public FamilyDirFilter(FileSystem fs) { 1021 this.fs = fs; 1022 } 1023 1024 @Override 1025 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1026 try { 1027 // throws IAE if invalid 1028 ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(Bytes.toBytes(p.getName())); 1029 } catch (IllegalArgumentException iae) { 1030 // path name is an invalid family name and thus is excluded. 1031 return false; 1032 } 1033 1034 try { 1035 return isDirectory(fs, isDir, p); 1036 } catch (IOException ioe) { 1037 // Maybe the file was moved or the fs was disconnected. 1038 LOG.warn("Skipping file {} due to IOException", p, ioe); 1039 return false; 1040 } 1041 } 1042 } 1043 1044 /** 1045 * Given a particular region dir, return all the familydirs inside it 1046 * @param fs A file system for the Path 1047 * @param regionDir Path to a specific region directory 1048 * @return List of paths to valid family directories in region dir. n 1049 */ 1050 public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) 1051 throws IOException { 1052 // assumes we are in a region dir. 
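    // i.e. list the children of <regiondir> and keep only directories whose names are legal
    // column family names (FamilyDirFilter above rejects entries such as .tmp).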
1053 return getFilePaths(fs, regionDir, new FamilyDirFilter(fs)); 1054 } 1055 1056 public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) 1057 throws IOException { 1058 return getFilePaths(fs, familyDir, new ReferenceFileFilter(fs)); 1059 } 1060 1061 public static List<Path> getReferenceAndLinkFilePaths(final FileSystem fs, final Path familyDir) 1062 throws IOException { 1063 return getFilePaths(fs, familyDir, new ReferenceAndLinkFileFilter(fs)); 1064 } 1065 1066 private static List<Path> getFilePaths(final FileSystem fs, final Path dir, 1067 final PathFilter pathFilter) throws IOException { 1068 FileStatus[] fds = fs.listStatus(dir, pathFilter); 1069 List<Path> files = new ArrayList<>(fds.length); 1070 for (FileStatus fdfs : fds) { 1071 Path fdPath = fdfs.getPath(); 1072 files.add(fdPath); 1073 } 1074 return files; 1075 } 1076 1077 public static int getRegionReferenceAndLinkFileCount(final FileSystem fs, final Path p) { 1078 int result = 0; 1079 try { 1080 for (Path familyDir : getFamilyDirs(fs, p)) { 1081 result += getReferenceAndLinkFilePaths(fs, familyDir).size(); 1082 } 1083 } catch (IOException e) { 1084 LOG.warn("Error Counting reference files.", e); 1085 } 1086 return result; 1087 } 1088 1089 public static class ReferenceAndLinkFileFilter implements PathFilter { 1090 1091 private final FileSystem fs; 1092 1093 public ReferenceAndLinkFileFilter(FileSystem fs) { 1094 this.fs = fs; 1095 } 1096 1097 @Override 1098 public boolean accept(Path rd) { 1099 try { 1100 // only files can be references. 1101 return !fs.getFileStatus(rd).isDirectory() 1102 && (StoreFileInfo.isReference(rd) || HFileLink.isHFileLink(rd)); 1103 } catch (IOException ioe) { 1104 // Maybe the file was moved or the fs was disconnected. 1105 LOG.warn("Skipping file " + rd + " due to IOException", ioe); 1106 return false; 1107 } 1108 } 1109 } 1110 1111 /** 1112 * Filter for HFiles that excludes reference files. 1113 */ 1114 public static class HFileFilter extends AbstractFileStatusFilter { 1115 final FileSystem fs; 1116 1117 public HFileFilter(FileSystem fs) { 1118 this.fs = fs; 1119 } 1120 1121 @Override 1122 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1123 if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) { 1124 return false; 1125 } 1126 1127 try { 1128 return isFile(fs, isDir, p); 1129 } catch (IOException ioe) { 1130 // Maybe the file was moved or the fs was disconnected. 1131 LOG.warn("Skipping file {} due to IOException", p, ioe); 1132 return false; 1133 } 1134 } 1135 } 1136 1137 /** 1138 * Filter for HFileLinks (StoreFiles and HFiles not included). the filter itself does not consider 1139 * if a link is file or not. 1140 */ 1141 public static class HFileLinkFilter implements PathFilter { 1142 1143 @Override 1144 public boolean accept(Path p) { 1145 return HFileLink.isHFileLink(p); 1146 } 1147 } 1148 1149 public static class ReferenceFileFilter extends AbstractFileStatusFilter { 1150 1151 private final FileSystem fs; 1152 1153 public ReferenceFileFilter(FileSystem fs) { 1154 this.fs = fs; 1155 } 1156 1157 @Override 1158 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1159 if (!StoreFileInfo.isReference(p)) { 1160 return false; 1161 } 1162 1163 try { 1164 // only files can be references. 1165 return isFile(fs, isDir, p); 1166 } catch (IOException ioe) { 1167 // Maybe the file was moved or the fs was disconnected. 
1168 LOG.warn("Skipping file {} due to IOException", p, ioe); 1169 return false; 1170 } 1171 } 1172 } 1173 1174 /** 1175 * Called every so-often by storefile map builder getTableStoreFilePathMap to report progress. 1176 */ 1177 interface ProgressReporter { 1178 /** 1179 * @param status File or directory we are about to process. 1180 */ 1181 void progress(FileStatus status); 1182 } 1183 1184 /** 1185 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1186 * names to the full Path. <br> 1187 * Example...<br> 1188 * Key = 3944417774205889744 <br> 1189 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1190 * @param map map to add values. If null, this method will create and populate one to 1191 * return 1192 * @param fs The file system to use. 1193 * @param hbaseRootDir The root directory to scan. 1194 * @param tableName name of the table to scan. 1195 * @return Map keyed by StoreFile name with a value of the full Path. 1196 * @throws IOException When scanning the directory fails. n 1197 */ 1198 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map, 1199 final FileSystem fs, final Path hbaseRootDir, TableName tableName) 1200 throws IOException, InterruptedException { 1201 return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, 1202 (ProgressReporter) null); 1203 } 1204 1205 /** 1206 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1207 * names to the full Path. Note that because this method can be called on a 'live' HBase system 1208 * that we will skip files that no longer exist by the time we traverse them and similarly the 1209 * user of the result needs to consider that some entries in this map may not exist by the time 1210 * this call completes. <br> 1211 * Example...<br> 1212 * Key = 3944417774205889744 <br> 1213 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1214 * @param resultMap map to add values. If null, this method will create and populate one to 1215 * return 1216 * @param fs The file system to use. 1217 * @param hbaseRootDir The root directory to scan. 1218 * @param tableName name of the table to scan. 1219 * @param sfFilter optional path filter to apply to store files 1220 * @param executor optional executor service to parallelize this operation 1221 * @param progressReporter Instance or null; gets called every time we move to new region of 1222 * family dir and for each store file. 1223 * @return Map keyed by StoreFile name with a value of the full Path. 1224 * @throws IOException When scanning the directory fails. 1225 * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead. 1226 */ 1227 @Deprecated 1228 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1229 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1230 ExecutorService executor, final HbckErrorReporter progressReporter) 1231 throws IOException, InterruptedException { 1232 return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor, 1233 new ProgressReporter() { 1234 @Override 1235 public void progress(FileStatus status) { 1236 // status is not used in this implementation. 
1237 progressReporter.progress(); 1238 } 1239 }); 1240 } 1241 1242 /** 1243 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1244 * names to the full Path. Note that because this method can be called on a 'live' HBase system 1245 * that we will skip files that no longer exist by the time we traverse them and similarly the 1246 * user of the result needs to consider that some entries in this map may not exist by the time 1247 * this call completes. <br> 1248 * Example...<br> 1249 * Key = 3944417774205889744 <br> 1250 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1251 * @param resultMap map to add values. If null, this method will create and populate one to 1252 * return 1253 * @param fs The file system to use. 1254 * @param hbaseRootDir The root directory to scan. 1255 * @param tableName name of the table to scan. 1256 * @param sfFilter optional path filter to apply to store files 1257 * @param executor optional executor service to parallelize this operation 1258 * @param progressReporter Instance or null; gets called every time we move to new region of 1259 * family dir and for each store file. 1260 * @return Map keyed by StoreFile name with a value of the full Path. 1261 * @throws IOException When scanning the directory fails. 1262 * @throws InterruptedException the thread is interrupted, either before or during the activity. 1263 */ 1264 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1265 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1266 ExecutorService executor, final ProgressReporter progressReporter) 1267 throws IOException, InterruptedException { 1268 1269 final Map<String, Path> finalResultMap = 1270 resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap; 1271 1272 // only include the directory paths to tables 1273 Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 1274 // Inside a table, there are compaction.dir directories to skip. Otherwise, all else 1275 // should be regions. 
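    // Layout walked below: <tableDir>/<encoded-region>/<family>/<storefile>; each level is
    // filtered (RegionDirFilter, then FamilyDirFilter) before the store files are collected.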
1276 final FamilyDirFilter familyFilter = new FamilyDirFilter(fs); 1277 final Vector<Exception> exceptions = new Vector<>(); 1278 1279 try { 1280 List<FileStatus> regionDirs = 1281 FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 1282 if (regionDirs == null) { 1283 return finalResultMap; 1284 } 1285 1286 final List<Future<?>> futures = new ArrayList<>(regionDirs.size()); 1287 1288 for (FileStatus regionDir : regionDirs) { 1289 if (null != progressReporter) { 1290 progressReporter.progress(regionDir); 1291 } 1292 final Path dd = regionDir.getPath(); 1293 1294 if (!exceptions.isEmpty()) { 1295 break; 1296 } 1297 1298 Runnable getRegionStoreFileMapCall = new Runnable() { 1299 @Override 1300 public void run() { 1301 try { 1302 HashMap<String, Path> regionStoreFileMap = new HashMap<>(); 1303 List<FileStatus> familyDirs = 1304 FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter); 1305 if (familyDirs == null) { 1306 if (!fs.exists(dd)) { 1307 LOG.warn("Skipping region because it no longer exists: " + dd); 1308 } else { 1309 LOG.warn("Skipping region because it has no family dirs: " + dd); 1310 } 1311 return; 1312 } 1313 for (FileStatus familyDir : familyDirs) { 1314 if (null != progressReporter) { 1315 progressReporter.progress(familyDir); 1316 } 1317 Path family = familyDir.getPath(); 1318 if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) { 1319 continue; 1320 } 1321 // now in family, iterate over the StoreFiles and 1322 // put in map 1323 FileStatus[] familyStatus = fs.listStatus(family); 1324 for (FileStatus sfStatus : familyStatus) { 1325 if (null != progressReporter) { 1326 progressReporter.progress(sfStatus); 1327 } 1328 Path sf = sfStatus.getPath(); 1329 if (sfFilter == null || sfFilter.accept(sf)) { 1330 regionStoreFileMap.put(sf.getName(), sf); 1331 } 1332 } 1333 } 1334 finalResultMap.putAll(regionStoreFileMap); 1335 } catch (Exception e) { 1336 LOG.error("Could not get region store file map for region: " + dd, e); 1337 exceptions.add(e); 1338 } 1339 } 1340 }; 1341 1342 // If executor is available, submit async tasks to exec concurrently, otherwise 1343 // just do serial sync execution 1344 if (executor != null) { 1345 Future<?> future = executor.submit(getRegionStoreFileMapCall); 1346 futures.add(future); 1347 } else { 1348 FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null); 1349 future.run(); 1350 futures.add(future); 1351 } 1352 } 1353 1354 // Ensure all pending tasks are complete (or that we run into an exception) 1355 for (Future<?> f : futures) { 1356 if (!exceptions.isEmpty()) { 1357 break; 1358 } 1359 try { 1360 f.get(); 1361 } catch (ExecutionException e) { 1362 LOG.error("Unexpected exec exception! Should've been caught already. 
(Bug?)", e); 1363 // Shouldn't happen, we already logged/caught any exceptions in the Runnable 1364 } 1365 } 1366 } catch (IOException e) { 1367 LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e); 1368 exceptions.add(e); 1369 } finally { 1370 if (!exceptions.isEmpty()) { 1371 // Just throw the first exception as an indication something bad happened 1372 // Don't need to propagate all the exceptions, we already logged them all anyway 1373 Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class); 1374 throw new IOException(exceptions.firstElement()); 1375 } 1376 } 1377 1378 return finalResultMap; 1379 } 1380 1381 public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) { 1382 int result = 0; 1383 try { 1384 for (Path familyDir : getFamilyDirs(fs, p)) { 1385 result += getReferenceFilePaths(fs, familyDir).size(); 1386 } 1387 } catch (IOException e) { 1388 LOG.warn("Error counting reference files", e); 1389 } 1390 return result; 1391 } 1392 1393 /** 1394 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1395 * the full Path. <br> 1396 * Example...<br> 1397 * Key = 3944417774205889744 <br> 1398 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1399 * @param fs The file system to use. 1400 * @param hbaseRootDir The root directory to scan. 1401 * @return Map keyed by StoreFile name with a value of the full Path. 1402 * @throws IOException When scanning the directory fails. 1403 */ 1404 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1405 final Path hbaseRootDir) throws IOException, InterruptedException { 1406 return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter) null); 1407 } 1408 1409 /** 1410 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1411 * the full Path. <br> 1412 * Example...<br> 1413 * Key = 3944417774205889744 <br> 1414 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1415 * @param fs The file system to use. 1416 * @param hbaseRootDir The root directory to scan. 1417 * @param sfFilter optional path filter to apply to store files 1418 * @param executor optional executor service to parallelize this operation 1419 * @param progressReporter Instance or null; gets called every time we move to new region of 1420 * family dir and for each store file. 1421 * @return Map keyed by StoreFile name with a value of the full Path. 1422 * @throws IOException When scanning the directory fails. 1423 * @deprecated Since 2.3.0. Will be removed in hbase4. Used 1424 * {@link #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)} 1425 */ 1426 @Deprecated 1427 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1428 final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor, 1429 HbckErrorReporter progressReporter) throws IOException, InterruptedException { 1430 return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor, new ProgressReporter() { 1431 @Override 1432 public void progress(FileStatus status) { 1433 // status is not used in this implementation. 1434 progressReporter.progress(); 1435 } 1436 }); 1437 } 1438 1439 /** 1440 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1441 * the full Path. 
<br> 1442 * Example...<br> 1443 * Key = 3944417774205889744 <br> 1444 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1445 * @param fs The file system to use. 1446 * @param hbaseRootDir The root directory to scan. 1447 * @param sfFilter optional path filter to apply to store files 1448 * @param executor optional executor service to parallelize this operation 1449 * @param progressReporter Instance or null; gets called every time we move to new region of 1450 * family dir and for each store file. 1451 * @return Map keyed by StoreFile name with a value of the full Path. 1452 * @throws IOException When scanning the directory fails. n 1453 */ 1454 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1455 final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor, 1456 ProgressReporter progressReporter) throws IOException, InterruptedException { 1457 ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32); 1458 1459 // if this method looks similar to 'getTableFragmentation' that is because 1460 // it was borrowed from it. 1461 1462 // only include the directory paths to tables 1463 for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) { 1464 getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir), 1465 sfFilter, executor, progressReporter); 1466 } 1467 return map; 1468 } 1469 1470 /** 1471 * Filters FileStatuses in an array and returns a list 1472 * @param input An array of FileStatuses 1473 * @param filter A required filter to filter the array 1474 * @return A list of FileStatuses 1475 */ 1476 public static List<FileStatus> filterFileStatuses(FileStatus[] input, FileStatusFilter filter) { 1477 if (input == null) return null; 1478 return filterFileStatuses(Iterators.forArray(input), filter); 1479 } 1480 1481 /** 1482 * Filters FileStatuses in an iterator and returns a list 1483 * @param input An iterator of FileStatuses 1484 * @param filter A required filter to filter the array 1485 * @return A list of FileStatuses 1486 */ 1487 public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input, 1488 FileStatusFilter filter) { 1489 if (input == null) return null; 1490 ArrayList<FileStatus> results = new ArrayList<>(); 1491 while (input.hasNext()) { 1492 FileStatus f = input.next(); 1493 if (filter.accept(f)) { 1494 results.add(f); 1495 } 1496 } 1497 return results; 1498 } 1499 1500 /** 1501 * Calls fs.listStatus() and treats FileNotFoundException as non-fatal This accommodates 1502 * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException, and 1503 * return an empty FileStatus[] while Hadoop 2 will throw FileNotFoundException. 
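   * <p>
   * A minimal usage sketch (with {@code fs} and {@code tableDir} assumed to be in scope), listing
   * the region directories of a table; note that {@code null}, not an empty list, is returned when
   * nothing matches:
   * </p>
   *
   * <pre>{@code
   * List<FileStatus> regionDirs =
   *   FSUtils.listStatusWithStatusFilter(fs, tableDir, new FSUtils.RegionDirFilter(fs));
   * if (regionDirs == null) {
   *   // tableDir is missing, is empty, or nothing passed the filter
   * }
   * }</pre>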
  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
   * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException
   * and returns an empty FileStatus[], while Hadoop 2 will throw FileNotFoundException.
   * @param fs     file system
   * @param dir    directory
   * @param filter file status filter
   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
   */
  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs, final Path dir,
    final FileStatusFilter filter) throws IOException {
    FileStatus[] status = null;
    try {
      status = fs.listStatus(dir);
    } catch (FileNotFoundException fnfe) {
      LOG.trace("{} does not exist", dir);
      return null;
    }

    if (ArrayUtils.getLength(status) == 0) {
      return null;
    }

    if (filter == null) {
      return Arrays.asList(status);
    } else {
      List<FileStatus> status2 = filterFileStatuses(status, filter);
      if (status2 == null || status2.isEmpty()) {
        return null;
      } else {
        return status2;
      }
    }
  }

  /**
   * This function is to scan the root path of the file system to get the degree of locality for
   * each region on each of the servers having at least one block of that region. This is used by
   * the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}.
   * @param conf the configuration to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>>
    getRegionDegreeLocalityMappingFromFS(final Configuration conf) throws IOException {
    return getRegionDegreeLocalityMappingFromFS(conf, null,
      conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
  }

  /**
   * This function is to scan the root path of the file system to get the degree of locality for
   * each region on each of the servers having at least one block of that region.
   * @param conf           the configuration to use
   * @param desiredTable   the table you wish to scan locality for
   * @param threadPoolSize the thread pool size to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
    final Configuration conf, final String desiredTable, int threadPoolSize) throws IOException {
    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
    return regionDegreeLocalityMapping;
  }

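  // Illustrative usage sketch (hypothetical caller code): computing per-region locality for a
  // single table. The table name and thread pool size below are example values.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   Map<String, Map<String, Float>> locality =
  //     FSUtils.getRegionDegreeLocalityMappingFromFS(conf, "my_table", 8);
  //   // locality.get(encodedRegionName).get(hostName) is the fraction of that region's
  //   // HDFS blocks that are stored on that host.
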
  /**
   * This function is to scan the root path of the file system to get either the mapping between
   * the region name and its best locality region server or the degree of locality of each region
   * on each of the servers having at least one block of that region. The output map parameters
   * are both optional.
   * @param conf                        the configuration to use
   * @param desiredTable                the table you wish to scan locality for
   * @param threadPoolSize              the thread pool size to use
   * @param regionDegreeLocalityMapping the map into which to put the locality degree mapping or
   *                                    null, must be a thread-safe implementation
   * @throws IOException in case of file system errors or interrupts
   */
  private static void getRegionLocalityMappingFromFS(final Configuration conf,
    final String desiredTable, int threadPoolSize,
    final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
    final FileSystem fs = FileSystem.get(conf);
    final Path rootPath = CommonFSUtils.getRootDir(conf);
    final long startTime = EnvironmentEdgeManager.currentTime();
    final Path queryPath;
    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
    if (null == desiredTable) {
      queryPath =
        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
    } else {
      queryPath = new Path(
        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
    }

    // reject all paths that are not appropriate
    PathFilter pathFilter = new PathFilter() {
      @Override
      public boolean accept(Path path) {
        // this is the region name; it may get some noise data
        if (null == path) {
          return false;
        }

        // no parent?
        Path parent = path.getParent();
        if (null == parent) {
          return false;
        }

        String regionName = path.getName();
        if (null == regionName) {
          return false;
        }

        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
          return false;
        }
        return true;
      }
    };

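    // queryPath is a glob: either <root>/data/<namespace>/<table>/<region> (all tables) or
    // <tableDir>/<region> (single table). globStatus expands it below and the PathFilter keeps
    // only names that look like encoded region names (lowercase hex strings).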
    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
    }

    if (null == statusList) {
      return;
    }

    // lower the number of threads in case we have very few expected regions
    threadPoolSize = Math.min(threadPoolSize, statusList.length);

    // run in multiple threads
    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
      new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    try {
      // ignore all file status items that are not of interest
      for (FileStatus regionStatus : statusList) {
        if (null == regionStatus || !regionStatus.isDirectory()) {
          continue;
        }

        final Path regionPath = regionStatus.getPath();
        if (null != regionPath) {
          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
        }
      }
    } finally {
      tpe.shutdown();
      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
      try {
        // here we wait until TPE terminates, which is either naturally or by
        // exceptions in the execution of the threads
        while (!tpe.awaitTermination(threadWakeFrequency, TimeUnit.MILLISECONDS)) {
          // printing out rough estimate, so as to not introduce
          // AtomicInteger
          LOG.info("Locality checking is underway: { Scanned Regions : "
            + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
            + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
    }

    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
    LOG.info("Scan DFS for locality info takes {}ms", overhead);
  }

  /**
   * Do our short circuit read setup. Checks buffer size to use and whether to do checksumming in
   * hbase or hdfs.
   * @param conf the configuration to check and update
   */
  public static void setupShortCircuitRead(final Configuration conf) {
    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
    boolean shortCircuitSkipChecksum =
      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    if (shortCircuitSkipChecksum) {
      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not "
        + "be set to true."
        + (useHBaseChecksum
          ? " HBase checksum doesn't require "
            + "it, see https://issues.apache.org/jira/browse/HBASE-6868."
          : ""));
      assert !shortCircuitSkipChecksum; // this will fail if assertions are on
    }
    checkShortCircuitReadBufferSize(conf);
  }

  /**
   * Check if short circuit read buffer size is set and if not, set it to hbase value.
   * @param conf the configuration to check and update
   */
  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
    int size = conf.getInt(dfsKey, notSet);
    // If a size is set, return -- we will use it.
    if (size != notSet) return;
    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }

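  // Illustrative configuration sketch (hypothetical values): how the buffer-size fallback above
  // is typically driven. If "dfs.client.read.shortcircuit.buffer.size" is unset,
  // checkShortCircuitReadBufferSize copies in "hbase.dfs.client.read.shortcircuit.buffer.size"
  // (or twice the default HBase block size when that is unset too).
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt("hbase.dfs.client.read.shortcircuit.buffer.size", 131072); // example value
  //   FSUtils.checkShortCircuitReadBufferSize(conf);
  //   // conf.getInt("dfs.client.read.shortcircuit.buffer.size", -1) now returns 131072
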
  /**
   * @param c the configuration to use
   * @return The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs.
   * @throws IOException on filesystem errors
   */
  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
    throws IOException {
    if (!CommonFSUtils.isHDFS(c)) {
      return null;
    }
    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
    final String name = "getHedgedReadMetrics";
    DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient();
    Method m;
    try {
      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException | SecurityException e) {
      LOG.warn(
        "Failed to find method " + name + " in dfsclient; no hedged read metrics: " + e.getMessage());
      return null;
    }
    m.setAccessible(true);
    try {
      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: "
        + e.getMessage());
      return null;
    }
  }

  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
    Configuration conf, int threads) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Future<Void>> futures = new ArrayList<>();
    List<Path> traversedPaths;
    try {
      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (ExecutionException | InterruptedException | IOException e) {
      throw new IOException("Copy snapshot reference files failed", e);
    } finally {
      pool.shutdownNow();
    }
    return traversedPaths;
  }

  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
    Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
    List<Path> traversedPaths = new ArrayList<>();
    traversedPaths.add(dst);
    FileStatus currentFileStatus = srcFS.getFileStatus(src);
    if (currentFileStatus.isDirectory()) {
      if (!dstFS.mkdirs(dst)) {
        throw new IOException("Create directory failed: " + dst);
      }
      FileStatus[] subPaths = srcFS.listStatus(src);
      for (FileStatus subPath : subPaths) {
        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
      }
    } else {
      Future<Void> future = pool.submit(() -> {
        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
        return null;
      });
      futures.add(future);
    }
    return traversedPaths;
  }

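  // Illustrative usage sketch (hypothetical paths and thread count): copying a directory tree
  // between two filesystems with copyFilesParallel. Directories are created synchronously while
  // individual file copies run on the thread pool.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   FileSystem srcFs = FileSystem.get(URI.create("hdfs://src-cluster/"), conf);
  //   FileSystem dstFs = FileSystem.get(URI.create("hdfs://dst-cluster/"), conf);
  //   List<Path> copied = FSUtils.copyFilesParallel(srcFs, new Path("/hbase/.hbase-snapshot/s1"),
  //     dstFs, new Path("/backup/s1"), conf, 16);
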
serviceName=" + serviceName, e); 1806 } 1807 } else { 1808 URI uri = fs.getUri(); 1809 int port = uri.getPort(); 1810 if (port < 0) { 1811 int idx = serviceName.indexOf(':'); 1812 port = Integer.parseInt(serviceName.substring(idx + 1)); 1813 } 1814 InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port); 1815 addresses.add(addr); 1816 } 1817 1818 return addresses; 1819 } 1820 1821 /** 1822 * @param conf the Configuration of HBase 1823 * @return Whether srcFs and desFs are on same hdfs or not 1824 */ 1825 public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) { 1826 // By getCanonicalServiceName, we could make sure both srcFs and desFs 1827 // show a unified format which contains scheme, host and port. 1828 String srcServiceName = srcFs.getCanonicalServiceName(); 1829 String desServiceName = desFs.getCanonicalServiceName(); 1830 1831 if (srcServiceName == null || desServiceName == null) { 1832 return false; 1833 } 1834 if (srcServiceName.equals(desServiceName)) { 1835 return true; 1836 } 1837 if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) { 1838 Collection<String> internalNameServices = 1839 conf.getTrimmedStringCollection("dfs.internal.nameservices"); 1840 if (!internalNameServices.isEmpty()) { 1841 if (internalNameServices.contains(srcServiceName.split(":")[1])) { 1842 return true; 1843 } else { 1844 return false; 1845 } 1846 } 1847 } 1848 if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) { 1849 // If one serviceName is an HA format while the other is a non-HA format, 1850 // maybe they refer to the same FileSystem. 1851 // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port" 1852 Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf); 1853 Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf); 1854 if (Sets.intersection(srcAddrs, desAddrs).size() > 0) { 1855 return true; 1856 } 1857 } 1858 1859 return false; 1860 } 1861}