/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Abstract Cleaner that uses a chain of delegates to clean a directory of files.
 * <p>
 * Each run lists the configured directory, passes the files through every delegate in
 * {@link #cleanersChain} and deletes whatever all delegates agree is deletable. Sub-directories
 * are traversed concurrently on the supplied {@link DirScanPool}.
 * @param <T> Cleaner delegate class that is dynamically loaded from configuration
 */
@InterfaceAudience.Private
public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {

  private static final Logger LOG = LoggerFactory.getLogger(CleanerChore.class);
  private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();

  /**
   * If it is an integer and >= 1, it would be the size;
   * if 0.0 < size <= 1.0, size would be available processors * size.
   * Pay attention that 1.0 is different from 1, former indicates it will use 100% of cores,
   * while latter will use only 1 thread for chore to scan dir.
   */
  public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
  static final String DEFAULT_CHORE_POOL_SIZE = "0.25";

  private final DirScanPool pool;

  protected final FileSystem fs;
  private final Path oldFileDir;
  private final Configuration conf;
  protected final Map<String, Object> params;
  private final AtomicBoolean enabled = new AtomicBoolean(true);
  protected List<T> cleanersChain;

  /**
   * Same as the full constructor, passing {@code null} as the delegate parameters.
   * @param name name of the chore being run
   * @param sleepPeriod the period of time to sleep between each run
   * @param s the stopper
   * @param conf configuration to use
   * @param fs handle to the FS
   * @param oldFileDir the path to the archived files
   * @param confKey configuration key for the classes to instantiate
   * @param pool the thread pool used to scan directories
   */
  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
    FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
    this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null);
  }

  /**
   * @param name name of the chore being run
   * @param sleepPeriod the period of time to sleep between each run
   * @param s the stopper
   * @param conf configuration to use
   * @param fs handle to the FS
   * @param oldFileDir the path to the archived files
   * @param confKey configuration key for the classes to instantiate
   * @param pool the thread pool used to scan directories; must not be null
   * @param params members could be used in cleaner
   */
  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
    FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool,
    Map<String, Object> params) {
    super(name, s, sleepPeriod);

    Preconditions.checkNotNull(pool, "Chore's pool can not be null");
    this.pool = pool;
    this.fs = fs;
    this.oldFileDir = oldFileDir;
    this.conf = conf;
    this.params = params;
    initCleanerChain(confKey);
  }

  /**
   * Calculate size for cleaner pool.
   * <p>
   * A positive integer is used as-is, capped at the number of available processors. A decimal
   * of the form {@code 0.x} or exactly {@code 1.0} is treated as a fraction of the available
   * processors (never less than one thread). Anything else falls back to
   * {@link #DEFAULT_CHORE_POOL_SIZE}.
   * @param poolSize size from configuration
   * @return size of pool after calculation
   */
  static int calculatePoolSize(String poolSize) {
    if (poolSize.matches("[1-9][0-9]*")) {
      // If poolSize is an integer, return it directly,
      // but upmost to the number of available processors.
      int requested;
      try {
        requested = Integer.parseInt(poolSize);
      } catch (NumberFormatException nfe) {
        // All digits but too large for an int, so it is necessarily above the processor count.
        requested = Integer.MAX_VALUE;
      }
      int size = Math.min(requested, AVAIL_PROCESSORS);
      if (size == AVAIL_PROCESSORS) {
        LOG.warn("Use full core processors to scan dir, size={}", size);
      }
      return size;
    } else if (poolSize.matches("0\\.[0-9]+|1\\.0")) {
      // The dots are escaped so that only real decimals match; an unescaped '.' would accept
      // e.g. "0x5" and then fail in the parse below instead of falling back to the default.
      // if poolSize is a double, return poolSize * availableProcessors;
      // Ensure that we always return at least one.
      int computedThreads = (int) (AVAIL_PROCESSORS * Double.parseDouble(poolSize));
      if (computedThreads < 1) {
        LOG.debug("Computed {} threads for CleanerChore, using 1 instead", computedThreads);
        return 1;
      }
      return computedThreads;
    } else {
      LOG.error("Unrecognized value: {} for {}, use default config: {} instead.", poolSize,
        CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE);
      return calculatePoolSize(DEFAULT_CHORE_POOL_SIZE);
    }
  }

  /**
   * Validate the file to see if it even belongs in the directory. If it is valid, then the file
   * will go through the cleaner delegates, but otherwise the file is just deleted.
   * @param file full {@link Path} of the file to be checked
   * @return {@code true} if the file is valid, {@code false} otherwise
   */
  protected abstract boolean validate(Path file);

  /**
   * Instantiate and initialize all the file cleaners set in the configuration. Classes that
   * cannot be instantiated are skipped.
   * @param confKey key to get the file cleaner classes from the configuration
   */
  private void initCleanerChain(String confKey) {
    this.cleanersChain = new LinkedList<>();
    String[] logCleaners = conf.getStrings(confKey);
    if (logCleaners == null) {
      return;
    }
    for (String className : logCleaners) {
      T logCleaner = newFileCleaner(className, conf);
      if (logCleaner != null) {
        LOG.info("Initialize cleaner={}", className);
        this.cleanersChain.add(logCleaner);
      }
    }
  }

  /**
   * A utility method to create new instances of LogCleanerDelegate based on the class name of the
   * LogCleanerDelegate.
   * @param className fully qualified class name of the LogCleanerDelegate
   * @param conf used configuration
   * @return the new instance, or {@code null} if it could not be created or initialized
   */
  private T newFileCleaner(String className, Configuration conf) {
    try {
      Class<? extends FileCleanerDelegate> c =
        Class.forName(className).asSubclass(FileCleanerDelegate.class);
      @SuppressWarnings("unchecked")
      T cleaner = (T) c.getDeclaredConstructor().newInstance();
      cleaner.setConf(conf);
      cleaner.init(this.params);
      return cleaner;
    } catch (Exception e) {
      LOG.warn("Can NOT create CleanerDelegate={}", className, e);
      // skipping if can't instantiate
      return null;
    }
  }

  /**
   * Runs one cleaning pass if the chore is enabled, then gives the shared pool a chance to
   * apply a pending size change.
   */
  @Override
  protected void chore() {
    if (getEnabled()) {
      try {
        pool.latchCountUp();
        if (runCleaner()) {
          LOG.trace("Cleaned all WALs under {}", oldFileDir);
        } else {
          LOG.trace("WALs outstanding under {}", oldFileDir);
        }
      } finally {
        pool.latchCountDown();
      }
      // After each cleaner chore, checks if received reconfigure notification while cleaning.
      // First in cleaner turns off notification, to avoid another cleaner updating pool again.
      // This cleaner is waiting for other cleaners finishing their jobs.
      // To avoid missing next chore, only wait 0.8 * period, then shutdown.
      pool.tryUpdatePoolSize((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
    } else {
      LOG.trace("Cleaner chore disabled! Not cleaning.");
    }
  }

  /** Gives every delegate a chance to prepare before a cleaning pass starts. */
  private void preRunCleaner() {
    cleanersChain.forEach(FileCleanerDelegate::preClean);
  }

  /**
   * Runs one full traversal of the configured directory on the pool and waits for it to finish.
   * @return {@code true} if everything under the directory was deleted, {@code false} if
   *         something is still outstanding or the traversal failed
   */
  public boolean runCleaner() {
    preRunCleaner();
    try {
      CompletableFuture<Boolean> future = new CompletableFuture<>();
      pool.execute(() -> traverseAndDelete(oldFileDir, true, future));
      return future.get();
    } catch (Exception e) {
      LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
      return false;
    }
  }

  /**
   * Sort the given list in (descending) order of the space each element takes.
   * <p>
   * The consumed space of every element is looked up exactly once, before sorting, so the
   * comparator never performs I/O and always sees the same value for a given element. A failed
   * lookup is recorded as {@code -1}, which places the element at the end.
   * @param dirs the list to sort, element in it should be directory (not file)
   */
  private void sortByConsumedSpace(List<FileStatus> dirs) {
    if (dirs == null || dirs.size() < 2) {
      // no need to sort for empty or single directory
      return;
    }
    Map<FileStatus, Long> directorySpaces = new HashMap<>();
    for (FileStatus dir : dirs) {
      directorySpaces.put(dir, getConsumedSpace(dir));
    }
    Comparator<FileStatus> bySpaceDescending =
      (f1, f2) -> Long.compare(directorySpaces.get(f2), directorySpaces.get(f1));
    dirs.sort(bySpaceDescending);
  }

  /**
   * Look up the space consumed by the given path.
   * @param f the file or directory to measure
   * @return the consumed space, or {@code -1} if it could not be determined
   */
  private long getConsumedSpace(FileStatus f) {
    try {
      return f.isDirectory() ? fs.getContentSummary(f.getPath()).getSpaceConsumed() : f.getLen();
    } catch (IOException e) {
      LOG.trace("Failed to get space consumed by path={}", f, e);
      return -1;
    }
  }

  /**
   * Run the given files through each of the cleaners to see if it should be deleted, deleting it if
   * necessary.
   * @param files List of FileStatus for the files to check (and possibly delete)
   * @return true iff successfully deleted all files
   */
  private boolean checkAndDeleteFiles(List<FileStatus> files) {
    if (files == null) {
      return true;
    }

    // first check to see if the path is valid
    List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
    List<FileStatus> invalidFiles = Lists.newArrayList();
    for (FileStatus file : files) {
      if (validate(file.getPath())) {
        validFiles.add(file);
      } else {
        LOG.warn("Found a wrongly formatted file: {} - will delete it.", file.getPath());
        invalidFiles.add(file);
      }
    }

    Iterable<FileStatus> deletableValidFiles = validFiles;
    // check each of the cleaners for the valid files
    for (T cleaner : cleanersChain) {
      if (cleaner.isStopped() || this.getStopper().isStopped()) {
        LOG.warn("A file cleaner {} is stopped, won't delete any more files in: {}",
          this.getName(), this.oldFileDir);
        return false;
      }

      Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);

      // trace which cleaner is holding on to each file
      if (LOG.isTraceEnabled()) {
        ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
        for (FileStatus file : deletableValidFiles) {
          if (!filteredFileSet.contains(file)) {
            LOG.trace("{} is not deletable according to:{}", file.getPath(), cleaner);
          }
        }
      }

      deletableValidFiles = filteredFiles;
    }

    // Invalid files are deleted unconditionally; valid ones only if every cleaner agreed.
    Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
    return deleteFiles(filesToDelete) == files.size();
  }

  /**
   * Check if a empty directory with no subdirs or subfiles can be deleted
   * @param dir Path of the directory
   * @return True if the directory can be deleted, otherwise false
   */
  private boolean isEmptyDirDeletable(Path dir) {
    for (T cleaner : cleanersChain) {
      if (cleaner.isStopped() || this.getStopper().isStopped()) {
        LOG.warn("A file cleaner {} is stopped, won't delete the empty directory {}",
          this.getName(), dir);
        return false;
      }
      if (!cleaner.isEmptyDirDeletable(dir)) {
        // If one of the cleaner need the empty directory, skip delete it
        return false;
      }
    }
    return true;
  }

  /**
   * Delete the given files. A failure on one file is logged and does not stop the remaining
   * files from being attempted.
   * @param filesToDelete files to delete
   * @return number of deleted files
   */
  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
    int deletedFileCount = 0;
    for (FileStatus file : filesToDelete) {
      Path filePath = file.getPath();
      LOG.trace("Removing {} from archive", filePath);
      try {
        boolean success = this.fs.delete(filePath, false);
        if (success) {
          deletedFileCount++;
        } else {
          LOG.warn("Attempted to delete:{}, but couldn't. Run cleaner chain and attempt to "
            + "delete on next pass.", filePath);
        }
      } catch (IOException e) {
        // Log the server-side exception rather than the RPC wrapper when there is one.
        IOException cause =
          e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
        LOG.warn("Error while deleting: {}", filePath, cause);
      }
    }
    return deletedFileCount;
  }

  /** Stops every cleaner delegate, logging (not propagating) any failure while stopping. */
  @Override
  public synchronized void cleanup() {
    for (T lc : this.cleanersChain) {
      try {
        lc.stop("Exiting");
      } catch (Throwable t) {
        LOG.warn("Stopping", t);
      }
    }
  }

  @VisibleForTesting
  int getChorePoolSize() {
    return pool.getSize();
  }

  /**
   * Enable or disable this chore.
   * @param enabled whether the chore should perform cleaning on subsequent runs
   * @return the previous enabled state
   */
  public boolean setEnabled(final boolean enabled) {
    return this.enabled.getAndSet(enabled);
  }

  /**
   * @return whether the chore is currently enabled
   */
  public boolean getEnabled() {
    return this.enabled.get();
  }

  /** A deletion step that yields a result and is allowed to throw. */
  @FunctionalInterface
  private interface Action<T> {
    T act() throws Exception;
  }

  /**
   * Attempts to clean up a directory(its subdirectories, and files) in a
   * {@link java.util.concurrent.ThreadPoolExecutor} concurrently. We can get the final result by
   * calling result.get().
   * @param dir the directory to clean
   * @param root whether {@code dir} is the top-level directory, which is never deleted itself
   * @param result completed with {@code true} if everything under {@code dir} (and, unless it is
   *          the root, {@code dir} itself) was deleted; completed exceptionally on failure
   */
  private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean> result) {
    try {
      // Step.1: List all files under the given directory.
      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir));
      List<FileStatus> subDirs =
        allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
      List<FileStatus> files =
        allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());

      // Step.2: Try to delete all the deletable files.
      boolean allFilesDeleted =
        files.isEmpty() || deleteAction(() -> checkAndDeleteFiles(files), "files", dir);

      // Step.3: Start to traverse and delete the sub-directories.
      List<CompletableFuture<Boolean>> futures = new ArrayList<>();
      if (!subDirs.isEmpty()) {
        sortByConsumedSpace(subDirs);
        // Submit the request of sub-directory deletion.
        subDirs.forEach(subDir -> {
          CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
          pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
          futures.add(subFuture);
        });
      }

      // Step.4: Once all sub-files & sub-directories are deleted, then can try to delete the
      // current directory asynchronously.
      FutureUtils.addListener(
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
        (voidObj, e) -> {
          if (e != null) {
            result.completeExceptionally(e);
            return;
          }
          try {
            boolean allSubDirsDeleted = futures.stream().allMatch(CompletableFuture::join);
            boolean deleted = allFilesDeleted && allSubDirsDeleted && isEmptyDirDeletable(dir);
            if (deleted && !root) {
              // If and only if files and sub-dirs under current dir are deleted successfully, and
              // the empty directory can be deleted, and it is not the root dir then task will
              // try to delete it.
              deleted = deleteAction(() -> fs.delete(dir, false), "dir", dir);
            }
            result.complete(deleted);
          } catch (Exception ie) {
            // Must handle the inner exception here, otherwise the result may get stuck if one
            // sub-directory get some failure.
            result.completeExceptionally(ie);
          }
        });
    } catch (Exception e) {
      LOG.debug("Failed to traverse and delete the path: {}", dir, e);
      result.completeExceptionally(e);
    }
  }

  /**
   * Perform a delete on a specified type. Any exception thrown by the deletion is logged and
   * reported as a failed delete rather than propagated.
   * @param deletion a delete
   * @param type possible values are 'files', 'subdirs', 'dirs'
   * @param dir the directory the deletion applies to, used for logging
   * @return true if it deleted successfully, false otherwise
   */
  private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) {
    boolean deleted;
    try {
      LOG.trace("Start deleting {} under {}", type, dir);
      deleted = deletion.act();
    } catch (PathIsNotEmptyDirectoryException exception) {
      // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
      // LocalFileSystem throws a bare IOException. So some test code will get the verbose
      // message below.
      LOG.debug("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
      deleted = false;
    } catch (IOException ioe) {
      LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps "
        + "happening, use following exception when asking on mailing list.",
        type, dir, ioe);
      deleted = false;
    } catch (Exception e) {
      LOG.info("unexpected exception: ", e);
      deleted = false;
    }
    LOG.trace("Finish deleting {} under {}, deleted={}", type, dir, deleted);
    return deleted;
  }
}