001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET; 021 022import edu.umd.cs.findbugs.annotations.CheckForNull; 023import java.io.ByteArrayInputStream; 024import java.io.DataInputStream; 025import java.io.EOFException; 026import java.io.FileNotFoundException; 027import java.io.IOException; 028import java.io.InterruptedIOException; 029import java.lang.reflect.InvocationTargetException; 030import java.lang.reflect.Method; 031import java.net.InetSocketAddress; 032import java.net.URI; 033import java.util.ArrayList; 034import java.util.Arrays; 035import java.util.Collection; 036import java.util.Collections; 037import java.util.HashMap; 038import java.util.HashSet; 039import java.util.Iterator; 040import java.util.List; 041import java.util.Locale; 042import java.util.Map; 043import java.util.Set; 044import java.util.Vector; 045import java.util.concurrent.ConcurrentHashMap; 046import java.util.concurrent.ExecutionException; 047import java.util.concurrent.ExecutorService; 048import java.util.concurrent.Executors; 049import java.util.concurrent.Future; 050import java.util.concurrent.FutureTask; 051import java.util.concurrent.ThreadPoolExecutor; 052import java.util.concurrent.TimeUnit; 053import java.util.regex.Pattern; 054import org.apache.commons.lang3.ArrayUtils; 055import org.apache.hadoop.conf.Configuration; 056import org.apache.hadoop.fs.BlockLocation; 057import org.apache.hadoop.fs.FSDataInputStream; 058import org.apache.hadoop.fs.FSDataOutputStream; 059import org.apache.hadoop.fs.FileStatus; 060import org.apache.hadoop.fs.FileSystem; 061import org.apache.hadoop.fs.FileUtil; 062import org.apache.hadoop.fs.Path; 063import org.apache.hadoop.fs.PathFilter; 064import org.apache.hadoop.fs.StorageType; 065import org.apache.hadoop.fs.permission.FsPermission; 066import org.apache.hadoop.hbase.ClusterId; 067import org.apache.hadoop.hbase.HColumnDescriptor; 068import org.apache.hadoop.hbase.HConstants; 069import org.apache.hadoop.hbase.HDFSBlocksDistribution; 070import org.apache.hadoop.hbase.TableName; 071import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 072import org.apache.hadoop.hbase.client.RegionInfo; 073import org.apache.hadoop.hbase.client.RegionInfoBuilder; 074import org.apache.hadoop.hbase.exceptions.DeserializationException; 075import org.apache.hadoop.hbase.fs.HFileSystem; 076import org.apache.hadoop.hbase.io.HFileLink; 077import org.apache.hadoop.hbase.master.HMaster; 078import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 079import org.apache.hadoop.hdfs.DFSClient; 080import org.apache.hadoop.hdfs.DFSHedgedReadMetrics; 081import 
org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;

/**
 * Utility methods for interacting with the underlying file system.
 */
@InterfaceAudience.Private
public final class FSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);

  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
  private static final int DEFAULT_THREAD_POOLSIZE = 2;

  /** Set to true on Windows platforms */
  // currently only used in testing. TODO refactor into a test class
  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");

  private FSUtils() {
  }

  /** Returns true if <code>fs</code> is an instance of DistributedFileSystem. */
  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
    FileSystem fileSystem = fs;
    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
    // Check its backing fs for dfs-ness.
    if (fs instanceof HFileSystem) {
      fileSystem = ((HFileSystem) fs).getBackingFs();
    }
    return fileSystem instanceof DistributedFileSystem;
  }

  /**
   * Compare the path component of the Path URIs; e.g. if hdfs://a/b/c and /a/b/c are passed, it
   * will compare the '/a/b/c' part. If you passed in hdfs://a/b/c and b/c, it would return true.
   * Does not consider the scheme; i.e. if the schemes differ but the path or subpath matches, the
   * two will equate.
   * @param pathToSearch Path we will be trying to match.
   * @param pathTail     the tail to match against the end of <code>pathToSearch</code>
   * @return True if <code>pathTail</code> is a tail of the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
    Path tailPath = pathTail;
    String tailName;
    Path toSearch = pathToSearch;
    String toSearchName;
    boolean result = false;

    if (pathToSearch.depth() != pathTail.depth()) {
      return false;
    }

    do {
      tailName = tailPath.getName();
      if (tailName == null || tailName.isEmpty()) {
        result = true;
        break;
      }
      toSearchName = toSearch.getName();
      if (toSearchName == null || toSearchName.isEmpty()) {
        break;
      }
      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
      tailPath = tailPath.getParent();
      toSearch = toSearch.getParent();
    } while (tailName.equals(toSearchName));
    return result;
  }

  /**
   * Delete the region directory if it exists.
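   * <p>
   * A minimal usage sketch (illustrative only; obtaining the {@code RegionInfo} and any error
   * handling around the call are assumed, not provided by this class):
   *
   * <pre>
   * Configuration conf = HBaseConfiguration.create();
   * RegionInfo regionToDrop = ...; // e.g. looked up from the catalog
   * boolean deleted = FSUtils.deleteRegionDir(conf, regionToDrop);
   * </pre>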
168 * @return True if deleted the region directory. n 169 */ 170 public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri) 171 throws IOException { 172 Path rootDir = CommonFSUtils.getRootDir(conf); 173 FileSystem fs = rootDir.getFileSystem(conf); 174 return CommonFSUtils.deleteDirectory(fs, 175 new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName())); 176 } 177 178 /** 179 * Create the specified file on the filesystem. By default, this will: 180 * <ol> 181 * <li>overwrite the file if it exists</li> 182 * <li>apply the umask in the configuration (if it is enabled)</li> 183 * <li>use the fs configured buffer size (or 4096 if not set)</li> 184 * <li>use the configured column family replication or default replication if 185 * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li> 186 * <li>use the default block size</li> 187 * <li>not track progress</li> 188 * </ol> 189 * @param conf configurations 190 * @param fs {@link FileSystem} on which to write the file 191 * @param path {@link Path} to the file to write 192 * @param perm permissions 193 * @param favoredNodes favored data nodes 194 * @return output stream to the created file 195 * @throws IOException if the file cannot be created 196 */ 197 public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path, 198 FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException { 199 if (fs instanceof HFileSystem) { 200 FileSystem backingFs = ((HFileSystem) fs).getBackingFs(); 201 if (backingFs instanceof DistributedFileSystem) { 202 // Try to use the favoredNodes version via reflection to allow backwards- 203 // compatibility. 204 short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION, 205 String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION))); 206 try { 207 return (FSDataOutputStream) (DistributedFileSystem.class 208 .getDeclaredMethod("create", Path.class, FsPermission.class, boolean.class, int.class, 209 short.class, long.class, Progressable.class, InetSocketAddress[].class) 210 .invoke(backingFs, path, perm, true, CommonFSUtils.getDefaultBufferSize(backingFs), 211 replication > 0 ? replication : CommonFSUtils.getDefaultReplication(backingFs, path), 212 CommonFSUtils.getDefaultBlockSize(backingFs, path), null, favoredNodes)); 213 } catch (InvocationTargetException ite) { 214 // Function was properly called, but threw it's own exception. 215 throw new IOException(ite.getCause()); 216 } catch (NoSuchMethodException e) { 217 LOG.debug("DFS Client does not support most favored nodes create; using default create"); 218 LOG.trace("Ignoring; use default create", e); 219 } catch (IllegalArgumentException | SecurityException | IllegalAccessException e) { 220 LOG.debug("Ignoring (most likely Reflection related exception) " + e); 221 } 222 } 223 } 224 return CommonFSUtils.create(fs, path, perm, true); 225 } 226 227 /** 228 * Checks to see if the specified file system is available 229 * @param fs filesystem 230 * @throws IOException e 231 */ 232 public static void checkFileSystemAvailable(final FileSystem fs) throws IOException { 233 if (!(fs instanceof DistributedFileSystem)) { 234 return; 235 } 236 IOException exception = null; 237 DistributedFileSystem dfs = (DistributedFileSystem) fs; 238 try { 239 if (dfs.exists(new Path("/"))) { 240 return; 241 } 242 } catch (IOException e) { 243 exception = e instanceof RemoteException ? 
((RemoteException) e).unwrapRemoteException() : e; 244 } 245 try { 246 fs.close(); 247 } catch (Exception e) { 248 LOG.error("file system close failed: ", e); 249 } 250 throw new IOException("File system is not available", exception); 251 } 252 253 /** 254 * Inquire the Active NameNode's safe mode status. 255 * @param dfs A DistributedFileSystem object representing the underlying HDFS. 256 * @return whether we're in safe mode n 257 */ 258 private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException { 259 return dfs.setSafeMode(SAFEMODE_GET, true); 260 } 261 262 /** 263 * Check whether dfs is in safemode. nn 264 */ 265 public static void checkDfsSafeMode(final Configuration conf) throws IOException { 266 boolean isInSafeMode = false; 267 FileSystem fs = FileSystem.get(conf); 268 if (fs instanceof DistributedFileSystem) { 269 DistributedFileSystem dfs = (DistributedFileSystem) fs; 270 isInSafeMode = isInSafeMode(dfs); 271 } 272 if (isInSafeMode) { 273 throw new IOException("File system is in safemode, it can't be written now"); 274 } 275 } 276 277 /** 278 * Verifies current version of file system 279 * @param fs filesystem object 280 * @param rootdir root hbase directory 281 * @return null if no version file exists, version string otherwise 282 * @throws IOException if the version file fails to open 283 * @throws DeserializationException if the version data cannot be translated into a version 284 */ 285 public static String getVersion(FileSystem fs, Path rootdir) 286 throws IOException, DeserializationException { 287 final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 288 FileStatus[] status = null; 289 try { 290 // hadoop 2.0 throws FNFE if directory does not exist. 291 // hadoop 1.0 returns null if directory does not exist. 292 status = fs.listStatus(versionFile); 293 } catch (FileNotFoundException fnfe) { 294 return null; 295 } 296 if (ArrayUtils.getLength(status) == 0) { 297 return null; 298 } 299 String version = null; 300 byte[] content = new byte[(int) status[0].getLen()]; 301 FSDataInputStream s = fs.open(versionFile); 302 try { 303 IOUtils.readFully(s, content, 0, content.length); 304 if (ProtobufUtil.isPBMagicPrefix(content)) { 305 version = parseVersionFrom(content); 306 } else { 307 // Presume it pre-pb format. 308 try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) { 309 version = dis.readUTF(); 310 } 311 } 312 } catch (EOFException eof) { 313 LOG.warn("Version file was empty, odd, will try to set it."); 314 } finally { 315 s.close(); 316 } 317 return version; 318 } 319 320 /** 321 * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file. 322 * @param bytes The byte content of the hbase.version file 323 * @return The version found in the file as a String 324 * @throws DeserializationException if the version data cannot be translated into a version 325 */ 326 static String parseVersionFrom(final byte[] bytes) throws DeserializationException { 327 ProtobufUtil.expectPBMagicPrefix(bytes); 328 int pblen = ProtobufUtil.lengthOfPBMagic(); 329 FSProtos.HBaseVersionFileContent.Builder builder = 330 FSProtos.HBaseVersionFileContent.newBuilder(); 331 try { 332 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); 333 return builder.getVersion(); 334 } catch (IOException e) { 335 // Convert 336 throw new DeserializationException(e); 337 } 338 } 339 340 /** 341 * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file. 
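   * <p>
   * Illustrative round trip (the literal version string below is just an example value): the
   * bytes produced here carry the pb magic prefix and can be read back with
   * {@link #parseVersionFrom(byte[])}:
   *
   * <pre>
   * byte[] content = FSUtils.toVersionByteArray("8");
   * String version = FSUtils.parseVersionFrom(content); // returns "8"
   * </pre>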
342 * @param version Version to persist 343 * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a 344 * prefix. 345 */ 346 static byte[] toVersionByteArray(final String version) { 347 FSProtos.HBaseVersionFileContent.Builder builder = 348 FSProtos.HBaseVersionFileContent.newBuilder(); 349 return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray()); 350 } 351 352 /** 353 * Verifies current version of file system 354 * @param fs file system 355 * @param rootdir root directory of HBase installation 356 * @param message if true, issues a message on System.out 357 * @throws IOException if the version file cannot be opened 358 * @throws DeserializationException if the contents of the version file cannot be parsed 359 */ 360 public static void checkVersion(FileSystem fs, Path rootdir, boolean message) 361 throws IOException, DeserializationException { 362 checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 363 } 364 365 /** 366 * Verifies current version of file system 367 * @param fs file system 368 * @param rootdir root directory of HBase installation 369 * @param message if true, issues a message on System.out 370 * @param wait wait interval 371 * @param retries number of times to retry 372 * @throws IOException if the version file cannot be opened 373 * @throws DeserializationException if the contents of the version file cannot be parsed 374 */ 375 public static void checkVersion(FileSystem fs, Path rootdir, boolean message, int wait, 376 int retries) throws IOException, DeserializationException { 377 String version = getVersion(fs, rootdir); 378 String msg; 379 if (version == null) { 380 if (!metaRegionExists(fs, rootdir)) { 381 // rootDir is empty (no version file and no root region) 382 // just create new version file (HBASE-1195) 383 setVersion(fs, rootdir, wait, retries); 384 return; 385 } else { 386 msg = "hbase.version file is missing. Is your hbase.rootdir valid? " 387 + "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " 388 + "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2"; 389 } 390 } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) { 391 return; 392 } else { 393 msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version 394 + " but software requires version " + HConstants.FILE_SYSTEM_VERSION 395 + ". Consult http://hbase.apache.org/book.html for further information about " 396 + "upgrading HBase."; 397 } 398 399 // version is deprecated require migration 400 // Output on stdout so user sees it in terminal. 401 if (message) { 402 System.out.println("WARNING! 
" + msg); 403 } 404 throw new FileSystemVersionException(msg); 405 } 406 407 /** 408 * Sets version of file system 409 * @param fs filesystem object 410 * @param rootdir hbase root 411 * @throws IOException e 412 */ 413 public static void setVersion(FileSystem fs, Path rootdir) throws IOException { 414 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0, 415 HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 416 } 417 418 /** 419 * Sets version of file system 420 * @param fs filesystem object 421 * @param rootdir hbase root 422 * @param wait time to wait for retry 423 * @param retries number of times to retry before failing 424 * @throws IOException e 425 */ 426 public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries) 427 throws IOException { 428 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries); 429 } 430 431 /** 432 * Sets version of file system 433 * @param fs filesystem object 434 * @param rootdir hbase root directory 435 * @param version version to set 436 * @param wait time to wait for retry 437 * @param retries number of times to retry before throwing an IOException 438 * @throws IOException e 439 */ 440 public static void setVersion(FileSystem fs, Path rootdir, String version, int wait, int retries) 441 throws IOException { 442 Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 443 Path tempVersionFile = new Path(rootdir, 444 HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + HConstants.VERSION_FILE_NAME); 445 while (true) { 446 try { 447 // Write the version to a temporary file 448 FSDataOutputStream s = fs.create(tempVersionFile); 449 try { 450 s.write(toVersionByteArray(version)); 451 s.close(); 452 s = null; 453 // Move the temp version file to its normal location. Returns false 454 // if the rename failed. Throw an IOE in that case. 455 if (!fs.rename(tempVersionFile, versionFile)) { 456 throw new IOException("Unable to move temp version file to " + versionFile); 457 } 458 } finally { 459 // Cleaning up the temporary if the rename failed would be trying 460 // too hard. We'll unconditionally create it again the next time 461 // through anyway, files are overwritten by default by create(). 462 463 // Attempt to close the stream on the way out if it is still open. 
464 try { 465 if (s != null) s.close(); 466 } catch (IOException ignore) { 467 } 468 } 469 LOG.info("Created version file at " + rootdir.toString() + " with version=" + version); 470 return; 471 } catch (IOException e) { 472 if (retries > 0) { 473 LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e); 474 fs.delete(versionFile, false); 475 try { 476 if (wait > 0) { 477 Thread.sleep(wait); 478 } 479 } catch (InterruptedException ie) { 480 throw (InterruptedIOException) new InterruptedIOException().initCause(ie); 481 } 482 retries--; 483 } else { 484 throw e; 485 } 486 } 487 } 488 } 489 490 /** 491 * Checks that a cluster ID file exists in the HBase root directory 492 * @param fs the root directory FileSystem 493 * @param rootdir the HBase root directory in HDFS 494 * @param wait how long to wait between retries 495 * @return <code>true</code> if the file exists, otherwise <code>false</code> 496 * @throws IOException if checking the FileSystem fails 497 */ 498 public static boolean checkClusterIdExists(FileSystem fs, Path rootdir, long wait) 499 throws IOException { 500 while (true) { 501 try { 502 Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 503 return fs.exists(filePath); 504 } catch (IOException ioe) { 505 if (wait > 0L) { 506 LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe); 507 try { 508 Thread.sleep(wait); 509 } catch (InterruptedException e) { 510 Thread.currentThread().interrupt(); 511 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 512 } 513 } else { 514 throw ioe; 515 } 516 } 517 } 518 } 519 520 /** 521 * Returns the value of the unique cluster ID stored for this HBase instance. 522 * @param fs the root directory FileSystem 523 * @param rootdir the path to the HBase root directory 524 * @return the unique cluster identifier 525 * @throws IOException if reading the cluster ID file fails 526 */ 527 public static ClusterId getClusterId(FileSystem fs, Path rootdir) throws IOException { 528 Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 529 ClusterId clusterId = null; 530 FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null; 531 if (status != null) { 532 int len = Ints.checkedCast(status.getLen()); 533 byte[] content = new byte[len]; 534 FSDataInputStream in = fs.open(idPath); 535 try { 536 in.readFully(content); 537 } catch (EOFException eof) { 538 LOG.warn("Cluster ID file {} is empty", idPath); 539 } finally { 540 in.close(); 541 } 542 try { 543 clusterId = ClusterId.parseFrom(content); 544 } catch (DeserializationException e) { 545 throw new IOException("content=" + Bytes.toString(content), e); 546 } 547 // If not pb'd, make it so. 548 if (!ProtobufUtil.isPBMagicPrefix(content)) { 549 String cid = null; 550 in = fs.open(idPath); 551 try { 552 cid = in.readUTF(); 553 clusterId = new ClusterId(cid); 554 } catch (EOFException eof) { 555 LOG.warn("Cluster ID file {} is empty", idPath); 556 } finally { 557 in.close(); 558 } 559 rewriteAsPb(fs, rootdir, idPath, clusterId); 560 } 561 return clusterId; 562 } else { 563 LOG.warn("Cluster ID file does not exist at {}", idPath); 564 } 565 return clusterId; 566 } 567 568 /** 569 * nn 570 */ 571 private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p, 572 final ClusterId cid) throws IOException { 573 // Rewrite the file as pb. Move aside the old one first, write new 574 // then delete the moved-aside file. 575 Path movedAsideName = new Path(p + "." 
+ EnvironmentEdgeManager.currentTime()); 576 if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p); 577 setClusterId(fs, rootdir, cid, 100); 578 if (!fs.delete(movedAsideName, false)) { 579 throw new IOException("Failed delete of " + movedAsideName); 580 } 581 LOG.debug("Rewrote the hbase.id file as pb"); 582 } 583 584 /** 585 * Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root 586 * directory 587 * @param fs the root directory FileSystem 588 * @param rootdir the path to the HBase root directory 589 * @param clusterId the unique identifier to store 590 * @param wait how long (in milliseconds) to wait between retries 591 * @throws IOException if writing to the FileSystem fails and no wait value 592 */ 593 public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId, int wait) 594 throws IOException { 595 while (true) { 596 try { 597 Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 598 Path tempIdFile = new Path(rootdir, 599 HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME); 600 // Write the id file to a temporary location 601 FSDataOutputStream s = fs.create(tempIdFile); 602 try { 603 s.write(clusterId.toByteArray()); 604 s.close(); 605 s = null; 606 // Move the temporary file to its normal location. Throw an IOE if 607 // the rename failed 608 if (!fs.rename(tempIdFile, idFile)) { 609 throw new IOException("Unable to move temp version file to " + idFile); 610 } 611 } finally { 612 // Attempt to close the stream if still open on the way out 613 try { 614 if (s != null) s.close(); 615 } catch (IOException ignore) { 616 } 617 } 618 if (LOG.isDebugEnabled()) { 619 LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId); 620 } 621 return; 622 } catch (IOException ioe) { 623 if (wait > 0) { 624 LOG.warn("Unable to create cluster ID file in " + rootdir.toString() + ", retrying in " 625 + wait + "msec: " + StringUtils.stringifyException(ioe)); 626 try { 627 Thread.sleep(wait); 628 } catch (InterruptedException e) { 629 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 630 } 631 } else { 632 throw ioe; 633 } 634 } 635 } 636 } 637 638 /** 639 * If DFS, check safe mode and if so, wait until we clear it. 640 * @param conf configuration 641 * @param wait Sleep between retries 642 * @throws IOException e 643 */ 644 public static void waitOnSafeMode(final Configuration conf, final long wait) throws IOException { 645 FileSystem fs = FileSystem.get(conf); 646 if (!(fs instanceof DistributedFileSystem)) return; 647 DistributedFileSystem dfs = (DistributedFileSystem) fs; 648 // Make sure dfs is not in safe mode 649 while (isInSafeMode(dfs)) { 650 LOG.info("Waiting for dfs to exit safe mode..."); 651 try { 652 Thread.sleep(wait); 653 } catch (InterruptedException e) { 654 Thread.currentThread().interrupt(); 655 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 656 } 657 } 658 } 659 660 /** 661 * Checks if meta region exists 662 * @param fs file system 663 * @param rootDir root directory of HBase installation 664 * @return true if exists 665 */ 666 public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException { 667 Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO); 668 return fs.exists(metaRegionDir); 669 } 670 671 /** 672 * Compute HDFS block distribution of a given HdfsDataInputStream. 
All HdfsDataInputStreams are 673 * backed by a series of LocatedBlocks, which are fetched periodically from the namenode. This 674 * method retrieves those blocks from the input stream and uses them to calculate 675 * HDFSBlockDistribution. The underlying method in DFSInputStream does attempt to use locally 676 * cached blocks, but may hit the namenode if the cache is determined to be incomplete. The method 677 * also involves making copies of all LocatedBlocks rather than return the underlying blocks 678 * themselves. 679 */ 680 public static HDFSBlocksDistribution 681 computeHDFSBlocksDistribution(HdfsDataInputStream inputStream) throws IOException { 682 List<LocatedBlock> blocks = inputStream.getAllBlocks(); 683 HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution(); 684 for (LocatedBlock block : blocks) { 685 String[] hosts = getHostsForLocations(block); 686 long len = block.getBlockSize(); 687 StorageType[] storageTypes = block.getStorageTypes(); 688 blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes); 689 } 690 return blocksDistribution; 691 } 692 693 private static String[] getHostsForLocations(LocatedBlock block) { 694 DatanodeInfo[] locations = block.getLocations(); 695 String[] hosts = new String[locations.length]; 696 for (int i = 0; i < hosts.length; i++) { 697 hosts[i] = locations[i].getHostName(); 698 } 699 return hosts; 700 } 701 702 /** 703 * Compute HDFS blocks distribution of a given file, or a portion of the file 704 * @param fs file system 705 * @param status file status of the file 706 * @param start start position of the portion 707 * @param length length of the portion 708 * @return The HDFS blocks distribution 709 */ 710 static public HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs, 711 FileStatus status, long start, long length) throws IOException { 712 HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution(); 713 BlockLocation[] blockLocations = fs.getFileBlockLocations(status, start, length); 714 addToHDFSBlocksDistribution(blocksDistribution, blockLocations); 715 return blocksDistribution; 716 } 717 718 /** 719 * Update blocksDistribution with blockLocations 720 * @param blocksDistribution the hdfs blocks distribution 721 * @param blockLocations an array containing block location 722 */ 723 static public void addToHDFSBlocksDistribution(HDFSBlocksDistribution blocksDistribution, 724 BlockLocation[] blockLocations) throws IOException { 725 for (BlockLocation bl : blockLocations) { 726 String[] hosts = bl.getHosts(); 727 long len = bl.getLength(); 728 StorageType[] storageTypes = bl.getStorageTypes(); 729 blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes); 730 } 731 } 732 733 // TODO move this method OUT of FSUtils. No dependencies to HMaster 734 /** 735 * Returns the total overall fragmentation percentage. Includes hbase:meta and -ROOT- as well. 736 * @param master The master defining the HBase root and file system 737 * @return A map for each table and its percentage (never null) 738 * @throws IOException When scanning the directory fails 739 */ 740 public static int getTotalTableFragmentation(final HMaster master) throws IOException { 741 Map<String, Integer> map = getTableFragmentation(master); 742 return map.isEmpty() ? -1 : map.get("-TOTAL-"); 743 } 744 745 /** 746 * Runs through the HBase rootdir and checks how many stores for each table have more than one 747 * file in them. Checks -ROOT- and hbase:meta too. 
The total percentage across all tables is 748 * stored under the special key "-TOTAL-". 749 * @param master The master defining the HBase root and file system. 750 * @return A map for each table and its percentage (never null). 751 * @throws IOException When scanning the directory fails. 752 */ 753 public static Map<String, Integer> getTableFragmentation(final HMaster master) 754 throws IOException { 755 Path path = CommonFSUtils.getRootDir(master.getConfiguration()); 756 // since HMaster.getFileSystem() is package private 757 FileSystem fs = path.getFileSystem(master.getConfiguration()); 758 return getTableFragmentation(fs, path); 759 } 760 761 /** 762 * Runs through the HBase rootdir and checks how many stores for each table have more than one 763 * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is 764 * stored under the special key "-TOTAL-". 765 * @param fs The file system to use 766 * @param hbaseRootDir The root directory to scan 767 * @return A map for each table and its percentage (never null) 768 * @throws IOException When scanning the directory fails 769 */ 770 public static Map<String, Integer> getTableFragmentation(final FileSystem fs, 771 final Path hbaseRootDir) throws IOException { 772 Map<String, Integer> frags = new HashMap<>(); 773 int cfCountTotal = 0; 774 int cfFragTotal = 0; 775 PathFilter regionFilter = new RegionDirFilter(fs); 776 PathFilter familyFilter = new FamilyDirFilter(fs); 777 List<Path> tableDirs = getTableDirs(fs, hbaseRootDir); 778 for (Path d : tableDirs) { 779 int cfCount = 0; 780 int cfFrag = 0; 781 FileStatus[] regionDirs = fs.listStatus(d, regionFilter); 782 for (FileStatus regionDir : regionDirs) { 783 Path dd = regionDir.getPath(); 784 // else its a region name, now look in region for families 785 FileStatus[] familyDirs = fs.listStatus(dd, familyFilter); 786 for (FileStatus familyDir : familyDirs) { 787 cfCount++; 788 cfCountTotal++; 789 Path family = familyDir.getPath(); 790 // now in family make sure only one file 791 FileStatus[] familyStatus = fs.listStatus(family); 792 if (familyStatus.length > 1) { 793 cfFrag++; 794 cfFragTotal++; 795 } 796 } 797 } 798 // compute percentage per table and store in result list 799 frags.put(CommonFSUtils.getTableName(d).getNameAsString(), 800 cfCount == 0 ? 0 : Math.round((float) cfFrag / cfCount * 100)); 801 } 802 // set overall percentage for all tables 803 frags.put("-TOTAL-", 804 cfCountTotal == 0 ? 0 : Math.round((float) cfFragTotal / cfCountTotal * 100)); 805 return frags; 806 } 807 808 /** 809 * A {@link PathFilter} that returns only regular files. 810 */ 811 static class FileFilter extends AbstractFileStatusFilter { 812 private final FileSystem fs; 813 814 public FileFilter(final FileSystem fs) { 815 this.fs = fs; 816 } 817 818 @Override 819 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 820 try { 821 return isFile(fs, isDir, p); 822 } catch (IOException e) { 823 LOG.warn("Unable to verify if path={} is a regular file", p, e); 824 return false; 825 } 826 } 827 } 828 829 /** 830 * Directory filter that doesn't include any of the directories in the specified blacklist 831 */ 832 public static class BlackListDirFilter extends AbstractFileStatusFilter { 833 private final FileSystem fs; 834 private List<String> blacklist; 835 836 /** 837 * Create a filter on the givem filesystem with the specified blacklist 838 * @param fs filesystem to filter 839 * @param directoryNameBlackList list of the names of the directories to filter. 
If 840 * <tt>null</tt>, all directories are returned 841 */ 842 @SuppressWarnings("unchecked") 843 public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) { 844 this.fs = fs; 845 blacklist = (List<String>) (directoryNameBlackList == null 846 ? Collections.emptyList() 847 : directoryNameBlackList); 848 } 849 850 @Override 851 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 852 if (!isValidName(p.getName())) { 853 return false; 854 } 855 856 try { 857 return isDirectory(fs, isDir, p); 858 } catch (IOException e) { 859 LOG.warn("An error occurred while verifying if [{}] is a valid directory." 860 + " Returning 'not valid' and continuing.", p, e); 861 return false; 862 } 863 } 864 865 protected boolean isValidName(final String name) { 866 return !blacklist.contains(name); 867 } 868 } 869 870 /** 871 * A {@link PathFilter} that only allows directories. 872 */ 873 public static class DirFilter extends BlackListDirFilter { 874 875 public DirFilter(FileSystem fs) { 876 super(fs, null); 877 } 878 } 879 880 /** 881 * A {@link PathFilter} that returns usertable directories. To get all directories use the 882 * {@link BlackListDirFilter} with a <tt>null</tt> blacklist 883 */ 884 public static class UserTableDirFilter extends BlackListDirFilter { 885 public UserTableDirFilter(FileSystem fs) { 886 super(fs, HConstants.HBASE_NON_TABLE_DIRS); 887 } 888 889 @Override 890 protected boolean isValidName(final String name) { 891 if (!super.isValidName(name)) return false; 892 893 try { 894 TableName.isLegalTableQualifierName(Bytes.toBytes(name)); 895 } catch (IllegalArgumentException e) { 896 LOG.info("Invalid table name: {}", name); 897 return false; 898 } 899 return true; 900 } 901 } 902 903 public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir) 904 throws IOException { 905 List<Path> tableDirs = new ArrayList<>(); 906 Path baseNamespaceDir = new Path(rootdir, HConstants.BASE_NAMESPACE_DIR); 907 if (fs.exists(baseNamespaceDir)) { 908 for (FileStatus status : fs.globStatus(new Path(baseNamespaceDir, "*"))) { 909 tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath())); 910 } 911 } 912 return tableDirs; 913 } 914 915 /** 916 * nn * @return All the table directories under <code>rootdir</code>. Ignore non table hbase 917 * folders such as .logs, .oldlogs, .corrupt folders. n 918 */ 919 public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir) 920 throws IOException { 921 // presumes any directory under hbase.rootdir is a table 922 FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs)); 923 List<Path> tabledirs = new ArrayList<>(dirs.length); 924 for (FileStatus dir : dirs) { 925 tabledirs.add(dir.getPath()); 926 } 927 return tabledirs; 928 } 929 930 /** 931 * Filter for all dirs that don't start with '.' 932 */ 933 public static class RegionDirFilter extends AbstractFileStatusFilter { 934 // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names. 935 final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$"); 936 final FileSystem fs; 937 938 public RegionDirFilter(FileSystem fs) { 939 this.fs = fs; 940 } 941 942 @Override 943 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 944 if (!regionDirPattern.matcher(p.getName()).matches()) { 945 return false; 946 } 947 948 try { 949 return isDirectory(fs, isDir, p); 950 } catch (IOException ioe) { 951 // Maybe the file was moved or the fs was disconnected. 
952 LOG.warn("Skipping file {} due to IOException", p, ioe); 953 return false; 954 } 955 } 956 } 957 958 /** 959 * Given a particular table dir, return all the regiondirs inside it, excluding files such as 960 * .tableinfo 961 * @param fs A file system for the Path 962 * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir> 963 * @return List of paths to valid region directories in table dir. n 964 */ 965 public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) 966 throws IOException { 967 // assumes we are in a table dir. 968 List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 969 if (rds == null) { 970 return Collections.emptyList(); 971 } 972 List<Path> regionDirs = new ArrayList<>(rds.size()); 973 for (FileStatus rdfs : rds) { 974 Path rdPath = rdfs.getPath(); 975 regionDirs.add(rdPath); 976 } 977 return regionDirs; 978 } 979 980 public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) { 981 return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region); 982 } 983 984 public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) { 985 return getRegionDirFromTableDir(tableDir, 986 ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName()); 987 } 988 989 public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) { 990 return new Path(tableDir, encodedRegionName); 991 } 992 993 /** 994 * Filter for all dirs that are legal column family names. This is generally used for colfam dirs 995 * <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>. 996 */ 997 public static class FamilyDirFilter extends AbstractFileStatusFilter { 998 final FileSystem fs; 999 1000 public FamilyDirFilter(FileSystem fs) { 1001 this.fs = fs; 1002 } 1003 1004 @Override 1005 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1006 try { 1007 // throws IAE if invalid 1008 HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName())); 1009 } catch (IllegalArgumentException iae) { 1010 // path name is an invalid family name and thus is excluded. 1011 return false; 1012 } 1013 1014 try { 1015 return isDirectory(fs, isDir, p); 1016 } catch (IOException ioe) { 1017 // Maybe the file was moved or the fs was disconnected. 1018 LOG.warn("Skipping file {} due to IOException", p, ioe); 1019 return false; 1020 } 1021 } 1022 } 1023 1024 /** 1025 * Given a particular region dir, return all the familydirs inside it 1026 * @param fs A file system for the Path 1027 * @param regionDir Path to a specific region directory 1028 * @return List of paths to valid family directories in region dir. n 1029 */ 1030 public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) 1031 throws IOException { 1032 // assumes we are in a region dir. 
1033 return getFilePaths(fs, regionDir, new FamilyDirFilter(fs)); 1034 } 1035 1036 public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) 1037 throws IOException { 1038 return getFilePaths(fs, familyDir, new ReferenceFileFilter(fs)); 1039 } 1040 1041 public static List<Path> getReferenceAndLinkFilePaths(final FileSystem fs, final Path familyDir) 1042 throws IOException { 1043 return getFilePaths(fs, familyDir, new ReferenceAndLinkFileFilter(fs)); 1044 } 1045 1046 private static List<Path> getFilePaths(final FileSystem fs, final Path dir, 1047 final PathFilter pathFilter) throws IOException { 1048 FileStatus[] fds = fs.listStatus(dir, pathFilter); 1049 List<Path> files = new ArrayList<>(fds.length); 1050 for (FileStatus fdfs : fds) { 1051 Path fdPath = fdfs.getPath(); 1052 files.add(fdPath); 1053 } 1054 return files; 1055 } 1056 1057 public static int getRegionReferenceAndLinkFileCount(final FileSystem fs, final Path p) { 1058 int result = 0; 1059 try { 1060 for (Path familyDir : getFamilyDirs(fs, p)) { 1061 result += getReferenceAndLinkFilePaths(fs, familyDir).size(); 1062 } 1063 } catch (IOException e) { 1064 LOG.warn("Error Counting reference files.", e); 1065 } 1066 return result; 1067 } 1068 1069 public static class ReferenceAndLinkFileFilter implements PathFilter { 1070 1071 private final FileSystem fs; 1072 1073 public ReferenceAndLinkFileFilter(FileSystem fs) { 1074 this.fs = fs; 1075 } 1076 1077 @Override 1078 public boolean accept(Path rd) { 1079 try { 1080 // only files can be references. 1081 return !fs.getFileStatus(rd).isDirectory() 1082 && (StoreFileInfo.isReference(rd) || HFileLink.isHFileLink(rd)); 1083 } catch (IOException ioe) { 1084 // Maybe the file was moved or the fs was disconnected. 1085 LOG.warn("Skipping file " + rd + " due to IOException", ioe); 1086 return false; 1087 } 1088 } 1089 } 1090 1091 /** 1092 * Filter for HFiles that excludes reference files. 1093 */ 1094 public static class HFileFilter extends AbstractFileStatusFilter { 1095 final FileSystem fs; 1096 1097 public HFileFilter(FileSystem fs) { 1098 this.fs = fs; 1099 } 1100 1101 @Override 1102 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1103 if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) { 1104 return false; 1105 } 1106 1107 try { 1108 return isFile(fs, isDir, p); 1109 } catch (IOException ioe) { 1110 // Maybe the file was moved or the fs was disconnected. 1111 LOG.warn("Skipping file {} due to IOException", p, ioe); 1112 return false; 1113 } 1114 } 1115 } 1116 1117 /** 1118 * Filter for HFileLinks (StoreFiles and HFiles not included). the filter itself does not consider 1119 * if a link is file or not. 1120 */ 1121 public static class HFileLinkFilter implements PathFilter { 1122 1123 @Override 1124 public boolean accept(Path p) { 1125 return HFileLink.isHFileLink(p); 1126 } 1127 } 1128 1129 public static class ReferenceFileFilter extends AbstractFileStatusFilter { 1130 1131 private final FileSystem fs; 1132 1133 public ReferenceFileFilter(FileSystem fs) { 1134 this.fs = fs; 1135 } 1136 1137 @Override 1138 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1139 if (!StoreFileInfo.isReference(p)) { 1140 return false; 1141 } 1142 1143 try { 1144 // only files can be references. 1145 return isFile(fs, isDir, p); 1146 } catch (IOException ioe) { 1147 // Maybe the file was moved or the fs was disconnected. 
1148 LOG.warn("Skipping file {} due to IOException", p, ioe); 1149 return false; 1150 } 1151 } 1152 } 1153 1154 /** 1155 * Called every so-often by storefile map builder getTableStoreFilePathMap to report progress. 1156 */ 1157 interface ProgressReporter { 1158 /** 1159 * @param status File or directory we are about to process. 1160 */ 1161 void progress(FileStatus status); 1162 } 1163 1164 /** 1165 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1166 * names to the full Path. <br> 1167 * Example...<br> 1168 * Key = 3944417774205889744 <br> 1169 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1170 * @param map map to add values. If null, this method will create and populate one to 1171 * return 1172 * @param fs The file system to use. 1173 * @param hbaseRootDir The root directory to scan. 1174 * @param tableName name of the table to scan. 1175 * @return Map keyed by StoreFile name with a value of the full Path. 1176 * @throws IOException When scanning the directory fails. n 1177 */ 1178 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map, 1179 final FileSystem fs, final Path hbaseRootDir, TableName tableName) 1180 throws IOException, InterruptedException { 1181 return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, 1182 (ProgressReporter) null); 1183 } 1184 1185 /** 1186 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1187 * names to the full Path. Note that because this method can be called on a 'live' HBase system 1188 * that we will skip files that no longer exist by the time we traverse them and similarly the 1189 * user of the result needs to consider that some entries in this map may not exist by the time 1190 * this call completes. <br> 1191 * Example...<br> 1192 * Key = 3944417774205889744 <br> 1193 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1194 * @param resultMap map to add values. If null, this method will create and populate one to 1195 * return 1196 * @param fs The file system to use. 1197 * @param hbaseRootDir The root directory to scan. 1198 * @param tableName name of the table to scan. 1199 * @param sfFilter optional path filter to apply to store files 1200 * @param executor optional executor service to parallelize this operation 1201 * @param progressReporter Instance or null; gets called every time we move to new region of 1202 * family dir and for each store file. 1203 * @return Map keyed by StoreFile name with a value of the full Path. 1204 * @throws IOException When scanning the directory fails. 1205 * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead. 1206 */ 1207 @Deprecated 1208 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1209 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1210 ExecutorService executor, final HbckErrorReporter progressReporter) 1211 throws IOException, InterruptedException { 1212 return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor, 1213 new ProgressReporter() { 1214 @Override 1215 public void progress(FileStatus status) { 1216 // status is not used in this implementation. 
1217 progressReporter.progress(); 1218 } 1219 }); 1220 } 1221 1222 /** 1223 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1224 * names to the full Path. Note that because this method can be called on a 'live' HBase system 1225 * that we will skip files that no longer exist by the time we traverse them and similarly the 1226 * user of the result needs to consider that some entries in this map may not exist by the time 1227 * this call completes. <br> 1228 * Example...<br> 1229 * Key = 3944417774205889744 <br> 1230 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1231 * @param resultMap map to add values. If null, this method will create and populate one to 1232 * return 1233 * @param fs The file system to use. 1234 * @param hbaseRootDir The root directory to scan. 1235 * @param tableName name of the table to scan. 1236 * @param sfFilter optional path filter to apply to store files 1237 * @param executor optional executor service to parallelize this operation 1238 * @param progressReporter Instance or null; gets called every time we move to new region of 1239 * family dir and for each store file. 1240 * @return Map keyed by StoreFile name with a value of the full Path. 1241 * @throws IOException When scanning the directory fails. 1242 * @throws InterruptedException the thread is interrupted, either before or during the activity. 1243 */ 1244 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1245 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1246 ExecutorService executor, final ProgressReporter progressReporter) 1247 throws IOException, InterruptedException { 1248 1249 final Map<String, Path> finalResultMap = 1250 resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap; 1251 1252 // only include the directory paths to tables 1253 Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 1254 // Inside a table, there are compaction.dir directories to skip. Otherwise, all else 1255 // should be regions. 
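    // The per-region scan below is packaged as Runnables so it can fan out over the optional
    // executor (or run inline via FutureTask when no executor is supplied); exceptions are
    // collected and the first one is rethrown once all submitted tasks have been waited on.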
1256 final FamilyDirFilter familyFilter = new FamilyDirFilter(fs); 1257 final Vector<Exception> exceptions = new Vector<>(); 1258 1259 try { 1260 List<FileStatus> regionDirs = 1261 FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 1262 if (regionDirs == null) { 1263 return finalResultMap; 1264 } 1265 1266 final List<Future<?>> futures = new ArrayList<>(regionDirs.size()); 1267 1268 for (FileStatus regionDir : regionDirs) { 1269 if (null != progressReporter) { 1270 progressReporter.progress(regionDir); 1271 } 1272 final Path dd = regionDir.getPath(); 1273 1274 if (!exceptions.isEmpty()) { 1275 break; 1276 } 1277 1278 Runnable getRegionStoreFileMapCall = new Runnable() { 1279 @Override 1280 public void run() { 1281 try { 1282 HashMap<String, Path> regionStoreFileMap = new HashMap<>(); 1283 List<FileStatus> familyDirs = 1284 FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter); 1285 if (familyDirs == null) { 1286 if (!fs.exists(dd)) { 1287 LOG.warn("Skipping region because it no longer exists: " + dd); 1288 } else { 1289 LOG.warn("Skipping region because it has no family dirs: " + dd); 1290 } 1291 return; 1292 } 1293 for (FileStatus familyDir : familyDirs) { 1294 if (null != progressReporter) { 1295 progressReporter.progress(familyDir); 1296 } 1297 Path family = familyDir.getPath(); 1298 if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) { 1299 continue; 1300 } 1301 // now in family, iterate over the StoreFiles and 1302 // put in map 1303 FileStatus[] familyStatus = fs.listStatus(family); 1304 for (FileStatus sfStatus : familyStatus) { 1305 if (null != progressReporter) { 1306 progressReporter.progress(sfStatus); 1307 } 1308 Path sf = sfStatus.getPath(); 1309 if (sfFilter == null || sfFilter.accept(sf)) { 1310 regionStoreFileMap.put(sf.getName(), sf); 1311 } 1312 } 1313 } 1314 finalResultMap.putAll(regionStoreFileMap); 1315 } catch (Exception e) { 1316 LOG.error("Could not get region store file map for region: " + dd, e); 1317 exceptions.add(e); 1318 } 1319 } 1320 }; 1321 1322 // If executor is available, submit async tasks to exec concurrently, otherwise 1323 // just do serial sync execution 1324 if (executor != null) { 1325 Future<?> future = executor.submit(getRegionStoreFileMapCall); 1326 futures.add(future); 1327 } else { 1328 FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null); 1329 future.run(); 1330 futures.add(future); 1331 } 1332 } 1333 1334 // Ensure all pending tasks are complete (or that we run into an exception) 1335 for (Future<?> f : futures) { 1336 if (!exceptions.isEmpty()) { 1337 break; 1338 } 1339 try { 1340 f.get(); 1341 } catch (ExecutionException e) { 1342 LOG.error("Unexpected exec exception! Should've been caught already. 
(Bug?)", e); 1343 // Shouldn't happen, we already logged/caught any exceptions in the Runnable 1344 } 1345 } 1346 } catch (IOException e) { 1347 LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e); 1348 exceptions.add(e); 1349 } finally { 1350 if (!exceptions.isEmpty()) { 1351 // Just throw the first exception as an indication something bad happened 1352 // Don't need to propagate all the exceptions, we already logged them all anyway 1353 Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class); 1354 throw new IOException(exceptions.firstElement()); 1355 } 1356 } 1357 1358 return finalResultMap; 1359 } 1360 1361 public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) { 1362 int result = 0; 1363 try { 1364 for (Path familyDir : getFamilyDirs(fs, p)) { 1365 result += getReferenceFilePaths(fs, familyDir).size(); 1366 } 1367 } catch (IOException e) { 1368 LOG.warn("Error counting reference files", e); 1369 } 1370 return result; 1371 } 1372 1373 /** 1374 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1375 * the full Path. <br> 1376 * Example...<br> 1377 * Key = 3944417774205889744 <br> 1378 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1379 * @param fs The file system to use. 1380 * @param hbaseRootDir The root directory to scan. 1381 * @return Map keyed by StoreFile name with a value of the full Path. 1382 * @throws IOException When scanning the directory fails. 1383 */ 1384 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1385 final Path hbaseRootDir) throws IOException, InterruptedException { 1386 return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter) null); 1387 } 1388 1389 /** 1390 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1391 * the full Path. <br> 1392 * Example...<br> 1393 * Key = 3944417774205889744 <br> 1394 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1395 * @param fs The file system to use. 1396 * @param hbaseRootDir The root directory to scan. 1397 * @param sfFilter optional path filter to apply to store files 1398 * @param executor optional executor service to parallelize this operation 1399 * @param progressReporter Instance or null; gets called every time we move to new region of 1400 * family dir and for each store file. 1401 * @return Map keyed by StoreFile name with a value of the full Path. 1402 * @throws IOException When scanning the directory fails. 1403 * @deprecated Since 2.3.0. Will be removed in hbase4. Used 1404 * {@link #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)} 1405 */ 1406 @Deprecated 1407 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1408 final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor, 1409 HbckErrorReporter progressReporter) throws IOException, InterruptedException { 1410 return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor, new ProgressReporter() { 1411 @Override 1412 public void progress(FileStatus status) { 1413 // status is not used in this implementation. 1414 progressReporter.progress(); 1415 } 1416 }); 1417 } 1418 1419 /** 1420 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1421 * the full Path. 
<br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   * @param fs               The file system to use.
   * @param hbaseRootDir     The root directory to scan.
   * @param sfFilter         optional path filter to apply to store files
   * @param executor         optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to new region of
   *                         family dir and for each store file.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   */
  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
    final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
    ProgressReporter progressReporter) throws IOException, InterruptedException {
    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);

    // if this method looks similar to 'getTableFragmentation' that is because
    // it was borrowed from it.

    // only include the directory paths to tables
    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
      getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir),
        sfFilter, executor, progressReporter);
    }
    return map;
  }

  /**
   * Filters FileStatuses in an array and returns a list
   * @param input  An array of FileStatuses
   * @param filter A required filter to filter the array
   * @return A list of FileStatuses
   */
  public static List<FileStatus> filterFileStatuses(FileStatus[] input, FileStatusFilter filter) {
    if (input == null) return null;
    return filterFileStatuses(Iterators.forArray(input), filter);
  }

  /**
   * Filters FileStatuses in an iterator and returns a list
   * @param input  An iterator of FileStatuses
   * @param filter A required filter to filter the iterator
   * @return A list of FileStatuses
   */
  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
    FileStatusFilter filter) {
    if (input == null) return null;
    ArrayList<FileStatus> results = new ArrayList<>();
    while (input.hasNext()) {
      FileStatus f = input.next();
      if (filter.accept(f)) {
        results.add(f);
      }
    }
    return results;
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
   * differences between Hadoop versions: Hadoop 1 does not throw a FileNotFoundException and
   * instead returns an empty FileStatus[], while Hadoop 2 throws FileNotFoundException.
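   * <p>
   * Illustrative use, mirroring how callers inside this class consume the result (a
   * <code>null</code> return simply means there is nothing to process):
   *
   * <pre>
   * List&lt;FileStatus&gt; rds =
   *   FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
   * if (rds == null) {
   *   return Collections.emptyList();
   * }
   * </pre>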
  /**
   * This function is to scan the root path of the file system to get the degree of locality for
   * each region on each of the servers having at least one block of that region. This is used by
   * the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}.
   * @param conf the configuration to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>>
    getRegionDegreeLocalityMappingFromFS(final Configuration conf) throws IOException {
    return getRegionDegreeLocalityMappingFromFS(conf, null,
      conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
  }

  /**
   * This function is to scan the root path of the file system to get the degree of locality for
   * each region on each of the servers having at least one block of that region.
   * @param conf           the configuration to use
   * @param desiredTable   the table you wish to scan locality for
   * @param threadPoolSize the thread pool size to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
    final Configuration conf, final String desiredTable, int threadPoolSize) throws IOException {
    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
    return regionDegreeLocalityMapping;
  }
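  // Illustrative sketch: shows how the per-table locality mapping might be queried and logged.
  // The table name "exampleTable" and the helper name are hypothetical, for illustration only.
  private static void exampleLogRegionLocality(final Configuration conf) throws IOException {
    Map<String, Map<String, Float>> localityByRegion =
      getRegionDegreeLocalityMappingFromFS(conf, "exampleTable", DEFAULT_THREAD_POOLSIZE);
    for (Map.Entry<String, Map<String, Float>> region : localityByRegion.entrySet()) {
      // each value maps a server name to the fraction of the region's blocks stored locally
      LOG.info("Region {} locality by server: {}", region.getKey(), region.getValue());
    }
  }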
  /**
   * This function is to scan the root path of the file system to get either the mapping between
   * the region name and its best locality region server or the degree of locality of each region
   * on each of the servers having at least one block of that region. The output map parameters
   * are both optional.
   * @param conf                        the configuration to use
   * @param desiredTable                the table you wish to scan locality for
   * @param threadPoolSize              the thread pool size to use
   * @param regionDegreeLocalityMapping the map into which to put the locality degree mapping or
   *                                    null, must be a thread-safe implementation
   * @throws IOException in case of file system errors or interrupts
   */
  private static void getRegionLocalityMappingFromFS(final Configuration conf,
    final String desiredTable, int threadPoolSize,
    final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
    final FileSystem fs = FileSystem.get(conf);
    final Path rootPath = CommonFSUtils.getRootDir(conf);
    final long startTime = EnvironmentEdgeManager.currentTime();
    final Path queryPath;
    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
    if (null == desiredTable) {
      queryPath =
        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
    } else {
      queryPath = new Path(
        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
    }

    // reject all paths that are not appropriate
    PathFilter pathFilter = new PathFilter() {
      @Override
      public boolean accept(Path path) {
        // this is the region name; it may get some noise data
        if (null == path) {
          return false;
        }

        // no parent?
        Path parent = path.getParent();
        if (null == parent) {
          return false;
        }

        String regionName = path.getName();
        if (null == regionName) {
          return false;
        }

        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
          return false;
        }
        return true;
      }
    };

    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
    }

    if (null == statusList) {
      return;
    }

    // lower the number of threads in case we have very few expected regions
    threadPoolSize = Math.min(threadPoolSize, statusList.length);

    // run in multiple threads
    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
      new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    try {
      // ignore all file status items that are not of interest
      for (FileStatus regionStatus : statusList) {
        if (null == regionStatus || !regionStatus.isDirectory()) {
          continue;
        }

        final Path regionPath = regionStatus.getPath();
        if (null != regionPath) {
          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
        }
      }
    } finally {
      tpe.shutdown();
      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
      try {
        // here we wait until TPE terminates, which is either naturally or by
        // exceptions in the execution of the threads
        while (!tpe.awaitTermination(threadWakeFrequency, TimeUnit.MILLISECONDS)) {
          // printing out rough estimate, so as to not introduce
          // AtomicInteger
          LOG.info("Locality checking is underway: { Scanned Regions : "
            + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
            + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
    }

    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
    LOG.info("Scan DFS for locality info takes {}ms", overhead);
  }
  /**
   * Do our short circuit read setup. Checks buffer size to use and whether to do checksumming in
   * hbase or hdfs.
   * @param conf the configuration to check and, if needed, update
   */
  public static void setupShortCircuitRead(final Configuration conf) {
    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
    boolean shortCircuitSkipChecksum =
      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    if (shortCircuitSkipChecksum) {
      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not "
        + "be set to true."
        + (useHBaseChecksum
          ? " HBase checksum doesn't require "
            + "it, see https://issues.apache.org/jira/browse/HBASE-6868."
          : ""));
      assert !shortCircuitSkipChecksum; // this will fail if assertions are on
    }
    checkShortCircuitReadBufferSize(conf);
  }

  /**
   * Check if short circuit read buffer size is set and if not, set it to hbase value.
   * @param conf the configuration to check and, if needed, update
   */
  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
    int size = conf.getInt(dfsKey, notSet);
    // If a size is set, return -- we will use it.
    if (size != notSet) return;
    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }
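  // Illustrative sketch: shows how the short circuit read defaults might be applied to a
  // configuration before opening HDFS readers. The helper name and the 128 KB buffer size are
  // hypothetical values, for illustration only.
  private static Configuration exampleShortCircuitReadConf() {
    Configuration conf = new Configuration();
    // hypothetical override of the HBase-preferred buffer size; picked up by
    // checkShortCircuitReadBufferSize only if the dfs key itself is unset
    conf.setInt("hbase.dfs.client.read.shortcircuit.buffer.size", 128 * 1024);
    setupShortCircuitRead(conf);
    return conf;
  }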
  /**
   * @param c the configuration used to locate the filesystem
   * @return The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs.
   * @throws IOException if obtaining the filesystem fails
   */
  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
    throws IOException {
    if (!CommonFSUtils.isHDFS(c)) {
      return null;
    }
    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
    final String name = "getHedgedReadMetrics";
    DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient();
    Method m;
    try {
      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: "
        + e.getMessage());
      return null;
    } catch (SecurityException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: "
        + e.getMessage());
      return null;
    }
    m.setAccessible(true);
    try {
      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: "
        + e.getMessage());
      return null;
    }
  }

  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
    Configuration conf, int threads) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Future<Void>> futures = new ArrayList<>();
    List<Path> traversedPaths;
    try {
      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (ExecutionException | InterruptedException | IOException e) {
      throw new IOException("Copy snapshot reference files failed", e);
    } finally {
      pool.shutdownNow();
    }
    return traversedPaths;
  }
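  // Illustrative sketch: shows how copyFilesParallel might be used to copy a directory tree
  // between two filesystems with a small thread pool. The helper name and the pool size of 4
  // are hypothetical, for illustration only.
  private static void exampleCopyDirParallel(final Configuration conf, final Path src,
    final Path dst) throws IOException {
    FileSystem srcFs = src.getFileSystem(conf);
    FileSystem dstFs = dst.getFileSystem(conf);
    // returns every directory and file path visited on the destination side
    List<Path> copied = copyFilesParallel(srcFs, src, dstFs, dst, conf, 4);
    LOG.info("Copied {} paths from {} to {}", copied.size(), src, dst);
  }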
serviceName=" + serviceName, e); 1784 } 1785 } else { 1786 URI uri = fs.getUri(); 1787 int port = uri.getPort(); 1788 if (port < 0) { 1789 int idx = serviceName.indexOf(':'); 1790 port = Integer.parseInt(serviceName.substring(idx + 1)); 1791 } 1792 InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port); 1793 addresses.add(addr); 1794 } 1795 1796 return addresses; 1797 } 1798 1799 /** 1800 * @param conf the Configuration of HBase 1801 * @return Whether srcFs and desFs are on same hdfs or not 1802 */ 1803 public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) { 1804 // By getCanonicalServiceName, we could make sure both srcFs and desFs 1805 // show a unified format which contains scheme, host and port. 1806 String srcServiceName = srcFs.getCanonicalServiceName(); 1807 String desServiceName = desFs.getCanonicalServiceName(); 1808 1809 if (srcServiceName == null || desServiceName == null) { 1810 return false; 1811 } 1812 if (srcServiceName.equals(desServiceName)) { 1813 return true; 1814 } 1815 if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) { 1816 Collection<String> internalNameServices = 1817 conf.getTrimmedStringCollection("dfs.internal.nameservices"); 1818 if (!internalNameServices.isEmpty()) { 1819 if (internalNameServices.contains(srcServiceName.split(":")[1])) { 1820 return true; 1821 } else { 1822 return false; 1823 } 1824 } 1825 } 1826 if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) { 1827 // If one serviceName is an HA format while the other is a non-HA format, 1828 // maybe they refer to the same FileSystem. 1829 // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port" 1830 Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf); 1831 Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf); 1832 if (Sets.intersection(srcAddrs, desAddrs).size() > 0) { 1833 return true; 1834 } 1835 } 1836 1837 return false; 1838 } 1839}