/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Abstract Cleaner that uses a chain of delegates to clean a directory of files
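 * <p>
 * A minimal sketch of a hypothetical concrete subclass, shown only for illustration; the chore
 * name, period, and configuration key below are made up:
 * <pre>{@code
 * public class ArchiveCleanerChore extends CleanerChore<FileCleanerDelegate> {
 *   public ArchiveCleanerChore(Stoppable stopper, Configuration conf, FileSystem fs,
 *       Path archiveDir, DirScanPool pool) {
 *     super("ArchiveCleanerChore", 60_000, stopper, conf, fs, archiveDir,
 *       "hbase.example.cleaner.plugins", pool);
 *   }
 *
 *   protected boolean validate(Path file) {
 *     return true; // real subclasses apply naming rules here
 *   }
 * }
 * }</pre>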
 * @param <T> Cleaner delegate class that is dynamically loaded from configuration
 */
@InterfaceAudience.Private
public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {

  private static final Logger LOG = LoggerFactory.getLogger(CleanerChore.class);
  private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();

  /**
   * If the value is an integer >= 1, it is used directly as the pool size; if it is a decimal
   * with 0.0 < value <= 1.0, the pool size is that fraction of the available processors.
   * Note that 1.0 is different from 1: the former uses 100% of the cores, while the latter uses
   * only one thread for the chore to scan directories.
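   * <p>
   * For illustration, assuming 8 available processors: "2" gives 2 threads, "16" is capped at 8,
   * "0.25" gives 2 threads, and "1.0" gives 8 threads.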
   */
  public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
  static final String DEFAULT_CHORE_POOL_SIZE = "0.25";

  private final DirScanPool pool;

  protected final FileSystem fs;
  private final Path oldFileDir;
  private final Configuration conf;
  protected final Map<String, Object> params;
  private final AtomicBoolean enabled = new AtomicBoolean(true);
  protected List<T> cleanersChain;

  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
    FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
    this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null);
  }

  /**
   * @param name name of the chore being run
   * @param sleepPeriod the period of time to sleep between each run
   * @param s the stopper
   * @param conf configuration to use
   * @param fs handle to the FS
   * @param oldFileDir the path to the archived files
   * @param confKey configuration key for the classes to instantiate
   * @param pool the thread pool used to scan directories
   * @param params optional parameters made available to the cleaner delegates
   */
  public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
    FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params) {
    super(name, s, sleepPeriod);

    Preconditions.checkNotNull(pool, "Chore's pool can not be null");
    this.pool = pool;
    this.fs = fs;
    this.oldFileDir = oldFileDir;
    this.conf = conf;
    this.params = params;
    initCleanerChain(confKey);
  }

  /**
   * Calculate size for cleaner pool.
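   * <p>
   * For illustration only; actual results depend on the number of available processors:
   * <pre>{@code
   * calculatePoolSize("4");    // at most 4 threads, capped at the processor count
   * calculatePoolSize("0.5");  // half of the available processors, at least 1
   * calculatePoolSize("zzz");  // unrecognized, falls back to DEFAULT_CHORE_POOL_SIZE ("0.25")
   * }</pre>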
   * @param poolSize size from configuration
   * @return size of pool after calculation
   */
  static int calculatePoolSize(String poolSize) {
    if (poolSize.matches("[1-9][0-9]*")) {
      // If poolSize is an integer, return it directly,
      // capped at the number of available processors.
      int size = Math.min(Integer.parseInt(poolSize), AVAIL_PROCESSORS);
      if (size == AVAIL_PROCESSORS) {
        LOG.warn("Using all available processors to scan dirs, size={}", size);
      }
      return size;
    } else if (poolSize.matches("0\\.[0-9]+|1\\.0")) {
      // If poolSize is a decimal, return poolSize * availableProcessors;
      // ensure that we always return at least one.
      int computedThreads = (int) (AVAIL_PROCESSORS * Double.parseDouble(poolSize));
      if (computedThreads < 1) {
        LOG.debug("Computed {} threads for CleanerChore, using 1 instead", computedThreads);
        return 1;
      }
      return computedThreads;
    } else {
      LOG.error("Unrecognized value: {} for {}, using default config: {} instead.", poolSize,
        CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE);
      return calculatePoolSize(DEFAULT_CHORE_POOL_SIZE);
    }
  }

  /**
   * Validate the file to see if it even belongs in the directory. If it is valid, then the file
   * will go through the cleaner delegates, but otherwise the file is just deleted.
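   * <p>
   * A minimal sketch of a hypothetical override that accepts files by a naming rule (the suffix
   * below is made up for illustration):
   * <pre>{@code
   * protected boolean validate(Path file) {
   *   return file.getName().endsWith(".archived");
   * }
   * }</pre>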
   * @param file full {@link Path} of the file to be checked
   * @return <tt>true</tt> if the file is valid, <tt>false</tt> otherwise
   */
  protected abstract boolean validate(Path file);

  /**
   * Instantiate and initialize all the file cleaners set in the configuration
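   * <p>
   * A minimal sketch of the expected configuration shape; the key and class names here are
   * hypothetical:
   * <pre>{@code
   * conf.setStrings("hbase.example.cleaner.plugins",
   *   "org.example.TimeToLiveFileCleaner", "org.example.ReplicationAwareFileCleaner");
   * }</pre>
   * Blank entries are skipped, and classes that cannot be instantiated are logged and ignored.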
   * @param confKey key to get the file cleaner classes from the configuration
   */
  private void initCleanerChain(String confKey) {
    this.cleanersChain = new LinkedList<>();
    String[] logCleaners = conf.getStrings(confKey);
    if (logCleaners != null) {
      for (String className : logCleaners) {
        className = className.trim();
        if (className.isEmpty()) {
          continue;
        }
        T logCleaner = newFileCleaner(className, conf);
        if (logCleaner != null) {
          LOG.info("Initialized cleaner={}", className);
          this.cleanersChain.add(logCleaner);
        }
      }
    }
  }

  /**
   * A utility method to create new instances of LogCleanerDelegate based on the class name of the
   * LogCleanerDelegate.
   * @param className fully qualified class name of the LogCleanerDelegate
   * @param conf used configuration
   * @return the new instance
   */
  private T newFileCleaner(String className, Configuration conf) {
    try {
      Class<? extends FileCleanerDelegate> c = Class.forName(className).asSubclass(
        FileCleanerDelegate.class);
      @SuppressWarnings("unchecked")
      T cleaner = (T) c.getDeclaredConstructor().newInstance();
      cleaner.setConf(conf);
      cleaner.init(this.params);
      return cleaner;
    } catch (Exception e) {
      LOG.warn("Can NOT create CleanerDelegate={}", className, e);
      // skipping if can't instantiate
      return null;
    }
  }

  @Override
  protected void chore() {
    if (getEnabled()) {
      try {
        pool.latchCountUp();
        if (runCleaner()) {
          LOG.trace("Cleaned all WALs under {}", oldFileDir);
        } else {
          LOG.trace("WALs outstanding under {}", oldFileDir);
        }
      } finally {
        pool.latchCountDown();
      }
      // After each cleaner chore run, check whether a reconfiguration notification was received
      // while cleaning. The first cleaner to see it turns the notification off, to avoid another
      // cleaner updating the pool again, and then waits for the other cleaners to finish their
      // jobs. To avoid missing the next chore run, it waits at most 0.8 * period before shutting
      // down the old pool.
      pool.tryUpdatePoolSize((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
    } else {
      LOG.trace("Cleaner chore disabled! Not cleaning.");
    }
  }

  private void preRunCleaner() {
    cleanersChain.forEach(FileCleanerDelegate::preClean);
  }

  public boolean runCleaner() {
    preRunCleaner();
    try {
      CompletableFuture<Boolean> future = new CompletableFuture<>();
      pool.execute(() -> traverseAndDelete(oldFileDir, true, future));
      return future.get();
    } catch (Exception e) {
      LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
      return false;
    }
  }

  /**
   * Sort the given list in (descending) order of the space each element takes
   * @param dirs the list to sort; elements in it should be directories (not files)
   */
  private void sortByConsumedSpace(List<FileStatus> dirs) {
    if (dirs == null || dirs.size() < 2) {
      // no need to sort for empty or single directory
      return;
    }
    dirs.sort(new Comparator<FileStatus>() {
      HashMap<FileStatus, Long> directorySpaces = new HashMap<>();

      @Override
      public int compare(FileStatus f1, FileStatus f2) {
        long f1ConsumedSpace = getSpace(f1);
        long f2ConsumedSpace = getSpace(f2);
        return Long.compare(f2ConsumedSpace, f1ConsumedSpace);
      }

      private long getSpace(FileStatus f) {
        Long cached = directorySpaces.get(f);
        if (cached != null) {
          return cached;
        }
        try {
          long space =
              f.isDirectory() ? fs.getContentSummary(f.getPath()).getSpaceConsumed() : f.getLen();
          directorySpaces.put(f, space);
          return space;
        } catch (IOException e) {
          LOG.trace("Failed to get space consumed by path={}", f, e);
          return -1;
        }
      }
    });
  }

  /**
   * Run the given files through each of the cleaners to see if they should be deleted, deleting
   * them if so. A file is removed only if every cleaner in the chain agrees it is deletable;
   * files that fail {@link #validate(Path)} are deleted unconditionally.
   * @param files List of FileStatus for the files to check (and possibly delete)
   * @return true iff successfully deleted all files
   */
  private boolean checkAndDeleteFiles(List<FileStatus> files) {
    if (files == null) {
      return true;
    }

    // first check to see if the path is valid
    List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
    List<FileStatus> invalidFiles = Lists.newArrayList();
    for (FileStatus file : files) {
      if (validate(file.getPath())) {
        validFiles.add(file);
      } else {
        LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
        invalidFiles.add(file);
      }
    }

    Iterable<FileStatus> deletableValidFiles = validFiles;
    // check each of the cleaners for the valid files
    for (T cleaner : cleanersChain) {
      if (cleaner.isStopped() || this.getStopper().isStopped()) {
        LOG.warn("A file cleaner " + this.getName() + " is stopped,"
            + " won't delete any more files in: " + this.oldFileDir);
        return false;
      }

      Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);

      // trace which cleaner is holding on to each file
      if (LOG.isTraceEnabled()) {
        ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
        for (FileStatus file : deletableValidFiles) {
          if (!filteredFileSet.contains(file)) {
            LOG.trace(file.getPath() + " is not deletable according to: " + cleaner);
          }
        }
      }

      deletableValidFiles = filteredFiles;
    }

    Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
    return deleteFiles(filesToDelete) == files.size();
  }

  /**
   * Check if an empty directory with no subdirs or subfiles can be deleted
   * @param dir Path of the directory
   * @return True if the directory can be deleted, otherwise false
   */
  private boolean isEmptyDirDeletable(Path dir) {
    for (T cleaner : cleanersChain) {
      if (cleaner.isStopped() || this.getStopper().isStopped()) {
        LOG.warn("A file cleaner {} is stopped, won't delete the empty directory {}",
          this.getName(), dir);
        return false;
      }
      if (!cleaner.isEmptyDirDeletable(dir)) {
        // If one of the cleaners needs the empty directory, skip deleting it
        return false;
      }
    }
    return true;
  }

  /**
   * Delete the given files
   * @param filesToDelete files to delete
   * @return number of deleted files
   */
  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
    int deletedFileCount = 0;
    for (FileStatus file : filesToDelete) {
      Path filePath = file.getPath();
      LOG.trace("Removing {} from archive", filePath);
      try {
        boolean success = this.fs.delete(filePath, false);
        if (success) {
          deletedFileCount++;
        } else {
          LOG.warn("Attempted to delete: " + filePath
              + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
        }
      } catch (IOException e) {
        e = e instanceof RemoteException ?
                  ((RemoteException)e).unwrapRemoteException() : e;
        LOG.warn("Error while deleting: " + filePath, e);
      }
    }
    return deletedFileCount;
  }

  @Override
  public synchronized void cleanup() {
    for (T lc : this.cleanersChain) {
      try {
        lc.stop("Exiting");
      } catch (Throwable t) {
        LOG.warn("Stopping", t);
      }
    }
  }

  @VisibleForTesting
  int getChorePoolSize() {
    return pool.getSize();
  }

  /**
   * @param enabled whether the cleaner chore should run
   * @return the previous value of the enabled flag
   */
  public boolean setEnabled(final boolean enabled) {
    return this.enabled.getAndSet(enabled);
  }

  public boolean getEnabled() {
    return this.enabled.get();
  }

  private interface Action<T> {
    T act() throws Exception;
  }

  /**
   * Attempts to clean up a directory (its subdirectories and files) concurrently in a
   * {@link java.util.concurrent.ThreadPoolExecutor}. The final result can be obtained by
   * calling result.get().
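   * <p>
   * A minimal sketch of how it is driven (mirrors {@link #runCleaner()}):
   * <pre>{@code
   * CompletableFuture<Boolean> result = new CompletableFuture<>();
   * pool.execute(() -> traverseAndDelete(oldFileDir, true, result));
   * boolean allDeleted = result.get(); // true only if everything under oldFileDir was deletable
   * }</pre>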
   */
  private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean> result) {
    try {
      // Step.1: List all files under the given directory.
      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir));
      List<FileStatus> subDirs =
          allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
      List<FileStatus> files =
          allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());

      // Step.2: Try to delete all the deletable files.
      boolean allFilesDeleted =
          files.isEmpty() || deleteAction(() -> checkAndDeleteFiles(files), "files", dir);

      // Step.3: Start to traverse and delete the sub-directories.
      List<CompletableFuture<Boolean>> futures = new ArrayList<>();
      if (!subDirs.isEmpty()) {
        sortByConsumedSpace(subDirs);
        // Submit the request of sub-directory deletion.
        subDirs.forEach(subDir -> {
          CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
          pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
          futures.add(subFuture);
        });
      }

      // Step.4: Once all sub-files & sub-directories have been deleted, try to delete the
      // current directory asynchronously.
      FutureUtils.addListener(
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
        (voidObj, e) -> {
          if (e != null) {
            result.completeExceptionally(e);
            return;
          }
          try {
            boolean allSubDirsDeleted = futures.stream().allMatch(CompletableFuture::join);
            boolean deleted = allFilesDeleted && allSubDirsDeleted && isEmptyDirDeletable(dir);
            if (deleted && !root) {
              // Only if the files and sub-dirs under the current dir were deleted successfully,
              // the empty directory is deletable, and this is not the root dir will the task
              // try to delete it.
              deleted = deleteAction(() -> fs.delete(dir, false), "dir", dir);
            }
            result.complete(deleted);
          } catch (Exception ie) {
            // Must handle the inner exception here, otherwise the result may get stuck if one
            // sub-directory gets some failure.
            result.completeExceptionally(ie);
          }
        });
    } catch (Exception e) {
      LOG.debug("Failed to traverse and delete the path: {}", dir, e);
      result.completeExceptionally(e);
    }
  }

  /**
   * Perform a delete on a specified type.
   * @param deletion the delete action to perform
   * @param type possible values are 'files' and 'dir'; used only for logging
   * @param dir the directory the deletion is performed under
   * @return true if it deleted successfully, false otherwise
   */
  private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) {
    boolean deleted;
    try {
      LOG.trace("Start deleting {} under {}", type, dir);
      deleted = deletion.act();
    } catch (PathIsNotEmptyDirectoryException exception) {
      // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
      // LocalFileSystem throws a bare IOException. So some test code will get the verbose
      // message below.
      LOG.debug("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
      deleted = false;
    } catch (IOException ioe) {
      LOG.info("Could not delete {} under {}. Might be transient; we'll retry. If it keeps "
          + "happening, include the following exception when asking on the mailing list.",
        type, dir, ioe);
      deleted = false;
    } catch (Exception e) {
      LOG.info("Unexpected exception: ", e);
      deleted = false;
    }
    LOG.trace("Finished deleting {} under {}, deleted={}", type, dir, deleted);
    return deleted;
  }
}