001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.util; 020 021import edu.umd.cs.findbugs.annotations.CheckForNull; 022import java.io.ByteArrayInputStream; 023import java.io.DataInputStream; 024import java.io.EOFException; 025import java.io.FileNotFoundException; 026import java.io.IOException; 027import java.io.InputStream; 028import java.io.InterruptedIOException; 029import java.lang.reflect.InvocationTargetException; 030import java.lang.reflect.Method; 031import java.net.InetSocketAddress; 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.Collections; 035import java.util.HashMap; 036import java.util.Iterator; 037import java.util.LinkedList; 038import java.util.List; 039import java.util.Locale; 040import java.util.Map; 041import java.util.Vector; 042import java.util.concurrent.ArrayBlockingQueue; 043import java.util.concurrent.ConcurrentHashMap; 044import java.util.concurrent.ExecutionException; 045import java.util.concurrent.ExecutorService; 046import java.util.concurrent.Future; 047import java.util.concurrent.FutureTask; 048import java.util.concurrent.ThreadPoolExecutor; 049import java.util.concurrent.TimeUnit; 050import java.util.regex.Pattern; 051 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.fs.BlockLocation; 054import org.apache.hadoop.fs.FSDataInputStream; 055import org.apache.hadoop.fs.FSDataOutputStream; 056import org.apache.hadoop.fs.FileStatus; 057import org.apache.hadoop.fs.FileSystem; 058import org.apache.hadoop.fs.Path; 059import org.apache.hadoop.fs.PathFilter; 060import org.apache.hadoop.fs.permission.FsAction; 061import org.apache.hadoop.fs.permission.FsPermission; 062import org.apache.hadoop.hbase.ClusterId; 063import org.apache.hadoop.hbase.HColumnDescriptor; 064import org.apache.hadoop.hbase.HConstants; 065import org.apache.hadoop.hbase.HDFSBlocksDistribution; 066import org.apache.hadoop.hbase.HRegionInfo; 067import org.apache.hadoop.hbase.TableName; 068import org.apache.hadoop.hbase.client.RegionInfo; 069import org.apache.hadoop.hbase.client.RegionInfoBuilder; 070import org.apache.hadoop.hbase.exceptions.DeserializationException; 071import org.apache.hadoop.hbase.fs.HFileSystem; 072import org.apache.hadoop.hbase.io.HFileLink; 073import org.apache.hadoop.hbase.master.HMaster; 074import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 075import org.apache.hadoop.hbase.security.AccessDeniedException; 076import org.apache.hadoop.hdfs.DFSClient; 077import org.apache.hadoop.hdfs.DFSHedgedReadMetrics; 078import org.apache.hadoop.hdfs.DistributedFileSystem; 079import org.apache.hadoop.hdfs.protocol.HdfsConstants; 080import org.apache.hadoop.io.IOUtils; 081import org.apache.hadoop.ipc.RemoteException; 082import org.apache.hadoop.security.UserGroupInformation; 083import org.apache.hadoop.util.Progressable; 084import org.apache.hadoop.util.ReflectionUtils; 085import org.apache.hadoop.util.StringUtils; 086import org.apache.yetus.audience.InterfaceAudience; 087import org.slf4j.Logger; 088import org.slf4j.LoggerFactory; 089 090import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 091import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 092import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; 093import org.apache.hbase.thirdparty.com.google.common.primitives.Ints; 094 095import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 096import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos; 097 098/** 099 * Utility methods for interacting with the underlying file system. 100 */ 101@InterfaceAudience.Private 102public abstract class FSUtils extends CommonFSUtils { 103 private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class); 104 105 private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize"; 106 private static final int DEFAULT_THREAD_POOLSIZE = 2; 107 108 /** Set to true on Windows platforms */ 109 @VisibleForTesting // currently only used in testing. TODO refactor into a test class 110 public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows"); 111 112 protected FSUtils() { 113 super(); 114 } 115 116 /** 117 * @return True is <code>fs</code> is instance of DistributedFileSystem 118 * @throws IOException 119 */ 120 public static boolean isDistributedFileSystem(final FileSystem fs) throws IOException { 121 FileSystem fileSystem = fs; 122 // If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem. 123 // Check its backing fs for dfs-ness. 124 if (fs instanceof HFileSystem) { 125 fileSystem = ((HFileSystem)fs).getBackingFs(); 126 } 127 return fileSystem instanceof DistributedFileSystem; 128 } 129 130 /** 131 * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the 132 * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true. Does not consider 133 * schema; i.e. if schemas different but path or subpath matches, the two will equate. 134 * @param pathToSearch Path we will be trying to match. 135 * @param pathTail 136 * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code> 137 */ 138 public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) { 139 if (pathToSearch.depth() != pathTail.depth()) return false; 140 Path tailPath = pathTail; 141 String tailName; 142 Path toSearch = pathToSearch; 143 String toSearchName; 144 boolean result = false; 145 do { 146 tailName = tailPath.getName(); 147 if (tailName == null || tailName.length() <= 0) { 148 result = true; 149 break; 150 } 151 toSearchName = toSearch.getName(); 152 if (toSearchName == null || toSearchName.length() <= 0) break; 153 // Move up a parent on each path for next go around. Path doesn't let us go off the end. 154 tailPath = tailPath.getParent(); 155 toSearch = toSearch.getParent(); 156 } while(tailName.equals(toSearchName)); 157 return result; 158 } 159 160 public static FSUtils getInstance(FileSystem fs, Configuration conf) { 161 String scheme = fs.getUri().getScheme(); 162 if (scheme == null) { 163 LOG.warn("Could not find scheme for uri " + 164 fs.getUri() + ", default to hdfs"); 165 scheme = "hdfs"; 166 } 167 Class<?> fsUtilsClass = conf.getClass("hbase.fsutil." + 168 scheme + ".impl", FSHDFSUtils.class); // Default to HDFS impl 169 FSUtils fsUtils = (FSUtils)ReflectionUtils.newInstance(fsUtilsClass, conf); 170 return fsUtils; 171 } 172 173 /** 174 * Delete the region directory if exists. 175 * @param conf 176 * @param hri 177 * @return True if deleted the region directory. 178 * @throws IOException 179 */ 180 public static boolean deleteRegionDir(final Configuration conf, final HRegionInfo hri) 181 throws IOException { 182 Path rootDir = getRootDir(conf); 183 FileSystem fs = rootDir.getFileSystem(conf); 184 return deleteDirectory(fs, 185 new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName())); 186 } 187 188 /** 189 * Create the specified file on the filesystem. By default, this will: 190 * <ol> 191 * <li>overwrite the file if it exists</li> 192 * <li>apply the umask in the configuration (if it is enabled)</li> 193 * <li>use the fs configured buffer size (or 4096 if not set)</li> 194 * <li>use the configured column family replication or default replication if 195 * {@link HColumnDescriptor#DEFAULT_DFS_REPLICATION}</li> 196 * <li>use the default block size</li> 197 * <li>not track progress</li> 198 * </ol> 199 * @param conf configurations 200 * @param fs {@link FileSystem} on which to write the file 201 * @param path {@link Path} to the file to write 202 * @param perm permissions 203 * @param favoredNodes 204 * @return output stream to the created file 205 * @throws IOException if the file cannot be created 206 */ 207 public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path, 208 FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException { 209 if (fs instanceof HFileSystem) { 210 FileSystem backingFs = ((HFileSystem)fs).getBackingFs(); 211 if (backingFs instanceof DistributedFileSystem) { 212 // Try to use the favoredNodes version via reflection to allow backwards- 213 // compatibility. 214 short replication = Short.parseShort(conf.get(HColumnDescriptor.DFS_REPLICATION, 215 String.valueOf(HColumnDescriptor.DEFAULT_DFS_REPLICATION))); 216 try { 217 return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create", 218 Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class, 219 Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true, 220 getDefaultBufferSize(backingFs), 221 replication > 0 ? replication : getDefaultReplication(backingFs, path), 222 getDefaultBlockSize(backingFs, path), null, favoredNodes)); 223 } catch (InvocationTargetException ite) { 224 // Function was properly called, but threw it's own exception. 225 throw new IOException(ite.getCause()); 226 } catch (NoSuchMethodException e) { 227 LOG.debug("DFS Client does not support most favored nodes create; using default create"); 228 if (LOG.isTraceEnabled()) LOG.trace("Ignoring; use default create", e); 229 } catch (IllegalArgumentException e) { 230 LOG.debug("Ignoring (most likely Reflection related exception) " + e); 231 } catch (SecurityException e) { 232 LOG.debug("Ignoring (most likely Reflection related exception) " + e); 233 } catch (IllegalAccessException e) { 234 LOG.debug("Ignoring (most likely Reflection related exception) " + e); 235 } 236 } 237 } 238 return create(fs, path, perm, true); 239 } 240 241 /** 242 * Checks to see if the specified file system is available 243 * 244 * @param fs filesystem 245 * @throws IOException e 246 */ 247 public static void checkFileSystemAvailable(final FileSystem fs) 248 throws IOException { 249 if (!(fs instanceof DistributedFileSystem)) { 250 return; 251 } 252 IOException exception = null; 253 DistributedFileSystem dfs = (DistributedFileSystem) fs; 254 try { 255 if (dfs.exists(new Path("/"))) { 256 return; 257 } 258 } catch (IOException e) { 259 exception = e instanceof RemoteException ? 260 ((RemoteException)e).unwrapRemoteException() : e; 261 } 262 try { 263 fs.close(); 264 } catch (Exception e) { 265 LOG.error("file system close failed: ", e); 266 } 267 IOException io = new IOException("File system is not available"); 268 io.initCause(exception); 269 throw io; 270 } 271 272 /** 273 * We use reflection because {@link DistributedFileSystem#setSafeMode( 274 * HdfsConstants.SafeModeAction action, boolean isChecked)} is not in hadoop 1.1 275 * 276 * @param dfs 277 * @return whether we're in safe mode 278 * @throws IOException 279 */ 280 private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException { 281 boolean inSafeMode = false; 282 try { 283 Method m = DistributedFileSystem.class.getMethod("setSafeMode", new Class<?> []{ 284 org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.class, boolean.class}); 285 inSafeMode = (Boolean) m.invoke(dfs, 286 org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET, true); 287 } catch (Exception e) { 288 if (e instanceof IOException) throw (IOException) e; 289 290 // Check whether dfs is on safemode. 291 inSafeMode = dfs.setSafeMode( 292 org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET); 293 } 294 return inSafeMode; 295 } 296 297 /** 298 * Check whether dfs is in safemode. 299 * @param conf 300 * @throws IOException 301 */ 302 public static void checkDfsSafeMode(final Configuration conf) 303 throws IOException { 304 boolean isInSafeMode = false; 305 FileSystem fs = FileSystem.get(conf); 306 if (fs instanceof DistributedFileSystem) { 307 DistributedFileSystem dfs = (DistributedFileSystem)fs; 308 isInSafeMode = isInSafeMode(dfs); 309 } 310 if (isInSafeMode) { 311 throw new IOException("File system is in safemode, it can't be written now"); 312 } 313 } 314 315 /** 316 * Verifies current version of file system 317 * 318 * @param fs filesystem object 319 * @param rootdir root hbase directory 320 * @return null if no version file exists, version string otherwise. 321 * @throws IOException e 322 * @throws org.apache.hadoop.hbase.exceptions.DeserializationException 323 */ 324 public static String getVersion(FileSystem fs, Path rootdir) 325 throws IOException, DeserializationException { 326 Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 327 FileStatus[] status = null; 328 try { 329 // hadoop 2.0 throws FNFE if directory does not exist. 330 // hadoop 1.0 returns null if directory does not exist. 331 status = fs.listStatus(versionFile); 332 } catch (FileNotFoundException fnfe) { 333 return null; 334 } 335 if (status == null || status.length == 0) return null; 336 String version = null; 337 byte [] content = new byte [(int)status[0].getLen()]; 338 FSDataInputStream s = fs.open(versionFile); 339 try { 340 IOUtils.readFully(s, content, 0, content.length); 341 if (ProtobufUtil.isPBMagicPrefix(content)) { 342 version = parseVersionFrom(content); 343 } else { 344 // Presume it pre-pb format. 345 InputStream is = new ByteArrayInputStream(content); 346 DataInputStream dis = new DataInputStream(is); 347 try { 348 version = dis.readUTF(); 349 } finally { 350 dis.close(); 351 } 352 } 353 } catch (EOFException eof) { 354 LOG.warn("Version file was empty, odd, will try to set it."); 355 } finally { 356 s.close(); 357 } 358 return version; 359 } 360 361 /** 362 * Parse the content of the ${HBASE_ROOTDIR}/hbase.version file. 363 * @param bytes The byte content of the hbase.version file. 364 * @return The version found in the file as a String. 365 * @throws DeserializationException 366 */ 367 static String parseVersionFrom(final byte [] bytes) 368 throws DeserializationException { 369 ProtobufUtil.expectPBMagicPrefix(bytes); 370 int pblen = ProtobufUtil.lengthOfPBMagic(); 371 FSProtos.HBaseVersionFileContent.Builder builder = 372 FSProtos.HBaseVersionFileContent.newBuilder(); 373 try { 374 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); 375 return builder.getVersion(); 376 } catch (IOException e) { 377 // Convert 378 throw new DeserializationException(e); 379 } 380 } 381 382 /** 383 * Create the content to write into the ${HBASE_ROOTDIR}/hbase.version file. 384 * @param version Version to persist 385 * @return Serialized protobuf with <code>version</code> content and a bit of pb magic for a prefix. 386 */ 387 static byte [] toVersionByteArray(final String version) { 388 FSProtos.HBaseVersionFileContent.Builder builder = 389 FSProtos.HBaseVersionFileContent.newBuilder(); 390 return ProtobufUtil.prependPBMagic(builder.setVersion(version).build().toByteArray()); 391 } 392 393 /** 394 * Verifies current version of file system 395 * 396 * @param fs file system 397 * @param rootdir root directory of HBase installation 398 * @param message if true, issues a message on System.out 399 * 400 * @throws IOException e 401 * @throws DeserializationException 402 */ 403 public static void checkVersion(FileSystem fs, Path rootdir, boolean message) 404 throws IOException, DeserializationException { 405 checkVersion(fs, rootdir, message, 0, HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 406 } 407 408 /** 409 * Verifies current version of file system 410 * 411 * @param fs file system 412 * @param rootdir root directory of HBase installation 413 * @param message if true, issues a message on System.out 414 * @param wait wait interval 415 * @param retries number of times to retry 416 * 417 * @throws IOException e 418 * @throws DeserializationException 419 */ 420 public static void checkVersion(FileSystem fs, Path rootdir, 421 boolean message, int wait, int retries) 422 throws IOException, DeserializationException { 423 String version = getVersion(fs, rootdir); 424 String msg; 425 if (version == null) { 426 if (!metaRegionExists(fs, rootdir)) { 427 // rootDir is empty (no version file and no root region) 428 // just create new version file (HBASE-1195) 429 setVersion(fs, rootdir, wait, retries); 430 return; 431 } else { 432 msg = "hbase.version file is missing. Is your hbase.rootdir valid? " + 433 "You can restore hbase.version file by running 'HBCK2 filesystem -fix'. " + 434 "See https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2"; 435 } 436 } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) { 437 return; 438 } else { 439 msg = "HBase file layout needs to be upgraded. Current filesystem version is " + version + 440 " but software requires version " + HConstants.FILE_SYSTEM_VERSION + 441 ". Consult http://hbase.apache.org/book.html for further information about " + 442 "upgrading HBase."; 443 } 444 445 // version is deprecated require migration 446 // Output on stdout so user sees it in terminal. 447 if (message) { 448 System.out.println("WARNING! " + msg); 449 } 450 throw new FileSystemVersionException(msg); 451 } 452 453 /** 454 * Sets version of file system 455 * 456 * @param fs filesystem object 457 * @param rootdir hbase root 458 * @throws IOException e 459 */ 460 public static void setVersion(FileSystem fs, Path rootdir) 461 throws IOException { 462 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0, 463 HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS); 464 } 465 466 /** 467 * Sets version of file system 468 * 469 * @param fs filesystem object 470 * @param rootdir hbase root 471 * @param wait time to wait for retry 472 * @param retries number of times to retry before failing 473 * @throws IOException e 474 */ 475 public static void setVersion(FileSystem fs, Path rootdir, int wait, int retries) 476 throws IOException { 477 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait, retries); 478 } 479 480 481 /** 482 * Sets version of file system 483 * 484 * @param fs filesystem object 485 * @param rootdir hbase root directory 486 * @param version version to set 487 * @param wait time to wait for retry 488 * @param retries number of times to retry before throwing an IOException 489 * @throws IOException e 490 */ 491 public static void setVersion(FileSystem fs, Path rootdir, String version, 492 int wait, int retries) throws IOException { 493 Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); 494 Path tempVersionFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + Path.SEPARATOR + 495 HConstants.VERSION_FILE_NAME); 496 while (true) { 497 try { 498 // Write the version to a temporary file 499 FSDataOutputStream s = fs.create(tempVersionFile); 500 try { 501 s.write(toVersionByteArray(version)); 502 s.close(); 503 s = null; 504 // Move the temp version file to its normal location. Returns false 505 // if the rename failed. Throw an IOE in that case. 506 if (!fs.rename(tempVersionFile, versionFile)) { 507 throw new IOException("Unable to move temp version file to " + versionFile); 508 } 509 } finally { 510 // Cleaning up the temporary if the rename failed would be trying 511 // too hard. We'll unconditionally create it again the next time 512 // through anyway, files are overwritten by default by create(). 513 514 // Attempt to close the stream on the way out if it is still open. 515 try { 516 if (s != null) s.close(); 517 } catch (IOException ignore) { } 518 } 519 LOG.info("Created version file at " + rootdir.toString() + " with version=" + version); 520 return; 521 } catch (IOException e) { 522 if (retries > 0) { 523 LOG.debug("Unable to create version file at " + rootdir.toString() + ", retrying", e); 524 fs.delete(versionFile, false); 525 try { 526 if (wait > 0) { 527 Thread.sleep(wait); 528 } 529 } catch (InterruptedException ie) { 530 throw (InterruptedIOException)new InterruptedIOException().initCause(ie); 531 } 532 retries--; 533 } else { 534 throw e; 535 } 536 } 537 } 538 } 539 540 /** 541 * Checks that a cluster ID file exists in the HBase root directory 542 * @param fs the root directory FileSystem 543 * @param rootdir the HBase root directory in HDFS 544 * @param wait how long to wait between retries 545 * @return <code>true</code> if the file exists, otherwise <code>false</code> 546 * @throws IOException if checking the FileSystem fails 547 */ 548 public static boolean checkClusterIdExists(FileSystem fs, Path rootdir, 549 int wait) throws IOException { 550 while (true) { 551 try { 552 Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 553 return fs.exists(filePath); 554 } catch (IOException ioe) { 555 if (wait > 0) { 556 LOG.warn("Unable to check cluster ID file in " + rootdir.toString() + 557 ", retrying in "+wait+"msec: "+StringUtils.stringifyException(ioe)); 558 try { 559 Thread.sleep(wait); 560 } catch (InterruptedException e) { 561 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 562 } 563 } else { 564 throw ioe; 565 } 566 } 567 } 568 } 569 570 /** 571 * Returns the value of the unique cluster ID stored for this HBase instance. 572 * @param fs the root directory FileSystem 573 * @param rootdir the path to the HBase root directory 574 * @return the unique cluster identifier 575 * @throws IOException if reading the cluster ID file fails 576 */ 577 public static ClusterId getClusterId(FileSystem fs, Path rootdir) 578 throws IOException { 579 Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 580 ClusterId clusterId = null; 581 FileStatus status = fs.exists(idPath)? fs.getFileStatus(idPath): null; 582 if (status != null) { 583 int len = Ints.checkedCast(status.getLen()); 584 byte [] content = new byte[len]; 585 FSDataInputStream in = fs.open(idPath); 586 try { 587 in.readFully(content); 588 } catch (EOFException eof) { 589 LOG.warn("Cluster ID file " + idPath.toString() + " was empty"); 590 } finally{ 591 in.close(); 592 } 593 try { 594 clusterId = ClusterId.parseFrom(content); 595 } catch (DeserializationException e) { 596 throw new IOException("content=" + Bytes.toString(content), e); 597 } 598 // If not pb'd, make it so. 599 if (!ProtobufUtil.isPBMagicPrefix(content)) { 600 String cid = null; 601 in = fs.open(idPath); 602 try { 603 cid = in.readUTF(); 604 clusterId = new ClusterId(cid); 605 } catch (EOFException eof) { 606 LOG.warn("Cluster ID file " + idPath.toString() + " was empty"); 607 } finally { 608 in.close(); 609 } 610 rewriteAsPb(fs, rootdir, idPath, clusterId); 611 } 612 return clusterId; 613 } else { 614 LOG.warn("Cluster ID file does not exist at " + idPath.toString()); 615 } 616 return clusterId; 617 } 618 619 /** 620 * @param cid 621 * @throws IOException 622 */ 623 private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p, 624 final ClusterId cid) 625 throws IOException { 626 // Rewrite the file as pb. Move aside the old one first, write new 627 // then delete the moved-aside file. 628 Path movedAsideName = new Path(p + "." + System.currentTimeMillis()); 629 if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p); 630 setClusterId(fs, rootdir, cid, 100); 631 if (!fs.delete(movedAsideName, false)) { 632 throw new IOException("Failed delete of " + movedAsideName); 633 } 634 LOG.debug("Rewrote the hbase.id file as pb"); 635 } 636 637 /** 638 * Writes a new unique identifier for this cluster to the "hbase.id" file 639 * in the HBase root directory 640 * @param fs the root directory FileSystem 641 * @param rootdir the path to the HBase root directory 642 * @param clusterId the unique identifier to store 643 * @param wait how long (in milliseconds) to wait between retries 644 * @throws IOException if writing to the FileSystem fails and no wait value 645 */ 646 public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId, 647 int wait) throws IOException { 648 while (true) { 649 try { 650 Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME); 651 Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY + 652 Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME); 653 // Write the id file to a temporary location 654 FSDataOutputStream s = fs.create(tempIdFile); 655 try { 656 s.write(clusterId.toByteArray()); 657 s.close(); 658 s = null; 659 // Move the temporary file to its normal location. Throw an IOE if 660 // the rename failed 661 if (!fs.rename(tempIdFile, idFile)) { 662 throw new IOException("Unable to move temp version file to " + idFile); 663 } 664 } finally { 665 // Attempt to close the stream if still open on the way out 666 try { 667 if (s != null) s.close(); 668 } catch (IOException ignore) { } 669 } 670 if (LOG.isDebugEnabled()) { 671 LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId); 672 } 673 return; 674 } catch (IOException ioe) { 675 if (wait > 0) { 676 LOG.warn("Unable to create cluster ID file in " + rootdir.toString() + 677 ", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe)); 678 try { 679 Thread.sleep(wait); 680 } catch (InterruptedException e) { 681 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 682 } 683 } else { 684 throw ioe; 685 } 686 } 687 } 688 } 689 690 /** 691 * If DFS, check safe mode and if so, wait until we clear it. 692 * @param conf configuration 693 * @param wait Sleep between retries 694 * @throws IOException e 695 */ 696 public static void waitOnSafeMode(final Configuration conf, 697 final long wait) 698 throws IOException { 699 FileSystem fs = FileSystem.get(conf); 700 if (!(fs instanceof DistributedFileSystem)) return; 701 DistributedFileSystem dfs = (DistributedFileSystem)fs; 702 // Make sure dfs is not in safe mode 703 while (isInSafeMode(dfs)) { 704 LOG.info("Waiting for dfs to exit safe mode..."); 705 try { 706 Thread.sleep(wait); 707 } catch (InterruptedException e) { 708 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 709 } 710 } 711 } 712 713 /** 714 * Checks if meta region exists 715 * @param fs file system 716 * @param rootDir root directory of HBase installation 717 * @return true if exists 718 */ 719 public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOException { 720 Path metaRegionDir = getRegionDirFromRootDir(rootDir, RegionInfoBuilder.FIRST_META_REGIONINFO); 721 return fs.exists(metaRegionDir); 722 } 723 724 /** 725 * Compute HDFS blocks distribution of a given file, or a portion of the file 726 * @param fs file system 727 * @param status file status of the file 728 * @param start start position of the portion 729 * @param length length of the portion 730 * @return The HDFS blocks distribution 731 */ 732 static public HDFSBlocksDistribution computeHDFSBlocksDistribution( 733 final FileSystem fs, FileStatus status, long start, long length) 734 throws IOException { 735 HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution(); 736 BlockLocation [] blockLocations = 737 fs.getFileBlockLocations(status, start, length); 738 for(BlockLocation bl : blockLocations) { 739 String [] hosts = bl.getHosts(); 740 long len = bl.getLength(); 741 blocksDistribution.addHostsAndBlockWeight(hosts, len); 742 } 743 744 return blocksDistribution; 745 } 746 747 /** 748 * Update blocksDistribution with blockLocations 749 * @param blocksDistribution the hdfs blocks distribution 750 * @param blockLocations an array containing block location 751 */ 752 static public void addToHDFSBlocksDistribution( 753 HDFSBlocksDistribution blocksDistribution, BlockLocation[] blockLocations) 754 throws IOException { 755 for (BlockLocation bl : blockLocations) { 756 String[] hosts = bl.getHosts(); 757 long len = bl.getLength(); 758 blocksDistribution.addHostsAndBlockWeight(hosts, len); 759 } 760 } 761 762 // TODO move this method OUT of FSUtils. No dependencies to HMaster 763 /** 764 * Returns the total overall fragmentation percentage. Includes hbase:meta and 765 * -ROOT- as well. 766 * 767 * @param master The master defining the HBase root and file system. 768 * @return A map for each table and its percentage. 769 * @throws IOException When scanning the directory fails. 770 */ 771 public static int getTotalTableFragmentation(final HMaster master) 772 throws IOException { 773 Map<String, Integer> map = getTableFragmentation(master); 774 return map != null && map.size() > 0 ? map.get("-TOTAL-") : -1; 775 } 776 777 /** 778 * Runs through the HBase rootdir and checks how many stores for each table 779 * have more than one file in them. Checks -ROOT- and hbase:meta too. The total 780 * percentage across all tables is stored under the special key "-TOTAL-". 781 * 782 * @param master The master defining the HBase root and file system. 783 * @return A map for each table and its percentage. 784 * 785 * @throws IOException When scanning the directory fails. 786 */ 787 public static Map<String, Integer> getTableFragmentation( 788 final HMaster master) 789 throws IOException { 790 Path path = getRootDir(master.getConfiguration()); 791 // since HMaster.getFileSystem() is package private 792 FileSystem fs = path.getFileSystem(master.getConfiguration()); 793 return getTableFragmentation(fs, path); 794 } 795 796 /** 797 * Runs through the HBase rootdir and checks how many stores for each table 798 * have more than one file in them. Checks -ROOT- and hbase:meta too. The total 799 * percentage across all tables is stored under the special key "-TOTAL-". 800 * 801 * @param fs The file system to use. 802 * @param hbaseRootDir The root directory to scan. 803 * @return A map for each table and its percentage. 804 * @throws IOException When scanning the directory fails. 805 */ 806 public static Map<String, Integer> getTableFragmentation( 807 final FileSystem fs, final Path hbaseRootDir) 808 throws IOException { 809 Map<String, Integer> frags = new HashMap<>(); 810 int cfCountTotal = 0; 811 int cfFragTotal = 0; 812 PathFilter regionFilter = new RegionDirFilter(fs); 813 PathFilter familyFilter = new FamilyDirFilter(fs); 814 List<Path> tableDirs = getTableDirs(fs, hbaseRootDir); 815 for (Path d : tableDirs) { 816 int cfCount = 0; 817 int cfFrag = 0; 818 FileStatus[] regionDirs = fs.listStatus(d, regionFilter); 819 for (FileStatus regionDir : regionDirs) { 820 Path dd = regionDir.getPath(); 821 // else its a region name, now look in region for families 822 FileStatus[] familyDirs = fs.listStatus(dd, familyFilter); 823 for (FileStatus familyDir : familyDirs) { 824 cfCount++; 825 cfCountTotal++; 826 Path family = familyDir.getPath(); 827 // now in family make sure only one file 828 FileStatus[] familyStatus = fs.listStatus(family); 829 if (familyStatus.length > 1) { 830 cfFrag++; 831 cfFragTotal++; 832 } 833 } 834 } 835 // compute percentage per table and store in result list 836 frags.put(FSUtils.getTableName(d).getNameAsString(), 837 cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100)); 838 } 839 // set overall percentage for all tables 840 frags.put("-TOTAL-", 841 cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100)); 842 return frags; 843 } 844 845 /** 846 * A {@link PathFilter} that returns only regular files. 847 */ 848 static class FileFilter extends AbstractFileStatusFilter { 849 private final FileSystem fs; 850 851 public FileFilter(final FileSystem fs) { 852 this.fs = fs; 853 } 854 855 @Override 856 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 857 try { 858 return isFile(fs, isDir, p); 859 } catch (IOException e) { 860 LOG.warn("unable to verify if path=" + p + " is a regular file", e); 861 return false; 862 } 863 } 864 } 865 866 /** 867 * Directory filter that doesn't include any of the directories in the specified blacklist 868 */ 869 public static class BlackListDirFilter extends AbstractFileStatusFilter { 870 private final FileSystem fs; 871 private List<String> blacklist; 872 873 /** 874 * Create a filter on the givem filesystem with the specified blacklist 875 * @param fs filesystem to filter 876 * @param directoryNameBlackList list of the names of the directories to filter. If 877 * <tt>null</tt>, all directories are returned 878 */ 879 @SuppressWarnings("unchecked") 880 public BlackListDirFilter(final FileSystem fs, final List<String> directoryNameBlackList) { 881 this.fs = fs; 882 blacklist = 883 (List<String>) (directoryNameBlackList == null ? Collections.emptyList() 884 : directoryNameBlackList); 885 } 886 887 @Override 888 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 889 if (!isValidName(p.getName())) { 890 return false; 891 } 892 893 try { 894 return isDirectory(fs, isDir, p); 895 } catch (IOException e) { 896 LOG.warn("An error occurred while verifying if [" + p.toString() 897 + "] is a valid directory. Returning 'not valid' and continuing.", e); 898 return false; 899 } 900 } 901 902 protected boolean isValidName(final String name) { 903 return !blacklist.contains(name); 904 } 905 } 906 907 /** 908 * A {@link PathFilter} that only allows directories. 909 */ 910 public static class DirFilter extends BlackListDirFilter { 911 912 public DirFilter(FileSystem fs) { 913 super(fs, null); 914 } 915 } 916 917 /** 918 * A {@link PathFilter} that returns usertable directories. To get all directories use the 919 * {@link BlackListDirFilter} with a <tt>null</tt> blacklist 920 */ 921 public static class UserTableDirFilter extends BlackListDirFilter { 922 public UserTableDirFilter(FileSystem fs) { 923 super(fs, HConstants.HBASE_NON_TABLE_DIRS); 924 } 925 926 @Override 927 protected boolean isValidName(final String name) { 928 if (!super.isValidName(name)) 929 return false; 930 931 try { 932 TableName.isLegalTableQualifierName(Bytes.toBytes(name)); 933 } catch (IllegalArgumentException e) { 934 LOG.info("INVALID NAME " + name); 935 return false; 936 } 937 return true; 938 } 939 } 940 941 /** 942 * Recover file lease. Used when a file might be suspect 943 * to be had been left open by another process. 944 * @param fs FileSystem handle 945 * @param p Path of file to recover lease 946 * @param conf Configuration handle 947 * @throws IOException 948 */ 949 public abstract void recoverFileLease(final FileSystem fs, final Path p, 950 Configuration conf, CancelableProgressable reporter) throws IOException; 951 952 public static List<Path> getTableDirs(final FileSystem fs, final Path rootdir) 953 throws IOException { 954 List<Path> tableDirs = new LinkedList<>(); 955 956 for(FileStatus status : 957 fs.globStatus(new Path(rootdir, 958 new Path(HConstants.BASE_NAMESPACE_DIR, "*")))) { 959 tableDirs.addAll(FSUtils.getLocalTableDirs(fs, status.getPath())); 960 } 961 return tableDirs; 962 } 963 964 /** 965 * @param fs 966 * @param rootdir 967 * @return All the table directories under <code>rootdir</code>. Ignore non table hbase folders such as 968 * .logs, .oldlogs, .corrupt folders. 969 * @throws IOException 970 */ 971 public static List<Path> getLocalTableDirs(final FileSystem fs, final Path rootdir) 972 throws IOException { 973 // presumes any directory under hbase.rootdir is a table 974 FileStatus[] dirs = fs.listStatus(rootdir, new UserTableDirFilter(fs)); 975 List<Path> tabledirs = new ArrayList<>(dirs.length); 976 for (FileStatus dir: dirs) { 977 tabledirs.add(dir.getPath()); 978 } 979 return tabledirs; 980 } 981 982 /** 983 * Filter for all dirs that don't start with '.' 984 */ 985 public static class RegionDirFilter extends AbstractFileStatusFilter { 986 // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names. 987 final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$"); 988 final FileSystem fs; 989 990 public RegionDirFilter(FileSystem fs) { 991 this.fs = fs; 992 } 993 994 @Override 995 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 996 if (!regionDirPattern.matcher(p.getName()).matches()) { 997 return false; 998 } 999 1000 try { 1001 return isDirectory(fs, isDir, p); 1002 } catch (IOException ioe) { 1003 // Maybe the file was moved or the fs was disconnected. 1004 LOG.warn("Skipping file " + p +" due to IOException", ioe); 1005 return false; 1006 } 1007 } 1008 } 1009 1010 /** 1011 * Given a particular table dir, return all the regiondirs inside it, excluding files such as 1012 * .tableinfo 1013 * @param fs A file system for the Path 1014 * @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir> 1015 * @return List of paths to valid region directories in table dir. 1016 * @throws IOException 1017 */ 1018 public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException { 1019 // assumes we are in a table dir. 1020 List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 1021 if (rds == null) { 1022 return new ArrayList<>(); 1023 } 1024 List<Path> regionDirs = new ArrayList<>(rds.size()); 1025 for (FileStatus rdfs: rds) { 1026 Path rdPath = rdfs.getPath(); 1027 regionDirs.add(rdPath); 1028 } 1029 return regionDirs; 1030 } 1031 1032 public static Path getRegionDirFromRootDir(Path rootDir, RegionInfo region) { 1033 return getRegionDirFromTableDir(getTableDir(rootDir, region.getTable()), region); 1034 } 1035 1036 public static Path getRegionDirFromTableDir(Path tableDir, RegionInfo region) { 1037 return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName()); 1038 } 1039 1040 /** 1041 * Filter for all dirs that are legal column family names. This is generally used for colfam 1042 * dirs <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>. 1043 */ 1044 public static class FamilyDirFilter extends AbstractFileStatusFilter { 1045 final FileSystem fs; 1046 1047 public FamilyDirFilter(FileSystem fs) { 1048 this.fs = fs; 1049 } 1050 1051 @Override 1052 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1053 try { 1054 // throws IAE if invalid 1055 HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName())); 1056 } catch (IllegalArgumentException iae) { 1057 // path name is an invalid family name and thus is excluded. 1058 return false; 1059 } 1060 1061 try { 1062 return isDirectory(fs, isDir, p); 1063 } catch (IOException ioe) { 1064 // Maybe the file was moved or the fs was disconnected. 1065 LOG.warn("Skipping file " + p +" due to IOException", ioe); 1066 return false; 1067 } 1068 } 1069 } 1070 1071 /** 1072 * Given a particular region dir, return all the familydirs inside it 1073 * 1074 * @param fs A file system for the Path 1075 * @param regionDir Path to a specific region directory 1076 * @return List of paths to valid family directories in region dir. 1077 * @throws IOException 1078 */ 1079 public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException { 1080 // assumes we are in a region dir. 1081 FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs)); 1082 List<Path> familyDirs = new ArrayList<>(fds.length); 1083 for (FileStatus fdfs: fds) { 1084 Path fdPath = fdfs.getPath(); 1085 familyDirs.add(fdPath); 1086 } 1087 return familyDirs; 1088 } 1089 1090 public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException { 1091 List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs)); 1092 if (fds == null) { 1093 return new ArrayList<>(); 1094 } 1095 List<Path> referenceFiles = new ArrayList<>(fds.size()); 1096 for (FileStatus fdfs: fds) { 1097 Path fdPath = fdfs.getPath(); 1098 referenceFiles.add(fdPath); 1099 } 1100 return referenceFiles; 1101 } 1102 1103 /** 1104 * Filter for HFiles that excludes reference files. 1105 */ 1106 public static class HFileFilter extends AbstractFileStatusFilter { 1107 final FileSystem fs; 1108 1109 public HFileFilter(FileSystem fs) { 1110 this.fs = fs; 1111 } 1112 1113 @Override 1114 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1115 if (!StoreFileInfo.isHFile(p)) { 1116 return false; 1117 } 1118 1119 try { 1120 return isFile(fs, isDir, p); 1121 } catch (IOException ioe) { 1122 // Maybe the file was moved or the fs was disconnected. 1123 LOG.warn("Skipping file " + p +" due to IOException", ioe); 1124 return false; 1125 } 1126 } 1127 } 1128 1129 /** 1130 * Filter for HFileLinks (StoreFiles and HFiles not included). 1131 * the filter itself does not consider if a link is file or not. 1132 */ 1133 public static class HFileLinkFilter implements PathFilter { 1134 1135 @Override 1136 public boolean accept(Path p) { 1137 return HFileLink.isHFileLink(p); 1138 } 1139 } 1140 1141 public static class ReferenceFileFilter extends AbstractFileStatusFilter { 1142 1143 private final FileSystem fs; 1144 1145 public ReferenceFileFilter(FileSystem fs) { 1146 this.fs = fs; 1147 } 1148 1149 @Override 1150 protected boolean accept(Path p, @CheckForNull Boolean isDir) { 1151 if (!StoreFileInfo.isReference(p)) { 1152 return false; 1153 } 1154 1155 try { 1156 // only files can be references. 1157 return isFile(fs, isDir, p); 1158 } catch (IOException ioe) { 1159 // Maybe the file was moved or the fs was disconnected. 1160 LOG.warn("Skipping file " + p +" due to IOException", ioe); 1161 return false; 1162 } 1163 } 1164 } 1165 1166 /** 1167 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for 1168 * table StoreFile names to the full Path. 1169 * <br> 1170 * Example...<br> 1171 * Key = 3944417774205889744 <br> 1172 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1173 * 1174 * @param map map to add values. If null, this method will create and populate one to return 1175 * @param fs The file system to use. 1176 * @param hbaseRootDir The root directory to scan. 1177 * @param tableName name of the table to scan. 1178 * @return Map keyed by StoreFile name with a value of the full Path. 1179 * @throws IOException When scanning the directory fails. 1180 * @throws InterruptedException 1181 */ 1182 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map, 1183 final FileSystem fs, final Path hbaseRootDir, TableName tableName) 1184 throws IOException, InterruptedException { 1185 return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, null); 1186 } 1187 1188 /** 1189 * Runs through the HBase rootdir/tablename and creates a reverse lookup map for 1190 * table StoreFile names to the full Path. Note that because this method can be called 1191 * on a 'live' HBase system that we will skip files that no longer exist by the time 1192 * we traverse them and similarly the user of the result needs to consider that some 1193 * entries in this map may not exist by the time this call completes. 1194 * <br> 1195 * Example...<br> 1196 * Key = 3944417774205889744 <br> 1197 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1198 * 1199 * @param resultMap map to add values. If null, this method will create and populate one to return 1200 * @param fs The file system to use. 1201 * @param hbaseRootDir The root directory to scan. 1202 * @param tableName name of the table to scan. 1203 * @param sfFilter optional path filter to apply to store files 1204 * @param executor optional executor service to parallelize this operation 1205 * @param errors ErrorReporter instance or null 1206 * @return Map keyed by StoreFile name with a value of the full Path. 1207 * @throws IOException When scanning the directory fails. 1208 * @throws InterruptedException 1209 */ 1210 public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> resultMap, 1211 final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, 1212 ExecutorService executor, final HbckErrorReporter errors) 1213 throws IOException, InterruptedException { 1214 1215 final Map<String, Path> finalResultMap = 1216 resultMap == null ? new ConcurrentHashMap<>(128, 0.75f, 32) : resultMap; 1217 1218 // only include the directory paths to tables 1219 Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName); 1220 // Inside a table, there are compaction.dir directories to skip. Otherwise, all else 1221 // should be regions. 1222 final FamilyDirFilter familyFilter = new FamilyDirFilter(fs); 1223 final Vector<Exception> exceptions = new Vector<>(); 1224 1225 try { 1226 List<FileStatus> regionDirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); 1227 if (regionDirs == null) { 1228 return finalResultMap; 1229 } 1230 1231 final List<Future<?>> futures = new ArrayList<>(regionDirs.size()); 1232 1233 for (FileStatus regionDir : regionDirs) { 1234 if (null != errors) { 1235 errors.progress(); 1236 } 1237 final Path dd = regionDir.getPath(); 1238 1239 if (!exceptions.isEmpty()) { 1240 break; 1241 } 1242 1243 Runnable getRegionStoreFileMapCall = new Runnable() { 1244 @Override 1245 public void run() { 1246 try { 1247 HashMap<String,Path> regionStoreFileMap = new HashMap<>(); 1248 List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter); 1249 if (familyDirs == null) { 1250 if (!fs.exists(dd)) { 1251 LOG.warn("Skipping region because it no longer exists: " + dd); 1252 } else { 1253 LOG.warn("Skipping region because it has no family dirs: " + dd); 1254 } 1255 return; 1256 } 1257 for (FileStatus familyDir : familyDirs) { 1258 if (null != errors) { 1259 errors.progress(); 1260 } 1261 Path family = familyDir.getPath(); 1262 if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) { 1263 continue; 1264 } 1265 // now in family, iterate over the StoreFiles and 1266 // put in map 1267 FileStatus[] familyStatus = fs.listStatus(family); 1268 for (FileStatus sfStatus : familyStatus) { 1269 if (null != errors) { 1270 errors.progress(); 1271 } 1272 Path sf = sfStatus.getPath(); 1273 if (sfFilter == null || sfFilter.accept(sf)) { 1274 regionStoreFileMap.put( sf.getName(), sf); 1275 } 1276 } 1277 } 1278 finalResultMap.putAll(regionStoreFileMap); 1279 } catch (Exception e) { 1280 LOG.error("Could not get region store file map for region: " + dd, e); 1281 exceptions.add(e); 1282 } 1283 } 1284 }; 1285 1286 // If executor is available, submit async tasks to exec concurrently, otherwise 1287 // just do serial sync execution 1288 if (executor != null) { 1289 Future<?> future = executor.submit(getRegionStoreFileMapCall); 1290 futures.add(future); 1291 } else { 1292 FutureTask<?> future = new FutureTask<>(getRegionStoreFileMapCall, null); 1293 future.run(); 1294 futures.add(future); 1295 } 1296 } 1297 1298 // Ensure all pending tasks are complete (or that we run into an exception) 1299 for (Future<?> f : futures) { 1300 if (!exceptions.isEmpty()) { 1301 break; 1302 } 1303 try { 1304 f.get(); 1305 } catch (ExecutionException e) { 1306 LOG.error("Unexpected exec exception! Should've been caught already. (Bug?)", e); 1307 // Shouldn't happen, we already logged/caught any exceptions in the Runnable 1308 } 1309 } 1310 } catch (IOException e) { 1311 LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e); 1312 exceptions.add(e); 1313 } finally { 1314 if (!exceptions.isEmpty()) { 1315 // Just throw the first exception as an indication something bad happened 1316 // Don't need to propagate all the exceptions, we already logged them all anyway 1317 Throwables.propagateIfInstanceOf(exceptions.firstElement(), IOException.class); 1318 throw Throwables.propagate(exceptions.firstElement()); 1319 } 1320 } 1321 1322 return finalResultMap; 1323 } 1324 1325 public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) { 1326 int result = 0; 1327 try { 1328 for (Path familyDir:getFamilyDirs(fs, p)){ 1329 result += getReferenceFilePaths(fs, familyDir).size(); 1330 } 1331 } catch (IOException e) { 1332 LOG.warn("Error Counting reference files.", e); 1333 } 1334 return result; 1335 } 1336 1337 /** 1338 * Runs through the HBase rootdir and creates a reverse lookup map for 1339 * table StoreFile names to the full Path. 1340 * <br> 1341 * Example...<br> 1342 * Key = 3944417774205889744 <br> 1343 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1344 * 1345 * @param fs The file system to use. 1346 * @param hbaseRootDir The root directory to scan. 1347 * @return Map keyed by StoreFile name with a value of the full Path. 1348 * @throws IOException When scanning the directory fails. 1349 * @throws InterruptedException 1350 */ 1351 public static Map<String, Path> getTableStoreFilePathMap( 1352 final FileSystem fs, final Path hbaseRootDir) 1353 throws IOException, InterruptedException { 1354 return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, null); 1355 } 1356 1357 /** 1358 * Runs through the HBase rootdir and creates a reverse lookup map for 1359 * table StoreFile names to the full Path. 1360 * <br> 1361 * Example...<br> 1362 * Key = 3944417774205889744 <br> 1363 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 1364 * 1365 * @param fs The file system to use. 1366 * @param hbaseRootDir The root directory to scan. 1367 * @param sfFilter optional path filter to apply to store files 1368 * @param executor optional executor service to parallelize this operation 1369 * @param errors ErrorReporter instance or null 1370 * @return Map keyed by StoreFile name with a value of the full Path. 1371 * @throws IOException When scanning the directory fails. 1372 * @throws InterruptedException 1373 */ 1374 public static Map<String, Path> getTableStoreFilePathMap( 1375 final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter, 1376 ExecutorService executor, HbckErrorReporter errors) 1377 throws IOException, InterruptedException { 1378 ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<>(1024, 0.75f, 32); 1379 1380 // if this method looks similar to 'getTableFragmentation' that is because 1381 // it was borrowed from it. 1382 1383 // only include the directory paths to tables 1384 for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) { 1385 getTableStoreFilePathMap(map, fs, hbaseRootDir, 1386 FSUtils.getTableName(tableDir), sfFilter, executor, errors); 1387 } 1388 return map; 1389 } 1390 1391 /** 1392 * Filters FileStatuses in an array and returns a list 1393 * 1394 * @param input An array of FileStatuses 1395 * @param filter A required filter to filter the array 1396 * @return A list of FileStatuses 1397 */ 1398 public static List<FileStatus> filterFileStatuses(FileStatus[] input, 1399 FileStatusFilter filter) { 1400 if (input == null) return null; 1401 return filterFileStatuses(Iterators.forArray(input), filter); 1402 } 1403 1404 /** 1405 * Filters FileStatuses in an iterator and returns a list 1406 * 1407 * @param input An iterator of FileStatuses 1408 * @param filter A required filter to filter the array 1409 * @return A list of FileStatuses 1410 */ 1411 public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input, 1412 FileStatusFilter filter) { 1413 if (input == null) return null; 1414 ArrayList<FileStatus> results = new ArrayList<>(); 1415 while (input.hasNext()) { 1416 FileStatus f = input.next(); 1417 if (filter.accept(f)) { 1418 results.add(f); 1419 } 1420 } 1421 return results; 1422 } 1423 1424 /** 1425 * Calls fs.listStatus() and treats FileNotFoundException as non-fatal 1426 * This accommodates differences between hadoop versions, where hadoop 1 1427 * does not throw a FileNotFoundException, and return an empty FileStatus[] 1428 * while Hadoop 2 will throw FileNotFoundException. 1429 * 1430 * @param fs file system 1431 * @param dir directory 1432 * @param filter file status filter 1433 * @return null if dir is empty or doesn't exist, otherwise FileStatus list 1434 */ 1435 public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs, 1436 final Path dir, final FileStatusFilter filter) throws IOException { 1437 FileStatus [] status = null; 1438 try { 1439 status = fs.listStatus(dir); 1440 } catch (FileNotFoundException fnfe) { 1441 // if directory doesn't exist, return null 1442 if (LOG.isTraceEnabled()) { 1443 LOG.trace(dir + " doesn't exist"); 1444 } 1445 } 1446 1447 if (status == null || status.length < 1) { 1448 return null; 1449 } 1450 1451 if (filter == null) { 1452 return Arrays.asList(status); 1453 } else { 1454 List<FileStatus> status2 = filterFileStatuses(status, filter); 1455 if (status2 == null || status2.isEmpty()) { 1456 return null; 1457 } else { 1458 return status2; 1459 } 1460 } 1461 } 1462 1463 /** 1464 * Throw an exception if an action is not permitted by a user on a file. 1465 * 1466 * @param ugi 1467 * the user 1468 * @param file 1469 * the file 1470 * @param action 1471 * the action 1472 */ 1473 public static void checkAccess(UserGroupInformation ugi, FileStatus file, 1474 FsAction action) throws AccessDeniedException { 1475 if (ugi.getShortUserName().equals(file.getOwner())) { 1476 if (file.getPermission().getUserAction().implies(action)) { 1477 return; 1478 } 1479 } else if (contains(ugi.getGroupNames(), file.getGroup())) { 1480 if (file.getPermission().getGroupAction().implies(action)) { 1481 return; 1482 } 1483 } else if (file.getPermission().getOtherAction().implies(action)) { 1484 return; 1485 } 1486 throw new AccessDeniedException("Permission denied:" + " action=" + action 1487 + " path=" + file.getPath() + " user=" + ugi.getShortUserName()); 1488 } 1489 1490 private static boolean contains(String[] groups, String user) { 1491 for (String group : groups) { 1492 if (group.equals(user)) { 1493 return true; 1494 } 1495 } 1496 return false; 1497 } 1498 1499 /** 1500 * This function is to scan the root path of the file system to get the 1501 * degree of locality for each region on each of the servers having at least 1502 * one block of that region. 1503 * This is used by the tool {@link org.apache.hadoop.hbase.master.RegionPlacementMaintainer} 1504 * 1505 * @param conf 1506 * the configuration to use 1507 * @return the mapping from region encoded name to a map of server names to 1508 * locality fraction 1509 * @throws IOException 1510 * in case of file system errors or interrupts 1511 */ 1512 public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS( 1513 final Configuration conf) throws IOException { 1514 return getRegionDegreeLocalityMappingFromFS( 1515 conf, null, 1516 conf.getInt(THREAD_POOLSIZE, DEFAULT_THREAD_POOLSIZE)); 1517 1518 } 1519 1520 /** 1521 * This function is to scan the root path of the file system to get the 1522 * degree of locality for each region on each of the servers having at least 1523 * one block of that region. 1524 * 1525 * @param conf 1526 * the configuration to use 1527 * @param desiredTable 1528 * the table you wish to scan locality for 1529 * @param threadPoolSize 1530 * the thread pool size to use 1531 * @return the mapping from region encoded name to a map of server names to 1532 * locality fraction 1533 * @throws IOException 1534 * in case of file system errors or interrupts 1535 */ 1536 public static Map<String, Map<String, Float>> getRegionDegreeLocalityMappingFromFS( 1537 final Configuration conf, final String desiredTable, int threadPoolSize) 1538 throws IOException { 1539 Map<String, Map<String, Float>> regionDegreeLocalityMapping = new ConcurrentHashMap<>(); 1540 getRegionLocalityMappingFromFS(conf, desiredTable, threadPoolSize, null, 1541 regionDegreeLocalityMapping); 1542 return regionDegreeLocalityMapping; 1543 } 1544 1545 /** 1546 * This function is to scan the root path of the file system to get either the 1547 * mapping between the region name and its best locality region server or the 1548 * degree of locality of each region on each of the servers having at least 1549 * one block of that region. The output map parameters are both optional. 1550 * 1551 * @param conf 1552 * the configuration to use 1553 * @param desiredTable 1554 * the table you wish to scan locality for 1555 * @param threadPoolSize 1556 * the thread pool size to use 1557 * @param regionToBestLocalityRSMapping 1558 * the map into which to put the best locality mapping or null 1559 * @param regionDegreeLocalityMapping 1560 * the map into which to put the locality degree mapping or null, 1561 * must be a thread-safe implementation 1562 * @throws IOException 1563 * in case of file system errors or interrupts 1564 */ 1565 private static void getRegionLocalityMappingFromFS( 1566 final Configuration conf, final String desiredTable, 1567 int threadPoolSize, 1568 Map<String, String> regionToBestLocalityRSMapping, 1569 Map<String, Map<String, Float>> regionDegreeLocalityMapping) 1570 throws IOException { 1571 FileSystem fs = FileSystem.get(conf); 1572 Path rootPath = FSUtils.getRootDir(conf); 1573 long startTime = EnvironmentEdgeManager.currentTime(); 1574 Path queryPath; 1575 // The table files are in ${hbase.rootdir}/data/<namespace>/<table>/* 1576 if (null == desiredTable) { 1577 queryPath = new Path(new Path(rootPath, HConstants.BASE_NAMESPACE_DIR).toString() + "/*/*/*/"); 1578 } else { 1579 queryPath = new Path(FSUtils.getTableDir(rootPath, TableName.valueOf(desiredTable)).toString() + "/*/"); 1580 } 1581 1582 // reject all paths that are not appropriate 1583 PathFilter pathFilter = new PathFilter() { 1584 @Override 1585 public boolean accept(Path path) { 1586 // this is the region name; it may get some noise data 1587 if (null == path) { 1588 return false; 1589 } 1590 1591 // no parent? 1592 Path parent = path.getParent(); 1593 if (null == parent) { 1594 return false; 1595 } 1596 1597 String regionName = path.getName(); 1598 if (null == regionName) { 1599 return false; 1600 } 1601 1602 if (!regionName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) { 1603 return false; 1604 } 1605 return true; 1606 } 1607 }; 1608 1609 FileStatus[] statusList = fs.globStatus(queryPath, pathFilter); 1610 1611 if (null == statusList) { 1612 return; 1613 } else { 1614 LOG.debug("Query Path: " + queryPath + " ; # list of files: " + 1615 statusList.length); 1616 } 1617 1618 // lower the number of threads in case we have very few expected regions 1619 threadPoolSize = Math.min(threadPoolSize, statusList.length); 1620 1621 // run in multiple threads 1622 ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize, 1623 threadPoolSize, 60, TimeUnit.SECONDS, 1624 new ArrayBlockingQueue<>(statusList.length)); 1625 try { 1626 // ignore all file status items that are not of interest 1627 for (FileStatus regionStatus : statusList) { 1628 if (null == regionStatus) { 1629 continue; 1630 } 1631 1632 if (!regionStatus.isDirectory()) { 1633 continue; 1634 } 1635 1636 Path regionPath = regionStatus.getPath(); 1637 if (null == regionPath) { 1638 continue; 1639 } 1640 1641 tpe.execute(new FSRegionScanner(fs, regionPath, 1642 regionToBestLocalityRSMapping, regionDegreeLocalityMapping)); 1643 } 1644 } finally { 1645 tpe.shutdown(); 1646 int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 1647 60 * 1000); 1648 try { 1649 // here we wait until TPE terminates, which is either naturally or by 1650 // exceptions in the execution of the threads 1651 while (!tpe.awaitTermination(threadWakeFrequency, 1652 TimeUnit.MILLISECONDS)) { 1653 // printing out rough estimate, so as to not introduce 1654 // AtomicInteger 1655 LOG.info("Locality checking is underway: { Scanned Regions : " 1656 + tpe.getCompletedTaskCount() + "/" 1657 + tpe.getTaskCount() + " }"); 1658 } 1659 } catch (InterruptedException e) { 1660 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 1661 } 1662 } 1663 1664 long overhead = EnvironmentEdgeManager.currentTime() - startTime; 1665 String overheadMsg = "Scan DFS for locality info takes " + overhead + " ms"; 1666 1667 LOG.info(overheadMsg); 1668 } 1669 1670 /** 1671 * Do our short circuit read setup. 1672 * Checks buffer size to use and whether to do checksumming in hbase or hdfs. 1673 * @param conf 1674 */ 1675 public static void setupShortCircuitRead(final Configuration conf) { 1676 // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property. 1677 boolean shortCircuitSkipChecksum = 1678 conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false); 1679 boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true); 1680 if (shortCircuitSkipChecksum) { 1681 LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " + 1682 "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " + 1683 "it, see https://issues.apache.org/jira/browse/HBASE-6868." : "")); 1684 assert !shortCircuitSkipChecksum; //this will fail if assertions are on 1685 } 1686 checkShortCircuitReadBufferSize(conf); 1687 } 1688 1689 /** 1690 * Check if short circuit read buffer size is set and if not, set it to hbase value. 1691 * @param conf 1692 */ 1693 public static void checkShortCircuitReadBufferSize(final Configuration conf) { 1694 final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2; 1695 final int notSet = -1; 1696 // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2 1697 final String dfsKey = "dfs.client.read.shortcircuit.buffer.size"; 1698 int size = conf.getInt(dfsKey, notSet); 1699 // If a size is set, return -- we will use it. 1700 if (size != notSet) return; 1701 // But short circuit buffer size is normally not set. Put in place the hbase wanted size. 1702 int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize); 1703 conf.setIfUnset(dfsKey, Integer.toString(hbaseSize)); 1704 } 1705 1706 /** 1707 * @param c 1708 * @return The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs. 1709 * @throws IOException 1710 */ 1711 public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c) 1712 throws IOException { 1713 if (!isHDFS(c)) return null; 1714 // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal 1715 // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it 1716 // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients. 1717 final String name = "getHedgedReadMetrics"; 1718 DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient(); 1719 Method m; 1720 try { 1721 m = dfsclient.getClass().getDeclaredMethod(name); 1722 } catch (NoSuchMethodException e) { 1723 LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " + 1724 e.getMessage()); 1725 return null; 1726 } catch (SecurityException e) { 1727 LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " + 1728 e.getMessage()); 1729 return null; 1730 } 1731 m.setAccessible(true); 1732 try { 1733 return (DFSHedgedReadMetrics)m.invoke(dfsclient); 1734 } catch (IllegalAccessException e) { 1735 LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " + 1736 e.getMessage()); 1737 return null; 1738 } catch (IllegalArgumentException e) { 1739 LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " + 1740 e.getMessage()); 1741 return null; 1742 } catch (InvocationTargetException e) { 1743 LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " + 1744 e.getMessage()); 1745 return null; 1746 } 1747 } 1748 1749}