/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Abstract Cleaner that uses a chain of delegates to clean a directory of files.
 * @param <T> Cleaner delegate class that is dynamically loaded from configuration
 */
@InterfaceAudience.Private
public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {

  private static final Logger LOG = LoggerFactory.getLogger(CleanerChore.class);
  private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();

  /**
   * Configuration key for the size of the directory-scanning pool.
   * If the value is an integer {@code >= 1}, it is used as the size (capped at the number of
   * available processors); if it is a decimal with {@code 0.0 < size <= 1.0}, the size is
   * available processors * size.
   * Pay attention that {@code 1.0} is different from {@code 1}: the former means 100% of the
   * cores, while the latter means a single thread for the chore to scan directories.
   */
  public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
  static final String DEFAULT_CHORE_POOL_SIZE = "0.25";

  private final DirScanPool pool;

  protected final FileSystem fs;
  private final Path oldFileDir;
  private final Configuration conf;
  protected final Map<String, Object> params;
  private final AtomicBoolean enabled = new AtomicBoolean(true);
  protected List<T> cleanersChain;

  /**
   * Same as the full constructor, passing {@code null} as the cleaner {@code params}.
   */
  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
    FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
    this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null);
  }

  /**
   * @param name name of the chore being run
   * @param sleepPeriod the period of time to sleep between each run
   * @param s the stopper
   * @param conf configuration to use
   * @param fs handle to the FS
   * @param oldFileDir the path to the archived files
   * @param confKey configuration key for the classes to instantiate
   * @param pool the thread pool used to scan directories, must not be null
   * @param params members could be used in cleaner
   */
  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
    FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params) {
    super(name, s, sleepPeriod);

    Preconditions.checkNotNull(pool, "Chore's pool can not be null");
    this.pool = pool;
    this.fs = fs;
    this.oldFileDir = oldFileDir;
    this.conf = conf;
    this.params = params;
    initCleanerChain(confKey);
  }

  /**
   * Calculate size for cleaner pool.
   * <p>
   * An integer value {@code >= 1} is used directly but capped at the number of available
   * processors. A decimal value in {@code (0.0, 1.0]} is interpreted as a fraction of the
   * available processors, never yielding less than one thread. Anything else (including
   * {@code null}) is reported and replaced by {@link #DEFAULT_CHORE_POOL_SIZE}.
   * @param poolSize size from configuration
   * @return size of pool after calculation, always at least 1
   */
  static int calculatePoolSize(String poolSize) {
    if (poolSize != null && poolSize.matches("[1-9][0-9]*")) {
      // If poolSize is an integer, return it directly,
      // but upmost to the number of available processors.
      int requested;
      try {
        requested = Integer.parseInt(poolSize);
      } catch (NumberFormatException tooLarge) {
        // All digits but beyond the int range: certainly more than the available processors,
        // so let the cap below take effect instead of failing.
        requested = Integer.MAX_VALUE;
      }
      int size = Math.min(requested, AVAIL_PROCESSORS);
      if (size == AVAIL_PROCESSORS) {
        LOG.warn("Use full core processors to scan dir, size={}", size);
      }
      return size;
    } else if (poolSize != null && poolSize.matches("0\\.[0-9]+|1\\.0")) {
      // The dots are escaped on purpose: an unescaped '.' would accept values such as "0x5"
      // which then cannot be parsed as a double.
      // if poolSize is a double, return poolSize * availableProcessors;
      // Ensure that we always return at least one.
      int computedThreads = (int) (AVAIL_PROCESSORS * Double.parseDouble(poolSize));
      if (computedThreads < 1) {
        LOG.debug("Computed {} threads for CleanerChore, using 1 instead", computedThreads);
        return 1;
      }
      return computedThreads;
    } else {
      LOG.error("Unrecognized value: {} for {}, use default config: {} instead.", poolSize,
        CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE);
      return calculatePoolSize(DEFAULT_CHORE_POOL_SIZE);
    }
  }

  /**
   * Validate the file to see if it even belongs in the directory. If it is valid, then the file
   * will go through the cleaner delegates, but otherwise the file is just deleted.
   * @param file full {@link Path} of the file to be checked
   * @return {@code true} if the file is valid, {@code false} otherwise
   */
  protected abstract boolean validate(Path file);

  /**
   * Instantiate and initialize all the file cleaners set in the configuration. Blank class names
   * and classes that fail to instantiate are skipped.
   * @param confKey key to get the file cleaner classes from the configuration
   */
  private void initCleanerChain(String confKey) {
    this.cleanersChain = new LinkedList<>();
    String[] logCleaners = conf.getStrings(confKey);
    if (logCleaners != null) {
      for (String className : logCleaners) {
        className = className.trim();
        if (className.isEmpty()) {
          continue;
        }
        T logCleaner = newFileCleaner(className, conf);
        if (logCleaner != null) {
          LOG.info("Initialize cleaner={}", className);
          this.cleanersChain.add(logCleaner);
        }
      }
    }
  }

  /**
   * A utility method to create a new instance of a {@link FileCleanerDelegate} based on its class
   * name. The new instance gets the configuration set and is initialized with {@link #params}.
   * @param className fully qualified class name of the FileCleanerDelegate
   * @param conf used configuration
   * @return the new instance, or {@code null} if it could not be created
   */
  private T newFileCleaner(String className, Configuration conf) {
    try {
      Class<? extends FileCleanerDelegate> c =
        Class.forName(className).asSubclass(FileCleanerDelegate.class);
      @SuppressWarnings("unchecked")
      T cleaner = (T) c.getDeclaredConstructor().newInstance();
      cleaner.setConf(conf);
      cleaner.init(this.params);
      return cleaner;
    } catch (Exception e) {
      LOG.warn("Can NOT create CleanerDelegate={}", className, e);
      // skipping if can't instantiate
      return null;
    }
  }

  @Override
  protected void chore() {
    if (getEnabled()) {
      try {
        pool.latchCountUp();
        if (runCleaner()) {
          LOG.trace("Cleaned all WALs under {}", oldFileDir);
        } else {
          LOG.trace("WALs outstanding under {}", oldFileDir);
        }
      } finally {
        pool.latchCountDown();
      }
      // After each cleaner chore, checks if received reconfigure notification while cleaning.
      // First in cleaner turns off notification, to avoid another cleaner updating pool again.
      // This cleaner is waiting for other cleaners finishing their jobs.
      // To avoid missing next chore, only wait 0.8 * period, then shutdown.
      pool.tryUpdatePoolSize((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
    } else {
      LOG.trace("Cleaner chore disabled! Not cleaning.");
    }
  }

  /** Give every delegate in the chain a chance to prepare before a cleaning pass. */
  private void preRunCleaner() {
    cleanersChain.forEach(FileCleanerDelegate::preClean);
  }

  /**
   * Run one cleaning pass over the old-file directory and block until it has finished.
   * @return {@code true} if everything under the directory was cleaned, {@code false} if
   *         something remains or the traversal failed
   */
  public boolean runCleaner() {
    preRunCleaner();
    try {
      CompletableFuture<Boolean> future = new CompletableFuture<>();
      pool.execute(() -> traverseAndDelete(oldFileDir, true, future));
      return future.get();
    } catch (InterruptedException e) {
      // Keep the interrupt visible to the caller instead of silently swallowing it.
      Thread.currentThread().interrupt();
      LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
      return false;
    } catch (Exception e) {
      LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
      return false;
    }
  }

  /**
   * Sort the given list in (descending) order of the space each element takes.
   * @param dirs the list to sort, element in it should be directory (not file)
   */
  private void sortByConsumedSpace(List<FileStatus> dirs) {
    if (dirs == null || dirs.size() < 2) {
      // no need to sort for empty or single directory
      return;
    }
    dirs.sort(new Comparator<FileStatus>() {
      // Space per entry, looked up at most once per sort (failures included).
      private final Map<FileStatus, Long> directorySpaces = new HashMap<>();

      @Override
      public int compare(FileStatus f1, FileStatus f2) {
        long f1ConsumedSpace = getSpace(f1);
        long f2ConsumedSpace = getSpace(f2);
        return Long.compare(f2ConsumedSpace, f1ConsumedSpace);
      }

      private long getSpace(FileStatus f) {
        Long cached = directorySpaces.get(f);
        if (cached != null) {
          return cached;
        }
        long space;
        try {
          space =
            f.isDirectory() ? fs.getContentSummary(f.getPath()).getSpaceConsumed() : f.getLen();
        } catch (IOException e) {
          LOG.trace("Failed to get space consumed by path={}", f, e);
          space = -1;
        }
        // Remember failures as well: the comparator must give the same answer for the same
        // element throughout one sort, and a failing lookup should not be repeated per compare.
        directorySpaces.put(f, space);
        return space;
      }
    });
  }

  /**
   * Run the given files through each of the cleaners to see if it should be deleted, deleting it if
   * necessary. Files that fail {@link #validate(Path)} are deleted without consulting the cleaners.
   * @param files List of FileStatus for the files to check (and possibly delete)
   * @return true iff successfully deleted all files
   */
  private boolean checkAndDeleteFiles(List<FileStatus> files) {
    if (files == null) {
      return true;
    }

    // first check to see if the path is valid
    List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
    List<FileStatus> invalidFiles = Lists.newArrayList();
    for (FileStatus file : files) {
      if (validate(file.getPath())) {
        validFiles.add(file);
      } else {
        LOG.warn("Found a wrongly formatted file: {} - will delete it.", file.getPath());
        invalidFiles.add(file);
      }
    }

    Iterable<FileStatus> deletableValidFiles = validFiles;
    // check each of the cleaners for the valid files
    for (T cleaner : cleanersChain) {
      if (cleaner.isStopped() || this.getStopper().isStopped()) {
        LOG.warn("A file cleaner {} is stopped, won't delete any more files in:{}",
          this.getName(), this.oldFileDir);
        return false;
      }

      Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);

      // trace which cleaner is holding on to each file
      if (LOG.isTraceEnabled()) {
        ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
        for (FileStatus file : deletableValidFiles) {
          if (!filteredFileSet.contains(file)) {
            LOG.trace("{} is not deletable according to:{}", file.getPath(), cleaner);
          }
        }
      }

      // each cleaner only sees what the previous cleaners agreed to delete
      deletableValidFiles = filteredFiles;
    }

    Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
    return deleteFiles(filesToDelete) == files.size();
  }

  /**
   * Check if a empty directory with no subdirs or subfiles can be deleted.
   * @param dir Path of the directory
   * @return True if the directory can be deleted, otherwise false
   */
  private boolean isEmptyDirDeletable(Path dir) {
    for (T cleaner : cleanersChain) {
      if (cleaner.isStopped() || this.getStopper().isStopped()) {
        LOG.warn("A file cleaner {} is stopped, won't delete the empty directory {}",
          this.getName(), dir);
        return false;
      }
      if (!cleaner.isEmptyDirDeletable(dir)) {
        // If one of the cleaner need the empty directory, skip delete it
        return false;
      }
    }
    return true;
  }

  /**
   * Delete the given files. A failure on one file is logged and does not stop the remaining
   * deletions.
   * @param filesToDelete files to delete
   * @return number of deleted files
   */
  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
    int deletedFileCount = 0;
    for (FileStatus file : filesToDelete) {
      Path filePath = file.getPath();
      LOG.trace("Removing {} from archive", filePath);
      try {
        boolean success = this.fs.delete(filePath, false);
        if (success) {
          deletedFileCount++;
        } else {
          LOG.warn("Attempted to delete:{}, but couldn't. Run cleaner chain and attempt to "
            + "delete on next pass.", filePath);
        }
      } catch (IOException e) {
        // Log the exception carried by a RemoteException rather than the wrapper itself.
        IOException cause =
          e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
        LOG.warn("Error while deleting: {}", filePath, cause);
      }
    }
    return deletedFileCount;
  }

  @Override
  public synchronized void cleanup() {
    for (T lc : this.cleanersChain) {
      try {
        lc.stop("Exiting");
      } catch (Throwable t) {
        // best effort: one failing delegate must not prevent stopping the others
        LOG.warn("Stopping", t);
      }
    }
  }

  int getChorePoolSize() {
    return pool.getSize();
  }

  /**
   * Enable or disable this chore; a disabled chore skips cleaning when it is triggered.
   * @param enabled the new enabled state
   * @return the previous enabled state
   */
  public boolean setEnabled(final boolean enabled) {
    return this.enabled.getAndSet(enabled);
  }

  /**
   * @return whether this chore is currently enabled
   */
  public boolean getEnabled() {
    return this.enabled.get();
  }

  /** A deletion step that yields a result and may fail with any exception. */
  @FunctionalInterface
  private interface Action<T> {
    T act() throws Exception;
  }

  /**
   * Attempts to clean up a directory (its subdirectories, and files). Each sub-directory is
   * submitted to the scan pool as its own task; the outcome for {@code dir} is published through
   * {@code result} once all of those tasks have completed, so callers can obtain it by calling
   * {@code result.get()}.
   * @param dir the directory to clean
   * @param root whether {@code dir} is the top directory of the traversal; the root directory
   *          itself is never deleted
   * @param result completed with {@code true} if everything under {@code dir} was deleted (and,
   *          for a non-root directory, the directory itself too), {@code false} if something
   *          remains, or exceptionally on failure
   */
  private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean> result) {
    try {
      // Step.1: List all files under the given directory.
      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir));
      List<FileStatus> subDirs =
        allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
      List<FileStatus> files =
        allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());

      // Step.2: Try to delete all the deletable files.
      boolean allFilesDeleted =
        files.isEmpty() || deleteAction(() -> checkAndDeleteFiles(files), "files", dir);

      // Step.3: Start to traverse and delete the sub-directories.
      List<CompletableFuture<Boolean>> futures = new ArrayList<>();
      if (!subDirs.isEmpty()) {
        sortByConsumedSpace(subDirs);
        // Submit the request of sub-directory deletion.
        subDirs.forEach(subDir -> {
          CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
          pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
          futures.add(subFuture);
        });
      }

      // Step.4: Once all sub-files & sub-directories are deleted, then can try to delete the
      // current directory asynchronously.
      FutureUtils.addListener(
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
        (voidObj, e) -> {
          if (e != null) {
            result.completeExceptionally(e);
            return;
          }
          try {
            boolean allSubDirsDeleted = futures.stream().allMatch(CompletableFuture::join);
            boolean deleted = allFilesDeleted && allSubDirsDeleted && isEmptyDirDeletable(dir);
            if (deleted && !root) {
              // If and only if files and sub-dirs under current dir are deleted successfully, and
              // the empty directory can be deleted, and it is not the root dir then task will
              // try to delete it.
              deleted = deleteAction(() -> fs.delete(dir, false), "dir", dir);
            }
            result.complete(deleted);
          } catch (Exception ie) {
            // Must handle the inner exception here, otherwise the result may get stuck if one
            // sub-directory get some failure.
            result.completeExceptionally(ie);
          }
        });
    } catch (Exception e) {
      LOG.debug("Failed to traverse and delete the path: {}", dir, e);
      result.completeExceptionally(e);
    }
  }

  /**
   * Perform a delete on a specified type, turning any failure into a {@code false} result.
   * @param deletion a delete
   * @param type what is being deleted, used for logging only; callers in this class pass
   *          'files' or 'dir'
   * @param dir the directory the deletion applies to, used for logging only
   * @return true if it deleted successfully, false otherwise
   */
  private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) {
    boolean deleted;
    try {
      LOG.trace("Start deleting {} under {}", type, dir);
      deleted = deletion.act();
    } catch (PathIsNotEmptyDirectoryException exception) {
      // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
      // LocalFileSystem throws a bare IOException. So some test code will get the verbose
      // message below.
      LOG.debug("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
      deleted = false;
    } catch (IOException ioe) {
      LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps "
        + "happening, use following exception when asking on mailing list.",
        type, dir, ioe);
      deleted = false;
    } catch (Exception e) {
      LOG.info("unexpected exception: ", e);
      deleted = false;
    }
    LOG.trace("Finish deleting {} under {}, deleted={}", type, dir, deleted);
    return deleted;
  }
}