/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Abstract Cleaner that uses a chain of delegates to clean a directory of files
 * @param <T> Cleaner delegate class that is dynamically loaded from configuration
 */
@InterfaceAudience.Private
public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {

  private static final Logger LOG = LoggerFactory.getLogger(CleanerChore.class);
  private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();

  /**
   * Configures the thread pool used for scanning the archive directory for the HFileCleaner. If
   * the value is an integer >= 1, it is used directly as the pool size; if 0.0 < value <= 1.0,
   * the pool size is value * available processors. Note that 1.0 is different from 1: the former
   * uses 100% of the cores, while the latter uses only one thread for the chore to scan dirs.
   */
  public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
  static final String DEFAULT_CHORE_POOL_SIZE = "0.25";
  /**
   * Configures the thread pool used for scanning the old logs directory for the LogCleaner.
   * Follows the same configuration mechanism as CHORE_POOL_SIZE, but has a default of 1 thread.
   */
  public static final String LOG_CLEANER_CHORE_SIZE = "hbase.log.cleaner.scan.dir.concurrent.size";
  static final String DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE = "1";

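  // For illustration only (the value shown is just an example): operators set these keys in
  // hbase-site.xml, and calculatePoolSize() below turns the configured value into a concrete
  // thread count. For instance, to let the HFileCleaner scan with a quarter of the host's cores:
  //   <property>
  //     <name>hbase.cleaner.scan.dir.concurrent.size</name>
  //     <value>0.25</value>
  //   </property>
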
  private final DirScanPool pool;

  protected final FileSystem fs;
  private final Path oldFileDir;
  private final Configuration conf;
  protected final Map<String, Object> params;
  private final AtomicBoolean enabled = new AtomicBoolean(true);
  protected List<T> cleanersChain;
  protected List<String> excludeDirs;

  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
    FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
    this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null, null);
  }

  /**
   * @param name        name of the chore being run
   * @param sleepPeriod the period of time to sleep between each run
   * @param s           the stopper
   * @param conf        configuration to use
   * @param fs          handle to the FS
   * @param oldFileDir  the path to the archived files
   * @param confKey     configuration key for the classes to instantiate
   * @param pool        the thread pool used to scan directories
   * @param params       map of parameters made available to the cleaner delegates
   * @param excludePaths sub-directories that should be excluded from cleaning
   */
  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
    FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params,
    List<Path> excludePaths) {
    super(name, s, sleepPeriod);

    Preconditions.checkNotNull(pool, "Chore's pool can not be null");
    this.pool = pool;
    this.fs = fs;
    this.oldFileDir = oldFileDir;
    this.conf = conf;
    this.params = params;
    if (excludePaths != null && !excludePaths.isEmpty()) {
      excludeDirs = new ArrayList<>(excludePaths.size());
      for (Path path : excludePaths) {
        StringBuilder dirPart = new StringBuilder(path.toString());
        if (!path.toString().endsWith("/")) {
          dirPart.append("/");
        }
        excludeDirs.add(dirPart.toString());
      }
    }
    if (excludeDirs != null) {
      LOG.info("Cleaner {} excludes sub dirs: {}", name, excludeDirs);
    }
    initCleanerChain(confKey);
  }

  /**
   * Calculate size for cleaner pool.
   * @param poolSize size from configuration
   * @return size of pool after calculation
   */
  static int calculatePoolSize(String poolSize) {
    if (poolSize.matches("[1-9][0-9]*")) {
      // If poolSize is an integer, return it directly,
      // but cap it at the number of available processors.
      int size = Math.min(Integer.parseInt(poolSize), AVAIL_PROCESSORS);
      if (size == AVAIL_PROCESSORS) {
        LOG.warn("Using all available processors to scan dirs, size={}", size);
      }
      return size;
    } else if (poolSize.matches("0\\.[0-9]+|1\\.0")) {
      // If poolSize is a double, return poolSize * availableProcessors;
      // ensure that we always return at least one.
      int computedThreads = (int) (AVAIL_PROCESSORS * Double.parseDouble(poolSize));
      if (computedThreads < 1) {
        LOG.debug("Computed {} threads for CleanerChore, using 1 instead", computedThreads);
        return 1;
      }
      return computedThreads;
    } else {
      LOG.error("Unrecognized value: " + poolSize + " for " + CHORE_POOL_SIZE
        + ", using default config: " + DEFAULT_CHORE_POOL_SIZE + " instead.");
      return calculatePoolSize(DEFAULT_CHORE_POOL_SIZE);
    }
  }

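  // Worked example (illustrative; assumes an 8-processor host): calculatePoolSize("2") -> 2,
  // calculatePoolSize("32") -> 8 (capped at the available processors), calculatePoolSize("0.5")
  // -> 4, calculatePoolSize("1") -> 1 (a single scan thread), and calculatePoolSize("1.0") -> 8.
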
  /**
   * Validate the file to see if it even belongs in the directory. If it is valid, then the file
   * will go through the cleaner delegates, but otherwise the file is just deleted.
   * @param file full {@link Path} of the file to be checked
   * @return <tt>true</tt> if the file is valid, <tt>false</tt> otherwise
   */
  protected abstract boolean validate(Path file);

  /**
   * Instantiate and initialize all the file cleaners set in the configuration
   * @param confKey key to get the file cleaner classes from the configuration
   */
  private void initCleanerChain(String confKey) {
    this.cleanersChain = new LinkedList<>();
    String[] logCleaners = conf.getStrings(confKey);
    if (logCleaners != null) {
      for (String className : logCleaners) {
        className = className.trim();
        if (className.isEmpty()) {
          continue;
        }
        T logCleaner = newFileCleaner(className, conf);
        if (logCleaner != null) {
          LOG.info("Initialize cleaner={}", className);
          this.cleanersChain.add(logCleaner);
        }
      }
    }
  }

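  // A sketch of how the chain is configured (the key and class below are the stock HFileCleaner
  // examples, shown for illustration): confKey names a comma-separated list of
  // FileCleanerDelegate implementations, each instantiated by newFileCleaner() and appended to
  // cleanersChain, e.g. in hbase-site.xml:
  //   hbase.master.hfilecleaner.plugins =
  //     org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner
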
  /**
   * A utility method to create a new instance of a FileCleanerDelegate based on its class name.
   * @param className fully qualified class name of the FileCleanerDelegate
   * @param conf      used configuration
   * @return the new instance
   */
  private T newFileCleaner(String className, Configuration conf) {
    try {
      Class<? extends FileCleanerDelegate> c =
        Class.forName(className).asSubclass(FileCleanerDelegate.class);
      @SuppressWarnings("unchecked")
      T cleaner = (T) c.getDeclaredConstructor().newInstance();
      cleaner.setConf(conf);
      cleaner.init(this.params);
      return cleaner;
    } catch (Exception e) {
      LOG.warn("Can NOT create CleanerDelegate={}", className, e);
      // skipping if can't instantiate
      return null;
    }
  }

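  // A minimal sketch of a delegate this reflection could load (assumes the stock
  // BaseHFileCleanerDelegate base class; the class name KeepNothingHFileCleaner is hypothetical):
  //   public class KeepNothingHFileCleaner extends BaseHFileCleanerDelegate {
  //     @Override
  //     protected boolean isFileDeletable(FileStatus fStat) {
  //       return true; // treat every archived file as deletable
  //     }
  //   }
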
  @Override
  protected void chore() {
    if (getEnabled()) {
      try {
        pool.latchCountUp();
        if (runCleaner()) {
          LOG.trace("Cleaned all WALs under {}", oldFileDir);
        } else {
          LOG.trace("WALs outstanding under {}", oldFileDir);
        }
      } finally {
        pool.latchCountDown();
      }
      // After each run, check whether a reconfigure notification was received while cleaning.
      // The first cleaner to see it turns the notification off, so that no other cleaner updates
      // the pool again, and then waits for the other cleaners to finish their jobs. To avoid
      // missing the next chore, it waits at most 0.8 * period before updating the pool size.
      pool.tryUpdatePoolSize((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
    } else {
      LOG.trace("Cleaner chore disabled! Not cleaning.");
    }
  }

  private void preRunCleaner() {
    cleanersChain.forEach(FileCleanerDelegate::preClean);
  }

  public boolean runCleaner() {
    preRunCleaner();
    try {
      CompletableFuture<Boolean> future = new CompletableFuture<>();
      pool.execute(() -> traverseAndDelete(oldFileDir, true, future));
      return future.get();
    } catch (Exception e) {
      LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
      return false;
    }
  }

  /**
   * Sort the given list in descending order of the space each element consumes.
   * @param dirs the list to sort; elements in it should be directories (not files)
   */
  private void sortByConsumedSpace(List<FileStatus> dirs) {
    if (dirs == null || dirs.size() < 2) {
      // no need to sort for empty or single directory
      return;
    }
    dirs.sort(new Comparator<FileStatus>() {
      HashMap<FileStatus, Long> directorySpaces = new HashMap<>();

      @Override
      public int compare(FileStatus f1, FileStatus f2) {
        long f1ConsumedSpace = getSpace(f1);
        long f2ConsumedSpace = getSpace(f2);
        return Long.compare(f2ConsumedSpace, f1ConsumedSpace);
      }

      private long getSpace(FileStatus f) {
        Long cached = directorySpaces.get(f);
        if (cached != null) {
          return cached;
        }
        try {
          long space =
            f.isDirectory() ? fs.getContentSummary(f.getPath()).getSpaceConsumed() : f.getLen();
          directorySpaces.put(f, space);
          return space;
        } catch (IOException e) {
          LOG.trace("Failed to get space consumed by path={}", f, e);
          return -1;
        }
      }
    });
  }

  /**
   * Run the given files through each of the cleaners to see if they should be deleted, deleting
   * them if necessary.
   * @param files List of FileStatus for the files to check (and possibly delete)
   * @return true iff all files were successfully deleted
   */
  private boolean checkAndDeleteFiles(List<FileStatus> files) {
    if (files == null) {
      return true;
    }

    // first check to see if the path is valid
    List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
    List<FileStatus> invalidFiles = Lists.newArrayList();
    for (FileStatus file : files) {
      if (validate(file.getPath())) {
        validFiles.add(file);
      } else {
        LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
        invalidFiles.add(file);
      }
    }

    Iterable<FileStatus> deletableValidFiles = validFiles;
    // check each of the cleaners for the valid files
    for (T cleaner : cleanersChain) {
      if (cleaner.isStopped() || this.getStopper().isStopped()) {
        LOG.warn("A file cleaner {} is stopped, won't delete any more files in {}",
          this.getName(), this.oldFileDir);
        return false;
      }

      Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);

      // trace which cleaner is holding on to each file
      if (LOG.isTraceEnabled()) {
        ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
        for (FileStatus file : deletableValidFiles) {
          if (!filteredFileSet.contains(file)) {
            LOG.trace(file.getPath() + " is not deletable according to:" + cleaner);
          }
        }
      }

      deletableValidFiles = filteredFiles;
    }

    Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
    return deleteFiles(filesToDelete) == files.size();
  }

  /**
   * Check if an empty directory with no subdirs or subfiles can be deleted
   * @param dir Path of the directory
   * @return True if the directory can be deleted, otherwise false
   */
  private boolean isEmptyDirDeletable(Path dir) {
    for (T cleaner : cleanersChain) {
      if (cleaner.isStopped() || this.getStopper().isStopped()) {
        LOG.warn("A file cleaner {} is stopped, won't delete the empty directory {}",
          this.getName(), dir);
        return false;
      }
      if (!cleaner.isEmptyDirDeletable(dir)) {
        // If one of the cleaners needs the empty directory, skip deleting it
        return false;
      }
    }
    return true;
  }

  /**
   * Delete the given files
   * @param filesToDelete files to delete
   * @return number of deleted files
   */
  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
    int deletedFileCount = 0;
    for (FileStatus file : filesToDelete) {
      Path filePath = file.getPath();
      LOG.trace("Removing {} from archive", filePath);
      try {
        boolean success = this.fs.delete(filePath, false);
        if (success) {
          deletedFileCount++;
        } else {
          LOG.warn("Attempted to delete:" + filePath
            + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
        }
      } catch (IOException e) {
        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
        LOG.warn("Error while deleting: " + filePath, e);
      }
    }
    return deletedFileCount;
  }

  @Override
  public synchronized void cleanup() {
    for (T lc : this.cleanersChain) {
      try {
        lc.stop("Exiting");
      } catch (Throwable t) {
        LOG.warn("Stopping", t);
      }
    }
  }

  int getChorePoolSize() {
    return pool.getSize();
  }

  /**
   * Enable or disable this cleaner chore.
   * @param enabled whether the chore should run when triggered
   * @return the previous enabled state
   */
  public boolean setEnabled(final boolean enabled) {
    return this.enabled.getAndSet(enabled);
  }

  public boolean getEnabled() {
    return this.enabled.get();
  }

  private interface Action<T> {
    T act() throws Exception;
  }

  /**
   * Attempts to clean up a directory (its subdirectories and files) concurrently in a
   * {@link java.util.concurrent.ThreadPoolExecutor}. The final result can be obtained by calling
   * result.get().
   */
  private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean> result) {
    try {
      // Step.1: List all files under the given directory.
      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir));
      List<FileStatus> subDirs =
        allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
      List<FileStatus> files =
        allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());

      // Step.2: Try to delete all the deletable files.
      boolean allFilesDeleted =
        files.isEmpty() || deleteAction(() -> checkAndDeleteFiles(files), "files", dir);

      // Step.3: Start to traverse and delete the sub-directories.
      List<CompletableFuture<Boolean>> futures = new ArrayList<>();
      if (!subDirs.isEmpty()) {
        sortByConsumedSpace(subDirs);
        // Submit the request of sub-directory deletion.
        subDirs.forEach(subDir -> {
          if (!shouldExclude(subDir)) {
            CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
            pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
            futures.add(subFuture);
          }
        });
      }

      // Step.4: Once all sub-files & sub-directories are deleted, we can then try to delete the
      // current directory asynchronously.
      FutureUtils.addListener(
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
        (voidObj, e) -> {
          if (e != null) {
            result.completeExceptionally(e);
            return;
          }
          try {
            boolean allSubDirsDeleted = futures.stream().allMatch(CompletableFuture::join);
            boolean deleted = allFilesDeleted && allSubDirsDeleted && isEmptyDirDeletable(dir);
            if (deleted && !root) {
              // Only if the files and sub-dirs under the current dir were all deleted
              // successfully, the now-empty directory is deletable, and this is not the root
              // dir, will the task try to delete it.
              deleted = deleteAction(() -> fs.delete(dir, false), "dir", dir);
            }
            result.complete(deleted);
          } catch (Exception ie) {
            // Must handle the inner exception here, otherwise the result may get stuck if one
            // sub-directory gets some failure.
            result.completeExceptionally(ie);
          }
        });
    } catch (Exception e) {
      if (e instanceof FileNotFoundException) {
        LOG.debug("Dir does not exist, {}", dir);
      } else {
        LOG.error("Failed to traverse and delete the path: {}", dir, e);
      }
      result.completeExceptionally(e);
    }
  }

  /**
   * Check whether a path should be excluded from cleaning
   */
  private boolean shouldExclude(FileStatus f) {
    if (!f.isDirectory()) {
      return false;
    }
    if (excludeDirs != null && !excludeDirs.isEmpty()) {
      for (String dirPart : excludeDirs) {
        // since excludeDirs entries always end with '/',
        // a path that contains the dirPart should be excluded
        if (f.getPath().toString().contains(dirPart)) {
          return true;
        }
      }
    }
    return false;
  }

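  // Illustrative example (paths are hypothetical): if excludePaths contained
  // "/hbase/archive/data/ns/skip_me", the constructor stores "/hbase/archive/data/ns/skip_me/"
  // in excludeDirs, so a sub-directory such as "/hbase/archive/data/ns/skip_me/region1" matches
  // here and traverseAndDelete() never descends into it.
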
  /**
   * Perform a delete on a specified type.
   * @param deletion the delete action to perform
   * @param type     the kind of target being deleted; this class passes 'files' or 'dir'
   * @param dir      the directory the action operates on (used for logging)
   * @return true if the delete succeeded, false otherwise
   */
  private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) {
    boolean deleted;
    try {
      LOG.trace("Start deleting {} under {}", type, dir);
      deleted = deletion.act();
    } catch (PathIsNotEmptyDirectoryException exception) {
      // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
      // LocalFileSystem throws a bare IOException. So some test code will get the verbose
      // message below.
      LOG.debug("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
      deleted = false;
    } catch (IOException ioe) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Could not delete {} under {}; will retry. If it keeps happening, "
          + "quote the exception when asking on mailing list.", type, dir, ioe);
      } else {
        LOG.info(
          "Could not delete {} under {} because {}; will retry. If it keeps happening, enable "
            + "TRACE-level logging and quote the exception when asking on mailing list.",
          type, dir, ioe.getMessage());
      }
      deleted = false;
    } catch (Exception e) {
      LOG.info("unexpected exception: ", e);
      deleted = false;
    }
    LOG.trace("Finish deleting {} under {}, deleted = {}", type, dir, deleted);
    return deleted;
  }
}