001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations; 021import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET; 022 023import edu.umd.cs.findbugs.annotations.CheckForNull; 024import java.io.ByteArrayInputStream; 025import java.io.DataInputStream; 026import java.io.EOFException; 027import java.io.FileNotFoundException; 028import java.io.IOException; 029import java.io.InterruptedIOException; 030import java.lang.reflect.InvocationTargetException; 031import java.lang.reflect.Method; 032import java.net.InetSocketAddress; 033import java.net.URI; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.Collection; 037import java.util.Collections; 038import java.util.HashMap; 039import java.util.HashSet; 040import java.util.Iterator; 041import java.util.List; 042import java.util.Locale; 043import java.util.Map; 044import java.util.Optional; 045import java.util.Set; 046import java.util.Vector; 047import java.util.concurrent.ConcurrentHashMap; 048import java.util.concurrent.ExecutionException; 049import java.util.concurrent.ExecutorService; 050import java.util.concurrent.Executors; 051import java.util.concurrent.Future; 052import java.util.concurrent.FutureTask; 053import java.util.concurrent.ThreadPoolExecutor; 054import java.util.concurrent.TimeUnit; 055import java.util.regex.Pattern; 056import org.apache.commons.lang3.ArrayUtils; 057import org.apache.hadoop.conf.Configuration; 058import org.apache.hadoop.fs.BlockLocation; 059import org.apache.hadoop.fs.FSDataInputStream; 060import org.apache.hadoop.fs.FSDataOutputStream; 061import org.apache.hadoop.fs.FileStatus; 062import org.apache.hadoop.fs.FileSystem; 063import org.apache.hadoop.fs.FileUtil; 064import org.apache.hadoop.fs.Path; 065import org.apache.hadoop.fs.PathFilter; 066import org.apache.hadoop.fs.StorageType; 067import org.apache.hadoop.fs.permission.FsPermission; 068import org.apache.hadoop.hbase.ClusterId; 069import org.apache.hadoop.hbase.HConstants; 070import org.apache.hadoop.hbase.HDFSBlocksDistribution; 071import org.apache.hadoop.hbase.TableName; 072import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 073import org.apache.hadoop.hbase.client.RegionInfo; 074import org.apache.hadoop.hbase.client.RegionInfoBuilder; 075import org.apache.hadoop.hbase.exceptions.DeserializationException; 076import org.apache.hadoop.hbase.fs.HFileSystem; 077import org.apache.hadoop.hbase.io.HFileLink; 078import org.apache.hadoop.hbase.master.HMaster; 079import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 080import org.apache.hadoop.hdfs.DFSClient; 081import 
org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
082import org.apache.hadoop.hdfs.DFSUtil;
083import org.apache.hadoop.hdfs.DistributedFileSystem;
084import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
085import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
086import org.apache.hadoop.hdfs.protocol.LocatedBlock;
087import org.apache.hadoop.io.IOUtils;
088import org.apache.hadoop.ipc.RemoteException;
089import org.apache.yetus.audience.InterfaceAudience;
090import org.slf4j.Logger;
091import org.slf4j.LoggerFactory;
092
093import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
094import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
095import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
096import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
097import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
098
099import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
101
102/**
103 * Utility methods for interacting with the underlying file system.
104 */
105@InterfaceAudience.Private
106public final class FSUtils {
107 private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
108
109 private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
110 private static final int DEFAULT_THREAD_POOLSIZE = 2;
111
112 /** Set to true on Windows platforms */
113 // currently only used in testing. TODO refactor into a test class
114 public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
115
116 private static Class<?> safeModeClazz = null;
117 private static Class<?> safeModeActionClazz = null;
118 private static Object safeModeGet = null;
119 static {
120 try {
121 safeModeClazz = Class.forName("org.apache.hadoop.fs.SafeMode");
122 safeModeActionClazz = Class.forName("org.apache.hadoop.fs.SafeModeAction");
123 safeModeGet = safeModeClazz.getField("SAFEMODE_GET").get(null);
124 } catch (ClassNotFoundException | NoSuchFieldException e) {
125 LOG.debug("SafeMode interface not in the classpath, this means Hadoop 3.3.5 or below.");
126 } catch (IllegalAccessException e) {
127 LOG.error("SafeModeAction.SAFEMODE_GET is not accessible. "
128 + "Unexpected Hadoop version or messy classpath?", e);
129 throw new RuntimeException(e);
130 }
131 }
132
133 private FSUtils() {
134 }
135
136 /** Returns true if <code>fs</code> is an instance of DistributedFileSystem. */
137 public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
138 FileSystem fileSystem = fs;
139 // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
140 // Check its backing fs for dfs-ness.
141 if (fs instanceof HFileSystem) {
142 fileSystem = ((HFileSystem) fs).getBackingFs();
143 }
144 return fileSystem instanceof DistributedFileSystem;
145 }
146
147 /**
148 * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
149 * the '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
150 * consider the scheme; i.e. if the schemes differ but the path or subpath matches, the two will equate.
151 * @param pathToSearch Path we will be trying to match.
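 *                     <p>A minimal sketch of the comparison (illustrative only; the namenode authority below is hypothetical):</p>
 *                     <pre>{@code
 *                     Path toSearch = new Path("hdfs://namenode/hbase/data/default/t1");
 *                     Path tail = new Path("/hbase/data/default/t1");
 *                     boolean same = FSUtils.isMatchingTail(toSearch, tail); // true; the scheme/authority part is ignored
 *                     }</pre>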
152 * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code> 153 */ 154 public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) { 155 Path tailPath = pathTail; 156 String tailName; 157 Path toSearch = pathToSearch; 158 String toSearchName; 159 boolean result = false; 160 161 if (pathToSearch.depth() != pathTail.depth()) { 162 return false; 163 } 164 165 do { 166 tailName = tailPath.getName(); 167 if (tailName == null || tailName.isEmpty()) { 168 result = true; 169 break; 170 } 171 toSearchName = toSearch.getName(); 172 if (toSearchName == null || toSearchName.isEmpty()) { 173 break; 174 } 175 // Move up a parent on each path for next go around. Path doesn't let us go off the end. 176 tailPath = tailPath.getParent(); 177 toSearch = toSearch.getParent(); 178 } while (tailName.equals(toSearchName)); 179 return result; 180 } 181 182 /** 183 * Delete the region directory if exists. 184 * @return True if deleted the region directory. 185 */ 186 public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri) 187 throws IOException { 188 Path rootDir = CommonFSUtils.getRootDir(conf); 189 FileSystem fs = rootDir.getFileSystem(conf); 190 return CommonFSUtils.deleteDirectory(fs, 191 new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName())); 192 } 193 194 /** 195 * Create the specified file on the filesystem. By default, this will: 196 * <ol> 197 * <li>overwrite the file if it exists</li> 198 * <li>apply the umask in the configuration (if it is enabled)</li> 199 * <li>use the fs configured buffer size (or 4096 if not set)</li> 200 * <li>use the configured column family replication or default replication if 201 * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li> 202 * <li>use the default block size</li> 203 * <li>not track progress</li> 204 * </ol> 205 * @param conf configurations 206 * @param fs {@link FileSystem} on which to write the file 207 * @param path {@link Path} to the file to write 208 * @param perm permissions 209 * @param favoredNodes favored data nodes 210 * @return output stream to the created file 211 * @throws IOException if the file cannot be created 212 */ 213 public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path, 214 FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException { 215 return create(conf, fs, path, perm, favoredNodes, true); 216 } 217 218 /** 219 * Create the specified file on the filesystem. 
By default, this will: 220 * <ol> 221 * <li>overwrite the file if it exists</li> 222 * <li>apply the umask in the configuration (if it is enabled)</li> 223 * <li>use the fs configured buffer size (or 4096 if not set)</li> 224 * <li>use the configured column family replication or default replication if 225 * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li> 226 * <li>use the default block size</li> 227 * <li>not track progress</li> 228 * </ol> 229 * @param conf configurations 230 * @param fs {@link FileSystem} on which to write the file 231 * @param path {@link Path} to the file to write 232 * @param perm permissions 233 * @param favoredNodes favored data nodes 234 * @param isRecursiveCreate recursively create parent directories 235 * @return output stream to the created file 236 * @throws IOException if the file cannot be created 237 */ 238 public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path, 239 FsPermission perm, InetSocketAddress[] favoredNodes, boolean isRecursiveCreate) 240 throws IOException { 241 if (fs instanceof HFileSystem) { 242 FileSystem backingFs = ((HFileSystem) fs).getBackingFs(); 243 if (backingFs instanceof DistributedFileSystem) { 244 short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION, 245 String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION))); 246 DistributedFileSystem.HdfsDataOutputStreamBuilder builder = 247 ((DistributedFileSystem) backingFs).createFile(path).recursive().permission(perm) 248 .create(); 249 if (favoredNodes != null) { 250 builder.favoredNodes(favoredNodes); 251 } 252 if (replication > 0) { 253 builder.replication(replication); 254 } 255 return builder.build(); 256 } 257 258 } 259 return CommonFSUtils.create(fs, path, perm, true, isRecursiveCreate); 260 } 261 262 /** 263 * Checks to see if the specified file system is available 264 * @param fs filesystem 265 * @throws IOException e 266 */ 267 public static void checkFileSystemAvailable(final FileSystem fs) throws IOException { 268 if (!(fs instanceof DistributedFileSystem)) { 269 return; 270 } 271 IOException exception = null; 272 DistributedFileSystem dfs = (DistributedFileSystem) fs; 273 try { 274 if (dfs.exists(new Path("/"))) { 275 return; 276 } 277 } catch (IOException e) { 278 exception = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 279 } 280 try { 281 fs.close(); 282 } catch (Exception e) { 283 LOG.error("file system close failed: ", e); 284 } 285 throw new IOException("File system is not available", exception); 286 } 287 288 /** 289 * Inquire the Active NameNode's safe mode status. 290 * @param dfs A DistributedFileSystem object representing the underlying HDFS. 291 * @return whether we're in safe mode 292 */ 293 private static boolean isInSafeMode(FileSystem dfs) throws IOException { 294 if (isDistributedFileSystem(dfs)) { 295 return ((DistributedFileSystem) dfs).setSafeMode(SAFEMODE_GET, true); 296 } else { 297 try { 298 Object ret = dfs.getClass() 299 .getMethod("setSafeMode", new Class[] { safeModeActionClazz, Boolean.class }) 300 .invoke(dfs, safeModeGet, true); 301 return (Boolean) ret; 302 } catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) { 303 LOG.error("The file system does not support setSafeMode(). Abort.", e); 304 throw new RuntimeException(e); 305 } 306 } 307 } 308 309 /** 310 * Check whether dfs is in safemode. 
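 * <p>A minimal usage sketch (illustrative only):</p>
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * FSUtils.checkDfsSafeMode(conf);      // throws IOException if the filesystem is still in safe mode
 * FSUtils.waitOnSafeMode(conf, 1000);  // or block, re-checking every 1000 ms, until safe mode is left
 * }</pre>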
311 */ 312 public static void checkDfsSafeMode(final Configuration conf) throws IOException { 313 boolean isInSafeMode = false; 314 FileSystem fs = FileSystem.get(conf); 315 if (supportSafeMode(fs)) { 316 isInSafeMode = isInSafeMode(fs); 317 } 318 if (isInSafeMode) { 319 throw new IOException("File system is in safemode, it can't be written now"); 320 } 321 } 322 323 /** 324 * Verifies current version of file system 325 * @param fs filesystem object 326 * @param rootdir root hbase directory 327 * @return null if no version file exists, version string otherwise 328 * @throws IOException if the version file fails to open 329 * @throws DeserializationException if the version data cannot be translated into a version 330 */ 331 public static String getVersion(FileSystem fs, Path rootdir) 332 throws IOException, DeserializationException { 333 final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 334 FileStatus[] status = null; 335 try { 336 // hadoop 2.0 throws FNFE if directory does not exist. 337 // hadoop 1.0 returns null if directory does not exist. 338 status = fs.listStatus(versionFile); 339 } catch (FileNotFoundException fnfe) { 340 return null; 341 } 342 if (ArrayUtils.getLength(status) == 0) { 343 return null; 344 } 345 String version = null; 346 byte[] content = new byte[(int) status[0].getLen()]; 347 FSDataInputStream s = fs.open(versionFile); 348 try { 349 IOUtils.readFully(s, content, 0, content.length); 350 if (ProtobufUtil.isPBMagicPrefix(content)) { 351 version = parseVersionFrom(content); 352 } else { 353 // Presume it pre-pb format. 354 try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) { 355 version = dis.readUTF(); 356 } 357 } 358 } catch (EOFException eof) { 359 LOG.warn("Version file was empty, odd, will try to set it."); 360 } finally { 361 s.close(); 362 } 363 return version; 364 } 365 366 /** 367 * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file. 368 * @param bytes The byte content of the hbase.version file 369 * @return The version found in the file as a String 370 * @throws DeserializationException if the version data cannot be translated into a version 371 */ 372 static String parseVersionFrom(final byte[] bytes) throws DeserializationException { 373 ProtobufUtil.expectPBMagicPrefix(bytes); 374 int pblen = ProtobufUtil.lengthOfPBMagic(); 375 FSProtos.HBaseVersionFileContent.Builder builder = 376 FSProtos.HBaseVersionFileContent.newBuilder(); 377 try { 378 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); 379 return builder.getVersion(); 380 } catch (IOException e) { 381 // Convert 382 throw new DeserializationException(e); 383 } 384 } 385 386 /** 387 * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file. 388 * @param version Version to persist 389 * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a 390 * prefix. 
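 *         <p>A minimal round-trip sketch (illustrative only; both helpers are package-private):</p>
 *         <pre>{@code
 *         byte[] content = FSUtils.toVersionByteArray("8");   // "8" is just an example version string
 *         String version = FSUtils.parseVersionFrom(content); // yields "8"; throws DeserializationException on bad content
 *         }</pre>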
391 */ 392 static byte[] toVersionByteArray(final String version) { 393 FSProtos.HBaseVersionFileContent.Builder builder = 394 FSProtos.HBaseVersionFileContent.newBuilder(); 395 return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray()); 396 } 397 398 /** 399 * Verifies current version of file system 400 * @param fs file system 401 * @param rootdir root directory of HBase installation 402 * @param message if true, issues a message on System.out 403 * @throws IOException if the version file cannot be opened 404 * @throws DeserializationException if the contents of the version file cannot be parsed 405 */ 406 public static void checkVersion(FileSystem fs, Path rootdir, boolean message) 407 throws IOException, DeserializationException { 408 checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 409 } 410 411 /** 412 * Verifies current version of file system 413 * @param fs file system 414 * @param rootdir root directory of HBase installation 415 * @param message if true, issues a message on System.out 416 * @param wait wait interval 417 * @param retries number of times to retry 418 * @throws IOException if the version file cannot be opened 419 * @throws DeserializationException if the contents of the version file cannot be parsed 420 */ 421 public static void checkVersion(FileSystem fs, Path rootdir, boolean message, int wait, 422 int retries) throws IOException, DeserializationException { 423 String version = getVersion(fs, rootdir); 424 String msg; 425 if (version == null) { 426 if (!metaRegionExists(fs, rootdir)) { 427 // rootDir is empty (no version file and no root region) 428 // just create new version file (HBASE-1195) 429 setVersion(fs, rootdir, wait, retries); 430 return; 431 } else { 432 msg = "hbase.version file is missing. Is your hbase.rootdir valid? " 433 + "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " 434 + "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2"; 435 } 436 } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) { 437 return; 438 } else { 439 msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version 440 + " but software requires version " + HConstants.FILE_SYSTEM_VERSION 441 + ". Consult http://hbase.apache.org/book.html for further information about " 442 + "upgrading HBase."; 443 } 444 445 // version is deprecated require migration 446 // Output on stdout so user sees it in terminal. 447 if (message) { 448 System.out.println("WARNING! 
" + msg); 449 } 450 throw new FileSystemVersionException(msg); 451 } 452 453 /** 454 * Sets version of file system 455 * @param fs filesystem object 456 * @param rootdir hbase root 457 * @throws IOException e 458 */ 459 public static void setVersion(FileSystem fs, Path rootdir) throws IOException { 460 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0, 461 HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 462 } 463 464 /** 465 * Sets version of file system 466 * @param fs filesystem object 467 * @param rootdir hbase root 468 * @param wait time to wait for retry 469 * @param retries number of times to retry before failing 470 * @throws IOException e 471 */ 472 public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries) 473 throws IOException { 474 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries); 475 } 476 477 /** 478 * Sets version of file system 479 * @param fs filesystem object 480 * @param rootdir hbase root directory 481 * @param version version to set 482 * @param wait time to wait for retry 483 * @param retries number of times to retry before throwing an IOException 484 * @throws IOException e 485 */ 486 public static void setVersion(FileSystem fs, Path rootdir, String version, int wait, int retries) 487 throws IOException { 488 Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 489 Path tempVersionFile = new Path(rootdir, 490 HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + HConstants.VERSION_FILE_NAME); 491 while (true) { 492 try { 493 // Write the version to a temporary file 494 FSDataOutputStream s = fs.create(tempVersionFile); 495 try { 496 s.write(toVersionByteArray(version)); 497 s.close(); 498 s = null; 499 // Move the temp version file to its normal location. Returns false 500 // if the rename failed. Throw an IOE in that case. 501 if (!fs.rename(tempVersionFile, versionFile)) { 502 throw new IOException("Unable to move temp version file to " + versionFile); 503 } 504 } finally { 505 // Cleaning up the temporary if the rename failed would be trying 506 // too hard. We'll unconditionally create it again the next time 507 // through anyway, files are overwritten by default by create(). 508 509 // Attempt to close the stream on the way out if it is still open. 
510 try { 511 if (s != null) s.close(); 512 } catch (IOException ignore) { 513 } 514 } 515 LOG.info("Created version file at " + rootdir.toString() + " with version=" + version); 516 return; 517 } catch (IOException e) { 518 if (retries > 0) { 519 LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e); 520 fs.delete(versionFile, false); 521 try { 522 if (wait > 0) { 523 Thread.sleep(wait); 524 } 525 } catch (InterruptedException ie) { 526 throw (InterruptedIOException) new InterruptedIOException().initCause(ie); 527 } 528 retries--; 529 } else { 530 throw e; 531 } 532 } 533 } 534 } 535 536 /** 537 * Checks that a cluster ID file exists in the HBase root directory 538 * @param fs the root directory FileSystem 539 * @param rootdir the HBase root directory in HDFS 540 * @param wait how long to wait between retries 541 * @return <code>true</code> if the file exists, otherwise <code>false</code> 542 * @throws IOException if checking the FileSystem fails 543 */ 544 public static boolean checkClusterIdExists(FileSystem fs, Path rootdir, long wait) 545 throws IOException { 546 while (true) { 547 try { 548 Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 549 return fs.exists(filePath); 550 } catch (IOException ioe) { 551 if (wait > 0L) { 552 LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe); 553 try { 554 Thread.sleep(wait); 555 } catch (InterruptedException e) { 556 Thread.currentThread().interrupt(); 557 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 558 } 559 } else { 560 throw ioe; 561 } 562 } 563 } 564 } 565 566 /** 567 * Returns the value of the unique cluster ID stored for this HBase instance. 568 * @param fs the root directory FileSystem 569 * @param rootdir the path to the HBase root directory 570 * @return the unique cluster identifier 571 * @throws IOException if reading the cluster ID file fails 572 */ 573 public static ClusterId getClusterId(FileSystem fs, Path rootdir) throws IOException { 574 Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 575 ClusterId clusterId = null; 576 FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null; 577 if (status != null) { 578 int len = Ints.checkedCast(status.getLen()); 579 byte[] content = new byte[len]; 580 FSDataInputStream in = fs.open(idPath); 581 try { 582 in.readFully(content); 583 } catch (EOFException eof) { 584 LOG.warn("Cluster ID file {} is empty", idPath); 585 } finally { 586 in.close(); 587 } 588 try { 589 clusterId = ClusterId.parseFrom(content); 590 } catch (DeserializationException e) { 591 throw new IOException("content=" + Bytes.toString(content), e); 592 } 593 // If not pb'd, make it so. 594 if (!ProtobufUtil.isPBMagicPrefix(content)) { 595 String cid = null; 596 in = fs.open(idPath); 597 try { 598 cid = in.readUTF(); 599 clusterId = new ClusterId(cid); 600 } catch (EOFException eof) { 601 LOG.warn("Cluster ID file {} is empty", idPath); 602 } finally { 603 in.close(); 604 } 605 rewriteAsPb(fs, rootdir, idPath, clusterId); 606 } 607 return clusterId; 608 } else { 609 LOG.warn("Cluster ID file does not exist at {}", idPath); 610 } 611 return clusterId; 612 } 613 614 /** 615 * */ 616 private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p, 617 final ClusterId cid) throws IOException { 618 // Rewrite the file as pb. Move aside the old one first, write new 619 // then delete the moved-aside file. 620 Path movedAsideName = new Path(p + "." 
+ EnvironmentEdgeManager.currentTime()); 621 if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p); 622 setClusterId(fs, rootdir, cid, 100); 623 if (!fs.delete(movedAsideName, false)) { 624 throw new IOException("Failed delete of " + movedAsideName); 625 } 626 LOG.debug("Rewrote the hbase.id file as pb"); 627 } 628 629 /** 630 * Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root 631 * directory. If any operations on the ID file fails, and {@code wait} is a positive value, the 632 * method will retry to produce the ID file until the thread is forcibly interrupted. 633 * @param fs the root directory FileSystem 634 * @param rootdir the path to the HBase root directory 635 * @param clusterId the unique identifier to store 636 * @param wait how long (in milliseconds) to wait between retries 637 * @throws IOException if writing to the FileSystem fails and no wait value 638 */ 639 public static void setClusterId(final FileSystem fs, final Path rootdir, 640 final ClusterId clusterId, final long wait) throws IOException { 641 642 final Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 643 final Path tempDir = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY); 644 final Path tempIdFile = new Path(tempDir, HConstants.CLUSTER_ID_FILE_NAME); 645 646 LOG.debug("Create cluster ID file [{}] with ID: {}", idFile, clusterId); 647 648 while (true) { 649 Optional<IOException> failure = Optional.empty(); 650 651 LOG.debug("Write the cluster ID file to a temporary location: {}", tempIdFile); 652 try (FSDataOutputStream s = fs.create(tempIdFile)) { 653 s.write(clusterId.toByteArray()); 654 } catch (IOException ioe) { 655 failure = Optional.of(ioe); 656 } 657 658 if (!failure.isPresent()) { 659 try { 660 LOG.debug("Move the temporary cluster ID file to its target location [{}]:[{}]", 661 tempIdFile, idFile); 662 663 if (!fs.rename(tempIdFile, idFile)) { 664 failure = 665 Optional.of(new IOException("Unable to move temp cluster ID file to " + idFile)); 666 } 667 } catch (IOException ioe) { 668 failure = Optional.of(ioe); 669 } 670 } 671 672 if (failure.isPresent()) { 673 final IOException cause = failure.get(); 674 if (wait > 0L) { 675 LOG.warn("Unable to create cluster ID file in {}, retrying in {}ms", rootdir, wait, 676 cause); 677 try { 678 Thread.sleep(wait); 679 } catch (InterruptedException e) { 680 Thread.currentThread().interrupt(); 681 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 682 } 683 continue; 684 } else { 685 throw cause; 686 } 687 } else { 688 return; 689 } 690 } 691 } 692 693 /** 694 * If DFS, check safe mode and if so, wait until we clear it. 695 * @param conf configuration 696 * @param wait Sleep between retries 697 * @throws IOException e 698 */ 699 public static void waitOnSafeMode(final Configuration conf, final long wait) throws IOException { 700 FileSystem fs = FileSystem.get(conf); 701 if (!supportSafeMode(fs)) { 702 return; 703 } 704 // Make sure dfs is not in safe mode 705 while (isInSafeMode(fs)) { 706 LOG.info("Waiting for dfs to exit safe mode..."); 707 try { 708 Thread.sleep(wait); 709 } catch (InterruptedException e) { 710 Thread.currentThread().interrupt(); 711 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 712 } 713 } 714 } 715 716 public static boolean supportSafeMode(FileSystem fs) { 717 // return true if HDFS. 
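 // (On newer Hadoop releases, non-HDFS filesystems may also expose safe mode through the
 // org.apache.hadoop.fs.SafeMode interface that the static initializer above loads reflectively;
 // that case is handled by the isAssignableFrom check below.)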
718 if (fs instanceof DistributedFileSystem) { 719 return true; 720 } 721 // return true if the file system implements SafeMode interface. 722 if (safeModeClazz != null) { 723 return (safeModeClazz.isAssignableFrom(fs.getClass())); 724 } 725 // return false if the file system is not HDFS and does not implement SafeMode interface. 726 return false; 727 } 728 729 /** 730 * Checks if meta region exists 731 * @param fs file system 732 * @param rootDir root directory of HBase installation 733 * @return true if exists 734 */ 735 public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException { 736 Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO); 737 return fs.exists(metaRegionDir); 738 } 739 740 /** 741 * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams are 742 * backed by a series of LocatedBlocks, which are fetched periodically from the namenode. This 743 * method retrieves those blocks from the input stream and uses them to calculate 744 * HDFSBlockDistribution. The underlying method in DFSInputStream does attempt to use locally 745 * cached blocks, but may hit the namenode if the cache is determined to be incomplete. The method 746 * also involves making copies of all LocatedBlocks rather than return the underlying blocks 747 * themselves. 748 */ 749 static public HDFSBlocksDistribution 750 computeHDFSBlocksDistribution(HdfsDataInputStream inputStream) throws IOException { 751 List<LocatedBlock> blocks = inputStream.getAllBlocks(); 752 HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution(); 753 for (LocatedBlock block : blocks) { 754 String[] hosts = getHostsForLocations(block); 755 long len = block.getBlockSize(); 756 StorageType[] storageTypes = block.getStorageTypes(); 757 blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes); 758 } 759 return blocksDistribution; 760 } 761 762 private static String[] getHostsForLocations(LocatedBlock block) { 763 DatanodeInfo[] locations = getLocatedBlockLocations(block); 764 String[] hosts = new String[locations.length]; 765 for (int i = 0; i < hosts.length; i++) { 766 hosts[i] = locations[i].getHostName(); 767 } 768 return hosts; 769 } 770 771 /** 772 * Compute HDFS blocks distribution of a given file, or a portion of the file 773 * @param fs file system 774 * @param status file status of the file 775 * @param start start position of the portion 776 * @param length length of the portion 777 * @return The HDFS blocks distribution 778 */ 779 static public HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs, 780 FileStatus status, long start, long length) throws IOException { 781 HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution(); 782 BlockLocation[] blockLocations = fs.getFileBlockLocations(status, start, length); 783 addToHDFSBlocksDistribution(blocksDistribution, blockLocations); 784 return blocksDistribution; 785 } 786 787 /** 788 * Update blocksDistribution with blockLocations 789 * @param blocksDistribution the hdfs blocks distribution 790 * @param blockLocations an array containing block location 791 */ 792 static public void addToHDFSBlocksDistribution(HDFSBlocksDistribution blocksDistribution, 793 BlockLocation[] blockLocations) throws IOException { 794 for (BlockLocation bl : blockLocations) { 795 String[] hosts = bl.getHosts(); 796 long len = bl.getLength(); 797 StorageType[] storageTypes = bl.getStorageTypes(); 798 blocksDistribution.addHostsAndBlockWeight(hosts, 
len, storageTypes); 799 } 800 } 801 802 // TODO move this method OUT of FSUtils. No dependencies to HMaster 803 /** 804 * Returns the total overall fragmentation percentage. Includes hbase:meta and -ROOT- as well. 805 * @param master The master defining the HBase root and file system 806 * @return A map for each table and its percentage (never null) 807 * @throws IOException When scanning the directory fails 808 */ 809 public static int getTotalTableFragmentation(final HMaster master) throws IOException { 810 Map<String, Integer> map = getTableFragmentation(master); 811 return map.isEmpty() ? -1 : map.get("-TOTAL-"); 812 } 813 814 /** 815 * Runs through the HBase rootdir and checks how many stores for each table have more than one 816 * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is 817 * stored under the special key "-TOTAL-". 818 * @param master The master defining the HBase root and file system. 819 * @return A map for each table and its percentage (never null). 820 * @throws IOException When scanning the directory fails. 821 */ 822 public static Map<String, Integer> getTableFragmentation(final HMaster master) 823 throws IOException { 824 Path path = CommonFSUtils.getRootDir(master.getConfiguration()); 825 // since HMaster.getFileSystem() is package private 826 FileSystem fs = path.getFileSystem(master.getConfiguration()); 827 return getTableFragmentation(fs, path); 828 } 829 830 /** 831 * Runs through the HBase rootdir and checks how many stores for each table have more than one 832 * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is 833 * stored under the special key "-TOTAL-". 834 * @param fs The file system to use 835 * @param hbaseRootDir The root directory to scan 836 * @return A map for each table and its percentage (never null) 837 * @throws IOException When scanning the directory fails 838 */ 839 public static Map<String, Integer> getTableFragmentation(final FileSystem fs, 840 final Path hbaseRootDir) throws IOException { 841 Map<String, Integer> frags = new HashMap<>(); 842 int cfCountTotal = 0; 843 int cfFragTotal = 0; 844 PathFilter regionFilter = new RegionDirFilter(fs); 845 PathFilter familyFilter = new FamilyDirFilter(fs); 846 List<Path> tableDirs = getTableDirs(fs, hbaseRootDir); 847 for (Path d : tableDirs) { 848 int cfCount = 0; 849 int cfFrag = 0; 850 FileStatus[] regionDirs = fs.listStatus(d, regionFilter); 851 for (FileStatus regionDir : regionDirs) { 852 Path dd = regionDir.getPath(); 853 // else its a region name, now look in region for families 854 FileStatus[] familyDirs = fs.listStatus(dd, familyFilter); 855 for (FileStatus familyDir : familyDirs) { 856 cfCount++; 857 cfCountTotal++; 858 Path family = familyDir.getPath(); 859 // now in family make sure only one file 860 FileStatus[] familyStatus = fs.listStatus(family); 861 if (familyStatus.length > 1) { 862 cfFrag++; 863 cfFragTotal++; 864 } 865 } 866 } 867 // compute percentage per table and store in result list 868 frags.put(CommonFSUtils.getTableName(d).getNameAsString(), 869 cfCount == 0 ? 0 : Math.round((float) cfFrag / cfCount * 100)); 870 } 871 // set overall percentage for all tables 872 frags.put("-TOTAL-", 873 cfCountTotal == 0 ? 
0 : Math.round((float) cfFragTotal / cfCountTotal * 100)); 874 return frags; 875 } 876 877 public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException { 878 if (fs.exists(dst) && !fs.delete(dst, false)) { 879 throw new IOException("Can not delete " + dst); 880 } 881 if (!fs.rename(src, dst)) { 882 throw new IOException("Can not rename from " + src + " to " + dst); 883 } 884 } 885 886 /** 887 * A {@link PathFilter} that returns only regular files. 888 */ 889 static class FileFilter extends AbstractFileStatusFilter { 890 private final FileSystem fs; 891 892 public FileFilter(final FileSystem fs) { 893 this.fs = fs; 894 } 895 896 @Override 897 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 898 try { 899 return isFile(fs, isDir, p); 900 } catch (IOException e) { 901 LOG.warn("Unable to verify if path={} is a regular file", p, e); 902 return false; 903 } 904 } 905 } 906 907 /** 908 * Directory filter that doesn't include any of the directories in the specified blacklist 909 */ 910 public static class BlackListDirFilter extends AbstractFileStatusFilter { 911 private final FileSystem fs; 912 private List<String> blacklist; 913 914 /** 915 * Create a filter on the givem filesystem with the specified blacklist 916 * @param fs filesystem to filter 917 * @param directoryNameBlackList list of the names of the directories to filter. If 918 * <tt>null</tt>, all directories are returned 919 */ 920 @SuppressWarnings("unchecked") 921 public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) { 922 this.fs = fs; 923 blacklist = (List<String>) (directoryNameBlackList == null 924 ? Collections.emptyList() 925 : directoryNameBlackList); 926 } 927 928 @Override 929 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 930 if (!isValidName(p.getName())) { 931 return false; 932 } 933 934 try { 935 return isDirectory(fs, isDir, p); 936 } catch (IOException e) { 937 LOG.warn("An error occurred while verifying if [{}] is a valid directory." 938 + " Returning 'not valid' and continuing.", p, e); 939 return false; 940 } 941 } 942 943 protected boolean isValidName(final String name) { 944 return !blacklist.contains(name); 945 } 946 } 947 948 /** 949 * A {@link PathFilter} that only allows directories. 950 */ 951 public static class DirFilter extends BlackListDirFilter { 952 953 public DirFilter(FileSystem fs) { 954 super(fs, null); 955 } 956 } 957 958 /** 959 * A {@link PathFilter} that returns usertable directories. 
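 * <p>A short usage sketch (illustrative only; the namespace directory below is hypothetical):</p>
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * FileSystem fs = FileSystem.get(conf);
 * FileStatus[] tables = fs.listStatus(new Path("/hbase/data/default"), new FSUtils.UserTableDirFilter(fs));
 * }</pre>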
To get all directories use the 960 * {@link BlackListDirFilter} with a <tt>null</tt> blacklist 961 */ 962 public static class UserTableDirFilter extends BlackListDirFilter { 963 public UserTableDirFilter(FileSystem fs) { 964 super(fs, HConstants.HBASE_NON_TABLE_DIRS); 965 } 966 967 @Override 968 protected boolean isValidName(final String name) { 969 if (!super.isValidName(name)) return false; 970 971 try { 972 TableName.isLegalTableQualifierName(Bytes.toBytes(name)); 973 } catch (IllegalArgumentException e) { 974 LOG.info("Invalid table name: {}", name); 975 return false; 976 } 977 return true; 978 } 979 } 980 981 public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir) 982 throws IOException { 983 List<Path> tableDirs = new ArrayList<>(); 984 Path baseNamespaceDir = new Path(rootdir, HConstants.BASE_NAMESPACE_DIR); 985 if (fs.exists(baseNamespaceDir)) { 986 for (FileStatus status : fs.globStatus(new Path(baseNamespaceDir, "*"))) { 987 tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath())); 988 } 989 } 990 return tableDirs; 991 } 992 993 /** 994 * @return All the table directories under <code>rootdir</code>. Ignore non table hbase folders 995 * such as .logs, .oldlogs, .corrupt folders. 996 */ 997 public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir) 998 throws IOException { 999 // presumes any directory under hbase.rootdir is a table 1000 FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs)); 1001 List<Path> tabledirs = new ArrayList<>(dirs.length); 1002 for (FileStatus dir : dirs) { 1003 tabledirs.add(dir.getPath()); 1004 } 1005 return tabledirs; 1006 } 1007 1008 /** 1009 * Filter for all dirs that don't start with '.' 1010 */ 1011 public static class RegionDirFilter extends AbstractFileStatusFilter { 1012 // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names. 1013 final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$"); 1014 final FileSystem fs; 1015 1016 public RegionDirFilter(FileSystem fs) { 1017 this.fs = fs; 1018 } 1019 1020 @Override 1021 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1022 if (!regionDirPattern.matcher(p.getName()).matches()) { 1023 return false; 1024 } 1025 1026 try { 1027 return isDirectory(fs, isDir, p); 1028 } catch (IOException ioe) { 1029 // Maybe the file was moved or the fs was disconnected. 1030 LOG.warn("Skipping file {} due to IOException", p, ioe); 1031 return false; 1032 } 1033 } 1034 } 1035 1036 /** 1037 * Given a particular table dir, return all the regiondirs inside it, excluding files such as 1038 * .tableinfo 1039 * @param fs A file system for the Path 1040 * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir> 1041 * @return List of paths to valid region directories in table dir. 1042 */ 1043 public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) 1044 throws IOException { 1045 // assumes we are in a table dir. 
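 // Region directories are matched by RegionDirFilter (hex-encoded names); a null result from
 // listStatusWithStatusFilter means the table dir is missing, empty, or had no matching children.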
1046 List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 1047 if (rds == null) { 1048 return Collections.emptyList(); 1049 } 1050 List<Path> regionDirs = new ArrayList<>(rds.size()); 1051 for (FileStatus rdfs : rds) { 1052 Path rdPath = rdfs.getPath(); 1053 regionDirs.add(rdPath); 1054 } 1055 return regionDirs; 1056 } 1057 1058 public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) { 1059 return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region); 1060 } 1061 1062 public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) { 1063 return getRegionDirFromTableDir(tableDir, 1064 ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName()); 1065 } 1066 1067 public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) { 1068 return new Path(tableDir, encodedRegionName); 1069 } 1070 1071 /** 1072 * Filter for all dirs that are legal column family names. This is generally used for colfam dirs 1073 * <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>. 1074 */ 1075 public static class FamilyDirFilter extends AbstractFileStatusFilter { 1076 final FileSystem fs; 1077 1078 public FamilyDirFilter(FileSystem fs) { 1079 this.fs = fs; 1080 } 1081 1082 @Override 1083 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1084 try { 1085 // throws IAE if invalid 1086 ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(Bytes.toBytes(p.getName())); 1087 } catch (IllegalArgumentException iae) { 1088 // path name is an invalid family name and thus is excluded. 1089 return false; 1090 } 1091 1092 try { 1093 return isDirectory(fs, isDir, p); 1094 } catch (IOException ioe) { 1095 // Maybe the file was moved or the fs was disconnected. 1096 LOG.warn("Skipping file {} due to IOException", p, ioe); 1097 return false; 1098 } 1099 } 1100 } 1101 1102 /** 1103 * Given a particular region dir, return all the familydirs inside it 1104 * @param fs A file system for the Path 1105 * @param regionDir Path to a specific region directory 1106 * @return List of paths to valid family directories in region dir. 1107 */ 1108 public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) 1109 throws IOException { 1110 // assumes we are in a region dir. 
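 // Family directories are matched by FamilyDirFilter, i.e. sub-directories whose names are legal
 // column family names.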
1111 return getFilePaths(fs, regionDir, new FamilyDirFilter(fs)); 1112 } 1113 1114 public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) 1115 throws IOException { 1116 return getFilePaths(fs, familyDir, new ReferenceFileFilter(fs)); 1117 } 1118 1119 public static List<Path> getReferenceAndLinkFilePaths(final FileSystem fs, final Path familyDir) 1120 throws IOException { 1121 return getFilePaths(fs, familyDir, new ReferenceAndLinkFileFilter(fs)); 1122 } 1123 1124 private static List<Path> getFilePaths(final FileSystem fs, final Path dir, 1125 final PathFilter pathFilter) throws IOException { 1126 FileStatus[] fds = fs.listStatus(dir, pathFilter); 1127 List<Path> files = new ArrayList<>(fds.length); 1128 for (FileStatus fdfs : fds) { 1129 Path fdPath = fdfs.getPath(); 1130 files.add(fdPath); 1131 } 1132 return files; 1133 } 1134 1135 public static int getRegionReferenceAndLinkFileCount(final FileSystem fs, final Path p) { 1136 int result = 0; 1137 try { 1138 for (Path familyDir : getFamilyDirs(fs, p)) { 1139 result += getReferenceAndLinkFilePaths(fs, familyDir).size(); 1140 } 1141 } catch (IOException e) { 1142 LOG.warn("Error Counting reference files.", e); 1143 } 1144 return result; 1145 } 1146 1147 public static class ReferenceAndLinkFileFilter implements PathFilter { 1148 1149 private final FileSystem fs; 1150 1151 public ReferenceAndLinkFileFilter(FileSystem fs) { 1152 this.fs = fs; 1153 } 1154 1155 @Override 1156 public boolean accept(Path rd) { 1157 try { 1158 // only files can be references. 1159 return !fs.getFileStatus(rd).isDirectory() 1160 && (StoreFileInfo.isReference(rd) || HFileLink.isHFileLink(rd)); 1161 } catch (IOException ioe) { 1162 // Maybe the file was moved or the fs was disconnected. 1163 LOG.warn("Skipping file " + rd + " due to IOException", ioe); 1164 return false; 1165 } 1166 } 1167 } 1168 1169 /** 1170 * Filter for HFiles that excludes reference files. 1171 */ 1172 public static class HFileFilter extends AbstractFileStatusFilter { 1173 final FileSystem fs; 1174 1175 public HFileFilter(FileSystem fs) { 1176 this.fs = fs; 1177 } 1178 1179 @Override 1180 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1181 if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) { 1182 return false; 1183 } 1184 1185 try { 1186 return isFile(fs, isDir, p); 1187 } catch (IOException ioe) { 1188 // Maybe the file was moved or the fs was disconnected. 1189 LOG.warn("Skipping file {} due to IOException", p, ioe); 1190 return false; 1191 } 1192 } 1193 } 1194 1195 /** 1196 * Filter for HFileLinks (StoreFiles and HFiles not included). the filter itself does not consider 1197 * if a link is file or not. 1198 */ 1199 public static class HFileLinkFilter implements PathFilter { 1200 1201 @Override 1202 public boolean accept(Path p) { 1203 return HFileLink.isHFileLink(p); 1204 } 1205 } 1206 1207 public static class ReferenceFileFilter extends AbstractFileStatusFilter { 1208 1209 private final FileSystem fs; 1210 1211 public ReferenceFileFilter(FileSystem fs) { 1212 this.fs = fs; 1213 } 1214 1215 @Override 1216 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1217 if (!StoreFileInfo.isReference(p)) { 1218 return false; 1219 } 1220 1221 try { 1222 // only files can be references. 1223 return isFile(fs, isDir, p); 1224 } catch (IOException ioe) { 1225 // Maybe the file was moved or the fs was disconnected. 
1226 LOG.warn("Skipping file {} due to IOException", p, ioe); 1227 return false; 1228 } 1229 } 1230 } 1231 1232 /** 1233 * Called every so-often by storefile map builder getTableStoreFilePathMap to report progress. 1234 */ 1235 interface ProgressReporter { 1236 /** 1237 * @param status File or directory we are about to process. 1238 */ 1239 void progress(FileStatus status); 1240 } 1241 1242 /** 1243 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1244 * names to the full Path. <br> 1245 * Example...<br> 1246 * Key = 3944417774205889744 <br> 1247 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1248 * @param map map to add values. If null, this method will create and populate one to 1249 * return 1250 * @param fs The file system to use. 1251 * @param hbaseRootDir The root directory to scan. 1252 * @param tableName name of the table to scan. 1253 * @return Map keyed by StoreFile name with a value of the full Path. 1254 * @throws IOException When scanning the directory fails. 1255 */ 1256 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map, 1257 final FileSystem fs, final Path hbaseRootDir, TableName tableName) 1258 throws IOException, InterruptedException { 1259 return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, 1260 (ProgressReporter) null); 1261 } 1262 1263 /** 1264 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1265 * names to the full Path. Note that because this method can be called on a 'live' HBase system 1266 * that we will skip files that no longer exist by the time we traverse them and similarly the 1267 * user of the result needs to consider that some entries in this map may not exist by the time 1268 * this call completes. <br> 1269 * Example...<br> 1270 * Key = 3944417774205889744 <br> 1271 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1272 * @param resultMap map to add values. If null, this method will create and populate one to 1273 * return 1274 * @param fs The file system to use. 1275 * @param hbaseRootDir The root directory to scan. 1276 * @param tableName name of the table to scan. 1277 * @param sfFilter optional path filter to apply to store files 1278 * @param executor optional executor service to parallelize this operation 1279 * @param progressReporter Instance or null; gets called every time we move to new region of 1280 * family dir and for each store file. 1281 * @return Map keyed by StoreFile name with a value of the full Path. 1282 * @throws IOException When scanning the directory fails. 1283 * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead. 1284 */ 1285 @Deprecated 1286 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1287 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1288 ExecutorService executor, final HbckErrorReporter progressReporter) 1289 throws IOException, InterruptedException { 1290 return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor, 1291 new ProgressReporter() { 1292 @Override 1293 public void progress(FileStatus status) { 1294 // status is not used in this implementation. 
1295 progressReporter.progress(); 1296 } 1297 }); 1298 } 1299 1300 /** 1301 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1302 * names to the full Path. Note that because this method can be called on a 'live' HBase system 1303 * that we will skip files that no longer exist by the time we traverse them and similarly the 1304 * user of the result needs to consider that some entries in this map may not exist by the time 1305 * this call completes. <br> 1306 * Example...<br> 1307 * Key = 3944417774205889744 <br> 1308 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1309 * @param resultMap map to add values. If null, this method will create and populate one to 1310 * return 1311 * @param fs The file system to use. 1312 * @param hbaseRootDir The root directory to scan. 1313 * @param tableName name of the table to scan. 1314 * @param sfFilter optional path filter to apply to store files 1315 * @param executor optional executor service to parallelize this operation 1316 * @param progressReporter Instance or null; gets called every time we move to new region of 1317 * family dir and for each store file. 1318 * @return Map keyed by StoreFile name with a value of the full Path. 1319 * @throws IOException When scanning the directory fails. 1320 * @throws InterruptedException the thread is interrupted, either before or during the activity. 1321 */ 1322 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1323 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1324 ExecutorService executor, final ProgressReporter progressReporter) 1325 throws IOException, InterruptedException { 1326 1327 final Map<String, Path> finalResultMap = 1328 resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap; 1329 1330 // only include the directory paths to tables 1331 Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 1332 // Inside a table, there are compaction.dir directories to skip. Otherwise, all else 1333 // should be regions. 
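 // Each region directory is scanned by a Runnable that walks its family directories and collects
 // store file name -> Path entries into the shared result map. If an executor was supplied the
 // Runnables are submitted to it, otherwise they run serially on this thread; any exception is
 // recorded in 'exceptions' and rethrown once all tasks have completed.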
1334 final FamilyDirFilter familyFilter = new FamilyDirFilter(fs); 1335 final Vector<Exception> exceptions = new Vector<>(); 1336 1337 try { 1338 List<FileStatus> regionDirs = 1339 FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 1340 if (regionDirs == null) { 1341 return finalResultMap; 1342 } 1343 1344 final List<Future<?>> futures = new ArrayList<>(regionDirs.size()); 1345 1346 for (FileStatus regionDir : regionDirs) { 1347 if (null != progressReporter) { 1348 progressReporter.progress(regionDir); 1349 } 1350 final Path dd = regionDir.getPath(); 1351 1352 if (!exceptions.isEmpty()) { 1353 break; 1354 } 1355 1356 Runnable getRegionStoreFileMapCall = new Runnable() { 1357 @Override 1358 public void run() { 1359 try { 1360 HashMap<String, Path> regionStoreFileMap = new HashMap<>(); 1361 List<FileStatus> familyDirs = 1362 FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter); 1363 if (familyDirs == null) { 1364 if (!fs.exists(dd)) { 1365 LOG.warn("Skipping region because it no longer exists: " + dd); 1366 } else { 1367 LOG.warn("Skipping region because it has no family dirs: " + dd); 1368 } 1369 return; 1370 } 1371 for (FileStatus familyDir : familyDirs) { 1372 if (null != progressReporter) { 1373 progressReporter.progress(familyDir); 1374 } 1375 Path family = familyDir.getPath(); 1376 if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) { 1377 continue; 1378 } 1379 // now in family, iterate over the StoreFiles and 1380 // put in map 1381 FileStatus[] familyStatus = fs.listStatus(family); 1382 for (FileStatus sfStatus : familyStatus) { 1383 if (null != progressReporter) { 1384 progressReporter.progress(sfStatus); 1385 } 1386 Path sf = sfStatus.getPath(); 1387 if (sfFilter == null || sfFilter.accept(sf)) { 1388 regionStoreFileMap.put(sf.getName(), sf); 1389 } 1390 } 1391 } 1392 finalResultMap.putAll(regionStoreFileMap); 1393 } catch (Exception e) { 1394 LOG.error("Could not get region store file map for region: " + dd, e); 1395 exceptions.add(e); 1396 } 1397 } 1398 }; 1399 1400 // If executor is available, submit async tasks to exec concurrently, otherwise 1401 // just do serial sync execution 1402 if (executor != null) { 1403 Future<?> future = executor.submit(getRegionStoreFileMapCall); 1404 futures.add(future); 1405 } else { 1406 FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null); 1407 future.run(); 1408 futures.add(future); 1409 } 1410 } 1411 1412 // Ensure all pending tasks are complete (or that we run into an exception) 1413 for (Future<?> f : futures) { 1414 if (!exceptions.isEmpty()) { 1415 break; 1416 } 1417 try { 1418 f.get(); 1419 } catch (ExecutionException e) { 1420 LOG.error("Unexpected exec exception! Should've been caught already. 
(Bug?)", e); 1421 // Shouldn't happen, we already logged/caught any exceptions in the Runnable 1422 } 1423 } 1424 } catch (IOException e) { 1425 LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e); 1426 exceptions.add(e); 1427 } finally { 1428 if (!exceptions.isEmpty()) { 1429 // Just throw the first exception as an indication something bad happened 1430 // Don't need to propagate all the exceptions, we already logged them all anyway 1431 Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class); 1432 throw new IOException(exceptions.firstElement()); 1433 } 1434 } 1435 1436 return finalResultMap; 1437 } 1438 1439 public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) { 1440 int result = 0; 1441 try { 1442 for (Path familyDir : getFamilyDirs(fs, p)) { 1443 result += getReferenceFilePaths(fs, familyDir).size(); 1444 } 1445 } catch (IOException e) { 1446 LOG.warn("Error counting reference files", e); 1447 } 1448 return result; 1449 } 1450 1451 /** 1452 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1453 * the full Path. <br> 1454 * Example...<br> 1455 * Key = 3944417774205889744 <br> 1456 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1457 * @param fs The file system to use. 1458 * @param hbaseRootDir The root directory to scan. 1459 * @return Map keyed by StoreFile name with a value of the full Path. 1460 * @throws IOException When scanning the directory fails. 1461 */ 1462 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1463 final Path hbaseRootDir) throws IOException, InterruptedException { 1464 return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter) null); 1465 } 1466 1467 /** 1468 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1469 * the full Path. <br> 1470 * Example...<br> 1471 * Key = 3944417774205889744 <br> 1472 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1473 * @param fs The file system to use. 1474 * @param hbaseRootDir The root directory to scan. 1475 * @param sfFilter optional path filter to apply to store files 1476 * @param executor optional executor service to parallelize this operation 1477 * @param progressReporter Instance or null; gets called every time we move to new region of 1478 * family dir and for each store file. 1479 * @return Map keyed by StoreFile name with a value of the full Path. 1480 * @throws IOException When scanning the directory fails. 1481 * @deprecated Since 2.3.0. Will be removed in hbase4. Used 1482 * {@link #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)} 1483 */ 1484 @Deprecated 1485 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1486 final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor, 1487 HbckErrorReporter progressReporter) throws IOException, InterruptedException { 1488 return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor, new ProgressReporter() { 1489 @Override 1490 public void progress(FileStatus status) { 1491 // status is not used in this implementation. 1492 progressReporter.progress(); 1493 } 1494 }); 1495 } 1496 1497 /** 1498 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1499 * the full Path. 

  /**
   * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to
   * the full Path. <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   * @param fs               The file system to use.
   * @param hbaseRootDir     The root directory to scan.
   * @param sfFilter         optional path filter to apply to store files
   * @param executor         optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to new region of
   *                         family dir and for each store file.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   */
  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
    final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
    ProgressReporter progressReporter) throws IOException, InterruptedException {
    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);

    // if this method looks similar to 'getTableFragmentation' that is because
    // it was borrowed from it.

    // only include the directory paths to tables
    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
      getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir),
        sfFilter, executor, progressReporter);
    }
    return map;
  }

  /**
   * Filters FileStatuses in an array and returns a list
   * @param input  An array of FileStatuses
   * @param filter A required filter to filter the array
   * @return A list of FileStatuses
   */
  public static List<FileStatus> filterFileStatuses(FileStatus[] input, FileStatusFilter filter) {
    if (input == null) return null;
    return filterFileStatuses(Iterators.forArray(input), filter);
  }

  /**
   * Filters FileStatuses in an iterator and returns a list
   * @param input  An iterator of FileStatuses
   * @param filter A required filter to filter the iterator
   * @return A list of FileStatuses
   */
  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
    FileStatusFilter filter) {
    if (input == null) return null;
    ArrayList<FileStatus> results = new ArrayList<>();
    while (input.hasNext()) {
      FileStatus f = input.next();
      if (filter.accept(f)) {
        results.add(f);
      }
    }
    return results;
  }
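
  // Illustrative sketch (editor's addition): filtering a raw FileStatus[] listing down to
  // directories only with the filterFileStatuses helper above. It assumes an existing
  // FileSystem fs, and the path used here is made up for the example; FileStatusFilter exposes a
  // single accept(FileStatus) method, so an anonymous class (or lambda) is enough.
  //
  //   FileStatus[] listing = fs.listStatus(new Path("/hbase/data/default"));
  //   List<FileStatus> dirsOnly = FSUtils.filterFileStatuses(listing, new FileStatusFilter() {
  //     @Override
  //     public boolean accept(FileStatus status) {
  //       return status.isDirectory();
  //     }
  //   });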

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
   * differences between Hadoop versions, where Hadoop 1 does not throw a FileNotFoundException
   * but returns an empty FileStatus[], while Hadoop 2 will throw a FileNotFoundException.
   * @param fs     file system
   * @param dir    directory
   * @param filter file status filter
   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
   */
  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs, final Path dir,
    final FileStatusFilter filter) throws IOException {
    FileStatus[] status = null;
    try {
      status = fs.listStatus(dir);
    } catch (FileNotFoundException fnfe) {
      LOG.trace("{} does not exist", dir);
      return null;
    }

    if (ArrayUtils.getLength(status) == 0) {
      return null;
    }

    if (filter == null) {
      return Arrays.asList(status);
    } else {
      List<FileStatus> status2 = filterFileStatuses(status, filter);
      if (status2 == null || status2.isEmpty()) {
        return null;
      } else {
        return status2;
      }
    }
  }

  /**
   * This function is to scan the root path of the file system to get the degree of locality for
   * each region on each of the servers having at least one block of that region. This is used by
   * the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}.
   * @param conf the configuration to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>>
    getRegionDegreeLocalityMappingFromFS(final Configuration conf) throws IOException {
    return getRegionDegreeLocalityMappingFromFS(conf, null,
      conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
  }

  /**
   * This function is to scan the root path of the file system to get the degree of locality for
   * each region on each of the servers having at least one block of that region.
   * @param conf           the configuration to use
   * @param desiredTable   the table you wish to scan locality for
   * @param threadPoolSize the thread pool size to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
    final Configuration conf, final String desiredTable, int threadPoolSize) throws IOException {
    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
    return regionDegreeLocalityMapping;
  }
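
  // Illustrative sketch (editor's addition): computing HDFS locality per region and per server
  // with the helpers above. It assumes an existing Configuration conf; the table name
  // "exampleTable" and the thread pool size of 8 are made up for the example.
  //
  //   Map<String, Map<String, Float>> locality =
  //     FSUtils.getRegionDegreeLocalityMappingFromFS(conf, "exampleTable", 8);
  //   for (Map.Entry<String, Map<String, Float>> region : locality.entrySet()) {
  //     LOG.info("Region {} locality fraction by host: {}", region.getKey(), region.getValue());
  //   }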

  /**
   * This function is to scan the root path of the file system to get either the mapping between
   * the region name and its best locality region server or the degree of locality of each region
   * on each of the servers having at least one block of that region. The output map parameter is
   * optional.
   * @param conf                        the configuration to use
   * @param desiredTable                the table you wish to scan locality for
   * @param threadPoolSize              the thread pool size to use
   * @param regionDegreeLocalityMapping the map into which to put the locality degree mapping or
   *                                    null, must be a thread-safe implementation
   * @throws IOException in case of file system errors or interrupts
   */
  private static void getRegionLocalityMappingFromFS(final Configuration conf,
    final String desiredTable, int threadPoolSize,
    final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
    final FileSystem fs = FileSystem.get(conf);
    final Path rootPath = CommonFSUtils.getRootDir(conf);
    final long startTime = EnvironmentEdgeManager.currentTime();
    final Path queryPath;
    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
    if (null == desiredTable) {
      queryPath =
        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
    } else {
      queryPath = new Path(
        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
    }

    // reject all paths that are not appropriate
    PathFilter pathFilter = new PathFilter() {
      @Override
      public boolean accept(Path path) {
        // this is the region name; it may get some noise data
        if (null == path) {
          return false;
        }

        // no parent?
        Path parent = path.getParent();
        if (null == parent) {
          return false;
        }

        String regionName = path.getName();
        if (null == regionName) {
          return false;
        }

        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
          return false;
        }
        return true;
      }
    };

    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
    }

    if (null == statusList) {
      return;
    }

    // lower the number of threads in case we have very few expected regions
    threadPoolSize = Math.min(threadPoolSize, statusList.length);

    // run in multiple threads
    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
      new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    try {
      // ignore all file status items that are not of interest
      for (FileStatus regionStatus : statusList) {
        if (null == regionStatus || !regionStatus.isDirectory()) {
          continue;
        }

        final Path regionPath = regionStatus.getPath();
        if (null != regionPath) {
          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
        }
      }
    } finally {
      tpe.shutdown();
      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
      try {
        // here we wait until TPE terminates, which is either naturally or by
        // exceptions in the execution of the threads
        while (!tpe.awaitTermination(threadWakeFrequency, TimeUnit.MILLISECONDS)) {
          // printing out rough estimate, so as to not introduce an AtomicInteger
          LOG.info("Locality checking is underway: { Scanned Regions : "
            + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
            + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
    }

    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
    LOG.info("Scan DFS for locality info takes {}ms", overhead);
  }

  /**
   * Do our short circuit read setup. Checks buffer size to use and whether to do checksumming in
   * hbase or hdfs.
   */
  public static void setupShortCircuitRead(final Configuration conf) {
    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
    boolean shortCircuitSkipChecksum =
      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    if (shortCircuitSkipChecksum) {
      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not "
        + "be set to true."
        + (useHBaseChecksum
          ? " HBase checksum doesn't require "
            + "it, see https://issues.apache.org/jira/browse/HBASE-6868."
          : ""));
      assert !shortCircuitSkipChecksum; // this will fail if assertions are on
    }
    checkShortCircuitReadBufferSize(conf);
  }

  /**
   * Check if short circuit read buffer size is set and if not, set it to hbase value.
   */
  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
    int size = conf.getInt(dfsKey, notSet);
    // If a size is set, return -- we will use it.
    if (size != notSet) return;
    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }
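
  // Illustrative sketch (editor's addition): how the two helpers above are typically driven from
  // configuration. The 128 KB buffer size is an assumed value for the example; if
  // "dfs.client.read.shortcircuit.buffer.size" is already set, checkShortCircuitReadBufferSize
  // leaves it untouched.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.set("hbase.dfs.client.read.shortcircuit.buffer.size", "131072");
  //   FSUtils.setupShortCircuitRead(conf); // warns on skip-checksum misconfig, then sizes buffer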

  /**
   * Returns the DFSClient DFSHedgedReadMetrics instance, or null if it can't be found or we are
   * not on HDFS.
   */
  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
    throws IOException {
    if (!CommonFSUtils.isHDFS(c)) {
      return null;
    }
    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
    final String name = "getHedgedReadMetrics";
    DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient();
    Method m;
    try {
      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException | SecurityException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: "
        + e.getMessage());
      return null;
    }
    m.setAccessible(true);
    try {
      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: "
        + e.getMessage());
      return null;
    }
  }

  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
    Configuration conf, int threads) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Future<Void>> futures = new ArrayList<>();
    List<Path> traversedPaths;
    try {
      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (ExecutionException | InterruptedException | IOException e) {
      throw new IOException("Copy snapshot reference files failed", e);
    } finally {
      pool.shutdownNow();
    }
    return traversedPaths;
  }

  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
    Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
    List<Path> traversedPaths = new ArrayList<>();
    traversedPaths.add(dst);
    FileStatus currentFileStatus = srcFS.getFileStatus(src);
    if (currentFileStatus.isDirectory()) {
      if (!dstFS.mkdirs(dst)) {
        throw new IOException("Create directory failed: " + dst);
      }
      FileStatus[] subPaths = srcFS.listStatus(src);
      for (FileStatus subPath : subPaths) {
        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
      }
    } else {
      Future<Void> future = pool.submit(() -> {
        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
        return null;
      });
      futures.add(future);
    }
    return traversedPaths;
  }
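
  // Illustrative sketch (editor's addition): copying a directory tree between two file systems
  // with copyFilesParallel above. The source/destination URIs and the thread count of 4 are
  // assumptions for the example; the returned list contains every destination path that was
  // created or visited.
  //
  //   FileSystem srcFs = FileSystem.get(conf);
  //   FileSystem dstFs = FileSystem.get(URI.create("hdfs://backup-cluster:8020"), conf);
  //   List<Path> copied = FSUtils.copyFilesParallel(srcFs, new Path("/hbase/archive"),
  //     dstFs, new Path("/backup/archive"), conf, 4);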

  /** Returns a set containing all namenode addresses of fs */
  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
    Configuration conf) {
    Set<InetSocketAddress> addresses = new HashSet<>();
    String serviceName = fs.getCanonicalServiceName();

    if (serviceName.startsWith("ha-hdfs")) {
      try {
        Map<String, Map<String, InetSocketAddress>> addressMap =
          DFSUtil.getNNServiceRpcAddressesForCluster(conf);
        String nameService = serviceName.substring(serviceName.indexOf(":") + 1);
        if (addressMap.containsKey(nameService)) {
          Map<String, InetSocketAddress> nnMap = addressMap.get(nameService);
          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
            InetSocketAddress addr = e2.getValue();
            addresses.add(addr);
          }
        }
      } catch (Exception e) {
        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
      }
    } else {
      URI uri = fs.getUri();
      int port = uri.getPort();
      if (port < 0) {
        int idx = serviceName.indexOf(':');
        port = Integer.parseInt(serviceName.substring(idx + 1));
      }
      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
      addresses.add(addr);
    }

    return addresses;
  }

  /**
   * Checks whether srcFs and desFs point at the same HDFS cluster.
   * @param conf  the Configuration of HBase
   * @param srcFs the source FileSystem
   * @param desFs the destination FileSystem
   * @return Whether srcFs and desFs are on same hdfs or not
   */
  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
    // By getCanonicalServiceName, we could make sure both srcFs and desFs
    // show a unified format which contains scheme, host and port.
    String srcServiceName = srcFs.getCanonicalServiceName();
    String desServiceName = desFs.getCanonicalServiceName();

    if (srcServiceName == null || desServiceName == null) {
      return false;
    }
    if (srcServiceName.equals(desServiceName)) {
      return true;
    }
    if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
      Collection<String> internalNameServices =
        conf.getTrimmedStringCollection("dfs.internal.nameservices");
      if (!internalNameServices.isEmpty()) {
        return internalNameServices.contains(srcServiceName.split(":")[1]);
      }
    }
    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
      // If one serviceName is an HA format while the other is a non-HA format,
      // maybe they refer to the same FileSystem.
      // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
      if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
        return true;
      }
    }

    return false;
  }
}
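
// Illustrative sketch (editor's addition): using isSameHdfs above to decide whether a cheap
// intra-cluster move is possible before falling back to a full copy. It assumes an existing
// Configuration conf; the second cluster URI is made up for the example.
//
//   FileSystem srcFs = FileSystem.get(conf);
//   FileSystem dstFs = FileSystem.get(URI.create("hdfs://other-nameservice"), conf);
//   if (FSUtils.isSameHdfs(conf, srcFs, dstFs)) {
//     // rename/move within the same cluster
//   } else {
//     // copy across clusters, e.g. with copyFilesParallel
//   }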