001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.cleaner; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Comparator; 025import java.util.HashMap; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.concurrent.CompletableFuture; 030import java.util.concurrent.atomic.AtomicBoolean; 031import java.util.stream.Collectors; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileStatus; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; 037import org.apache.hadoop.hbase.ScheduledChore; 038import org.apache.hadoop.hbase.Stoppable; 039import org.apache.hadoop.hbase.util.FutureUtils; 040import org.apache.hadoop.ipc.RemoteException; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 046import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 047import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 048import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 049 050/** 051 * Abstract Cleaner that uses a chain of delegates to clean a directory of files 052 * @param <T> Cleaner delegate class that is dynamically loaded from configuration 053 */ 054@InterfaceAudience.Private 055public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore { 056 057 private static final Logger LOG = LoggerFactory.getLogger(CleanerChore.class); 058 private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors(); 059 060 /** 061 * Configures the threadpool used for scanning the archive directory for the HFileCleaner If it is 062 * an integer and >= 1, it would be the size; if 0.0 < size <= 1.0, size would be available 063 * processors * size. Pay attention that 1.0 is different from 1, former indicates it will use 064 * 100% of cores, while latter will use only 1 thread for chore to scan dir. 065 */ 066 public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size"; 067 static final String DEFAULT_CHORE_POOL_SIZE = "0.25"; 068 /** 069 * Configures the threadpool used for scanning the Old logs directory for the LogCleaner Follows 070 * the same configuration mechanism as CHORE_POOL_SIZE, but has a default of 1 thread. 071 */ 072 public static final String LOG_CLEANER_CHORE_SIZE = "hbase.log.cleaner.scan.dir.concurrent.size"; 073 static final String DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE = "1"; 074 075 private final DirScanPool pool; 076 077 protected final FileSystem fs; 078 private final Path oldFileDir; 079 private final Configuration conf; 080 protected final Map<String, Object> params; 081 private final AtomicBoolean enabled = new AtomicBoolean(true); 082 protected List<T> cleanersChain; 083 protected List<String> excludeDirs; 084 085 public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf, 086 FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) { 087 this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null, null); 088 } 089 090 /** 091 * @param name name of the chore being run 092 * @param sleepPeriod the period of time to sleep between each run 093 * @param s the stopper 094 * @param conf configuration to use 095 * @param fs handle to the FS 096 * @param oldFileDir the path to the archived files 097 * @param confKey configuration key for the classes to instantiate 098 * @param pool the thread pool used to scan directories 099 * @param params members could be used in cleaner 100 */ 101 public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf, 102 FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params, 103 List<Path> excludePaths) { 104 super(name, s, sleepPeriod); 105 106 Preconditions.checkNotNull(pool, "Chore's pool can not be null"); 107 this.pool = pool; 108 this.fs = fs; 109 this.oldFileDir = oldFileDir; 110 this.conf = conf; 111 this.params = params; 112 if (excludePaths != null && !excludePaths.isEmpty()) { 113 excludeDirs = new ArrayList<>(excludePaths.size()); 114 for (Path path : excludePaths) { 115 StringBuilder dirPart = new StringBuilder(path.toString()); 116 if (!path.toString().endsWith("/")) { 117 dirPart.append("/"); 118 } 119 excludeDirs.add(dirPart.toString()); 120 } 121 } 122 if (excludeDirs != null) { 123 LOG.info("Cleaner {} excludes sub dirs: {}", name, excludeDirs); 124 } 125 initCleanerChain(confKey); 126 } 127 128 /** 129 * Calculate size for cleaner pool. 130 * @param poolSize size from configuration 131 * @return size of pool after calculation 132 */ 133 static int calculatePoolSize(String poolSize) { 134 if (poolSize.matches("[1-9][0-9]*")) { 135 // If poolSize is an integer, return it directly, 136 // but upmost to the number of available processors. 137 int size = Math.min(Integer.parseInt(poolSize), AVAIL_PROCESSORS); 138 if (size == AVAIL_PROCESSORS) { 139 LOG.warn("Use full core processors to scan dir, size={}", size); 140 } 141 return size; 142 } else if (poolSize.matches("0.[0-9]+|1.0")) { 143 // if poolSize is a double, return poolSize * availableProcessors; 144 // Ensure that we always return at least one. 145 int computedThreads = (int) (AVAIL_PROCESSORS * Double.parseDouble(poolSize)); 146 if (computedThreads < 1) { 147 LOG.debug("Computed {} threads for CleanerChore, using 1 instead", computedThreads); 148 return 1; 149 } 150 return computedThreads; 151 } else { 152 LOG.error("Unrecognized value: " + poolSize + " for " + CHORE_POOL_SIZE 153 + ", use default config: " + DEFAULT_CHORE_POOL_SIZE + " instead."); 154 return calculatePoolSize(DEFAULT_CHORE_POOL_SIZE); 155 } 156 } 157 158 /** 159 * Validate the file to see if it even belongs in the directory. If it is valid, then the file 160 * will go through the cleaner delegates, but otherwise the file is just deleted. 161 * @param file full {@link Path} of the file to be checked 162 * @return <tt>true</tt> if the file is valid, <tt>false</tt> otherwise 163 */ 164 protected abstract boolean validate(Path file); 165 166 /** 167 * Instantiate and initialize all the file cleaners set in the configuration 168 * @param confKey key to get the file cleaner classes from the configuration 169 */ 170 private void initCleanerChain(String confKey) { 171 this.cleanersChain = new LinkedList<>(); 172 String[] logCleaners = conf.getStrings(confKey); 173 if (logCleaners != null) { 174 for (String className : logCleaners) { 175 className = className.trim(); 176 if (className.isEmpty()) { 177 continue; 178 } 179 T logCleaner = newFileCleaner(className, conf); 180 if (logCleaner != null) { 181 LOG.info("Initialize cleaner={}", className); 182 this.cleanersChain.add(logCleaner); 183 } 184 } 185 } 186 } 187 188 /** 189 * A utility method to create new instances of LogCleanerDelegate based on the class name of the 190 * LogCleanerDelegate. 191 * @param className fully qualified class name of the LogCleanerDelegate 192 * @param conf used configuration 193 * @return the new instance 194 */ 195 private T newFileCleaner(String className, Configuration conf) { 196 try { 197 Class<? extends FileCleanerDelegate> c = 198 Class.forName(className).asSubclass(FileCleanerDelegate.class); 199 @SuppressWarnings("unchecked") 200 T cleaner = (T) c.getDeclaredConstructor().newInstance(); 201 cleaner.setConf(conf); 202 cleaner.init(this.params); 203 return cleaner; 204 } catch (Exception e) { 205 LOG.warn("Can NOT create CleanerDelegate={}", className, e); 206 // skipping if can't instantiate 207 return null; 208 } 209 } 210 211 @Override 212 protected void chore() { 213 if (getEnabled()) { 214 try { 215 pool.latchCountUp(); 216 if (runCleaner()) { 217 LOG.trace("Cleaned all WALs under {}", oldFileDir); 218 } else { 219 LOG.trace("WALs outstanding under {}", oldFileDir); 220 } 221 } finally { 222 pool.latchCountDown(); 223 } 224 // After each cleaner chore, checks if received reconfigure notification while cleaning. 225 // First in cleaner turns off notification, to avoid another cleaner updating pool again. 226 // This cleaner is waiting for other cleaners finishing their jobs. 227 // To avoid missing next chore, only wait 0.8 * period, then shutdown. 228 pool.tryUpdatePoolSize((long) (0.8 * getTimeUnit().toMillis(getPeriod()))); 229 } else { 230 LOG.trace("Cleaner chore disabled! Not cleaning."); 231 } 232 } 233 234 private void preRunCleaner() { 235 cleanersChain.forEach(FileCleanerDelegate::preClean); 236 } 237 238 public boolean runCleaner() { 239 preRunCleaner(); 240 try { 241 CompletableFuture<Boolean> future = new CompletableFuture<>(); 242 pool.execute(() -> traverseAndDelete(oldFileDir, true, future)); 243 return future.get(); 244 } catch (Exception e) { 245 LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e); 246 return false; 247 } 248 } 249 250 /** 251 * Sort the given list in (descending) order of the space each element takes 252 * @param dirs the list to sort, element in it should be directory (not file) 253 */ 254 private void sortByConsumedSpace(List<FileStatus> dirs) { 255 if (dirs == null || dirs.size() < 2) { 256 // no need to sort for empty or single directory 257 return; 258 } 259 dirs.sort(new Comparator<FileStatus>() { 260 HashMap<FileStatus, Long> directorySpaces = new HashMap<>(); 261 262 @Override 263 public int compare(FileStatus f1, FileStatus f2) { 264 long f1ConsumedSpace = getSpace(f1); 265 long f2ConsumedSpace = getSpace(f2); 266 return Long.compare(f2ConsumedSpace, f1ConsumedSpace); 267 } 268 269 private long getSpace(FileStatus f) { 270 Long cached = directorySpaces.get(f); 271 if (cached != null) { 272 return cached; 273 } 274 try { 275 long space = 276 f.isDirectory() ? fs.getContentSummary(f.getPath()).getSpaceConsumed() : f.getLen(); 277 directorySpaces.put(f, space); 278 return space; 279 } catch (IOException e) { 280 LOG.trace("Failed to get space consumed by path={}", f, e); 281 return -1; 282 } 283 } 284 }); 285 } 286 287 /** 288 * Run the given files through each of the cleaners to see if it should be deleted, deleting it if 289 * necessary. 290 * @param files List of FileStatus for the files to check (and possibly delete) 291 * @return true iff successfully deleted all files 292 */ 293 private boolean checkAndDeleteFiles(List<FileStatus> files) { 294 if (files == null) { 295 return true; 296 } 297 298 // first check to see if the path is valid 299 List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size()); 300 List<FileStatus> invalidFiles = Lists.newArrayList(); 301 for (FileStatus file : files) { 302 if (validate(file.getPath())) { 303 validFiles.add(file); 304 } else { 305 LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it."); 306 invalidFiles.add(file); 307 } 308 } 309 310 Iterable<FileStatus> deletableValidFiles = validFiles; 311 // check each of the cleaners for the valid files 312 for (T cleaner : cleanersChain) { 313 if (cleaner.isStopped() || this.getStopper().isStopped()) { 314 LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any more files in:" 315 + this.oldFileDir); 316 return false; 317 } 318 319 Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles); 320 321 // trace which cleaner is holding on to each file 322 if (LOG.isTraceEnabled()) { 323 ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles); 324 for (FileStatus file : deletableValidFiles) { 325 if (!filteredFileSet.contains(file)) { 326 LOG.trace(file.getPath() + " is not deletable according to:" + cleaner); 327 } 328 } 329 } 330 331 deletableValidFiles = filteredFiles; 332 } 333 334 Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles); 335 return deleteFiles(filesToDelete) == files.size(); 336 } 337 338 /** 339 * Check if a empty directory with no subdirs or subfiles can be deleted 340 * @param dir Path of the directory 341 * @return True if the directory can be deleted, otherwise false 342 */ 343 private boolean isEmptyDirDeletable(Path dir) { 344 for (T cleaner : cleanersChain) { 345 if (cleaner.isStopped() || this.getStopper().isStopped()) { 346 LOG.warn("A file cleaner {} is stopped, won't delete the empty directory {}", 347 this.getName(), dir); 348 return false; 349 } 350 if (!cleaner.isEmptyDirDeletable(dir)) { 351 // If one of the cleaner need the empty directory, skip delete it 352 return false; 353 } 354 } 355 return true; 356 } 357 358 /** 359 * Delete the given files 360 * @param filesToDelete files to delete 361 * @return number of deleted files 362 */ 363 protected int deleteFiles(Iterable<FileStatus> filesToDelete) { 364 int deletedFileCount = 0; 365 for (FileStatus file : filesToDelete) { 366 Path filePath = file.getPath(); 367 LOG.trace("Removing {} from archive", filePath); 368 try { 369 boolean success = this.fs.delete(filePath, false); 370 if (success) { 371 deletedFileCount++; 372 } else { 373 LOG.warn("Attempted to delete:" + filePath 374 + ", but couldn't. Run cleaner chain and attempt to delete on next pass."); 375 } 376 } catch (IOException e) { 377 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 378 LOG.warn("Error while deleting: " + filePath, e); 379 } 380 } 381 return deletedFileCount; 382 } 383 384 @Override 385 public synchronized void cleanup() { 386 for (T lc : this.cleanersChain) { 387 try { 388 lc.stop("Exiting"); 389 } catch (Throwable t) { 390 LOG.warn("Stopping", t); 391 } 392 } 393 } 394 395 int getChorePoolSize() { 396 return pool.getSize(); 397 } 398 399 /** 400 * n 401 */ 402 public boolean setEnabled(final boolean enabled) { 403 return this.enabled.getAndSet(enabled); 404 } 405 406 public boolean getEnabled() { 407 return this.enabled.get(); 408 } 409 410 private interface Action<T> { 411 T act() throws Exception; 412 } 413 414 /** 415 * Attempts to clean up a directory(its subdirectories, and files) in a 416 * {@link java.util.concurrent.ThreadPoolExecutor} concurrently. We can get the final result by 417 * calling result.get(). 418 */ 419 private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean> result) { 420 try { 421 // Step.1: List all files under the given directory. 422 List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir)); 423 List<FileStatus> subDirs = 424 allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList()); 425 List<FileStatus> files = 426 allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList()); 427 428 // Step.2: Try to delete all the deletable files. 429 boolean allFilesDeleted = 430 files.isEmpty() || deleteAction(() -> checkAndDeleteFiles(files), "files", dir); 431 432 // Step.3: Start to traverse and delete the sub-directories. 433 List<CompletableFuture<Boolean>> futures = new ArrayList<>(); 434 if (!subDirs.isEmpty()) { 435 sortByConsumedSpace(subDirs); 436 // Submit the request of sub-directory deletion. 437 subDirs.forEach(subDir -> { 438 if (!shouldExclude(subDir)) { 439 CompletableFuture<Boolean> subFuture = new CompletableFuture<>(); 440 pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture)); 441 futures.add(subFuture); 442 } 443 }); 444 } 445 446 // Step.4: Once all sub-files & sub-directories are deleted, then can try to delete the 447 // current directory asynchronously. 448 FutureUtils.addListener( 449 CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])), 450 (voidObj, e) -> { 451 if (e != null) { 452 result.completeExceptionally(e); 453 return; 454 } 455 try { 456 boolean allSubDirsDeleted = futures.stream().allMatch(CompletableFuture::join); 457 boolean deleted = allFilesDeleted && allSubDirsDeleted && isEmptyDirDeletable(dir); 458 if (deleted && !root) { 459 // If and only if files and sub-dirs under current dir are deleted successfully, and 460 // the empty directory can be deleted, and it is not the root dir then task will 461 // try to delete it. 462 deleted = deleteAction(() -> fs.delete(dir, false), "dir", dir); 463 } 464 result.complete(deleted); 465 } catch (Exception ie) { 466 // Must handle the inner exception here, otherwise the result may get stuck if one 467 // sub-directory get some failure. 468 result.completeExceptionally(ie); 469 } 470 }); 471 } catch (Exception e) { 472 if (e instanceof FileNotFoundException) { 473 LOG.debug("Dir dose not exist, {}", dir); 474 } else { 475 LOG.error("Failed to traverse and delete the path: {}", dir, e); 476 } 477 result.completeExceptionally(e); 478 } 479 } 480 481 /** 482 * Check if a path should not perform clear 483 */ 484 private boolean shouldExclude(FileStatus f) { 485 if (!f.isDirectory()) { 486 return false; 487 } 488 if (excludeDirs != null && !excludeDirs.isEmpty()) { 489 for (String dirPart : excludeDirs) { 490 // since we make excludeDirs end with '/', 491 // if a path contains() the dirPart, the path should be excluded 492 if (f.getPath().toString().contains(dirPart)) { 493 return true; 494 } 495 } 496 } 497 return false; 498 } 499 500 /** 501 * Perform a delete on a specified type. 502 * @param deletion a delete 503 * @param type possible values are 'files', 'subdirs', 'dirs' 504 * @return true if it deleted successfully, false otherwise 505 */ 506 private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) { 507 boolean deleted; 508 try { 509 LOG.trace("Start deleting {} under {}", type, dir); 510 deleted = deletion.act(); 511 } catch (PathIsNotEmptyDirectoryException exception) { 512 // N.B. HDFS throws this exception when we try to delete a non-empty directory, but 513 // LocalFileSystem throws a bare IOException. So some test code will get the verbose 514 // message below. 515 LOG.debug("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception); 516 deleted = false; 517 } catch (IOException ioe) { 518 if (LOG.isTraceEnabled()) { 519 LOG.trace("Could not delete {} under {}; will retry. If it keeps happening, " 520 + "quote the exception when asking on mailing list.", type, dir, ioe); 521 } else { 522 LOG.info( 523 "Could not delete {} under {} because {}; will retry. If it keeps happening, enable" 524 + "TRACE-level logging and quote the exception when asking on mailing list.", 525 type, dir, ioe.getMessage()); 526 } 527 deleted = false; 528 } catch (Exception e) { 529 LOG.info("unexpected exception: ", e); 530 deleted = false; 531 } 532 LOG.trace("Finish deleting {} under {}, deleted = {}", type, dir, deleted); 533 return deleted; 534 } 535}