001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.cleaner; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Comparator; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.CompletableFuture; 029import java.util.concurrent.atomic.AtomicBoolean; 030import java.util.stream.Collectors; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileStatus; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; 036import org.apache.hadoop.hbase.ScheduledChore; 037import org.apache.hadoop.hbase.Stoppable; 038import org.apache.hadoop.hbase.util.FutureUtils; 039import org.apache.hadoop.ipc.RemoteException; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 045import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 046import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 047import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 048 049/** 050 * Abstract Cleaner that uses a chain of delegates to clean a directory of files 051 * @param <T> Cleaner delegate class that is dynamically loaded from configuration 052 */ 053@InterfaceAudience.Private 054public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore { 055 056 private static final Logger LOG = LoggerFactory.getLogger(CleanerChore.class); 057 private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors(); 058 059 /** 060 * Configures the threadpool used for scanning the archive directory for the HFileCleaner If it is 061 * an integer and >= 1, it would be the size; if 0.0 < size <= 1.0, size would be available 062 * processors * size. Pay attention that 1.0 is different from 1, former indicates it will use 063 * 100% of cores, while latter will use only 1 thread for chore to scan dir. 064 */ 065 public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size"; 066 static final String DEFAULT_CHORE_POOL_SIZE = "0.25"; 067 /** 068 * Configures the threadpool used for scanning the Old logs directory for the LogCleaner Follows 069 * the same configuration mechanism as CHORE_POOL_SIZE, but has a default of 1 thread. 070 */ 071 public static final String LOG_CLEANER_CHORE_SIZE = "hbase.log.cleaner.scan.dir.concurrent.size"; 072 static final String DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE = "1"; 073 /** 074 * Enable the CleanerChore to sort the subdirectories by consumed space and start the cleaning 075 * with the largest subdirectory. Enabled by default. 076 */ 077 public static final String LOG_CLEANER_CHORE_DIRECTORY_SORTING = 078 "hbase.cleaner.directory.sorting"; 079 static final boolean DEFAULT_LOG_CLEANER_CHORE_DIRECTORY_SORTING = true; 080 081 private final DirScanPool pool; 082 083 protected final FileSystem fs; 084 private final Path oldFileDir; 085 private final Configuration conf; 086 protected final Map<String, Object> params; 087 private final AtomicBoolean enabled = new AtomicBoolean(true); 088 protected List<T> cleanersChain; 089 protected List<String> excludeDirs; 090 private CompletableFuture<Boolean> future; 091 private boolean forceRun; 092 private boolean sortDirectories; 093 094 public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf, 095 FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) { 096 this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null, null); 097 } 098 099 /** 100 * @param name name of the chore being run 101 * @param sleepPeriod the period of time to sleep between each run 102 * @param s the stopper 103 * @param conf configuration to use 104 * @param fs handle to the FS 105 * @param oldFileDir the path to the archived files 106 * @param confKey configuration key for the classes to instantiate 107 * @param pool the thread pool used to scan directories 108 * @param params members could be used in cleaner 109 */ 110 public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf, 111 FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params, 112 List<Path> excludePaths) { 113 super(name, s, sleepPeriod); 114 115 Preconditions.checkNotNull(pool, "Chore's pool can not be null"); 116 this.pool = pool; 117 this.fs = fs; 118 this.oldFileDir = oldFileDir; 119 this.conf = conf; 120 this.params = params; 121 if (excludePaths != null && !excludePaths.isEmpty()) { 122 excludeDirs = new ArrayList<>(excludePaths.size()); 123 for (Path path : excludePaths) { 124 StringBuilder dirPart = new StringBuilder(path.toString()); 125 if (!path.toString().endsWith("/")) { 126 dirPart.append("/"); 127 } 128 excludeDirs.add(dirPart.toString()); 129 } 130 } 131 if (excludeDirs != null) { 132 LOG.info("Cleaner {} excludes sub dirs: {}", name, excludeDirs); 133 } 134 sortDirectories = conf.getBoolean(LOG_CLEANER_CHORE_DIRECTORY_SORTING, 135 DEFAULT_LOG_CLEANER_CHORE_DIRECTORY_SORTING); 136 initCleanerChain(confKey); 137 } 138 139 /** 140 * Calculate size for cleaner pool. 141 * @param poolSize size from configuration 142 * @return size of pool after calculation 143 */ 144 static int calculatePoolSize(String poolSize) { 145 if (poolSize.matches("[1-9][0-9]*")) { 146 // If poolSize is an integer, return it directly, 147 // but upmost to the number of available processors. 148 int size = Math.min(Integer.parseInt(poolSize), AVAIL_PROCESSORS); 149 if (size == AVAIL_PROCESSORS) { 150 LOG.warn("Use full core processors to scan dir, size={}", size); 151 } 152 return size; 153 } else if (poolSize.matches("0.[0-9]+|1.0")) { 154 // if poolSize is a double, return poolSize * availableProcessors; 155 // Ensure that we always return at least one. 156 int computedThreads = (int) (AVAIL_PROCESSORS * Double.parseDouble(poolSize)); 157 if (computedThreads < 1) { 158 LOG.debug("Computed {} threads for CleanerChore, using 1 instead", computedThreads); 159 return 1; 160 } 161 return computedThreads; 162 } else { 163 LOG.error("Unrecognized value: " + poolSize + " for " + CHORE_POOL_SIZE 164 + ", use default config: " + DEFAULT_CHORE_POOL_SIZE + " instead."); 165 return calculatePoolSize(DEFAULT_CHORE_POOL_SIZE); 166 } 167 } 168 169 /** 170 * Validate the file to see if it even belongs in the directory. If it is valid, then the file 171 * will go through the cleaner delegates, but otherwise the file is just deleted. 172 * @param file full {@link Path} of the file to be checked 173 * @return <tt>true</tt> if the file is valid, <tt>false</tt> otherwise 174 */ 175 protected abstract boolean validate(Path file); 176 177 /** 178 * Instantiate and initialize all the file cleaners set in the configuration 179 * @param confKey key to get the file cleaner classes from the configuration 180 */ 181 private void initCleanerChain(String confKey) { 182 this.cleanersChain = new ArrayList<>(); 183 String[] cleaners = conf.getStrings(confKey); 184 if (cleaners != null) { 185 for (String className : cleaners) { 186 className = className.trim(); 187 if (className.isEmpty()) { 188 continue; 189 } 190 T logCleaner = newFileCleaner(className, conf); 191 if (logCleaner != null) { 192 LOG.info("Initialize cleaner={}", className); 193 this.cleanersChain.add(logCleaner); 194 } 195 } 196 } 197 } 198 199 /** 200 * A utility method to create new instances of LogCleanerDelegate based on the class name of the 201 * LogCleanerDelegate. 202 * @param className fully qualified class name of the LogCleanerDelegate 203 * @param conf used configuration 204 * @return the new instance 205 */ 206 private T newFileCleaner(String className, Configuration conf) { 207 try { 208 Class<? extends FileCleanerDelegate> c = 209 Class.forName(className).asSubclass(FileCleanerDelegate.class); 210 @SuppressWarnings("unchecked") 211 T cleaner = (T) c.getDeclaredConstructor().newInstance(); 212 cleaner.setConf(conf); 213 cleaner.init(this.params); 214 return cleaner; 215 } catch (Exception e) { 216 LOG.warn("Can NOT create CleanerDelegate={}", className, e); 217 // skipping if can't instantiate 218 return null; 219 } 220 } 221 222 @Override 223 protected boolean initialChore() { 224 synchronized (this) { 225 if (forceRun) { 226 // wake up the threads waiting in triggerCleanerNow, as a triggerNow may triggers the first 227 // loop where we will only call initialChore. We need to trigger another run immediately. 228 forceRun = false; 229 notifyAll(); 230 } 231 } 232 return true; 233 } 234 235 @Override 236 protected void chore() { 237 CompletableFuture<Boolean> f; 238 synchronized (this) { 239 if (!enabled.get()) { 240 if (!forceRun) { 241 LOG.trace("Cleaner chore {} disabled! Not cleaning.", getName()); 242 return; 243 } else { 244 LOG.info("Force executing cleaner chore {} when disabled", getName()); 245 } 246 } 247 if (future != null) { 248 LOG.warn("A cleaner chore {}'s run is in progress, give up running", getName()); 249 return; 250 } 251 f = new CompletableFuture<>(); 252 future = f; 253 notifyAll(); 254 } 255 pool.latchCountUp(); 256 try { 257 preRunCleaner(); 258 pool.execute(() -> traverseAndDelete(oldFileDir, true, f)); 259 if (f.get()) { 260 LOG.trace("Cleaned all files under {}", oldFileDir); 261 } else { 262 LOG.trace("Files outstanding under {}", oldFileDir); 263 } 264 } catch (Exception e) { 265 LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e); 266 } finally { 267 postRunCleaner(); 268 synchronized (this) { 269 future = null; 270 forceRun = false; 271 } 272 pool.latchCountDown(); 273 // After each cleaner chore, checks if received reconfigure notification while cleaning. 274 // First in cleaner turns off notification, to avoid another cleaner updating pool again. 275 // This cleaner is waiting for other cleaners finishing their jobs. 276 // To avoid missing next chore, only wait 0.8 * period, then shutdown. 277 pool.tryUpdatePoolSize((long) (0.8 * getTimeUnit().toMillis(getPeriod()))); 278 } 279 } 280 281 private void preRunCleaner() { 282 cleanersChain.forEach(FileCleanerDelegate::preClean); 283 } 284 285 private void postRunCleaner() { 286 cleanersChain.forEach(FileCleanerDelegate::postClean); 287 } 288 289 /** 290 * Trigger the cleaner immediately and return a CompletableFuture for getting the result. Return 291 * {@code true} means all the old files have been deleted, otherwise {@code false}. 292 */ 293 public synchronized CompletableFuture<Boolean> triggerCleanerNow() throws InterruptedException { 294 for (;;) { 295 if (future != null) { 296 return future; 297 } 298 forceRun = true; 299 if (!triggerNow()) { 300 return CompletableFuture.completedFuture(false); 301 } 302 wait(); 303 } 304 } 305 306 /** 307 * Sort the given list in (descending) order of the space each element takes 308 * @param dirs the list to sort, element in it should be directory (not file) 309 */ 310 private void sortByConsumedSpace(List<FileStatus> dirs) { 311 if (dirs == null || dirs.size() < 2) { 312 // no need to sort for empty or single directory 313 return; 314 } 315 dirs.sort(new Comparator<FileStatus>() { 316 HashMap<FileStatus, Long> directorySpaces = new HashMap<>(); 317 318 @Override 319 public int compare(FileStatus f1, FileStatus f2) { 320 long f1ConsumedSpace = getSpace(f1); 321 long f2ConsumedSpace = getSpace(f2); 322 return Long.compare(f2ConsumedSpace, f1ConsumedSpace); 323 } 324 325 private long getSpace(FileStatus f) { 326 Long cached = directorySpaces.get(f); 327 if (cached != null) { 328 return cached; 329 } 330 try { 331 long space = 332 f.isDirectory() ? fs.getContentSummary(f.getPath()).getSpaceConsumed() : f.getLen(); 333 directorySpaces.put(f, space); 334 return space; 335 } catch (IOException e) { 336 LOG.trace("Failed to get space consumed by path={}", f, e); 337 return -1; 338 } 339 } 340 }); 341 } 342 343 /** 344 * Run the given files through each of the cleaners to see if it should be deleted, deleting it if 345 * necessary. 346 * @param files List of FileStatus for the files to check (and possibly delete) 347 * @return true iff successfully deleted all files 348 */ 349 private boolean checkAndDeleteFiles(List<FileStatus> files) { 350 if (files == null) { 351 return true; 352 } 353 354 // first check to see if the path is valid 355 List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size()); 356 List<FileStatus> invalidFiles = Lists.newArrayList(); 357 for (FileStatus file : files) { 358 if (validate(file.getPath())) { 359 validFiles.add(file); 360 } else { 361 LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it."); 362 invalidFiles.add(file); 363 } 364 } 365 366 Iterable<FileStatus> deletableValidFiles = validFiles; 367 // check each of the cleaners for the valid files 368 for (T cleaner : cleanersChain) { 369 if (cleaner.isStopped() || this.getStopper().isStopped()) { 370 LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any more files in:" 371 + this.oldFileDir); 372 return false; 373 } 374 375 Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles); 376 377 // trace which cleaner is holding on to each file 378 if (LOG.isTraceEnabled()) { 379 ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles); 380 for (FileStatus file : deletableValidFiles) { 381 if (!filteredFileSet.contains(file)) { 382 LOG.trace(file.getPath() + " is not deletable according to:" + cleaner); 383 } 384 } 385 } 386 387 deletableValidFiles = filteredFiles; 388 } 389 390 Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles); 391 return deleteFiles(filesToDelete) == files.size(); 392 } 393 394 /** 395 * Check if a empty directory with no subdirs or subfiles can be deleted 396 * @param dir Path of the directory 397 * @return True if the directory can be deleted, otherwise false 398 */ 399 private boolean isEmptyDirDeletable(Path dir) { 400 for (T cleaner : cleanersChain) { 401 if (cleaner.isStopped() || this.getStopper().isStopped()) { 402 LOG.warn("A file cleaner {} is stopped, won't delete the empty directory {}", 403 this.getName(), dir); 404 return false; 405 } 406 if (!cleaner.isEmptyDirDeletable(dir)) { 407 // If one of the cleaner need the empty directory, skip delete it 408 return false; 409 } 410 } 411 return true; 412 } 413 414 /** 415 * Delete the given files 416 * @param filesToDelete files to delete 417 * @return number of deleted files 418 */ 419 protected int deleteFiles(Iterable<FileStatus> filesToDelete) { 420 int deletedFileCount = 0; 421 for (FileStatus file : filesToDelete) { 422 Path filePath = file.getPath(); 423 LOG.trace("Removing {} from archive", filePath); 424 try { 425 boolean success = this.fs.delete(filePath, false); 426 if (success) { 427 deletedFileCount++; 428 } else { 429 LOG.warn("Attempted to delete:" + filePath 430 + ", but couldn't. Run cleaner chain and attempt to delete on next pass."); 431 } 432 } catch (IOException e) { 433 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 434 LOG.warn("Error while deleting: " + filePath, e); 435 } 436 } 437 return deletedFileCount; 438 } 439 440 @Override 441 public synchronized void cleanup() { 442 for (T lc : this.cleanersChain) { 443 try { 444 lc.stop("Exiting"); 445 } catch (Throwable t) { 446 LOG.warn("Stopping", t); 447 } 448 } 449 } 450 451 int getChorePoolSize() { 452 return pool.getSize(); 453 } 454 455 public boolean setEnabled(final boolean enabled) { 456 return this.enabled.getAndSet(enabled); 457 } 458 459 public boolean getEnabled() { 460 return this.enabled.get(); 461 } 462 463 private interface Action<T> { 464 T act() throws Exception; 465 } 466 467 /** 468 * Attempts to clean up a directory(its subdirectories, and files) in a 469 * {@link java.util.concurrent.ThreadPoolExecutor} concurrently. We can get the final result by 470 * calling result.get(). 471 */ 472 private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean> result) { 473 try { 474 // Step.1: List all files under the given directory. 475 List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir)); 476 List<FileStatus> subDirs = 477 allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList()); 478 List<FileStatus> files = 479 allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList()); 480 481 // Step.2: Try to delete all the deletable files. 482 boolean allFilesDeleted = 483 files.isEmpty() || deleteAction(() -> checkAndDeleteFiles(files), "files", dir); 484 485 // Step.3: Start to traverse and delete the sub-directories. 486 List<CompletableFuture<Boolean>> futures = new ArrayList<>(); 487 if (!subDirs.isEmpty()) { 488 if (sortDirectories) { 489 sortByConsumedSpace(subDirs); 490 } 491 // Submit the request of sub-directory deletion. 492 subDirs.forEach(subDir -> { 493 if (!shouldExclude(subDir)) { 494 CompletableFuture<Boolean> subFuture = new CompletableFuture<>(); 495 pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture)); 496 futures.add(subFuture); 497 } 498 }); 499 } 500 501 // Step.4: Once all sub-files & sub-directories are deleted, then can try to delete the 502 // current directory asynchronously. 503 FutureUtils.addListener( 504 CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])), 505 (voidObj, e) -> { 506 if (e != null) { 507 result.completeExceptionally(FutureUtils.unwrapCompletionException(e)); 508 return; 509 } 510 try { 511 boolean allSubDirsDeleted = futures.stream().allMatch(CompletableFuture::join); 512 boolean deleted = allFilesDeleted && allSubDirsDeleted && isEmptyDirDeletable(dir); 513 if (deleted && !root) { 514 // If and only if files and sub-dirs under current dir are deleted successfully, and 515 // the empty directory can be deleted, and it is not the root dir then task will 516 // try to delete it. 517 deleted = deleteAction(() -> fs.delete(dir, false), "dir", dir); 518 } 519 result.complete(deleted); 520 } catch (Exception ie) { 521 // Must handle the inner exception here, otherwise the result may get stuck if one 522 // sub-directory get some failure. 523 result.completeExceptionally(ie); 524 } 525 }); 526 } catch (Exception e) { 527 if (e instanceof FileNotFoundException) { 528 LOG.debug("Dir dose not exist, {}", dir); 529 } else { 530 LOG.error("Failed to traverse and delete the path: {}", dir, e); 531 } 532 result.completeExceptionally(e); 533 } 534 } 535 536 /** 537 * Check if a path should not perform clear 538 */ 539 private boolean shouldExclude(FileStatus f) { 540 if (!f.isDirectory()) { 541 return false; 542 } 543 if (excludeDirs != null && !excludeDirs.isEmpty()) { 544 for (String dirPart : excludeDirs) { 545 // since we make excludeDirs end with '/', 546 // if a path contains() the dirPart, the path should be excluded 547 if (f.getPath().toString().contains(dirPart)) { 548 return true; 549 } 550 } 551 } 552 return false; 553 } 554 555 /** 556 * Perform a delete on a specified type. 557 * @param deletion a delete 558 * @param type possible values are 'files', 'subdirs', 'dirs' 559 * @return true if it deleted successfully, false otherwise 560 */ 561 private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) { 562 boolean deleted; 563 try { 564 LOG.trace("Start deleting {} under {}", type, dir); 565 deleted = deletion.act(); 566 } catch (PathIsNotEmptyDirectoryException exception) { 567 // N.B. HDFS throws this exception when we try to delete a non-empty directory, but 568 // LocalFileSystem throws a bare IOException. So some test code will get the verbose 569 // message below. 570 LOG.debug("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception); 571 deleted = false; 572 } catch (IOException ioe) { 573 if (LOG.isTraceEnabled()) { 574 LOG.trace("Could not delete {} under {}; will retry. If it keeps happening, " 575 + "quote the exception when asking on mailing list.", type, dir, ioe); 576 } else { 577 LOG.info( 578 "Could not delete {} under {} because {}; will retry. If it keeps happening, enable" 579 + "TRACE-level logging and quote the exception when asking on mailing list.", 580 type, dir, ioe.getMessage()); 581 } 582 deleted = false; 583 } catch (Exception e) { 584 LOG.info("unexpected exception: ", e); 585 deleted = false; 586 } 587 LOG.trace("Finish deleting {} under {}, deleted = {}", type, dir, deleted); 588 return deleted; 589 } 590}