001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.cleaner; 019 020import java.io.IOException; 021import java.util.Comparator; 022import java.util.HashMap; 023import java.util.LinkedList; 024import java.util.List; 025import java.util.Map; 026import java.util.concurrent.ExecutionException; 027import java.util.concurrent.ForkJoinPool; 028import java.util.concurrent.RecursiveTask; 029import java.util.concurrent.atomic.AtomicBoolean; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileStatus; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; 035import org.apache.hadoop.hbase.ScheduledChore; 036import org.apache.hadoop.hbase.Stoppable; 037import org.apache.hadoop.hbase.conf.ConfigurationObserver; 038import org.apache.hadoop.hbase.util.FSUtils; 039import org.apache.hadoop.ipc.RemoteException; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 045import org.apache.hbase.thirdparty.com.google.common.base.Predicate; 046import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 047import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 048import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 049 050/** 051 * Abstract Cleaner that uses a chain of delegates to clean a directory of files 052 * @param <T> Cleaner delegate class that is dynamically loaded from configuration 053 */ 054@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD", 055 justification="TODO: Fix. It is wonky have static pool initialized from instance") 056@InterfaceAudience.Private 057public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore 058 implements ConfigurationObserver { 059 060 private static final Logger LOG = LoggerFactory.getLogger(CleanerChore.class); 061 private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors(); 062 063 /** 064 * If it is an integer and >= 1, it would be the size; 065 * if 0.0 < size <= 1.0, size would be available processors * size. 066 * Pay attention that 1.0 is different from 1, former indicates it will use 100% of cores, 067 * while latter will use only 1 thread for chore to scan dir. 068 */ 069 public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size"; 070 private static final String DEFAULT_CHORE_POOL_SIZE = "0.25"; 071 072 // It may be waste resources for each cleaner chore own its pool, 073 // so let's make pool for all cleaner chores. 074 private static volatile ForkJoinPool CHOREPOOL; 075 private static volatile int CHOREPOOLSIZE; 076 077 protected final FileSystem fs; 078 private final Path oldFileDir; 079 private final Configuration conf; 080 protected final Map<String, Object> params; 081 private final AtomicBoolean enabled = new AtomicBoolean(true); 082 private final AtomicBoolean reconfig = new AtomicBoolean(false); 083 protected List<T> cleanersChain; 084 085 public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf, 086 FileSystem fs, Path oldFileDir, String confKey) { 087 this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, null); 088 } 089 090 /** 091 * @param name name of the chore being run 092 * @param sleepPeriod the period of time to sleep between each run 093 * @param s the stopper 094 * @param conf configuration to use 095 * @param fs handle to the FS 096 * @param oldFileDir the path to the archived files 097 * @param confKey configuration key for the classes to instantiate 098 * @param params members could be used in cleaner 099 */ 100 public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf, 101 FileSystem fs, Path oldFileDir, String confKey, Map<String, Object> params) { 102 super(name, s, sleepPeriod); 103 this.fs = fs; 104 this.oldFileDir = oldFileDir; 105 this.conf = conf; 106 this.params = params; 107 initCleanerChain(confKey); 108 109 if (CHOREPOOL == null) { 110 String poolSize = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE); 111 CHOREPOOLSIZE = calculatePoolSize(poolSize); 112 // poolSize may be 0 or 0.0 from a careless configuration, 113 // double check to make sure. 114 CHOREPOOLSIZE = CHOREPOOLSIZE == 0? calculatePoolSize(DEFAULT_CHORE_POOL_SIZE): CHOREPOOLSIZE; 115 this.CHOREPOOL = new ForkJoinPool(CHOREPOOLSIZE); 116 LOG.info("Cleaner pool size is {}", CHOREPOOLSIZE); 117 } 118 } 119 120 /** 121 * Calculate size for cleaner pool. 122 * @param poolSize size from configuration 123 * @return size of pool after calculation 124 */ 125 static int calculatePoolSize(String poolSize) { 126 if (poolSize.matches("[1-9][0-9]*")) { 127 // If poolSize is an integer, return it directly, 128 // but upmost to the number of available processors. 129 int size = Math.min(Integer.parseInt(poolSize), AVAIL_PROCESSORS); 130 if (size == AVAIL_PROCESSORS) { 131 LOG.warn("Use full core processors to scan dir, size={}", size); 132 } 133 return size; 134 } else if (poolSize.matches("0.[0-9]+|1.0")) { 135 // if poolSize is a double, return poolSize * availableProcessors; 136 // Ensure that we always return at least one. 137 int computedThreads = (int) (AVAIL_PROCESSORS * Double.valueOf(poolSize)); 138 if (computedThreads < 1) { 139 LOG.debug("Computed {} threads for CleanerChore, using 1 instead", computedThreads); 140 return 1; 141 } 142 return computedThreads; 143 } else { 144 LOG.error("Unrecognized value: " + poolSize + " for " + CHORE_POOL_SIZE + 145 ", use default config: " + DEFAULT_CHORE_POOL_SIZE + " instead."); 146 return calculatePoolSize(DEFAULT_CHORE_POOL_SIZE); 147 } 148 } 149 150 /** 151 * Validate the file to see if it even belongs in the directory. If it is valid, then the file 152 * will go through the cleaner delegates, but otherwise the file is just deleted. 153 * @param file full {@link Path} of the file to be checked 154 * @return <tt>true</tt> if the file is valid, <tt>false</tt> otherwise 155 */ 156 protected abstract boolean validate(Path file); 157 158 /** 159 * Instantiate and initialize all the file cleaners set in the configuration 160 * @param confKey key to get the file cleaner classes from the configuration 161 */ 162 private void initCleanerChain(String confKey) { 163 this.cleanersChain = new LinkedList<>(); 164 String[] logCleaners = conf.getStrings(confKey); 165 if (logCleaners != null) { 166 for (String className : logCleaners) { 167 T logCleaner = newFileCleaner(className, conf); 168 if (logCleaner != null) { 169 LOG.debug("Initialize cleaner={}", className); 170 this.cleanersChain.add(logCleaner); 171 } 172 } 173 } 174 } 175 176 @Override 177 public void onConfigurationChange(Configuration conf) { 178 int updatedSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE)); 179 if (updatedSize == CHOREPOOLSIZE) { 180 LOG.trace("Size from configuration is same as previous={}, no need to update.", updatedSize); 181 return; 182 } 183 CHOREPOOLSIZE = updatedSize; 184 if (CHOREPOOL.getPoolSize() == 0) { 185 // Chore does not work now, update it directly. 186 updateChorePoolSize(updatedSize); 187 return; 188 } 189 // Chore is working, update it after chore finished. 190 reconfig.set(true); 191 } 192 193 private void updateChorePoolSize(int updatedSize) { 194 CHOREPOOL.shutdownNow(); 195 LOG.info("Update chore's pool size from {} to {}", CHOREPOOL.getParallelism(), updatedSize); 196 CHOREPOOL = new ForkJoinPool(updatedSize); 197 } 198 199 /** 200 * A utility method to create new instances of LogCleanerDelegate based on the class name of the 201 * LogCleanerDelegate. 202 * @param className fully qualified class name of the LogCleanerDelegate 203 * @param conf used configuration 204 * @return the new instance 205 */ 206 private T newFileCleaner(String className, Configuration conf) { 207 try { 208 Class<? extends FileCleanerDelegate> c = Class.forName(className).asSubclass( 209 FileCleanerDelegate.class); 210 @SuppressWarnings("unchecked") 211 T cleaner = (T) c.getDeclaredConstructor().newInstance(); 212 cleaner.setConf(conf); 213 cleaner.init(this.params); 214 return cleaner; 215 } catch (Exception e) { 216 LOG.warn("Can NOT create CleanerDelegate={}", className, e); 217 // skipping if can't instantiate 218 return null; 219 } 220 } 221 222 @Override 223 protected void chore() { 224 if (getEnabled()) { 225 if (runCleaner()) { 226 LOG.trace("Cleaned all WALs under {}", oldFileDir); 227 } else { 228 LOG.trace("WALs outstanding under {}", oldFileDir); 229 } 230 // After each clean chore, checks if receives reconfigure notification while cleaning 231 if (reconfig.compareAndSet(true, false)) { 232 updateChorePoolSize(CHOREPOOLSIZE); 233 } 234 } else { 235 LOG.trace("Cleaner chore disabled! Not cleaning."); 236 } 237 } 238 239 private void preRunCleaner() { 240 cleanersChain.forEach(FileCleanerDelegate::preClean); 241 } 242 243 public Boolean runCleaner() { 244 preRunCleaner(); 245 CleanerTask task = new CleanerTask(this.oldFileDir, true); 246 CHOREPOOL.submit(task); 247 return task.join(); 248 } 249 250 /** 251 * Sort the given list in (descending) order of the space each element takes 252 * @param dirs the list to sort, element in it should be directory (not file) 253 */ 254 private void sortByConsumedSpace(List<FileStatus> dirs) { 255 if (dirs == null || dirs.size() < 2) { 256 // no need to sort for empty or single directory 257 return; 258 } 259 dirs.sort(new Comparator<FileStatus>() { 260 HashMap<FileStatus, Long> directorySpaces = new HashMap<>(); 261 262 @Override 263 public int compare(FileStatus f1, FileStatus f2) { 264 long f1ConsumedSpace = getSpace(f1); 265 long f2ConsumedSpace = getSpace(f2); 266 return Long.compare(f2ConsumedSpace, f1ConsumedSpace); 267 } 268 269 private long getSpace(FileStatus f) { 270 Long cached = directorySpaces.get(f); 271 if (cached != null) { 272 return cached; 273 } 274 try { 275 long space = 276 f.isDirectory() ? fs.getContentSummary(f.getPath()).getSpaceConsumed() : f.getLen(); 277 directorySpaces.put(f, space); 278 return space; 279 } catch (IOException e) { 280 LOG.trace("Failed to get space consumed by path={}", f, e); 281 return -1; 282 } 283 } 284 }); 285 } 286 287 /** 288 * Run the given files through each of the cleaners to see if it should be deleted, deleting it if 289 * necessary. 290 * @param files List of FileStatus for the files to check (and possibly delete) 291 * @return true iff successfully deleted all files 292 */ 293 private boolean checkAndDeleteFiles(List<FileStatus> files) { 294 if (files == null) { 295 return true; 296 } 297 298 // first check to see if the path is valid 299 List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size()); 300 List<FileStatus> invalidFiles = Lists.newArrayList(); 301 for (FileStatus file : files) { 302 if (validate(file.getPath())) { 303 validFiles.add(file); 304 } else { 305 LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it."); 306 invalidFiles.add(file); 307 } 308 } 309 310 Iterable<FileStatus> deletableValidFiles = validFiles; 311 // check each of the cleaners for the valid files 312 for (T cleaner : cleanersChain) { 313 if (cleaner.isStopped() || this.getStopper().isStopped()) { 314 LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any more files in:" 315 + this.oldFileDir); 316 return false; 317 } 318 319 Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles); 320 321 // trace which cleaner is holding on to each file 322 if (LOG.isTraceEnabled()) { 323 ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles); 324 for (FileStatus file : deletableValidFiles) { 325 if (!filteredFileSet.contains(file)) { 326 LOG.trace(file.getPath() + " is not deletable according to:" + cleaner); 327 } 328 } 329 } 330 331 deletableValidFiles = filteredFiles; 332 } 333 334 Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles); 335 return deleteFiles(filesToDelete) == files.size(); 336 } 337 338 /** 339 * Delete the given files 340 * @param filesToDelete files to delete 341 * @return number of deleted files 342 */ 343 protected int deleteFiles(Iterable<FileStatus> filesToDelete) { 344 int deletedFileCount = 0; 345 for (FileStatus file : filesToDelete) { 346 Path filePath = file.getPath(); 347 LOG.trace("Removing {} from archive", filePath); 348 try { 349 boolean success = this.fs.delete(filePath, false); 350 if (success) { 351 deletedFileCount++; 352 } else { 353 LOG.warn("Attempted to delete:" + filePath 354 + ", but couldn't. Run cleaner chain and attempt to delete on next pass."); 355 } 356 } catch (IOException e) { 357 e = e instanceof RemoteException ? 358 ((RemoteException)e).unwrapRemoteException() : e; 359 LOG.warn("Error while deleting: " + filePath, e); 360 } 361 } 362 return deletedFileCount; 363 } 364 365 @Override 366 public synchronized void cleanup() { 367 for (T lc : this.cleanersChain) { 368 try { 369 lc.stop("Exiting"); 370 } catch (Throwable t) { 371 LOG.warn("Stopping", t); 372 } 373 } 374 } 375 376 @VisibleForTesting 377 int getChorePoolSize() { 378 return CHOREPOOLSIZE; 379 } 380 381 /** 382 * @param enabled 383 */ 384 public boolean setEnabled(final boolean enabled) { 385 return this.enabled.getAndSet(enabled); 386 } 387 388 public boolean getEnabled() { return this.enabled.get(); 389 } 390 391 private interface Action<T> { 392 T act() throws IOException; 393 } 394 395 /** 396 * Attemps to clean up a directory, its subdirectories, and files. 397 * Return value is true if everything was deleted. false on partial / total failures. 398 */ 399 private class CleanerTask extends RecursiveTask<Boolean> { 400 private final Path dir; 401 private final boolean root; 402 403 CleanerTask(final FileStatus dir, final boolean root) { 404 this(dir.getPath(), root); 405 } 406 407 CleanerTask(final Path dir, final boolean root) { 408 this.dir = dir; 409 this.root = root; 410 } 411 412 @Override 413 protected Boolean compute() { 414 LOG.trace("Cleaning under {}", dir); 415 List<FileStatus> subDirs; 416 List<FileStatus> files; 417 try { 418 // if dir doesn't exist, we'll get null back for both of these 419 // which will fall through to succeeding. 420 subDirs = getFilteredStatus(status -> status.isDirectory()); 421 files = getFilteredStatus(status -> status.isFile()); 422 } catch (IOException ioe) { 423 LOG.warn("failed to get FileStatus for contents of '{}'", dir, ioe); 424 return false; 425 } 426 427 boolean nullSubDirs = subDirs == null; 428 if (nullSubDirs) { 429 LOG.trace("There is no subdir under {}", dir); 430 } 431 if (files == null) { 432 LOG.trace("There is no file under {}", dir); 433 } 434 435 int capacity = nullSubDirs ? 0 : subDirs.size(); 436 List<CleanerTask> tasks = Lists.newArrayListWithCapacity(capacity); 437 if (!nullSubDirs) { 438 sortByConsumedSpace(subDirs); 439 for (FileStatus subdir : subDirs) { 440 CleanerTask task = new CleanerTask(subdir, false); 441 tasks.add(task); 442 task.fork(); 443 } 444 } 445 446 boolean result = true; 447 result &= deleteAction(() -> checkAndDeleteFiles(files), "files"); 448 result &= deleteAction(() -> getCleanResult(tasks), "subdirs"); 449 // if and only if files and subdirs under current dir are deleted successfully, and 450 // it is not the root dir, then task will try to delete it. 451 if (result && !root) { 452 result &= deleteAction(() -> fs.delete(dir, false), "dir"); 453 } 454 return result; 455 } 456 457 /** 458 * Get FileStatus with filter. 459 * Pay attention that FSUtils #listStatusWithStatusFilter would return null, 460 * even though status is empty but not null. 461 * @param function a filter function 462 * @return filtered FileStatus or null if dir doesn't exist 463 * @throws IOException if there's an error other than dir not existing 464 */ 465 private List<FileStatus> getFilteredStatus(Predicate<FileStatus> function) throws IOException { 466 return FSUtils.listStatusWithStatusFilter(fs, dir, status -> function.test(status)); 467 } 468 469 /** 470 * Perform a delete on a specified type. 471 * @param deletion a delete 472 * @param type possible values are 'files', 'subdirs', 'dirs' 473 * @return true if it deleted successfully, false otherwise 474 */ 475 private boolean deleteAction(Action<Boolean> deletion, String type) { 476 boolean deleted; 477 try { 478 LOG.trace("Start deleting {} under {}", type, dir); 479 deleted = deletion.act(); 480 } catch (PathIsNotEmptyDirectoryException exception) { 481 // N.B. HDFS throws this exception when we try to delete a non-empty directory, but 482 // LocalFileSystem throws a bare IOException. So some test code will get the verbose 483 // message below. 484 LOG.debug("Couldn't delete '{}' yet because it isn't empty. Probably transient. " + 485 "exception details at TRACE.", dir); 486 LOG.trace("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception); 487 deleted = false; 488 } catch (IOException ioe) { 489 LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps " + 490 "happening, use following exception when asking on mailing list.", 491 type, dir, ioe); 492 deleted = false; 493 } 494 LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted); 495 return deleted; 496 } 497 498 /** 499 * Get cleaner results of subdirs. 500 * @param tasks subdirs cleaner tasks 501 * @return true if all subdirs deleted successfully, false for patial/all failures 502 * @throws IOException something happen during computation 503 */ 504 private boolean getCleanResult(List<CleanerTask> tasks) throws IOException { 505 boolean cleaned = true; 506 try { 507 for (CleanerTask task : tasks) { 508 cleaned &= task.get(); 509 } 510 } catch (InterruptedException | ExecutionException e) { 511 throw new IOException(e); 512 } 513 return cleaned; 514 } 515 } 516}