/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Abstract Cleaner that uses a chain of delegates to clean a directory of files
 * @param <T> Cleaner delegate class that is dynamically loaded from configuration
 */
@InterfaceAudience.Private
public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {

  private static final Logger LOG = LoggerFactory.getLogger(CleanerChore.class);
  private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();

  /**
   * If it is an integer and >= 1, it would be the size;
   * if 0.0 < size <= 1.0, size would be available processors * size.
   * Pay attention that 1.0 is different from 1, former indicates it will use 100% of cores,
   * while latter will use only 1 thread for chore to scan dir.
   */
  public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
  static final String DEFAULT_CHORE_POOL_SIZE = "0.25";

  private final DirScanPool pool;

  protected final FileSystem fs;
  private final Path oldFileDir;
  private final Configuration conf;
  protected final Map<String, Object> params;
  private final AtomicBoolean enabled = new AtomicBoolean(true);
  protected List<T> cleanersChain;

  /**
   * Same as the full constructor, with no extra parameters handed to the cleaner delegates.
   * @param name name of the chore being run
   * @param sleepPeriod the period of time to sleep between each run
   * @param s the stopper
   * @param conf configuration to use
   * @param fs handle to the FS
   * @param oldFileDir the path to the archived files
   * @param confKey configuration key for the classes to instantiate
   * @param pool the thread pool used to scan directories
   */
  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
    FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
    this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null);
  }

  /**
   * @param name name of the chore being run
   * @param sleepPeriod the period of time to sleep between each run
   * @param s the stopper
   * @param conf configuration to use
   * @param fs handle to the FS
   * @param oldFileDir the path to the archived files
   * @param confKey configuration key for the classes to instantiate
   * @param pool the thread pool used to scan directories
   * @param params members could be used in cleaner
   */
  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
    FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params) {
    super(name, s, sleepPeriod);

    Preconditions.checkNotNull(pool, "Chore's pool can not be null");
    this.pool = pool;
    this.fs = fs;
    this.oldFileDir = oldFileDir;
    this.conf = conf;
    this.params = params;
    initCleanerChain(confKey);
  }

  /**
   * Calculate size for cleaner pool.
   * <p>
   * A positive integer is used as-is, capped at the number of available processors. A decimal
   * of the form {@code 0.x} or exactly {@code 1.0} is treated as a fraction of the available
   * processors (never less than one thread). Anything else falls back to
   * {@link #DEFAULT_CHORE_POOL_SIZE}.
   * @param poolSize size from configuration
   * @return size of pool after calculation
   */
  static int calculatePoolSize(String poolSize) {
    if (poolSize.matches("[1-9][0-9]*")) {
      // If poolSize is an integer, return it directly,
      // but upmost to the number of available processors.
      int requested;
      try {
        requested = Integer.parseInt(poolSize);
      } catch (NumberFormatException e) {
        // An all-digit value that does not fit in an int is certainly larger than the number of
        // processors, so treat it as "as many as possible" instead of failing.
        requested = Integer.MAX_VALUE;
      }
      int size = Math.min(requested, AVAIL_PROCESSORS);
      if (size == AVAIL_PROCESSORS) {
        LOG.warn("Use full core processors to scan dir, size={}", size);
      }
      return size;
    } else if (poolSize.matches("0\\.[0-9]+|1\\.0")) {
      // The dots are escaped on purpose: an unescaped '.' would also accept values such as
      // "015" (parsed as 15.0) or "1a0" (not parseable at all).
      // if poolSize is a double, return poolSize * availableProcessors;
      // Ensure that we always return at least one.
      int computedThreads = (int) (AVAIL_PROCESSORS * Double.parseDouble(poolSize));
      if (computedThreads < 1) {
        LOG.debug("Computed {} threads for CleanerChore, using 1 instead", computedThreads);
        return 1;
      }
      return computedThreads;
    } else {
      LOG.error("Unrecognized value: " + poolSize + " for " + CHORE_POOL_SIZE +
        ", use default config: " + DEFAULT_CHORE_POOL_SIZE + " instead.");
      return calculatePoolSize(DEFAULT_CHORE_POOL_SIZE);
    }
  }

  /**
   * Validate the file to see if it even belongs in the directory. If it is valid, then the file
   * will go through the cleaner delegates, but otherwise the file is just deleted.
   * @param file full {@link Path} of the file to be checked
   * @return <tt>true</tt> if the file is valid, <tt>false</tt> otherwise
   */
  protected abstract boolean validate(Path file);

  /**
   * Instantiate and initialize all the file cleaners set in the configuration
   * @param confKey key to get the file cleaner classes from the configuration
   */
  private void initCleanerChain(String confKey) {
    this.cleanersChain = new LinkedList<>();
    String[] logCleaners = conf.getStrings(confKey);
    if (logCleaners != null) {
      for (String className : logCleaners) {
        T logCleaner = newFileCleaner(className, conf);
        // Delegates that could not be instantiated are skipped rather than failing the chore.
        if (logCleaner != null) {
          LOG.info("Initialize cleaner={}", className);
          this.cleanersChain.add(logCleaner);
        }
      }
    }
  }

  /**
   * A utility method to create new instances of LogCleanerDelegate based on the class name of the
   * LogCleanerDelegate.
   * @param className fully qualified class name of the LogCleanerDelegate
   * @param conf used configuration
   * @return the new instance, or <tt>null</tt> if it could not be created
   */
  private T newFileCleaner(String className, Configuration conf) {
    try {
      Class<? extends FileCleanerDelegate> c = Class.forName(className).asSubclass(
        FileCleanerDelegate.class);
      @SuppressWarnings("unchecked")
      T cleaner = (T) c.getDeclaredConstructor().newInstance();
      cleaner.setConf(conf);
      cleaner.init(this.params);
      return cleaner;
    } catch (Exception e) {
      LOG.warn("Can NOT create CleanerDelegate={}", className, e);
      // skipping if can't instantiate
      return null;
    }
  }

  @Override
  protected void chore() {
    if (getEnabled()) {
      try {
        pool.latchCountUp();
        if (runCleaner()) {
          LOG.trace("Cleaned all WALs under {}", oldFileDir);
        } else {
          LOG.trace("WALs outstanding under {}", oldFileDir);
        }
      } finally {
        pool.latchCountDown();
      }
      // After each cleaner chore, checks if received reconfigure notification while cleaning.
      // First in cleaner turns off notification, to avoid another cleaner updating pool again.
      // This cleaner is waiting for other cleaners finishing their jobs.
      // To avoid missing next chore, only wait 0.8 * period, then shutdown.
      pool.tryUpdatePoolSize((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
    } else {
      LOG.trace("Cleaner chore disabled! Not cleaning.");
    }
  }

  private void preRunCleaner() {
    cleanersChain.forEach(FileCleanerDelegate::preClean);
  }

  /**
   * Run one cleaning pass over the configured directory and wait for it to finish.
   * @return <tt>true</tt> if everything under the directory was deleted, <tt>false</tt> if
   *         something was left behind or the traversal failed
   */
  public boolean runCleaner() {
    preRunCleaner();
    try {
      CompletableFuture<Boolean> future = new CompletableFuture<>();
      pool.execute(() -> traverseAndDelete(oldFileDir, true, future));
      return future.get();
    } catch (InterruptedException e) {
      // Restore the interrupt status so the thread running this chore can still observe it.
      Thread.currentThread().interrupt();
      LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
      return false;
    } catch (Exception e) {
      LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
      return false;
    }
  }

  /**
   * Sort the given list in (descending) order of the space each element takes
   * @param dirs the list to sort, element in it should be directory (not file)
   */
  private void sortByConsumedSpace(List<FileStatus> dirs) {
    if (dirs == null || dirs.size() < 2) {
      // no need to sort for empty or single directory
      return;
    }
    dirs.sort(new Comparator<FileStatus>() {
      private final Map<FileStatus, Long> directorySpaces = new HashMap<>();

      @Override
      public int compare(FileStatus f1, FileStatus f2) {
        long f1ConsumedSpace = getSpace(f1);
        long f2ConsumedSpace = getSpace(f2);
        return Long.compare(f2ConsumedSpace, f1ConsumedSpace);
      }

      private long getSpace(FileStatus f) {
        Long cached = directorySpaces.get(f);
        if (cached != null) {
          return cached;
        }
        long space;
        try {
          space =
            f.isDirectory() ? fs.getContentSummary(f.getPath()).getSpaceConsumed() : f.getLen();
        } catch (IOException e) {
          LOG.trace("Failed to get space consumed by path={}", f, e);
          space = -1;
        }
        // Cache failures as well: every lookup for the same element must give the same answer
        // within one sort, otherwise the comparator could contradict itself between calls.
        directorySpaces.put(f, space);
        return space;
      }
    });
  }

  /**
   * Run the given files through each of the cleaners to see if it should be deleted, deleting it if
   * necessary.
   * @param files List of FileStatus for the files to check (and possibly delete)
   * @return true iff successfully deleted all files
   */
  private boolean checkAndDeleteFiles(List<FileStatus> files) {
    if (files == null) {
      return true;
    }

    // first check to see if the path is valid
    List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
    List<FileStatus> invalidFiles = Lists.newArrayList();
    for (FileStatus file : files) {
      if (validate(file.getPath())) {
        validFiles.add(file);
      } else {
        LOG.warn("Found a wrongly formatted file: {} - will delete it.", file.getPath());
        invalidFiles.add(file);
      }
    }

    Iterable<FileStatus> deletableValidFiles = validFiles;
    // check each of the cleaners for the valid files
    for (T cleaner : cleanersChain) {
      if (cleaner.isStopped() || this.getStopper().isStopped()) {
        LOG.warn("A file cleaner {} is stopped, won't delete any more files in: {}",
          this.getName(), this.oldFileDir);
        return false;
      }

      Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);

      // trace which cleaner is holding on to each file
      if (LOG.isTraceEnabled()) {
        ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
        for (FileStatus file : deletableValidFiles) {
          if (!filteredFileSet.contains(file)) {
            LOG.trace("{} is not deletable according to:{}", file.getPath(), cleaner);
          }
        }
      }

      deletableValidFiles = filteredFiles;
    }

    // Invalid files are always deleted; valid ones only if every cleaner agreed.
    Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
    return deleteFiles(filesToDelete) == files.size();
  }

  /**
   * Delete the given files
   * @param filesToDelete files to delete
   * @return number of deleted files
   */
  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
    int deletedFileCount = 0;
    for (FileStatus file : filesToDelete) {
      Path filePath = file.getPath();
      LOG.trace("Removing {} from archive", filePath);
      try {
        boolean success = this.fs.delete(filePath, false);
        if (success) {
          deletedFileCount++;
        } else {
          LOG.warn("Attempted to delete:{}, but couldn't. Run cleaner chain and attempt to "
            + "delete on next pass.", filePath);
        }
      } catch (IOException e) {
        // Log the underlying exception rather than the RPC wrapper when there is one.
        IOException cause = e instanceof RemoteException ?
          ((RemoteException) e).unwrapRemoteException() : e;
        LOG.warn("Error while deleting: {}", filePath, cause);
      }
    }
    return deletedFileCount;
  }

  @Override
  public synchronized void cleanup() {
    for (T lc : this.cleanersChain) {
      try {
        lc.stop("Exiting");
      } catch (Throwable t) {
        LOG.warn("Stopping", t);
      }
    }
  }

  @VisibleForTesting
  int getChorePoolSize() {
    return pool.getSize();
  }

  /**
   * Enable or disable this chore. While disabled, {@link #chore()} does no cleaning.
   * @param enabled the new state
   * @return the previous state
   */
  public boolean setEnabled(final boolean enabled) {
    return this.enabled.getAndSet(enabled);
  }

  /**
   * @return whether this chore is currently enabled
   */
  public boolean getEnabled() {
    return this.enabled.get();
  }

  /** An operation that yields a result and may throw any exception. */
  private interface Action<T> {
    T act() throws Exception;
  }

  /**
   * Attempts to clean up a directory(its subdirectories, and files) in a
   * {@link java.util.concurrent.ThreadPoolExecutor} concurrently. We can get the final result by
   * calling result.get().
   * @param dir the directory to clean
   * @param root whether <code>dir</code> is the top directory of the traversal; the top directory
   *          itself is never deleted
   * @param result completed with <tt>true</tt> if everything under <code>dir</code> (and
   *          <code>dir</code> itself, unless it is the root) was deleted, with <tt>false</tt> if
   *          something remains, or exceptionally if the traversal failed
   */
  private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean> result) {
    try {
      // Step.1: List all files under the given directory.
      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir));
      List<FileStatus> subDirs =
        allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
      List<FileStatus> files =
        allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());

      // Step.2: Try to delete all the deletable files.
      boolean allFilesDeleted =
        files.isEmpty() || deleteAction(() -> checkAndDeleteFiles(files), "files", dir);

      // Step.3: Start to traverse and delete the sub-directories.
      List<CompletableFuture<Boolean>> futures = new ArrayList<>();
      if (!subDirs.isEmpty()) {
        sortByConsumedSpace(subDirs);
        // Submit the request of sub-directory deletion.
        subDirs.forEach(subDir -> {
          CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
          pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
          futures.add(subFuture);
        });
      }

      // Step.4: Once all sub-files & sub-directories are deleted, then can try to delete the
      // current directory asynchronously.
      FutureUtils.addListener(
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
        (voidObj, e) -> {
          if (e != null) {
            result.completeExceptionally(e);
            return;
          }
          try {
            boolean allSubDirsDeleted = futures.stream().allMatch(CompletableFuture::join);
            boolean deleted = allFilesDeleted && allSubDirsDeleted;
            if (deleted && !root) {
              // If and only if files and sub-dirs under current dir are deleted successfully, and
              // the empty directory can be deleted, and it is not the root dir then task will
              // try to delete it.
              deleted = deleteAction(() -> fs.delete(dir, false), "dir", dir);
            }
            result.complete(deleted);
          } catch (Exception ie) {
            // Must handle the inner exception here, otherwise the result may get stuck if one
            // sub-directory get some failure.
            result.completeExceptionally(ie);
          }
        });
    } catch (Exception e) {
      LOG.debug("Failed to traverse and delete the path: {}", dir, e);
      result.completeExceptionally(e);
    }
  }

  /**
   * Perform a delete on a specified type.
   * @param deletion a delete
   * @param type what is being deleted, used for logging only; callers in this class pass
   *          'files' or 'dir'
   * @param dir the directory the deletion applies to, used for logging only
   * @return true if it deleted successfully, false otherwise
   */
  private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) {
    boolean deleted;
    try {
      LOG.trace("Start deleting {} under {}", type, dir);
      deleted = deletion.act();
    } catch (PathIsNotEmptyDirectoryException exception) {
      // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
      // LocalFileSystem throws a bare IOException. So some test code will get the verbose
      // message below.
      LOG.debug("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
      deleted = false;
    } catch (IOException ioe) {
      LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps "
          + "happening, use following exception when asking on mailing list.",
        type, dir, ioe);
      deleted = false;
    } catch (Exception e) {
      LOG.info("unexpected exception: ", e);
      deleted = false;
    }
    LOG.trace("Finish deleting {} under {}, deleted={}", type, dir, deleted);
    return deleted;
  }
}