001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
048
049/**
050 * Abstract Cleaner that uses a chain of delegates to clean a directory of files
051 * @param <T> Cleaner delegate class that is dynamically loaded from configuration
052 */
053@InterfaceAudience.Private
054public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {
055
056  private static final Logger LOG = LoggerFactory.getLogger(CleanerChore.class);
057  private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();
058
059  /**
060   * Configures the threadpool used for scanning the archive directory for the HFileCleaner If it is
061   * an integer and >= 1, it would be the size; if 0.0 < size <= 1.0, size would be available
062   * processors * size. Pay attention that 1.0 is different from 1, former indicates it will use
063   * 100% of cores, while latter will use only 1 thread for chore to scan dir.
064   */
065  public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
066  static final String DEFAULT_CHORE_POOL_SIZE = "0.25";
067  /**
068   * Configures the threadpool used for scanning the Old logs directory for the LogCleaner Follows
069   * the same configuration mechanism as CHORE_POOL_SIZE, but has a default of 1 thread.
070   */
071  public static final String LOG_CLEANER_CHORE_SIZE = "hbase.log.cleaner.scan.dir.concurrent.size";
072  static final String DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE = "1";
073  /**
074   * Enable the CleanerChore to sort the subdirectories by consumed space and start the cleaning
075   * with the largest subdirectory. Enabled by default.
076   */
077  public static final String LOG_CLEANER_CHORE_DIRECTORY_SORTING =
078    "hbase.cleaner.directory.sorting";
079  static final boolean DEFAULT_LOG_CLEANER_CHORE_DIRECTORY_SORTING = true;
080
081  private final DirScanPool pool;
082
083  protected final FileSystem fs;
084  private final Path oldFileDir;
085  private final Configuration conf;
086  protected final Map<String, Object> params;
087  private final AtomicBoolean enabled = new AtomicBoolean(true);
088  protected List<T> cleanersChain;
089  protected List<String> excludeDirs;
090  private CompletableFuture<Boolean> future;
091  private boolean forceRun;
092  private boolean sortDirectories;
093
094  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
095    FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
096    this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null, null);
097  }
098
099  /**
100   * @param name        name of the chore being run
101   * @param sleepPeriod the period of time to sleep between each run
102   * @param s           the stopper
103   * @param conf        configuration to use
104   * @param fs          handle to the FS
105   * @param oldFileDir  the path to the archived files
106   * @param confKey     configuration key for the classes to instantiate
107   * @param pool        the thread pool used to scan directories
108   * @param params      members could be used in cleaner
109   */
110  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
111    FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params,
112    List<Path> excludePaths) {
113    super(name, s, sleepPeriod);
114
115    Preconditions.checkNotNull(pool, "Chore's pool can not be null");
116    this.pool = pool;
117    this.fs = fs;
118    this.oldFileDir = oldFileDir;
119    this.conf = conf;
120    this.params = params;
121    if (excludePaths != null && !excludePaths.isEmpty()) {
122      excludeDirs = new ArrayList<>(excludePaths.size());
123      for (Path path : excludePaths) {
124        StringBuilder dirPart = new StringBuilder(path.toString());
125        if (!path.toString().endsWith("/")) {
126          dirPart.append("/");
127        }
128        excludeDirs.add(dirPart.toString());
129      }
130    }
131    if (excludeDirs != null) {
132      LOG.info("Cleaner {} excludes sub dirs: {}", name, excludeDirs);
133    }
134    sortDirectories = conf.getBoolean(LOG_CLEANER_CHORE_DIRECTORY_SORTING,
135      DEFAULT_LOG_CLEANER_CHORE_DIRECTORY_SORTING);
136    initCleanerChain(confKey);
137  }
138
139  /**
140   * Calculate size for cleaner pool.
141   * @param poolSize size from configuration
142   * @return size of pool after calculation
143   */
144  static int calculatePoolSize(String poolSize) {
145    if (poolSize.matches("[1-9][0-9]*")) {
146      // If poolSize is an integer, return it directly,
147      // but upmost to the number of available processors.
148      int size = Math.min(Integer.parseInt(poolSize), AVAIL_PROCESSORS);
149      if (size == AVAIL_PROCESSORS) {
150        LOG.warn("Use full core processors to scan dir, size={}", size);
151      }
152      return size;
153    } else if (poolSize.matches("0.[0-9]+|1.0")) {
154      // if poolSize is a double, return poolSize * availableProcessors;
155      // Ensure that we always return at least one.
156      int computedThreads = (int) (AVAIL_PROCESSORS * Double.parseDouble(poolSize));
157      if (computedThreads < 1) {
158        LOG.debug("Computed {} threads for CleanerChore, using 1 instead", computedThreads);
159        return 1;
160      }
161      return computedThreads;
162    } else {
163      LOG.error("Unrecognized value: " + poolSize + " for " + CHORE_POOL_SIZE
164        + ", use default config: " + DEFAULT_CHORE_POOL_SIZE + " instead.");
165      return calculatePoolSize(DEFAULT_CHORE_POOL_SIZE);
166    }
167  }
168
169  /**
170   * Validate the file to see if it even belongs in the directory. If it is valid, then the file
171   * will go through the cleaner delegates, but otherwise the file is just deleted.
172   * @param file full {@link Path} of the file to be checked
173   * @return <tt>true</tt> if the file is valid, <tt>false</tt> otherwise
174   */
175  protected abstract boolean validate(Path file);
176
177  /**
178   * Instantiate and initialize all the file cleaners set in the configuration
179   * @param confKey key to get the file cleaner classes from the configuration
180   */
181  private void initCleanerChain(String confKey) {
182    this.cleanersChain = new ArrayList<>();
183    String[] cleaners = conf.getStrings(confKey);
184    if (cleaners != null) {
185      for (String className : cleaners) {
186        className = className.trim();
187        if (className.isEmpty()) {
188          continue;
189        }
190        T logCleaner = newFileCleaner(className, conf);
191        if (logCleaner != null) {
192          LOG.info("Initialize cleaner={}", className);
193          this.cleanersChain.add(logCleaner);
194        }
195      }
196    }
197  }
198
199  /**
200   * A utility method to create new instances of LogCleanerDelegate based on the class name of the
201   * LogCleanerDelegate.
202   * @param className fully qualified class name of the LogCleanerDelegate
203   * @param conf      used configuration
204   * @return the new instance
205   */
206  private T newFileCleaner(String className, Configuration conf) {
207    try {
208      Class<? extends FileCleanerDelegate> c =
209        Class.forName(className).asSubclass(FileCleanerDelegate.class);
210      @SuppressWarnings("unchecked")
211      T cleaner = (T) c.getDeclaredConstructor().newInstance();
212      cleaner.setConf(conf);
213      cleaner.init(this.params);
214      return cleaner;
215    } catch (Exception e) {
216      LOG.warn("Can NOT create CleanerDelegate={}", className, e);
217      // skipping if can't instantiate
218      return null;
219    }
220  }
221
222  @Override
223  protected boolean initialChore() {
224    synchronized (this) {
225      if (forceRun) {
226        // wake up the threads waiting in triggerCleanerNow, as a triggerNow may triggers the first
227        // loop where we will only call initialChore. We need to trigger another run immediately.
228        forceRun = false;
229        notifyAll();
230      }
231    }
232    return true;
233  }
234
235  @Override
236  protected void chore() {
237    CompletableFuture<Boolean> f;
238    synchronized (this) {
239      if (!enabled.get()) {
240        if (!forceRun) {
241          LOG.trace("Cleaner chore {} disabled! Not cleaning.", getName());
242          return;
243        } else {
244          LOG.info("Force executing cleaner chore {} when disabled", getName());
245        }
246      }
247      if (future != null) {
248        LOG.warn("A cleaner chore {}'s run is in progress, give up running", getName());
249        return;
250      }
251      f = new CompletableFuture<>();
252      future = f;
253      notifyAll();
254    }
255    pool.latchCountUp();
256    try {
257      preRunCleaner();
258      pool.execute(() -> traverseAndDelete(oldFileDir, true, f));
259      if (f.get()) {
260        LOG.trace("Cleaned all files under {}", oldFileDir);
261      } else {
262        LOG.trace("Files outstanding under {}", oldFileDir);
263      }
264    } catch (Exception e) {
265      LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
266    } finally {
267      postRunCleaner();
268      synchronized (this) {
269        future = null;
270        forceRun = false;
271      }
272      pool.latchCountDown();
273      // After each cleaner chore, checks if received reconfigure notification while cleaning.
274      // First in cleaner turns off notification, to avoid another cleaner updating pool again.
275      // This cleaner is waiting for other cleaners finishing their jobs.
276      // To avoid missing next chore, only wait 0.8 * period, then shutdown.
277      pool.tryUpdatePoolSize((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
278    }
279  }
280
281  private void preRunCleaner() {
282    cleanersChain.forEach(FileCleanerDelegate::preClean);
283  }
284
285  private void postRunCleaner() {
286    cleanersChain.forEach(FileCleanerDelegate::postClean);
287  }
288
289  /**
290   * Trigger the cleaner immediately and return a CompletableFuture for getting the result. Return
291   * {@code true} means all the old files have been deleted, otherwise {@code false}.
292   */
293  public synchronized CompletableFuture<Boolean> triggerCleanerNow() throws InterruptedException {
294    for (;;) {
295      if (future != null) {
296        return future;
297      }
298      forceRun = true;
299      if (!triggerNow()) {
300        return CompletableFuture.completedFuture(false);
301      }
302      wait();
303    }
304  }
305
306  /**
307   * Sort the given list in (descending) order of the space each element takes
308   * @param dirs the list to sort, element in it should be directory (not file)
309   */
310  private void sortByConsumedSpace(List<FileStatus> dirs) {
311    if (dirs == null || dirs.size() < 2) {
312      // no need to sort for empty or single directory
313      return;
314    }
315    dirs.sort(new Comparator<FileStatus>() {
316      HashMap<FileStatus, Long> directorySpaces = new HashMap<>();
317
318      @Override
319      public int compare(FileStatus f1, FileStatus f2) {
320        long f1ConsumedSpace = getSpace(f1);
321        long f2ConsumedSpace = getSpace(f2);
322        return Long.compare(f2ConsumedSpace, f1ConsumedSpace);
323      }
324
325      private long getSpace(FileStatus f) {
326        Long cached = directorySpaces.get(f);
327        if (cached != null) {
328          return cached;
329        }
330        try {
331          long space =
332            f.isDirectory() ? fs.getContentSummary(f.getPath()).getSpaceConsumed() : f.getLen();
333          directorySpaces.put(f, space);
334          return space;
335        } catch (IOException e) {
336          LOG.trace("Failed to get space consumed by path={}", f, e);
337          return -1;
338        }
339      }
340    });
341  }
342
343  /**
344   * Run the given files through each of the cleaners to see if it should be deleted, deleting it if
345   * necessary.
346   * @param files List of FileStatus for the files to check (and possibly delete)
347   * @return true iff successfully deleted all files
348   */
349  private boolean checkAndDeleteFiles(List<FileStatus> files) {
350    if (files == null) {
351      return true;
352    }
353
354    // first check to see if the path is valid
355    List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
356    List<FileStatus> invalidFiles = Lists.newArrayList();
357    for (FileStatus file : files) {
358      if (validate(file.getPath())) {
359        validFiles.add(file);
360      } else {
361        LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
362        invalidFiles.add(file);
363      }
364    }
365
366    Iterable<FileStatus> deletableValidFiles = validFiles;
367    // check each of the cleaners for the valid files
368    for (T cleaner : cleanersChain) {
369      if (cleaner.isStopped() || this.getStopper().isStopped()) {
370        LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any more files in:"
371          + this.oldFileDir);
372        return false;
373      }
374
375      Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);
376
377      // trace which cleaner is holding on to each file
378      if (LOG.isTraceEnabled()) {
379        ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
380        for (FileStatus file : deletableValidFiles) {
381          if (!filteredFileSet.contains(file)) {
382            LOG.trace(file.getPath() + " is not deletable according to:" + cleaner);
383          }
384        }
385      }
386
387      deletableValidFiles = filteredFiles;
388    }
389
390    Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
391    return deleteFiles(filesToDelete) == files.size();
392  }
393
394  /**
395   * Check if a empty directory with no subdirs or subfiles can be deleted
396   * @param dir Path of the directory
397   * @return True if the directory can be deleted, otherwise false
398   */
399  private boolean isEmptyDirDeletable(Path dir) {
400    for (T cleaner : cleanersChain) {
401      if (cleaner.isStopped() || this.getStopper().isStopped()) {
402        LOG.warn("A file cleaner {} is stopped, won't delete the empty directory {}",
403          this.getName(), dir);
404        return false;
405      }
406      if (!cleaner.isEmptyDirDeletable(dir)) {
407        // If one of the cleaner need the empty directory, skip delete it
408        return false;
409      }
410    }
411    return true;
412  }
413
414  /**
415   * Delete the given files
416   * @param filesToDelete files to delete
417   * @return number of deleted files
418   */
419  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
420    int deletedFileCount = 0;
421    for (FileStatus file : filesToDelete) {
422      Path filePath = file.getPath();
423      LOG.trace("Removing {} from archive", filePath);
424      try {
425        boolean success = this.fs.delete(filePath, false);
426        if (success) {
427          deletedFileCount++;
428        } else {
429          LOG.warn("Attempted to delete:" + filePath
430            + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
431        }
432      } catch (IOException e) {
433        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
434        LOG.warn("Error while deleting: " + filePath, e);
435      }
436    }
437    return deletedFileCount;
438  }
439
440  @Override
441  public synchronized void cleanup() {
442    for (T lc : this.cleanersChain) {
443      try {
444        lc.stop("Exiting");
445      } catch (Throwable t) {
446        LOG.warn("Stopping", t);
447      }
448    }
449  }
450
451  int getChorePoolSize() {
452    return pool.getSize();
453  }
454
455  public boolean setEnabled(final boolean enabled) {
456    return this.enabled.getAndSet(enabled);
457  }
458
459  public boolean getEnabled() {
460    return this.enabled.get();
461  }
462
463  private interface Action<T> {
464    T act() throws Exception;
465  }
466
467  /**
468   * Attempts to clean up a directory(its subdirectories, and files) in a
469   * {@link java.util.concurrent.ThreadPoolExecutor} concurrently. We can get the final result by
470   * calling result.get().
471   */
472  private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean> result) {
473    try {
474      // Step.1: List all files under the given directory.
475      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir));
476      List<FileStatus> subDirs =
477        allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
478      List<FileStatus> files =
479        allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());
480
481      // Step.2: Try to delete all the deletable files.
482      boolean allFilesDeleted =
483        files.isEmpty() || deleteAction(() -> checkAndDeleteFiles(files), "files", dir);
484
485      // Step.3: Start to traverse and delete the sub-directories.
486      List<CompletableFuture<Boolean>> futures = new ArrayList<>();
487      if (!subDirs.isEmpty()) {
488        if (sortDirectories) {
489          sortByConsumedSpace(subDirs);
490        }
491        // Submit the request of sub-directory deletion.
492        subDirs.forEach(subDir -> {
493          if (!shouldExclude(subDir)) {
494            CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
495            pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
496            futures.add(subFuture);
497          }
498        });
499      }
500
501      // Step.4: Once all sub-files & sub-directories are deleted, then can try to delete the
502      // current directory asynchronously.
503      FutureUtils.addListener(
504        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
505        (voidObj, e) -> {
506          if (e != null) {
507            result.completeExceptionally(FutureUtils.unwrapCompletionException(e));
508            return;
509          }
510          try {
511            boolean allSubDirsDeleted = futures.stream().allMatch(CompletableFuture::join);
512            boolean deleted = allFilesDeleted && allSubDirsDeleted && isEmptyDirDeletable(dir);
513            if (deleted && !root) {
514              // If and only if files and sub-dirs under current dir are deleted successfully, and
515              // the empty directory can be deleted, and it is not the root dir then task will
516              // try to delete it.
517              deleted = deleteAction(() -> fs.delete(dir, false), "dir", dir);
518            }
519            result.complete(deleted);
520          } catch (Exception ie) {
521            // Must handle the inner exception here, otherwise the result may get stuck if one
522            // sub-directory get some failure.
523            result.completeExceptionally(ie);
524          }
525        });
526    } catch (Exception e) {
527      if (e instanceof FileNotFoundException) {
528        LOG.debug("Dir dose not exist, {}", dir);
529      } else {
530        LOG.error("Failed to traverse and delete the path: {}", dir, e);
531      }
532      result.completeExceptionally(e);
533    }
534  }
535
536  /**
537   * Check if a path should not perform clear
538   */
539  private boolean shouldExclude(FileStatus f) {
540    if (!f.isDirectory()) {
541      return false;
542    }
543    if (excludeDirs != null && !excludeDirs.isEmpty()) {
544      for (String dirPart : excludeDirs) {
545        // since we make excludeDirs end with '/',
546        // if a path contains() the dirPart, the path should be excluded
547        if (f.getPath().toString().contains(dirPart)) {
548          return true;
549        }
550      }
551    }
552    return false;
553  }
554
555  /**
556   * Perform a delete on a specified type.
557   * @param deletion a delete
558   * @param type     possible values are 'files', 'subdirs', 'dirs'
559   * @return true if it deleted successfully, false otherwise
560   */
561  private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) {
562    boolean deleted;
563    try {
564      LOG.trace("Start deleting {} under {}", type, dir);
565      deleted = deletion.act();
566    } catch (PathIsNotEmptyDirectoryException exception) {
567      // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
568      // LocalFileSystem throws a bare IOException. So some test code will get the verbose
569      // message below.
570      LOG.debug("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
571      deleted = false;
572    } catch (IOException ioe) {
573      if (LOG.isTraceEnabled()) {
574        LOG.trace("Could not delete {} under {}; will retry. If it keeps happening, "
575          + "quote the exception when asking on mailing list.", type, dir, ioe);
576      } else {
577        LOG.info(
578          "Could not delete {} under {} because {}; will retry. If it  keeps happening, enable"
579            + "TRACE-level logging and quote the exception when asking on mailing list.",
580          type, dir, ioe.getMessage());
581      }
582      deleted = false;
583    } catch (Exception e) {
584      LOG.info("unexpected exception: ", e);
585      deleted = false;
586    }
587    LOG.trace("Finish deleting {} under {}, deleted = {}", type, dir, deleted);
588    return deleted;
589  }
590}