001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations; 021import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET; 022 023import edu.umd.cs.findbugs.annotations.CheckForNull; 024import java.io.ByteArrayInputStream; 025import java.io.DataInputStream; 026import java.io.EOFException; 027import java.io.FileNotFoundException; 028import java.io.IOException; 029import java.io.InterruptedIOException; 030import java.lang.reflect.InvocationTargetException; 031import java.lang.reflect.Method; 032import java.net.InetSocketAddress; 033import java.net.URI; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.Collection; 037import java.util.Collections; 038import java.util.HashMap; 039import java.util.HashSet; 040import java.util.Iterator; 041import java.util.List; 042import java.util.Locale; 043import java.util.Map; 044import java.util.Optional; 045import java.util.Set; 046import java.util.Vector; 047import java.util.concurrent.ConcurrentHashMap; 048import java.util.concurrent.ExecutionException; 049import 
java.util.concurrent.ExecutorService; 050import java.util.concurrent.Executors; 051import java.util.concurrent.Future; 052import java.util.concurrent.FutureTask; 053import java.util.concurrent.ThreadPoolExecutor; 054import java.util.concurrent.TimeUnit; 055import java.util.regex.Pattern; 056import org.apache.commons.lang3.ArrayUtils; 057import org.apache.hadoop.conf.Configuration; 058import org.apache.hadoop.fs.BlockLocation; 059import org.apache.hadoop.fs.FSDataInputStream; 060import org.apache.hadoop.fs.FSDataOutputStream; 061import org.apache.hadoop.fs.FileStatus; 062import org.apache.hadoop.fs.FileSystem; 063import org.apache.hadoop.fs.FileUtil; 064import org.apache.hadoop.fs.Path; 065import org.apache.hadoop.fs.PathFilter; 066import org.apache.hadoop.fs.StorageType; 067import org.apache.hadoop.fs.permission.FsPermission; 068import org.apache.hadoop.hbase.ClusterIdFile; 069import org.apache.hadoop.hbase.ClusterIdFileParser; 070import org.apache.hadoop.hbase.HConstants; 071import org.apache.hadoop.hbase.HDFSBlocksDistribution; 072import org.apache.hadoop.hbase.TableName; 073import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 074import org.apache.hadoop.hbase.client.RegionInfo; 075import org.apache.hadoop.hbase.client.RegionInfoBuilder; 076import org.apache.hadoop.hbase.exceptions.DeserializationException; 077import org.apache.hadoop.hbase.fs.HFileSystem; 078import org.apache.hadoop.hbase.io.HFileLink; 079import org.apache.hadoop.hbase.master.HMaster; 080import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 081import org.apache.hadoop.hdfs.DFSClient; 082import org.apache.hadoop.hdfs.DFSHedgedReadMetrics; 083import org.apache.hadoop.hdfs.DFSUtil; 084import org.apache.hadoop.hdfs.DistributedFileSystem; 085import org.apache.hadoop.hdfs.client.HdfsDataInputStream; 086import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 087import org.apache.hadoop.hdfs.protocol.LocatedBlock; 088import org.apache.hadoop.io.IOUtils; 089import 
org.apache.hadoop.ipc.RemoteException; 090import org.apache.yetus.audience.InterfaceAudience; 091import org.slf4j.Logger; 092import org.slf4j.LoggerFactory; 093 094import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 095import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; 096import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 097import org.apache.hbase.thirdparty.com.google.common.primitives.Ints; 098import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 099 100import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 101import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos; 102 103/** 104 * Utility methods for interacting with the underlying file system. 105 */ 106@InterfaceAudience.Private 107public final class FSUtils { 108 private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class); 109 110 private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize"; 111 private static final int DEFAULT_THREAD_POOLSIZE = 2; 112 113 /** Set to true on Windows platforms */ 114 // currently only used in testing. TODO refactor into a test class 115 public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows"); 116 117 private static Class<?> safeModeClazz = null; 118 private static Class<?> safeModeActionClazz = null; 119 private static Object safeModeGet = null; 120 { 121 try { 122 safeModeClazz = Class.forName("org.apache.hadoop.fs.SafeMode"); 123 safeModeActionClazz = Class.forName("org.apache.hadoop.fs.SafeModeAction"); 124 safeModeGet = safeModeClazz.getField("SAFEMODE_GET").get(null); 125 } catch (ClassNotFoundException | NoSuchFieldException e) { 126 LOG.debug("SafeMode interface not in the classpath, this means Hadoop 3.3.5 or below."); 127 } catch (IllegalAccessException e) { 128 LOG.error("SafeModeAction.SAFEMODE_GET is not accessible. 
" 129 + "Unexpected Hadoop version or messy classpath?", e); 130 throw new RuntimeException(e); 131 } 132 } 133 134 private FSUtils() { 135 } 136 137 /** Returns True is <code>fs</code> is instance of DistributedFileSystem n */ 138 public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException { 139 FileSystem fileSystem = fs; 140 // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem. 141 // Check its backing fs for dfs-ness. 142 if (fs instanceof HFileSystem) { 143 fileSystem = ((HFileSystem) fs).getBackingFs(); 144 } 145 return fileSystem instanceof DistributedFileSystem; 146 } 147 148 /** 149 * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the 150 * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true. Does not consider 151 * schema; i.e. if schemas different but path or subpath matches, the two will equate. 152 * @param pathToSearch Path we will be trying to match. 153 * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code> 154 */ 155 public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) { 156 Path tailPath = pathTail; 157 String tailName; 158 Path toSearch = pathToSearch; 159 String toSearchName; 160 boolean result = false; 161 162 if (pathToSearch.depth() != pathTail.depth()) { 163 return false; 164 } 165 166 do { 167 tailName = tailPath.getName(); 168 if (tailName == null || tailName.isEmpty()) { 169 result = true; 170 break; 171 } 172 toSearchName = toSearch.getName(); 173 if (toSearchName == null || toSearchName.isEmpty()) { 174 break; 175 } 176 // Move up a parent on each path for next go around. Path doesn't let us go off the end. 177 tailPath = tailPath.getParent(); 178 toSearch = toSearch.getParent(); 179 } while (tailName.equals(toSearchName)); 180 return result; 181 } 182 183 /** 184 * Delete the region directory if exists. 
185 * @return True if deleted the region directory. 186 */ 187 public static boolean deleteRegionDir(final Configuration conf, final RegionInfo hri) 188 throws IOException { 189 Path rootDir = CommonFSUtils.getRootDir(conf); 190 FileSystem fs = rootDir.getFileSystem(conf); 191 return CommonFSUtils.deleteDirectory(fs, 192 new Path(CommonFSUtils.getTableDir(rootDir, hri.getTable()), hri.getEncodedName())); 193 } 194 195 /** 196 * Create the specified file on the filesystem. By default, this will: 197 * <ol> 198 * <li>overwrite the file if it exists</li> 199 * <li>apply the umask in the configuration (if it is enabled)</li> 200 * <li>use the fs configured buffer size (or 4096 if not set)</li> 201 * <li>use the configured column family replication or default replication if 202 * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li> 203 * <li>use the default block size</li> 204 * <li>not track progress</li> 205 * </ol> 206 * @param conf configurations 207 * @param fs {@link FileSystem} on which to write the file 208 * @param path {@link Path} to the file to write 209 * @param perm permissions 210 * @param favoredNodes favored data nodes 211 * @return output stream to the created file 212 * @throws IOException if the file cannot be created 213 */ 214 public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path, 215 FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException { 216 return create(conf, fs, path, perm, favoredNodes, true); 217 } 218 219 /** 220 * Create the specified file on the filesystem. 
By default, this will: 221 * <ol> 222 * <li>overwrite the file if it exists</li> 223 * <li>apply the umask in the configuration (if it is enabled)</li> 224 * <li>use the fs configured buffer size (or 4096 if not set)</li> 225 * <li>use the configured column family replication or default replication if 226 * {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li> 227 * <li>use the default block size</li> 228 * <li>not track progress</li> 229 * </ol> 230 * @param conf configurations 231 * @param fs {@link FileSystem} on which to write the file 232 * @param path {@link Path} to the file to write 233 * @param perm permissions 234 * @param favoredNodes favored data nodes 235 * @param isRecursiveCreate recursively create parent directories 236 * @return output stream to the created file 237 * @throws IOException if the file cannot be created 238 */ 239 public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path, 240 FsPermission perm, InetSocketAddress[] favoredNodes, boolean isRecursiveCreate) 241 throws IOException { 242 if (fs instanceof HFileSystem) { 243 FileSystem backingFs = ((HFileSystem) fs).getBackingFs(); 244 if (backingFs instanceof DistributedFileSystem) { 245 short replication = Short.parseShort(conf.get(ColumnFamilyDescriptorBuilder.DFS_REPLICATION, 246 String.valueOf(ColumnFamilyDescriptorBuilder.DEFAULT_DFS_REPLICATION))); 247 DistributedFileSystem.HdfsDataOutputStreamBuilder builder = 248 ((DistributedFileSystem) backingFs).createFile(path).recursive().permission(perm) 249 .create(); 250 if (favoredNodes != null) { 251 builder.favoredNodes(favoredNodes); 252 } 253 if (replication > 0) { 254 builder.replication(replication); 255 } 256 return builder.build(); 257 } 258 259 } 260 return CommonFSUtils.create(fs, path, perm, true, isRecursiveCreate); 261 } 262 263 /** 264 * Checks to see if the specified file system is available 265 * @param fs filesystem 266 * @throws IOException e 267 */ 268 public static void 
checkFileSystemAvailable(final FileSystem fs) throws IOException { 269 if (!(fs instanceof DistributedFileSystem)) { 270 return; 271 } 272 IOException exception = null; 273 DistributedFileSystem dfs = (DistributedFileSystem) fs; 274 try { 275 if (dfs.exists(new Path("/"))) { 276 return; 277 } 278 } catch (IOException e) { 279 exception = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 280 } 281 try { 282 fs.close(); 283 } catch (Exception e) { 284 LOG.error("file system close failed: ", e); 285 } 286 throw new IOException("File system is not available", exception); 287 } 288 289 /** 290 * Inquire the Active NameNode's safe mode status. 291 * @param dfs A DistributedFileSystem object representing the underlying HDFS. 292 * @return whether we're in safe mode 293 */ 294 private static boolean isInSafeMode(FileSystem dfs) throws IOException { 295 if (isDistributedFileSystem(dfs)) { 296 return ((DistributedFileSystem) dfs).setSafeMode(SAFEMODE_GET, true); 297 } else { 298 try { 299 Object ret = dfs.getClass() 300 .getMethod("setSafeMode", new Class[] { safeModeActionClazz, Boolean.class }) 301 .invoke(dfs, safeModeGet, true); 302 return (Boolean) ret; 303 } catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) { 304 LOG.error("The file system does not support setSafeMode(). Abort.", e); 305 throw new RuntimeException(e); 306 } 307 } 308 } 309 310 /** 311 * Check whether dfs is in safemode. 
312 */ 313 public static void checkDfsSafeMode(final Configuration conf) throws IOException { 314 boolean isInSafeMode = false; 315 FileSystem fs = FileSystem.get(conf); 316 if (supportSafeMode(fs)) { 317 isInSafeMode = isInSafeMode(fs); 318 } 319 if (isInSafeMode) { 320 throw new IOException("File system is in safemode, it can't be written now"); 321 } 322 } 323 324 /** 325 * Verifies current version of file system 326 * @param fs filesystem object 327 * @param rootdir root hbase directory 328 * @return null if no version file exists, version string otherwise 329 * @throws IOException if the version file fails to open 330 * @throws DeserializationException if the version data cannot be translated into a version 331 */ 332 public static String getVersion(FileSystem fs, Path rootdir) 333 throws IOException, DeserializationException { 334 final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 335 FileStatus[] status = null; 336 try { 337 // hadoop 2.0 throws FNFE if directory does not exist. 338 // hadoop 1.0 returns null if directory does not exist. 339 status = fs.listStatus(versionFile); 340 } catch (FileNotFoundException fnfe) { 341 return null; 342 } 343 if (ArrayUtils.getLength(status) == 0) { 344 return null; 345 } 346 String version = null; 347 byte[] content = new byte[(int) status[0].getLen()]; 348 FSDataInputStream s = fs.open(versionFile); 349 try { 350 IOUtils.readFully(s, content, 0, content.length); 351 if (ProtobufUtil.isPBMagicPrefix(content)) { 352 version = parseVersionFrom(content); 353 } else { 354 // Presume it pre-pb format. 355 try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) { 356 version = dis.readUTF(); 357 } 358 } 359 } catch (EOFException eof) { 360 LOG.warn("Version file was empty, odd, will try to set it."); 361 } finally { 362 s.close(); 363 } 364 return version; 365 } 366 367 /** 368 * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file. 
369 * @param bytes The byte content of the hbase.version file 370 * @return The version found in the file as a String 371 * @throws DeserializationException if the version data cannot be translated into a version 372 */ 373 static String parseVersionFrom(final byte[] bytes) throws DeserializationException { 374 ProtobufUtil.expectPBMagicPrefix(bytes); 375 int pblen = ProtobufUtil.lengthOfPBMagic(); 376 FSProtos.HBaseVersionFileContent.Builder builder = 377 FSProtos.HBaseVersionFileContent.newBuilder(); 378 try { 379 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); 380 return builder.getVersion(); 381 } catch (IOException e) { 382 // Convert 383 throw new DeserializationException(e); 384 } 385 } 386 387 /** 388 * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file. 389 * @param version Version to persist 390 * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a 391 * prefix. 392 */ 393 static byte[] toVersionByteArray(final String version) { 394 FSProtos.HBaseVersionFileContent.Builder builder = 395 FSProtos.HBaseVersionFileContent.newBuilder(); 396 return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray()); 397 } 398 399 /** 400 * Verifies current version of file system 401 * @param fs file system 402 * @param rootdir root directory of HBase installation 403 * @param message if true, issues a message on System.out 404 * @throws IOException if the version file cannot be opened 405 * @throws DeserializationException if the contents of the version file cannot be parsed 406 */ 407 public static void checkVersion(FileSystem fs, Path rootdir, boolean message) 408 throws IOException, DeserializationException { 409 checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 410 } 411 412 /** 413 * Verifies current version of file system 414 * @param fs file system 415 * @param rootdir root directory of HBase installation 416 * @param message 
if true, issues a message on System.out 417 * @param wait wait interval 418 * @param retries number of times to retry 419 * @throws IOException if the version file cannot be opened 420 * @throws DeserializationException if the contents of the version file cannot be parsed 421 */ 422 public static void checkVersion(FileSystem fs, Path rootdir, boolean message, int wait, 423 int retries) throws IOException, DeserializationException { 424 String version = getVersion(fs, rootdir); 425 String msg; 426 if (version == null) { 427 if (!metaRegionExists(fs, rootdir)) { 428 // rootDir is empty (no version file and no root region) 429 // just create new version file (HBASE-1195) 430 setVersion(fs, rootdir, wait, retries); 431 return; 432 } else { 433 msg = "hbase.version file is missing. Is your hbase.rootdir valid? " 434 + "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " 435 + "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2"; 436 } 437 } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) { 438 return; 439 } else { 440 msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version 441 + " but software requires version " + HConstants.FILE_SYSTEM_VERSION 442 + ". Consult https://hbase.apache.org/docs for further information about " 443 + "upgrading HBase."; 444 } 445 446 // version is deprecated require migration 447 // Output on stdout so user sees it in terminal. 448 if (message) { 449 System.out.println("WARNING! 
" + msg); 450 } 451 throw new FileSystemVersionException(msg); 452 } 453 454 /** 455 * Sets version of file system 456 * @param fs filesystem object 457 * @param rootdir hbase root 458 * @throws IOException e 459 */ 460 public static void setVersion(FileSystem fs, Path rootdir) throws IOException { 461 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0, 462 HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 463 } 464 465 /** 466 * Sets version of file system 467 * @param fs filesystem object 468 * @param rootdir hbase root 469 * @param wait time to wait for retry 470 * @param retries number of times to retry before failing 471 * @throws IOException e 472 */ 473 public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries) 474 throws IOException { 475 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries); 476 } 477 478 /** 479 * Sets version of file system 480 * @param fs filesystem object 481 * @param rootdir hbase root directory 482 * @param version version to set 483 * @param wait time to wait for retry 484 * @param retries number of times to retry before throwing an IOException 485 * @throws IOException e 486 */ 487 public static void setVersion(FileSystem fs, Path rootdir, String version, int wait, int retries) 488 throws IOException { 489 Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 490 Path tempVersionFile = new Path(rootdir, 491 HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + HConstants.VERSION_FILE_NAME); 492 while (true) { 493 try { 494 // Write the version to a temporary file 495 FSDataOutputStream s = fs.create(tempVersionFile); 496 try { 497 s.write(toVersionByteArray(version)); 498 s.close(); 499 s = null; 500 // Move the temp version file to its normal location. Returns false 501 // if the rename failed. Throw an IOE in that case. 
502 if (!fs.rename(tempVersionFile, versionFile)) { 503 throw new IOException("Unable to move temp version file to " + versionFile); 504 } 505 } finally { 506 // Cleaning up the temporary if the rename failed would be trying 507 // too hard. We'll unconditionally create it again the next time 508 // through anyway, files are overwritten by default by create(). 509 510 // Attempt to close the stream on the way out if it is still open. 511 try { 512 if (s != null) s.close(); 513 } catch (IOException ignore) { 514 } 515 } 516 LOG.info("Created version file at " + rootdir.toString() + " with version=" + version); 517 return; 518 } catch (IOException e) { 519 if (retries > 0) { 520 LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e); 521 fs.delete(versionFile, false); 522 try { 523 if (wait > 0) { 524 Thread.sleep(wait); 525 } 526 } catch (InterruptedException ie) { 527 throw (InterruptedIOException) new InterruptedIOException().initCause(ie); 528 } 529 retries--; 530 } else { 531 throw e; 532 } 533 } 534 } 535 } 536 537 /** 538 * Checks that a cluster ID file exists in the HBase root directory 539 * @param fs the root directory FileSystem 540 * @param rootdir the HBase root directory in HDFS 541 * @param wait how long to wait between retries 542 * @return <code>true</code> if the file exists, otherwise <code>false</code> 543 * @throws IOException if checking the FileSystem fails 544 */ 545 public static boolean checkFileExistsInHbaseRootDir(FileSystem fs, Path rootdir, String file, 546 long wait) throws IOException { 547 while (true) { 548 try { 549 Path filePath = new Path(rootdir, file); 550 return fs.exists(filePath); 551 } catch (IOException ioe) { 552 if (wait > 0L) { 553 LOG.warn("Unable to check file {} in {}, retrying in {}ms", file, rootdir, wait, ioe); 554 try { 555 Thread.sleep(wait); 556 } catch (InterruptedException e) { 557 Thread.currentThread().interrupt(); 558 throw (InterruptedIOException) new 
InterruptedIOException().initCause(e); 559 } 560 } else { 561 throw ioe; 562 } 563 } 564 } 565 } 566 567 /** 568 * Use the given parser object to read and parse contents of Cluster Id file. e.g. Cluster Id or 569 * Active read-replica Cluster Id 570 */ 571 public static <T extends ClusterIdFile> T getClusterIdFile(FileSystem fs, Path rootdir, 572 ClusterIdFileParser<T> parser) throws IOException { 573 Path idPath = new Path(rootdir, parser.getFileName()); 574 T cs = null; 575 FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null; 576 if (status != null) { 577 int len = Ints.checkedCast(status.getLen()); 578 byte[] content = new byte[len]; 579 FSDataInputStream in = fs.open(idPath); 580 try { 581 in.readFully(content); 582 } catch (EOFException eof) { 583 LOG.warn("Cluster file {} is empty", idPath); 584 } finally { 585 in.close(); 586 } 587 try { 588 cs = parser.parseFrom(content); 589 } catch (DeserializationException e) { 590 throw new IOException("content=" + Bytes.toString(content), e); 591 } 592 // If not pb'd, make it so. 593 if (!ProtobufUtil.isPBMagicPrefix(content)) { 594 String cid = null; 595 in = fs.open(idPath); 596 try { 597 cid = in.readUTF(); 598 cs = parser.readString(cid); 599 } catch (EOFException eof) { 600 LOG.warn("Cluster file {} is empty", idPath); 601 } finally { 602 in.close(); 603 } 604 rewriteAsPb(fs, rootdir, idPath, parser.getFileName(), cs); 605 } 606 return cs; 607 } else { 608 LOG.warn("Cluster file does not exist at {}", idPath); 609 } 610 return cs; 611 } 612 613 private static <T extends ClusterIdFile> void rewriteAsPb(final FileSystem fs, final Path rootdir, 614 final Path p, final String fileName, final T cs) throws IOException { 615 // Rewrite the file as pb. Move aside the old one first, write new 616 // then delete the moved-aside file. 617 Path movedAsideName = new Path(p + "." 
+ EnvironmentEdgeManager.currentTime()); 618 if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p); 619 setClusterIdFile(fs, rootdir, fileName, cs, 100); 620 if (!fs.delete(movedAsideName, false)) { 621 throw new IOException("Failed delete of " + movedAsideName); 622 } 623 LOG.debug("Rewrote the {} file as pb", fileName); 624 } 625 626 /** 627 * Writes a new unique identifier for this cluster to the Cluster Id ("hbase.id" or 628 * "active.cluster.suffix.id") file in the HBase root directory. If any operations on the ID file 629 * fails, and {@code wait} is a positive value, the method will retry to produce the ID file until 630 * the thread is forcibly interrupted. 631 * @param fs the root directory FileSystem 632 * @param rootdir the path to the HBase root directory 633 * @param fileName name of the file to be written 634 * @param cs the object to be written 635 * @param wait how long (in milliseconds) to wait between retries 636 * @throws IOException if writing to the FileSystem fails and no wait value 637 */ 638 public static <T extends ClusterIdFile> void setClusterIdFile(final FileSystem fs, 639 final Path rootdir, final String fileName, final T cs, final long wait) throws IOException { 640 final Path idFile = new Path(rootdir, fileName); 641 final Path tempDir = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY); 642 final Path tempIdFile = new Path(tempDir, fileName); 643 644 LOG.debug("Cluster file [{}] is present and contains cluster id: {}", idFile, cs); 645 writeClusterInfo(fs, rootdir, idFile, tempIdFile, cs.toByteArray(), wait); 646 } 647 648 /** 649 * Writes information about this cluster to the specified file. For ex, it is used for writing 650 * cluster id in "hbase.id" file in the HBase root directory. Also, used for writing active 651 * cluster suffix in "active_cluster_suffix.id" file. 
If any operations on the ID file fails, and 652 * {@code wait} is a positive value, the method will retry to produce the ID file until the thread 653 * is forcibly interrupted. 654 */ 655 private static void writeClusterInfo(final FileSystem fs, final Path rootdir, final Path idFile, 656 final Path tempIdFile, byte[] fileData, final long wait) throws IOException { 657 while (true) { 658 Optional<IOException> failure = Optional.empty(); 659 660 LOG.debug("Write the file to a temporary location: {}", tempIdFile); 661 try (FSDataOutputStream s = fs.create(tempIdFile)) { 662 s.write(fileData); 663 } catch (IOException ioe) { 664 failure = Optional.of(ioe); 665 } 666 667 if (!failure.isPresent()) { 668 try { 669 LOG.debug("Move the temporary file to its target location [{}]:[{}]", tempIdFile, idFile); 670 671 if (!fs.rename(tempIdFile, idFile)) { 672 failure = Optional.of(new IOException("Unable to move temp file to " + idFile)); 673 } 674 } catch (IOException ioe) { 675 failure = Optional.of(ioe); 676 } 677 } 678 679 if (failure.isPresent()) { 680 final IOException cause = failure.get(); 681 if (wait > 0L) { 682 LOG.warn("Unable to create file in {}, retrying in {}ms", rootdir, wait, cause); 683 try { 684 Thread.sleep(wait); 685 } catch (InterruptedException e) { 686 Thread.currentThread().interrupt(); 687 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 688 } 689 continue; 690 } else { 691 throw cause; 692 } 693 } else { 694 return; 695 } 696 } 697 } 698 699 /** 700 * If DFS, check safe mode and if so, wait until we clear it. 
701 * @param conf configuration 702 * @param wait Sleep between retries 703 * @throws IOException e 704 */ 705 public static void waitOnSafeMode(final Configuration conf, final long wait) throws IOException { 706 FileSystem fs = FileSystem.get(conf); 707 if (!supportSafeMode(fs)) { 708 return; 709 } 710 // Make sure dfs is not in safe mode 711 while (isInSafeMode(fs)) { 712 LOG.info("Waiting for dfs to exit safe mode..."); 713 try { 714 Thread.sleep(wait); 715 } catch (InterruptedException e) { 716 Thread.currentThread().interrupt(); 717 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 718 } 719 } 720 } 721 722 public static boolean supportSafeMode(FileSystem fs) { 723 // return true if HDFS. 724 if (fs instanceof DistributedFileSystem) { 725 return true; 726 } 727 // return true if the file system implements SafeMode interface. 728 if (safeModeClazz != null) { 729 return (safeModeClazz.isAssignableFrom(fs.getClass())); 730 } 731 // return false if the file system is not HDFS and does not implement SafeMode interface. 732 return false; 733 } 734 735 /** 736 * Checks if meta region exists 737 * @param fs file system 738 * @param rootDir root directory of HBase installation 739 * @return true if exists 740 */ 741 public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException { 742 Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO); 743 return fs.exists(metaRegionDir); 744 } 745 746 /** 747 * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams are 748 * backed by a series of LocatedBlocks, which are fetched periodically from the namenode. This 749 * method retrieves those blocks from the input stream and uses them to calculate 750 * HDFSBlockDistribution. The underlying method in DFSInputStream does attempt to use locally 751 * cached blocks, but may hit the namenode if the cache is determined to be incomplete. 
The method 752 * also involves making copies of all LocatedBlocks rather than return the underlying blocks 753 * themselves. 754 */ 755 static public HDFSBlocksDistribution 756 computeHDFSBlocksDistribution(HdfsDataInputStream inputStream) throws IOException { 757 List<LocatedBlock> blocks = inputStream.getAllBlocks(); 758 HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution(); 759 for (LocatedBlock block : blocks) { 760 String[] hosts = getHostsForLocations(block); 761 long len = block.getBlockSize(); 762 StorageType[] storageTypes = block.getStorageTypes(); 763 blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes); 764 } 765 return blocksDistribution; 766 } 767 768 private static String[] getHostsForLocations(LocatedBlock block) { 769 DatanodeInfo[] locations = getLocatedBlockLocations(block); 770 String[] hosts = new String[locations.length]; 771 for (int i = 0; i < hosts.length; i++) { 772 hosts[i] = locations[i].getHostName(); 773 } 774 return hosts; 775 } 776 777 /** 778 * Compute HDFS blocks distribution of a given file, or a portion of the file 779 * @param fs file system 780 * @param status file status of the file 781 * @param start start position of the portion 782 * @param length length of the portion 783 * @return The HDFS blocks distribution 784 */ 785 static public HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs, 786 FileStatus status, long start, long length) throws IOException { 787 HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution(); 788 BlockLocation[] blockLocations = fs.getFileBlockLocations(status, start, length); 789 addToHDFSBlocksDistribution(blocksDistribution, blockLocations); 790 return blocksDistribution; 791 } 792 793 /** 794 * Update blocksDistribution with blockLocations 795 * @param blocksDistribution the hdfs blocks distribution 796 * @param blockLocations an array containing block location 797 */ 798 static public void 
addToHDFSBlocksDistribution(HDFSBlocksDistribution blocksDistribution, 799 BlockLocation[] blockLocations) throws IOException { 800 for (BlockLocation bl : blockLocations) { 801 String[] hosts = bl.getHosts(); 802 long len = bl.getLength(); 803 StorageType[] storageTypes = bl.getStorageTypes(); 804 blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes); 805 } 806 } 807 808 // TODO move this method OUT of FSUtils. No dependencies to HMaster 809 /** 810 * Returns the total overall fragmentation percentage. Includes hbase:meta and -ROOT- as well. 811 * @param master The master defining the HBase root and file system 812 * @return A map for each table and its percentage (never null) 813 * @throws IOException When scanning the directory fails 814 */ 815 public static int getTotalTableFragmentation(final HMaster master) throws IOException { 816 Map<String, Integer> map = getTableFragmentation(master); 817 return map.isEmpty() ? -1 : map.get("-TOTAL-"); 818 } 819 820 /** 821 * Runs through the HBase rootdir and checks how many stores for each table have more than one 822 * file in them. Checks -ROOT- and hbase:meta too. The total percentage across all tables is 823 * stored under the special key "-TOTAL-". 824 * @param master The master defining the HBase root and file system. 825 * @return A map for each table and its percentage (never null). 826 * @throws IOException When scanning the directory fails. 827 */ 828 public static Map<String, Integer> getTableFragmentation(final HMaster master) 829 throws IOException { 830 Path path = CommonFSUtils.getRootDir(master.getConfiguration()); 831 // since HMaster.getFileSystem() is package private 832 FileSystem fs = path.getFileSystem(master.getConfiguration()); 833 return getTableFragmentation(fs, path); 834 } 835 836 /** 837 * Runs through the HBase rootdir and checks how many stores for each table have more than one 838 * file in them. Checks -ROOT- and hbase:meta too. 
The total percentage across all tables is 839 * stored under the special key "-TOTAL-". 840 * @param fs The file system to use 841 * @param hbaseRootDir The root directory to scan 842 * @return A map for each table and its percentage (never null) 843 * @throws IOException When scanning the directory fails 844 */ 845 public static Map<String, Integer> getTableFragmentation(final FileSystem fs, 846 final Path hbaseRootDir) throws IOException { 847 Map<String, Integer> frags = new HashMap<>(); 848 int cfCountTotal = 0; 849 int cfFragTotal = 0; 850 PathFilter regionFilter = new RegionDirFilter(fs); 851 PathFilter familyFilter = new FamilyDirFilter(fs); 852 List<Path> tableDirs = getTableDirs(fs, hbaseRootDir); 853 for (Path d : tableDirs) { 854 int cfCount = 0; 855 int cfFrag = 0; 856 FileStatus[] regionDirs = fs.listStatus(d, regionFilter); 857 for (FileStatus regionDir : regionDirs) { 858 Path dd = regionDir.getPath(); 859 // else its a region name, now look in region for families 860 FileStatus[] familyDirs = fs.listStatus(dd, familyFilter); 861 for (FileStatus familyDir : familyDirs) { 862 cfCount++; 863 cfCountTotal++; 864 Path family = familyDir.getPath(); 865 // now in family make sure only one file 866 FileStatus[] familyStatus = fs.listStatus(family); 867 if (familyStatus.length > 1) { 868 cfFrag++; 869 cfFragTotal++; 870 } 871 } 872 } 873 // compute percentage per table and store in result list 874 frags.put(CommonFSUtils.getTableName(d).getNameAsString(), 875 cfCount == 0 ? 0 : Math.round((float) cfFrag / cfCount * 100)); 876 } 877 // set overall percentage for all tables 878 frags.put("-TOTAL-", 879 cfCountTotal == 0 ? 
0 : Math.round((float) cfFragTotal / cfCountTotal * 100)); 880 return frags; 881 } 882 883 public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException { 884 if (fs.exists(dst) && !fs.delete(dst, false)) { 885 throw new IOException("Can not delete " + dst); 886 } 887 if (!fs.rename(src, dst)) { 888 throw new IOException("Can not rename from " + src + " to " + dst); 889 } 890 } 891 892 /** 893 * A {@link PathFilter} that returns only regular files. 894 */ 895 static class FileFilter extends AbstractFileStatusFilter { 896 private final FileSystem fs; 897 898 public FileFilter(final FileSystem fs) { 899 this.fs = fs; 900 } 901 902 @Override 903 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 904 try { 905 return isFile(fs, isDir, p); 906 } catch (IOException e) { 907 LOG.warn("Unable to verify if path={} is a regular file", p, e); 908 return false; 909 } 910 } 911 } 912 913 /** 914 * Directory filter that doesn't include any of the directories in the specified blacklist 915 */ 916 public static class BlackListDirFilter extends AbstractFileStatusFilter { 917 private final FileSystem fs; 918 private List<String> blacklist; 919 920 /** 921 * Create a filter on the givem filesystem with the specified blacklist 922 * @param fs filesystem to filter 923 * @param directoryNameBlackList list of the names of the directories to filter. If 924 * <tt>null</tt>, all directories are returned 925 */ 926 @SuppressWarnings("unchecked") 927 public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) { 928 this.fs = fs; 929 blacklist = (List<String>) (directoryNameBlackList == null 930 ? 
Collections.emptyList() 931 : directoryNameBlackList); 932 } 933 934 @Override 935 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 936 if (!isValidName(p.getName())) { 937 return false; 938 } 939 940 try { 941 return isDirectory(fs, isDir, p); 942 } catch (IOException e) { 943 LOG.warn("An error occurred while verifying if [{}] is a valid directory." 944 + " Returning 'not valid' and continuing.", p, e); 945 return false; 946 } 947 } 948 949 protected boolean isValidName(final String name) { 950 return !blacklist.contains(name); 951 } 952 } 953 954 /** 955 * A {@link PathFilter} that only allows directories. 956 */ 957 public static class DirFilter extends BlackListDirFilter { 958 959 public DirFilter(FileSystem fs) { 960 super(fs, null); 961 } 962 } 963 964 /** 965 * A {@link PathFilter} that returns usertable directories. To get all directories use the 966 * {@link BlackListDirFilter} with a <tt>null</tt> blacklist 967 */ 968 public static class UserTableDirFilter extends BlackListDirFilter { 969 public UserTableDirFilter(FileSystem fs) { 970 super(fs, HConstants.HBASE_NON_TABLE_DIRS); 971 } 972 973 @Override 974 protected boolean isValidName(final String name) { 975 if (!super.isValidName(name)) return false; 976 977 try { 978 TableName.isLegalTableQualifierName(Bytes.toBytes(name)); 979 } catch (IllegalArgumentException e) { 980 LOG.info("Invalid table name: {}", name); 981 return false; 982 } 983 return true; 984 } 985 } 986 987 public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir) 988 throws IOException { 989 List<Path> tableDirs = new ArrayList<>(); 990 Path baseNamespaceDir = new Path(rootdir, HConstants.BASE_NAMESPACE_DIR); 991 if (fs.exists(baseNamespaceDir)) { 992 for (FileStatus status : fs.globStatus(new Path(baseNamespaceDir, "*"))) { 993 tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath())); 994 } 995 } 996 return tableDirs; 997 } 998 999 /** 1000 * @return All the table directories under 
<code>rootdir</code>. Ignore non table hbase folders 1001 * such as .logs, .oldlogs, .corrupt folders. 1002 */ 1003 public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir) 1004 throws IOException { 1005 // presumes any directory under hbase.rootdir is a table 1006 FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs)); 1007 List<Path> tabledirs = new ArrayList<>(dirs.length); 1008 for (FileStatus dir : dirs) { 1009 tabledirs.add(dir.getPath()); 1010 } 1011 return tabledirs; 1012 } 1013 1014 /** 1015 * A filter to exclude meta tables belonging to foreign clusters. This is essential in a 1016 * read-replica setup where multiple clusters share the same fs. 1017 * @param tablePath The Path to the table directory. 1018 * @return {@code true} if the path is a regular table or the cluster's own meta table. 1019 * {@code false} if it is a meta table belonging to a different cluster. 1020 */ 1021 public static boolean isLocalMetaTable(Path tablePath) { 1022 if (tablePath == null) { 1023 return false; 1024 } 1025 String dirName = tablePath.getName(); 1026 if (dirName.startsWith(TableName.META_TABLE_NAME.getQualifierAsString())) { 1027 return TableName.valueOf(TableName.META_TABLE_NAME.getNamespaceAsString(), dirName) 1028 .equals(TableName.META_TABLE_NAME); 1029 } 1030 return true; 1031 } 1032 1033 /** 1034 * Filter for all dirs that don't start with '.' 1035 */ 1036 public static class RegionDirFilter extends AbstractFileStatusFilter { 1037 // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names. 
1038 final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$"); 1039 final FileSystem fs; 1040 1041 public RegionDirFilter(FileSystem fs) { 1042 this.fs = fs; 1043 } 1044 1045 @Override 1046 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1047 if (!regionDirPattern.matcher(p.getName()).matches()) { 1048 return false; 1049 } 1050 1051 try { 1052 return isDirectory(fs, isDir, p); 1053 } catch (IOException ioe) { 1054 // Maybe the file was moved or the fs was disconnected. 1055 LOG.warn("Skipping file {} due to IOException", p, ioe); 1056 return false; 1057 } 1058 } 1059 } 1060 1061 /** 1062 * Given a particular table dir, return all the regiondirs inside it, excluding files such as 1063 * .tableinfo 1064 * @param fs A file system for the Path 1065 * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir> 1066 * @return List of paths to valid region directories in table dir. 1067 */ 1068 public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) 1069 throws IOException { 1070 // assumes we are in a table dir. 
1071 List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 1072 if (rds == null) { 1073 return Collections.emptyList(); 1074 } 1075 List<Path> regionDirs = new ArrayList<>(rds.size()); 1076 for (FileStatus rdfs : rds) { 1077 Path rdPath = rdfs.getPath(); 1078 regionDirs.add(rdPath); 1079 } 1080 return regionDirs; 1081 } 1082 1083 public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) { 1084 return getRegionDirFromTableDir(CommonFSUtils.getTableDir(rootDir, region.getTable()), region); 1085 } 1086 1087 public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) { 1088 return getRegionDirFromTableDir(tableDir, 1089 ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName()); 1090 } 1091 1092 public static Path getRegionDirFromTableDir(Path tableDir, String encodedRegionName) { 1093 return new Path(tableDir, encodedRegionName); 1094 } 1095 1096 /** 1097 * Filter for all dirs that are legal column family names. This is generally used for colfam dirs 1098 * <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>. 1099 */ 1100 public static class FamilyDirFilter extends AbstractFileStatusFilter { 1101 final FileSystem fs; 1102 1103 public FamilyDirFilter(FileSystem fs) { 1104 this.fs = fs; 1105 } 1106 1107 @Override 1108 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1109 try { 1110 // throws IAE if invalid 1111 ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(Bytes.toBytes(p.getName())); 1112 } catch (IllegalArgumentException iae) { 1113 // path name is an invalid family name and thus is excluded. 1114 return false; 1115 } 1116 1117 try { 1118 return isDirectory(fs, isDir, p); 1119 } catch (IOException ioe) { 1120 // Maybe the file was moved or the fs was disconnected. 
1121 LOG.warn("Skipping file {} due to IOException", p, ioe); 1122 return false; 1123 } 1124 } 1125 } 1126 1127 /** 1128 * Given a particular region dir, return all the familydirs inside it 1129 * @param fs A file system for the Path 1130 * @param regionDir Path to a specific region directory 1131 * @return List of paths to valid family directories in region dir. 1132 */ 1133 public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) 1134 throws IOException { 1135 // assumes we are in a region dir. 1136 return getFilePaths(fs, regionDir, new FamilyDirFilter(fs)); 1137 } 1138 1139 public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) 1140 throws IOException { 1141 return getFilePaths(fs, familyDir, new ReferenceFileFilter(fs)); 1142 } 1143 1144 public static List<Path> getReferenceAndLinkFilePaths(final FileSystem fs, final Path familyDir) 1145 throws IOException { 1146 return getFilePaths(fs, familyDir, new ReferenceAndLinkFileFilter(fs)); 1147 } 1148 1149 private static List<Path> getFilePaths(final FileSystem fs, final Path dir, 1150 final PathFilter pathFilter) throws IOException { 1151 FileStatus[] fds = fs.listStatus(dir, pathFilter); 1152 List<Path> files = new ArrayList<>(fds.length); 1153 for (FileStatus fdfs : fds) { 1154 Path fdPath = fdfs.getPath(); 1155 files.add(fdPath); 1156 } 1157 return files; 1158 } 1159 1160 public static int getRegionReferenceAndLinkFileCount(final FileSystem fs, final Path p) { 1161 int result = 0; 1162 try { 1163 for (Path familyDir : getFamilyDirs(fs, p)) { 1164 result += getReferenceAndLinkFilePaths(fs, familyDir).size(); 1165 } 1166 } catch (IOException e) { 1167 LOG.warn("Error Counting reference files.", e); 1168 } 1169 return result; 1170 } 1171 1172 public static class ReferenceAndLinkFileFilter implements PathFilter { 1173 1174 private final FileSystem fs; 1175 1176 public ReferenceAndLinkFileFilter(FileSystem fs) { 1177 this.fs = fs; 1178 } 1179 1180 @Override 
1181 public boolean accept(Path rd) { 1182 try { 1183 // only files can be references. 1184 return !fs.getFileStatus(rd).isDirectory() 1185 && (StoreFileInfo.isReference(rd) || HFileLink.isHFileLink(rd)); 1186 } catch (IOException ioe) { 1187 // Maybe the file was moved or the fs was disconnected. 1188 LOG.warn("Skipping file " + rd + " due to IOException", ioe); 1189 return false; 1190 } 1191 } 1192 } 1193 1194 /** 1195 * Filter for HFiles that excludes reference files. 1196 */ 1197 public static class HFileFilter extends AbstractFileStatusFilter { 1198 final FileSystem fs; 1199 1200 public HFileFilter(FileSystem fs) { 1201 this.fs = fs; 1202 } 1203 1204 @Override 1205 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1206 if (!StoreFileInfo.isHFile(p) && !StoreFileInfo.isMobFile(p)) { 1207 return false; 1208 } 1209 1210 try { 1211 return isFile(fs, isDir, p); 1212 } catch (IOException ioe) { 1213 // Maybe the file was moved or the fs was disconnected. 1214 LOG.warn("Skipping file {} due to IOException", p, ioe); 1215 return false; 1216 } 1217 } 1218 } 1219 1220 /** 1221 * Filter for HFileLinks (StoreFiles and HFiles not included). the filter itself does not consider 1222 * if a link is file or not. 1223 */ 1224 public static class HFileLinkFilter implements PathFilter { 1225 1226 @Override 1227 public boolean accept(Path p) { 1228 return HFileLink.isHFileLink(p); 1229 } 1230 } 1231 1232 public static class ReferenceFileFilter extends AbstractFileStatusFilter { 1233 1234 private final FileSystem fs; 1235 1236 public ReferenceFileFilter(FileSystem fs) { 1237 this.fs = fs; 1238 } 1239 1240 @Override 1241 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1242 if (!StoreFileInfo.isReference(p)) { 1243 return false; 1244 } 1245 1246 try { 1247 // only files can be references. 1248 return isFile(fs, isDir, p); 1249 } catch (IOException ioe) { 1250 // Maybe the file was moved or the fs was disconnected. 
1251 LOG.warn("Skipping file {} due to IOException", p, ioe); 1252 return false; 1253 } 1254 } 1255 } 1256 1257 /** 1258 * Called every so-often by storefile map builder getTableStoreFilePathMap to report progress. 1259 */ 1260 interface ProgressReporter { 1261 /** 1262 * @param status File or directory we are about to process. 1263 */ 1264 void progress(FileStatus status); 1265 } 1266 1267 /** 1268 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1269 * names to the full Path. <br> 1270 * Example...<br> 1271 * Key = 3944417774205889744 <br> 1272 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1273 * @param map map to add values. If null, this method will create and populate one to 1274 * return 1275 * @param fs The file system to use. 1276 * @param hbaseRootDir The root directory to scan. 1277 * @param tableName name of the table to scan. 1278 * @return Map keyed by StoreFile name with a value of the full Path. 1279 * @throws IOException When scanning the directory fails. 1280 */ 1281 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map, 1282 final FileSystem fs, final Path hbaseRootDir, TableName tableName) 1283 throws IOException, InterruptedException { 1284 return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, 1285 (ProgressReporter) null); 1286 } 1287 1288 /** 1289 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1290 * names to the full Path. Note that because this method can be called on a 'live' HBase system 1291 * that we will skip files that no longer exist by the time we traverse them and similarly the 1292 * user of the result needs to consider that some entries in this map may not exist by the time 1293 * this call completes. 
<br> 1294 * Example...<br> 1295 * Key = 3944417774205889744 <br> 1296 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1297 * @param resultMap map to add values. If null, this method will create and populate one to 1298 * return 1299 * @param fs The file system to use. 1300 * @param hbaseRootDir The root directory to scan. 1301 * @param tableName name of the table to scan. 1302 * @param sfFilter optional path filter to apply to store files 1303 * @param executor optional executor service to parallelize this operation 1304 * @param progressReporter Instance or null; gets called every time we move to new region of 1305 * family dir and for each store file. 1306 * @return Map keyed by StoreFile name with a value of the full Path. 1307 * @throws IOException When scanning the directory fails. 1308 * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead. 1309 */ 1310 @Deprecated 1311 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1312 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1313 ExecutorService executor, final HbckErrorReporter progressReporter) 1314 throws IOException, InterruptedException { 1315 return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor, 1316 new ProgressReporter() { 1317 @Override 1318 public void progress(FileStatus status) { 1319 // status is not used in this implementation. 1320 progressReporter.progress(); 1321 } 1322 }); 1323 } 1324 1325 /** 1326 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for table StoreFile 1327 * names to the full Path. 
Note that because this method can be called on a 'live' HBase system 1328 * that we will skip files that no longer exist by the time we traverse them and similarly the 1329 * user of the result needs to consider that some entries in this map may not exist by the time 1330 * this call completes. <br> 1331 * Example...<br> 1332 * Key = 3944417774205889744 <br> 1333 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1334 * @param resultMap map to add values. If null, this method will create and populate one to 1335 * return 1336 * @param fs The file system to use. 1337 * @param hbaseRootDir The root directory to scan. 1338 * @param tableName name of the table to scan. 1339 * @param sfFilter optional path filter to apply to store files 1340 * @param executor optional executor service to parallelize this operation 1341 * @param progressReporter Instance or null; gets called every time we move to new region of 1342 * family dir and for each store file. 1343 * @return Map keyed by StoreFile name with a value of the full Path. 1344 * @throws IOException When scanning the directory fails. 1345 * @throws InterruptedException the thread is interrupted, either before or during the activity. 1346 */ 1347 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1348 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1349 ExecutorService executor, final ProgressReporter progressReporter) 1350 throws IOException, InterruptedException { 1351 1352 final Map<String, Path> finalResultMap = 1353 resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap; 1354 1355 // only include the directory paths to tables 1356 Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 1357 // Inside a table, there are compaction.dir directories to skip. Otherwise, all else 1358 // should be regions. 
1359 final FamilyDirFilter familyFilter = new FamilyDirFilter(fs); 1360 final Vector<Exception> exceptions = new Vector<>(); 1361 1362 try { 1363 List<FileStatus> regionDirs = 1364 FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 1365 if (regionDirs == null) { 1366 return finalResultMap; 1367 } 1368 1369 final List<Future<?>> futures = new ArrayList<>(regionDirs.size()); 1370 1371 for (FileStatus regionDir : regionDirs) { 1372 if (null != progressReporter) { 1373 progressReporter.progress(regionDir); 1374 } 1375 final Path dd = regionDir.getPath(); 1376 1377 if (!exceptions.isEmpty()) { 1378 break; 1379 } 1380 1381 Runnable getRegionStoreFileMapCall = new Runnable() { 1382 @Override 1383 public void run() { 1384 try { 1385 HashMap<String, Path> regionStoreFileMap = new HashMap<>(); 1386 List<FileStatus> familyDirs = 1387 FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter); 1388 if (familyDirs == null) { 1389 if (!fs.exists(dd)) { 1390 LOG.warn("Skipping region because it no longer exists: " + dd); 1391 } else { 1392 LOG.warn("Skipping region because it has no family dirs: " + dd); 1393 } 1394 return; 1395 } 1396 for (FileStatus familyDir : familyDirs) { 1397 if (null != progressReporter) { 1398 progressReporter.progress(familyDir); 1399 } 1400 Path family = familyDir.getPath(); 1401 if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) { 1402 continue; 1403 } 1404 // now in family, iterate over the StoreFiles and 1405 // put in map 1406 FileStatus[] familyStatus = fs.listStatus(family); 1407 for (FileStatus sfStatus : familyStatus) { 1408 if (null != progressReporter) { 1409 progressReporter.progress(sfStatus); 1410 } 1411 Path sf = sfStatus.getPath(); 1412 if (sfFilter == null || sfFilter.accept(sf)) { 1413 regionStoreFileMap.put(sf.getName(), sf); 1414 } 1415 } 1416 } 1417 finalResultMap.putAll(regionStoreFileMap); 1418 } catch (Exception e) { 1419 LOG.error("Could not get region store file map for region: " + dd, e); 1420 
exceptions.add(e); 1421 } 1422 } 1423 }; 1424 1425 // If executor is available, submit async tasks to exec concurrently, otherwise 1426 // just do serial sync execution 1427 if (executor != null) { 1428 Future<?> future = executor.submit(getRegionStoreFileMapCall); 1429 futures.add(future); 1430 } else { 1431 FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null); 1432 future.run(); 1433 futures.add(future); 1434 } 1435 } 1436 1437 // Ensure all pending tasks are complete (or that we run into an exception) 1438 for (Future<?> f : futures) { 1439 if (!exceptions.isEmpty()) { 1440 break; 1441 } 1442 try { 1443 f.get(); 1444 } catch (ExecutionException e) { 1445 LOG.error("Unexpected exec exception! Should've been caught already. (Bug?)", e); 1446 // Shouldn't happen, we already logged/caught any exceptions in the Runnable 1447 } 1448 } 1449 } catch (IOException e) { 1450 LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e); 1451 exceptions.add(e); 1452 } finally { 1453 if (!exceptions.isEmpty()) { 1454 // Just throw the first exception as an indication something bad happened 1455 // Don't need to propagate all the exceptions, we already logged them all anyway 1456 Throwables.propagateIfPossible(exceptions.firstElement(), IOException.class); 1457 throw new IOException(exceptions.firstElement()); 1458 } 1459 } 1460 1461 return finalResultMap; 1462 } 1463 1464 public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) { 1465 int result = 0; 1466 try { 1467 for (Path familyDir : getFamilyDirs(fs, p)) { 1468 result += getReferenceFilePaths(fs, familyDir).size(); 1469 } 1470 } catch (IOException e) { 1471 LOG.warn("Error counting reference files", e); 1472 } 1473 return result; 1474 } 1475 1476 /** 1477 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1478 * the full Path. 
<br> 1479 * Example...<br> 1480 * Key = 3944417774205889744 <br> 1481 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1482 * @param fs The file system to use. 1483 * @param hbaseRootDir The root directory to scan. 1484 * @return Map keyed by StoreFile name with a value of the full Path. 1485 * @throws IOException When scanning the directory fails. 1486 */ 1487 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1488 final Path hbaseRootDir) throws IOException, InterruptedException { 1489 return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter) null); 1490 } 1491 1492 /** 1493 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1494 * the full Path. <br> 1495 * Example...<br> 1496 * Key = 3944417774205889744 <br> 1497 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1498 * @param fs The file system to use. 1499 * @param hbaseRootDir The root directory to scan. 1500 * @param sfFilter optional path filter to apply to store files 1501 * @param executor optional executor service to parallelize this operation 1502 * @param progressReporter Instance or null; gets called every time we move to new region of 1503 * family dir and for each store file. 1504 * @return Map keyed by StoreFile name with a value of the full Path. 1505 * @throws IOException When scanning the directory fails. 1506 * @deprecated Since 2.3.0. Will be removed in hbase4. 
Used 1507 * {@link #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)} 1508 */ 1509 @Deprecated 1510 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1511 final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor, 1512 HbckErrorReporter progressReporter) throws IOException, InterruptedException { 1513 return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor, new ProgressReporter() { 1514 @Override 1515 public void progress(FileStatus status) { 1516 // status is not used in this implementation. 1517 progressReporter.progress(); 1518 } 1519 }); 1520 } 1521 1522 /** 1523 * Runs through the HBase rootdir and creates a reverse lookup map for table StoreFile names to 1524 * the full Path. <br> 1525 * Example...<br> 1526 * Key = 3944417774205889744 <br> 1527 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1528 * @param fs The file system to use. 1529 * @param hbaseRootDir The root directory to scan. 1530 * @param sfFilter optional path filter to apply to store files 1531 * @param executor optional executor service to parallelize this operation 1532 * @param progressReporter Instance or null; gets called every time we move to new region of 1533 * family dir and for each store file. 1534 * @return Map keyed by StoreFile name with a value of the full Path. 1535 * @throws IOException When scanning the directory fails. 1536 */ 1537 public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs, 1538 final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor, 1539 ProgressReporter progressReporter) throws IOException, InterruptedException { 1540 ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32); 1541 1542 // if this method looks similar to 'getTableFragmentation' that is because 1543 // it was borrowed from it. 
1544 1545 // only include the directory paths to tables 1546 for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) { 1547 getTableStoreFilePathMap(map, fs, hbaseRootDir, CommonFSUtils.getTableName(tableDir), 1548 sfFilter, executor, progressReporter); 1549 } 1550 return map; 1551 } 1552 1553 /** 1554 * Filters FileStatuses in an array and returns a list 1555 * @param input An array of FileStatuses 1556 * @param filter A required filter to filter the array 1557 * @return A list of FileStatuses 1558 */ 1559 public static List<FileStatus> filterFileStatuses(FileStatus[] input, FileStatusFilter filter) { 1560 if (input == null) return null; 1561 return filterFileStatuses(Iterators.forArray(input), filter); 1562 } 1563 1564 /** 1565 * Filters FileStatuses in an iterator and returns a list 1566 * @param input An iterator of FileStatuses 1567 * @param filter A required filter to filter the array 1568 * @return A list of FileStatuses 1569 */ 1570 public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input, 1571 FileStatusFilter filter) { 1572 if (input == null) return null; 1573 ArrayList<FileStatus> results = new ArrayList<>(); 1574 while (input.hasNext()) { 1575 FileStatus f = input.next(); 1576 if (filter.accept(f)) { 1577 results.add(f); 1578 } 1579 } 1580 return results; 1581 } 1582 1583 /** 1584 * Calls fs.listStatus() and treats FileNotFoundException as non-fatal This accommodates 1585 * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException, and 1586 * return an empty FileStatus[] while Hadoop 2 will throw FileNotFoundException. 
1587 * @param fs file system 1588 * @param dir directory 1589 * @param filter file status filter 1590 * @return null if dir is empty or doesn't exist, otherwise FileStatus list 1591 */ 1592 public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs, final Path dir, 1593 final FileStatusFilter filter) throws IOException { 1594 FileStatus[] status = null; 1595 try { 1596 status = fs.listStatus(dir); 1597 } catch (FileNotFoundException fnfe) { 1598 LOG.trace("{} does not exist", dir); 1599 return null; 1600 } 1601 1602 if (ArrayUtils.getLength(status) == 0) { 1603 return null; 1604 } 1605 1606 if (filter == null) { 1607 return Arrays.asList(status); 1608 } else { 1609 List<FileStatus> status2 = filterFileStatuses(status, filter); 1610 if (status2 == null || status2.isEmpty()) { 1611 return null; 1612 } else { 1613 return status2; 1614 } 1615 } 1616 } 1617 1618 /** 1619 * This function is to scan the root path of the file system to get the degree of locality for 1620 * each region on each of the servers having at least one block of that region. This is used by 1621 * the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer} the configuration to 1622 * use 1623 * @return the mapping from region encoded name to a map of server names to locality fraction in 1624 * case of file system errors or interrupts 1625 */ 1626 public static Map<String, Map<String, Float>> 1627 getRegionDegreeLocalityMappingFromFS(final Configuration conf) throws IOException { 1628 return getRegionDegreeLocalityMappingFromFS(conf, null, 1629 conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE)); 1630 1631 } 1632 1633 /** 1634 * This function is to scan the root path of the file system to get the degree of locality for 1635 * each region on each of the servers having at least one block of that region. 
the configuration to use
   * @param desiredTable   the table you wish to scan locality for
   * @param threadPoolSize the thread pool size to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
    final Configuration conf, final String desiredTable, int threadPoolSize) throws IOException {
    // Concurrent map: the scanner tasks started by the helper write into it from pool threads.
    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, regionDegreeLocalityMapping);
    return regionDegreeLocalityMapping;
  }

  /**
   * Scans the root path of the file system to compute, for each region, the degree of locality on
   * each of the servers having at least one block of that region.
   * <p>
   * Candidate region directories are found by globbing
   * {@code ${hbase.rootdir}/data/<namespace>/<table>/<region>} (restricted to the regions of
   * {@code desiredTable} when it is non-null). Only entries whose name is hexadecimal are kept.
   * Each region directory is then scanned by an {@link FSRegionScanner} task on a fixed-size daemon
   * thread pool. This method blocks until all tasks have finished.
   * @param conf                        the configuration to use
   * @param desiredTable                the table you wish to scan locality for, or null to scan
   *                                    every table under the data directory
   * @param threadPoolSize              the thread pool size to use; lowered to the number of
   *                                    candidate regions when that is smaller
   * @param regionDegreeLocalityMapping the map into which to put the locality degree mapping or
   *                                    null, must be a thread-safe implementation
   * @throws IOException in case of file system errors or interrupts
   */
  private static void getRegionLocalityMappingFromFS(final Configuration conf,
    final String desiredTable, int threadPoolSize,
    final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
    final FileSystem fs = FileSystem.get(conf);
    final Path rootPath = CommonFSUtils.getRootDir(conf);
    final long startTime = EnvironmentEdgeManager.currentTime();
    final Path queryPath;
    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
    if (null == desiredTable) {
      queryPath =
        new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
    } else {
      queryPath = new Path(
        CommonFSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
    }

    // Reject everything that does not look like a region directory. The glob may pick up noise,
    // so require a parent and a hexadecimal (encoded region) name, compared case-insensitively.
    final Pattern hexRegionName = Pattern.compile("[0-9a-f]+");
    final PathFilter pathFilter = path -> {
      if (path == null || path.getParent() == null) {
        return false;
      }
      final String regionName = path.getName();
      return regionName != null
        && hexRegionName.matcher(regionName.toLowerCase(Locale.ROOT)).matches();
    };

    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
    }

    // globStatus may yield null or an empty array when nothing matches. Either way there is
    // nothing to scan. Bailing out here also avoids building a pool of size 0, which
    // Executors.newFixedThreadPool rejects with IllegalArgumentException.
    if (statusList == null || statusList.length == 0) {
      return;
    }

    // lower the number of threads in case we have very few expected regions
    threadPoolSize = Math.min(threadPoolSize, statusList.length);

    // run in multiple threads
    final ThreadPoolExecutor tpe = (ThreadPoolExecutor) Executors.newFixedThreadPool(
      threadPoolSize, new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d")
        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    try {
      // ignore all file status items that are not of interest
      for (FileStatus regionStatus : statusList) {
        if (null == regionStatus || !regionStatus.isDirectory()) {
          continue;
        }

        final Path regionPath = regionStatus.getPath();
        if (null != regionPath) {
          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
        }
      }
    } finally {
      tpe.shutdown();
      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
      try {
        // here we wait until TPE terminates, which is either naturally or by
        // exceptions in the execution of the threads
        while (!tpe.awaitTermination(threadWakeFrequency, TimeUnit.MILLISECONDS)) {
          // printing out rough estimate, so as to not introduce AtomicInteger
          LOG.info("Locality checking is underway: { Scanned Regions : {}/{} }",
            tpe.getCompletedTaskCount(), tpe.getTaskCount());
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
    }

    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
    LOG.info("Scan DFS for locality info takes {}ms", overhead);
  }

  /**
   * Do our short circuit read setup. Checks buffer size to use and whether to do checksumming in
   * hbase or hdfs.
   * <p>
   * Logs a warning (and trips an assertion when assertions are enabled) if
   * {@code dfs.client.read.shortcircuit.skip.checksum} is set to true, then delegates to
   * {@link #checkShortCircuitReadBufferSize(Configuration)}.
   * @param conf the configuration to inspect and possibly update
   */
  public static void setupShortCircuitRead(final Configuration conf) {
    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
    boolean shortCircuitSkipChecksum =
      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    if (shortCircuitSkipChecksum) {
      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not "
        + "be set to true."
        + (useHBaseChecksum
          ? " HBase checksum doesn't require "
            + "it, see https://issues.apache.org/jira/browse/HBASE-6868."
          : ""));
      assert !shortCircuitSkipChecksum; // this will fail if assertions are on
    }
    checkShortCircuitReadBufferSize(conf);
  }

  /**
   * Check if short circuit read buffer size is set and if not, set it to hbase value.
   * <p>
   * The HBase value is read from {@code hbase.dfs.client.read.shortcircuit.buffer.size}, which
   * defaults to twice {@link HConstants#DEFAULT_BLOCKSIZE}.
   * @param conf the configuration to inspect and possibly update
   */
  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
    int size = conf.getInt(dfsKey, notSet);
    // If a size is set, return -- we will use it.
    if (size != notSet) {
      return;
    }
    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }

  /**
   * Returns The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs.
   * <p>
   * {@code DFSClient#getHedgedReadMetrics} is not public, so it is looked up and invoked
   * reflectively. Any reflection failure is logged and reported as null.
   * @param c the configuration used to obtain the file system
   * @throws IOException if the file system cannot be obtained
   */
  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
    throws IOException {
    if (!CommonFSUtils.isHDFS(c)) {
      return null;
    }
    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
    final String name = "getHedgedReadMetrics";
    DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient();
    Method m;
    try {
      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException | SecurityException e) {
      LOG.warn(
        "Failed find method " + name + " in dfsclient; no hedged read metrics: " + e.getMessage());
      return null;
    }
    m.setAccessible(true);
    try {
      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: "
        + e.getMessage());
      return null;
    }
  }

  /**
   * Recursively copies {@code src} on {@code srcFS} to {@code dst} on {@code dstFS}.
   * Directories are created on the calling thread, while regular files are copied in parallel
   * on a pool of {@code threads} threads.
   * @param srcFS   source file system
   * @param src     source path (file or directory)
   * @param dstFS   destination file system
   * @param dst     destination path
   * @param conf    configuration passed to {@link FileUtil#copy}
   * @param threads number of copy threads
   * @return every destination path visited, directories and files alike
   * @throws IOException if a directory cannot be created or listed, a file copy fails, or the
   *                     wait for the copies is interrupted (the interrupt flag is restored)
   */
  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
    Configuration conf, int threads) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Future<Void>> futures = new ArrayList<>();
    List<Path> traversedPaths;
    try {
      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (InterruptedException e) {
      // Restore the interrupt status so that callers up the stack can still observe it.
      Thread.currentThread().interrupt();
      throw new IOException("Copy snapshot reference files failed", e);
    } catch (ExecutionException | IOException e) {
      throw new IOException("Copy snapshot reference files failed", e);
    } finally {
      pool.shutdownNow();
    }
    return traversedPaths;
  }

  /**
   * Walks {@code src} depth-first. Destination directories are created synchronously, and one
   * copy task per regular file is submitted to {@code pool`. Each task's future is appended to
   * {@code futures} so the caller can wait on it.
   * @return the destination paths visited in this subtree, starting with {@code dst}
   * @throws IOException if the source cannot be stat'ed/listed or a directory cannot be created
   */
  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
    Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
    List<Path> traversedPaths = new ArrayList<>();
    traversedPaths.add(dst);
    FileStatus currentFileStatus = srcFS.getFileStatus(src);
    if (currentFileStatus.isDirectory()) {
      if (!dstFS.mkdirs(dst)) {
        throw new IOException("Create directory failed: " + dst);
      }
      FileStatus[] subPaths = srcFS.listStatus(src);
      for (FileStatus subPath : subPaths) {
        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
      }
    } else {
      Future<Void> future = pool.submit(() -> {
        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
        return null;
      });
      futures.add(future);
    }
    return traversedPaths;
  }

  /**
   * Returns A set containing all namenode addresses of fs.
   * <p>
   * For an HA canonical service name (prefix {@code ha-hdfs}), the set contains every NameNode
   * RPC address configured for that nameservice. If that lookup fails, the failure is logged and
   * the set is left empty. Otherwise the single address is built from the file system URI's host
   * and port. When the URI has no port, the port is taken from the text after the ':' in the
   * canonical service name.
   */
  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
    Configuration conf) {
    Set<InetSocketAddress> addresses = new HashSet<>();
    String serviceName = fs.getCanonicalServiceName();

    if (serviceName.startsWith("ha-hdfs")) {
      try {
        Map<String, Map<String, InetSocketAddress>> addressMap =
          DFSUtil.getNNServiceRpcAddressesForCluster(conf);
        String nameService = serviceName.substring(serviceName.indexOf(":") + 1);
        if (addressMap.containsKey(nameService)) {
          Map<String, InetSocketAddress> nnMap = addressMap.get(nameService);
          addresses.addAll(nnMap.values());
        }
      } catch (Exception e) {
        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
      }
    } else {
      URI uri = fs.getUri();
      int port = uri.getPort();
      if (port < 0) {
        int idx = serviceName.indexOf(':');
        port = Integer.parseInt(serviceName.substring(idx + 1));
      }
      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
      addresses.add(addr);
    }

    return addresses;
  }

  /**
   * Decides whether two file systems refer to the same HDFS cluster.
   * <p>
   * The checks run in this order:
   * <ol>
   * <li>The canonical service names are compared; null on either side means "not same".</li>
   * <li>When both are HA names and {@code dfs.internal.nameservices} is configured, the answer is
   * whether the source nameservice is one of the internal nameservices.</li>
   * <li>When both are {@link DistributedFileSystem}s, they are the same if their NameNode
   * address sets intersect. This covers an HA name on one side and a plain host:port on the
   * other.</li>
   * </ol>
   * @param conf   the Configuration of HBase
   * @param srcFs  the first file system
   * @param desFs  the second file system
   * @return Whether srcFs and desFs are on same hdfs or not
   */
  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
    // By getCanonicalServiceName, we could make sure both srcFs and desFs
    // show a unified format which contains scheme, host and port.
    String srcServiceName = srcFs.getCanonicalServiceName();
    String desServiceName = desFs.getCanonicalServiceName();

    if (srcServiceName == null || desServiceName == null) {
      return false;
    }
    if (srcServiceName.equals(desServiceName)) {
      return true;
    }
    if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
      Collection<String> internalNameServices =
        conf.getTrimmedStringCollection("dfs.internal.nameservices");
      if (!internalNameServices.isEmpty()) {
        return internalNameServices.contains(srcServiceName.split(":")[1]);
      }
    }
    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
      // If one serviceName is an HA format while the other is a non-HA format,
      // maybe they refer to the same FileSystem.
      // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
      return !Sets.intersection(srcAddrs, desAddrs).isEmpty();
    }

    return false;
  }
}