/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import edu.umd.cs.findbugs.annotations.CheckForNull;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;

/**
 * Utility methods for interacting with the underlying file system.
 */
@InterfaceAudience.Private
public abstract class FSUtils extends CommonFSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);

  private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
  private static final int DEFAULT_THREAD_POOLSIZE = 2;

  /** Set to true on Windows platforms */
  @VisibleForTesting // currently only used in testing. TODO refactor into a test class
  public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");

  protected FSUtils() {
    super();
  }

  /**
   * @return True if <code>fs</code> is an instance of DistributedFileSystem
   * @throws IOException
   */
  public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
    FileSystem fileSystem = fs;
    // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
    // Check its backing fs for dfs-ness.
    if (fs instanceof HFileSystem) {
      fileSystem = ((HFileSystem) fs).getBackingFs();
    }
    return fileSystem instanceof DistributedFileSystem;
  }

  /**
   * Compare the path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare
   * the '/a/b/c' part. If you passed in hdfs://a/b/c and b/c, it would return true. Does not
   * consider the scheme; i.e. if the schemes differ but the path or subpath matches, the two
   * will equate.
   * @param pathToSearch Path we will be trying to match.
   * @param pathTail
   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
   */
  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
    Path tailPath = pathTail;
    String tailName;
    Path toSearch = pathToSearch;
    String toSearchName;
    boolean result = false;

    if (pathToSearch.depth() != pathTail.depth()) {
      return false;
    }

    do {
      tailName = tailPath.getName();
      if (tailName == null || tailName.isEmpty()) {
        result = true;
        break;
      }
      toSearchName = toSearch.getName();
      if (toSearchName == null || toSearchName.isEmpty()) {
        break;
      }
      // Move up a parent on each path for next go around. Path doesn't let us go off the end.
      tailPath = tailPath.getParent();
      toSearch = toSearch.getParent();
    } while (tailName.equals(toSearchName));
    return result;
  }
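
  // A minimal usage sketch for isMatchingTail (hypothetical paths, not part of the original
  // class): both paths must have the same depth, and only the trailing path components are
  // compared, so scheme and authority are ignored.
  //
  //   Path search = new Path("hdfs://nn:8020/hbase/data/default/t1");
  //   boolean same = FSUtils.isMatchingTail(search, new Path("/hbase/data/default/t1")); // true
  //   boolean diff = FSUtils.isMatchingTail(search, new Path("/hbase/data/default/t2")); // false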

  public static FSUtils getInstance(FileSystem fs, Configuration conf) {
    String scheme = fs.getUri().getScheme();
    if (scheme == null) {
      LOG.warn("Could not find scheme for uri " + fs.getUri() + ", default to hdfs");
      scheme = "hdfs";
    }
    Class<?> fsUtilsClass =
        conf.getClass("hbase.fsutil." + scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl
    FSUtils fsUtils = (FSUtils) ReflectionUtils.newInstance(fsUtilsClass, conf);
    return fsUtils;
  }

  /**
   * Delete the region directory if it exists.
   * @param conf
   * @param hri
   * @return True if the region directory was deleted.
   * @throws IOException
   */
  public static boolean deleteRegionDir(final Configuration conf, final HRegionInfo hri)
      throws IOException {
    Path rootDir = getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);
    return deleteDirectory(fs,
      new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
  }

  /**
   * Create the specified file on the filesystem. By default, this will:
   * <ol>
   * <li>overwrite the file if it exists</li>
   * <li>apply the umask in the configuration (if it is enabled)</li>
   * <li>use the fs configured buffer size (or 4096 if not set)</li>
   * <li>use the configured column family replication or default replication if
   * {@link HColumnDescriptor#DEFAULT_DFS_REPLICATION}</li>
   * <li>use the default block size</li>
   * <li>not track progress</li>
   * </ol>
   * @param conf configurations
   * @param fs {@link FileSystem} on which to write the file
   * @param path {@link Path} to the file to write
   * @param perm permissions
   * @param favoredNodes
   * @return output stream to the created file
   * @throws IOException if the file cannot be created
   */
  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
      FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
    if (fs instanceof HFileSystem) {
      FileSystem backingFs = ((HFileSystem) fs).getBackingFs();
      if (backingFs instanceof DistributedFileSystem) {
        // Try to use the favoredNodes version via reflection to allow backwards-
        // compatibility.
        short replication = Short.parseShort(conf.get(HColumnDescriptor.DFS_REPLICATION,
          String.valueOf(HColumnDescriptor.DEFAULT_DFS_REPLICATION)));
        try {
          return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create",
            Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class,
            Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true,
              getDefaultBufferSize(backingFs),
              replication > 0 ? replication : getDefaultReplication(backingFs, path),
              getDefaultBlockSize(backingFs, path), null, favoredNodes));
        } catch (InvocationTargetException ite) {
          // Function was properly called, but threw its own exception.
          throw new IOException(ite.getCause());
        } catch (NoSuchMethodException e) {
          LOG.debug("DFS Client does not support most favored nodes create; using default create");
          LOG.trace("Ignoring; use default create", e);
        } catch (IllegalArgumentException | SecurityException | IllegalAccessException e) {
          LOG.debug("Ignoring (most likely Reflection related exception) " + e);
        }
      }
    }
    return create(fs, path, perm, true);
  }
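
  // A hedged usage sketch for create() with favored nodes (hypothetical host names, path and
  // payload, not part of the original class). On a DistributedFileSystem-backed HFileSystem the
  // reflective favored-nodes create above is attempted; on any other filesystem the call quietly
  // falls back to the plain create(fs, path, perm, true) overload.
  //
  //   InetSocketAddress[] favored = new InetSocketAddress[] {
  //     new InetSocketAddress("datanode-1.example.com", 50010),
  //     new InetSocketAddress("datanode-2.example.com", 50010)
  //   };
  //   try (FSDataOutputStream out = FSUtils.create(conf, fs,
  //       new Path("/hbase/.tmp/hypothetical"), FsPermission.getDefault(), favored)) {
  //     out.write(payload); // payload is a placeholder byte[]
  //   }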

  /**
   * Checks to see if the specified file system is available
   *
   * @param fs filesystem
   * @throws IOException e
   */
  public static void checkFileSystemAvailable(final FileSystem fs)
      throws IOException {
    if (!(fs instanceof DistributedFileSystem)) {
      return;
    }
    IOException exception = null;
    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    try {
      if (dfs.exists(new Path("/"))) {
        return;
      }
    } catch (IOException e) {
      exception = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
    }
    try {
      fs.close();
    } catch (Exception e) {
      LOG.error("file system close failed: ", e);
    }
    IOException io = new IOException("File system is not available");
    io.initCause(exception);
    throw io;
  }

  /**
   * We use reflection because {@link DistributedFileSystem#setSafeMode(
   * HdfsConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1
   *
   * @param dfs
   * @return whether we're in safe mode
   * @throws IOException
   */
  private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
    boolean inSafeMode = false;
    try {
      Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?>[] {
        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class });
      inSafeMode = (Boolean) m.invoke(dfs,
        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true);
    } catch (Exception e) {
      if (e instanceof IOException) throw (IOException) e;

      // Check whether dfs is on safemode.
      inSafeMode = dfs.setSafeMode(
        org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET);
    }
    return inSafeMode;
  }

  /**
   * Check whether dfs is in safemode.
   * @param conf
   * @throws IOException
   */
  public static void checkDfsSafeMode(final Configuration conf)
      throws IOException {
    boolean isInSafeMode = false;
    FileSystem fs = FileSystem.get(conf);
    if (fs instanceof DistributedFileSystem) {
      DistributedFileSystem dfs = (DistributedFileSystem) fs;
      isInSafeMode = isInSafeMode(dfs);
    }
    if (isInSafeMode) {
      throw new IOException("File system is in safemode, it can't be written now");
    }
  }

  /**
   * Verifies current version of file system
   *
   * @param fs filesystem object
   * @param rootdir root hbase directory
   * @return null if no version file exists, version string otherwise
   * @throws IOException if the version file fails to open
   * @throws DeserializationException if the version data cannot be translated into a version
   */
  public static String getVersion(FileSystem fs, Path rootdir)
      throws IOException, DeserializationException {
    final Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
    FileStatus[] status = null;
    try {
      // hadoop 2.0 throws FNFE if directory does not exist.
      // hadoop 1.0 returns null if directory does not exist.
      status = fs.listStatus(versionFile);
    } catch (FileNotFoundException fnfe) {
      return null;
    }
    if (ArrayUtils.getLength(status) == 0) {
      return null;
    }
    String version = null;
    byte[] content = new byte[(int) status[0].getLen()];
    FSDataInputStream s = fs.open(versionFile);
    try {
      IOUtils.readFully(s, content, 0, content.length);
      if (ProtobufUtil.isPBMagicPrefix(content)) {
        version = parseVersionFrom(content);
      } else {
        // Presume it is pre-pb format.
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content))) {
          version = dis.readUTF();
        }
      }
    } catch (EOFException eof) {
      LOG.warn("Version file was empty, odd, will try to set it.");
    } finally {
      s.close();
    }
    return version;
  }

  /**
   * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file.
   * @param bytes The byte content of the hbase.version file
   * @return The version found in the file as a String
   * @throws DeserializationException if the version data cannot be translated into a version
   */
  static String parseVersionFrom(final byte[] bytes)
      throws DeserializationException {
    ProtobufUtil.expectPBMagicPrefix(bytes);
    int pblen = ProtobufUtil.lengthOfPBMagic();
    FSProtos.HBaseVersionFileContent.Builder builder =
        FSProtos.HBaseVersionFileContent.newBuilder();
    try {
      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
      return builder.getVersion();
    } catch (IOException e) {
      // Convert
      throw new DeserializationException(e);
    }
  }
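
  // A minimal sketch of the hbase.version format handled by the two helpers around this point
  // (illustrative, not part of the original class): the file is the pb magic prefix followed by
  // a serialized HBaseVersionFileContent message, so a serialize/parse roundtrip should return
  // the original version string.
  //
  //   byte[] bytes = FSUtils.toVersionByteArray("8");   // pb magic + message
  //   assert ProtobufUtil.isPBMagicPrefix(bytes);
  //   String parsed = FSUtils.parseVersionFrom(bytes);  // "8"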

  /**
   * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file.
   * @param version Version to persist
   * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a
   *   prefix.
   */
  static byte[] toVersionByteArray(final String version) {
    FSProtos.HBaseVersionFileContent.Builder builder =
        FSProtos.HBaseVersionFileContent.newBuilder();
    return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray());
  }

  /**
   * Verifies current version of file system
   *
   * @param fs file system
   * @param rootdir root directory of HBase installation
   * @param message if true, issues a message on System.out
   * @throws IOException if the version file cannot be opened
   * @throws DeserializationException if the contents of the version file cannot be parsed
   */
  public static void checkVersion(FileSystem fs, Path rootdir, boolean message)
      throws IOException, DeserializationException {
    checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
  }

  /**
   * Verifies current version of file system
   *
   * @param fs file system
   * @param rootdir root directory of HBase installation
   * @param message if true, issues a message on System.out
   * @param wait wait interval
   * @param retries number of times to retry
   *
   * @throws IOException if the version file cannot be opened
   * @throws DeserializationException if the contents of the version file cannot be parsed
   */
  public static void checkVersion(FileSystem fs, Path rootdir,
      boolean message, int wait, int retries)
      throws IOException, DeserializationException {
    String version = getVersion(fs, rootdir);
    String msg;
    if (version == null) {
      if (!metaRegionExists(fs, rootdir)) {
        // rootDir is empty (no version file and no root region)
        // just create new version file (HBASE-1195)
        setVersion(fs, rootdir, wait, retries);
        return;
      } else {
        msg = "hbase.version file is missing. Is your hbase.rootdir valid? " +
            "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " +
            "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2";
      }
    } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) {
      return;
    } else {
      msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version +
          " but software requires version " + HConstants.FILE_SYSTEM_VERSION +
          ". Consult http://hbase.apache.org/book.html for further information about " +
          "upgrading HBase.";
    }

    // version is deprecated require migration
    // Output on stdout so user sees it in terminal.
    if (message) {
      System.out.println("WARNING! " + msg);
    }
    throw new FileSystemVersionException(msg);
  }

  /**
   * Sets version of file system
   *
   * @param fs filesystem object
   * @param rootdir hbase root
   * @throws IOException e
   */
  public static void setVersion(FileSystem fs, Path rootdir)
      throws IOException {
    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0,
      HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS);
  }

  /**
   * Sets version of file system
   *
   * @param fs filesystem object
   * @param rootdir hbase root
   * @param wait time to wait for retry
   * @param retries number of times to retry before failing
   * @throws IOException e
   */
  public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries)
      throws IOException {
    setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries);
  }


  /**
   * Sets version of file system
   *
   * @param fs filesystem object
   * @param rootdir hbase root directory
   * @param version version to set
   * @param wait time to wait for retry
   * @param retries number of times to retry before throwing an IOException
   * @throws IOException e
   */
  public static void setVersion(FileSystem fs, Path rootdir, String version,
      int wait, int retries) throws IOException {
    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
    Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR +
        HConstants.VERSION_FILE_NAME);
    while (true) {
      try {
        // Write the version to a temporary file
        FSDataOutputStream s = fs.create(tempVersionFile);
        try {
          s.write(toVersionByteArray(version));
          s.close();
          s = null;
          // Move the temp version file to its normal location. Returns false
          // if the rename failed. Throw an IOE in that case.
          if (!fs.rename(tempVersionFile, versionFile)) {
            throw new IOException("Unable to move temp version file to " + versionFile);
          }
        } finally {
          // Cleaning up the temporary if the rename failed would be trying
          // too hard. We'll unconditionally create it again the next time
          // through anyway, files are overwritten by default by create().

          // Attempt to close the stream on the way out if it is still open.
          try {
            if (s != null) s.close();
          } catch (IOException ignore) { }
        }
        LOG.info("Created version file at " + rootdir.toString() + " with version=" + version);
        return;
      } catch (IOException e) {
        if (retries > 0) {
          LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e);
          fs.delete(versionFile, false);
          try {
            if (wait > 0) {
              Thread.sleep(wait);
            }
          } catch (InterruptedException ie) {
            throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
          }
          retries--;
        } else {
          throw e;
        }
      }
    }
  }

  /**
   * Checks that a cluster ID file exists in the HBase root directory
   * @param fs the root directory FileSystem
   * @param rootdir the HBase root directory in HDFS
   * @param wait how long to wait between retries
   * @return <code>true</code> if the file exists, otherwise <code>false</code>
   * @throws IOException if checking the FileSystem fails
   */
  public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
      long wait) throws IOException {
    while (true) {
      try {
        Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
        return fs.exists(filePath);
      } catch (IOException ioe) {
        if (wait > 0L) {
          LOG.warn("Unable to check cluster ID file in {}, retrying in {}ms", rootdir, wait, ioe);
          try {
            Thread.sleep(wait);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
          }
        } else {
          throw ioe;
        }
      }
    }
  }

  /**
   * Returns the value of the unique cluster ID stored for this HBase instance.
   * @param fs the root directory FileSystem
   * @param rootdir the path to the HBase root directory
   * @return the unique cluster identifier
   * @throws IOException if reading the cluster ID file fails
   */
  public static ClusterId getClusterId(FileSystem fs, Path rootdir)
      throws IOException {
    Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
    ClusterId clusterId = null;
    FileStatus status = fs.exists(idPath) ? fs.getFileStatus(idPath) : null;
    if (status != null) {
      int len = Ints.checkedCast(status.getLen());
      byte[] content = new byte[len];
      FSDataInputStream in = fs.open(idPath);
      try {
        in.readFully(content);
      } catch (EOFException eof) {
        LOG.warn("Cluster ID file {} is empty", idPath);
      } finally {
        in.close();
      }
      try {
        clusterId = ClusterId.parseFrom(content);
      } catch (DeserializationException e) {
        throw new IOException("content=" + Bytes.toString(content), e);
      }
      // If not pb'd, make it so.
      if (!ProtobufUtil.isPBMagicPrefix(content)) {
        String cid = null;
        in = fs.open(idPath);
        try {
          cid = in.readUTF();
          clusterId = new ClusterId(cid);
        } catch (EOFException eof) {
          LOG.warn("Cluster ID file {} is empty", idPath);
        } finally {
          in.close();
        }
        rewriteAsPb(fs, rootdir, idPath, clusterId);
      }
      return clusterId;
    } else {
      LOG.warn("Cluster ID file does not exist at {}", idPath);
    }
    return clusterId;
  }
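
  // A minimal usage sketch (hypothetical call site, not part of the original class): reading the
  // cluster ID back from the rootdir. A pre-protobuf hbase.id file is transparently rewritten in
  // pb format by the method above; null is returned only when hbase.id is missing.
  //
  //   Path rootDir = FSUtils.getRootDir(conf);
  //   FileSystem fs = rootDir.getFileSystem(conf);
  //   ClusterId id = FSUtils.getClusterId(fs, rootDir);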

  /**
   * @param cid
   * @throws IOException
   */
  private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
      final ClusterId cid)
      throws IOException {
    // Rewrite the file as pb. Move aside the old one first, write new
    // then delete the moved-aside file.
    Path movedAsideName = new Path(p + "." + System.currentTimeMillis());
    if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
    setClusterId(fs, rootdir, cid, 100);
    if (!fs.delete(movedAsideName, false)) {
      throw new IOException("Failed delete of " + movedAsideName);
    }
    LOG.debug("Rewrote the hbase.id file as pb");
  }

  /**
   * Writes a new unique identifier for this cluster to the "hbase.id" file
   * in the HBase root directory
   * @param fs the root directory FileSystem
   * @param rootdir the path to the HBase root directory
   * @param clusterId the unique identifier to store
   * @param wait how long (in milliseconds) to wait between retries
   * @throws IOException if writing to the FileSystem fails and no wait value
   */
  public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
      int wait) throws IOException {
    while (true) {
      try {
        Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
        Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
            Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
        // Write the id file to a temporary location
        FSDataOutputStream s = fs.create(tempIdFile);
        try {
          s.write(clusterId.toByteArray());
          s.close();
          s = null;
          // Move the temporary file to its normal location. Throw an IOE if
          // the rename failed
          if (!fs.rename(tempIdFile, idFile)) {
            throw new IOException("Unable to move temp version file to " + idFile);
          }
        } finally {
          // Attempt to close the stream if still open on the way out
          try {
            if (s != null) s.close();
          } catch (IOException ignore) { }
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
        }
        return;
      } catch (IOException ioe) {
        if (wait > 0) {
          LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
              ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
          try {
            Thread.sleep(wait);
          } catch (InterruptedException e) {
            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
          }
        } else {
          throw ioe;
        }
      }
    }
  }

  /**
   * If DFS, check safe mode and if so, wait until we clear it.
   * @param conf configuration
   * @param wait Sleep between retries
   * @throws IOException e
   */
  public static void waitOnSafeMode(final Configuration conf,
      final long wait)
      throws IOException {
    FileSystem fs = FileSystem.get(conf);
    if (!(fs instanceof DistributedFileSystem)) return;
    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    // Make sure dfs is not in safe mode
    while (isInSafeMode(dfs)) {
      LOG.info("Waiting for dfs to exit safe mode...");
      try {
        Thread.sleep(wait);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
    }
  }

  /**
   * Checks if meta region exists
   * @param fs file system
   * @param rootDir root directory of HBase installation
   * @return true if exists
   */
  public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException {
    Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO);
    return fs.exists(metaRegionDir);
  }

  /**
   * Compute HDFS blocks distribution of a given file, or a portion of the file
   * @param fs file system
   * @param status file status of the file
   * @param start start position of the portion
   * @param length length of the portion
   * @return The HDFS blocks distribution
   */
  static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
      final FileSystem fs, FileStatus status, long start, long length)
      throws IOException {
    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
    BlockLocation[] blockLocations = fs.getFileBlockLocations(status, start, length);
    for (BlockLocation bl : blockLocations) {
      String[] hosts = bl.getHosts();
      long len = bl.getLength();
      blocksDistribution.addHostsAndBlockWeight(hosts, len);
    }

    return blocksDistribution;
  }

  /**
   * Update blocksDistribution with blockLocations
   * @param blocksDistribution the hdfs blocks distribution
   * @param blockLocations an array containing block location
   */
  static public void addToHDFSBlocksDistribution(
      HDFSBlocksDistribution blocksDistribution, BlockLocation[] blockLocations)
      throws IOException {
    for (BlockLocation bl : blockLocations) {
      String[] hosts = bl.getHosts();
      long len = bl.getLength();
      blocksDistribution.addHostsAndBlockWeight(hosts, len);
    }
  }

  // TODO move this method OUT of FSUtils. No dependencies to HMaster
  /**
   * Returns the total overall fragmentation percentage. Includes hbase:meta and
   * -ROOT- as well.
   *
   * @param master The master defining the HBase root and file system
   * @return The overall fragmentation percentage, or -1 if it cannot be computed
   * @throws IOException When scanning the directory fails
   */
  public static int getTotalTableFragmentation(final HMaster master)
      throws IOException {
    Map<String, Integer> map = getTableFragmentation(master);
    return map.isEmpty() ? -1 : map.get("-TOTAL-");
  }

  /**
   * Runs through the HBase rootdir and checks how many stores for each table
   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
   * percentage across all tables is stored under the special key "-TOTAL-".
   *
   * @param master The master defining the HBase root and file system.
   * @return A map for each table and its percentage (never null).
   *
   * @throws IOException When scanning the directory fails.
   */
  public static Map<String, Integer> getTableFragmentation(
      final HMaster master)
      throws IOException {
    Path path = getRootDir(master.getConfiguration());
    // since HMaster.getFileSystem() is package private
    FileSystem fs = path.getFileSystem(master.getConfiguration());
    return getTableFragmentation(fs, path);
  }

  /**
   * Runs through the HBase rootdir and checks how many stores for each table
   * have more than one file in them. Checks -ROOT- and hbase:meta too. The total
   * percentage across all tables is stored under the special key "-TOTAL-".
   *
   * @param fs The file system to use
   * @param hbaseRootDir The root directory to scan
   * @return A map for each table and its percentage (never null)
   * @throws IOException When scanning the directory fails
   */
  public static Map<String, Integer> getTableFragmentation(
      final FileSystem fs, final Path hbaseRootDir)
      throws IOException {
    Map<String, Integer> frags = new HashMap<>();
    int cfCountTotal = 0;
    int cfFragTotal = 0;
    PathFilter regionFilter = new RegionDirFilter(fs);
    PathFilter familyFilter = new FamilyDirFilter(fs);
    List<Path> tableDirs = getTableDirs(fs, hbaseRootDir);
    for (Path d : tableDirs) {
      int cfCount = 0;
      int cfFrag = 0;
      FileStatus[] regionDirs = fs.listStatus(d, regionFilter);
      for (FileStatus regionDir : regionDirs) {
        Path dd = regionDir.getPath();
        // else its a region name, now look in region for families
        FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
        for (FileStatus familyDir : familyDirs) {
          cfCount++;
          cfCountTotal++;
          Path family = familyDir.getPath();
          // now in family make sure only one file
          FileStatus[] familyStatus = fs.listStatus(family);
          if (familyStatus.length > 1) {
            cfFrag++;
            cfFragTotal++;
          }
        }
      }
      // compute percentage per table and store in result list
      frags.put(FSUtils.getTableName(d).getNameAsString(),
        cfCount == 0 ? 0 : Math.round((float) cfFrag / cfCount * 100));
    }
    // set overall percentage for all tables
    frags.put("-TOTAL-",
      cfCountTotal == 0 ? 0 : Math.round((float) cfFragTotal / cfCountTotal * 100));
    return frags;
  }

  /**
   * A {@link PathFilter} that returns only regular files.
   */
  static class FileFilter extends AbstractFileStatusFilter {
    private final FileSystem fs;

    public FileFilter(final FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      try {
        return isFile(fs, isDir, p);
      } catch (IOException e) {
        LOG.warn("Unable to verify if path={} is a regular file", p, e);
        return false;
      }
    }
  }

  /**
   * Directory filter that doesn't include any of the directories in the specified blacklist
   */
  public static class BlackListDirFilter extends AbstractFileStatusFilter {
    private final FileSystem fs;
    private List<String> blacklist;

    /**
     * Create a filter on the given filesystem with the specified blacklist
     * @param fs filesystem to filter
     * @param directoryNameBlackList list of the names of the directories to filter. If
     * <tt>null</tt>, all directories are returned
     */
    @SuppressWarnings("unchecked")
    public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) {
      this.fs = fs;
      blacklist =
        (List<String>) (directoryNameBlackList == null ? Collections.emptyList()
          : directoryNameBlackList);
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      if (!isValidName(p.getName())) {
        return false;
      }

      try {
        return isDirectory(fs, isDir, p);
      } catch (IOException e) {
        LOG.warn("An error occurred while verifying if [{}] is a valid directory."
            + " Returning 'not valid' and continuing.", p, e);
        return false;
      }
    }

    protected boolean isValidName(final String name) {
      return !blacklist.contains(name);
    }
  }

  /**
   * A {@link PathFilter} that only allows directories.
   */
  public static class DirFilter extends BlackListDirFilter {

    public DirFilter(FileSystem fs) {
      super(fs, null);
    }
  }

  /**
   * A {@link PathFilter} that returns usertable directories. To get all directories use the
   * {@link BlackListDirFilter} with a <tt>null</tt> blacklist
   */
  public static class UserTableDirFilter extends BlackListDirFilter {
    public UserTableDirFilter(FileSystem fs) {
      super(fs, HConstants.HBASE_NON_TABLE_DIRS);
    }

    @Override
    protected boolean isValidName(final String name) {
      if (!super.isValidName(name))
        return false;

      try {
        TableName.isLegalTableQualifierName(Bytes.toBytes(name));
      } catch (IllegalArgumentException e) {
        LOG.info("Invalid table name: {}", name);
        return false;
      }
      return true;
    }
  }

  /**
   * Recover file lease. Used when a file is suspected to have been left open
   * by another process.
   * @param fs FileSystem handle
   * @param p Path of file to recover lease
   * @param conf Configuration handle
   * @throws IOException
   */
  public abstract void recoverFileLease(final FileSystem fs, final Path p,
      Configuration conf, CancelableProgressable reporter) throws IOException;

  public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir)
      throws IOException {
    List<Path> tableDirs = new ArrayList<>();

    for (FileStatus status : fs
        .globStatus(new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) {
      tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath()));
    }
    return tableDirs;
  }

  /**
   * @param fs
   * @param rootdir
   * @return All the table directories under <code>rootdir</code>. Ignores non-table hbase
   *   folders such as .logs, .oldlogs, .corrupt folders.
   * @throws IOException
   */
  public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir)
      throws IOException {
    // presumes any directory under hbase.rootdir is a table
    FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs));
    List<Path> tabledirs = new ArrayList<>(dirs.length);
    for (FileStatus dir : dirs) {
      tabledirs.add(dir.getPath());
    }
    return tabledirs;
  }
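
  // A hedged sketch of walking the on-disk layout with the helpers above and the region/family
  // helpers defined below (hypothetical rootdir; the loop shape mirrors what
  // getTableFragmentation() does internally with the same filters):
  //
  //   FileSystem fs = FileSystem.get(conf);
  //   Path rootDir = FSUtils.getRootDir(conf);
  //   for (Path tableDir : FSUtils.getTableDirs(fs, rootDir)) {          // data/<ns>/<table>
  //     for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {     // hex-named region dirs
  //       List<Path> familyDirs = FSUtils.getFamilyDirs(fs, regionDir);  // column family dirs
  //     }
  //   }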

  /**
   * Filter for all dirs that don't start with '.'
   */
  public static class RegionDirFilter extends AbstractFileStatusFilter {
    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
    final FileSystem fs;

    public RegionDirFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      if (!regionDirPattern.matcher(p.getName()).matches()) {
        return false;
      }

      try {
        return isDirectory(fs, isDir, p);
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file {} due to IOException", p, ioe);
        return false;
      }
    }
  }

  /**
   * Given a particular table dir, return all the regiondirs inside it, excluding files such as
   * .tableinfo
   * @param fs A file system for the Path
   * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir>
   * @return List of paths to valid region directories in table dir.
   * @throws IOException
   */
  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir)
      throws IOException {
    // assumes we are in a table dir.
    List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
    if (rds == null) {
      return Collections.emptyList();
    }
    List<Path> regionDirs = new ArrayList<>(rds.size());
    for (FileStatus rdfs : rds) {
      Path rdPath = rdfs.getPath();
      regionDirs.add(rdPath);
    }
    return regionDirs;
  }

  public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) {
    return getRegionDirFromTableDir(getTableDir(rootDir, region.getTable()), region);
  }

  public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) {
    return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
  }

  /**
   * Filter for all dirs that are legal column family names. This is generally used for colfam
   * dirs <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>.
   */
  public static class FamilyDirFilter extends AbstractFileStatusFilter {
    final FileSystem fs;

    public FamilyDirFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      try {
        // throws IAE if invalid
        HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName()));
      } catch (IllegalArgumentException iae) {
        // path name is an invalid family name and thus is excluded.
        return false;
      }

      try {
        return isDirectory(fs, isDir, p);
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file {} due to IOException", p, ioe);
        return false;
      }
    }
  }

  /**
   * Given a particular region dir, return all the familydirs inside it
   *
   * @param fs A file system for the Path
   * @param regionDir Path to a specific region directory
   * @return List of paths to valid family directories in region dir.
   * @throws IOException
   */
  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir)
      throws IOException {
    // assumes we are in a region dir.
    FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
    List<Path> familyDirs = new ArrayList<>(fds.length);
    for (FileStatus fdfs : fds) {
      Path fdPath = fdfs.getPath();
      familyDirs.add(fdPath);
    }
    return familyDirs;
  }

  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir)
      throws IOException {
    List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs));
    if (fds == null) {
      return Collections.emptyList();
    }
    List<Path> referenceFiles = new ArrayList<>(fds.size());
    for (FileStatus fdfs : fds) {
      Path fdPath = fdfs.getPath();
      referenceFiles.add(fdPath);
    }
    return referenceFiles;
  }

  /**
   * Filter for HFiles that excludes reference files.
   */
  public static class HFileFilter extends AbstractFileStatusFilter {
    final FileSystem fs;

    public HFileFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      if (!StoreFileInfo.isHFile(p)) {
        return false;
      }

      try {
        return isFile(fs, isDir, p);
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file {} due to IOException", p, ioe);
        return false;
      }
    }
  }

  /**
   * Filter for HFileLinks (StoreFiles and HFiles not included).
   * The filter itself does not consider if a link is a file or not.
   */
  public static class HFileLinkFilter implements PathFilter {

    @Override
    public boolean accept(Path p) {
      return HFileLink.isHFileLink(p);
    }
  }

  public static class ReferenceFileFilter extends AbstractFileStatusFilter {

    private final FileSystem fs;

    public ReferenceFileFilter(FileSystem fs) {
      this.fs = fs;
    }

    @Override
    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
      if (!StoreFileInfo.isReference(p)) {
        return false;
      }

      try {
        // only files can be references.
        return isFile(fs, isDir, p);
      } catch (IOException ioe) {
        // Maybe the file was moved or the fs was disconnected.
        LOG.warn("Skipping file {} due to IOException", p, ioe);
        return false;
      }
    }
  }

  /**
   * Called every so-often by storefile map builder getTableStoreFilePathMap to
   * report progress.
   */
  interface ProgressReporter {
    /**
     * @param status File or directory we are about to process.
     */
    void progress(FileStatus status);
  }

  /**
   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
   * table StoreFile names to the full Path.
   * <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   *
   * @param map map to add values. If null, this method will create and populate one to return
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @param tableName name of the table to scan.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException
   */
  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
      final FileSystem fs, final Path hbaseRootDir, TableName tableName)
      throws IOException, InterruptedException {
    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null,
      (ProgressReporter) null);
  }

  /**
   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
   * table StoreFile names to the full Path. Note that because this method can be called
   * on a 'live' HBase system that we will skip files that no longer exist by the time
   * we traverse them and similarly the user of the result needs to consider that some
   * entries in this map may not exist by the time this call completes.
   * <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   *
   * @param resultMap map to add values. If null, this method will create and populate one to
   *   return
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @param tableName name of the table to scan.
   * @param sfFilter optional path filter to apply to store files
   * @param executor optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to new region of
   *   family dir and for each store file.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   * @deprecated Since 2.3.0. For removal in hbase4. Use ProgressReporter override instead.
   */
  @Deprecated
  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
      final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
      ExecutorService executor, final HbckErrorReporter progressReporter)
      throws IOException, InterruptedException {
    return getTableStoreFilePathMap(resultMap, fs, hbaseRootDir, tableName, sfFilter, executor,
      new ProgressReporter() {
        @Override
        public void progress(FileStatus status) {
          // status is not used in this implementation.
          progressReporter.progress();
        }
      });
  }

  /**
   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
   * table StoreFile names to the full Path. Note that because this method can be called
   * on a 'live' HBase system that we will skip files that no longer exist by the time
   * we traverse them and similarly the user of the result needs to consider that some
   * entries in this map may not exist by the time this call completes.
   * <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   *
   * @param resultMap map to add values. If null, this method will create and populate one
   *   to return
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @param tableName name of the table to scan.
   * @param sfFilter optional path filter to apply to store files
   * @param executor optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to new region of
   *   family dir and for each store file.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException the thread is interrupted, either before or during the activity.
   */
  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap,
      final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
      ExecutorService executor, final ProgressReporter progressReporter)
      throws IOException, InterruptedException {

    final Map<String, Path> finalResultMap =
        resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap;

    // only include the directory paths to tables
    Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
    // Inside a table, there are compaction.dir directories to skip. Otherwise, all else
    // should be regions.
    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
    final Vector<Exception> exceptions = new Vector<>();

    try {
      List<FileStatus> regionDirs =
          FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
      if (regionDirs == null) {
        return finalResultMap;
      }

      final List<Future<?>> futures = new ArrayList<>(regionDirs.size());

      for (FileStatus regionDir : regionDirs) {
        if (null != progressReporter) {
          progressReporter.progress(regionDir);
        }
        final Path dd = regionDir.getPath();

        if (!exceptions.isEmpty()) {
          break;
        }

        Runnable getRegionStoreFileMapCall = new Runnable() {
          @Override
          public void run() {
            try {
              HashMap<String, Path> regionStoreFileMap = new HashMap<>();
              List<FileStatus> familyDirs =
                  FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
              if (familyDirs == null) {
                if (!fs.exists(dd)) {
                  LOG.warn("Skipping region because it no longer exists: " + dd);
                } else {
                  LOG.warn("Skipping region because it has no family dirs: " + dd);
                }
                return;
              }
              for (FileStatus familyDir : familyDirs) {
                if (null != progressReporter) {
                  progressReporter.progress(familyDir);
                }
                Path family = familyDir.getPath();
                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
                  continue;
                }
                // now in family, iterate over the StoreFiles and
                // put in map
                FileStatus[] familyStatus = fs.listStatus(family);
                for (FileStatus sfStatus : familyStatus) {
                  if (null != progressReporter) {
                    progressReporter.progress(sfStatus);
                  }
                  Path sf = sfStatus.getPath();
                  if (sfFilter == null || sfFilter.accept(sf)) {
                    regionStoreFileMap.put(sf.getName(), sf);
                  }
                }
              }
              finalResultMap.putAll(regionStoreFileMap);
            } catch (Exception e) {
              LOG.error("Could not get region store file map for region: " + dd, e);
              exceptions.add(e);
            }
          }
        };

        // If executor is available, submit async tasks to exec concurrently, otherwise
        // just do serial sync execution
        if (executor != null) {
          Future<?> future = executor.submit(getRegionStoreFileMapCall);
          futures.add(future);
        } else {
          FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null);
          future.run();
          futures.add(future);
        }
      }

      // Ensure all pending tasks are complete (or that we run into an exception)
      for (Future<?> f : futures) {
        if (!exceptions.isEmpty()) {
          break;
        }
        try {
          f.get();
        } catch (ExecutionException e) {
          LOG.error("Unexpected exec exception! Should've been caught already. (Bug?)", e);
          // Shouldn't happen, we already logged/caught any exceptions in the Runnable
        }
      }
    } catch (IOException e) {
      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
      exceptions.add(e);
    } finally {
      if (!exceptions.isEmpty()) {
        // Just throw the first exception as an indication something bad happened
        // Don't need to propagate all the exceptions, we already logged them all anyway
        Throwables.propagateIfInstanceOf(exceptions.firstElement(), IOException.class);
        throw Throwables.propagate(exceptions.firstElement());
      }
    }

    return finalResultMap;
  }

  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
    int result = 0;
    try {
      for (Path familyDir : getFamilyDirs(fs, p)) {
        result += getReferenceFilePaths(fs, familyDir).size();
      }
    } catch (IOException e) {
      LOG.warn("Error counting reference files", e);
    }
    return result;
  }

  /**
   * Runs through the HBase rootdir and creates a reverse lookup map for
   * table StoreFile names to the full Path.
   * <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   *
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   */
  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
      final Path hbaseRootDir)
      throws IOException, InterruptedException {
    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, (ProgressReporter) null);
  }

  /**
   * Runs through the HBase rootdir and creates a reverse lookup map for
   * table StoreFile names to the full Path.
   * <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   *
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @param sfFilter optional path filter to apply to store files
   * @param executor optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to new region of
   *   family dir and for each store file.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   * @deprecated Since 2.3.0. Will be removed in hbase4. Use {@link
   *   #getTableStoreFilePathMap(FileSystem, Path, PathFilter, ExecutorService, ProgressReporter)}
   *   instead.
   */
  @Deprecated
  public static Map<String, Path> getTableStoreFilePathMap(final FileSystem fs,
      final Path hbaseRootDir, PathFilter sfFilter, ExecutorService executor,
      HbckErrorReporter progressReporter)
      throws IOException, InterruptedException {
    return getTableStoreFilePathMap(fs, hbaseRootDir, sfFilter, executor,
      new ProgressReporter() {
        @Override
        public void progress(FileStatus status) {
          // status is not used in this implementation.
          progressReporter.progress();
        }
      });
  }

  /**
   * Runs through the HBase rootdir and creates a reverse lookup map for
   * table StoreFile names to the full Path.
   * <br>
   * Example...<br>
   * Key = 3944417774205889744 <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
   *
   * @param fs The file system to use.
   * @param hbaseRootDir The root directory to scan.
   * @param sfFilter optional path filter to apply to store files
   * @param executor optional executor service to parallelize this operation
   * @param progressReporter Instance or null; gets called every time we move to new region of
   *   family dir and for each store file.
   * @return Map keyed by StoreFile name with a value of the full Path.
   * @throws IOException When scanning the directory fails.
   * @throws InterruptedException
   */
  public static Map<String, Path> getTableStoreFilePathMap(
      final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter,
      ExecutorService executor, ProgressReporter progressReporter)
      throws IOException, InterruptedException {
    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32);

    // if this method looks similar to 'getTableFragmentation' that is because
    // it was borrowed from it.

    // only include the directory paths to tables
    for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
      getTableStoreFilePathMap(map, fs, hbaseRootDir,
        FSUtils.getTableName(tableDir), sfFilter, executor, progressReporter);
    }
    return map;
  }

  /**
   * Filters FileStatuses in an array and returns a list
   *
   * @param input An array of FileStatuses
   * @param filter A required filter to filter the array
   * @return A list of FileStatuses
   */
  public static List<FileStatus> filterFileStatuses(FileStatus[] input,
      FileStatusFilter filter) {
    if (input == null) return null;
    return filterFileStatuses(Iterators.forArray(input), filter);
  }

  /**
   * Filters FileStatuses in an iterator and returns a list
   *
   * @param input An iterator of FileStatuses
   * @param filter A required filter to filter the array
   * @return A list of FileStatuses
   */
  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
      FileStatusFilter filter) {
    if (input == null) return null;
    ArrayList<FileStatus> results = new ArrayList<>();
    while (input.hasNext()) {
      FileStatus f = input.next();
      if (filter.accept(f)) {
        results.add(f);
      }
    }
    return results;
  }

  /**
   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
   * This accommodates differences between hadoop versions, where hadoop 1
   * does not throw a FileNotFoundException but returns an empty FileStatus[],
   * while Hadoop 2 will throw a FileNotFoundException.
   *
   * @param fs file system
   * @param dir directory
   * @param filter file status filter
   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
   */
  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs,
      final Path dir, final FileStatusFilter filter) throws IOException {
    FileStatus[] status = null;
    try {
      status = fs.listStatus(dir);
    } catch (FileNotFoundException fnfe) {
      LOG.trace("{} does not exist", dir);
      return null;
    }

    if (ArrayUtils.getLength(status) == 0) {
      return null;
    }

    if (filter == null) {
      return Arrays.asList(status);
    } else {
      List<FileStatus> status2 = filterFileStatuses(status, filter);
      if (status2 == null || status2.isEmpty()) {
        return null;
      } else {
        return status2;
      }
    }
  }

  /**
   * Throw an exception if an action is not permitted by a user on a file.
   *
   * @param ugi
   *          the user
   * @param file
   *          the file
   * @param action
   *          the action
   */
  public static void checkAccess(UserGroupInformation ugi, FileStatus file,
      FsAction action) throws AccessDeniedException {
    if (ugi.getShortUserName().equals(file.getOwner())) {
      if (file.getPermission().getUserAction().implies(action)) {
        return;
      }
    } else if (ArrayUtils.contains(ugi.getGroupNames(), file.getGroup())) {
      if (file.getPermission().getGroupAction().implies(action)) {
        return;
      }
    } else if (file.getPermission().getOtherAction().implies(action)) {
      return;
    }
    throw new AccessDeniedException("Permission denied:" + " action=" + action
        + " path=" + file.getPath() + " user=" + ugi.getShortUserName());
  }

  /**
   * This function is to scan the root path of the file system to get the
   * degree of locality for each region on each of the servers having at least
   * one block of that region.
   * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}
   *
   * @param conf
   *          the configuration to use
   * @return the mapping from region encoded name to a map of server names to
   *         locality fraction
   * @throws IOException
   *           in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
      final Configuration conf) throws IOException {
    return getRegionDegreeLocalityMappingFromFS(
      conf, null,
      conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));

  }
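
  // A hedged usage sketch (hypothetical table name, region hash and server name, not part of the
  // original class): the returned structure is regionEncodedName -> (serverName -> locality
  // fraction in [0, 1]), computed by the overload defined just below.
  //
  //   Map<String, Map<String, Float>> locality =
  //       FSUtils.getRegionDegreeLocalityMappingFromFS(conf, "my_table", 4);
  //   Map<String, Float> perServer =
  //       locality.getOrDefault("f0123456789abcdef0123456789abcde", Collections.emptyMap());
  //   Float fraction = perServer.get("regionserver-1.example.com");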

  /**
   * Scans the root path of the file system to get the degree of locality for each region on
   * each of the servers having at least one block of that region.
   * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer}
   *
   * @param conf the configuration to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
      final Configuration conf) throws IOException {
    return getRegionDegreeLocalityMappingFromFS(
        conf, null,
        conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE));
  }

  /**
   * Scans the root path of the file system to get the degree of locality for each region on
   * each of the servers having at least one block of that region.
   *
   * @param conf the configuration to use
   * @param desiredTable the table you wish to scan locality for
   * @param threadPoolSize the thread pool size to use
   * @return the mapping from region encoded name to a map of server names to locality fraction
   * @throws IOException in case of file system errors or interrupts
   */
  public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS(
      final Configuration conf, final String desiredTable, int threadPoolSize)
      throws IOException {
    Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>();
    getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize,
        regionDegreeLocalityMapping);
    return regionDegreeLocalityMapping;
  }
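
  // Illustrative sketch (not part of the original class): computing HDFS block locality for the
  // regions of one table. The table name "exampleTable" and the pool size of 4 are hypothetical.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   Map<String, Map<String, Float>> locality =
  //       FSUtils.getRegionDegreeLocalityMappingFromFS(conf, "exampleTable", 4);
  //   // locality.get(encodedRegionName).get(hostName) is the fraction (0.0 - 1.0) of that
  //   // region's HFile blocks that are stored locally on that host.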

  /**
   * Scans the root path of the file system to get either the mapping between the region name
   * and its best locality region server, or the degree of locality of each region on each of
   * the servers having at least one block of that region. The output map parameter is optional.
   *
   * @param conf the configuration to use
   * @param desiredTable the table you wish to scan locality for
   * @param threadPoolSize the thread pool size to use
   * @param regionDegreeLocalityMapping the map into which to put the locality degree mapping,
   *          or null; must be a thread-safe implementation
   * @throws IOException in case of file system errors or interrupts
   */
  private static void getRegionLocalityMappingFromFS(final Configuration conf,
      final String desiredTable, int threadPoolSize,
      final Map<String, Map<String, Float>> regionDegreeLocalityMapping) throws IOException {
    final FileSystem fs = FileSystem.get(conf);
    final Path rootPath = FSUtils.getRootDir(conf);
    final long startTime = EnvironmentEdgeManager.currentTime();
    final Path queryPath;
    // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/*
    if (null == desiredTable) {
      queryPath =
          new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/");
    } else {
      queryPath = new Path(
          FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/");
    }

    // reject all paths that are not appropriate
    PathFilter pathFilter = new PathFilter() {
      @Override
      public boolean accept(Path path) {
        // this is the region name; it may get some noise data
        if (null == path) {
          return false;
        }

        // no parent?
        Path parent = path.getParent();
        if (null == parent) {
          return false;
        }

        String regionName = path.getName();
        if (null == regionName) {
          return false;
        }

        if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
          return false;
        }
        return true;
      }
    };

    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Query Path: {} ; # list of files: {}", queryPath, Arrays.toString(statusList));
    }

    if (null == statusList) {
      return;
    }

    // lower the number of threads in case we have very few expected regions
    threadPoolSize = Math.min(threadPoolSize, statusList.length);

    // run in multiple threads
    final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
        Threads.newDaemonThreadFactory("FSRegionQuery"));
    try {
      // ignore all file status items that are not of interest
      for (FileStatus regionStatus : statusList) {
        if (null == regionStatus || !regionStatus.isDirectory()) {
          continue;
        }

        final Path regionPath = regionStatus.getPath();
        if (null != regionPath) {
          tpe.execute(new FSRegionScanner(fs, regionPath, null, regionDegreeLocalityMapping));
        }
      }
    } finally {
      tpe.shutdown();
      final long threadWakeFrequency = (long) conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
          HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
      try {
        // here we wait until TPE terminates, which is either naturally or by
        // exceptions in the execution of the threads
        while (!tpe.awaitTermination(threadWakeFrequency, TimeUnit.MILLISECONDS)) {
          // printing out rough estimate, so as to not introduce AtomicInteger
          LOG.info("Locality checking is underway: { Scanned Regions : "
              + ((ThreadPoolExecutor) tpe).getCompletedTaskCount() + "/"
              + ((ThreadPoolExecutor) tpe).getTaskCount() + " }");
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
    }

    long overhead = EnvironmentEdgeManager.currentTime() - startTime;
    LOG.info("Scanning DFS for locality info took {}ms", overhead);
  }

  /**
   * Do our short circuit read setup.
   * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
   * @param conf the configuration to check and update
   */
  public static void setupShortCircuitRead(final Configuration conf) {
    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
    boolean shortCircuitSkipChecksum =
        conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    if (shortCircuitSkipChecksum) {
      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
          "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
          "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
      assert !shortCircuitSkipChecksum; //this will fail if assertions are on
    }
    checkShortCircuitReadBufferSize(conf);
  }
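
  // Illustrative sketch (not part of the original class): letting HBase fill in the
  // short-circuit read buffer size before the Configuration is handed to HDFS clients.
  // The property values shown are examples only, not recommendations.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean("dfs.client.read.shortcircuit", true);
  //   // Leave "dfs.client.read.shortcircuit.skip.checksum" unset/false; HBase checksums cover it.
  //   FSUtils.setupShortCircuitRead(conf);
  //   // conf now carries "dfs.client.read.shortcircuit.buffer.size" if it was not already set.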

  /**
   * Check if short circuit read buffer size is set and if not, set it to hbase value.
   * @param conf the configuration to check and update
   */
  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
    final int notSet = -1;
    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
    int size = conf.getInt(dfsKey, notSet);
    // If a size is set, return -- we will use it.
    if (size != notSet) return;
    // But short circuit buffer size is normally not set. Put in place the hbase wanted size.
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }

  /**
   * @param c the configuration to use
   * @return The DFSClient DFSHedgedReadMetrics instance, or null if it can't be found or we are
   *         not running on HDFS.
   * @throws IOException if the file system cannot be obtained
   */
  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
      throws IOException {
    if (!isHDFS(c)) return null;
    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
    final String name = "getHedgedReadMetrics";
    DFSClient dfsclient = ((DistributedFileSystem) FileSystem.get(c)).getClient();
    Method m;
    try {
      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
          e.getMessage());
      return null;
    } catch (SecurityException e) {
      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
          e.getMessage());
      return null;
    }
    m.setAccessible(true);
    try {
      return (DFSHedgedReadMetrics) m.invoke(dfsclient);
    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
          e.getMessage());
      return null;
    }
  }

  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
      Configuration conf, int threads) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Future<Void>> futures = new ArrayList<>();
    List<Path> traversedPaths;
    try {
      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (ExecutionException | InterruptedException | IOException e) {
      throw new IOException("Copy snapshot reference files failed", e);
    } finally {
      pool.shutdownNow();
    }
    return traversedPaths;
  }
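
  // Illustrative sketch (not part of the original class): copying a directory tree between two
  // file systems with a small thread pool. The source and destination paths are hypothetical.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   FileSystem srcFs = FileSystem.get(conf);
  //   FileSystem dstFs = FileSystem.get(conf);
  //   List<Path> visited = FSUtils.copyFilesParallel(srcFs,
  //       new Path("/hbase/.hbase-snapshot/snapshot1"), dstFs, new Path("/backup/snapshot1"),
  //       conf, 4);
  //   // "visited" lists every destination path touched: directories created and files copied.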

  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
      Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
    List<Path> traversedPaths = new ArrayList<>();
    traversedPaths.add(dst);
    FileStatus currentFileStatus = srcFS.getFileStatus(src);
    if (currentFileStatus.isDirectory()) {
      if (!dstFS.mkdirs(dst)) {
        throw new IOException("Create directory failed: " + dst);
      }
      FileStatus[] subPaths = srcFS.listStatus(src);
      for (FileStatus subPath : subPaths) {
        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
            new Path(dst, subPath.getPath().getName()), conf, pool, futures));
      }
    } else {
      Future<Void> future = pool.submit(() -> {
        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
        return null;
      });
      futures.add(future);
    }
    return traversedPaths;
  }
}